bilibili-视频演示
注意,这里没有选择umqtt.simple
因为在实时使用过程中有以下几个问题
-
需要while循环,堵塞进程
-
总是在持续一段时间后,尤其是收到遗嘱(断联)信息后,会报错并终止MQTT(这个问题Github有被提到过,没解决)
所以改用了异步无阻塞的micropython-mqtt
这是官方驱动程序的替代品。它已经在以下平台上进行了测试。
-
ESP8266
-
ESP32、ESP32-S2 和 ESP32-S3
-
Pyboard D
-
Arduino Nano Connect
-
Raspberry Pi Pico W
该驱动程序的主要特点是:
-
使用 uasyncio 的应用程序的非阻塞操作。
-
从 WiFi 和代理中断中自动恢复。
-
真正的qos == 1重传操作。
-
改进了 WiFi 范围,因为它可以容忍连接不良。
旧图:在线人数 1
github地址
https://github.com/peterhinch/micropython-mqtt/tree/master
文档地址
https://github.com/peterhinch/micropython-mqtt/blob/master/mqtt_as/README.md
首先需要先部署MQTT服务,本文使用的是EMQX
参考文章
《docker-compose自部署:EMQX设置MQTT》
把mqtt_as.py文件传输到ESP32-S3中
https://github.com/peterhinch/micropython-mqtt/tree/master/mqtt_as
引入依赖
from mqtt_as import MQTTClient, config
import uasyncio as asyncio
设置config配置
需要设置
-
wifi名称
-
wifi密码
-
mqtt的ip地址
config['ssid'] = 'wifi_name'
config['wifi_pw'] = 'wifi_password'
config['server'] = 'mqtt_ip_addr'
设置收到订阅消息的回调函数
# 监听消息接收
def msg_call(topic, msg, retained):
print("接收到消息")
topic_str=topic.decode()
msg_str=msg.decode()
print("topic_str",topic_str)
print("msg_str",msg_str)
设置订阅主题的函数
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('weather', 1)
设置config回调函数和订阅主题
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
设置mqtt定时刷新消息(间隔几秒)
async def main(client):
await client.connect()
n = 0
while True:
await asyncio.sleep(5)
print('publish', n)
# If WiFi is down the following will pause for the duration.
await client.publish('result', '{}'.format(n), qos = 1)
n += 1
最后实例化mqtt并开始连接
MQTTClient.DEBUG = True # Optional: print diagnostic messages
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
控制台提示
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
publish 0
publish 1
publish 2
RAM free 8139680 alloc 56800
publish 3
修改要发送的订阅主题
client.publish是发送订阅消息
把result改为weather主题,测试是否接收消息
async def main(client):
await client.connect()
n = 0
while True:
await asyncio.sleep(5)
print('publish', n)
# If WiFi is down the following will pause for the duration.
await client.publish('weather', '{}'.format(n), qos = 1)
n += 1
可以看到接收消息的回调函数收到并执行方法打印到控制台了
网站在线人数
步骤拆解
-
优先检测当前浏览器本地永久存储localStorage中的指定key的数据
-
如果没有,则生成对应的uuid作为mqtt的clientId并存储到localStorage
-
如果有,则动态加载mqttjs的库,并监听加载完成设置回调函数
-
加载mqttjs完成后,根据当前的uuid作为mqtt的clientId建立连接(这样无论打开几个tab,都只会有一个唯一连接)
-
设置mqtt连接成功回调中订阅指定主题和推送消息(如主题为域名)
-
设置mqtt停止连接后回调中推送主题消息(如主题为域名)
-
设置遗嘱消息(Will Message),当用户关闭网页失去连接时,推送主题消息
通过EMQX的API:/api/v5/subscriptions,可以获取当前主题的订阅数
参考文档地址
https://www.emqx.io/docs/zh/v5.0/admin/api.html
https://www.emqx.io/docs/zh/v5.0/admin/api-docs.html
代码如下 online.js
// 生成uuid
function getUuid() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
var r = (Math.random() * 16) | 0,
v = c == 'x' ? r : (r & 0x3) | 0x8;
return v.toString(16);
});
}
// 动态加载js
function loadScript(src, callback) {
var script = document.createElement('script'),
head = document.getElementsByTagName('head')[0];
script.type = 'text/javascript';
script.charset = 'UTF-8';
script.src = src;
if (script.addEventListener) {
script.addEventListener('load', function () {
callback();
}, false);
} else if (script.attachEvent) {
script.attachEvent('onreadystatechange', function () {
var target = window.event.srcElement;
if (target.readyState == 'loaded') {
callback();
}
});
}
head.appendChild(script);
}
// 先检测本地存储是否有对应id
let mqtt_uuid = localStorage.getItem('mqtt_uuid');
console.log(`mqtt_uuid`, mqtt_uuid)
if (!mqtt_uuid) {
// 如果没有mqtt_uuid则创建一个新的
mqtt_uuid = getUuid()
console.log(`mqtt_uuid_res`, mqtt_uuid)
// 保存uuid到本地永久存储
localStorage.setItem('mqtt_uuid', mqtt_uuid);
}
// 动态加载mqttjs到页面
// 参考地址
// https://www.npmjs.com/package/mqtt
let mqttJsUrl = `https://xxx.com/mqtt.min.js`
loadScript(mqttJsUrl, () => {
console.log(`已完成mqtt.js的加载`)
// 订阅主题 设置为当前网站
let domainStr = ``
// 使用本地存储的id建立mqtt连接
const client = mqtt.connect('wss://mqtt.com/wss', {
clientId: mqtt_uuid,
port: `443`,
// host:`ip addr`
// 设置遗嘱消息
will: {
// 要发布的主题
topic: domainStr,
// 要发布的消息
payload: `_disconnect`,
// 服务质量
qos: 2,
// 保留标志
retain: false
}
})
// 监听连接成功后回调
client.on('connect', function () {
client.subscribe(domainStr, function (err) {
if (!err) {
// 订阅成功则给该主题推送一个消息来测试
client.publish(domainStr, '_online')
}
})
})
// 接收订阅消息后的回调
client.on('message', function (topic, message) {
// message is Buffer
console.log(`接收到订阅通知`, message.toString())
})
// 监听断开连接
client.on('disconnect', function () {
console.log(`监听断开连接`)
client.publish(domainStr, '_disconnect')
})
})
然后随意在任何一个网页放入上述的online.js文件即可
<script src="online.js"></script>
部署好后,再使用LVGL在ESP32-S3写文案,收到消息后,更新当前在线人数
新增对网页MQTT消息响应,修改实时在线人数
在msg_call添加
其中get_online_num方法是通过emqx http api获取当前订阅主题的连接数
if msg_str=="web_online":
print("获取当前在线人数-新创建连接")
get_online_num()
elif msg_str=="web_disconnect":
print("获取当前在线人数——已断开连接")
get_online_num()
设置emqx http api方法获取连接数
# 获取网站当前在线人数
def get_online_num():
print("获取网站当前在线人数")
username = 'de0ce9f4ba6bd2xx'
password = 'qluJUCfxSYN8c19BSBLihxxfXjZgVQFgwx6DJ'
# auth_header = "Basic " + base64.b64encode((username + ":" + password).encode()).decode()
# 注意这里get请求只支持http,使用https会报错
response = urequests.get('http://domain.com/api/v5/subscriptions',headers ={
'Content-Type':'application/json',
"Authorization":"Basic ZGUwY2U5ZjiZDRkMHVKVUNmeFNZTjhqTXc5QXBLeWMxOUJTQkxpaHh4ZlhqWmdWUUZnd3g2REo="
})
parsed = response.json()
list_data=parsed["data"]
list_online=[]
# 遍历当前在线订阅连接数
for item in list_data:
if item["topic"]=="web":
list_online.append(item)
# 打印当前在线人数
print("len",len(list_online))
len_str=str(len(list_online))
# label_online_num.set_style_text_font(font_cn, 0) # set the font
label_online_num.set_text(""+len_str)
上述的username 和password 需要到emqx后台创建
创建成功
在代码get请求时,auth_header 需要转为base64 (方法见注释)
以下注意事项
-
在浏览器中mqttjs只能使用websocket方法连接,无法使用mqtt
-
如果网站为https,则同样的要使用wss(ssl websocket),需要配置nginx和证书等
-
ESP32-S3尚未测试ssl和非ssl区别,如果ssl有问题,就直接使用ip即可
总结如下
-
使用EMQX自建MQTT服务
-
使用micropython-mqtt设置ESP32-S3 MQ
-
使用MQTTJS库封装一个js文件,可以到任何网站建立MQTT(websocker连接),并使用EMQX的API获取在线人数
-
使用LVGL显示对应的在线人数
往期内容
《MicroPython[ESP32-S3]:LVGL显示GIF+http request显示哔哩哔哩粉丝计数》
《MicroPython[ESP32-S3]:使用lv_font_conv制作中文字体文件然后加载和显示》
《MicroPython[ESP32-S3]:lvgl使用fontTools提取字体子集设置中文手写字体》
END