Initial import of Brookhaven site

This commit is contained in:
2025-11-18 20:28:52 +00:00
parent 7e27f39d01
commit 773a91dd2f
1900 changed files with 700543 additions and 0 deletions

336
app.py Normal file
View File

@@ -0,0 +1,336 @@
# app.py
from __future__ import annotations
import os, json
from datetime import datetime
from typing import Dict, Any
from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qsl
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, Response
from sqlalchemy import create_engine, text, Column, Integer, String, DateTime, JSON
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
import hmac
from werkzeug.security import check_password_hash
# -----------------------------------------------------------------------------
# App
# -----------------------------------------------------------------------------
app = Flask(__name__, static_folder="static", static_url_path="/static")
BRAND = "BrookHaven Technologies"
TAGLINE = "Fast to prototype. Safe to scale."
# --- Personal contact (override via env) ---
CONTACT = {
"name": os.environ.get("BH_CONTACT_NAME", "Benjamin Mosley"),
"title": os.environ.get("BH_CONTACT_TITLE", "Founder, BrookHaven Technologies"),
"email": os.environ.get("BH_CONTACT_EMAIL", "ben@bennyshouse.net"),
"phone": os.environ.get("BH_CONTACT_PHONE", "(806) 655 2300)"),
"city": os.environ.get("BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
"cal": os.environ.get("BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
"link": os.environ.get("BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
"site": os.environ.get("BH_CONTACT_SITE", "https://bennyshouse.net"),
"hours": os.environ.get("BH_CONTACT_HOURS", "MonFri, 9a5p CT"),
}
app.config.update(
SECRET_KEY=os.environ.get("APP_SECRET_KEY", "dev"),
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
SESSION_COOKIE_SECURE=bool(int(os.environ.get("COOKIE_SECURE", "1"))), # set 1 in prod with HTTPS
)
# Admin credentials (env-driven)
ADMIN_USER = os.environ.get("BH_ADMIN_USER", "admin")
ADMIN_PW_HASH = os.environ.get("BH_ADMIN_PASSWORD_HASH", "32768:8:1$pgll8a2zdtxky50G$8ef13bb775569f480da14618433b7b80a93f5cb3ef99b67878ddfb058d39e858f05d81b25c88365737d81400ee287a156c76de7b51aed33ea667030f7a83e10d") # pbkdf2 hash
ADMIN_BEARER = os.environ.get("BH_ADMIN_BEARER", "") # optional static token
# -----------------------------------------------------------------------------
# DB (MariaDB)
# -----------------------------------------------------------------------------
DB_URL = os.environ.get("DB_URL", "mysql+pymysql://tapdown:Swaows.1234@127.0.0.1/tapdown")
engine = create_engine(
DB_URL,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
pool_pre_ping=True,
isolation_level="READ COMMITTED",
future=True,
)
SessionLocal = scoped_session(sessionmaker(bind=engine, expire_on_commit=False, future=True))
Base = declarative_base()
class Inquiry(Base):
__tablename__ = "bh_inquiries"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(160), nullable=False)
email = Column(String(200), nullable=False)
message = Column(String(4000), nullable=False)
nda = Column(String(8), nullable=False, default="no") # yes|no
meta = Column(JSON, nullable=False, default={}) # e.g. user agent
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
_tables_ready = False
@app.before_request
def ensure_tables():
global _tables_ready
if _tables_ready:
return
try:
with engine.begin() as conn:
Base.metadata.create_all(conn)
_tables_ready = True
except SQLAlchemyError:
app.logger.exception("DB init failed; continuing without DB")
@app.teardown_appcontext
def remove_session(_=None):
SessionLocal.remove()
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def with_utm(url: str, extra: Dict[str, str] | None = None) -> str:
scheme, netloc, path, query, frag = urlsplit(url)
q = dict(parse_qsl(query))
q.update(extra or {})
return urlunsplit((scheme, netloc, path, urlencode(q), frag))
app.config.update(
SECRET_KEY=os.environ.get("APP_SECRET_KEY", "dev"),
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
SESSION_COOKIE_SECURE=bool(int(os.environ.get("COOKIE_SECURE", "0"))), # set 1 in prod with HTTPS
)
def _is_admin_request():
# 1) Bearer token (e.g., for automation or CSV curl)
authz = request.headers.get("Authorization", "")
if ADMIN_BEARER and authz.startswith("Bearer "):
token = authz[7:].strip()
if hmac.compare_digest(token, ADMIN_BEARER):
return True
# 2) HTTP Basic for humans
auth = request.authorization
if auth and ADMIN_PW_HASH and auth.username == ADMIN_USER:
try:
if check_password_hash(ADMIN_PW_HASH, auth.password):
return True
except Exception:
pass
return False
# -----------------------------------------------------------------------------
# Routes (pages)
# -----------------------------------------------------------------------------
@app.get("/")
def home():
return render_template("index.html", brand=BRAND, tagline=TAGLINE)
def require_admin(view):
@wraps(view)
def _wrapped(*args, **kwargs):
if _is_admin_request():
return view(*args, **kwargs)
return Response(
"Authentication required",
401,
{"WWW-Authenticate": 'Basic realm="BrookHaven Admin"'},
)
return _wrapped
@app.get("/about")
def about():
return render_template("about.html", brand=BRAND)
@app.get("/services")
def services():
return render_template("services.html", brand=BRAND)
@app.get("/work")
def work():
# Example case studies (could be a JSON file later)
cases = [
{
"title": "Tapdown Showdown — Cyber Sale Activation",
"desc": "Survey-powered mini-game; branded UI, MariaDB analytics, CPU mode for flaky networks.",
"bullets": ["90%+ survey completion", "UTM funnels to promo pages", "Works offline/kiosk"],
"image": "/static/brookhaven-case.jpg",
},
{
"title": "Kiosk Checkout Prototype",
"desc": "Self-serve event checkout with QR receipts and local-first sync.",
"bullets": ["Local cache", "Queue-busting UX", "Auto export"],
"image": "/static/brookhaven-kiosk.jpg",
},
]
return render_template("work.html", brand=BRAND, cases=cases)
@app.context_processor
def inject_contact():
return {"CONTACT": CONTACT}
@app.get("/contact.vcf")
def contact_vcf():
# Generate a simple vCard 3.0
n = CONTACT["name"]
parts = n.split(" ", 1)
last = parts[-1] if len(parts) > 1 else parts[0]
first = parts[0]
phone = CONTACT["phone"].replace(" ", "")
email = CONTACT["email"]
org = BRAND
title = CONTACT["title"]
url = CONTACT["site"]
city = CONTACT["city"]
vcard = f"""BEGIN:VCARD
VERSION:3.0
N:{last};{first};;;
FN:{n}
ORG:{org}
TITLE:{title}
TEL;TYPE=CELL,VOICE:{phone}
EMAIL;TYPE=INTERNET:{email}
URL:{url}
ADR;TYPE=WORK:;;{city};;;;
END:VCARD
"""
return (vcard, 200, {
"Content-Type": "text/vcard; charset=utf-8",
"Content-Disposition": 'attachment; filename="brookhaven-contact.vcf"',
})
@app.get("/contact")
def contact():
return render_template("contact.html", brand=BRAND)
@app.post("/contact")
def contact_post():
name = (request.form.get("name") or "").strip()
email = (request.form.get("email") or "").strip()
message = (request.form.get("message") or "").strip()
nda = "yes" if request.form.get("nda") in ("on", "yes", "true") else "no"
if not name or not email or not message:
flash("Please fill name, email, and a short description.", "error")
return redirect(url_for("contact"))
# Persist (best-effort)
meta = {"ua": request.headers.get("User-Agent", ""), "ip": request.remote_addr}
db = SessionLocal()
try:
db.add(Inquiry(name=name, email=email, message=message, nda=nda, meta=meta))
db.commit()
except SQLAlchemyError:
db.rollback()
app.logger.exception("Failed to write inquiry")
# keep going anyway to the thank-you page
return redirect(url_for("thanks"))
@app.get("/thanks")
def thanks():
return render_template("thanks.html", brand=BRAND)
# -----------------------------------------------------------------------------
# Admin (read-only)
# -----------------------------------------------------------------------------
@app.get("/admin/inquiries")
@require_admin
def admin_inquiries():
page = max(1, int(request.args.get("page", 1)))
per_page = 25
offset = (page - 1) * per_page
db = SessionLocal()
try:
rows = db.execute(
text("""SELECT id,name,email,message,nda,meta,created_at
FROM bh_inquiries ORDER BY created_at DESC
LIMIT :limit OFFSET :offset"""),
{"limit": per_page, "offset": offset}
).mappings().all()
total = db.execute(text("SELECT COUNT(*) FROM bh_inquiries")).scalar_one()
except Exception:
app.logger.exception("Query failed")
rows, total = [], 0
finally:
db.close()
# Convert meta JSON (string from PyMySQL) to dict
def parse_json(val):
if isinstance(val, dict): return val
if isinstance(val, str) and val:
try: return json.loads(val)
except Exception: return {}
return {}
processed = []
for r in rows:
processed.append({
"id": r["id"], "name": r["name"], "email": r["email"],
"message": r["message"], "nda": r["nda"],
"meta": parse_json(r["meta"]),
"created_at": r["created_at"],
})
pages = (total // per_page) + (1 if total % per_page else 0)
return render_template("admin_inquiries.html",
rows=processed, page=page, pages=pages, total=total, brand=BRAND)
@app.get("/admin/inquiries.csv")
@require_admin
def admin_inquiries_csv():
db = SessionLocal()
try:
rows = db.execute(
text("""SELECT id,name,email,message,nda,meta,created_at
FROM bh_inquiries ORDER BY created_at DESC""")
).mappings().all()
finally:
db.close()
import csv, io
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id","name","email","nda","message","ua","ip","created_at"])
for r in rows:
meta = r["meta"]
if isinstance(meta, str):
try:
meta = json.loads(meta)
except Exception:
meta = {}
w.writerow([
r["id"], r["name"], r["email"], r["nda"],
(r["message"] or "").replace("\n"," ").strip(),
(meta or {}).get("ua",""), (meta or {}).get("ip",""),
r["created_at"],
])
out = buf.getvalue()
return out, 200, {
"Content-Type": "text/csv; charset=utf-8",
"Content-Disposition": "attachment; filename=brookhaven_inquiries.csv"
}
# -----------------------------------------------------------------------------
# Utilities
# -----------------------------------------------------------------------------
@app.get("/healthz")
def healthz():
return jsonify({"ok": True, "brand": BRAND}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5050, debug=False)