los gehts
This commit is contained in:
parent
b902986d9e
commit
63e6baf1e0
|
@ -0,0 +1,72 @@
|
||||||
|
import sp
|
||||||
|
import yt
|
||||||
|
import ts3ab
|
||||||
|
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
def comparelists(sppl, ts3abpl):
|
||||||
|
newitems = []
|
||||||
|
delitems = []
|
||||||
|
for spplitem in sppl:
|
||||||
|
found = False
|
||||||
|
for ts3abplitem in ts3abpl.items:
|
||||||
|
if (ts3abplitem['Title'] == "{artist} - {name}".format(artist=' & '.join(spplitem['artists']), name=spplitem['name'])):
|
||||||
|
found = True
|
||||||
|
if (found == False):
|
||||||
|
newitems.append({'spitem': spplitem, 'string': "{artist} - {name}".format(artist=' & '.join(spplitem['artists']), name=spplitem['name'])})
|
||||||
|
for i, ts3abplitem in enumerate(ts3abpl.items):
|
||||||
|
found = False
|
||||||
|
for spplitem in sppl:
|
||||||
|
if (ts3abplitem['Title'] == "{artist} - {name}".format(artist=' & '.join(spplitem['artists']), name=spplitem['name'])):
|
||||||
|
found = True
|
||||||
|
if (found == False):
|
||||||
|
delitems.append({'listid': ts3abpl.id, 'index': i})
|
||||||
|
return newitems, delitems
|
||||||
|
|
||||||
|
def synclist(spobj, ts3abobj, sp_pl_id, ts3ab_listid=None, debug=False):
|
||||||
|
if debug: print("fetching spotify pl")
|
||||||
|
|
||||||
|
spplname, sppl = sp.getspotifyplitems(spobj, sp_pl_id)
|
||||||
|
|
||||||
|
if debug: print("spotifyplname is {spplname}".format(spplname=spplname))
|
||||||
|
|
||||||
|
if debug: print("fetching ts3ab pl")
|
||||||
|
|
||||||
|
if (ts3ab_listid == None): ts3ab_listid = re.sub(r"[\"\/\'\(\)\s\t]", "_", spplname)
|
||||||
|
|
||||||
|
ts3abpl = ts3abobj.getplaylist(ts3ab_listid)
|
||||||
|
|
||||||
|
if debug: print("comparing pls")
|
||||||
|
|
||||||
|
newitems, delitems = comparelists(sppl, ts3abpl)
|
||||||
|
|
||||||
|
if debug: print("deleting old ts3ab items")
|
||||||
|
|
||||||
|
for delitem in delitems:
|
||||||
|
if debug: print("deleting {listid}, {index}, {songname}".format(listid=ts3abpl.id, index=delitem['index'], songname=ts3abpl.items[delitem['index']]))
|
||||||
|
ts3abpl.delitem(delitem['index'])
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
if debug: print("adding new items to ts3ab")
|
||||||
|
|
||||||
|
for newitem in newitems:
|
||||||
|
tries = 0
|
||||||
|
found = False
|
||||||
|
while (not found):
|
||||||
|
if debug: print("getting ytvid {string} try {i}".format(string=newitem['string'], i=tries))
|
||||||
|
ytvid = yt.getytvideo(newitem['string'], tries)
|
||||||
|
if (ytvid['duration_s']*1000 < newitem['spitem']['duration_ms']+5000 and ytvid['duration_s']*1000 > newitem['spitem']['duration_ms']-5000):
|
||||||
|
found = True
|
||||||
|
else:
|
||||||
|
if debug: print("video does not match song length {yturl}".format(yturl=ytvid['url']))
|
||||||
|
tries += 1
|
||||||
|
if (tries > 5):
|
||||||
|
raise Exception("unable to find {string}".format(string=newitem['string']))
|
||||||
|
if debug: print("adding {yturl}".format(yturl=ytvid['url']))
|
||||||
|
ts3abpl.additem(ytvid['url'], newitem['string'])
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
#if debug: print("renaming items after adding")
|
||||||
|
#ts3abpl.clearrenamestash()
|
|
@ -0,0 +1,67 @@
|
||||||
|
import sp
|
||||||
|
import yt
|
||||||
|
import ts3ab
|
||||||
|
import helpers
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='Sync Spotifyplaylists into ts3audiobot')
|
||||||
|
|
||||||
|
parser.add_argument('--sp_client_id',
|
||||||
|
dest='sp_client_id',
|
||||||
|
required=True,
|
||||||
|
help='Provide sp_client_id',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--sp_client_secret',
|
||||||
|
dest='sp_client_secret',
|
||||||
|
required=True,
|
||||||
|
help='Provide sp_client_secret',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--ts3ab_apiurl',
|
||||||
|
dest='ts3ab_apiurl',
|
||||||
|
required=True,
|
||||||
|
help='Provide ts3ab_apiurl',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--ts3ab_userid',
|
||||||
|
dest='ts3ab_userid',
|
||||||
|
required=True,
|
||||||
|
help='Provide ts3ab_userid',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--ts3ab_token',
|
||||||
|
dest='ts3ab_token',
|
||||||
|
required=True,
|
||||||
|
help='Provide ts3ab_token',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--sp_pl_id',
|
||||||
|
dest='sp_pl_id',
|
||||||
|
help='Provide sp_pl_id',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('--ts3ablistid',
|
||||||
|
dest='ts3ablistid',
|
||||||
|
default=None,
|
||||||
|
help='Provide ts3ablistid',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
parser.add_argument('-d', '--debug',
|
||||||
|
dest='debug',
|
||||||
|
default=None,
|
||||||
|
help='Provide ts3ablistid',
|
||||||
|
type=str
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
debug = True
|
||||||
|
|
||||||
|
spobj = sp.initsp(args.sp_client_id, args.sp_client_secret)
|
||||||
|
|
||||||
|
ts3abobj = ts3ab.ts3ab(args.ts3ab_apiurl, args.ts3ab_userid, args.ts3ab_token)
|
||||||
|
|
||||||
|
helpers.synclist(spobj, ts3abobj, args.sp_pl_id, args.ts3ablistid, args.debug)
|
|
@ -0,0 +1,23 @@
|
||||||
|
import spotipy
|
||||||
|
|
||||||
|
def initsp(client_id, client_secret):
|
||||||
|
client_credentials_manager = spotipy.oauth2.SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
|
||||||
|
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
|
||||||
|
|
||||||
|
def getspotifyplitems(spobj, pl_id):
|
||||||
|
plname = spobj.playlist(playlist_id=pl_id, fields='name')['name']
|
||||||
|
result = []
|
||||||
|
fields = 'items(track(name,album(name),artists(name),duration_ms)),next'
|
||||||
|
queryresult = spobj.playlist_tracks(playlist_id=pl_id, fields=fields)
|
||||||
|
tracks = queryresult['items']
|
||||||
|
i = 0
|
||||||
|
while (queryresult['next'] != None):
|
||||||
|
i += 1
|
||||||
|
queryresult = spobj.playlist_tracks(playlist_id=pl_id, fields=fields, offset=i*100)
|
||||||
|
tracks.extend(queryresult['items'])
|
||||||
|
for item in tracks:
|
||||||
|
artists = []
|
||||||
|
for artist in item['track']['artists']:
|
||||||
|
artists.append(artist['name'])
|
||||||
|
result.append({'name': item['track']['name'], 'artists': artists, 'album': item['track']['album']['name'], 'duration_ms': item['track']['duration_ms']})
|
||||||
|
return plname, result
|
|
@ -0,0 +1,129 @@
|
||||||
|
import requests
|
||||||
|
import base64
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
def stringtob64(inputstring):
|
||||||
|
return base64.b64encode(inputstring.encode('ascii')).decode('ascii')
|
||||||
|
|
||||||
|
class ts3ab:
|
||||||
|
def __init__(self, apiurl, userid, token, botid=0):
|
||||||
|
self._token = stringtob64("{userid}:{token}".format(userid=userid, token=token))
|
||||||
|
self.apiurl = apiurl
|
||||||
|
self.botid = botid
|
||||||
|
|
||||||
|
def gethistory(self):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/history/last/10".format(apiurl=self.apiurl, botid=self.botid),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def getplaylist(self, listid):
|
||||||
|
return self.playlist(self, listid)
|
||||||
|
|
||||||
|
def getplaylists(self):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/list".format(apiurl=self.apiurl, botid=self.botid),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if (response.status_code != 200):
|
||||||
|
raise Exception("unable to fetch {status_code}".format(status_code=response.status_code))
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def createplaylist(self, listid, title=None):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/create/{listid}/{title}".format(apiurl=self.apiurl, botid=self.botid, listid=listid, title=title if not title == None else listid),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if (response.status_code != 204):
|
||||||
|
raise Exception("unable to fetch {status_code}".format(status_code=response.status_code))
|
||||||
|
return response
|
||||||
|
|
||||||
|
class playlist:
|
||||||
|
def __init__(self, outer, listid):
|
||||||
|
self._outer = outer
|
||||||
|
self.id = listid
|
||||||
|
if not self._checkiflistexists():
|
||||||
|
self._outer.createplaylist(self.id, self.id)
|
||||||
|
self.songcount = 0
|
||||||
|
self.title = self.id
|
||||||
|
self.items = []
|
||||||
|
self.renamestash = []
|
||||||
|
return
|
||||||
|
pljson = self._getpl(0, 20).json()
|
||||||
|
self.songcount = int(pljson['SongCount'])
|
||||||
|
self.title = pljson['Title']
|
||||||
|
self.items = pljson['Items']
|
||||||
|
self.renamestash = []
|
||||||
|
i = 1
|
||||||
|
while len(self.items) < self.songcount:
|
||||||
|
self.items += self._getpl(20*i).json()['Items']
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def _checkiflistexists(self):
|
||||||
|
for playlist in self._outer.getplaylists():
|
||||||
|
if playlist['Id'] == self.id: return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _getpl(self, offset=0, count=20):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/show/{listid}/{offset}/{count}".format(apiurl=self._outer.apiurl, botid=self._outer.botid, listid=self.id, offset=offset, count=count),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._outer._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("unable to fetch {status_code}".format(status_code=response.status_code))
|
||||||
|
if "ErrorMessage" in response.json().keys():
|
||||||
|
raise Exception(response.json()['ErrorMessage'])
|
||||||
|
return response
|
||||||
|
|
||||||
|
def fetchsongcount(self):
|
||||||
|
self.songcount = self._getpl(0, 1).json()['SongCount']
|
||||||
|
|
||||||
|
def additem(self, url, name=None):
|
||||||
|
self.fetchsongcount()
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/add/{listid}/{url}".format(apiurl=self._outer.apiurl, botid=self._outer.botid, listid=self.id, url=urllib.parse.quote(url, safe='')),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._outer._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("unable to add {status_code}".format(status_code=response.status_code))
|
||||||
|
if "ErrorMessage" in response.json().keys():
|
||||||
|
raise Exception(response.json()['ErrorMessage'])
|
||||||
|
#self.renamestash.append({'index': self.songcount, 'name': name})
|
||||||
|
if not name == None: self.renameitem(self.songcount, name)
|
||||||
|
|
||||||
|
def clearrenamestash(self):
|
||||||
|
for item in self.renamestash:
|
||||||
|
self.renameitem(item['index'], item['name'])
|
||||||
|
self.renamestash.remove(item)
|
||||||
|
|
||||||
|
def renameitem(self, index, name):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/item/name/{listid}/{index}/{title}".format(apiurl=self._outer.apiurl, botid=self._outer.botid, listid=self.id, index=index, title=urllib.parse.quote(name, safe='')),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._outer._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if response.status_code != 204:
|
||||||
|
raise Exception("unable to rename {status_code}".format(status_code=response.status_code))
|
||||||
|
|
||||||
|
|
||||||
|
def delitem(self, index):
|
||||||
|
response = requests.get("{apiurl}/bot/use/{botid}(/list/item/delete/{listid}/{index}".format(apiurl=self._outer.apiurl, botid=self._outer.botid, listid=self.id, index=index),
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"Authorization": "Basic {token}".format(token=self._outer._token)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if response.status_code != 204:
|
||||||
|
raise Exception("unable to delete {status_code}".format(status_code=response.status_code))
|
|
@ -0,0 +1,8 @@
|
||||||
|
import youtube_dl
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
def getytvideo(songstring, index=0):
|
||||||
|
ydl_opts = {'default_search': 'ytsearch{count}'.format(count=index+1), 'quiet': 'true', 'forceurl': 'true'}
|
||||||
|
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
||||||
|
video_info = ydl.extract_info(songstring, download=False)['entries'][index]
|
||||||
|
return {"url": "https://youtu.be/{video_id}".format(video_id=urllib.parse.quote(video_info['id'], safe='')), "duration_s": video_info['duration']}
|
Loading…
Reference in New Issue