diff --git a/channel_points/command_base.py b/channel_points/command_base.py new file mode 100644 index 0000000..b3afc97 --- /dev/null +++ b/channel_points/command_base.py @@ -0,0 +1,57 @@ +from abc import ABCMeta, abstractmethod +from enum import Enum, auto + + +class AbstractCommand(metaclass=ABCMeta): + """ + This is the base class for commands. In order to load a command a few conditions must be met: + 1) The class name MUST begin with 'Command' i.e. CommandTTS, CommandBan, etc... + 2) the class MUST extend AbstractCommand + + Generally, it would be advisable to define the command (something like !so, !tts, !songrequest) as a variable of the + class and to then call super().__init__(command) + """ + + class CommandType(Enum): + NONE = auto() + Praxis = auto() + TWITCH = auto() + DISCORD = auto() + Ver2 = auto() + + class CommandSource(Enum): + default = 0 + Praxis = 1 + Twitch = 2 + Discord = 3 + + def __init__(self, command: str, n_args: int = 0, command_type=CommandType.NONE, helpText:list=["No Help"], CommandEnabled = True): + self.command = command + self.n_args = n_args + self.command_type = command_type + self.help = helpText + self.isCommandEnabled = CommandEnabled + + # no touch! + def get_args(self, text: str) -> list: + return text.split(" ")[0:self.n_args + 1] + + # no touch! + def get_command(self) -> str: + return self.command + + # no touch! + def get_commandType(self): + return self.command_type + + # no touch! + def get_help(self): + return self.help + + # no touch! + def is_command_enabled(self): + return self.isCommandEnabled + + @abstractmethod + def do_command(self, bot, user, command, rest, bonusData): + pass \ No newline at end of file diff --git a/channel_points/implemented/Command_lights_v2.py b/channel_points/implemented/Command_lights_v2.py new file mode 100644 index 0000000..ccde22b --- /dev/null +++ b/channel_points/implemented/Command_lights_v2.py @@ -0,0 +1,75 @@ +from abc import ABCMeta + +import lights_module +from commands.command_base import AbstractCommand + +import utilities_script as utility + +class Command_lights_v2(AbstractCommand, metaclass=ABCMeta): + """ + this is the test command. + """ + command = "!lights" + + def __init__(self): + super().__init__(Command_lights_v2.command, n_args=1, command_type=AbstractCommand.CommandType.Ver2) + self.help = ["This command allows you to modify the lights via the Lights_Module.", + "\nExample:","lights \"SCENE\"","lights \"COLOR\"","lights \"R\" \"G\" \"B\"","lights \"1\" \"0.5\" \"0\""] + self.isCommandEnabled = True + + def do_command(self, source = AbstractCommand.CommandSource.default, user = "User", command = "", rest = "", bonusData = None): + returnString = "" + + tempBool = True + if tempBool == True: + LightModule = lights_module.Lights_Module() + LightModule.main() + #bot.return_message("\nRGB Command Detected!") + tempFix = command + " " + rest + + tempParsedMessage = tempFix.split(" ") + sceneCommand = False + if (len(tempParsedMessage)) > 2: + #bot.return_message("RGB Command!") + rgb_r = float(tempParsedMessage[1]) + rgb_g = float(tempParsedMessage[2]) + rgb_b = float(tempParsedMessage[3]) + xy_result = LightModule.rgb_to_xy(rgb_r, rgb_g, rgb_b) + #bot.return_message("got XY") + LightModule.bridge_.set_group(16, "xy", xy_result) + #bot.return_message("sent color to [Lights_Module]") + else: + if "stream" in tempParsedMessage: + sceneCommand = True + LightModule.bridge_.run_scene("Downstairs", "Stream") + elif "normal" in tempParsedMessage: + sceneCommand = True + LightModule.bridge_.run_scene("Downstairs", "Bright") + elif "haxor" in tempParsedMessage: + sceneCommand = True + LightModule.bridge_.run_scene("Downstairs", "hacker vibes") + elif "off" in tempParsedMessage: + sceneCommand = True + LightModule.bridge_.set_group("Downstairs", "on", False) + elif "on" in tempParsedMessage: + sceneCommand = True + LightModule.bridge_.set_group("Downstairs", "on", True) + elif "ravemode" in tempParsedMessage: + sceneCommand = True + LightModule.raveMode() + else: + #bot.return_message("Color Command!") + xy_result = LightModule.color_string_parser(tempParsedMessage) + #bot.return_message("got XY") + LightModule.bridge_.set_group(16, "xy", xy_result) + #bot.return_message("sent color to [Lights_Module]") + + #if sceneCommand == True: + #bot.return_message("Scene Command!") + + returnString = user + " changed the light's color!" + + return returnString + + def get_help(self): + return self.help \ No newline at end of file diff --git a/channel_points/implemented/Command_roll_v2.py b/channel_points/implemented/Command_roll_v2.py new file mode 100644 index 0000000..a174567 --- /dev/null +++ b/channel_points/implemented/Command_roll_v2.py @@ -0,0 +1,114 @@ +from abc import ABCMeta + +from commands.command_base import AbstractCommand + +import random +import utilities_script as utility + +class Command_roll_v2(AbstractCommand, metaclass=ABCMeta): + """ + this is the test command. + """ + command = "!roll" + + def __init__(self): + super().__init__(Command_roll_v2.command, n_args=1, command_type=AbstractCommand.CommandType.Ver2) + self.help = ["This will roll dice, based on your inputs.", + "\nExample:","roll \"d20\"", "roll \"1D20+5\"", "roll \"10df\"", "roll \"10Df+3\""] + self.isCommandEnabled = True + + def do_command(self, source = AbstractCommand.CommandSource.default, user = "User", command = "", rest = "", bonusData = None): + returnString = user + " sent: [ " + command + " ] with: " + rest + + if ("f") in rest.lower(): + returnString = self.roll(2, user, command + " " +rest) + else: + returnString = self.roll(1, user, command + " " +rest) + + return returnString + + def roll(self, roll_type, user, user_message): + diceRoll = "" + switch = { + 1: "Standard", + 2: "Fate Dice" + } + temp_preParsedMessage = user_message.split("+") + + tempParsedMessage = temp_preParsedMessage[0].split(" ") + temp_dice_stmt: str = tempParsedMessage[1] + parsedMessage = temp_dice_stmt.lower().split("d") + + loopBool: bool = False + if parsedMessage[0] != "": + loopBool = True + if loopBool == True: + if int(parsedMessage[0]) == 1: + loopBool = False + + if roll_type == 1: + print("-rolling...") + # If roll is in xdx+x format + if loopBool == True: + rolls: list = [] + for x in range(int(parsedMessage[0])): + rolls.append(random.randint(1, int(parsedMessage[1]))) # This is the roller + + rollTotal = 0 + for roll in rolls: + rollTotal = rollTotal + roll + diceRoll = diceRoll + str(roll) + ", " + diceRoll = diceRoll[:-2] # This removes the last two characters in the string + + if len(temp_preParsedMessage) == 2: + diceRoll = diceRoll + " + " + temp_preParsedMessage[1] + " = " + str( + rollTotal + int(temp_preParsedMessage[1])) + else: + diceRoll = diceRoll + " = " + str(rollTotal) + # If roll is in dx+x format + if loopBool == False: + roll: int = random.randint(1, int(parsedMessage[1])) # This is the roller + + if len(temp_preParsedMessage) == 2: + diceRoll = str(roll) + " + " + temp_preParsedMessage[1] + " = " + str( + roll + int(temp_preParsedMessage[1])) + else: + diceRoll = str(roll) + diceRoll = user + " rolled: " + diceRoll + + if roll_type == 2: + + print("-fate Rolling....") + # !roll 4df + # If roll is in xdx+x format + if loopBool == True: + rolls: list = [] + for x in range(int(parsedMessage[0])): + rolls.append(random.randint(-1, 1)) # This is the roller + + rollTotal = 0 + for roll in rolls: + rollTotal = rollTotal + roll + diceRoll = diceRoll + str(roll) + ", " + diceRoll = diceRoll[:-2] # This removes the last two characters in the string + + if len(temp_preParsedMessage) == 2: + diceRoll = diceRoll + " + " + temp_preParsedMessage[1] + " = " + str( + rollTotal + int(temp_preParsedMessage[1])) + else: + diceRoll = diceRoll + " = " + str(rollTotal) + # If roll is in dx+x format + if loopBool == False: + roll: int = random.randint(-1, 1) # This is the roller + + if len(temp_preParsedMessage) == 2: + diceRoll = str(roll) + " + " + temp_preParsedMessage[1] + " = " + str( + roll + int(temp_preParsedMessage[1])) + else: + diceRoll = str(roll) + diceRoll = user + " fate rolled: " + diceRoll + + return diceRoll + + def get_help(self): + return self.help \ No newline at end of file diff --git a/channel_points/implemented/Command_test_v2.py b/channel_points/implemented/Command_test_v2.py new file mode 100644 index 0000000..18995d3 --- /dev/null +++ b/channel_points/implemented/Command_test_v2.py @@ -0,0 +1,25 @@ +from abc import ABCMeta + +from commands.command_base import AbstractCommand + +import utilities_script as utility + +class Command_test_v2(AbstractCommand, metaclass=ABCMeta): + """ + this is the test command. + """ + command = "testerino" + + def __init__(self): + super().__init__(Command_test_v2.command, n_args=1, command_type=AbstractCommand.CommandType.Ver2) + self.help = ["This is a test command.", + "\nExample:","testerino"] + self.isCommandEnabled = True + + def do_command(self, source = AbstractCommand.CommandSource.default, user = "User", command = "", rest = "", bonusData = None): + returnString = user + " sent: [ " + command + " ] with: " + rest + #print(returnString) + return returnString + + def get_help(self): + return self.help \ No newline at end of file diff --git a/channel_points/loader.py b/channel_points/loader.py new file mode 100644 index 0000000..a92b544 --- /dev/null +++ b/channel_points/loader.py @@ -0,0 +1,79 @@ +import importlib +import importlib.util +import inspect +import os +import sys +from typing import Dict + +from commands.command_base import AbstractCommand + + +#New +def load_commands(commandType: AbstractCommand.CommandType) -> Dict[str, AbstractCommand]: + print(" -Loading ", commandType ," Commands...\n") + commands = compile_and_load(commandType) + return commands + +#New +def compile_and_load_file(path: str, commandType: AbstractCommand.CommandType): + module_name = os.path.split(path)[1].replace(".py", "") + spec = importlib.util.spec_from_file_location(module_name, path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.load_module(module_name) + + for name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and name.startswith("Command"): + command_inst = obj() + if commandType == command_inst.get_commandType(): + print(" ---Successfully loaded %s: %s" % (commandType, command_inst.get_command())) + return command_inst.get_command(), command_inst + elif commandType != command_inst.get_commandType(): + print(" -%s CommandType did not match: %s for: %s" % (command_inst.get_commandType(), commandType, command_inst.get_command())) + return "", None + + +#New +def compile_and_load(commandType: AbstractCommand.CommandType) -> Dict[str, AbstractCommand]: + dic = {} + implementations = get_implementations_dir() + for dirName, subdirList, fileList in os.walk(implementations): + for file in fileList: + name = os.path.join(dirName, file) + print("compiling: %s" % name) + name, command = compile_and_load_file(name, commandType) + if command is not None and command.command_type is commandType: + dic[name] = command + break + return dic + +def get_base_dir() -> str: + cwd = os.getcwd() + split = os.path.split(cwd) + current = split[len(split) - 1] + if current == 'commands': + return check_dir(cwd) + elif current == 'Praxis_Bot' or current == 'Praxis': + return check_dir(os.path.join(cwd, "commands")) + else: + print("could not find working directory for Praxis_Bot/commands") + raise Exception + + +def get_implementations_dir() -> str: + return check_dir(os.path.join(get_base_dir(), "implemented")) + + +def get_compiled_dir() -> str: + return check_dir(os.path.join(get_base_dir(), "compiled")) + + +def check_dir(path: str) -> str: + if not os.path.exists(path): + os.mkdir(path, 0x777) + return path + + +if __name__ == "__main__": + cmds = load_commands() + diff --git a/standalone_channelpoints.py b/standalone_channelpoints.py new file mode 100644 index 0000000..53a6e28 --- /dev/null +++ b/standalone_channelpoints.py @@ -0,0 +1,78 @@ +import flask +from flask import request + +import commands.loader as command_loader +from commands.command_base import AbstractCommand + +api = flask.Flask(__name__) +# enable/disable this to get web pages of crashes returned +api.config["DEBUG"] = True + +loadedCommands = {} + +def init(): + # todo load entire command library and cache it here + load_commands() + + +def load_commands(): + global loadedCommands + loadedCommands = command_loader.load_commands(AbstractCommand.CommandType.Ver2) + + +def is_command(command: str) -> bool: + #print(command) + for cmd in loadedCommands: + #print(cmd) + if command == cmd: + return True + + if command == "!echo": + return True + else: + return False + +def handle_command(source, username, command, rest, bonusData): + if command == "!echo": + message = "Got payload [%s]" % rest + #print(message) + return flask.make_response("{\"message\":\"%s\"}" % message, 200, {"Content-Type": "application/json"}) + + cmd:AbstractCommand = loadedCommands[command] + if cmd is not None: + cmd_response = cmd.do_command(source, username, command, rest, bonusData) + return flask.make_response("{\"message\":\"%s\"}" % cmd_response, 200, {"Content-Type": "application/json"}) + + #print("Doing a command") + + +@api.route('/api/v1/command', methods=['GET']) +def command_check(): + if 'name' in request.args: + if is_command(request.args['name']): + return flask.make_response('', 200) + else: + return flask.make_response('', 404) + + +@api.route('/api/v1/exec', methods=['GET']) +def exec_command(): + if 'command_name' not in request.args: + return flask.make_response('{\"text\":"Argument \'command_name\' not in request"}', 400) + if 'rest' not in request.args: + return flask.make_response('{\"text\":"Argument \'rest\' not in request"}', 400) + + if 'command_source' not in request.args: + return flask.make_response('{\"text\":"Argument \'command_source\' not in request"}', 400) + + if 'user_name' not in request.args: + username = "User" + else: + username = request.args['user_name'] + + return handle_command(request.args['command_source'], username, request.args['command_name'], request.args['rest'], request.args['bonus_data']) + + +if __name__ == '__main__': + init() + api.run(host='0.0.0.0')