|
本帖最后由 mao_jin_dao 于 2023-8-29 21:36 编辑
项目目标
esp8266设备A读取EC11旋转编码器数值,通过mqtt发送数据;
esp8266设备B通过mqtt接收数据,并绘制在ssd1306 oled屏幕上。
编写代码
设备A代码
main.py
- import time
- import json
- from rotary_irq_esp import RotaryIRQ
- from umqtt.simple import MQTTClient
- device_id = "esp0007"
- wifi_ssid='FAST_20CC'
- wifi_password='409409409'
- mqtt_serverip='192.168.1.113'
- mqtt_serverport=1883
- mqtt_clientid=device_id
- mqtt_publishtopic=b'espiot'
- message_template = {}
- message_template['source_device'] = mqtt_clientid
- message_template['target_device'] = 'server'
- message_template['msg_type'] = 'ReportRotateEncoder'
- def connectionWifi(ssid, password):
- wlan = network.WLAN(network.STA_IF)
- wlan.active(True)
- wlan.connect(ssid, password)
- while True:
- if not wlan.isconnected():
- print ("connecting...")
- else:
- print('connected to network')
- break
- time.sleep(1)
- if __name__== '__main__':
- connectionWifi(wifi_ssid,wifi_password)
-
- client = MQTTClient(mqtt_clientid,mqtt_serverip,mqtt_serverport)
- client.connect()
-
- rotate = RotaryIRQ(
- pin_num_clk=4,
- pin_num_dt=5,
- reverse=True,
- incr=1,
- range_mode=RotaryIRQ.RANGE_UNBOUNDED,
- pull_up=True,
- half_step=False,
- )
-
- val_old = rotate.value()
- while True:
- val_new = rotate.value()
- if val_old != val_new:
- val_old = val_new
- print("step =", val_new)
- message_template['step'] = str(val_new)
- client.publish(mqtt_publishtopic,json.dumps(message_template))
- time.sleep_ms(50)
复制代码
rotary_irq_esp.py
- # MIT License (MIT)
- # Copyright (c) 2020 Mike Teachman
- # https://opensource.org/licenses/MIT
- # Platform-specific MicroPython code for the rotary encoder module
- # ESP8266/ESP32 implementation
- # Documentation:
- # https://github.com/MikeTeachman/micropython-rotary
- from machine import Pin
- from rotary import Rotary
- from sys import platform
- _esp8266_deny_pins = [16]
- class RotaryIRQ(Rotary):
- def __init__(self, pin_num_clk, pin_num_dt, min_val=0, max_val=10, incr=1,
- reverse=False, range_mode=Rotary.RANGE_UNBOUNDED, pull_up=False, half_step=False, invert=False):
- if platform == 'esp8266':
- if pin_num_clk in _esp8266_deny_pins:
- raise ValueError(
- '%s: Pin %d not allowed. Not Available for Interrupt: %s' %
- (platform, pin_num_clk, _esp8266_deny_pins))
- if pin_num_dt in _esp8266_deny_pins:
- raise ValueError(
- '%s: Pin %d not allowed. Not Available for Interrupt: %s' %
- (platform, pin_num_dt, _esp8266_deny_pins))
- super().__init__(min_val, max_val, incr, reverse, range_mode, half_step, invert)
- if pull_up == True:
- self._pin_clk = Pin(pin_num_clk, Pin.IN, Pin.PULL_UP)
- self._pin_dt = Pin(pin_num_dt, Pin.IN, Pin.PULL_UP)
- else:
- self._pin_clk = Pin(pin_num_clk, Pin.IN)
- self._pin_dt = Pin(pin_num_dt, Pin.IN)
- self._enable_clk_irq(self._process_rotary_pins)
- self._enable_dt_irq(self._process_rotary_pins)
- def _enable_clk_irq(self, callback=None):
- self._pin_clk.irq(
- trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING,
- handler=callback)
- def _enable_dt_irq(self, callback=None):
- self._pin_dt.irq(
- trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING,
- handler=callback)
- def _disable_clk_irq(self):
- self._pin_clk.irq(handler=None)
- def _disable_dt_irq(self):
- self._pin_dt.irq(handler=None)
- def _hal_get_clk_value(self):
- return self._pin_clk.value()
- def _hal_get_dt_value(self):
- return self._pin_dt.value()
- def _hal_enable_irq(self):
- self._enable_clk_irq(self._process_rotary_pins)
- self._enable_dt_irq(self._process_rotary_pins)
- def _hal_disable_irq(self):
- self._disable_clk_irq()
- self._disable_dt_irq()
- def _hal_close(self):
- self._hal_disable_irq()
复制代码
rotary.py
- # MIT License (MIT)
- # Copyright (c) 2022 Mike Teachman
- # https://opensource.org/licenses/MIT
- # Platform-independent MicroPython code for the rotary encoder module
- # Documentation:
- # https://github.com/MikeTeachman/micropython-rotary
- import micropython
- _DIR_CW = const(0x10) # Clockwise step
- _DIR_CCW = const(0x20) # Counter-clockwise step
- # Rotary Encoder States
- _R_START = const(0x0)
- _R_CW_1 = const(0x1)
- _R_CW_2 = const(0x2)
- _R_CW_3 = const(0x3)
- _R_CCW_1 = const(0x4)
- _R_CCW_2 = const(0x5)
- _R_CCW_3 = const(0x6)
- _R_ILLEGAL = const(0x7)
- _transition_table = [
- # |------------- NEXT STATE -------------| |CURRENT STATE|
- # CLK/DT CLK/DT CLK/DT CLK/DT
- # 00 01 10 11
- [_R_START, _R_CCW_1, _R_CW_1, _R_START], # _R_START
- [_R_CW_2, _R_START, _R_CW_1, _R_START], # _R_CW_1
- [_R_CW_2, _R_CW_3, _R_CW_1, _R_START], # _R_CW_2
- [_R_CW_2, _R_CW_3, _R_START, _R_START | _DIR_CW], # _R_CW_3
- [_R_CCW_2, _R_CCW_1, _R_START, _R_START], # _R_CCW_1
- [_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START], # _R_CCW_2
- [_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW], # _R_CCW_3
- [_R_START, _R_START, _R_START, _R_START]] # _R_ILLEGAL
- _transition_table_half_step = [
- [_R_CW_3, _R_CW_2, _R_CW_1, _R_START],
- [_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1, _R_START],
- [_R_CW_3 | _DIR_CW, _R_CW_2, _R_START, _R_START],
- [_R_CW_3, _R_CCW_2, _R_CCW_1, _R_START],
- [_R_CW_3, _R_CW_2, _R_CCW_1, _R_START | _DIR_CW],
- [_R_CW_3, _R_CCW_2, _R_CW_3, _R_START | _DIR_CCW],
- [_R_START, _R_START, _R_START, _R_START],
- [_R_START, _R_START, _R_START, _R_START]]
- _STATE_MASK = const(0x07)
- _DIR_MASK = const(0x30)
- def _wrap(value, incr, lower_bound, upper_bound):
- range = upper_bound - lower_bound + 1
- value = value + incr
- if value < lower_bound:
- value += range * ((lower_bound - value) // range + 1)
- return lower_bound + (value - lower_bound) % range
- def _bound(value, incr, lower_bound, upper_bound):
- return min(upper_bound, max(lower_bound, value + incr))
- def _trigger(rotary_instance):
- for listener in rotary_instance._listener:
- listener()
- class Rotary(object):
- RANGE_UNBOUNDED = const(1)
- RANGE_WRAP = const(2)
- RANGE_BOUNDED = const(3)
- def __init__(self, min_val, max_val, incr, reverse, range_mode, half_step, invert):
- self._min_val = min_val
- self._max_val = max_val
- self._incr = incr
- self._reverse = -1 if reverse else 1
- self._range_mode = range_mode
- self._value = min_val
- self._state = _R_START
- self._half_step = half_step
- self._invert = invert
- self._listener = []
- def set(self, value=None, min_val=None, incr=None,
- max_val=None, reverse=None, range_mode=None):
- # disable DT and CLK pin interrupts
- self._hal_disable_irq()
- if value is not None:
- self._value = value
- if min_val is not None:
- self._min_val = min_val
- if max_val is not None:
- self._max_val = max_val
- if incr is not None:
- self._incr = incr
- if reverse is not None:
- self._reverse = -1 if reverse else 1
- if range_mode is not None:
- self._range_mode = range_mode
- self._state = _R_START
- # enable DT and CLK pin interrupts
- self._hal_enable_irq()
- def value(self):
- return self._value
- def reset(self):
- self._value = 0
- def close(self):
- self._hal_close()
- def add_listener(self, l):
- self._listener.append(l)
- def remove_listener(self, l):
- if l not in self._listener:
- raise ValueError('{} is not an installed listener'.format(l))
- self._listener.remove(l)
-
- def _process_rotary_pins(self, pin):
- old_value = self._value
- clk_dt_pins = (self._hal_get_clk_value() <<
- 1) | self._hal_get_dt_value()
-
- if self._invert:
- clk_dt_pins = ~clk_dt_pins & 0x03
-
- # Determine next state
- if self._half_step:
- self._state = _transition_table_half_step[self._state &
- _STATE_MASK][clk_dt_pins]
- else:
- self._state = _transition_table[self._state &
- _STATE_MASK][clk_dt_pins]
- direction = self._state & _DIR_MASK
- incr = 0
- if direction == _DIR_CW:
- incr = self._incr
- elif direction == _DIR_CCW:
- incr = -self._incr
- incr *= self._reverse
- if self._range_mode == self.RANGE_WRAP:
- self._value = _wrap(
- self._value,
- incr,
- self._min_val,
- self._max_val)
- elif self._range_mode == self.RANGE_BOUNDED:
- self._value = _bound(
- self._value,
- incr,
- self._min_val,
- self._max_val)
- else:
- self._value = self._value + incr
- try:
- if old_value != self._value and len(self._listener) != 0:
- _trigger(self)
- except:
- pass
复制代码
设备B代码
main.py
- from umqtt.simple import MQTTClient
- import urequests as requests
- import time
- import ujson
- import network
- from ssd1306 import SSD1306_I2C
- from machine import Pin, I2C
- import math
- device_id = "esp0008"
- wifi_ssid='FAST_20CC'
- wifi_password='409409409'
- mqtt_serverip='192.168.1.113'
- mqtt_serverport=1883
- mqtt_clientid=device_id
- mqtt_publishtopic=b'espiot'
- mqttSubTopic=mqtt_publishtopic
- def draw_circle():
- x=64
- y=32
- r=30
- angleList = [math.radians(i) for i in range(0, 360)]
- for i in angleList:
- light_dot(x+round(math.sin(i)*r), y+round(math.cos(i)*r))
- oled.show()
-
- rotate_list = []
- def draw_rotate(angle):
- tmp=math.radians(angle)
- x=64
- y=32
- r=30
- for i in range(0,len(rotate_list)):
- dot = rotate_list[i]
- extinguish_dot(dot[0],dot[1])
- rotate_list.clear()
-
- for i in range(1, 10):
- point_x = x+round(math.sin(tmp)*(r-i))
- point_y = y+round(math.cos(tmp)*(r-i))
- rotate_list.append((point_x,point_y))
- light_dot(point_x,point_y)
- oled.show()
-
- def light_dot(x, y):
- oled.pixel(x, y, 1)
- def extinguish_dot(x,y):
- oled.pixel(x ,y ,0)
- def connectionWifi(ssid, password):
- wlan = network.WLAN(network.STA_IF)
- wlan.active(True)
- wlan.connect(ssid, password)
- while True:
- if not wlan.isconnected():
- print ("connecting...")
- else:
- print('connected to network')
- break
- time.sleep(1)
-
- def sub_callback(topic, msg):
- msgStr = msg.decode()
- msgObj = ujson.loads(msgStr)
- if(msgObj['source_device'] != 'esp0007'):
- return
- step_str = msgObj['step']
- step = int(step_str)
- print(step)
- draw_rotate(step)
-
-
- def jsonHaveKey(json, key):
- """
- 判断json对象中是否包含key
- """
- isHave = True
- try:
- value = json[key]
- except :
- isHave = False
- return isHave
- if __name__ == "__main__":
- print ("main funtion")
-
- connectionWifi(wifi_ssid,wifi_password)
-
- client = MQTTClient(mqtt_clientid,mqtt_serverip,mqtt_serverport)
- client.set_callback(sub_callback)
- client.connect()
- print('mqtt client connected')
-
- i2c = I2C(scl=Pin(5), sda=Pin(4))
- oled = SSD1306_I2C(128, 64, i2c)
- draw_circle()
- client.subscribe(mqttSubTopic)
- while True:
- client.wait_msg()
复制代码
项目截图
|
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
打赏
-
查看全部打赏
|