MicroPython[ESP32]:ESP32-S3设置天气预报,使用FastAPI+MQTT+APScheduler

NO.1
内容汇总

今日份内容:多了一个天气预报:

  1. 使用和风天气API获取当前的实时天气(免费额度)

  2. 使用FastAPI作为后端框架写API

  3. 使用FastAPI+MQTT+APScheduler定时任务发送天气状态

此图片的alt属性为空;文件名为image-806-1024x655.png

NO.2
问题梳理

一.MicroPython的urequests数据解析出错

未解析的response <Response object at 3d84f3b0>

使用response.json()解析报错ValueError: syntax error in JSON

字面上的意思是获取到API的结果是一个对象,但是这个对象无法解析为JSON

此图片的alt属性为空;文件名为image-807-1024x260.png

二.屏幕固定

要使用触屏,屏幕要固定在3D打印的外壳上

目前只是套了一个简易外壳,要使屏幕固定,还需要设置螺纹孔,设置后盖

待设计

NO.3
笔记梳理

一.FastAPI的使用

官方文档

https://fastapi.tiangolo.com/zh/

此图片的alt属性为空;文件名为image-808-1024x557.png

.FastAPI+MQTT

github地址

https://github.com/sabuhish/fastapi-mqtt

文档地址

https://sabuhish.github.io/fastapi-mqtt/

引入依赖

from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi_mqtt.config import MQTTConfig

写入代码

# 设置mqtt
mqtt_config = MQTTConfig(host="xxx.domain.com", port=1883, will_message_topic="topic",
                         will_message_payload="msg", will_delay_interval=3)
mqtt = FastMQTT(
    config=mqtt_config
)
mqtt.init_app(app)

@mqtt.on_connect()
def connect(client, flags, rc, properties):
    mqtt.client.subscribe("topic")  # subscribing mqtt topic
    mqtt.publish("topic", "topic")
    print("Connected: ", client, flags, rc, properties)


@mqtt.on_message()
async def message(client, topic, payload, qos, properties):
    print("Received message: ", topic, payload.decode(), qos, properties)
    return 0


@mqtt.subscribe("my/mqtt/topic/#")
async def message_to_topic(client, topic, payload, qos, properties):
    print("Received message to specific topic: ", topic, payload.decode(), qos, properties)


@mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
    print("Disconnected")


@mqtt.on_subscribe()
def subscribe(client, mid, qos, properties):
    print("subscribed", client, mid, qos, properties)

三.FastAPI+APScheduler

定时任务框架APScheduler

引入依赖

from apscheduler.schedulers.background import BackgroundScheduler

设置定时任务要执行的函数

# 设置定时任务
def get_weather_res():
    print("获取和风天气结果")

设置定时任务

scheduler = BackgroundScheduler()
scheduler.add_job(get_weather_res, 'interval', minutes=10, id='get_weather_res')
scheduler.start()

已上内容标识10分钟获取一次和风天气

获取到内容后再使用MQTT发送订阅消息

NO.3
Tips

哔哩哔哩计数也可以转为FastAPI,使用MQTT推送订阅消息

也就是凡是网络的内容,都可以使用服务器部署服务,在服务端设置定时器和业务逻辑

ESP32-S3客户端只需要开启WIFI+MQTT,接收订阅消息,然后根据不同的订阅主题来处理不同的响应逻辑

既可以减少HTTP的请求,也可以实时响应数据,并且是数据处理转移到服务器,减少性能消耗,可以做数据包装处理

本文就是因为短时间无法处理和风天气API数据JSON解析问题,改而采用服务端FastAPI来处理

END