345 lines
13 KiB
Python
345 lines
13 KiB
Python
from flask import Flask, request, render_template, session, redirect
|
|
import database, logging, os, hashlib, html
|
|
from flask_session import Session
|
|
|
|
|
|
# Global variables
# IDs of the SYSTEM user and the System board. Both start as None and are
# assigned during database initialization further down in this module.
SYSTEMUID = SYSTEMBID = None

# Whitelist consulted by sanitize_input(): any character not listed here is
# dropped from user-supplied text.
allowed_chars = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
    "!@#$%£^&()-_=+[]{};:'\",.<>?/\\|`~ "
)
|
|
|
|
|
|
# Configure logging
# The root logger is used application-wide and emits INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Console handler whose format wraps the timestamp, level and logger name in
# ANSI colour escape sequences.
console_log = logging.StreamHandler()
console_log.setFormatter(
    logging.Formatter(
        "\033[1;32m%(asctime)s\033[0m - \033[1;34m%(levelname)s\033[0m - "
        "\033[1;31m%(name)s\033[0m - %(message)s"
    )
)
logger.addHandler(console_log)
|
|
|
|
|
|
# Initialize Flask app
logger.info("Initializing Flask app...")

# Template and static directories can be overridden through the environment.
_template_dir = os.getenv('TEMPLATE_FOLDER', 'html')
_static_dir = os.getenv('STATIC_FOLDER', 'static')
app = Flask(__name__, template_folder=_template_dir, static_folder=_static_dir)

# Flask-Session settings: permanent sessions stored on the filesystem.
app.config.update(SESSION_PERMANENT=True, SESSION_TYPE="filesystem")
Session(app)

logger.info("Flask app initialized.")
|
|
|
|
|
|
# Initialize database
# Opens the application database and, on the very first run (no SYSTEM user
# yet), seeds it with the SYSTEM user, the default boards and a welcome post.
# In both cases SYSTEMUID / SYSTEMBID end up holding the first column of the
# SYSTEM user row and the System board row respectively.
logger.info("Initializing database...")
db = database.Database('app.db')

_system_user = db.get_user('SYSTEM')
if _system_user is None:
    logger.info("Running first time setup...")

    logger.info("Creating SYSTEM user...")
    db.create_user('SYSTEM', 'SYSTEM')
    SYSTEMUID = db.get_user('SYSTEM')[0]
    logger.info("SYSTEM user created with UID: %s", SYSTEMUID)

    logger.info("Creating SYSTEM board...")
    db.create_board('System', 'System messages', SYSTEMUID)
    db.create_board('General', 'General discussion', SYSTEMUID)
    db.create_board('Linux', 'Linux discussion', SYSTEMUID)
    db.create_board('Random', 'Random discussion', SYSTEMUID)
    db.create_board('Tech', 'Tech discussion', SYSTEMUID)
    db.create_board('Games', 'Games discussion', SYSTEMUID)
    SYSTEMBID = db.get_board('System')[0]
    logger.info("SYSTEM board created.")

    logger.info("Creating First time setup post...")
    # Argument order (user id, board id, content) matches the call made by the
    # post creation route; the original passed the board id first.
    # NOTE(review): confirm against database.Database.create_post.
    db.create_post(SYSTEMUID, SYSTEMBID, 'Welcome!')
    logger.info("First time setup post created.")

else:
    # Reuse the row fetched above instead of querying for SYSTEM a second time.
    SYSTEMUID = _system_user[0]
    SYSTEMBID = db.get_board('System')[0]
    logger.info("SYSTEM user and board already exist.")
    logger.info("SYSTEM user UID: %s", SYSTEMUID)
    logger.info("SYSTEM board BID: %s", SYSTEMBID)

logger.info("Database initialized.")
|
|
|
|
|
|
# Helper functions
|
|
|
|
def sanitize_input(input_string):
    """Filter a string down to whitelisted characters and HTML-escape it.

    Characters that do not appear in the module-level ``allowed_chars`` are
    removed; the remainder is passed through ``html.escape``.

    Returns the cleaned string, or None (after logging an error) when
    ``input_string`` is not a ``str``.
    """
    logger.info("Sanitizing input...")
    if isinstance(input_string, str):
        kept = filter(allowed_chars.__contains__, input_string)
        cleaned = html.escape(''.join(kept))
        logger.info("Sanitized input")
        return cleaned

    logger.error("Input is not a string.")
    return None
|
|
|
|
|
|
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of ``password``.

    Returns None (after logging an error) when ``password`` is not a ``str``.
    """
    logger.info("Hashing password...")
    if isinstance(password, str):
        encoded = password.encode()
        digest_hex = hashlib.sha256(encoded).hexdigest()
        logger.info("Hashed password")
        return digest_hex

    logger.error("Password is not a string.")
    return None
|
|
|
|
|
|
# Define routes
|
|
@app.route('/')
def index():
    """Render the home page with the posts returned by ``db.get_latest_posts(5)``."""
    logger.info("Rendering index page...")
    recent = [db.post_to_dict(row) for row in db.get_latest_posts(5)]
    return render_template('index.html', posts=recent)
|
|
|
|
|
|
@app.route('/posts/<int:post_id>')
def post(post_id):
    """Render a single post together with one page of its replies.

    The ``page`` query parameter (default 1) selects which block of ten
    replies is requested from ``db.get_post_references``; replies are shown
    newest first. Responds with a plain 404 when the post does not exist.
    """
    logger.info("Rendering post page for post ID: %s", post_id)
    record = db.get_post(post_id)
    if record is None:
        logger.error("Post not found: %s", post_id)
        return "Post not found", 404

    post_data = db.post_to_dict(record)
    # Clamp to 1: ?page=0 or a negative page would otherwise turn
    # 10 * (page - 1) into a negative value.
    page = max(request.args.get('page', 1, type=int), 1)

    logger.info("Post found: %s", post_id)
    replies = db.get_post_references(post_id, 10, 10 * (page - 1))
    if replies is None:
        logger.error("No replies found for post ID: %s", post_id)
        post_data['replies'] = []
    else:
        logger.info("Found %s replies for post ID: %s", len(replies), post_id)
        post_data['replies'] = sorted(
            (db.post_to_dict(reply) for reply in replies),
            key=lambda item: item['created_at'],
            reverse=True,
        )

    return render_template('post.html', post=post_data, page=page)
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the username, user id and session token are stored in the
    Flask session and the client is redirected to '/'. On failure the form is
    re-rendered with an error message.
    """
    logger.info("Rendering login page...")
    if request.method != 'POST':
        return render_template('login.html')

    # Read both fields before sanitizing so a missing field fails first.
    form = request.form
    raw_name, raw_secret = form['username'], form['password']
    username = sanitize_input(raw_name)
    password = sanitize_input(raw_secret)

    token = db.create_session(username, hash_password(password))
    if not token:
        logger.error("Invalid login attempt for user: %s", username)
        return render_template('login.html', error="Invalid username or password.")

    logger.info("User %s logged in successfully.", username)
    session['name'] = username
    session['id'] = db.get_user(username)[0]
    session['session'] = token
    return redirect('/')
|
|
|
|
|
|
@app.route('/user/<string:username>')
def user(username):
    """Render a user's profile page with one page of their posts.

    The username from the URL is sanitized before lookup. The ``page`` query
    parameter (default 1) selects which block of ten posts is requested from
    ``db.get_user_posts``. Responds with a plain 404 for unknown users.
    """
    logger.info("Rendering user page for user: %s", username)
    username = sanitize_input(username)
    record = db.get_user(username)
    if record is None:
        logger.error("User not found: %s", username)
        return "User not found", 404

    profile = db.user_to_dict(record)
    # Clamp to 1: ?page=0 or a negative page would otherwise turn
    # 10 * (page - 1) into a negative value.
    page = max(request.args.get('page', 1, type=int), 1)

    user_posts = db.get_user_posts(profile['id'], 10, 10 * (page - 1))
    if user_posts is None:
        logger.error("No posts found for user: %s", username)
        profile['posts'] = []
    else:
        logger.info("Found %s posts for user: %s", len(user_posts), username)
        profile['posts'] = [db.post_to_dict(item) for item in user_posts]

    return render_template('user.html', user=profile, page=page)
|
|
|
|
|
|
@app.route('/logout')
def logout():
    """Clear the Flask session and redirect to the home page."""
    current_name = session.get('name')
    logger.info("Logging out user: %s", current_name)
    session.clear()
    return redirect('/')
|
|
|
|
|
|
@app.route('/posts')
def posts():
    """Render one page of the global post listing.

    The ``page`` query parameter (default 1) selects which block of ten posts
    is requested from ``db.get_posts``. Responds with a plain 404 when the
    database layer returns None.
    """
    logger.info("Rendering posts page...")
    # Clamp to 1: ?page=0 or a negative page would otherwise turn
    # 10 * (page - 1) into a negative value.
    page = max(request.args.get('page', 1, type=int), 1)
    rows = db.get_posts(None, 10, 10 * (page - 1))
    if rows is None:
        logger.error("No posts found.")
        return "No posts found", 404

    logger.info("Found %s posts.", len(rows))
    return render_template(
        'posts.html',
        posts=[db.post_to_dict(row) for row in rows],
        page=page,
    )
|
|
|
|
|
|
@app.route('/boards')
def boards():
    """Render one page of the board listing.

    The ``page`` query parameter (default 1) selects which block of ten boards
    is requested from ``db.get_boards``. Responds with a plain 404 when the
    database layer returns None.
    """
    logger.info("Rendering boards page...")

    # Clamp to 1: ?page=0 or a negative page would otherwise turn
    # 10 * (page - 1) into a negative value.
    page = max(request.args.get('page', 1, type=int), 1)
    rows = db.get_boards(10, 10 * (page - 1))

    if rows is None:
        logger.error("No boards found.")
        return "No boards found", 404

    logger.info("Found %s boards.", len(rows))
    return render_template(
        'boards.html',
        boards=[db.board_to_dict(row) for row in rows],
        page=page,
    )
|
|
|
|
|
|
@app.route('/boards/<string:board>')
def board(board):
    """Render a board page with one page of its posts.

    The board name from the URL is sanitized before lookup. The ``page`` query
    parameter (default 1) selects which block of ten posts is requested from
    ``db.get_board_posts``. Responds with a plain 404 for unknown boards.
    """
    logger.info("Rendering board page for board: %s", board)
    board_name = sanitize_input(board)
    record = db.get_board(board_name)
    if record is None:
        # Log the requested name; the original logged the lookup result,
        # which is always None on this path.
        logger.error("Board not found: %s", board_name)
        return "Board not found", 404
    board_id = record[0]

    board_data = db.board_to_dict(record)
    # Clamp to 1: ?page=0 or a negative page would otherwise turn
    # 10 * (page - 1) into a negative value.
    page = max(request.args.get('page', 1, type=int), 1)

    board_posts = db.get_board_posts(board_id, 10, 10 * (page - 1))
    if board_posts is None:
        logger.error("No posts found for board ID: %s", board_id)
        board_data['posts'] = []
    else:
        logger.info("Found %s posts for board ID: %s", len(board_posts), board_id)
        board_data['posts'] = [db.post_to_dict(item) for item in board_posts]

    return render_template('board.html', board=board_data, page=page)
|
|
|
|
|
|
def _newpost_error(message):
    """Re-render the new-post form with an error message and the board list."""
    return render_template(
        'newpost.html',
        error=message,
        boards=db.get_all_boards_for_post(),
    )


def _is_local_path(target):
    """Return True when target is a same-site path such as '/posts/3'."""
    return (
        isinstance(target, str)
        and target.startswith('/')
        # '//host' and '/\host' are treated by browsers as other origins.
        and not target.startswith(('//', '/\\'))
    )


@app.route('/post', methods=['POST', 'GET'])
def post_create():
    """Show the new-post form (GET) or create a post from it (POST).

    POST form fields: ``content`` and ``board`` (required), ``ref`` (optional
    id of the post being replied to) and ``redirect`` (optional path to return
    to afterwards). The author is resolved from the session token stored at
    login. Validation failures re-render the form with an error message; a
    failure reported by ``db.create_post`` yields a plain 500 response.
    On success the client is redirected to ``redirect`` when it is a local
    path, otherwise to the newly created post.
    """
    logger.info("Rendering post creation page...")
    if request.method == 'POST':
        content = request.form['content']
        board_id = request.form['board']
        ref = request.form.get('ref', None)
        token = session.get('session')

        # sanitize input
        content = sanitize_input(content)
        try:
            board_id = int(board_id)
        except ValueError:
            logger.error("Invalid board ID: %s", board_id)
            return _newpost_error("Invalid board ID.")
        if not content:
            logger.error("Post content is empty.")
            return _newpost_error("Post content cannot be empty.")
        if not board_id:
            logger.error("Board ID is empty.")
            return _newpost_error("Board ID cannot be empty.")
        if not token:
            logger.error("Session token is missing.")
            return _newpost_error("Session expired. Please log in again.")
        if ref:
            try:
                ref = int(ref)
            except ValueError:
                logger.error("Invalid reference post ID: %s", ref)
                return _newpost_error("Invalid reference post ID.")
        if len(content) > 10000:
            logger.error("Post content exceeds maximum length.")
            return _newpost_error("Post content exceeds maximum length.")

        author = db.get_session(token)
        if author is None:
            logger.error("Session not found or expired.")
            return _newpost_error("Session expired. Please log in again.")
        user_id = author[0]

        # Only the SYSTEM user may post on the System board.
        if board_id == SYSTEMBID and user_id != SYSTEMUID:
            logger.error("User %s is not allowed to post in SYSTEM board.", user_id)
            return _newpost_error("You are not allowed to post in this board.")

        if ref:
            ref_post = db.get_post(ref)
            if ref_post is None:
                # Keep the requested id in its own variable so the log shows
                # it rather than the None lookup result.
                logger.error("Reference post not found: %s", ref)
                return _newpost_error("Reference post not found.")
            ref = ref_post[0]

        logger.info("Creating post in board ID: %s", board_id)
        status = db.create_post(user_id, board_id, content, ref)

        # Exact type check on purpose: a bool failure flag must not be
        # mistaken for a post id (bool is a subclass of int).
        if type(status) is not int:
            logger.error("Post creation failed.")
            return "Post creation failed", 500

        logger.info("Post created successfully.")
        target = request.form.get('redirect')
        if target:
            if _is_local_path(target):
                return redirect(target)
            # The field is client-controlled; refusing non-local targets
            # prevents an open redirect to an arbitrary site.
            logger.warning("Ignoring non-local redirect target: %s", target)
        return redirect('/posts/' + str(status))
    return render_template('newpost.html', boards=db.get_all_boards_for_post())
|
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    POST: sanitizes the submitted username and password, validates their
    lengths, rejects usernames that already exist, stores the user with a
    hashed password, announces the registration with a system post and logs
    the new user in. Validation failures re-render the form with an error.
    GET: redirects to '/' when a user is already logged in, otherwise renders
    the form.
    """
    logger.info("Rendering registration page...")
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        username = sanitize_input(username)
        password = sanitize_input(password)

        if not username or not password:
            logger.error("Username or password is empty.")
            return render_template('register.html', error="Username and password cannot be empty.")

        if len(username) > 20:
            logger.error("Username length is invalid.")
            # Message corrected to match the check, which accepts exactly 20.
            return render_template('register.html', error="Username must be at most 20 characters.")

        if len(password) < 6 or len(password) > 200:
            logger.error("Password length is invalid.")
            return render_template('register.html', error="Password must be between 6 and 200 characters.")

        if db.get_user(username):
            logger.error("Username already exists: %s", username)
            return render_template('register.html', error="Username already exists.")

        hashed_password = hash_password(password)
        db.create_user(username, hashed_password)
        logger.info("User %s registered successfully.", username)
        # Announce on the System board. The original passed SYSTEMUID for both
        # the author and the board argument.
        # NOTE(review): argument order follows the post creation route
        # (user id, board id, content) - confirm against create_post.
        db.create_post(SYSTEMUID, SYSTEMBID, f"New user \"{username}\" registered.")
        # Create a session for the new user
        token = db.create_session(username, hashed_password)
        if token:
            session['name'] = username
            session['id'] = db.get_user(username)[0]
            session['session'] = token
            logger.info("User %s logged in after registration.", username)
            return redirect('/')
        else:
            logger.error("Session creation failed for user: %s", username)
            return render_template('register.html', error="Session creation failed.")

    logger.info("GET request for registration page.")
    if session.get('name'):
        logger.info("User %s is already logged in.", session['name'])
        return redirect('/')
    logger.info("Rendering registration form.")

    return render_template('register.html')
|
|
|
|
|
|
# Main function
|
|
if __name__ == '__main__':
    # Development entry point; host, port and debug mode come from the
    # FLASK_HOST, FLASK_PORT and FLASK_DEBUG environment variables.
    logger.info("Starting Flask app...")
    _host = os.getenv('FLASK_HOST', '0.0.0.0')
    _port = int(os.getenv('FLASK_PORT', 5000))
    _debug = os.getenv('FLASK_DEBUG', 'false').lower() == 'true'
    app.run(host=_host, port=_port, debug=_debug)