major update :O
This commit is contained in:
41
src/routes/dynamic_routes.py
Normal file
41
src/routes/dynamic_routes.py
Normal file
@@ -0,0 +1,41 @@
|
||||
# Imports
|
||||
from flask import Blueprint, render_template, request, abort
|
||||
from os import getenv as env
|
||||
import logging, os, re
|
||||
|
||||
# Create blueprint
|
||||
bp = Blueprint(
|
||||
'dynamic_routes',
|
||||
__name__,
|
||||
template_folder=env('TEMPLATE_FOLDER', default='../templates'),
|
||||
static_folder=env('STATIC_FOLDER', default='../static')
|
||||
)
|
||||
|
||||
# Create logger
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Get all files in folder
|
||||
def ListFiles(path):
|
||||
files = []
|
||||
for root, dirs, files_in_dir in os.walk(path):
|
||||
for file in files_in_dir:
|
||||
files.append(os.path.relpath(os.path.join(root, file), path))
|
||||
for dir in dirs:
|
||||
files.append(os.path.relpath(os.path.join(root, dir), path) + '/')
|
||||
return files
|
||||
|
||||
# Catch-all route for generic pages
|
||||
@bp.route('/<path:filename>')
|
||||
def catch_all(filename):
|
||||
try:
|
||||
return render_template(f'pages/{filename if re.match(r'^.+\.[a-zA-Z0-9]+$', filename) else filename + '.html'}')
|
||||
|
||||
except Exception as e:
|
||||
os_path = os.path.join(bp.template_folder, 'pages', filename)[3:]
|
||||
print(os_path)
|
||||
if os.path.isdir(os_path):
|
||||
if not filename.endswith('/'): filename += '/'
|
||||
return render_template('bases/directory.html', directory=filename, pages=ListFiles(os_path))
|
||||
|
||||
# If it is a file, return a 404 error
|
||||
abort(404, f"Template '{filename}' not found: {e}")
|
||||
@@ -3,9 +3,6 @@ from flask import Blueprint, render_template
|
||||
from os import getenv as env
|
||||
import logging
|
||||
|
||||
import src.routes.snake as snake
|
||||
|
||||
|
||||
# Create blueprint
|
||||
bp = Blueprint(
|
||||
'error_handlers',
|
||||
@@ -14,11 +11,9 @@ bp = Blueprint(
|
||||
static_folder=env('STATIC_FOLDER', default='../static')
|
||||
)
|
||||
|
||||
|
||||
# Create logger
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Route for 500 error
|
||||
@bp.route('/500')
|
||||
@bp.app_errorhandler(500)
|
||||
@@ -27,17 +22,13 @@ def internal_server_error(error=None):
|
||||
log.error("Internal server error: %s", error)
|
||||
return render_template('errors/500.html'), 500
|
||||
|
||||
|
||||
# Route for 404 error
|
||||
@bp.route('/404')
|
||||
@bp.app_errorhandler(404)
|
||||
def not_found(error=None):
|
||||
if error is not None:
|
||||
log.warning("Page not found: %s", error)
|
||||
scores = snake.get_leaderboard()
|
||||
token = snake.generate_start_token()
|
||||
return render_template('errors/404.html', scores=scores, token=token, cap_key=env('CAP_KEY', default='')), 404 if error is not None else 200
|
||||
|
||||
return render_template('errors/404.html'), 404 if error is not None else 200
|
||||
|
||||
# Route for 400 error
|
||||
@bp.route('/400')
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
# Imports
|
||||
from flask import Blueprint, render_template, request, abort, send_file
|
||||
from os import getenv as env
|
||||
import logging, os
|
||||
|
||||
|
||||
# Create blueprint
|
||||
bp = Blueprint(
|
||||
'generic',
|
||||
__name__,
|
||||
template_folder=env('TEMPLATE_FOLDER', default='../templates'),
|
||||
static_folder=env('STATIC_FOLDER', default='../static')
|
||||
)
|
||||
|
||||
|
||||
# Create logger
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Route for index page
|
||||
@bp.route('/')
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
|
||||
# Route for favicon
|
||||
@bp.route('/favicon.ico')
|
||||
def favicon():
|
||||
return send_file('../static/content/other/favicon.ico')
|
||||
|
||||
|
||||
# Route for robots.txt
|
||||
@bp.route('/robots.txt')
|
||||
def robots():
|
||||
return send_file('../static/content/other/robots.txt')
|
||||
|
||||
|
||||
# Route for sitemap.xml
|
||||
@bp.route('/sitemap.xml')
|
||||
def sitemap():
|
||||
return send_file('../static/content/other/sitemap.xml')
|
||||
|
||||
|
||||
# Catch-all route for generic pages
|
||||
@bp.route('/<path:filename>')
|
||||
def catch_all(filename):
|
||||
try: return render_template(f'pages/{filename if filename.endswith(".html") else filename + ".html"}')
|
||||
except Exception as e:
|
||||
# If the template is not found, check if it is a directory
|
||||
os_path = os.path.join(bp.template_folder, 'pages', filename)[3:]
|
||||
if os.path.isdir(os_path):
|
||||
# walk through the directory and find all files
|
||||
pages = []
|
||||
for root, dirs, files_in_dir in os.walk(os_path):
|
||||
for file in files_in_dir:
|
||||
pages.append(os.path.relpath(os.path.join(root, file), os_path))
|
||||
for dir in dirs:
|
||||
pages.append(os.path.relpath(os.path.join(root, dir), os_path) + '/')
|
||||
|
||||
# If it is a directory, render a directory page
|
||||
if not filename.endswith('/'): filename += '/'
|
||||
return render_template('bases/directory.html', directory=filename, pages=pages)
|
||||
|
||||
# If it is a file, return a 404 error
|
||||
abort(404, f"Template '{filename}' not found: {e}")
|
||||
@@ -1,153 +0,0 @@
|
||||
# Imports
|
||||
from flask import Blueprint, abort, request, redirect
|
||||
from os import urandom, getenv as env
|
||||
import src.utils.database as database
|
||||
import src.utils.cap as cap
|
||||
import logging, datetime, threading, time
|
||||
|
||||
|
||||
# Create blueprint
|
||||
bp = Blueprint(
|
||||
'snake',
|
||||
__name__,
|
||||
template_folder=env('TEMPLATE_FOLDER', default='../templates'),
|
||||
static_folder=env('STATIC_FOLDER', default='../static')
|
||||
)
|
||||
|
||||
|
||||
# Create logger
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Create database instance
|
||||
db = database.Database(
|
||||
host=env('DB_HOST', default='localhost'),
|
||||
port=env('DB_PORT', default=5432),
|
||||
user=env('DB_USER', default='user'),
|
||||
password=env('DB_PASSWORD', default='password'),
|
||||
db_name=env('DB_NAME', default='db_name')
|
||||
)
|
||||
|
||||
db.execute('CREATE TABLE IF NOT EXISTS snake_scores (id SERIAL PRIMARY KEY, name TEXT, score INTEGER)')
|
||||
db.execute('''CREATE TABLE IF NOT EXISTS snake_tokens (
|
||||
id SERIAL PRIMARY KEY,
|
||||
token TEXT UNIQUE NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
ip TEXT UNIQUE NOT NULL
|
||||
)''')
|
||||
|
||||
# Input validation function
|
||||
def valid_length(value, min_length=1, max_length=100):
|
||||
if not isinstance(value, str):
|
||||
return False
|
||||
return min_length <= len(value) <= max_length
|
||||
|
||||
|
||||
def valid_score(score, game_token):
|
||||
start_time = db.execute('SELECT created_at FROM snake_tokens WHERE token = %s', (game_token,)).fetchone()
|
||||
if not start_time:
|
||||
log.error("Game token not found.")
|
||||
return False
|
||||
|
||||
start_time = datetime.datetime.fromisoformat(start_time[0])
|
||||
current_time = datetime.datetime.now()
|
||||
elapsed_time = (current_time - start_time).total_seconds()
|
||||
|
||||
if elapsed_time < score / 10 * 3 + 10: # assuming that each point takes 3 seconds to achieve and 10 seconds to start the game and do captcha
|
||||
log.error("Score is too high for the elapsed time.")
|
||||
return False
|
||||
|
||||
if score <= 0 or score > 10000: # Arbitrary upper limit for scores
|
||||
log.error("Score is out of valid range.")
|
||||
return False
|
||||
|
||||
if score % 10 != 0:
|
||||
log.error("Score is not a multiple of 10.")
|
||||
return False
|
||||
|
||||
# delete the token after score validation
|
||||
db.execute('DELETE FROM snake_tokens WHERE token = %s', (game_token,))
|
||||
log.info(f"Score {score} validated successfully for token {game_token}.")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Route for score submission
|
||||
@bp.route('/snake/submit', methods=['POST'])
|
||||
def submit_score():
|
||||
name = request.form.get('name')
|
||||
score = request.form.get('score')
|
||||
captcha_token = request.form.get('cap-token')
|
||||
game_token = request.form.get('game_token')
|
||||
|
||||
if not cap.verify_captcha(captcha_token):
|
||||
log.error("Captcha verification failed.")
|
||||
abort(400, "Captcha verification failed")
|
||||
|
||||
if not name or not score or not captcha_token or not game_token:
|
||||
log.error("Name, score, captcha token, or game token is missing.")
|
||||
abort(400, "Missing required fields")
|
||||
|
||||
if not valid_length(name, min_length=3, max_length=15):
|
||||
log.error("Invalid name length.")
|
||||
abort(400, "Name must be between 3 and 15 characters long.")
|
||||
|
||||
if not valid_score(int(score), game_token):
|
||||
log.error("Invalid score.")
|
||||
abort(400, "Score not vilid, so either you are trying to cheat the leaderboard or something is seriously wrong.")
|
||||
|
||||
try:
|
||||
db.execute('INSERT INTO snake_scores (name, score) VALUES (%s, %s)', (name, int(score)))
|
||||
db.execute('DELETE FROM snake_tokens WHERE token = %s', (game_token,))
|
||||
log.info(f"Score submitted: {name} - {score}")
|
||||
return redirect('/404')
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"Database error: {e}")
|
||||
abort(500, "Internal server error while submitting score.")
|
||||
|
||||
|
||||
# Generate a unique game token
|
||||
def generate_start_token():
|
||||
"""Generate a unique start token for the game."""
|
||||
token = urandom(16).hex()
|
||||
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
|
||||
|
||||
ip_token = db.execute('SELECT token FROM snake_tokens WHERE ip = %s', (ip,)).fetchone()
|
||||
if ip_token:
|
||||
log.info(f"Token already exists for IP: {ip}, reusing token.")
|
||||
return ip_token[0]
|
||||
|
||||
log.info(f"Generated start token: {token}")
|
||||
db.execute('INSERT INTO snake_tokens (token, ip) VALUES (%s, %s)', (token, ip))
|
||||
return token
|
||||
|
||||
|
||||
# Get leaderboard scores
|
||||
def get_leaderboard():
|
||||
"""Fetch scores from the leaderboard."""
|
||||
try:
|
||||
scores = db.execute('SELECT name, score FROM snake_scores ORDER BY score DESC').fetchall()
|
||||
leaderboard = [{'position': i + 1, 'name': score[0], 'score': score[1]} for i, score in enumerate(scores)]
|
||||
log.info("Leaderboard fetched successfully.")
|
||||
return leaderboard
|
||||
except Exception as e:
|
||||
log.error(f"Error fetching leaderboard: {e}")
|
||||
return []
|
||||
|
||||
|
||||
# Clear all tokens older than 1 hour
|
||||
def clear_old_tokens():
|
||||
while True:
|
||||
try:
|
||||
one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
|
||||
db.execute('DELETE FROM snake_tokens WHERE created_at < %s', (one_hour_ago,))
|
||||
log.info("Old tokens cleared.")
|
||||
except Exception as e:
|
||||
log.error(f"Error clearing old tokens: {e}")
|
||||
time.sleep(3600) # Run every hour
|
||||
|
||||
|
||||
# Start the token clearing thread
|
||||
token_thread = threading.Thread(target=clear_old_tokens, daemon=True)
|
||||
token_thread.start()
|
||||
@@ -1,42 +0,0 @@
|
||||
# Imports
|
||||
from os import getenv as env
|
||||
import requests, logging
|
||||
|
||||
|
||||
# Create logger
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Function to verify CAPTCHA response
|
||||
def verify_captcha(token: str) -> bool:
|
||||
"""
|
||||
Verify the CAP response token with the CAP server.
|
||||
|
||||
Args:
|
||||
token (str): The CAP response token to verify.
|
||||
|
||||
Returns:
|
||||
bool: True if the token is valid, False otherwise.
|
||||
"""
|
||||
if not token:
|
||||
return False
|
||||
|
||||
try:
|
||||
response = requests.post(
|
||||
f"https://cap.alfieking.dev/{env('CAP_KEY', default='')}/siteverify",
|
||||
json={
|
||||
'secret': env('CAP_SECRET', default=''),
|
||||
'response': token,
|
||||
},
|
||||
timeout=10
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
if response.status_code != 200:
|
||||
log.error("CAPTCHA verification failed with status code: %s", response.status_code)
|
||||
return False
|
||||
return response.json().get('success', False)
|
||||
|
||||
except Exception as e:
|
||||
log.error("Error verifying CAPTCHA: %s", e)
|
||||
return False
|
||||
@@ -1,23 +0,0 @@
|
||||
# Imports
|
||||
import psycopg2
|
||||
|
||||
class Database:
|
||||
def __init__(self, host, port, user, password, db_name):
|
||||
self.connection = psycopg2.connect(
|
||||
host=host,
|
||||
port=port,
|
||||
user=user,
|
||||
password=password,
|
||||
database=db_name
|
||||
)
|
||||
self.cursor = self.connection.cursor()
|
||||
|
||||
def execute(self, query, params=None):
|
||||
if params is None:
|
||||
params = []
|
||||
self.cursor.execute(query, params)
|
||||
self.connection.commit()
|
||||
return self.cursor
|
||||
|
||||
def close(self):
|
||||
self.connection.close()
|
||||
62
src/wsgi.py
62
src/wsgi.py
@@ -1,34 +1,33 @@
|
||||
# Imports
|
||||
from flask import Flask
|
||||
from flask import Flask, render_template, send_file
|
||||
from flask_session import Session
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from os import getenv as env, listdir
|
||||
import logging, importlib
|
||||
from os import getenv as env
|
||||
import logging
|
||||
|
||||
import src.routes.error_handlers
|
||||
import src.routes.dynamic_routes
|
||||
|
||||
# Load env
|
||||
load_dotenv()
|
||||
|
||||
# Create logger
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||||
stream_handler.setLevel(logging.INFO)
|
||||
|
||||
# Create console log handler
|
||||
console_log = logging.StreamHandler()
|
||||
console_log.setFormatter(logging.Formatter("\033[1;32m%(asctime)s\033[0m - \033[1;34m%(levelname)s\033[0m - \033[1;31m%(name)s\033[0m - %(message)s"))
|
||||
console_log.setLevel(logging.INFO)
|
||||
|
||||
# Create file log handler
|
||||
file_log = logging.FileHandler(env('LOG_FILE', default='app.log'), mode=env('LOG_MODE', default='a'))
|
||||
file_log.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s"))
|
||||
file_log.setLevel(logging.DEBUG)
|
||||
file_handler = logging.FileHandler(filename='app.log')
|
||||
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
|
||||
# Add handlers to the logger
|
||||
log = logging.getLogger()
|
||||
log.setLevel(logging.DEBUG)
|
||||
log.addHandler(console_log)
|
||||
log.addHandler(file_log)
|
||||
log.addHandler(stream_handler)
|
||||
log.addHandler(file_handler)
|
||||
log.info("Logging initialized")
|
||||
|
||||
|
||||
# Create flask app
|
||||
app = Flask(
|
||||
__name__,
|
||||
@@ -41,18 +40,25 @@ app.config["SESSION_PERMANENT"] = True
|
||||
app.config["SESSION_TYPE"] = "filesystem"
|
||||
Session(app)
|
||||
|
||||
|
||||
# Load routes
|
||||
routes_dir = env('ROUTES_DIR', default='src/routes')
|
||||
for filename in listdir(routes_dir):
|
||||
if not filename.endswith('.py') and filename.startswith('__'):
|
||||
continue
|
||||
app.register_blueprint(src.routes.error_handlers.bp, url_prefix='/error')
|
||||
app.register_blueprint(src.routes.dynamic_routes.bp, url_prefix='/')
|
||||
|
||||
module_name = f"{routes_dir.replace('/', '.')}.{filename[:-3]}"
|
||||
try:
|
||||
module = importlib.import_module(module_name)
|
||||
if hasattr(module, 'bp'):
|
||||
app.register_blueprint(module.bp)
|
||||
log.info(f"Registered blueprint: {module_name}")
|
||||
except Exception as e:
|
||||
log.error(f"Failed to register blueprint {module_name}: {e}")
|
||||
# Generic routes
|
||||
@app.route('/')
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/favicon.ico')
|
||||
def favicon():
|
||||
return send_file('../static/content/other/favicon.ico')
|
||||
|
||||
@app.route('/robots.txt')
|
||||
def robots():
|
||||
return send_file('../static/content/other/robots.txt')
|
||||
|
||||
|
||||
# Route for sitemap.xml
|
||||
@app.route('/sitemap.xml')
|
||||
def sitemap():
|
||||
return send_file('../static/content/other/sitemap.xml')
|
||||
Reference in New Issue
Block a user