snake "security"

This commit is contained in:
2025-06-22 16:07:01 +01:00
parent f3d5cb9d53
commit d48dd04af9
4 changed files with 750 additions and 11 deletions

View File

@@ -22,8 +22,12 @@ log = logging.getLogger(__name__)
# Create database instance
db = database.Database(db_name=env('DB_NAME', default='db.sqlite'))
db.execute('CREATE TABLE IF NOT EXISTS snake_scores (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, score INTEGER)')
db.execute('CREATE TABLE IF NOT EXISTS snake_tokens (id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)')
db.execute('''CREATE TABLE IF NOT EXISTS snake_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip TEXT UNIQUE NOT NULL
)''')
# Input validation function
def valid_length(value, min_length=1, max_length=100):
@@ -49,6 +53,14 @@ def valid_score(score, game_token):
if score <= 0 or score > 10000: # Arbitrary upper limit for scores
log.error("Score is out of valid range.")
return False
if score % 10 != 0:
log.error("Score is not a multiple of 10.")
return False
# delete the token after score validation
db.execute('DELETE FROM snake_tokens WHERE token = ?', (game_token,))
log.info(f"Score {score} validated successfully for token {game_token}.")
return True
@@ -92,8 +104,15 @@ def submit_score():
def generate_start_token():
"""Generate a unique start token for the game."""
token = urandom(16).hex()
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
ip_token = db.execute('SELECT token FROM snake_tokens WHERE ip = ?', (ip,)).fetchone()
if ip_token:
log.info(f"Token already exists for IP: {ip}, reusing token.")
return ip_token[0]
log.info(f"Generated start token: {token}")
db.execute('INSERT INTO snake_tokens (token) VALUES (?)', (token,))
db.execute('INSERT INTO snake_tokens (token, ip) VALUES (?, ?)', (token, ip))
return token

View File

@@ -14,13 +14,5 @@ class Database:
self.connection.commit()
return self.cursor
def fetchall(self, query, params=None):
cursor = self.execute(query, params)
return cursor.fetchall()
def fetchone(self, query, params=None):
cursor = self.execute(query, params)
return cursor.fetchone()
def close(self):
self.connection.close()