from abc import ABCMeta
from json import loads
from urllib.parse import urlencode
import random
import threading

import requests

import docker_utility
from channel_rewards.channelRewards_base import AbstractChannelRewards


class ChannelReward_TwitchBits(AbstractChannelRewards, metaclass=ABCMeta):
    """Channel reward that announces a Twitch bits cheer over text-to-speech.

    When triggered, a background thread sends a message naming the user, the
    bits amount and the accompanying text to the TTS service.
    """
    ChannelRewardName = "TwitchBits"

    # Seconds to wait for a helper service before giving up, so a stalled
    # service cannot block a worker thread forever.
    _REQUEST_TIMEOUT = 10

    def __init__(self):
        super().__init__(
            ChannelReward_TwitchBits.ChannelRewardName,
            n_args=1,
            ChannelRewardType=AbstractChannelRewards.ChannelRewardsType.twitch_bits,
        )
        self.help = ["This is a TwitchBits reward."]
        self.isChannelRewardEnabled = True
        # Worker threads that are still running (finished ones are pruned
        # each time a new one is started).
        self.threads = []

    def do_ChannelReward(self, source=AbstractChannelRewards.ChannelRewardsSource.default, user="User", rewardName="", rewardPrompt="", userInput="", bonusData=None):
        """Announce the cheer through TTS on a daemon thread.

        Args:
            source: Origin of the reward event (unused here).
            user: Name of the user who cheered.
            rewardName: Name of the reward (unused here).
            rewardPrompt: Text read out after "and says:".
            userInput: Amount of bits, inserted into the spoken message.
            bonusData: Extra payload (unused here).

        Returns:
            Always None. Failures are reported on stdout, never raised.
        """
        # Only the TTS announcement is dispatched; send_Lights_Command is
        # not invoked from here.
        try:
            # NOTE(review): __init__ sets `isChannelRewardEnabled`, while this
            # checks `is_ChannelReward_enabled`, which is not defined in this
            # file. If the base class defines it as a method, this test is
            # always truthy - confirm against AbstractChannelRewards.
            if self.is_ChannelReward_enabled:
                # An f-string also copes with a non-string userInput (for
                # example an integer bit count).
                spoken = f"{user} showered the stream with {userInput} bits and says: {rewardPrompt}"
                worker = threading.Thread(target=self.send_TTS, args=("", spoken))
                worker.daemon = True
                # Forget finished workers so the list does not grow forever.
                self.threads[:] = [t for t in self.threads if t.is_alive()]
                self.threads.append(worker)
                worker.start()
        except Exception as err:
            # Best effort: a failed announcement must not break the caller.
            print("TwitchBits reward could not be dispatched: %s" % err)
        return None

    def send_Lights_Command(self, username, light_group, command, rest):
        """Send a command to the lights service and return its 'message'.

        Returns:
            The 'message' value of the JSON reply, or None when the request
            fails, the status is not 200, or the reply cannot be parsed.
        """
        # urlencode takes care of escaping command and rest.
        params = urlencode({'user_name': username, 'light_group': light_group, 'command': command, 'rest': rest})
        # standalone_lights
        # presumably ifNotDockerUseLocalhost returns a host name string - verify
        host = docker_utility.ifNotDockerUseLocalhost("standalone_lights")
        # Both values must be passed as one tuple; without the parentheses
        # only the host is fed to the two-placeholder template.
        url = "http://%s:42069/api/v1/exec_lights?%s" % (host, params)
        return self._fetch_message(url)

    def send_TTS(self, username, message):
        """Send text to the TTS service and return its 'message'.

        Returns:
            The 'message' value of the JSON reply, or None when the request
            fails, the status is not 200, or the reply cannot be parsed.
        """
        params = urlencode({'tts_sender': username, 'tts_text': message})
        # standalone_tts_core
        # presumably ifNotDockerUseLocalhost returns a host name string - verify
        host = docker_utility.ifNotDockerUseLocalhost("standalone_tts_core")
        url = "http://%s:60809/api/v1/tts/send_text?%s" % (host, params)
        return self._fetch_message(url)

    def _fetch_message(self, url):
        """GET url and return the 'message' entry of its JSON body, else None."""
        try:
            resp = requests.get(url, timeout=self._REQUEST_TIMEOUT)
        except requests.RequestException as err:
            print("Request failed: %s" % err)
            return None
        if resp.status_code != 200:
            # todo handle failed requests
            return None
        print("Got the following message: %s" % resp.text)
        try:
            # todo send to logger and other relevant services
            return loads(resp.text)['message']
        except (ValueError, KeyError, TypeError) as err:
            print("Unexpected response body: %s" % err)
            return None

    def get_bitsAmount(self, dataToParse: dict):
        """Return data.message.data.bits_used as a string, or "Error"."""
        return self._bits_field(dataToParse, 'bits_used')

    def get_bitsMessage(self, dataToParse: dict):
        """Return data.message.data.chat_message as a string, or "Error"."""
        return self._bits_field(dataToParse, 'chat_message')

    @staticmethod
    def _bits_field(dataToParse, key):
        """Read dataToParse['data']['message']['data'][key] as a string.

        Returns "Error" when any level of the lookup is missing or is not
        subscriptable.
        """
        try:
            return str(dataToParse['data']['message']['data'][key])
        except (KeyError, IndexError, TypeError):
            return "Error"

    def get_Phrase(self, defaultRewardPrompt, phrases=("",)):
        """Pick one phrase at random from phrases plus defaultRewardPrompt.

        The candidates are copied into a new list, so neither the default
        nor a list passed by the caller is modified.
        """
        extra = () if phrases is None else phrases
        candidates = [*extra, defaultRewardPrompt]
        return random.choice(candidates)

    def get_help(self):
        """Return the help text lines for this reward."""
        return self.help