added pyccsone.py, extended readme

This commit is contained in:
Paul 2023-03-04 12:35:37 +01:00
parent 56f4b719f5
commit 95c4e1e955
2 changed files with 306 additions and 1 deletions

View File

@ -1,3 +1,47 @@
# pyccsone
Python libary for communication with the Grass Valley CCS-ONE using its xml interface
Python libary for communication with the Grass Valley CCS-ONE using its xml interface
## Usage:
### create CCSOne object:
ccsoneobj = ccsone.CCSOne(IP_ADDRESS, PORT, AUTHNAME, HOTKEYS)
on creation connected devices and cameras are fetched
responses are handled automatically in a recieve thread
### close connection:
ccsoneobj.close()
### get device information:
ccsoneobj.getdeviceinformation()
### add camera object:
ccsoneobj.addcam(CAMERA_NUMBER)
### subscribe to function:
ccsoneobj.cams[CAMERA_NUMBER].valuesubscribe(FUNCTION_ID)
### get function value / subscribe if not known:
ccsoneobj.getfunctionvalue(FUNCTION_ID)
### change function value:
ccsoneobj.cams[CAMERA_NUMBER].functionvaluechange(FUNCTION_ID, VALUE)
### set tally state:
ccsoneobj.cams[CAMERA_NUMBER].settally(TALLY_STATE)
### toggle tally state:
ccsoneobj.cams[CAMERA_NUMBER].toggletally()
### get tally state:
ccsoneobj.cams[CAMERA_NUMBER].gettally()
### set gain COLOR=>(r,g,b) VALUE=>0-99:
ccsoneobj.cams[CAMERA_NUMBER].setgain(COLOR, VALUE)
### get gain (r,g,b):
ccsoneobj.cams[CAMERA_NUMBER].getgain(COLOR)
### get Fstop:
ccsoneobj.cams[CAMERA_NUMBER].getf()

261
pyccsone.py Normal file
View File

@ -0,0 +1,261 @@
"""
CCSOne Python libary
written by Paul Schürholz
Example usage:
create CCSOne object:
ccsoneobj = ccsone.CCSOne(IP_ADDRESS, PORT, AUTHNAME, HOTKEYS)
close connection:
ccsoneobj.closesocket()
get device information:
ccsoneobj.getdeviceinformation()
add camera object:
ccsoneobj.addcam(CAMERA_NUMBER)
subscribe to function:
ccsoneobj.cams[CAMERA_NUMBER].valuesubscribe(FUNCTION_ID)
get function value / subscribe if not known:
ccsoneobj.getfunctionvalue(FUNCTION_ID)
change function value:
ccsoneobj.cams[CAMERA_NUMBER].functionvaluechange(FUNCTION_ID, VALUE)
set tally state:
ccsoneobj.cams[CAMERA_NUMBER].settally(TALLY_STATE)
toggle tally state:
ccsoneobj.cams[CAMERA_NUMBER].toggletally()
get tally state:
ccsoneobj.cams[CAMERA_NUMBER].gettally()
set gain COLOR=>(r,g,b) VALUE=>0-99:
ccsoneobj.cams[CAMERA_NUMBER].setgain(COLOR, VALUE)
get gain (r,g,b):
ccsoneobj.cams[CAMERA_NUMBER].getgain(COLOR)
get Fstop:
ccsoneobj.cams[CAMERA_NUMBER].getf()
responses are handled automatically in a recieve thread
"""
import socket
import time
import xmltodict
from math import floor, ceil
from threading import Thread
import keyboard
# Main Class CCSOne
class CCSOne:
def __init__(self, rip, rport, authname, hotkeys=True):
self.rip = rip
self.rport = rport
self.cams = {}
self.devices = {}
self.connected = False
self.success = False
self. requesttimeout = 2.5
self.timeout = 5
self.name = ""
self.location = ""
self.authname = authname
self.initsocket()
self.auth(authname)
self.getdeviceinformation()
if hotkeys:
self.setupkeyhooks()
def __del__(self):
self.close()
def close(self):
keyboard.unhook_all()
self.closesocket()
print("Closed")
# Subclass for devices such as OCPs
class Device:
def __init__(self, sessionid, name, devicetype, connectionstate):
self.sessionid = sessionid
self.name = name
self.type = devicetype
self.connectionstate = connectionstate
# Subclass for Camera devices
class Cam:
def __init__(self, num, ccsone):
self.num = num
self.ccsone = ccsone
self.functions = {}
# get current gain setting for specified color
def getgain(self, color):
functionid = {'r': 513, 'g': 514, 'b': 515}
return floor(self.getfunctionvalue(functionid[color])/2.56)
# set gain setting for specified color
def setgain(self, color, value):
functionid = {'r': 513, 'g': 514, 'b': 515}
return self.setfunctionvalue(functionid[color], ceil(value*2.56))
# get current tally state
def gettally(self, invers=True):
return bool(not self.getfunctionvalue(8470) if invers else self.getfunctionvalue(8470))
# set tally state (1=no tally, 0=tally)
def settally(self, state, invers=True):
return self.setfunctionvalue(8470, int(not state if invers else state))
def toggletally(self):
return self.settally(not self.gettally())
# get current FStop Value
def getf(self):
xmltof = {9: "1.8", 11: "2.0", 12: "2.2", 43: "2.4", 13: "2.6", 14: "2.8", 15: "3.1", 44: "3.4", 16: "3.7", 17: "4.0", 18: "4.4", 45: "4.8", 19: "5.2", 20: "5.6", 21: "6.2", 46: "6.7", 22: "7.3", 23: "8.0", 24: "8.7", 47: "9.5", 25: "10.0", 26: "11.0", 27: "12.0", 48: "13.0", 29: "15.0", 29: "16.0", 49: "17.0", 50: "19.0", 30: "21.0", 51: "22.0", 31: "25.0"}
return xmltof[self.getfunctionvalue(1039)]
#return round(1.122462048 ** (self.getfunctionvalue(1039)-5), 1)
# subscribe to function value
def valuesubscribe(self, function, subscribe = "true"):
return self.ccsone.send('<function-value-request subscribe="%s" response-level="Always"><device><name>%s</name><function id="%s" /></device></function-value-request>'%(subscribe, self.num, function))
# get function value either from functions dict or directly from ccsone if unknown
def getfunctionvalue(self, function, subscribe = "true"):
if function in self.functions:
functionvalue = self.functions[function]
else:
self.valuesubscribe(function, subscribe)
timeout = self.ccsone.requesttimeout
while not function in self.functions:
time.sleep(0.05)
timeout - 0.05
if timeout <= 0:
return False
functionvalue = self.functions[function]
return functionvalue
# set function value
def setfunctionvalue(self, function, value):
self.ccsone.success = False
self.ccsone.send('<function-value-change response-level="Always"><device><name>%s</name><function id="%s"><value>%s</value></function></device></function-value-change>'%(self.num, function, value))
return self.ccsone.waitforsuccess()
# add a cam object by number to ccsone obj
def addcam(self, num):
try:
self.cams[int(num)] = (self.Cam(int(num), self))
except:
return False
return True
# get information about connected devices and subscribe
def getdeviceinformation(self):
return self.send('<device-information-request subscribe="true"/>')
# auth against ccsone
def auth(self, name):
self.send('<application-authentication-request xml-protocol="2.0" response-level="Always"><name>%s</name></application-authentication-request>'%name)
# init socket and start recieve thread
def initsocket(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setblocking(0)
self.socket.settimeout(self.timeout)
try:
self.socket.connect((self.rip, self.rport))
except TimeoutError:
print("Connection Timeout")
return False
except:
print("Connection Error")
return False
self.connected = True
self.recvthread = Thread(target=self.recv)
self.recvthread.start()
return True
# close socket
def closesocket(self):
try:
self.socket.shutdown(socket.SHUT_WR)
self.socket.close()
except:
return False
self.connected = False
return True
# wait for positive response
def waitforsuccess(self):
timeout = self.requesttimeout
while not self.success:
time.sleep(0.01)
timeout - 0.01
if timeout <= 0:
return False
return True
# send data via socket
def send(self, data):
self.success = False
try:
self.socket.send(data.encode())
except:
return False
return self.waitforsuccess()
# add newly reported devices into devices dict
def updatedevicelist(self, device):
if not device['sessionid'] in self.devices:
self.devices[device['sessionid']] = self.Device(device['sessionid'], device['name'], device['type'], device['@connection-state'])
if device['type'] == "Camera" and not device['name']['#text'] in self.cams:
self.addcam(device['name']['#text'])
# handle various responses from ccsone
def handleresponse(self, data):
xml = xmltodict.parse(data)
for xmlroot in xml:
if xmlroot == "request-response":
if xml[xmlroot]['@result'] == "Ok":
self.success = True
else:
print("Error %s"%xml[xmlroot]['@result'])
elif xmlroot == "application-authentication-indication":
print("Auth indication Name: %s Location: %s"%(xml[xmlroot]['name'],xml[xmlroot]['location']))
self.name = xml[xmlroot]['name']
self.location = xml[xmlroot]['location']
elif xmlroot == "function-value-indication":
if type(xml[xmlroot]['device']) == list:
self.cams[int(xml[xmlroot]['device'][0]['name'])].functions[int(xml[xmlroot]['device'][0]['function']['@id'])] = int(xml[xmlroot]['device'][0]['function']['value'])
else:
self.cams[int(xml[xmlroot]['device']['name'])].functions[int(xml[xmlroot]['device']['function']['@id'])] = int(xml[xmlroot]['device']['function']['value'])
elif xmlroot == "device-information-indication":
if type(xml[xmlroot]['device']) == list:
for device in xml[xmlroot]['device']:
self.updatedevicelist(device)
else:
self.updatedevicelist(xml[xmlroot])
# recv function for recieve thread
def recv(self):
while self.connected:
self.socket.settimeout(None)
data = self.socket.recv(8192).decode()
if not data:
return
self.handleresponse(data)
def setupkeyhooks(self):
keyboard.on_press(self.handlekeypress)
def handlekeypress(self, keydown):
if keydown.name == 'q':
self.close()
elif keydown.name in range(1,10):
self.cams[keydown.name].toggletally()