Commit 8ca955f3 authored by Cedric Roux's avatar Cedric Roux

better forwarder

parent cb2454cc
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#define PLOT_IQ_POINTS 1 #define PLOT_IQ_POINTS 1
#define PLOT_MINMAX 2 #define PLOT_MINMAX 2
void new_thread(void *(*f)(void *), void *data);
/* ... is { int count; int type; char *color; } for 'nplots' plots */ /* ... is { int count; int type; char *color; } for 'nplots' plots */
void *make_plot(int width, int height, char *title, int nplots, ...); void *make_plot(int width, int height, char *title, int nplots, ...);
void plot_set(void *plot, float *data, int len, int pos, int pp); void plot_set(void *plot, float *data, int len, int pos, int pp);
...@@ -23,5 +25,6 @@ void on_off(void *d, char *item, int *a, int onoff); ...@@ -23,5 +25,6 @@ void on_off(void *d, char *item, int *a, int onoff);
void *forwarder(char *ip, int port); void *forwarder(char *ip, int port);
void forward(void *forwarder, char *buf, int size); void forward(void *forwarder, char *buf, int size);
void forward_start_client(void *forwarder, int socket);
#endif /* _TRACER_DEFS_H_ */ #endif /* _TRACER_DEFS_H_ */
...@@ -4,11 +4,60 @@ ...@@ -4,11 +4,60 @@
#include <netinet/ip.h> #include <netinet/ip.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h>
typedef struct { typedef struct {
int s; int s;
int sc;
pthread_mutex_t lock;
} forward_data; } forward_data;
static void do_forward(forward_data *f, int from, int to, int lock)
{
int l, len;
char *b;
char buf[1024];
while (1) {
len = read(from, buf, 1024);
if (len <= 0) break;
b = buf;
if (lock) if (pthread_mutex_lock(&f->lock)) abort();
while (len) {
l = write(to, b, len);
if (l <= 0) break;
len -= l;
b += l;
}
if (lock) if (pthread_mutex_unlock(&f->lock)) abort();
}
}
static void *forward_s_to_sc(void *_f)
{
forward_data *f = _f;
do_forward(f, f->s, f->sc, 0);
return NULL;
}
static void *forward_sc_to_s(void *_f)
{
forward_data *f = _f;
do_forward(f, f->sc, f->s, 1);
printf("INFO: forwarder exits\n");
return NULL;
}
void forward_start_client(void *_f, int s)
{
forward_data *f = _f;
f->sc = s;
new_thread(forward_s_to_sc, f);
new_thread(forward_sc_to_s, f);
}
void *forwarder(char *ip, int port) void *forwarder(char *ip, int port)
{ {
forward_data *f; forward_data *f;
...@@ -16,6 +65,8 @@ void *forwarder(char *ip, int port) ...@@ -16,6 +65,8 @@ void *forwarder(char *ip, int port)
f = malloc(sizeof(*f)); if (f == NULL) abort(); f = malloc(sizeof(*f)); if (f == NULL) abort();
pthread_mutex_init(&f->lock, NULL);
f->s = socket(AF_INET, SOCK_STREAM, 0); f->s = socket(AF_INET, SOCK_STREAM, 0);
if (f->s == -1) { perror("socket"); exit(1); } if (f->s == -1) { perror("socket"); exit(1); }
...@@ -33,10 +84,14 @@ void forward(void *_forwarder, char *buf, int size) ...@@ -33,10 +84,14 @@ void forward(void *_forwarder, char *buf, int size)
{ {
forward_data *f = _forwarder; forward_data *f = _forwarder;
if (pthread_mutex_lock(&f->lock)) abort();
while (size) { while (size) {
int l = write(f->s, buf, size); int l = write(f->s, buf, size);
if (l <= 0) { printf("forward error\n"); exit(1); } if (l <= 0) { printf("forward error\n"); exit(1); }
size -= l; size -= l;
buf += l; buf += l;
} }
if (pthread_mutex_unlock(&f->lock)) abort();
} }
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#include <math.h> #include <math.h>
#include <pthread.h>
#include "defs.h" #include "defs.h"
...@@ -368,6 +369,23 @@ void init_shm(void) ...@@ -368,6 +369,23 @@ void init_shm(void)
#endif /* T_USE_SHARED_MEMORY */ #endif /* T_USE_SHARED_MEMORY */
void new_thread(void *(*f)(void *), void *data)
{
pthread_t t;
pthread_attr_t att;
if (pthread_attr_init(&att))
{ fprintf(stderr, "pthread_attr_init err\n"); exit(1); }
if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED))
{ fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); }
if (pthread_attr_setstacksize(&att, 10000000))
{ fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); }
if (pthread_create(&t, &att, f, data))
{ fprintf(stderr, "pthread_create err\n"); exit(1); }
if (pthread_attr_destroy(&att))
{ fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); }
}
void usage(void) void usage(void)
{ {
printf( printf(
...@@ -473,6 +491,8 @@ int main(int n, char **v) ...@@ -473,6 +491,8 @@ int main(int n, char **v)
if (remote_local) f = forwarder(remote_ip, remote_port); if (remote_local) f = forwarder(remote_ip, remote_port);
#endif #endif
if (remote_local) goto no_database;
if (database_filename == NULL) { if (database_filename == NULL) {
printf("ERROR: provide a database file (-d)\n"); printf("ERROR: provide a database file (-d)\n");
exit(1); exit(1);
...@@ -485,6 +505,10 @@ int main(int n, char **v) ...@@ -485,6 +505,10 @@ int main(int n, char **v)
if (do_list_groups) { list_groups(database); return 0; } if (do_list_groups) { list_groups(database); return 0; }
if (do_dump_database) { dump_database(database); return 0; } if (do_dump_database) { dump_database(database); return 0; }
for (i = 0; i < on_off_n; i++)
on_off(database, on_off_name[i], is_on, on_off_action[i]);
no_database:
if (do_xforms) { if (do_xforms) {
ul_plot = make_plot(512, 100, "UL Input Signal", 1, ul_plot = make_plot(512, 100, "UL Input Signal", 1,
7680*10, PLOT_VS_TIME, BLUE); 7680*10, PLOT_VS_TIME, BLUE);
...@@ -501,13 +525,18 @@ int main(int n, char **v) ...@@ -501,13 +525,18 @@ int main(int n, char **v)
10240, PLOT_MINMAX, BLUE); 10240, PLOT_MINMAX, BLUE);
} }
for (i = 0; i < on_off_n; i++)
on_off(database, on_off_name[i], is_on, on_off_action[i]);
#ifdef T_USE_SHARED_MEMORY #ifdef T_USE_SHARED_MEMORY
init_shm(); init_shm();
#endif #endif
s = get_connection("127.0.0.1", 2020); s = get_connection("127.0.0.1", 2020);
if (remote_local) {
#ifdef T_USE_SHARED_MEMORY
forward_start_client(f, s);
#endif
goto no_init_message;
}
/* send the first message - activate all traces */ /* send the first message - activate all traces */
t = 0; t = 0;
if (write(s, &t, 1) != 1) abort(); if (write(s, &t, 1) != 1) abort();
...@@ -518,6 +547,8 @@ int main(int n, char **v) ...@@ -518,6 +547,8 @@ int main(int n, char **v)
if (is_on[l]) if (is_on[l])
if (write(s, &l, sizeof(int)) != sizeof(int)) abort(); if (write(s, &l, sizeof(int)) != sizeof(int)) abort();
no_init_message:
/* read messages */ /* read messages */
while (1) { while (1) {
#ifdef T_USE_SHARED_MEMORY #ifdef T_USE_SHARED_MEMORY
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include <pthread.h> #include <pthread.h>
#include <math.h> #include <math.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h>
#include <sys/select.h> #include <sys/select.h>
#include <stdarg.h> #include <stdarg.h>
...@@ -146,23 +145,6 @@ static void *plot_thread(void *_p) ...@@ -146,23 +145,6 @@ static void *plot_thread(void *_p)
return NULL; return NULL;
} }
static void new_thread(void *(*f)(void *), void *data)
{
pthread_t t;
pthread_attr_t att;
if (pthread_attr_init(&att))
{ fprintf(stderr, "pthread_attr_init err\n"); exit(1); }
if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED))
{ fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); }
if (pthread_attr_setstacksize(&att, 10000000))
{ fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); }
if (pthread_create(&t, &att, f, data))
{ fprintf(stderr, "pthread_create err\n"); exit(1); }
if (pthread_attr_destroy(&att))
{ fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); }
}
void *make_plot(int width, int height, char *title, int nplots, ...) void *make_plot(int width, int height, char *title, int nplots, ...)
{ {
plot *p; plot *p;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment