Files
2026-03-31_BattleRoyale/Assets/Scripts/prism_ingest.py
T
2026-04-23 02:00:28 -06:00

1112 lines
39 KiB
Python

#!/usr/bin/env python3
"""
Push Story/*.md Prism profiles to prism.tal.one via the production API.
Auth (pick one — use environment variables, never commit secrets):
PRISM_API_KEY If set, sends Authorization: Bearer <value> by default.
PRISM_AUTH_HEADER Optional full header value, e.g. "Bearer sk-..." or "ApiKey ...".
PRISM_SESSION_COOKIE Raw session cookie string (same value as browser Cookie "session").
Optional:
PRISM_BASE_URL Default https://prism.tal.one
PRISM_PRODUCTION_ID Default 111
PRISM_ALLOW_WRITE Must be exactly "1" for sync --apply
PRISM_USER_AGENT Override browser-like User-Agent (default: Chrome on Windows)
PRISM_ORIGIN Default https://prism.tal.one (Origin + Referer for API calls)
Examples:
python Assets/Scripts/prism_ingest.py list
python Assets/Scripts/prism_ingest.py parse Story/Azure.md
python Assets/Scripts/prism_ingest.py parse Story/Azure.md --blob
python Assets/Scripts/prism_ingest.py inspect <CHARACTER_UUID>
python Assets/Scripts/prism_ingest.py sync
python Assets/Scripts/prism_ingest.py sync --map-md
$env:PRISM_ALLOW_WRITE='1'; $env:PRISM_API_KEY=''; python Assets/Scripts/prism_ingest.py sync --apply --map-md
Writes need a destination field on the character JSON. Run `inspect` on one id, pick a string or object slot
(e.g. internal notes), pass its dot-path as --attach-field. Use --experimental-merge only if our blob keys
match Prism's schema exactly (unlikely without talw's field map).
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
FIELD_BULLET = re.compile(r"^-\s+\*\*(.+?)\*\*:\s*(.*)\s*$")
# Alternate: `- **Label:** value` (colon inside bold — common typo / variant)
FIELD_BULLET_COLON_IN_BOLD = re.compile(r"^-\s+\*\*(.+?):\*\*\s*(.*)\s*$")
REL_LINE = re.compile(r"^-\s+\*\*(.+?)\*\*\s+(.*)$")
HEADING = re.compile(r"^(#{2,3})\s+(.+?)\s*$")
_DEFAULT_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
def _auth_headers() -> dict[str, str]:
"""Authorization / session only (no browser fingerprint)."""
h: dict[str, str] = {}
custom = os.environ.get("PRISM_AUTH_HEADER")
if custom:
if ":" in custom:
name, _, val = custom.partition(":")
h[name.strip()] = val.strip()
else:
h["Authorization"] = custom.strip()
return h
key = os.environ.get("PRISM_API_KEY", "").strip()
if key:
h["Authorization"] = f"Bearer {key}"
return h
sess = os.environ.get("PRISM_SESSION_COOKIE", "").strip()
if sess:
h["Cookie"] = f"session={sess}"
return h
return h
def _full_headers(*, json_body: bool) -> dict[str, str]:
"""
Prism's edge often returns HTTP 403 + HTML 'Connection Error' for bare urllib clients.
Send the same shape of headers the web app uses.
"""
origin = os.environ.get("PRISM_ORIGIN", "https://prism.tal.one").rstrip("/")
h: dict[str, str] = {
"Accept": "application/json, text/plain, */*",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": os.environ.get("PRISM_USER_AGENT", _DEFAULT_UA).strip() or _DEFAULT_UA,
"Origin": origin,
"Referer": os.environ.get("PRISM_REFERER", f"{origin}/productions/"),
}
h.update(_auth_headers())
if json_body:
h["Content-Type"] = "application/json"
return h
def _req(
method: str,
url: str,
data: bytes | None = None,
headers: dict[str, str] | None = None,
) -> tuple[int, str]:
hdrs = dict(headers or {})
r = urllib.request.Request(url, data=data, method=method, headers=hdrs)
try:
with urllib.request.urlopen(r, timeout=120) as resp:
return resp.getcode(), resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
return e.code, body
def _normalize_list_payload(data: Any) -> list[dict[str, Any]]:
if isinstance(data, list):
raw = [x for x in data if isinstance(x, dict)]
elif isinstance(data, dict):
raw = []
for key in ("characters", "data", "items", "results", "rows"):
v = data.get(key)
if isinstance(v, list):
raw = [x for x in v if isinstance(x, dict)]
break
else:
raise ValueError(f"Unexpected list JSON shape: {type(data).__name__}")
return [_unwrap_row(x) for x in raw]
def _unwrap_row(obj: dict[str, Any]) -> dict[str, Any]:
"""List items may be `{ \"data\": { ...character } }` like detail responses."""
inner = obj.get("data")
if isinstance(inner, dict):
out = dict(inner)
out.setdefault("uuid", obj.get("uuid"))
return out
return obj
def _char_id(obj: dict[str, Any]) -> str | None:
# Prefer uuid — numeric `id` breaks /characters/{id} routes on Prism.
for k in ("uuid", "characterId", "character_id", "_id", "id"):
v = obj.get(k)
if v is not None and str(v).strip():
return str(v)
return None
def _char_name(obj: dict[str, Any]) -> str | None:
for k in (
"display_name",
"canonical_name",
"name",
"title",
"characterName",
"displayName",
"fullName",
"label",
"slug",
):
v = obj.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return None
def _norm_key(s: str) -> str:
return re.sub(r"[^a-z0-9]+", "", s.lower())
def _load_match_config(path: Path | None) -> tuple[dict[str, str], dict[str, str]]:
"""Returns (filename_to_prism_display_name, filename_to_uuid)."""
if not path or not path.is_file():
return {}, {}
data = json.loads(path.read_text(encoding="utf-8"))
raw = data.get("filename_to_prism_name", data)
names = {str(k): str(v) for k, v in raw.items() if not str(k).startswith("_")}
uuids = {str(k): str(v) for k, v in data.get("filename_to_uuid", {}).items()}
return names, uuids
SYNC_EXCLUDE_FILES = frozenset(
{
"Long description.md",
"README.md",
"Worldbuilding.md",
"PRISM_FORMAT.md",
"Cameo-Creatures.md",
}
)
def _parse_relationship_tail(tail: str) -> tuple[str, str]:
"""Split em-dash-separated tail (after `- **Name**`) into type + remainder."""
parts = [p.strip() for p in re.split(r"\s*—\s*", tail) if p.strip()]
if not parts:
return "", ""
if len(parts) == 1:
return parts[0], ""
return parts[0], "".join(parts[1:])
def _parse_section_block(block: list[str]) -> dict[str, Any]:
"""Split a ## section body into ### subsections, or loose lines (e.g. Relationships)."""
subsections: dict[str, Any] = {}
current_sub: str | None = None
buf: list[str] = []
for line in block:
if line.startswith("### ") and not line.startswith("####"):
if current_sub is not None:
subsections[current_sub] = _parse_subsection(buf)
current_sub = line[4:].strip()
buf = []
else:
buf.append(line)
if current_sub is not None:
subsections[current_sub] = _parse_subsection(buf)
elif buf:
subsections["_lines"] = [x for x in buf if x.strip()]
return subsections
def parse_character_md(text: str, source: str) -> dict[str, Any]:
lines = text.splitlines()
stop = len(lines)
for i, line in enumerate(lines):
if line.strip() == "## Extended Canon Notes":
stop = i
break
lines = lines[:stop]
title = "Untitled"
off = 0
if lines and lines[0].startswith("# "):
title = lines[0][2:].strip()
off = 1
sections: dict[str, Any] = {}
i = off
while i < len(lines):
line = lines[i]
if not line.startswith("## ") or line.startswith("###"):
i += 1
continue
sec_name = line[3:].strip()
i += 1
block: list[str] = []
while i < len(lines):
if lines[i].startswith("## ") and not lines[i].startswith("###"):
break
block.append(lines[i])
i += 1
sections[sec_name] = _parse_section_block(block)
rel_rows: list[dict[str, str]] = []
rel_sec = sections.get("Relationships")
if isinstance(rel_sec, dict):
for line in rel_sec.get("_lines", []):
m = REL_LINE.match(line.strip())
if not m:
continue
related = m.group(1).strip()
tail = m.group(2).strip()
rtype, details = _parse_relationship_tail(tail)
rel_rows.append(
{
"related": related,
"relationship_type": rtype,
"label_or_details": details,
}
)
sections["Relationships"] = {"rows": rel_rows} if rel_rows else {"rows": []}
out: dict[str, Any] = {
"meta": {"source_file": source, "title": title},
"sections": sections,
}
return out
def _parse_subsection(lines: list[str]) -> Any:
fields: dict[str, Any] = {}
narrative: list[str] = []
generic_bullets: list[str] = []
pending_list_key: str | None = None
for line in lines:
if pending_list_key:
msub = re.match(r"^\s+-\s+(.+)$", line)
if msub:
entry = msub.group(1).strip()
bucket = fields[pending_list_key]
if isinstance(bucket, list):
bucket.append(entry)
continue
pending_list_key = None
fm = FIELD_BULLET.match(line)
if not fm:
fm = FIELD_BULLET_COLON_IN_BOLD.match(line)
if fm:
key, val = fm.group(1).strip(), fm.group(2).strip()
if not val and key == "Catch Phrases":
fields[key] = []
pending_list_key = key
else:
fields[key] = val
continue
gm = re.match(r"^-\s+(.+)$", line)
if gm and "**" not in line:
generic_bullets.append(gm.group(1).strip())
continue
if line.strip():
narrative.append(line.rstrip())
sec: dict[str, Any] = {"fields": fields}
if narrative:
sec["narrative"] = "\n".join(narrative).strip()
if generic_bullets:
sec["bullets"] = generic_bullets
if not fields and not narrative and not generic_bullets:
return {}
if not fields and len(sec) == 1 and "narrative" in sec:
return sec["narrative"]
return sec
def intermediate_to_prism_profile_blob(parsed: dict[str, Any]) -> dict[str, Any]:
"""Flatten nested 'sections' into a single JSON object for API merge experiments."""
blob: dict[str, Any] = {"meta": parsed.get("meta", {})}
sections = parsed.get("sections") or {}
for sec_name, sub in sections.items():
if not isinstance(sub, dict):
continue
if sec_name == "Relationships" and "rows" in sub:
blob["relationships"] = sub["rows"]
continue
bucket: dict[str, Any] = {}
for sub_name, content in sub.items():
if sub_name.startswith("_"):
continue
bucket[sub_name] = content
key = sec_name.lower().replace(" ", "_").replace("/", "_")
blob[key] = bucket
return blob
# --- Prism API `data` shape (from GET /characters/{uuid}); field names are snake_case. ---
_PROTECTED_DATA_KEYS = frozenset(
{
"uuid",
"id",
"slug",
"portrait_url",
"production_id",
"created_at",
"updated_at",
"role_type",
"color",
"color_index",
"actor",
}
)
_ATTR_FROM_LABELS: dict[str, str] = {
"Height": "height",
"Weight": "weight",
"Build": "build",
"Hair Color": "hair_color",
"Hair Style": "hair_style",
"Eye Color": "eye_color",
"Skin Tone": "skin_tone",
"Distinguishing Marks": "distinguishing_marks",
"Sex": "sex",
"Age": "age",
"Birth Year": "birth_year",
"Birth Place": "birth_place",
"Ethnicity": "ethnicity",
"Nationality": "nationality",
"Dominant Hand": "dominant_hand",
"Accent / Dialect": "accent",
"Voice Quality": "voice_quality",
"Gait / Movement": "gait",
}
def _blob_nav(blob: dict[str, Any], *segments: str) -> Any:
d: Any = blob
for s in segments:
if not isinstance(d, dict):
return None
d = d.get(s)
return d
def _blob_field_map(blob: dict[str, Any], *segments: str) -> dict[str, Any]:
sub = _blob_nav(blob, *segments)
if isinstance(sub, dict) and isinstance(sub.get("fields"), dict):
return dict(sub["fields"])
return {}
def _blob_narrative(blob: dict[str, Any], *segments: str) -> str:
sub = _blob_nav(blob, *segments)
if isinstance(sub, str):
return sub.strip()
if isinstance(sub, dict):
n = sub.get("narrative")
if isinstance(n, str) and n.strip():
return n.strip()
return ""
def _blob_bullets(blob: dict[str, Any], *segments: str) -> list[str]:
sub = _blob_nav(blob, *segments)
if isinstance(sub, dict) and isinstance(sub.get("bullets"), list):
return [str(b).strip() for b in sub["bullets"] if str(b).strip()]
return []
def _blob_combined_text(blob: dict[str, Any], *segments: str) -> str:
"""Narrative + bullets + non-empty field lines (for rich subsections)."""
sub = _blob_nav(blob, *segments)
if isinstance(sub, str):
return sub.strip()
if not isinstance(sub, dict):
return ""
parts: list[str] = []
n = sub.get("narrative")
if isinstance(n, str) and n.strip():
parts.append(n.strip())
for b in sub.get("bullets") or []:
if str(b).strip():
parts.append(str(b).strip())
for fk, fv in (sub.get("fields") or {}).items():
if fv is not None and str(fv).strip():
parts.append(f"**{fk}:** {fv}".strip())
return "\n\n".join(parts).strip()
def _semi_split(s: str) -> list[str]:
return [x.strip() for x in s.split(";") if x.strip()]
def _catch_phrase_list(raw: str) -> list[str]:
if not raw or not str(raw).strip():
return []
t = str(raw).strip().lower()
if t in ("", "-", "*(silent)*", "(silent)", "*silent*", "*(silent)*"):
return []
if "silent" in t and len(t) < 24:
return []
out: list[str] = []
for p in _semi_split(raw):
q = p.strip()
if len(q) >= 2 and q[0] == q[-1] and q[0] in "\"'":
q = q[1:-1]
out.append(q)
return out
def _dept_key(label: str) -> str:
return re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_") or "note"
def blob_to_prism_data_patch(blob: dict[str, Any], *, title: str) -> dict[str, Any]:
"""Build a partial `data` object for Prism from intermediate markdown blob (no meta)."""
patch: dict[str, Any] = {}
attrs: dict[str, str] = {}
for sub in ("Physical Attributes", "Identity (physical-form)"):
fm = _blob_field_map(blob, "appearance", sub)
for label, api in _ATTR_FROM_LABELS.items():
if api in attrs:
continue
v = fm.get(label)
if v is not None and str(v).strip():
attrs[api] = str(v).strip().lower() if api == "sex" else str(v).strip()
prod_fm = _blob_field_map(blob, "appearance", "Production (physical-form)")
for label, api in (
("Dominant Hand", "dominant_hand"),
("Accent / Dialect", "accent"),
("Voice Quality", "voice_quality"),
("Gait / Movement", "gait"),
):
v = prod_fm.get(label)
if v is not None and str(v).strip():
attrs[api] = str(v).strip()
if attrs:
vq = attrs.get("voice_quality")
if vq:
attrs["voice"] = {"instructions": vq}
attrs["voice_instructions"] = vq
patch["attributes"] = attrs
phys_desc = _blob_narrative(blob, "appearance", "Physical Description") or _blob_combined_text(
blob, "appearance", "Physical Description"
)
if phys_desc:
patch["physical_description"] = phys_desc
personality = _blob_combined_text(blob, "identity", "Personality")
if personality:
patch["personality"] = personality
bg = _blob_combined_text(blob, "identity", "Background / Backstory")
ext_hist = _blob_combined_text(blob, "arc", "Extended History")
if bg and ext_hist:
patch["background_story"] = f"{bg}\n\n---\n\n{ext_hist}"
elif bg:
patch["background_story"] = bg
elif ext_hist:
patch["background_story"] = ext_hist
costume = _blob_combined_text(blob, "wardrobe", "Costume Notes")
era = _blob_combined_text(blob, "wardrobe", "Era / Period")
if costume and era:
patch["costume_notes"] = f"{costume}\n\n*Era / period:* {era}"
elif costume:
patch["costume_notes"] = costume
elif era:
patch["costume_notes"] = f"*Era / period:* {era}"
makeup = _blob_combined_text(blob, "wardrobe", "Makeup Notes")
if makeup:
patch["makeup_notes"] = makeup
drive_fm = _blob_field_map(blob, "arc", "Drive")
mot_parts: list[str] = []
for key in ("Motivation", "Want", "Need"):
bit = drive_fm.get(key)
if bit is not None and str(bit).strip():
if key == "Motivation":
mot_parts.append(str(bit).strip())
else:
mot_parts.append(f"{key}: {str(bit).strip()}")
if mot_parts:
patch["motivation"] = "\n".join(mot_parts)
opp_fm = _blob_field_map(blob, "arc", "Opposition")
if opp_fm.get("Conflict") and str(opp_fm["Conflict"]).strip():
patch["conflict"] = str(opp_fm["Conflict"]).strip()
change_fm = _blob_field_map(blob, "arc", "Change")
if change_fm.get("Transformation") and str(change_fm["Transformation"]).strip():
patch["transformation"] = str(change_fm["Transformation"]).strip()
traits_fm = _blob_field_map(blob, "identity", "Traits")
if traits_fm.get("Interests") and str(traits_fm["Interests"]).strip():
patch["interests"] = _semi_split(str(traits_fm["Interests"]))
speech_fm = _blob_field_map(blob, "identity", "Speech")
extra: dict[str, Any] = {}
cps = _catch_phrase_list(str(speech_fm.get("Catch Phrases", "") or ""))
if cps:
extra["catch_phrases"] = cps
acc_lines = _blob_bullets(blob, "wardrobe", "Accessories & Props")
acc_fm = _blob_field_map(blob, "wardrobe", "Accessories & Props")
for _k, v in acc_fm.items():
if v is not None and str(v).strip():
acc_lines.append(str(v).strip())
if acc_lines:
extra["accessories"] = acc_lines
if traits_fm.get("Quirks & Habits") and str(traits_fm["Quirks & Habits"]).strip():
extra["quirks_habits"] = str(traits_fm["Quirks & Habits"]).strip()
if traits_fm.get("Fears") and str(traits_fm["Fears"]).strip():
extra["fears"] = str(traits_fm["Fears"]).strip()
if traits_fm.get("Skills & Abilities") and str(traits_fm["Skills & Abilities"]).strip():
extra["skills_abilities"] = str(traits_fm["Skills & Abilities"]).strip()
if opp_fm.get("Lie They Believe") and str(opp_fm["Lie They Believe"]).strip():
extra["lie_they_believe"] = str(opp_fm["Lie They Believe"]).strip()
if opp_fm.get("Ghost") and str(opp_fm["Ghost"]).strip():
extra["ghost"] = str(opp_fm["Ghost"]).strip()
if change_fm.get("Arc Type") and str(change_fm["Arc Type"]).strip():
extra["arc_type"] = str(change_fm["Arc Type"]).strip()
req_fm = _blob_field_map(blob, "production", "Requirements")
if req_fm.get("Stunt Requirements") and str(req_fm["Stunt Requirements"]).strip():
extra["stunt_requirements"] = str(req_fm["Stunt Requirements"]).strip()
if req_fm.get("SFX Requirements") and str(req_fm["SFX Requirements"]).strip():
extra["sfx_requirements"] = str(req_fm["SFX Requirements"]).strip()
dept_fm = _blob_field_map(blob, "production", "Department Notes")
if dept_fm:
dn: dict[str, str] = {}
for k, v in dept_fm.items():
if v is not None and str(v).strip():
dn[_dept_key(str(k))] = str(v).strip()
if dn:
patch["department_notes"] = dn
cont = _blob_combined_text(blob, "production", "Continuity Notes")
if cont:
patch["continuity_notes"] = cont
rel_rows = blob.get("relationships")
if isinstance(rel_rows, list) and rel_rows:
lines = []
for row in rel_rows:
if not isinstance(row, dict):
continue
r = row.get("related", "")
rt = row.get("relationship_type", "")
ld = row.get("label_or_details", "")
if ld:
lines.append(f"**{r}** — {rt}{ld}")
elif rt:
lines.append(f"**{r}** — {rt}")
else:
lines.append(f"**{r}**")
if lines:
extra["relationships_md"] = "\n".join(lines)
if extra:
patch["extra_data"] = extra
if title:
patch["display_name"] = title.strip()
patch["canonical_name"] = title.strip().upper()
return patch
def _apply_prism_data_patch(inner: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
"""Merge markdown-derived patch into existing Prism `data` without clobbering protected keys."""
out = dict(inner)
for key, val in patch.items():
if key in _PROTECTED_DATA_KEYS:
continue
if val in (None, "", [], {}):
continue
if key == "attributes" and isinstance(val, dict):
base_a = out.get("attributes")
merged_a = dict(base_a) if isinstance(base_a, dict) else {}
for ak, av in val.items():
if av not in (None, "", [], {}):
merged_a[ak] = av
out["attributes"] = merged_a
continue
if key == "extra_data" and isinstance(val, dict):
base_e = out.get("extra_data")
merged_e = dict(base_e) if isinstance(base_e, dict) else {}
for ek, ev in val.items():
if ev in (None, "", [], {}):
continue
merged_e[ek] = ev
out["extra_data"] = merged_e
continue
if key == "department_notes" and isinstance(val, dict):
base_d = out.get("department_notes")
merged_d = dict(base_d) if isinstance(base_d, dict) else {}
merged_d.update(val)
out["department_notes"] = merged_d
continue
if key == "voice" and isinstance(val, dict):
base_v = out.get("voice")
if isinstance(base_v, dict):
nv = dict(base_v)
nv.update(val)
out["voice"] = nv
else:
out["voice"] = val
continue
out[key] = val
return out
_READONLY_CHARACTER_FIELDS = frozenset({"created_at", "updated_at"})
def _strip_readonly_character_fields(data: dict[str, Any]) -> dict[str, Any]:
out = dict(data)
for k in _READONLY_CHARACTER_FIELDS:
out.pop(k, None)
return out
def _prism_write_payload(merged: dict[str, Any]) -> dict[str, Any]:
"""Body for PUT/PATCH: never send GET-only keys like `success`; strip server timestamps from `data`."""
if isinstance(merged.get("data"), dict):
body: dict[str, Any] = {
"data": _strip_readonly_character_fields(dict(merged["data"])),
}
for k, v in merged.items():
if k in ("data", "success"):
continue
body[k] = v
return body
return {k: v for k, v in merged.items() if k != "success"}
SYNC_SKIP_MANUAL = frozenset(
{"Adrian.md", "Agate.md", "Azure.md", "Beanie.md", "RaincloudTheDragon.md"}
)
def keys_to_camel(o: Any) -> Any:
if isinstance(o, dict):
return {_camel(k): keys_to_camel(v) for k, v in o.items()}
if isinstance(o, list):
return [keys_to_camel(x) for x in o]
return o
def _camel(s: str) -> str:
parts = re.split(r"[_\s]+", s)
out = []
for i, p in enumerate(parts):
if not p:
continue
if i == 0:
out.append(p[0].lower() + p[1:])
else:
out.append(p[0].upper() + p[1:])
return "".join(out) if out else s
def _slug_compact(s: str) -> str:
return _norm_key(s.replace("-", "").replace("_", ""))
def _display_key(s: str) -> str:
"""Normalize display strings so 'Heart & Mind' and 'Heart and Mind' match."""
return _norm_key(s.lower().replace("&", " and "))
def _match_remote(
parsed: dict[str, Any],
path: Path,
remote: list[dict[str, Any]],
overrides: dict[str, str],
uuid_overrides: dict[str, str],
) -> dict[str, Any] | None:
if path.name in uuid_overrides:
want = str(uuid_overrides[path.name]).strip().lower()
for ch in remote:
cid = (_char_id(ch) or "").strip().lower()
if cid == want:
return ch
return None
stem_slug = path.stem.lower().replace("_", "-").replace(" ", "-")
stem_c = _slug_compact(path.stem)
if path.name in overrides:
target = overrides[path.name].strip()
tn = _norm_key(target)
tdk = _display_key(target)
for ch in remote:
slug = (ch.get("slug") or "").strip().lower()
name = (_char_name(ch) or "").strip()
nn = _norm_key(name)
slug_c = _slug_compact(slug)
if name.lower() == target.lower() or (name and tdk == _display_key(name)):
return ch
if nn and nn == tn:
return ch
if slug and (slug == stem_slug or slug_c == stem_c):
return ch
if slug_c and slug_c == _slug_compact(target):
return ch
return None
target = parsed["meta"].get("title") or path.stem
if (parsed["meta"].get("title") == "Untitled" or not str(target).strip()) and path.suffix == ".md":
return None
tn = _norm_key(target)
best = None
best_score = -1
for ch in remote:
slug = (ch.get("slug") or "").strip().lower()
slug_c = _slug_compact(slug)
name = _char_name(ch) or ""
nn = _norm_key(name)
score = 0
if slug:
if slug == stem_slug or slug_c == stem_c or _norm_key(slug) == _norm_key(path.stem):
score = 93
elif slug.replace("-", "") == stem_slug.replace("-", ""):
score = 88
if nn:
if nn == tn:
score = max(score, 100)
elif tn in nn or nn in tn:
lo, hi = (nn, tn) if len(nn) <= len(tn) else (tn, nn)
if len(hi) and len(lo) >= 0.72 * len(hi):
score = max(score, 80)
elif len(tn) >= 4 and len(nn) >= 4 and tn[:4] == nn[:4]:
score = max(score, 40)
if score > best_score:
best_score = score
best = ch
return best if best_score >= 80 else None
def cmd_list(args: argparse.Namespace) -> int:
if not _auth_headers():
print(
"Warning: no PRISM_API_KEY, PRISM_AUTH_HEADER, or PRISM_SESSION_COOKIE in environment.",
file=sys.stderr,
)
base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence"
code, body = _req("GET", url, headers=_full_headers(json_body=False))
print(f"HTTP {code} {url}")
try:
data = json.loads(body)
except json.JSONDecodeError:
if body.lstrip().startswith("<!") or "Connection Error" in body[:500]:
print(
"Got HTML instead of JSON — usually Cloudflare/WAF blocking non-browser clients.\n"
"This script now sends a Chrome User-Agent + Origin/Referer. Retry.\n"
"If it persists: set PRISM_USER_AGENT to match your real browser (DevTools → Network → "
"User-Agent), confirm PRISM_API_KEY or PRISM_SESSION_COOKIE is set in this shell, "
"or ask talw whether API keys must use a different host/header.",
file=sys.stderr,
)
print(body[:2000])
return 1
if code != 200:
print(json.dumps(data, indent=2) if isinstance(data, (dict, list)) else body[:2000])
return 1
chars = _normalize_list_payload(data)
for ch in chars:
cid = _char_id(ch)
name = _char_name(ch)
print(f"{cid}\t{name}")
return 0
def cmd_inspect(args: argparse.Namespace) -> int:
base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
cid = args.character_id
headers = _full_headers(json_body=False)
urls = [
f"{base}/api/v2/productions/{pid}/characters/{cid}",
f"{base}/api/v2/productions/{pid}/characters/{cid}/",
]
for url in urls:
code, body = _req("GET", url, headers=headers)
print(f"# {code} GET {url}")
try:
data = json.loads(body)
print(json.dumps(data, indent=2)[:12000])
except json.JSONDecodeError:
print(body[:4000])
if code == 200:
return 0
return 1
def cmd_parse(args: argparse.Namespace) -> int:
path = Path(args.path)
text = path.read_text(encoding="utf-8")
parsed = parse_character_md(text, str(path))
if args.blob:
out = intermediate_to_prism_profile_blob(parsed)
print(json.dumps(out, indent=2, ensure_ascii=False))
else:
print(json.dumps(parsed, indent=2, ensure_ascii=False))
return 0
def cmd_sync(args: argparse.Namespace) -> int:
if args.apply and os.environ.get("PRISM_ALLOW_WRITE") != "1":
print("Refusing write: set PRISM_ALLOW_WRITE=1 for sync --apply", file=sys.stderr)
return 2
if args.apply and not args.attach_field and not args.experimental_merge and not args.map_md:
print(
"sync --apply needs one mode:\n"
" --map-md Map markdown → Prism `data` fields (recommended)\n"
" --attach-field <path> Store raw blob at a dot path on the API envelope\n"
" --experimental-merge Deep-merge intermediate blob (almost never correct)\n",
file=sys.stderr,
)
return 2
base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/")
pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111")
list_url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence"
code, body = _req("GET", list_url, headers=_full_headers(json_body=False))
if code != 200:
print(f"List failed HTTP {code}: {body[:1500]}", file=sys.stderr)
return 1
remote = _normalize_list_payload(json.loads(body))
cfg_path = Path(args.match_profile) if args.match_profile else Path("Assets/Scripts/prism_match_overrides.json")
overrides, uuid_overrides = _load_match_config(cfg_path)
story = Path(args.story_root)
paths = sorted(story.glob("*.md"))
paths = [p for p in paths if p.name not in SYNC_EXCLUDE_FILES]
paths += sorted((story / "Cameos").glob("*.md")) if (story / "Cameos").is_dir() else []
if args.only:
filt = set(args.only)
paths = [p for p in paths if p.name in filt]
for path in paths:
if not args.no_skip_done and path.name in SYNC_SKIP_MANUAL:
print(f"SKIP manual-done\t{path.name}", file=sys.stderr)
continue
parsed = parse_character_md(path.read_text(encoding="utf-8"), str(path))
blob = intermediate_to_prism_profile_blob(parsed)
blob.pop("meta", None)
if args.camel and not args.map_md:
blob = keys_to_camel(blob)
match = _match_remote(parsed, path, remote, overrides, uuid_overrides)
if not match:
print(f"SKIP no match\t{path.name}\ttitle={parsed['meta'].get('title')}", file=sys.stderr)
continue
cid = _char_id(match)
name = _char_name(match)
detail_url = f"{base}/api/v2/productions/{pid}/characters/{cid}"
title = parsed["meta"].get("title") or path.stem
prism_patch = blob_to_prism_data_patch(blob, title=title) if args.map_md else {}
if not args.apply:
if args.map_md:
print(
f"DRY-MAP\t{path.name}\t->\t{name}\t({cid})\t"
f"patch_keys={list(prism_patch.keys())}",
file=sys.stderr,
)
if args.verbose:
print(json.dumps(prism_patch, indent=2, ensure_ascii=False)[:12000])
else:
print(f"DRY\t{path.name}\t->\t{name}\t({cid})\tkeys={len(json.dumps(blob))}b")
if args.verbose:
print(json.dumps(blob, indent=2, ensure_ascii=False)[:8000])
continue
gc, gbody = _req("GET", detail_url, headers=_full_headers(json_body=False))
if gc != 200:
print(f"GET fail {path.name} HTTP {gc} {gbody[:500]}", file=sys.stderr)
continue
existing = json.loads(gbody)
if args.map_md:
if not isinstance(existing.get("data"), dict):
print(f"SKIP bad envelope {path.name}", file=sys.stderr)
continue
merged = dict(existing)
merged["data"] = _apply_prism_data_patch(existing["data"], prism_patch)
elif args.experimental_merge:
merged = _merge_experimental(existing, blob)
else:
merged = dict(existing)
_set_dotted(
merged,
args.attach_field,
_attach_value(merged, args.attach_field, blob),
)
write_obj = _prism_write_payload(merged)
payload = json.dumps(write_obj, ensure_ascii=False).encode("utf-8")
pc, pbody = _req("PUT", detail_url, data=payload, headers=_full_headers(json_body=True))
if pc not in (200, 204):
patch_body = _minimal_patch(args, write_obj, prism_patch if args.map_md else None)
pc2, pbody2 = _req(
"PATCH",
detail_url,
data=json.dumps(patch_body, ensure_ascii=False).encode("utf-8"),
headers=_full_headers(json_body=True),
)
print(f"{path.name}\tPUT {pc}\tPATCH {pc2}\t{pbody2[:500]}", file=sys.stderr)
else:
print(f"OK\t{path.name}\tHTTP {pc}", file=sys.stderr)
return 0
def _merge_experimental(existing: Any, blob: dict[str, Any]) -> dict[str, Any]:
"""Merge ingested blob into Prism detail payload, preserving { data, success } wrappers."""
if isinstance(existing, dict) and isinstance(existing.get("data"), dict):
out = dict(existing)
out["data"] = _deep_merge(dict(existing["data"]), blob)
return out
if isinstance(existing, dict):
return _deep_merge(dict(existing), blob)
return existing
def _minimal_patch(
args: argparse.Namespace,
merged: dict[str, Any],
prism_patch: dict[str, Any] | None = None,
) -> dict[str, Any]:
if args.map_md and prism_patch is not None:
return {"data": merged.get("data", {})} if isinstance(merged.get("data"), dict) else merged
if args.experimental_merge:
return merged
key = args.attach_field or ""
if "." not in key:
return {key: merged[key]} if key in merged else {}
root = key.split(".", 1)[0]
if root in merged:
return {root: merged[root]}
return {}
def _attach_value(existing: dict[str, Any], field_path: str, blob: dict[str, Any]) -> Any:
cur = _get_dotted(existing, field_path)
if isinstance(cur, str):
return json.dumps(blob, ensure_ascii=False, indent=2)
return blob
def _get_dotted(d: dict[str, Any], path: str) -> Any:
for part in path.split("."):
if not isinstance(d, dict) or part not in d:
return None
d = d[part]
return d
def _set_dotted(d: dict[str, Any], path: str, value: Any) -> None:
parts = path.split(".")
for p in parts[:-1]:
nxt = d.get(p)
if not isinstance(nxt, dict):
nxt = {}
d[p] = nxt
d = nxt
d[parts[-1]] = value
def _deep_merge(base: Any, patch: Any) -> Any:
if isinstance(base, dict) and isinstance(patch, dict):
out = dict(base)
for k, v in patch.items():
if k == "meta":
continue
if k not in out or out[k] in (None, "", [], {}):
out[k] = v
else:
out[k] = _deep_merge(out[k], v)
return out
if patch in (None, "", [], {}):
return base
return patch
def main() -> int:
if hasattr(sys.stdout, "reconfigure"):
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
ap = argparse.ArgumentParser(description="Prism production API helper for Story/*.md profiles.")
sub = ap.add_subparsers(dest="cmd", required=True)
p_list = sub.add_parser("list", help="GET characters list (id + name)")
p_list.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))
p_insp = sub.add_parser("inspect", help="GET one character JSON (tries production-scoped URL)")
p_insp.add_argument("character_id")
p_insp.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))
p_parse = sub.add_parser("parse", help="Parse one markdown profile to JSON")
p_parse.add_argument("path")
p_parse.add_argument("--blob", action="store_true", help="Emit flattened blob used for API experiments")
p_sync = sub.add_parser("sync", help="Match local .md files to remote characters and merge-push")
p_sync.add_argument("--story-root", default="Story")
p_sync.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111"))
p_sync.add_argument("--match-profile", default="Assets/Scripts/prism_match_overrides.json")
p_sync.add_argument("--apply", action="store_true", help="Actually PUT/PATCH (needs PRISM_ALLOW_WRITE=1)")
p_sync.add_argument(
"--attach-field",
default=None,
help="Dot path on character JSON to store the ingested blob (inspect one character for a suitable string/object field).",
)
p_sync.add_argument(
"--experimental-merge",
action="store_true",
help="Deep-merge ingested blob into GET JSON (unsafe unless schema matches).",
)
p_sync.add_argument(
"--map-md",
action="store_true",
help="Map markdown sections to Prism native `data` fields (use with --apply to push).",
)
p_sync.add_argument(
"--no-skip-done",
action="store_true",
help="Also run on Adrian, Agate, Azure, Beanie, Raincloud (skipped by default).",
)
p_sync.add_argument("--camel", action="store_true", help="CamelCase JSON keys on the ingested blob")
p_sync.add_argument("--verbose", action="store_true")
p_sync.add_argument("--only", nargs="*", help="Limit to filenames e.g. Azure.md Beanie.md")
args = ap.parse_args()
if args.cmd == "list":
return cmd_list(args)
if args.cmd == "inspect":
return cmd_inspect(args)
if args.cmd == "parse":
return cmd_parse(args)
if args.cmd == "sync":
return cmd_sync(args)
return 1
if __name__ == "__main__":
raise SystemExit(main())