First Push
Im so sorry for the bad code T_T (Also note to self, I deleted the old code)
This commit is contained in:
commit
994981cfdb
1
README.md
Normal file
1
README.md
Normal file
@ -0,0 +1 @@
|
||||
A chatbot to help with live stream production/effects.
|
||||
1
bots.py
Normal file
1
bots.py
Normal file
@ -0,0 +1 @@
|
||||
botList = ["Nightbot", "StreamElements", "Moobot", "praxis_bot"]
|
||||
64
db.py
Normal file
64
db.py
Normal file
@ -0,0 +1,64 @@
|
||||
import mysql.connector
|
||||
import os
|
||||
import db_cred as db_credentials
|
||||
|
||||
import pandas as pd
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
class db_module():
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.currentWorkingDB: str
|
||||
self.engine = None
|
||||
|
||||
def setup_engine(self):
|
||||
self.engine = create_engine(db_credentials.engine_url)
|
||||
print("Engine Created")
|
||||
|
||||
def create_table(self, tableName:str = ""):
|
||||
pass
|
||||
|
||||
def does_table_exist(self, tableName:str = ""):
|
||||
pass
|
||||
|
||||
def delete_table(self, tableName:str = ""):
|
||||
pass
|
||||
|
||||
#This was a old function used prior to the creation of this class. I need to remake it.
|
||||
def get_data_old(self, tableName:str = "", key:str = ""):
|
||||
table = '_channel_commands'
|
||||
table = tableName
|
||||
|
||||
|
||||
df = pd.read_sql_query('SELECT * FROM '+ table, engine)
|
||||
stmt = "trigger == '" + key + "'"
|
||||
temp = df.query(stmt)
|
||||
result = temp.get("response")
|
||||
|
||||
#print(result)
|
||||
i = len(temp.index.values)
|
||||
|
||||
if i == 1:
|
||||
output = result[temp.index.values[0]]
|
||||
pass
|
||||
else:
|
||||
output = "$$None$$"
|
||||
return output
|
||||
|
||||
def get_data(self, tableName:str = "", key:str = ""):
|
||||
pass
|
||||
|
||||
def insert_data(self, tableName:str = "", param:str = ""):
|
||||
pass
|
||||
|
||||
def edit_data(self, tableName:str = "", key:str = "", param:str = ""):
|
||||
pass
|
||||
|
||||
def delete_data(self, tableName:str = "", key:str = ""):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
db_connection = db_module()
|
||||
db_connection.setup_engine()
|
||||
1
db_cred.py
Normal file
1
db_cred.py
Normal file
@ -0,0 +1 @@
|
||||
engine_url = "mysql+mysqlconnector://"
|
||||
17
hotkey_script.py
Normal file
17
hotkey_script.py
Normal file
@ -0,0 +1,17 @@
|
||||
import pygetwindow as gw
|
||||
from pynput.keyboard import Key, Controller
|
||||
import time
|
||||
keyboard = Controller()
|
||||
|
||||
|
||||
def focusOBS():
|
||||
gw.Win32Window.activate(gw.getWindowsWithTitle('OBS')[0])
|
||||
|
||||
|
||||
def hotkey_trigger(key_input):
|
||||
time.sleep(1)
|
||||
focusOBS()
|
||||
time.sleep(1)
|
||||
with keyboard.pressed(Key.ctrl, Key.shift, Key.alt):
|
||||
keyboard.press(key_input)
|
||||
keyboard.release(key_input)
|
||||
32
main.py
Normal file
32
main.py
Normal file
@ -0,0 +1,32 @@
|
||||
# Install these:
|
||||
# pip install mysql-connector-python
|
||||
# pip install pynput
|
||||
# pip install twitch-python
|
||||
# pip install SQLAlchemy
|
||||
# pip install pandas
|
||||
# pip install numpy
|
||||
# pip install gTTS
|
||||
# pip install playsound
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
import twitch_script_class
|
||||
|
||||
import utilities_script as utility
|
||||
|
||||
twitch_chat: twitch_script_class.Twitch_Module
|
||||
|
||||
def main():
|
||||
print("Connecting to Channels...")
|
||||
|
||||
global twitch_chat
|
||||
twitch_chat = twitch_script_class.Twitch_Module()
|
||||
|
||||
twitch_chat.join_channel("thecuriousnerd")
|
||||
#twitch_chat.send_message("activated")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
8
python bot.code-workspace
Normal file
8
python bot.code-workspace
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
}
|
||||
],
|
||||
"settings": {}
|
||||
}
|
||||
1
slurs.py
Normal file
1
slurs.py
Normal file
@ -0,0 +1 @@
|
||||
slurList = ["fag", "faggot", "niga", "nigga", "nigger", "retard", "tard", "rtard", "coon"]
|
||||
40
tts.py
Normal file
40
tts.py
Normal file
@ -0,0 +1,40 @@
|
||||
from gtts import gTTS
|
||||
import os
|
||||
|
||||
import datetime
|
||||
|
||||
from playsound import playsound
|
||||
|
||||
|
||||
def tts(inputText:str, *args):
|
||||
|
||||
destPath = os.getcwd() + "\\tts\\"
|
||||
time = datetime.datetime.now()
|
||||
fileName:str = time.strftime("%m-%d-%Y_%H-%M-%S") + "_tts.mp3"
|
||||
|
||||
if len(args) == 1:
|
||||
fileName = args[0] + "_tts.mp3"
|
||||
|
||||
tts = gTTS(text=inputText, lang='en')
|
||||
tts.save(destPath + fileName)
|
||||
|
||||
playsound(destPath + fileName)
|
||||
|
||||
#os.system(filename)
|
||||
|
||||
def play_speech(fileName):
|
||||
destPath = os.getcwd() + "\\tts\\"
|
||||
playsound(destPath + fileName)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Enter Text: ")
|
||||
textInput = str(input())
|
||||
print("Custom FileName? y/n: ")
|
||||
bool_string = str(input())
|
||||
|
||||
if bool_string == "y":
|
||||
print("Enter FileName: ")
|
||||
fileName = str(input())
|
||||
tts(textInput, fileName)
|
||||
else:
|
||||
tts(textInput)
|
||||
10
twitch_cred.py
Normal file
10
twitch_cred.py
Normal file
@ -0,0 +1,10 @@
|
||||
|
||||
username = ""
|
||||
helix = ""
|
||||
oauth = "oauth:"
|
||||
v5_Client = ""
|
||||
|
||||
|
||||
#Helix ID https://dev.twitch.tv/console/apps
|
||||
#Oauth https://twitchapps.com/tmi/
|
||||
#V5 Client ID https://twitchtokengenerator.com/
|
||||
199
twitch_script_class.py
Normal file
199
twitch_script_class.py
Normal file
@ -0,0 +1,199 @@
|
||||
import random
|
||||
import twitch
|
||||
import twitch.chat
|
||||
import twitch_cred as twitch_credentials
|
||||
|
||||
import bots as botList
|
||||
import slurs as slurList
|
||||
import tts
|
||||
|
||||
import db
|
||||
|
||||
class Twitch_Module():
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.chat: twitch.Chat
|
||||
self.tts_enabled:bool = False
|
||||
self.tts_whitelist_enabled:bool = False
|
||||
self.links_allowed:bool = True
|
||||
self.whitelisted_users:list = ["thecuriousnerd", "theredpoint", "lakotor"]
|
||||
|
||||
def join_channel(self, channel_name):
|
||||
channel_name = "#" + channel_name
|
||||
|
||||
self.chat = twitch.Chat(
|
||||
channel=channel_name,
|
||||
nickname=twitch_credentials.username,
|
||||
oauth=twitch_credentials.oauth,
|
||||
helix=twitch.Helix(twitch_credentials.helix, use_cache=True)
|
||||
)
|
||||
self.chat.subscribe(self.twitch_chat)
|
||||
|
||||
print("Connected to Channel: ", channel_name)
|
||||
|
||||
|
||||
|
||||
def leave_channel(self):
|
||||
print("Leaving Channel",self.chat.channel)
|
||||
self.chat.irc.leave_channel(self.chat.channel)
|
||||
self.chat.irc.socket.close()
|
||||
|
||||
def send_message(self, message):
|
||||
self.chat.send(message)
|
||||
|
||||
def send_whisper(self, user, message):
|
||||
pass
|
||||
|
||||
#This reacts to messages
|
||||
def twitch_chat(self, message: twitch.chat.Message) -> None:
|
||||
print("[#"+ message.channel + "](" + message.sender + ")> " + message.text)
|
||||
if message.channel == "thecuriousnerd":
|
||||
|
||||
if self.isSenderBot(message) == False:
|
||||
if message.sender.lower() == "thecuriousnerd":
|
||||
self.eval_commands(message)
|
||||
self.tts_message(message)
|
||||
|
||||
|
||||
def eval_commands(self, message: twitch.chat.Message):
|
||||
containsURL:bool = self.contains_url(message)
|
||||
|
||||
if message.text.startswith('!tts start'):
|
||||
print("tts activated on #" + message.channel)
|
||||
self.send_message("tts activated")
|
||||
self.tts_enabled = True
|
||||
|
||||
if message.text.startswith('!tts stop'):
|
||||
print("tts deactivated on #" + message.channel)
|
||||
self.send_message("tts deactivated")
|
||||
self.tts_enabled = True
|
||||
|
||||
if message.text.startswith('!test'):
|
||||
print("!test Detected")
|
||||
message.chat.send("test acknowledged")
|
||||
#message.chat.send(f'@{message.user().display_name}, you have {message.user().view_count} views.')
|
||||
|
||||
if message.text.startswith('!roll'):
|
||||
try:
|
||||
self.dice_roll(message)
|
||||
except Exception:
|
||||
self.send_message("{something went wrong}")
|
||||
print("{something went wrong}")
|
||||
|
||||
|
||||
def tts_message(self, message: twitch.chat.Message):
|
||||
if self.contains_slur(message) == False:
|
||||
if self.tts_enabled == True:
|
||||
if message.text.startswith('!') == False:
|
||||
if message.sender.lower() == message.channel:
|
||||
tts.tts(message.sender + " says, " + message.text)
|
||||
else:
|
||||
#tts.tts(message.sender + " says, " + message.text)
|
||||
tts.tts(message.sender + " says, " + message.text, message.channel + " user msg")
|
||||
|
||||
def contains_url(self, message: twitch.chat.Message):
|
||||
containsURL:bool = False
|
||||
if message.text.lower().find("http") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find("https") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find(".com") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find(".net") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find(".org") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find(".tv") != -1:
|
||||
containsURL = True
|
||||
if message.text.lower().find(".io") != -1:
|
||||
containsURL = True
|
||||
if containsURL == True:
|
||||
print("<{ link detected! }> " + " [#"+ message.channel + "](" + message.sender + ") sent a link in chat")
|
||||
return containsURL
|
||||
|
||||
#Checks if Sender is bot.
|
||||
def isSenderBot(self, message: twitch.chat.Message):
|
||||
isBot = False
|
||||
for bot in botList.botList:
|
||||
if message.sender.lower() == bot.lower():
|
||||
isBot = True
|
||||
print("<{ bot detected! }> " + " [#"+ message.channel + "](" + message.sender + ") is a bot")
|
||||
return isBot
|
||||
|
||||
#Checks for basic slurs.
|
||||
def contains_slur(self, message: twitch.chat.Message):
|
||||
containsSlur:bool = False
|
||||
parsedMessage = message.text.split(" ")
|
||||
for word in parsedMessage:
|
||||
for slur in slurList.slurList:
|
||||
if word.lower() == slur:
|
||||
containsSlur = True
|
||||
|
||||
if containsSlur == True:
|
||||
print("<{ slur detected! }> " + " [#"+ message.channel + "](" + message.sender + ") used a slur in chat")
|
||||
return containsSlur
|
||||
|
||||
#Rolls Dice.
|
||||
def dice_roll(self, message: twitch.chat.Message):
|
||||
diceRoll:str = ""
|
||||
self.send_message("Rolling Dice...")
|
||||
print("Rolling Dice...")
|
||||
|
||||
temp_preParsedMessage = message.text.split("+")
|
||||
|
||||
tempParsedMessage = temp_preParsedMessage[0].split(" ")
|
||||
temp_dice_stmt:str = tempParsedMessage[1]
|
||||
parsedMessage = temp_dice_stmt.lower().split("d")
|
||||
|
||||
loopBool:bool = False
|
||||
if parsedMessage[0] != "":
|
||||
loopBool = True
|
||||
if loopBool == True:
|
||||
if int(parsedMessage[0]) == 1:
|
||||
loopBool = False
|
||||
|
||||
#If roll is in xdx+x format
|
||||
if loopBool == True:
|
||||
rolls:list = []
|
||||
for x in range(int(parsedMessage[0])):
|
||||
rolls.append(random.randint(1, int(parsedMessage[1])))
|
||||
|
||||
rollTotal = 0
|
||||
for roll in rolls:
|
||||
rollTotal = rollTotal + roll
|
||||
diceRoll = diceRoll + str(roll) + ", "
|
||||
diceRoll = diceRoll[:-2] #This removes the last two characters in the string
|
||||
|
||||
if len(temp_preParsedMessage) == 2:
|
||||
diceRoll = diceRoll + " + " + temp_preParsedMessage[1] + " = " + str(rollTotal + int(temp_preParsedMessage[1]))
|
||||
else:
|
||||
diceRoll = diceRoll + " = " + str(rollTotal)
|
||||
#If roll is in dx+x format
|
||||
if loopBool == False:
|
||||
roll:int = random.randint(1, int(parsedMessage[1]))
|
||||
|
||||
if len(temp_preParsedMessage) == 2:
|
||||
diceRoll = str(roll) + " + " + temp_preParsedMessage[1] + " = " + str(roll + int(temp_preParsedMessage[1]))
|
||||
else:
|
||||
diceRoll = str(roll)
|
||||
|
||||
diceRoll = "@" + message.sender + " rolled: " + diceRoll
|
||||
print(diceRoll)
|
||||
self.send_message(diceRoll)
|
||||
|
||||
|
||||
#This is a old function used prior to the creation of the Twitch_Module class above.
|
||||
#I need to make a new one for the class.
|
||||
def main_chat_commands_check(channel, sender, text):
|
||||
response = db.basic_command_trigger(channel, sender, text)
|
||||
if response == "$$None$$":
|
||||
pass
|
||||
else:
|
||||
print("Curious Nerd Response Function:")
|
||||
print(response)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
testChat = Twitch_Module()
|
||||
testChat.join_channel("thecuriousnerd")
|
||||
4
utilities_script.py
Normal file
4
utilities_script.py
Normal file
@ -0,0 +1,4 @@
|
||||
import os
|
||||
|
||||
clearScreen = lambda : os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
Loading…
Reference in New Issue
Block a user