Commit edb209f6 authored by Cedric Roux's avatar Cedric Roux

much better mobipass integration

with this commit I get very stable connection
over long period of time (more than 30 minutes)

I had 0 fake_rrh disconnet/reconnect messages.

I had very rarely timeouts on data from mobipass
(uplink).

Almost 0 nacks. I had some uplink nacks, which were
most certainly due to the timeouts above.

I was doing full udp DL (33Mb/s) and tcp UL (was going
around 15Mb/s) at the same time, with 1 UE.

Next step is to cleanup, remove globals, integrate properly
as a driver and put this in develop.
parent 72e6e34f
...@@ -27,7 +27,7 @@ struct mobipass_header { ...@@ -27,7 +27,7 @@ struct mobipass_header {
uint32_t timestamp; uint32_t timestamp;
} __attribute__((__packed__)); } __attribute__((__packed__));
int trx_eth_start(openair0_device *device) { return 0;} int trx_eth_start(openair0_device *device) { init_mobipass(); return 0;}
int trx_eth_request(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;} int trx_eth_request(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;}
int trx_eth_reply(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;} int trx_eth_reply(openair0_device *device, void *msg, ssize_t msg_len) { abort(); return 0;}
int trx_eth_get_stats(openair0_device* device) { return(0); } int trx_eth_get_stats(openair0_device* device) { return(0); }
...@@ -37,10 +37,12 @@ int trx_eth_stop(openair0_device *device) { return(0); } ...@@ -37,10 +37,12 @@ int trx_eth_stop(openair0_device *device) { return(0); }
int trx_eth_set_freq(openair0_device* device, openair0_config_t *openair0_cfg,int exmimo_dump_config) { return(0); } int trx_eth_set_freq(openair0_device* device, openair0_config_t *openair0_cfg,int exmimo_dump_config) { return(0); }
int trx_eth_set_gains(openair0_device* device, openair0_config_t *openair0_cfg) { return(0); } int trx_eth_set_gains(openair0_device* device, openair0_config_t *openair0_cfg) { return(0); }
int da__write(openair0_device *device, openair0_timestamp timestamp, void **buff, int nsamps, int cc, int flags) { int mobipass_write(openair0_device *device, openair0_timestamp timestamp, void **buff, int nsamps, int cc, int flags) {
struct mobipass_header *mh = (struct mobipass_header *)(((char *)buff[0]) + 14); struct mobipass_header *mh = (struct mobipass_header *)(((char *)buff[0]) + 14);
static uint32_t last_timestamp = 4*7680*2-640; static uint32_t last_timestamp = 4*7680*2-640;
last_timestamp += 640; last_timestamp += 640;
last_timestamp %= SAMPLES_PER_1024_FRAMES;
mh->timestamp = htonl(ntohl(mh->timestamp) % SAMPLES_PER_1024_FRAMES);
if (last_timestamp != ntohl(mh->timestamp)) { printf("bad timestamp wanted %d got %d\n", last_timestamp, ntohl(mh->timestamp)); exit(1); } if (last_timestamp != ntohl(mh->timestamp)) { printf("bad timestamp wanted %d got %d\n", last_timestamp, ntohl(mh->timestamp)); exit(1); }
//printf("__write nsamps %d timestamps %ld seqno %d (packet timestamp %d)\n", nsamps, timestamp, mh->seqno, ntohl(mh->timestamp)); //printf("__write nsamps %d timestamps %ld seqno %d (packet timestamp %d)\n", nsamps, timestamp, mh->seqno, ntohl(mh->timestamp));
if (nsamps != 640) abort(); if (nsamps != 640) abort();
...@@ -48,12 +50,13 @@ int da__write(openair0_device *device, openair0_timestamp timestamp, void **buff ...@@ -48,12 +50,13 @@ int da__write(openair0_device *device, openair0_timestamp timestamp, void **buff
return nsamps; return nsamps;
} }
int da__read(openair0_device *device, openair0_timestamp *timestamp, void **buff, int nsamps, int cc) { int mobipass_read(openair0_device *device, openair0_timestamp *timestamp, void **buff, int nsamps, int cc) {
static uint32_t ts = 0; static uint32_t ts = 0;
static unsigned char seqno = 0; static unsigned char seqno = 0;
//printf("__read nsamps %d return timestamp %d\n", nsamps, ts); //printf("__read nsamps %d return timestamp %d\n", nsamps, ts);
*timestamp = htonl(ts); *timestamp = htonl(ts);
ts += nsamps; ts += nsamps;
ts %= SAMPLES_PER_1024_FRAMES;
if (nsamps != 640) { printf("bad nsamps %d, should be 640\n", nsamps); fflush(stdout); abort(); } if (nsamps != 640) { printf("bad nsamps %d, should be 640\n", nsamps); fflush(stdout); abort(); }
dequeue_from_mobipass(ntohl(*timestamp), buff[0]); dequeue_from_mobipass(ntohl(*timestamp), buff[0]);
...@@ -74,7 +77,7 @@ int da__read(openair0_device *device, openair0_timestamp *timestamp, void **buff ...@@ -74,7 +77,7 @@ int da__read(openair0_device *device, openair0_timestamp *timestamp, void **buff
int transport_init(openair0_device *device, openair0_config_t *openair0_cfg, int transport_init(openair0_device *device, openair0_config_t *openair0_cfg,
eth_params_t * eth_params ) eth_params_t * eth_params )
{ {
init_mobipass(); //init_mobipass();
eth_state_t *eth = (eth_state_t*)malloc(sizeof(eth_state_t)); eth_state_t *eth = (eth_state_t*)malloc(sizeof(eth_state_t));
memset(eth, 0, sizeof(eth_state_t)); memset(eth, 0, sizeof(eth_state_t));
...@@ -96,8 +99,8 @@ int transport_init(openair0_device *device, openair0_config_t *openair0_cfg, ...@@ -96,8 +99,8 @@ int transport_init(openair0_device *device, openair0_config_t *openair0_cfg,
device->trx_stop_func = trx_eth_stop; device->trx_stop_func = trx_eth_stop;
device->trx_set_freq_func = trx_eth_set_freq; device->trx_set_freq_func = trx_eth_set_freq;
device->trx_set_gains_func = trx_eth_set_gains; device->trx_set_gains_func = trx_eth_set_gains;
device->trx_write_func = da__write; device->trx_write_func = mobipass_write;
device->trx_read_func = da__read; device->trx_read_func = mobipass_read;
eth->if_name = eth_params->local_if_name; eth->if_name = eth_params->local_if_name;
device->priv = eth; device->priv = eth;
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <pthread.h> #include <pthread.h>
#include "queues.h" #include "queues.h"
#include "mobipass.h"
/******************************************************************/ /******************************************************************/
/* time begin */ /* time begin */
...@@ -36,6 +37,11 @@ static void init_time(void) ...@@ -36,6 +37,11 @@ static void init_time(void)
static void synch_time(uint32_t ts) static void synch_time(uint32_t ts)
{ {
static uint32_t last_ts = 0;
static uint64_t mega_ts = 0;
if (ts < last_ts) mega_ts++;
last_ts = ts;
struct timespec now; struct timespec now;
if (clock_gettime(CLOCK_MONOTONIC_RAW, &now)) abort(); if (clock_gettime(CLOCK_MONOTONIC_RAW, &now)) abort();
...@@ -47,16 +53,16 @@ static void synch_time(uint32_t ts) ...@@ -47,16 +53,16 @@ static void synch_time(uint32_t ts)
/* 15360000 samples/second, in nanoseconds: /* 15360000 samples/second, in nanoseconds:
* = 15360000 / 1000000000 = 1536 / 100000 = 48 / 3125*/ * = 15360000 / 1000000000 = 1536 / 100000 = 48 / 3125*/
uint64_t ts_ns = (uint64_t)ts * (uint64_t)3125 / (uint64_t)48; uint64_t ts_ns = ((uint64_t)ts + mega_ts * (uint64_t)SAMPLES_PER_1024_FRAMES) * (uint64_t)3125 / (uint64_t)48;
//printf("tnow %lu t0 %lu ts %u ts_ns %lu\n", tnow, t0, ts, ts_ns); //printf("tnow %lu t0 %lu ts %u cur %lu ts_ns %lu mega_ts %lu\n", tnow, t0, ts, cur, ts_ns, mega_ts);
if (cur >= ts_ns) return; if (cur >= ts_ns) return;
uint64_t delta = ts_ns - cur; uint64_t delta = ts_ns - cur;
/* don't sleep more than 1 ms */ /* don't sleep more than 1 ms */
if (delta > 1000*1000) delta = 1000*1000; if (delta > 1000*1000) delta = 1000*1000;
delta = delta/1000; delta = delta/1000;
printf("ts %u delta %lu\n", ts, delta); //printf("ts %u delta %lu\n", ts, delta);
if (delta) usleep(delta); if (delta) usleep(delta);
} }
...@@ -87,14 +93,15 @@ static void receive(int sock, unsigned char *b) ...@@ -87,14 +93,15 @@ static void receive(int sock, unsigned char *b)
{ {
if (recv(sock, b, 14+14+1280, 0) != 14+14+1280) { perror("recv"); exit(1); } if (recv(sock, b, 14+14+1280, 0) != 14+14+1280) { perror("recv"); exit(1); }
struct mobipass_header *mh = (struct mobipass_header *)(b+14); struct mobipass_header *mh = (struct mobipass_header *)(b+14);
mh->timestamp = htonl(ntohl(mh->timestamp)-45378/*40120*/); mh->timestamp = htonl((ntohl(mh->timestamp)-45378/*40120*/) % SAMPLES_PER_1024_FRAMES);
//printf("recv timestamp %u\n", ntohl(mh->timestamp));
} }
void mobipass_send(void *data) void mobipass_send(void *data)
{ {
//printf("SEND seqno %d ts %d\n", seqno, ts); //printf("SEND seqno %d ts %d\n", seqno, ts);
struct ethernet_header *eh = (struct ethernet_header *)data; struct ethernet_header *eh = (struct ethernet_header *)data;
//struct mobipass_header *mh = (struct mobipass_header *)(data+14); //struct mobipass_header *mh = (struct mobipass_header *)(data+14);
//printf("SEND seqno %d ts %d\n", mh->seqno, ntohl(mh->timestamp)); //printf("SEND seqno %d ts %d\n", mh->seqno, ntohl(mh->timestamp));
eh->dst[0] = 0x00; eh->dst[0] = 0x00;
...@@ -169,10 +176,12 @@ static void *receiver(void *_) ...@@ -169,10 +176,12 @@ static void *receiver(void *_)
void dosend(int sock, int seqno, uint32_t ts) void dosend(int sock, int seqno, uint32_t ts)
{ {
//printf("SEND seqno %d ts %d\n", seqno, ts);
struct ethernet_header *eh = (struct ethernet_header *)packet; struct ethernet_header *eh = (struct ethernet_header *)packet;
struct mobipass_header *mh = (struct mobipass_header *)(packet+14); struct mobipass_header *mh = (struct mobipass_header *)(packet+14);
ts %= SAMPLES_PER_1024_FRAMES;
//printf("SEND seqno %d ts %d\n", seqno, ts);
eh->dst[0] = 0x00; eh->dst[0] = 0x00;
eh->dst[1] = 0x21; eh->dst[1] = 0x21;
eh->dst[2] = 0x5e; eh->dst[2] = 0x5e;
...@@ -220,6 +229,7 @@ void *sender(void *_) ...@@ -220,6 +229,7 @@ void *sender(void *_)
dosend(sock, seqno, ts); dosend(sock, seqno, ts);
seqno++; seqno++;
ts += 640; ts += 640;
ts %= SAMPLES_PER_1024_FRAMES;
} }
} }
......
...@@ -4,4 +4,7 @@ ...@@ -4,4 +4,7 @@
void mobipass_send(void *data); void mobipass_send(void *data);
void init_mobipass(void); void init_mobipass(void);
/* TODO: following variable only works for 10MHz */
#define SAMPLES_PER_1024_FRAMES (7680*2*10*1024)
#endif /* _MOBIPASS_H_ */ #endif /* _MOBIPASS_H_ */
#include "queues.h" #include "queues.h"
#include "mobipass.h"
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#define QSIZE 10000 #define QSIZE 10000
...@@ -43,6 +47,11 @@ static void enqueue(void *data, struct queue *q) ...@@ -43,6 +47,11 @@ static void enqueue(void *data, struct queue *q)
memcpy(q->buf[pos], data, 14+14+640*2); memcpy(q->buf[pos], data, 14+14+640*2);
q->len++; q->len++;
//if (q == &from_mobipass) {
//struct mobipass_header *mh = (struct mobipass_header *)((char*)data+14);
//printf("recv timestamp %u in pos %d\n", ntohl(mh->timestamp), pos);
//}
done: done:
if (pthread_cond_signal(&q->cond)) abort(); if (pthread_cond_signal(&q->cond)) abort();
if (pthread_mutex_unlock(&q->mutex)) abort(); if (pthread_mutex_unlock(&q->mutex)) abort();
...@@ -70,10 +79,23 @@ void dequeue_to_mobipass(uint32_t timestamp, void *data) ...@@ -70,10 +79,23 @@ void dequeue_to_mobipass(uint32_t timestamp, void *data)
void enqueue_from_mobipass(void *data) void enqueue_from_mobipass(void *data)
{ {
struct mobipass_header *mh = (struct mobipass_header *)((char*)data+14); struct mobipass_header *mh = (struct mobipass_header *)((char*)data+14);
//printf("from mobipass! timestamp %u seqno %d\n", ntohl(mh->timestamp), mh->seqno);
mh->timestamp = htonl(ntohl(mh->timestamp) % SAMPLES_PER_1024_FRAMES);
//printf("from mobipass! timestamp %u seqno %d\n", ntohl(mh->timestamp), mh->seqno); //printf("from mobipass! timestamp %u seqno %d\n", ntohl(mh->timestamp), mh->seqno);
enqueue(data, &from_mobipass); enqueue(data, &from_mobipass);
} }
static int cmp_timestamps(uint32_t a, uint32_t b)
{
if (a == b) return 0;
if (a < b) {
if (b-a > SAMPLES_PER_1024_FRAMES/2) return 1;
return -1;
}
if (a-b > SAMPLES_PER_1024_FRAMES/2) return -1;
return 1;
}
/* to be called with lock on */ /* to be called with lock on */
static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp) static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
{ {
...@@ -82,19 +104,37 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp) ...@@ -82,19 +104,37 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
struct mobipass_header *mh = NULL; struct mobipass_header *mh = NULL;
uint32_t packet_timestamp = 0; uint32_t packet_timestamp = 0;
uint32_t old_start = from_mobipass.start;
uint32_t old_len = from_mobipass.len;
b = from_mobipass.buf[from_mobipass.start];
mh = (struct mobipass_header *)(b+14);
uint32_t old_pts = from_mobipass.len ? ntohl(mh->timestamp) : -1;
b=NULL;
mh=NULL;
while (from_mobipass.len) { while (from_mobipass.len) {
b = from_mobipass.buf[from_mobipass.start]; b = from_mobipass.buf[from_mobipass.start];
mh = (struct mobipass_header *)(b+14); mh = (struct mobipass_header *)(b+14);
data = b + 14*2; data = b + 14*2;
packet_timestamp = ntohl(mh->timestamp); packet_timestamp = ntohl(mh->timestamp);
if (timestamp < packet_timestamp) goto nodata; //printf("cmp A %u pt %u start %d\n", timestamp, packet_timestamp, from_mobipass.start);
if (timestamp < packet_timestamp+640) break; if (cmp_timestamps(timestamp, packet_timestamp) < 0) goto nodata;
//printf("cmp B %u pt %u\n", timestamp, packet_timestamp);
if (cmp_timestamps(timestamp, (packet_timestamp+640) % SAMPLES_PER_1024_FRAMES) < 0) break;
//printf("cmp C %u pt %u\n", timestamp, packet_timestamp);
from_mobipass.len--; from_mobipass.len--;
from_mobipass.start = (from_mobipass.start+1) % QSIZE; from_mobipass.start = (from_mobipass.start+1) % QSIZE;
} }
if (from_mobipass.len == 0) goto nodata; if (from_mobipass.len == 0) goto nodata;
if (timestamp == (packet_timestamp + 639) % SAMPLES_PER_1024_FRAMES) {
from_mobipass.len--;
from_mobipass.start = (from_mobipass.start+1) % QSIZE;
}
if (timestamp < packet_timestamp) timestamp += SAMPLES_PER_1024_FRAMES;
*I = data[(timestamp - packet_timestamp) * 2]; *I = data[(timestamp - packet_timestamp) * 2];
*Q = data[(timestamp - packet_timestamp) * 2 + 1]; *Q = data[(timestamp - packet_timestamp) * 2 + 1];
...@@ -103,16 +143,31 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp) ...@@ -103,16 +143,31 @@ static void get_sample_from_mobipass(char *I, char *Q, uint32_t timestamp)
nodata: nodata:
*I = 0; *I = 0;
*Q = 0; *Q = 0;
printf("no sample timestamp %u pt %u start %d old_start %d old_pt %u len %d old len %d\n", timestamp, packet_timestamp, from_mobipass.start, old_start, old_pts, from_mobipass.len, old_len);
}
/* doesn't work with delay more than 1s */
static void wait_for_data(pthread_cond_t *cond, pthread_mutex_t *mutex, int delay_us)
{
struct timeval now;
struct timespec target;
gettimeofday(&now, NULL);
target.tv_sec = now.tv_sec;
target.tv_nsec = (now.tv_usec + delay_us) * 1000;
if (target.tv_nsec >= 1000 * 1000 * 1000) { target.tv_nsec -= 1000 * 1000 * 1000; target.tv_sec++; }
int err = pthread_cond_timedwait(cond, mutex, &target);
if (err != 0 && err != ETIMEDOUT) { printf("pthread_cond_timedwait: err (%d) %s\n", err, strerror(err)); abort(); }
} }
void dequeue_from_mobipass(uint32_t timestamp, void *data) void dequeue_from_mobipass(uint32_t timestamp, void *data)
{ {
int i; int i;
int ts = timestamp; // int ts = timestamp;
int not_empty; int waiting_allowed;
#if 0
if (pthread_mutex_lock(&from_mobipass.mutex)) abort(); if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
#if 0
printf("want dequeue ts %u queue (start %d len %d): [", timestamp, from_mobipass.start, from_mobipass.len); printf("want dequeue ts %u queue (start %d len %d): [", timestamp, from_mobipass.start, from_mobipass.len);
for (i = 0; i < from_mobipass.len; i++) { for (i = 0; i < from_mobipass.len; i++) {
unsigned char *b = NULL; unsigned char *b = NULL;
...@@ -126,23 +181,27 @@ printf("]\n"); ...@@ -126,23 +181,27 @@ printf("]\n");
#endif #endif
if (from_mobipass.len == 0) { if (from_mobipass.len == 0) {
if (pthread_mutex_unlock(&from_mobipass.mutex)) abort(); //if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
usleep(1000/3); printf("sleep 1\n");
if (pthread_mutex_lock(&from_mobipass.mutex)) abort(); //usleep(1000/3);
//if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
wait_for_data(&from_mobipass.cond, &from_mobipass.mutex, 2000); //1000/3);
} }
not_empty = from_mobipass.len != 0; waiting_allowed = from_mobipass.len != 0;
for (i = 0; i < 640*2; i+=2) { for (i = 0; i < 640*2; i+=2) {
if (from_mobipass.len == 0 && not_empty) { if (from_mobipass.len == 0 && waiting_allowed) {
if (pthread_mutex_unlock(&from_mobipass.mutex)) abort(); //if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
usleep(1000/3); //printf("sleep 2\n");
if (pthread_mutex_lock(&from_mobipass.mutex)) abort(); //usleep(1000/3);
not_empty = from_mobipass.len != 0; //if (pthread_mutex_lock(&from_mobipass.mutex)) abort();
wait_for_data(&from_mobipass.cond, &from_mobipass.mutex, 2000); //1000/3);
waiting_allowed = from_mobipass.len != 0;
} }
get_sample_from_mobipass((char*)data + 14*2 + i, (char*)data + 14*2 + i+1, ts); get_sample_from_mobipass((char*)data + 14*2 + i, (char*)data + 14*2 + i+1, timestamp % SAMPLES_PER_1024_FRAMES);
ts++; timestamp++;
} }
if (pthread_mutex_unlock(&from_mobipass.mutex)) abort(); if (pthread_mutex_unlock(&from_mobipass.mutex)) abort();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment