基于PyQt5的工业化GUI软件串口工具

Rudy 2025-1-3 5 1/3

一、背景

实习期间参与智能硬件公司的高速公路缴费机业务-打印机串口测试工具

一个用于打印机/读卡器联调的桌面测试程序。
程序通过 PyQt5 提供操作界面,从 ini 配置读取设备参数,
使用 pyserial 打开串口并发送十六进制设备指令,
同时支持日志记录、协议解析,并保留 MQTT 通信扩展能力。

二、串口

1.串口通信 pyserial

是什么:计算机跟外部设备通信的标准接口

场景:上位机软件需要控制光源控制器,给相机打光的,支持一些串口通信,发送不同的指令从而切换到不同的光源通道。

核心:串行通信-一位一位传输;异步通信- 没有时钟信号,靠波特率同步;全双工的方式:可以同时发送和接受数据

2.波特率

数据传输速度:常用9600/19200,波特率必须保持一致

数据位:每个字节的数据位数,常用:7,8(1个字节)

停止位:字节结束标志,常用:1、1.5、2

校验位:错误检测:None、Odd:奇校验、Even:偶校验

流控制:数据控制流:None、HardWare、Software

三、工业通讯协议

让电脑、PLC、传感器。机器人、仪器仪表之间稳定地收发数据

实际上就是:连接设备-发送数据-解析数据-控制设备

那么设备怎么连?

常用串口通信、网口通信、总线通信、无线通信

那么数据怎么发?

按照协议格式组装字节,再按照协议解析返回字节

 

MQTT协议:一种发布/订阅消息协议

# MQTT发布端程序
import json
import time
import random
import paho.mqtt.client as mqtt

# 服务器地址-类似消息中转站
broker = "broker.emqx.io"
port = 1883
# 消息分类路径:订阅这个主题的人,就可以收到这台设备的温度数据
topic = "factory/device001/temperature"

client = mqtt.Client()
# broker 服务器地址,keepalive为心跳时间:60s
client.connect(broker, port, 60)

while True:
    # 构造数据
    data = {
        "device_id": "device001",
        "temperature": round(random.uniform(24, 30), 2),
        "timestamp": int(time.time())
    }

    payload = json.dumps(data)

    # 发布消息,qos=1 至少发布一次,0为最多发布一次,2为只发送一次
    client.publish(
        topic=topic,
        payload=payload,
        qos=1
    )

    print("已发布:", payload)

    time.sleep(5)
# 订阅者
import json
import paho.mqtt.client as mqtt

broker = "broker.emqx.io"
port = 1883
topic = "factory/device001/temperature"

# MQTT 客户端连接 Broker 成功后,自动执行
def on_connect(client, userdata, flags, rc):
    print("MQTT 连接成功")
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    payload = msg.payload.decode("utf-8")
    data = json.loads(payload)

    print("收到主题:", msg.topic)
    print("设备编号:", data["device_id"])
    print("温度:", data["temperature"])
    print("时间戳:", data["timestamp"])

client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message

client.connect(broker, port, 60)
# 一直监听消息
client.loop_forever()

TCP传输协议

# TCP:传输控制协议-面向连接+可靠传输+有顺序+有确认机制+保证可靠性速度慢一点
import socket

# 创建一个基于 IPv4 + TCP 的服务器
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.bind(("127.0.0.1", 9000))
# 监听:开始等待客户端连接,最多一个
server.listen(1)

print("TCP 设备服务已启动,等待连接...")

# 开始连接客户端,成功连接会返回通信对象和地址
conn, addr = server.accept()
print("客户端连接:", addr)

while True:
    data = conn.recv(1024)

    if not data:
        break

    message = data.decode("utf-8")
    print("收到数据:", message)

    if message == "read_temperature":
        conn.send("temperature=26.5".encode("utf-8"))
    else:
        conn.send("unknown command".encode("utf-8"))

conn.close()
server.close()
import socket

# 创建一个基于 IPv4 + TCP 的网络通信客户端
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

client.connect(("127.0.0.1", 9000))

client.send("read_temperature".encode("utf-8"))

# 从服务端接收数据,最多1024
data = client.recv(1024)
print("设备返回:", data.decode("utf-8"))

client.close()

UDP传输协议

# UDP 用户数据报协议-无连接+速度快+不保证送达+不保证顺序+开销小
import socket
# 创建一个 IPv4 + UDP 的服务端
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

server.bind(("127.0.0.1", 9001))

print("UDP 设备服务已启动...")

while True:
    data, addr = server.recvfrom(1024)

    message = data.decode("utf-8")
    print("收到数据:", message, "来自:", addr)

    if message == "read_status":
        # 因为UDP没有连接,所以每次回复都要指定发给谁,也就是addr
        server.sendto("status=running".encode("utf-8"), addr)
    else:
        server.sendto("unknown command".encode("utf-8"), addr)
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 发送数据:先把字符串转换成字节数据
client.sendto('read_status'.encode("utf-8"), ("127.0.0.1", 9001))
# 接收UDP服务端返回数据
data, addr = client.recvfrom(1024)

print('设备返回:', data.decode("utf-8"))

client.close()

工业通讯协议

Modbus

原理:客户端连接PLC服务端,读取PLC内存地址,控制设备

1.模拟plc

from pymodbus.server import StartTcpServer
from pymodbus.simulator import SimData, SimDevice, DataType

# 4 个区块顺序:
# coils, discrete inputs, holding registers, input registers
device = SimDevice(
    id=1,
    simdata=(
        [SimData(address=0, values=[True] + [False] * 99, datatype=DataType.BITS)],
        [SimData(address=0, values=[False] * 100, datatype=DataType.BITS)],
        [SimData(address=0, values=[123, 456] + [0] * 98, datatype=DataType.REGISTERS)],
        [SimData(address=0, values=[0] * 100, datatype=DataType.REGISTERS)],
    )
)

print("模拟 PLC 已启动:127.0.0.1:1502")
print("Holding Register 0 = 123")
print("Holding Register 1 = 456")

StartTcpServer(
    context=[device],
    address=("127.0.0.1", 1502)
)

2.read

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=1502)

if not client.connect():
    print("连接失败")
    exit()

result = client.read_holding_registers(
    address=0,
    count=2,
    device_id=1
)

if result.isError():
    print("读取失败:", result)
else:
    print("读取成功:", result.registers)

client.close()

3.write

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=1502)

if not client.connect():
    print("连接失败")
    exit()

# 写入:把地址 0 改成 999
result = client.write_register(
    address=0,
    value=999,
    device_id=1
)

if result.isError():
    print("写入失败:", result)
else:
    print("写入成功")

# 读回来验证
result = client.read_holding_registers(
    address=0,
    count=2,
    device_id=1
)

if result.isError():
    print("读取失败:", result)
else:
    print("当前寄存器:", result.registers)

client.close()

 

 

 

 

- THE END -
最后修改:2026年6月7日
0

非特殊说明,本博所有文章均为博主原创。

共有 0 条评论