Commit 91f098ee authored by laurent's avatar laurent

add controls, code cleanup

parent 3bd7cd2d
This diff is collapsed.
...@@ -38,11 +38,13 @@ request_t * createRequest(enum request_t type,int size) { ...@@ -38,11 +38,13 @@ request_t * createRequest(enum request_t type,int size) {
} }
void freeRequest(request_t* request) { void freeRequest(request_t* request) {
//printf("freeing: %ld, %p\n", request->id, request);
free(request); free(request);
} }
volatile int ii=0;
int add_request(request_t* request, tpool_t * tp) { int add_request(request_t* request, tpool_t * tp) {
AssertFatal(pthread_mutex_lock(&tp->lockRequests)==0,""); mutexlock(tp->lockRequests);
if (tp->oldestRequests == NULL) if (tp->oldestRequests == NULL)
tp->oldestRequests = request; tp->oldestRequests = request;
else { else {
...@@ -50,22 +52,22 @@ int add_request(request_t* request, tpool_t * tp) { ...@@ -50,22 +52,22 @@ int add_request(request_t* request, tpool_t * tp) {
tp->newestRequests->next = request; tp->newestRequests->next = request;
} }
tp->newestRequests = request; tp->newestRequests = request;
AssertFatal(pthread_mutex_lock(&tp->lockReportDone)==0,""); mutexlock(tp->lockReportDone);
tp->notFinishedJobs++; tp->notFinishedJobs++;
//printf("added:%d\n", tp->notFinishedJobs); mutexunlock(tp->lockReportDone);
AssertFatal(pthread_mutex_unlock(&tp->lockReportDone)==0,""); condbroadcast(tp->notifRequest);
AssertFatal(pthread_cond_broadcast(&tp->notifRequest)==0,""); mutexunlock(tp->lockRequests);
AssertFatal(pthread_mutex_unlock(&tp->lockRequests)==0,"");
return 0; return 0;
} }
int add_requests(uint64_t request_num, tpool_t * tp) { int add_requests(uint64_t request_num, tpool_t * tp) {
request_t* request; request_t* request;
int nbToAdd=((uint32_t)lrand48())%20+1; int nbToAdd=((uint32_t)lrand48())%20+1;
mutexlock(tp->lockRequests);
for (int i=0; i<nbToAdd; i++) { for (int i=0; i<nbToAdd; i++) {
// simulate request // simulate request
request=createRequest(DECODE,sizeof(turboDecode_t)); request=createRequest(DECODE,sizeof(turboDecode_t));
union turboReqUnion id= {.s={request_num*100+i,1000,i*10,111,222}}; union turboReqUnion id= {.s={request_num,1000,i*10,111,222}};
request->id= id.p; request->id= id.p;
turboDecode_t * rdata=(turboDecode_t *) request->data; turboDecode_t * rdata=(turboDecode_t *) request->data;
rdata->function=phy_threegpplte_turbo_decoder8; rdata->function=phy_threegpplte_turbo_decoder8;
...@@ -80,6 +82,11 @@ int add_requests(uint64_t request_num, tpool_t * tp) { ...@@ -80,6 +82,11 @@ int add_requests(uint64_t request_num, tpool_t * tp) {
tp->newestRequests = request; tp->newestRequests = request;
} }
mutexlock(tp->lockReportDone);
tp->notFinishedJobs+=nbToAdd;
mutexunlock(tp->lockReportDone);
condbroadcast(tp->notifRequest);
mutexunlock(tp->lockRequests);
return nbToAdd; return nbToAdd;
} }
...@@ -117,8 +124,10 @@ request_t * get_request(tpool_t * tp, uint16_t threadID ) { ...@@ -117,8 +124,10 @@ request_t * get_request(tpool_t * tp, uint16_t threadID ) {
nnb++; nnb++;
r=r->next; r=r->next;
} }
//if ( ! ( nb == nnb && request == NULL)) /*
//printf("getr:was=%d,is=%d,gotit=%p\n",nb,nnb,request); if ( ! ( nb == nnb && request == NULL))
printf("getr:was=%d,is=%d,gotit=%p\n",nb,nnb,request);
*/
return request; return request;
} }
...@@ -185,15 +194,12 @@ void handle_request(tpool_t * tp, request_t* request) { ...@@ -185,15 +194,12 @@ void handle_request(tpool_t * tp, request_t* request) {
request->startProcessingTime=rdtsc(); request->startProcessingTime=rdtsc();
process_request(request); process_request(request);
request->endProcessingTime=rdtsc(); request->endProcessingTime=rdtsc();
AssertFatal(pthread_mutex_lock(&tp->lockReportDone)==0,""); mutexlock(tp->lockReportDone);
tp->notFinishedJobs--; tp->notFinishedJobs--;
//printf("Removed:%d\n",tp->notFinishedJobs);
request->next=tp->doneRequests; request->next=tp->doneRequests;
tp->doneRequests=request; tp->doneRequests=request;
//printf("signaling ..."); condsignal(tp->notifDone);
AssertFatal(pthread_cond_signal(&tp->notifDone)==0,""); mutexunlock(tp->lockReportDone);
//printf("...done\n");
AssertFatal(pthread_mutex_unlock(&tp->lockReportDone)==0,"");
/* /*
printf("Thread '%ld' handled request '%d' delay in µs:%ld\n", printf("Thread '%ld' handled request '%d' delay in µs:%ld\n",
syscall( SYS_gettid ), syscall( SYS_gettid ),
...@@ -224,13 +230,16 @@ void* one_thread(void* data) { ...@@ -224,13 +230,16 @@ void* one_thread(void* data) {
// Infinite loop to process requests // Infinite loop to process requests
do { do {
AssertFatal(pthread_mutex_lock(&tp->lockRequests)==0,""); mutexlock(tp->lockRequests);
request_t* request = get_request(tp, myThread->id); request_t* request = get_request(tp, myThread->id);
if (request == NULL) { if (request == NULL) {
AssertFatal(pthread_cond_wait(&tp->notifRequest,&tp->lockRequests)==0,""); condwait(tp->notifRequest,tp->lockRequests);
request = get_request(tp, myThread->id); request = get_request(tp, myThread->id);
} }
AssertFatal(pthread_mutex_unlock(&tp->lockRequests)==0,"");
mutexunlock(tp->lockRequests);
if (request!=NULL) { if (request!=NULL) {
strncpy(request->processedBy,myThread->name, 15); strncpy(request->processedBy,myThread->name, 15);
request->coreId=myThread->coreID; request->coreId=myThread->coreID;
...@@ -258,10 +267,11 @@ void init_tpool(char * params,tpool_t * pool) { ...@@ -258,10 +267,11 @@ void init_tpool(char * params,tpool_t * pool) {
sparam.sched_priority = sched_get_priority_max(SCHED_RR)-1; sparam.sched_priority = sched_get_priority_max(SCHED_RR)-1;
pthread_setschedparam(pthread_self(), SCHED_RR, &sparam); pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
pool->activated=true; pool->activated=true;
pthread_mutex_init(&pool->lockRequests,NULL); mutexinit(pool->lockRequests);
pthread_cond_init (&pool->notifRequest,NULL); condinit (pool->notifRequest);
pthread_mutex_init(&pool->lockReportDone,NULL); pool->notifCount=0;
pthread_cond_init (&pool->notifDone,NULL); mutexinit(pool->lockReportDone);
condinit (pool->notifDone);
pool->oldestRequests=NULL; pool->oldestRequests=NULL;
pool->newestRequests=NULL; pool->newestRequests=NULL;
pool->doneRequests=NULL; pool->doneRequests=NULL;
...@@ -346,12 +356,11 @@ int main(int argc, char* argv[]) { ...@@ -346,12 +356,11 @@ int main(int argc, char* argv[]) {
uint64_t i=1; uint64_t i=1;
// Test the lists // Test the lists
srand48(time(NULL)); srand48(time(NULL));
AssertFatal(pthread_mutex_lock(&pool.lockRequests)==0,"");
int nbRequest=add_requests(i, &pool); int nbRequest=add_requests(i, &pool);
printf("These should be: %d elements in the list\n",nbRequest); printf("These should be: %d elements in the list\n",nbRequest);
displayList(pool.oldestRequests, pool.newestRequests); displayList(pool.oldestRequests, pool.newestRequests);
// remove in middle // remove in middle
request_t *req106=searchRNTI(&pool.oldestRequests, &pool.newestRequests, 106); request_t *req106=searchRNTI(&pool, 106);
if (req106) { if (req106) {
union turboReqUnion id= {.p=req106->id}; union turboReqUnion id= {.p=req106->id};
printf("Removed: rnti:%u frame:%u-%u codeblock:%u, check it\n", printf("Removed: rnti:%u frame:%u-%u codeblock:%u, check it\n",
...@@ -363,8 +372,7 @@ int main(int argc, char* argv[]) { ...@@ -363,8 +372,7 @@ int main(int argc, char* argv[]) {
} else } else
printf("no rnti 106\n"); printf("no rnti 106\n");
displayList(pool.oldestRequests, pool.newestRequests); displayList(pool.oldestRequests, pool.newestRequests);
request_t *reqlast=searchRNTI(&pool.oldestRequests, &pool.newestRequests, request_t *reqlast=searchRNTI(&pool, 100+nbRequest-1);
100+nbRequest-1);
if (reqlast) { if (reqlast) {
printf("Removed last item, check it\n"); printf("Removed last item, check it\n");
freeRequest(reqlast); freeRequest(reqlast);
...@@ -373,49 +381,59 @@ int main(int argc, char* argv[]) { ...@@ -373,49 +381,59 @@ int main(int argc, char* argv[]) {
displayList(pool.oldestRequests, pool.newestRequests); displayList(pool.oldestRequests, pool.newestRequests);
printf("Remove all jobs\n"); printf("Remove all jobs\n");
while(pool.oldestRequests!=NULL) while(pool.oldestRequests!=NULL)
get_request(&pool); get_request(&pool,0);
printf("List should be empty now\n"); printf("List should be empty now\n");
displayList(pool.oldestRequests, pool.newestRequests); displayList(pool.oldestRequests, pool.newestRequests);
AssertFatal(pthread_mutex_unlock(&pool.lockRequests)==0,"");
sleep(1);
mutexlock(pool.lockReportDone);
pool.notFinishedJobs=0;
pool.doneRequests=NULL;
mutexunlock(pool.lockReportDone);
while (1) { while (1) {
uint64_t now=rdtsc(); uint64_t now=rdtsc();
/* run a loop that generates a lot of requests */ /* run a loop that generates a lot of requests */
AssertFatal(pthread_mutex_lock(&pool.lockRequests)==0,""); AssertFatal(pool.notFinishedJobs==0,"");
int nbRequest=add_requests(i, &pool); int n=add_requests(i, &pool);
AssertFatal(pthread_mutex_lock(&pool.lockReportDone)==0,""); printf("Added %d requests\n",n);
pool.notFinishedJobs+=nbRequest;
AssertFatal(pthread_mutex_unlock(&pool.lockReportDone)==0,"");
AssertFatal(pthread_cond_broadcast(&pool.notifRequest)==0,"");
AssertFatal(pthread_mutex_unlock(&pool.lockRequests)==0,"");
/* /*
// The main thread also process the queue // The main thread also process the queue
AssertFatal(pthread_mutex_lock(&pool.lockRequests)==0,""); mutexlock(pool.lockRequests);
request_t* request= NULL; request_t* request= NULL;
while ( (request=get_request(&pool)) != NULL ) { while ( (request=get_request(&pool,0)) != NULL ) {
AssertFatal(pthread_mutex_unlock(&pool.lockRequests)==0,""); mutexunlock(pool.lockRequests);
strcpy(request->processedBy,"MainThread"); strcpy(request->processedBy,"MainThread");
handle_request(&pool, request); handle_request(&pool, request);
AssertFatal(pthread_mutex_lock(&pool.lockRequests)==0,""); mutexlock(pool.lockRequests);
} }
AssertFatal(pthread_mutex_unlock(&pool.lockRequests)==0,""); mutexunlock(pool.lockRequests);
*/ */
// Wait all other threads finish to process // Wait all other threads finish to process
AssertFatal(pthread_mutex_lock(&pool.lockReportDone)==0,""); mutexlock(pool.lockReportDone);
while ( pool.notFinishedJobs > 0 ) { while ( pool.notFinishedJobs > 0 ) {
AssertFatal(pthread_cond_wait(&pool.notifDone,&pool.lockReportDone)==0,""); condwait(pool.notifDone,pool.lockReportDone);
}
mutexunlock(pool.lockReportDone);
int i=0;
for (request_t* ptr=pool.doneRequests; ptr!=NULL; ptr=ptr->next) {
i++;
//printf("in return: %ld, %p\n", ptr->id, ptr);
} }
AssertFatal(pthread_mutex_unlock(&pool.lockReportDone)==0,""); AssertFatal(i==n,"%d/%d\n",i,n);
while (pool.doneRequests!=NULL) { while (pool.doneRequests!=NULL) {
pool.doneRequests->returnTime=rdtsc(); pool.doneRequests->returnTime=rdtsc();
if(write(pool.traceFd,pool.doneRequests,sizeof(request_t))) {}; if(write(pool.traceFd,pool.doneRequests,sizeof(request_t)- 2*sizeof(void*))) {};
request_t* tmp=pool.doneRequests; request_t* tmp=pool.doneRequests;
pool.doneRequests=pool.doneRequests->next; pool.doneRequests=pool.doneRequests->next;
free(tmp); freeRequest(tmp);
} }
printf("Requests %lu Done %d requests in %ld µsec\n",i, nbRequest, (rdtsc()-now)/pool.cpuCyclesMicroSec);
printf("Requests %d Done in %ld µsec\n",i, (rdtsc()-now)/pool.cpuCyclesMicroSec);
i++; i++;
}; };
return 0; return 0;
......
#ifndef THREAD_POOL_H #ifndef THREAD_POOL_H
#define THREAD_POOL_H #define THREAD_POOL_H
#include <stdbool.h> #include <stdbool.h>
#include <sys/syscall.h>
#include <openair2/COMMON/platform_types.h> #include <openair2/COMMON/platform_types.h>
enum request_t { enum request_t {
...@@ -48,6 +49,7 @@ typedef struct thread_pool { ...@@ -48,6 +49,7 @@ typedef struct thread_pool {
int activated; int activated;
pthread_mutex_t lockRequests; pthread_mutex_t lockRequests;
pthread_cond_t notifRequest; pthread_cond_t notifRequest;
int notifCount;
pthread_mutex_t lockReportDone; pthread_mutex_t lockReportDone;
pthread_cond_t notifDone; pthread_cond_t notifDone;
request_t* oldestRequests; request_t* oldestRequests;
...@@ -62,6 +64,18 @@ typedef struct thread_pool { ...@@ -62,6 +64,18 @@ typedef struct thread_pool {
struct one_thread * allthreads; struct one_thread * allthreads;
} tpool_t; } tpool_t;
#define mutexinit(mutex) AssertFatal(pthread_mutex_init(&mutex,NULL)==0,"");
#define condinit(signal) AssertFatal(pthread_cond_init(&signal,NULL)==0,"");
//#define mutexlock(mutex) printf("L:" #mutex __FILE__ ":%d, thread %d\n", __LINE__, syscall( SYS_gettid )); AssertFatal(pthread_mutex_lock(&mutex)==0,"");
//#define mutexunlock(mutex) printf("U:" #mutex __FILE__ ":%d, thread:%d\n", __LINE__, syscall( SYS_gettid )); AssertFatal(pthread_mutex_unlock(&mutex)==0,"");
#define mutexlock(mutex) AssertFatal(pthread_mutex_lock(&mutex)==0,"");
#define mutexunlock(mutex) AssertFatal(pthread_mutex_unlock(&mutex)==0,"");
#define condwait(condition, mutex) AssertFatal(pthread_cond_wait(&condition, &mutex)==0,"");
#define condbroadcast(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,"");
#define condsignal(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,"");
void init_tpool(char*,tpool_t* ); void init_tpool(char*,tpool_t* );
request_t * createRequest(enum request_t type,int size); request_t * createRequest(enum request_t type,int size);
void freeRequest(request_t* request); void freeRequest(request_t* request);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment