local_tracer.c 9.33 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
#include <stdio.h>
#include <string.h>
#include <netinet/ip.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
11
#include <inttypes.h>
12
#include <signal.h>
13

14 15
#include "T.h"
#include "T_messages.txt.h"
Cedric Roux's avatar
Cedric Roux committed
16
#include "T_defs.h"
17
#include "T_IDs.h"
18

19
/* array of trace slots read by the tracer loop; set from the 'shm_array'
 * argument of T_local_tracer_main()
 */
static T_cache_t *T_local_cache;
/* index in T_local_cache of the next slot T_local_tracer_main() consumes */
static int T_busylist_head;

/* One queued outgoing message: a node of the singly linked FIFO that
 * forward() appends to and data_sender() consumes.
 */
typedef struct databuf {
  char *d;                 /* heap buffer with the bytes to send, freed by data_sender() */
  int l;                   /* number of bytes stored in 'd' */
  struct databuf *next;    /* next queued message, NULL for the last one */
} databuf;

typedef struct {
  int socket_local;
30 31
  volatile int socket_remote;
  int remote_port;
32 33 34 35 36 37
  pthread_mutex_t lock;
  pthread_cond_t cond;
  databuf * volatile head, *tail;
  uint64_t memusage;
  uint64_t last_warning_memusage;
} forward_data;
38

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
/****************************************************************************/
/*                      utility functions                                   */
/****************************************************************************/

/* Start 'f(data)' in a new detached thread with a 10000000-byte stack.
 * Any pthread failure is reported on stderr and terminates the process
 * with exit(1).
 */
static void new_thread(void *(*f)(void *), void *data)
{
  pthread_attr_t attr;
  pthread_t tid;
  const char *failure = NULL;

  /* the steps are chained so that the first one to fail stops the sequence */
  if (pthread_attr_init(&attr) != 0)
    failure = "pthread_attr_init err\n";
  else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0)
    failure = "pthread_attr_setdetachstate err\n";
  else if (pthread_attr_setstacksize(&attr, 10000000) != 0)
    failure = "pthread_attr_setstacksize err\n";
  else if (pthread_create(&tid, &attr, f, data) != 0)
    failure = "pthread_create err\n";
  else if (pthread_attr_destroy(&attr) != 0)
    failure = "pthread_attr_destroy err\n";

  if (failure != NULL) {
    fprintf(stderr, "%s", failure);
    exit(1);
  }
}

/* Wait for one incoming TCP connection on addr:port and return its socket.
 *
 * addr: IPv4 address to bind to, in dotted notation (parsed by inet_addr())
 * port: TCP port to listen on
 *
 * The listening socket is closed once a peer has been accepted.
 * This call blocks until a peer connects. Any failure is reported with
 * perror() and terminates the process with exit(1).
 */
static int get_connection(char *addr, int port)
{
  struct sockaddr_in a;
  socklen_t alen;
  int s, t;

  printf("T tracer: waiting for connection on %s:%d\n", addr, port);

  s = socket(AF_INET, SOCK_STREAM, 0);
  if (s == -1) { perror("socket"); exit(1); }
  /* allow rebinding the port right after a previous tracer went away */
  t = 1;
  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &t, sizeof(int)))
    { perror("setsockopt"); exit(1); }

  /* clear the whole structure so no uninitialized bytes are given to bind() */
  memset(&a, 0, sizeof(a));
  a.sin_family = AF_INET;
  a.sin_port = htons(port);
  a.sin_addr.s_addr = inet_addr(addr);

  if (bind(s, (struct sockaddr *)&a, sizeof(a))) { perror("bind"); exit(1); }
  if (listen(s, 5)) { perror("listen"); exit(1); }
  alen = sizeof(a);
  t = accept(s, (struct sockaddr *)&a, &alen);
  if (t == -1) { perror("accept"); exit(1); }
  /* only one peer is served at a time, no need to keep listening */
  close(s);

  printf("T tracer: connected\n");

  return t;
}

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
/* defined further down in this file; declared here because
 * send_T_messages_txt() calls it
 */
static void forward(void *_forwarder, char *buf, int size);

/* Queue the content of T_messages_txt on 'forwarder'.
 *
 * The text is cut in chunks of at most T_PAYLOAD_MAXSIZE - sizeof(int)
 * bytes. Each chunk is handed to forward() behind a header built with
 * T_HEADER(T_ID(-1)); a last message built with T_HEADER(T_ID(-2)) and
 * no payload is queued after the final chunk.
 *
 * forwarder: passed unchanged to forward()
 */
void send_T_messages_txt(void *forwarder)
{
  char buf[T_BUFFER_MAX];
  /* NOTE(review): T_LOCAL_buf is never referenced directly below;
   * presumably T_HEADER/T_PUT_buffer access T_LOCAL_buf and T_LOCAL_size
   * by name - confirm in T.h before renaming either of them
   */
  char *T_LOCAL_buf = buf;
  int T_LOCAL_size;
  unsigned char *src;
  int src_len;

  /* trace T_message.txt
   * Send several messages -1 with content followed by message -2.
   */
  src = T_messages_txt;
  src_len = T_messages_txt_len;
  while (src_len) {
    /* take as much as is left, capped to what fits in one message */
    int send_size = src_len;
    if (send_size > T_PAYLOAD_MAXSIZE - sizeof(int))
      send_size = T_PAYLOAD_MAXSIZE - sizeof(int);
    /* TODO: be careful, we use internal T stuff, to rewrite? */
    T_LOCAL_size = 0;
    T_HEADER(T_ID(-1));
    T_PUT_buffer(1, ((T_buffer){addr:(src), length:(send_size)}));
    /* forward() copies the bytes, so 'buf' can be reused for the next chunk */
    forward(forwarder, buf, T_LOCAL_size);
    src += send_size;
    src_len -= send_size;
  }
  /* terminating message: header only */
  T_LOCAL_size = 0;
  T_HEADER(T_ID(-2));
  forward(forwarder, buf, T_LOCAL_size);
}

122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
/****************************************************************************/
/*                      forward functions                                   */
/****************************************************************************/

static void *data_sender(void *_f)
{
  forward_data *f = _f;
  databuf *cur;
  char *buf, *b;
  int size;

wait:
  if (pthread_mutex_lock(&f->lock)) abort();
  while (f->head == NULL)
    if (pthread_cond_wait(&f->cond, &f->lock)) abort();
  cur = f->head;
  buf = cur->d;
  size = cur->l;
  f->head = cur->next;
  f->memusage -= size;
  if (f->head == NULL) f->tail = NULL;
  if (pthread_mutex_unlock(&f->lock)) abort();
  free(cur);
  goto process;

process:
  b = buf;
149
  if (f->socket_remote != -1)
150 151
  while (size) {
    int l = write(f->socket_remote, b, size);
152
    if (l <= 0) {
Cedric Roux's avatar
Cedric Roux committed
153
      printf("T tracer: forward error\n");
154 155 156 157
      close(f->socket_remote);
      f->socket_remote = -1;
      break;
    }
158 159 160 161 162 163 164 165 166 167 168
    size -= l;
    b += l;
  }

  free(buf);

  goto wait;
}

static void *forward_remote_messages(void *_f)
{
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
#define PUT(x) do { \
    if (bufsize == bufmaxsize) { \
      bufmaxsize += 4096; \
      buf = realloc(buf, bufmaxsize); \
      if (buf == NULL) abort(); \
    } \
    buf[bufsize] = x; \
    bufsize++; \
  } while (0)
#define PUT_BUF(x, l) do { \
    char *zz = (char *)(x); \
    int len = l; \
    while (len) { PUT(*zz); zz++; len--; } \
  } while (0)

184
  forward_data *f = _f;
185 186
  int from;
  int to;
187 188
  int l, len;
  char *b;
189 190 191 192
  char *buf = NULL;
  int bufsize = 0;
  int bufmaxsize = 0;
  char t;
193 194 195

again:

196
  while (1) {
197 198
    from = f->socket_remote;
    to = f->socket_local;
199

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
    bufsize = 0;

    /* let's read and process messages */
    len = read(from, &t, 1); if (len <= 0) goto dead;
    PUT(t);

    switch (t) {
    case 0:
    case 1:
      /* message 0 and 1: get a length and then 'length' numbers */
      if (read(from, &len, sizeof(int)) != sizeof(int)) goto dead;
      PUT_BUF(&len, 4);
      while (len) {
        if (read(from, &l, sizeof(int)) != sizeof(int)) goto dead;
        PUT_BUF(&l, 4);
        len--;
      }
      break;

    case 2: break;
Cedric Roux's avatar
Cedric Roux committed
220 221 222 223
    default:
      printf("%s:%d:%s: unhandled message type %d\n",
          __FILE__, __LINE__, __FUNCTION__, t);
      abort();
224 225 226 227 228
    }

    b = buf;
    while (bufsize) {
      l = write(to, b, bufsize);
229
      if (l <= 0) abort();
230
      bufsize -= l;
231 232 233
      b += l;
    }
  }
234

235
dead:
236
  /* socket died, let's stop all traces and wait for another tracer */
237
  /* TODO: be careful with those write, they might write less than wanted */
238 239 240 241 242 243 244 245 246 247 248
  buf[0] = 1;
  if (write(to, buf, 1) != 1) abort();
  len = T_NUMBER_OF_IDS;
  if (write(to, &len, sizeof(int)) != sizeof(int)) abort();
  l = 0;
  while (len) {
    if (write(to, &l, sizeof(int)) != sizeof(int)) abort();
    len--;
  };

  close(f->socket_remote);
249
  f->socket_remote = get_connection("0.0.0.0", f->remote_port);
250
  send_T_messages_txt(f);
251 252
  goto again;

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  return NULL;
}

static void *forwarder(int port, int s)
{
  forward_data *f;

  f = malloc(sizeof(*f)); if (f == NULL) abort();

  pthread_mutex_init(&f->lock, NULL);
  pthread_cond_init(&f->cond, NULL);

  f->socket_local = s;
  f->head = f->tail = NULL;

  f->memusage = 0;
  f->last_warning_memusage = 0;

Cedric Roux's avatar
Cedric Roux committed
271
  printf("T tracer: waiting for remote tracer on port %d\n", port);
272

273
  f->remote_port = port;
274
  f->socket_remote = get_connection("0.0.0.0", port);
275
  send_T_messages_txt(f);
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302

  new_thread(data_sender, f);
  new_thread(forward_remote_messages, f);

  return f;
}

/* Queue a message for the remote tracer.
 *
 * _forwarder: the forward_data returned by forwarder()
 * buf:        message bytes; they are copied, the caller keeps ownership
 * size:       number of bytes in 'buf', must not be negative
 *
 * The queued data is the 4-byte size followed by the 'size' bytes of the
 * message. data_sender() is woken up to send it. A warning is printed
 * each time the amount of queued data grows by another 100MB.
 * abort() is called on allocation or pthread failure.
 */
static void forward(void *_forwarder, char *buf, int size)
{
  forward_data *f = _forwarder;
  int32_t ssize = size;
  const int header = sizeof(ssize);
  databuf *new;

  if (size < 0) abort();

  /* build the node before taking the lock to keep the critical section short */
  new = malloc(sizeof(*new)); if (new == NULL) abort();
  new->d = malloc(size + header); if (new->d == NULL) abort();
  /* put the size of the message at the head */
  memcpy(new->d, &ssize, header);
  memcpy(new->d + header, buf, size);
  new->l = size + header;
  new->next = NULL;

  if (pthread_mutex_lock(&f->lock)) abort();

#if BASIC_SIMULATOR
  /* When running the basic simulator, the tracer may be too slow.
   * Let's not take too much memory in the tracee and
   * wait if there is too much data to send. 200MB is
   * arbitrary.
   * The wait is done before the message is queued: the lock is released
   * in the loop and memusage must always match what is in the queue,
   * because data_sender() subtracts the size of whatever it dequeues.
   */
  while (f->memusage > 200 * 1024 * 1024) {
    if (pthread_cond_signal(&f->cond)) abort();
    if (pthread_mutex_unlock(&f->lock)) abort();
    usleep(1000);
    if (pthread_mutex_lock(&f->lock)) abort();
  }
#endif /* BASIC_SIMULATOR */

  /* append to the queue and account for it in one go */
  if (f->head == NULL) f->head = new;
  if (f->tail != NULL) f->tail->next = new;
  f->tail = new;
  f->memusage += new->l;

  /* warn every 100MB */
  if (f->memusage > f->last_warning_memusage &&
      f->memusage - f->last_warning_memusage > 100000000) {
    f->last_warning_memusage += 100000000;
    printf("T tracer: WARNING: memory usage is over %"PRIu64"MB\n",
           f->last_warning_memusage / 1000000);
  } else
  if (f->memusage < f->last_warning_memusage &&
      f->last_warning_memusage - f->memusage > 100000000) {
    /* usage went down by more than 100MB: lower the warning level */
    f->last_warning_memusage = (f->memusage/100000000) * 100000000;
  }

  if (pthread_cond_signal(&f->cond)) abort();
  if (pthread_mutex_unlock(&f->lock)) abort();
}

/****************************************************************************/
/*                      local functions                                     */
/****************************************************************************/

static void wait_message(void)
339
{
Cedric Roux's avatar
Cedric Roux committed
340
  while ((T_local_cache[T_busylist_head].busy & 0x02) == 0) usleep(1000);
341 342
}

Cedric Roux's avatar
Cedric Roux committed
343
void T_local_tracer_main(int remote_port, int wait_for_tracer,
Cedric Roux's avatar
Cedric Roux committed
344
    int local_socket, void *shm_array)
345 346
{
  int s;
Cedric Roux's avatar
Cedric Roux committed
347 348
  int port = remote_port;
  int dont_wait = wait_for_tracer ? 0 : 1;
349
  void *f;
Cedric Roux's avatar
Cedric Roux committed
350

351
  /* write on a socket fails if the other end is closed and we get SIGPIPE */
Cedric Roux's avatar
Cedric Roux committed
352
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
353 354
       printf("local tracer received SIGPIPE\n");
       abort();
Cedric Roux's avatar
Cedric Roux committed
355 356 357
  }

  T_local_cache = shm_array;
358

Cedric Roux's avatar
Cedric Roux committed
359
  s = local_socket;
Cedric Roux's avatar
Cedric Roux committed
360

361 362 363 364
  if (dont_wait) {
    char t = 2;
    if (write(s, &t, 1) != 1) abort();
  }
Cedric Roux's avatar
Cedric Roux committed
365

366
  f = forwarder(port, s);
Cedric Roux's avatar
Cedric Roux committed
367

368 369 370 371
  /* read messages */
  while (1) {
    wait_message();
    __sync_synchronize();
372 373 374
    forward(f, T_local_cache[T_busylist_head].buffer,
            T_local_cache[T_busylist_head].length);
    T_local_cache[T_busylist_head].busy = 0;
375 376 377 378
    T_busylist_head++;
    T_busylist_head &= T_CACHE_SIZE - 1;
  }
}