667 lines
27 KiB
Python
667 lines
27 KiB
Python
from __future__ import annotations
|
|
import os
|
|
from datetime import datetime
|
|
from flask import Flask, render_template, url_for, jsonify, Response, redirect
|
|
from requests import requests
|
|
|
|
|
|
|
|
# Application object that every route below registers itself on.
app = Flask(__name__)

# Session-signing key is read from the environment; it is None when
# SECRET_KEY is not set.
app.secret_key = os.environ.get("SECRET_KEY")
|
|
|
|
|
|
|
|
|
|
def render_page(body_tpl: str, **ctx):
    """Render a page body inside the shared ``base.html`` layout.

    Args:
        body_tpl: Jinja template source for the page body. Callers in this
            module pass either a ``TPL_*`` constant or a literal HTML snippet.
        **ctx: Template variables. ``title`` falls back to ``APP_BRAND`` when
            it is missing or empty.

    Returns:
        The rendered page produced by ``render_template``.
    """
    # Function-scope import: the module-level flask import does not include it.
    from flask import render_template_string

    page_ctx = dict(ctx)
    page_ctx["title"] = ctx.get("title") or APP_BRAND

    # Render the body first so the layout receives finished HTML. Without
    # this step the body template, the title and the context never reached
    # the layout and every page came out identical.
    page_ctx["body"] = render_template_string(body_tpl, **page_ctx)

    # NOTE(review): assumes base.html outputs the body via ``{{ body|safe }}``
    # and the title via ``{{ title }}`` -- confirm against the template.
    return render_template("base.html", **page_ctx)
|
|
|
|
# =============================================================================
|
|
# Routes: Auth + Dashboard
|
|
# =============================================================================
|
|
@app.route("/")
def index():
    """Send signed-in users to the dashboard and everyone else to the login page."""
    endpoint = "dashboard" if current_user.is_authenticated else "login"
    return redirect(url_for(endpoint))
|
|
|
|
# The route decorator must be outermost: Flask registers whatever function
# ``app.route`` receives, so the limiter has to wrap the view *before*
# registration or the "5/minute" limit is never enforced.
@app.route("/login", methods=["GET", "POST"])
@limiter.limit("5/minute")
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On POST the user is looked up by the stripped ``username`` field. A missing
    user, an inactive user and a wrong password all produce the same
    "Invalid credentials." flash and a redirect back to the login page.

    Returns:
        The rendered login template on GET; on POST a redirect to the login
        page (failure), the change-password page (when
        ``user.must_change_password`` is set) or the dashboard.
    """
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        # The password is deliberately not stripped.
        password = request.form.get("password") or ""

        user = User.query.filter_by(username=username).first()
        if not user or not user.is_active or not user.check_password(password):
            flash("Invalid credentials.", "error")
            return redirect(url_for("login"))

        login_user(user, remember=True)

        if user.must_change_password:
            flash("Please set a new password to continue.", "ok")
            return redirect(url_for("change_password"))

        return redirect(url_for("dashboard"))

    return render_template("tpl_login_body.html", title="Login")
|
|
|
|
@app.post("/logout")
@login_required
def logout():
    """End the current session, flash a confirmation and go to the login page."""
    logout_user()
    flash("Signed out.", "ok")
    login_url = url_for("login")
    return redirect(login_url)
|
|
|
|
@app.route("/dashboard")
@login_required
def dashboard():
    """Render the dashboard template for the signed-in user."""
    context = {"title": "Dashboard", "user": current_user}
    return render_template("tpl_dashboard_body.html", **context)
|
|
|
|
@app.route("/change-password", methods=["GET", "POST"])
@login_required
def change_password():
    """Show the change-password form (GET) or apply a new password (POST).

    A POST is rejected, with a flash message and a redirect back to this page,
    when the current password is wrong, the new password is shorter than
    10 characters, or the confirmation does not match. On success the password
    is stored, ``must_change_password`` is cleared and the user is sent to
    the dashboard.
    """
    if request.method != "POST":
        return render_template("tpl_change_password.html", title="Change Password", user=current_user)

    form = request.form
    current_pw = form.get("current_password") or ""
    candidate = form.get("new_password") or ""
    confirmation = form.get("confirm_password") or ""

    # Rules are checked in order and the first failure is the one reported.
    if not current_user.check_password(current_pw):
        problem = "Current password is incorrect."
    elif len(candidate) < 10:
        problem = "Choose a stronger password (min 10 chars)."
    elif candidate != confirmation:
        problem = "Passwords do not match."
    else:
        problem = None

    if problem is not None:
        flash(problem, "error")
        return redirect(url_for("change_password"))

    current_user.set_password(candidate)
    current_user.must_change_password = False
    db.session.commit()
    flash("Password updated.", "ok")
    return redirect(url_for("dashboard"))
|
|
|
|
# =============================================================================
|
|
# Secret Santa
|
|
# =============================================================================
|
|
@app.route("/secret-santa", methods=["GET", "POST"])
@login_required
def secret_santa():
    """Show (GET) or save (POST) the current user's Secret Santa entry.

    POST requires a non-empty ``full_name``. ``age`` is stored as an int, or
    ``None`` when blank or not a valid integer. An empty ``birthday`` is stored
    as ``None``. The existing entry is updated in place, otherwise a new one
    is created. GET pre-fills the form from the saved entry, or from the
    username when there is none.
    """
    entry = SecretSantaEntry.query.filter_by(user_id=current_user.id).first()

    if request.method == "POST":

        def text_field(key):
            # Missing fields become "" and surrounding whitespace is removed.
            return (request.form.get(key) or "").strip()

        full_name = text_field("full_name")
        if not full_name:
            flash("Name is required.", "error")
            return redirect(url_for("secret_santa"))

        age_raw = request.form.get("age")
        try:
            age = int(age_raw) if (age_raw or "").strip() else None
        except ValueError:
            # Unparseable ages are stored as "unknown" rather than rejected.
            age = None

        values = {
            "full_name": full_name,
            "age": age,
            "birthday": text_field("birthday") or None,
            "hobbies": text_field("hobbies"),
            "gift_card": text_field("gift_card"),
            "fav_movie": text_field("fav_movie"),
            "jewelry": request.form.get("jewelry") == "yes",
        }

        if entry:
            for attr, value in values.items():
                setattr(entry, attr, value)
        else:
            entry = SecretSantaEntry(user_id=current_user.id, **values)
            db.session.add(entry)

        db.session.commit()
        flash("Secret Santa saved.", "ok")
        return redirect(url_for("secret_santa"))

    if entry:
        form = {
            "full_name": entry.full_name,
            "age": entry.age,
            "birthday": entry.birthday,
            "hobbies": entry.hobbies,
            "gift_card": entry.gift_card,
            "fav_movie": entry.fav_movie,
            # bool(None) is False, so an unset flag shows as unchecked.
            "jewelry": bool(entry.jewelry),
        }
    else:
        form = {
            "full_name": current_user.username,
            "age": "",
            "birthday": "",
            "hobbies": "",
            "gift_card": "",
            "fav_movie": "",
            "jewelry": False,
        }
    return render_template("tpl_secret_santa.html", title="Secret Santa", form=form)
|
|
|
|
@app.get("/admin/secret-santa")
@admin_required
def admin_secret_santa():
    """List every Secret Santa entry, ordered by the owner's username."""
    query = SecretSantaEntry.query.join(User, SecretSantaEntry.user_id == User.id)
    rows = query.order_by(User.username.asc()).all()
    return render_template("admin_secret_santa.html", title="Secret Santa", rows=rows)
|
|
|
|
# =============================================================================
|
|
# Request Off
|
|
# =============================================================================
|
|
@app.get("/request-off")
@login_required
def request_off():
    """Show the current user's time-off requests, newest first."""
    mine = TimeOffRequest.query.filter_by(user_id=current_user.id)
    my_reqs = mine.order_by(TimeOffRequest.created_at.desc()).all()
    return render_template("tpl_request_off_body.html", title="Request Off", my_reqs=my_reqs)
|
|
|
|
@app.post("/requests/new")
@login_required
def requests_new():
    """Create a pending time-off request for the current user.

    ``date`` must be non-empty and accepted by ``_validate_iso_date``.
    ``note`` is stripped and truncated to 240 characters. Always redirects
    back to the request-off page.
    """
    form = request.form
    day = (form.get("date") or "").strip()
    note = (form.get("note") or "").strip()[:240]

    if day and _validate_iso_date(day):
        new_request = TimeOffRequest(user_id=current_user.id, date=day, note=note, status="pending")
        db.session.add(new_request)
        db.session.commit()
        flash("Request submitted.", "ok")
    else:
        flash("Please choose a valid date (YYYY-MM-DD).", "error")

    return redirect(url_for("request_off"))
|
|
|
|
@app.post("/requests/<int:req_id>/cancel")
@login_required
def requests_cancel(req_id: int):
    """Cancel one of the current user's pending time-off requests.

    Requests that do not exist or belong to another user are reported as
    "Not found."; requests that are no longer pending cannot be cancelled.
    Always redirects back to the request-off page.
    """
    req = db.session.get(TimeOffRequest, req_id)

    if not req or req.user_id != current_user.id:
        message, category = "Not found.", "error"
    elif req.status != "pending":
        message, category = "Only pending requests can be cancelled.", "error"
    else:
        req.status = "cancelled"
        req.decided_at = datetime.utcnow()
        db.session.commit()
        message, category = "Request cancelled.", "ok"

    flash(message, category)
    return redirect(url_for("request_off"))
|
|
|
|
# =============================================================================
|
|
# Availability (member self-service)
|
|
# =============================================================================
|
|
@app.route("/availability", methods=["GET", "POST"])
@login_required
def availability():
    """Show (GET) or save (POST) the current user's weekly availability.

    POST serialises the week parsed by ``_parse_week_from_form`` to JSON and
    stores it on the user's ``AvailabilityWeekly`` row, creating the row when
    needed. GET starts from ``_default_week()`` and overlays whatever days are
    present in the stored JSON; a parse failure is logged and the days not yet
    overlaid keep their defaults.
    """
    record = AvailabilityWeekly.query.filter_by(user_id=current_user.id).first()

    if request.method == "POST":
        payload = _json.dumps(_parse_week_from_form(request.form))
        if record:
            record.data_json = payload
        else:
            record = AvailabilityWeekly(user_id=current_user.id, data_json=payload)
            db.session.add(record)
        db.session.commit()
        flash("Availability saved.", "ok")
        return redirect(url_for("availability"))

    week = _default_week()
    if record and record.data_json:
        try:
            stored = _json.loads(record.data_json) or {}
            for day_key, _label in DAY_NAMES:
                if day_key not in stored:
                    continue
                day = stored[day_key]
                week[day_key] = {
                    "avail": bool(day.get("avail")),
                    # Keep only the first five characters of each time value.
                    "start": (day.get("start") or "08:00")[:5],
                    "end": (day.get("end") or "17:00")[:5],
                }
        except Exception as exc:
            app.logger.warning("Failed to parse availability JSON for user %s: %s", current_user.id, exc)

    return render_template("tpl_avail.html", title="Your Availability",
                           day_names=DAY_NAMES, week=week)
|
|
|
|
# =============================================================================
|
|
# Admin: Availability grid + EDITOR (NEW)
|
|
# =============================================================================
|
|
@app.get("/admin/availability")
@admin_required
def admin_availability():
    """Render the team availability grid for all active users.

    Each row holds ``user_id``, ``username`` and a ``week`` dict that starts
    from ``_default_week()`` and is overlaid with the user's stored JSON.
    A parse failure for one user is logged and does not affect other rows.
    """
    users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()

    # Load all saved weeks in one query instead of one query per user.
    # setdefault keeps the first row seen for a user if duplicates exist.
    saved_by_user = {}
    for saved in AvailabilityWeekly.query.all():
        saved_by_user.setdefault(saved.user_id, saved)

    rows = []
    for u in users:
        wk = _default_week()
        aw = saved_by_user.get(u.id)
        if aw and aw.data_json:
            try:
                data = _json.loads(aw.data_json) or {}
                for k, _label in DAY_NAMES:
                    if k in data:
                        wk[k] = {
                            "avail": bool(data[k].get("avail")),
                            # Keep only the first five characters of each time value.
                            "start": (data[k].get("start") or "08:00")[:5],
                            "end": (data[k].get("end") or "17:00")[:5],
                        }
            except Exception as e:
                # Best effort: a corrupt record must not break the whole grid.
                app.logger.warning("Failed to parse availability JSON for user %s: %s", u.id, e)
        rows.append({"user_id": u.id, "username": u.username, "week": wk})

    return render_template("admin_avail.html", title="Team Availability", rows=rows, day_names=DAY_NAMES)
|
|
|
|
@app.route("/admin/availability/edit", methods=["GET", "POST"])
@admin_required
def admin_availability_edit():
    """Let an admin pick a user and edit that user's weekly availability.

    POST reads ``user_id`` from the form, saves the parsed week for that user
    and redirects back to the editor with the user selected. GET lists the
    active users and, when ``?user_id=`` names an existing user, loads that
    user's stored week on top of ``_default_week()``.
    """
    if request.method == "POST":
        # Save for a specific user
        try:
            target_id = int(request.form.get("user_id") or "0")
        except ValueError:
            flash("Invalid user.", "error")
            return redirect(url_for("admin_availability_edit"))

        target = db.session.get(User, target_id)
        if not target:
            flash("User not found.", "error")
            return redirect(url_for("admin_availability_edit"))

        week = _parse_week_from_form(request.form)
        record = AvailabilityWeekly.query.filter_by(user_id=target.id).first()
        payload = _json.dumps(week)
        if record:
            record.data_json = payload
        else:
            record = AvailabilityWeekly(user_id=target.id, data_json=payload)
            db.session.add(record)
        db.session.commit()
        flash(f"Availability saved for {target.username}.", "ok")
        return redirect(url_for("admin_availability_edit", user_id=target.id))

    # GET: pick a user then load their week
    users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()
    user_id = request.args.get("user_id", type=int)
    user = db.session.get(User, user_id) if user_id else None

    week = _default_week()
    record = AvailabilityWeekly.query.filter_by(user_id=user.id).first() if user else None
    if record and record.data_json:
        try:
            stored = _json.loads(record.data_json) or {}
            for day_key, _label in DAY_NAMES:
                if day_key not in stored:
                    continue
                day = stored[day_key]
                week[day_key] = {
                    "avail": bool(day.get("avail")),
                    "start": (day.get("start") or "08:00")[:5],
                    "end": (day.get("end") or "17:00")[:5],
                }
        except Exception as exc:
            app.logger.warning("Admin edit: bad JSON for user %s: %s", user.id, exc)

    return render_template("admin_availability.html",
                           title="Edit Availability",
                           users=users, user=user, day_names=DAY_NAMES, week=week)
|
|
|
|
# =============================================================================
|
|
# Admin: Requests + Wishlists
|
|
# =============================================================================
|
|
@app.get("/admin/requests")
@admin_required
def admin_requests():
    """List all time-off requests, ordered by status and then newest first."""
    ordering = (TimeOffRequest.status.asc(), TimeOffRequest.created_at.desc())
    query = TimeOffRequest.query.join(User, TimeOffRequest.user_id == User.id)
    rows = query.order_by(*ordering).all()
    return render_page(TPL_ADMIN_REQS_BODY, title="Time-off Requests", rows=rows)
|
|
|
|
@app.post("/admin/requests/<int:req_id>/<action>")
@admin_required
def admin_requests_action(req_id: int, action: str):
    """Approve or deny a pending time-off request.

    Args:
        req_id: Primary key of the ``TimeOffRequest``.
        action: ``"approve"`` or ``"deny"``; anything else is rejected.

    Returns:
        A redirect to the admin requests list, with a flash message that
        describes the outcome.
    """
    # Maps the URL action to the stored status. The status is also used for
    # the confirmation text, because appending "d" to the action produced
    # "Request denyd." for denials.
    outcomes = {"approve": "approved", "deny": "denied"}

    r = db.session.get(TimeOffRequest, req_id)
    if not r:
        flash("Request not found.", "error")
        return redirect(url_for("admin_requests"))
    if r.status != "pending":
        flash("Request is already decided.", "error")
        return redirect(url_for("admin_requests"))
    if action not in outcomes:
        flash("Invalid action.", "error")
        return redirect(url_for("admin_requests"))

    r.status = outcomes[action]
    r.decided_at = datetime.utcnow()
    db.session.commit()
    flash(f"Request {r.status}.", "ok")
    return redirect(url_for("admin_requests"))
|
|
|
|
@app.get("/admin/wishlists")
@admin_required
def admin_wishlists():
    """List every wishlist, ordered by the owner's username."""
    query = Wishlist.query.join(User, Wishlist.user_id == User.id)
    rows = query.order_by(User.username.asc()).all()
    return render_page(TPL_ADMIN_WISHLISTS_BODY, title="Wishlists", rows=rows)
|
|
|
|
# =============================================================================
|
|
# Exports
|
|
# =============================================================================
|
|
@app.get("/admin/secret-santa/export.csv")
@admin_required
def admin_secret_santa_export():
    """Download all Secret Santa entries as ``secret_santa.csv``.

    Rows are ordered by username. Empty or missing values are written as
    empty cells, and ``jewelry`` is written as "Yes"/"No".

    Returns:
        A ``text/csv`` response served as an attachment.
    """

    def _csv_safe(value):
        # Text cells come from user-entered form fields. A leading "=", "+",
        # "-", "@", tab or CR makes spreadsheet programs evaluate the cell as
        # a formula, so such cells are prefixed with "'" to force plain text.
        if isinstance(value, str) and value.startswith(("=", "+", "-", "@", "\t", "\r")):
            return "'" + value
        return value

    rows = SecretSantaEntry.query.join(User).order_by(User.username.asc()).all()
    sio = StringIO()
    w = csv.writer(sio)
    w.writerow(["username", "full_name", "age", "birthday", "hobbies", "gift_card", "fav_movie", "jewelry", "updated_at"])
    for e in rows:
        record = [
            e.user.username, e.full_name, e.age or "", e.birthday or "", e.hobbies or "",
            e.gift_card or "", e.fav_movie or "", "Yes" if e.jewelry else "No",
            e.updated_at.isoformat() if e.updated_at else "",
        ]
        w.writerow([_csv_safe(cell) for cell in record])

    resp = make_response(sio.getvalue())
    resp.headers["Content-Type"] = "text/csv; charset=utf-8"
    resp.headers["Content-Disposition"] = "attachment; filename=secret_santa.csv"
    return resp
|
|
|
|
@app.get("/admin/requests/export.csv")
@admin_required
def admin_requests_export():
    """Download all time-off requests as ``time_off_requests.csv``.

    Rows are ordered newest first by ``created_at``. A missing note or
    decision time is written as an empty cell.

    Returns:
        A ``text/csv`` response served as an attachment.
    """

    def _csv_safe(value):
        # Usernames and notes are user-entered text. A leading "=", "+", "-",
        # "@", tab or CR makes spreadsheet programs evaluate the cell as a
        # formula, so such cells are prefixed with "'" to force plain text.
        if isinstance(value, str) and value.startswith(("=", "+", "-", "@", "\t", "\r")):
            return "'" + value
        return value

    rows = TimeOffRequest.query.join(User).order_by(TimeOffRequest.created_at.desc()).all()
    sio = StringIO()
    w = csv.writer(sio)
    w.writerow(["username", "date", "status", "note", "created_at", "decided_at"])
    for r in rows:
        record = [
            r.user.username, r.date, r.status, r.note or "",
            r.created_at.isoformat(),
            r.decided_at.isoformat() if r.decided_at else "",
        ]
        w.writerow([_csv_safe(cell) for cell in record])

    resp = make_response(sio.getvalue())
    resp.headers["Content-Type"] = "text/csv; charset=utf-8"
    resp.headers["Content-Disposition"] = "attachment; filename=time_off_requests.csv"
    return resp
|
|
|
|
|
|
# =============================================================================
|
|
# Admin: Users CRUD
|
|
# =============================================================================
|
|
@app.get("/admin/users")
@admin_required
def admin_users():
    """List all users, most recently created first."""
    newest_first = User.created_at.desc()
    users = User.query.order_by(newest_first).all()
    return render_page(TPL_ADMIN_USERS_BODY, title="Users", users=users)
|
|
|
|
@app.post("/admin/users/create")
@admin_required
def admin_users_create():
    """Create a user with a generated temporary password.

    The username is required and must not already exist. Any role other than
    ``"admin"`` is stored as ``"member"``. The new account is active and must
    change its password. The temporary password is shown to the admin in the
    flash message.
    """
    username = (request.form.get("username") or "").strip()
    role = (request.form.get("role") or "member").strip().lower()
    if role != "admin":
        role = "member"

    # The first failing check decides the message; the lookup only runs for
    # a non-empty username.
    problem = None
    if not username:
        problem = "Username is required."
    elif User.query.filter_by(username=username).first():
        problem = "Username already exists."

    if problem is not None:
        flash(problem, "error")
        return redirect(url_for("admin_users"))

    temp_pw = gen_temp_password()
    account = User(username=username, role=role, is_active=True, must_change_password=True)
    account.set_password(temp_pw)
    db.session.add(account)
    db.session.commit()

    flash(f"User '{username}' created with temporary password: {temp_pw}", "ok")
    return redirect(url_for("admin_users"))
|
|
|
|
@app.post("/admin/users/<int:user_id>/reset")
@admin_required
def admin_users_reset(user_id: int):
    """Give a user a new temporary password and force a change on next login.

    The temporary password is shown to the admin in the flash message.
    Always redirects back to the user list.
    """
    account = db.session.get(User, user_id)

    if account:
        temp_pw = gen_temp_password()
        account.set_password(temp_pw)
        account.must_change_password = True
        db.session.commit()
        flash(f"Password reset for '{account.username}'. New temp: {temp_pw}", "ok")
    else:
        flash("User not found.", "error")

    return redirect(url_for("admin_users"))
|
|
|
|
@app.post("/admin/users/<int:user_id>/role")
@admin_required
def admin_users_role(user_id: int):
    """Change a user's role to ``"admin"`` or ``"member"``.

    Any submitted value other than ``"admin"`` (after stripping and
    lower-casing) is treated as ``"member"``. Demoting the only remaining
    active admin is refused so the application cannot be left without one.

    Args:
        user_id: Primary key of the user to update.

    Returns:
        A redirect to the user list, with a flash message describing the
        outcome.
    """
    u = db.session.get(User, user_id)
    if not u:
        flash("User not found.", "error")
        return redirect(url_for("admin_users"))

    # Normalise the same way admin_users_create does (strip + lower).
    requested = (request.form.get("role") or "member").strip().lower()
    new_role = "admin" if requested == "admin" else "member"

    # Same protection as the deactivate toggle: never remove the last
    # active admin.
    if u.role == "admin" and new_role != "admin" and u.is_active:
        active_admins = User.query.filter_by(role="admin", is_active=True).count()
        if active_admins <= 1:
            flash("Cannot demote the last active admin.", "error")
            return redirect(url_for("admin_users"))

    u.role = new_role
    db.session.commit()
    flash(f"Role updated for '{u.username}' → {u.role}", "ok")
    return redirect(url_for("admin_users"))
|
|
|
|
@app.post("/admin/users/<int:user_id>/toggle")
@admin_required
def admin_users_toggle(user_id: int):
    """Flip a user's ``is_active`` flag.

    An admin acting on their own account is refused when one or fewer active
    admins exist. Always redirects back to the user list.
    """
    target = db.session.get(User, user_id)
    if not target:
        flash("User not found.", "error")
        return redirect(url_for("admin_users"))

    # Prevent locking yourself out of the last admin
    acting_on_own_admin_account = target.id == current_user.id and target.role == "admin"
    if acting_on_own_admin_account:
        active_admins = User.query.filter_by(role="admin", is_active=True).count()
        if active_admins <= 1:
            flash("You are the last active admin; cannot deactivate.", "error")
            return redirect(url_for("admin_users"))

    target.is_active = not target.is_active
    db.session.commit()
    flash(f"User '{target.username}' active={target.is_active}", "ok")
    return redirect(url_for("admin_users"))
|
|
|
|
|
|
# =============================================================================
|
|
# JSON API + Health
|
|
# =============================================================================
|
|
@app.get("/api/me")
@login_required
def api_me():
    """Return the signed-in user's basic account fields as JSON."""
    me = current_user
    payload = {
        "id": me.id,
        "username": me.username,
        "role": me.role,
        "must_change_password": me.must_change_password,
        "is_active": me.is_active,
    }
    return jsonify(**payload)
|
|
|
|
|
|
# ---------------------------
|
|
# Info: user view
|
|
# ---------------------------
|
|
@app.get("/info")
@login_required
def info_page():
    """Render the store info page for the signed-in user.

    Shows active contacts (by priority, then name), active department
    extensions (by extension) and active support items (by category).
    Support items with audience ``"admin"`` and the active local secrets
    are only included when ``current_user.is_admin()`` is true.
    """
    is_admin = current_user.is_admin()

    contacts = (InfoContact.query
                .filter_by(is_active=True)
                .order_by(InfoContact.priority.asc(), InfoContact.name.asc())
                .all())
    exts = (DeptExtension.query
            .filter_by(is_active=True)
            .order_by(DeptExtension.ext.asc())
            .all())

    # Build the audience filter explicitly instead of comparing against a
    # placeholder string for non-admins; a row whose audience happened to
    # equal that placeholder would otherwise be shown to everyone.
    visible_audience = SupportItem.audience == "all"
    if is_admin:
        visible_audience = visible_audience | (SupportItem.audience == "admin")
    supports = (SupportItem.query
                .filter_by(is_active=True)
                .filter(visible_audience)
                .order_by(SupportItem.category.asc())
                .all())

    admin_secrets = []
    if is_admin:
        admin_secrets = (LocalSecret.query
                         .filter_by(is_active=True)
                         .order_by(LocalSecret.updated_at.desc())
                         .all())

    return render_page(TPL_INFO_PAGE, title="Store Info",
                       contacts=contacts, exts=exts, supports=supports, admin_secrets=admin_secrets)
|
|
|
|
# ---------------------------
|
|
# Info: admin console
|
|
# ---------------------------
|
|
@app.get("/admin/info")
@admin_required
def admin_info_console():
    """Render the info admin console with every record, active or not."""
    context = {
        "contacts": InfoContact.query.order_by(InfoContact.priority.asc(), InfoContact.name.asc()).all(),
        "exts": DeptExtension.query.order_by(DeptExtension.ext.asc()).all(),
        "supports": SupportItem.query.order_by(SupportItem.category.asc()).all(),
        "secrets": LocalSecret.query.order_by(LocalSecret.updated_at.desc()).all(),
    }
    return render_page(TPL_ADMIN_INFO, title="Info Admin", **context)
|
|
|
|
# ---- Contacts CRUD
|
|
@app.post("/admin/info/contacts/create")
@admin_required
def admin_info_contacts_create():
    """Add an info contact from the submitted form.

    ``name`` is required. ``priority`` defaults to 5 when missing, invalid or
    zero, and is clamped to the range 1..9. Always redirects back to the info
    console.
    """
    form = request.form
    name = (form.get("name") or "").strip()
    role = (form.get("role") or "").strip()
    phone = (form.get("phone") or "").strip()
    priority = form.get("priority", type=int) or 5

    if name:
        clamped = min(max(priority, 1), 9)
        db.session.add(InfoContact(name=name, role=role, phone=phone, priority=clamped))
        db.session.commit()
        flash("Contact added.", "ok")
    else:
        flash("Name is required.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/contacts/<int:cid>/toggle")
@admin_required
def admin_info_contacts_toggle(cid: int):
    """Flip the ``is_active`` flag of one contact, then return to the info console."""
    contact = db.session.get(InfoContact, cid)

    if contact:
        contact.is_active = not contact.is_active
        db.session.commit()
        flash("Contact updated.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/contacts/<int:cid>/delete")
@admin_required
def admin_info_contacts_delete(cid: int):
    """Delete one contact, then return to the info console."""
    contact = db.session.get(InfoContact, cid)

    if contact:
        db.session.delete(contact)
        db.session.commit()
        flash("Contact deleted.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
# ---- Dept Extensions CRUD
|
|
@app.post("/admin/info/exts/create")
@admin_required
def admin_info_exts_create():
    """Add a department extension; both ``ext`` and ``dept`` are required."""
    form = request.form
    ext = (form.get("ext") or "").strip()
    dept = (form.get("dept") or "").strip()

    if ext and dept:
        db.session.add(DeptExtension(ext=ext, dept=dept))
        db.session.commit()
        flash("Extension added.", "ok")
    else:
        flash("Extension and department are required.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/exts/<int:did>/toggle")
@admin_required
def admin_info_exts_toggle(did: int):
    """Flip the ``is_active`` flag of one department extension."""
    extension = db.session.get(DeptExtension, did)

    if extension:
        extension.is_active = not extension.is_active
        db.session.commit()
        flash("Extension updated.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/exts/<int:did>/delete")
@admin_required
def admin_info_exts_delete(did: int):
    """Delete one department extension, then return to the info console."""
    extension = db.session.get(DeptExtension, did)

    if extension:
        db.session.delete(extension)
        db.session.commit()
        flash("Extension deleted.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
# ---- Support Items CRUD
|
|
@app.post("/admin/info/support/create")
@admin_required
def admin_info_support_create():
    """Add a support item from the submitted form.

    ``category`` is required. ``issues`` is split into non-empty, stripped
    lines and stored as a JSON list. The audience is ``"admin"`` when the
    ``admin_only`` checkbox is on, otherwise ``"all"``.
    """
    form = request.form

    def text_field(key):
        # Missing fields become "" and surrounding whitespace is removed.
        return (form.get(key) or "").strip()

    cat = text_field("category")
    email = text_field("email")
    phone = text_field("phone")
    note = text_field("note")
    audience = "admin" if form.get("admin_only") == "on" else "all"
    issues = [line for line in map(str.strip, text_field("issues").splitlines()) if line]

    if cat:
        item = SupportItem(category=cat, email=email, phone=phone, note=note,
                           issues_json=_json.dumps(issues), audience=audience)
        db.session.add(item)
        db.session.commit()
        flash("Support item added.", "ok")
    else:
        flash("Category is required.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/support/<int:sid>/toggle")
@admin_required
def admin_info_support_toggle(sid: int):
    """Flip the ``is_active`` flag of one support item."""
    item = db.session.get(SupportItem, sid)

    if item:
        item.is_active = not item.is_active
        db.session.commit()
        flash("Support item updated.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/support/<int:sid>/delete")
@admin_required
def admin_info_support_delete(sid: int):
    """Delete one support item, then return to the info console."""
    item = db.session.get(SupportItem, sid)

    if item:
        db.session.delete(item)
        db.session.commit()
        flash("Support item deleted.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
# ---- Secrets CRUD (admin-only)
|
|
@app.post("/admin/info/secret/create")
@admin_required
def admin_info_secret_create():
    """Add a local secret; both ``label`` and ``value`` are required."""
    form = request.form
    label = (form.get("label") or "").strip()
    value = (form.get("value") or "").strip()
    notes = (form.get("notes") or "").strip()

    if label and value:
        db.session.add(LocalSecret(label=label, value=value, notes=notes))
        db.session.commit()
        flash("Secret added.", "ok")
    else:
        flash("Label and value are required.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/secret/<int:sid>/toggle")
@admin_required
def admin_info_secret_toggle(sid: int):
    """Flip the ``is_active`` flag of one local secret."""
    secret = db.session.get(LocalSecret, sid)

    if secret:
        secret.is_active = not secret.is_active
        db.session.commit()
        flash("Secret updated.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
@app.post("/admin/info/secret/<int:sid>/delete")
@admin_required
def admin_info_secret_delete(sid: int):
    """Delete one local secret, then return to the info console."""
    secret = db.session.get(LocalSecret, sid)

    if secret:
        db.session.delete(secret)
        db.session.commit()
        flash("Secret deleted.", "ok")
    else:
        flash("Not found.", "error")

    return redirect(url_for("admin_info_console"))
|
|
|
|
|
|
|
|
@app.get("/healthz")
def healthz():
    """Health endpoint: always answers ``{"ok": True}`` with HTTP 200."""
    payload = {"ok": True}
    status = 200
    return payload, status
|
|
|
|
# =============================================================================
|
|
# Error handlers
|
|
# =============================================================================
|
|
@app.errorhandler(403)
def error_403(e):
    """Answer 403 errors with a short "Forbidden" page."""
    markup = (
        "<section><h1 class='text-xl font-bold'>Forbidden</h1>"
        "<p class='text-slate-300 mt-2'>You don't have access to this resource.</p></section>"
    )
    page = render_page(markup)
    return page, 403
|
|
|
|
@app.errorhandler(404)
def error_404(e):
    """Answer 404 errors with a short "Not Found" page."""
    markup = (
        "<section><h1 class='text-xl font-bold'>Not Found</h1>"
        "<p class='text-slate-300 mt-2'>We couldn't find what you were looking for.</p></section>"
    )
    page = render_page(markup)
    return page, 404
|
|
|
|
@app.errorhandler(500)
def error_500(e):
    """Answer 500 errors with a short "Server Error" page."""
    markup = (
        "<section><h1 class='text-xl font-bold'>Server Error</h1>"
        "<p class='text-slate-300 mt-2'>Something went wrong. Please try again.</p></section>"
    )
    page = render_page(markup)
    return page, 500
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
if __name__ == "__main__":
    # Development entry point. The interactive debugger lets anyone who can
    # reach the server run arbitrary code, so it is opt-in through the
    # FLASK_DEBUG environment variable instead of always enabled.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, port=5000)
|