Files
2026-03-31_BattleRoyale/Assets/Scripts/prism_ingest.py
T
Raincloud 71e31c5703 prism ingest attempts
from 2 days previous
2026-04-25 13:42:15 -06:00

1339 lines
48 KiB
Python

#!/usr/bin/env python3
"""
Push Story/*.md Prism profiles to prism.tal.one via the production API.
Auth (pick one — use environment variables, never commit secrets):
PRISM_API_KEY If set, sends Authorization: Bearer <value> by default.
PRISM_AUTH_HEADER Optional full header value, e.g. "Bearer sk-..." or "ApiKey ...".
PRISM_SESSION_COOKIE Session only: the cookie *value* (script sends as session=<value>), or a full Cookie
fragment e.g. session=…; XSRF-TOKEN=… (used as-is — do not double-prefix).
Persistent local config (no PowerShell paste each run):
Create Assets/Scripts/prism.local.env (gitignored) with lines like PRISM_ALLOW_WRITE=1, PRISM_SESSION_COOKIE=…,
PRISM_CSRF_TOKEN=… (optional if bootstrap works), PRISM_API_KEY=… (best — skips CSRF entirely).
Or set PRISM_ENV_FILE to another path. Optional CLI: --prism-env-file PATH (must appear before the subcommand).
Real environment variables always override values from the file.
Optional:
PRISM_BASE_URL Default https://prism.tal.one
PRISM_PRODUCTION_ID Default 111
PRISM_ALLOW_WRITE Must be exactly "1" for sync --apply
PRISM_USER_AGENT Override browser-like User-Agent (default: Chrome on Windows)
PRISM_ORIGIN Default https://prism.tal.one (Origin + Referer for API calls)
PRISM_CSRF_TOKEN If writes return CSRF_ERROR with session cookie auth: paste token from DevTools, or rely on auto-bootstrap below.
PRISM_CSRF_HEADER Header name for that token (default X-CSRFToken — matches /static/js/csrf.js on prism.tal.one).
PRISM_CSRF_SEND_BOTH Set to 1 to also send X-CSRF-Token and X-XSRF-TOKEN with the same value (if a proxy strips one header).
Examples:
python Assets/Scripts/prism_ingest.py list
python Assets/Scripts/prism_ingest.py parse Story/Azure.md
python Assets/Scripts/prism_ingest.py parse Story/Azure.md --blob
python Assets/Scripts/prism_ingest.py inspect <CHARACTER_UUID>
python Assets/Scripts/prism_ingest.py sync
python Assets/Scripts/prism_ingest.py sync --map-md
$env:PRISM_ALLOW_WRITE='1'; $env:PRISM_API_KEY=''; python Assets/Scripts/prism_ingest.py sync --apply --map-md
Writes need a destination field on the character JSON. Run `inspect` on one id, pick a string or object slot
(e.g. internal notes), pass its dot-path as --attach-field. Use --experimental-merge only if our blob keys
match Prism's schema exactly (unlikely without talw's field map).
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from urllib.parse import urlparse
from http.cookiejar import CookieJar
from pathlib import Path
from typing import Any
from urllib.parse import unquote
# `- **Label**: value` field bullets in character markdown (colon outside the bold).
FIELD_BULLET = re.compile(r"^-\s+\*\*(.+?)\*\*:\s*(.*)\s*$")
# Alternate: `- **Label:** value` (colon inside bold — common typo / variant)
FIELD_BULLET_COLON_IN_BOLD = re.compile(r"^-\s+\*\*(.+?):\*\*\s*(.*)\s*$")
# `- **Name** tail...` relationship bullets; tail is split by _parse_relationship_tail.
REL_LINE = re.compile(r"^-\s+\*\*(.+?)\*\*\s+(.*)$")
# H2/H3 headings: captures the hash run and the trimmed heading text.
HEADING = re.compile(r"^(#{2,3})\s+(.+?)\s*$")
# Browser-like User-Agent sent by default so Prism's edge doesn't reject bare urllib.
_DEFAULT_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
def _strip_prism_env_file_from_argv() -> Path | None:
"""Remove --prism-env-file PATH from sys.argv; return path if present."""
argv = sys.argv
out: list[str] = [argv[0]]
i = 1
found: Path | None = None
while i < len(argv):
a = argv[i]
if a == "--prism-env-file" and i + 1 < len(argv):
found = Path(argv[i + 1])
i += 2
continue
if a.startswith("--prism-env-file="):
found = Path(a.split("=", 1)[1])
i += 1
continue
out.append(a)
i += 1
if found is not None:
sys.argv[:] = out
return found
def _load_prism_env_file(path: Path, *, warn_missing: bool) -> None:
"""Set os.environ for keys from a simple KEY=value file. Does not override existing env."""
if not path.is_file():
if warn_missing:
print(f"Prism: env file not found: {path}", file=sys.stderr)
return
raw = path.read_text(encoding="utf-8")
for line in raw.splitlines():
s = line.strip()
if not s or s.startswith("#"):
continue
if "=" not in s:
continue
k, _, v = s.partition("=")
k = k.strip()
v = v.strip()
if not k:
continue
if len(v) >= 2 and v[0] == v[-1] and v[0] in "\"'":
v = v[1:-1]
if k not in os.environ:
os.environ[k] = v
def _apply_prism_env_files() -> None:
    """Resolve the env-file path (CLI flag > PRISM_ENV_FILE > default) and load it."""
    from_cli = _strip_prism_env_file_from_argv()
    from_env = os.environ.get("PRISM_ENV_FILE", "").strip()
    # Warn about a missing file only when the user named one explicitly.
    explicit = from_cli is not None or bool(from_env)
    chosen: Path | None = from_cli
    if chosen is None and from_env:
        chosen = Path(from_env)
    if chosen is None:
        # Fall back to prism.local.env next to this script, if it exists.
        candidate = Path(__file__).resolve().parent / "prism.local.env"
        if candidate.is_file():
            chosen = candidate
            explicit = False
    if chosen is not None:
        _load_prism_env_file(chosen, warn_missing=explicit)
def _cookie_header_from_session_env() -> str:
"""Build Cookie header from PRISM_SESSION_COOKIE without duplicating session=."""
s = os.environ.get("PRISM_SESSION_COOKIE", "").strip()
if not s:
return ""
sl = s.lower()
if sl.startswith("session=") or ";" in s or s.count("=") > 1:
return s
return f"session={s}"
def _auth_headers() -> dict[str, str]:
    """Authorization / session only (no browser fingerprint)."""
    # Precedence: explicit header > API key > session cookie > nothing.
    custom = os.environ.get("PRISM_AUTH_HEADER")
    if custom:
        # Either "Header-Name: value" or a bare Authorization value.
        if ":" in custom:
            name, _, value = custom.partition(":")
            return {name.strip(): value.strip()}
        return {"Authorization": custom.strip()}
    api_key = os.environ.get("PRISM_API_KEY", "").strip()
    if api_key:
        return {"Authorization": f"Bearer {api_key}"}
    cookie = _cookie_header_from_session_env()
    if cookie:
        return {"Cookie": cookie}
    return {}
def _full_headers(*, json_body: bool) -> dict[str, str]:
    """
    Prism's edge often returns HTTP 403 + HTML 'Connection Error' for bare urllib clients.
    Send the same shape of headers the web app uses.
    """
    origin = os.environ.get("PRISM_ORIGIN", "https://prism.tal.one").rstrip("/")
    # An empty PRISM_USER_AGENT falls back to the default Chrome string.
    agent = os.environ.get("PRISM_USER_AGENT", _DEFAULT_UA).strip() or _DEFAULT_UA
    headers: dict[str, str] = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US,en;q=0.9",
        "User-Agent": agent,
        "Origin": origin,
        "Referer": os.environ.get("PRISM_REFERER", f"{origin}/productions/"),
    }
    headers.update(_auth_headers())
    if json_body:
        headers["Content-Type"] = "application/json"
    return headers
# Cached result of _csrf_headers_for_write plus the production id it was computed
# for — a different production id invalidates the cache.
_csrf_extra_cache: dict[str, str] | None = None
_csrf_extra_pid: str | None = None
def _session_cookie_auth() -> bool:
return bool(os.environ.get("PRISM_SESSION_COOKIE", "").strip())
def _api_key_auth() -> bool:
return bool(os.environ.get("PRISM_API_KEY", "").strip() or os.environ.get("PRISM_AUTH_HEADER", "").strip())
def _bootstrap_write_extras(production_id: str) -> dict[str, str]:
    """
    Hit a same-origin HTML route so Set-Cookie can attach XSRF (etc.).
    Returns extra headers for mutating API calls: optional Cookie merge + CSRF header.
    """
    origin = os.environ.get("PRISM_ORIGIN", "https://prism.tal.one").rstrip("/")
    # Most-specific page first; any of them may set the token cookie.
    paths = (f"/productions/{production_id}", "/productions", "/")
    base_h = _full_headers(json_body=False)
    base_sess = _cookie_header_from_session_env()
    for path in paths:
        url = f"{origin}{path}"
        # Fresh jar per attempt so a failed path's cookies don't leak into the next.
        jar = CookieJar()
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
        req = urllib.request.Request(url, headers=base_h)
        try:
            with opener.open(req, timeout=60) as resp:
                html = resp.read().decode("utf-8", errors="replace")
        except urllib.error.HTTPError as e:
            # Error pages can still carry Set-Cookie / embedded tokens — keep the body.
            html = e.read().decode("utf-8", errors="replace")
        except OSError:
            # Connection-level failure: try the next path.
            continue
        from_jar = [f"{c.name}={c.value}" for c in jar]
        # Token source 1: a cookie whose name normalizes to xsrftoken/csrftoken/csrf.
        xsrf: str | None = None
        for c in jar:
            norm = re.sub(r"[_-]", "", c.name.lower())
            if norm in ("xsrftoken", "csrftoken", "csrf"):
                xsrf = unquote(c.value)
                break
        # Token source 2: <meta name="csrf-token" content="...">.
        if not xsrf:
            m = re.search(
                r'<meta\s+name=["\']csrf-token["\']\s+content=["\']([^"\']+)["\']',
                html,
                re.I,
            )
            if m:
                xsrf = m.group(1)
        # Token source 3: "csrfToken": "..." embedded in inline JS/JSON.
        if not xsrf:
            m = re.search(r'"csrfToken"\s*:\s*"([^"]+)"', html)
            if m:
                xsrf = m.group(1)
        out: dict[str, str] = {}
        if from_jar:
            # Merge bootstrap cookies after the environment session cookie.
            tail = "; ".join(from_jar)
            out["Cookie"] = f"{base_sess}; {tail}" if base_sess else tail
        if xsrf:
            hname = os.environ.get("PRISM_CSRF_HEADER", "X-CSRFToken").strip() or "X-CSRFToken"
            out[hname] = xsrf
            if os.environ.get("PRISM_CSRF_SEND_BOTH") == "1":
                # Duplicate under the common aliases in case a proxy strips one header.
                for alt in ("X-CSRFToken", "X-CSRF-Token", "X-XSRF-TOKEN"):
                    if alt.lower() != hname.lower():
                        out[alt] = xsrf
        if out:
            return out
    return {}
def _csrf_headers_for_write(*, production_id: str) -> dict[str, str]:
    """Extra headers for mutating calls; memoized per production id."""
    global _csrf_extra_cache, _csrf_extra_pid
    if _csrf_extra_cache is not None and _csrf_extra_pid == production_id:
        return _csrf_extra_cache
    extras: dict[str, str] = {}
    # API-key/header auth never needs CSRF; cache the empty result.
    if not _api_key_auth():
        header_name = os.environ.get("PRISM_CSRF_HEADER", "X-CSRFToken").strip() or "X-CSRFToken"
        token = os.environ.get("PRISM_CSRF_TOKEN", "").strip()
        if _session_cookie_auth():
            extras = _bootstrap_write_extras(production_id)
        if token:
            # An explicit PRISM_CSRF_TOKEN overrides any bootstrapped token.
            extras = dict(extras)
            extras[header_name] = token
    _csrf_extra_cache, _csrf_extra_pid = extras, production_id
    return extras
def _req(
    method: str,
    url: str,
    data: bytes | None = None,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """One HTTP round-trip; returns (status, decoded body) even for HTTP errors.

    Connection-level failures (URLError) print a hint and exit the process
    with SystemExit(1) instead of returning.
    """
    hdrs = dict(headers or {})
    r = urllib.request.Request(url, data=data, method=method, headers=hdrs)
    try:
        with urllib.request.urlopen(r, timeout=120) as resp:
            return resp.getcode(), resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        # Non-2xx responses still carry a useful body (API error JSON) — return it.
        body = e.read().decode("utf-8", errors="replace")
        return e.code, body
    except urllib.error.URLError as e:
        host = urlparse(url).hostname or "?"
        reason = repr(e.reason)
        hint = ""
        # Windows (11001), glibc, and generic getaddrinfo spellings of "DNS failed".
        if "getaddrinfo" in reason or "11001" in reason or "Name or service not known" in reason:
            hint = (
                f"\n DNS could not resolve host {host!r}. Fix PRISM_BASE_URL to match the site you open in the "
                f'browser (no typos), check network/VPN, then: Resolve-DnsName {host} in PowerShell.'
            )
        print(f"Request failed {method} {url}\n {e!r}{hint}", file=sys.stderr)
        raise SystemExit(1) from e
def _normalize_list_payload(data: Any) -> list[dict[str, Any]]:
    """Coerce a list endpoint's JSON into a flat list of character dicts."""
    if isinstance(data, list):
        rows = [item for item in data if isinstance(item, dict)]
    elif isinstance(data, dict):
        rows = []
        # Servers wrap the list under one of several envelope keys.
        for key in ("characters", "data", "items", "results", "rows"):
            candidate = data.get(key)
            if isinstance(candidate, list):
                rows = [item for item in candidate if isinstance(item, dict)]
                break
    else:
        raise ValueError(f"Unexpected list JSON shape: {type(data).__name__}")
    return [_unwrap_row(row) for row in rows]
def _unwrap_row(obj: dict[str, Any]) -> dict[str, Any]:
"""List items may be `{ \"data\": { ...character } }` like detail responses."""
inner = obj.get("data")
if isinstance(inner, dict):
out = dict(inner)
out.setdefault("uuid", obj.get("uuid"))
return out
return obj
def _char_id(obj: dict[str, Any]) -> str | None:
# Prefer uuid — numeric `id` breaks /characters/{id} routes on Prism.
for k in ("uuid", "characterId", "character_id", "_id", "id"):
v = obj.get(k)
if v is not None and str(v).strip():
return str(v)
return None
def _char_name(obj: dict[str, Any]) -> str | None:
for k in (
"display_name",
"canonical_name",
"name",
"title",
"characterName",
"displayName",
"fullName",
"label",
"slug",
):
v = obj.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return None
def _norm_key(s: str) -> str:
return re.sub(r"[^a-z0-9]+", "", s.lower())
def _load_match_config(path: Path | None) -> tuple[dict[str, str], dict[str, str]]:
"""Returns (filename_to_prism_display_name, filename_to_uuid)."""
if not path or not path.is_file():
return {}, {}
data = json.loads(path.read_text(encoding="utf-8"))
raw = data.get("filename_to_prism_name", data)
names = {str(k): str(v) for k, v in raw.items() if not str(k).startswith("_")}
uuids = {str(k): str(v) for k, v in data.get("filename_to_uuid", {}).items()}
return names, uuids
# Story/*.md files that are not individual character profiles and are skipped by sync.
SYNC_EXCLUDE_FILES = frozenset(
    {
        "Long description.md",
        "README.md",
        "Worldbuilding.md",
        "PRISM_FORMAT.md",
        "Cameo-Creatures.md",
    }
)
def _parse_relationship_tail(tail: str) -> tuple[str, str]:
"""Split em-dash-separated tail (after `- **Name**`) into type + remainder."""
parts = [p.strip() for p in re.split(r"\s*—\s*", tail) if p.strip()]
if not parts:
return "", ""
if len(parts) == 1:
return parts[0], ""
return parts[0], "".join(parts[1:])
def _parse_section_block(block: list[str]) -> dict[str, Any]:
    """Split a ## section body into ### subsections, or loose lines (e.g. Relationships)."""
    parsed: dict[str, Any] = {}
    sub_name: str | None = None
    pending: list[str] = []
    for raw in block:
        # H3 opens a new subsection; H4+ stays inside the current one.
        if raw.startswith("### ") and not raw.startswith("####"):
            if sub_name is not None:
                parsed[sub_name] = _parse_subsection(pending)
            sub_name = raw[4:].strip()
            pending = []
        else:
            pending.append(raw)
    if sub_name is not None:
        parsed[sub_name] = _parse_subsection(pending)
    elif pending:
        # No subsections at all: expose the non-blank lines for the caller.
        parsed["_lines"] = [ln for ln in pending if ln.strip()]
    return parsed
def parse_character_md(text: str, source: str) -> dict[str, Any]:
    """Parse one character markdown file into {'meta': ..., 'sections': ...}.

    Content from '## Extended Canon Notes' onward is dropped, the H1 becomes the
    title, H2 headings delimit sections (bodies handed to _parse_section_block),
    and the Relationships section is normalized into structured rows.
    """
    lines = text.splitlines()
    # Truncate the extended-canon tail — it is not part of the synced profile.
    stop = len(lines)
    for i, line in enumerate(lines):
        if line.strip() == "## Extended Canon Notes":
            stop = i
            break
    lines = lines[:stop]
    title = "Untitled"
    off = 0
    if lines and lines[0].startswith("# "):
        title = lines[0][2:].strip()
        off = 1
    sections: dict[str, Any] = {}
    i = off
    while i < len(lines):
        line = lines[i]
        # Only H2 headings open a section; H3+ belongs to the section body.
        if not line.startswith("## ") or line.startswith("###"):
            i += 1
            continue
        sec_name = line[3:].strip()
        i += 1
        block: list[str] = []
        # Collect body lines until the next H2 heading.
        while i < len(lines):
            if lines[i].startswith("## ") and not lines[i].startswith("###"):
                break
            block.append(lines[i])
            i += 1
        sections[sec_name] = _parse_section_block(block)
    # Normalize loose '- **Name** — type — details' lines into row dicts.
    rel_rows: list[dict[str, str]] = []
    rel_sec = sections.get("Relationships")
    if isinstance(rel_sec, dict):
        for line in rel_sec.get("_lines", []):
            m = REL_LINE.match(line.strip())
            if not m:
                continue
            related = m.group(1).strip()
            tail = m.group(2).strip()
            rtype, details = _parse_relationship_tail(tail)
            rel_rows.append(
                {
                    "related": related,
                    "relationship_type": rtype,
                    "label_or_details": details,
                }
            )
    # Relationships always ends up in the {'rows': [...]} shape, even when the
    # section was absent or had no parseable lines.
    sections["Relationships"] = {"rows": rel_rows} if rel_rows else {"rows": []}
    out: dict[str, Any] = {
        "meta": {"source_file": source, "title": title},
        "sections": sections,
    }
    return out
def _parse_subsection(lines: list[str]) -> Any:
    """Parse a ### subsection body into fields / bullets / narrative.

    Returns {} when the body is empty, otherwise a dict with 'fields' and
    optional 'narrative' / 'bullets' keys.
    """
    fields: dict[str, Any] = {}
    narrative: list[str] = []
    generic_bullets: list[str] = []
    # Set while collecting indented sub-bullets under a list-valued field
    # (currently only 'Catch Phrases' with an empty inline value).
    pending_list_key: str | None = None
    for line in lines:
        if pending_list_key:
            msub = re.match(r"^\s+-\s+(.+)$", line)
            if msub:
                entry = msub.group(1).strip()
                bucket = fields[pending_list_key]
                if isinstance(bucket, list):
                    bucket.append(entry)
                continue
            # First non-indented line ends the pending list.
            pending_list_key = None
        # `- **Label**: value` (or the colon-inside-bold variant).
        fm = FIELD_BULLET.match(line)
        if not fm:
            fm = FIELD_BULLET_COLON_IN_BOLD.match(line)
        if fm:
            key, val = fm.group(1).strip(), fm.group(2).strip()
            if not val and key == "Catch Phrases":
                # Empty value: the phrases follow as indented sub-bullets.
                fields[key] = []
                pending_list_key = key
            else:
                fields[key] = val
            continue
        # Plain `- bullet` lines (no bold label).
        gm = re.match(r"^-\s+(.+)$", line)
        if gm and "**" not in line:
            generic_bullets.append(gm.group(1).strip())
            continue
        if line.strip():
            narrative.append(line.rstrip())
    sec: dict[str, Any] = {"fields": fields}
    if narrative:
        sec["narrative"] = "\n".join(narrative).strip()
    if generic_bullets:
        sec["bullets"] = generic_bullets
    if not fields and not narrative and not generic_bullets:
        return {}
    # NOTE(review): len(sec) == 1 cannot hold here because 'fields' is always
    # present, so this narrative-only collapse appears unreachable — confirm
    # whether len(sec) == 2 was intended.
    if not fields and len(sec) == 1 and "narrative" in sec:
        return sec["narrative"]
    return sec
def intermediate_to_prism_profile_blob(parsed: dict[str, Any]) -> dict[str, Any]:
    """Flatten nested 'sections' into a single JSON object for API merge experiments."""
    blob: dict[str, Any] = {"meta": parsed.get("meta", {})}
    for sec_name, sub in (parsed.get("sections") or {}).items():
        if not isinstance(sub, dict):
            continue
        # Normalized relationship rows get their own top-level key.
        if sec_name == "Relationships" and "rows" in sub:
            blob["relationships"] = sub["rows"]
            continue
        # Copy subsections, dropping private (underscore-prefixed) keys.
        bucket: dict[str, Any] = {
            name: content for name, content in sub.items() if not name.startswith("_")
        }
        flat_key = sec_name.lower().replace(" ", "_").replace("/", "_")
        blob[flat_key] = bucket
    return blob
# --- Prism API `data` shape (from GET /characters/{uuid}); field names are snake_case. ---
# Server-owned / identity keys that _apply_prism_data_patch refuses to overwrite.
_PROTECTED_DATA_KEYS = frozenset(
    {
        "uuid",
        "id",
        "slug",
        "portrait_url",
        "production_id",
        "created_at",
        "updated_at",
        "role_type",
        "color",
        "color_index",
        "actor",
    }
)
# Markdown field label → Prism attribute key; read from the appearance
# subsections by blob_to_prism_data_patch.
_ATTR_FROM_LABELS: dict[str, str] = {
    "Height": "height",
    "Weight": "weight",
    "Build": "build",
    "Hair Color": "hair_color",
    "Hair Style": "hair_style",
    "Eye Color": "eye_color",
    "Skin Tone": "skin_tone",
    "Distinguishing Marks": "distinguishing_marks",
    "Sex": "sex",
    "Age": "age",
    "Birth Year": "birth_year",
    "Birth Place": "birth_place",
    "Ethnicity": "ethnicity",
    "Nationality": "nationality",
    "Dominant Hand": "dominant_hand",
    "Accent / Dialect": "accent",
    "Voice Quality": "voice_quality",
    "Gait / Movement": "gait",
}
def _blob_nav(blob: dict[str, Any], *segments: str) -> Any:
d: Any = blob
for s in segments:
if not isinstance(d, dict):
return None
d = d.get(s)
return d
def _blob_field_map(blob: dict[str, Any], *segments: str) -> dict[str, Any]:
    """Copy of the subsection's `fields` dict at the given path, or {}."""
    node = _blob_nav(blob, *segments)
    if not isinstance(node, dict):
        return {}
    fields = node.get("fields")
    return dict(fields) if isinstance(fields, dict) else {}
def _blob_narrative(blob: dict[str, Any], *segments: str) -> str:
    """Stripped narrative text at the given path, or '' when absent."""
    node = _blob_nav(blob, *segments)
    if isinstance(node, str):
        return node.strip()
    if isinstance(node, dict):
        narrative = node.get("narrative")
        if isinstance(narrative, str):
            cleaned = narrative.strip()
            if cleaned:
                return cleaned
    return ""
def _blob_bullets(blob: dict[str, Any], *segments: str) -> list[str]:
    """Non-blank bullet strings at the given path, or []."""
    node = _blob_nav(blob, *segments)
    if not (isinstance(node, dict) and isinstance(node.get("bullets"), list)):
        return []
    stripped = (str(item).strip() for item in node["bullets"])
    return [text for text in stripped if text]
def _blob_combined_text(blob: dict[str, Any], *segments: str) -> str:
    """Narrative + bullets + non-empty field lines (for rich subsections)."""
    node = _blob_nav(blob, *segments)
    if isinstance(node, str):
        return node.strip()
    if not isinstance(node, dict):
        return ""
    pieces: list[str] = []
    narrative = node.get("narrative")
    if isinstance(narrative, str) and narrative.strip():
        pieces.append(narrative.strip())
    for bullet in node.get("bullets") or []:
        text = str(bullet).strip()
        if text:
            pieces.append(text)
    # Render each populated field as a bold 'label: value' line.
    for fk, fv in (node.get("fields") or {}).items():
        if fv is not None and str(fv).strip():
            pieces.append(f"**{fk}:** {fv}".strip())
    return "\n\n".join(pieces).strip()
def _semi_split(s: str) -> list[str]:
return [x.strip() for x in s.split(";") if x.strip()]
def _catch_phrase_list(raw: str) -> list[str]:
    """Parse a semicolon-separated catch-phrase field; '(silent)'-style markers mean none."""
    if not raw or not str(raw).strip():
        return []
    marker = str(raw).strip().lower()
    # Placeholder values (and short strings mentioning silence) yield no phrases.
    if marker in ("", "-", "*(silent)*", "(silent)", "*silent*"):
        return []
    if "silent" in marker and len(marker) < 24:
        return []
    phrases: list[str] = []
    for piece in _semi_split(raw):
        phrase = piece.strip()
        # Drop one pair of matching surrounding quotes.
        if len(phrase) >= 2 and phrase[0] == phrase[-1] and phrase[0] in "\"'":
            phrase = phrase[1:-1]
        phrases.append(phrase)
    return phrases
def _dept_key(label: str) -> str:
return re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_") or "note"
def blob_to_prism_data_patch(blob: dict[str, Any], *, title: str) -> dict[str, Any]:
    """Build a partial `data` object for Prism from intermediate markdown blob (no meta)."""
    patch: dict[str, Any] = {}
    # --- attributes: labeled fields mapped to Prism attribute keys ---
    attrs: dict[str, str] = {}
    for sub in ("Physical Attributes", "Identity (physical-form)"):
        fm = _blob_field_map(blob, "appearance", sub)
        for label, api in _ATTR_FROM_LABELS.items():
            if api in attrs:
                continue  # first subsection that supplies a value wins
            v = fm.get(label)
            if v is not None and str(v).strip():
                # Only 'sex' is normalized to lowercase.
                attrs[api] = str(v).strip().lower() if api == "sex" else str(v).strip()
    prod_fm = _blob_field_map(blob, "appearance", "Production (physical-form)")
    for label, api in (
        ("Dominant Hand", "dominant_hand"),
        ("Accent / Dialect", "accent"),
        ("Voice Quality", "voice_quality"),
        ("Gait / Movement", "gait"),
    ):
        v = prod_fm.get(label)
        if v is not None and str(v).strip():
            attrs[api] = str(v).strip()
    if attrs:
        vq = attrs.get("voice_quality")
        if vq:
            # NOTE(review): stores a dict inside the dict[str, str]-annotated
            # attrs — presumably mirrors Prism's voice schema; confirm.
            attrs["voice"] = {"instructions": vq}
            attrs["voice_instructions"] = vq
        patch["attributes"] = attrs
    # --- long-form text fields ---
    phys_desc = _blob_narrative(blob, "appearance", "Physical Description") or _blob_combined_text(
        blob, "appearance", "Physical Description"
    )
    if phys_desc:
        patch["physical_description"] = phys_desc
    personality = _blob_combined_text(blob, "identity", "Personality")
    if personality:
        patch["personality"] = personality
    # Background = backstory + extended history, separated by a rule when both exist.
    bg = _blob_combined_text(blob, "identity", "Background / Backstory")
    ext_hist = _blob_combined_text(blob, "arc", "Extended History")
    if bg and ext_hist:
        patch["background_story"] = f"{bg}\n\n---\n\n{ext_hist}"
    elif bg:
        patch["background_story"] = bg
    elif ext_hist:
        patch["background_story"] = ext_hist
    # --- wardrobe ---
    costume = _blob_combined_text(blob, "wardrobe", "Costume Notes")
    era = _blob_combined_text(blob, "wardrobe", "Era / Period")
    if costume and era:
        patch["costume_notes"] = f"{costume}\n\n*Era / period:* {era}"
    elif costume:
        patch["costume_notes"] = costume
    elif era:
        patch["costume_notes"] = f"*Era / period:* {era}"
    makeup = _blob_combined_text(blob, "wardrobe", "Makeup Notes")
    if makeup:
        patch["makeup_notes"] = makeup
    # --- arc: motivation / conflict / transformation ---
    drive_fm = _blob_field_map(blob, "arc", "Drive")
    mot_parts: list[str] = []
    for key in ("Motivation", "Want", "Need"):
        bit = drive_fm.get(key)
        if bit is not None and str(bit).strip():
            if key == "Motivation":
                mot_parts.append(str(bit).strip())
            else:
                # Want/Need keep their label as a prefix.
                mot_parts.append(f"{key}: {str(bit).strip()}")
    if mot_parts:
        patch["motivation"] = "\n".join(mot_parts)
    opp_fm = _blob_field_map(blob, "arc", "Opposition")
    if opp_fm.get("Conflict") and str(opp_fm["Conflict"]).strip():
        patch["conflict"] = str(opp_fm["Conflict"]).strip()
    change_fm = _blob_field_map(blob, "arc", "Change")
    if change_fm.get("Transformation") and str(change_fm["Transformation"]).strip():
        patch["transformation"] = str(change_fm["Transformation"]).strip()
    traits_fm = _blob_field_map(blob, "identity", "Traits")
    if traits_fm.get("Interests") and str(traits_fm["Interests"]).strip():
        patch["interests"] = _semi_split(str(traits_fm["Interests"]))
    # --- extra_data: values without a first-class Prism field ---
    speech_fm = _blob_field_map(blob, "identity", "Speech")
    extra: dict[str, Any] = {}
    cps = _catch_phrase_list(str(speech_fm.get("Catch Phrases", "") or ""))
    if cps:
        extra["catch_phrases"] = cps
    # Accessories come from bullets plus any labeled field values.
    acc_lines = _blob_bullets(blob, "wardrobe", "Accessories & Props")
    acc_fm = _blob_field_map(blob, "wardrobe", "Accessories & Props")
    for _k, v in acc_fm.items():
        if v is not None and str(v).strip():
            acc_lines.append(str(v).strip())
    if acc_lines:
        extra["accessories"] = acc_lines
    if traits_fm.get("Quirks & Habits") and str(traits_fm["Quirks & Habits"]).strip():
        extra["quirks_habits"] = str(traits_fm["Quirks & Habits"]).strip()
    if traits_fm.get("Fears") and str(traits_fm["Fears"]).strip():
        extra["fears"] = str(traits_fm["Fears"]).strip()
    if traits_fm.get("Skills & Abilities") and str(traits_fm["Skills & Abilities"]).strip():
        extra["skills_abilities"] = str(traits_fm["Skills & Abilities"]).strip()
    if opp_fm.get("Lie They Believe") and str(opp_fm["Lie They Believe"]).strip():
        extra["lie_they_believe"] = str(opp_fm["Lie They Believe"]).strip()
    if opp_fm.get("Ghost") and str(opp_fm["Ghost"]).strip():
        extra["ghost"] = str(opp_fm["Ghost"]).strip()
    if change_fm.get("Arc Type") and str(change_fm["Arc Type"]).strip():
        extra["arc_type"] = str(change_fm["Arc Type"]).strip()
    req_fm = _blob_field_map(blob, "production", "Requirements")
    if req_fm.get("Stunt Requirements") and str(req_fm["Stunt Requirements"]).strip():
        extra["stunt_requirements"] = str(req_fm["Stunt Requirements"]).strip()
    if req_fm.get("SFX Requirements") and str(req_fm["SFX Requirements"]).strip():
        extra["sfx_requirements"] = str(req_fm["SFX Requirements"]).strip()
    # --- department notes keyed by snake_case label ---
    dept_fm = _blob_field_map(blob, "production", "Department Notes")
    if dept_fm:
        dn: dict[str, str] = {}
        for k, v in dept_fm.items():
            if v is not None and str(v).strip():
                dn[_dept_key(str(k))] = str(v).strip()
        if dn:
            patch["department_notes"] = dn
    cont = _blob_combined_text(blob, "production", "Continuity Notes")
    if cont:
        patch["continuity_notes"] = cont
    # --- relationships rendered as markdown lines in extra_data ---
    rel_rows = blob.get("relationships")
    if isinstance(rel_rows, list) and rel_rows:
        lines = []
        for row in rel_rows:
            if not isinstance(row, dict):
                continue
            r = row.get("related", "")
            rt = row.get("relationship_type", "")
            ld = row.get("label_or_details", "")
            if ld:
                # NOTE(review): rt and ld are concatenated with no separator —
                # confirm whether "{rt} — {ld}" was intended.
                lines.append(f"**{r}** — {rt}{ld}")
            elif rt:
                lines.append(f"**{r}** — {rt}")
            else:
                lines.append(f"**{r}**")
        if lines:
            extra["relationships_md"] = "\n".join(lines)
    if extra:
        patch["extra_data"] = extra
    # Names derived from the markdown H1 title.
    if title:
        patch["display_name"] = title.strip()
        patch["canonical_name"] = title.strip().upper()
    return patch
def _apply_prism_data_patch(inner: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Merge markdown-derived patch into existing Prism `data` without clobbering protected keys."""

    def _is_empty(v: Any) -> bool:
        # Matches None, "", [], {} by equality (0/False are NOT empty here).
        return v in (None, "", [], {})

    out = dict(inner)
    for key, val in patch.items():
        if key in _PROTECTED_DATA_KEYS or _is_empty(val):
            continue
        if key == "attributes" and isinstance(val, dict):
            # Shallow-merge, skipping empty incoming values.
            current = out.get("attributes")
            merged = dict(current) if isinstance(current, dict) else {}
            merged.update({ak: av for ak, av in val.items() if not _is_empty(av)})
            out["attributes"] = merged
        elif key == "extra_data" and isinstance(val, dict):
            current = out.get("extra_data")
            merged = dict(current) if isinstance(current, dict) else {}
            merged.update({ek: ev for ek, ev in val.items() if not _is_empty(ev)})
            out["extra_data"] = merged
        elif key == "department_notes" and isinstance(val, dict):
            # Department notes merge unconditionally (no emptiness filter).
            current = out.get("department_notes")
            merged = dict(current) if isinstance(current, dict) else {}
            merged.update(val)
            out["department_notes"] = merged
        elif key == "voice" and isinstance(val, dict):
            current = out.get("voice")
            out["voice"] = {**current, **val} if isinstance(current, dict) else val
        else:
            # Scalar / non-special keys simply overwrite.
            out[key] = val
    return out
_READONLY_CHARACTER_FIELDS = frozenset({"created_at", "updated_at"})
def _strip_readonly_character_fields(data: dict[str, Any]) -> dict[str, Any]:
out = dict(data)
for k in _READONLY_CHARACTER_FIELDS:
out.pop(k, None)
return out
def _prism_write_payload(merged: dict[str, Any]) -> dict[str, Any]:
    """Body for PUT/PATCH: never send GET-only keys like `success`; strip server timestamps from `data`."""
    data = merged.get("data")
    if not isinstance(data, dict):
        # No envelope: just drop the GET-only `success` flag.
        return {k: v for k, v in merged.items() if k != "success"}
    body: dict[str, Any] = {"data": _strip_readonly_character_fields(dict(data))}
    for key, value in merged.items():
        if key not in ("data", "success"):
            body[key] = value
    return body
# Presumably profiles maintained by hand on Prism and excluded from sync —
# NOTE(review): not referenced in this file's visible code; confirm cmd_sync uses it.
SYNC_SKIP_MANUAL = frozenset(
    {"Adrian.md", "Agate.md", "Azure.md", "Beanie.md", "RaincloudTheDragon.md"}
)
def keys_to_camel(o: Any) -> Any:
    """Recursively camelCase every dict key; values untouched apart from recursion."""
    if isinstance(o, dict):
        return {_camel(key): keys_to_camel(value) for key, value in o.items()}
    if isinstance(o, list):
        return [keys_to_camel(item) for item in o]
    return o
def _camel(s: str) -> str:
parts = re.split(r"[_\s]+", s)
out = []
for i, p in enumerate(parts):
if not p:
continue
if i == 0:
out.append(p[0].lower() + p[1:])
else:
out.append(p[0].upper() + p[1:])
return "".join(out) if out else s
def _slug_compact(s: str) -> str:
    """Normalize with '-'/'_' removed first, so 'a-b' compares equal to 'ab'."""
    condensed = s.replace("-", "").replace("_", "")
    return _norm_key(condensed)
def _display_key(s: str) -> str:
    """Normalize display strings so 'Heart & Mind' and 'Heart and Mind' match."""
    expanded = s.lower().replace("&", " and ")
    return _norm_key(expanded)
def _match_remote(
    parsed: dict[str, Any],
    path: Path,
    remote: list[dict[str, Any]],
    overrides: dict[str, str],
    uuid_overrides: dict[str, str],
) -> dict[str, Any] | None:
    """Pick the remote character for a markdown file.

    Match order: explicit uuid override, explicit name override, then a fuzzy
    slug/name scoring pass (threshold 80). Returns None when nothing matches.
    """
    # 1) Exact uuid override for this filename.
    if path.name in uuid_overrides:
        want = str(uuid_overrides[path.name]).strip().lower()
        for ch in remote:
            cid = (_char_id(ch) or "").strip().lower()
            if cid == want:
                return ch
        return None
    stem_slug = path.stem.lower().replace("_", "-").replace(" ", "-")
    stem_c = _slug_compact(path.stem)
    # 2) Display-name override for this filename: several equality forms tried per character.
    if path.name in overrides:
        target = overrides[path.name].strip()
        tn = _norm_key(target)
        tdk = _display_key(target)
        for ch in remote:
            slug = (ch.get("slug") or "").strip().lower()
            name = (_char_name(ch) or "").strip()
            nn = _norm_key(name)
            slug_c = _slug_compact(slug)
            if name.lower() == target.lower() or (name and tdk == _display_key(name)):
                return ch
            if nn and nn == tn:
                return ch
            if slug and (slug == stem_slug or slug_c == stem_c):
                return ch
            if slug_c and slug_c == _slug_compact(target):
                return ch
        return None
    # 3) Fuzzy scoring against the markdown title (falls back to the file stem).
    target = parsed["meta"].get("title") or path.stem
    if (parsed["meta"].get("title") == "Untitled" or not str(target).strip()) and path.suffix == ".md":
        return None
    tn = _norm_key(target)
    best = None
    best_score = -1
    for ch in remote:
        slug = (ch.get("slug") or "").strip().lower()
        slug_c = _slug_compact(slug)
        name = _char_name(ch) or ""
        nn = _norm_key(name)
        score = 0
        # Slug evidence: exact-ish match 93, dash-insensitive match 88.
        if slug:
            if slug == stem_slug or slug_c == stem_c or _norm_key(slug) == _norm_key(path.stem):
                score = 93
            elif slug.replace("-", "") == stem_slug.replace("-", ""):
                score = 88
        # Name evidence: exact 100; containment with >=72% length overlap 80;
        # shared 4-char prefix only 40 (below the acceptance threshold).
        if nn:
            if nn == tn:
                score = max(score, 100)
            elif tn in nn or nn in tn:
                lo, hi = (nn, tn) if len(nn) <= len(tn) else (tn, nn)
                if len(hi) and len(lo) >= 0.72 * len(hi):
                    score = max(score, 80)
            elif len(tn) >= 4 and len(nn) >= 4 and tn[:4] == nn[:4]:
                score = max(score, 40)
        if score > best_score:
            best_score = score
            best = ch
    # Only accept confident matches.
    return best if best_score >= 80 else None
def cmd_list(args: argparse.Namespace) -> int:
    """List production characters as 'id<TAB>name' lines; returns a shell exit code."""
    if not _auth_headers():
        print(
            "Warning: no PRISM_API_KEY, PRISM_AUTH_HEADER, or PRISM_SESSION_COOKIE in environment.",
            file=sys.stderr,
        )
    base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
    pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
    url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence"
    code, body = _req("GET", url, headers=_full_headers(json_body=False))
    print(f"HTTP {code} {url}")
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        # An HTML body instead of JSON almost always means the WAF blocked us.
        if body.lstrip().startswith("<!") or "Connection Error" in body[:500]:
            print(
                "Got HTML instead of JSON — usually Cloudflare/WAF blocking non-browser clients.\n"
                "This script now sends a Chrome User-Agent + Origin/Referer. Retry.\n"
                "If it persists: set PRISM_USER_AGENT to match your real browser (DevTools → Network → "
                "User-Agent), confirm PRISM_API_KEY or PRISM_SESSION_COOKIE is set in this shell, "
                "or ask talw whether API keys must use a different host/header.",
                file=sys.stderr,
            )
        print(body[:2000])
        return 1
    if code != 200:
        print(json.dumps(data, indent=2) if isinstance(data, (dict, list)) else body[:2000])
        return 1
    chars = _normalize_list_payload(data)
    for ch in chars:
        cid = _char_id(ch)
        name = _char_name(ch)
        print(f"{cid}\t{name}")
    return 0
def cmd_inspect(args: argparse.Namespace) -> int:
    """GET one character (with and without trailing slash) and dump its JSON."""
    base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
    pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
    cid = args.character_id
    headers = _full_headers(json_body=False)
    # Some deployments only answer one of the two route spellings.
    for suffix in ("", "/"):
        url = f"{base}/api/v2/productions/{pid}/characters/{cid}{suffix}"
        code, body = _req("GET", url, headers=headers)
        print(f"# {code} GET {url}")
        try:
            print(json.dumps(json.loads(body), indent=2)[:12000])
        except json.JSONDecodeError:
            print(body[:4000])
        if code == 200:
            return 0
    return 1
def cmd_parse(args: argparse.Namespace) -> int:
    """Parse one markdown file and print the intermediate (or flattened) JSON."""
    source = Path(args.path)
    parsed = parse_character_md(source.read_text(encoding="utf-8"), str(source))
    payload = intermediate_to_prism_profile_blob(parsed) if args.blob else parsed
    print(json.dumps(payload, indent=2, ensure_ascii=False))
    return 0
def cmd_sync(args: argparse.Namespace) -> int:
    """Match local Story/*.md profiles to remote Prism characters and merge-push.

    Without --apply this is a dry run that only reports what would be written.
    --apply additionally requires PRISM_ALLOW_WRITE=1 in the environment and at
    least one write mode (--map-md, --attach-field, or --experimental-merge).

    Returns 0 on completion, 1 if the remote list fetch fails, 2 on
    precondition errors (missing opt-in or missing write mode).
    """
    # Hard safety gate: writes need an explicit environment opt-in.
    if args.apply and os.environ.get("PRISM_ALLOW_WRITE") != "1":
        print("Refusing write: set PRISM_ALLOW_WRITE=1 for sync --apply", file=sys.stderr)
        return 2
    if args.apply and not args.attach_field and not args.experimental_merge and not args.map_md:
        print(
            "sync --apply needs one mode:\n"
            " --map-md Map markdown → Prism `data` fields (recommended)\n"
            " --attach-field <path> Store raw blob at a dot path on the API envelope\n"
            " --experimental-merge Deep-merge intermediate blob (almost never correct)\n",
            file=sys.stderr,
        )
        return 2
    base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
    pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
    # Fetch the remote roster once; every local file is matched against it.
    list_url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence"
    code, body = _req("GET", list_url, headers=_full_headers(json_body=False))
    if code != 200:
        print(f"List failed HTTP {code}: {body[:1500]}", file=sys.stderr)
        return 1
    remote = _normalize_list_payload(json.loads(body))
    # Manual name/UUID overrides for files the automatic matcher gets wrong.
    cfg_path = Path(args.match_profile) if args.match_profile else Path("Assets/Scripts/prism_match_overrides.json")
    overrides, uuid_overrides = _load_match_config(cfg_path)
    story = Path(args.story_root)
    paths = sorted(story.glob("*.md"))
    paths = [p for p in paths if p.name not in SYNC_EXCLUDE_FILES]
    # Cameo profiles live in a subfolder; include them when it exists.
    paths += sorted((story / "Cameos").glob("*.md")) if (story / "Cameos").is_dir() else []
    if args.only:
        filt = set(args.only)
        paths = [p for p in paths if p.name in filt]
    # Build write headers once up front; CSRF extras only matter for
    # session-cookie auth (API-key auth skips CSRF entirely per the header doc).
    write_json_headers: dict[str, str] | None = None
    if args.apply:
        write_json_headers = dict(_full_headers(json_body=True))
        csrf_extra = _csrf_headers_for_write(production_id=str(pid))
        write_json_headers.update(csrf_extra)
        web_origin = os.environ.get("PRISM_ORIGIN", base).rstrip("/")
        if _session_cookie_auth() and not _api_key_auth():
            # Browser-like Referer so server-side CSRF checks accept the write.
            write_json_headers["Referer"] = os.environ.get(
                "PRISM_REFERER",
                f"{web_origin}/productions/{pid}/",
            )
        if _session_cookie_auth() and not _api_key_auth() and not csrf_extra:
            print(
                "Warning: session auth but CSRF bootstrap found no token; writes may fail with CSRF_ERROR. "
                "Set PRISM_CSRF_TOKEN (or add it to prism.local.env) or use PRISM_API_KEY if available.",
                file=sys.stderr,
            )
    for path in paths:
        # Files already synced by hand are skipped unless --no-skip-done.
        if not args.no_skip_done and path.name in SYNC_SKIP_MANUAL:
            print(f"SKIP manual-done\t{path.name}", file=sys.stderr)
            continue
        parsed = parse_character_md(path.read_text(encoding="utf-8"), str(path))
        blob = intermediate_to_prism_profile_blob(parsed)
        # Strip parser metadata from the blob before any push.
        blob.pop("meta", None)
        if args.camel and not args.map_md:
            blob = keys_to_camel(blob)
        match = _match_remote(parsed, path, remote, overrides, uuid_overrides)
        if not match:
            print(f"SKIP no match\t{path.name}\ttitle={parsed['meta'].get('title')}", file=sys.stderr)
            continue
        cid = _char_id(match)
        name = _char_name(match)
        detail_url = f"{base}/api/v2/productions/{pid}/characters/{cid}"
        title = parsed["meta"].get("title") or path.stem
        prism_patch = blob_to_prism_data_patch(blob, title=title) if args.map_md else {}
        if not args.apply:
            # Dry run: report what would be pushed for this file, then move on.
            if args.map_md:
                print(
                    f"DRY-MAP\t{path.name}\t->\t{name}\t({cid})\t"
                    f"patch_keys={list(prism_patch.keys())}",
                    file=sys.stderr,
                )
                if args.verbose:
                    print(json.dumps(prism_patch, indent=2, ensure_ascii=False)[:12000])
            else:
                print(f"DRY\t{path.name}\t->\t{name}\t({cid})\tkeys={len(json.dumps(blob))}b")
                if args.verbose:
                    print(json.dumps(blob, indent=2, ensure_ascii=False)[:8000])
            continue
        # Read-modify-write: fetch the current remote document before merging.
        gc, gbody = _req("GET", detail_url, headers=_full_headers(json_body=False))
        if gc != 200:
            print(f"GET fail {path.name} HTTP {gc} {gbody[:500]}", file=sys.stderr)
            continue
        existing = json.loads(gbody)
        if args.map_md:
            # --map-md requires the { data: {...} } envelope shape.
            if not isinstance(existing.get("data"), dict):
                print(f"SKIP bad envelope {path.name}", file=sys.stderr)
                continue
            merged = dict(existing)
            merged["data"] = _apply_prism_data_patch(existing["data"], prism_patch)
        elif args.experimental_merge:
            merged = _merge_experimental(existing, blob)
        else:
            # --attach-field: store the raw blob at a dot path on the envelope.
            merged = dict(existing)
            _set_dotted(
                merged,
                args.attach_field,
                _attach_value(merged, args.attach_field, blob),
            )
        assert write_json_headers is not None  # set above whenever args.apply is true
        write_obj = _prism_write_payload(merged)
        payload = json.dumps(write_obj, ensure_ascii=False).encode("utf-8")
        pc, pbody = _req("PUT", detail_url, data=payload, headers=write_json_headers)
        if pc == 400 and args.map_md and isinstance(write_obj.get("data"), dict):
            # On 400, retry with the bare data object instead of the envelope.
            flat = json.dumps(write_obj["data"], ensure_ascii=False).encode("utf-8")
            pc, pbody = _req("PUT", detail_url, data=flat, headers=write_json_headers)
        if pc not in (200, 204):
            print(
                f"{path.name}\tPUT {pc}\t(response {min(len(pbody), 2000)} chars)\t{pbody[:2000]}",
                file=sys.stderr,
            )
            if "CSRF_ERROR" in pbody:
                print(
                    " → CSRF rejected: refresh PRISM_CSRF_TOKEN (meta csrf-token) and ensure PRISM_SESSION_COOKIE "
                    "matches the same browser session (include cf_clearance if the browser sends it). "
                    "Or use PRISM_API_KEY. If using prism.local.env, edit that file — real env vars override it.",
                    file=sys.stderr,
                )
            # PUT failed: fall back to a minimal PATCH of just the changed subtree.
            patch_body = _minimal_patch(args, write_obj, prism_patch if args.map_md else None)
            pc2, pbody2 = _req(
                "PATCH",
                detail_url,
                data=json.dumps(patch_body, ensure_ascii=False).encode("utf-8"),
                headers=write_json_headers,
            )
            if pc2 not in (200, 204):
                print(
                    f"{path.name}\tPATCH {pc2}\t{pbody2[:2000]}",
                    file=sys.stderr,
                )
            else:
                print(f"OK\t{path.name}\tPATCH HTTP {pc2}", file=sys.stderr)
        else:
            print(f"OK\t{path.name}\tHTTP {pc}", file=sys.stderr)
    return 0
def _merge_experimental(existing: Any, blob: dict[str, Any]) -> dict[str, Any]:
    """Deep-merge *blob* into a Prism detail payload.

    If *existing* is an envelope with a dict "data" member, merge beneath that
    member and keep the wrapper keys (e.g. "success") intact. A plain dict is
    merged at the top level; any other value is returned untouched.
    """
    if not isinstance(existing, dict):
        return existing
    inner = existing.get("data")
    if isinstance(inner, dict):
        merged = dict(existing)
        merged["data"] = _deep_merge(dict(inner), blob)
        return merged
    return _deep_merge(dict(existing), blob)
def _minimal_patch(
args: argparse.Namespace,
merged: dict[str, Any],
prism_patch: dict[str, Any] | None = None,
) -> dict[str, Any]:
if args.map_md and prism_patch is not None:
return {"data": merged.get("data", {})} if isinstance(merged.get("data"), dict) else merged
if args.experimental_merge:
return merged
key = args.attach_field or ""
if "." not in key:
return {key: merged[key]} if key in merged else {}
root = key.split(".", 1)[0]
if root in merged:
return {root: merged[root]}
return {}
def _attach_value(existing: dict[str, Any], field_path: str, blob: dict[str, Any]) -> Any:
    """Shape *blob* for storage at *field_path* in *existing*.

    If the current value at that path is a string, serialize the blob to
    pretty-printed JSON so the field keeps its string type; otherwise return
    the dict unchanged.
    """
    current = _get_dotted(existing, field_path)
    if not isinstance(current, str):
        return blob
    return json.dumps(blob, ensure_ascii=False, indent=2)
def _get_dotted(d: dict[str, Any], path: str) -> Any:
for part in path.split("."):
if not isinstance(d, dict) or part not in d:
return None
d = d[part]
return d
def _set_dotted(d: dict[str, Any], path: str, value: Any) -> None:
parts = path.split(".")
for p in parts[:-1]:
nxt = d.get(p)
if not isinstance(nxt, dict):
nxt = {}
d[p] = nxt
d = nxt
d[parts[-1]] = value
def _deep_merge(base: Any, patch: Any) -> Any:
if isinstance(base, dict) and isinstance(patch, dict):
out = dict(base)
for k, v in patch.items():
if k == "meta":
continue
if k not in out or out[k] in (None, "", [], {}):
out[k] = v
else:
out[k] = _deep_merge(out[k], v)
return out
if patch in (None, "", [], {}):
return base
return patch
def main() -> int:
    """CLI entry point: build the argument parser and dispatch to the subcommand."""
    # Force UTF-8 stdout when the runtime supports reconfigure (e.g. Windows consoles).
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            pass
    _apply_prism_env_files()

    parser = argparse.ArgumentParser(description="Prism production API helper for Story/*.md profiles.")
    commands = parser.add_subparsers(dest="cmd", required=True)

    list_cmd = commands.add_parser("list", help="GET characters list (id + name)")
    list_cmd.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))

    inspect_cmd = commands.add_parser("inspect", help="GET one character JSON (tries production-scoped URL)")
    inspect_cmd.add_argument("character_id")
    inspect_cmd.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))

    parse_cmd = commands.add_parser("parse", help="Parse one markdown profile to JSON")
    parse_cmd.add_argument("path")
    parse_cmd.add_argument("--blob", action="store_true", help="Emit flattened blob used for API experiments")

    sync_cmd = commands.add_parser("sync", help="Match local .md files to remote characters and merge-push")
    sync_cmd.add_argument("--story-root", default="Story")
    sync_cmd.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))
    sync_cmd.add_argument("--match-profile", default="Assets/Scripts/prism_match_overrides.json")
    sync_cmd.add_argument("--apply", action="store_true", help="Actually PUT/PATCH (needs PRISM_ALLOW_WRITE=1)")
    sync_cmd.add_argument(
        "--attach-field",
        default=None,
        help="Dot path on character JSON to store the ingested blob (inspect one character for a suitable string/object field).",
    )
    sync_cmd.add_argument(
        "--experimental-merge",
        action="store_true",
        help="Deep-merge ingested blob into GET JSON (unsafe unless schema matches).",
    )
    sync_cmd.add_argument(
        "--map-md",
        action="store_true",
        help="Map markdown sections to Prism native `data` fields (use with --apply to push).",
    )
    sync_cmd.add_argument(
        "--no-skip-done",
        action="store_true",
        help="Also run on Adrian, Agate, Azure, Beanie, Raincloud (skipped by default).",
    )
    sync_cmd.add_argument("--camel", action="store_true", help="CamelCase JSON keys on the ingested blob")
    sync_cmd.add_argument("--verbose", action="store_true")
    sync_cmd.add_argument("--only", nargs="*", help="Limit to filenames e.g. Azure.md Beanie.md")

    args = parser.parse_args()
    dispatch = {
        "list": cmd_list,
        "inspect": cmd_inspect,
        "parse": cmd_parse,
        "sync": cmd_sync,
    }
    handler = dispatch.get(args.cmd)
    return handler(args) if handler is not None else 1
# Script entry point: propagate main()'s integer return as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())