Files
The-Workers-Club/app.py

667 lines
27 KiB
Python

from __future__ import annotations
import os
from datetime import datetime
from flask import Flask, render_template, url_for, jsonify, Response, redirect
from requests import requests
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY")
def render_page(body_tpl: str, **ctx):
title = ctx.get("title") or APP_BRAND
return render_template ("base.html")
# =============================================================================
# Routes: Auth + Dashboard
# =============================================================================
@app.route("/")
def index():
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
return redirect(url_for("login"))
@limiter.limit("5/minute")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
user = User.query.filter_by(username=username).first()
if not user or not user.is_active or not user.check_password(password):
flash("Invalid credentials.", "error")
return redirect(url_for("login"))
login_user(user, remember=True)
if user.must_change_password:
flash("Please set a new password to continue.", "ok")
return redirect(url_for("change_password"))
return redirect(url_for("dashboard"))
return render_template ("tpl_login_body.html", title="Login")
@app.post("/logout")
@login_required
def logout():
logout_user()
flash("Signed out.", "ok")
return redirect(url_for("login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template ("tpl_dashboard_body.html", title="Dashboard", user=current_user)
@app.route("/change-password", methods=["GET", "POST"])
@login_required
def change_password():
if request.method == "POST":
curr = request.form.get("current_password") or ""
new1 = request.form.get("new_password") or ""
new2 = request.form.get("confirm_password") or ""
if not current_user.check_password(curr):
flash("Current password is incorrect.", "error")
return redirect(url_for("change_password"))
if len(new1) < 10:
flash("Choose a stronger password (min 10 chars).", "error")
return redirect(url_for("change_password"))
if new1 != new2:
flash("Passwords do not match.", "error")
return redirect(url_for("change_password"))
current_user.set_password(new1)
current_user.must_change_password = False
db.session.commit()
flash("Password updated.", "ok")
return redirect(url_for("dashboard"))
return render_template ("tpl_change_password.html", title="Change Password", user=current_user)
# =============================================================================
# Secret Santa
# =============================================================================
@app.route("/secret-santa", methods=["GET", "POST"])
@login_required
def secret_santa():
entry = SecretSantaEntry.query.filter_by(user_id=current_user.id).first()
if request.method == "POST":
full_name = (request.form.get("full_name") or "").strip()
if not full_name:
flash("Name is required.", "error")
return redirect(url_for("secret_santa"))
age_raw = request.form.get("age")
birthday = (request.form.get("birthday") or "").strip()
hobbies = (request.form.get("hobbies") or "").strip()
gift_card = (request.form.get("gift_card") or "").strip()
fav_movie = (request.form.get("fav_movie") or "").strip()
jewelry = request.form.get("jewelry") == "yes"
try:
age = int(age_raw) if (age_raw or "").strip() else None
except ValueError:
age = None
if entry:
entry.full_name = full_name
entry.age = age
entry.birthday = birthday or None
entry.hobbies = hobbies
entry.gift_card = gift_card
entry.fav_movie = fav_movie
entry.jewelry = jewelry
else:
entry = SecretSantaEntry(
user_id=current_user.id,
full_name=full_name, age=age, birthday=birthday or None,
hobbies=hobbies, gift_card=gift_card, fav_movie=fav_movie,
jewelry=jewelry,
)
db.session.add(entry)
db.session.commit()
flash("Secret Santa saved.", "ok")
return redirect(url_for("secret_santa"))
form = {
"full_name": entry.full_name if entry else current_user.username,
"age": entry.age if entry else "",
"birthday": entry.birthday if entry else "",
"hobbies": entry.hobbies if entry else "",
"gift_card": entry.gift_card if entry else "",
"fav_movie": entry.fav_movie if entry else "",
"jewelry": bool(entry.jewelry) if (entry and entry.jewelry is not None) else False,
}
return render_template ("tpl_secret_santa.html", title="Secret Santa", form=form)
@app.get("/admin/secret-santa")
@admin_required
def admin_secret_santa():
rows = (SecretSantaEntry.query
.join(User, SecretSantaEntry.user_id == User.id)
.order_by(User.username.asc())
.all())
return render_template ("admin_secret_santa.html", title="Secret Santa", rows=rows)
# =============================================================================
# Request Off
# =============================================================================
@app.get("/request-off")
@login_required
def request_off():
my_reqs = (TimeOffRequest.query
.filter_by(user_id=current_user.id)
.order_by(TimeOffRequest.created_at.desc())
.all())
return render_template ("tpl_request_off_body.html", title="Request Off", my_reqs=my_reqs)
@app.post("/requests/new")
@login_required
def requests_new():
date = (request.form.get("date") or "").strip()
note = (request.form.get("note") or "").strip()[:240]
if not date or not _validate_iso_date(date):
flash("Please choose a valid date (YYYY-MM-DD).", "error")
return redirect(url_for("request_off"))
r = TimeOffRequest(user_id=current_user.id, date=date, note=note, status="pending")
db.session.add(r); db.session.commit()
flash("Request submitted.", "ok")
return redirect(url_for("request_off"))
@app.post("/requests/<int:req_id>/cancel")
@login_required
def requests_cancel(req_id: int):
r = db.session.get(TimeOffRequest, req_id)
if not r or r.user_id != current_user.id:
flash("Not found.", "error"); return redirect(url_for("request_off"))
if r.status != "pending":
flash("Only pending requests can be cancelled.", "error"); return redirect(url_for("request_off"))
r.status = "cancelled"; r.decided_at = datetime.utcnow(); db.session.commit()
flash("Request cancelled.", "ok"); return redirect(url_for("request_off"))
# =============================================================================
# Availability (member self-service)
# =============================================================================
@app.route("/availability", methods=["GET", "POST"])
@login_required
def availability():
aw = AvailabilityWeekly.query.filter_by(user_id=current_user.id).first()
if request.method == "POST":
week = _parse_week_from_form(request.form)
if not aw:
aw = AvailabilityWeekly(user_id=current_user.id, data_json=_json.dumps(week))
db.session.add(aw)
else:
aw.data_json = _json.dumps(week)
db.session.commit()
flash("Availability saved.", "ok")
return redirect(url_for("availability"))
week = _default_week()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
week[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Failed to parse availability JSON for user %s: %s", current_user.id, e)
return render_template ("tpl_avail.html", title="Your Availability",
day_names=DAY_NAMES, week=week)
# =============================================================================
# Admin: Availability grid + EDITOR (NEW)
# =============================================================================
@app.get("/admin/availability")
@admin_required
def admin_availability():
rows = []
users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()
for u in users:
aw = AvailabilityWeekly.query.filter_by(user_id=u.id).first()
wk = _default_week()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
wk[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Failed to parse availability JSON for user %s: %s", u.id, e)
rows.append({"user_id": u.id, "username": u.username, "week": wk})
return render_template ("admin_avail.html", title="Team Availability", rows=rows, day_names=DAY_NAMES)
@app.route("/admin/availability/edit", methods=["GET", "POST"])
@admin_required
def admin_availability_edit():
# Save for a specific user
if request.method == "POST":
try:
target_id = int(request.form.get("user_id") or "0")
except ValueError:
flash("Invalid user.", "error")
return redirect(url_for("admin_availability_edit"))
target = db.session.get(User, target_id)
if not target:
flash("User not found.", "error")
return redirect(url_for("admin_availability_edit"))
week = _parse_week_from_form(request.form)
aw = AvailabilityWeekly.query.filter_by(user_id=target.id).first()
if not aw:
aw = AvailabilityWeekly(user_id=target.id, data_json=_json.dumps(week))
db.session.add(aw)
else:
aw.data_json = _json.dumps(week)
db.session.commit()
flash(f"Availability saved for {target.username}.", "ok")
return redirect(url_for("admin_availability_edit", user_id=target.id))
# GET: pick a user then load their week
users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()
user_id = request.args.get("user_id", type=int)
user = db.session.get(User, user_id) if user_id else None
week = _default_week()
if user:
aw = AvailabilityWeekly.query.filter_by(user_id=user.id).first()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
week[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Admin edit: bad JSON for user %s: %s", user.id, e)
return render_template ("admin_availability.html",
title="Edit Availability",
users=users, user=user, day_names=DAY_NAMES, week=week)
# =============================================================================
# Admin: Requests + Wishlists
# =============================================================================
@app.get("/admin/requests")
@admin_required
def admin_requests():
rows = (TimeOffRequest.query
.join(User, TimeOffRequest.user_id == User.id)
.order_by(TimeOffRequest.status.asc(), TimeOffRequest.created_at.desc())
.all())
return render_page(TPL_ADMIN_REQS_BODY, title="Time-off Requests", rows=rows)
@app.post("/admin/requests/<int:req_id>/<action>")
@admin_required
def admin_requests_action(req_id: int, action: str):
r = db.session.get(TimeOffRequest, req_id)
if not r:
flash("Request not found.", "error"); return redirect(url_for("admin_requests"))
if r.status != "pending":
flash("Request is already decided.", "error"); return redirect(url_for("admin_requests"))
if action not in ("approve","deny"):
flash("Invalid action.", "error"); return redirect(url_for("admin_requests"))
r.status = "approved" if action == "approve" else "denied"
r.decided_at = datetime.utcnow()
db.session.commit()
flash(f"Request {action}d.", "ok"); return redirect(url_for("admin_requests"))
@app.get("/admin/wishlists")
@admin_required
def admin_wishlists():
rows = (Wishlist.query
.join(User, Wishlist.user_id == User.id)
.order_by(User.username.asc())
.all())
return render_page(TPL_ADMIN_WISHLISTS_BODY, title="Wishlists", rows=rows)
# =============================================================================
# Exports
# =============================================================================
@app.get("/admin/secret-santa/export.csv")
@admin_required
def admin_secret_santa_export():
rows = (SecretSantaEntry.query.join(User).order_by(User.username.asc()).all())
sio = StringIO()
w = csv.writer(sio)
w.writerow(["username","full_name","age","birthday","hobbies","gift_card","fav_movie","jewelry","updated_at"])
for e in rows:
w.writerow([
e.user.username, e.full_name, e.age or "", e.birthday or "", e.hobbies or "",
e.gift_card or "", e.fav_movie or "", "Yes" if e.jewelry else "No",
e.updated_at.isoformat() if e.updated_at else ""
])
resp = make_response(sio.getvalue())
resp.headers["Content-Type"] = "text/csv; charset=utf-8"
resp.headers["Content-Disposition"] = "attachment; filename=secret_santa.csv"
return resp
@app.get("/admin/requests/export.csv")
@admin_required
def admin_requests_export():
rows = TimeOffRequest.query.join(User).order_by(TimeOffRequest.created_at.desc()).all()
sio = StringIO()
w = csv.writer(sio)
w.writerow(["username","date","status","note","created_at","decided_at"])
for r in rows:
w.writerow([
r.user.username, r.date, r.status, r.note or "",
r.created_at.isoformat(),
r.decided_at.isoformat() if r.decided_at else ""
])
resp = make_response(sio.getvalue())
resp.headers["Content-Type"] = "text/csv; charset=utf-8"
resp.headers["Content-Disposition"] = "attachment; filename=time_off_requests.csv"
return resp
# =============================================================================
# Admin: Users CRUD
# =============================================================================
@app.get("/admin/users")
@admin_required
def admin_users():
users = User.query.order_by(User.created_at.desc()).all()
return render_page(TPL_ADMIN_USERS_BODY, title="Users", users=users)
@app.post("/admin/users/create")
@admin_required
def admin_users_create():
username = (request.form.get("username") or "").strip()
role = (request.form.get("role") or "member").strip().lower()
role = "admin" if role == "admin" else "member"
if not username:
flash("Username is required.", "error")
return redirect(url_for("admin_users"))
if User.query.filter_by(username=username).first():
flash("Username already exists.", "error")
return redirect(url_for("admin_users"))
temp_pw = gen_temp_password()
u = User(username=username, role=role, is_active=True, must_change_password=True)
u.set_password(temp_pw)
db.session.add(u)
db.session.commit()
flash(f"User '{username}' created with temporary password: {temp_pw}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users/<int:user_id>/reset")
@admin_required
def admin_users_reset(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
temp_pw = gen_temp_password()
u.set_password(temp_pw)
u.must_change_password = True
db.session.commit()
flash(f"Password reset for '{u.username}'. New temp: {temp_pw}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users/<int:user_id>/role")
@admin_required
def admin_users_role(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
new_role = (request.form.get("role") or "member").lower()
u.role = "admin" if new_role == "admin" else "member"
db.session.commit()
flash(f"Role updated for '{u.username}'{u.role}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users/<int:user_id>/toggle")
@admin_required
def admin_users_toggle(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
# Prevent locking yourself out of the last admin
if u.id == current_user.id and u.role == "admin":
admins = User.query.filter_by(role="admin", is_active=True).count()
if admins <= 1:
flash("You are the last active admin; cannot deactivate.", "error")
return redirect(url_for("admin_users"))
u.is_active = not u.is_active
db.session.commit()
flash(f"User '{u.username}' active={u.is_active}", "ok")
return redirect(url_for("admin_users"))
# =============================================================================
# JSON API + Health
# =============================================================================
@app.get("/api/me")
@login_required
def api_me():
u: User = current_user # type: ignore
return jsonify(
id=u.id, username=u.username, role=u.role,
must_change_password=u.must_change_password, is_active=u.is_active
)
# ---------------------------
# Info: user view
# ---------------------------
@app.get("/info")
@login_required
def info_page():
contacts = (InfoContact.query
.filter_by(is_active=True)
.order_by(InfoContact.priority.asc(), InfoContact.name.asc())
.all())
exts = (DeptExtension.query
.filter_by(is_active=True)
.order_by(DeptExtension.ext.asc())
.all())
supports = (SupportItem.query
.filter_by(is_active=True)
.filter((SupportItem.audience == "all") | (SupportItem.audience == ("admin" if current_user.is_admin() else "zzz")))
.order_by(SupportItem.category.asc())
.all())
admin_secrets = []
if current_user.is_admin():
admin_secrets = (LocalSecret.query
.filter_by(is_active=True)
.order_by(LocalSecret.updated_at.desc())
.all())
return render_page(TPL_INFO_PAGE, title="Store Info",
contacts=contacts, exts=exts, supports=supports, admin_secrets=admin_secrets)
# ---------------------------
# Info: admin console
# ---------------------------
@app.get("/admin/info")
@admin_required
def admin_info_console():
contacts = InfoContact.query.order_by(InfoContact.priority.asc(), InfoContact.name.asc()).all()
exts = DeptExtension.query.order_by(DeptExtension.ext.asc()).all()
supports = SupportItem.query.order_by(SupportItem.category.asc()).all()
secrets = LocalSecret.query.order_by(LocalSecret.updated_at.desc()).all()
return render_page(TPL_ADMIN_INFO, title="Info Admin",
contacts=contacts, exts=exts, supports=supports, secrets=secrets)
# ---- Contacts CRUD
@app.post("/admin/info/contacts/create")
@admin_required
def admin_info_contacts_create():
name = (request.form.get("name") or "").strip()
role = (request.form.get("role") or "").strip()
phone = (request.form.get("phone") or "").strip()
priority = request.form.get("priority", type=int) or 5
if not name:
flash("Name is required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(InfoContact(name=name, role=role, phone=phone, priority=max(1, min(priority, 9))))
db.session.commit()
flash("Contact added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/contacts/<int:cid>/toggle")
@admin_required
def admin_info_contacts_toggle(cid:int):
c = db.session.get(InfoContact, cid)
if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
c.is_active = not c.is_active; db.session.commit()
flash("Contact updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/contacts/<int:cid>/delete")
@admin_required
def admin_info_contacts_delete(cid:int):
c = db.session.get(InfoContact, cid)
if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(c); db.session.commit()
flash("Contact deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Dept Extensions CRUD
@app.post("/admin/info/exts/create")
@admin_required
def admin_info_exts_create():
ext = (request.form.get("ext") or "").strip()
dept = (request.form.get("dept") or "").strip()
if not ext or not dept:
flash("Extension and department are required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(DeptExtension(ext=ext, dept=dept))
db.session.commit()
flash("Extension added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/exts/<int:did>/toggle")
@admin_required
def admin_info_exts_toggle(did:int):
d = db.session.get(DeptExtension, did)
if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
d.is_active = not d.is_active; db.session.commit()
flash("Extension updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/exts/<int:did>/delete")
@admin_required
def admin_info_exts_delete(did:int):
d = db.session.get(DeptExtension, did)
if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(d); db.session.commit()
flash("Extension deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Support Items CRUD
@app.post("/admin/info/support/create")
@admin_required
def admin_info_support_create():
cat = (request.form.get("category") or "").strip()
email = (request.form.get("email") or "").strip()
phone = (request.form.get("phone") or "").strip()
note = (request.form.get("note") or "").strip()
admin_only = request.form.get("admin_only") == "on"
issues_text = (request.form.get("issues") or "").strip()
issues = [ln.strip() for ln in issues_text.splitlines() if ln.strip()]
if not cat:
flash("Category is required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(SupportItem(category=cat, email=email, phone=phone, note=note,
issues_json=_json.dumps(issues), audience=("admin" if admin_only else "all")))
db.session.commit()
flash("Support item added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/support/<int:sid>/toggle")
@admin_required
def admin_info_support_toggle(sid:int):
s = db.session.get(SupportItem, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
s.is_active = not s.is_active; db.session.commit()
flash("Support item updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/support/<int:sid>/delete")
@admin_required
def admin_info_support_delete(sid:int):
s = db.session.get(SupportItem, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(s); db.session.commit()
flash("Support item deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Secrets CRUD (admin-only)
@app.post("/admin/info/secret/create")
@admin_required
def admin_info_secret_create():
label = (request.form.get("label") or "").strip()
value = (request.form.get("value") or "").strip()
notes = (request.form.get("notes") or "").strip()
if not label or not value:
flash("Label and value are required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(LocalSecret(label=label, value=value, notes=notes))
db.session.commit()
flash("Secret added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/secret/<int:sid>/toggle")
@admin_required
def admin_info_secret_toggle(sid:int):
s = db.session.get(LocalSecret, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
s.is_active = not s.is_active; db.session.commit()
flash("Secret updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/secret/<int:sid>/delete")
@admin_required
def admin_info_secret_delete(sid:int):
s = db.session.get(LocalSecret, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(s); db.session.commit()
flash("Secret deleted.", "ok"); return redirect(url_for("admin_info_console"))
@app.get("/healthz")
def healthz():
return {"ok": True}, 200
# =============================================================================
# Error handlers
# =============================================================================
@app.errorhandler(403)
def error_403(e):
return render_page("<section><h1 class='text-xl font-bold'>Forbidden</h1><p class='text-slate-300 mt-2'>You don't have access to this resource.</p></section>"), 403
@app.errorhandler(404)
def error_404(e):
return render_page("<section><h1 class='text-xl font-bold'>Not Found</h1><p class='text-slate-300 mt-2'>We couldn't find what you were looking for.</p></section>"), 404
@app.errorhandler(500)
def error_500(e):
return render_page("<section><h1 class='text-xl font-bold'>Server Error</h1><p class='text-slate-300 mt-2'>Something went wrong. Please try again.</p></section>"), 500
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
app.run(debug=True, port=5000)