Skip to content

Commit

Permalink
Update ToEnWikipediaBot.py
Browse files Browse the repository at this point in the history
Enhanced privacy
  • Loading branch information
jnton authored Sep 15, 2024
1 parent 0b8f7dc commit 7c96029
Showing 1 changed file with 99 additions and 79 deletions.
178 changes: 99 additions & 79 deletions ToEnWikipediaBot.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,46 @@
from telegram.utils.helpers import escape
import os
import logging
import re
from urllib.parse import unquote
import aiohttp
import json
import asyncio
from urllib.parse import urlparse
from telegram import Update, InlineQueryResultArticle, InputTextMessageContent, MessageEntity
from telegram.ext import Application, MessageHandler, filters, CommandHandler, ContextTypes, InlineQueryHandler
from uuid import uuid4
import traceback # Ensure this is imported if you use traceback

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Set logging level to WARNING for production
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.inline_query.query
results = []
# Example using a simple in-memory rate limiter
user_requests = {}

logger.info(f"Handling inline query: {query}")
async def check_rate_limit(user_id):
from time import time

if query:
try:
async with aiohttp.ClientSession() as session:
# Simplified response for debugging
# This is where you would normally call `get_english_wikipedia_url`
# For now, let's return a static result to ensure inline queries work
static_response = "This is a static response for debugging."
results.append(
InlineQueryResultArticle(
id=str(uuid4()),
title="Debugging Title",
input_message_content=InputTextMessageContent(static_response),
description="Static response for testing"
)
)
await update.inline_query.answer(results, cache_time=10)
except Exception as e:
logger.error(f"Error handling inline query: {e}")
current_time = time()
window = 60 # 60-second window
max_requests = 30

async def get_english_wikipedia_url(session, original_url, article_title, language_code):
if language_code == 'en':
return None
wiki_api_url = f"https://{language_code}.wikipedia.org/w/api.php"
params = {
'action': 'query',
'format': 'json',
'titles': article_title.replace('_', ' '), # Correctly handle spaces in titles
'prop': 'pageprops|info',
'inprop': 'url' # Request the full URL to ensure we have it for reference
}
async with session.get(wiki_api_url, params=params) as response:
if response.status == 200:
data = await response.json()
pages = next(iter(data['query']['pages'].values()))
correct_title = pages.get('title', article_title.replace('_', ' ')) # Fetch the display title
wikidata_id = pages.get('pageprops', {}).get('wikibase_item')
if wikidata_id:
wikidata_url = f"https://www.wikidata.org/wiki/Special:EntityData/{wikidata_id}.json"
async with session.get(wikidata_url) as wd_response:
if wd_response.status == 200:
wd_data = await wd_response.json()
if 'enwiki' in wd_data['entities'][wikidata_id]['sitelinks']:
en_title = wd_data['entities'][wikidata_id]['sitelinks']['enwiki']['title']
en_url = f"https://en.wikipedia.org/wiki/{en_title.replace(' ', '_')}"
return f"<b>English Wikipedia page found for <a href=\"{original_url}\">{correct_title}</a></b>:\n{en_url}"
else:
return f"<b>No English Wikipedia page found for <a href=\"{original_url}\">{correct_title}</a></b>."
return None
request_times = user_requests.get(user_id, [])
# Remove outdated requests
request_times = [t for t in request_times if t > current_time - window]
request_times.append(current_time)
user_requests[user_id] = request_times

if len(request_times) > max_requests:
return False # Rate limit exceeded
return True

async def check_wiki_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if not await check_rate_limit(user_id):
await update.message.reply_text("You are sending requests too quickly. Please slow down.")
return
try:
message = update.message
links = []
Expand Down Expand Up @@ -111,19 +83,70 @@ async def check_wiki_link(update: Update, context: ContextTypes.DEFAULT_TYPE) ->
response = await get_english_wikipedia_url(session, original_url, article_title, language_code)
if response:
responses.append(response) # Collect and append response

if responses: # If there are responses to send back
reply_message = "\n\n".join(responses) # Aggregate responses into a single message
await update.message.reply_text(
reply_message,
parse_mode='HTML',
disable_web_page_preview=True,
reply_to_message_id=update.message.message_id # Explicitly set the reply
) # Send reply

except Exception:
logger.error("An error occurred in check_wiki_link.")
# Optionally, log traceback during development but remove in production
# logger.debug(traceback.format_exc())

if responses: # If there are responses to send back
reply_message = "\n\n".join(responses) # Aggregate responses into a single message
await update.message.reply_text(
reply_message,
parse_mode='HTML',
disable_web_page_preview=True,
reply_to_message_id=update.message.message_id # Explicitly set the reply
) # Send reply
async def get_english_wikipedia_url(session, original_url, article_title, language_code):
# Only allow requests to Wikipedia and Wikidata domains
allowed_domains = ["wikipedia.org", "wikidata.org"]

def is_valid_domain(url):
domain = urlparse(url).netloc
return any(domain.endswith(allowed) for allowed in allowed_domains)

except Exception as e:
logger.error(f"Error in check_wiki_link: {e}")
logger.error(traceback.format_exc())
wiki_api_url = f"https://{language_code}.wikipedia.org/w/api.php"

if not is_valid_domain(wiki_api_url):
logger.warning("Attempted to access an invalid domain.")
return None

if language_code == 'en':
return None

params = {
'action': 'query',
'format': 'json',
'titles': article_title.replace('_', ' '),
'prop': 'pageprops|info',
'inprop': 'url'
}
async with session.get(wiki_api_url, params=params) as response:
if response.status == 200:
data = await response.json()
pages = next(iter(data['query']['pages'].values()))
correct_title = pages.get('title', article_title.replace('_', ' '))
wikidata_id = pages.get('pageprops', {}).get('wikibase_item')
if wikidata_id:
wikidata_url = f"https://www.wikidata.org/wiki/Special:EntityData/{wikidata_id}.json"
async with session.get(wikidata_url) as wd_response:
if wd_response.status == 200:
wd_data = await wd_response.json()
if 'enwiki' in wd_data['entities'][wikidata_id]['sitelinks']:
en_title = wd_data['entities'][wikidata_id]['sitelinks']['enwiki']['title']
en_url = f"https://en.wikipedia.org/wiki/{en_title.replace(' ', '_')}"

# Escape user-supplied content
escaped_title = escape(correct_title)
escaped_url = escape(original_url)

return f"<b>English Wikipedia page found for <a href=\"{escaped_url}\">{escaped_title}</a></b>:\n{en_url}"
else:
escaped_title = escape(correct_title)
escaped_url = escape(original_url)
return f"<b>No English Wikipedia page found for <a href=\"{escaped_url}\">{escaped_title}</a></b>."
return None

async def process_link(session, original_url):
# Decode the URL to ensure special characters are handled properly
Expand Down Expand Up @@ -184,7 +207,6 @@ async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
description="Only non-English Wikipedia links are processed"
)]

await update.inline_query.answer(results, cache_time=10)
# Only process non-empty queries
if query:
async with aiohttp.ClientSession() as session:
Expand Down Expand Up @@ -235,34 +257,32 @@ async def async_lambda_handler(event, context):
await application.process_update(update)
await application.shutdown()
return {'statusCode': 200, 'body': json.dumps({'message': 'Success'})}
except Exception as e:
logger.error(f"An error occurred while processing the update: {e}")
except Exception:
logger.error("An error occurred while processing the update.")
return {'statusCode': 500, 'body': json.dumps({'error': 'Internal server error'})}
else:
logger.error("No update payload found.")
return {'statusCode': 400, 'body': json.dumps({'message': 'No update payload'})}

def lambda_handler(event, context):
print("Raw event:", event)
try:
body = json.loads(event['body']) # Correctly parse the JSON body from the event
print("Body parsed:", body)
except json.JSONDecodeError as error:
print("Error decoding JSON:", error)
return {
"statusCode": 400,
"body": json.dumps({"message": "Invalid JSON received"})
}

# print("Raw event:", event)
if 'body' not in event:
logger.error("No 'body' in event. Event structure incorrect or missing data.")
return {
'statusCode': 400,
'body': json.dumps({'message': 'Event structure incorrect or missing data'}),
'headers': {'Content-Type': 'application/json'}
}
try:
body = json.loads(event['body']) # Correctly parse the JSON body from the event
# print("Body parsed:", body)
except json.JSONDecodeError as error:
logger.error("Error decoding JSON.")
return {
"statusCode": 400,
"body": json.dumps({"message": "Invalid JSON received"})
}

# Ensure to use the correct variable name that contains the parsed JSON
return asyncio.run(async_lambda_handler(event, context))

# Ensure this part is not executed when the script is imported as a module in Lambda
Expand Down

0 comments on commit 7c96029

Please sign in to comment.