local_tracer.c 9.57 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
#include <stdio.h>
#include <string.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
11
#include <inttypes.h>
12
#include <signal.h>
13

14 15
#include "T.h"
#include "T_messages.txt.h"
Cedric Roux's avatar
Cedric Roux committed
16
#include "T_defs.h"
17
#include "T_IDs.h"
18

19
/* Array of message slots consumed by the tracer loop; points to the
 * shm_array passed to T_local_tracer_main(). */
static T_cache_t *T_local_cache;
20 21 22 23 24 25 26 27 28 29
/* Index in T_local_cache of the next slot to forward; advanced and wrapped
 * with a (T_CACHE_SIZE - 1) mask by T_local_tracer_main(). */
static int T_busylist_head;

/* One queued outgoing message, linked into a singly-linked FIFO.
 * Nodes are allocated and filled by forward() and released by data_sender().
 */
typedef struct databuf {
  char *d;               /* message bytes, heap-allocated */
  int l;                 /* number of bytes stored in d */
  struct databuf *next;  /* next queued message, NULL for the last one */
} databuf;

/* State of one forwarder, shared between forward(), data_sender() and
 * forward_remote_messages(). The message queue (head/tail) and the memory
 * accounting fields are accessed with 'lock' held.
 */
typedef struct {
  /* socket on which control messages read from the remote tracer are written */
  int socket_local;
  /* connection to the remote tracer; set to -1 by data_sender() on write error */
  volatile int socket_remote;
  /* TCP port used to accept a (new) remote tracer connection */
  int remote_port;

  /* protects the queue and the memory counters below */
  pthread_mutex_t lock;
  /* signaled by forward() when a message has been queued */
  pthread_cond_t cond;

  /* FIFO of messages waiting to be sent: first element... */
  struct databuf *volatile head;
  /* ...and last element (NULL when the queue is empty) */
  struct databuf *tail;

  /* bytes currently accounted for queued messages */
  uint64_t memusage;
  /* memusage level at which the last warning was printed */
  uint64_t last_warning_memusage;
} forward_data;
38

39 40 41 42
/****************************************************************************/
/*                      utility functions                                   */
/****************************************************************************/

frtabu's avatar
frtabu committed
43
/* Start f(data) in a new detached thread with a 10000000-byte stack.
 *
 * Any pthread failure is fatal: the name of the failing call is printed on
 * stderr and the process exits with status 1.
 */
static void new_thread(void *(*f)(void *), void *data) {
  pthread_t thread;
  pthread_attr_t attr;
  const char *failure = NULL;

  /* the calls are chained so that the first failure stops the sequence */
  if (pthread_attr_init(&attr))
    failure = "pthread_attr_init err\n";
  else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
    failure = "pthread_attr_setdetachstate err\n";
  else if (pthread_attr_setstacksize(&attr, 10000000))
    failure = "pthread_attr_setstacksize err\n";
  else if (pthread_create(&thread, &attr, f, data))
    failure = "pthread_create err\n";
  else if (pthread_attr_destroy(&attr))
    failure = "pthread_attr_destroy err\n";

  if (failure != NULL) {
    fputs(failure, stderr);
    exit(1);
  }
}

frtabu's avatar
frtabu committed
73
/* Wait for one incoming TCP connection on addr:port and return its socket.
 *
 * addr is a dotted-quad IPv4 address (parsed with inet_addr()), port is in
 * host byte order. The listening socket is closed once a peer is accepted.
 * This call blocks until a peer connects. Any socket error is fatal: the
 * failing call is reported with perror() and the process exits with status 1.
 */
static int get_connection(char *addr, int port) {
  struct sockaddr_in a;
  socklen_t alen;
  int s, t;

  printf("T tracer: waiting for connection on %s:%d\n", addr, port);

  s = socket(AF_INET, SOCK_STREAM, 0);

  if (s == -1) {
    perror("socket");
    exit(1);
  }

  /* allow to rebind the port immediately after a restart */
  t = 1;

  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &t, sizeof(t))) {
    perror("setsockopt");
    exit(1);
  }

  /* clear the whole structure so no uninitialized bytes are passed to bind() */
  memset(&a, 0, sizeof(a));
  a.sin_family = AF_INET;
  a.sin_port = htons(port);
  a.sin_addr.s_addr = inet_addr(addr);

  if (bind(s, (struct sockaddr *)&a, sizeof(a))) {
    perror("bind");
    exit(1);
  }

  if (listen(s, 5)) {
    perror("listen");
    exit(1);
  }

  alen = sizeof(a);
  t = accept(s, (struct sockaddr *)&a, &alen);

  if (t == -1) {
    perror("accept");
    exit(1);
  }

  /* only one peer is served: stop listening */
  close(s);
  printf("T tracer: connected\n");
  return t;
}

119 120
/* Forward declaration: forward() is defined further down but is already
 * needed by send_T_messages_txt(). */
static void forward(void *_forwarder, char *buf, int size);

frtabu's avatar
frtabu committed
121
void send_T_messages_txt(void *forwarder) {
122 123 124 125 126 127 128 129 130 131
  char buf[T_BUFFER_MAX];
  char *T_LOCAL_buf = buf;
  int T_LOCAL_size;
  unsigned char *src;
  int src_len;
  /* trace T_message.txt
   * Send several messages -1 with content followed by message -2.
   */
  src = T_messages_txt;
  src_len = T_messages_txt_len;
frtabu's avatar
frtabu committed
132

133 134
  while (src_len) {
    int send_size = src_len;
frtabu's avatar
frtabu committed
135

136 137
    if (send_size > T_PAYLOAD_MAXSIZE - sizeof(int))
      send_size = T_PAYLOAD_MAXSIZE - sizeof(int);
frtabu's avatar
frtabu committed
138

139 140 141
    /* TODO: be careful, we use internal T stuff, to rewrite? */
    T_LOCAL_size = 0;
    T_HEADER(T_ID(-1));
frtabu's avatar
frtabu committed
142 143 144 145 146
    T_PUT_buffer(1, ((T_buffer) {
addr:
(src), length:
      (send_size)
    }));
147 148 149 150
    forward(forwarder, buf, T_LOCAL_size);
    src += send_size;
    src_len -= send_size;
  }
frtabu's avatar
frtabu committed
151

152 153 154 155 156
  T_LOCAL_size = 0;
  T_HEADER(T_ID(-2));
  forward(forwarder, buf, T_LOCAL_size);
}

157 158 159 160
/****************************************************************************/
/*                      forward functions                                   */
/****************************************************************************/

frtabu's avatar
frtabu committed
161
static void *data_sender(void *_f) {
162 163 164 165 166
  forward_data *f = _f;
  databuf *cur;
  char *buf, *b;
  int size;
wait:
frtabu's avatar
frtabu committed
167

168
  if (pthread_mutex_lock(&f->lock)) abort();
frtabu's avatar
frtabu committed
169

170 171
  while (f->head == NULL)
    if (pthread_cond_wait(&f->cond, &f->lock)) abort();
frtabu's avatar
frtabu committed
172

173 174 175 176 177
  cur = f->head;
  buf = cur->d;
  size = cur->l;
  f->head = cur->next;
  f->memusage -= size;
frtabu's avatar
frtabu committed
178

179
  if (f->head == NULL) f->tail = NULL;
frtabu's avatar
frtabu committed
180

181
  if (pthread_mutex_unlock(&f->lock)) abort();
frtabu's avatar
frtabu committed
182

183 184 185 186
  free(cur);
  goto process;
process:
  b = buf;
frtabu's avatar
frtabu committed
187

188
  if (f->socket_remote != -1)
frtabu's avatar
frtabu committed
189 190 191 192 193 194 195 196 197 198 199 200
    while (size) {
      int l = write(f->socket_remote, b, size);

      if (l <= 0) {
        printf("T tracer: forward error\n");
        close(f->socket_remote);
        f->socket_remote = -1;
        break;
      }

      size -= l;
      b += l;
201
    }
202 203 204 205 206

  free(buf);
  goto wait;
}

frtabu's avatar
frtabu committed
207 208

static void *forward_remote_messages(void *_f) {
209 210 211 212 213 214 215 216 217 218 219 220 221 222
#define PUT(x) do { \
    if (bufsize == bufmaxsize) { \
      bufmaxsize += 4096; \
      buf = realloc(buf, bufmaxsize); \
      if (buf == NULL) abort(); \
    } \
    buf[bufsize] = x; \
    bufsize++; \
  } while (0)
#define PUT_BUF(x, l) do { \
    char *zz = (char *)(x); \
    int len = l; \
    while (len) { PUT(*zz); zz++; len--; } \
  } while (0)
223
  forward_data *f = _f;
224 225
  int from;
  int to;
226 227
  int l, len;
  char *b;
228 229 230 231
  char *buf = NULL;
  int bufsize = 0;
  int bufmaxsize = 0;
  char t;
232 233
again:

234
  while (1) {
235 236
    from = f->socket_remote;
    to = f->socket_local;
237 238
    bufsize = 0;
    /* let's read and process messages */
frtabu's avatar
frtabu committed
239 240 241 242
    len = read(from, &t, 1);

    if (len <= 0) goto dead;

Raphael Defosseux's avatar
Raphael Defosseux committed
243 244
    if (buf != NULL)
      PUT(t);
245 246

    switch (t) {
frtabu's avatar
frtabu committed
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
      case 0:
      case 1:

        /* message 0 and 1: get a length and then 'length' numbers */
        if (read(from, &len, sizeof(int)) != sizeof(int)) goto dead;

        PUT_BUF(&len, 4);

        while (len) {
          if (read(from, &l, sizeof(int)) != sizeof(int)) goto dead;

          PUT_BUF(&l, 4);
          len--;
        }

        break;

      case 2:
        break;
266

frtabu's avatar
frtabu committed
267 268 269 270
      default:
        printf("%s:%d:%s: unhandled message type %d\n",
               __FILE__, __LINE__, __FUNCTION__, t);
        abort();
271 272 273
    }

    b = buf;
frtabu's avatar
frtabu committed
274

275 276
    while (bufsize) {
      l = write(to, b, bufsize);
frtabu's avatar
frtabu committed
277

278
      if (l <= 0) abort();
frtabu's avatar
frtabu committed
279

280
      bufsize -= l;
281 282 283
      b += l;
    }
  }
284

285
dead:
286
  /* socket died, let's stop all traces and wait for another tracer */
287
  /* TODO: be careful with those write, they might write less than wanted */
288
  buf[0] = 1;
frtabu's avatar
frtabu committed
289

290
  if (write(to, buf, 1) != 1) abort();
frtabu's avatar
frtabu committed
291

292
  len = T_NUMBER_OF_IDS;
frtabu's avatar
frtabu committed
293

294
  if (write(to, &len, sizeof(int)) != sizeof(int)) abort();
frtabu's avatar
frtabu committed
295

296
  l = 0;
frtabu's avatar
frtabu committed
297

298 299
  while (len) {
    if (write(to, &l, sizeof(int)) != sizeof(int)) abort();
frtabu's avatar
frtabu committed
300

301 302 303 304
    len--;
  };

  close(f->socket_remote);
frtabu's avatar
frtabu committed
305

306
  f->socket_remote = get_connection("0.0.0.0", f->remote_port);
frtabu's avatar
frtabu committed
307

308
  send_T_messages_txt(f);
frtabu's avatar
frtabu committed
309

310 311
  goto again;

312 313 314
  return NULL;
}

frtabu's avatar
frtabu committed
315
static void *forwarder(int port, int s) {
316
  forward_data *f;
frtabu's avatar
frtabu committed
317
  f = malloc(sizeof(*f));
318

frtabu's avatar
frtabu committed
319
  if (f == NULL) abort();
320 321 322 323 324 325 326

  pthread_mutex_init(&f->lock, NULL);
  pthread_cond_init(&f->cond, NULL);
  f->socket_local = s;
  f->head = f->tail = NULL;
  f->memusage = 0;
  f->last_warning_memusage = 0;
Cedric Roux's avatar
Cedric Roux committed
327
  printf("T tracer: waiting for remote tracer on port %d\n", port);
328
  f->remote_port = port;
329
  f->socket_remote = get_connection("0.0.0.0", port);
330
  send_T_messages_txt(f);
331 332 333 334 335
  new_thread(data_sender, f);
  new_thread(forward_remote_messages, f);
  return f;
}

frtabu's avatar
frtabu committed
336
static void forward(void *_forwarder, char *buf, int size) {
337 338 339
  forward_data *f = _forwarder;
  int32_t ssize = size;
  databuf *new;
frtabu's avatar
frtabu committed
340
  new = malloc(sizeof(*new));
341

frtabu's avatar
frtabu committed
342
  if (new == NULL) abort();
343 344 345

  if (pthread_mutex_lock(&f->lock)) abort();

frtabu's avatar
frtabu committed
346 347 348 349
  new->d = malloc(size + 4);

  if (new->d == NULL) abort();

350 351 352 353 354
  /* put the size of the message at the head */
  memcpy(new->d, &ssize, 4);
  memcpy(new->d+4, buf, size);
  new->l = size+4;
  new->next = NULL;
frtabu's avatar
frtabu committed
355

356
  if (f->head == NULL) f->head = new;
frtabu's avatar
frtabu committed
357

358 359
  if (f->tail != NULL) f->tail->next = new;

frtabu's avatar
frtabu committed
360
  f->tail = new;
361
#if BASIC_SIMULATOR
frtabu's avatar
frtabu committed
362

363 364 365 366 367 368 369
  /* When runnng the basic simulator, the tracer may be too slow.
   * Let's not take too much memory in the tracee and
   * wait if there is too much data to send. 200MB is
   * arbitrary.
   */
  while (f->memusage > 200 * 1024 * 1024) {
    if (pthread_cond_signal(&f->cond)) abort();
frtabu's avatar
frtabu committed
370

371
    if (pthread_mutex_unlock(&f->lock)) abort();
frtabu's avatar
frtabu committed
372

373
    usleep(1000);
frtabu's avatar
frtabu committed
374

375 376 377
    if (pthread_mutex_lock(&f->lock)) abort();
  }

frtabu's avatar
frtabu committed
378
#endif /* BASIC_SIMULATOR */
379
  f->memusage += size+4;
frtabu's avatar
frtabu committed
380

381 382 383 384
  /* warn every 100MB */
  if (f->memusage > f->last_warning_memusage &&
      f->memusage - f->last_warning_memusage > 100000000) {
    f->last_warning_memusage += 100000000;
Cedric Roux's avatar
Cedric Roux committed
385
    printf("T tracer: WARNING: memory usage is over %"PRIu64"MB\n",
386
           f->last_warning_memusage / 1000000);
frtabu's avatar
frtabu committed
387 388
  } else if (f->memusage < f->last_warning_memusage &&
             f->last_warning_memusage - f->memusage > 100000000) {
389 390 391 392
    f->last_warning_memusage = (f->memusage/100000000) * 100000000;
  }

  if (pthread_cond_signal(&f->cond)) abort();
frtabu's avatar
frtabu committed
393

394 395 396 397 398 399 400
  if (pthread_mutex_unlock(&f->lock)) abort();
}

/****************************************************************************/
/*                      local functions                                     */
/****************************************************************************/

frtabu's avatar
frtabu committed
401
static void wait_message(void) {
Cedric Roux's avatar
Cedric Roux committed
402
  while ((T_local_cache[T_busylist_head].busy & 0x02) == 0) usleep(1000);
403 404
}

Cedric Roux's avatar
Cedric Roux committed
405
void T_local_tracer_main(int remote_port, int wait_for_tracer,
frtabu's avatar
frtabu committed
406
                         int local_socket, void *shm_array) {
407
  int s;
Cedric Roux's avatar
Cedric Roux committed
408 409
  int port = remote_port;
  int dont_wait = wait_for_tracer ? 0 : 1;
410
  void *f;
Cedric Roux's avatar
Cedric Roux committed
411

412
  /* write on a socket fails if the other end is closed and we get SIGPIPE */
Cedric Roux's avatar
Cedric Roux committed
413
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
frtabu's avatar
frtabu committed
414 415
    printf("local tracer received SIGPIPE\n");
    abort();
Cedric Roux's avatar
Cedric Roux committed
416 417 418
  }

  T_local_cache = shm_array;
Cedric Roux's avatar
Cedric Roux committed
419
  s = local_socket;
Cedric Roux's avatar
Cedric Roux committed
420

421 422
  if (dont_wait) {
    char t = 2;
frtabu's avatar
frtabu committed
423

424 425
    if (write(s, &t, 1) != 1) abort();
  }
Cedric Roux's avatar
Cedric Roux committed
426

427
  f = forwarder(port, s);
Cedric Roux's avatar
Cedric Roux committed
428

429 430 431 432
  /* read messages */
  while (1) {
    wait_message();
    __sync_synchronize();
433 434 435
    forward(f, T_local_cache[T_busylist_head].buffer,
            T_local_cache[T_busylist_head].length);
    T_local_cache[T_busylist_head].busy = 0;
436 437 438 439
    T_busylist_head++;
    T_busylist_head &= T_CACHE_SIZE - 1;
  }
}