initial commit

This commit is contained in:
Paul 2023-07-25 02:45:49 +02:00
parent 80571fa781
commit 179e543fee
12 changed files with 298 additions and 1 deletions

View File

@ -1,3 +1,15 @@
# esp-artnet-http-to-dmx
ESP Arpnet + HTTP to DMX bridge
ESP Arpnet + HTTP to DMX bridge
dmx.py handels outgoing dmx
dmx_dummy.py is a dummy version of dmx.py for local testing without requiering the esp UART interface
artnetserver.py handels incoming artnet
httpserver.py handels incoming http requests
fade.py handels color fades
testfiles for httpserver and artnetserver are in the test directory

14
artnet.py Normal file
View File

@ -0,0 +1,14 @@
class ArtNetServer():
def __init__(self, ip, port):
self.ip = ip
self.port = port
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('', 6454))
s.listen()
while True:
conn, addr = s.accept()
print('Got a connection from %s' % str(addr))

38
artnetserver.py Normal file
View File

@ -0,0 +1,38 @@
try:
import usocket as socket
except:
import socket
class ArtNetServer():
def __init__(self, ip, port, universe=0):
self.ip = ip
self.port = port
self.universe = universe
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.setblocking(False)
self.socket.bind((ip, port))
def __del__(self):
self.close()
def close(self):
self.socket.close()
def checkdata(self, dmx):
try:
data, address = self.socket.recvfrom(1024)
except OSError as e:
err = e.args[0]
if err != 11:
raise e
else:
if self.validate_header(data):
universe = int.from_bytes(data[14:16], 'little')
if universe != self.universe:
return -1
dmx.set_channels(list(data)[18:])
return 0
def validate_header(self, header):
return header[:12] == b'Art-Net\x00\x00P\x00\x0e'

1
boot.py Normal file
View File

@ -0,0 +1 @@
# boot.py -- run on boot-up

34
dmx.py Normal file
View File

@ -0,0 +1,34 @@
from machine import UART, Pin
import time
from array import array
tx_pins = [1, 17]
class universe():
def __init__(self, uartport, dirselectport):
self.uartport = uartport
self.dirselect = Pin(dirselectport, Pin.OUT)
dmx_uart = uart = UART(uartport)
del(dmx_uart)
self.dmx_values = array('B', [0] * 513)
def get_rgb(self, address):
return {'r': dmx.dmx_values[address], 'g': dmx.dmx_values[address+1], 'b': dmx.dmx_values[address+2]}
def set_channels(self, frame):
for ch in frame:
self.dmx_values[ch] = frame[ch]
def write_frame(self):
self.dirselect.value(1)
dmx_uart = Pin(tx_pins[self.uartport], Pin.OUT)
dmx_uart.value(0)
time.sleep_us(74)
dmx_uart.value(1)
dmx_uart = UART(self.uartport)
dmx_uart.init(115200, bits=8, parity=None, stop=2)
dmx_uart.write(self.dmx_values)
del(dmx_uart)

12
dmx_dummy.py Normal file
View File

@ -0,0 +1,12 @@
from array import array
class universe():
def __init__(self):
self.dmx_values = array('B', [0] * 513)
def set_channels(self, frame):
for ch in message:
self.dmx_values[ch] = frame[ch]
def write_frame(self):
print(self.dmx_values)

34
fade.py Normal file
View File

@ -0,0 +1,34 @@
import time
class FadeHandler():
def __init__(self):
self.fades = []
class RGBFade():
def __init__(self, dmx, address, target, time):
self.address = address
self.target = target
self.before = dmx.get_rgb(address)
self.time = time
self.start = time.ticks_ms()
self.finished = False
def step(self, dmx):
if dmx.get_rgb(address) == target:
self.finished = True
return
fadeprogress = time.ticks_diff(time.ticks_ms(), self.start)/self.time
dmx.dmx_values[address] = self.target['r'] - self.before['r'] * fadeprogress
dmx.dmx_values[address+1] = self.target['g'] - self.before['g'] * fadeprogress
dmx.dmx_values[address+2] = self.target['b'] - self.before['b'] * fadeprogress
def addrgbfade(self, dmx, address, target, time):
self.fades.append(self.RGBFade(dmx, address, target, time))
def checkfades(self, dmx):
for fadei in self.fades:
if self.fades[fadei].finished:
self.fades = self.fades.pop(fadei)
continue
self.fades[fadei].step(dmx)

80
httpserver.py Normal file
View File

@ -0,0 +1,80 @@
try:
import usocket as socket
except:
import socket
import json
configpage = '<html><head><title>Configseite</title></head><body><h1>huhu</h1></body></html>'
class HTTPServer():
def __init__(self, ip, port):
self.ip = ip
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setblocking(False)
self.socket.bind((ip, port))
self.socket.listen()
def __del__(self):
self.close()
def close(self):
self.socket.close()
def checkdata(self, dmx, f):
try:
conn, address = self.socket.accept()
except OSError as e:
err = e.args[0]
if err != 11:
raise e
else:
request = conn.recv(1024).decode()
print(request)
requestline = request.split('\n')[0]
headers = dict(headerpair.split(': ') for headerpair in request.split('\n')[1:])
method = requestline.split()[0]
requestpath = requestline.split()[1].split("?", 1)
query = dict()
if len(requestpath) > 1:
query = dict(querypair.split('=') for querypair in requestpath[1].split('&'))
if method == 'POST':
if 'Content-Length' in headers and 'Content-Type' in headers:
if headers['Content-Type'] == 'application/json':
try:
body = json.loads(request[len(request)-headers['Content-Length']:])
except ValueError:
self.sendresponse(conn, 'HTTP/1.0 415 Unsupported Media Type\n\nUnsupported Media Type')
return -1
else:
self.sendresponse(conn, 'HTTP/1.0 415 Unsupported Media Type\n\nUnsupported Media Type')
return -1
else:
self.sendresponse(conn, 'HTTP/1.0 411 Length Required\n\nLength Required')
return -1
if requestpath[0] == '/' and method == 'GET':
self.sendresponse(conn, 'HTTP/1.0 200 OK\nConnection: close\nContent-Length: %s\nContent-Type: text/html; charset=UTF-8\n\n%s'%(len(configpage), configpage))
return 0
elif requestpath[0] == '/dmx' and method == 'POST':
dmx.set_channels(body['dmx'])
self.sendresponse(conn, 'HTTP/1.0 200 OK\n\n')
return 0
elif requestpath[0] == '/rgb' and method == 'GET':
dmx.dmx_values[int(query['address'])] = query['r']
dmx.dmx_values[int(query['address'])+1] = query['g']
dmx.dmx_values[int(query['address'])+2] = query['b']
self.sendresponse(conn, 'HTTP/1.0 200 OK\n\n')
return 0
elif requestpath[0] == '/fadeto' and method == 'GET':
f.addrgbfade(dmx, int(query['address']), {'r': query['r'], 'g': query['g'], 'b': query['b']}, query['time'])
self.sendresponse(conn, 'HTTP/1.0 200 OK\n\n')
return 0
else:
self.sendresponse(conn, 'HTTP/1.0 404 NOT FOUND\n\nFile Not Found')
return -1
def sendresponse(self, conn, response):
conn.sendall(response.encode())
conn.close()
return 0

34
main.py Normal file
View File

@ -0,0 +1,34 @@
# main.py -- put your code here!
import dmx
import artnetserver
import httpserver
import fade
import time
print("starting artnet server")
# create a artnetserver object
a = artnetserver.ArtNetServer('127.0.0.1', 6454, 0)
print("starting http server")
# create a httpserver object
h = httpserver.HTTPServer("127.0.0.1", 8080)
print("creating fadehandler")
# create a fadehandler object
f = fade.FadeHandler()
# create a dmx device on UART port 1 with port 21 as direction
dmx0 = dmx.universe(1, 21)
while 1:
# get incoming artnet data
a.checkdata(dmx)
# get incoming http data
h.checkdata(dmx, f)
# checkfaders for completion and do next step
f.checkfades(dmx)
print("writing dmx")
# writing dmx data
dmx0.write_frame()

3
pymakr.conf Normal file
View File

@ -0,0 +1,3 @@
{
"name": "ESP Arpnet + HTTP to DMX bridge"
}

28
test/artnet-test.py Normal file
View File

@ -0,0 +1,28 @@
import asyncio
from pyartnet import ArtNetNode
async def main():
# Run this code in your async function
node = ArtNetNode('127.0.0.1', 6454)
# Create universe 0
universe = node.add_universe(0)
# Add a channel to the universe which consists of 3 values
# Default size of a value is 8Bit (0..255) so this would fill
# the DMX values 1..3 of the universe
channel = universe.add_channel(start=12, width=3)
channel2 = universe.add_channel(start=15, width=3)
# Fade channel to 255,0,0 in 5s
# The fade will automatically run in the background
channel.add_fade([3,2,1], 500)
channel2.add_fade([1,2,3], 500)
#channel.set_values([255,0,127])
# this can be used to wait till the fade is complete
await channel
asyncio.run(main())

7
test/http-test.py Normal file
View File

@ -0,0 +1,7 @@
import requests
json_data = {'dmx': [3,7,1,0,3,4]}
response = requests.post('http://127.0.0.1:8081/dmx', json = json_data)
if response.status_code == 200:
print('Success!')