Commit 9b65203e authored by joliu's avatar joliu

Merge branch 'full-function' into 'master'

Full function

See merge request xidiancos/Opencps!2
parents 14ac0762 fd49891e
<<<<<<< README.md
# OpenCPS
## 自组织无中心物联网操作系统
## 无中心式物联网系统
### 新版本更新历史
##### 2019-4-12
1. 新增节点状态信息实时上报的功能
##### 2019-4-14
1. 构建矩阵解析程序
##### 2019-4-15
1. 重新设计完成新版的数据库
2. 基于矩阵解析程序构建完整任务处理逻辑程序
3. 完成任务自动循环处理的功能模块
4. 全部输入参数改为从数据库中调用
5. 新建DataCach数据库的控制函数
下一步计划:
1. 设计循环检测传感器数据并发送模块
2. 接收其他模块传递的数据并保存在dataCach数据库中
3. 构建基本控制器模块
4. 构建基于web控制的sanic脚本
##### 2019-4-16
1. 完成昨日计划1
2. 完成昨日计划3
##### 2019-4-21
1. 成功构建新版本的docker镜像,由于使用numpy需要安装额外的安装包
2. 完成在Docker上的测试功能
3. 实现基本的控制流程
下一步计划:
1. 将传感器数据传入云端数据库
2. 写一个好用的控制器脚本
3. 添加一个通过设备id查找相关配置信息的功能函数
4. 优化各个agent数据库内容显示
##### 2019-5-9
1. 构建命令行控制终端,模拟出物联网操作系统的样子
2. 添加外部直接控制传感器模块的任务模式
备注 : 上一次计划未完成,待续
### 迭代历史
##### 2019-3-21
......@@ -26,6 +71,15 @@
3. 搭建同时执行多条任务代码
## 版本说明
该版本由master版本迭代,在稳定运行基础上,添加如下新功能:
1. 添加故障检测功能
2. 添加在线设备检测功能
3. 优化控制接口,暂定`create`,`addtask`,`start`,`stop`,`delete`,`test`功能
注:multi-task多任务分支版本未完全开发完成,该版本为multi-task版本的拓展版本。
## 上一版本说明
智能体虚拟化模块主要有两大功能
......
import subprocess
import pymysql
def Print(msg):
print(msg)
def Input():
print("\033[1;35mopencps $ \033[0m", end='')
return input()
command_list = subprocess.getoutput('cd opencps && ls *.py').split('\n')
commands = [x.split('.')[0] for x in command_list if x != 'controler.py']
while True:
cmd = Input()
if cmd in ['quit', 'bye', 'exit']:
Print("bye")
break
if cmd[0] == '%':
c_list = [x for x in cmd[1:].split(' ')]
if c_list[0] == 'cd':
print("not support command")
continue
subprocess.call((c_list))
continue
if not cmd.split(' ')[0] in commands:
Print('Illegal Command!')
continue
else:
'''
c_list = cmd.split(' ')
run_commands = "python3 "
c_list[0] = 'opencps/' + c_list[0] + '.py'
c_list = ['python'] + c_list
subprocess.call((c_list))
'''
c_list = cmd.split(' ')
run_commands = "cd opencps&&python3 " + c_list[0] + '.py '
for i in range(1, len(c_list)):
run_commands = run_commands + c_list[i] + ' '
print(subprocess.getoutput(run_commands))
'''
***********************************************
* 智能体控制器函数 *
* 功能描述: *
* 1. 接收应用层的控制指令 *
* 2. 向智能体传递控制指令 *
* 3. 执行突发事件请求 *
* 4. 接收智能体未标识控制指令(暂未开发) *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-23 *
***********************************************
'''
# 通过socket传递信息
def sendBySocket(ip, port, cmd):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.error as err_msg:
print("Address-related error connecting to server: %s" % err_msg)
s.close()
return (-1, err_msg)
print("****************send:" + cmd)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
print(str(response))
s.close()
return (1, str(response))
# socket实现版本,也可以基于RESTful API实现
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
'''
监听控制请求
'''
def handle(self):
try:
# 接受socket消息
data = self.request.recv(1024).decode()
except socket.error as err_msg:
(status, output) = (-1, err_msg)
print("recv error!")
exit(1)
# 根据传递的消息进行操作
(status, output) = chooseTask(data)
# 返回控制结果
jresp = json.dumps((status, str(output)))
try:
self.request.sendall(jresp.encode())
except socket.error as err_msg:
print("socket send failed:%s" % err_msg)
exit(1)
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
if __name__ == "__main__":
# 设置host和port
HOST, PORT = "0.0.0.0", 3000
logger = logging.getLogger("TCPServer")
logger.setLevel(logging.INFO)
# 创建句柄
fh = logging.FileHandler("1.log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s -\
%(message)s')
# 添加句柄到logger类
logger.addHandler(fh)
logger.info("Program started")
socketserver.TCPServer.allow_reuse_address = True
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
# 启动多进程监听服务
server_thread = threading.Thread(target=server.serve_forever)
# 当主进程中断时退出程序
server_thread.daemon = True
server_thread.start()
logger.info("Server loop running in thread:" + server_thread.name)
logger.info("....waiting for connection")
# 使用control + C 退出程序
server.serve_forever()
import socket
import sys
import numpy as np
import json
def execute(port, message):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("192.168.12.19", port))
s.sendall(message.encode())
response = s.recv(1024).decode()
return json.loads(response)
def Print(data):
print(format(data, "<20"), end='')
def PrintTitle(title):
for t in title:
Print(t)
print('')
import pymysql
def showDevice():
output = ''
try:
sql = 'select * from deviceinfo'
conn = pymysql.connect(host='127.0.0.1', port=12306, user='root', passwd='Vudo3423', db='HiDockerwifi', charset='utf8')
cur = conn.cursor()
cur.execute(sql)
output = cur.fetchall()
#print(output)
except pymysql.Error as err_msg:
print(err_msg)
except Exception as err_msg:
print(err_msg)
finally:
cur.close()
conn.close()
return output
if __name__ == "__main__":
device_data = showDevice()
print(format("deviceID", "<20"), format("IP", "<20"), format("status", "<20"), format("network status"))
for device in device_data:
if device[3] == 1:
status = 'on-line'
else:
status = 'off-line'
#print("%s\t\t%s\t%s\t%s" % (device[1], device[2], status, device[4]))
print(format(device[1], "<20"), format(device[2], "<20"), format(status, "<20"), format(device[4]))
from controler import execute
import sys
import pymysql
import json
def findDevicePort(deviceID):
output = ''
try:
sql = 'select port from portdb where equip="%s"' % deviceID
conn = pymysql.connect(host='127.0.0.1', port=12306, user='root', passwd='Vudo3423', db='HiDockerwifi', charset='utf8')
cur = conn.cursor()
cur.execute(sql)
output = cur.fetchone()
#print(output)
except pymysql.Error as err_msg:
print(err_msg)
except Exception as err_msg:
print(err_msg)
finally:
cur.close()
conn.close()
return output
def Print(data):
print(format(data, "<20"), end='')
def PrintTitle(title):
for t in title:
Print(t)
print('')
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: nodeID database_name")
exit(0)
port = findDevicePort(sys.argv[1])[0]
cmd = "controller&show&%s" % sys.argv[2]
(status, output) = execute(port, cmd)
if status == 0:
print("can't find this database, please check your input")
if sys.argv[2] == 'datacach':
output = json.loads(output)
title_list = ['DeviceID', 'Data', 'Updata Time', 'Status']
PrintTitle(title_list)
if output == []:
output = None
else:
for items in output:
items.remove(items[0])
for item in items:
Print(item)
print('')
elif sys.argv[2] == 'decide':
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Program started
FROM opengn.org:32333/easypi/alpine-arm
MAINTAINER joliu<joliu@s-an.org>
ENV TIME_ZONE Asia/Shanghai
RUN apk --update add python3 py3-pip tzdata
RUN pip3 install pymysql
RUN apk --update add python3 py3-pip tzdata make cmake gcc g++ gfortran python3-dev
RUN pip3 install pymysql cython numpy -i https://pypi.douban.com/simple
RUN echo "${TIME_ZONE}" > /etc/timezone \
&& ln -sf /usr/share/zoneinfo/${TIME_ZONE} /etc/localtime
WORKDIR /data
COPY listenSer.py /data
COPY workProcess.py /data
COPY testDevice.py /data
COPY getData.py /data
COPY resolveMatrix.py /data
COPY controllMatrix.py /data
COPY start.sh /data
CMD sh /data/start.sh
'''
***********************************************************
* 控制矩阵数据库的相关函数 *
* 1. resulovetable *
* author: joliu<joliu@s-an.org> *
* date: 2018-4-14 *
***********************************************************
'''
import sqlite3
import numpy as np
import json
import time
# ~~~~~~~~~~~~~~~~~~~inputDB操作函数~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 更新任务执行控制表
def updateDeviceTask(data, dstIP, ctime):
sql = "select * from decidedestination where dst='%s'" % dstIP
(status, output) = sendToDB(sql)
print(output)
print('***********',output)
if status == -1:
return (status, output)
if output == []:
sql = "insert into decidedestination (data,dst,ctime) values \
('%s','%s',%d)" % (data, dstIP, ctime)
else:
sql = "update decidedestination set data='%s',dst='%s',ctime=%d" % \
(data, dstIP, ctime)
return sendToDB(sql)
# 获取任务执行控制表
def getDeviceTask():
sql = "select * from decidedestination"
(status, output) = sendToDB(sql)
if status == -1:
return (status, output)
# print(output)
return (status, output)
# ~~~~~~~~~~~~~~~~~~~dataCach操作函数~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 更新设备输入数据信息
def updateDataCach(device, decideValue):
# 检查是否存在该设备在预接收的列表中,如果不存在则丢弃
sql = "select * from datacach where deviceid='%s'" % device
(status, output) = sendToDB(sql)
if status == -1:
return (-1, output)
if output == []:
return (-1, "drop this data")
sql = "update datacach set data='%s', updatetime='%s' where deviceid='%s'" \
% (decideValue, time.time(), device)
return sendToDB(sql)
# 插入设备输入节点信息
def insertDataIntoDataCach(device):
# 检查是否存在当前输入,如果有则插入
sql = "select * from datacach where deviceid='%s'" % device
(status, output) = sendToDB(sql)
if status == -1:
return (status, output)
if output != []:
return (1, 'device has been added')
# 在配置输入信息节点时,使用该函数插入
sql = "insert into datacach (deviceid, updatetime, groupid) values \
('%s', '%s', %d)" % (device, time.time(), 1)
return sendToDB(sql)
# 获取输入数据库的内容
def getDataFromDataCach(device):
sql = "select data from datacach where deviceid='%s'" % device
return sendToDB(sql)
# 判断是否存在相应的设备数据缓存信息
def existDevice(device):
(status, output) = getDataFromDataCach(device)
print(device, output)
if status == -1:
retrun (status, output)
if output == []:
return (1, False)
else:
return (1, True)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 以下都是对output的数据库,即存储任务矩阵的数据库的操作
# 获取设备列表
def getDeviceListFromDB():
sql = "select devicelist from resulovetable"
(status, output) = sendToDB(sql)
if status == -1:
return (status, output)
if output == []:
return (-1, 'no deviceList')
return (status, output[0][0])
# 获取任务执行状态
def getTaskStatus():
sql = "select status from resulovetable"
(status, output) = sendToDB(sql)
print(output)
if status == -1:
return (status, output)
if output == []:
return (-1, 'no deviceList')
return (status, output[0][0])
# 更新任务执行状态
def updateTaskStatus(taskStatus):
(status, output) = getTaskStatus()
if status == -1:
return (status, output)
sql = 'update resulovetable set status=%d' % taskStatus
return sendToDB(sql)
# 插入任务到数据库
def insertDB(taskMatrix, inputTypeList, deviceList, status, ctime):
sql = "insert into resulovetable \
(taskmatrix,inputtype, status, devicelist, ctime) values \
('%s','%s',%d,'%s',%d)" % (taskMatrix, inputTypeList, status, deviceList, ctime)
return sendToDB(sql)
def sendToDB(sql):
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
status = 1
if sql.split(' ')[0] == 'select':
output = cursor.fetchall()
else:
conn.commit()
output = 'success'
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
def getTaskFromDB():
# 查询最新一条可用任务
sql = 'select * from resulovetable order by id desc limit 0,1'
(status, output) = sendToDB(sql)
return output
def getTaskFromDBByID(id):
# 从数据库中获取数据
sql = 'select * from resulovetable where id=%d' % id
(status, output) = sendToDB(sql)
return output[0]
def getValueByNodeID(nodeid):
# 获取devicelist
(id, inputTask, inputTypeList, status, deviceList, ctime) = getTaskFromDB()[0]
device = json.loads(deviceList)[nodeid]
if device == -1:
return (-1, "not a device")
data = getDataFromDataCach(device)
if data[1] == []:
return (-1, "not found device")
if data[1][0][0] == '-':
# 这里将缺省值都默认设置为0
return (1, 0)
return (1, int(data[1][0][0]))
# 获取循环时间
def getCircleTime():
data = getTaskFromDB()[0]
if data == ():
return 5
return data[5]
def showDB(tableName):
# 打印数据库内容
sql = "select * from %s" % tableName
return sendToDB(sql)
def clearDB():
# 清空数据库
dbNames = ['resulovetable', 'datacach', 'decidedestination']
for name in dbNames:
sql = "delete from %s" % name
(status, output) = sendToDB(sql)
if status == -1:
break
return (status, output)
if __name__ == '__main__':
'''
clearDB()
inputTask = np.array([[0,0,1,0,0,0,0,0],\
[0,0,1,0,0,0,0,0],\
[0,0,0,1,0,0,0,0],\
[0,0,0,0,1,0,0,0],\
[0,0,0,0,0,0,0,0],\
[1,0,0,0,0,0,0,0],\
[1,0,0,0,0,0,0,0],\
[0,1,0,0,0,0,0,0]])
inputTypeList = np.array([1,3,2,5,0,4,4,4])
inputDeviceData = np.array([0,0,0,5,0,1,1,0])
jsnInputTask = json.dumps(inputTask.tolist())
jsnInputTypeList = json.dumps(inputTypeList.tolist())
deviceList = [-1,-1,-1,"delay1","switch103","dht102","dht103","dht104"]
jsnDeviceList = json.dumps(deviceList)
insertDB(jsnInputTask, jsnInputTypeList, jsnDeviceList, 1, 2)
'''
# 添加输入信息到输入信息缓存中
data = [["delay1", "dht102", "dht103", "dht104"],[10,1,0,1]]
for i in range(len(data[0])):
insertDataIntoDataCach(data[0][i])
updateDataCach(data[0][i], data[1][i])
#showDB()
(id, inputTask, inputTypeList, status, deviceList, ctime) = getTaskFromDB()[0]
#print(inputTask)
print(getValueByNodeID(6))
from controllMatrix import *
from listenSer import sendBySocket
import os
import time
import socket
# 发送控制指令到传感器设备,参数cmd,控制命令
def sendCommandToDevice(cmd):
response = 'error'
# 通过容器的环境变量HOST获取绑定传感器的IP地址
ip, port = os.getenv('HOST'), 8085
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.gaierror as err_msg:
print('Address-related error connecting to server: %s' % err_msg)
s.close()
return (-1, err_msg)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
except socket.error as err_msg:
print('Error receiving data: %s' % err_msg)
s.close()
return (-1, err_msg)
s.close()
# 程序运行正常,返回传感器传递的值
return (1, str(response))
while True:
ctime = 5
(status, output) = getDeviceTask()
if status == -1:
print(output)
continue
for data in getDeviceTask()[1]:
(id, data, dstIP, ctime) = data
(ip, port) = dstIP.split(':')
print(data)
(status, deviceValue) = sendCommandToDevice('on')
if status == -1:
print(deviceValue)
continue
time.sleep(2)
try:
deviceValue = float(deviceValue.split('&')[0])
except Exception as err:
print(err,deviceValue)
if deviceValue > data:
print('1')
msg = 'device&%s&1' % os.getenv('HOSTNAME')
else:
print('0')
msg = 'device&%s&0' % os.getenv('HOSTNAME')
(status, recvdata) = sendBySocket(ip, int(port), msg)
time.sleep(ctime)
......@@ -5,9 +5,12 @@
* 1. 监听智能体控制器设置请求 *
* 2. 循环处理任务队列中的任务 *
* 3. 接收请求并执行 *
* 4. 添加新的任务类型 *
* 5. 拓展输入输出数据库 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-21 *
* modify: 2018-4-16 *
**************************************************
'''
......@@ -21,9 +24,12 @@ import subprocess
import logging
import sqlite3
from controllMatrix import *
# 两种控制模式,controller:控制器写入控制命令,device:接收其他传感器控制命令
controlModeList = ['controller', 'device']
controlMethodList = ['add', 'rm', 'clear', 'period', 'show']
controlMethodList = ['addInput','addOutput', 'addCach', 'rm', 'clear', 'period', 'show', 'start', 'stop', 'run']
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
'''
......@@ -63,23 +69,26 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
else:
# 匹配控制指令做出相应操作
(status, output) = executeCommand(command, message[2:])
print(message[2:])
# 监听来自device hfv模块的控制请求
elif controlMode == "device":
command = message[1]
deviceID = message[1]
data = message[2]
# 发送控制请求
(status, output) = sendCommandToDevice(command)
# (status, output) = (1, "test")
# (status, output) = sendCommandToDevice(command)
(status, output) = updateDataCach(deviceID, int(data))
print(deviceID, data)
#(status, output) = (1, "test")
else:
pass
else:
print("illegal controlMode")
print("illegal controlMode!")
(status, output) = (-1, 'illegal controlMode')
errorFlag = True
# 返回控制命令执行结果
jresp = json.dumps((status, str(output)))
jresp = json.dumps((status, json.dumps(output)))
try:
self.request.sendall(jresp.encode())
except socket.error as err_msg:
......@@ -137,31 +146,37 @@ def sendBySocket(ip, port, cmd):
# 执行控制指令
def executeCommand(command, information):
if command == "add":
# 目前假设information就是全部控制指令
task = information[0]
ctime = information[1]
print("****************")
print(task)
print(ctime)
(status, output) = insertDB(task, ctime)
print(output)
elif command == "clear":
# 清空任务队列
(status, output) = clearDB()
elif command == "period":
ctime = information[0]
# 设置查询循环周期
(status, output) = updatePeriod(ctime)
elif command == "show":
(status, output) = showDB()
print(output)
else:
# 可能由于更新可执行任务列表,而未实现功能导致的问题
(status, output) = (-1, "method isn't ready")
return (status, output)
# comand:input/output, information:将要存入数据库的内容
if command == 'addInput':
(data, dstIP, circleTime) = information
return updateDeviceTask(data, dstIP, int(circleTime))
elif command == 'addOutput':
clearDB()
(taskMatrixJOSN, deviceTypeListJOSN, deviceListJOSN, taskStatus,\
circleTime) = information
return insertDB(taskMatrixJOSN, deviceTypeListJOSN, deviceListJOSN,\
int(taskStatus), int(circleTime))
elif command == 'addCach':
data = information[0]
data = json.loads(data)
for i in range(len(data[0])):
(status, output) = insertDataIntoDataCach(data[0][i])
(status, output) = updateDataCach(data[0][i], data[1][i])
return (status, output)
elif command == 'show':
DBName = information[0]
return showDatabase(DBName)
elif command == 'clear':
return clearDB()
elif command == 'start':
return updateTaskStatus(1)
elif command == 'stop':
return updateTaskStatus(0)
elif command == 'run':
method = information[0]
return sendCommandToDevice(method)
# 创建数据库
def createDB():
......@@ -174,6 +189,53 @@ def createDB():
cursor.close()
conn.close()
# 创建输入数据库
def createInputDB():
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE if not exists `decidedestination` (
`id` integer primary key autoincrement,
`data` tinyint NOT NULL,
`dst` varchar(3) NOT NULL,
`ctime` tinyint DEFAULT 2)
""")
conn.commit()
cursor.close()
conn.close()
# 创建输出数据库
def createOutputDB():
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE if not exists `resulovetable` (
`id` integer primary key autoincrement,
`taskmatrix` text NOT NULL,
`inputtype` text NOT NULL,
`status` tinyint DEFAULT 0,
`devicelist` text NOT NULL,
`ctime` tinyint DEFAULT 2)
""")
conn.commit()
cursor.close()
conn.close()
# 创建数据缓存数据库
def createDataCach():
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE if not exists `datacach` (
`id` integer primary key autoincrement,
`deviceid` varchar(30) NOT NULL,
`data` integer default -1,
`updatetime` varchar(30) NOT NULL,
`groupid` integer NOT NULL)
""")
conn.commit()
cursor.close()
conn.close()
# 设置时间周期
def updatePeriod(cTime):
......@@ -194,71 +256,17 @@ def updatePeriod(cTime):
conn.close()
return (status, output)
# 插入任务到数据库
def insertDB(task, ctime):
try:
hashtext = str(time.time()).split(".")[1]
sql = "insert into task values ('" + task + "', '" + hashtext + "', " + ctime + ")"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
(status, output) = (1, hashtext)
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 清空数据库
def clearDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("delete from task")
conn.commit()
(status, output) = (-1, "delete success")
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 展示数据库内容
def showDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("select * from task")
data = cursor.fetchall()
if data is None:
(status, output) = (1, 0)
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 返回数据库内容
def showDatabase(tableName):
tableNames = ['resulovetable', 'datacach', 'decidedestination']
if not tableName in tableNames:
return (-1, "no this table")
return showDB(tableName)
if __name__ == "__main__":
createInputDB()
createOutputDB()
createDB()
createDataCach()
# 设置host和port
HOST, PORT = "0.0.0.0", 3000
logger = logging.getLogger("TCPServer")
......@@ -287,4 +295,3 @@ if __name__ == "__main__":
# 使用control + C 退出程序
server.serve_forever()
'''
*************************************************
* 任务流解析程序 *
* 程序说明: *
* 1. 输入任务流矩阵 *
* 1. 解析任务工作流 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-4-13 *
*************************************************
'''
import json
import numpy as np
from controllMatrix import *
inputDeviceData = np.array([])
def resolveMatrix(inputTask, inputTypeList, outputNode):
'''
根据矩阵构建输入输出关系
inputTask: 输入矩阵;inputTypeList: 节点类型列表,指明各个节点的具体功能;outputNode: 输出节点;
inputDeviceData: 输入设备节点状态值
更改获取实时的传感器数据
'''
nodeType = inputTypeList[outputNode]
lastNode = np.where((inputTask.T[outputNode] == 1))[0]
if nodeType == 0:
output = resolveMatrix(inputTask, inputTypeList, lastNode[0])
elif nodeType == 1:
a = resolveMatrix(inputTask, inputTypeList, lastNode[0])
b = resolveMatrix(inputTask, inputTypeList, lastNode[1])
output = a & b
elif nodeType == 2:
a = resolveMatrix(inputTask, inputTypeList, lastNode[0])
b = resolveMatrix(inputTask, inputTypeList, lastNode[1])
output = a | b
elif nodeType == 3:
a = resolveMatrix(inputTask, inputTypeList, lastNode[0])
output = not a
elif nodeType == 5:
# 延时器
sleepTime = getValueByNodeID(outputNode)[1]
output = resolveMatrix(inputTask, inputTypeList, lastNode[0])
time.sleep(sleepTime)
elif nodeType == 4:
print(outputNode)
output = getValueByNodeID(outputNode)[1]
print(output)
return output
def checkMatrix(inputTask, inputTypeList):
# 验证数据数据库数据是否否和规定
(status, output) = getDeviceListFromDB()
if status == -1:
print(output)
return False
(getTaskStatusStatus, taskStatus) = getTaskStatus()
if getTaskStatusStatus == -1:
print(taskStatus)
if taskStatus == 0:
print("Task has been stopped!")
return False
deviceList = json.loads(output)
for device in deviceList:
if device == -1:
continue
if device[:5] == 'delay':
continue
if device[:6] == 'switch':
# 临时解决方案,把switch都作为输出屏蔽掉
continue
(runStatus, deviceStatus) = existDevice(device)
if runStatus == -1:
print(deviceStatus)
return False
if not deviceStatus:
print("need to set datacach database")
return False
# 测试矩阵是否符合要求
length = inputTask.shape[0]
zeroList = np.zeros(length)
outputNode = np.where((inputTask == zeroList).all(1))[0]
if outputNode.shape[0] != 1:
print("输出节点不符合单输出要求")
return False
# 验证inputTypeList与inputTask矩阵是否匹配,主要验证输入与类型是否一致
for i in range(length):
countInput = np.sum(inputTask.T[i] == 1)
if inputTypeList[i] in [0,3]:
if countInput == 1:
continue
else:
print("请检查%d行" % i)
return False
elif inputTypeList[i] in [1,2]:
if countInput == 2:
continue
else:
print("请检查%d行" % i)
return False
elif inputTypeList[i] in [4]:
if countInput == 0:
continue
else:
print("请检查%d行" % i)
return False
return True
def runTask():
# 输入各个节点的状态信息
data = getTaskFromDB()
if data == []:
return (-1, 'No Task')
(id, inputTask, inputTypeList, status, deviceList, ctime) = data[0]
inputTask = np.array(json.loads(inputTask))
inputTypeList = np.array(json.loads(inputTypeList))
if not checkMatrix(inputTask, inputTypeList):
return (-1, '矩阵检查不通过')
# 获取矩阵宽度
length = inputTask.shape[0]
# 创建零向量
zeroList = np.zeros(length)
# 获取输出节点,即行为零向量
outputNode = np.where((inputTask == zeroList).all(1))[0]
# 获取输入节点,即列为零向量
inputNodes = np.where((inputTask.T == zeroList).all(1))[0]
return (1, resolveMatrix(inputTask, inputTypeList, outputNode[0]))
if __name__ == "__main__":
inputData = [0,0,0,0,0,1,0,1]
print(runTask())
#bin/sh
python3 testDevice.py &
python3 listenSer.py &
python3 getData.py &
python3 workProcess.py
print('''
***********************************************
* 传感器设备检测程序 *
* 程序说明: *
* 1. 使用ping命令检测设备是否在线 *
* 2. 将检测结果保存在网关数据库中 *
* *
* author:joliu<joliu@s-an.org> *
* date: 2018-4-12 *
***********************************************
''')
import subprocess
import os
import pymysql
import time
def sdnToDatabase(sql):
# 发送sql命令到数据库
cmd = sql.split(' ')[0]
try:
conn = pymysql.connect(host='test-mysql', port=3306, user='root', passwd='Vudo3423', db='HiDockerwifi', charset='utf8')
cursor = conn.cursor()
cursor.execute(sql)
if cmd == 'select':
output = cursor.fetchall()
status = True
else:
conn.commit()
status = True
output = ''
except pymysql.Error as err_msg:
status = False
output = err_msg
except Exception as err_msg:
status = False
output = err_msg
finally:
cursor.close()
conn.close()
return (status, output)
def getDeviceIDByIP(ipaddr):
# 从数据库查找对应的设备编号
sql = "select equip from portdb where ipaddress='%s'" % ipaddr
result = sdnToDatabase(sql)
if result[0]:
if result[1] == ():
return False
print(result[0])
return result[1][0][0]
else:
return False
def updateDeviceInfo(ipaddress, equipid, device_status, delay):
# 根据ip地址更新设备
sql = "select * from deviceinfo where ipaddress='%s'" % ipaddress
(status, result) = sdnToDatabase(sql)
if status:
if result == ():
sql = "insert into deviceinfo (equipid, ipaddress, status, \
delay) VALUES ('%s', '%s',%d ,'%s')" % (equipid, ipaddress,\
device_status, delay)
else:
sql = "update deviceinfo set status=%d,delay='%s' where ipaddress='%s'" % \
(device_status, delay, ipaddress)
(status, output) = sdnToDatabase(sql)
if not status:
return False
return True
else:
print("connet database error")
return False
def pingTestHost(ip):
# 使用ping检测host设备的网络通信情况
cmd = 'ping -c 4 -w 4 ' + ip
equipid = getDeviceIDByIP(ip)
if not equipid:
return False
# 执行ping命令
device_status = 0
(status, output) = subprocess.getstatusoutput(cmd)
if status == 0:
# 如果检测设备正常,获取单次检测的时延
# delayTime = output.split('\n')[1].split(' ')[-2].split('=')[1]
delayTime = output.split('\n')[-1].split(' ')[-2]
print(delayTime)
device_status = 1
else:
delayTime = '-'
device_status = 0
return updateDeviceInfo(ip, equipid, device_status, delayTime)
if __name__ == "__main__":
host = os.getenv('HOST')
while True:
print(host)
print(pingTestHost(host))
time.sleep(1)
'''
****************************************************
* 任务执行模块 *
* 1. 检查任务队列是否存在任务 *
* 2. 从任务队列中取任务并执行 *
* 3. 发送控制命令到指定的智能体 *
* 4. 读取全部任务队列中任务,并依次执行 *
* 5. 未来或许可以根据配置的智能体参数, *
* 动态修改智能体的计算资源等 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-22 *
****************************************************
'''
from listenSer import sendBySocket
from listenSer import sendCommandToDevice
import sqlite3
import time
def findTask():
from controllMatrix import *
from resolveMatrix import runTask
import os
import socket
# 发送控制指令到传感器设备,参数cmd,控制命令
def sendCommandToDevice(cmd):
response = 'error'
# 通过容器的环境变量HOST获取绑定传感器的IP地址
ip, port = os.getenv('HOST'), 8085
try:
sql = "select * from task"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
data = cursor.fetchall()
if data is None:
(status, output) = (-1, 'database is empty!')
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
return (status, output)
# 根据任务队列中任务,依次执行
def doTask():
(status, outputs) = findTask()
if status != 1:
return (status, outputs)
for output in outputs:
(task, taskid, ctime) = output
if len(task.split(';')) != 2 or len(task.split(':')) != 3:
print("Error task: %s" % task)
continue
# 初步定义task字符串模式 eg: >30;192.168.1.1:3000:off
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.gaierror as err_msg:
print('Address-related error connecting to server: %s' % err_msg)
s.close()
return (-1, err_msg)
(condition, command) = task.split(';')
(ip, port, method) = command.split(':')
# 构建控制命令
method = 'device&' + method
# 读取传感器数值
(status, data) = sendCommandToDevice(method)
# 千杀的dht11,需要处理下数据
data = data.split('&')[0]
print(output)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
except socket.error as err_msg:
print('Error receiving data: %s' % err_msg)
s.close()
return (-1, err_msg)
s.close()
# 程序运行正常,返回传感器传递的值
return (1, str(response))
if __name__ == '__main__':
time.sleep(10)
while True:
(status, output) = runTask()
if status == -1:
print("get device data failed! ip: %s, method: %s" % (ip, method))
return (status, "get device data failed! ip: %s, method: %s" % (ip, method))
if compare(condition[0], float(data), float(condition[1:])):
# 当结果为真,向目标传感器发出指令
(status, recvdata) = sendBySocket(ip, int(port), method)
print(recvdata)
time.sleep(ctime)
print(output)
time.sleep(20)
continue
else:
pass
return (1, 'success')
# 根据符号来比较两个数值的大小
def compare(signal, value1, value2):
if signal == '>':
return value1 > value2
elif signal == '<':
return value1 < value2
elif signal == '=':
return value1 == value2
else:
return False
def mainWhileProcess(input_ctime):
while True:
print("cycle time :" + str(input_ctime))
time.sleep(input_ctime)
(status, output) = doTask()
print(output)
if __name__ == "__main__":
mainWhileProcess(5)
if output == 1:
msg = 'on'
print('on')
else:
msg = 'off'
print('off')
print(sendCommandToDevice(msg))
ctime = getCircleTime()
time.sleep(ctime)
CREATE TABLE `deviceinfo` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`equipid` varchar(30) NOT NULL,
`ipaddress` varchar(30) NOT NULL,
`status` tinyint DEFAULT 0,
`delay` varchar(30) DEFAULT NULL,
PRIMARY KEY (`id`)
)
import socket
import sys
import numpy as np
import json
port = 33335
inputTask = np.array([[0,0,1,0,0,0,0,0],\
[0,0,1,0,0,0,0,0],\
[0,0,0,1,0,0,0,0],\
[0,0,0,0,1,0,0,0],\
[0,0,0,0,0,0,0,0],\
[1,0,0,0,0,0,0,0],\
[1,0,0,0,0,0,0,0],\
[0,1,0,0,0,0,0,0]])
inputTypeList = np.array([1,3,2,5,0,4,4,4])
deviceList = [-1,-1,-1,"delay1","switch103","dht102","dht103","dht104"]
jsnInputTask = json.dumps(inputTask.tolist())
jsnInputTypeList = json.dumps(inputTypeList.tolist())
jsnDeviceList = json.dumps(deviceList)
message = 'controller&addOutput&%s&%s&%s&0&2' % (jsnInputTask, jsnInputTypeList, jsnDeviceList)
# 执行逻辑矩阵数据库
#message = 'controller&show&resulovetable'
# 数据缓存数据库
#message = 'controller&show&datacach'
# 数据采集控制数据库
#message = 'controller&show&decidedestination'
#message = 'controller&clear'
# 模拟Device设备发送信息
# message = 'device&dht102&0'
# 输出
# message = 'controller&stop'
#message = 'controller&start'
# 添加轮询查询传感器命令
#message = "controller&addInput&20&192.168.12.19:33335&20"
# 添加缓存数据库内容
data = [["delay1", "dht102", "dht103", "dht104"],[10,1,0,1]]
data_json = json.dumps(data)
message = "controller&addCach&%s" % data_json
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("192.168.12.19", port))
s.sendall(message.encode())
response = s.recv(1024).decode()
print(response)
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Program started
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
FROM opengn.org:32333/easypi/alpine-arm
MAINTAINER joliu<joliu@s-an.org>
ENV TIME_ZONE Asia/Shanghai
RUN apk --update add python3 py3-pip tzdata
RUN pip3 install pymysql
RUN echo "${TIME_ZONE}" > /etc/timezone \
&& ln -sf /usr/share/zoneinfo/${TIME_ZONE} /etc/localtime
WORKDIR /data
COPY listenSer.py /data
COPY workProcess.py /data
COPY start.sh /data
CMD sh /data/start.sh
# 测试程序说明
继电器虚拟化模块
指定传感器ip地址为:192.168.12.42
指定传感器listenSer端口号为: 30001
## 使用说明
需要先启动nodemcu,待自动部署成功之后再另行进行测试。
'''
**************************************************
* 智能体监听模块 *
* *
* 1. 监听智能体控制器设置请求 *
* 2. 循环处理任务队列中的任务 *
* 3. 接收请求并执行 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-21 *
**************************************************
'''
import socket
import threading
import socketserver
import json
import os
import time
import subprocess
import logging
import sqlite3
# 两种控制模式,controller:控制器写入控制命令,device:接收其他传感器控制命令
controlModeList = ['controller', 'device']
controlMethodList = ['add', 'rm', 'clear', 'period', 'show']
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
'''
消息监听模块
'''
def handle(self):
# 设备忙碌标识
busyFlag = False
# 错误标识
errorFlag = False
try:
# 接收socket消息
data = self.request.recv(1024).decode()
except socket.error as err_msg:
# 返回异常信息
(status, output) = (-1, err_msg)
# 异常处理
print("recv error!")
exit(1)
message = data.split('&')
# 检测控制模式
controlMode = message[0]
print(message)
if controlMode in controlModeList:
if controlMode == "controller":
# 写入控制命令到任务队
print(message[1])
command = message[1]
# 检测是否是合法操作
if not command in controlMethodList:
print("error: illegal command")
errorFlag = True
(status, output) = (-1, "illegal controller command: %s" % command)
else:
# 匹配控制指令做出相应操作
(status, output) = executeCommand(command, message[2:])
# 监听来自device hfv模块的控制请求
elif controlMode == "device":
command = message[1]
# 发送控制请求
(status, output) = sendCommandToDevice(command)
# (status, output) = (1, "test")
else:
pass
else:
print("illegal controlMode")
(status, output) = (-1, 'illegal controlMode')
errorFlag = True
# 返回控制命令执行结果
jresp = json.dumps((status, str(output)))
try:
self.request.sendall(jresp.encode())
except socket.error as err_msg:
print("socket failed %s" % err_msg)
exit(1)
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
# 发送控制指令到Device
def sendCommandToDevice(cmd):
# 通过容器的环境变量HOST获取绑定传感器的IP地址
ip, port = "192.168.12.42", 8085
return sendBySocket(ip, port, cmd)
# 通过socket发送信息
def sendBySocket(ip, port, cmd):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.error as err_msg:
print("Address-related error connecting to server: %s" % err_msg)
s.close()
return (-1, err_msg)
print("****************send:" + cmd)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
print(response)
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
print(str(response))
s.close()
# 程序运行正常返回目标传感器返回的数据
return (1, str(response))
# 执行控制指令
def executeCommand(command, information):
if command == "add":
# 目前假设information就是全部控制指令
task = information[0]
ctime = information[1]
print("****************")
print(task)
print(ctime)
(status, output) = insertDB(task, ctime)
print(output)
elif command == "clear":
# 清空任务队列
(status, output) = clearDB()
elif command == "period":
ctime = information[0]
# 设置查询循环周期
(status, output) = updatePeriod(ctime)
elif command == "show":
(status, output) = showDB()
print(output)
else:
# 可能由于更新可执行任务列表,而未实现功能导致的问题
(status, output) = (-1, "method isn't ready")
return (status, output)
# 创建数据库
def createDB():
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE if not exists task
(cmd text, hashtext text, ctime int(5))
""")
conn.commit()
cursor.close()
conn.close()
# 设置时间周期
def updatePeriod(cTime):
try:
sql = 'update task set ctime=' + str(cTime)
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
(status, output) = (1, cTime)
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 插入任务到数据库
def insertDB(task, ctime):
try:
hashtext = str(time.time()).split(".")[1]
sql = "insert into task values ('" + task + "', '" + hashtext + "', " + ctime + ")"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
(status, output) = (1, hashtext)
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 清空数据库
def clearDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("delete from task")
conn.commit()
(status, output) = (-1, "delete success")
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 展示数据库内容
def showDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("select * from task")
data = cursor.fetchall()
if data is None:
(status, output) = (1, 0)
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
if __name__ == "__main__":
createDB()
# 设置host和port
HOST, PORT = "0.0.0.0", 30001
logger = logging.getLogger("TCPServer")
logger.setLevel(logging.INFO)
# 创建句柄
fh = logging.FileHandler("1.log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s -\
%(message)s')
# 添加句柄到logger类
logger.addHandler(fh)
logger.info("Program started")
socketserver.TCPServer.allow_reuse_address = True
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
# 启动多进程监听服务
server_thread = threading.Thread(target=server.serve_forever)
# 当主进程中断时退出程序
server_thread.daemon = True
server_thread.start()
logger.info("Server loop running in thread:" + server_thread.name)
logger.info("....waiting for connection")
# 使用control + C 退出程序
server.serve_forever()
#bin/sh
python3 listenSer.py &
python3 workProcess.py
'''
****************************************************
* 任务执行模块 *
* 1. 检查任务队列是否存在任务 *
* 2. 从任务队列中取任务并执行 *
* 3. 发送控制命令到指定的智能体 *
* 4. 未来或许可以根据配置的智能体参数, *
* 动态修改智能体的计算资源等 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-22 *
****************************************************
'''
from listenSer import sendBySocket
from listenSer import sendCommandToDevice
import sqlite3
import time
def findTask():
try:
sql = "select * from task LIMIT 1"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
data = cursor.fetchone()
if data is None:
(status, output) = (-1, 'database is empty!')
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
return (status, output)
# 根据符号来比较两个数值的大小
def compare(signal, value1, value2):
if signal == '>':
return value1 > value2
elif signal == '<':
return value1 < value2
elif signal == '=':
return value1 == value2
else:
return False
def mainWhileProcess(input_ctime):
ctime = input_ctime
while True:
print(ctime)
time.sleep(ctime)
(status, output) = findTask()
if status == -1:
print(output)
# 如果数据库为空,或者错误,恢复初始设置
ctime = input_ctime
continue
(task, taskid, ctime) = output
if len(task.split(';')) != 2 or len(task.split(':')) != 3:
print("Error task: %s" % task)
continue
# 初步定义task字符串模式 eg: >30;192.168.1.1:3000:off
(condition, command) = task.split(';')
(ip, port, method) = command.split(':')
# 读取传感器数值
(status, output) = sendCommandToDevice(method)
if status == -1:
print("get device data failed! ip: %s, method: %s" % (ip, method))
continue
if compare(condition[0], float(output), float(condition[1:])):
# 当结果为真,向目标传感器发出指令
(status, output) = sendBySocket(ip, port, method)
print(output)
else:
pass
if __name__ == "__main__":
mainWhileProcess(5)
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Program started
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
Program started
Server loop running in thread:Thread-1
....waiting for connection
FROM opengn.org:32333/easypi/alpine-arm
MAINTAINER joliu<joliu@s-an.org>
ENV TIME_ZONE Asia/Shanghai
RUN apk --update add python3 py3-pip tzdata
RUN pip3 install pymysql
RUN echo "${TIME_ZONE}" > /etc/timezone \
&& ln -sf /usr/share/zoneinfo/${TIME_ZONE} /etc/localtime
WORKDIR /data
COPY listenSer.py /data
COPY workProcess.py /data
COPY start.sh /data
CMD sh /data/start.sh
# 测试程序说明
DHT11虚拟化模块
指定传感器ip地址为:192.168.12.75
指定传感器listenSer端口号为: 30002
## 使用说明
需要先启动nodemcu,待自动部署成功之后再另行进行测试。
'''
**************************************************
* 智能体监听模块 *
* *
* 1. 监听智能体控制器设置请求 *
* 2. 循环处理任务队列中的任务 *
* 3. 接收请求并执行 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-21 *
**************************************************
'''
import socket
import threading
import socketserver
import json
import os
import time
import subprocess
import logging
import sqlite3
# 两种控制模式,controller:控制器写入控制命令,device:接收其他传感器控制命令
controlModeList = ['controller', 'device']
controlMethodList = ['add', 'rm', 'clear', 'period', 'show']
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
'''
消息监听模块
'''
def handle(self):
# 设备忙碌标识
busyFlag = False
# 错误标识
errorFlag = False
try:
# 接收socket消息
data = self.request.recv(1024).decode()
except socket.error as err_msg:
# 返回异常信息
(status, output) = (-1, err_msg)
# 异常处理
print("recv error!")
exit(1)
message = data.split('&')
# 检测控制模式
controlMode = message[0]
print(message)
if controlMode in controlModeList:
if controlMode == "controller":
# 写入控制命令到任务队
print(message[1])
command = message[1]
# 检测是否是合法操作
if not command in controlMethodList:
print("error: illegal command")
errorFlag = True
(status, output) = (-1, "illegal controller command: %s" % command)
else:
# 匹配控制指令做出相应操作
(status, output) = executeCommand(command, message[2:])
# 监听来自device hfv模块的控制请求
elif controlMode == "device":
command = message[1]
# 发送控制请求
(status, output) = sendCommandToDevice(command)
# (status, output) = (1, "test")
else:
pass
else:
print("illegal controlMode")
(status, output) = (-1, 'illegal controlMode')
errorFlag = True
# 返回控制命令执行结果
jresp = json.dumps((status, str(output)))
try:
self.request.sendall(jresp.encode())
except socket.error as err_msg:
print("socket failed %s" % err_msg)
exit(1)
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
# 发送控制指令到Device
def sendCommandToDevice(cmd):
# 通过容器的环境变量HOST获取绑定传感器的IP地址
ip, port = "192.168.12.75", 8085
return sendBySocket(ip, port, cmd)
# 通过socket发送信息
def sendBySocket(ip, port, cmd):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.error as err_msg:
print("Address-related error connecting to server: %s" % err_msg)
s.close()
return (-1, err_msg)
print("****************send:" + cmd)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
print(response)
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
print(str(response))
s.close()
# 程序运行正常返回目标传感器返回的数据
return (1, str(response))
# 执行控制指令
def executeCommand(command, information):
if command == "add":
# 目前假设information就是全部控制指令
task = information[0]
ctime = information[1]
print("****************")
print(task)
print(ctime)
(status, output) = insertDB(task, ctime)
print(output)
elif command == "clear":
# 清空任务队列
(status, output) = clearDB()
elif command == "period":
ctime = information[0]
# 设置查询循环周期
(status, output) = updatePeriod(ctime)
elif command == "show":
(status, output) = showDB()
print(output)
else:
# 可能由于更新可执行任务列表,而未实现功能导致的问题
(status, output) = (-1, "method isn't ready")
return (status, output)
# 创建数据库
def createDB():
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("""CREATE TABLE if not exists task
(cmd text, hashtext text, ctime int(5))
""")
conn.commit()
cursor.close()
conn.close()
# 设置时间周期
def updatePeriod(cTime):
try:
sql = 'update task set ctime=' + str(cTime)
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
(status, output) = (1, cTime)
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 插入任务到数据库
def insertDB(task, ctime):
try:
hashtext = str(time.time()).split(".")[1]
sql = "insert into task values ('" + task + "', '" + hashtext + "', " + ctime + ")"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
conn.commit()
(status, output) = (1, hashtext)
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 清空数据库
def clearDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("delete from task")
conn.commit()
(status, output) = (-1, "delete success")
except sqlite3.Error as err_msg:
print("Database error: %s", err_msg)
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
# 展示数据库内容
def showDB():
try:
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute("select * from task")
data = cursor.fetchall()
if data is None:
(status, output) = (1, 0)
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception.Error as err_msg:
(status, output) = (-1, err_msg)
finally:
cursor.close()
conn.close()
return (status, output)
if __name__ == "__main__":
createDB()
# 设置host和port
HOST, PORT = "0.0.0.0", 30002
logger = logging.getLogger("TCPServer")
logger.setLevel(logging.INFO)
# 创建句柄
fh = logging.FileHandler("1.log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s -\
%(message)s')
# 添加句柄到logger类
logger.addHandler(fh)
logger.info("Program started")
socketserver.TCPServer.allow_reuse_address = True
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
ip, port = server.server_address
# 启动多进程监听服务
server_thread = threading.Thread(target=server.serve_forever)
# 当主进程中断时退出程序
server_thread.daemon = True
server_thread.start()
logger.info("Server loop running in thread:" + server_thread.name)
logger.info("....waiting for connection")
# 使用control + C 退出程序
server.serve_forever()
#bin/sh
python3 listenSer.py &
python3 workProcess.py
'''
****************************************************
* 任务执行模块 *
* 1. 检查任务队列是否存在任务 *
* 2. 从任务队列中取任务并执行 *
* 3. 发送控制命令到指定的智能体 *
* 4. 读取全部任务队列中任务,并依次执行 *
* 5. 未来或许可以根据配置的智能体参数, *
* 动态修改智能体的计算资源等 *
* *
* author: joliu<joliu@s-an.org> *
* date: 2018-3-22 *
****************************************************
'''
import sqlite3
import time
import socket
# 发送控制指令到Device
def sendCommandToDevice(cmd):
# 通过容器的环境变量HOST获取绑定传感器的IP地址
ip, port = "192.168.12.75", 8085
return sendBySocket(ip, port, cmd)
def findTask():
try:
sql = "select * from task"
conn = sqlite3.connect("task.db")
cursor = conn.cursor()
cursor.execute(sql)
data = cursor.fetchall()
if data is None:
(status, output) = (-1, 'database is empty!')
else:
(status, output) = (1, data)
except sqlite3.Error as err_msg:
(status, output) = (-1, err_msg)
except Exception as err_msg:
(status, output) = (-1, err_msg)
finally:
return (status, output)
# 根据任务队列中任务,依次执行
def doTask():
(status, outputs) = findTask()
if status != 1:
return (status, outputs)
for output in outputs:
(task, taskid, ctime) = output
if len(task.split(';')) != 2 or len(task.split(':')) != 3:
print("Error task: %s" % task)
continue
# 初步定义task字符串模式 eg: >30;192.168.1.1:3000:off
(condition, command) = task.split(';')
(ip, port, method) = command.split(':')
# 构建控制命令
method = 'device&' + method
# 读取传感器数值
(status, data) = sendCommandToDevice(method)
# 千杀的dht11,需要处理下数据
data = data.split('&')[0]
print(output)
if status == -1:
print("get device data failed! ip: %s, method: %s" % (ip, method))
return (status, "get device data failed! ip: %s, method: %s" % (ip, method))
if compare(condition[0], float(data), float(condition[1:])):
# 当结果为真,向目标传感器发出指令
(status, recvdata) = sendBySocket(ip, int(port), method)
print(recvdata)
time.sleep(ctime)
else:
pass
return (1, 'success')
# 通过socket发送信息
def sendBySocket(ip, port, cmd):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err_msg:
print("Error creating socket:%s" % err_msg)
s.close()
return (-1, err_msg)
try:
s.connect((ip, port))
except socket.error as err_msg:
print("Address-related error connecting to server: %s" % err_msg)
s.close()
return (-1, err_msg)
print("****************send:" + cmd)
try:
s.sendall(cmd.encode())
except socket.error as err_msg:
print("Error sending data: %s" % err_msg)
s.close()
return (-1, err_msg)
try:
response = s.recv(1024).decode()
print(response)
except socket.error as err_msg:
print("Error receiving data: %s" % err_msg)
s.close()
return (-1, err_msg)
#print(str(response))
s.close()
# 程序运行正常返回目标传感器返回的数据
return (1, str(response))
# 根据符号来比较两个数值的大小
def compare(signal, value1, value2):
if signal == '>':
return value1 > value2
elif signal == '<':
return value1 < value2
elif signal == '=':
return value1 == value2
else:
return False
def mainWhileProcess(input_ctime):
while True:
print("cycle time :" + str(input_ctime))
time.sleep(input_ctime)
(status, output) = doTask()
print(output)
if __name__ == "__main__":
mainWhileProcess(5)
#print(doTask())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment