Commit 645a2c16 authored by Sensing's avatar Sensing

Delete pipeline.py

parent 71f667bf
'''
data : 2021_10_22
function : pipeline for IO and CPU
'''
import threading
import queue
import udp
from xy_extract import *
from xy_plot import *
from motionflag import *
flen, blen, slen = 100, 1000, 500 # 100滤波前摇+1000投影+100滤波后摇 = 1200CSI,每次进入500CSI
thr = 0.45
def IO_udp(csi_queue: queue.Queue):
# 初始化
print('task io_udp is starting...')
try:
counter = 0
start_flag = True # 控制接受1200包(启动时间)还是接受500的包(正常工作)
tfi = int((2 * flen + blen) / 100) # 启动时间
raw_CSI = np.zeros((3, 30, 2 * flen + blen), dtype=complex) # 一个csi.deque元素,3*30*1200
store_CSI = np.zeros((3, 30, 2 * flen + blen - slen), dtype=complex) # 承载队列更新工作, 3*30*700
s = udp.udp_init(5563) # create a udp handle 指定端口
# 不断接受udp包
while True:
data, _ = udp.recv(s) # receive a udp socket
Info = []
for i in range(1, len(data)):
Info.append(data[i]) # decode csi from udp
CSI = read_one(Info) # print(CSI.shape) (3, 30, 1)
# 获取raw_csi
if start_flag: # 启动时间:填满CSI整个队列
raw_CSI[:, :, counter] = CSI[:, :, 0]
counter += 1
# 填满后
if counter == 1200:
csi_queue.put(raw_CSI) # 入队
counter = 0 # 计数器归零
store_CSI = raw_CSI[:, :, slen:] # 丢弃0:slen所有包
start_flag = False # 启动完毕
# 启动报时
if counter % 100 == 0:
print('初始启动时间总计{}s: 当前{}s'.format(tfi, counter / 100))
else: # 工作时间
# 更新raw_CSI 后slen个包 之前的所有包
if counter == 0:
raw_CSI[:, :, :-slen] = store_CSI
# 更新raw_CSI的 最后slen个包
raw_CSI[:, :, store_CSI.shape[2] + counter] = CSI[:, :, 0]
counter += 1
if counter == slen: # 本轮更新结束
print(threading.current_thread().name, ": csi.queue.size=", csi_queue.qsize())
csi_queue.put(raw_CSI) # 入队
counter = 0 # 计数器归零
store_CSI = raw_CSI[:, :, slen:] # 丢弃0:slen所有包
except KeyboardInterrupt:
udp.close(s) # close udp
def CPU_extract(csi_queue: queue.Queue, wave_queue: queue.Queue):
print('task cpu_extract is starting...')
his = 0
while True:
raw_CSI = csi_queue.get()
print(threading.current_thread().name, ": csi.queue.size=", csi_queue.qsize())
flag = motion_detect(raw_CSI[:, :, -slen:], thr)
if flag:
wave = extract(raw_CSI, his)
else: # 如果检测到突发性大动作
oc = np.random.randn(slen) / 10000
oc = (oc - (oc[0] - his))
rpm = 0
wave = [list(oc), rpm]
his = wave[0][-1]
wave_queue.put(wave)
def CPU_plot(wave_queue: queue.Queue):
try:
print('task cpu_plot is starting...')
f = Display() # initialize the realview procedure
f.display()
while True:
wave = wave_queue.get()
print(threading.current_thread().name, ": wave.queue.size=", wave_queue.qsize())
for i in range(slen):
f.push([wave[0][i], wave[1]])
time.sleep(0.005)
except RuntimeError:
f = f.stop()
print('task cpu_plot is starting...')
f = Display() # initialize the realview procedure
f.display()
while True:
wave = wave_queue.get()
print(threading.current_thread().name, ": wave.queue.size=", wave_queue.qsize())
for i in range(slen):
f.push([wave[0][i], wave[1]])
time.sleep(0.005)
def real_camera():
print('task camera is starting...')
import camera
if __name__ == "__main__":
csi_queue = queue.Queue()
wave_queue = queue.Queue()
t_io = threading.Thread(target=IO_udp, args=(csi_queue,), name="IO_udp")
t_extract = threading.Thread(target=CPU_extract, args=(csi_queue, wave_queue), name="CPU_extract")
t_plot = threading.Thread(target=CPU_plot, args=(wave_queue,), name="CPU_plot")
t_camera = threading.Thread(target=real_camera)
t_io.start()
t_extract.start()
t_plot.start()
t_camera.start()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment