mirror of
https://gitlab.com/woem/truecaptcha_bot.git
synced 2023-02-13 20:53:57 -05:00
164 lines
5.2 KiB
Python
164 lines
5.2 KiB
Python
import time
|
|
import json
|
|
import atexit
|
|
import signal
|
|
import secrets
|
|
import telebot
|
|
import threading
|
|
from PIL import Image
|
|
from os import listdir, getenv
|
|
from telebot import types
|
|
from random import shuffle
|
|
from os.path import isfile, isdir, join
|
|
|
|
token = getenv('TOKEN')
|
|
bot_userid = token.split(':')[0]
|
|
bot = telebot.TeleBot(token)
|
|
bot.remove_webhook()
|
|
|
|
captcha_wait = {}
|
|
sem = threading.Semaphore(value=1)
|
|
|
|
def __change():
|
|
global captcha_wait
|
|
try:
|
|
with open('captcha_wait', 'w') as file:
|
|
sem.acquire()
|
|
file.write(json.dumps(captcha_wait))
|
|
sem.release()
|
|
except:
|
|
pass
|
|
|
|
def __startup():
|
|
global captcha_wait
|
|
try:
|
|
with open('captcha_wait', 'r') as file:
|
|
sem.acquire()
|
|
captcha_wait = json.loads(file.read())
|
|
sem.release()
|
|
except:
|
|
pass
|
|
|
|
def getFiles(dir):
|
|
return [f for f in listdir(dir) if isfile(join(dir, f))]
|
|
|
|
def getDirs(dir):
|
|
return [d for d in listdir(dir) if isdir(join(dir, d))]
|
|
|
|
def check_perm(message):
|
|
perms = bot.get_chat_member(chat_id=message.chat.id, user_id=bot_userid)
|
|
return (perms.can_restrict_members == True)
|
|
|
|
def get_keyboard(user_id, emojiis):
|
|
captcha_solve = types.InlineKeyboardMarkup(row_width=6)
|
|
ln = len(emojiis)
|
|
shuffle(emojiis)
|
|
arr = []
|
|
for i in range(0, ln):
|
|
arr.append(types.InlineKeyboardButton(text = emojiis[i], callback_data = str(user_id) + "_" + emojiis[i]),)
|
|
i+=1
|
|
captcha_solve.add(*arr)
|
|
return captcha_solve
|
|
|
|
def send_captcha(chat_id, user_id, mention):
|
|
bot.send_chat_action(chat_id, "upload_photo")
|
|
|
|
emojii_avail = getDirs("./captcha/")
|
|
emojii_choosen = secrets.choice(emojii_avail)
|
|
|
|
captcha_avail = getFiles("./captcha/" + emojii_choosen)
|
|
captcha_choosen = secrets.choice(captcha_avail)
|
|
|
|
captcha_loc = join(join("./captcha/", emojii_choosen), captcha_choosen)
|
|
captcha_image = open(captcha_loc, 'rb')
|
|
|
|
global captcha_wait
|
|
sem.acquire()
|
|
if chat_id not in captcha_wait:
|
|
captcha_wait[chat_id] = {}
|
|
if user_id not in captcha_wait[chat_id]:
|
|
captcha_wait[chat_id][user_id] = ""
|
|
captcha_wait[chat_id][user_id] = emojii_choosen
|
|
sem.release()
|
|
|
|
msg = bot.send_photo(chat_id=chat_id, photo=captcha_image, caption=mention, reply_markup=get_keyboard(user_id, emojii_avail))
|
|
|
|
__change()
|
|
|
|
threading.Timer(60 * 5, on_timeout, [chat_id, msg.message_id, user_id]).start()
|
|
|
|
img1 = Image.open(captcha_loc)
|
|
img1.save(captcha_loc) # this will change next file_id
|
|
|
|
def validate_captcha(chat_id, user_id, suggested_emojii):
|
|
global captcha_wait
|
|
sem.acquire()
|
|
result = False
|
|
if chat_id in captcha_wait:
|
|
if user_id in captcha_wait[chat_id]:
|
|
if suggested_emojii == captcha_wait[chat_id][user_id]:
|
|
result = True
|
|
del captcha_wait[chat_id][user_id]
|
|
sem.release()
|
|
return result
|
|
|
|
def on_timeout(chat_id, message_id, user_id):
|
|
try:
|
|
perms = bot.get_chat_member(chat_id=chat_id, user_id=user_id)
|
|
if perms.can_send_messages == False:
|
|
bot.kick_chat_member(
|
|
chat_id=chat_id,
|
|
user_id=user_id,
|
|
until_date=int(time.time()) + 3600 * 24
|
|
)
|
|
|
|
if chat_id in captcha_wait:
|
|
if user_id in captcha_wait[chat_id]:
|
|
del captcha_wait[chat_id][user_id]
|
|
|
|
bot.delete_message(chat_id=chat_id, message_id=message_id)
|
|
except:
|
|
pass
|
|
|
|
def get_mention(user):
|
|
mention = str(user.first_name[:32] + " ")
|
|
if "last_name" in user.__dict__:
|
|
if user.last_name is not None:
|
|
mention += str(user.last_name[:32])
|
|
mention = mention.rstrip(" ")
|
|
if "username" in user.__dict__:
|
|
if user.username is not None:
|
|
mention = "@" + str(user.username)
|
|
return mention
|
|
|
|
@bot.message_handler(func=check_perm, content_types=["new_chat_members"])
|
|
def handle_new_user(message):
|
|
for user in message.new_chat_members:
|
|
if user.is_bot == False:
|
|
bot.restrict_chat_member(
|
|
chat_id=message.chat.id,
|
|
user_id=user.id,
|
|
until_date=0
|
|
)
|
|
send_captcha(message.chat.id, user.id, get_mention(user) + ", что изображено на данной картинке? Нажмите на нужную кнопку для ответа.")
|
|
|
|
@bot.callback_query_handler(func=lambda c: True)
|
|
def handle_callback(x):
|
|
bot.answer_callback_query(x.id)
|
|
data = x.data.split("_")
|
|
if str(data[0]) != str(x.from_user.id):
|
|
return
|
|
result = validate_captcha(x.message.chat.id, x.from_user.id, data[1])
|
|
if result:
|
|
msg = bot.send_message(x.message.chat.id, "По моим данным, " + get_mention(x.from_user) + " похож на человека. Приветствуем!")
|
|
threading.Timer(60, bot.delete_message, [x.message.chat.id, msg.message_id]).start()
|
|
bot.restrict_chat_member(
|
|
chat_id=x.message.chat.id,
|
|
user_id=x.from_user.id,
|
|
until_date=int(time.time())+30,
|
|
can_send_messages=True
|
|
)
|
|
on_timeout(x.message.chat.id, x.message.message_id, x.from_user.id)
|
|
|
|
__startup()
|
|
bot.polling(none_stop=True, interval=0, timeout=60)
|