""" CCSOne Python libary Copyright (C) 2023 Paul Schürholz This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . """ import socket import time import xmltodict from math import floor, ceil from threading import Thread import keyboard # Main Class CCSOne class CCSOne: def __init__(self, rip, rport, authname, hotkeys=True): self.rip = rip self.rport = rport self.cams = {} self.devices = {} self.connected = False self.success = False self. requesttimeout = 2.5 self.timeout = 5 self.name = "" self.location = "" self.authname = authname self.initsocket() self.auth(authname) self.getdeviceinformation() if hotkeys: self.setupkeyhooks() def __del__(self): self.close() def close(self): keyboard.unhook_all() self.closesocket() print("Closed") # Subclass for devices such as OCPs class Device: def __init__(self, sessionid, name, devicetype, connectionstate): self.sessionid = sessionid self.name = name self.type = devicetype self.connectionstate = connectionstate # Subclass for Camera devices class Cam: def __init__(self, num, ccsone): self.num = num self.ccsone = ccsone self.functions = {} # get current gain setting for specified color def getgain(self, color): functionid = {'r': 513, 'g': 514, 'b': 515} return floor(self.getfunctionvalue(functionid[color])/2.56) # set gain setting for specified color def setgain(self, color, value): functionid = {'r': 513, 'g': 514, 'b': 515} return self.setfunctionvalue(functionid[color], ceil(value*2.56)) # get current tally state def gettally(self, invers=True): return bool(not self.getfunctionvalue(8470) if invers else self.getfunctionvalue(8470)) # set tally state def settally(self, state, invers=True): return self.setfunctionvalue(8470, int(not state if invers else state)) def toggletally(self): return self.settally(not self.gettally()) # get current FStop Value def getf(self): xmltof = {9: "1.8", 11: "2.0", 12: "2.2", 43: "2.4", 13: "2.6", 14: "2.8", 15: "3.1", 44: "3.4", 16: "3.7", 17: "4.0", 18: "4.4", 45: "4.8", 19: "5.2", 20: "5.6", 21: "6.2", 46: "6.7", 22: "7.3", 23: "8.0", 24: "8.7", 47: "9.5", 25: "10.0", 26: "11.0", 27: "12.0", 48: "13.0", 29: "15.0", 29: "16.0", 49: "17.0", 50: "19.0", 30: "21.0", 51: "22.0", 31: "25.0"} return xmltof[self.getfunctionvalue(1039)] #return round(1.122462048 ** (self.getfunctionvalue(1039)-5), 1) # subscribe to function value def valuesubscribe(self, function, subscribe = "true"): return self.ccsone.send('%s'%(subscribe, self.num, function)) # get function value either from functions dict or directly from ccsone if unknown def getfunctionvalue(self, function, subscribe = "true"): if function in self.functions: functionvalue = self.functions[function] else: self.valuesubscribe(function, subscribe) timeout = self.ccsone.requesttimeout while not function in self.functions: time.sleep(0.01) timeout - 0.01 if timeout <= 0: return False functionvalue = self.functions[function] return functionvalue # set function value def setfunctionvalue(self, function, value): self.ccsone.success = False self.ccsone.send('%s%s'%(self.num, function, value)) return self.ccsone.waitforsuccess() # add a cam object by number to ccsone obj def addcam(self, num): try: self.cams[int(num)] = (self.Cam(int(num), self)) except: return False return True # get information about connected devices and subscribe def getdeviceinformation(self): return self.send('') # auth against ccsone def auth(self, name): self.send('%s'%name) # init socket and start recieve thread def initsocket(self): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setblocking(0) self.socket.settimeout(self.timeout) try: self.socket.connect((self.rip, self.rport)) except TimeoutError: print("Connection Timeout") return False except: print("Connection Error") return False self.connected = True self.recvthread = Thread(target=self.recv) self.recvthread.start() return True # close socket def closesocket(self): try: self.socket.shutdown(socket.SHUT_WR) self.socket.close() except: return False self.connected = False return True # wait for positive response def waitforsuccess(self): timeout = self.requesttimeout while not self.success: time.sleep(0.01) timeout - 0.01 if timeout <= 0: return False return True # send data via socket def send(self, data): self.success = False try: self.socket.send(data.encode()) except: return False return self.waitforsuccess() # add newly reported devices into devices dict def updatedevicelist(self, device): if not device['sessionid'] in self.devices: self.devices[device['sessionid']] = self.Device(device['sessionid'], device['name'], device['type'], device['@connection-state']) if device['type'] == "Camera" and not device['name']['#text'] in self.cams: self.addcam(device['name']['#text']) # handle various responses from ccsone def handleresponse(self, data): xml = xmltodict.parse(data) for xmlroot in xml: if xmlroot == "request-response": if xml[xmlroot]['@result'] == "Ok": self.success = True else: print("Error %s"%xml[xmlroot]['@result']) elif xmlroot == "application-authentication-indication": print("Auth indication Name: %s Location: %s"%(xml[xmlroot]['name'],xml[xmlroot]['location'])) self.name = xml[xmlroot]['name'] self.location = xml[xmlroot]['location'] elif xmlroot == "function-value-indication": if type(xml[xmlroot]['device']) == list: self.cams[int(xml[xmlroot]['device'][0]['name'])].functions[int(xml[xmlroot]['device'][0]['function']['@id'])] = int(xml[xmlroot]['device'][0]['function']['value']) else: self.cams[int(xml[xmlroot]['device']['name'])].functions[int(xml[xmlroot]['device']['function']['@id'])] = int(xml[xmlroot]['device']['function']['value']) elif xmlroot == "device-information-indication": if type(xml[xmlroot]['device']) == list: for device in xml[xmlroot]['device']: self.updatedevicelist(device) else: self.updatedevicelist(xml[xmlroot]) # recv function for recieve thread def recv(self): while self.connected: self.socket.settimeout(None) data = self.socket.recv(8192).decode() if not data: return self.handleresponse(data) def setupkeyhooks(self): keyboard.on_press(self.handlekeypress) def handlekeypress(self, keydown): if keydown.name == 'q': self.close() elif keydown.name in range(1,10): self.cams[keydown.name].toggletally()