目标
使用MicroPython和核桃派Pico_W实现一个简单的BThome v2蓝牙温湿度计。需要用到蓝牙广播,传感器读取,数据打包结合到一起
准备工作
- ESP32开发板
- DHT11温湿度传感器
- Type-C数据线
- 烧录好01Studio出厂MicroPython固件
- IDE:Thonny
步骤1:导入必要模块
import bluetooth
import struct
from machine import Pin
import dht
import time
这些模块分别是:
bluetooth
:用于蓝牙功能。struct
:用于处理字节流和结构体数据。machine.Pin
:用于控制GPIO引脚。dht
:用于读取DHT11或DHT22传感器数据。time
:提供时间相关函数,如延时。
步骤2:定义BTHome v2常量
BT_HOME_FLAGS = b'\x02\x01\x06'
BT_HOME_SERVICE_DATA_SPECIAL_BYTE = 0x16
BT_HOME_UUID_BYTES = b'\xD2\xFC'
BT_HOME_DEVICE_INFORMATION_BYTE = 0x40
这些常量用于构建蓝牙广播数据包,确保遵循BTHome v2协议的要求。
协议格式要求详细参考官网了解: 参考资料 – BTHome
步骤3:定义BTHomeAdvertisement类
初始化蓝牙和传感器
class BTHomeAdvertisement:
def __init__(self, ble, name='TempHum'):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._connections = set()
self.dht = dht.DHT11(Pin(2))
self._last_temperature = None
self._last_humidity = None
self.name = name
self._start_advertising()
这里,我们初始化蓝牙,设置下DHT11传感器,一些变量用于温度和湿度数据。
构建蓝牙广播数据包
def _start_advertising(self):
self._payload = self._build_advertisement_payload()
self._ble.gap_advertise(100, adv_data=self._payload)
_build_advertisement_payload
:方法负责构建蓝牙广播数据包,确保包含所有必要信息。
def _build_advertisement_payload(self):
flags = BT_HOME_FLAGS
service_data = bytearray([BT_HOME_SERVICE_DATA_SPECIAL_BYTE]) + BT_HOME_UUID_BYTES + bytes([BT_HOME_DEVICE_INFORMATION_BYTE])
if self._last_temperature is not None:
temperature_bytes = struct.pack("<h", int(self._last_temperature * 100))
service_data += b'\x02' + temperature_bytes
if self._last_humidity is not None:
humidity_bytes = struct.pack("<H", int(self._last_humidity * 100))
service_data += b'\x03' + humidity_bytes
local_name = bytearray([len(self.name) + 1, 0x09]) + bytes(self.name, 'utf-8')
service_data = bytearray([len(service_data)]) + service_data
payload = flags + service_data + local_name
return payload
更新传感器数据
def update_sensors(self):
self.dht.measure()
temp = self.dht.temperature()
hum = self.dht.humidity()
print(f"温度: {temp:.2f} °C")
print(f"湿度: {hum:.2f} %")
if temp != self._last_temperature or hum != self._last_humidity:
self._last_temperature = temp
self._last_humidity = hum
self._start_advertising()
time.sleep_ms(500)
update_sensors
:方法负责读取传感器数据,检查是否有变化,如果有,就更新蓝牙广播。
处理蓝牙事件
def _irq(self, event, data):
# 处理连接和断开事件
pass
你可以选择需要添加蓝牙事件处理。
步骤4:主程序
ble = bluetooth.BLE()
dht_ble = BTHomeAdvertisement(ble)
while True:
dht_ble.update_sensors()
在主程序中,创建了蓝牙对像, BTHomeAdvertisement
:类,并进入一个无限循环,持续更新传感器数据。
总结
如何使用MicroPython和ESP32实现一个BTHome v2蓝牙温湿度计。这个示例不仅涵盖了蓝牙广播和传感器读取,还介绍了如何在微控制器上高效地处理数据和事件。你可以尝试修改设备名称、调整读取频率,甚至扩展功能,如增加电池电量监测等。接下来,我简单介绍一些这个示例使用步骤。
配合核桃派1B Home Assistant 镜像使用步骤:
先打开Thonny运行核桃派Pico-W程序参考官方教程: 点亮第1个LED | 核桃派 (walnutpi.com)
进入Home Assistant主页并按步骤操作,可参考官方添加MQTT集成教程: 添加MQTT集成 | 核桃派 (walnutpi.com)
1:点击配置
2:添加集成
3:搜索BTHome
4:选择对应设备之
5:打开刚刚添加的BTHome集成
可以看到多了1个设备和3个实体,有一个是默认禁用的不用管,可以点红色箭头对着的地方和实体进去查看详细的信息
可以选择添加到仪表盘:
实验结果:
可以看到在概览中查看到了温湿度计广播出来的数据:
源码以及完整代码参考:
'''
实验名称:Home Assistant BTHome + 温湿度传感器DHT11
实验平台:核桃派1B + 核桃派PicoW
作者:WalnutPi
说明:编程实现Home Assistant以BTHome方式实现DHT11温湿度传感器数据采集
社区:www.walnutpi.com
'''
import bluetooth
import struct
from machine import Pin
import dht
import utime
# BTHome v2 常量
BT_HOME_FLAGS = b'\x02\x01\x06'
BT_HOME_SERVICE_DATA_SPECIAL_BYTE = 0x16
BT_HOME_UUID_BYTES = b'\xD2\xFC'
BT_HOME_DEVICE_INFORMATION_BYTE = 0x40
class BTHomeAdvertisement:
def __init__(self, ble, name='TempHum'):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._connections = set()
self.LED= Pin(46, Pin.OUT)
self.dht = dht.DHT11(Pin(12))
utime.sleep(1)
self._last_temperature = None
self._last_humidity = None
self.name = name
self._start_advertising()
def _start_advertising(self):
self._payload = self._build_advertisement_payload()
self._ble.gap_advertise(100, adv_data=self._payload)
def _build_advertisement_payload(self):
flags = BT_HOME_FLAGS
service_data = bytearray([BT_HOME_SERVICE_DATA_SPECIAL_BYTE]) + BT_HOME_UUID_BYTES + bytes([BT_HOME_DEVICE_INFORMATION_BYTE])
if self._last_temperature is not None:
temperature_bytes = struct.pack("<h", int(self._last_temperature * 100))
service_data += b'\x02' + temperature_bytes
if self._last_humidity is not None:
humidity_bytes = struct.pack("<H", int(self._last_humidity * 100))
service_data += b'\x03' + humidity_bytes
local_name = bytearray([len(self.name) + 1, 0x09]) + bytes(self.name, 'utf-8')
service_data = bytearray([len(service_data)]) + service_data
payload = flags + service_data + local_name
return payload
def _irq(self, event, data):
pass
def update_sensors(self):
self.dht.measure()
temp = self.dht.temperature()
hum = self.dht.humidity()
print("Temperature: {:.2f} °C".format(temp))
print("Humidity: {:.2f} %".format(hum))
if temp != self._last_temperature or hum != self._last_humidity:
self._last_temperature = temp
self._last_humidity = hum
self._start_advertising()
self.LED.value(1) # 点亮LED
utime.sleep_ms(500) # 保持亮起状态500ms
self.LED.value(0) # 关闭LED
utime.sleep_ms(500) # 等待500ms
ble = bluetooth.BLE()
dht_ble = BTHomeAdvertisement(ble)
while True:
try:
dht_ble.update_sensors()
except Exception as e: #异常提示
print('Time Out!')
utime.sleep_ms(1000)
main.py (2.8 KB)