-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathwebserver.py
157 lines (135 loc) · 4.99 KB
/
webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import network
import socket
import machine
import json
import utime
def unquote_plus(string):
# Replace '+' with ' ' and decode percent-encoded characters
string = string.replace('+', ' ')
parts = string.split('%')
if len(parts) > 1:
string = parts[0]
for item in parts[1:]:
try:
if len(item) >= 2:
string += chr(int(item[:2], 16)) + item[2:]
else:
string += '%' + item
except ValueError:
string += '%' + item
return string
# Define HTML escape function
def escape_html(text):
html_escape_table = {
"&": "&",
'"': """,
"'": "'",
">": ">",
"<": "<",
}
return "".join(html_escape_table.get(c, c) for c in text)
# Configure Access Point
ap = network.WLAN(network.AP_IF)
ap.active(True)
ap.config(essid='esp32-diesel-ecu', password='794759876')
# HTML page template
HTML_PAGE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ZorroBurner Configuration</title>
</head>
<body>
<div class="container">
<h1>ZorroBurner Configuration</h1>
<form action="/set" method="post">
{} <!-- Form fields will be injected here -->
<input type="submit" value="Save">
</form>
<form action="/restart" method="post">
<input type="submit" value="Restart ESP32" class="restart-btn">
</form>
</div>
</body>
</html>
"""
# Read config.json and return as dictionary
def read_config_params():
try:
with open('config.json', 'r') as f:
return json.load(f)
except OSError:
return {}
# Generate HTML page based on config.json
def generate_html_page(params):
input_fields = ""
for section, settings in params.items():
input_fields += f"<h2>{escape_html(section)}</h2>"
for key, value in settings.items():
safe_key = escape_html(key)
safe_value = escape_html(str(value))
input_fields += f'{safe_key}: <input type="text" name="{section}.{safe_key}" value="{safe_value}"><br>'
return HTML_PAGE.format(input_fields)
# Custom pretty-print function for JSON-like dictionaries
def pretty_print_json(data, indent=4, level=0):
if not isinstance(data, dict): # if the data is not a dictionary, just return it as a string
return str(data)
items = []
for key, value in data.items():
items.append(' ' * (level * indent) + f'"{key}": ' + (
pretty_print_json(value, indent, level + 1) if isinstance(value, dict) else json.dumps(value)))
return "{\n" + ",\n".join(items) + "\n" + ' ' * (level - 1) * indent + "}"
def handle_post_data(data):
params = read_config_params()
# Parse POST data
lines = data.split('&')
for line in lines:
section_key_value = line.split('=')
if len(section_key_value) == 2:
section_key, value = map(unquote_plus, section_key_value)
section, key = section_key.split('.')
if value.lower() in ('true', 'on'):
value = True
elif value.lower() == 'off':
value = False
else:
try:
value = float(value) if '.' in value else int(value)
except ValueError:
pass # If not a number, leave as string
# Ensure that both the section and the key exist before updating
if section not in params:
params[section] = {}
params[section][key] = value
# Write updated parameters back to config.json with custom pretty-printing
with open('config.json', 'w') as f:
f.write(pretty_print_json(params))
return params # Returning params is optional, depending on whether you want to use it after calling this function
# Web server function
def web_server():
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
while True:
conn, addr = s.accept()
request = conn.recv(1024)
request_str = str(request, 'utf-8')
if request_str.startswith('POST'):
post_data = request_str.split('\r\n\r\n')[-1]
if "/restart" in request_str:
conn.sendall("HTTP/1.1 200 OK\r\n\r\nRestarting...".encode('utf-8'))
conn.close()
utime.sleep(1) # Delay to ensure the response is sent before resetting
machine.reset()
else:
handle_post_data(post_data)
# Redirect to root
conn.sendall("HTTP/1.1 303 See Other\r\nLocation: /\r\n\r\n".encode('utf-8'))
else:
params = read_config_params()
html_page = generate_html_page(params)
conn.sendall("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n".encode('utf-8') + html_page.encode('utf-8'))
conn.close()