Compare commits
19 Commits
202b25ec26
...
87c1576e4f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87c1576e4f | ||
|
|
5d97066b8b | ||
|
|
d00f01edc3 | ||
| 9ab31b6320 | |||
| a41446c606 | |||
|
|
59aba6ba20 | ||
|
|
5b2c7c0ee9 | ||
|
|
ee6e5bd0a6 | ||
|
|
bd675bdd32 | ||
|
|
1f86269bbb | ||
|
|
c6eb28d466 | ||
|
|
143691c186 | ||
|
|
98d8e96922 | ||
|
|
948877f0ee | ||
|
|
1993593150 | ||
|
|
7c870caa92 | ||
|
|
354a6da9ae | ||
|
|
9038d244f2 | ||
|
|
0e53eedfba |
35
commands/command_base.py
Normal file
35
commands/command_base.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
from abc import ABCMeta, abstractmethod
|
||||||
|
from enum import Enum, auto
|
||||||
|
|
||||||
|
|
||||||
|
class AbstractCommand(metaclass=ABCMeta):
|
||||||
|
"""
|
||||||
|
This is the base class for commands. In order to load a command a few conditions must be met:
|
||||||
|
1) The class name MUST begin with 'Command' i.e. CommandTTS, CommandBan, etc...
|
||||||
|
2) the class MUST extend AbstractCommand
|
||||||
|
|
||||||
|
Generally, it would be advisable to define the command (something like !so, !tts, !songrequest) as a variable of the
|
||||||
|
class and to then call super().__init__(command)
|
||||||
|
"""
|
||||||
|
|
||||||
|
class CommandType(Enum):
|
||||||
|
NONE = auto()
|
||||||
|
TWITCH = auto()
|
||||||
|
DISCORD = auto()
|
||||||
|
|
||||||
|
def __init__(self, command: str, n_args: int = 0, command_type=CommandType.NONE):
|
||||||
|
self.command = command
|
||||||
|
self.n_args = n_args
|
||||||
|
self.command_type = command_type
|
||||||
|
|
||||||
|
# no touch!
|
||||||
|
def get_args(self, text: str) -> list:
|
||||||
|
return text.split(" ")[0:self.n_args + 1]
|
||||||
|
|
||||||
|
# no touch!
|
||||||
|
def get_command(self) -> str:
|
||||||
|
return self.command
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def do_command(self, bot, twitch_message):
|
||||||
|
pass
|
||||||
66
commands/implemented/command_roll.py
Normal file
66
commands/implemented/command_roll.py
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
from abc import ABCMeta
|
||||||
|
|
||||||
|
from commands.command_base import AbstractCommand
|
||||||
|
|
||||||
|
import random
|
||||||
|
|
||||||
|
class CommandRoll(AbstractCommand, metaclass=ABCMeta):
|
||||||
|
"""
|
||||||
|
this is the roll command.
|
||||||
|
"""
|
||||||
|
command = "!roll"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(CommandRoll.command, n_args=1, command_type=AbstractCommand.CommandType.TWITCH)
|
||||||
|
|
||||||
|
def do_command(self, bot, twitch_message):
|
||||||
|
print("!roll Detected")
|
||||||
|
#twitch_message.chat.send("test acknowledged")
|
||||||
|
|
||||||
|
diceRoll: str = ""
|
||||||
|
twitch_message.chat.send("Rolling Dice...")
|
||||||
|
print("Rolling Dice...")
|
||||||
|
|
||||||
|
temp_preParsedMessage = twitch_message.text.split("+")
|
||||||
|
|
||||||
|
tempParsedMessage = temp_preParsedMessage[0].split(" ")
|
||||||
|
temp_dice_stmt: str = tempParsedMessage[1]
|
||||||
|
parsedMessage = temp_dice_stmt.lower().split("d")
|
||||||
|
|
||||||
|
loopBool: bool = False
|
||||||
|
if parsedMessage[0] != "":
|
||||||
|
loopBool = True
|
||||||
|
if loopBool == True:
|
||||||
|
if int(parsedMessage[0]) == 1:
|
||||||
|
loopBool = False
|
||||||
|
|
||||||
|
# If roll is in xdx+x format
|
||||||
|
if loopBool == True:
|
||||||
|
rolls: list = []
|
||||||
|
for x in range(int(parsedMessage[0])):
|
||||||
|
rolls.append(random.randint(1, int(parsedMessage[1])))
|
||||||
|
|
||||||
|
rollTotal = 0
|
||||||
|
for roll in rolls:
|
||||||
|
rollTotal = rollTotal + roll
|
||||||
|
diceRoll = diceRoll + str(roll) + ", "
|
||||||
|
diceRoll = diceRoll[:-2] # This removes the last two characters in the string
|
||||||
|
|
||||||
|
if len(temp_preParsedMessage) == 2:
|
||||||
|
diceRoll = diceRoll + " + " + temp_preParsedMessage[1] + " = " + str(
|
||||||
|
rollTotal + int(temp_preParsedMessage[1]))
|
||||||
|
else:
|
||||||
|
diceRoll = diceRoll + " = " + str(rollTotal)
|
||||||
|
# If roll is in dx+x format
|
||||||
|
if loopBool == False:
|
||||||
|
roll: int = random.randint(1, int(parsedMessage[1]))
|
||||||
|
|
||||||
|
if len(temp_preParsedMessage) == 2:
|
||||||
|
diceRoll = str(roll) + " + " + temp_preParsedMessage[1] + " = " + str(
|
||||||
|
roll + int(temp_preParsedMessage[1]))
|
||||||
|
else:
|
||||||
|
diceRoll = str(roll)
|
||||||
|
|
||||||
|
diceRoll = "@" + twitch_message.sender + " rolled: " + diceRoll
|
||||||
|
print(diceRoll)
|
||||||
|
twitch_message.chat.send(diceRoll)
|
||||||
17
commands/implemented/command_test.py
Normal file
17
commands/implemented/command_test.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
from abc import ABCMeta
|
||||||
|
|
||||||
|
from commands.command_base import AbstractCommand
|
||||||
|
|
||||||
|
|
||||||
|
class CommandTest(AbstractCommand, metaclass=ABCMeta):
|
||||||
|
"""
|
||||||
|
this is a test command. and a poor excuse for a git commit.
|
||||||
|
"""
|
||||||
|
command = "!test"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(CommandTest.command, command_type=AbstractCommand.CommandType.TWITCH)
|
||||||
|
|
||||||
|
def do_command(self, bot, twitch_message):
|
||||||
|
print("!test Detected")
|
||||||
|
twitch_message.chat.send("test acknowledged")
|
||||||
20
commands/implemented/command_tts.py
Normal file
20
commands/implemented/command_tts.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from abc import ABCMeta
|
||||||
|
|
||||||
|
from commands.command_base import AbstractCommand
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class CommandTTS(AbstractCommand, metaclass=ABCMeta):
|
||||||
|
command = "!tts"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(CommandTTS.command, n_args=1, command_type=AbstractCommand.CommandType.TWITCH)
|
||||||
|
|
||||||
|
def do_command(self, bot, twitch_message):
|
||||||
|
args = self.get_args(twitch_message.text)
|
||||||
|
if args[1] == "start":
|
||||||
|
bot.send_message("tts activated on #%s" % twitch_message.channel)
|
||||||
|
bot.tts_enabled = True
|
||||||
|
elif args[1] == "stop":
|
||||||
|
bot.send_message("tts deactivated")
|
||||||
|
bot.tts_enabled = False
|
||||||
74
commands/loader.py
Normal file
74
commands/loader.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import importlib
|
||||||
|
import importlib.util
|
||||||
|
import inspect
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from commands.command_base import AbstractCommand
|
||||||
|
|
||||||
|
|
||||||
|
def load_commands() -> Dict[str, AbstractCommand]:
|
||||||
|
commands = compile_and_load()
|
||||||
|
return commands
|
||||||
|
|
||||||
|
|
||||||
|
def compile_and_load_file(path: str) -> (str, AbstractCommand):
|
||||||
|
module_name = os.path.split(path)[1].replace(".py", "")
|
||||||
|
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules[module_name] = module
|
||||||
|
spec.loader.load_module(module_name)
|
||||||
|
|
||||||
|
for name, obj in inspect.getmembers(module):
|
||||||
|
if inspect.isclass(obj) and name.startswith("Command"):
|
||||||
|
command_inst = obj()
|
||||||
|
print("Successfully loaded %s: %s" % (name, command_inst.get_command()))
|
||||||
|
return command_inst.get_command(), command_inst
|
||||||
|
return "", None
|
||||||
|
|
||||||
|
|
||||||
|
def compile_and_load() -> Dict[str, AbstractCommand]:
|
||||||
|
dic = {}
|
||||||
|
implementations = get_implementations_dir()
|
||||||
|
for dirName, subdirList, fileList in os.walk(implementations):
|
||||||
|
for file in fileList:
|
||||||
|
name = os.path.join(dirName, file)
|
||||||
|
print("compiling %s" % name)
|
||||||
|
name, command = compile_and_load_file(name)
|
||||||
|
if command is not None:
|
||||||
|
dic[name] = command
|
||||||
|
break
|
||||||
|
return dic
|
||||||
|
|
||||||
|
|
||||||
|
def get_base_dir() -> str:
|
||||||
|
cwd = os.getcwd()
|
||||||
|
split = os.path.split(cwd)
|
||||||
|
current = split[len(split) - 1]
|
||||||
|
if current == 'commands':
|
||||||
|
return check_dir(cwd)
|
||||||
|
elif current == 'Praxis_Bot' or current == 'Praxis':
|
||||||
|
return check_dir(os.path.join(cwd, "commands"))
|
||||||
|
else:
|
||||||
|
print("could not find working directory for Praxis_Bot/commands")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def get_implementations_dir() -> str:
|
||||||
|
return check_dir(os.path.join(get_base_dir(), "implemented"))
|
||||||
|
|
||||||
|
|
||||||
|
def get_compiled_dir() -> str:
|
||||||
|
return check_dir(os.path.join(get_base_dir(), "compiled"))
|
||||||
|
|
||||||
|
|
||||||
|
def check_dir(path: str) -> str:
|
||||||
|
if not os.path.exists(path):
|
||||||
|
os.mkdir(path, 0x777)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cmds = load_commands()
|
||||||
|
|
||||||
@ -26,7 +26,6 @@ class Discord_Credential():
|
|||||||
# Discord Credentials explanations here.
|
# Discord Credentials explanations here.
|
||||||
def __init__(self, nickname, token):
|
def __init__(self, nickname, token):
|
||||||
# super().__init__()
|
# super().__init__()
|
||||||
# all of this is completely made up, i just wanted to make sure your file name switch worked right
|
|
||||||
self.nickname = nickname
|
self.nickname = nickname
|
||||||
self.token = token
|
self.token = token
|
||||||
|
|
||||||
@ -131,7 +130,7 @@ class Credentials_Module():
|
|||||||
def find_Twitch_Credential(self, searchParam: str):
|
def find_Twitch_Credential(self, searchParam: str):
|
||||||
print("Searching for Twitch Credential named: " + searchParam)
|
print("Searching for Twitch Credential named: " + searchParam)
|
||||||
foundSomething = False
|
foundSomething = False
|
||||||
tempCert: Twitch_Credential
|
tempCert: Twitch_Credential = None
|
||||||
for cert in self.Twitch_Credentials_List:
|
for cert in self.Twitch_Credentials_List:
|
||||||
if cert.username == searchParam:
|
if cert.username == searchParam:
|
||||||
print("Twitch Credential Found: {" + cert.username + "}")
|
print("Twitch Credential Found: {" + cert.username + "}")
|
||||||
@ -145,7 +144,7 @@ class Credentials_Module():
|
|||||||
def find_Discord_Credential(self, searchParam: str):
|
def find_Discord_Credential(self, searchParam: str):
|
||||||
print("Searching for Discord Credential named: " + searchParam)
|
print("Searching for Discord Credential named: " + searchParam)
|
||||||
foundSomething = False
|
foundSomething = False
|
||||||
tempCert: Discord_Credential
|
tempCert: Discord_Credential = None
|
||||||
for cert in self.Discord_Credentials_List:
|
for cert in self.Discord_Credentials_List:
|
||||||
if cert.nickname == searchParam:
|
if cert.nickname == searchParam:
|
||||||
print("Discord Credential Found: {" + cert.nickname + "}")
|
print("Discord Credential Found: {" + cert.nickname + "}")
|
||||||
@ -159,7 +158,7 @@ class Credentials_Module():
|
|||||||
def find_DB_Credential(self, searchParam: str):
|
def find_DB_Credential(self, searchParam: str):
|
||||||
print("Searching for DB Credential named: " + searchParam)
|
print("Searching for DB Credential named: " + searchParam)
|
||||||
foundSomething = False
|
foundSomething = False
|
||||||
tempCert: DB_Credential
|
tempCert: DB_Credential = None
|
||||||
for cert in self.DB_Credentials_List:
|
for cert in self.DB_Credentials_List:
|
||||||
if cert.nickname == searchParam:
|
if cert.nickname == searchParam:
|
||||||
print("DB Credential Found: {" + cert.nickname + "}")
|
print("DB Credential Found: {" + cert.nickname + "}")
|
||||||
|
|||||||
@ -7,8 +7,11 @@ import twitch.chat
|
|||||||
import config as config
|
import config as config
|
||||||
import db
|
import db
|
||||||
import tts
|
import tts
|
||||||
|
import commands.loader as command_loader
|
||||||
|
|
||||||
import credentials
|
import credentials
|
||||||
|
from commands.command_base import AbstractCommand
|
||||||
|
|
||||||
|
|
||||||
class Twitch_Module():
|
class Twitch_Module():
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -18,7 +21,7 @@ class Twitch_Module():
|
|||||||
|
|
||||||
self.db_manager: db.db_module = db.db_module()
|
self.db_manager: db.db_module = db.db_module()
|
||||||
self.chat: twitch.Chat
|
self.chat: twitch.Chat
|
||||||
|
self.commands = command_loader.load_commands()
|
||||||
self.tts_enabled: bool = False
|
self.tts_enabled: bool = False
|
||||||
self.tts_whitelist_enabled: bool = False
|
self.tts_whitelist_enabled: bool = False
|
||||||
self.links_allowed: bool = True
|
self.links_allowed: bool = True
|
||||||
@ -66,29 +69,30 @@ class Twitch_Module():
|
|||||||
self.tts_message(message)
|
self.tts_message(message)
|
||||||
|
|
||||||
def eval_commands(self, message: twitch.chat.Message):
|
def eval_commands(self, message: twitch.chat.Message):
|
||||||
containsURL: bool = self.contains_url(message)
|
# containsURL: bool = self.contains_url(message)
|
||||||
|
try:
|
||||||
|
#first_space_idx = message.text.index(' ')
|
||||||
|
|
||||||
if message.text.startswith('!tts start'):
|
# This fixes a error where if you send a command without arguments it fails because
|
||||||
print("tts activated on #" + message.channel)
|
# it cant find the substring.
|
||||||
self.send_message("tts activated")
|
if message.text.find(" ") != -1:
|
||||||
self.tts_enabled = True
|
first_space_idx = message.text.index(' ')
|
||||||
|
else:
|
||||||
|
first_space_idx = -1
|
||||||
|
|
||||||
if message.text.startswith('!tts stop'):
|
command_text = ' '
|
||||||
print("tts deactivated on #" + message.channel)
|
if first_space_idx > -1:
|
||||||
self.send_message("tts deactivated")
|
command_text = message.text[0:first_space_idx]
|
||||||
self.tts_enabled = True
|
else:
|
||||||
|
command_text = message.text
|
||||||
|
|
||||||
if message.text.startswith('!test'):
|
command = self.commands[command_text]
|
||||||
print("!test Detected")
|
if command is not None and command.command_type is AbstractCommand.CommandType.TWITCH:
|
||||||
message.chat.send("test acknowledged")
|
command.do_command(self, message)
|
||||||
# message.chat.send(f'@{message.user().display_name}, you have {message.user().view_count} views.')
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
if message.text.startswith('!roll'):
|
pass # we don't care
|
||||||
try:
|
|
||||||
self.dice_roll(message)
|
|
||||||
except Exception:
|
|
||||||
self.send_message("{something went wrong}")
|
|
||||||
print("{something went wrong}")
|
|
||||||
|
|
||||||
def tts_message(self, message: twitch.chat.Message):
|
def tts_message(self, message: twitch.chat.Message):
|
||||||
if not self.contains_slur(message):
|
if not self.contains_slur(message):
|
||||||
@ -134,55 +138,6 @@ class Twitch_Module():
|
|||||||
print("<{ slur detected! }> " + " [#" + message.channel + "](" + message.sender + ") used a slur in chat")
|
print("<{ slur detected! }> " + " [#" + message.channel + "](" + message.sender + ") used a slur in chat")
|
||||||
return containsSlur
|
return containsSlur
|
||||||
|
|
||||||
# Rolls Dice.
|
|
||||||
def dice_roll(self, message: twitch.chat.Message):
|
|
||||||
diceRoll: str = ""
|
|
||||||
self.send_message("Rolling Dice...")
|
|
||||||
print("Rolling Dice...")
|
|
||||||
|
|
||||||
temp_preParsedMessage = message.text.split("+")
|
|
||||||
|
|
||||||
tempParsedMessage = temp_preParsedMessage[0].split(" ")
|
|
||||||
temp_dice_stmt: str = tempParsedMessage[1]
|
|
||||||
parsedMessage = temp_dice_stmt.lower().split("d")
|
|
||||||
|
|
||||||
loopBool: bool = False
|
|
||||||
if parsedMessage[0] != "":
|
|
||||||
loopBool = True
|
|
||||||
if loopBool == True:
|
|
||||||
if int(parsedMessage[0]) == 1:
|
|
||||||
loopBool = False
|
|
||||||
|
|
||||||
# If roll is in xdx+x format
|
|
||||||
if loopBool == True:
|
|
||||||
rolls: list = []
|
|
||||||
for x in range(int(parsedMessage[0])):
|
|
||||||
rolls.append(random.randint(1, int(parsedMessage[1])))
|
|
||||||
|
|
||||||
rollTotal = 0
|
|
||||||
for roll in rolls:
|
|
||||||
rollTotal = rollTotal + roll
|
|
||||||
diceRoll = diceRoll + str(roll) + ", "
|
|
||||||
diceRoll = diceRoll[:-2] # This removes the last two characters in the string
|
|
||||||
|
|
||||||
if len(temp_preParsedMessage) == 2:
|
|
||||||
diceRoll = diceRoll + " + " + temp_preParsedMessage[1] + " = " + str(
|
|
||||||
rollTotal + int(temp_preParsedMessage[1]))
|
|
||||||
else:
|
|
||||||
diceRoll = diceRoll + " = " + str(rollTotal)
|
|
||||||
# If roll is in dx+x format
|
|
||||||
if loopBool == False:
|
|
||||||
roll: int = random.randint(1, int(parsedMessage[1]))
|
|
||||||
|
|
||||||
if len(temp_preParsedMessage) == 2:
|
|
||||||
diceRoll = str(roll) + " + " + temp_preParsedMessage[1] + " = " + str(
|
|
||||||
roll + int(temp_preParsedMessage[1]))
|
|
||||||
else:
|
|
||||||
diceRoll = str(roll)
|
|
||||||
|
|
||||||
diceRoll = "@" + message.sender + " rolled: " + diceRoll
|
|
||||||
print(diceRoll)
|
|
||||||
self.send_message(diceRoll)
|
|
||||||
|
|
||||||
|
|
||||||
# This is a old function used prior to the creation of the Twitch_Module class above.
|
# This is a old function used prior to the creation of the Twitch_Module class above.
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user