diff --git a/tools/prism_ingest.py b/tools/prism_ingest.py new file mode 100644 index 0000000..dffa598 --- /dev/null +++ b/tools/prism_ingest.py @@ -0,0 +1,1086 @@ +#!/usr/bin/env python3 +""" +Push Story/*.md Prism profiles to prism.tal.one via the production API. + +Auth (pick one — use environment variables, never commit secrets): + PRISM_API_KEY If set, sends Authorization: Bearer by default. + PRISM_AUTH_HEADER Optional full header value, e.g. "Bearer sk-..." or "ApiKey ...". + PRISM_SESSION_COOKIE Raw session cookie string (same value as browser Cookie "session"). + +Optional: + PRISM_BASE_URL Default https://prism.tal.one + PRISM_PRODUCTION_ID Default 111 + PRISM_ALLOW_WRITE Must be exactly "1" for sync --apply + PRISM_USER_AGENT Override browser-like User-Agent (default: Chrome on Windows) + PRISM_ORIGIN Default https://prism.tal.one (Origin + Referer for API calls) + +Examples: + python tools/prism_ingest.py list + python tools/prism_ingest.py parse Story/Azure.md + python tools/prism_ingest.py parse Story/Azure.md --blob + python tools/prism_ingest.py inspect + python tools/prism_ingest.py sync + python tools/prism_ingest.py sync --map-md + $env:PRISM_ALLOW_WRITE='1'; $env:PRISM_API_KEY='…'; python tools/prism_ingest.py sync --apply --map-md + +Writes need a destination field on the character JSON. Run `inspect` on one id, pick a string or object slot +(e.g. internal notes), pass its dot-path as --attach-field. Use --experimental-merge only if our blob keys +match Prism's schema exactly (unlikely without talw's field map). +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any + +FIELD_BULLET = re.compile(r"^-\s+\*\*(.+?)\*\*:\s*(.*)\s*$") +# Alternate: `- **Label:** value` (colon inside bold — common typo / variant) +FIELD_BULLET_COLON_IN_BOLD = re.compile(r"^-\s+\*\*(.+?):\*\*\s*(.*)\s*$") +REL_LINE = re.compile(r"^-\s+\*\*(.+?)\*\*\s+(.*)$") +HEADING = re.compile(r"^(#{2,3})\s+(.+?)\s*$") + + +_DEFAULT_UA = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36" +) + + +def _auth_headers() -> dict[str, str]: + """Authorization / session only (no browser fingerprint).""" + h: dict[str, str] = {} + custom = os.environ.get("PRISM_AUTH_HEADER") + if custom: + if ":" in custom: + name, _, val = custom.partition(":") + h[name.strip()] = val.strip() + else: + h["Authorization"] = custom.strip() + return h + key = os.environ.get("PRISM_API_KEY", "").strip() + if key: + h["Authorization"] = f"Bearer {key}" + return h + sess = os.environ.get("PRISM_SESSION_COOKIE", "").strip() + if sess: + h["Cookie"] = f"session={sess}" + return h + return h + + +def _full_headers(*, json_body: bool) -> dict[str, str]: + """ + Prism's edge often returns HTTP 403 + HTML 'Connection Error' for bare urllib clients. + Send the same shape of headers the web app uses. + """ + origin = os.environ.get("PRISM_ORIGIN", "https://prism.tal.one").rstrip("/") + h: dict[str, str] = { + "Accept": "application/json, text/plain, */*", + "Accept-Language": "en-US,en;q=0.9", + "User-Agent": os.environ.get("PRISM_USER_AGENT", _DEFAULT_UA).strip() or _DEFAULT_UA, + "Origin": origin, + "Referer": os.environ.get("PRISM_REFERER", f"{origin}/productions/"), + } + h.update(_auth_headers()) + if json_body: + h["Content-Type"] = "application/json" + return h + + +def _req( + method: str, + url: str, + data: bytes | None = None, + headers: dict[str, str] | None = None, +) -> tuple[int, str]: + hdrs = dict(headers or {}) + r = urllib.request.Request(url, data=data, method=method, headers=hdrs) + try: + with urllib.request.urlopen(r, timeout=120) as resp: + return resp.getcode(), resp.read().decode("utf-8", errors="replace") + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + return e.code, body + + +def _normalize_list_payload(data: Any) -> list[dict[str, Any]]: + if isinstance(data, list): + raw = [x for x in data if isinstance(x, dict)] + elif isinstance(data, dict): + raw = [] + for key in ("characters", "data", "items", "results", "rows"): + v = data.get(key) + if isinstance(v, list): + raw = [x for x in v if isinstance(x, dict)] + break + else: + raise ValueError(f"Unexpected list JSON shape: {type(data).__name__}") + return [_unwrap_row(x) for x in raw] + + +def _unwrap_row(obj: dict[str, Any]) -> dict[str, Any]: + """List items may be `{ \"data\": { ...character } }` like detail responses.""" + inner = obj.get("data") + if isinstance(inner, dict): + out = dict(inner) + out.setdefault("uuid", obj.get("uuid")) + return out + return obj + + +def _char_id(obj: dict[str, Any]) -> str | None: + # Prefer uuid — numeric `id` breaks /characters/{id} routes on Prism. + for k in ("uuid", "characterId", "character_id", "_id", "id"): + v = obj.get(k) + if v is not None and str(v).strip(): + return str(v) + return None + + +def _char_name(obj: dict[str, Any]) -> str | None: + for k in ( + "display_name", + "canonical_name", + "name", + "title", + "characterName", + "displayName", + "fullName", + "label", + "slug", + ): + v = obj.get(k) + if isinstance(v, str) and v.strip(): + return v.strip() + return None + + +def _norm_key(s: str) -> str: + return re.sub(r"[^a-z0-9]+", "", s.lower()) + + +def _load_match_config(path: Path | None) -> tuple[dict[str, str], dict[str, str]]: + """Returns (filename_to_prism_display_name, filename_to_uuid).""" + if not path or not path.is_file(): + return {}, {} + data = json.loads(path.read_text(encoding="utf-8")) + raw = data.get("filename_to_prism_name", data) + names = {str(k): str(v) for k, v in raw.items() if not str(k).startswith("_")} + uuids = {str(k): str(v) for k, v in data.get("filename_to_uuid", {}).items()} + return names, uuids + + +SYNC_EXCLUDE_FILES = frozenset( + { + "Long description.md", + "README.md", + "Worldbuilding.md", + "PRISM_FORMAT.md", + "Cameo-Creatures.md", + } +) + + +def _parse_relationship_tail(tail: str) -> tuple[str, str]: + """Split em-dash-separated tail (after `- **Name**`) into type + remainder.""" + parts = [p.strip() for p in re.split(r"\s*—\s*", tail) if p.strip()] + if not parts: + return "", "" + if len(parts) == 1: + return parts[0], "" + return parts[0], " — ".join(parts[1:]) + + +def _parse_section_block(block: list[str]) -> dict[str, Any]: + """Split a ## section body into ### subsections, or loose lines (e.g. Relationships).""" + subsections: dict[str, Any] = {} + current_sub: str | None = None + buf: list[str] = [] + for line in block: + if line.startswith("### ") and not line.startswith("####"): + if current_sub is not None: + subsections[current_sub] = _parse_subsection(buf) + current_sub = line[4:].strip() + buf = [] + else: + buf.append(line) + if current_sub is not None: + subsections[current_sub] = _parse_subsection(buf) + elif buf: + subsections["_lines"] = [x for x in buf if x.strip()] + return subsections + + +def parse_character_md(text: str, source: str) -> dict[str, Any]: + lines = text.splitlines() + stop = len(lines) + for i, line in enumerate(lines): + if line.strip() == "## Extended Canon Notes": + stop = i + break + lines = lines[:stop] + + title = "Untitled" + off = 0 + if lines and lines[0].startswith("# "): + title = lines[0][2:].strip() + off = 1 + + sections: dict[str, Any] = {} + i = off + while i < len(lines): + line = lines[i] + if not line.startswith("## ") or line.startswith("###"): + i += 1 + continue + sec_name = line[3:].strip() + i += 1 + block: list[str] = [] + while i < len(lines): + if lines[i].startswith("## ") and not lines[i].startswith("###"): + break + block.append(lines[i]) + i += 1 + sections[sec_name] = _parse_section_block(block) + + rel_rows: list[dict[str, str]] = [] + rel_sec = sections.get("Relationships") + if isinstance(rel_sec, dict): + for line in rel_sec.get("_lines", []): + m = REL_LINE.match(line.strip()) + if not m: + continue + related = m.group(1).strip() + tail = m.group(2).strip() + rtype, details = _parse_relationship_tail(tail) + rel_rows.append( + { + "related": related, + "relationship_type": rtype, + "label_or_details": details, + } + ) + sections["Relationships"] = {"rows": rel_rows} if rel_rows else {"rows": []} + + out: dict[str, Any] = { + "meta": {"source_file": source, "title": title}, + "sections": sections, + } + return out + + +def _parse_subsection(lines: list[str]) -> Any: + fields: dict[str, Any] = {} + narrative: list[str] = [] + generic_bullets: list[str] = [] + pending_list_key: str | None = None + + for line in lines: + if pending_list_key: + msub = re.match(r"^\s+-\s+(.+)$", line) + if msub: + entry = msub.group(1).strip() + bucket = fields[pending_list_key] + if isinstance(bucket, list): + bucket.append(entry) + continue + pending_list_key = None + + fm = FIELD_BULLET.match(line) + if not fm: + fm = FIELD_BULLET_COLON_IN_BOLD.match(line) + if fm: + key, val = fm.group(1).strip(), fm.group(2).strip() + if not val and key == "Catch Phrases": + fields[key] = [] + pending_list_key = key + else: + fields[key] = val + continue + + gm = re.match(r"^-\s+(.+)$", line) + if gm and "**" not in line: + generic_bullets.append(gm.group(1).strip()) + continue + + if line.strip(): + narrative.append(line.rstrip()) + + sec: dict[str, Any] = {"fields": fields} + if narrative: + sec["narrative"] = "\n".join(narrative).strip() + if generic_bullets: + sec["bullets"] = generic_bullets + if not fields and not narrative and not generic_bullets: + return {} + if not fields and len(sec) == 1 and "narrative" in sec: + return sec["narrative"] + return sec + + +def intermediate_to_prism_profile_blob(parsed: dict[str, Any]) -> dict[str, Any]: + """Flatten nested 'sections' into a single JSON object for API merge experiments.""" + blob: dict[str, Any] = {"meta": parsed.get("meta", {})} + sections = parsed.get("sections") or {} + for sec_name, sub in sections.items(): + if not isinstance(sub, dict): + continue + if sec_name == "Relationships" and "rows" in sub: + blob["relationships"] = sub["rows"] + continue + bucket: dict[str, Any] = {} + for sub_name, content in sub.items(): + if sub_name.startswith("_"): + continue + bucket[sub_name] = content + key = sec_name.lower().replace(" ", "_").replace("/", "_") + blob[key] = bucket + return blob + + +# --- Prism API `data` shape (from GET /characters/{uuid}); field names are snake_case. --- + +_PROTECTED_DATA_KEYS = frozenset( + { + "uuid", + "id", + "slug", + "portrait_url", + "production_id", + "created_at", + "updated_at", + "role_type", + "color", + "color_index", + "actor", + } +) + +_ATTR_FROM_LABELS: dict[str, str] = { + "Height": "height", + "Weight": "weight", + "Build": "build", + "Hair Color": "hair_color", + "Hair Style": "hair_style", + "Eye Color": "eye_color", + "Skin Tone": "skin_tone", + "Distinguishing Marks": "distinguishing_marks", + "Sex": "sex", + "Age": "age", + "Birth Year": "birth_year", + "Birth Place": "birth_place", + "Ethnicity": "ethnicity", + "Nationality": "nationality", + "Dominant Hand": "dominant_hand", + "Accent / Dialect": "accent", + "Voice Quality": "voice_quality", + "Gait / Movement": "gait", +} + + +def _blob_nav(blob: dict[str, Any], *segments: str) -> Any: + d: Any = blob + for s in segments: + if not isinstance(d, dict): + return None + d = d.get(s) + return d + + +def _blob_field_map(blob: dict[str, Any], *segments: str) -> dict[str, Any]: + sub = _blob_nav(blob, *segments) + if isinstance(sub, dict) and isinstance(sub.get("fields"), dict): + return dict(sub["fields"]) + return {} + + +def _blob_narrative(blob: dict[str, Any], *segments: str) -> str: + sub = _blob_nav(blob, *segments) + if isinstance(sub, str): + return sub.strip() + if isinstance(sub, dict): + n = sub.get("narrative") + if isinstance(n, str) and n.strip(): + return n.strip() + return "" + + +def _blob_bullets(blob: dict[str, Any], *segments: str) -> list[str]: + sub = _blob_nav(blob, *segments) + if isinstance(sub, dict) and isinstance(sub.get("bullets"), list): + return [str(b).strip() for b in sub["bullets"] if str(b).strip()] + return [] + + +def _blob_combined_text(blob: dict[str, Any], *segments: str) -> str: + """Narrative + bullets + non-empty field lines (for rich subsections).""" + sub = _blob_nav(blob, *segments) + if isinstance(sub, str): + return sub.strip() + if not isinstance(sub, dict): + return "" + parts: list[str] = [] + n = sub.get("narrative") + if isinstance(n, str) and n.strip(): + parts.append(n.strip()) + for b in sub.get("bullets") or []: + if str(b).strip(): + parts.append(str(b).strip()) + for fk, fv in (sub.get("fields") or {}).items(): + if fv is not None and str(fv).strip(): + parts.append(f"**{fk}:** {fv}".strip()) + return "\n\n".join(parts).strip() + + +def _semi_split(s: str) -> list[str]: + return [x.strip() for x in s.split(";") if x.strip()] + + +def _catch_phrase_list(raw: str) -> list[str]: + if not raw or not str(raw).strip(): + return [] + t = str(raw).strip().lower() + if t in ("—", "-", "*(silent)*", "(silent)", "*silent*", "*(silent)*"): + return [] + if "silent" in t and len(t) < 24: + return [] + out: list[str] = [] + for p in _semi_split(raw): + q = p.strip() + if len(q) >= 2 and q[0] == q[-1] and q[0] in "\"'": + q = q[1:-1] + out.append(q) + return out + + +def _dept_key(label: str) -> str: + return re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_") or "note" + + +def blob_to_prism_data_patch(blob: dict[str, Any], *, title: str) -> dict[str, Any]: + """Build a partial `data` object for Prism from intermediate markdown blob (no meta).""" + patch: dict[str, Any] = {} + attrs: dict[str, str] = {} + + for sub in ("Physical Attributes", "Identity (physical-form)"): + fm = _blob_field_map(blob, "appearance", sub) + for label, api in _ATTR_FROM_LABELS.items(): + if api in attrs: + continue + v = fm.get(label) + if v is not None and str(v).strip(): + attrs[api] = str(v).strip().lower() if api == "sex" else str(v).strip() + + prod_fm = _blob_field_map(blob, "appearance", "Production (physical-form)") + for label, api in ( + ("Dominant Hand", "dominant_hand"), + ("Accent / Dialect", "accent"), + ("Voice Quality", "voice_quality"), + ("Gait / Movement", "gait"), + ): + v = prod_fm.get(label) + if v is not None and str(v).strip(): + attrs[api] = str(v).strip() + + if attrs: + patch["attributes"] = attrs + vq = attrs.get("voice_quality") + if vq: + patch["voice"] = {"instructions": vq} + patch["voice_instructions"] = vq + + phys_desc = _blob_narrative(blob, "appearance", "Physical Description") or _blob_combined_text( + blob, "appearance", "Physical Description" + ) + if phys_desc: + patch["physical_description"] = phys_desc + + personality = _blob_combined_text(blob, "identity", "Personality") + if personality: + patch["personality"] = personality + + bg = _blob_combined_text(blob, "identity", "Background / Backstory") + ext_hist = _blob_combined_text(blob, "arc", "Extended History") + if bg and ext_hist: + patch["background_story"] = f"{bg}\n\n---\n\n{ext_hist}" + elif bg: + patch["background_story"] = bg + elif ext_hist: + patch["background_story"] = ext_hist + + costume = _blob_combined_text(blob, "wardrobe", "Costume Notes") + era = _blob_combined_text(blob, "wardrobe", "Era / Period") + if costume and era: + patch["costume_notes"] = f"{costume}\n\n*Era / period:* {era}" + elif costume: + patch["costume_notes"] = costume + elif era: + patch["costume_notes"] = f"*Era / period:* {era}" + + makeup = _blob_combined_text(blob, "wardrobe", "Makeup Notes") + if makeup: + patch["makeup_notes"] = makeup + + drive_fm = _blob_field_map(blob, "arc", "Drive") + mot_parts: list[str] = [] + for key in ("Motivation", "Want", "Need"): + bit = drive_fm.get(key) + if bit is not None and str(bit).strip(): + if key == "Motivation": + mot_parts.append(str(bit).strip()) + else: + mot_parts.append(f"{key}: {str(bit).strip()}") + if mot_parts: + patch["motivation"] = "\n".join(mot_parts) + + opp_fm = _blob_field_map(blob, "arc", "Opposition") + if opp_fm.get("Conflict") and str(opp_fm["Conflict"]).strip(): + patch["conflict"] = str(opp_fm["Conflict"]).strip() + + change_fm = _blob_field_map(blob, "arc", "Change") + if change_fm.get("Transformation") and str(change_fm["Transformation"]).strip(): + patch["transformation"] = str(change_fm["Transformation"]).strip() + + traits_fm = _blob_field_map(blob, "identity", "Traits") + if traits_fm.get("Interests") and str(traits_fm["Interests"]).strip(): + patch["interests"] = _semi_split(str(traits_fm["Interests"])) + + speech_fm = _blob_field_map(blob, "identity", "Speech") + extra: dict[str, Any] = {} + cps = _catch_phrase_list(str(speech_fm.get("Catch Phrases", "") or "")) + if cps: + extra["catch_phrases"] = cps + + acc_lines = _blob_bullets(blob, "wardrobe", "Accessories & Props") + acc_fm = _blob_field_map(blob, "wardrobe", "Accessories & Props") + for _k, v in acc_fm.items(): + if v is not None and str(v).strip(): + acc_lines.append(str(v).strip()) + if acc_lines: + extra["accessories"] = acc_lines + + if traits_fm.get("Quirks & Habits") and str(traits_fm["Quirks & Habits"]).strip(): + extra["quirks_habits"] = str(traits_fm["Quirks & Habits"]).strip() + if traits_fm.get("Fears") and str(traits_fm["Fears"]).strip(): + extra["fears"] = str(traits_fm["Fears"]).strip() + if traits_fm.get("Skills & Abilities") and str(traits_fm["Skills & Abilities"]).strip(): + extra["skills_abilities"] = str(traits_fm["Skills & Abilities"]).strip() + + if opp_fm.get("Lie They Believe") and str(opp_fm["Lie They Believe"]).strip(): + extra["lie_they_believe"] = str(opp_fm["Lie They Believe"]).strip() + if opp_fm.get("Ghost") and str(opp_fm["Ghost"]).strip(): + extra["ghost"] = str(opp_fm["Ghost"]).strip() + + if change_fm.get("Arc Type") and str(change_fm["Arc Type"]).strip(): + extra["arc_type"] = str(change_fm["Arc Type"]).strip() + + req_fm = _blob_field_map(blob, "production", "Requirements") + if req_fm.get("Stunt Requirements") and str(req_fm["Stunt Requirements"]).strip(): + extra["stunt_requirements"] = str(req_fm["Stunt Requirements"]).strip() + if req_fm.get("SFX Requirements") and str(req_fm["SFX Requirements"]).strip(): + extra["sfx_requirements"] = str(req_fm["SFX Requirements"]).strip() + + dept_fm = _blob_field_map(blob, "production", "Department Notes") + if dept_fm: + dn: dict[str, str] = {} + for k, v in dept_fm.items(): + if v is not None and str(v).strip(): + dn[_dept_key(str(k))] = str(v).strip() + if dn: + patch["department_notes"] = dn + + cont = _blob_combined_text(blob, "production", "Continuity Notes") + if cont: + patch["continuity_notes"] = cont + + rel_rows = blob.get("relationships") + if isinstance(rel_rows, list) and rel_rows: + lines = [] + for row in rel_rows: + if not isinstance(row, dict): + continue + r = row.get("related", "") + rt = row.get("relationship_type", "") + ld = row.get("label_or_details", "") + if ld: + lines.append(f"**{r}** — {rt} — {ld}") + elif rt: + lines.append(f"**{r}** — {rt}") + else: + lines.append(f"**{r}**") + if lines: + extra["relationships_md"] = "\n".join(lines) + + if extra: + patch["extra_data"] = extra + + if title: + patch["display_name"] = title.strip() + patch["canonical_name"] = title.strip().upper() + + return patch + + +def _apply_prism_data_patch(inner: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]: + """Merge markdown-derived patch into existing Prism `data` without clobbering protected keys.""" + out = dict(inner) + for key, val in patch.items(): + if key in _PROTECTED_DATA_KEYS: + continue + if val in (None, "", [], {}): + continue + if key == "attributes" and isinstance(val, dict): + base_a = out.get("attributes") + merged_a = dict(base_a) if isinstance(base_a, dict) else {} + for ak, av in val.items(): + if av not in (None, "", [], {}): + merged_a[ak] = av + out["attributes"] = merged_a + continue + if key == "extra_data" and isinstance(val, dict): + base_e = out.get("extra_data") + merged_e = dict(base_e) if isinstance(base_e, dict) else {} + for ek, ev in val.items(): + if ev in (None, "", [], {}): + continue + merged_e[ek] = ev + out["extra_data"] = merged_e + continue + if key == "department_notes" and isinstance(val, dict): + base_d = out.get("department_notes") + merged_d = dict(base_d) if isinstance(base_d, dict) else {} + merged_d.update(val) + out["department_notes"] = merged_d + continue + if key == "voice" and isinstance(val, dict): + base_v = out.get("voice") + if isinstance(base_v, dict): + nv = dict(base_v) + nv.update(val) + out["voice"] = nv + else: + out["voice"] = val + continue + out[key] = val + return out + + +SYNC_SKIP_MANUAL = frozenset( + {"Adrian.md", "Agate.md", "Azure.md", "Beanie.md", "RaincloudTheDragon.md"} +) + + +def keys_to_camel(o: Any) -> Any: + if isinstance(o, dict): + return {_camel(k): keys_to_camel(v) for k, v in o.items()} + if isinstance(o, list): + return [keys_to_camel(x) for x in o] + return o + + +def _camel(s: str) -> str: + parts = re.split(r"[_\s]+", s) + out = [] + for i, p in enumerate(parts): + if not p: + continue + if i == 0: + out.append(p[0].lower() + p[1:]) + else: + out.append(p[0].upper() + p[1:]) + return "".join(out) if out else s + + +def _slug_compact(s: str) -> str: + return _norm_key(s.replace("-", "").replace("_", "")) + + +def _display_key(s: str) -> str: + """Normalize display strings so 'Heart & Mind' and 'Heart and Mind' match.""" + return _norm_key(s.lower().replace("&", " and ")) + + +def _match_remote( + parsed: dict[str, Any], + path: Path, + remote: list[dict[str, Any]], + overrides: dict[str, str], + uuid_overrides: dict[str, str], +) -> dict[str, Any] | None: + if path.name in uuid_overrides: + want = str(uuid_overrides[path.name]).strip().lower() + for ch in remote: + cid = (_char_id(ch) or "").strip().lower() + if cid == want: + return ch + return None + + stem_slug = path.stem.lower().replace("_", "-").replace(" ", "-") + stem_c = _slug_compact(path.stem) + + if path.name in overrides: + target = overrides[path.name].strip() + tn = _norm_key(target) + tdk = _display_key(target) + for ch in remote: + slug = (ch.get("slug") or "").strip().lower() + name = (_char_name(ch) or "").strip() + nn = _norm_key(name) + slug_c = _slug_compact(slug) + if name.lower() == target.lower() or (name and tdk == _display_key(name)): + return ch + if nn and nn == tn: + return ch + if slug and (slug == stem_slug or slug_c == stem_c): + return ch + if slug_c and slug_c == _slug_compact(target): + return ch + return None + + target = parsed["meta"].get("title") or path.stem + if (parsed["meta"].get("title") == "Untitled" or not str(target).strip()) and path.suffix == ".md": + return None + tn = _norm_key(target) + best = None + best_score = -1 + for ch in remote: + slug = (ch.get("slug") or "").strip().lower() + slug_c = _slug_compact(slug) + name = _char_name(ch) or "" + nn = _norm_key(name) + score = 0 + if slug: + if slug == stem_slug or slug_c == stem_c or _norm_key(slug) == _norm_key(path.stem): + score = 93 + elif slug.replace("-", "") == stem_slug.replace("-", ""): + score = 88 + if nn: + if nn == tn: + score = max(score, 100) + elif tn in nn or nn in tn: + lo, hi = (nn, tn) if len(nn) <= len(tn) else (tn, nn) + if len(hi) and len(lo) >= 0.72 * len(hi): + score = max(score, 80) + elif len(tn) >= 4 and len(nn) >= 4 and tn[:4] == nn[:4]: + score = max(score, 40) + if score > best_score: + best_score = score + best = ch + return best if best_score >= 80 else None + + +def cmd_list(args: argparse.Namespace) -> int: + if not _auth_headers(): + print( + "Warning: no PRISM_API_KEY, PRISM_AUTH_HEADER, or PRISM_SESSION_COOKIE in environment.", + file=sys.stderr, + ) + base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/") + pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111") + url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence" + code, body = _req("GET", url, headers=_full_headers(json_body=False)) + print(f"HTTP {code} {url}") + try: + data = json.loads(body) + except json.JSONDecodeError: + if body.lstrip().startswith(" int: + base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/") + pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111") + cid = args.character_id + headers = _full_headers(json_body=False) + urls = [ + f"{base}/api/v2/productions/{pid}/characters/{cid}", + f"{base}/api/v2/productions/{pid}/characters/{cid}/", + ] + for url in urls: + code, body = _req("GET", url, headers=headers) + print(f"# {code} GET {url}") + try: + data = json.loads(body) + print(json.dumps(data, indent=2)[:12000]) + except json.JSONDecodeError: + print(body[:4000]) + if code == 200: + return 0 + return 1 + + +def cmd_parse(args: argparse.Namespace) -> int: + path = Path(args.path) + text = path.read_text(encoding="utf-8") + parsed = parse_character_md(text, str(path)) + if args.blob: + out = intermediate_to_prism_profile_blob(parsed) + print(json.dumps(out, indent=2, ensure_ascii=False)) + else: + print(json.dumps(parsed, indent=2, ensure_ascii=False)) + return 0 + + +def cmd_sync(args: argparse.Namespace) -> int: + if args.apply and os.environ.get("PRISM_ALLOW_WRITE") != "1": + print("Refusing write: set PRISM_ALLOW_WRITE=1 for sync --apply", file=sys.stderr) + return 2 + if args.apply and not args.attach_field and not args.experimental_merge and not args.map_md: + print( + "sync --apply needs one mode:\n" + " --map-md Map markdown → Prism `data` fields (recommended)\n" + " --attach-field Store raw blob at a dot path on the API envelope\n" + " --experimental-merge Deep-merge intermediate blob (almost never correct)\n", + file=sys.stderr, + ) + return 2 + + base = os.environ.get("PRISM_BASE_URL", "https://prism.tal.one").rstrip("/") + pid = args.production or os.environ.get("PRISM_PRODUCTION_ID", "111") + list_url = f"{base}/api/v2/productions/{pid}/characters?sort=prominence" + code, body = _req("GET", list_url, headers=_full_headers(json_body=False)) + if code != 200: + print(f"List failed HTTP {code}: {body[:1500]}", file=sys.stderr) + return 1 + remote = _normalize_list_payload(json.loads(body)) + cfg_path = Path(args.match_profile) if args.match_profile else Path("tools/prism_match_overrides.json") + overrides, uuid_overrides = _load_match_config(cfg_path) + + story = Path(args.story_root) + paths = sorted(story.glob("*.md")) + paths = [p for p in paths if p.name not in SYNC_EXCLUDE_FILES] + paths += sorted((story / "Cameos").glob("*.md")) if (story / "Cameos").is_dir() else [] + + if args.only: + filt = set(args.only) + paths = [p for p in paths if p.name in filt] + + for path in paths: + if not args.no_skip_done and path.name in SYNC_SKIP_MANUAL: + print(f"SKIP manual-done\t{path.name}", file=sys.stderr) + continue + parsed = parse_character_md(path.read_text(encoding="utf-8"), str(path)) + blob = intermediate_to_prism_profile_blob(parsed) + blob.pop("meta", None) + if args.camel and not args.map_md: + blob = keys_to_camel(blob) + match = _match_remote(parsed, path, remote, overrides, uuid_overrides) + if not match: + print(f"SKIP no match\t{path.name}\ttitle={parsed['meta'].get('title')}", file=sys.stderr) + continue + cid = _char_id(match) + name = _char_name(match) + detail_url = f"{base}/api/v2/productions/{pid}/characters/{cid}" + title = parsed["meta"].get("title") or path.stem + prism_patch = blob_to_prism_data_patch(blob, title=title) if args.map_md else {} + + if not args.apply: + if args.map_md: + print( + f"DRY-MAP\t{path.name}\t->\t{name}\t({cid})\t" + f"patch_keys={list(prism_patch.keys())}", + file=sys.stderr, + ) + if args.verbose: + print(json.dumps(prism_patch, indent=2, ensure_ascii=False)[:12000]) + else: + print(f"DRY\t{path.name}\t->\t{name}\t({cid})\tkeys={len(json.dumps(blob))}b") + if args.verbose: + print(json.dumps(blob, indent=2, ensure_ascii=False)[:8000]) + continue + + gc, gbody = _req("GET", detail_url, headers=_full_headers(json_body=False)) + if gc != 200: + print(f"GET fail {path.name} HTTP {gc} {gbody[:500]}", file=sys.stderr) + continue + existing = json.loads(gbody) + if args.map_md: + if not isinstance(existing.get("data"), dict): + print(f"SKIP bad envelope {path.name}", file=sys.stderr) + continue + merged = dict(existing) + merged["data"] = _apply_prism_data_patch(existing["data"], prism_patch) + elif args.experimental_merge: + merged = _merge_experimental(existing, blob) + else: + merged = dict(existing) + _set_dotted( + merged, + args.attach_field, + _attach_value(merged, args.attach_field, blob), + ) + payload = json.dumps(merged, ensure_ascii=False).encode("utf-8") + pc, pbody = _req("PUT", detail_url, data=payload, headers=_full_headers(json_body=True)) + if pc not in (200, 204): + patch_body = _minimal_patch(args, merged, prism_patch if args.map_md else None) + pc2, pbody2 = _req( + "PATCH", + detail_url, + data=json.dumps(patch_body, ensure_ascii=False).encode("utf-8"), + headers=_full_headers(json_body=True), + ) + print(f"{path.name}\tPUT {pc}\tPATCH {pc2}\t{pbody2[:500]}", file=sys.stderr) + else: + print(f"OK\t{path.name}\tHTTP {pc}", file=sys.stderr) + return 0 + + +def _merge_experimental(existing: Any, blob: dict[str, Any]) -> dict[str, Any]: + """Merge ingested blob into Prism detail payload, preserving { data, success } wrappers.""" + if isinstance(existing, dict) and isinstance(existing.get("data"), dict): + out = dict(existing) + out["data"] = _deep_merge(dict(existing["data"]), blob) + return out + if isinstance(existing, dict): + return _deep_merge(dict(existing), blob) + return existing + + +def _minimal_patch( + args: argparse.Namespace, + merged: dict[str, Any], + prism_patch: dict[str, Any] | None = None, +) -> dict[str, Any]: + if args.map_md and prism_patch is not None: + return {"data": merged.get("data", {})} + if args.experimental_merge: + return merged + key = args.attach_field or "" + if "." not in key: + return {key: merged[key]} if key in merged else {} + root = key.split(".", 1)[0] + if root in merged: + return {root: merged[root]} + return {} + + +def _attach_value(existing: dict[str, Any], field_path: str, blob: dict[str, Any]) -> Any: + cur = _get_dotted(existing, field_path) + if isinstance(cur, str): + return json.dumps(blob, ensure_ascii=False, indent=2) + return blob + + +def _get_dotted(d: dict[str, Any], path: str) -> Any: + for part in path.split("."): + if not isinstance(d, dict) or part not in d: + return None + d = d[part] + return d + + +def _set_dotted(d: dict[str, Any], path: str, value: Any) -> None: + parts = path.split(".") + for p in parts[:-1]: + nxt = d.get(p) + if not isinstance(nxt, dict): + nxt = {} + d[p] = nxt + d = nxt + d[parts[-1]] = value + + +def _deep_merge(base: Any, patch: Any) -> Any: + if isinstance(base, dict) and isinstance(patch, dict): + out = dict(base) + for k, v in patch.items(): + if k == "meta": + continue + if k not in out or out[k] in (None, "", [], {}): + out[k] = v + else: + out[k] = _deep_merge(out[k], v) + return out + if patch in (None, "", [], {}): + return base + return patch + + +def main() -> int: + if hasattr(sys.stdout, "reconfigure"): + try: + sys.stdout.reconfigure(encoding="utf-8") + except Exception: + pass + ap = argparse.ArgumentParser(description="Prism production API helper for Story/*.md profiles.") + sub = ap.add_subparsers(dest="cmd", required=True) + + p_list = sub.add_parser("list", help="GET characters list (id + name)") + p_list.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111")) + + p_insp = sub.add_parser("inspect", help="GET one character JSON (tries production-scoped URL)") + p_insp.add_argument("character_id") + p_insp.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111")) + + p_parse = sub.add_parser("parse", help="Parse one markdown profile to JSON") + p_parse.add_argument("path") + p_parse.add_argument("--blob", action="store_true", help="Emit flattened blob used for API experiments") + + p_sync = sub.add_parser("sync", help="Match local .md files to remote characters and merge-push") + p_sync.add_argument("--story-root", default="Story") + p_sync.add_argument("--production", default=os.environ.get("PRISM_PRODUCTION_ID", "111")) + p_sync.add_argument("--match-profile", default="tools/prism_match_overrides.json") + p_sync.add_argument("--apply", action="store_true", help="Actually PUT/PATCH (needs PRISM_ALLOW_WRITE=1)") + p_sync.add_argument( + "--attach-field", + default=None, + help="Dot path on character JSON to store the ingested blob (inspect one character for a suitable string/object field).", + ) + p_sync.add_argument( + "--experimental-merge", + action="store_true", + help="Deep-merge ingested blob into GET JSON (unsafe unless schema matches).", + ) + p_sync.add_argument( + "--map-md", + action="store_true", + help="Map markdown sections to Prism native `data` fields (use with --apply to push).", + ) + p_sync.add_argument( + "--no-skip-done", + action="store_true", + help="Also run on Adrian, Agate, Azure, Beanie, Raincloud (skipped by default).", + ) + p_sync.add_argument("--camel", action="store_true", help="CamelCase JSON keys on the ingested blob") + p_sync.add_argument("--verbose", action="store_true") + p_sync.add_argument("--only", nargs="*", help="Limit to filenames e.g. Azure.md Beanie.md") + + args = ap.parse_args() + if args.cmd == "list": + return cmd_list(args) + if args.cmd == "inspect": + return cmd_inspect(args) + if args.cmd == "parse": + return cmd_parse(args) + if args.cmd == "sync": + return cmd_sync(args) + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/prism_match_overrides.json b/tools/prism_match_overrides.json new file mode 100644 index 0000000..296c8fc --- /dev/null +++ b/tools/prism_match_overrides.json @@ -0,0 +1,9 @@ +{ + "filename_to_prism_name": { + "RaincloudTheDragon.md": "Raincloud", + "NotoriousRooster.md": "Rooster", + "Heart-and-Mind.md": "Heart and Mind", + "BD.md": "BD" + }, + "filename_to_uuid": {} +}