Commit 6dd80ffd authored by Michael Cook's avatar Michael Cook

queue: add requeue, fix unqueue_matching

Synchronized with emane.git's implementation of this code which
includes a unit test.

This commit also fixes two bugs in unqueue_matching and one inb
unqueue:

- unqueue_matching was returning the wrong pointer when
  there was no match

- both unqueue_matching and unqueue were not clearing the vacated
  queue entry after removing the item
parent d449a4de
#include "queue.h"
#include "common/utils/LOG/log.h"
#include <string.h>
#include <assert.h>
#ifdef UNITTEST
#include <stdio.h>
#define LOG_ERROR(MSG) printf(MSG "\n")
#else
#include "common/utils/LOG/log.h"
#define LOG_ERROR(MSG) LOG_E(PHY, MSG "\n")
#endif
void init_queue(queue_t *q) {
memset(q, 0, sizeof(*q));
pthread_mutex_init(&q->mutex, NULL);
void init_queue(queue_t *q)
{
memset(q, 0, sizeof(*q));
pthread_mutex_init(&q->mutex, NULL);
}
bool put_queue(queue_t *q, void *item) {
if (pthread_mutex_lock(&q->mutex) != 0) {
LOG_E(PHY, "put_queue mutex_lock failed\n");
return false;
}
bool queued;
if (q->num_items >= MAX_QUEUE_SIZE) {
LOG_E(PHY, "Queue is full in put_queue\n");
queued = false;
} else {
q->items[q->write_index] = item;
q->write_index = (q->write_index + 1) % MAX_QUEUE_SIZE;
q->num_items++;
queued = true;
}
pthread_mutex_unlock(&q->mutex);
return queued;
bool put_queue(queue_t *q, void *item)
{
assert(item != NULL);
if (pthread_mutex_lock(&q->mutex) != 0)
{
LOG_ERROR("put_queue: mutex_lock failed");
return false;
}
bool queued;
if (q->num_items >= MAX_QUEUE_SIZE)
{
LOG_ERROR("put_queue: queue is full");
queued = false;
}
else
{
assert(q->items[q->write_index] == NULL);
q->items[q->write_index] = item;
q->write_index = (q->write_index + 1) % MAX_QUEUE_SIZE;
q->num_items++;
queued = true;
}
pthread_mutex_unlock(&q->mutex);
return queued;
}
void *get_queue(queue_t *q) {
void *get_queue(queue_t *q)
{
void *item = NULL;
if (pthread_mutex_lock(&q->mutex) != 0)
{
LOG_ERROR("get_queue: mutex_lock failed");
return NULL;
}
void *item = NULL;
if (pthread_mutex_lock(&q->mutex) != 0) {
LOG_E(PHY, "get_queue mutex_lock failed\n");
return NULL;
}
if (q->num_items > 0)
{
item = q->items[q->read_index];
assert(item != NULL);
q->items[q->read_index] = NULL;
q->read_index = (q->read_index + 1) % MAX_QUEUE_SIZE;
q->num_items--;
}
if (q->num_items > 0) {
item = q->items[q->read_index];
q->read_index = (q->read_index + 1) % MAX_QUEUE_SIZE;
q->num_items--;
}
pthread_mutex_unlock(&q->mutex);
return item;
}
pthread_mutex_unlock(&q->mutex);
return item;
bool requeue(queue_t *q, void *item)
{
assert(item != NULL);
if (pthread_mutex_lock(&q->mutex) != 0)
{
LOG_ERROR("requeue: mutex_lock failed");
return false;
}
bool queued;
if (q->num_items >= MAX_QUEUE_SIZE)
{
LOG_ERROR("requeue: queue is full");
queued = false;
}
else
{
q->read_index = (q->read_index + MAX_QUEUE_SIZE - 1) % MAX_QUEUE_SIZE;
assert(q->items[q->read_index] == NULL);
q->items[q->read_index] = item;
q->num_items++;
queued = true;
}
pthread_mutex_unlock(&q->mutex);
return queued;
}
void *unqueue(queue_t *q)
{
void *item = NULL;
if (pthread_mutex_lock(&q->mutex) != 0) {
LOG_E(PHY, "remove_from_back_of_queue mutex_lock failed\n");
return NULL;
}
if (q->num_items > 0) {
q->write_index = (q->write_index + MAX_QUEUE_SIZE - 1) % MAX_QUEUE_SIZE;
item = q->items[q->write_index];
q->num_items--;
}
pthread_mutex_unlock(&q->mutex);
return item;
void *item = NULL;
if (pthread_mutex_lock(&q->mutex) != 0) {
LOG_ERROR("unqueue: mutex_lock failed");
return NULL;
}
if (q->num_items > 0) {
q->write_index = (q->write_index + MAX_QUEUE_SIZE - 1) % MAX_QUEUE_SIZE;
item = q->items[q->write_index];
q->items[q->write_index] = NULL;
q->num_items--;
}
pthread_mutex_unlock(&q->mutex);
return item;
}
void *unqueue_matching(queue_t *q, queue_matcher_t *matcher, void *wanted)
{
if (pthread_mutex_lock(&q->mutex) != 0)
{
LOG_ERROR("unqueue_matching: mutex_lock failed");
return NULL;
}
void *item = NULL;
size_t endi = q->write_index;
for (size_t i = 0; i < q->num_items; i++)
{
endi = (endi + MAX_QUEUE_SIZE - 1) % MAX_QUEUE_SIZE;
void *candidate = q->items[endi];
if (matcher(wanted, candidate))
{
item = candidate;
// delete item from the queue and move other items down
for (;;)
{
size_t j = (endi + 1) % MAX_QUEUE_SIZE;
if (j == q->write_index)
{
q->items[endi] = NULL;
q->write_index = endi;
q->num_items--;
break;
}
q->items[endi] = q->items[j];
endi = j;
}
break;
}
}
pthread_mutex_unlock(&q->mutex);
return item;
}
......@@ -32,15 +32,30 @@
#define MAX_QUEUE_SIZE 512
typedef struct queue_t {
void *items[MAX_QUEUE_SIZE];
size_t read_index, write_index;
size_t num_items;
pthread_mutex_t mutex;
typedef struct queue_t
{
void *items[MAX_QUEUE_SIZE];
size_t read_index, write_index;
size_t num_items;
pthread_mutex_t mutex;
} queue_t;
void init_queue(queue_t *q);
bool put_queue(queue_t *q, void *item);
void *get_queue(queue_t *q);
/* Put the given item back onto this queue at the head.
(The next call to put_queue would return this item.)
Return true if successful, false if the queue was full */
bool requeue(queue_t *q, void *item);
/* Remove the last item queued.
Return the item or NULL if the queue was empty */
void *unqueue(queue_t *q);
typedef bool queue_matcher_t(void *wanted, void *candidate);
/* Unqueue the most recently queued item for watch `matcher(wanted, candidate)`
returns true where `candidate` is an item currently on the queue.
Returns the candidate item, or NULL if none matches */
void *unqueue_matching(queue_t *q, queue_matcher_t *matcher, void *wanted);
#include "queue.h"
#include <stdio.h>
#include <stdlib.h>
#define FAIL do { \
printf("\n*** FAILED at %s line %d\n", __FILE__, __LINE__); \
pass = false; \
} while (0)
#define EQUAL(A, B) do { \
if ((A) != (B)) \
FAIL; \
} while (0)
typedef uint32_t Thing_t; /* actual type doesn't matter */
static Thing_t things[MAX_QUEUE_SIZE];
static Thing_t thing1, thing2;
static bool matcher(void *wanted, void *candidate)
{
return wanted == candidate;
}
int main(void)
{
bool pass = true;
queue_t queue;
init_queue(&queue);
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
{
FAIL;
}
}
/* queue is full */
if (put_queue(&queue, &thing1))
FAIL;
Thing_t *p;
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
p = get_queue(&queue);
EQUAL(p, &things[i]);
}
/* queue is empty */
p = get_queue(&queue);
EQUAL(p, NULL);
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
{
FAIL;
}
}
p = get_queue(&queue);
EQUAL(p, &things[0]);
p = get_queue(&queue);
EQUAL(p, &things[1]);
if (!requeue(&queue, &thing1))
FAIL;
if (!requeue(&queue, &thing2))
FAIL;
p = get_queue(&queue);
EQUAL(p, &thing2);
p = get_queue(&queue);
EQUAL(p, &thing1);
if (!requeue(&queue, &things[1]))
FAIL;
if (!requeue(&queue, &things[0]))
FAIL;
for (int i = 0; i < MAX_QUEUE_SIZE / 2; ++i)
{
p = get_queue(&queue);
EQUAL(p, &things[i]);
}
for (int i = MAX_QUEUE_SIZE / 2; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
FAIL;
}
p = get_queue(&queue);
EQUAL(p, &things[MAX_QUEUE_SIZE / 2]);
p = get_queue(&queue);
EQUAL(p, &things[MAX_QUEUE_SIZE / 2 + 1]);
// ---- unqueue ----
init_queue(&queue);
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
{
FAIL;
}
}
for (int i = MAX_QUEUE_SIZE; --i >= 0;)
{
p = unqueue(&queue);
EQUAL(p, &things[i]);
EQUAL(queue.num_items, i);
}
EQUAL(queue.num_items, 0);
if (!put_queue(&queue, &thing1))
FAIL;
if (!put_queue(&queue, &thing2))
FAIL;
EQUAL(queue.num_items, 2);
p = get_queue(&queue);
EQUAL(p, &thing1);
p = get_queue(&queue);
EQUAL(p, &thing2);
// ---- unqueue_matching ----
init_queue(&queue);
// empty queue
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, NULL);
EQUAL(queue.num_items, 0);
// one item in queue
if (!put_queue(&queue, &thing1))
FAIL;
EQUAL(queue.num_items, 1);
p = unqueue_matching(&queue, matcher, &thing2);
EQUAL(p, NULL);
EQUAL(queue.num_items, 1);
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, &thing1);
EQUAL(queue.num_items, 0);
// fill the queue then remove every other item
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
{
FAIL;
}
}
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, NULL);
for (int i = MAX_QUEUE_SIZE - 1; i >= 0; i -= 2)
{
p = unqueue_matching(&queue, matcher, &things[i]);
EQUAL(p, &things[i]);
}
EQUAL(queue.num_items, MAX_QUEUE_SIZE / 2);
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, NULL);
for (int i = 0; i < MAX_QUEUE_SIZE; i += 2)
{
p = get_queue(&queue);
EQUAL(p, &things[i]);
}
EQUAL(queue.num_items, 0);
// fill the queue then remove every third item
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (!put_queue(&queue, &things[i]))
{
FAIL;
}
}
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, NULL);
for (int i = 0; i < MAX_QUEUE_SIZE; i += 3)
{
p = unqueue_matching(&queue, matcher, &things[i]);
EQUAL(p, &things[i]);
}
EQUAL(queue.num_items, MAX_QUEUE_SIZE * 2 / 3);
p = unqueue_matching(&queue, matcher, &thing1);
EQUAL(p, NULL);
for (int i = 0; i < MAX_QUEUE_SIZE; ++i)
{
if (i % 3 == 0)
continue;
p = get_queue(&queue);
EQUAL(p, &things[i]);
}
EQUAL(queue.num_items, 0);
if (!pass)
return EXIT_FAILURE;
return EXIT_SUCCESS;
}
#!/bin/bash
opts=(
-Wall -Werror
-Wno-error=int-to-pointer-cast
-Wno-int-to-pointer-cast
-DUNITTEST
)
set -x
gcc "${opts[@]}" -fsanitize=address -o queue_test queue_test.c queue.c || exit
./queue_test || exit
gcc "${opts[@]}" -o queue_test queue_test.c queue.c || exit
valgrind ./queue_test || exit
: PASS
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment