Support both gTTS and AWS Polly (we borrow from the Streamlabs API). Moved some configuration stuff around. Added one test method for creating file names, but it doesn't have any assertions, so it's useless. Added enums for different configuration properties like TTS engine, file naming, and Polly voices.
193 lines
7.8 KiB
Python
193 lines
7.8 KiB
Python
import random
|
|
import re
|
|
|
|
import twitch
|
|
import twitch.chat
|
|
|
|
import config as config
|
|
import db
|
|
import tts
|
|
import twitch_cred as twitch_credentials
|
|
|
|
|
|
class Twitch_Module():
    """Twitch chat client that handles owner commands and text-to-speech.

    The instance wraps a single ``twitch.Chat`` connection (created by
    ``join_channel``), reacts to incoming chat messages via ``twitch_chat``
    and forwards eligible messages to the ``tts`` module.
    """

    def __init__(self):
        super().__init__()
        # Set by join_channel(); accessing it earlier raises AttributeError.
        self.chat: "twitch.Chat"
        self.tts_enabled: bool = False
        self.tts_whitelist_enabled: bool = False
        self.links_allowed: bool = True
        self.whitelisted_users: list = ["thecuriousnerd", "theredpoint", "lakotor"]
        # don't freak out, this is *merely* a regex for matching urls that will hit just about everything
        self._urlMatcher = re.compile(
            "(https?:(/{1,3}|[a-z0-9%])|[a-z0-9.-]+[.](com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw))")

    def join_channel(self, channel_name):
        """Connect to ``#channel_name`` and subscribe ``twitch_chat`` to it.

        Credentials (username, oauth token, helix client id) are read from
        the ``twitch_credentials`` module.
        """
        channel_name = "#" + channel_name

        self.chat = twitch.Chat(
            channel=channel_name,
            nickname=twitch_credentials.username,
            oauth=twitch_credentials.oauth,
            helix=twitch.Helix(twitch_credentials.helix, use_cache=True)
        )
        self.chat.subscribe(self.twitch_chat)

        print("Connected to Channel: ", channel_name)

    def leave_channel(self):
        """Leave the current channel and close the underlying IRC socket."""
        print("Leaving Channel", self.chat.channel)
        self.chat.irc.leave_channel(self.chat.channel)
        self.chat.irc.socket.close()

    def send_message(self, message):
        """Send ``message`` to the joined channel."""
        self.chat.send(message)

    def send_whisper(self, user, message):
        """Placeholder for whispering ``message`` to ``user``; not implemented."""
        pass

    # This reacts to messages
    def twitch_chat(self, message: "twitch.chat.Message") -> None:
        """Handle one incoming chat message.

        Every message is logged. Messages outside the ``thecuriousnerd``
        channel and messages from known bots are otherwise ignored. Commands
        are only evaluated for the channel owner; text-to-speech is offered
        to every remaining sender.
        """
        print("[#" + message.channel + "](" + message.sender + ")> " + message.text)
        if message.channel != "thecuriousnerd":
            return
        if self.isSenderBot(message):
            return

        if message.sender.lower() == "thecuriousnerd":
            self.eval_commands(message)
        self.tts_message(message)

    def eval_commands(self, message: "twitch.chat.Message"):
        """Run the chat command (if any) that ``message.text`` starts with.

        Supported prefixes: ``!tts start``, ``!tts stop``, ``!test`` and
        ``!roll``. No prefix is a prefix of another, so at most one fires.
        """
        # Called only for its side effect: it logs when the text holds a link.
        self.contains_url(message)

        text = message.text
        if text.startswith('!tts start'):
            print("tts activated on #" + message.channel)
            self.send_message("tts activated")
            self.tts_enabled = True
        elif text.startswith('!tts stop'):
            print("tts deactivated on #" + message.channel)
            self.send_message("tts deactivated")
            # Previously this set the flag to True, so TTS could never be
            # switched off again once started.
            self.tts_enabled = False
        elif text.startswith('!test'):
            print("!test Detected")
            message.chat.send("test acknowledged")
        elif text.startswith('!roll'):
            # Any malformed roll statement (missing dice, non-numeric parts,
            # invalid counts) ends up here and is reported to chat.
            try:
                self.dice_roll(message)
            except Exception as err:
                self.send_message("{something went wrong}")
                print("{something went wrong}", repr(err))

    def tts_message(self, message: "twitch.chat.Message"):
        """Speak ``message`` through the ``tts`` module when it is eligible.

        A message is skipped when it contains a slur, when TTS is disabled,
        or when it is a command (starts with ``!``).
        """
        # The slur check runs first so that detections are always logged.
        if self.contains_slur(message):
            return
        if not self.tts_enabled or message.text.startswith('!'):
            return

        text_to_say: str = f"{message.sender} says, {message.text}"
        if message.sender.lower() == message.channel:
            tts.tts(text_to_say)
        else:
            # Non-owner messages pass an extra "<channel> user msg" label;
            # presumably used by tts for file naming -- confirm in tts.tts.
            tts.tts(text_to_say, f"{message.channel} user msg")

    def contains_url(self, message: "twitch.chat.Message"):
        """Return True (and log it) when the lower-cased text matches the URL regex."""
        has_url = self._urlMatcher.search(message.text.lower()) is not None
        if has_url:
            print("<{ link detected! }> " + " [#" + message.channel + "](" + message.sender + ") sent a link in chat")
        return has_url

    # Checks if Sender is bot.
    def isSenderBot(self, message: "twitch.chat.Message"):
        """Return True (and log it) when the sender is listed in ``config.botList``.

        The comparison is case-insensitive on both sides.
        """
        sender = message.sender.lower()
        is_bot = any(sender == bot.lower() for bot in config.botList)
        if is_bot:
            print("<{ bot detected! }> " + " [#" + message.channel + "](" + message.sender + ") is a bot")
        return is_bot

    # Checks for basic slurs.
    def contains_slur(self, message: "twitch.chat.Message"):
        """Return True (and log it) when any space-separated word is in ``config.slurList``.

        Words are lower-cased before comparison; list entries are compared
        as-is, so they are expected to be lower-case already.
        """
        found = any(
            word.lower() == slur
            for word in message.text.split(" ")
            for slur in config.slurList
        )
        if found:
            print("<{ slur detected! }> " + " [#" + message.channel + "](" + message.sender + ") used a slur in chat")
        return found

    # Rolls Dice.
    def dice_roll(self, message: "twitch.chat.Message"):
        """Roll dice described by ``message.text`` and post the result.

        Accepted forms are ``!roll dX``, ``!roll NdX`` and either of those
        followed by ``+M``. The reply lists each roll; a total is appended
        when more than one die is rolled or a modifier is given.

        Raises:
            IndexError: no dice statement follows ``!roll``.
            ValueError: a part is not an integer, the statement has no
                ``d``, or the dice count / number of sides is below 1.
        """
        self.send_message("Rolling Dice...")
        print("Rolling Dice...")

        # "!roll 2d6+3" -> ["!roll 2d6", "3"]. A modifier is only honoured
        # when the message contains exactly one "+".
        plus_parts = message.text.split("+")
        modifier_text = plus_parts[1].strip() if len(plus_parts) == 2 else None

        dice_stmt: str = plus_parts[0].split(" ")[1]
        count_text, sides_text = dice_stmt.lower().split("d")[:2]

        # An omitted count ("d20") means a single die.
        count = int(count_text) if count_text else 1
        if count < 1:
            raise ValueError("dice count must be at least 1")
        sides = int(sides_text)

        rolls = [random.randint(1, sides) for _ in range(count)]
        total = sum(rolls)
        result = ", ".join(str(roll) for roll in rolls)

        if modifier_text is not None:
            result += " + " + modifier_text + " = " + str(total + int(modifier_text))
        elif count > 1:
            # A single unmodified die needs no total.
            result += " = " + str(total)

        reply = "@" + message.sender + " rolled: " + result
        print(reply)
        self.send_message(reply)
|
|
|
|
|
|
# This is an old function from before the Twitch_Module class above existed.
# A replacement still needs to be written for the class.
def main_chat_commands_check(channel, sender, text):
    """Look up a basic command response in the db module and print it.

    ``db.basic_command_trigger`` returns the sentinel string ``"$$None$$"``
    when there is nothing to say; in that case nothing is printed.
    """
    reply = db.basic_command_trigger(channel, sender, text)
    if reply != "$$None$$":
        print("Curious Nerd Response Function:")
        print(reply)
|
|
|
|
|
|
if __name__ == "__main__":
    # Running this file directly starts a bot and joins the owner's channel.
    test_chat = Twitch_Module()
    test_chat.join_channel("thecuriousnerd")
|