prismic/src/main.py
2025-04-25 00:33:14 +01:00

697 lines
26 KiB
Python

from flask import Flask, render_template, request, send_file, session, redirect
from database import FETCHALL, FETCHONE
from flask_session import Session
import logging, database, utils
from os import getenv as env
from os import urandom
from io import BytesIO
from os import path
import hashlib
# Load variables from a local .env file, but only when that file is present.
# The dotenv import is deferred so the package is only needed in that case.
if path.exists('.env'):
    from dotenv import load_dotenv

    load_dotenv()
# Console handler: INFO and above, ANSI-coloured timestamp / level / logger name
console_log = logging.StreamHandler()
console_log.setLevel(logging.INFO)
console_log.setFormatter(
    logging.Formatter(
        "\033[1;32m%(asctime)s\033[0m - \033[1;34m%(levelname)s\033[0m - "
        "\033[1;31m%(name)s\033[0m - %(message)s"
    )
)

# File handler: DEBUG and above, plain text; path comes from LOG_FILE
# (falls back to app.log)
file_log = logging.FileHandler(env('LOG_FILE', default='app.log'))
file_log.setLevel(logging.DEBUG)
file_log.setFormatter(
    logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
)

# The root logger passes everything through; each handler applies its own level
log = logging.getLogger()
log.setLevel(logging.DEBUG)
log.addHandler(console_log)
log.addHandler(file_log)
log.info("Logging initialized")
# Initialize Flask. The template and static directories can be overridden
# through the TEMPLATE_FOLDER / STATIC_FOLDER environment variables.
app = Flask(
    __name__,
    template_folder=env('TEMPLATE_FOLDER', default='html'),
    static_folder=env('STATIC_FOLDER', default='static'),
)
# Permanent sessions stored on the filesystem via flask_session
app.config.update(SESSION_PERMANENT=True, SESSION_TYPE="filesystem")
Session(app)
log.info("Flask initialized")
# Initialize Database
db = database.Database()
# DB_TYPE=mysql selects MySQL; any other value (or unset) falls back to SQLite
if env('DB_TYPE') == 'mysql':
    db.connect_mysql()
else:
    db.connect_sqlite(env('DB_PATH', default='database.db'))
# First-run detection: if querying the users table raises, the schema is
# assumed to be missing and is created. An empty users table does NOT count
# as a first run, because only a failing query triggers initialization.
try:
    db.execute_query("SELECT * FROM users", fetch_type=FETCHONE)
except Exception:
    # `except Exception` (rather than a bare except) lets KeyboardInterrupt and
    # SystemExit propagate instead of being mistaken for a missing schema.
    log.info("First time run detected, initializing database")
    log.debug("Probe query against users table failed", exc_info=True)
    database.first_time_run(db)
# Configure utils
log.info("Configuring utils")
conv = utils.data_converter(db)
# Define routes
@app.route('/')
def index():
    """Render the landing page with row counts for users, boards, posts and attachments."""
    log.debug("Rendering index page")

    def count_rows(query):
        # Each COUNT(*) query yields one row; its first column is the total
        return db.execute_query(query, fetch_type=FETCHONE)[0]

    total_users = count_rows("SELECT COUNT(*) FROM users")
    total_boards = count_rows("SELECT COUNT(*) FROM boards")
    total_posts = count_rows("SELECT COUNT(*) FROM posts")
    total_attachments = count_rows("SELECT COUNT(*) FROM attachments")
    log.debug(f"Total users: {total_users}, Total boards: {total_boards}, Total posts: {total_posts}, Total attachments: {total_attachments}")

    stats = {
        'total_users': total_users,
        'total_boards': total_boards,
        'total_posts': total_posts,
        'total_attachments': total_attachments,
    }
    return render_template('index.html', **stats)
@app.route('/boards')
def boards():
    """Render one page of boards, 10 per page, chosen by the ``page`` query argument."""
    log.debug("Rendering boards page")
    per_page = 10
    page = request.args.get('page', 1, type=int)
    offset = (page - 1) * per_page

    # Fetch the requested slice, ordered by id
    rows = db.execute_query(
        "SELECT * FROM boards ORDER BY id LIMIT ? OFFSET ?",
        (per_page, offset),
        fetch_type=FETCHALL,
    )
    board_list = [conv.board_to_dict(row) for row in rows]

    # Ceiling division turns the row count into a page count
    total_boards = db.execute_query("SELECT COUNT(*) FROM boards", fetch_type=FETCHONE)[0]
    total_pages = (total_boards + per_page - 1) // per_page
    log.debug(f"Total boards: {total_boards}, Total pages: {total_pages}")
    return render_template('boards.html', boards=board_list, page=page, total_pages=total_pages)
@app.route('/boards/<string:board_name>')
def board(board_name):
    """Render one page of top-level posts for the board called ``board_name``.

    Returns a plain-text 400 when the name fails ``utils.validate_board_name``
    and a plain-text 404 when no board with that name exists. Posts are listed
    newest first, 10 per page, selected by the ``page`` query argument.
    """
    log.debug("Validating board name")
    if not utils.validate_board_name(board_name):
        log.error(f"Invalid board name: {board_name}")
        return "Invalid board name", 400
    log.debug(f"Rendering board page for {board_name}")

    board_row = db.execute_query("SELECT * FROM boards WHERE name = ?", (board_name,), fetch_type=FETCHONE)
    if not board_row:
        log.error(f"Board {board_name} not found")
        return "Board not found", 404
    board_id = board_row[0]

    per_page = 10
    page = request.args.get('page', 1, type=int)
    post_rows = db.execute_query(
        "SELECT * FROM posts WHERE board_id = ? AND type = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
        (board_id, 'post', per_page, (page - 1) * per_page),
        fetch_type=FETCHALL,
    )
    # Count with the same type filter as the listing; counting every row of the
    # board (comments included) would advertise pages that turn out empty.
    total_posts = db.execute_query(
        "SELECT COUNT(*) FROM posts WHERE board_id = ? AND type = ?",
        (board_id, 'post'),
        fetch_type=FETCHONE,
    )[0]
    total_pages = (total_posts + per_page - 1) // per_page
    posts = [conv.post_to_dict(row) for row in post_rows]
    log.debug(f"Total posts: {total_posts}, Total pages: {total_pages}")
    return render_template('board.html', board=conv.board_to_dict(board_row), posts=posts, page=page, total_pages=total_pages)
@app.route('/posts/<int:post_id>')
def post(post_id):
    """Render a single post together with one page of its replies.

    Replies are the ``posts`` rows whose ``reference`` equals ``post_id``,
    newest first, 10 per page (``page`` query argument). Returns a plain-text
    404 when the post does not exist.
    """
    log.debug(f"Rendering post page for post ID {post_id}")
    post_row = db.execute_query("SELECT * FROM posts WHERE id = ?", (post_id,), fetch_type=FETCHONE)
    if not post_row:
        log.error(f"Post {post_id} not found")
        return "Post not found", 404

    per_page = 10
    page = request.args.get('page', 1, type=int)
    reply_rows = db.execute_query(
        "SELECT * FROM posts WHERE reference = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
        (post_id, per_page, (page - 1) * per_page),
        fetch_type=FETCHALL,
    )
    total_comments = db.execute_query(
        "SELECT COUNT(*) FROM posts WHERE reference = ?",
        (post_id,),
        fetch_type=FETCHONE,
    )[0]
    total_pages = (total_comments + per_page - 1) // per_page
    # Replies are converted with the converter's second argument set to False
    replies = [conv.post_to_dict(row, False) for row in reply_rows]
    return render_template('post.html', post=conv.post_to_dict(post_row), replies=replies, page=page, total_pages=total_pages)
@app.route('/posts')
def posts():
    """Render one page of top-level posts across all boards, newest first, 10 per page."""
    log.debug("Rendering posts page")
    per_page = 10
    page = request.args.get('page', 1, type=int)

    post_rows = db.execute_query(
        "SELECT * FROM posts WHERE type = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
        ('post', per_page, (page - 1) * per_page),
        fetch_type=FETCHALL,
    )
    total_posts = db.execute_query(
        "SELECT COUNT(*) FROM posts WHERE type = ?",
        ('post',),
        fetch_type=FETCHONE,
    )[0]
    total_pages = (total_posts + per_page - 1) // per_page
    post_list = [conv.post_to_dict(row) for row in post_rows]
    log.debug(f"Total posts: {total_posts}, Total pages: {total_pages}")
    return render_template('posts.html', posts=post_list, page=page, total_pages=total_pages)
@app.route('/attachment/<int:attachment_id>')
def attachment(attachment_id):
    """Serve the stored attachment with the given ID inline.

    Returns a plain-text 404 when no such attachment exists. The row is read
    positionally: index 2 is used as the file bytes, index 3 as the download
    name and index 4 as the MIME type.
    """
    log.debug(f"Rendering attachment page for attachment ID {attachment_id}")
    row = db.execute_query("SELECT * FROM attachments WHERE id = ?", (attachment_id,), fetch_type=FETCHONE)
    if not row:
        log.error(f"Attachment {attachment_id} not found")
        return "Attachment not found", 404

    file_data, file_name, file_type = row[2], row[3], row[4]
    # Log metadata only: formatting the whole row would write the entire file
    # blob into the log on every request.
    log.debug(f"Attachment found: id={attachment_id}, name={file_name}, type={file_type}")
    # NOTE(review): the stored MIME type is user-supplied at upload time and the
    # file is served inline; confirm that serving e.g. text/html this way is acceptable.
    return send_file(
        BytesIO(file_data),
        mimetype=file_type,
        as_attachment=False,
        download_name=file_name,
    )
@app.route('/users')
def users():
    """Render one page of users ordered by id, 10 per page (``page`` query argument)."""
    log.debug("Rendering users page")
    per_page = 10
    page = request.args.get('page', 1, type=int)

    user_rows = db.execute_query(
        "SELECT * FROM users ORDER BY id LIMIT ? OFFSET ?",
        (per_page, (page - 1) * per_page),
        fetch_type=FETCHALL,
    )
    total_users = db.execute_query("SELECT COUNT(*) FROM users", fetch_type=FETCHONE)[0]
    total_pages = (total_users + per_page - 1) // per_page
    user_list = [conv.user_to_dict(row) for row in user_rows]
    log.debug(f"Total users: {total_users}, Total pages: {total_pages}")
    return render_template('users.html', users=user_list, page=page, total_pages=total_pages)
@app.route('/users/<string:user>')
def user(user):
    """Render the profile page of the user named ``user`` with one page of their posts.

    Returns a plain-text 400 when the name fails ``utils.validate_username``
    and a plain-text 404 when no such user exists. Only rows with
    ``type = 'post'`` are listed, newest first, 10 per page.
    """
    # Keep the requested name separately: the database row gets its own
    # variable so the name is still available for the not-found log message.
    username = user
    log.debug("Validating user name")
    if not utils.validate_username(username):
        log.error(f"Invalid user name: {username}")
        return "Invalid user name", 400
    log.debug(f"Rendering user page for {username}")

    user_row = db.execute_query("SELECT * FROM users WHERE username = ?", (username,), fetch_type=FETCHONE)
    if not user_row:
        log.error(f"User {username} not found")
        return "User not found", 404
    user_id = user_row[0]

    per_page = 10
    page = request.args.get('page', 1, type=int)
    post_rows = db.execute_query(
        "SELECT * FROM posts WHERE user_id = ? AND type = ? ORDER BY created_at DESC LIMIT ? OFFSET ?",
        (user_id, 'post', per_page, (page - 1) * per_page),
        fetch_type=FETCHALL,
    )
    total_posts = db.execute_query(
        "SELECT COUNT(*) FROM posts WHERE user_id = ? AND type = ?",
        (user_id, 'post'),
        fetch_type=FETCHONE,
    )[0]
    total_pages = (total_posts + per_page - 1) // per_page
    posts = [conv.post_to_dict(row) for row in post_rows]
    log.debug(f"Total posts: {total_posts}, Total pages: {total_pages}")
    return render_template('user.html', user=conv.user_to_dict(user_row), posts=posts, page=page, total_pages=total_pages)
@app.route('/login', methods=['GET'])
def login():
    """Serve the login form."""
    log.debug("Rendering login page")
    return render_template('login.html')
@app.route('/login', methods=['POST'])
def login_post():
    """Authenticate the submitted credentials and start a session.

    Reads ``username`` and ``password`` from the form. On any failure the login
    page is re-rendered with an ``error`` message. On success a fresh random
    token is stored on the user row and in the session, and the client is
    redirected to ``/``.
    """
    log.debug("Processing login form")
    username = request.form.get('username')
    if not username:
        log.error("No username provided")
        return render_template('login.html', error="No username provided")
    password = request.form.get('password')
    if not password:
        log.error("No password provided")
        return render_template('login.html', error="No password provided")
    if not utils.validate_username(username):
        log.error("Invalid username")
        return render_template('login.html', error="Invalid username")

    # Form values are always str here, so encode() needs no AttributeError guard.
    # The digest is deliberately not logged: it is a credential equivalent.
    # NOTE(review): unsalted SHA-256 is kept only for compatibility with the
    # hashes already stored; moving to a salted KDF needs a data migration.
    password_hash = hashlib.sha256(password.encode()).hexdigest()

    user = db.execute_query(
        "SELECT * FROM users WHERE username = ? AND password = ?",
        (username, password_hash),
        fetch_type=FETCHONE,
    )
    if not user:
        log.error("Invalid username or password")
        return render_template('login.html', error="Invalid username or password")

    # Issue a new session token and persist it on the user row
    token = urandom(16).hex()
    db.execute_query("UPDATE users SET token = ? WHERE id = ?", (token, user[0]))
    log.debug(f"Token generated for user {username}")

    # Row columns are read by position: 0 -> user_id, 1 -> name, 4 -> perms
    session['user_id'] = user[0]
    session['name'] = user[1]
    session['perms'] = user[4]
    session['token'] = token
    log.info(f"User {username} logged in")
    return redirect('/')
@app.route('/register', methods=['GET'])
def register():
    """Serve the registration form."""
    log.debug("Rendering register page")
    return render_template('register.html')
@app.route('/register', methods=['POST'])
def register_post():
    """Create a new account from the submitted form and log the user in.

    Reads ``username`` and ``password`` from the form. Every validation failure
    re-renders the registration page with an ``error`` message. On success the
    user row is inserted, a session token is issued and the client is
    redirected to ``/``.
    """
    log.debug("Processing register form")
    username = request.form.get('username')
    if not username:
        log.error("No username provided")
        # Stay on the registration page (not the login page) on missing input
        return render_template('register.html', error="No username provided")
    password = request.form.get('password')
    if not password:
        log.error("No password provided")
        return render_template('register.html', error="No password provided")
    if not utils.validate_username(username):
        log.error("Invalid username")
        return render_template('register.html', error="Invalid username")

    # Form values are always str here, so encode() needs no AttributeError guard.
    # The digest is deliberately not logged: it is a credential equivalent.
    # NOTE(review): unsalted SHA-256 matches what login_post compares against;
    # changing the scheme requires migrating stored hashes.
    password_hash = hashlib.sha256(password.encode()).hexdigest()

    existing = db.execute_query("SELECT * FROM users WHERE username = ?", (username,), fetch_type=FETCHONE)
    if existing:
        log.error("User already exists")
        return render_template('register.html', error="User already exists")

    db.execute_query("INSERT INTO users (username, password) VALUES (?, ?)", (username, password_hash))
    log.info(f"User {username} registered successfully")

    # Re-read the row to obtain the generated id and default column values
    user = db.execute_query("SELECT * FROM users WHERE username = ?", (username,), fetch_type=FETCHONE)
    token = urandom(16).hex()
    db.execute_query("UPDATE users SET token = ? WHERE id = ?", (token, user[0]))
    log.debug(f"Token generated for user {username}")

    # Log in the user; row columns are read by position (0 id, 1 name, 4 perms)
    session['user_id'] = user[0]
    session['name'] = user[1]
    session['perms'] = user[4]
    session['token'] = token
    log.info(f"User {username} logged in")
    return redirect('/')
@app.route('/logout', methods=['GET'])
def logout():
    """Clear the session and send the client back to the landing page."""
    log.debug("Logging out user")
    session.clear()
    # Redirect instead of rendering index.html directly: the index view passes
    # total_users/total_boards/total_posts/total_attachments to that template,
    # and rendering it here would leave all of them undefined.
    return redirect('/')
@app.route('/settings', methods=['GET'])
def settings():
    """Serve the settings page, or redirect to the login page when no session token exists."""
    log.debug("Rendering settings page")
    if 'token' in session:
        return render_template('settings.html')
    # No token in the session means nobody is logged in
    log.error("User not logged in")
    return redirect('/login')
@app.route('/settings', methods=['POST'])
def settings_post():
    """Apply one account change selected by the ``type`` form field.

    Supported values of ``type``: ``username``, ``password``, ``avatar``,
    ``about`` and ``delete``. The session token is checked against the user
    row before anything is changed. Failures re-render the settings page with
    an ``error`` message; ``delete`` clears the session and redirects to ``/``;
    every other successful change re-renders the page with a ``success``
    message.
    """
    log.debug("Processing settings form")
    option = request.form.get('type')
    if not option:
        log.error("No option provided")
        return render_template('settings.html', error="No option provided")

    # .get() instead of indexing: a request without a logged-in session must
    # reach the error branch below rather than raise KeyError.
    token = session.get('token')
    if not token:
        log.error("Token not found")
        return render_template('settings.html', error="Token not found")
    user_id = session.get('user_id')

    # Validate the token against the stored one
    user = db.execute_query("SELECT * FROM users WHERE id = ? AND token = ?", (user_id, token), fetch_type=FETCHONE)
    if not user:
        log.error("Invalid token")
        return render_template('settings.html', error="Invalid token")
    log.debug(f"Token validated for user {user[1]}")

    if option == 'username':
        username = request.form.get('username')
        if not username:
            log.error("No username provided")
            return render_template('settings.html', error="No username provided")
        if not utils.validate_username(username):
            log.error("Invalid username format")
            return render_template('settings.html', error="Invalid username format")
        existing_user = db.execute_query("SELECT * FROM users WHERE username = ?", (username,), fetch_type=FETCHONE)
        if existing_user:
            log.error("Username already exists")
            return render_template('settings.html', error="Username already exists")
        db.execute_query("UPDATE users SET username = ? WHERE id = ?", (username, user_id))
        session['name'] = username
        log.debug(f"Username updated to {username}")
    elif option == 'password':
        password = request.form.get('password')
        if not password:
            log.error("No password provided")
            return render_template('settings.html', error="No password provided")
        # Form values are always str here, so encode() cannot raise AttributeError
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        db.execute_query("UPDATE users SET password = ? WHERE id = ?", (password_hash, user_id))
        log.debug("Password updated successfully")
    elif option == 'avatar':
        file = request.files.get('avatar')
        if not file:
            log.error("No file uploaded")
            return render_template('settings.html', error="No file uploaded")
        if file.mimetype not in ['image/png', 'image/jpeg', 'image/jpg', 'image/gif']:
            log.error("Invalid file type")
            return render_template('settings.html', error="Invalid file type")
        file_data = file.read()
        # Check the bytes that were read, not the upload object (which was
        # already known to be truthy at this point).
        if not file_data:
            log.error("File is empty")
            return render_template('settings.html', error="File is empty")
        # Validate the file size (12 MiB limit)
        if len(file_data) > 12 * 1024 * 1024:
            log.error("File is too large")
            return render_template('settings.html', error="File is too large")
        db.execute_query("UPDATE users SET avatar = ?, avatar_type = ? WHERE id = ?", (file_data, file.mimetype, user_id))
        log.debug("Avatar updated successfully")
    elif option == 'about':
        about = request.form.get('about')
        if not about:
            log.error("No about text provided")
            return render_template('settings.html', error="No about text provided")
        db.execute_query("UPDATE users SET about = ? WHERE id = ?", (about, user_id))
        log.debug("About text updated successfully")
    elif option == 'delete':
        db.execute_query("DELETE FROM users WHERE id = ?", (user_id,))
        log.debug("User deleted successfully")
        session.clear()
        return redirect('/')
    else:
        log.error("Invalid option")
        return render_template('settings.html', error="Invalid option")
    return render_template('settings.html', success="Settings updated successfully")
@app.route('/users/<int:user_id>/avatar')
def user_avatar(user_id):
    """Serve the avatar image of the user with the given ID inline.

    Returns a plain-text 404 when the user does not exist or has no avatar
    stored. The row is read positionally: index 5 is used as the image bytes,
    index 8 as its MIME type and index 1 as the user name.
    """
    log.debug(f"Rendering avatar for user ID {user_id}")
    user = db.execute_query("SELECT * FROM users WHERE id = ?", (user_id,), fetch_type=FETCHONE)
    if not user:
        log.error(f"User {user_id} not found")
        return "User not found", 404

    avatar_data, avatar_type = user[5], user[8]
    # Without this guard a user who never uploaded an avatar would make the
    # MIME-type split below fail on None and produce a 500.
    if not avatar_data or not avatar_type:
        log.error(f"User {user_id} has no avatar")
        return "Avatar not found", 404

    # Use the MIME subtype (e.g. "png" from "image/png") as the file extension
    extension = avatar_type.split('/')[1]
    return send_file(
        BytesIO(avatar_data),
        mimetype=avatar_type,
        as_attachment=False,
        download_name=f"{user[1]}_avatar.{extension}",
    )
@app.route('/boards/new', methods=['GET'])
def new_board():
    """Serve the board-creation form, or redirect to the login page when no session token exists."""
    log.debug("Rendering new board page")
    if 'token' in session:
        return render_template('new_board.html')
    # No token in the session means nobody is logged in
    log.error("User not logged in")
    return redirect('/login')
@app.route('/boards/new', methods=['POST'])
def new_board_post():
    """Create a board from the submitted ``name`` and ``description``.

    Requires a session whose token matches the user row; otherwise redirects
    to the login page. Validation failures re-render the form with an
    ``error`` message. On success the board is inserted with the current user
    as owner and the client is redirected to ``/boards``.
    """
    log.debug("Processing new board form")
    if 'token' not in session:
        log.error("User not logged in")
        return redirect('/login')

    # The session token must match the one stored for this user
    current_user = db.execute_query(
        "SELECT * FROM users WHERE id = ? AND token = ?",
        (session['user_id'], session['token']),
        fetch_type=FETCHONE,
    )
    if not current_user:
        log.error("Invalid token")
        return redirect('/login')
    log.debug(f"Token validated for user {current_user[1]}")

    def fail(message):
        # Every form error is logged and shown on the same template
        log.error(message)
        return render_template('new_board.html', error=message)

    board_name = request.form.get('name')
    if not board_name:
        return fail("No board name provided")
    board_description = request.form.get('description')
    if not board_description:
        return fail("No board description provided")
    if not utils.validate_board_name(board_name):
        return fail("Invalid board name format")

    # Board names must be unique
    existing = db.execute_query("SELECT * FROM boards WHERE name = ?", (board_name,), fetch_type=FETCHONE)
    if existing:
        return fail("Board already exists")

    db.execute_query(
        "INSERT INTO boards (name, description, owner_id) VALUES (?, ?, ?)",
        (board_name, board_description, session['user_id']),
    )
    log.info(f"Board {board_name} created successfully")
    return redirect('/boards')
@app.route('/boards/delete/<int:board_id>', methods=['GET'])
def delete_board(board_id):
    """Delete the board with the given ID on behalf of a logged-in user.

    Redirects to the login page when there is no session token or the token
    does not match the user row; otherwise deletes the board and redirects to
    ``/boards``.

    NOTE(review): the only check performed here is the session-token match.
    Board ownership / permissions are not verified in this function, and the
    deletion is triggered by a GET request. Confirm whether that is enforced
    elsewhere or needs to be added.
    """
    log.debug(f"Processing delete board request for board ID {board_id}")
    if 'token' not in session:
        log.error("User not logged in")
        return redirect('/login')

    # The session token must match the one stored for this user
    credentials = (session['user_id'], session['token'])
    current_user = db.execute_query(
        "SELECT * FROM users WHERE id = ? AND token = ?",
        credentials,
        fetch_type=FETCHONE,
    )
    if not current_user:
        log.error("Invalid token")
        return redirect('/login')
    log.debug(f"Token validated for user {current_user[1]}")

    db.execute_query("DELETE FROM boards WHERE id = ?", (board_id,))
    log.info(f"Board ID {board_id} deleted successfully")
    return redirect('/boards')
@app.route('/post', methods=['POST'])
def new_post():
    """Create a post or comment, with optional attachments, from the submitted form.

    Form fields: ``board_id`` (required), ``content`` (required),
    ``reference`` (optional ID of an existing post), ``type`` (``post`` or
    ``comment``, default ``post``) and any number of ``attachments`` files.

    Requires a session whose token matches the user row; otherwise redirects
    to the login page. Validation failures render ``error.html``. On success
    the client is redirected to the page of the board that was posted to.
    """
    log.debug("Processing new post form")
    if 'token' not in session:
        log.error("User not logged in")
        return redirect('/login')
    # The session token must match the one stored for this user
    user = db.execute_query("SELECT * FROM users WHERE id = ? AND token = ?", (session['user_id'], session['token']), fetch_type=FETCHONE)
    if not user:
        log.error("Invalid token")
        return redirect('/login')
    log.debug(f"Token validated for user {user[1]}")

    # Validate the board ID
    board_id = request.form.get('board_id')
    if not board_id:
        log.error("No board ID provided")
        return render_template('error.html', error="No board ID provided")
    try:
        board_id = int(board_id)
    except ValueError:
        log.error("Invalid board ID")
        return render_template('error.html', error="Invalid board ID")
    board = db.execute_query("SELECT * FROM boards WHERE id = ?", (board_id,), fetch_type=FETCHONE)
    if not board:
        log.error("Invalid board ID")
        return render_template('error.html', error="Invalid board ID")
    log.debug(f"Board ID {board_id} validated")

    content = request.form.get('content')
    if not content:
        log.error("No post content provided")
        return render_template('error.html', error="No post content provided")

    # Validate the optional reference to an existing post
    reference = request.form.get('reference')
    if reference:
        try:
            reference = int(reference)
        except ValueError:
            log.error("Invalid reference ID")
            return render_template('error.html', error="Invalid reference ID")
        referenced_post = db.execute_query("SELECT * FROM posts WHERE id = ?", (reference,), fetch_type=FETCHONE)
        if not referenced_post:
            log.error("Invalid reference ID")
            return render_template('error.html', error="Invalid reference ID")
        log.debug(f"Reference ID {reference} validated")
    else:
        reference = None

    post_type = request.form.get('type', 'post')
    if post_type not in ['post', 'comment']:
        log.error("Invalid post type")
        return render_template('error.html', error="Invalid post type")

    # Read and size-check every attachment BEFORE writing anything. Rejecting
    # an oversized file after the post (and earlier attachments) were already
    # inserted would require cleanup and could leave orphaned attachment rows.
    max_attachment_size = 12 * 1024 * 1024  # 12 MiB
    uploads = []
    for attachment in request.files.getlist('attachments'):
        file_data = attachment.read()
        if not file_data:
            # Empty file fields are skipped rather than rejected
            continue
        if len(file_data) > max_attachment_size:
            log.error("Attachment is too large")
            return render_template('error.html', error="Attachment is too large")
        uploads.append((attachment.filename, file_data, attachment.mimetype))

    # Create the post
    db.execute_query(
        "INSERT INTO posts (user_id, board_id, content, reference, type) VALUES (?, ?, ?, ?, ?)",
        (session['user_id'], board_id, content, reference, post_type))
    # The new row's ID is read from the cursor right after the INSERT
    post_id = db.cursor.lastrowid
    log.debug(f"Post ID {post_id} created successfully")

    # Store the validated attachments
    for file_name, file_data, file_type in uploads:
        db.execute_query(
            "INSERT INTO attachments (post_id, file_name, file_data, file_type) VALUES (?, ?, ?, ?)",
            (post_id, file_name, file_data, file_type))
        log.debug(f"Attachment {file_name} uploaded successfully")

    # The board row fetched during validation already holds the name (index 1)
    return redirect(f'/boards/{board[1]}')
@app.route('/delete/post/<int:post_id>', methods=['GET'])
def delete_post(post_id):
    """Delete the post with the given ID on behalf of a logged-in user.

    Redirects to the login page when there is no session token or the token
    does not match the user row. After deleting, redirects back to the
    referring page, or to ``/posts`` when the request carries no referrer.

    NOTE(review): the only check performed here is the session-token match.
    Post authorship / permissions are not verified in this function, and the
    deletion is triggered by a GET request. Confirm whether that is enforced
    elsewhere or needs to be added.
    """
    log.debug(f"Processing delete post request for post ID {post_id}")
    if 'token' not in session:
        log.error("User not logged in")
        return redirect('/login')

    # The session token must match the one stored for this user
    credentials = (session['user_id'], session['token'])
    current_user = db.execute_query(
        "SELECT * FROM users WHERE id = ? AND token = ?",
        credentials,
        fetch_type=FETCHONE,
    )
    if not current_user:
        log.error("Invalid token")
        return redirect('/login')
    log.debug(f"Token validated for user {current_user[1]}")

    db.execute_query("DELETE FROM posts WHERE id = ?", (post_id,))
    log.info(f"Post ID {post_id} deleted successfully")

    # Go back where the user came from; fall back to the post listing
    return redirect(request.referrer if request.referrer else '/posts')
@app.route('/error/<error_message>')
def error(error_message):
    """Render the generic error page showing the message taken from the URL path."""
    log.debug(f"Rendering error page with message: {error_message}")
    return render_template('error.html', error=error_message)
# Run the app
if __name__ == '__main__':
    # NOTE(review): nothing is started when this module is executed directly;
    # presumably the app is launched by an external runner - confirm.
    pass