Commit beec74b0 authored by Robert Schmidt's avatar Robert Schmidt

Correctly stop link_manager sender and receiver threads

This changes the destroy_link_manager() function to:
* call a new function message_get_unlock() to unlock the sending thread from
  its blocking message_get()
* calls pthread_cancel() on the receiving thread, because this unlocks any
  blocking read from a file descriptor

The same message_get_unlock() function has been added ringbuffer_queue to keep
the interfaces the same (it does nothing, because ringbuffer_queue is
non-blocking).
parent 0643d6b5
...@@ -50,11 +50,6 @@ static void *link_manager_sender_thread(void *_manager) ...@@ -50,11 +50,6 @@ static void *link_manager_sender_thread(void *_manager)
link_send_packet(manager->socket_link, data, size, manager->peer_addr, manager->peer_port); link_send_packet(manager->socket_link, data, size, manager->peer_addr, manager->peer_port);
free(data); free(data);
} }
// if (message_get(manager->send_queue, &data, &size, &priority))
// goto error;
//if (link_send_packet(manager->socket_link, data, size))
// goto error;
//free(data);
} }
LOG_D(MAC, "link manager sender thread quits\n"); LOG_D(MAC, "link manager sender thread quits\n");
...@@ -148,10 +143,11 @@ error: ...@@ -148,10 +143,11 @@ error:
void destroy_link_manager(link_manager_t *manager) void destroy_link_manager(link_manager_t *manager)
{ {
LOG_D(MAC, "destroying link manager\n");
manager->run = 0; manager->run = 0;
message_get_unlock(manager->send_queue);
pthread_join(manager->sender, NULL); pthread_join(manager->sender, NULL);
pthread_join(manager->receiver, NULL); /* cancel aborts the read performed in the receiver, then cancels the thread */
pthread_cancel(manager->receiver);
} }
#ifdef SERVER_TEST #ifdef SERVER_TEST
......
...@@ -56,6 +56,10 @@ message_queue_t *new_message_queue(void) ...@@ -56,6 +56,10 @@ message_queue_t *new_message_queue(void)
if (pthread_cond_init(ret->cond, NULL)) if (pthread_cond_init(ret->cond, NULL))
goto error; goto error;
ret->head = NULL;
ret->tail = NULL;
ret->exit = 0;
return ret; return ret;
error: error:
...@@ -121,10 +125,15 @@ int message_get(message_queue_t *queue, void **data, int *priority) ...@@ -121,10 +125,15 @@ int message_get(message_queue_t *queue, void **data, int *priority)
goto error; goto error;
while (queue->count == 0) { while (queue->count == 0) {
if (pthread_cond_wait(queue->cond, queue->mutex)) { int rc = pthread_cond_wait(queue->cond, queue->mutex);
if (rc != 0) {
pthread_mutex_unlock(queue->mutex); pthread_mutex_unlock(queue->mutex);
goto error; goto error;
} }
if (queue->exit) {
pthread_mutex_unlock(queue->mutex);
return 0;
}
} }
m = queue->head; m = queue->head;
...@@ -148,6 +157,14 @@ error: ...@@ -148,6 +157,14 @@ error:
return -1; return -1;
} }
void message_get_unlock(message_queue_t *queue)
{
pthread_mutex_lock(queue->mutex);
queue->exit = 1;
pthread_mutex_unlock(queue->mutex);
pthread_cond_signal(queue->cond);
}
/* when calling this function, the queue must not be used anymore (we don't lock it) */ /* when calling this function, the queue must not be used anymore (we don't lock it) */
/* we suppose that the data pointer in messages was allocated by malloc/calloc/realloc */ /* we suppose that the data pointer in messages was allocated by malloc/calloc/realloc */
void destroy_message_queue(message_queue_t *queue) void destroy_message_queue(message_queue_t *queue)
......
...@@ -51,11 +51,13 @@ typedef struct { ...@@ -51,11 +51,13 @@ typedef struct {
volatile int count; volatile int count;
pthread_mutex_t *mutex; pthread_mutex_t *mutex;
pthread_cond_t *cond; pthread_cond_t *cond;
int exit;
} message_queue_t; } message_queue_t;
message_queue_t *new_message_queue(void); message_queue_t *new_message_queue(void);
int message_put(message_queue_t *queue, void *data, int size, int priority); int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *priority); int message_get(message_queue_t *queue, void **data, int *priority);
void message_get_unlock(message_queue_t *queue);
void destroy_message_queue(message_queue_t *queue); void destroy_message_queue(message_queue_t *queue);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -119,6 +119,11 @@ int message_get(message_queue_t *queue, void **data, int *priority) { ...@@ -119,6 +119,11 @@ int message_get(message_queue_t *queue, void **data, int *priority) {
return size; return size;
} }
void message_get_unlock(message_queue_t *queue) {
/* don't do anything, this function exists to unlock a message_queue but is
* not needed in the case of the ringbuffer_queue */
}
void destroy_message_queue(message_queue_t *queue) { void destroy_message_queue(message_queue_t *queue) {
struct lfds700_misc_prng_state ls; struct lfds700_misc_prng_state ls;
......
...@@ -48,6 +48,7 @@ typedef struct { ...@@ -48,6 +48,7 @@ typedef struct {
message_queue_t * new_message_queue(int size); message_queue_t * new_message_queue(int size);
int message_put(message_queue_t *queue, void *data, int size, int priority); int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *priority); int message_get(message_queue_t *queue, void **data, int *priority);
void message_get_unlock(message_queue_t *queue);
void destroy_message_queue(message_queue_t *queue); void destroy_message_queue(message_queue_t *queue);
#endif /* RINGBUFFER_QUEUE_H */ #endif /* RINGBUFFER_QUEUE_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment