Commit f846a9e5 authored by Cedric Roux's avatar Cedric Roux

forwarder uses a list to store messages

better in case of slow link between local and remote tracers
may smoke lots of memory though
parent 8ca955f3
...@@ -5,13 +5,61 @@ ...@@ -5,13 +5,61 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h> #include <pthread.h>
#include <string.h>
typedef struct databuf {
char *d;
int l;
struct databuf *next;
} databuf;
typedef struct { typedef struct {
int s; int s;
int sc; int sc;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_mutex_t datalock;
pthread_cond_t datacond;
databuf * volatile head, *tail;
} forward_data; } forward_data;
static void *data_sender(void *_f)
{
forward_data *f = _f;
databuf *cur;
char *buf, *b;
int size;
wait:
if (pthread_mutex_lock(&f->datalock)) abort();
while (f->head == NULL)
if (pthread_cond_wait(&f->datacond, &f->datalock)) abort();
cur = f->head;
buf = cur->d;
size = cur->l;
f->head = cur->next;
if (f->head == NULL) f->tail = NULL;
if (pthread_mutex_unlock(&f->datalock)) abort();
free(cur);
goto process;
process:
if (pthread_mutex_lock(&f->lock)) abort();
b = buf;
while (size) {
int l = write(f->s, b, size);
if (l <= 0) { printf("forward error\n"); exit(1); }
size -= l;
b += l;
}
if (pthread_mutex_unlock(&f->lock)) abort();
free(buf);
goto wait;
}
static void do_forward(forward_data *f, int from, int to, int lock) static void do_forward(forward_data *f, int from, int to, int lock)
{ {
int l, len; int l, len;
...@@ -66,6 +114,11 @@ void *forwarder(char *ip, int port) ...@@ -66,6 +114,11 @@ void *forwarder(char *ip, int port)
f = malloc(sizeof(*f)); if (f == NULL) abort(); f = malloc(sizeof(*f)); if (f == NULL) abort();
pthread_mutex_init(&f->lock, NULL); pthread_mutex_init(&f->lock, NULL);
pthread_mutex_init(&f->datalock, NULL);
pthread_cond_init(&f->datacond, NULL);
f->sc = -1;
f->head = f->tail = NULL;
f->s = socket(AF_INET, SOCK_STREAM, 0); f->s = socket(AF_INET, SOCK_STREAM, 0);
if (f->s == -1) { perror("socket"); exit(1); } if (f->s == -1) { perror("socket"); exit(1); }
...@@ -77,21 +130,27 @@ void *forwarder(char *ip, int port) ...@@ -77,21 +130,27 @@ void *forwarder(char *ip, int port)
if (connect(f->s, (struct sockaddr *)&a, sizeof(a)) == -1) if (connect(f->s, (struct sockaddr *)&a, sizeof(a)) == -1)
{ perror("connect"); exit(1); } { perror("connect"); exit(1); }
new_thread(data_sender, f);
return f; return f;
} }
void forward(void *_forwarder, char *buf, int size) void forward(void *_forwarder, char *buf, int size)
{ {
forward_data *f = _forwarder; forward_data *f = _forwarder;
databuf *new;
if (pthread_mutex_lock(&f->lock)) abort(); new = malloc(sizeof(*new)); if (new == NULL) abort();
while (size) { if (pthread_mutex_lock(&f->datalock)) abort();
int l = write(f->s, buf, size);
if (l <= 0) { printf("forward error\n"); exit(1); }
size -= l;
buf += l;
}
if (pthread_mutex_unlock(&f->lock)) abort(); new->d = malloc(size); if (new->d == NULL) abort();
memcpy(new->d, buf, size);
new->l = size;
new->next = NULL;
if (f->head == NULL) f->head = new;
f->tail = new;
if (pthread_cond_signal(&f->datacond)) abort();
if (pthread_mutex_unlock(&f->datalock)) abort();
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment