一、背景
实习期间参与智能硬件公司的高速公路缴费机业务-打印机串口测试工具
一个用于打印机/读卡器联调的桌面测试程序。
程序通过 PyQt5 提供操作界面,从 ini 配置读取设备参数,
使用 pyserial 打开串口并发送十六进制设备指令,
同时支持日志记录、协议解析,并保留 MQTT 通信扩展能力。
二、串口
1.串口通信 pyserial
是什么:计算机跟外部设备通信的标准接口
场景:上位机软件需要控制光源控制器,给相机打光的,支持一些串口通信,发送不同的指令从而切换到不同的光源通道。
核心:串行通信-一位一位传输;异步通信- 没有时钟信号,靠波特率同步;全双工的方式:可以同时发送和接受数据
2.波特率
数据传输速度:常用9600/19200,波特率必须保持一致
数据位:每个字节的数据位数,常用:7,8(1个字节)
停止位:字节结束标志,常用:1、1.5、2
校验位:错误检测:None、Odd:奇校验、Even:偶校验
流控制:数据控制流:None、HardWare、Software
三、工业通讯协议
让电脑、PLC、传感器。机器人、仪器仪表之间稳定地收发数据
实际上就是:连接设备-发送数据-解析数据-控制设备
那么设备怎么连?
常用串口通信、网口通信、总线通信、无线通信
那么数据怎么发?
按照协议格式组装字节,再按照协议解析返回字节
MQTT协议:一种发布/订阅消息协议
# MQTT发布端程序
import json
import time
import random
import paho.mqtt.client as mqtt
# 服务器地址-类似消息中转站
broker = "broker.emqx.io"
port = 1883
# 消息分类路径:订阅这个主题的人,就可以收到这台设备的温度数据
topic = "factory/device001/temperature"
client = mqtt.Client()
# broker 服务器地址,keepalive为心跳时间:60s
client.connect(broker, port, 60)
while True:
# 构造数据
data = {
"device_id": "device001",
"temperature": round(random.uniform(24, 30), 2),
"timestamp": int(time.time())
}
payload = json.dumps(data)
# 发布消息,qos=1 至少发布一次,0为最多发布一次,2为只发送一次
client.publish(
topic=topic,
payload=payload,
qos=1
)
print("已发布:", payload)
time.sleep(5)
# 订阅者
import json
import paho.mqtt.client as mqtt
broker = "broker.emqx.io"
port = 1883
topic = "factory/device001/temperature"
# MQTT 客户端连接 Broker 成功后,自动执行
def on_connect(client, userdata, flags, rc):
print("MQTT 连接成功")
client.subscribe(topic, qos=1)
def on_message(client, userdata, msg):
payload = msg.payload.decode("utf-8")
data = json.loads(payload)
print("收到主题:", msg.topic)
print("设备编号:", data["device_id"])
print("温度:", data["temperature"])
print("时间戳:", data["timestamp"])
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port, 60)
# 一直监听消息
client.loop_forever()
TCP传输协议
# TCP:传输控制协议-面向连接+可靠传输+有顺序+有确认机制+保证可靠性速度慢一点
import socket
# 创建一个基于 IPv4 + TCP 的服务器
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 9000))
# 监听:开始等待客户端连接,最多一个
server.listen(1)
print("TCP 设备服务已启动,等待连接...")
# 开始连接客户端,成功连接会返回通信对象和地址
conn, addr = server.accept()
print("客户端连接:", addr)
while True:
data = conn.recv(1024)
if not data:
break
message = data.decode("utf-8")
print("收到数据:", message)
if message == "read_temperature":
conn.send("temperature=26.5".encode("utf-8"))
else:
conn.send("unknown command".encode("utf-8"))
conn.close()
server.close()
import socket
# 创建一个基于 IPv4 + TCP 的网络通信客户端
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 9000))
client.send("read_temperature".encode("utf-8"))
# 从服务端接收数据,最多1024
data = client.recv(1024)
print("设备返回:", data.decode("utf-8"))
client.close()
UDP传输协议
# UDP 用户数据报协议-无连接+速度快+不保证送达+不保证顺序+开销小
import socket
# 创建一个 IPv4 + UDP 的服务端
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 9001))
print("UDP 设备服务已启动...")
while True:
data, addr = server.recvfrom(1024)
message = data.decode("utf-8")
print("收到数据:", message, "来自:", addr)
if message == "read_status":
# 因为UDP没有连接,所以每次回复都要指定发给谁,也就是addr
server.sendto("status=running".encode("utf-8"), addr)
else:
server.sendto("unknown command".encode("utf-8"), addr)
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 发送数据:先把字符串转换成字节数据
client.sendto('read_status'.encode("utf-8"), ("127.0.0.1", 9001))
# 接收UDP服务端返回数据
data, addr = client.recvfrom(1024)
print('设备返回:', data.decode("utf-8"))
client.close()
工业通讯协议
Modbus
原理:客户端连接PLC服务端,读取PLC内存地址,控制设备
1.模拟plc
from pymodbus.server import StartTcpServer
from pymodbus.simulator import SimData, SimDevice, DataType
# 4 个区块顺序:
# coils, discrete inputs, holding registers, input registers
device = SimDevice(
id=1,
simdata=(
[SimData(address=0, values=[True] + [False] * 99, datatype=DataType.BITS)],
[SimData(address=0, values=[False] * 100, datatype=DataType.BITS)],
[SimData(address=0, values=[123, 456] + [0] * 98, datatype=DataType.REGISTERS)],
[SimData(address=0, values=[0] * 100, datatype=DataType.REGISTERS)],
)
)
print("模拟 PLC 已启动:127.0.0.1:1502")
print("Holding Register 0 = 123")
print("Holding Register 1 = 456")
StartTcpServer(
context=[device],
address=("127.0.0.1", 1502)
)
2.read
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient("127.0.0.1", port=1502)
if not client.connect():
print("连接失败")
exit()
result = client.read_holding_registers(
address=0,
count=2,
device_id=1
)
if result.isError():
print("读取失败:", result)
else:
print("读取成功:", result.registers)
client.close()
3.write
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient("127.0.0.1", port=1502)
if not client.connect():
print("连接失败")
exit()
# 写入:把地址 0 改成 999
result = client.write_register(
address=0,
value=999,
device_id=1
)
if result.isError():
print("写入失败:", result)
else:
print("写入成功")
# 读回来验证
result = client.read_holding_registers(
address=0,
count=2,
device_id=1
)
if result.isError():
print("读取失败:", result)
else:
print("当前寄存器:", result.registers)
client.close()
非特殊说明,本博所有文章均为博主原创。
共有 0 条评论