From 95c4e1e95548988ba145475ff19046a96fa5b54b Mon Sep 17 00:00:00 2001 From: Humorhenker Date: Sat, 4 Mar 2023 12:35:37 +0100 Subject: [PATCH] added pyccsone.py, extended readme --- README.md | 46 ++++++++- pyccsone.py | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 pyccsone.py diff --git a/README.md b/README.md index d4619e8..aa04a40 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,47 @@ # pyccsone -Python libary for communication with the Grass Valley CCS-ONE using its xml interface \ No newline at end of file +Python libary for communication with the Grass Valley CCS-ONE using its xml interface + + +## Usage: +### create CCSOne object: +ccsoneobj = ccsone.CCSOne(IP_ADDRESS, PORT, AUTHNAME, HOTKEYS) + +on creation connected devices and cameras are fetched +responses are handled automatically in a recieve thread + +### close connection: +ccsoneobj.close() + +### get device information: +ccsoneobj.getdeviceinformation() + +### add camera object: +ccsoneobj.addcam(CAMERA_NUMBER) + +### subscribe to function: +ccsoneobj.cams[CAMERA_NUMBER].valuesubscribe(FUNCTION_ID) + +### get function value / subscribe if not known: +ccsoneobj.getfunctionvalue(FUNCTION_ID) + +### change function value: +ccsoneobj.cams[CAMERA_NUMBER].functionvaluechange(FUNCTION_ID, VALUE) + +### set tally state: +ccsoneobj.cams[CAMERA_NUMBER].settally(TALLY_STATE) + +### toggle tally state: +ccsoneobj.cams[CAMERA_NUMBER].toggletally() + +### get tally state: +ccsoneobj.cams[CAMERA_NUMBER].gettally() + +### set gain COLOR=>(r,g,b) VALUE=>0-99: +ccsoneobj.cams[CAMERA_NUMBER].setgain(COLOR, VALUE) + +### get gain (r,g,b): +ccsoneobj.cams[CAMERA_NUMBER].getgain(COLOR) + +### get Fstop: +ccsoneobj.cams[CAMERA_NUMBER].getf() \ No newline at end of file diff --git a/pyccsone.py b/pyccsone.py new file mode 100644 index 0000000..678db38 --- /dev/null +++ b/pyccsone.py @@ -0,0 +1,261 @@ +""" +CCSOne Python libary +written by Paul Schürholz + +Example usage: +create CCSOne object: +ccsoneobj = ccsone.CCSOne(IP_ADDRESS, PORT, AUTHNAME, HOTKEYS) + +close connection: +ccsoneobj.closesocket() + +get device information: +ccsoneobj.getdeviceinformation() + +add camera object: +ccsoneobj.addcam(CAMERA_NUMBER) + +subscribe to function: +ccsoneobj.cams[CAMERA_NUMBER].valuesubscribe(FUNCTION_ID) + +get function value / subscribe if not known: +ccsoneobj.getfunctionvalue(FUNCTION_ID) + +change function value: +ccsoneobj.cams[CAMERA_NUMBER].functionvaluechange(FUNCTION_ID, VALUE) + +set tally state: +ccsoneobj.cams[CAMERA_NUMBER].settally(TALLY_STATE) + +toggle tally state: +ccsoneobj.cams[CAMERA_NUMBER].toggletally() + +get tally state: +ccsoneobj.cams[CAMERA_NUMBER].gettally() + +set gain COLOR=>(r,g,b) VALUE=>0-99: +ccsoneobj.cams[CAMERA_NUMBER].setgain(COLOR, VALUE) + +get gain (r,g,b): +ccsoneobj.cams[CAMERA_NUMBER].getgain(COLOR) + +get Fstop: +ccsoneobj.cams[CAMERA_NUMBER].getf() + +responses are handled automatically in a recieve thread +""" +import socket +import time +import xmltodict +from math import floor, ceil +from threading import Thread +import keyboard + +# Main Class CCSOne +class CCSOne: + def __init__(self, rip, rport, authname, hotkeys=True): + self.rip = rip + self.rport = rport + self.cams = {} + self.devices = {} + self.connected = False + self.success = False + self. requesttimeout = 2.5 + self.timeout = 5 + self.name = "" + self.location = "" + self.authname = authname + + self.initsocket() + self.auth(authname) + self.getdeviceinformation() + if hotkeys: + self.setupkeyhooks() + + def __del__(self): + self.close() + + def close(self): + keyboard.unhook_all() + self.closesocket() + print("Closed") + + # Subclass for devices such as OCPs + class Device: + def __init__(self, sessionid, name, devicetype, connectionstate): + self.sessionid = sessionid + self.name = name + self.type = devicetype + self.connectionstate = connectionstate + + # Subclass for Camera devices + class Cam: + def __init__(self, num, ccsone): + self.num = num + self.ccsone = ccsone + self.functions = {} + + # get current gain setting for specified color + def getgain(self, color): + functionid = {'r': 513, 'g': 514, 'b': 515} + return floor(self.getfunctionvalue(functionid[color])/2.56) + + # set gain setting for specified color + def setgain(self, color, value): + functionid = {'r': 513, 'g': 514, 'b': 515} + return self.setfunctionvalue(functionid[color], ceil(value*2.56)) + + # get current tally state + def gettally(self, invers=True): + return bool(not self.getfunctionvalue(8470) if invers else self.getfunctionvalue(8470)) + + # set tally state (1=no tally, 0=tally) + def settally(self, state, invers=True): + return self.setfunctionvalue(8470, int(not state if invers else state)) + + def toggletally(self): + return self.settally(not self.gettally()) + + # get current FStop Value + def getf(self): + xmltof = {9: "1.8", 11: "2.0", 12: "2.2", 43: "2.4", 13: "2.6", 14: "2.8", 15: "3.1", 44: "3.4", 16: "3.7", 17: "4.0", 18: "4.4", 45: "4.8", 19: "5.2", 20: "5.6", 21: "6.2", 46: "6.7", 22: "7.3", 23: "8.0", 24: "8.7", 47: "9.5", 25: "10.0", 26: "11.0", 27: "12.0", 48: "13.0", 29: "15.0", 29: "16.0", 49: "17.0", 50: "19.0", 30: "21.0", 51: "22.0", 31: "25.0"} + return xmltof[self.getfunctionvalue(1039)] + #return round(1.122462048 ** (self.getfunctionvalue(1039)-5), 1) + + # subscribe to function value + def valuesubscribe(self, function, subscribe = "true"): + return self.ccsone.send('%s'%(subscribe, self.num, function)) + + # get function value either from functions dict or directly from ccsone if unknown + def getfunctionvalue(self, function, subscribe = "true"): + if function in self.functions: + functionvalue = self.functions[function] + else: + self.valuesubscribe(function, subscribe) + timeout = self.ccsone.requesttimeout + while not function in self.functions: + time.sleep(0.05) + timeout - 0.05 + if timeout <= 0: + return False + functionvalue = self.functions[function] + return functionvalue + + # set function value + def setfunctionvalue(self, function, value): + self.ccsone.success = False + self.ccsone.send('%s%s'%(self.num, function, value)) + return self.ccsone.waitforsuccess() + + # add a cam object by number to ccsone obj + def addcam(self, num): + try: + self.cams[int(num)] = (self.Cam(int(num), self)) + except: + return False + return True + + # get information about connected devices and subscribe + def getdeviceinformation(self): + return self.send('') + + # auth against ccsone + def auth(self, name): + self.send('%s'%name) + + # init socket and start recieve thread + def initsocket(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setblocking(0) + self.socket.settimeout(self.timeout) + try: + self.socket.connect((self.rip, self.rport)) + except TimeoutError: + print("Connection Timeout") + return False + except: + print("Connection Error") + return False + self.connected = True + self.recvthread = Thread(target=self.recv) + self.recvthread.start() + return True + + # close socket + def closesocket(self): + try: + self.socket.shutdown(socket.SHUT_WR) + self.socket.close() + except: + return False + self.connected = False + return True + + # wait for positive response + def waitforsuccess(self): + timeout = self.requesttimeout + while not self.success: + time.sleep(0.01) + timeout - 0.01 + if timeout <= 0: + return False + return True + + # send data via socket + def send(self, data): + self.success = False + try: + self.socket.send(data.encode()) + except: + return False + return self.waitforsuccess() + + # add newly reported devices into devices dict + def updatedevicelist(self, device): + if not device['sessionid'] in self.devices: + self.devices[device['sessionid']] = self.Device(device['sessionid'], device['name'], device['type'], device['@connection-state']) + if device['type'] == "Camera" and not device['name']['#text'] in self.cams: + self.addcam(device['name']['#text']) + + # handle various responses from ccsone + def handleresponse(self, data): + xml = xmltodict.parse(data) + for xmlroot in xml: + if xmlroot == "request-response": + if xml[xmlroot]['@result'] == "Ok": + self.success = True + else: + print("Error %s"%xml[xmlroot]['@result']) + elif xmlroot == "application-authentication-indication": + print("Auth indication Name: %s Location: %s"%(xml[xmlroot]['name'],xml[xmlroot]['location'])) + self.name = xml[xmlroot]['name'] + self.location = xml[xmlroot]['location'] + elif xmlroot == "function-value-indication": + if type(xml[xmlroot]['device']) == list: + self.cams[int(xml[xmlroot]['device'][0]['name'])].functions[int(xml[xmlroot]['device'][0]['function']['@id'])] = int(xml[xmlroot]['device'][0]['function']['value']) + else: + self.cams[int(xml[xmlroot]['device']['name'])].functions[int(xml[xmlroot]['device']['function']['@id'])] = int(xml[xmlroot]['device']['function']['value']) + elif xmlroot == "device-information-indication": + if type(xml[xmlroot]['device']) == list: + for device in xml[xmlroot]['device']: + self.updatedevicelist(device) + else: + self.updatedevicelist(xml[xmlroot]) + + # recv function for recieve thread + def recv(self): + while self.connected: + self.socket.settimeout(None) + data = self.socket.recv(8192).decode() + if not data: + return + self.handleresponse(data) + + def setupkeyhooks(self): + keyboard.on_press(self.handlekeypress) + + def handlekeypress(self, keydown): + if keydown.name == 'q': + self.close() + elif keydown.name in range(1,10): + self.cams[keydown.name].toggletally()