from __future__ import annotations import os from datetime import datetime from flask import Flask, render_template, url_for, jsonify, Response, redirect from requests import requests app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY") def render_page(body_tpl: str, **ctx): title = ctx.get("title") or APP_BRAND return render_template ("base.html") # ============================================================================= # Routes: Auth + Dashboard # ============================================================================= @app.route("/") def index(): if current_user.is_authenticated: return redirect(url_for("dashboard")) return redirect(url_for("login")) @limiter.limit("5/minute") @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" user = User.query.filter_by(username=username).first() if not user or not user.is_active or not user.check_password(password): flash("Invalid credentials.", "error") return redirect(url_for("login")) login_user(user, remember=True) if user.must_change_password: flash("Please set a new password to continue.", "ok") return redirect(url_for("change_password")) return redirect(url_for("dashboard")) return render_template ("tpl_login_body.html", title="Login") @app.post("/logout") @login_required def logout(): logout_user() flash("Signed out.", "ok") return redirect(url_for("login")) @app.route("/dashboard") @login_required def dashboard(): return render_template ("tpl_dashboard_body.html", title="Dashboard", user=current_user) @app.route("/change-password", methods=["GET", "POST"]) @login_required def change_password(): if request.method == "POST": curr = request.form.get("current_password") or "" new1 = request.form.get("new_password") or "" new2 = request.form.get("confirm_password") or "" if not current_user.check_password(curr): flash("Current password is incorrect.", "error") return redirect(url_for("change_password")) if len(new1) < 10: flash("Choose a stronger password (min 10 chars).", "error") return redirect(url_for("change_password")) if new1 != new2: flash("Passwords do not match.", "error") return redirect(url_for("change_password")) current_user.set_password(new1) current_user.must_change_password = False db.session.commit() flash("Password updated.", "ok") return redirect(url_for("dashboard")) return render_template ("tpl_change_password.html", title="Change Password", user=current_user) # ============================================================================= # Secret Santa # ============================================================================= @app.route("/secret-santa", methods=["GET", "POST"]) @login_required def secret_santa(): entry = SecretSantaEntry.query.filter_by(user_id=current_user.id).first() if request.method == "POST": full_name = (request.form.get("full_name") or "").strip() if not full_name: flash("Name is required.", "error") return redirect(url_for("secret_santa")) age_raw = request.form.get("age") birthday = (request.form.get("birthday") or "").strip() hobbies = (request.form.get("hobbies") or "").strip() gift_card = (request.form.get("gift_card") or "").strip() fav_movie = (request.form.get("fav_movie") or "").strip() jewelry = request.form.get("jewelry") == "yes" try: age = int(age_raw) if (age_raw or "").strip() else None except ValueError: age = None if entry: entry.full_name = full_name entry.age = age entry.birthday = birthday or None entry.hobbies = hobbies entry.gift_card = gift_card entry.fav_movie = fav_movie entry.jewelry = jewelry else: entry = SecretSantaEntry( user_id=current_user.id, full_name=full_name, age=age, birthday=birthday or None, hobbies=hobbies, gift_card=gift_card, fav_movie=fav_movie, jewelry=jewelry, ) db.session.add(entry) db.session.commit() flash("Secret Santa saved.", "ok") return redirect(url_for("secret_santa")) form = { "full_name": entry.full_name if entry else current_user.username, "age": entry.age if entry else "", "birthday": entry.birthday if entry else "", "hobbies": entry.hobbies if entry else "", "gift_card": entry.gift_card if entry else "", "fav_movie": entry.fav_movie if entry else "", "jewelry": bool(entry.jewelry) if (entry and entry.jewelry is not None) else False, } return render_template ("tpl_secret_santa.html", title="Secret Santa", form=form) @app.get("/admin/secret-santa") @admin_required def admin_secret_santa(): rows = (SecretSantaEntry.query .join(User, SecretSantaEntry.user_id == User.id) .order_by(User.username.asc()) .all()) return render_template ("admin_secret_santa.html", title="Secret Santa", rows=rows) # ============================================================================= # Request Off # ============================================================================= @app.get("/request-off") @login_required def request_off(): my_reqs = (TimeOffRequest.query .filter_by(user_id=current_user.id) .order_by(TimeOffRequest.created_at.desc()) .all()) return render_template ("tpl_request_off_body.html", title="Request Off", my_reqs=my_reqs) @app.post("/requests/new") @login_required def requests_new(): date = (request.form.get("date") or "").strip() note = (request.form.get("note") or "").strip()[:240] if not date or not _validate_iso_date(date): flash("Please choose a valid date (YYYY-MM-DD).", "error") return redirect(url_for("request_off")) r = TimeOffRequest(user_id=current_user.id, date=date, note=note, status="pending") db.session.add(r); db.session.commit() flash("Request submitted.", "ok") return redirect(url_for("request_off")) @app.post("/requests//cancel") @login_required def requests_cancel(req_id: int): r = db.session.get(TimeOffRequest, req_id) if not r or r.user_id != current_user.id: flash("Not found.", "error"); return redirect(url_for("request_off")) if r.status != "pending": flash("Only pending requests can be cancelled.", "error"); return redirect(url_for("request_off")) r.status = "cancelled"; r.decided_at = datetime.utcnow(); db.session.commit() flash("Request cancelled.", "ok"); return redirect(url_for("request_off")) # ============================================================================= # Availability (member self-service) # ============================================================================= @app.route("/availability", methods=["GET", "POST"]) @login_required def availability(): aw = AvailabilityWeekly.query.filter_by(user_id=current_user.id).first() if request.method == "POST": week = _parse_week_from_form(request.form) if not aw: aw = AvailabilityWeekly(user_id=current_user.id, data_json=_json.dumps(week)) db.session.add(aw) else: aw.data_json = _json.dumps(week) db.session.commit() flash("Availability saved.", "ok") return redirect(url_for("availability")) week = _default_week() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: week[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Failed to parse availability JSON for user %s: %s", current_user.id, e) return render_template ("tpl_avail.html", title="Your Availability", day_names=DAY_NAMES, week=week) # ============================================================================= # Admin: Availability grid + EDITOR (NEW) # ============================================================================= @app.get("/admin/availability") @admin_required def admin_availability(): rows = [] users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all() for u in users: aw = AvailabilityWeekly.query.filter_by(user_id=u.id).first() wk = _default_week() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: wk[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Failed to parse availability JSON for user %s: %s", u.id, e) rows.append({"user_id": u.id, "username": u.username, "week": wk}) return render_template ("admin_avail.html", title="Team Availability", rows=rows, day_names=DAY_NAMES) @app.route("/admin/availability/edit", methods=["GET", "POST"]) @admin_required def admin_availability_edit(): # Save for a specific user if request.method == "POST": try: target_id = int(request.form.get("user_id") or "0") except ValueError: flash("Invalid user.", "error") return redirect(url_for("admin_availability_edit")) target = db.session.get(User, target_id) if not target: flash("User not found.", "error") return redirect(url_for("admin_availability_edit")) week = _parse_week_from_form(request.form) aw = AvailabilityWeekly.query.filter_by(user_id=target.id).first() if not aw: aw = AvailabilityWeekly(user_id=target.id, data_json=_json.dumps(week)) db.session.add(aw) else: aw.data_json = _json.dumps(week) db.session.commit() flash(f"Availability saved for {target.username}.", "ok") return redirect(url_for("admin_availability_edit", user_id=target.id)) # GET: pick a user then load their week users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all() user_id = request.args.get("user_id", type=int) user = db.session.get(User, user_id) if user_id else None week = _default_week() if user: aw = AvailabilityWeekly.query.filter_by(user_id=user.id).first() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: week[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Admin edit: bad JSON for user %s: %s", user.id, e) return render_template ("admin_availability.html", title="Edit Availability", users=users, user=user, day_names=DAY_NAMES, week=week) # ============================================================================= # Admin: Requests + Wishlists # ============================================================================= @app.get("/admin/requests") @admin_required def admin_requests(): rows = (TimeOffRequest.query .join(User, TimeOffRequest.user_id == User.id) .order_by(TimeOffRequest.status.asc(), TimeOffRequest.created_at.desc()) .all()) return render_page(TPL_ADMIN_REQS_BODY, title="Time-off Requests", rows=rows) @app.post("/admin/requests//") @admin_required def admin_requests_action(req_id: int, action: str): r = db.session.get(TimeOffRequest, req_id) if not r: flash("Request not found.", "error"); return redirect(url_for("admin_requests")) if r.status != "pending": flash("Request is already decided.", "error"); return redirect(url_for("admin_requests")) if action not in ("approve","deny"): flash("Invalid action.", "error"); return redirect(url_for("admin_requests")) r.status = "approved" if action == "approve" else "denied" r.decided_at = datetime.utcnow() db.session.commit() flash(f"Request {action}d.", "ok"); return redirect(url_for("admin_requests")) @app.get("/admin/wishlists") @admin_required def admin_wishlists(): rows = (Wishlist.query .join(User, Wishlist.user_id == User.id) .order_by(User.username.asc()) .all()) return render_page(TPL_ADMIN_WISHLISTS_BODY, title="Wishlists", rows=rows) # ============================================================================= # Exports # ============================================================================= @app.get("/admin/secret-santa/export.csv") @admin_required def admin_secret_santa_export(): rows = (SecretSantaEntry.query.join(User).order_by(User.username.asc()).all()) sio = StringIO() w = csv.writer(sio) w.writerow(["username","full_name","age","birthday","hobbies","gift_card","fav_movie","jewelry","updated_at"]) for e in rows: w.writerow([ e.user.username, e.full_name, e.age or "", e.birthday or "", e.hobbies or "", e.gift_card or "", e.fav_movie or "", "Yes" if e.jewelry else "No", e.updated_at.isoformat() if e.updated_at else "" ]) resp = make_response(sio.getvalue()) resp.headers["Content-Type"] = "text/csv; charset=utf-8" resp.headers["Content-Disposition"] = "attachment; filename=secret_santa.csv" return resp @app.get("/admin/requests/export.csv") @admin_required def admin_requests_export(): rows = TimeOffRequest.query.join(User).order_by(TimeOffRequest.created_at.desc()).all() sio = StringIO() w = csv.writer(sio) w.writerow(["username","date","status","note","created_at","decided_at"]) for r in rows: w.writerow([ r.user.username, r.date, r.status, r.note or "", r.created_at.isoformat(), r.decided_at.isoformat() if r.decided_at else "" ]) resp = make_response(sio.getvalue()) resp.headers["Content-Type"] = "text/csv; charset=utf-8" resp.headers["Content-Disposition"] = "attachment; filename=time_off_requests.csv" return resp # ============================================================================= # Admin: Users CRUD # ============================================================================= @app.get("/admin/users") @admin_required def admin_users(): users = User.query.order_by(User.created_at.desc()).all() return render_page(TPL_ADMIN_USERS_BODY, title="Users", users=users) @app.post("/admin/users/create") @admin_required def admin_users_create(): username = (request.form.get("username") or "").strip() role = (request.form.get("role") or "member").strip().lower() role = "admin" if role == "admin" else "member" if not username: flash("Username is required.", "error") return redirect(url_for("admin_users")) if User.query.filter_by(username=username).first(): flash("Username already exists.", "error") return redirect(url_for("admin_users")) temp_pw = gen_temp_password() u = User(username=username, role=role, is_active=True, must_change_password=True) u.set_password(temp_pw) db.session.add(u) db.session.commit() flash(f"User '{username}' created with temporary password: {temp_pw}", "ok") return redirect(url_for("admin_users")) @app.post("/admin/users//reset") @admin_required def admin_users_reset(user_id: int): u = db.session.get(User, user_id) if not u: flash("User not found.", "error") return redirect(url_for("admin_users")) temp_pw = gen_temp_password() u.set_password(temp_pw) u.must_change_password = True db.session.commit() flash(f"Password reset for '{u.username}'. New temp: {temp_pw}", "ok") return redirect(url_for("admin_users")) @app.post("/admin/users//role") @admin_required def admin_users_role(user_id: int): u = db.session.get(User, user_id) if not u: flash("User not found.", "error") return redirect(url_for("admin_users")) new_role = (request.form.get("role") or "member").lower() u.role = "admin" if new_role == "admin" else "member" db.session.commit() flash(f"Role updated for '{u.username}' → {u.role}", "ok") return redirect(url_for("admin_users")) @app.post("/admin/users//toggle") @admin_required def admin_users_toggle(user_id: int): u = db.session.get(User, user_id) if not u: flash("User not found.", "error") return redirect(url_for("admin_users")) # Prevent locking yourself out of the last admin if u.id == current_user.id and u.role == "admin": admins = User.query.filter_by(role="admin", is_active=True).count() if admins <= 1: flash("You are the last active admin; cannot deactivate.", "error") return redirect(url_for("admin_users")) u.is_active = not u.is_active db.session.commit() flash(f"User '{u.username}' active={u.is_active}", "ok") return redirect(url_for("admin_users")) # ============================================================================= # JSON API + Health # ============================================================================= @app.get("/api/me") @login_required def api_me(): u: User = current_user # type: ignore return jsonify( id=u.id, username=u.username, role=u.role, must_change_password=u.must_change_password, is_active=u.is_active ) # --------------------------- # Info: user view # --------------------------- @app.get("/info") @login_required def info_page(): contacts = (InfoContact.query .filter_by(is_active=True) .order_by(InfoContact.priority.asc(), InfoContact.name.asc()) .all()) exts = (DeptExtension.query .filter_by(is_active=True) .order_by(DeptExtension.ext.asc()) .all()) supports = (SupportItem.query .filter_by(is_active=True) .filter((SupportItem.audience == "all") | (SupportItem.audience == ("admin" if current_user.is_admin() else "zzz"))) .order_by(SupportItem.category.asc()) .all()) admin_secrets = [] if current_user.is_admin(): admin_secrets = (LocalSecret.query .filter_by(is_active=True) .order_by(LocalSecret.updated_at.desc()) .all()) return render_page(TPL_INFO_PAGE, title="Store Info", contacts=contacts, exts=exts, supports=supports, admin_secrets=admin_secrets) # --------------------------- # Info: admin console # --------------------------- @app.get("/admin/info") @admin_required def admin_info_console(): contacts = InfoContact.query.order_by(InfoContact.priority.asc(), InfoContact.name.asc()).all() exts = DeptExtension.query.order_by(DeptExtension.ext.asc()).all() supports = SupportItem.query.order_by(SupportItem.category.asc()).all() secrets = LocalSecret.query.order_by(LocalSecret.updated_at.desc()).all() return render_page(TPL_ADMIN_INFO, title="Info Admin", contacts=contacts, exts=exts, supports=supports, secrets=secrets) # ---- Contacts CRUD @app.post("/admin/info/contacts/create") @admin_required def admin_info_contacts_create(): name = (request.form.get("name") or "").strip() role = (request.form.get("role") or "").strip() phone = (request.form.get("phone") or "").strip() priority = request.form.get("priority", type=int) or 5 if not name: flash("Name is required.", "error"); return redirect(url_for("admin_info_console")) db.session.add(InfoContact(name=name, role=role, phone=phone, priority=max(1, min(priority, 9)))) db.session.commit() flash("Contact added.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/contacts//toggle") @admin_required def admin_info_contacts_toggle(cid:int): c = db.session.get(InfoContact, cid) if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) c.is_active = not c.is_active; db.session.commit() flash("Contact updated.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/contacts//delete") @admin_required def admin_info_contacts_delete(cid:int): c = db.session.get(InfoContact, cid) if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) db.session.delete(c); db.session.commit() flash("Contact deleted.", "ok"); return redirect(url_for("admin_info_console")) # ---- Dept Extensions CRUD @app.post("/admin/info/exts/create") @admin_required def admin_info_exts_create(): ext = (request.form.get("ext") or "").strip() dept = (request.form.get("dept") or "").strip() if not ext or not dept: flash("Extension and department are required.", "error"); return redirect(url_for("admin_info_console")) db.session.add(DeptExtension(ext=ext, dept=dept)) db.session.commit() flash("Extension added.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/exts//toggle") @admin_required def admin_info_exts_toggle(did:int): d = db.session.get(DeptExtension, did) if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) d.is_active = not d.is_active; db.session.commit() flash("Extension updated.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/exts//delete") @admin_required def admin_info_exts_delete(did:int): d = db.session.get(DeptExtension, did) if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) db.session.delete(d); db.session.commit() flash("Extension deleted.", "ok"); return redirect(url_for("admin_info_console")) # ---- Support Items CRUD @app.post("/admin/info/support/create") @admin_required def admin_info_support_create(): cat = (request.form.get("category") or "").strip() email = (request.form.get("email") or "").strip() phone = (request.form.get("phone") or "").strip() note = (request.form.get("note") or "").strip() admin_only = request.form.get("admin_only") == "on" issues_text = (request.form.get("issues") or "").strip() issues = [ln.strip() for ln in issues_text.splitlines() if ln.strip()] if not cat: flash("Category is required.", "error"); return redirect(url_for("admin_info_console")) db.session.add(SupportItem(category=cat, email=email, phone=phone, note=note, issues_json=_json.dumps(issues), audience=("admin" if admin_only else "all"))) db.session.commit() flash("Support item added.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/support//toggle") @admin_required def admin_info_support_toggle(sid:int): s = db.session.get(SupportItem, sid) if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) s.is_active = not s.is_active; db.session.commit() flash("Support item updated.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/support//delete") @admin_required def admin_info_support_delete(sid:int): s = db.session.get(SupportItem, sid) if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) db.session.delete(s); db.session.commit() flash("Support item deleted.", "ok"); return redirect(url_for("admin_info_console")) # ---- Secrets CRUD (admin-only) @app.post("/admin/info/secret/create") @admin_required def admin_info_secret_create(): label = (request.form.get("label") or "").strip() value = (request.form.get("value") or "").strip() notes = (request.form.get("notes") or "").strip() if not label or not value: flash("Label and value are required.", "error"); return redirect(url_for("admin_info_console")) db.session.add(LocalSecret(label=label, value=value, notes=notes)) db.session.commit() flash("Secret added.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/secret//toggle") @admin_required def admin_info_secret_toggle(sid:int): s = db.session.get(LocalSecret, sid) if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) s.is_active = not s.is_active; db.session.commit() flash("Secret updated.", "ok"); return redirect(url_for("admin_info_console")) @app.post("/admin/info/secret//delete") @admin_required def admin_info_secret_delete(sid:int): s = db.session.get(LocalSecret, sid) if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console")) db.session.delete(s); db.session.commit() flash("Secret deleted.", "ok"); return redirect(url_for("admin_info_console")) @app.get("/healthz") def healthz(): return {"ok": True}, 200 # ============================================================================= # Error handlers # ============================================================================= @app.errorhandler(403) def error_403(e): return render_page("

Forbidden

You don't have access to this resource.

"), 403 @app.errorhandler(404) def error_404(e): return render_page("

Not Found

We couldn't find what you were looking for.

"), 404 @app.errorhandler(500) def error_500(e): return render_page("

Server Error

Something went wrong. Please try again.

"), 500 # ============================================================================= # Main # ============================================================================= if __name__ == "__main__": app.run(debug=True, port=5000)