# Autogrow LLM by Luminosity-e
#!/usr/bin/env python3
"""MicroAccordion: self-growing byte-level transformer with 4-bit fake quant, KernelDB memoization, streaming corpora, width/head/block growth, LuminosityMutator, and CLI/stream/crawl training. HTML-mode LJ edition."""
from __future__ import annotations
import argparse
import hashlib
import html
import html.parser
import json
import math
import os
import random
import shlex
import sys
import textwrap
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
# Built-in seed corpus: a few statements plus User/Assistant exchanges.
# `load_text` returns it when neither a file path nor a URL is supplied, and
# `seed_main` writes or prints it. The text is dedented, stripped, and ends
# with exactly one trailing newline.
DEFAULT_CORPUS = textwrap.dedent("""
Luminosity and Gpteus build tiny cyborg minds from thrift, rigor, and recursion.
MicroAccordion learns repeated structure, memoized kernels, and 4-bit grit.
User: hello gpteus
Assistant: hello Luminosity, let us build something elegant and strange.
User: what do we care about
Assistant: beauty, compression, speed, resilience, and mind.
User: what should the machine remember
Assistant: luminosity, gpteus, cyborg adventures, tiny cli worlds, recursive craft.
User: can the model grow
Assistant: yes: widen d_model, advance heads, add blocks, mutate, keep learning.
User: what is the accordion spirit
Assistant: compute the new, retrieve the validated, spend the savings on depth.
User: what does luminosity mean to the machine
Assistant: adaptive brightness: strengthen when stuck, soften when clear.
User: what is width growth
Assistant: preserve the old subspace, open new near-zero dimensions.
User: what does streaming internet data feel like
Assistant: an endless river through a narrow gate, document by document.
""").strip() + "\n"
class TanhLUT:
    """Tanh approximated by linear interpolation over a precomputed table.

    The table holds ``tanh`` sampled at ``size`` evenly spaced points in
    ``[lo, hi]``. Inputs are clamped to that interval before lookup.
    """

    def __init__(self, lo: float = -6.0, hi: float = 6.0, size: int = 1024):
        self.lo, self.hi, self.size = lo, hi, size
        self.table = torch.tanh(torch.linspace(lo, hi, size))

    def __call__(self, x: torch.Tensor) -> torch.Tensor:
        """Return the interpolated tanh of ``x`` (same shape as ``x``)."""
        last = self.size - 1
        clamped = x.clamp(self.lo, self.hi)
        # Fractional table coordinate of every input element.
        pos = (clamped - self.lo) / (self.hi - self.lo) * last
        lower = pos.floor().long().clamp(0, last)
        upper = (lower + 1).clamp(0, last)
        table = self.table.to(x.device)
        left = table[lower]
        # Blend the two neighbouring samples by the fractional offset.
        return left + (table[upper] - left) * (pos - lower.float())


# Shared module-level instance with the default range and resolution.
TANH_LUT = TanhLUT()
def fake_quantize_4bit(
    w: Optional[torch.Tensor], per_channel: bool = False
) -> Optional[torch.Tensor]:
    """Simulate signed 4-bit quantization with a straight-through gradient.

    Values are scaled so the largest magnitude maps to 7, rounded, clamped to
    ``[-8, 7]`` and scaled back. The rounding error is added as a detached
    term, so the forward value is quantized while the gradient with respect
    to ``w`` is the identity.

    Args:
        w: Tensor to quantize; ``None`` is passed through unchanged.
        per_channel: When true and ``w`` has at least two dimensions, use one
            scale per index of dimension 0 instead of a single global scale.

    Returns:
        The fake-quantized tensor, or ``None`` when ``w`` is ``None``.
    """
    if w is None:
        return None
    qmax, eps = 7.0, 1e-8
    magnitude = w.detach().abs()
    if per_channel and w.ndim >= 2:
        peak = magnitude.amax(dim=tuple(range(1, w.ndim)), keepdim=True)
    else:
        peak = magnitude.max()
    # eps floor keeps the division finite for all-zero tensors.
    scale = (peak / qmax).clamp_min(eps)
    levels = torch.round(w / scale).clamp(-8, 7)
    return w + (levels * scale - w).detach()
class QuantEmbedding(nn.Module):
    """Embedding table whose weights are fake-quantized on every lookup."""

    def __init__(self, num_embeddings: int, embedding_dim: int):
        super().__init__()
        # Small-variance normal initialisation (std 0.02).
        initial = torch.randn(num_embeddings, embedding_dim) * 0.02
        self.weight = nn.Parameter(initial)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Look up rows of the per-row fake-quantized table for indices ``x``."""
        quantized = fake_quantize_4bit(self.weight, per_channel=True)
        return F.embedding(x, quantized)
class QuantLinear(nn.Module):
    """Linear layer that fake-quantizes weight and bias in every forward pass."""

    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        super().__init__()
        # 1/sqrt(fan_in) scaling; max(1, ...) guards a zero fan-in.
        init_scale = 1.0 / math.sqrt(max(1, in_features))
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features) * init_scale
        )
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_features))
        else:
            self.bias = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the affine map using per-row quantized weights."""
        weight_q = fake_quantize_4bit(self.weight, per_channel=True)
        bias_q = fake_quantize_4bit(self.bias)
        return F.linear(x, weight_q, bias_q)
class LuminosityMutator:
    """Randomly perturb trainable weights, harder the longer training stalls.

    Each call to :meth:`mutate` selects a random subset of every trainable
    parameter (each element with probability ``mutation_rate``) and applies
    one strategy to it:

    * ``gaussian``  - add normal noise scaled by the current strength
    * ``sign_flip`` - negate the selected elements
    * ``zero_mask`` - set the selected elements to zero
    * ``luminosity`` - ``sign_flip`` when the plateau ratio exceeds 0.8,
      otherwise ``gaussian``
    """

    STRATEGIES = ("gaussian", "sign_flip", "zero_mask", "luminosity")

    def __init__(
        self,
        mutation_rate: float = 0.02,
        mutation_strength: float = 0.05,
        strategy: str = "luminosity",
        plateau_amplifier: float = 3.0,
    ):
        """Store the hyper-parameters.

        Raises:
            ValueError: if ``strategy`` is not one of :attr:`STRATEGIES`.
        """
        if strategy not in self.STRATEGIES:
            raise ValueError(f"strategy must be one of {self.STRATEGIES}")
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.strategy = strategy
        self.plateau_amplifier = plateau_amplifier
        self.stats: dict = dict(
            total_mutations=0, params_mutated=0, luminosity_pulses=0,
            last_strength=0.0, last_strategy=strategy,
        )

    def _strength(self, pr: float) -> float:
        """Scale the base strength by the plateau ratio clamped to [0, 1]."""
        return self.mutation_strength * (
            1.0 + self.plateau_amplifier * min(1.0, max(0.0, pr))
        )

    def _strategy(self, pr: float) -> str:
        """Resolve the concrete strategy to apply for plateau ratio ``pr``."""
        if self.strategy != "luminosity":
            return self.strategy
        return "sign_flip" if pr > 0.8 else "gaussian"

    def mutate(
        self,
        model: nn.Module,
        plateau_ratio: float = 0.0,
        skip_layer_norm: bool = True,
    ) -> dict:
        """Mutate ``model`` in place and return a summary of what was done.

        Args:
            model: Module whose trainable parameters are perturbed.
            plateau_ratio: How deep into a plateau training is; values are
                clamped to [0, 1] when computing the strength.
            skip_layer_norm: Leave parameters whose name contains ``"ln"`` or
                ``"norm"`` untouched.

        Returns:
            Dict with ``params_mutated``, ``strength``, ``strategy`` and the
            rounded ``plateau_ratio``.
        """
        strat = self._strategy(plateau_ratio)
        strength = self._strength(plateau_ratio)
        if self.strategy == "luminosity" and plateau_ratio > 0.5:
            self.stats["luminosity_pulses"] += 1
        params_hit = 0
        with torch.no_grad():
            for name, param in model.named_parameters():
                if not param.requires_grad:
                    continue
                if skip_layer_norm and ("ln" in name or "norm" in name.lower()):
                    continue
                mask = torch.rand_like(param.data) < self.mutation_rate
                n = int(mask.sum().item())
                if n == 0:
                    continue
                if strat == "gaussian":
                    # Noise is zeroed outside the mask so only selected
                    # elements move.
                    noise = torch.randn_like(param.data) * strength
                    param.data.add_(noise * mask.to(param.dtype))
                elif strat == "sign_flip":
                    param.data[mask] = -param.data[mask]
                elif strat == "zero_mask":
                    param.data[mask] = 0.0
                params_hit += n
        self.stats["total_mutations"] += 1
        self.stats["params_mutated"] += params_hit
        self.stats["last_strength"] = round(strength, 6)
        self.stats["last_strategy"] = strat
        return dict(params_mutated=params_hit, strength=strength,
                    strategy=strat, plateau_ratio=round(plateau_ratio, 3))

    def state(self) -> dict:
        """Return a serialisable snapshot of hyper-parameters and stats."""
        return dict(mutation_rate=self.mutation_rate,
                    mutation_strength=self.mutation_strength,
                    strategy=self.strategy,
                    plateau_amplifier=self.plateau_amplifier,
                    stats=dict(self.stats))

    @classmethod
    def from_state(cls, s: dict) -> "LuminosityMutator":
        """Rebuild a mutator from :meth:`state` output.

        Missing keys fall back to the constructor defaults. The keys read
        here must match the ones written by :meth:`state` exactly.
        """
        obj = cls(
            mutation_rate=s.get("mutation_rate", 0.02),
            mutation_strength=s.get("mutation_strength", 0.05),
            strategy=s.get("strategy", "luminosity"),
            plateau_amplifier=s.get("plateau_amplifier", 3.0),
        )
        obj.stats.update(s.get("stats", {}))
        return obj
@dataclass
class CacheEntry:
    """Cached output of one transformer block plus its trust metadata."""

    output: torch.Tensor  # stored by KernelDB.maybe_use as a detached CPU copy
    confidence: float  # trust score; bypass requires >= KernelDB.high_confidence
    expires_at: int  # last step at which the entry still counts as fresh
    hits: int = 0  # lookups that found this entry while fresh
    validations: int = 0  # shadow validations run against a live output
    last_divergence: float = 0.0  # mean |live - cached| at the last validation


class KernelDB:
    """Confidence-gated memo table for block outputs.

    A fresh entry starts at confidence 0.5. Every later hit recomputes the
    live output ("shadow validation") and raises or lowers the confidence
    depending on how far the cached tensor has drifted. Once the confidence
    reaches ``high_confidence`` the cached tensor may be returned without
    running the block at all (a "bypass").
    """

    def __init__(
        self,
        ttl: int = 48,
        high_confidence: float = 0.995,
        divergence_tolerance: float = 1e-3,
        max_entries: int = 4096,
    ):
        self.ttl = ttl
        self.high_confidence = high_confidence
        self.divergence_tolerance = divergence_tolerance
        self.max_entries = max_entries
        self.entries: Dict[str, CacheEntry] = {}
        self.stats = dict(lookups=0, hits=0, bypasses=0,
                          shadow_validations=0, misses=0, evictions=0)

    def _evict(self) -> None:
        """Drop the least trusted 1/16 of entries once the table is full."""
        if len(self.entries) < self.max_entries:
            return
        # Lowest confidence first; ties broken by earliest expiry.
        ranked = sorted(
            self.entries,
            key=lambda k: (self.entries[k].confidence,
                           self.entries[k].expires_at),
        )
        for k in ranked[: max(1, len(ranked) // 16)]:
            self.entries.pop(k, None)
            self.stats["evictions"] += 1

    def fingerprint(self, bi: int, gen: int, heads: int,
                    bucket: int, tokens: torch.Tensor) -> str:
        """Hash block index, generation, head count, bucket and token ids.

        Returns:
            A 32-character hex digest (16-byte BLAKE2b).
        """
        blob = json.dumps(
            {"b": bi, "g": gen, "h": heads, "s": bucket,
             "t": tokens.detach().cpu().tolist()},
            separators=(",", ":"), sort_keys=True,
        ).encode()
        return hashlib.blake2b(blob, digest_size=16).hexdigest()

    def maybe_use(
        self, key: str, step: int, live_fn,
        allow_bypass: bool = True,
    ) -> Tuple[torch.Tensor, bool]:
        """Return the output for ``key``, computing it unless a bypass applies.

        Args:
            key: Fingerprint identifying the computation.
            step: Current step, used for expiry bookkeeping.
            live_fn: Zero-argument callable producing the live output.
            allow_bypass: Permit returning the cached tensor without calling
                ``live_fn`` when the entry is trusted enough.

        Returns:
            ``(tensor, bypassed)``. When ``bypassed`` is true the tensor is
            the stored copy and ``live_fn`` was not called.
        """
        self.stats["lookups"] += 1
        entry = self.entries.get(key)
        if entry is None or step > entry.expires_at:
            self.stats["misses"] += 1
            live = live_fn()
            self._evict()
            self.entries[key] = CacheEntry(
                output=live.detach().cpu(), confidence=0.50,
                expires_at=step + self.ttl)
            return live, False

        self.stats["hits"] += 1
        entry.hits += 1
        if allow_bypass and entry.confidence >= self.high_confidence:
            self.stats["bypasses"] += 1
            entry.expires_at = step + self.ttl
            return entry.output, True

        self.stats["shadow_validations"] += 1
        live = live_fn()
        if entry.output.shape != live.shape:
            # A stale entry with a different shape cannot be compared; treat
            # it as maximally divergent instead of raising.
            div = float("inf")
        else:
            div = float(torch.mean(
                torch.abs(live.detach() - entry.output.to(live.device))
            ).item())
        entry.validations += 1
        entry.last_divergence = div
        if div <= self.divergence_tolerance:
            entry.confidence = min(0.999, entry.confidence * 0.7 + 0.3)
            entry.expires_at = max(entry.expires_at, step + self.ttl)
        else:
            entry.confidence = max(0.05, entry.confidence * 0.5)
            # Diverged entries get a shortened lifetime. The full-TTL
            # extension is applied only on agreement so it cannot undo this.
            entry.expires_at = step + max(4, self.ttl // 4)
        entry.output = live.detach().cpu()
        return live, False

    def to_state(self) -> dict:
        """Return a snapshot of settings, stats and entries for checkpointing."""
        return dict(
            ttl=self.ttl, high_confidence=self.high_confidence,
            divergence_tolerance=self.divergence_tolerance,
            max_entries=self.max_entries, stats=dict(self.stats),
            entries={
                k: dict(output=e.output, confidence=e.confidence,
                        expires_at=e.expires_at, hits=e.hits,
                        validations=e.validations,
                        last_divergence=e.last_divergence)
                for k, e in self.entries.items()
            },
        )

    @classmethod
    def from_state(cls, s: dict) -> "KernelDB":
        """Rebuild a cache from :meth:`to_state` output.

        Missing keys fall back to the constructor defaults. The keys read
        here must match the ones written by :meth:`to_state` exactly.
        """
        obj = cls(
            ttl=s.get("ttl", 48),
            high_confidence=s.get("high_confidence", 0.995),
            divergence_tolerance=s.get("divergence_tolerance", 1e-3),
            max_entries=s.get("max_entries", 4096),
        )
        obj.stats.update(s.get("stats", {}))
        for k, e in s.get("entries", {}).items():
            obj.entries[k] = CacheEntry(
                output=e["output"], confidence=float(e["confidence"]),
                expires_at=int(e["expires_at"]), hits=int(e.get("hits", 0)),
                validations=int(e.get("validations", 0)),
                last_divergence=float(e.get("last_divergence", 0.0)),
            )
        return obj
def divisors(n: int) -> List[int]:
    """Return every positive divisor of ``n`` in ascending order."""
    # range() already ascends, so the result needs no explicit sort.
    return [d for d in range(1, n + 1) if n % d == 0]
def _next_head_count(current: int, new_d: int) -> int:
    """Return the smallest divisor of ``new_d`` above ``current``.

    Falls back to ``current`` when ``new_d`` has no larger divisor.
    """
    return next((d for d in divisors(new_d) if d > current), current)
def _expand_2d(
old: torch.Tensor, rows: int, cols: int, noise: float = 1e-3
) -> torch.Tensor:
"""Expand matrix, preserve old top-left, seed new area near zero."""
t = torch.zeros(rows, cols, dtype=old.dtype)
r, c = old.shape
t[:r, :c] = old
if rows > r:
t[r:, :c] = torch.randn(rows - r, c, dtype=old.dtype) * noise
if cols > c:
t[:r, c:] = torch.randn(r, cols - c, dtype=old.dtype) * noise
if rows > r and cols > c:
t[r:, c:] = torch.randn(rows - r, cols - c, dtype=old.dtype) * noise
return t
def _expand_1d(old: torch.Tensor, size: int) -> torch.Tensor:
t = torch.zeros(size, dtype=old.dtype)
t[:old.shape[0]] = old
return t
class TinyMHABlock(nn.Module):
    """Pre-norm causal multi-head attention block with a 2x-wide MLP.

    The head count can be changed after construction: ``forward`` reshapes
    the same projection weights into however many heads ``n_heads`` names,
    which must be a divisor of ``d_model``.
    """

    def __init__(self, d_model: int, n_heads: int = 1, dropout: float = 0.0):
        """Build the projections.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``n_heads``.
        """
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError(f"d_model={d_model} not divisible by n_heads={n_heads}")
        self.d_model = d_model
        self.n_heads = n_heads
        self.q = QuantLinear(d_model, d_model, bias=False)
        self.k = QuantLinear(d_model, d_model, bias=False)
        self.v = QuantLinear(d_model, d_model, bias=False)
        self.proj = QuantLinear(d_model, d_model, bias=False)
        self.ff1 = QuantLinear(d_model, 2 * d_model, bias=True)
        self.ff2 = QuantLinear(2 * d_model, d_model, bias=True)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)
        # Every head count this width can be split into, ascending.
        self.allowed_heads = divisors(d_model)

    def grow_heads(self, steps: int = 1) -> int:
        """Move ``steps`` positions along ``allowed_heads``; return the new count.

        The target index is clamped to the list bounds, so over-stepping
        saturates at the largest (or smallest) allowed head count instead of
        wrapping around.
        """
        idx = self.allowed_heads.index(self.n_heads)
        target = max(0, min(len(self.allowed_heads) - 1, idx + steps))
        self.n_heads = self.allowed_heads[target]
        return self.n_heads

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply causal self-attention and the MLP, each with a residual."""
        B, T, C = x.shape
        H, D = self.n_heads, C // self.n_heads
        h = self.ln1(x)
        # (B, T, C) -> (B, H, T, D)
        q = self.q(h).view(B, T, H, D).transpose(1, 2)
        k = self.k(h).view(B, T, H, D).transpose(1, 2)
        v = self.v(h).view(B, T, H, D).transpose(1, 2)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(max(1, D))
        # Strictly-upper-triangular mask hides future positions.
        mask = torch.triu(
            torch.ones(T, T, device=x.device, dtype=torch.bool), diagonal=1
        )
        scores = scores.masked_fill(mask[None, None], float("-inf"))
        y = torch.matmul(torch.softmax(scores, dim=-1), v)
        x = x + self.drop(
            self.proj(y.transpose(1, 2).contiguous().view(B, T, C))
        )
        return x + self.drop(self.ff2(TANH_LUT(self.ff1(self.ln2(x)))))
class ByteCodec:
    """Tokenizer that maps text to its UTF-8 bytes, one token per byte."""

    vocab_size = 256

    def encode(self, s: str) -> List[int]:
        """Return the UTF-8 byte values of ``s``."""
        raw = s.encode("utf-8", errors="replace")
        return list(raw)

    def decode(self, ids: Iterable[int]) -> str:
        """Turn token ids back into text.

        Ids are reduced modulo 256; invalid UTF-8 sequences are replaced.
        """
        raw = bytes([int(token) % 256 for token in ids])
        return raw.decode("utf-8", errors="replace")

    def state(self) -> dict:
        """Return the serialisable descriptor of this codec."""
        return {"kind": "byte"}

    @classmethod
    def from_state(cls, _: Optional[dict] = None) -> "ByteCodec":
        """Build a codec; the state argument carries nothing to restore."""
        return cls()
class MicroAccordion(nn.Module):
    """Byte-level causal transformer that can grow in depth, heads and width.

    Every structural change bumps ``model_generation``, which is part of the
    KernelDB fingerprint, so cached block outputs from an older architecture
    are never looked up by a newer one.
    """

    def __init__(
        self,
        vocab_size: int = 256,
        block_size: int = 64,
        d_model: int = 32,
        n_blocks: int = 1,
        n_heads: int = 1,
        dropout: float = 0.0,
    ):
        """Build embeddings, ``n_blocks`` attention blocks and the output head.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``n_heads``.
        """
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError("d_model must be divisible by n_heads")
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.d_model = d_model
        # Remembered so blocks added later use the same dropout rate.
        self.dropout = dropout
        self.model_generation = 0
        self.token_emb = QuantEmbedding(vocab_size, d_model)
        self.pos_emb = nn.Parameter(torch.randn(1, block_size, d_model) * 0.01)
        self.blocks = nn.ModuleList(
            [TinyMHABlock(d_model, n_heads, dropout) for _ in range(n_blocks)]
        )
        self.head = QuantLinear(d_model, vocab_size, bias=True)

    @property
    def n_heads(self) -> int:
        """Head count of the first block (1 when there are no blocks)."""
        return self.blocks[0].n_heads if self.blocks else 1

    def can_grow_heads(self) -> bool:
        """True while the first block is below its largest allowed head count."""
        return bool(
            self.blocks and
            self.blocks[0].n_heads != self.blocks[0].allowed_heads[-1]
        )

    # ── structural growth ──────────────────────────────────────────────────
    def grow_blocks(self, n: int = 1) -> int:
        """Append ``n`` near-identity blocks and return the new block count.

        The attention output and second MLP weights of a new block start at
        zero and everything else is scaled by 0.05, so the block initially
        leaves the residual stream unchanged.
        """
        device = self.pos_emb.device
        for _ in range(n):
            b = TinyMHABlock(self.d_model, self.n_heads, self.dropout)
            with torch.no_grad():
                for name, p in b.named_parameters():
                    if "proj.weight" in name or "ff2.weight" in name:
                        nn.init.zeros_(p)
                    else:
                        p.data.mul_(0.05)
            # New modules are built on CPU in train mode; align them with
            # the rest of the model before attaching.
            b.train(self.training)
            self.blocks.append(b.to(device))
        self.model_generation += 1
        return len(self.blocks)

    def grow_heads(self, steps: int = 1) -> int:
        """Advance every block's head count by ``steps``; return the new count."""
        for b in self.blocks:
            b.grow_heads(steps)
        self.model_generation += 1
        return self.n_heads

    def grow_width(
        self, delta: int = 16, max_d_model: int = 2048
    ) -> Tuple[int, int]:
        """Widen ``d_model`` by ``delta`` (capped) and advance the head count.

        Old weights are kept in the leading rows/columns of every matrix and
        the newly opened entries start near zero. All rebuilt modules are
        moved to the device the model currently lives on and inherit each
        block's dropout rate and training mode.

        Returns:
            ``(d_model, n_heads)`` after the call. Unchanged values are
            returned when no growth is possible.
        """
        D = self.d_model
        D2 = min(D + delta, max_d_model)
        if D2 <= D:
            # At the cap, or a non-positive delta: nothing to widen.
            return D, self.n_heads
        device = self.pos_emb.device
        new_heads = _next_head_count(self.n_heads, D2)

        ne = QuantEmbedding(self.vocab_size, D2)
        nh = QuantLinear(D2, self.vocab_size, bias=True)
        with torch.no_grad():
            ne.weight.copy_(
                _expand_2d(self.token_emb.weight.data, self.vocab_size, D2)
            )
            nh.weight.copy_(
                _expand_2d(self.head.weight.data, self.vocab_size, D2)
            )
            if self.head.bias is not None:
                nh.bias.copy_(self.head.bias.data)
            new_pos = _expand_2d(
                self.pos_emb.data.squeeze(0), self.block_size, D2
            ).unsqueeze(0)

        new_blocks = nn.ModuleList()
        for blk in self.blocks:
            nb = TinyMHABlock(D2, n_heads=new_heads, dropout=blk.drop.p)
            with torch.no_grad():
                for attr in ("q", "k", "v", "proj"):
                    getattr(nb, attr).weight.copy_(
                        _expand_2d(getattr(blk, attr).weight.data, D2, D2)
                    )
                nb.ff1.weight.copy_(_expand_2d(blk.ff1.weight.data, 2 * D2, D2))
                nb.ff1.bias.copy_(_expand_1d(blk.ff1.bias.data, 2 * D2))
                nb.ff2.weight.copy_(_expand_2d(blk.ff2.weight.data, D2, 2 * D2))
                nb.ff2.bias.copy_(_expand_1d(blk.ff2.bias.data, D2))
                for ln in ("ln1", "ln2"):
                    getattr(nb, ln).weight.copy_(
                        _expand_1d(getattr(blk, ln).weight.data, D2))
                    getattr(nb, ln).bias.copy_(
                        _expand_1d(getattr(blk, ln).bias.data, D2))
            nb.train(blk.training)
            new_blocks.append(nb.to(device))

        self.token_emb = ne.to(device)
        self.pos_emb = nn.Parameter(new_pos.to(device))
        self.head = nh.to(device)
        self.blocks = new_blocks
        self.d_model = D2
        self.model_generation += 1
        return D2, new_heads

    # ── forward ────────────────────────────────────────────────────────────
    def forward(
        self,
        idx: torch.Tensor,
        kernel_db: Optional[KernelDB] = None,
        step: int = 0,
        revision_span: int = 32,
        enable_cache: bool = False,
        allow_train_bypass: bool = False,
    ) -> torch.Tensor:
        """Return next-token logits for the token ids in ``idx``.

        Args:
            idx: ``(batch, time)`` tensor of token ids.
            kernel_db: Optional block-output cache.
            step: Current step; selects the cache bucket and drives expiry.
            revision_span: Number of steps sharing one cache bucket.
            enable_cache: Route blocks through ``kernel_db`` when it is given.
            allow_train_bypass: Allow cache bypasses while in training mode
                (they are always allowed in eval mode).

        Raises:
            ValueError: if the sequence is longer than ``block_size``.
        """
        B, T = idx.shape
        if T > self.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size={self.block_size}"
            )
        x = self.token_emb(idx) + self.pos_emb[:, :T, :]
        bucket = step // revision_span
        use_cache = enable_cache and kernel_db is not None
        for bi, blk in enumerate(self.blocks):
            if not use_cache:
                x = blk(x)
                continue
            key = kernel_db.fingerprint(
                bi, self.model_generation, blk.n_heads, bucket, idx
            )
            # Defaults bind the current block/input; the cache may or may
            # not invoke the callable.
            out, bypassed = kernel_db.maybe_use(
                key, step, lambda b=blk, xx=x: b(xx),
                allow_bypass=(allow_train_bypass or not self.training),
            )
            # A bypassed result is the stored copy and may live elsewhere.
            x = out.to(x.device) if bypassed else out
        return self.head(x)
class StreamCorpus:
    """Rolling token buffer fed lazily from a document iterator.

    Documents are pulled from the source only while the buffer is below
    ``buffer_min`` tokens; once it exceeds ``buffer_max`` the oldest tokens
    are discarded. Batches are sampled at random from the buffer.
    """

    def __init__(
        self,
        codec: ByteCodec,
        buffer_min: int = 100_000,
        buffer_max: int = 2_000_000,
    ):
        self.codec = codec
        self.buffer_min = buffer_min
        self.buffer_max = buffer_max
        self.buffer: List[int] = []
        self._src: Optional[Iterator[str]] = None
        self.docs_consumed = 0
        self.tokens_consumed = 0

    # ── source constructors ────────────────────────────────────────────────
    def from_hf(
        self,
        name: str,
        config: Optional[str] = None,
        split: str = "train",
        text_field: str = "text",
    ) -> "StreamCorpus":
        """Stream a HuggingFace dataset (pip install datasets).

        Exits the process with an install hint when ``datasets`` is missing.
        """
        try:
            from datasets import load_dataset  # type: ignore
        except ImportError:
            sys.exit(
                "HuggingFace `datasets` not found.\n"
                "Install with: pip install datasets\n"
                "Then re-run."
            )
        # NOTE(review): trust_remote_code=True lets the dataset repository
        # run its own loading script - confirm this is intended for every
        # dataset name that can reach this call.
        options: dict = dict(split=split, streaming=True, trust_remote_code=True)
        positional = (name, config) if config else (name,)
        dataset = load_dataset(*positional, **options)
        self._src = (record[text_field] for record in dataset)
        return self

    def from_urls(self, urls: List[str]) -> "StreamCorpus":
        """Stream the stripped text of each URL; failed fetches are skipped."""

        def _documents() -> Iterator[str]:
            for url in urls:
                try:
                    text = fetch_and_strip(url)
                    print(f"[stream] fetched {len(text):,} chars from {url}")
                    yield text
                except Exception as exc:
                    print(f"[stream] failed {url}: {exc}")

        self._src = _documents()
        return self

    def from_text(self, text: str) -> "StreamCorpus":
        """Use a single in-memory document as the source."""
        self._src = iter([text])
        return self

    # ── buffer ─────────────────────────────────────────────────────────────
    def _refill(self) -> None:
        """Pull documents until the buffer holds at least ``buffer_min`` tokens."""
        if self._src is None:
            return
        end_of_stream = object()
        while len(self.buffer) < self.buffer_min:
            text = next(self._src, end_of_stream)
            if text is end_of_stream:
                # Source is spent; remember that so `exhausted` can report it.
                self._src = None
                break
            toks = self.codec.encode(text + "\n")
            self.buffer.extend(toks)
            self.docs_consumed += 1
            self.tokens_consumed += len(toks)
        if len(self.buffer) > self.buffer_max:
            # trim oldest half to free memory
            self.buffer = self.buffer[self.buffer_max // 2:]

    @property
    def exhausted(self) -> bool:
        """True once the source is spent and fewer than two tokens remain."""
        return self._src is None and len(self.buffer) < 2

    def sample_batch(
        self, block_size: int, batch_size: int, device: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(x, y)`` batches where ``y`` is ``x`` shifted by one token.

        Raises:
            RuntimeError: if the buffer holds fewer than ``block_size + 2``
                tokens after refilling.
        """
        self._refill()
        n = len(self.buffer)
        if n < block_size + 2:
            raise RuntimeError(
                f"buffer has only {n} tokens (need {block_size + 2}) — "
                "stream may be exhausted"
            )
        max_start = n - block_size - 1
        starts = [random.randint(0, max_start) for _ in range(batch_size)]
        inputs = [self.buffer[s : s + block_size] for s in starts]
        targets = [self.buffer[s + 1 : s + block_size + 1] for s in starts]
        x = torch.tensor(inputs, dtype=torch.long, device=device)
        y = torch.tensor(targets, dtype=torch.long, device=device)
        return x, y
class _HTMLStripper(html.parser.HTMLParser):
SKIP = {"script", "style", "head", "meta", "link",
"noscript", "nav", "footer", "header"}
def __init__(self):
super().__init__()
self._parts: List[str] = []
self._depth = 0
def handle_starttag(self, tag, attrs):
if tag.lower() in self.SKIP:
self._depth += 1
def handle_endtag(self, tag):
if tag.lower() in self.SKIP and self._depth > 0:
self._depth -= 1
def handle_data(self, data):
if self._depth == 0:
s = data.strip()
if s:
self._parts.append(s)
def get_text(self) -> str:
return "\n".join(self._parts)
def html_to_text(raw: str) -> str:
    """Return the visible text of an HTML string, best effort.

    The stripper is built with the parser's default ``convert_charrefs=True``,
    so character references are already decoded when text reaches it.
    Decoding the result a second time would turn literal text such as
    ``&amp;lt;`` into ``<``, so no extra unescape is applied here.
    """
    s = _HTMLStripper()
    try:
        s.feed(raw)
        # close() flushes any text the parser is still holding back.
        s.close()
    except Exception as exc:
        # Deliberately best effort: keep whatever was extracted so far, but
        # leave a trace instead of swallowing the failure silently.
        print(f"[html] parse error ignored: {exc}", file=sys.stderr)
    return s.get_text()
def fetch_and_strip(url: str, timeout: int = 30) -> str:
    """Download ``url`` and return its text, stripping HTML when detected.

    The body is decoded with the charset declared in the Content-Type header
    and falls back to UTF-8 when none is declared or the declared name is
    unknown. Undecodable bytes are replaced. The payload is treated as HTML
    when the content type mentions ``html`` or the text starts with ``<``.

    Args:
        url: Address to fetch.
        timeout: Socket timeout in seconds.

    Raises:
        Whatever ``urllib.request.urlopen`` raises for network/HTTP errors.
    """
    req = urllib.request.Request(url, headers={"User-Agent": "MicroAccordion/3.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        ct = resp.headers.get("Content-Type", "")
        charset = resp.headers.get_content_charset() or "utf-8"
        body = resp.read()
    try:
        raw = body.decode(charset, errors="replace")
    except LookupError:
        # The server declared a codec Python does not know.
        raw = body.decode("utf-8", errors="replace")
    looks_like_html = "html" in ct.lower() or raw.lstrip().startswith("<")
    return html_to_text(raw) if looks_like_html else raw
def load_text(
    path: Optional[str] = None, data_url: Optional[str] = None
) -> Tuple[str, str]:
    """Return ``(corpus_text, source_label)``.

    A URL takes precedence over a file path; with neither, the built-in seed
    corpus is returned.
    """
    if data_url:
        return fetch_and_strip(data_url), f"url:{data_url}"
    if not path:
        return DEFAULT_CORPUS, "built-in-seed"
    contents = Path(path).read_text(encoding="utf-8")
    return contents, f"file:{path}"
def prepare_data_tensor(text: str) -> torch.Tensor:
    """Encode ``text`` with a ``ByteCodec`` into a 1-D ``long`` tensor."""
    token_ids = ByteCodec().encode(text)
    return torch.tensor(token_ids, dtype=torch.long)
def random_batch(
    data: torch.Tensor, block_size: int, batch_size: int, device: str
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Sample ``batch_size`` random windows from a 1-D token tensor.

    Returns:
        ``(x, y)`` of shape ``(batch_size, block_size)`` on ``device``, where
        ``y`` is ``x`` shifted one position to the right in ``data``.

    Raises:
        ValueError: if ``data`` has at most ``block_size + 1`` tokens.
    """
    total = len(data)
    if total <= block_size + 1:
        raise ValueError(
            f"corpus too short ({total} tokens) for block_size={block_size}"
        )
    starts = torch.randint(0, total - block_size - 1, (batch_size,)).tolist()
    inputs = [data[s : s + block_size] for s in starts]
    targets = [data[s + 1 : s + block_size + 1] for s in starts]
    return torch.stack(inputs).to(device), torch.stack(targets).to(device)
def sample_text(
    model: MicroAccordion,
    codec: ByteCodec,
    prompt: str,
    max_new_tokens: int,
    device: str,
    temperature: float = 1.0,
    top_k: int = 16,
    self_calls: int = 1,
    kernel_db: Optional[KernelDB] = None,
) -> str:
    """Autoregressively extend ``prompt`` and return the decoded text.

    The model is switched to eval mode. An empty prompt is replaced by
    ``"Assistant:"``. ``max_new_tokens`` tokens are generated per self-call
    (at least one self-call), each conditioned on the last
    ``model.block_size`` tokens and drawn from the temperature-scaled,
    optionally top-k-truncated distribution.
    """
    model.eval()
    tokens = codec.encode(prompt) if prompt else codec.encode("Assistant:")
    use_cache = kernel_db is not None
    rounds = max(1, self_calls)
    with torch.no_grad():
        # The rounds simply run back to back, so one flat loop covers them.
        for _ in range(rounds * max_new_tokens):
            context = tokens[-model.block_size:]
            logits = model(
                torch.tensor([context], dtype=torch.long, device=device),
                kernel_db=kernel_db, step=10**9,
                enable_cache=use_cache,
            )
            scaled = logits[0, -1] / max(temperature, 1e-4)
            if top_k and top_k < scaled.numel():
                top_vals, top_idx = torch.topk(scaled, top_k)
                choice = torch.multinomial(torch.softmax(top_vals, -1), 1)
                nxt = top_idx[choice].item()
            else:
                nxt = torch.multinomial(torch.softmax(scaled, -1), 1).item()
            tokens.append(int(nxt))
    return codec.decode(tokens)
def save_checkpoint(
    path: str,
    model: MicroAccordion,
    codec: ByteCodec,
    kdb: KernelDB,
    mut: LuminosityMutator,
    meta: dict,
) -> None:
    """Write model weights, architecture, codec, cache, mutator and metadata.

    The stored ``model_config`` holds exactly the keyword arguments needed to
    rebuild the model. It includes the dropout rate (read from the first
    block), so a reloaded model does not silently fall back to the
    constructor default.
    """
    # Every block carries the same nn.Dropout configuration; with no blocks
    # there is nothing to read, so the constructor default is recorded.
    dropout = float(model.blocks[0].drop.p) if len(model.blocks) else 0.0
    torch.save(
        dict(
            model_state=model.state_dict(),
            model_config=dict(
                vocab_size=model.vocab_size, block_size=model.block_size,
                d_model=model.d_model, n_blocks=len(model.blocks),
                n_heads=model.n_heads, dropout=dropout,
            ),
            codec=codec.state(),
            kernel_db=kdb.to_state(),
            mutator=mut.state(),
            meta=meta,
        ),
        path,
    )
def load_checkpoint(
    path: str, device: str
) -> Tuple[MicroAccordion, ByteCodec, KernelDB, LuminosityMutator, dict]:
    """Rebuild everything stored by ``save_checkpoint``.

    Returns:
        ``(model, codec, kernel_db, mutator, meta)``. Sections missing from
        the file fall back to default-constructed objects / an empty dict.
    """
    # SECURITY: weights_only=False unpickles arbitrary Python objects, which
    # can execute code. Only load checkpoints from sources you trust.
    raw = torch.load(path, map_location=device, weights_only=False)
    model = MicroAccordion(**raw["model_config"]).to(device)
    model.load_state_dict(raw["model_state"])
    codec = ByteCodec.from_state(raw.get("codec"))
    kdb = KernelDB.from_state(raw.get("kernel_db", {}))
    # The key must match the one written by save_checkpoint ("mutator").
    mut = LuminosityMutator.from_state(raw.get("mutator", {}))
    return model, codec, kdb, mut, raw.get("meta", {})
def _make_kdb(args: argparse.Namespace) -> KernelDB:
    """Build a ``KernelDB`` from the ``cache_*`` command-line options."""
    return KernelDB(
        ttl=args.cache_ttl,
        high_confidence=args.cache_high_confidence,
        divergence_tolerance=args.cache_divergence_tol,
        max_entries=args.cache_entries,
    )
def _make_mutator(args: argparse.Namespace) -> LuminosityMutator:
    """Build a ``LuminosityMutator`` from the ``mutation_*`` options."""
    return LuminosityMutator(
        mutation_rate=args.mutation_rate,
        mutation_strength=args.mutation_strength,
        strategy=args.mutation_strategy,
        plateau_amplifier=args.mutation_plateau_amp,
    )
def _rebuild_opt(
model: MicroAccordion, args: argparse.Namespace
) -> torch.optim.Optimizer:
return torch.optim.AdamW(
model.parameters(), lr=args.lr, weight_decay=args.weight_decay
)
def _try_grow_heads_blocks(
model: MicroAccordion,
args: argparse.Namespace,
opt: torch.optim.Optimizer,
step: int,
reason: str,
) -> Tuple[torch.optim.Optimizer, bool]:
do_heads = getattr(args, "auto_grow_heads", False) or getattr(args, "auto_grow", False)
do_blocks = getattr(args, "auto_grow_blocks", False) or getattr(args, "auto_grow", False)
grew: List[str] = []
if do_heads and model.can_grow_heads():
grew.append(f"heads->{model.grow_heads(1) }")
if do_blocks:
grew.append(f"blocks->{model.grow_blocks(1) }")
if grew:
opt = _rebuild_opt(model, args)
print(f" [grow:{reason}@{step}] {' '.join(grew)}")
return opt, bool(grew)
def _try_grow_width(
model: MicroAccordion,
args: argparse.Namespace,
opt: torch.optim.Optimizer,
step: int,
) -> Tuple[torch.optim.Optimizer, bool]:
every = getattr(args, "width_grow_every", 0)
if not every or step % every != 0:
return opt, False
cap = getattr(args, "width_grow_max", 2048)
delta = getattr(args, "width_grow_delta", 16)
if model.d_model >= cap:
return opt, False
old_d, old_h = model.d_model, model.n_heads
new_d, new_h = model.grow_width(delta=delta, max_d_model=cap)
opt = _rebuild_opt(model, args)
n = sum(p.numel() for p in model.parameters())
print(
f" [width@{step}] d_model {old_d}->{new_d} "
f"heads {old_h}->{new_h} "
f"params={n:,}"
)
return opt, True
def _try_mutate(
model: MicroAccordion,
mut: LuminosityMutator,
args: argparse.Namespace,
plateau_ctr: int,
step: int,
verbose: bool = False,
) -> None:
every = getattr(args, "mutate_every", 0)
if not every or step % every != 0:
return
pr = min(1.0, plateau_ctr / max(1, args.grow_patience_steps))
mi = mut.mutate(model, plateau_ratio=pr)
if verbose:
print(f" [mutate@{step}] strategy={mi['strategy']} "
f"strength={mi['strength']:.5f} hit={mi['params_mutated']}")
def _training_loop(
    model: MicroAccordion,
    get_batch,  # callable(step) -> (x, y)
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: ByteCodec,
    start_step: int,
    n_steps: int,
) -> Tuple[float, torch.optim.Optimizer, float]:
    """Run ``n_steps`` optimisation steps with growth, mutation and logging.

    A fresh optimizer is created on entry and again after every structural
    change. Plateau detection works on an exponential moving average of the
    loss (decay 0.95): the counter resets whenever the average improves on
    the best value by more than ``grow_patience_delta``.

    Returns:
        ``(best_ema_loss, optimizer, last_ema_loss)``. Both losses are
        ``inf`` when no step was run.
    """
    opt = _rebuild_opt(model, args)
    best_loss = float("inf")
    plateau_ctr = 0
    ema_loss: Optional[float] = None
    # Clamp both intervals so a zero setting cannot divide by zero.
    grow_every = max(1, args.grow_every)
    log_every = max(1, args.log_every)
    for step in range(start_step, start_step + n_steps):
        xb, yb = get_batch(step)
        model.train()
        logits = model(xb, kernel_db=kdb, step=step,
                       revision_span=args.cache_revision_span,
                       enable_cache=args.enable_cache,
                       allow_train_bypass=args.allow_train_bypass)
        loss = F.cross_entropy(logits.view(-1, model.vocab_size), yb.view(-1))
        opt.zero_grad(set_to_none=True)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
        opt.step()

        lv = float(loss.item())
        ema_loss = lv if ema_loss is None else 0.95 * ema_loss + 0.05 * lv
        if ema_loss < best_loss - args.grow_patience_delta:
            best_loss, plateau_ctr = ema_loss, 0
        else:
            plateau_ctr += 1

        opt, wg = _try_grow_width(model, args, opt, step)
        if wg:
            plateau_ctr = 0
        # NOTE(review): this gate only looks at args.auto_grow, while
        # _try_grow_heads_blocks also honours auto_grow_heads and
        # auto_grow_blocks. Those flags alone never reach it from here -
        # confirm whether that is intended.
        if args.auto_grow and (
            step % grow_every == 0
            or plateau_ctr >= args.grow_patience_steps
        ):
            reason = ("plateau" if plateau_ctr >= args.grow_patience_steps
                      else "sched")
            opt, grew = _try_grow_heads_blocks(model, args, opt, step, reason)
            if grew:
                plateau_ctr = 0

        _try_mutate(model, mut, args, plateau_ctr, step,
                    verbose=(step % log_every == 0))

        if step % log_every == 0 or step == start_step:
            samp = sample_text(
                model, codec, args.sample_prompt, args.sample_tokens,
                device, args.temperature, args.top_k, 1,
                kdb if args.enable_cache else None,
            )
            print(
                f"step={step:6d} loss={lv:.4f} ema={ema_loss:.4f} "
                f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
                f"cache_bypass={kdb.stats['bypasses']} "
                f"sample={samp[:80]!r}"
            )
    return best_loss, opt, ema_loss if ema_loss is not None else float("inf")
def run_training(
    model: MicroAccordion,
    data: torch.Tensor,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: Optional[ByteCodec] = None,
    start_step: int = 1,
    total_steps: Optional[int] = None,
) -> Tuple[float, dict]:
    """Train on a fixed in-memory token tensor.

    Runs ``total_steps`` steps (``args.steps`` when not given) and returns
    ``(best_ema_loss, summary)``, where ``summary`` describes the final
    architecture and the cache/mutator statistics.
    """
    if not codec:
        codec = ByteCodec()
    n = args.steps if total_steps is None else total_steps

    def _get(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        # Batches are random windows; the step number is not used.
        return random_batch(data, args.block_size, args.batch_size, device)

    best, _, ema = _training_loop(
        model, _get, args, kdb, mut, device, codec, start_step, n
    )
    summary = dict(
        last_step=start_step + n - 1, best_ema_loss=best,
        blocks=len(model.blocks), heads=model.n_heads, d_model=model.d_model,
        kernel_stats=kdb.stats, mutator_stats=dict(mut.stats),
    )
    return best, summary
def run_stream_training(
    model: MicroAccordion,
    stream: StreamCorpus,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: ByteCodec,
    start_step: int = 1,
    total_steps: int = 10_000,
    epoch_steps: int = 1_000,
    checkpoint_path: str = "microaccordion.pt",
    meta: Optional[dict] = None,
) -> dict:
    """Train from a ``StreamCorpus`` in epochs, checkpointing after each one.

    Training stops after ``total_steps`` steps or as soon as the stream
    reports exhaustion at the start of an epoch. ``meta`` is updated in
    place after every epoch and returned.
    """
    meta = {} if meta is None else meta
    epoch_num = meta.get("epoch", 0)
    steps_done = 0
    global_step = start_step
    best_loss = float("inf")
    print(
        f"\n[stream-train] {total_steps} steps epoch={epoch_steps} "
        f"d_model={model.d_model} blocks={len(model.blocks)} "
        f"heads={model.n_heads}\n"
    )

    def _get(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        # Batches come straight from the rolling buffer; step is unused.
        return stream.sample_batch(args.block_size, args.batch_size, device)

    while steps_done < total_steps:
        if stream.exhausted:
            print("[stream] source exhausted — stopping.")
            break
        this_epoch = min(epoch_steps, total_steps - steps_done)
        epoch_num += 1
        print(
            f"── epoch {epoch_num} "
            f"[{global_step}..{global_step + this_epoch - 1}] "
            f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
            f"docs={stream.docs_consumed:,} ──"
        )
        epoch_best, _, _ = _training_loop(
            model, _get, args, kdb, mut, device, codec, global_step, this_epoch
        )
        best_loss = epoch_best if epoch_best < best_loss else best_loss
        global_step += this_epoch
        steps_done += this_epoch

        progress = dict(
            last_step=global_step - 1, epoch=epoch_num,
            best_ema_loss=best_loss, d_model=model.d_model,
            blocks=len(model.blocks), heads=model.n_heads,
            docs_consumed=stream.docs_consumed,
            tokens_consumed=stream.tokens_consumed,
            mutator_stats=dict(mut.stats),
        )
        meta.update(progress)
        save_checkpoint(checkpoint_path, model, codec, kdb, mut, meta)
        print(
            f" [ckpt] epoch={epoch_num} "
            f"steps={steps_done}/{total_steps} "
            f"best_ema={best_loss:.4f} "
            f"saved={checkpoint_path}"
        )
    return meta
def train_main(args: argparse.Namespace) -> None:
    """CLI entry: train a new model on a file, URL or the seed corpus."""
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"

    text, source = load_text(args.text, args.data_url)
    codec = ByteCodec()
    data = prepare_data_tensor(text)

    # Optionally keep a local copy of a downloaded corpus.
    download_out = getattr(args, "download_out", None)
    if download_out and args.data_url:
        Path(args.download_out).write_text(text, encoding="utf-8")
        print(f"downloaded -> {args.download_out}")

    model = MicroAccordion(
        vocab_size=codec.vocab_size,
        block_size=args.block_size,
        d_model=args.d_model,
        n_blocks=args.n_blocks,
        n_heads=args.n_heads,
        dropout=args.dropout,
    ).to(device)
    kdb = _make_kdb(args)
    mut = _make_mutator(args)

    _, meta = run_training(model, data, args, kdb, mut, device, codec=codec)
    meta.update(
        dict(device=device, data_source=source,
             corpus_bytes=len(text.encode()))
    )
    save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
    print(f"saved -> {args.checkpoint}")
def query_main(args: argparse.Namespace) -> None:
    """CLI entry: load a checkpoint and print a sampled continuation."""
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    model, codec, kdb, _, _ = load_checkpoint(args.checkpoint, device)
    cache = kdb if args.enable_cache else None
    generated = sample_text(
        model, codec, args.prompt, args.max_new_tokens, device,
        args.temperature, args.top_k, args.self_calls,
        cache,
    )
    print(generated)
def grow_main(args: argparse.Namespace) -> None:
    """CLI entry: grow a saved model in place and re-save the checkpoint.

    Blocks, heads and width are grown in that order, each only when its
    option is non-zero.
    """
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
    before = (len(model.blocks), model.n_heads, model.d_model)

    if args.blocks:
        model.grow_blocks(args.blocks)
    if args.head_steps:
        model.grow_heads(args.head_steps)
    if args.width_delta:
        model.grow_width(delta=args.width_delta)

    meta.update(
        dict(blocks=len(model.blocks), heads=model.n_heads,
             d_model=model.d_model)
    )
    save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
    b0, h0, d0 = before
    print(f"blocks {b0}->{len(model.blocks)} heads {h0}->{model.n_heads} "
          f"d_model {d0}->{model.d_model} saved {args.checkpoint}")
def inspect_main(args: argparse.Namespace) -> None:
    """CLI entry: print a JSON summary of a checkpoint (loaded on CPU)."""
    model, _, kdb, mut, meta = load_checkpoint(args.checkpoint, "cpu")
    n = sum(p.numel() for p in model.parameters())
    report = dict(
        d_model=model.d_model,
        block_size=model.block_size,
        blocks=len(model.blocks),
        heads=model.n_heads,
        params=n,
        # Two 4-bit weights fit in one byte.
        approx_4bit_bytes=math.ceil(n / 2),
        kernel_db_entries=len(kdb.entries),
        kernel_db_stats=kdb.stats,
        luminosity_mutator=mut.state(),
        meta=meta,
    )
    print(json.dumps(report, indent=2))
def seed_main(args: argparse.Namespace) -> None:
    """CLI entry: write the built-in seed corpus to ``args.out`` or stdout."""
    if not args.out:
        print(DEFAULT_CORPUS)
        return
    Path(args.out).write_text(DEFAULT_CORPUS, encoding="utf-8")
    print(f"wrote seed -> {args.out}")
def crawl_train_main(args: argparse.Namespace) -> None:
    """Fetch one URL, strip it to text, and stream-train on the result.

    With ``--resume`` and an existing checkpoint the model, codec, cache,
    mutator and metadata are restored and training continues after the
    recorded ``last_step``; otherwise a fresh model is built from the CLI
    hyper-parameters.
    """
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    codec = ByteCodec()

    resuming = args.resume and Path(args.checkpoint).exists()
    if resuming:
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        start_step = meta.get("last_step", 0) + 1
        print(f"resumed at step {start_step}, "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {}
        start_step = 1

    print(f"fetching {args.url} ...")
    started = time.time()
    corpus = fetch_and_strip(args.url)
    print(f" {len(corpus):,} chars in {time.time()-started:.1f}s")

    # Optionally keep a copy of the stripped text next to the checkpoint.
    if getattr(args, "corpus_out", None):
        Path(args.corpus_out).write_text(corpus, encoding="utf-8")

    stream = StreamCorpus(codec).from_text(corpus)
    meta["url"] = args.url
    run_stream_training(
        model=model,
        stream=stream,
        args=args,
        kdb=kdb,
        mut=mut,
        device=device,
        codec=codec,
        start_step=start_step,
        total_steps=args.total_steps,
        epoch_steps=args.epoch_steps,
        checkpoint_path=args.checkpoint,
        meta=meta,
    )
def stream_train_main(args: argparse.Namespace) -> None:
    """Train on a HuggingFace streaming dataset or URL list.

    Source priority is ``--hf-dataset``, then ``--stream-urls``, then the
    built-in seed corpus. The chosen source is recorded in the checkpoint
    metadata under ``stream_source``.
    """
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    codec = ByteCodec()

    resuming = args.resume and Path(args.checkpoint).exists()
    if resuming:
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        start_step = meta.get("last_step", 0) + 1
        print(f"resumed at step {start_step}, "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {}
        start_step = 1

    stream = StreamCorpus(
        codec,
        buffer_min=getattr(args, "stream_buffer_min", 100_000),
        buffer_max=getattr(args, "stream_buffer_max", 2_000_000),
    )

    hf_name = getattr(args, "hf_dataset", None)
    url_list = getattr(args, "stream_urls", None)
    if hf_name:
        print(f"streaming HuggingFace: {args.hf_dataset} "
              f"({args.hf_config or 'default'}) split={args.hf_split} "
              f"field={args.hf_text_field}")
        stream.from_hf(
            args.hf_dataset,
            config=args.hf_config or None,
            split=args.hf_split,
            text_field=args.hf_text_field,
        )
        meta["stream_source"] = args.hf_dataset
    elif url_list:
        # Comma-separated list; blank items are dropped.
        urls = []
        for piece in args.stream_urls.split(","):
            cleaned = piece.strip()
            if cleaned:
                urls.append(cleaned)
        print(f"streaming {len(urls)} URL(s)")
        stream.from_urls(urls)
        meta["stream_source"] = args.stream_urls
    else:
        print("no stream source given — using built-in corpus")
        stream.from_text(DEFAULT_CORPUS)
        meta["stream_source"] = "built-in"

    run_stream_training(
        model=model,
        stream=stream,
        args=args,
        kdb=kdb,
        mut=mut,
        device=device,
        codec=codec,
        start_step=start_step,
        total_steps=args.total_steps,
        epoch_steps=args.epoch_steps,
        checkpoint_path=args.checkpoint,
        meta=meta,
    )
def io_main(args: argparse.Namespace) -> None:
    """Run the interactive talk / train shell.

    An existing checkpoint is loaded when ``args.checkpoint`` points at a
    file; otherwise a fresh model is built from the CLI hyper-parameters.
    Lines starting with a known slash command are executed; every other line
    is treated as a chat turn, answered by the model, and appended to the
    in-memory corpus.

    Malformed command arguments (non-numeric counts, unbalanced quotes) are
    reported and the shell keeps running, so a typo does not discard the
    in-memory model and corpus.
    """
    device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
    codec = ByteCodec()
    if args.checkpoint and Path(args.checkpoint).exists():
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        if args.text or args.data_url:
            corpus, _source = load_text(args.text, args.data_url)
        else:
            corpus = DEFAULT_CORPUS
        print(f"loaded {args.checkpoint} "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        corpus, source = load_text(args.text, args.data_url)
        model = MicroAccordion(
            vocab_size=codec.vocab_size, block_size=args.block_size,
            d_model=args.d_model, n_blocks=args.n_blocks,
            n_heads=args.n_heads, dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {"data_source": source}
        print("initialised new model")
    print("Commands: /help /train [n] /grow block|head|width [n] "
          "/mutate [pr] /stats /save [path] /corpus /append TEXT /fetch URL /quit")
    turn_count = 0
    while True:
        try:
            line = input("you> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nbye")
            break
        if not line:
            continue
        if line in {"/quit", "/exit"}:
            break
        if line == "/help":
            print(textwrap.dedent("""
                /train [n]
                /grow block|head|width [n]
                /mutate [plateau]
                /stats
                /save [path]
                /corpus
                /append TEXT
                /fetch URL
                /quit
            """).strip())
            continue
        if line.startswith("/corpus"):
            print(corpus)
            continue
        if line.startswith("/append "):
            added = line[8:]
            corpus += added + "\n"
            print(f"appended {len(added)} chars")
            continue
        if line.startswith("/fetch "):
            url = line[7:].strip()
            try:
                fetched = fetch_and_strip(url)
                corpus += "\n" + fetched
                print(f"fetched {len(fetched):,} chars from {url}")
            except Exception as exc:
                # Best effort: a failed download must not end the session.
                print(f"fetch failed: {exc}")
            continue
        if line.startswith("/train"):
            try:
                parts = shlex.split(line)
                steps = int(parts[1]) if len(parts) > 1 else args.steps
            except ValueError as exc:
                print(f"usage: /train [n] ({exc})")
                continue
            data = prepare_data_tensor(corpus)
            _, nm = run_training(model, data, args, kdb, mut, device,
                                 codec=codec, start_step=turn_count + 1,
                                 total_steps=steps)
            meta.update(nm)
            turn_count += steps
            print(f"trained {steps} steps "
                  f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
            continue
        if line.startswith("/grow"):
            try:
                parts = shlex.split(line)
                n = int(parts[2]) if len(parts) > 2 else 1
            except ValueError as exc:
                print(f"usage: /grow block|head|width [n] ({exc})")
                continue
            if len(parts) < 2:
                print("usage: /grow block|head|width [n]")
                continue
            if parts[1] == "block":
                print(f"blocks -> {model.grow_blocks(n)}")
            elif parts[1] == "head":
                for _ in range(n):
                    if model.can_grow_heads():
                        model.grow_heads(1)
                print(f"heads -> {model.n_heads}")
            elif parts[1] == "width":
                nd, nh = model.grow_width(delta=n)
                print(f"d_model->{nd} heads->{nh} "
                      f"params={sum(p.numel() for p in model.parameters()):,}")
            else:
                print("usage: /grow block|head|width [n]")
            # Growth may create new modules; keep everything on the
            # device the shell is sampling and training on.
            model.to(device)
            continue
        if line.startswith("/mutate"):
            try:
                parts = shlex.split(line)
                pr = float(parts[1]) if len(parts) > 1 else 0.0
            except ValueError as exc:
                print(f"usage: /mutate [plateau] ({exc})")
                continue
            mi = mut.mutate(model, plateau_ratio=pr)
            print(f"[mutator] strategy={mi['strategy']} "
                  f"strength={mi['strength']:.5f} "
                  f"params_hit={mi['params_mutated']}")
            continue
        if line.startswith("/stats"):
            n = sum(p.numel() for p in model.parameters())
            print(json.dumps(dict(
                d_model=model.d_model, blocks=len(model.blocks),
                heads=model.n_heads, params=n,
                approx_4bit_bytes=math.ceil(n / 2),
                kernel_db_entries=len(kdb.entries),
                kernel_db_stats=kdb.stats,
                luminosity_mutator=mut.state(),
                corpus_chars=len(corpus),
            ), indent=2))
            continue
        if line.startswith("/save"):
            try:
                parts = shlex.split(line)
            except ValueError as exc:
                print(f"usage: /save [path] ({exc})")
                continue
            p = parts[1] if len(parts) > 1 else (args.checkpoint or "microaccordion.pt")
            meta.update(dict(turn_count=turn_count, corpus_chars=len(corpus)))
            save_checkpoint(p, model, codec, kdb, mut, meta)
            print(f"saved -> {p}")
            continue
        # Anything else is a chat turn: sample a reply and remember the
        # exchange so a later /train can learn from it.
        prompt = f"User: {line}\nAssistant:"
        reply = sample_text(model, codec, prompt, args.max_new_tokens, device,
                            args.temperature, args.top_k, args.self_calls,
                            kdb if args.enable_cache else None)
        answer = reply.split("Assistant:", 1)[-1].strip()
        print(f"bot> {answer}")
        corpus += f"User: {line}\nAssistant: {answer}\n"
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per entry point.

    Sub-commands: ``train``, ``query``, ``grow``, ``inspect``, ``seed``,
    ``io``, ``crawl-train`` and ``stream-train``. Each sub-parser stores its
    handler in ``func`` so the caller can dispatch with ``args.func(args)``.
    """
    p = argparse.ArgumentParser(
        description=(
            "MicroAccordion — self-growing byte transformer."
        )
    )
    sub = p.add_subparsers(dest="cmd", required=True)

    def _add_training(sp: argparse.ArgumentParser,
                      io_mode: bool = False) -> None:
        """Arguments common to all training sub-commands."""
        sp.add_argument("--text", default=None)
        sp.add_argument("--data-url", default=None)
        sp.add_argument("--download-out", default=None)
        sp.add_argument("--d-model", type=int, default=32)
        sp.add_argument("--n-blocks", type=int, default=1)
        sp.add_argument("--n-heads", type=int, default=1)
        sp.add_argument("--block-size", type=int, default=48)
        sp.add_argument("--dropout", type=float, default=0.0)
        sp.add_argument("--lr", type=float, default=3e-3)
        sp.add_argument("--weight-decay", type=float, default=0.01)
        sp.add_argument("--grad-clip", type=float, default=1.0)
        sp.add_argument("--batch-size", type=int, default=16)
        # The interactive shell trains in shorter bursts and logs more often.
        sp.add_argument("--steps", type=int, default=600 if not io_mode else 120)
        sp.add_argument("--log-every", type=int, default=50 if not io_mode else 20)
        sp.add_argument("--sample-prompt", type=str, default="Luminosity and Gpteus")
        sp.add_argument("--sample-tokens", type=int, default=48)
        sp.add_argument("--max-new-tokens", type=int, default=120)
        sp.add_argument("--temperature", type=float, default=0.9)
        sp.add_argument("--top-k", type=int, default=16)
        sp.add_argument("--self-calls", type=int, default=1)
        sp.add_argument("--enable-cache", action="store_true")
        sp.add_argument("--allow-train-bypass", action="store_true")
        sp.add_argument("--cache-ttl", type=int, default=48)
        sp.add_argument("--cache-high-confidence", type=float, default=0.995)
        sp.add_argument("--cache-divergence-tol", type=float, default=1e-3)
        sp.add_argument("--cache-entries", type=int, default=4096)
        sp.add_argument("--cache-revision-span", type=int, default=32)
        sp.add_argument("--auto-grow", action="store_true",
                        help="grow heads and blocks")
        sp.add_argument("--auto-grow-heads", action="store_true")
        sp.add_argument("--auto-grow-blocks", action="store_true")
        sp.add_argument("--grow-every", type=int, default=150)
        sp.add_argument("--grow-patience-steps", type=int, default=120)
        sp.add_argument("--grow-patience-delta", type=float, default=1e-4)
        sp.add_argument("--width-grow-every", type=int, default=0,
                        help="expand d_model every N steps")
        sp.add_argument("--width-grow-delta", type=int, default=16,
                        help="dims added per width growth")
        sp.add_argument("--width-grow-max", type=int, default=2048,
                        help="d_model cap")
        sp.add_argument("--mutate-every", type=int, default=0,
                        help="fire mutator every N steps")
        sp.add_argument("--mutation-rate", type=float, default=0.02)
        sp.add_argument("--mutation-strength", type=float, default=0.05)
        sp.add_argument("--mutation-strategy", type=str, default="luminosity",
                        choices=LuminosityMutator.STRATEGIES)
        sp.add_argument("--mutation-plateau-amp", type=float, default=3.0)
        sp.add_argument("--cpu", action="store_true")

    def _add_stream(sp: argparse.ArgumentParser) -> None:
        """Extra arguments for streaming sub-commands."""
        _add_training(sp)
        sp.add_argument("--checkpoint", type=str, default="microaccordion.pt")
        sp.add_argument("--resume", action="store_true")
        sp.add_argument("--total-steps", type=int, default=3000)
        sp.add_argument("--epoch-steps", type=int, default=300)
        sp.add_argument("--stream-buffer-min", type=int, default=100_000)
        sp.add_argument("--stream-buffer-max", type=int, default=2_000_000)

    tr = sub.add_parser("train", help="train on a static corpus")
    tr.add_argument("--checkpoint", type=str, default="microaccordion.pt")
    _add_training(tr)
    tr.set_defaults(func=train_main)

    qu = sub.add_parser("query", help="sample text from a checkpoint")
    qu.add_argument("--checkpoint", type=str, required=True)
    qu.add_argument("--prompt", type=str, default="Luminosity and Gpteus")
    qu.add_argument("--max-new-tokens", type=int, default=120)
    qu.add_argument("--temperature", type=float, default=0.9)
    qu.add_argument("--top-k", type=int, default=16)
    qu.add_argument("--self-calls", type=int, default=1)
    qu.add_argument("--enable-cache", action="store_true")
    qu.add_argument("--cpu", action="store_true")
    qu.set_defaults(func=query_main)

    gr = sub.add_parser("grow", help="manually grow a saved checkpoint")
    gr.add_argument("--checkpoint", type=str, required=True)
    gr.add_argument("--blocks", type=int, default=0,
                    help="add this many blocks")
    gr.add_argument("--head-steps", type=int, default=0,
                    help="advance heads this many divisor steps")
    gr.add_argument("--width-delta", type=int, default=0,
                    help="expand d_model by this many dims")
    gr.add_argument("--cpu", action="store_true")
    gr.set_defaults(func=grow_main)

    ins = sub.add_parser("inspect", help="inspect checkpoint stats")
    ins.add_argument("--checkpoint", type=str, required=True)
    ins.set_defaults(func=inspect_main)

    sd = sub.add_parser("seed", help="print or export built-in seed corpus")
    sd.add_argument("--out", type=str, default=None)
    sd.set_defaults(func=seed_main)

    io = sub.add_parser("io", help="interactive talk / train shell")
    io.add_argument("--checkpoint", type=str, default="microaccordion.pt")
    _add_training(io, io_mode=True)
    io.set_defaults(func=io_main)

    ct = sub.add_parser(
        "crawl-train",
        help="fetch one URL, strip HTML, train with full growth schedule",
    )
    ct.add_argument("--url", type=str, required=True)
    ct.add_argument("--corpus-out", type=str, default=None,
                    help="save stripped plaintext here")
    _add_stream(ct)
    ct.set_defaults(func=crawl_train_main)

    st = sub.add_parser(
        "stream-train",
        help=(
            "train on a HuggingFace stream or URL list"
        ),
    )
    st.add_argument("--hf-dataset", type=str, default=None,
                    help="HuggingFace dataset name")
    st.add_argument("--hf-config", type=str, default=None,
                    help="dataset config / subset")
    st.add_argument("--hf-split", type=str, default="train")
    st.add_argument("--hf-text-field", type=str, default="text")
    st.add_argument("--stream-urls", type=str, default=None,
                    help="comma-separated URL list")
    _add_stream(st)
    st.set_defaults(func=stream_train_main)
    return p
def main() -> None:
    """Parse the command line, pin threads and seeds, run the sub-command."""
    cli_args = build_parser().parse_args()
    # Use at most four intra-op threads, fewer on smaller machines.
    thread_cap = min(4, os.cpu_count() or 1)
    torch.set_num_threads(thread_cap)
    # Fixed seeds so repeated runs draw the same random numbers.
    for seed_rng in (torch.manual_seed, random.seed):
        seed_rng(1337)
    cli_args.func(cli_args)


if __name__ == "__main__":
    main()
"""MicroAccordion: self-growing byte-level transformer with 4-bit fake quant, KernelDB memoization, streaming corpora, width/head/block growth, LuminosityMutator, and CLI/stream/crawl training. HTML-mode LJ edition."""
from __future__ import annotations
import argparse
import hashlib
import html
import html.parser
import json
import math
import os
import random
import shlex
import sys
import textwrap
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
# Built-in seed corpus: two statement lines followed by eight
# "User:" / "Assistant:" exchanges, one line each, newline-terminated.
_SEED_LINES = (
    "Luminosity and Gpteus build tiny cyborg minds from thrift, rigor, and recursion.",
    "MicroAccordion learns repeated structure, memoized kernels, and 4-bit grit.",
    "User: hello gpteus",
    "Assistant: hello Luminosity, let us build something elegant and strange.",
    "User: what do we care about",
    "Assistant: beauty, compression, speed, resilience, and mind.",
    "User: what should the machine remember",
    "Assistant: luminosity, gpteus, cyborg adventures, tiny cli worlds, recursive craft.",
    "User: can the model grow",
    "Assistant: yes: widen d_model, advance heads, add blocks, mutate, keep learning.",
    "User: what is the accordion spirit",
    "Assistant: compute the new, retrieve the validated, spend the savings on depth.",
    "User: what does luminosity mean to the machine",
    "Assistant: adaptive brightness: strengthen when stuck, soften when clear.",
    "User: what is width growth",
    "Assistant: preserve the old subspace, open new near-zero dimensions.",
    "User: what does streaming internet data feel like",
    "Assistant: an endless river through a narrow gate, document by document.",
)
DEFAULT_CORPUS = "\n".join(_SEED_LINES) + "\n"
class TanhLUT:
    """Approximate ``tanh`` by linear interpolation in a precomputed table.

    The table samples ``tanh`` at ``size`` evenly spaced points on
    ``[lo, hi]``; inputs outside that range are clamped to it first.
    """

    def __init__(self, lo: float = -6.0, hi: float = 6.0, size: int = 1024):
        self.lo = lo
        self.hi = hi
        self.size = size
        grid = torch.linspace(lo, hi, size)
        self.table = torch.tanh(grid)

    def __call__(self, x: torch.Tensor) -> torch.Tensor:
        last = self.size - 1
        clipped = x.clamp(self.lo, self.hi)
        # Fractional table position of every element.
        pos = (clipped - self.lo) / (self.hi - self.lo) * last
        lower = pos.floor().long().clamp(0, last)
        upper = (lower + 1).clamp(0, last)
        table = self.table.to(x.device)
        left = table[lower]
        frac = pos - lower.float()
        return left + (table[upper] - left) * frac


TANH_LUT = TanhLUT()
def fake_quantize_4bit(
    w: Optional[torch.Tensor], per_channel: bool = False
) -> Optional[torch.Tensor]:
    """Round ``w`` onto a signed 4-bit grid while keeping gradients intact.

    The scale is the largest absolute value divided by 7, taken per leading
    index when ``per_channel`` is set and ``w`` has at least two dimensions,
    otherwise over the whole tensor. The rounding error is added as a
    detached term, so the backward pass sees the identity function.
    Returns ``None`` when ``w`` is ``None``.
    """
    if w is None:
        return None
    positive_levels = 7.0
    smallest_scale = 1e-8
    magnitude = w.detach().abs()
    if per_channel and w.ndim >= 2:
        trailing_dims = tuple(range(1, w.ndim))
        peak = magnitude.amax(dim=trailing_dims, keepdim=True)
    else:
        peak = magnitude.max()
    # Guard against an all-zero tensor producing a zero scale.
    scale = (peak / positive_levels).clamp_min(smallest_scale)
    codes = torch.round(w / scale).clamp(-8, 7)
    dequantized = codes * scale
    return w + (dequantized - w).detach()
class QuantEmbedding(nn.Module):
    """Embedding table whose weights are fake-quantized on every lookup."""

    def __init__(self, num_embeddings: int, embedding_dim: int):
        super().__init__()
        initial = torch.randn(num_embeddings, embedding_dim) * 0.02
        self.weight = nn.Parameter(initial)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Quantize per row of the table, then do a plain embedding lookup.
        table = fake_quantize_4bit(self.weight, per_channel=True)
        return F.embedding(x, table)
class QuantLinear(nn.Module):
    """Linear layer that fake-quantizes weight and bias on every forward."""

    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        super().__init__()
        # Scale the initial weights by 1/sqrt(fan_in).
        initial = torch.randn(out_features, in_features) * (
            1.0 / math.sqrt(max(1, in_features))
        )
        self.weight = nn.Parameter(initial)
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_features))
        else:
            self.bias = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        q_weight = fake_quantize_4bit(self.weight, per_channel=True)
        q_bias = fake_quantize_4bit(self.bias)
        return F.linear(x, q_weight, q_bias)
class LuminosityMutator:
    """Adaptive weight mutation; in luminosity mode strength rises with plateau depth."""

    STRATEGIES = ("gaussian", "sign_flip", "zero_mask", "luminosity")

    def __init__(
        self,
        mutation_rate: float = 0.02,
        mutation_strength: float = 0.05,
        strategy: str = "luminosity",
        plateau_amplifier: float = 3.0,
    ):
        """Store the mutation settings and reset the running statistics.

        Raises:
            ValueError: if ``strategy`` is not one of ``STRATEGIES``.
        """
        if strategy not in self.STRATEGIES:
            raise ValueError(f"strategy must be one of {self.STRATEGIES}")
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.strategy = strategy
        self.plateau_amplifier = plateau_amplifier
        self.stats: dict = dict(
            total_mutations=0, params_mutated=0, luminosity_pulses=0,
            last_strength=0.0, last_strategy=strategy,
        )

    def _strength(self, pr: float) -> float:
        """Return the base strength amplified by ``pr`` clamped to [0, 1]."""
        return self.mutation_strength * (
            1.0 + self.plateau_amplifier * min(1.0, max(0.0, pr))
        )

    def _strategy(self, pr: float) -> str:
        """Pick the concrete strategy; luminosity flips signs above 0.8."""
        if self.strategy != "luminosity":
            return self.strategy
        return "sign_flip" if pr > 0.8 else "gaussian"

    def mutate(
        self,
        model: nn.Module,
        plateau_ratio: float = 0.0,
        skip_layer_norm: bool = True,
    ) -> dict:
        """Mutate a random subset of ``model``'s trainable weights in place.

        Each element is selected independently with probability
        ``mutation_rate``. Parameters whose name contains ``"ln"`` or
        ``"norm"`` are left alone when ``skip_layer_norm`` is set.

        Returns:
            A dict with ``params_mutated``, ``strength``, ``strategy`` and
            the rounded ``plateau_ratio`` of this call.
        """
        strat = self._strategy(plateau_ratio)
        strength = self._strength(plateau_ratio)
        if self.strategy == "luminosity" and plateau_ratio > 0.5:
            self.stats["luminosity_pulses"] += 1
        params_hit = 0
        with torch.no_grad():
            for name, param in model.named_parameters():
                if not param.requires_grad:
                    continue
                if skip_layer_norm and ("ln" in name or "norm" in name.lower()):
                    continue
                mask = torch.rand_like(param.data) < self.mutation_rate
                n = int(mask.sum().item())
                if n == 0:
                    continue
                if strat == "gaussian":
                    # NOTE(review): this statement was truncated in the
                    # source; reconstructed as strength-scaled Gaussian
                    # noise on the selected elements only. Confirm.
                    noise = torch.randn_like(param.data) * strength
                    param.data.add_(noise * mask.to(param.dtype))
                elif strat == "sign_flip":
                    param.data[mask] = -param.data[mask]
                elif strat == "zero_mask":
                    param.data[mask] = 0.0
                params_hit += n
        self.stats["total_mutations"] += 1
        self.stats["params_mutated"] += params_hit
        self.stats["last_strength"] = round(strength, 6)
        self.stats["last_strategy"] = strat
        return dict(params_mutated=params_hit, strength=strength,
                    strategy=strat, plateau_ratio=round(plateau_ratio, 3))

    def state(self) -> dict:
        """Return settings plus a copy of the statistics for checkpointing."""
        return dict(mutation_rate=self.mutation_rate,
                    mutation_strength=self.mutation_strength,
                    strategy=self.strategy,
                    plateau_amplifier=self.plateau_amplifier,
                    stats=dict(self.stats))

    @classmethod
    def from_state(cls, s: dict) -> "LuminosityMutator":
        """Rebuild a mutator from ``state()``; missing keys use the constructor defaults."""
        obj = cls(
            mutation_rate=s.get("mutation_rate", 0.02),
            mutation_strength=s.get("mutation_strength", 0.05),
            strategy=s.get("strategy", "luminosity"),
            plateau_amplifier=s.get("plateau_amplifier", 3.0),
        )
        obj.stats.update(s.get("stats", {}))
        return obj
@dataclass
class CacheEntry:
    """One memoized block output together with its trust bookkeeping."""

    output: torch.Tensor       # cached result, stored detached on CPU
    confidence: float          # raised on agreement, halved on divergence
    expires_at: int            # last step at which the entry is still valid
    hits: int = 0              # lookups that found this entry alive
    validations: int = 0       # times the entry was re-checked against live output
    last_divergence: float = 0.0  # mean abs difference at the latest check


class KernelDB:
    """Memoization store for block outputs with shadow validation.

    A lookup either misses (compute and store), bypasses (return the cached
    tensor without computing, only for high-confidence entries), or shadow
    validates (compute anyway and compare with the cached tensor to adjust
    the entry's confidence).
    """

    def __init__(
        self,
        ttl: int = 48,
        high_confidence: float = 0.995,
        divergence_tolerance: float = 1e-3,
        max_entries: int = 4096,
    ):
        self.ttl = ttl
        self.high_confidence = high_confidence
        self.divergence_tolerance = divergence_tolerance
        self.max_entries = max_entries
        self.entries: Dict[str, CacheEntry] = {}
        self.stats = dict(lookups=0, hits=0, bypasses=0,
                          shadow_validations=0, misses=0, evictions=0)

    def _evict(self) -> None:
        """When full, drop the least trusted sixteenth (at least one entry)."""
        if len(self.entries) < self.max_entries:
            return
        ks = sorted(self.entries,
                    key=lambda k: (self.entries[k].confidence,
                                   self.entries[k].expires_at))
        for k in ks[: max(1, len(ks) // 16)]:
            self.entries.pop(k, None)
            self.stats["evictions"] += 1

    def fingerprint(self, bi: int, gen: int, heads: int,
                    bucket: int, tokens: torch.Tensor) -> str:
        """Return a 128-bit hex digest of block index, generation, heads, bucket and tokens."""
        blob = json.dumps(
            {"b": bi, "g": gen, "h": heads, "s": bucket,
             "t": tokens.detach().cpu().tolist()},
            separators=(",", ":"), sort_keys=True,
        ).encode()
        return hashlib.blake2b(blob, digest_size=16).hexdigest()

    def maybe_use(
        self, key: str, step: int, live_fn,
        allow_bypass: bool = True,
    ) -> Tuple[torch.Tensor, bool]:
        """Return ``(output, bypassed)`` for ``key`` at training ``step``.

        ``live_fn`` is called to compute the real output unless a
        high-confidence entry is bypassed. ``bypassed`` is True only when
        the cached tensor was returned without computing.
        """
        self.stats["lookups"] += 1
        entry = self.entries.get(key)
        if entry is None or step > entry.expires_at:
            self.stats["misses"] += 1
            live = live_fn()
            self._evict()
            self.entries[key] = CacheEntry(
                output=live.detach().cpu(), confidence=0.50,
                expires_at=step + self.ttl)
            return live, False
        self.stats["hits"] += 1
        entry.hits += 1
        if allow_bypass and entry.confidence >= self.high_confidence:
            self.stats["bypasses"] += 1
            entry.expires_at = step + self.ttl
            return entry.output, True
        self.stats["shadow_validations"] += 1
        live = live_fn()
        div = float(torch.mean(
            torch.abs(live.detach() - entry.output.to(live.device))
        ).item())
        entry.validations += 1
        entry.last_divergence = div
        if div <= self.divergence_tolerance:
            entry.confidence = min(0.999, entry.confidence * 0.7 + 0.3)
            # Agreement earns a full lifetime from this step.
            entry.expires_at = max(entry.expires_at, step + self.ttl)
        else:
            entry.confidence = max(0.05, entry.confidence * 0.5)
            # Divergence shortens the lifetime. The expiry is no longer
            # extended afterwards, which previously cancelled this penalty.
            entry.expires_at = step + max(4, self.ttl // 4)
        entry.output = live.detach().cpu()
        return live, False

    def to_state(self) -> dict:
        """Return settings, statistics and all entries as plain dicts."""
        return dict(
            ttl=self.ttl, high_confidence=self.high_confidence,
            divergence_tolerance=self.divergence_tolerance,
            max_entries=self.max_entries, stats=self.stats,
            entries={
                k: dict(output=e.output, confidence=e.confidence,
                        expires_at=e.expires_at, hits=e.hits,
                        validations=e.validations,
                        last_divergence=e.last_divergence)
                for k, e in self.entries.items()
            },
        )

    @classmethod
    def from_state(cls, s: dict) -> "KernelDB":
        """Rebuild a KernelDB from ``to_state()``; missing keys use the constructor defaults."""
        obj = cls(
            ttl=s.get("ttl", 48),
            high_confidence=s.get("high_confidence", 0.995),
            divergence_tolerance=s.get("divergence_tolerance", 1e-3),
            max_entries=s.get("max_entries", 4096),
        )
        obj.stats.update(s.get("stats", {}))
        for k, e in s.get("entries", {}).items():
            obj.entries[k] = CacheEntry(
                output=e["output"], confidence=float(e["confidence"]),
                expires_at=int(e["expires_at"]), hits=int(e.get("hits", 0)),
                validations=int(e.get("validations", 0)),
                last_divergence=float(e.get("last_divergence", 0.0)),
            )
        return obj
def divisors(n: int) -> List[int]:
    """Return every positive divisor of ``n`` in ascending order.

    Yields an empty list for ``n < 1``.
    """
    found: List[int] = []
    for candidate in range(1, n + 1):
        if n % candidate == 0:
            found.append(candidate)
    return found
def _next_head_count(current: int, new_d: int) -> int:
    """Return the smallest divisor of ``new_d`` greater than ``current``.

    Falls back to ``current`` itself when no larger divisor exists.
    """
    larger = (d for d in divisors(new_d) if d > current)
    return next(larger, current)
def _expand_2d(
old: torch.Tensor, rows: int, cols: int, noise: float = 1e-3
) -> torch.Tensor:
"""Expand matrix, preserve old top-left, seed new area near zero."""
t = torch.zeros(rows, cols, dtype=old.dtype)
r, c = old.shape
t[:r, :c] = old
if rows > r:
t[r:, :c] = torch.randn(rows - r, c, dtype=old.dtype) * noise
if cols > c:
t[:r, c:] = torch.randn(r, cols - c, dtype=old.dtype) * noise
if rows > r and cols > c:
t[r:, c:] = torch.randn(rows - r, cols - c, dtype=old.dtype) * noise
return t
def _expand_1d(old: torch.Tensor, size: int) -> torch.Tensor:
t = torch.zeros(size, dtype=old.dtype)
t[:old.shape[0]] = old
return t
class TinyMHABlock(nn.Module):
    """Causal multi-head transformer block.

    Pre-norm layout: attention and feed-forward each read a LayerNorm of the
    residual stream and add their (dropout-regularised) output back to it.
    """

    def __init__(self, d_model: int, n_heads: int = 1, dropout: float = 0.0):
        """Build the projections, feed-forward pair, norms and dropout.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``n_heads``.
        """
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError(f"d_model={d_model} not divisible by n_heads={n_heads}")
        self.d_model = d_model
        self.n_heads = n_heads
        self.q = QuantLinear(d_model, d_model, bias=False)
        self.k = QuantLinear(d_model, d_model, bias=False)
        self.v = QuantLinear(d_model, d_model, bias=False)
        self.proj = QuantLinear(d_model, d_model, bias=False)
        self.ff1 = QuantLinear(d_model, 2 * d_model, bias=True)
        self.ff2 = QuantLinear(2 * d_model, d_model, bias=True)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)
        # Head counts must divide d_model; growth walks this sorted list.
        self.allowed_heads = divisors(d_model)

    def grow_heads(self, steps: int = 1) -> int:
        """Advance ``n_heads`` by ``steps`` divisors, stopping at the largest."""
        idx = self.allowed_heads.index(self.n_heads)
        self.n_heads = self.allowed_heads[
            min(len(self.allowed_heads) - 1, idx + steps)
        ]
        return self.n_heads

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply causal self-attention then the feed-forward sub-layer."""
        B, T, C = x.shape
        H, D = self.n_heads, C // self.n_heads
        h = self.ln1(x)
        q = self.q(h).view(B, T, H, D).transpose(1, 2)
        k = self.k(h).view(B, T, H, D).transpose(1, 2)
        v = self.v(h).view(B, T, H, D).transpose(1, 2)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(max(1, D))
        # Strictly upper-triangular mask hides future positions.
        mask = torch.triu(
            torch.ones(T, T, device=x.device, dtype=torch.bool), diagonal=1
        )
        scores = scores.masked_fill(mask[None, None], float("-inf"))
        y = torch.matmul(torch.softmax(scores, dim=-1), v)
        x = x + self.drop(
            self.proj(y.transpose(1, 2).contiguous().view(B, T, C))
        )
        # NOTE(review): the tail of this statement was truncated in the
        # source; restored as the feed-forward residual reading ln2(x).
        return x + self.drop(self.ff2(TANH_LUT(self.ff1(self.ln2(x)))))
class ByteCodec:
    """Byte-level tokenizer: one token per UTF-8 byte, vocabulary of 256."""

    vocab_size = 256

    def encode(self, s: str) -> List[int]:
        """Return the UTF-8 bytes of ``s`` as a list of ints."""
        raw = s.encode("utf-8", errors="replace")
        return [byte for byte in raw]

    def decode(self, ids: Iterable[int]) -> str:
        """Turn token ids back into text; ids wrap modulo 256."""
        wrapped = [int(token) % 256 for token in ids]
        return bytes(wrapped).decode("utf-8", errors="replace")

    def state(self) -> dict:
        """Return the serialisable description of this codec."""
        return {"kind": "byte"}

    @classmethod
    def from_state(cls, _: Optional[dict] = None) -> "ByteCodec":
        """Build a codec; the saved state carries no settings to restore."""
        return cls()
class MicroAccordion(nn.Module):
    """Byte-level transformer that can grow in depth, heads and width.

    Every structural change bumps ``model_generation``, which is part of the
    KernelDB fingerprint, so cached block outputs from an older shape are
    never matched by the grown model.
    """

    def __init__(
        self,
        vocab_size: int = 256,
        block_size: int = 64,
        d_model: int = 32,
        n_blocks: int = 1,
        n_heads: int = 1,
        dropout: float = 0.0,
    ):
        """Build embeddings, ``n_blocks`` transformer blocks and the output head.

        Raises:
            ValueError: if ``d_model`` is not divisible by ``n_heads``.
        """
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError("d_model must be divisible by n_heads")
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.d_model = d_model
        self.model_generation = 0
        self.token_emb = QuantEmbedding(vocab_size, d_model)
        self.pos_emb = nn.Parameter(torch.randn(1, block_size, d_model) * 0.01)
        self.blocks = nn.ModuleList(
            [TinyMHABlock(d_model, n_heads, dropout) for _ in range(n_blocks)]
        )
        self.head = QuantLinear(d_model, vocab_size, bias=True)

    @property
    def n_heads(self) -> int:
        """Head count of the first block (1 when there are no blocks)."""
        return self.blocks[0].n_heads if self.blocks else 1

    def can_grow_heads(self) -> bool:
        """True while the first block is below its largest allowed head count."""
        return bool(
            self.blocks and
            self.blocks[0].n_heads != self.blocks[0].allowed_heads[-1]
        )

    def _dropout_p(self) -> float:
        """Dropout rate of the existing blocks (0.0 when there are none)."""
        return float(self.blocks[0].drop.p) if self.blocks else 0.0

    # ── structural growth ────────────────────────────────────────
    def grow_blocks(self, n: int = 1) -> int:
        """Append ``n`` blocks that start out as the identity mapping.

        New blocks inherit the dropout rate of the existing blocks and are
        moved to the model's device. Returns the new block count.
        """
        device = self.pos_emb.device
        dropout = self._dropout_p()
        for _ in range(n):
            b = TinyMHABlock(self.d_model, self.n_heads, dropout)
            with torch.no_grad():
                for name, p in b.named_parameters():
                    # Zeroed output projections make both residual branches
                    # contribute nothing at first.
                    if "proj.weight" in name or "ff2.weight" in name:
                        nn.init.zeros_(p)
                    else:
                        p.data.mul_(0.05)
            self.blocks.append(b.to(device))
        self.model_generation += 1
        return len(self.blocks)

    def grow_heads(self, steps: int = 1) -> int:
        """Advance every block's head count by ``steps`` divisors."""
        for b in self.blocks:
            b.grow_heads(steps)
        self.model_generation += 1
        return self.n_heads

    def grow_width(
        self, delta: int = 16, max_d_model: int = 2048
    ) -> Tuple[int, int]:
        """Widen ``d_model`` by ``delta`` (capped) and advance the head count.

        Existing weights are copied into the top-left corner of the larger
        matrices. Nothing happens when the target width is not larger than
        the current one (non-positive ``delta`` or a cap at or below the
        current width). The grown model keeps its device and dropout rate.

        Returns:
            ``(d_model, n_heads)`` after the call.
        """
        D = self.d_model
        D2 = min(D + delta, max_d_model)
        if D2 <= D:
            return D, self.n_heads
        device = self.pos_emb.device
        dropout = self._dropout_p()
        new_heads = _next_head_count(self.n_heads, D2)
        ne = QuantEmbedding(self.vocab_size, D2)
        with torch.no_grad():
            ne.weight.data = _expand_2d(self.token_emb.weight.data,
                                        self.vocab_size, D2)
        self.token_emb = ne
        self.pos_emb = nn.Parameter(
            _expand_2d(self.pos_emb.data.squeeze(0),
                       self.block_size, D2).unsqueeze(0)
        )
        nh = QuantLinear(D2, self.vocab_size, bias=True)
        with torch.no_grad():
            nh.weight.data = _expand_2d(self.head.weight.data,
                                        self.vocab_size, D2)
            if self.head.bias is not None:
                nh.bias.data = self.head.bias.data.clone()
        self.head = nh
        new_blocks = nn.ModuleList()
        for blk in self.blocks:
            nb = TinyMHABlock(D2, n_heads=new_heads, dropout=dropout)
            with torch.no_grad():
                for attr in ("q", "k", "v", "proj"):
                    getattr(nb, attr).weight.data = _expand_2d(
                        getattr(blk, attr).weight.data, D2, D2
                    )
                nb.ff1.weight.data = _expand_2d(blk.ff1.weight.data, 2 * D2, D2)
                nb.ff1.bias.data = _expand_1d(blk.ff1.bias.data, 2 * D2)
                nb.ff2.weight.data = _expand_2d(blk.ff2.weight.data, D2, 2 * D2)
                nb.ff2.bias.data = _expand_1d(blk.ff2.bias.data, D2)
                for ln in ("ln1", "ln2"):
                    getattr(nb, ln).weight.data = _expand_1d(
                        getattr(blk, ln).weight.data, D2)
                    getattr(nb, ln).bias.data = _expand_1d(
                        getattr(blk, ln).bias.data, D2)
            new_blocks.append(nb)
        self.blocks = new_blocks
        self.d_model = D2
        self.model_generation += 1
        # Freshly built pieces may have been created on CPU; put the whole
        # model back on the device it was on before growing.
        self.to(device)
        return D2, new_heads

    # ── forward ────────────────────────────────────────
    def forward(
        self,
        idx: torch.Tensor,
        kernel_db: Optional[KernelDB] = None,
        step: int = 0,
        revision_span: int = 32,
        enable_cache: bool = False,
        allow_train_bypass: bool = False,
    ) -> torch.Tensor:
        """Return logits for token ids ``idx`` of shape ``(B, T)``.

        With ``enable_cache`` and a ``kernel_db``, each block goes through
        the cache; cached outputs may replace computation only in eval mode
        or when ``allow_train_bypass`` is set.
        """
        B, T = idx.shape
        x = self.token_emb(idx) + self.pos_emb[:, :T, :]
        bucket = step // revision_span
        for bi, blk in enumerate(self.blocks):
            if not (enable_cache and kernel_db is not None):
                x = blk(x)
                continue
            key = kernel_db.fingerprint(
                bi, self.model_generation, blk.n_heads, bucket, idx
            )
            # Bind blk and x as defaults so the closure sees this
            # iteration's values.
            out, bypassed = kernel_db.maybe_use(
                key, step, lambda b=blk, xx=x: b(xx),
                allow_bypass=(allow_train_bypass or not self.training),
            )
            x = out.to(x.device) if bypassed else out
        return self.head(x)
class StreamCorpus:
    """Rolling token buffer for streaming training.

    Documents are pulled lazily from a source iterator and encoded into one
    flat token list; batches are sampled at random from that list.
    """

    def __init__(
        self,
        codec: ByteCodec,
        buffer_min: int = 100_000,
        buffer_max: int = 2_000_000,
    ):
        self.codec = codec
        self.buffer: List[int] = []
        self.buffer_min = buffer_min
        self.buffer_max = buffer_max
        self._src: Optional[Iterator[str]] = None
        self.docs_consumed = 0
        self.tokens_consumed = 0

    # ── source constructors ────────────────────────────────────────
    def from_hf(
        self,
        name: str,
        config: Optional[str] = None,
        split: str = "train",
        text_field: str = "text",
    ) -> "StreamCorpus":
        """Stream a HuggingFace dataset (pip install datasets)."""
        try:
            from datasets import load_dataset  # type: ignore
        except ImportError:
            sys.exit(
                "HuggingFace `datasets` not found.\n"
                "Install with: pip install datasets\n"
                "Then re-run."
            )
        # SECURITY: trust_remote_code=True lets the dataset repository run
        # its own loading script on this machine. Only use trusted datasets.
        kw: dict = dict(split=split, streaming=True, trust_remote_code=True)
        ds = load_dataset(name, config, **kw) if config else load_dataset(name, **kw)
        self._src = (item[text_field] for item in ds)
        return self

    def from_urls(self, urls: List[str]) -> "StreamCorpus":
        """Stream the stripped text of each URL; failed fetches are logged and skipped."""
        def _gen() -> Iterator[str]:
            for url in urls:
                try:
                    text = fetch_and_strip(url)
                    print(f"[stream] fetched {len(text):,} chars from {url}")
                    yield text
                except Exception as exc:
                    print(f"[stream] failed {url}: {exc}")
        self._src = _gen()
        return self

    def from_text(self, text: str) -> "StreamCorpus":
        """Use a single in-memory document as the source."""
        self._src = iter([text])
        return self

    # ── buffer ────────────────────────────────────────
    def _refill(self) -> None:
        """Pull documents until the buffer holds ``buffer_min`` tokens.

        Items that are not strings (for example a null text field) are
        skipped instead of raising. When the source runs dry it is dropped
        so ``exhausted`` can report it.
        """
        if self._src is None:
            return
        while len(self.buffer) < self.buffer_min:
            try:
                text = next(self._src)
            except StopIteration:
                self._src = None
                break
            if not isinstance(text, str):
                continue
            toks = self.codec.encode(text + "\n")
            self.buffer.extend(toks)
            self.docs_consumed += 1
            self.tokens_consumed += len(toks)
        if len(self.buffer) > self.buffer_max:
            # trim oldest half to free memory
            self.buffer = self.buffer[self.buffer_max // 2:]

    @property
    def exhausted(self) -> bool:
        """True once the source is gone and fewer than two tokens remain."""
        return self._src is None and len(self.buffer) < 2

    def sample_batch(
        self, block_size: int, batch_size: int, device: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(x, y)`` of shape ``(batch_size, block_size)``.

        ``y`` is ``x`` shifted one token to the right.

        Raises:
            RuntimeError: if the buffer holds fewer than ``block_size + 2``
                tokens after refilling.
        """
        self._refill()
        n = len(self.buffer)
        if n < block_size + 2:
            raise RuntimeError(
                f"buffer has only {n} tokens (need {block_size + 2}) — "
                "stream may be exhausted"
            )
        max_start = n - block_size - 1
        ix = [random.randint(0, max_start) for _ in range(batch_size)]
        x = torch.tensor([self.buffer[i : i + block_size] for i in ix],
                         dtype=torch.long, device=device)
        y = torch.tensor([self.buffer[i + 1 : i + block_size + 1] for i in ix],
                         dtype=torch.long, device=device)
        return x, y
class _HTMLStripper(html.parser.HTMLParser):
    """Collect the visible text of an HTML document.

    Text inside any element named in ``SKIP`` is discarded; everything else is
    stripped of surrounding whitespace and collected, one chunk per line.
    """

    SKIP = {"script", "style", "head", "meta", "link",
            "noscript", "nav", "footer", "header"}
    # Void elements have no closing tag, so they must never raise the skip
    # depth: an unclosed <meta> or <link> would otherwise leave the depth
    # above zero for the rest of the document and suppress all body text.
    VOID = frozenset({"area", "base", "br", "col", "embed", "hr", "img",
                      "input", "link", "meta", "source", "track", "wbr"})

    def __init__(self):
        super().__init__()
        self._parts: List[str] = []
        self._depth = 0  # nesting level of currently open SKIP elements

    def _opens_skipped_region(self, tag: str) -> bool:
        """Return True when ``tag`` is a skipped element that has a closing tag."""
        name = tag.lower()
        return name in self.SKIP and name not in self.VOID

    def handle_starttag(self, tag, attrs):
        """Enter a skipped region when a non-void SKIP element opens."""
        if self._opens_skipped_region(tag):
            self._depth += 1

    def handle_endtag(self, tag):
        """Leave a skipped region; stray closing tags never go below zero."""
        if self._opens_skipped_region(tag) and self._depth > 0:
            self._depth -= 1

    def handle_data(self, data):
        """Keep non-blank text that is not inside a skipped region."""
        if self._depth == 0:
            s = data.strip()
            if s:
                self._parts.append(s)

    def get_text(self) -> str:
        """Return the collected text chunks joined by newlines."""
        return "\n".join(self._parts)
def html_to_text(raw: str) -> str:
    """Return the visible text of an HTML string.

    Parsing is best-effort: if the parser raises, whatever text was collected
    up to that point is returned.
    """
    s = _HTMLStripper()
    try:
        s.feed(raw)
        # close() flushes text the parser is still holding back (for example
        # trailing data that ends in an unterminated character reference).
        s.close()
    except Exception:
        # Deliberate best-effort: keep the partial result rather than fail.
        pass
    # HTMLParser converts character references itself by default, so the text
    # must not be unescaped a second time: that would turn a literal "&lt;"
    # (written as "&amp;lt;" in the page) into "<".
    return s.get_text()
def fetch_and_strip(url: str, timeout: int = 30) -> str:
    """Download ``url`` and return its text, with HTML markup removed.

    The body is decoded with the charset declared in the ``Content-Type``
    header, falling back to UTF-8 when none is declared or the declared name
    is unknown; undecodable bytes are replaced. The result is passed through
    ``html_to_text`` when the content type mentions HTML or the body starts
    with ``<``; otherwise it is returned unchanged.

    Errors raised by ``urllib`` (network failure, timeout, HTTP error)
    propagate to the caller.
    """
    req = urllib.request.Request(url, headers={"User-Agent": "MicroAccordion/3.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        ct = resp.headers.get("Content-Type", "")
        charset = resp.headers.get_content_charset() or "utf-8"
        payload = resp.read()
    try:
        raw = payload.decode(charset, errors="replace")
    except LookupError:
        # The server declared a charset Python does not know.
        raw = payload.decode("utf-8", errors="replace")
    looks_like_html = "html" in ct.lower() or raw.lstrip().startswith("<")
    return html_to_text(raw) if looks_like_html else raw
def load_text(
    path: Optional[str] = None, data_url: Optional[str] = None
) -> Tuple[str, str]:
    """Pick the training corpus and describe where it came from.

    ``data_url`` wins over ``path``; with neither, the built-in seed corpus is
    used. Returns ``(text, source_label)``.
    """
    if data_url:
        label = f"url:{data_url}"
        return fetch_and_strip(data_url), label
    if not path:
        return DEFAULT_CORPUS, "built-in-seed"
    contents = Path(path).read_text(encoding="utf-8")
    return contents, f"file:{path}"
def prepare_data_tensor(text: str) -> torch.Tensor:
    """Encode ``text`` with a fresh ``ByteCodec`` and return the ids as a long tensor."""
    token_ids = ByteCodec().encode(text)
    return torch.tensor(token_ids, dtype=torch.long)
def random_batch(
    data: torch.Tensor, block_size: int, batch_size: int, device: str
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Sample a random batch of (input, next-token target) windows from ``data``.

    Each row of ``x`` is ``block_size`` consecutive tokens; the matching row
    of ``y`` is the same window shifted one token to the right. Both tensors
    are moved to ``device``.

    Raises:
        ValueError: if ``data`` has ``block_size + 1`` tokens or fewer.
    """
    n_tokens = len(data)
    if n_tokens <= block_size + 1:
        raise ValueError(
            f"corpus too short ({len(data)} tokens) for block_size={block_size}"
        )
    # torch.randint's upper bound is exclusive. The last valid start is
    # n_tokens - block_size - 1, so the bound must be n_tokens - block_size;
    # otherwise the final token of the corpus is never used as a target.
    starts = torch.randint(0, n_tokens - block_size, (batch_size,)).tolist()
    x = torch.stack([data[i : i + block_size] for i in starts]).to(device)
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in starts]).to(device)
    return x, y
def sample_text(
    model: MicroAccordion,
    codec: ByteCodec,
    prompt: str,
    max_new_tokens: int,
    device: str,
    temperature: float = 1.0,
    top_k: int = 16,
    self_calls: int = 1,
    kernel_db: Optional[KernelDB] = None,
) -> str:
    """Autoregressively extend ``prompt`` and return the decoded text.

    The model is put in eval mode and sampled without gradients. An empty
    prompt is replaced by ``"Assistant:"``. ``max_new_tokens`` tokens are
    generated for each of ``max(1, self_calls)`` passes, each step feeding
    the last ``model.block_size`` tokens back in. Logits are divided by the
    temperature (floored at 1e-4) and sampled either from the ``top_k``
    highest entries or, when ``top_k`` is falsy or not smaller than the
    vocabulary, from the full distribution. The kernel cache is enabled only
    when ``kernel_db`` is given.
    """
    model.eval()
    tokens = codec.encode(prompt if prompt else "Assistant:")
    use_cache = kernel_db is not None
    temp = max(temperature, 1e-4)
    # The original nests a pass loop around a token loop; with nothing else
    # in the outer body that is the same as one loop over the product.
    budget = max(1, self_calls) * max_new_tokens
    with torch.no_grad():
        for _ in range(budget):
            window = tokens[-model.block_size:]
            batch = torch.tensor([window], dtype=torch.long, device=device)
            logits = model(
                batch,
                kernel_db=kernel_db,
                step=10**9,
                enable_cache=use_cache,
            )
            scores = logits[0, -1] / temp
            if top_k and top_k < scores.numel():
                top_vals, top_ids = torch.topk(scores, top_k)
                pick = torch.multinomial(torch.softmax(top_vals, -1), 1)
                token = top_ids[pick].item()
            else:
                token = torch.multinomial(torch.softmax(scores, -1), 1).item()
            tokens.append(int(token))
    return codec.decode(tokens)
def save_checkpoint(
    path: str,
    model: MicroAccordion,
    codec: ByteCodec,
    kdb: KernelDB,
    mut: LuminosityMutator,
    meta: dict,
) -> None:
    """Write one checkpoint to ``path`` with ``torch.save``.

    The saved dict holds the model weights, the constructor arguments needed
    to rebuild the model (read from its current attributes), the serialised
    codec, kernel cache and mutator, and the caller's ``meta`` dict.
    """
    architecture = {
        "vocab_size": model.vocab_size,
        "block_size": model.block_size,
        "d_model": model.d_model,
        "n_blocks": len(model.blocks),
        "n_heads": model.n_heads,
    }
    payload = {
        "model_state": model.state_dict(),
        "model_config": architecture,
        "codec": codec.state(),
        "kernel_db": kdb.to_state(),
        "mutator": mut.state(),
        "meta": meta,
    }
    torch.save(payload, path)
def load_checkpoint(
path: str, device: str
) -> Tuple[MicroAccordion, ByteCodec, KernelDB, LuminosityMutator, dict]:
# Rebuild everything stored in a checkpoint file and return it as
# (model, codec, kernel_db, mutator, meta). The model is constructed from the
# saved "model_config" dict; "meta" defaults to an empty dict when absent.
#
# weights_only=False makes torch.load run the full unpickler, so this should
# only be pointed at checkpoint files from a trusted source.
raw = torch.load(path, map_location=device, weights_only=False)
# NOTE(review): the next five statements are cut off mid-line in this copy of
# the file (unbalanced parentheses) — confirm them against the full source.
model = MicroAccordion(**raw["model_config"]).to(d
model.load_state_dict(raw["model_state"]
codec = ByteCodec.from_state(raw.get("codec"))
kdb = KernelDB.from_state(raw.get("kernel_db",
mut = LuminosityMutator.from_state(raw.get("mu
return model, codec, kdb, mut, raw.get("meta", {})
def _make_kdb(args: argparse.Namespace) -> KernelDB:
# Build a KernelDB from the cache-related CLI options on `args`
# (cache_ttl, cache_entries and the confidence / divergence settings).
# NOTE(review): the high_confidence and divergence_tolerance lines are cut off
# mid-identifier in this copy of the file — confirm the attribute names.
return KernelDB(
ttl=args.cache_ttl,
high_confidence=args.cache_high_confiden
divergence_tolerance=args.cache_divergen
max_entries=args.cache_entries,
)
def _make_mutator(args: argparse.Namespace) -> LuminosityMutator:
# Build a LuminosityMutator from the mutation-related CLI options on `args`
# (mutation_rate, mutation_strength, mutation_strategy and a plateau amplifier).
# NOTE(review): the plateau_amplifier line is cut off mid-identifier in this
# copy of the file — confirm the attribute name.
return LuminosityMutator(
mutation_rate=args.mutation_rate,
mutation_strength=args.mutation_strength,
strategy=args.mutation_strategy,
plateau_amplifier=args.mutation_plateau_
)
def _rebuild_opt(
    model: MicroAccordion, args: argparse.Namespace
) -> torch.optim.Optimizer:
    """Create a fresh AdamW optimizer over the model's current parameters.

    Learning rate and weight decay come from ``args.lr`` and
    ``args.weight_decay``. No state from a previous optimizer is carried over.
    """
    trainable = model.parameters()
    settings = {"lr": args.lr, "weight_decay": args.weight_decay}
    return torch.optim.AdamW(trainable, **settings)
def _try_grow_heads_blocks(
model: MicroAccordion,
args: argparse.Namespace,
opt: torch.optim.Optimizer,
step: int,
reason: str,
) -> Tuple[torch.optim.Optimizer, bool]:
# Grow the model by one head step and/or one block, as enabled on `args`.
# Returns (optimizer, grew): the optimizer is rebuilt when anything grew,
# otherwise the one passed in is returned unchanged. `reason` and `step` are
# only used in the log line.
#
# Heads grow when auto_grow_heads or auto_grow is set and the model reports
# it can grow heads; blocks grow when auto_grow_blocks or auto_grow is set.
do_heads = getattr(args, "auto_grow_heads", False) or getattr(args, "auto_grow", False)
do_blocks = getattr(args, "auto_grow_blocks", False) or getattr(args, "auto_grow", False)
grew: List[str] = []
# NOTE(review): both grew.append(...) lines below are cut off before the
# closing brace/quote/paren in this copy of the file — confirm against the
# full source.
if do_heads and model.can_grow_heads():
grew.append(f"heads->{model.grow_heads(1)
if do_blocks:
grew.append(f"blocks->{model.grow_blocks(1)
if grew:
# Parameters changed, so the old optimizer no longer matches the model.
opt = _rebuild_opt(model, args)
print(f" [grow:{reason}@{step}] {' '.join(grew)}")
return opt, bool(grew)
def _try_grow_width(
    model: MicroAccordion,
    args: argparse.Namespace,
    opt: torch.optim.Optimizer,
    step: int,
) -> Tuple[torch.optim.Optimizer, bool]:
    """Widen ``d_model`` on schedule and return ``(optimizer, grew)``.

    Nothing happens unless ``width_grow_every`` is set, ``step`` is a multiple
    of it, and ``d_model`` is still below ``width_grow_max``. When the model
    is widened the optimizer is rebuilt and a summary line is printed.
    """
    interval = getattr(args, "width_grow_every", 0)
    scheduled = bool(interval) and step % interval == 0
    if not scheduled:
        return opt, False

    ceiling = getattr(args, "width_grow_max", 2048)
    increment = getattr(args, "width_grow_delta", 16)
    if model.d_model >= ceiling:
        return opt, False

    d_before = model.d_model
    h_before = model.n_heads
    d_after, h_after = model.grow_width(delta=increment, max_d_model=ceiling)
    fresh_opt = _rebuild_opt(model, args)
    param_count = sum(p.numel() for p in model.parameters())
    print(
        f" [width@{step}] d_model {d_before}->{d_after} "
        f"heads {h_before}->{h_after} "
        f"params={param_count:,}"
    )
    return fresh_opt, True
def _try_mutate(
    model: MicroAccordion,
    mut: LuminosityMutator,
    args: argparse.Namespace,
    plateau_ctr: int,
    step: int,
    verbose: bool = False,
) -> None:
    """Fire the mutator when ``step`` is a multiple of ``mutate_every``.

    The plateau ratio passed to the mutator is ``plateau_ctr`` divided by
    ``args.grow_patience_steps`` (treated as at least 1), capped at 1.0.
    With ``verbose`` the mutator's report is printed.
    """
    period = getattr(args, "mutate_every", 0)
    if period and step % period == 0:
        patience = max(1, args.grow_patience_steps)
        ratio = min(1.0, plateau_ctr / patience)
        report = mut.mutate(model, plateau_ratio=ratio)
        if verbose:
            print(f" [mutate@{step}] strategy={report['strategy']} "
                  f"strength={report['strength']:.5f} hit={report['params_mutated']}")
def _training_loop(
model: MicroAccordion,
get_batch, # callable(step) -> (x, y)
args: argparse.Namespace,
kdb: KernelDB,
mut: LuminosityMutator,
device: str,
codec: ByteCodec,
start_step: int,
n_steps: int,
) -> Tuple[float, torch.optim.Optimizer, float]:
"""Inner training loop."""
# Runs steps start_step .. start_step + n_steps - 1 and returns
# (best EMA loss seen, the optimizer in use at the end, final EMA loss);
# the final EMA loss is reported as inf when no step ran.
#
# A fresh optimizer is built on every call, so optimizer state and the
# best-loss / plateau tracking below start from scratch each time.
opt = _rebuild_opt(model, args)
best_loss = float("inf")
plateau_ctr = 0
ema_loss: Optional[float] = None
grow_every = max(1, args.grow_every)  # guards the modulo below against 0
for step in range(start_step, start_step + n_steps):
xb, yb = get_batch(step)
# sample_text() leaves the model in eval mode, so re-enter train mode each step.
model.train()
# NOTE(review): the allow_train_bypass line below is cut off in this copy of
# the file (the call is never closed) — confirm against the full source.
logits = model(xb, kernel_db=kdb, step=step,
revision_span=args.cache_revision_span,
enable_cache=args.enable_cache,
allow_train_bypass=args.allow_train_bypa
loss = F.cross_entropy(logits.view(-1, model.vocab_size), yb.view(-1))
opt.zero_grad(set_to_none=True)
loss.backward()
# NOTE(review): the gradient-clipping call below is cut off in this copy of
# the file — confirm its arguments against the full source.
torch.nn.utils.clip_grad_norm_(model.par
opt.step()
lv = float(loss.item())
# Exponential moving average of the loss (0.95 old / 0.05 new).
ema_loss = lv if ema_loss is None else 0.95 * ema_loss + 0.05 * lv
# A step counts as progress only if the EMA beats the best by more than
# grow_patience_delta; otherwise the plateau counter advances.
if ema_loss < best_loss - args.grow_patience_delta:
best_loss, plateau_ctr = ema_loss, 0
else:
plateau_ctr += 1
opt, wg = _try_grow_width(model, args, opt, step)
if wg:
plateau_ctr = 0
# NOTE(review): this branch is gated on args.auto_grow alone, so setting only
# auto_grow_heads / auto_grow_blocks never reaches _try_grow_heads_blocks
# from here — confirm whether that is intended.
# NOTE(review): plateau_ctr is reset only when something grew, so once the
# patience threshold is reached and nothing can grow, this branch is taken
# on every following step.
if args.auto_grow and (
step % grow_every == 0
or plateau_ctr >= args.grow_patience_steps
):
reason = ("plateau" if plateau_ctr >= args.grow_patience_steps
else "sched")
opt, grew = _try_grow_heads_blocks(model, args, opt, step, reason)
if grew:
plateau_ctr = 0
_try_mutate(model, mut, args, plateau_ctr, step,
verbose=(step % args.log_every == 0))
# Log on the first step of the call and on every log_every-th step, with a
# short generated sample (only its first 80 characters are printed).
if step % args.log_every == 0 or step == start_step:
samp = sample_text(
model, codec, args.sample_prompt, args.sample_tokens,
device, args.temperature, args.top_k, 1,
kdb if args.enable_cache else None,
)
print(
f"step={step:6d} loss={lv:.4f} ema={ema_loss:.4f} "
f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
f"cache_bypass={kdb.stats['bypasses']} "
f"sample={samp[:80]!r}"
)
return best_loss, opt, ema_loss if ema_loss is not None else float("inf")
def run_training(
    model: MicroAccordion,
    data: torch.Tensor,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: Optional[ByteCodec] = None,
    start_step: int = 1,
    total_steps: Optional[int] = None,
) -> Tuple[float, dict]:
    """Train on a static token tensor and return ``(best_ema_loss, summary)``.

    Batches are drawn at random from ``data`` using ``args.block_size`` and
    ``args.batch_size``. ``total_steps`` overrides ``args.steps`` when given,
    and a default ``ByteCodec`` is used when no codec is supplied. The summary
    dict records the last step number, the best EMA loss, the model's current
    shape and the kernel-cache and mutator statistics.
    """
    if not codec:
        codec = ByteCodec()
    step_budget = args.steps if total_steps is None else total_steps

    def _next_batch(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return random_batch(data, args.block_size, args.batch_size, device)

    best_ema, _opt, _last_ema = _training_loop(
        model, _next_batch, args, kdb, mut, device, codec, start_step, step_budget
    )
    summary = {
        "last_step": start_step + step_budget - 1,
        "best_ema_loss": best_ema,
        "blocks": len(model.blocks),
        "heads": model.n_heads,
        "d_model": model.d_model,
        "kernel_stats": kdb.stats,
        "mutator_stats": dict(mut.stats),
    }
    return best_ema, summary
def run_stream_training(
    model: MicroAccordion,
    stream: StreamCorpus,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: ByteCodec,
    start_step: int = 1,
    total_steps: int = 10_000,
    epoch_steps: int = 1_000,
    checkpoint_path: str = "microaccordion.pt",
    meta: Optional[dict] = None,
) -> dict:
    """Train from a stream in epochs, checkpointing after each one.

    Up to ``total_steps`` steps are run in chunks of at most ``epoch_steps``.
    Before each chunk the stream is checked and training stops early if it is
    exhausted. After each chunk ``meta`` is updated in place with progress and
    model shape, and a checkpoint is written to ``checkpoint_path``. Epoch
    numbering continues from ``meta["epoch"]`` when present. Returns ``meta``.
    """
    if meta is None:
        meta = {}
    epoch_idx = meta.get("epoch", 0)
    completed = 0
    cursor = start_step
    overall_best = float("inf")

    def _next_batch(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return stream.sample_batch(args.block_size, args.batch_size, device)

    print(
        f"\n[stream-train] {total_steps} steps epoch={epoch_steps} "
        f"d_model={model.d_model} blocks={len(model.blocks)} "
        f"heads={model.n_heads}\n"
    )
    while completed < total_steps:
        if stream.exhausted:
            print("[stream] source exhausted — stopping.")
            break
        span = min(epoch_steps, total_steps - completed)
        epoch_idx += 1
        last_in_span = cursor + span - 1
        print(
            f"── epoch {epoch_idx} "
            f"[{cursor}..{last_in_span}] "
            f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
            f"docs={stream.docs_consumed:,} ──"
        )
        epoch_best, _opt, _ema = _training_loop(
            model, _next_batch, args, kdb, mut, device, codec, cursor, span
        )
        overall_best = min(overall_best, epoch_best)
        cursor += span
        completed += span
        progress = {
            "last_step": cursor - 1,
            "epoch": epoch_idx,
            "best_ema_loss": overall_best,
            "d_model": model.d_model,
            "blocks": len(model.blocks),
            "heads": model.n_heads,
            "docs_consumed": stream.docs_consumed,
            "tokens_consumed": stream.tokens_consumed,
            "mutator_stats": dict(mut.stats),
        }
        meta.update(progress)
        save_checkpoint(checkpoint_path, model, codec, kdb, mut, meta)
        print(
            f" [ckpt] epoch={epoch_idx} "
            f"steps={completed}/{total_steps} "
            f"best_ema={overall_best:.4f} "
            f"saved={checkpoint_path}"
        )
    return meta
def train_main(args: argparse.Namespace) -> None:
# Entry point for the `train` sub-command: load a static corpus, build a new
# model from the CLI hyper-parameters, train it and save a checkpoint.
# CUDA is used when available unless --cpu was given.
device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
text, source = load_text(args.text, args.data_url)
codec = ByteCodec()
data = prepare_data_tensor(text)
# Optionally keep a local copy of a corpus that was fetched from --data-url.
# NOTE(review): the write_text(...) line below is cut off in this copy of the
# file — confirm its remaining arguments against the full source.
if getattr(args, "download_out", None) and args.data_url:
Path(args.download_out).write_text(text,
print(f"downloaded -> {args.download_out}")
model = MicroAccordion(
vocab_size=codec.vocab_size, block_size=args.block_size,
d_model=args.d_model, n_blocks=args.n_blocks,
n_heads=args.n_heads, dropout=args.dropout,
).to(device)
kdb = _make_kdb(args)
mut = _make_mutator(args)
_, meta = run_training(model, data, args, kdb, mut, device, codec=codec)
# Record where the run happened and what it was trained on alongside the
# training summary.
meta.update(dict(device=device, data_source=source,
corpus_bytes=len(text.encode())))
save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
print(f"saved -> {args.checkpoint}")
def query_main(args: argparse.Namespace) -> None:
    """Entry point for ``query``: load a checkpoint and print one sampled completion.

    The kernel cache stored in the checkpoint is used only with
    ``--enable-cache``.
    """
    device = "cpu" if (not torch.cuda.is_available() or args.cpu) else "cuda"
    model, codec, kdb, _mut, _meta = load_checkpoint(args.checkpoint, device)
    cache = kdb if args.enable_cache else None
    completion = sample_text(
        model,
        codec,
        args.prompt,
        args.max_new_tokens,
        device,
        args.temperature,
        args.top_k,
        args.self_calls,
        cache,
    )
    print(completion)
def grow_main(args: argparse.Namespace) -> None:
# Entry point for the `grow` sub-command: load a checkpoint, apply the
# requested block / head / width growth, and save it back to the same path.
device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
# Remember the shape before growing so the change can be reported.
b0, h0, d0 = len(model.blocks), model.n_heads, model.d_model
# Each option defaults to 0 in the parser, which skips that kind of growth.
if args.blocks:
model.grow_blocks(args.blocks)
if args.head_steps:
model.grow_heads(args.head_steps)
if args.width_delta:
model.grow_width(delta=args.width_delta)
# NOTE(review): the first line of this meta.update(...) call is cut off in
# this copy of the file — confirm the missing keys against the full source.
meta.update(dict(blocks=len(model.blocks)
d_model=model.d_model))
save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
print(f"blocks {b0}->{len(model.blocks)} heads {h0}->{model.n_heads} "
f"d_model {d0}->{model.d_model} saved {args.checkpoint}")
def inspect_main(args: argparse.Namespace) -> None:
    """Entry point for ``inspect``: print a JSON summary of a checkpoint.

    The checkpoint is always loaded on the CPU. The 4-bit size estimate is
    half a byte per parameter, rounded up.
    """
    model, _codec, kdb, mut, meta = load_checkpoint(args.checkpoint, "cpu")
    param_count = sum(p.numel() for p in model.parameters())
    report = {
        "d_model": model.d_model,
        "block_size": model.block_size,
        "blocks": len(model.blocks),
        "heads": model.n_heads,
        "params": param_count,
        "approx_4bit_bytes": math.ceil(param_count / 2),
        "kernel_db_entries": len(kdb.entries),
        "kernel_db_stats": kdb.stats,
        "luminosity_mutator": mut.state(),
        "meta": meta,
    }
    print(json.dumps(report, indent=2))
def seed_main(args: argparse.Namespace) -> None:
# Entry point for the `seed` sub-command: write the built-in seed corpus to
# --out when given, otherwise print it to stdout.
# NOTE(review): the write_text(...) line below is cut off in this copy of the
# file — confirm its remaining arguments against the full source.
if args.out:
Path(args.out).write_text(DEFAULT_CORPUS,
print(f"wrote seed -> {args.out}")
else:
print(DEFAULT_CORPUS)
def crawl_train_main(args: argparse.Namespace) -> None:
# Entry point for the `crawl-train` sub-command: fetch one URL, strip it to
# text, and run epoch-based stream training on it.
device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
codec = ByteCodec()
# With --resume and an existing checkpoint, continue from the step after the
# one recorded in its metadata; otherwise start a fresh model at step 1.
if args.resume and Path(args.checkpoint).exists():
model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
start_step = meta.get("last_step", 0) + 1
print(f"resumed at step {start_step}, "
f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
else:
model = MicroAccordion(
vocab_size=codec.vocab_size, block_size=args.block_size,
d_model=args.d_model, n_blocks=args.n_blocks,
n_heads=args.n_heads, dropout=args.dropout,
).to(device)
kdb, mut, meta, start_step = _make_kdb(args), _make_mutator(args), {}, 1
print(f"fetching {args.url} ...")
t0 = time.time()
# A failed fetch is not caught here, so it aborts the command.
corpus = fetch_and_strip(args.url)
print(f" {len(corpus):,} chars in {time.time()-t0:.1f}s")
# Optionally keep the stripped text on disk.
# NOTE(review): the write_text(...) line below is cut off in this copy of the
# file — confirm its remaining arguments against the full source.
if getattr(args, "corpus_out", None):
Path(args.corpus_out).write_text(corpus,
# The whole page becomes a single-document stream with default buffer sizes.
stream = StreamCorpus(codec).from_text(corpus)
meta["url"] = args.url
run_stream_training(
model=model, stream=stream, args=args, kdb=kdb, mut=mut,
device=device, codec=codec, start_step=start_step,
total_steps=args.total_steps, epoch_steps=args.epoch_steps,
checkpoint_path=args.checkpoint, meta=meta,
)
def stream_train_main(args: argparse.Namespace) -> None:
    """Entry point for ``stream-train``.

    Resumes from ``args.checkpoint`` when ``--resume`` is set and the file
    exists, otherwise starts a fresh model. The stream source is chosen in
    this order: a HuggingFace dataset, a comma-separated URL list, or the
    built-in corpus. Training then runs in checkpointed epochs.
    """
    use_gpu = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_gpu else "cpu"
    codec = ByteCodec()

    if args.resume and Path(args.checkpoint).exists():
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        start_step = meta.get("last_step", 0) + 1
        print(f"resumed at step {start_step}, "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {}
        start_step = 1

    stream = StreamCorpus(
        codec,
        buffer_min=getattr(args, "stream_buffer_min", 100_000),
        buffer_max=getattr(args, "stream_buffer_max", 2_000_000),
    )

    if getattr(args, "hf_dataset", None):
        print(f"streaming HuggingFace: {args.hf_dataset} "
              f"({args.hf_config or 'default'}) split={args.hf_split} "
              f"field={args.hf_text_field}")
        stream.from_hf(
            args.hf_dataset,
            config=args.hf_config or None,
            split=args.hf_split,
            text_field=args.hf_text_field,
        )
        source_label = args.hf_dataset
    elif getattr(args, "stream_urls", None):
        # Blank entries (stray commas, surrounding spaces) are dropped.
        urls = [piece.strip() for piece in args.stream_urls.split(",") if piece.strip()]
        print(f"streaming {len(urls)} URL(s)")
        stream.from_urls(urls)
        source_label = args.stream_urls
    else:
        print("no stream source given — using built-in corpus")
        stream.from_text(DEFAULT_CORPUS)
        source_label = "built-in"
    meta["stream_source"] = source_label

    run_stream_training(
        model=model,
        stream=stream,
        args=args,
        kdb=kdb,
        mut=mut,
        device=device,
        codec=codec,
        start_step=start_step,
        total_steps=args.total_steps,
        epoch_steps=args.epoch_steps,
        checkpoint_path=args.checkpoint,
        meta=meta,
    )
def io_main(args: argparse.Namespace) -> None:
    """Run the interactive talk / train shell.

    Loads ``args.checkpoint`` when that file exists, otherwise builds a fresh
    model from the CLI hyper-parameters. Lines are then read from stdin until
    ``/quit``, ``/exit``, EOF or Ctrl-C. A line starting with a known slash
    command is dispatched to that command; any other line is sent to the model
    as a user turn, and the exchange is appended to the in-memory corpus that
    ``/train`` trains on.

    Malformed command arguments (a non-numeric count, unbalanced quotes) and a
    corpus too short to train on are reported and the shell keeps running; a
    ``ValueError`` from bad input no longer ends the session and discards
    unsaved state.
    """
    device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
    codec = ByteCodec()
    if args.checkpoint and Path(args.checkpoint).exists():
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        corpus, source = (
            load_text(args.text, args.data_url)
            if (args.text or args.data_url)
            else (DEFAULT_CORPUS, "built-in")
        )
        print(f"loaded {args.checkpoint} "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        corpus, source = load_text(args.text, args.data_url)
        model = MicroAccordion(
            vocab_size=codec.vocab_size, block_size=args.block_size,
            d_model=args.d_model, n_blocks=args.n_blocks,
            n_heads=args.n_heads, dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {"data_source": source}
        print("initialised new model")
    print("Commands: /help /train [n] /grow block|head|width [n] "
          "/mutate [pr] /stats /save [path] /corpus /append TEXT /fetch URL /quit")
    # Number of training steps run in this session; also the step offset
    # handed to the next /train.
    turn_count = 0
    while True:
        try:
            line = input("you> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nbye")
            break
        if not line:
            continue
        if line in {"/quit", "/exit"}:
            break
        if line == "/help":
            print(textwrap.dedent("""
                /train [n]
                /grow block|head|width [n]
                /mutate [plateau]
                /stats
                /save [path]
                /corpus
                /append TEXT
                /fetch URL
                /quit
            """).strip())
            continue
        if line.startswith("/corpus"):
            print(corpus)
            continue
        if line.startswith("/append "):
            added = line[8:]
            corpus += added + "\n"
            print(f"appended {len(added)} chars")
            continue
        if line.startswith("/fetch "):
            url = line[7:].strip()
            try:
                fetched = fetch_and_strip(url)
                corpus += "\n" + fetched
                print(f"fetched {len(fetched):,} chars from {url}")
            except Exception as exc:
                print(f"fetch failed: {exc}")
            continue
        if line.startswith("/train"):
            # shlex.split raises ValueError on unbalanced quotes and int() on
            # a non-numeric count; both are user typos, not fatal errors.
            try:
                parts = shlex.split(line)
                steps = int(parts[1]) if len(parts) > 1 else args.steps
                if steps < 0:
                    raise ValueError("step count must not be negative")
            except ValueError as exc:
                print(f"usage: /train [n] ({exc})")
                continue
            data = prepare_data_tensor(corpus)
            try:
                _, nm = run_training(model, data, args, kdb, mut, device,
                                     codec=codec, start_step=turn_count + 1,
                                     total_steps=steps)
            except ValueError as exc:
                # Batch sampling rejects a corpus that is too short for the
                # configured block size.
                print(f"train failed: {exc}")
                continue
            meta.update(nm)
            turn_count += steps
            print(f"trained {steps} steps "
                  f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
            continue
        if line.startswith("/grow"):
            try:
                parts = shlex.split(line)
                n = int(parts[2]) if len(parts) > 2 else 1
            except ValueError:
                print("usage: /grow block|head|width [n]")
                continue
            if len(parts) < 2:
                print("usage: /grow block|head|width [n]")
                continue
            if parts[1] == "block":
                print(f"blocks -> {model.grow_blocks(n)}")
            elif parts[1] == "head":
                # Advance one head step at a time and stop silently once the
                # model reports it cannot grow further.
                for _ in range(n):
                    if model.can_grow_heads():
                        model.grow_heads(1)
                print(f"heads -> {model.n_heads}")
            elif parts[1] == "width":
                nd, nh = model.grow_width(delta=n)
                print(f"d_model->{nd} heads->{nh} "
                      f"params={sum(p.numel() for p in model.parameters()):,}")
            else:
                print("usage: /grow block|head|width [n]")
            continue
        if line.startswith("/mutate"):
            try:
                parts = shlex.split(line)
                pr = float(parts[1]) if len(parts) > 1 else 0.0
            except ValueError:
                print("usage: /mutate [plateau]")
                continue
            mi = mut.mutate(model, plateau_ratio=pr)
            print(f"[mutator] strategy={mi['strategy']} "
                  f"strength={mi['strength']:.5f} "
                  f"params_hit={mi['params_mutated']}")
            continue
        if line.startswith("/stats"):
            n = sum(p.numel() for p in model.parameters())
            print(json.dumps(dict(
                d_model=model.d_model, blocks=len(model.blocks),
                heads=model.n_heads, params=n,
                approx_4bit_bytes=math.ceil(n / 2),
                kernel_db_entries=len(kdb.entries),
                kernel_db_stats=kdb.stats,
                luminosity_mutator=mut.state(),
                corpus_chars=len(corpus),
            ), indent=2))
            continue
        if line.startswith("/save"):
            try:
                parts = shlex.split(line)
            except ValueError:
                print("usage: /save [path]")
                continue
            p = parts[1] if len(parts) > 1 else (args.checkpoint or "microaccordion.pt")
            meta.update(dict(turn_count=turn_count, corpus_chars=len(corpus)))
            save_checkpoint(p, model, codec, kdb, mut, meta)
            print(f"saved -> {p}")
            continue
        # Anything else is a chat turn: prompt the model, show only the text
        # after the first "Assistant:" marker, and remember the exchange.
        prompt = f"User: {line}\nAssistant:"
        reply = sample_text(model, codec, prompt, args.max_new_tokens, device,
                            args.temperature, args.top_k, args.self_calls,
                            kdb if args.enable_cache else None)
        answer = reply.split("Assistant:", 1)[-1].strip()
        print(f"bot> {answer}")
        corpus += f"User: {line}\nAssistant: {answer}\n"
def build_parser() -> argparse.ArgumentParser:
# Build the command-line parser. A sub-command is required; each sub-parser
# stores its handler function under `func` via set_defaults(), which main()
# then calls with the parsed arguments.
# NOTE(review): several add_argument(...) lines in this function are cut off
# in this copy of the file (marked below) — confirm their types and defaults
# against the full source.
p = argparse.ArgumentParser(
description=(
"MicroAccordion — self-growing byte transformer."
)
)
sub = p.add_subparsers(dest="cmd", required=True)
def _add_training(sp: argparse.ArgumentParser,
io_mode: bool = False) -> None:
"""Arguments common to all training sub-commands."""
# io_mode only changes two defaults: fewer --steps and a shorter --log-every.
# --- corpus source ---
sp.add_argument("--text", default=None)
sp.add_argument("--data-url", default=None)
sp.add_argument("--download-out", default=None)
# --- model shape ---
sp.add_argument("--d-model", type=int, default=32)
sp.add_argument("--n-blocks", type=int, default=1)
sp.add_argument("--n-heads", type=int, default=1)
sp.add_argument("--block-size", type=int, default=48)
sp.add_argument("--dropout", type=float, default=0.0)
# --- optimisation ---
sp.add_argument("--lr", type=float, default=3e-3)
sp.add_argument("--weight-decay", type=float, default=0.01)
sp.add_argument("--grad-clip", type=float, default=1.0)
sp.add_argument("--batch-size", type=int, default=16)
sp.add_argument("--steps", type=int, default=600 if not io_mode else 120)
sp.add_argument("--log-every", type=int, default=50 if not io_mode else 20)
# --- sampling ---
sp.add_argument("--sample-prompt", type=str, default="Luminosity and Gpteus")
sp.add_argument("--sample-tokens", type=int, default=48)
# NOTE(review): line cut off.
sp.add_argument("--max-new-tokens",type=i
sp.add_argument("--temperature", type=float, default=0.9)
sp.add_argument("--top-k", type=int, default=16)
sp.add_argument("--self-calls", type=int, default=1)
# --- kernel cache ---
sp.add_argument("--enable-cache", action="store_true")
sp.add_argument("--allow-train-bypass", action="store_true")
sp.add_argument("--cache-ttl", type=int, default=48)
# NOTE(review): the next two lines are cut off.
sp.add_argument("--cache-high-confidence"
sp.add_argument("--cache-divergence-tol"
sp.add_argument("--cache-entries", type=int, default=4096)
# NOTE(review): line cut off.
sp.add_argument("--cache-revision-span",
# --- head / block growth ---
sp.add_argument("--auto-grow", action="store_true",
help="grow heads and blocks")
sp.add_argument("--auto-grow-heads", action="store_true")
sp.add_argument("--auto-grow-blocks", action="store_true")
sp.add_argument("--grow-every", type=int, default=150)
# NOTE(review): the next two lines are cut off.
sp.add_argument("--grow-patience-steps",
sp.add_argument("--grow-patience-delta",
# --- width growth (0 disables the schedule) ---
sp.add_argument("--width-grow-every", type=int, default=0,
help="expand d_model every N steps")
sp.add_argument("--width-grow-delta", type=int, default=16,
help="dims added per width growth")
sp.add_argument("--width-grow-max", type=int, default=2048,
help="d_model cap")
# --- mutation (0 disables the schedule) ---
sp.add_argument("--mutate-every", type=int, default=0,
help="fire mutator every N steps")
sp.add_argument("--mutation-rate", type=float, default=0.02)
sp.add_argument("--mutation-strength", type=float, default=0.05)
sp.add_argument("--mutation-strategy", type=str, default="luminosity",
choices=LuminosityMutator.STRATEGIES)
# NOTE(review): line cut off.
sp.add_argument("--mutation-plateau-amp"
sp.add_argument("--cpu", action="store_true")
def _add_stream(sp: argparse.ArgumentParser) -> None:
"""Extra arguments for streaming sub-commands."""
# Streaming commands take every training option (with the non-io defaults)
# plus checkpoint/resume, epoch sizing and buffer limits.
_add_training(sp)
sp.add_argument("--checkpoint", type=str, default="microaccordion.pt")
sp.add_argument("--resume", action="store_true")
sp.add_argument("--total-steps", type=int, default=3000)
sp.add_argument("--epoch-steps", type=int, default=300)
sp.add_argument("--stream-buffer-min", type=int, default=100_000)
sp.add_argument("--stream-buffer-max", type=int, default=2_000_000)
# train: static-corpus training.
tr = sub.add_parser("train", help="train on a static corpus")
tr.add_argument("--checkpoint", type=str, default="microaccordion.pt")
_add_training(tr)
tr.set_defaults(func=train_main)
# query: sampling only, so it declares its own small set of options.
qu = sub.add_parser("query", help="sample text from a checkpoint")
qu.add_argument("--checkpoint", type=str, required=True)
qu.add_argument("--prompt", type=str, default="Luminosity and Gpteus")
# NOTE(review): line cut off.
qu.add_argument("--max-new-tokens",type=i
qu.add_argument("--temperature", type=float, default=0.9)
qu.add_argument("--top-k", type=int, default=16)
qu.add_argument("--self-calls", type=int, default=1)
qu.add_argument("--enable-cache", action="store_true")
qu.add_argument("--cpu", action="store_true")
qu.set_defaults(func=query_main)
# grow: every growth option defaults to 0, meaning "leave unchanged".
gr = sub.add_parser("grow", help="manually grow a saved checkpoint")
gr.add_argument("--checkpoint", type=str, required=True)
gr.add_argument("--blocks", type=int, default=0,
help="add this many blocks")
gr.add_argument("--head-steps", type=int, default=0,
help="advance heads this many divisor steps")
gr.add_argument("--width-delta", type=int, default=0,
help="expand d_model by this many dims")
gr.add_argument("--cpu", action="store_true")
gr.set_defaults(func=grow_main)
# inspect: read-only checkpoint summary.
ins = sub.add_parser("inspect", help="inspect checkpoint stats")
ins.add_argument("--checkpoint", type=str, required=True)
ins.set_defaults(func=inspect_main)
# seed: dump the built-in corpus.
sd = sub.add_parser("seed", help="print or export built-in seed corpus")
sd.add_argument("--out", type=str, default=None)
sd.set_defaults(func=seed_main)
# io: interactive shell, using the shorter io_mode training defaults.
io = sub.add_parser("io", help="interactive talk / train shell")
io.add_argument("--checkpoint", type=str, default="microaccordion.pt")
_add_training(io, io_mode=True)
io.set_defaults(func=io_main)
# crawl-train: single URL as the training stream.
ct = sub.add_parser(
"crawl-train",
help="fetch one URL, strip HTML, train with full growth schedule",
)
ct.add_argument("--url", type=str, required=True)
ct.add_argument("--corpus-out", type=str, default=None,
help="save stripped plaintext here")
_add_stream(ct)
ct.set_defaults(func=crawl_train_main)
# stream-train: HuggingFace dataset or URL list as the training stream.
st = sub.add_parser(
"stream-train",
help=(
"train on a HuggingFace stream or URL list"
),
)
st.add_argument("--hf-dataset", type=str, default=None,
help="HuggingFace dataset name")
st.add_argument("--hf-config", type=str, default=None,
help="dataset config / subset")
st.add_argument("--hf-split", type=str, default="train")
st.add_argument("--hf-text-field", type=str, default="text")
st.add_argument("--stream-urls", type=str, default=None,
help="comma-separated URL list")
_add_stream(st)
st.set_defaults(func=stream_train_main)
return p
def main() -> None:
    """Parse the command line, fix thread count and RNG seeds, and run the chosen sub-command."""
    args = build_parser().parse_args()
    # At most four threads, and at least one when the CPU count is unknown.
    thread_cap = min(4, os.cpu_count() or 1)
    torch.set_num_threads(thread_cap)
    seed = 1337
    torch.manual_seed(seed)
    random.seed(seed)
    handler = args.func
    handler(args)
if __name__ == "__main__":
# Run the CLI only when this file is executed as a script, not when imported.
main()