# Origaminal Hypertokens, by Luminosity-e — to reduce model enshittification

#!/usr/bin/env python3
"""
Origaminal Hypertokens
======================

A runnable, dependency-free prototype for three-pass hypercompression:

Pass I : Field inference / basis selection / shared-prior extraction
Pass II : Thema decomposition / invariant motif extraction
Pass III : Origaminal hypertoken folding / packing / unfolding / verification

This is not a pretrained LLM. It is a generalizable architecture scaffold and
working codec that demonstrates the mechanism:

object -> themae -> origaminal hypertokens -> packed payload -> unfold

Modes:
lossless : exact reconstruction using compressed residuals
generative : stores motif + operators + small residues, expands approximately
hybrid : generative folding plus exact anchors/residual for high-risk spans

The key abstraction is the OrigaminalHypertoken:

asymmetric semantic unit + expansion operators + entropy residual + verifier

That lets systems generate the law and unfold the leaves.
"""

from __future__ import annotations

import argparse
import base64
import dataclasses
import hashlib
import json
import math
import re
import statistics
import sys
import textwrap
import time
import uuid
import zlib
from abc import ABC, abstractmethod
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple

VERSION = "0.1.0-origaminal"

# ---------------------------------------------------------------------------
# Small utilities
# ---------------------------------------------------------------------------


def stable_hash(value: Any, n: int = 16) -> str:
    """Return a stable hexadecimal content hash of ``value``.

    Args:
        value: ``bytes`` are hashed as-is, ``str`` is UTF-8 encoded, and any
            other object is serialised with ``json.dumps(sort_keys=True)``
            first, so key order in mappings does not affect the result.
        n: Number of hex characters wanted. BLAKE2b yields at most 32 bytes
            here, so results are capped at 64 characters.

    Returns:
        The first ``n`` hex characters of a BLAKE2b digest of the input.

    Raises:
        TypeError: If ``value`` is neither ``bytes``/``str`` nor JSON-serialisable.
    """
    if isinstance(value, bytes):
        raw = value
    elif isinstance(value, str):
        raw = value.encode("utf-8")
    else:
        raw = json.dumps(value, sort_keys=True, ensure_ascii=False).encode("utf-8")
    # Each digest byte gives two hex characters, so round the byte count *up*;
    # rounding down made every odd ``n`` come back one character short.
    # Even ``n`` maps to the same digest size as before, so existing ids are unchanged.
    digest_size = max(4, min(32, (n + 1) // 2))
    return hashlib.blake2b(raw, digest_size=digest_size).hexdigest()[:n]


def now_ms() -> int:
    """Return the current ``time.time()`` reading as whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)


def b85_pack_bytes(raw: bytes) -> str:
    """Deflate ``raw`` at zlib level 9 and return it as base85 ASCII text."""
    deflated = zlib.compress(raw, level=9)
    encoded = base64.b85encode(deflated)
    return encoded.decode("ascii")


def b85_unpack_bytes(packed: str) -> bytes:
    """Decode base85 ASCII text and inflate it with zlib (inverse of ``b85_pack_bytes``)."""
    ascii_payload = packed.encode("ascii")
    deflated = base64.b85decode(ascii_payload)
    return zlib.decompress(deflated)


def pack_json(obj: Any) -> str:
    """Serialise ``obj`` as compact, key-sorted JSON and pack it with ``b85_pack_bytes``."""
    canonical = json.dumps(
        obj,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    return b85_pack_bytes(canonical.encode("utf-8"))


def unpack_json(packed: str) -> Any:
    """Unpack a ``pack_json`` payload and parse the contained UTF-8 JSON."""
    document = b85_unpack_bytes(packed).decode("utf-8")
    return json.loads(document)


def shannon_entropy(items: Sequence[Any]) -> float:
    """Return the Shannon entropy, in bits per item, of the empirical distribution.

    An empty sequence has zero entropy by definition here.
    """
    if not items:
        return 0.0
    total = len(items)
    probabilities = [count / total for count in Counter(items).values()]
    return -sum(p * math.log2(p) for p in probabilities)


def entropy_bits_per_char(text: str) -> float:
    """Return the character-level Shannon entropy of ``text`` (0.0 for empty text)."""
    if not text:
        return 0.0
    return shannon_entropy(list(text))


def rough_words(text: str) -> List[str]:
    """Tokenise ``text`` into ASCII word runs and single punctuation marks.

    Word runs are made of ASCII letters, digits, underscores and apostrophes;
    any other character that is neither a word character nor whitespace is
    emitted as its own token.
    """
    token_pattern = r"[A-Za-z0-9_']+|[^\w\s]"
    return re.findall(token_pattern, text, flags=re.UNICODE)


def sentence_split(text: str) -> List[str]:
    """Split ``text`` into sentences.

    A boundary is whitespace that follows ``.``, ``!`` or ``?`` and precedes
    an uppercase ASCII letter, a digit, or a quote/backtick character.
    """
    stripped = text.strip()
    if not stripped:
        return []
    sentences: List[str] = []
    for piece in re.split(r"(?<=[.!?])\s+(?=[A-Z0-9\"'`])", stripped):
        piece = piece.strip()
        if piece:
            sentences.append(piece)
    return sentences


def normalize_space(text: str) -> str:
    """Collapse every whitespace run to one space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()


def top_terms(text: str, k: int = 8) -> List[str]:
    """Return up to ``k`` most frequent non-stopword terms of ``text``, lowercased.

    A term starts with an ASCII letter and is at least three characters long.
    Ties keep first-seen order.
    """
    stop = {
        "the", "and", "of", "to", "a", "in", "is", "it", "for", "that", "this", "with",
        "as", "on", "be", "are", "we", "you", "or", "not", "by", "from", "an", "at",
        "into", "can", "will", "should", "do", "does", "have", "has", "but", "if",
    }
    counts: Counter = Counter()
    for match in re.finditer(r"[A-Za-z][A-Za-z0-9_'-]{2,}", text):
        word = match.group(0).lower()
        if word not in stop:
            counts[word] += 1
    return [term for term, _count in counts.most_common(k)]


def safe_json_loads(text: str) -> Optional[Any]:
    """Parse ``text`` as JSON, returning ``None`` instead of raising on any failure.

    Note that a valid JSON ``null`` also comes back as ``None``.
    """
    try:
        parsed = json.loads(text)
    except Exception:
        # Deliberate best-effort probe: callers only ask "is this JSON?".
        return None
    return parsed


def bounded(value: float, lo: float, hi: float) -> float:
    """Clamp ``value`` into the closed interval ``[lo, hi]``."""
    capped = min(hi, value)
    return max(lo, capped)


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------


@dataclass
class SharedPrior:
    """Context the receiver is assumed to share, so it need not be re-sent."""

    dominant_basis: str  # name of the highest-scoring basis adapter
    basis_scores: Dict[str, float]  # score per basis adapter name
    entropy_bpc: float  # character entropy of the whole object
    vocabulary: List[str]  # most frequent terms of the object
    style_fingerprint: Dict[str, Any]  # coarse style statistics
    context_digest: str  # hash of the encoding context
    object_digest: str  # hash of the textified object
    notes: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict copy of every field."""
        payload = dataclasses.asdict(self)
        return payload

    @staticmethod
    def from_dict(d: Mapping[str, Any]) -> "SharedPrior":
        """Rebuild a prior from a mapping whose keys are exactly the field names."""
        kwargs = dict(d)
        return SharedPrior(**kwargs)


@dataclass
class FoldOp:
    """One named expansion operator with its arguments and a weight."""

    name: str
    args: Dict[str, Any] = field(default_factory=dict)
    weight: float = 1.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict copy of the operator."""
        payload = dataclasses.asdict(self)
        return payload

    @staticmethod
    def from_dict(d: Mapping[str, Any]) -> "FoldOp":
        """Build an operator from a mapping; ``args`` and ``weight`` are optional."""
        op_args = dict(d.get("args", {}))
        op_weight = float(d.get("weight", 1.0))
        return FoldOp(name=d["name"], args=op_args, weight=op_weight)


@dataclass
class Thema:
    """A meaningful region of the object: paragraph, code block, list, JSON node, etc."""

    id: str
    role: str
    basis: str
    text: str
    motifs: List[str]
    entropy_bpc: float
    risk: float
    constraints: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict copy of every field."""
        payload = dataclasses.asdict(self)
        return payload

    @staticmethod
    def from_dict(d: Mapping[str, Any]) -> "Thema":
        """Rebuild a thema from a mapping whose keys are field names."""
        kwargs = dict(d)
        return Thema(**kwargs)


@dataclass
class OrigaminalHypertoken:
    """
    The folded packet for one thema.

    Crystallography view:
        motif (asymmetric unit) + fold operators + residuals.

    Compression view:
        prior + codebook ids + entropy residue.

    Quantum view:
        basis + shared context reduce emitted information.
    """

    id: str
    thema_id: str
    basis_id: str
    motif: str
    folds: List[FoldOp]
    lattice_code: Dict[str, Any]
    residue: Dict[str, Any]
    anchors: List[str]
    verifier: Dict[str, Any]
    mode: str = "hybrid"
    version: str = VERSION

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form; folds that are already dicts pass through."""
        payload = dataclasses.asdict(self)
        serialised_folds = []
        for fold in self.folds:
            serialised_folds.append(fold.to_dict() if isinstance(fold, FoldOp) else fold)
        payload["folds"] = serialised_folds
        return payload

    @staticmethod
    def from_dict(d: Mapping[str, Any]) -> "OrigaminalHypertoken":
        """Rebuild a token; only ``id``, ``thema_id`` and ``basis_id`` are required."""
        folds = [FoldOp.from_dict(raw_fold) for raw_fold in d.get("folds", [])]
        return OrigaminalHypertoken(
            id=d["id"],
            thema_id=d["thema_id"],
            basis_id=d["basis_id"],
            motif=d.get("motif", ""),
            folds=folds,
            lattice_code=dict(d.get("lattice_code", {})),
            residue=dict(d.get("residue", {})),
            anchors=list(d.get("anchors", [])),
            verifier=dict(d.get("verifier", {})),
            mode=d.get("mode", "hybrid"),
            version=d.get("version", VERSION),
        )


@dataclass
class OrigaminalObject:
    """Container bundling the shared prior, the hypertokens and free-form metadata."""

    prior: SharedPrior
    tokens: List[OrigaminalHypertoken]
    metadata: Dict[str, Any]
    schema: str = "origaminal.hypertokens.v1"

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-ready dict form of the whole object."""
        token_dicts = [token.to_dict() for token in self.tokens]
        return {
            "schema": self.schema,
            "prior": self.prior.to_dict(),
            "tokens": token_dicts,
            "metadata": self.metadata,
        }

    @staticmethod
    def from_dict(d: Mapping[str, Any]) -> "OrigaminalObject":
        """Rebuild an object from its dict form; only ``prior`` is required."""
        prior = SharedPrior.from_dict(d["prior"])
        tokens = [OrigaminalHypertoken.from_dict(raw) for raw in d.get("tokens", [])]
        return OrigaminalObject(
            prior=prior,
            tokens=tokens,
            metadata=dict(d.get("metadata", {})),
            schema=d.get("schema", "origaminal.hypertokens.v1"),
        )

    def pack(self) -> str:
        """Serialise to the compressed base85 payload produced by ``pack_json``."""
        as_dict = self.to_dict()
        return pack_json(as_dict)

    @staticmethod
    def unpack(payload: str) -> "OrigaminalObject":
        """Inverse of ``pack``: decode a payload back into an object."""
        as_dict = unpack_json(payload)
        return OrigaminalObject.from_dict(as_dict)


@dataclass
class VerificationResult:
    """Outcome of a verification run: verdict, score and collected messages."""

    passed: bool
    score: float
    errors: List[str]
    warnings: List[str] = field(default_factory=list)


@dataclass
class CompressionReport:
    """Summary statistics for one encode run, with a human-readable rendering."""

    mode: str
    original_bytes: int
    payload_bytes: int
    estimated_ratio: float
    token_count: int
    dominant_basis: str
    entropy_bpc: float
    notes: List[str] = field(default_factory=list)

    def pretty(self) -> str:
        """Render the report as ``label : value`` lines, one per statistic."""
        rows = [
            ("mode", self.mode),
            ("dominant basis", self.dominant_basis),
            ("entropy bpc", f"{self.entropy_bpc:.3f}"),
            ("hypertokens", self.token_count),
            ("original bytes", self.original_bytes),
            ("payload bytes", self.payload_bytes),
            ("ratio original/payload", f"{self.estimated_ratio:.2f}x"),
        ]
        lines = [f"{label} : {value}" for label, value in rows]
        if self.notes:
            lines.append("notes : " + "; ".join(self.notes))
        return "\n".join(lines)


# ---------------------------------------------------------------------------
# Basis adapters
# ---------------------------------------------------------------------------


class BasisAdapter(ABC):
    """Base class for basis adapters, which let the system generalize beyond prose."""

    name: str = "base"

    @abstractmethod
    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Return confidence that this object belongs in this basis."""

    def fingerprint(self, text: str) -> Dict[str, Any]:
        """Return size statistics and the top terms of ``text``."""
        words = rough_words(text)
        if words:
            mean_len = statistics.mean([len(w) for w in words])
        else:
            mean_len = 0.0
        line_count = text.count("\n") + 1 if text else 0
        return {
            "chars": len(text),
            "words": len(words),
            "avg_word_len": round(mean_len, 3),
            "line_count": line_count,
            "top_terms": top_terms(text),
        }

    def split(self, text: str) -> List[str]:
        """Split ``text`` into thema candidates.

        Blocks longer than 900 characters that are not fenced code are
        re-packed sentence by sentence into chunks of roughly 450 characters.
        """
        candidates: List[str] = []
        for block in split_blocks_preserving_code(text):
            oversized = len(block) > 900
            if oversized and not is_fenced_code(block):
                candidates.extend(sentence_pack(sentence_split(block), max_chars=450))
            else:
                candidates.append(block)
        return [c for c in candidates if c.strip()]

    def motif(self, text: str) -> str:
        """Return a short label: the top five terms, else the first 80 normalised chars."""
        terms = top_terms(text, k=5)
        if not terms:
            return normalize_space(text)[:80]
        return " + ".join(terms)


class ProseBasis(BasisAdapter):
    """Basis for natural-language prose."""

    name = "prose"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Score prose-likeness: letters and sentences add, code tokens subtract."""
        text = str(obj)
        if not text.strip():
            return 0.0
        code_hits = re.findall(r"\b(def|class|import|function|const|let|var)\b|[{};]", text)
        code_penalty = len(code_hits) * 0.035
        sentence_bonus = len(sentence_split(text)) * 0.035
        letters = sum(c.isalpha() for c in text)
        letter_ratio = letters / max(1, len(text))
        raw_score = 0.25 + letter_ratio + sentence_bonus - code_penalty
        return bounded(raw_score, 0.0, 1.0)


class CodeBasis(BasisAdapter):
    """Basis for source code (Python- and JavaScript-flavoured heuristics)."""

    name = "code"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Score code-likeness from keyword/punctuation hits and line density.

        Args:
            obj: Candidate object; it is converted with ``str``.
            context: Unused by this adapter.

        Returns:
            A confidence in ``[0.0, 1.0]``.
        """
        text = str(obj)
        patterns = [
            r"\b(def|class|import|from|return|yield|async|await)\b",
            # "=>" is pure punctuation: inside \b(...)\b it only matched when
            # glued to identifiers ("x=>y") and missed the usual "(x) => y".
            # It is therefore matched on its own, outside the word boundaries.
            r"\b(?:function|const|let|var)\b|=>|\bconsole\.log\b",
            r"[{};]",
            r"```[a-zA-Z0-9_+-]*\n",
            r"\b(if|for|while|switch|try|except|catch)\b",
        ]
        hits = sum(len(re.findall(p, text)) for p in patterns)
        # Newlines per 80 characters: code tends to have short lines.
        lineish = text.count("\n") / max(1, len(text) / 80)
        return bounded(0.06 * hits + 0.04 * lineish, 0.0, 1.0)

    def motif(self, text: str) -> str:
        """Label a code block by its defined names, else its imports, else a hash."""
        names = re.findall(r"\b(?:def|class|function)\s+([A-Za-z_][A-Za-z0-9_]*)", text)
        imports = re.findall(r"\b(?:import|from)\s+([A-Za-z_][A-Za-z0-9_.]*)", text)
        if names:
            return "code:" + ",".join(names[:5])
        if imports:
            return "imports:" + ",".join(imports[:5])
        return "code-block:" + stable_hash(text, 10)


class MathBasis(BasisAdapter):
    """Basis for mathematical notation."""

    name = "math"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Score math-likeness from symbol/keyword hits plus the digit ratio."""
        text = str(obj)
        math_pattern = (
            r"[∑∫√≈≠≤≥∞λμσπθ]|\b(sin|cos|tan|log|exp|matrix|tensor|entropy|probability)\b|[=<>^]"
        )
        hits = len(re.findall(math_pattern, text, flags=re.I))
        digits = sum(c.isdigit() for c in text)
        digit_ratio = digits / max(1, len(text))
        return bounded(0.04 * hits + digit_ratio, 0.0, 1.0)


class JSONBasis(BasisAdapter):
    """Basis for JSON documents (parsed objects or JSON text)."""

    name = "json"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Return 1.0 for a dict/list or text that parses to one, else 0.0."""
        if isinstance(obj, (dict, list)):
            return 1.0
        parsed = safe_json_loads(str(obj).strip())
        if isinstance(parsed, (dict, list)):
            return 1.0
        return 0.0

    def split(self, text: str) -> List[str]:
        """Flatten JSON into one ``{"path", "value"}`` record per scalar leaf.

        Falls back to the generic block splitter when ``text`` does not parse
        (or parses to ``null``), and to ``[text]`` when no leaf is found.
        """
        parsed = safe_json_loads(text)
        if parsed is None:
            return super().split(text)
        parts: List[str] = []

        def walk(node: Any, path: str = "$") -> None:
            # Containers recurse in document order; scalars emit a record.
            if isinstance(node, dict):
                children = [(f"{path}.{key}", value) for key, value in node.items()]
            elif isinstance(node, list):
                children = [(f"{path}[{index}]", value) for index, value in enumerate(node)]
            else:
                parts.append(json.dumps({"path": path, "value": node}, ensure_ascii=False))
                return
            for child_path, child in children:
                walk(child, child_path)

        walk(parsed)
        return parts or [text]

    def motif(self, text: str) -> str:
        """Label a leaf record by its path; anything else by a content hash."""
        parsed = safe_json_loads(text)
        is_leaf_record = isinstance(parsed, dict) and "path" in parsed
        if not is_leaf_record:
            return "json:" + stable_hash(text, 10)
        return f"json:{parsed['path']}"


class DialogueBasis(BasisAdapter):
    """Basis for dialogue: quoted speech and ``Speaker:`` lines."""

    name = "dialogue"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Score dialogue-likeness from quoted spans and speaker-prefixed lines."""
        text = str(obj)
        quoted_spans = re.findall(r"[\"“”].+?[\"“”]", text)
        speaker_prefixes = re.findall(r"^\s*[A-Z][A-Za-z0-9_ -]{1,24}:\s+", text, flags=re.M)
        raw_score = 0.10 * len(quoted_spans) + 0.18 * len(speaker_prefixes)
        return bounded(raw_score, 0.0, 1.0)


class ListBasis(BasisAdapter):
    """Basis for bulleted or numbered lists."""

    name = "list"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Return the fraction of non-blank lines that start with a list marker."""
        text = str(obj)
        content_lines = [ln for ln in text.splitlines() if ln.strip()]
        if not content_lines:
            return 0.0
        marker = re.compile(r"\s*(?:[-*+•]|\d+[.)])\s+")
        list_lines = sum(1 for ln in content_lines if marker.match(ln))
        return bounded(list_lines / max(1, len(content_lines)), 0.0, 1.0)

    def motif(self, text: str) -> str:
        """Label a list by its item count and top terms; defer to the base otherwise."""
        items = parse_list_items(text)
        if not items:
            return super().motif(text)
        joined_items = " ".join(items)
        return f"list:{len(items)}:" + "+".join(top_terms(joined_items, 4))


# ---------------------------------------------------------------------------
# Splitting / structural detection
# ---------------------------------------------------------------------------


def split_blocks_preserving_code(text: str) -> List[str]:
    """Split markdown-ish text into blocks without tearing fenced code.

    Blank lines separate blocks outside a fence. A fenced region (from a
    line starting with three backticks to the next such line) is always
    emitted as a block of its own, blank lines included. An unterminated
    fence runs to the end of the text.

    Args:
        text: The text to split.

    Returns:
        The non-blank blocks in document order, without leading or trailing
        newlines.
    """
    fence_pat = re.compile(r"^\s*```")
    blocks: List[str] = []
    buf: List[str] = []
    in_fence = False

    def flush() -> None:
        nonlocal buf
        joined = "\n".join(buf)
        if joined.strip():
            blocks.append(joined.strip("\n"))
        buf = []

    for line in text.splitlines():
        if fence_pat.match(line):
            if not in_fence:
                # Opening fence: close any paragraph that runs straight into
                # it. Otherwise paragraph and code merge into one block that
                # is no longer recognised as fenced code and can be
                # sentence-split downstream.
                flush()
            buf.append(line)
            in_fence = not in_fence
            if not in_fence:
                # Closing fence: the code block is complete.
                flush()
            continue
        if not in_fence and not line.strip():
            flush()
        else:
            buf.append(line)
    flush()
    return blocks


def is_fenced_code(text: str) -> bool:
    """Return True when ``text``, ignoring outer whitespace, starts and ends with a fence."""
    trimmed = text.strip()
    if not trimmed.startswith("```"):
        return False
    return trimmed.endswith("```")


def sentence_pack(sentences: List[str], max_chars: int = 450) -> List[str]:
    """Greedily join consecutive sentences into space-separated chunks.

    A chunk is closed as soon as adding the next sentence would push its
    running length (sentence lengths plus one separator each) past
    ``max_chars``. A single over-long sentence still forms its own chunk.
    """
    chunks: List[str] = []
    current: List[str] = []
    running = 0
    for sentence in sentences:
        would_overflow = running + len(sentence) > max_chars
        if current and would_overflow:
            chunks.append(" ".join(current))
            current, running = [], 0
        current.append(sentence)
        running += len(sentence) + 1
    if current:
        chunks.append(" ".join(current))
    return chunks


def parse_list_items(text: str) -> List[str]:
    """Return the stripped item text of every bulleted or numbered line in ``text``."""
    marker = re.compile(r"\s*(?:[-*+•]|\d+[.)])\s+(.*)")
    matches = (marker.match(line) for line in text.splitlines())
    return [found.group(1).strip() for found in matches if found]


def detect_repetition(text: str) -> Dict[str, Any]:
    """Detect a repeated line prefix in ``text``.

    Returns ``{"kind": "prefix_lattice", ...}`` when at least three lines share
    the same label before a ``:``, em dash or hyphen, else
    ``{"kind": "line_start_lattice", ...}`` when at least three lines share
    their first twelve characters, else ``{"kind": "none"}``. Fewer than three
    non-blank lines always yields ``"none"``.
    """
    lines: List[str] = []
    for raw_line in text.splitlines():
        cleaned = normalize_space(raw_line)
        if cleaned:
            lines.append(cleaned)
    if len(lines) < 3:
        return {"kind": "none"}

    label_pattern = re.compile(r"^([^:—-]{2,40})[:—-]")
    labels = [m.group(1).strip() for m in map(label_pattern.match, lines) if m]
    if labels:
        common, count = Counter(labels).most_common(1)[0]
        if count >= 3:
            return {"kind": "prefix_lattice", "prefix": common, "count": count}

    heads = [ln[:12] for ln in lines if len(ln) >= 12]
    if heads:
        common, count = Counter(heads).most_common(1)[0]
        if count >= 3:
            return {"kind": "line_start_lattice", "prefix": common, "count": count}

    return {"kind": "none"}


def classify_role(text: str, basis: str) -> str:
    """Assign a structural role to a thema; the first matching rule wins.

    Rule order: code, data, enumeration, heading, dialogue, seed (under 100
    characters), argument (contains a causal connective), then prose.
    """
    body = text.strip()
    # Each rule is evaluated lazily, in order, exactly once.
    rules = (
        (lambda: basis == "code" or is_fenced_code(body), "code"),
        (lambda: basis == "json", "data"),
        (lambda: basis == "list" or parse_list_items(body), "enumeration"),
        (lambda: re.match(r"^#{1,6}\s+", body), "heading"),
        (lambda: basis == "dialogue", "dialogue"),
        (lambda: len(body) < 100, "seed"),
        (
            lambda: re.search(r"\b(because|therefore|so|thus|means|implies)\b", body, flags=re.I),
            "argument",
        ),
    )
    for applies, role in rules:
        if applies():
            return role
    return "prose"


def risk_score(text: str, basis: str) -> float:
    """Score how much exactness matters for ``text``, clamped to ``[0, 1]``.

    Contributions: a flat 0.4 for code/json/math bases, 0.6 times the digit
    ratio, 0.2 per URL or long hex id, and 0.03 per multi-word capitalised name.
    """
    length = max(1, len(text))
    exact_ids = re.findall(r"https?://\S+|\b[a-f0-9]{16,}\b", text, flags=re.I)
    url = len(exact_ids)
    digits = sum(c.isdigit() for c in text) / length
    if basis in {"code", "json", "math"}:
        code = 0.4
    else:
        code = 0.0
    proper_names = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)+\b", text)
    proper = len(proper_names) * 0.03
    return bounded(code + 0.6 * digits + 0.2 * url + proper, 0.0, 1.0)


# ---------------------------------------------------------------------------
# Product-ish lattice packing
# ---------------------------------------------------------------------------


class LatticePacker:
    """
    Simple deterministic product-lattice packer.

    Every feature is hashed into one of ``bins`` buckets. A real production
    system would replace this with learned PQ/RVQ codebooks, E8/Leech-like
    structured codes, error-correcting indices, or semantic hash lattices.
    """

    def __init__(self, bins: int = 4096):
        self.bins = bins

    def pack(self, thema: Thema, prior: SharedPrior) -> Dict[str, Any]:
        """Return the lattice code of ``thema``: hashed bins, 0-15 bands and a digest."""
        terms = thema.motifs or top_terms(thema.text, 8)
        style_key = "style:" + json.dumps(prior.style_fingerprint, sort_keys=True)
        entropy_fraction = bounded(thema.entropy_bpc / 8.0, 0, 1)
        risk_fraction = bounded(thema.risk, 0, 1)
        code: Dict[str, Any] = {}
        code["semantic_bins"] = [self._bin("sem:" + term) for term in terms[:6]]
        code["syntax_bin"] = self._bin("syn:" + thema.role + ":" + thema.basis)
        code["style_bin"] = self._bin(style_key)
        code["entropy_band"] = int(entropy_fraction * 15)
        code["risk_band"] = int(risk_fraction * 15)
        code["digest"] = stable_hash(thema.text, 16)
        return code

    def _bin(self, s: str) -> int:
        """Map a string to a bucket index in ``range(self.bins)`` via its stable hash."""
        hashed = int(stable_hash(s, 8), 16)
        return hashed % self.bins


# ---------------------------------------------------------------------------
# Fold operator registry
# ---------------------------------------------------------------------------


# Signature of a fold operator: (current text, operator, token, prior) -> new text.
FoldFn = Callable[[str, FoldOp, OrigaminalHypertoken, SharedPrior], str]


class FoldRegistry:
    """Name-to-function table of fold operators."""

    def __init__(self) -> None:
        self._ops: Dict[str, FoldFn] = {}

    def register(self, name: str, fn: FoldFn) -> None:
        """Bind ``fn`` to ``name``, replacing any earlier binding."""
        self._ops[name] = fn

    def apply(self, text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Run the operator named by ``op``; unknown names leave ``text`` unchanged."""
        handler = self._ops.get(op.name)
        return handler(text, op, token, prior) if handler else text

    def names(self) -> List[str]:
        """Return the registered operator names in sorted order."""
        return sorted(self._ops.keys())


def default_registry() -> FoldRegistry:
    """Build a registry holding the eight built-in fold operators.

    Every operator takes ``(text, op, token, prior)`` and returns the unfolded
    text. Most ignore the incoming ``text`` unless they have nothing to
    expand from.
    """

    def _decode_exact(token: OrigaminalHypertoken) -> Optional[str]:
        # Shared by the exact-capable operators: stored bytes as text, or None.
        packed = token.residue.get("exact_b85")
        if not packed:
            return None
        return b85_unpack_bytes(packed).decode("utf-8")

    def exact_residue(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Return the exactly stored text when present, else pass ``text`` through."""
        if not token.residue.get("exact_b85"):
            return text
        return _decode_exact(token)

    def anchor_expand(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Join the token's anchors with the configured joiner (a space by default)."""
        if not token.anchors:
            return text
        return op.args.get("joiner", " ").join(token.anchors)

    def enumerate_expand(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Render a numbered list from stored items, anchors, or motif terms."""
        items = token.residue.get("items") or token.anchors
        if not items:
            terms = token.motif.split("+") if token.motif else prior.vocabulary[:3]
            items = [normalize_space(x) for x in terms if normalize_space(x)]
        prefix = op.args.get("prefix", "")
        numbered = (
            f"{number}. {prefix}{normalize_space(str(item))}".rstrip()
            for number, item in enumerate(items, 1)
        )
        return "\n".join(numbered)

    def crystallize(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Generate variations from an asymmetric unit and a transformation group."""
        motif = token.residue.get("asymmetric_unit") or token.motif or text
        transforms = op.args.get("transforms") or ["define", "connect", "extend"]
        style = op.args.get("style", "direct")

        def render(transform: Any) -> str:
            if transform == "define":
                return f"{motif}: define the invariant core."
            if transform == "connect":
                return f"{motif}: connect the core to the surrounding field."
            if transform == "extend":
                return f"{motif}: unfold the consequence into action."
            if transform == "mirror":
                return f"{motif}: mirror the pattern at another scale."
            if transform == "residual":
                residue = token.residue.get("summary", "preserve the high-surprise residue")
                return f"{motif}: {residue}."
            # Unknown transforms are echoed verbatim.
            return f"{motif}: {transform}."

        lines = [render(t) for t in transforms]
        separator = " " if style == "paragraph" else "\n"
        return separator.join(lines)

    def template(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Fill a ``str.format`` template; on any formatting error return it unfilled."""
        tpl = op.args.get("template", "{motif}")
        data = {
            "motif": token.motif,
            "basis": token.basis_id,
            "thema_id": token.thema_id,
            "anchors": " ".join(token.anchors),
            "terms": ", ".join(prior.vocabulary[:8]),
            "summary": token.residue.get("summary", ""),
        }
        try:
            filled = tpl.format(**data)
        except Exception:
            return tpl
        return filled

    def prose_expand(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Expand the summary and terms into prose, using the ``triad`` cadence by default."""
        summary = token.residue.get("summary") or token.motif
        terms = token.residue.get("terms") or prior.vocabulary[:5]
        cadence = op.args.get("cadence", "triad")
        if cadence != "triad":
            return f"{summary}. " + ", ".join(map(str, terms))
        pivot_terms = ", ".join(map(str, terms[:3]))
        sentences = (
            f"The core is {summary}.",
            f"It turns on {pivot_terms}.",
            "The residue is held back only where exactness matters.",
        )
        return " ".join(sentences)

    def code_scaffold(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Return the exact stored code when preferred, else a stub function."""
        raw_name = token.motif or "origaminal_function"
        name = re.sub(r"[^A-Za-z0-9_]+", "_", raw_name).strip("_")[:48]
        if not name or name[0].isdigit():
            # Keep the generated identifier valid.
            name = "origaminal_" + name
        exact = token.residue.get("exact_b85")
        if exact and op.args.get("prefer_exact", True):
            return b85_unpack_bytes(exact).decode("utf-8")
        header = f"def {name.lower()}(context):\n"
        doc_line = f" \"\"\"Unfolded code scaffold from origaminal hypertoken {token.id}.\"\"\"\n"
        return_line = (
            f" return {{'motif': {token.motif!r}, 'basis': {token.basis_id!r}, 'context': context}}\n"
        )
        return header + doc_line + return_line

    def json_rebuild(text: str, op: FoldOp, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Return the exact stored JSON when present, else a path/motif/anchors stub."""
        if token.residue.get("exact_b85"):
            return _decode_exact(token)
        path = token.motif.replace("json:", "")
        stub = {"path": path, "motif": token.motif, "anchors": token.anchors}
        return json.dumps(stub, ensure_ascii=False)

    reg = FoldRegistry()
    builtins = (
        ("exact_residue", exact_residue),
        ("anchor_expand", anchor_expand),
        ("enumerate", enumerate_expand),
        ("crystallize", crystallize),
        ("template", template),
        ("prose_expand", prose_expand),
        ("code_scaffold", code_scaffold),
        ("json_rebuild", json_rebuild),
    )
    for op_name, op_fn in builtins:
        reg.register(op_name, op_fn)
    return reg


# ---------------------------------------------------------------------------
# Pass I: Field inference
# ---------------------------------------------------------------------------


class FieldPass:
    """Pass I: score the object against every basis and extract the shared prior."""

    def __init__(self, bases: Optional[List[BasisAdapter]] = None) -> None:
        # A falsy value (None or an empty list) selects the built-in adapters.
        self.bases = bases or [
            JSONBasis(),
            CodeBasis(),
            MathBasis(),
            DialogueBasis(),
            ListBasis(),
            ProseBasis(),
        ]

    def run(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> SharedPrior:
        """Infer the dominant basis, entropy, vocabulary and style of ``obj``."""
        context = context or {}
        text = self._textify(obj)
        scores = {adapter.name: adapter.score(obj, context) for adapter in self.bases}
        # prose is the fallback, but avoid overwhelming clearly-structured content.
        if scores:
            dominant = max(scores, key=scores.get)
        else:
            dominant = "prose"
        entropy = entropy_bits_per_char(text)

        notes = []
        if entropy < 3.5:
            notes.append("low entropy: long folds likely safe")
        elif entropy > 4.7:
            notes.append("high entropy: preserve more residue")
        if scores.get("code", 0) > 0.5:
            notes.append("code-like object: exact verifier recommended")

        rounded_scores = {name: round(value, 4) for name, value in sorted(scores.items())}
        return SharedPrior(
            dominant_basis=dominant,
            basis_scores=rounded_scores,
            entropy_bpc=entropy,
            vocabulary=top_terms(text, 24),
            style_fingerprint=self._style_fingerprint(text),
            context_digest=stable_hash(context, 16),
            object_digest=stable_hash(text, 16),
            notes=notes,
        )

    def adapter_for(self, basis_name: str) -> BasisAdapter:
        """Return the configured adapter named ``basis_name``, or a fresh ``ProseBasis``."""
        matching = (adapter for adapter in self.bases if adapter.name == basis_name)
        found = next(matching, None)
        if found is None:
            return ProseBasis()
        return found

    def best_basis_for_text(self, text: str, context: Optional[Mapping[str, Any]] = None) -> str:
        """Return the name of the adapter that scores ``text`` highest."""
        shared_context = context or {}
        scores = {adapter.name: adapter.score(text, shared_context) for adapter in self.bases}
        return max(scores, key=scores.get)

    @staticmethod
    def _textify(obj: Any) -> str:
        """Return strings unchanged; render anything else as indented, key-sorted JSON."""
        if not isinstance(obj, str):
            return json.dumps(obj, ensure_ascii=False, sort_keys=True, indent=2)
        return obj

    @staticmethod
    def _style_fingerprint(text: str) -> Dict[str, Any]:
        """Return coarse style ratios (caps, punctuation, newlines) and a markdown flag."""
        chars = max(1, len(text))
        word_count = len(rough_words(text))
        sentence_count = len(sentence_split(text))
        upper = sum(c.isupper() for c in text)
        punct = sum((not c.isalnum()) and (not c.isspace()) for c in text)
        markdown_hit = re.search(r"^#{1,6}\s+|```|\[[^\]]+\]\([^\)]+\)", text, flags=re.M)
        return {
            "caps_ratio": round(upper / chars, 4),
            "punct_ratio": round(punct / chars, 4),
            "newline_ratio": round(text.count("\n") / chars, 4),
            "avg_sentence_words": round((word_count / max(1, sentence_count)), 3),
            "has_markdown": bool(markdown_hit),
        }


# ---------------------------------------------------------------------------
# Pass II: Thema decomposition
# ---------------------------------------------------------------------------


class ThemaPass:
def __init__(self, field_pass: FieldPass) -> None:
    """Keep the field pass, which supplies basis scoring and adapter lookup."""
    self.field_pass = field_pass

def run(self, obj: Any, prior: SharedPrior, context: Optional[Mapping[str, Any]] = None) -> List[Thema]:
    """Split the object with the dominant basis and describe each part as a Thema.

    Each part is re-scored on its own, so a thema's basis may differ from
    the object's dominant basis. The resulting list is handed to
    ``_merge_tiny_neighbors`` before being returned.
    """
    context = context or {}
    text = FieldPass._textify(obj)
    splitter = self.field_pass.adapter_for(prior.dominant_basis)
    themae: List[Thema] = []
    for idx, part in enumerate(splitter.split(text)):
        basis = self.field_pass.best_basis_for_text(part, context)
        adapter = self.field_pass.adapter_for(basis)
        motifs = self._motifs(part, adapter)
        ent = entropy_bits_per_char(part)
        role = classify_role(part, basis)
        rep = detect_repetition(part)
        constraints = self._constraints(part, basis, role)
        metadata = {
            "index": idx,
            "char_len": len(part),
            "word_len": len(rough_words(part)),
            "repetition": rep,
        }
        thema = Thema(
            id=f"T{idx:04d}-{stable_hash(part, 8)}",
            role=role,
            basis=basis,
            text=part,
            motifs=motifs,
            entropy_bpc=ent,
            risk=risk_score(part, basis),
            constraints=constraints,
            metadata=metadata,
        )
        themae.append(thema)
    return self._merge_tiny_neighbors(themae)

def _motifs(self, text: str, adapter: BasisAdapter) -> List[str]:
    """Return the adapter's motif followed by the top terms, de-duplicated, at most nine."""
    primary = adapter.motif(text)
    out = [primary] if primary else []
    for term in top_terms(text, 8):
        if term not in out:
            out.append(term)
    return out[:9]

def _constraints(self, text: str, basis: str, role: str) -> Dict[str, Any]:
constraints: Dict[str, Any] = {}
urls = re.findall(r"https?://\S+", text)
nums = re.findall(r"(? List[Thema]:
if not themae:
return []
merged: List[Thema] = []
buf: Optional[Thema] = None
for t in themae:
if buf is None:
buf = t
continue
if len(buf.text) < min_chars and buf.basis == t.basis and buf.role == t.role:
joined = buf.text.rstrip() + "\n" + t.text.lstrip()
buf = Thema(
id=f"{buf.id}+{t.id}",
role=buf.role,
basis=buf.basis,
text=joined,
motifs=list(dict.fromkeys(buf.motifs + t.motifs))[:9],
entropy_bpc=entropy_bits_per_char(joined),
risk=max(buf.risk, t.risk),
constraints={**buf.constraints, **t.constraints},
metadata={"merged": [buf.id, t.id], "char_len": len(joined)},
)
else:
merged.append(buf)
buf = t
if buf:
merged.append(buf)
return merged


# ---------------------------------------------------------------------------
# Pass III: Origami folding
# ---------------------------------------------------------------------------


class OrigamiPass:
    """Pass III: fold each thema into an ``OrigaminalHypertoken``."""

    def __init__(self, packer: Optional[LatticePacker] = None) -> None:
        self.packer = packer or LatticePacker()

    def run(self, themae: List[Thema], prior: SharedPrior, mode: str = "hybrid") -> List[OrigaminalHypertoken]:
        """Fold every thema in ``mode``.

        Raises:
            ValueError: If ``mode`` (case-insensitive) is not ``lossless``,
                ``generative`` or ``hybrid``.
        """
        mode = mode.lower()
        if mode not in {"lossless", "generative", "hybrid"}:
            raise ValueError("mode must be one of: lossless, generative, hybrid")
        return [self.fold_thema(t, prior, mode) for t in themae]

    def fold_thema(self, thema: Thema, prior: SharedPrior, mode: str) -> OrigaminalHypertoken:
        """Build the hypertoken for one thema: folds, residue, anchors, lattice code, verifier."""
        folds = self._choose_folds(thema, mode)
        residue = self._encode_residue(thema, prior, mode)
        anchors = self._anchors(thema, mode)
        lattice = self.packer.pack(thema, prior)
        motif = thema.motifs[0] if thema.motifs else stable_hash(thema.text, 12)
        verifier = {
            "digest": stable_hash(thema.text, 16),
            "len": len(thema.text),
            "risk": round(thema.risk, 4),
            "constraints": thema.constraints,
            "lossless": mode == "lossless" or bool(residue.get("exact_b85")),
        }
        return OrigaminalHypertoken(
            id=f"H-{stable_hash(thema.id + mode + thema.text, 12)}",
            thema_id=thema.id,
            basis_id=thema.basis,
            motif=motif,
            folds=folds,
            lattice_code=lattice,
            residue=residue,
            anchors=anchors,
            verifier=verifier,
            mode=mode,
        )

    @staticmethod
    def _repetition_kind(thema: Thema) -> str:
        """Return the detected repetition kind, treating missing metadata as ``"none"``."""
        # Themae without a "repetition" entry used to yield None here, and
        # ``None != "none"`` made them look repetitive.
        rep = thema.metadata.get("repetition") or {}
        return rep.get("kind", "none")

    @staticmethod
    def _stores_exact(thema: Thema, mode: str) -> bool:
        """Return True when the residue for ``thema`` carries the exact bytes.

        Single source of truth shared by ``_choose_folds`` and
        ``_encode_residue`` so the chosen fold and the stored residue agree.
        """
        if mode == "lossless":
            return True
        if mode != "hybrid":
            return False
        if thema.risk >= 0.45 or thema.constraints.get("exactness") == "high":
            return True
        # Small seeds are often cheaper and safer to store exactly.
        return len(thema.text) < 120

    def _choose_folds(self, thema: Thema, mode: str) -> List[FoldOp]:
        """Pick the fold operators used to unfold ``thema``."""
        if mode == "lossless":
            return [FoldOp("exact_residue", weight=1.0)]

        if mode == "hybrid" and self._stores_exact(thema, mode):
            # Exact bytes are stored and the verifier is told "lossless", so
            # the fold must actually replay them. Previously short themae
            # stored exact bytes but unfolded through an approximate fold.
            if thema.basis == "code":
                return [FoldOp("code_scaffold", {"prefer_exact": True})]
            if thema.basis == "json":
                return [FoldOp("json_rebuild")]
            return [FoldOp("exact_residue")]

        if thema.role == "enumeration":
            return [FoldOp("enumerate", {"prefix": ""})]

        if self._repetition_kind(thema) != "none":
            return [FoldOp("crystallize", {"transforms": ["define", "mirror", "extend", "residual"], "style": "paragraph"})]

        if thema.basis == "code":
            return [FoldOp("code_scaffold", {"prefer_exact": mode == "hybrid"})]
        if thema.basis == "json":
            return [FoldOp("json_rebuild")]
        if thema.role == "heading" or len(thema.text) < 90:
            return [FoldOp("anchor_expand")]
        return [FoldOp("prose_expand", {"cadence": "triad"}), FoldOp("crystallize", {"transforms": ["connect", "extend"], "style": "paragraph"})]

    def _encode_residue(self, thema: Thema, prior: SharedPrior, mode: str) -> Dict[str, Any]:
        """Collect what the folds cannot regenerate: summary, terms, items, exact bytes."""
        text = thema.text
        residue: Dict[str, Any] = {
            "summary": self._summary(text),
            "terms": top_terms(text, 8),
        }
        items = parse_list_items(text)
        if items:
            residue["items"] = items
        if self._repetition_kind(thema) != "none":
            residue["asymmetric_unit"] = thema.motifs[0] if thema.motifs else self._summary(text)
            residue["repetition"] = thema.metadata.get("repetition")
        # Preserve exact high-surprise constraints even in generative mode.
        if thema.constraints:
            residue["constraints"] = thema.constraints
        # Lossless, hybrid high-risk and hybrid small seeds store exact bytes.
        if self._stores_exact(thema, mode):
            residue["exact_b85"] = b85_pack_bytes(text.encode("utf-8"))
        return residue

    def _anchors(self, thema: Thema, mode: str) -> List[str]:
        """Return up to eight verbatim fragments: edge lines, URLs and inline code."""
        text = thema.text.strip()
        if mode == "lossless":
            return []
        anchors: List[str] = []
        lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
        if lines:
            anchors.extend(lines[:2])
            if len(lines) > 2:
                anchors.append(lines[-1])
        else:
            sentences = sentence_split(text)
            anchors.extend(sentences[:2])
        # Keep high-value exact fragments.
        anchors.extend(re.findall(r"https?://\S+", text)[:3])
        anchors.extend(re.findall(r"`([^`]{1,80})`", text)[:5])
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(a for a in anchors if a))[:8]

    def _summary(self, text: str, max_len: int = 180) -> str:
        """Return the first sentence (or the normalised text), truncated with an ellipsis."""
        sents = sentence_split(text)
        base = sents[0] if sents else normalize_space(text)
        if len(base) > max_len:
            base = base[: max_len - 1].rstrip() + "…"
        return base


# ---------------------------------------------------------------------------
# Unfolding and verification
# ---------------------------------------------------------------------------


class Unfolder:
    """Expand hypertokens back into text by running their fold operators in order."""

    def __init__(self, registry: Optional[FoldRegistry] = None) -> None:
        self.registry = registry or default_registry()

    def unfold_token(self, token: OrigaminalHypertoken, prior: SharedPrior) -> str:
        """Start from the token's motif and pipe it through each fold in turn."""
        current = token.motif
        for fold in token.folds:
            current = self.registry.apply(current, fold, token, prior)
        return current

    def unfold_object(self, obj: OrigaminalObject, separator: str = "\n\n") -> str:
        """Unfold every token and join the non-``None`` results with ``separator``."""
        unfolded = (self.unfold_token(token, obj.prior) for token in obj.tokens)
        return separator.join(part for part in unfolded if part is not None)


class Verifier:
    """Composable verifier. Production version can call model, compiler, theorem checker, renderer, etc."""

    def verify(self, original: Optional[str], unfolded: str, obj: OrigaminalObject) -> VerificationResult:
        """Score an unfolded text against the original and the token metadata.

        When every token claims to be lossless the texts must match exactly.
        Otherwise missing URLs, missing numbers and low term overlap each
        reduce the score. Passing requires no errors and a score of at least 0.55.
        """
        errors: List[str] = []
        warnings: List[str] = []
        score = 1.0

        if original is not None:
            lossless_expected = all(token.verifier.get("lossless") for token in obj.tokens)
            if not lossless_expected:
                # Semantic-ish sanity checks: preserve required URLs/numbers/proper nouns.
                constraints = self._collect_constraints(obj)
                for url in constraints.get("urls", []):
                    if url not in unfolded:
                        warnings.append(f"missing URL anchor: {url}")
                        score -= 0.05
                # Only the first 20 numbers are checked; misses cost score silently.
                for num in constraints.get("numbers", [])[:20]:
                    if num not in unfolded:
                        score -= 0.01
                overlap = self._term_overlap(original, unfolded)
                if overlap < 0.18 and len(original) > 200:
                    warnings.append(f"low term overlap: {overlap:.2f}")
                    score -= 0.2
            elif original != unfolded:
                errors.append("lossless reconstruction mismatch")
                score -= 0.7

        for token in obj.tokens:
            well_formed = token.id and token.basis_id
            if not well_formed:
                errors.append("malformed hypertoken")
                score -= 0.2

        passed = not errors and score >= 0.55
        return VerificationResult(
            passed=passed,
            score=bounded(score, 0.0, 1.0),
            errors=errors,
            warnings=warnings,
        )

    def _collect_constraints(self, obj: OrigaminalObject) -> Dict[str, List[str]]:
        """Merge per-token constraint lists, de-duplicated in first-seen order.

        A token's verifier constraints win; its residue constraints are used
        only when the verifier ones are missing or empty.
        """
        gathered: Dict[str, List[str]] = defaultdict(list)
        for token in obj.tokens:
            source = token.verifier.get("constraints", {}) or token.residue.get("constraints", {})
            for key in ("urls", "numbers", "proper_nouns"):
                gathered[key].extend(source.get(key, []))
        return {key: list(dict.fromkeys(values)) for key, values in gathered.items()}

    def _term_overlap(self, a: str, b: str) -> float:
        """Return the fraction of ``a``'s top 32 terms also among ``b``'s top 32."""
        reference = set(top_terms(a, 32))
        if not reference:
            return 1.0
        candidate = set(top_terms(b, 32))
        return len(reference & candidate) / len(reference)


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------


class OrigaminalHyperCompressor:
    """
    Three-pass generalizable hypercompression engine.

    Public API:
    codec = OrigaminalHyperCompressor()
    obj = codec.encode(text, context={...}, mode="hybrid")
    unfolded = codec.unfold(obj)
    report = codec.report(text, obj)
    """

    def __init__(self, bases: Optional[List[BasisAdapter]] = None, registry: Optional[FoldRegistry] = None) -> None:
        """Wire up the three passes, the unfolder and the verifier.

        Args:
            bases: basis adapters handed to the field pass.
            registry: fold registry for unfolding; a falsy value selects
                the default registry.
        """
        self.field_pass = FieldPass(bases)
        self.thema_pass = ThemaPass(self.field_pass)
        self.origami_pass = OrigamiPass()
        self.unfolder = Unfolder(registry or default_registry())
        self.verifier = Verifier()

    def encode(self, obj: Any, context: Optional[Mapping[str, Any]] = None, mode: str = "hybrid") -> OrigaminalObject:
        """Run the field, thema and origami passes over ``obj``.

        Args:
            obj: the object to encode; it is textified via ``FieldPass._textify``
                for the byte count and the lossless root payload.
            context: optional context mapping passed to the first two passes.
            mode: folding mode forwarded to the origami pass and recorded in
                the metadata.

        Returns:
            An ``OrigaminalObject`` holding the prior, the tokens and metadata.
        """
        context = context or {}
        prior = self.field_pass.run(obj, context)
        themae = self.thema_pass.run(obj, prior, context)
        tokens = self.origami_pass.run(themae, prior, mode=mode)
        # Encode once; reused for the byte count and the lossless payload.
        original_bytes = FieldPass._textify(obj).encode("utf-8")
        metadata = {
            "created_ms": now_ms(),
            "version": VERSION,
            "mode": mode,
            "original_bytes": len(original_bytes),
            "thema_count": len(themae),
            "token_count": len(tokens),
            "three_pass": [
                "field/prior/basis",
                "thema/asymmetric-unit extraction",
                "origaminal fold/lattice/residue/verifier",
            ],
        }
        # Root exact payload makes lossless mode truly byte-preserving while still
        # carrying the three-pass origaminal decomposition for inspection/adaptation.
        if mode == "lossless":
            metadata["root_exact_b85"] = b85_pack_bytes(original_bytes)
        return OrigaminalObject(prior=prior, tokens=tokens, metadata=metadata)

    def unfold(self, obj: OrigaminalObject, separator: str = "\n\n") -> str:
        """Reconstruct text from ``obj``.

        When the metadata carries a root exact payload it is decoded and
        returned directly (``separator`` is not used); otherwise the tokens
        are unfolded and joined with ``separator``.
        """
        root_exact = obj.metadata.get("root_exact_b85")
        if root_exact:
            return b85_unpack_bytes(root_exact).decode("utf-8")
        return self.unfolder.unfold_object(obj, separator=separator)

    def verify(self, original: Optional[Any], unfolded: str, obj: OrigaminalObject) -> VerificationResult:
        """Verify ``unfolded`` against ``original`` (textified unless ``None``)."""
        original_text = None if original is None else FieldPass._textify(original)
        return self.verifier.verify(original_text, unfolded, obj)

    def report(self, original: Any, obj: OrigaminalObject) -> CompressionReport:
        """Build a size report comparing the textified original to the packed payload.

        The ratio is original UTF-8 bytes over payload UTF-8 bytes, with the
        denominator floored at 1 to avoid division by zero.
        """
        original_text = FieldPass._textify(original)
        payload = obj.pack()
        original_bytes = len(original_text.encode("utf-8"))
        payload_bytes = len(payload.encode("utf-8"))
        ratio = original_bytes / max(1, payload_bytes)
        return CompressionReport(
            mode=obj.metadata.get("mode", "unknown"),
            original_bytes=original_bytes,
            payload_bytes=payload_bytes,
            estimated_ratio=ratio,
            token_count=len(obj.tokens),
            dominant_basis=obj.prior.dominant_basis,
            entropy_bpc=obj.prior.entropy_bpc,
            notes=obj.prior.notes,
        )

    def adaptive_encode(self, obj: Any, context: Optional[Mapping[str, Any]] = None, target_score: float = 0.75) -> Tuple[OrigaminalObject, str, VerificationResult, CompressionReport]:
        """
        Try generative -> hybrid -> lossless, lowering compression until verification passes.

        Returns the (encoded object, unfolded text, verification result,
        report) of the first mode whose verification passes with a score of
        at least ``target_score``. If no mode qualifies, the lossless attempt
        (the last one tried) is returned as-is.
        """
        for mode in ("generative", "hybrid", "lossless"):
            encoded = self.encode(obj, context=context, mode=mode)
            unfolded = self.unfold(encoded)
            verdict = self.verify(obj, unfolded, encoded)
            if verdict.passed and verdict.score >= target_score:
                break
        # Either a mode met the target, or the loop ended on the lossless
        # attempt. Reuse that result instead of encoding lossless a second time.
        return encoded, unfolded, verdict, self.report(obj, encoded)


# ---------------------------------------------------------------------------
# Extension examples: non-text events and action traces
# ---------------------------------------------------------------------------


class EventTraceAdapter(BasisAdapter):
    """
    Example adapter for action/event sequences.

    Input can be a list of dicts like:
    [{"actor":"robot", "action":"move", "target":"cup"}, ...]
    """

    name = "event_trace"

    def score(self, obj: Any, context: Optional[Mapping[str, Any]] = None) -> float:
        """Rate how much ``obj`` looks like an event trace.

        Non-lists and empty lists score 0.0. Otherwise each element earns one
        point for being a dict and one more for having an ``action``,
        ``event`` or ``type`` key; the total is divided by twice the length.
        """
        if not (isinstance(obj, list) and obj):
            return 0.0
        dict_count = 0
        action_count = 0
        for item in obj:
            if not isinstance(item, dict):
                continue
            dict_count += 1
            if "action" in item or "event" in item or "type" in item:
                action_count += 1
        return bounded((dict_count + action_count) / (2 * len(obj)), 0.0, 1.0)

    def fingerprint(self, text: str) -> Dict[str, Any]:
        """Return the basis kind together with a 16-character digest of ``text``."""
        digest = stable_hash(text, 16)
        return {"kind": "event_trace", "digest": digest}

    def split(self, text: str) -> List[str]:
        """Split a JSON list into one key-sorted JSON string per element.

        Text that does not parse to a list is split by the base class.
        """
        decoded = safe_json_loads(text)
        if not isinstance(decoded, list):
            return super().split(text)
        return [json.dumps(entry, ensure_ascii=False, sort_keys=True) for entry in decoded]

    def motif(self, text: str) -> str:
        """Build an ``event:actor:action:target`` motif from a JSON object.

        Text that does not parse to a dict yields ``event:`` plus a
        10-character hash of the text.
        """
        decoded = safe_json_loads(text)
        if not isinstance(decoded, dict):
            return "event:" + stable_hash(text, 10)
        actor = decoded.get("actor", "agent")
        action = self._first_truthy(decoded, ("action", "event", "type"), "act")
        target = self._first_truthy(decoded, ("target", "object", "to"), "field")
        return f"event:{actor}:{action}:{target}"

    @staticmethod
    def _first_truthy(record: Mapping[str, Any], keys: Sequence[str], fallback: str) -> Any:
        """Return the first truthy value among ``keys`` in ``record``, else ``fallback``."""
        for key in keys:
            value = record.get(key)
            if value:
                return value
        return fallback


# ---------------------------------------------------------------------------
# Demo / CLI
# ---------------------------------------------------------------------------


# Sample document used by demo(): a markdown heading, prose paragraphs, a
# bullet list and a fenced Python snippet, all in one string.
DEMO_TEXT = """# Origaminal Hypertokens

The surface token is not the natural unit of thought. The natural unit is a folded object: a motif, a basis, an expansion group, and a residue.

- Compression removes redundancy.
- Encryption whitens structure.
- Crystallography stores the asymmetric unit and unfolds the full lattice through symmetry.
- Quantum coding uses the shared prior to reduce what must be sent.

Therefore the model should generate the law, not the leaves. It should emit an origaminal hypertoken that unfolds like semantic origami.

```python
def tiny_generator(seed):
return [seed * i for i in range(3)]
```
"""


def demo() -> None:
    """Encode DEMO_TEXT in each mode and print a report, verdict and previews."""
    codec = OrigaminalHyperCompressor()
    rule = "=" * 78
    for mode in ("generative", "hybrid", "lossless"):
        encoded = codec.encode(
            DEMO_TEXT,
            context={"user": "Luminosity", "task": "hypercompression"},
            mode=mode,
        )
        unfolded = codec.unfold(encoded)
        verdict = codec.verify(DEMO_TEXT, unfolded, encoded)
        report = codec.report(DEMO_TEXT, encoded)

        print(rule)
        print(f"MODE: {mode}")
        print(report.pretty())
        print(f"verify : passed={verdict.passed} score={verdict.score:.3f} warnings={len(verdict.warnings)} errors={len(verdict.errors)}")

        print("--- packed preview ---")
        packed_head = encoded.pack()[:220]
        print(packed_head + "...")

        print("--- unfolded preview ---")
        # Flatten newlines so the preview stays on a single shortened line.
        flattened = unfolded.replace("\n", " / ")
        print(textwrap.shorten(flattened, width=480, placeholder=" ..."))
        print()


def selftest() -> None:
    """Run a small end-to-end self-check and print ``selftest passed``.

    Checks that lossless encoding reconstructs the input exactly, that the
    result verifies, that a pack/unpack round-trip unfolds to the same text,
    and that an event list is assigned the ``event_trace`` or ``json`` basis.

    Raises:
        AssertionError: when any check fails. Checks are explicit rather
            than ``assert`` statements so they still run under ``python -O``.
    """

    def require(condition: Any, detail: Any) -> None:
        # Explicit raise: ``assert`` statements are removed under -O.
        if not condition:
            raise AssertionError(detail)

    codec = OrigaminalHyperCompressor()
    text = "Hello world. Hello world. Hello world.\n\n- alpha\n- beta\n- gamma\n"
    lossless = codec.encode(text, mode="lossless")
    unfolded = codec.unfold(lossless)
    require(unfolded == text, "lossless reconstruction failed")
    v = codec.verify(text, unfolded, lossless)
    require(v.passed, v)
    payload = lossless.pack()
    restored = OrigaminalObject.unpack(payload)
    require(codec.unfold(restored) == unfolded, "pack/unpack round-trip changed the unfolded text")

    events = [
        {"actor": "robot", "action": "approach", "target": "cup"},
        {"actor": "robot", "action": "grasp", "target": "cup"},
        {"actor": "robot", "action": "lift", "target": "cup"},
    ]
    event_codec = OrigaminalHyperCompressor(bases=[EventTraceAdapter(), JSONBasis(), ProseBasis()])
    obj = event_codec.encode(events, mode="hybrid")
    require(
        obj.prior.dominant_basis in {"event_trace", "json"},
        f"unexpected dominant basis: {obj.prior.dominant_basis!r}",
    )
    print("selftest passed")


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Subcommands:
        demo      run the demo (also the default when no subcommand is given)
        selftest  run the built-in self-check
        encode    read a UTF-8 text file and emit the packed payload
        unfold    read a packed payload file and emit the unfolded text

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success, 2 when no known subcommand was handled.
    """
    parser = argparse.ArgumentParser(description="Origaminal Hypertokens three-pass hypercompression prototype")
    sub = parser.add_subparsers(dest="cmd")

    sub.add_parser("demo")
    sub.add_parser("selftest")

    p_encode = sub.add_parser("encode")
    p_encode.add_argument("input", help="input text file")
    p_encode.add_argument("--mode", choices=["generative", "hybrid", "lossless"], default="hybrid")
    p_encode.add_argument("--out", default="-", help="output packed payload file, or '-' for stdout")

    p_unfold = sub.add_parser("unfold")
    p_unfold.add_argument("payload", help="packed payload file")
    p_unfold.add_argument("--out", default="-", help="output text file, or '-' for stdout")

    args = parser.parse_args(argv)

    if args.cmd == "demo" or args.cmd is None:
        demo()
        return 0
    if args.cmd == "selftest":
        selftest()
        return 0

    # Only encode/unfold need a codec; demo and selftest build their own.
    codec = OrigaminalHyperCompressor()

    if args.cmd == "encode":
        # newline="" disables newline translation so CRLF input is encoded
        # exactly as stored instead of being silently rewritten to LF.
        with open(args.input, "r", encoding="utf-8", newline="") as f:
            text = f.read()
        obj = codec.encode(text, mode=args.mode)
        payload = obj.pack()
        if args.out == "-":
            print(payload)
        else:
            with open(args.out, "w", encoding="utf-8") as f:
                f.write(payload)
        print(codec.report(text, obj).pretty(), file=sys.stderr)
        return 0
    if args.cmd == "unfold":
        with open(args.payload, "r", encoding="utf-8") as f:
            # Tolerate the trailing newline that `encode --out -` prints.
            payload = f.read().strip()
        obj = OrigaminalObject.unpack(payload)
        text = codec.unfold(obj)
        if args.out == "-":
            # write() rather than print(): print would append a newline that
            # is not part of the reconstructed text.
            sys.stdout.write(text)
        else:
            # newline="" writes the reconstructed line endings untranslated.
            with open(args.out, "w", encoding="utf-8", newline="") as f:
                f.write(text)
        return 0
    parser.print_help()
    return 2


# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())