Commit 822f9849 authored by Robert Schmidt's avatar Robert Schmidt

Stop CU's PROTO_AGENT when DU disconnects

This adds an exit flag as well as the proto_agent_async_msg_recv_unlock()
function to unlock and release the proto_agent worker thread correctly.
parent beec74b0
...@@ -57,6 +57,8 @@ void cu_task_handle_sctp_association_resp(instance_t instance, sctp_new_associat ...@@ -57,6 +57,8 @@ void cu_task_handle_sctp_association_resp(instance_t instance, sctp_new_associat
instance, instance,
sctp_new_association_resp->ulp_cnx_id); sctp_new_association_resp->ulp_cnx_id);
if (sctp_new_association_resp->sctp_state == SCTP_STATE_SHUTDOWN)
proto_agent_stop(instance);
//f1ap_handle_setup_message(instance, sctp_new_association_resp->sctp_state == SCTP_STATE_SHUTDOWN); //f1ap_handle_setup_message(instance, sctp_new_association_resp->sctp_state == SCTP_STATE_SHUTDOWN);
return; // exit -1 for debugging return; // exit -1 for debugging
} }
...@@ -75,10 +77,6 @@ void cu_task_handle_sctp_association_resp(instance_t instance, sctp_new_associat ...@@ -75,10 +77,6 @@ void cu_task_handle_sctp_association_resp(instance_t instance, sctp_new_associat
.remote_ipv4_address = RC.rrc[instance]->eth_params_s.remote_addr, .remote_ipv4_address = RC.rrc[instance]->eth_params_s.remote_addr,
.remote_port = RC.rrc[instance]->eth_params_s.remote_portd .remote_port = RC.rrc[instance]->eth_params_s.remote_portd
}; };
/* stop, then start the PROTO_AGENT. If it is already stopped, stopping it
* again will do nothing, therefore it is safe to call here.
* TODO: call proto_agent_stop() when CU_TASK is informed about conn release */
proto_agent_stop(instance);
AssertFatal(proto_agent_start(instance, &params) == 0, AssertFatal(proto_agent_start(instance, &params) == 0,
"could not start PROTO_AGENT for F1U on instance %d!\n", instance); "could not start PROTO_AGENT for F1U on instance %d!\n", instance);
} }
......
...@@ -70,6 +70,7 @@ int proto_agent_start(mod_id_t mod_id, const cudu_params_t *p) ...@@ -70,6 +70,7 @@ int proto_agent_start(mod_id_t mod_id, const cudu_params_t *p)
//DevAssert(p->remote_port > 1024); // "unprivileged" port //DevAssert(p->remote_port > 1024); // "unprivileged" port
proto_agent[mod_id].mod_id = mod_id; proto_agent[mod_id].mod_id = mod_id;
proto_agent[mod_id].exit = 0;
/* Initialize the channel container */ /* Initialize the channel container */
...@@ -138,10 +139,14 @@ error: ...@@ -138,10 +139,14 @@ error:
void proto_agent_stop(mod_id_t mod_id) void proto_agent_stop(mod_id_t mod_id)
{ {
if (!proto_agent[mod_id].channel) return; if (!proto_agent[mod_id].channel) return;
/* unlock the independent read thread proto_agent_receive() */
proto_agent[mod_id].exit = 1;
proto_agent_async_msg_recv_unlock(proto_agent[mod_id].channel->channel_info);
proto_agent_async_release(proto_agent[mod_id].channel); proto_agent_async_release(proto_agent[mod_id].channel);
proto_agent_destroy_channel(proto_agent[mod_id].channel->channel_id); proto_agent_destroy_channel(proto_agent[mod_id].channel->channel_id);
free(proto_agent[mod_id].channel); free(proto_agent[mod_id].channel);
proto_agent[mod_id].channel = NULL; proto_agent[mod_id].channel = NULL;
LOG_W(PROTO_AGENT, "server stopped\n");
} }
//void //void
...@@ -266,10 +271,11 @@ proto_agent_receive(void *args) ...@@ -266,10 +271,11 @@ proto_agent_receive(void *args)
msg = NULL; msg = NULL;
ser_msg = NULL; ser_msg = NULL;
if ((size = proto_agent_async_msg_recv(&data, &priority, inst->channel->channel_info)) <= 0){ if ((size = proto_agent_async_msg_recv(&data, &priority, inst->channel->channel_info)) < 0){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING; err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
if (inst->exit) break;
LOG_D(PROTO_AGENT, "Server side Received message with size %d and priority %d, calling message handle\n", size, priority); LOG_D(PROTO_AGENT, "Server side Received message with size %d and priority %d, calling message handle\n", size, priority);
......
...@@ -84,6 +84,10 @@ int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info) ...@@ -84,6 +84,10 @@ int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info)
return message_get(channel->receive_queue, data, priority); return message_get(channel->receive_queue, data, priority);
} }
void proto_agent_async_msg_recv_unlock(proto_agent_async_channel_t *channel) {
message_get_unlock(channel->receive_queue);
}
void proto_agent_async_release(proto_agent_channel_t *channel) void proto_agent_async_release(proto_agent_channel_t *channel)
{ {
proto_agent_async_channel_t *channel_info = channel->channel_info; proto_agent_async_channel_t *channel_info = channel->channel_info;
......
...@@ -55,6 +55,9 @@ int proto_agent_async_msg_send(void *data, int size, int priority, void *channel ...@@ -55,6 +55,9 @@ int proto_agent_async_msg_send(void *data, int size, int priority, void *channel
int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info); int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info);
/* unlocks a running proto_agent_async_msg_recv() */
void proto_agent_async_msg_recv_unlock(proto_agent_async_channel_t *channel);
void proto_agent_async_release(proto_agent_channel_t *channel); void proto_agent_async_release(proto_agent_channel_t *channel);
......
...@@ -121,6 +121,7 @@ typedef struct proto_agent_instance_s { ...@@ -121,6 +121,7 @@ typedef struct proto_agent_instance_s {
proto_agent_info_t agent_info; proto_agent_info_t agent_info;
struct proto_agent_channel_s *channel; struct proto_agent_channel_s *channel;
pthread_t recv_thread; pthread_t recv_thread;
uint8_t exit;
} proto_agent_instance_t; } proto_agent_instance_t;
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment