From a38f99c5a0080602a09ea85a6944f84ba9f8e932 Mon Sep 17 00:00:00 2001 From: dtookey Date: Tue, 20 Apr 2021 21:23:39 -0400 Subject: [PATCH] implemented dummy stuff --- requirements.txt | 3 +- standalone_command.py | 41 ++++++++ twitch_script_standalone.py | 184 ++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 standalone_command.py create mode 100644 twitch_script_standalone.py diff --git a/requirements.txt b/requirements.txt index 4045979..b5ac8a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ playsound discord.py psutil art -phue \ No newline at end of file +phue +flask \ No newline at end of file diff --git a/standalone_command.py b/standalone_command.py new file mode 100644 index 0000000..dcb28b9 --- /dev/null +++ b/standalone_command.py @@ -0,0 +1,41 @@ +import flask + +api = flask.Flask(__name__) +#enable/disable this to get web pages of crashes returned +api.config["DEBUG"] = True + +def init(): + #todo load entire command library and cache it here + pass + +def is_command(command:str)->bool: + if command == "!echo": + return True + else: + return False + +def handle_command(command, rest): + if command == "!echo": + message = "Got payload [%s]" % rest + return flask.make_response("{message:\"%s\"}" % message, 200, {"Content-Type": "application/json"}) + +@api.route('/api/v1/command', methods=['GET']) +def is_command(): + if 'name' in request.args: + if is_command(request.args['name']): + return flask.make_response('', 200) + else: + return flask.make_response('', 404) + +def exec_command(): + if 'command_name' not in request.args: + return flask.make_response('{text:"Argument \'command_name\' not in request"}', 400) + if 'rest' not in request.args: + return flask.make_response('{text:"Argument \'rest\' not in request"}', 400) + + return handle_command(request.args['command_name'], request.args['rest']) + + +if __name__ == '__main__': + init() + api.run(host='0.0.0.0') diff --git a/twitch_script_standalone.py b/twitch_script_standalone.py new file mode 100644 index 0000000..f9a706c --- /dev/null +++ b/twitch_script_standalone.py @@ -0,0 +1,184 @@ +from json import loads +import re + +import requests +import twitch +import twitch.chat +from urllib.parse import urlencode, quote_plus +import config as config +import credentials +from cooldowns import Cooldown_Module + + +class Twitch_Module(): + def __init__(self): + super().__init__() + self.twitchCredential: credentials.Twitch_Credential + self.chat: twitch.Chat + + self.tts_enabled: bool = False + self.block_tts_url: bool = False + self.tts_whitelist_enabled: bool = False + self.block_chat_url: bool = True + self.whitelisted_users: list = ["thecuriousnerd"] + # don't freak out, this is *merely* a regex for matching urls that will hit just about everything + self._urlMatcher = re.compile( + "(https?:(/{1,3}|[a-z0-9%])|[a-z0-9.-]+[.](com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw))") + + # Default Twitch Chat limit is 20 per 30 seconds + # If Mod or Op, Twitch Chat limit is 100 per 30 seconds + self.cooldownModule: Cooldown_Module = Cooldown_Module() + self.cooldownModule.setupCooldown("twitchChat", 20, 32) + + self.allow_rgbLightControl = config.autoEnabled_Twitch_rgbLightControl + + def join_channel(self, credential: credentials.Twitch_Credential, channel_name: str): + channel_name = "#" + channel_name + print("Connecting to Channel: " + channel_name + "...") + + if credential is None: + credential = self.twitchCredential + + self.chat = twitch.Chat( + channel=channel_name, + nickname=credential.username, + oauth=credential.oauth, + # LIBRARY UPDATE BROKE THE FOLLOWING LINE [FIX THIS] + # helix = twitch.Helix(credential.helix, use_cache=True) + ) + self.chat.subscribe(self.twitch_chat) + + print("Connected to Channel: ", channel_name) + + def leave_channel(self): + print("Leaving Channel", self.chat.channel) + self.chat.irc.leave_channel(self.chat.channel) + self.chat.irc.socket.close() + + def send_message(self, message): + isBlocked = self.isChannel_inConfigList(self.chat.channel, config.block_TwitchChannelsMessaging) + # print("isBlocked: " + str(isBlocked) + " for: " + self.chat.channel) + if self.cooldownModule.isCooldownActive( + "twitchChat") == False and not isBlocked and not config.blockAll_TwitchChatChannelsMessaging: + self.chat.send(message) + # print("Sent ChatMSG") + self.cooldownModule.actionTrigger("twitchChat") + + def parse_line(self, message: str) -> (str, str): + first_space = False + start = -1 + idx = -1 + for x in range(0, len(message)): + c = message[x] + if c == ' ': + if first_space: + idx = x + break + else: + first_space = True + pass + else: + if start == -1: + start = x + + command = message[start:idx] + rest = message[idx + 1:] + return command, rest + + def is_command(self, word: str) -> bool: + # todo need to url-escape word + clean_param = urlencode(word, quote_via=quote_plus) + url = "http://localhost:5000/api/v1/command?name=%s" % clean_param + resp = requests.get(url) + return resp.status_code == 200 + + def exec_command(self, command: str, rest: str): + #todo need to url-escape command and rest + clean_command = urlencode(command, quote_via=quote_plus) + clean_rest = urlencode(rest, quote_via=quote_plus) + url = "http://localhost:5000/api/v1/exec?command_name=%s&rest=%s" % (clean_command, clean_rest) + resp = requests.get(url) + if resp.status_code == 200: + data = loads(resp.text) + msg = data['message'] + if msg is not None: + self.send_message(msg) + else: + #todo handle failed requests + pass + + def send_whisper(self, user, message): + pass + + # This reacts to messages + def twitch_chat(self, message: twitch.chat.Message) -> None: + print("[#" + message.channel + "](" + message.sender + ")> " + message.text) + command, rest = self.parse_line(message.text) + is_actionable = self.is_command(command) + if is_actionable: + self.exec_command(command, rest) + + # todo send get request + + def contains_url(self, message: twitch.chat.Message): + containsURL = re.search(self._urlMatcher, message.text.lower()) is not None + if containsURL: + print("<{ link detected! }> " + " [#" + message.channel + "](" + message.sender + ") sent a link in chat") + return containsURL + + # Checks if Sender is bot. + def isSenderBot(self, message: twitch.chat.Message): + isBot = False + for bot in config.botList: + if message.sender.lower() == bot.lower(): + isBot = True + print("<{ bot detected! }> " + " [#" + message.channel + "](" + message.sender + ") is a bot") + return isBot + + # Checks for basic slurs. + def contains_slur(self, message: twitch.chat.Message): + containsSlur: bool = False + parsedMessage = message.text.split(" ") + for word in parsedMessage: + for slur in config.slurList: + if word.lower() == slur: + containsSlur = True + break # we want to immediately escape if we found a slur + if containsSlur: + break + + if containsSlur: + print("<{ slur detected! }> " + " [#" + message.channel + "](" + message.sender + ") used a slur in chat") + return containsSlur + + def isTTS_URL_Enabled(self, message: twitch.chat.Message): + is_ttsEnabled = False + if not config.blockAll_TTS_URL_Twitch or not self.block_tts_url: + if not self.contains_url(message): + is_ttsEnabled = True + return is_ttsEnabled + + def isChannel_inConfigList(self, selectedChannel, selectedList): + # print(channel) + # print(selectedList) + is_Self = False + for twitchChannel in selectedList: + if twitchChannel == selectedChannel: + is_Self = True + # if is_Self: + # print("Is Self") + # if not is_Self: + # print("Is Not Self") + return is_Self + + +if __name__ == "__main__": + testModule = Twitch_Module() + + credentials_manager = credentials.Credentials_Module() + credentials_manager.load_credentials() + testModule.twitchCredential = credentials_manager.find_Twitch_Credential(config.credentialsNickname) + testModule.dbCredential = credentials_manager.find_DB_Credential(config.credentialsNickname) + + for twitchChannel in config.autojoinTwitchChannels: + testModule.join_channel(None, twitchChannel)