Commit 003dd281 authored by Robert Schmidt's avatar Robert Schmidt

CI: refactor Iperf(): no queues or locks, use ThreadPoolExecutor

parent 2a733173
......@@ -43,7 +43,7 @@ import logging
import datetime
import signal
import statistics as stat
from multiprocessing import Process, Lock, SimpleQueue
from multiprocessing import SimpleQueue
import concurrent.futures
#import our libs
......@@ -494,15 +494,6 @@ class OaiCiTest():
messages = [f.result() for f in futures]
HTML.CreateHtmlTestRowQueue('NA', 'OK', messages)
def ping_iperf_wrong_exit(self, lock, UE_IPAddress, device_id, statusQueue, message):
logging.error(f"ue {device_id} {UE_IPAddress}: {message}")
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(message)
lock.release()
def Ping_common(self, EPC, ue, RAN):
# Launch ping on the EPC side (true for ltebox and old open-air-cn)
ping_status = 0
......@@ -617,7 +608,7 @@ class OaiCiTest():
def Iperf_ComputeModifiedBW(self, idx, ue_num):
result = re.search('-b (?P<iperf_bandwidth>[0-9\.]+)[KMG]', str(self.iperf_args))
if result is None:
logging.debug('\u001B[1;37;41m Iperf bandwidth Not Found! \u001B[0m')
logging.error('\u001B[1;37;41m Iperf bandwidth Not Found! \u001B[0m')
sys.exit(1)
iperf_bandwidth = result.group('iperf_bandwidth')
if self.iperf_profile == 'balanced':
......@@ -635,11 +626,11 @@ class OaiCiTest():
iperf_bandwidth_str_new = f"-b {'%.2f' % iperf_bandwidth_new}"
result = re.sub(iperf_bandwidth_str, iperf_bandwidth_str_new, str(self.iperf_args))
if result is None:
logging.debug('\u001B[1;37;41m Calculate Iperf bandwidth Failed! \u001B[0m')
logging.error('\u001B[1;37;41m Calculate Iperf bandwidth Failed! \u001B[0m')
sys.exit(1)
return result
def Iperf_analyzeV2TCPOutput(self, lock, UE_IPAddress, device_id, statusQueue, iperf_real_options,EPC,SSH, filename):
def Iperf_analyzeV2TCPOutput(self, SSH, filename):
SSH.command(f'awk -f /tmp/tcp_iperf_stats.awk {filename}', '\$', 5)
result = re.search('Avg Bitrate : (?P<average>[0-9\.]+ Mbits\/sec) Max Bitrate : (?P<maximum>[0-9\.]+ Mbits\/sec) Min Bitrate : (?P<minimum>[0-9\.]+ Mbits\/sec)', SSH.getBefore())
......@@ -647,44 +638,34 @@ class OaiCiTest():
avgbitrate = result.group('average')
maxbitrate = result.group('maximum')
minbitrate = result.group('minimum')
lock.acquire()
logging.debug(f'\u001B[1;37;44m TCP iperf result ({UE_IPAddress}) \u001B[0m')
msg = 'TCP Stats :\n'
if avgbitrate is not None:
logging.debug(f'\u001B[1;34m Avg Bitrate : {avgbitrate} \u001B[0m')
msg += f'Avg Bitrate : {avgbitrate} \n'
if maxbitrate is not None:
logging.debug(f'\u001B[1;34m Max Bitrate : {maxbitrate} \u001B[0m')
msg += f'Max Bitrate : {maxbitrate} \n'
if minbitrate is not None:
logging.debug(f'\u001B[1;34m Min Bitrate : {minbitrate} \u001B[0m')
msg += f'Min Bitrate : {minbitrate} \n'
statusQueue.put(0)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(msg)
lock.release()
return (True, msg)
return 0
return (False, "could not analyze log file")
def Iperf_analyzeV2Output(self, lock, UE_IPAddress, device_id, statusQueue, iperf_real_options,EPC,SSH):
def Iperf_analyzeV2Output(self, iperf_real_options, EPC, SSH):
result = re.search('-u', str(iperf_real_options))
if result is None:
logging.debug('Into Iperf_analyzeV2TCPOutput client')
filename = f'{EPC.SourceCodePath}/scripts/iperf_{self.testCase_id}_{device_id}.log'
response = self.Iperf_analyzeV2TCPOutput(lock, UE_IPAddress, device_id, statusQueue, iperf_real_options, EPC, SSH, filename)
logging.debug(f'Iperf_analyzeV2TCPOutput response returned value = {response}')
response = self.Iperf_analyzeV2TCPOutput(SSH, filename)
return response
result = re.search('Server Report:', SSH.getBefore())
if result is None:
result = re.search('read failed: Connection refused', SSH.getBefore())
if result is not None:
logging.debug('\u001B[1;37;41m Could not connect to iperf server! \u001B[0m')
msg = 'Could not connect to iperf server!'
return (False, msg)
else:
logging.debug('\u001B[1;37;41m Server Report and Connection refused Not Found! \u001B[0m')
return -1
msg = 'Server Report and Connection refused Not Found!'
return (False, msg)
# Computing the requested bandwidth in float
result = re.search('-b (?P<iperf_bandwidth>[0-9\.]+)[KMG]', str(iperf_real_options))
if result is not None:
......@@ -708,14 +689,11 @@ class OaiCiTest():
bitrate = result.group('bitrate')
packetloss = result.group('packetloss')
jitter = result.group('jitter')
lock.acquire()
logging.debug(f'\u001B[1;37;44m iperf result {UE_IPAddress} \u001B[0m')
iperfStatus = True
msg = f'Req Bitrate : {req_bandwidth} \n'
logging.debug(f'\u001B[1;34m Req Bitrate : {req_bandwidth} \u001B[0m')
if bitrate is not None:
msg += f'Bitrate : {bitrate} \n'
logging.debug(f'\u001B[1;34m Bitrate : {bitrate} \u001B[0m')
result = re.search('(?P<real_bw>[0-9\.]+) [KMG]bits/sec', str(bitrate))
if result is not None:
actual_bw = float(str(result.group('real_bw')))
......@@ -731,50 +709,37 @@ class OaiCiTest():
br_loss = 100 * actual_bw / req_bw
bitperf = '%.2f ' % br_loss
msg += f'Bitrate Perf: {bitperf} %\n'
logging.debug(f'\u001B[1;34m Bitrate Perf: {bitperf} %\u001B[0m')
if packetloss is not None:
msg += f'Packet Loss : {packetloss} %\n'
logging.debug(f'\u001B[1;34m Packet Loss : {packetloss} %\u001B[0m')
if float(packetloss) > float(self.iperf_packetloss_threshold):
msg += 'Packet Loss too high!\n'
logging.debug('\u001B[1;37;41m Packet Loss too high \u001B[0m')
iperfStatus = False
if jitter is not None:
msg += f'Jitter : {jitter} \n'
logging.debug(f'\u001B[1;34m Jitter : {jitter} \u001B[0m')
if (iperfStatus):
statusQueue.put(0)
else:
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(msg)
lock.release()
return 0
return (iperfStatus, msg)
else:
return -2
return (False, "could not analyze server log")
def Iperf_analyzeV2BIDIR(self, lock, UE_IPAddress, device_id, statusQueue,server_filename,client_filename):
def Iperf_analyzeV2BIDIR(self, server_filename, client_filename):
#check the 2 files are here
if (not os.path.isfile(client_filename)) or (not os.path.isfile(server_filename)):
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Bidir TCP : Client or Server Log File not present')
return
return (False, 'Bidir TCP: Client or Server Log File not present')
#check the 2 files size
if (os.path.getsize(client_filename)==0) and (os.path.getsize(server_filename)==0):
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Bidir TCP : Client and Server Log File are empty')
return
return (False, 'Bidir TCP: Client and Server Log File are empty')
report_msg='TCP BIDIR Report:\n'
report_msg = ''
#if client is not empty, all the info is in, otherwise we ll use the server file to get some partial info
client_filesize = os.path.getsize(client_filename)
if client_filesize == 0:
report_msg+="Client file (UE) present but !!! EMPTY !!!\n"
report_msg+="Partial report from server file\n"
report_msg+="Partial report from server file"
filename = server_filename
else :
report_msg+="Report from client file (UE)\n"
report_msg+="Report from client file (UE)"
filename = client_filename
report=[] #used to check if relevant lines were found
......@@ -784,32 +749,19 @@ class OaiCiTest():
result = re.search(rf'^\[\s+\d+\](?P<direction>\[.+\]).*\s+(?P<bitrate>[0-9\.]+ [KMG]bits\/sec).*\s+(?P<role>\bsender|receiver\b)', str(line))
if result is not None:
report.append(str(line))
report_msg+=result.group('role') + ' ' + result.group('direction')+ '\t = ' +result.group('bitrate')+'\n'
if len(report)>0:
lock.acquire()
statusQueue.put(0)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(report_msg)
logging.debug(f'\u001B[1;37;45m TCP Bidir Iperf Result ({UE_IPAddress}) \u001B[0m')
for rLine in report_msg.split('\n'):
logging.debug(f'\u001B[1;35m {rLine} \u001B[0m')
lock.release()
else:
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Bidir TCP : Could not analyze from Log file')
report_msg += f"\n{result.group('role')} {result.group('direction')}\t: {result.group('bitrate')}"
if len(report) == 0:
return (False, 'Bidir TCP: Could not analyze from Log file')
return (True, report_msg)
def Iperf_analyzeV2Server(self, lock, UE_IPAddress, device_id, statusQueue, iperf_real_options, filename,type):
def Iperf_analyzeV2Server(self, iperf_real_options, filename, type):
if (not os.path.isfile(filename)):
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Could not analyze from server log')
return
return (False, 'Could not analyze from server log')
# Computing the requested bandwidth in float
result = re.search('-b (?P<iperf_bandwidth>[0-9\.]+)[KMG]', str(iperf_real_options))
if result is None:
logging.debug('Iperf bandwidth Not Found!')
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Could not compute Iperf bandwidth!')
return
return (False, 'Could not compute Iperf bandwidth!')
else:
req_bandwidth = result.group('iperf_bandwidth')
req_bw = float(req_bandwidth)
......@@ -856,6 +808,8 @@ class OaiCiTest():
br_sum = curr_br + br_sum
ji_sum = float(ji[0]) + ji_sum
server_file.close()
if (row_idx > 0):
br_sum = br_sum / row_idx
ji_sum = ji_sum / row_idx
......@@ -876,87 +830,25 @@ class OaiCiTest():
pl = float(100 * pl_sum / ps_sum)
packetloss = '%2.1f ' % (pl)
packetloss += '%'
#checking packet loss compliance
if float(pl) > float(self.iperf_packetloss_threshold):
pal_too_high_msg = 'Packet Loss too high : tested = '+packetloss+', target = '+self.iperf_packetloss_threshold+'%'
else:
pal_too_high_msg='Packet Loss value is within acceptance range'
#checking bitrate perf compliance
if float(br_loss) < float(self.iperf_bitrate_threshold):
bit_too_low_msg = 'Bitrate too low : tested = '+bitperf+', target = '+self.iperf_bitrate_threshold+'%'
else:
bit_too_low_msg='Bitrate perf value is within acceptance range'
lock.acquire()
if (float(br_loss) < float(self.iperf_bitrate_threshold)) and (float(pl) > float(self.iperf_packetloss_threshold)):
statusQueue.put(-1)
elif (float(br_loss) < float(self.iperf_bitrate_threshold)) or (float(pl) > float(self.iperf_packetloss_threshold)):
statusQueue.put(1)
else:
statusQueue.put(0)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
result = float(br_loss) >= float(self.iperf_bitrate_threshold) and float(pl) <= float(self.iperf_packetloss_threshold)
req_msg = f'Req Bitrate : {req_bandwidth}'
bir_msg = f'Bitrate : {bitrate}'
brl_msg = f'Bitrate Perf: {bitperf}'
if float(br_loss) < float(self.iperf_bitrate_threshold):
brl_msg += f' (too low! <{self.iperf_bitrate_threshold}%)'
jit_msg = f'Jitter : {jitter}'
pal_msg = f'Packet Loss : {packetloss}'
statusQueue.put(f'{req_msg}\n{bir_msg}\n{brl_msg}\n{jit_msg}\n{pal_msg}\n{pal_too_high_msg}\n{bit_too_low_msg}\n')
logging.debug(f'\u001B[1;37;45m iperf result ({UE_IPAddress}) \u001B[0m')
logging.debug(f'\u001B[1;35m {req_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {bir_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {brl_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {jit_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {pal_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {pal_too_high_msg} \u001B[0m')
logging.debug(f'\u001B[1;35m {bit_too_low_msg} \u001B[0m')
lock.release()
else:
self.ping_iperf_wrong_exit(lock, UE_IPAddress, device_id, statusQueue, 'Could not analyze from server log')
server_file.close()
def Iperf_analyzeV3Output(self, lock, UE_IPAddress, device_id, statusQueue,SSH):
result = re.search('(?P<bitrate>[0-9\.]+ [KMG]bits\/sec) +(?:|[0-9\.]+ ms +\d+\/\d+ \((?P<packetloss>[0-9\.]+)%\)) +(?:|receiver)\r\n(?:|\[ *\d+\] Sent \d+ datagrams)\r\niperf Done\.', SSH.getBefore())
if result is None:
result = re.search('(?P<error>iperf: error - [a-zA-Z0-9 :]+)', SSH.getBefore())
lock.acquire()
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
if result is not None:
logging.debug(f"\u001B[1;37;41m {result.group('error')} \u001B[0m")
statusQueue.put(result.group('error'))
else:
logging.debug('\u001B[1;37;41m Bitrate and/or Packet Loss Not Found! \u001B[0m')
statusQueue.put('Bitrate and/or Packet Loss Not Found!')
lock.release()
bitrate = result.group('bitrate')
packetloss = result.group('packetloss')
lock.acquire()
logging.debug(f'\u001B[1;37;44m iperf result ({UE_IPAddress}) \u001B[0m')
logging.debug(f'\u001B[1;34m Bitrate : {bitrate} \u001B[0m')
msg = f'Bitrate : {bitrate} \n'
iperfStatus = True
if packetloss is not None:
logging.debug(f'\u001B[1;34m Packet Loss : {packetloss} %\u001B[0m')
msg += f'Packet Loss : {packetloss} %\n'
if float(packetloss) > float(self.iperf_packetloss_threshold):
logging.debug('\u001B[1;37;41m Packet Loss too high \u001B[0m')
msg += 'Packet Loss too high!\n'
iperfStatus = False
if (iperfStatus):
statusQueue.put(0)
if float(pl) > float(self.iperf_packetloss_threshold):
pal_msg += f' (too high! >{self.iperf_packetloss_threshold}%)'
return (result, f'{req_msg}\n{bir_msg}\n{brl_msg}\n{jit_msg}\n{pal_msg}')
else:
statusQueue.put(-1)
statusQueue.put(device_id)
statusQueue.put(UE_IPAddress)
statusQueue.put(msg)
lock.release()
return (False, 'Could not analyze from server log')
def Iperf_Module(self, lock, statusQueue, EPC, ue, RAN, idx, ue_num):
def Iperf_Module(self, EPC, ue, RAN, idx, ue_num):
ueIP = ue.getIP()
if not ueIP:
return (False, f"UE {ue.getName()} has no IP address")
SSH = sshconnection.SSHConnection()
server_filename = f'iperf_server_{self.testCase_id}_{ue.getName()}.log'
client_filename = f'iperf_client_{self.testCase_id}_{ue.getName()}.log'
......@@ -987,16 +879,18 @@ class OaiCiTest():
# hack: the ADB UEs don't have iperf in $PATH, so we need to hardcode for the moment
iperf_ue = '/data/local/tmp/iperf' if re.search('adb', ue.getName()) else 'iperf'
ue_header = f'UE {ue.getName()} ({ueIP})'
if self.iperf_direction == "DL":
logging.debug("Iperf in DL requested")
cmd = cls_cmd.getConnection(ue.getHost())
cmd.run(f'rm {server_filename}')
cmd.run(f'{ue.getCmdPrefix()} {iperf_ue} -s -B {ue.getIP()} {udpSwitch} -i 1 -t {iperf_time * 1.5} {port} &> /tmp/{server_filename} &')
cmd.run(f'{ue.getCmdPrefix()} {iperf_ue} -s -B {ueIP} {udpSwitch} -i 1 -t {iperf_time * 1.5} {port} &> /tmp/{server_filename} &')
cmd.close()
cmd = cls_cmd.getConnection(EPC.IPAddress)
cmd.run(f'rm {EPC.SourceCodePath}/{client_filename}')
cmd.run(f'{cn_iperf_prefix} iperf -c {ue.getIP()} {iperf_opt} {port} &> {EPC.SourceCodePath}/{client_filename}', timeout=iperf_time * 1.5)
cmd.run(f'{cn_iperf_prefix} iperf -c {ueIP} {iperf_opt} {port} &> {EPC.SourceCodePath}/{client_filename}', timeout=iperf_time * 1.5)
cmd.copyin(f'{EPC.SourceCodePath}/{client_filename}', client_filename)
cmd.close()
......@@ -1005,10 +899,10 @@ class OaiCiTest():
cmd.close()
if udpIperf:
self.Iperf_analyzeV2Server(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, server_filename, 1)
status, msg = self.Iperf_analyzeV2Server(iperf_opt, server_filename, 1)
else:
cmd = cls_cmd.getConnection(EPC.IPAddress)
self.Iperf_analyzeV2TCPOutput(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, EPC, cmd, f"{EPC.SourceCodePath}/{client_filename}")
status, msg = self.Iperf_analyzeV2TCPOutput(cmd, f"{EPC.SourceCodePath}/{client_filename}")
cmd.close()
elif self.iperf_direction == "UL":
......@@ -1020,7 +914,7 @@ class OaiCiTest():
cmd = cls_cmd.getConnection(ue.getHost())
cmd.run(f'rm /tmp/{client_filename}')
cmd.run(f'{ue.getCmdPrefix()} {iperf_ue} -B {ue.getIP()} -c {cn_target_ip} {iperf_opt} {port} &> /tmp/{client_filename}', timeout=iperf_time*1.5)
cmd.run(f'{ue.getCmdPrefix()} {iperf_ue} -B {ueIP} -c {cn_target_ip} {iperf_opt} {port} &> /tmp/{client_filename}', timeout=iperf_time*1.5)
cmd.copyin(f'/tmp/{client_filename}', client_filename)
cmd.close()
......@@ -1029,10 +923,10 @@ class OaiCiTest():
cmd.close()
if udpIperf:
self.Iperf_analyzeV2Server(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, server_filename, 1)
status, msg = self.Iperf_analyzeV2Server(iperf_opt, server_filename, 1)
else:
cmd = cls_cmd.getConnection(ue.getHost())
self.Iperf_analyzeV2TCPOutput(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, EPC, cmd, f"/tmp/{client_filename}")
status, msg = self.Iperf_analyzeV2TCPOutput(cmd, f"/tmp/{client_filename}")
cmd.close()
elif self.iperf_direction=="BIDIR":
......@@ -1044,7 +938,7 @@ class OaiCiTest():
cmd = cls_cmd.getConnection(ue.getHost())
cmd.run(f'rm /tmp/{client_filename}')
cmd.run(f'iperf3 -B {ue.getIP()} -c {cn_target_ip} {iperf_opt} {port} &> /tmp/{client_filename}', timeout=iperf_time*1.5)
cmd.run(f'iperf3 -B {ueIP} -c {cn_target_ip} {iperf_opt} {port} &> /tmp/{client_filename}', timeout=iperf_time*1.5)
cmd.copyin(f'/tmp/{client_filename}', client_filename)
cmd.close()
......@@ -1052,129 +946,32 @@ class OaiCiTest():
cmd.copyin(f'{EPC.SourceCodePath}/{server_filename}', server_filename)
cmd.close()
self.Iperf_analyzeV2BIDIR(lock, ue.getHost(), ue.getName(), statusQueue, server_filename, client_filename)
status, msg = self.Iperf_analyzeV2BIDIR(server_filename, client_filename)
elif self.iperf_direction == "IPERF3":
cmd = cls_cmd.getConnection(ue.getHost())
cmd.run(f'rm /tmp/{server_filename}', reportNonZero=False)
port = f'{5002+idx}'
cmd.run(f'{ue.getCmdPrefix()} iperf3 -B {ue.getIP()} -c {cn_target_ip} -p {port} {iperf_opt} --get-server-output &> /tmp/{server_filename}', timeout=iperf_time*1.5)
cmd.run(f'{ue.getCmdPrefix()} iperf3 -B {ueIP} -c {cn_target_ip} -p {port} {iperf_opt} --get-server-output &> /tmp/{server_filename}', timeout=iperf_time*1.5)
cmd.copyin(f'/tmp/{server_filename}', server_filename)
cmd.close()
if udpIperf:
self.Iperf_analyzeV2Server(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, server_filename, 1)
status, msg = self.Iperf_analyzeV2Server(iperf_opt, server_filename, 1)
else:
cmd = cls_cmd.getConnection(EPC.IPAddress)
self.Iperf_analyzeV2TCPOutput(lock, ue.getIP(), ue.getName(), statusQueue, iperf_opt, EPC, cmd, f'/tmp/{server_filename}')
status, msg = self.Iperf_analyzeV2TCPOutput(cmd, f'/tmp/{server_filename}')
cmd.close()
else :
raise Exception("Incorrect or missing IPERF direction in XML")
def IperfNoS1(self,HTML,RAN,EPC,CONTAINERS):
SSH = sshconnection.SSHConnection()
if RAN.eNBIPAddress == '' or RAN.eNBUserName == '' or RAN.eNBPassword == '' or self.UEIPAddress == '' or self.UEUserName == '' or self.UEPassword == '':
HELP.GenericHelp(CONST.Version)
sys.exit('Insufficient Parameter')
check_eNB = True
check_OAI_UE = True
server_on_enb = re.search('-R', str(self.iperf_args))
if server_on_enb is not None:
iServerIPAddr = RAN.eNBIPAddress
iServerUser = RAN.eNBUserName
iServerPasswd = RAN.eNBPassword
iClientIPAddr = self.UEIPAddress
iClientUser = self.UEUserName
iClientPasswd = self.UEPassword
else:
iServerIPAddr = self.UEIPAddress
iServerUser = self.UEUserName
iServerPasswd = self.UEPassword
iClientIPAddr = RAN.eNBIPAddress
iClientUser = RAN.eNBUserName
iClientPasswd = RAN.eNBPassword
if self.iperf_options != 'sink':
# Starting the iperf server
SSH.open(iServerIPAddr, iServerUser, iServerPasswd)
# args SHALL be "-c client -u any"
# -c 10.0.1.2 -u -b 1M -t 30 -i 1 -fm -B 10.0.1.1
# -B 10.0.1.1 -u -s -i 1 -fm
server_options = re.sub('-u.*$', '-u -s -i 1 -fm', str(self.iperf_args))
server_options = server_options.replace('-c','-B')
SSH.command(f'rm -f /tmp/tmp_iperf_server_{self.testCase_id}.log', '\$', 5)
SSH.command(f'echo $USER; nohup iperf {server_options} > /tmp/tmp_iperf_server_{self.testCase_id}.log 2>&1 &', iServerUser, 5)
time.sleep(0.5)
SSH.close()
# Starting the iperf client
modified_options = self.Iperf_ComputeModifiedBW(0, 1)
modified_options = modified_options.replace('-R','')
iperf_time = self.Iperf_ComputeTime()
SSH.open(iClientIPAddr, iClientUser, iClientPasswd)
SSH.command(f'rm -f /tmp/tmp_iperf_{self.testCase_id}.log', '\$', 5)
iperf_status = SSH.command(f'stdbuf -o0 iperf {modified_options} 2>&1 | stdbuf -o0 tee /tmp/tmp_iperf_{self.testCase_id}.log', '\$', int(iperf_time)*5.0)
status_queue = SimpleQueue()
lock = Lock()
if iperf_status < 0:
message = 'iperf on OAI UE crashed due to TIMEOUT !'
logging.debug(f'\u001B[1;37;41m {message} \u001B[0m')
clientStatus = -2
else:
if self.iperf_options == 'sink':
clientStatus = 0
status_queue.put(0)
status_queue.put('OAI-UE')
status_queue.put('10.0.1.2')
status_queue.put('Sink Test : no check')
else:
clientStatus = self.Iperf_analyzeV2Output(lock, '10.0.1.2', 'OAI-UE', status_queue, modified_options, EPC,SSH)
SSH.close()
# Stopping the iperf server
if self.iperf_options != 'sink':
SSH.open(iServerIPAddr, iServerUser, iServerPasswd)
SSH.command('killall --signal SIGKILL iperf', '\$', 5)
time.sleep(0.5)
SSH.close()
logging.info(f'\u001B[1;37;45m iperf result for {ue_header}\u001B[0m')
for l in msg.split('\n'):
logging.info(f'\u001B[1;35m {l} \u001B[0m')
return (status, f'{ue_header}\n{msg}')
if (clientStatus == -1):
if (os.path.isfile(f'iperf_server_{self.testCase_id}.log')):
os.remove(f'iperf_server_{self.testCase_id}.log')
SSH.copyin(iServerIPAddr, iServerUser, iServerPasswd, f'/tmp/tmp_iperf_server_{self.testCase_id}.log', f'iperf_server_{self.testCase_id}_OAI-UE.log')
filename=f'iperf_server_{self.testCase_id}_OAI-UE.log'
self.Iperf_analyzeV2Server(lock, '10.0.1.2', 'OAI-UE', status_queue, modified_options,filename,0)
# copying on the EPC server for logCollection
if (clientStatus == -1):
copyin_res = SSH.copyin(iServerIPAddr, iServerUser, iServerPasswd, f'/tmp/tmp_iperf_server_{self.testCase_id}.log', f'iperf_server_{self.testCase_id}_OAI-UE.log')
if (copyin_res == 0):
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, f'iperf_server_{self.testCase_id}_OAI-UE.log', f'{EPC.SourceCodePath}/scripts')
copyin_res = SSH.copyin(iClientIPAddr, iClientUser, iClientPasswd, f'/tmp/tmp_iperf_{self.testCase_id}.log', f'iperf_{self.testCase_id}_OAI-UE.log')
if (copyin_res == 0):
SSH.copyout(EPC.IPAddress, EPC.UserName, EPC.Password, f'iperf_{self.testCase_id}_OAI-UE.log', f'{EPC.SourceCodePath}/scripts')
iperf_noperf = False
if status_queue.empty():
iperf_status = False
else:
iperf_status = True
messages = []
while (not status_queue.empty()):
count = status_queue.get()
if (count < 0):
iperf_status = False
if (count > 0):
iperf_noperf = True
device_id = status_queue.get()
ip_addr = status_queue.get()
message = status_queue.get()
messages.append(f'UE ({device_id})\nIP Address : {ip_addr}\n{message}')
if (iperf_noperf and iperf_status):
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'PERF NOT MET', messages)
elif (iperf_status):
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'OK', messages)
else:
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'KO', messages)
self.AutoTerminateUEandeNB(HTML,RAN,EPC,CONTAINERS)
def IperfNoS1(self,HTML,RAN,EPC,CONTAINERS):
raise 'IperfNoS1 not implemented'
def Iperf(self,HTML,RAN,EPC,CONTAINERS):
result = re.search('noS1', str(RAN.Initialize_eNB_args))
......@@ -1187,60 +984,14 @@ class OaiCiTest():
logging.debug(f'Iperf: iperf_args "{self.iperf_args}" iperf_direction "{self.iperf_direction}" iperf_packetloss_threshold "{self.iperf_packetloss_threshold}" iperf_bitrate_threshold "{self.iperf_bitrate_threshold}" iperf_profile "{self.iperf_profile}" iperf_options "{self.iperf_options}"')
ues = []
for ue_name in self.ue_ids:
ue = cls_module_ue.Module_UE(ue_name)
if not ue.getIP():
logging.error("no IP addresses returned")
HTML.CreateHtmlTestRow(self.ping_args, 'KO', CONST.UE_IP_ADDRESS_ISSUE)
self.AutoTerminateUEandeNB(HTML,RAN,EPC,CONTAINERS)
ues.append(ue)
ues = [cls_module_ue.Module_UE(n.strip()) for n in self.ue_ids]
logging.debug(ues)
self.dummyIperfVersion = '2.0.10'
#cmd = 'iperf --version'
#logging.debug(cmd + '\n')
#iperfStdout = subprocess.check_output(cmd, shell=True, universal_newlines=True)
#result = re.search('iperf version 2.0.5', str(iperfStdout.strip()))
#if result is not None:
# dummyIperfVersion = '2.0.5'
#result = re.search('iperf version 2.0.10', str(iperfStdout.strip()))
#if result is not None:
# dummyIperfVersion = '2.0.10'
multi_jobs = []
ue_num = len(ues)
i = 0
lock = Lock()
status_queue = SimpleQueue()
for ue in ues:
p = Process(target = self.Iperf_Module ,args = (lock, status_queue, EPC, ue, RAN, i, ue_num))
p.daemon = True
p.start()
multi_jobs.append(p)
i = i + 1
for job in multi_jobs:
job.join()
if (status_queue.empty()):
HTML.CreateHtmlTestRow(self.iperf_args, 'KO', CONST.ALL_PROCESSES_OK)
self.AutoTerminateUEandeNB(HTML,RAN,EPC,CONTAINERS)
else:
iperf_status = True
iperf_noperf = False
messages = []
while (not status_queue.empty()):
count = status_queue.get()
if (count < 0):
iperf_status = False
if (count > 0):
iperf_noperf = True
device_id = status_queue.get()
ip_addr = status_queue.get()
msg = status_queue.get()
messages.append(f'UE ({device_id})\nIP Address : {ip_addr}\n{msg}')
if (iperf_noperf and iperf_status):
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'PERF NOT MET', messages)
elif (iperf_status):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.Iperf_Module, EPC, ue, RAN, i, len(ues)) for i, ue in enumerate(ues)]
results = [f.result() for f in futures]
# each result in results is a tuple, first member goes to successes, second to messages
successes, messages = map(list, zip(*results))
if len(successes) == len(ues) and all(successes):
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'OK', messages)
else:
HTML.CreateHtmlTestRowQueue(self.iperf_args, 'KO', messages)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment