Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
OpenXG-RAN
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zzha zzha
OpenXG-RAN
Commits
41e81915
Commit
41e81915
authored
Feb 15, 2023
by
Robert Schmidt
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Improve cls_module_ue.py
parent
917bea36
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
164 additions
and
144 deletions
+164
-144
ci-scripts/cls_module_ue.py
ci-scripts/cls_module_ue.py
+164
-144
No files found.
ci-scripts/cls_module_ue.py
View file @
41e81915
...
...
@@ -28,164 +28,186 @@
import
os
import
sys
import
logging
#to create a SSH object locally in the methods
import
sshconnection
#time.sleep
import
time
import
re
import
subprocess
from
datetime
import
datetime
import
yaml
#for log rotation mgt
import
cls_log_mgt
import
cls_cmd
class
Module_UE
:
def
__init__
(
self
,
Module
):
#create attributes as in the Module dictionary
for
k
,
v
in
Module
.
items
():
setattr
(
self
,
k
,
v
)
self
.
UEIPAddress
=
""
#dictionary linking command names and related module scripts
self
.
cmd_dict
=
{
"wup"
:
self
.
WakeupScript
,
"detach"
:
self
.
DetachScript
}
#dictionary of function scripts
self
.
ue_trace
=
''
def
__init__
(
self
,
module_name
,
filename
=
"ci_ueinfra.yaml"
):
with
open
(
filename
,
'r'
)
as
f
:
all_ues
=
yaml
.
load
(
f
,
Loader
=
yaml
.
FullLoader
)
m
=
all_ues
.
get
(
module_name
)
if
m
is
None
:
raise
Exception
(
f'no such module name "
{
module_name
}
" in "
{
filename
}
"'
)
self
.
module_name
=
module_name
self
.
host
=
m
[
'Host'
]
self
.
cmd_dict
=
{
"attach"
:
m
.
get
(
'AttachScript'
),
"detach"
:
m
.
get
(
'DetachScript'
),
"initialize"
:
m
.
get
(
'InitScript'
),
"terminate"
:
m
.
get
(
'TermScript'
),
"getNetwork"
:
m
.
get
(
'UENetworkScript'
),
"check"
:
m
.
get
(
'CheckStatusScript'
),
"dataEnable"
:
m
.
get
(
'DataEnableScript'
),
"dataDisable"
:
m
.
get
(
'DataDisableScript'
),
}
self
.
interface
=
m
.
get
(
'IF'
)
self
.
MTU
=
m
.
get
(
'MTU'
)
self
.
trace
=
m
.
get
(
'trace'
)
==
True
self
.
logStore
=
m
.
get
(
'LogStore'
)
self
.
cmd_prefix
=
m
.
get
(
'CmdPrefix'
)
logging
.
info
(
f'initialized UE
{
self
.
module_name
}
@
{
self
.
host
}
from
{
filename
}
'
)
def
__str__
(
self
):
return
f"
{
self
.
module_name
}
@
{
self
.
host
}
[IP:
{
self
.
getIP
()
}
]"
def
__repr__
(
self
):
return
self
.
__str__
()
def
_command
(
self
,
cmd
,
silent
=
False
):
if
cmd
is
None
:
raise
Exception
(
"no command provided"
)
if
self
.
host
==
""
or
self
.
host
==
"localhost"
:
c
=
cls_cmd
.
LocalCmd
()
else
:
c
=
cls_cmd
.
RemoteCmd
(
self
.
host
)
response
=
c
.
run
(
cmd
,
silent
=
silent
)
c
.
close
()
return
response
.
stdout
#-----------------$
#PUBLIC Methods$
#-----------------$
#this method checks if the specified Process is running on the server hosting the module
#if not it will be started
def
CheckCMProcess
(
self
,
CNType
):
HOST
=
self
.
HostUsername
+
'@'
+
self
.
HostIPAddress
COMMAND
=
"ps aux | grep --colour=never "
+
self
.
Process
[
'Name'
]
+
" | grep -v grep "
logging
.
debug
(
COMMAND
)
ssh
=
subprocess
.
Popen
([
"ssh"
,
"%s"
%
HOST
,
COMMAND
],
shell
=
False
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
result
=
ssh
.
stdout
.
readlines
()
if
len
(
result
)
!=
0
:
logging
.
debug
(
self
.
Process
[
'Name'
]
+
" process found"
)
return
True
else
:
#start process and check again
logging
.
debug
(
self
.
Process
[
'Name'
]
+
" process NOT found"
)
#starting the process
logging
.
debug
(
'Starting '
+
self
.
Process
[
'Name'
])
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
mySSH
.
command
(
'echo '
+
self
.
HostPassword
+
' | sudo -S rm -f /tmp/quectel-cm.log'
,
'\$'
,
5
)
mySSH
.
command
(
'echo $USER; echo '
+
self
.
HostPassword
+
' | nohup sudo -S stdbuf -o0 '
+
self
.
Process
[
'Cmd'
]
+
' '
+
self
.
Process
[
'Apn'
][
CNType
]
+
' > /tmp/quectel-cm.log 2>&1 &'
,
'\$'
,
5
)
mySSH
.
close
()
#checking the process
def
initialize
(
self
):
if
self
.
trace
:
raise
Exception
(
"UE tracing not implemented yet"
)
self
.
_enableTrace
()
# we first terminate to make sure the UE has been stopped
if
self
.
cmd_dict
[
"detach"
]:
self
.
_command
(
self
.
cmd_dict
[
"detach"
],
silent
=
True
)
self
.
_command
(
self
.
cmd_dict
[
"terminate"
],
silent
=
True
)
self
.
_command
(
self
.
cmd_dict
[
"initialize"
])
def
terminate
(
self
):
self
.
_command
(
self
.
cmd_dict
[
"terminate"
])
if
self
.
trace
:
raise
Exception
(
"UE tracing not implemented yet"
)
self
.
_disableTrace
()
return
self
.
_logCollect
()
return
None
def
attach
(
self
,
attach_tries
=
4
,
attach_timeout
=
60
):
ip
=
None
while
attach_tries
>
0
:
self
.
_command
(
self
.
cmd_dict
[
"attach"
])
timeout
=
attach_timeout
logging
.
debug
(
"Waiting for IP address to be assigned"
)
while
timeout
>
0
and
not
ip
:
time
.
sleep
(
5
)
HOST
=
self
.
HostUsername
+
'@'
+
self
.
HostIPAddress
COMMAND
=
"ps aux | grep --colour=never "
+
self
.
Process
[
'Name'
]
+
" | grep -v grep "
logging
.
debug
(
COMMAND
)
ssh
=
subprocess
.
Popen
([
"ssh"
,
"%s"
%
HOST
,
COMMAND
],
shell
=
False
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
result
=
ssh
.
stdout
.
readlines
()
if
len
(
result
)
!=
0
:
logging
.
debug
(
self
.
Process
[
'Name'
]
+
" process found"
)
return
True
timeout
-=
5
ip
=
self
.
getIP
()
if
ip
:
break
logging
.
warning
(
f"UE did not receive IP address after
{
attach_timeout
}
s, detaching"
)
attach_tries
-=
1
self
.
_command
(
self
.
cmd_dict
[
"detach"
])
time
.
sleep
(
5
)
if
ip
:
logging
.
debug
(
f'
\u001B
[1mUE IP Address is
{
ip
}
\u001B
[0m'
)
else
:
logging
.
debug
(
self
.
Process
[
'Name'
]
+
" process NOT found"
)
return
False
logging
.
debug
(
'
\u001B
[1;37;41mUE IP Address Not Found!
\u001B
[0m'
)
return
ip
#Generic command function, using function pointers dictionary
def
Command
(
self
,
cmd
):
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
mySSH
.
command
(
'echo '
+
self
.
HostPassword
+
' | sudo -S python3 '
+
self
.
cmd_dict
[
cmd
],
'\$'
,
10
)
time
.
sleep
(
5
)
logging
.
debug
(
"Module "
+
cmd
)
mySSH
.
close
()
def
detach
(
self
):
self
.
_command
(
self
.
cmd_dict
[
"detach"
])
def
check
(
self
):
cmd
=
self
.
cmd_dict
[
"check"
]
if
cmd
:
return
self
.
_command
(
cmd
)
else
:
logging
.
warning
(
f"requested status check of UE
{
self
.
getName
()
}
, but operation is not supported"
)
return
f"UE
{
self
.
getName
()
}
does not support status checking"
#this method retrieves the Module IP address (not the Host IP address)
def
GetModuleIPAddress
(
self
):
HOST
=
self
.
HostUsername
+
'@'
+
self
.
HostIPAddress
response
=
[]
tentative
=
8
while
(
len
(
response
)
==
0
)
and
(
tentative
>
0
):
COMMAND
=
"ip a show dev "
+
self
.
UENetwork
+
" | grep --colour=never inet | grep "
+
self
.
UENetwork
if
tentative
==
8
:
logging
.
debug
(
COMMAND
)
ssh
=
subprocess
.
Popen
([
"ssh"
,
"%s"
%
HOST
,
COMMAND
],
shell
=
False
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
response
=
ssh
.
stdout
.
readlines
()
tentative
-=
1
time
.
sleep
(
2
)
if
(
tentative
==
0
)
and
(
len
(
response
)
==
0
):
logging
.
debug
(
'
\u001B
[1;37;41m Module IP Address Not Found! Time expired
\u001B
[0m'
)
return
-
1
else
:
#check response
result
=
re
.
search
(
'inet (?P<moduleipaddress>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)'
,
response
[
0
].
decode
(
"utf-8"
)
)
if
result
is
not
None
:
if
result
.
group
(
'moduleipaddress'
)
is
not
None
:
self
.
UEIPAddress
=
result
.
group
(
'moduleipaddress'
)
logging
.
debug
(
'
\u001B
[1mUE Module IP Address is '
+
self
.
UEIPAddress
+
'
\u001B
[0m'
)
return
0
def
dataEnable
(
self
):
cmd
=
self
.
cmd_dict
[
"dataEnable"
]
if
cmd
:
self
.
_command
(
cmd
)
return
True
else
:
logging
.
debug
(
'
\u001B
[1;37;41m Module IP Address Not Found!
\u001B
[0m'
)
return
-
1
message
=
f"requested enabling data of UE
{
self
.
getName
()
}
, but operation is not supported"
logging
.
error
(
message
)
return
False
def
dataDisable
(
self
):
cmd
=
self
.
cmd_dict
[
"dataDisable"
]
if
cmd
:
self
.
_command
(
cmd
)
return
True
else
:
logging
.
debug
(
'
\u001B
[1;37;41m Module IP Address Not Found!
\u001B
[0m'
)
return
-
1
def
CheckModuleMTU
(
self
):
HOST
=
self
.
HostUsername
+
'@'
+
self
.
HostIPAddress
response
=
[]
tentative
=
3
while
(
len
(
response
)
==
0
)
and
(
tentative
>
0
):
COMMAND
=
"ip a show dev "
+
self
.
UENetwork
+
" | grep --colour=never mtu"
logging
.
debug
(
COMMAND
)
ssh
=
subprocess
.
Popen
([
"ssh"
,
"%s"
%
HOST
,
COMMAND
],
shell
=
False
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
response
=
ssh
.
stdout
.
readlines
()
tentative
-=
1
time
.
sleep
(
10
)
if
(
tentative
==
0
)
and
(
len
(
response
)
==
0
):
logging
.
debug
(
'
\u001B
[1;37;41m Module NIC MTU Not Found! Time expired
\u001B
[0m'
)
return
-
1
else
:
#check response
result
=
re
.
search
(
'mtu (?P<mtu>[0-9]+)'
,
response
[
0
].
decode
(
"utf-8"
)
)
if
result
is
not
None
:
if
(
result
.
group
(
'mtu'
)
is
not
None
)
and
(
str
(
result
.
group
(
'mtu'
))
==
str
(
self
.
MTU
))
:
message
=
f"requested disabling data of UE
{
self
.
getName
()
}
, but operation is not supported"
logging
.
error
(
message
)
return
False
def
getIP
(
self
):
output
=
self
.
_command
(
self
.
cmd_dict
[
"getNetwork"
],
silent
=
True
)
result
=
re
.
search
(
'inet (?P<ip>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)'
,
output
)
if
result
and
result
.
group
(
'ip'
):
ip
=
result
.
group
(
'ip'
)
return
ip
return
None
def
checkMTU
(
self
):
output
=
self
.
_command
(
self
.
cmd_dict
[
"getNetwork"
],
silent
=
True
)
result
=
re
.
search
(
'mtu (?P<mtu>[0-9]+)'
,
output
)
if
result
and
result
.
group
(
'mtu'
)
and
int
(
result
.
group
(
'mtu'
))
==
self
.
MTU
:
logging
.
debug
(
'
\u001B
[1mUE Module NIC MTU is '
+
str
(
self
.
MTU
)
+
' as expected
\u001B
[0m'
)
return
0
else
:
logging
.
debug
(
'
\u001B
[1;37;41m Incorrect Module NIC MTU '
+
str
(
result
.
group
(
'mtu'
))
+
'! Expected : '
+
str
(
self
.
MTU
)
+
'
\u001B
[0m'
)
return
-
1
return
True
else
:
logging
.
debug
(
'
\u001B
[1;37;41m Module NIC MTU Not Found!
\u001B
[0m'
)
return
-
1
logging
.
debug
(
'
\u001B
[1;37;41m Incorrect Module NIC MTU or MTU not found! Expected: '
+
str
(
self
.
MTU
)
+
'
\u001B
[0m'
)
return
False
def
getName
(
self
):
return
self
.
module_name
def
getIFName
(
self
):
return
self
.
interface
def
EnableTrace
(
self
):
if
self
.
ue_trace
==
"yes"
:
def
getHost
(
self
):
return
self
.
host
def
getCmdPrefix
(
self
):
return
self
.
cmd_prefix
if
self
.
cmd_prefix
else
""
def
_enableTrace
(
self
):
raise
Exception
(
"not implemented"
)
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
#delete old artifacts
mySSH
.
command
(
'echo '
+
self
.
HostPassword
+
' | sudo -S rm -rf ci_qlog'
,
'\$'
,
5
)
mySSH
.
command
(
'echo '
+
' '
+
' | sudo -S rm -rf ci_qlog'
,
'\$'
,
5
)
#start Trace, artifact is created in home dir
mySSH
.
command
(
'echo $USER; nohup sudo -E QLog/QLog -s ci_qlog -f NR5G.cfg > /dev/null 2>&1 &'
,
'\$'
,
5
)
mySSH
.
close
()
def
DisableTrace
(
self
):
def
_disableTrace
(
self
):
raise
Exception
(
"not implemented"
)
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
mySSH
.
command
(
'echo '
+
self
.
HostPassword
+
' | sudo -S killall --signal=SIGINT *QLog*'
,
'\$'
,
5
)
mySSH
.
command
(
'echo '
+
' '
+
' | sudo -S killall --signal=SIGINT *QLog*'
,
'\$'
,
5
)
mySSH
.
close
()
def
DisableCM
(
self
):
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
mySSH
.
command
(
'echo '
+
self
.
HostPassword
+
' | sudo -S killall --signal SIGKILL *'
+
self
.
Process
[
'Name'
]
+
'*'
,
'\$'
,
5
)
mySSH
.
close
()
def
LogCollect
(
self
):
if
self
.
ue_trace
==
"yes"
:
def
_logCollect
(
self
):
raise
Exception
(
"not implemented"
)
mySSH
=
sshconnection
.
SSHConnection
()
mySSH
.
open
(
self
.
HostIPAddress
,
self
.
HostUsername
,
self
.
HostPassword
)
#archive qlog to USB stick in /media/usb-drive/ci_qlogs with datetime suffix
...
...
@@ -194,11 +216,9 @@ class Module_UE:
source
=
'ci_qlog'
destination
=
self
.
LogStore
+
'/ci_qlog_'
+
now_string
+
'.zip'
#qlog artifact is zipped into the target folder
mySSH
.
command
(
'echo $USER; echo '
+
self
.
HostPassword
+
' | nohup sudo -S zip -r '
+
destination
+
' '
+
source
+
' > /dev/null 2>&1 &'
,
'\$'
,
10
)
mySSH
.
command
(
'echo $USER; echo '
+
' '
+
' | nohup sudo -S zip -r '
+
destination
+
' '
+
source
+
' > /dev/null 2>&1 &'
,
'\$'
,
10
)
mySSH
.
close
()
#post action : log cleaning to make sure enough space is reserved for the next run
Log_Mgt
=
cls_log_mgt
.
Log_Mgt
(
self
.
HostUsername
,
self
.
HostIPAddress
,
self
.
HostPassword
,
self
.
LogStore
)
Log_Mgt
.
LogRotation
()
else
:
destination
=
""
return
destination
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment