近日做了一个自动关灯的小东西,放在宿舍里可以避免断电后忘记关灯导致第二天”怀民亦未寝.jpg”。不过有一个问题,这东西是粘在墙上的,想要调试的话总不能搬个电脑蹲在灯旁边debug一个下午吧。正当笔者苦恼于又要买一个3m超长数据线的时候,灵光一现,想到python作为一种脚本语言,是否可以在运行时更新代码呢?
说干就干,先想想怎么写出一个可以自己更新自己的python代码。
一、蟒蛇的自我更新
诶,上网一查,已经有前人想出这鬼点子了,并且还给出了部分的代码(
https://www.zhihu.com/question/626768033/answer/3255532727)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| max = 200
def updateOption(max): with open(__file__, 'r+') as f: arrLines = f.readlines() idxOptionBegin = arrLines.index('# option begin\n') idxOptionEnd = arrLines.index('# option end\n') for idx, optionLine in enumerate(arrLines): if optionLine.startswith('max = ') and idxOptionBegin < idx < idxOptionEnd: arrLines[idx] = 'max = ' + str(max) + '\n' f.seek(0) f.writelines(arrLines) """ Author: 中等难度的贪吃蛇 """
|
我们来copy一小段学习一下,不得不说这位答主的码风很让人赏心悦目,把python的语法糖用的恰到好处。话归正题,这段代码用注释#option begin/end
标注了可修改的区间,__file__
是python内置变量,即为文件自身的名字,总体上先读取所有代码行,把待修改行进行修改后全部输出到源文件中去。
复制粘贴好是好,但是不实操总有东西弄不懂。比方如果在执行过程中把后面的语句删除了,那么他还会执行吗?
简单写下代码
1 2 3
| with open(__file__, 'w+') as f: f.write("111\n" * 3) f.write("222")
|
运行一下会发现原文件变成如下
可见python在运行之初,会将所有的代码纳入内存之中,即使修改代码文件,也不会对运行结果造成影响。
那现在再来看一下esp32上面的micropython是否支持这一特性吧。毕竟micropython有些库不太稳定,让笔者曾一度怀疑它的实力。简单修改一下上面”贪吃蛇“的代码,写了一段更好看的小段代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| param = 0
def updateOption(**kwargs): with open(__file__, 'r+') as f: arrLines = f.readlines() idxOptionBegin = arrLines.index('# option begin\n') idxOptionEnd = arrLines.index('# option end\n') for idx, optionLine in enumerate(arrLines): if not idxOptionBegin < idx < idxOptionEnd: continue for key in kwargs: if optionLine.startswith(key + ' = '): arrLines[idx] = key + ' = ' + str(kwargs[key]) + '\n' with open(__file__, "w+") as f: f.writelines(arrLines)
updateOption(param = param + 1) print(param)
|
这段代码在pc上运行的结果是每次运行param都会加1,将其改名为“main.py”,放在esp32之中尝试运行后重新打开文件,param = 1
!看来mipy是支持这种热修改的,太棒了。
二、Link Start!
那么一切顺利,我们现在可以考虑如何让电脑端传出的待更新代码传到esp32上面去。
有几种方法,一是把代码放到公网上,然后esp32再从公网上将代码取下来;二是局域网通信,esp32和pc端直接进行通信。乍一看似乎法一好些,毕竟可以身处天涯而心系esp32,在哪里都可以更新,但其实有很大的弊端:首先存放代码的公网服务器的延迟都不低,效率和稳定性不如局域网,再者局域网可以实现指令的即时传递,进而实现对esp32文件系统更加便捷的控制,最后就是那个代码桶的网站是境外的,小esp32翻不了墙。
局域网通信的话就又有问题要考虑了。公网上的ip总是固定的,但局域网内的ip每次断电重连都会变化。总不能从127.0.0.0到255.255.255.255挨个试吧。诶,255.255.255.255?广播!我们可以用udp把自己的地址广播出去(应该不会收到二向箔吧),然后等待另一方的连接。
好,那么我们就可以初步利用udp的广播和tcp协议实现局域网未知ip匹配并进行消息传递的功能。
下面给出的是esp32端的核心代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
udp_addr = ('255.255.255.255', PORT) udp_sock = socket(AF_INET, SOCK_DGRAM)
tcp_addr = (ip, PORT) tcp_listen_sock = socket(AF_INET, SOCK_STREAM) tcp_listen_sock.settimeout(1) tcp_listen_sock.bind(tcp_addr) tcp_listen_sock.listen(1)
conn, addr = None, None print("waiting for connect...") while True: message = "[HI]" udp_sock.sendto(message, udp_addr)
try: conn, addr = tcp_listen_sock.accept() print(conn, addr, "connected") break except Exception as exc: if str(exc) == "[Errno 116] ETIMEDOUT" or str(exc) == "[Errno 11] EAGAIN": pass else: raise OSError(exc)
if (conn, addr) == (None, None): udp_sock.close() sys.exit()
conn.settimeout(5.0) while True: data = conn.recv(1024) if len(data) == 0: print("close socket") conn.close() break print(data) ret = conn.send(data)
udp_sock.close()
|
接下来是pc端的核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| address = ('', PORT) s = socket(AF_INET, SOCK_DGRAM) s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
s.bind(address)
while True: print(' wait recv...') data, address = s.recvfrom(1024) print(' [recv form %s:%d]:%s' % (address[0], address[1], data))
s.close()
if data.decode() == "[HI]": break IPADDR = address[0]
tcp_client_socket = socket(AF_INET, SOCK_STREAM)
server_ip, server_port = IPADDR, PORT tcp_client_socket.connect((server_ip, server_port))
send_data = input("请输入要发送的数据:") tcp_client_socket.send(send_data.encode())
recvData = tcp_client_socket.recv(1024) print('接收到的数据为:', recvData.decode())
tcp_client_socket.close()
|
三、抽象,抽象,更加抽象
在成功实现广播连接之后,实现OTA在线更新的大厦已经落成,所剩只是一点修饰的工作了。两小朵乌云之一是定义好各个指令的协议,并将这个esp32上位机模块和esp32OTA模块封装起来,之二且待后文详说。
首先我们要将recv、send等等函数封装一下,因为发送接收时会出现各种问题,不是掉链接,没回应,超时,就是回应的很奇怪,不符合预期。而这些问题的重要性也都不同,掉链接的话可以直接结束ota程序了,其他的话有的可以再试试,有的根本不用在意。
所以我们先定义了几种返回值,然后对两个关键函数封装了一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| SUCCESS = 0 NOTICE = 1 WARNING = 2 ERROR = 3 CRITICAL = 4
class UpperComputer(object):
def recv_signal(self, description_of_this_time, expected_dat = None): try: dat = self.conn.recv(1024).decode() if len(dat) == 0: log.print_only("upper comp lost connection when {}".format(description_of_this_time)) return "", CRITICAL if expected_dat != None and dat != expected_dat: log.error("Response Invalid when {}. (res: {})".format(description_of_this_time, dat)) return dat, ERROR
return dat, SUCCESS except Exception as exc: log.error("No Response when {}.(exc: {})".format(description_of_this_time, exc)) return "", ERROR def send_signal(self, dat): try: self.conn.send(dat.encode()) return True, SUCCESS except Exception as exc: log.error("Send Failed {}. {}".format(dat, exc)) return False, CRITICAL
|
这样封装的好处是每次不需要进行繁复的try-except,只需要判断返回值是否正常即可。同时还避免了重复的log语句。
接下来是初始化函数,对udpsock和tcpsock分别进行初始化;广播连接函数已经介绍过了,之后是指令处理函数。需要注意tcp协议下如果连续发送消息,那么接收方可能一次recv收到对方两次send的内容,就可能会导致invalid response或者no reponse。为了避免这种情况,应该双方一唱一和,A发完消息B要回复收到。所以对于上位机发送的有后续补充信息的指令,要回复“ready”,没有补充信息的,就只需要回复”finish”或者查询的值即可。下面是esp32上用于和上位机通信的模块,pc端用于通信的模块类似,只是没有operation_handler()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class UpperComputer (object): def __init__(self, self_ip, port): self.port = port self.ip = self_ip self.usock = socket(..) self.tsock = socket(..) def broadcast_and_connect(self): return True def operation_handler(self): dat, ret = self.recv_signal("ready to recv operation") if ret >= WARNING: return ret if dat == "[UPDATE]": self.send_signal("[READY]") return self.update() elif dat == '[GETFILELIST]': return self.get_file_list() else: self.conn.send(dat.encode()) return NOTICE
|
经过这样的抽象处理,OTA的功能就已经实现了,并且使用起来很简便。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| FILE_ROOT = "D:\\我的文档\\学习\\ESP32\\update\\" SEND_FILE_DICT = {"下位机程序.py": "main.py"} SAVE_PATH = "D:\\我的文档\\学习\\ESP32\\download\\" DOWNLOAD_LIST = ['log.txt', 'main.py']
if __name__ == "__main__": esp32 = ESP32Ctrl(PORT) esp32.connect() file_list, _ = esp32.get_file_list() print(file_list) esp32.delete("log.txt") esp32.close()
|
四、ESPREMOTE
但是每次更新都需要去改pc端的代码,比方需要上传或下载什么文件、想要获取文件列表还要重新改代码运行一次,颇有些麻烦。笔者想到初次接触到esp32时,用到了一个模块叫做ampy。这个模块利用命令行实现了对esp32上放置、移除文件、运行代码等操作,我们是否也可以把我们的上位机功能做成命令行呢?
python在实现命令行中os模块非常有用,os.path可以对文件路径进行诸多操作,os.chdir还可以自动记录当前所在文件夹,不用手动判断文件夹是否存在了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class COMMANDER(object): def command_handler(self, command): arg_list = command.split(" ") op = arg_list [0].lower()
if op in ['cd']: try: command = command[len(op + " ") : ] command.strip("\"\'") os.chdir( './' + command) self.update_cwd() except Exception as exc: print(Fore.RED + f"ERROR: {exc}" + Fore.RESET) return
|
我们现将本模块称为espremote,调用指令为espremote command [args]。下面给出了espremote的实现方法和效果展示。
1 2 3 4 5 6 7 8 9 10 11 12 13
| elif op in ['espremote']: if len(arg_list) == 1 or arg_list[1] == 'help': print(help_info['help']) return
op2 = arg_list[1].lower() arg_list = arg_list[2:] if op2 in ['conn']: esp32.connect() return
|

espremote模块是一个能够给予特定协议远程控制esp32开发板的实用工具。利用Espremote可以远程连接板子、管理文件系统以及上传、下载文件。
该模块的pc端、esp32端模块和示例程序已经放在本人的仓库里了,欢迎大家取用。