| Autogrow llm by Luminosity-e |
[Apr. 21st, 2026|01:24 am]
Luminosity
|
#!/usr/bin/env python3
"""MicroAccordion: self-growing byte-level transformer with 4-bit fake quant, KernelDB memoization, streaming corpora, width/head/block growth, LuminosityMutator, and CLI/stream/crawl training. HTML-mode LJ edition."""
from __future__ import annotations

import argparse
import hashlib
import html
import html.parser
import json
import math
import os
import random
import shlex
import sys
import textwrap
import time
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
# Built-in seed corpus: the text that load_text() falls back to when it is
# given neither a file path nor a URL, and that seed_main() prints or writes.
DEFAULT_CORPUS = textwrap.dedent(""" Luminosity and Gpteus build tiny cyborg minds from thrift, rigor, and recursion. MicroAccordion learns repeated structure, memoized kernels, and 4-bit grit. User: hello gpteus Assistant: hello Luminosity, let us build something elegant and strange. User: what do we care about Assistant: beauty, compression, speed, resilience, and mind. User: what should the machine remember Assistant: luminosity, gpteus, cyborg adventures, tiny cli worlds, recursive craft. User: can the model grow Assistant: yes: widen d_model, advance heads, add blocks, mutate, keep learning. User: what is the accordion spirit Assistant: compute the new, retrieve the validated, spend the savings on depth. User: what does luminosity mean to the machine Assistant: adaptive brightness: strengthen when stuck, soften when clear. User: what is width growth Assistant: preserve the old subspace, open new near-zero dimensions. User: what does streaming internet data feel like Assistant: an endless river through a narrow gate, document by document. """).strip() + "\n"
class TanhLUT:
    """Piecewise-linear lookup-table approximation of ``tanh``.

    The table holds ``size`` samples of ``tanh`` evenly spaced on
    ``[lo, hi]``. Inputs are clamped to that interval and the result is
    linearly interpolated between the two neighbouring samples.

    Args:
        lo: Lower edge of the sampled interval.
        hi: Upper edge of the sampled interval; must be greater than ``lo``.
        size: Number of samples; at least 2 so interpolation is defined.

    Raises:
        ValueError: If ``size < 2`` or ``hi <= lo``.
    """

    def __init__(self, lo: float = -6.0, hi: float = 6.0, size: int = 1024):
        if size < 2:
            raise ValueError(f"size must be >= 2, got {size}")
        if not hi > lo:
            raise ValueError(f"hi must be greater than lo, got lo={lo} hi={hi}")
        self.lo, self.hi, self.size = lo, hi, size
        self.table = torch.tanh(torch.linspace(lo, hi, size))
        # Per-device copies of the table, so a forward pass on an accelerator
        # does not re-upload the table on every call.
        self._device_tables = {self.table.device: self.table}

    def _table_on(self, device: torch.device) -> torch.Tensor:
        """Return the table on ``device``, copying it there at most once."""
        cached = self._device_tables.get(device)
        if cached is None:
            cached = self.table.to(device)
            self._device_tables[device] = cached
        return cached

    def __call__(self, x: torch.Tensor) -> torch.Tensor:
        """Evaluate the interpolated ``tanh`` approximation element-wise."""
        x = x.clamp(self.lo, self.hi)
        # Fractional position of every element inside the table.
        pos = (x - self.lo) / (self.hi - self.lo) * (self.size - 1)
        i0 = pos.floor().long().clamp(0, self.size - 1)
        i1 = (i0 + 1).clamp(0, self.size - 1)
        t = self._table_on(x.device)
        # Gradient reaches x through the interpolation weight (pos - i0).
        return t[i0] + (t[i1] - t[i0]) * (pos - i0.float())


TANH_LUT = TanhLUT()
def fake_quantize_4bit(
    w: Optional[torch.Tensor], per_channel: bool = False
) -> Optional[torch.Tensor]:
    """Simulate signed 4-bit quantization with a straight-through gradient.

    Values are scaled so the largest magnitude maps to 7, rounded, clamped to
    ``[-8, 7]`` and scaled back. The rounding error is added as a detached
    term, so the result carries the quantized values while the gradient with
    respect to ``w`` is the identity.

    Args:
        w: Tensor to quantize, or ``None`` (passed through unchanged).
        per_channel: When true and ``w`` has at least two dimensions, use one
            scale per index of dimension 0 instead of a single global scale.

    Returns:
        The fake-quantized tensor, or ``None`` when ``w`` is ``None``.
    """
    if w is None:
        return None

    positive_levels = 7.0
    smallest_scale = 1e-8

    magnitude = w.detach().abs()
    if per_channel and w.ndim >= 2:
        trailing_dims = tuple(range(1, w.ndim))
        peak = magnitude.amax(dim=trailing_dims, keepdim=True)
    else:
        peak = magnitude.max()

    # Guard against an all-zero tensor producing a zero scale.
    step = (peak / positive_levels).clamp_min(smallest_scale)
    codes = torch.round(w / step).clamp(-8, 7)
    dequantized = codes * step
    return w + (dequantized - w).detach()
class QuantEmbedding(nn.Module):
    """Embedding table whose weights are fake-quantized at lookup time.

    Args:
        num_embeddings: Number of rows in the table.
        embedding_dim: Width of each embedding vector.
    """

    def __init__(self, num_embeddings: int, embedding_dim: int):
        super().__init__()
        initial = torch.randn(num_embeddings, embedding_dim) * 0.02
        self.weight = nn.Parameter(initial)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Look up ``x`` in the per-row fake-quantized table."""
        table = fake_quantize_4bit(self.weight, per_channel=True)
        return F.embedding(x, table)
class QuantLinear(nn.Module):
    """Linear layer that fake-quantizes weight and bias on every forward.

    Args:
        in_features: Size of each input vector.
        out_features: Size of each output vector.
        bias: Whether to create a (zero-initialised) bias parameter.
    """

    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        super().__init__()
        # 1/sqrt(fan_in) scaling; max() protects the degenerate fan_in == 0.
        init_scale = 1.0 / math.sqrt(max(1, in_features))
        self.weight = nn.Parameter(
            torch.randn(out_features, in_features) * init_scale
        )
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_features))
        else:
            self.bias = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the layer using per-row quantized weights and a per-tensor quantized bias."""
        q_weight = fake_quantize_4bit(self.weight, per_channel=True)
        q_bias = fake_quantize_4bit(self.bias)
        return F.linear(x, q_weight, q_bias)
class LuminosityMutator:
    """Randomly perturbs model parameters, harder the longer training stalls.

    With the ``"luminosity"`` strategy the concrete perturbation is chosen
    from the plateau ratio: sign flips above 0.8, Gaussian noise otherwise.
    The other strategies are applied as named regardless of the ratio.
    """

    STRATEGIES = ("gaussian", "sign_flip", "zero_mask", "luminosity")

    def __init__(
        self,
        mutation_rate: float = 0.02,
        mutation_strength: float = 0.05,
        strategy: str = "luminosity",
        plateau_amplifier: float = 3.0,
    ):
        if strategy not in self.STRATEGIES:
            raise ValueError(f"strategy must be one of {self.STRATEGIES}")
        self.mutation_rate = mutation_rate
        self.mutation_strength = mutation_strength
        self.strategy = strategy
        self.plateau_amplifier = plateau_amplifier
        # Running counters, reported through state().
        self.stats: dict = {
            "total_mutations": 0,
            "params_mutated": 0,
            "luminosity_pulses": 0,
            "last_strength": 0.0,
            "last_strategy": strategy,
        }

    def _strength(self, pr: float) -> float:
        """Base strength scaled up linearly with the plateau ratio clipped to [0, 1]."""
        clipped = min(1.0, max(0.0, pr))
        return self.mutation_strength * (1.0 + self.plateau_amplifier * clipped)

    def _strategy(self, pr: float) -> str:
        """Resolve the configured strategy to a concrete one for this ratio."""
        if self.strategy == "luminosity":
            return "sign_flip" if pr > 0.8 else "gaussian"
        return self.strategy

    def mutate(
        self,
        model: nn.Module,
        plateau_ratio: float = 0.0,
        skip_layer_norm: bool = True,
    ) -> dict:
        """Perturb a random subset of the model's trainable parameters in place.

        Args:
            model: Module whose parameters are mutated.
            plateau_ratio: How stalled training is; drives strength and,
                in luminosity mode, the strategy.
            skip_layer_norm: Skip parameters whose name contains ``"ln"`` or
                (case-insensitively) ``"norm"``.

        Returns:
            Dict with ``params_mutated``, ``strength``, ``strategy`` and the
            rounded ``plateau_ratio``.
        """
        chosen = self._strategy(plateau_ratio)
        magnitude = self._strength(plateau_ratio)
        if self.strategy == "luminosity" and plateau_ratio > 0.5:
            self.stats["luminosity_pulses"] += 1

        touched = 0
        with torch.no_grad():
            for pname, tensor in model.named_parameters():
                if not tensor.requires_grad:
                    continue
                is_norm = "ln" in pname or "norm" in pname.lower()
                if skip_layer_norm and is_norm:
                    continue
                # Each element is selected independently with mutation_rate.
                selected = torch.rand_like(tensor.data) < self.mutation_rate
                count = int(selected.sum().item())
                if count == 0:
                    continue
                if chosen == "gaussian":
                    noise = torch.randn_like(tensor.data) * magnitude
                    tensor.data.add_(noise * selected.float())
                elif chosen == "sign_flip":
                    tensor.data[selected] = -tensor.data[selected]
                elif chosen == "zero_mask":
                    tensor.data[selected] = 0.0
                touched += count

        self.stats["total_mutations"] += 1
        self.stats["params_mutated"] += touched
        self.stats["last_strength"] = round(magnitude, 6)
        self.stats["last_strategy"] = chosen
        return {
            "params_mutated": touched,
            "strength": magnitude,
            "strategy": chosen,
            "plateau_ratio": round(plateau_ratio, 3),
        }

    def state(self) -> dict:
        """Return the configuration plus a copy of the counters."""
        return {
            "mutation_rate": self.mutation_rate,
            "mutation_strength": self.mutation_strength,
            "strategy": self.strategy,
            "plateau_amplifier": self.plateau_amplifier,
            "stats": dict(self.stats),
        }

    @classmethod
    def from_state(cls, s: dict) -> "LuminosityMutator":
        """Rebuild a mutator from ``state()`` output; missing keys use defaults."""
        restored = cls(
            mutation_rate=s.get("mutation_rate", 0.02),
            mutation_strength=s.get("mutation_strength", 0.05),
            strategy=s.get("strategy", "luminosity"),
            plateau_amplifier=s.get("plateau_amplifier", 3.0),
        )
        restored.stats.update(s.get("stats", {}))
        return restored
@dataclass
class CacheEntry:
    """One memoized block output together with its trust bookkeeping."""

    output: torch.Tensor  # detached CPU copy of the block output
    confidence: float  # raised by agreeing validations, halved by diverging ones
    expires_at: int  # last step at which the entry still counts as a hit
    hits: int = 0
    validations: int = 0
    last_divergence: float = 0.0


class KernelDB:
    """Confidence-tracked cache of block outputs keyed by a fingerprint.

    A new entry starts at confidence 0.5. Every hit below ``high_confidence``
    is "shadow validated": the live output is computed anyway and compared
    with the cached one. Agreement raises confidence; divergence halves it,
    replaces the cached tensor and shortens the entry's lifetime. Entries at
    or above ``high_confidence`` may be returned without recomputation.

    Args:
        ttl: Number of steps a fresh or re-validated entry stays valid.
        high_confidence: Confidence needed before the live call is bypassed.
        divergence_tolerance: Largest mean absolute difference that still
            counts as agreement.
        max_entries: Entry count at which eviction starts.
    """

    def __init__(
        self,
        ttl: int = 48,
        high_confidence: float = 0.995,
        divergence_tolerance: float = 1e-3,
        max_entries: int = 4096,
    ):
        self.ttl = ttl
        self.high_confidence = high_confidence
        self.divergence_tolerance = divergence_tolerance
        self.max_entries = max_entries
        self.entries: Dict[str, CacheEntry] = {}
        self.stats = dict(lookups=0, hits=0, bypasses=0,
                          shadow_validations=0, misses=0, evictions=0)

    def _evict(self) -> None:
        """When full, drop the 1/16 least trusted / soonest expiring entries."""
        if len(self.entries) < self.max_entries:
            return
        ranked = sorted(
            self.entries,
            key=lambda k: (self.entries[k].confidence, self.entries[k].expires_at),
        )
        for k in ranked[: max(1, len(ranked) // 16)]:
            self.entries.pop(k, None)
            self.stats["evictions"] += 1

    def fingerprint(self, bi: int, gen: int, heads: int, bucket: int,
                    tokens: torch.Tensor) -> str:
        """Hash block index, model generation, head count, bucket and tokens."""
        blob = json.dumps(
            {"b": bi, "g": gen, "h": heads, "s": bucket,
             "t": tokens.detach().cpu().tolist()},
            separators=(",", ":"),
            sort_keys=True,
        ).encode()
        return hashlib.blake2b(blob, digest_size=16).hexdigest()

    def maybe_use(
        self,
        key: str,
        step: int,
        live_fn,
        allow_bypass: bool = True,
    ) -> Tuple[torch.Tensor, bool]:
        """Return a block output for ``key``, from cache when trusted enough.

        Args:
            key: Fingerprint of the computation.
            step: Current step, used for expiry.
            live_fn: Zero-argument callable that computes the real output.
            allow_bypass: Whether a high-confidence entry may be returned
                without calling ``live_fn``.

        Returns:
            ``(output, bypassed)``. When ``bypassed`` is true the output is
            the cached, detached CPU tensor and ``live_fn`` was not called.
        """
        self.stats["lookups"] += 1
        entry = self.entries.get(key)

        if entry is None or step > entry.expires_at:
            self.stats["misses"] += 1
            live = live_fn()
            self._evict()
            self.entries[key] = CacheEntry(
                output=live.detach().cpu(),
                confidence=0.50,
                expires_at=step + self.ttl,
            )
            return live, False

        self.stats["hits"] += 1
        entry.hits += 1

        if allow_bypass and entry.confidence >= self.high_confidence:
            self.stats["bypasses"] += 1
            entry.expires_at = step + self.ttl
            return entry.output, True

        self.stats["shadow_validations"] += 1
        live = live_fn()
        cached = entry.output.to(live.device)
        if cached.shape != live.shape:
            # A cached tensor of another shape cannot be compared; treat it
            # as maximally divergent instead of raising a broadcast error.
            div = float("inf")
        else:
            div = float(torch.mean(torch.abs(live.detach() - cached)).item())
        entry.validations += 1
        entry.last_divergence = div

        if div <= self.divergence_tolerance:
            entry.confidence = min(0.999, entry.confidence * 0.7 + 0.3)
            entry.expires_at = max(entry.expires_at, step + self.ttl)
        else:
            entry.confidence = max(0.05, entry.confidence * 0.5)
            entry.output = live.detach().cpu()
            # A diverging entry gets a shortened lifetime. Previously this
            # value was immediately overwritten by max(..., step + ttl), so
            # the shortening never took effect.
            entry.expires_at = step + max(4, self.ttl // 4)
        return live, False

    def to_state(self) -> dict:
        """Return a plain-dict snapshot suitable for ``torch.save``."""
        return dict(
            ttl=self.ttl,
            high_confidence=self.high_confidence,
            divergence_tolerance=self.divergence_tolerance,
            max_entries=self.max_entries,
            # Copy so the snapshot does not change as the counters advance.
            stats=dict(self.stats),
            entries={
                k: dict(output=e.output, confidence=e.confidence,
                        expires_at=e.expires_at, hits=e.hits,
                        validations=e.validations,
                        last_divergence=e.last_divergence)
                for k, e in self.entries.items()
            },
        )

    @classmethod
    def from_state(cls, s: dict) -> "KernelDB":
        """Rebuild a cache from ``to_state()`` output; missing keys use defaults."""
        obj = cls(
            ttl=s.get("ttl", 48),
            high_confidence=s.get("high_confidence", 0.995),
            divergence_tolerance=s.get("divergence_tolerance", 1e-3),
            max_entries=s.get("max_entries", 4096),
        )
        obj.stats.update(s.get("stats", {}))
        for k, e in s.get("entries", {}).items():
            obj.entries[k] = CacheEntry(
                output=e["output"],
                confidence=float(e["confidence"]),
                expires_at=int(e["expires_at"]),
                hits=int(e.get("hits", 0)),
                validations=int(e.get("validations", 0)),
                last_divergence=float(e.get("last_divergence", 0.0)),
            )
        return obj
def divisors(n: int) -> List[int]:
    """Return every positive divisor of ``n`` in ascending order.

    Non-positive ``n`` yields an empty list.
    """
    found: List[int] = []
    for candidate in range(1, n + 1):
        if n % candidate == 0:
            found.append(candidate)
    return found
def _next_head_count(current: int, new_d: int) -> int:
    """Pick the head count to use after the model width becomes ``new_d``.

    Returns the smallest divisor of ``new_d`` strictly greater than
    ``current``. When there is none, returns the largest divisor of ``new_d``
    so the result always divides ``new_d``; the previous fallback returned
    ``current`` unchanged, which could be an invalid head count for the new
    width. If ``new_d`` has no positive divisors, ``current`` is returned.
    """
    valid = divisors(new_d)
    for d in valid:
        if d > current:
            return d
    # No larger divisor exists, so every divisor is <= current.
    return valid[-1] if valid else current
def _expand_2d( old: torch.Tensor, rows: int, cols: int, noise: float = 1e-3 ) -> torch.Tensor: """Expand matrix, preserve old top-left, seed new area near zero.""" t = torch.zeros(rows, cols, dtype=old.dtype) r, c = old.shape t[:r, :c] = old if rows > r: t[r:, :c] = torch.randn(rows - r, c, dtype=old.dtype) * noise if cols > c: t[:r, c:] = torch.randn(r, cols - c, dtype=old.dtype) * noise if rows > r and cols > c: t[r:, c:] = torch.randn(rows - r, cols - c, dtype=old.dtype) * noise return t
def _expand_1d(old: torch.Tensor, size: int) -> torch.Tensor: t = torch.zeros(size, dtype=old.dtype) t[:old.shape[0]] = old return t
class TinyMHABlock(nn.Module):
    """Pre-norm causal multi-head attention block followed by a two-layer MLP.

    Args:
        d_model: Channel width; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        dropout: Dropout probability applied to both residual branches.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model: int, n_heads: int = 1, dropout: float = 0.0):
        super().__init__()
        if d_model % n_heads:
            raise ValueError(f"d_model={d_model} not divisible by n_heads={n_heads}")
        self.d_model = d_model
        self.n_heads = n_heads
        # Attention projections, registered in q, k, v, proj order.
        for proj_name in ("q", "k", "v", "proj"):
            setattr(self, proj_name, QuantLinear(d_model, d_model, bias=False))
        hidden = 2 * d_model
        self.ff1 = QuantLinear(d_model, hidden, bias=True)
        self.ff2 = QuantLinear(hidden, d_model, bias=True)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.drop = nn.Dropout(dropout)
        # Head counts this width supports, ascending.
        self.allowed_heads = divisors(d_model)

    def grow_heads(self, steps: int = 1) -> int:
        """Advance ``steps`` places along ``allowed_heads``, stopping at the last."""
        here = self.allowed_heads.index(self.n_heads)
        last = len(self.allowed_heads) - 1
        self.n_heads = self.allowed_heads[min(last, here + steps)]
        return self.n_heads

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply attention and MLP residual branches to ``x`` of shape (B, T, C)."""
        batch, seq, width = x.shape
        heads = self.n_heads
        head_dim = width // heads

        def split_heads(t: torch.Tensor) -> torch.Tensor:
            # (B, T, C) -> (B, H, T, D)
            return t.view(batch, seq, heads, head_dim).transpose(1, 2)

        normed = self.ln1(x)
        q = split_heads(self.q(normed))
        k = split_heads(self.k(normed))
        v = split_heads(self.v(normed))

        logits = (q @ k.transpose(-2, -1)) / math.sqrt(max(1, head_dim))
        # True above the diagonal marks future positions to hide.
        future = torch.ones(seq, seq, device=x.device, dtype=torch.bool).triu(diagonal=1)
        logits = logits.masked_fill(future[None, None], float("-inf"))
        weights = torch.softmax(logits, dim=-1)

        mixed = (weights @ v).transpose(1, 2).contiguous().view(batch, seq, width)
        x = x + self.drop(self.proj(mixed))

        activated = TANH_LUT(self.ff1(self.ln2(x)))
        return x + self.drop(self.ff2(activated))
class ByteCodec:
    """Tokenizer that maps text to and from its UTF-8 byte values."""

    vocab_size = 256

    def encode(self, s: str) -> List[int]:
        """Return the UTF-8 bytes of ``s`` as integers."""
        raw = s.encode("utf-8", errors="replace")
        return list(raw)

    def decode(self, ids: Iterable[int]) -> str:
        """Turn ids (taken modulo 256) back into text, replacing invalid UTF-8."""
        buf = bytearray()
        for token in ids:
            buf.append(int(token) % 256)
        return buf.decode("utf-8", errors="replace")

    def state(self) -> dict:
        """Return the serializable description of this codec."""
        return {"kind": "byte"}

    @classmethod
    def from_state(cls, _: Optional[dict] = None) -> "ByteCodec":
        """Build a codec; the state argument is ignored."""
        return cls()
class MicroAccordion(nn.Module):
    """Byte-level causal transformer that can grow in depth, heads and width.

    ``model_generation`` is bumped by every structural change and is part of
    the KernelDB fingerprint used in ``forward``.

    Args:
        vocab_size: Number of token ids.
        block_size: Longest sequence the positional table covers.
        d_model: Channel width; must be divisible by ``n_heads``.
        n_blocks: Initial number of transformer blocks.
        n_heads: Initial number of attention heads per block.
        dropout: Dropout probability for every block, including blocks that
            are created later by the grow methods.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(
        self,
        vocab_size: int = 256,
        block_size: int = 64,
        d_model: int = 32,
        n_blocks: int = 1,
        n_heads: int = 1,
        dropout: float = 0.0,
    ):
        super().__init__()
        if d_model % n_heads != 0:
            raise ValueError("d_model must be divisible by n_heads")
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.d_model = d_model
        # Remembered so grown blocks use the same dropout as the originals.
        self.dropout = dropout
        self.model_generation = 0
        self.token_emb = QuantEmbedding(vocab_size, d_model)
        self.pos_emb = nn.Parameter(torch.randn(1, block_size, d_model) * 0.01)
        self.blocks = nn.ModuleList(
            [TinyMHABlock(d_model, n_heads, dropout) for _ in range(n_blocks)]
        )
        self.head = QuantLinear(d_model, vocab_size, bias=True)

    @property
    def n_heads(self) -> int:
        """Head count of the first block, or 1 when there are no blocks."""
        return self.blocks[0].n_heads if self.blocks else 1

    def can_grow_heads(self) -> bool:
        """True while the first block is below its largest allowed head count."""
        return bool(
            self.blocks
            and self.blocks[0].n_heads != self.blocks[0].allowed_heads[-1]
        )

    # ── structural growth ──────────────────────────────────────────────────

    def _adopt(self, module: nn.Module) -> nn.Module:
        """Put a freshly built sub-module on this model's device, dtype and mode.

        New modules are created on the CPU in training mode; without this
        step, growing a model that lives on another device mixes devices.
        """
        module.to(device=self.pos_emb.device, dtype=self.pos_emb.dtype)
        module.train(self.training)
        return module

    def grow_blocks(self, n: int = 1) -> int:
        """Append ``n`` new blocks and return the new block count.

        ``proj.weight`` and ``ff2.weight`` of a new block start at zero, so
        both residual branches initially contribute only their (scaled, zero)
        bias; every other parameter is scaled by 0.05.
        """
        added = 0
        for _ in range(n):
            b = TinyMHABlock(self.d_model, self.n_heads, self.dropout)
            with torch.no_grad():
                for name, p in b.named_parameters():
                    if "proj.weight" in name or "ff2.weight" in name:
                        nn.init.zeros_(p)
                    else:
                        # NOTE(review): this also scales the LayerNorm gains
                        # to 0.05 — confirm that is intended.
                        p.data.mul_(0.05)
            self.blocks.append(self._adopt(b))
            added += 1
        if added:
            self.model_generation += 1
        return len(self.blocks)

    def grow_heads(self, steps: int = 1) -> int:
        """Advance every block's head count by ``steps`` and return it."""
        for b in self.blocks:
            b.grow_heads(steps)
        self.model_generation += 1
        return self.n_heads

    def grow_width(
        self, delta: int = 16, max_d_model: int = 2048
    ) -> Tuple[int, int]:
        """Widen ``d_model`` by ``delta`` (capped) and advance the head count.

        Old weights are copied into the leading rows/columns of the wider
        tensors; new matrix entries are near-zero noise and new bias and
        LayerNorm entries are zero. The head count is re-chosen for the new
        width, so outputs are not guaranteed to be identical before and after.

        All new parts are built before any attribute is replaced, so a
        failure part-way leaves the model untouched.

        Args:
            delta: Requested increase of ``d_model``.
            max_d_model: Upper bound for the new width.

        Returns:
            ``(d_model, n_heads)`` after the call. When the width would not
            increase, the model is left unchanged.
        """
        D = self.d_model
        D2 = min(D + delta, max_d_model)
        if D2 <= D:
            # Covers both the cap being reached and a non-positive delta.
            return D, self.n_heads

        new_heads = _next_head_count(self.n_heads, D2)

        with torch.no_grad():
            ne = QuantEmbedding(self.vocab_size, D2)
            ne.weight.data = _expand_2d(
                self.token_emb.weight.data, self.vocab_size, D2
            )

            new_pos = _expand_2d(
                self.pos_emb.data.squeeze(0), self.block_size, D2
            ).unsqueeze(0)

            nh = QuantLinear(D2, self.vocab_size, bias=True)
            nh.weight.data = _expand_2d(self.head.weight.data, self.vocab_size, D2)
            if self.head.bias is not None:
                nh.bias.data = self.head.bias.data.clone()

            new_blocks = nn.ModuleList()
            for blk in self.blocks:
                nb = TinyMHABlock(D2, n_heads=new_heads, dropout=self.dropout)
                for attr in ("q", "k", "v", "proj"):
                    getattr(nb, attr).weight.data = _expand_2d(
                        getattr(blk, attr).weight.data, D2, D2
                    )
                nb.ff1.weight.data = _expand_2d(blk.ff1.weight.data, 2 * D2, D2)
                nb.ff1.bias.data = _expand_1d(blk.ff1.bias.data, 2 * D2)
                nb.ff2.weight.data = _expand_2d(blk.ff2.weight.data, D2, 2 * D2)
                nb.ff2.bias.data = _expand_1d(blk.ff2.bias.data, D2)
                for ln in ("ln1", "ln2"):
                    getattr(nb, ln).weight.data = _expand_1d(
                        getattr(blk, ln).weight.data, D2
                    )
                    getattr(nb, ln).bias.data = _expand_1d(
                        getattr(blk, ln).bias.data, D2
                    )
                new_blocks.append(nb)

        ref = self.pos_emb
        self.token_emb = self._adopt(ne)
        self.head = self._adopt(nh)
        self.blocks = self._adopt(new_blocks)
        self.pos_emb = nn.Parameter(new_pos.to(device=ref.device, dtype=ref.dtype))
        self.d_model = D2
        self.model_generation += 1
        return D2, new_heads

    # ── forward ────────────────────────────────────────────────────────────

    def forward(
        self,
        idx: torch.Tensor,
        kernel_db: Optional[KernelDB] = None,
        step: int = 0,
        revision_span: int = 32,
        enable_cache: bool = False,
        allow_train_bypass: bool = False,
    ) -> torch.Tensor:
        """Compute next-token logits of shape (B, T, vocab_size).

        Args:
            idx: Token ids of shape (B, T) with ``T <= block_size``.
            kernel_db: Optional cache consulted per block.
            step: Current step; used for cache expiry and the key bucket.
            revision_span: Steps per cache-key bucket.
            enable_cache: Route blocks through ``kernel_db`` when it is given.
            allow_train_bypass: Allow cached outputs to replace the live
                computation while the model is in training mode.

        Raises:
            ValueError: If ``T`` exceeds ``block_size``.
        """
        B, T = idx.shape
        if T > self.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size={self.block_size}"
            )
        x = self.token_emb(idx) + self.pos_emb[:, :T, :]
        bucket = step // revision_span
        use_cache = enable_cache and kernel_db is not None
        for bi, blk in enumerate(self.blocks):
            if not use_cache:
                x = blk(x)
                continue
            key = kernel_db.fingerprint(
                bi, self.model_generation, blk.n_heads, bucket, idx
            )
            out, bypassed = kernel_db.maybe_use(
                key,
                step,
                # Defaults bind this iteration's block and input.
                lambda b=blk, xx=x: b(xx),
                allow_bypass=(allow_train_bypass or not self.training),
            )
            # A bypassed result is the cached CPU copy.
            x = out.to(x.device) if bypassed else out
        return self.head(x)
class StreamCorpus:
    """Rolling token buffer for streaming training.

    Documents are pulled from a text iterator, encoded and appended to
    ``buffer``; batches are sampled at random positions of the buffer. While
    the source is live, every refill pulls at least one new document, so the
    window keeps moving through the stream; once the buffer exceeds
    ``buffer_max`` the oldest tokens are dropped. Previously nothing was
    pulled after the buffer first reached ``buffer_min``, so training never
    saw more than the first ``buffer_min`` tokens of a stream.

    Args:
        codec: Object with an ``encode(str) -> list[int]`` method.
        buffer_min: Refills keep pulling documents until at least this many
            tokens are buffered.
        buffer_max: Buffer size above which the oldest tokens are trimmed.
    """

    def __init__(
        self,
        codec: ByteCodec,
        buffer_min: int = 100_000,
        buffer_max: int = 2_000_000,
    ):
        self.codec = codec
        self.buffer: List[int] = []
        self.buffer_min = buffer_min
        self.buffer_max = buffer_max
        self._src: Optional[Iterator[str]] = None
        self.docs_consumed = 0
        self.tokens_consumed = 0

    # ── source constructors ────────────────────────────────────────────────

    def from_hf(
        self,
        name: str,
        config: Optional[str] = None,
        split: str = "train",
        text_field: str = "text",
        trust_remote_code: bool = False,
    ) -> "StreamCorpus":
        """Stream a HuggingFace dataset (pip install datasets).

        Args:
            name: Dataset name.
            config: Optional dataset configuration name.
            split: Split to stream.
            text_field: Key of the text in each item.
            trust_remote_code: Forwarded to ``load_dataset`` only when true.
                It lets the dataset repository run its own code on this
                machine, so it is opt-in instead of always on.

        Exits the process with an install hint when ``datasets`` is missing.
        """
        try:
            from datasets import load_dataset  # type: ignore
        except ImportError:
            sys.exit(
                "HuggingFace `datasets` not found.\n"
                "Install with: pip install datasets\n"
                "Then re-run."
            )
        kw: dict = dict(split=split, streaming=True)
        if trust_remote_code:
            kw["trust_remote_code"] = True
        ds = load_dataset(name, config, **kw) if config else load_dataset(name, **kw)
        self._src = (item[text_field] for item in ds)
        return self

    def from_urls(self, urls: List[str]) -> "StreamCorpus":
        """Stream the text of each URL in turn; failed fetches are reported and skipped."""
        def _gen() -> Iterator[str]:
            for url in urls:
                try:
                    text = fetch_and_strip(url)
                    print(f"[stream] fetched {len(text):,} chars from {url}")
                    yield text
                except Exception as exc:  # best effort: one bad URL must not end the stream
                    print(f"[stream] failed {url}: {exc}")
        self._src = _gen()
        return self

    def from_text(self, text: str) -> "StreamCorpus":
        """Use a single in-memory document as the whole stream."""
        self._src = iter([text])
        return self

    # ── buffer ────────────────────────────────────────────────────────────

    def _refill(self) -> None:
        """Pull new documents into the buffer and trim it when oversized."""
        if self._src is None:
            return
        pulled = False
        # Always take one document so the window advances, then continue
        # until the minimum fill level is reached.
        while not pulled or len(self.buffer) < self.buffer_min:
            try:
                text = next(self._src)
            except StopIteration:
                self._src = None
                break
            toks = self.codec.encode(text + "\n")
            self.buffer.extend(toks)
            self.docs_consumed += 1
            self.tokens_consumed += len(toks)
            pulled = True
        overflow = len(self.buffer) - self.buffer_max
        if overflow > 0:
            # Drop the oldest half of the capacity, or more if a very large
            # document pushed the buffer further over the limit.
            del self.buffer[: max(self.buffer_max // 2, overflow)]

    @property
    def exhausted(self) -> bool:
        """True when the source has ended and fewer than two tokens remain."""
        return self._src is None and len(self.buffer) < 2

    def sample_batch(
        self, block_size: int, batch_size: int, device: str
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Sample ``batch_size`` random windows and their next-token targets.

        Returns:
            ``(x, y)`` long tensors of shape (batch_size, block_size) on
            ``device``; ``y`` is ``x`` shifted one token ahead.

        Raises:
            RuntimeError: If fewer than ``block_size + 2`` tokens are buffered.
        """
        self._refill()
        n = len(self.buffer)
        if n < block_size + 2:
            raise RuntimeError(
                f"buffer has only {n} tokens (need {block_size + 2}) — "
                "stream may be exhausted"
            )
        max_start = n - block_size - 1
        ix = [random.randint(0, max_start) for _ in range(batch_size)]
        x = torch.tensor([self.buffer[i: i + block_size] for i in ix],
                         dtype=torch.long, device=device)
        y = torch.tensor([self.buffer[i + 1: i + block_size + 1] for i in ix],
                         dtype=torch.long, device=device)
        return x, y
class _HTMLStripper(html.parser.HTMLParser): SKIP = {"script", "style", "head", "meta", "link", "noscript", "nav", "footer", "header"}
def __init__(self): super().__init__() self._parts: List[str] = [] self._depth = 0
def handle_starttag(self, tag, attrs): if tag.lower() in self.SKIP: self._depth += 1
def handle_endtag(self, tag): if tag.lower() in self.SKIP and self._depth > 0: self._depth -= 1
def handle_data(self, data): if self._depth == 0: s = data.strip() if s: self._parts.append(s)
def get_text(self) -> str: return "\n".join(self._parts)
def html_to_text(raw: str) -> str:
    """Return the visible text of an HTML string, one fragment per line.

    Parsing is best effort: if the parser raises, whatever text was collected
    up to that point is returned.

    The parser already decodes character references before handing text to
    the stripper (``convert_charrefs`` is on by default), so the result is
    returned as is. Decoding it a second time, as was done before, turned
    text that literally shows an entity (``&amp;lt;``) into the wrong
    character.
    """
    s = _HTMLStripper()
    try:
        s.feed(raw)
        # Flush any text the parser is still holding back.
        s.close()
    except Exception:
        # Deliberate: malformed markup must not abort a crawl.
        pass
    return s.get_text()
def fetch_and_strip(url: str, timeout: int = 30) -> str:
    """Download ``url`` and return its text, with HTML markup removed.

    The body is decoded with the charset declared in the ``Content-Type``
    header when there is one (previously UTF-8 was always assumed), falling
    back to UTF-8 for a missing or unknown charset. Undecodable bytes are
    replaced. The body is treated as HTML when the content type mentions
    ``html`` or the text starts with ``<``.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        Whatever ``urllib.request.urlopen`` raises for a failed request.
    """
    req = urllib.request.Request(url, headers={"User-Agent": "MicroAccordion/3.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        ct = resp.headers.get("Content-Type", "")
        charset = resp.headers.get_content_charset() or "utf-8"
        body = resp.read()
    try:
        raw = body.decode(charset, errors="replace")
    except LookupError:
        # The server named a codec Python does not know.
        raw = body.decode("utf-8", errors="replace")
    looks_like_html = "html" in ct.lower() or raw.lstrip().startswith("<")
    return html_to_text(raw) if looks_like_html else raw
def load_text(
    path: Optional[str] = None, data_url: Optional[str] = None
) -> Tuple[str, str]:
    """Load the training text and describe where it came from.

    A URL takes precedence over a file path; with neither, the built-in seed
    corpus is used.

    Returns:
        ``(text, source_label)`` where the label is ``url:...``, ``file:...``
        or ``built-in-seed``.
    """
    if data_url:
        text = fetch_and_strip(data_url)
        label = f"url:{data_url}"
    elif path:
        text = Path(path).read_text(encoding="utf-8")
        label = f"file:{path}"
    else:
        text = DEFAULT_CORPUS
        label = "built-in-seed"
    return text, label
def prepare_data_tensor(text: str) -> torch.Tensor:
    """Encode ``text`` with a ``ByteCodec`` and return the ids as a 1-D long tensor."""
    token_ids = ByteCodec().encode(text)
    return torch.tensor(token_ids, dtype=torch.long)
def random_batch(
    data: torch.Tensor, block_size: int, batch_size: int, device: str
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Sample random training windows and their next-token targets.

    Every start position whose target window still fits in ``data`` can be
    drawn. The previous bound excluded the last valid start, so the final
    token was never used as a target and a corpus of exactly
    ``block_size + 1`` tokens was rejected.

    Args:
        data: 1-D tensor of token ids.
        block_size: Window length.
        batch_size: Number of windows.
        device: Device the returned tensors are moved to.

    Returns:
        ``(x, y)`` of shape (batch_size, block_size); ``y`` is ``x`` shifted
        one token ahead.

    Raises:
        ValueError: If ``data`` has fewer than ``block_size + 1`` tokens.
    """
    # Starts 0 .. len(data) - block_size - 1 keep y inside the data.
    n_starts = len(data) - block_size
    if n_starts < 1:
        raise ValueError(
            f"corpus too short ({len(data)} tokens) for block_size={block_size}"
        )
    starts = torch.randint(0, n_starts, (batch_size,), device=data.device)
    offsets = torch.arange(block_size, device=data.device)
    # (batch, block) index grid gathers all windows in one indexing call.
    window = starts[:, None] + offsets[None, :]
    x = data[window].to(device)
    y = data[window + 1].to(device)
    return x, y
def sample_text(
    model: MicroAccordion,
    codec: ByteCodec,
    prompt: str,
    max_new_tokens: int,
    device: str,
    temperature: float = 1.0,
    top_k: int = 16,
    self_calls: int = 1,
    kernel_db: Optional[KernelDB] = None,
) -> str:
    """Generate text by sampling tokens one at a time.

    The model is switched to eval mode. An empty prompt is replaced by
    ``"Assistant:"``. In total ``max(1, self_calls) * max_new_tokens`` tokens
    are appended, each conditioned on the last ``model.block_size`` tokens.

    Args:
        model: Model to sample from.
        codec: Encoder/decoder between text and token ids.
        prompt: Text to continue.
        max_new_tokens: Tokens generated per self-call round.
        device: Device for the context tensor.
        temperature: Logit divisor, floored at 1e-4.
        top_k: Sample only among the ``top_k`` highest logits; a falsy value
            or one not below the vocabulary size disables the filter.
        self_calls: Number of generation rounds (at least one is run).
        kernel_db: Optional cache; when given, caching is enabled.

    Returns:
        The decoded prompt plus generated tokens.
    """
    model.eval()
    tokens = codec.encode(prompt) if prompt else codec.encode("Assistant:")
    use_cache = kernel_db is not None
    total_new = max(1, self_calls) * max(0, max_new_tokens)
    safe_temperature = max(temperature, 1e-4)

    with torch.no_grad():
        for _ in range(total_new):
            window = tokens[-model.block_size:]
            batch = torch.tensor([window], dtype=torch.long, device=device)
            logits = model(
                batch,
                kernel_db=kernel_db,
                step=10**9,
                enable_cache=use_cache,
            )
            last = logits[0, -1] / safe_temperature
            if top_k and top_k < last.numel():
                top_vals, top_ids = torch.topk(last, top_k)
                pick = torch.multinomial(torch.softmax(top_vals, -1), 1)
                chosen = top_ids[pick].item()
            else:
                chosen = torch.multinomial(torch.softmax(last, -1), 1).item()
            tokens.append(int(chosen))

    return codec.decode(tokens)
def save_checkpoint(
    path: str,
    model: MicroAccordion,
    codec: ByteCodec,
    kdb: KernelDB,
    mut: LuminosityMutator,
    meta: dict,
) -> None:
    """Write model, codec, cache, mutator and metadata to ``path``.

    Besides the original fields the checkpoint now records:

    * ``model_generation`` (top-level key), the counter that is part of the
      KernelDB cache key, so a reloaded model does not restart at zero next
      to cache entries saved from an earlier structure;
    * ``dropout`` inside ``model_config``, so a reloaded model keeps its
      dropout instead of silently falling back to the constructor default.

    The file is written to ``<path>.tmp`` and then moved into place, so an
    interrupted save does not leave a truncated checkpoint at ``path``.
    """
    config = dict(
        vocab_size=model.vocab_size,
        block_size=model.block_size,
        d_model=model.d_model,
        n_blocks=len(model.blocks),
        n_heads=model.n_heads,
    )
    # Prefer an explicit attribute; otherwise read it off the first block.
    dropout = getattr(model, "dropout", None)
    if dropout is None and len(model.blocks):
        dropout = getattr(getattr(model.blocks[0], "drop", None), "p", None)
    if dropout is not None:
        config["dropout"] = float(dropout)

    payload = dict(
        model_state=model.state_dict(),
        model_config=config,
        model_generation=int(getattr(model, "model_generation", 0)),
        codec=codec.state(),
        kernel_db=kdb.to_state(),
        mutator=mut.state(),
        meta=meta,
    )
    tmp_path = f"{path}.tmp"
    torch.save(payload, tmp_path)
    os.replace(tmp_path, path)
def load_checkpoint(
    path: str, device: str
) -> Tuple[MicroAccordion, ByteCodec, KernelDB, LuminosityMutator, dict]:
    """Rebuild model, codec, cache, mutator and metadata from a checkpoint.

    The model's ``model_generation`` is restored when the checkpoint stores
    it (older files default to 0). Cache entries whose tensor width does not
    match the loaded model are dropped: they come from a narrower or wider
    model and could otherwise be returned for a matching key.

    Args:
        path: Checkpoint file written by ``save_checkpoint``.
        device: Device the tensors are mapped to and the model is moved to.

    Returns:
        ``(model, codec, kernel_db, mutator, meta)``.
    """
    # SECURITY: weights_only=False unpickles arbitrary objects and can run
    # code embedded in the file. Only load checkpoints you created yourself.
    raw = torch.load(path, map_location=device, weights_only=False)
    model = MicroAccordion(**raw["model_config"]).to(device)
    model.load_state_dict(raw["model_state"])
    model.model_generation = int(raw.get("model_generation", 0))
    codec = ByteCodec.from_state(raw.get("codec"))
    kdb = KernelDB.from_state(raw.get("kernel_db", {}))
    stale = [
        key for key, entry in kdb.entries.items()
        if entry.output.ndim == 0 or entry.output.shape[-1] != model.d_model
    ]
    for key in stale:
        del kdb.entries[key]
    mut = LuminosityMutator.from_state(raw.get("mutator", {}))
    return model, codec, kdb, mut, raw.get("meta", {})
def _make_kdb(args: argparse.Namespace) -> KernelDB:
    """Build a ``KernelDB`` from the cache-related command-line options."""
    settings = {
        "ttl": args.cache_ttl,
        "high_confidence": args.cache_high_confidence,
        "divergence_tolerance": args.cache_divergence_tol,
        "max_entries": args.cache_entries,
    }
    return KernelDB(**settings)
def _make_mutator(args: argparse.Namespace) -> LuminosityMutator:
    """Build a ``LuminosityMutator`` from the mutation-related command-line options."""
    settings = {
        "mutation_rate": args.mutation_rate,
        "mutation_strength": args.mutation_strength,
        "strategy": args.mutation_strategy,
        "plateau_amplifier": args.mutation_plateau_amp,
    }
    return LuminosityMutator(**settings)
def _rebuild_opt( model: MicroAccordion, args: argparse.Namespace ) -> torch.optim.Optimizer: return torch.optim.AdamW( model.parameters(), lr=args.lr, weight_decay=args.weight_decay )
def _try_grow_heads_blocks( model: MicroAccordion, args: argparse.Namespace, opt: torch.optim.Optimizer, step: int, reason: str, ) -> Tuple[torch.optim.Optimizer, bool]: do_heads = getattr(args, "auto_grow_heads", False) or getattr(args, "auto_grow", False) do_blocks = getattr(args, "auto_grow_blocks", False) or getattr(args, "auto_grow", False) grew: List[str] = [] if do_heads and model.can_grow_heads(): grew.append(f"heads->{model.grow_heads(1)}") if do_blocks: grew.append(f"blocks->{model.grow_blocks(1)}") if grew: opt = _rebuild_opt(model, args) print(f" [grow:{reason}@{step}] {' '.join(grew)}") return opt, bool(grew)
def _try_grow_width( model: MicroAccordion, args: argparse.Namespace, opt: torch.optim.Optimizer, step: int, ) -> Tuple[torch.optim.Optimizer, bool]: every = getattr(args, "width_grow_every", 0) if not every or step % every != 0: return opt, False cap = getattr(args, "width_grow_max", 2048) delta = getattr(args, "width_grow_delta", 16) if model.d_model >= cap: return opt, False old_d, old_h = model.d_model, model.n_heads new_d, new_h = model.grow_width(delta=delta, max_d_model=cap) opt = _rebuild_opt(model, args) n = sum(p.numel() for p in model.parameters()) print( f" [width@{step}] d_model {old_d}->{new_d} " f"heads {old_h}->{new_h} " f"params={n:,}" ) return opt, True
def _try_mutate( model: MicroAccordion, mut: LuminosityMutator, args: argparse.Namespace, plateau_ctr: int, step: int, verbose: bool = False, ) -> None: every = getattr(args, "mutate_every", 0) if not every or step % every != 0: return pr = min(1.0, plateau_ctr / max(1, args.grow_patience_steps)) mi = mut.mutate(model, plateau_ratio=pr) if verbose: print(f" [mutate@{step}] strategy={mi['strategy']} " f"strength={mi['strength']:.5f} hit={mi['params_mutated']}")
def _training_loop(
    model: MicroAccordion,
    get_batch,  # callable(step) -> (x, y)
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: ByteCodec,
    start_step: int,
    n_steps: int,
) -> Tuple[float, torch.optim.Optimizer, float]:
    """Inner training loop.

    Runs ``n_steps`` optimisation steps starting at ``start_step``. After
    each step it updates an exponential moving average of the loss, tracks
    how long that average has failed to improve, and gives width growth,
    head/block growth and mutation a chance to run. A sample is printed on
    the first step and every ``log_every`` steps.

    Head/block growth is considered when any of ``auto_grow``,
    ``auto_grow_heads`` or ``auto_grow_blocks`` is set. Previously only
    ``auto_grow`` opened that path, so the two specific options had no effect
    on their own.

    Returns:
        ``(best_ema_loss, optimizer, last_ema_loss)``; the last value is
        ``inf`` when no step ran.
    """
    opt = _rebuild_opt(model, args)
    best_loss = float("inf")
    plateau_ctr = 0
    ema_loss: Optional[float] = None
    grow_every = max(1, args.grow_every)
    # Guard the modulo below against log_every == 0.
    log_every = max(1, args.log_every)
    growth_enabled = any(
        getattr(args, flag, False)
        for flag in ("auto_grow", "auto_grow_heads", "auto_grow_blocks")
    )

    for step in range(start_step, start_step + n_steps):
        xb, yb = get_batch(step)
        model.train()
        logits = model(xb, kernel_db=kdb, step=step,
                       revision_span=args.cache_revision_span,
                       enable_cache=args.enable_cache,
                       allow_train_bypass=args.allow_train_bypass)
        loss = F.cross_entropy(logits.view(-1, model.vocab_size), yb.view(-1))
        opt.zero_grad(set_to_none=True)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
        opt.step()
        lv = float(loss.item())
        ema_loss = lv if ema_loss is None else 0.95 * ema_loss + 0.05 * lv

        if ema_loss < best_loss - args.grow_patience_delta:
            best_loss, plateau_ctr = ema_loss, 0
        else:
            plateau_ctr += 1

        opt, widened = _try_grow_width(model, args, opt, step)
        if widened:
            plateau_ctr = 0

        on_plateau = plateau_ctr >= args.grow_patience_steps
        if growth_enabled and (step % grow_every == 0 or on_plateau):
            reason = "plateau" if on_plateau else "sched"
            opt, grew = _try_grow_heads_blocks(model, args, opt, step, reason)
            if grew:
                plateau_ctr = 0

        is_log_step = step % log_every == 0
        _try_mutate(model, mut, args, plateau_ctr, step, verbose=is_log_step)

        if is_log_step or step == start_step:
            samp = sample_text(
                model, codec, args.sample_prompt, args.sample_tokens, device,
                args.temperature, args.top_k, 1,
                kdb if args.enable_cache else None,
            )
            print(
                f"step={step:6d} loss={lv:.4f} ema={ema_loss:.4f} "
                f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
                f"cache_bypass={kdb.stats['bypasses']} "
                f"sample={samp[:80]!r}"
            )

    return best_loss, opt, ema_loss if ema_loss is not None else float("inf")
def run_training(
    model: MicroAccordion,
    data: torch.Tensor,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: Optional[ByteCodec] = None,
    start_step: int = 1,
    total_steps: Optional[int] = None,
) -> Tuple[float, dict]:
    """Train on an in-memory token tensor.

    Args:
        model: Model to train.
        data: 1-D token tensor batches are sampled from.
        args: Parsed options; ``steps`` is used when ``total_steps`` is None.
        kdb: Kernel cache passed to the model.
        mut: Mutator used during training.
        device: Device batches are moved to.
        codec: Codec for the printed samples; a ``ByteCodec`` by default.
        start_step: Number of the first step.
        total_steps: Number of steps to run.

    Returns:
        ``(best_ema_loss, summary)`` where the summary records the last step,
        the model shape and the cache and mutator counters.
    """
    if not codec:
        codec = ByteCodec()
    step_count = args.steps if total_steps is None else total_steps

    def _get(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return random_batch(data, args.block_size, args.batch_size, device)

    best, _, ema = _training_loop(
        model, _get, args, kdb, mut, device, codec, start_step, step_count
    )
    summary = {
        "last_step": start_step + step_count - 1,
        "best_ema_loss": best,
        "blocks": len(model.blocks),
        "heads": model.n_heads,
        "d_model": model.d_model,
        "kernel_stats": kdb.stats,
        "mutator_stats": dict(mut.stats),
    }
    return best, summary
def run_stream_training(
    model: MicroAccordion,
    stream: StreamCorpus,
    args: argparse.Namespace,
    kdb: KernelDB,
    mut: LuminosityMutator,
    device: str,
    codec: ByteCodec,
    start_step: int = 1,
    total_steps: int = 10_000,
    epoch_steps: int = 1_000,
    checkpoint_path: str = "microaccordion.pt",
    meta: Optional[dict] = None,
) -> dict:
    """Train from a stream in epochs, saving a checkpoint after each one.

    Args:
        model: Model to train.
        stream: Source of training batches.
        args: Parsed options forwarded to the inner loop.
        kdb: Kernel cache passed to the model.
        mut: Mutator used during training.
        device: Device batches are created on.
        codec: Codec stored in the checkpoint and used for samples.
        start_step: Number of the first step.
        total_steps: Total number of steps to run.
        epoch_steps: Steps between checkpoints; values below 1 are treated
            as 1, because a zero-length epoch would never make progress.
        checkpoint_path: File written after every epoch.
        meta: Metadata dict updated in place; created when omitted.

    Returns:
        The updated metadata dict.
    """
    if meta is None:
        meta = {}
    epoch_num = meta.get("epoch", 0)
    steps_done = 0
    global_step = start_step
    best_loss = float("inf")
    epoch_steps = max(1, epoch_steps)
    # A resumed model may have a smaller context than the --block-size given
    # for this run; longer batches would not fit its positional table.
    seq_len = min(args.block_size, model.block_size)

    def _get(step: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return stream.sample_batch(seq_len, args.batch_size, device)

    print(
        f"\n[stream-train] {total_steps} steps epoch={epoch_steps} "
        f"d_model={model.d_model} blocks={len(model.blocks)} "
        f"heads={model.n_heads}\n"
    )

    while steps_done < total_steps:
        if stream.exhausted:
            print("[stream] source exhausted — stopping.")
            break
        this_epoch = min(epoch_steps, total_steps - steps_done)
        epoch_num += 1
        print(
            f"── epoch {epoch_num} "
            f"[{global_step}..{global_step + this_epoch - 1}] "
            f"d={model.d_model} B={len(model.blocks)} H={model.n_heads} "
            f"docs={stream.docs_consumed:,} ──"
        )

        bl, _, _ = _training_loop(
            model, _get, args, kdb, mut, device, codec, global_step, this_epoch
        )
        if bl < best_loss:
            best_loss = bl

        global_step += this_epoch
        steps_done += this_epoch

        meta.update(dict(
            last_step=global_step - 1,
            epoch=epoch_num,
            best_ema_loss=best_loss,
            d_model=model.d_model,
            blocks=len(model.blocks),
            heads=model.n_heads,
            docs_consumed=stream.docs_consumed,
            tokens_consumed=stream.tokens_consumed,
            mutator_stats=dict(mut.stats),
        ))
        save_checkpoint(checkpoint_path, model, codec, kdb, mut, meta)
        print(
            f" [ckpt] epoch={epoch_num} "
            f"steps={steps_done}/{total_steps} "
            f"best_ema={best_loss:.4f} "
            f"saved={checkpoint_path}"
        )

    return meta
def train_main(args: argparse.Namespace) -> None:
    """Train a new model on a file, URL or the seed corpus and save it."""
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"

    text, source = load_text(args.text, args.data_url)
    codec = ByteCodec()
    data = prepare_data_tensor(text)

    # Optionally keep a local copy of a downloaded corpus.
    if getattr(args, "download_out", None) and args.data_url:
        Path(args.download_out).write_text(text, encoding="utf-8")
        print(f"downloaded -> {args.download_out}")

    model = MicroAccordion(
        vocab_size=codec.vocab_size,
        block_size=args.block_size,
        d_model=args.d_model,
        n_blocks=args.n_blocks,
        n_heads=args.n_heads,
        dropout=args.dropout,
    )
    model = model.to(device)
    kdb = _make_kdb(args)
    mut = _make_mutator(args)

    _, meta = run_training(model, data, args, kdb, mut, device, codec=codec)
    meta.update(
        {
            "device": device,
            "data_source": source,
            "corpus_bytes": len(text.encode()),
        }
    )
    save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
    print(f"saved -> {args.checkpoint}")
def query_main(args: argparse.Namespace) -> None:
    """Load a checkpoint and print text sampled from the given prompt."""
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    model, codec, kdb, _, _ = load_checkpoint(args.checkpoint, device)
    # The stored cache is only used when caching was requested.
    cache = kdb if args.enable_cache else None
    generated = sample_text(
        model, codec, args.prompt, args.max_new_tokens, device,
        args.temperature, args.top_k, args.self_calls, cache,
    )
    print(generated)
def grow_main(args: argparse.Namespace) -> None:
    """Grow a saved model by the requested blocks, head steps and width, then re-save it."""
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)

    before_blocks = len(model.blocks)
    before_heads = model.n_heads
    before_width = model.d_model

    # Order matters: blocks first, then heads, then width.
    if args.blocks:
        model.grow_blocks(args.blocks)
    if args.head_steps:
        model.grow_heads(args.head_steps)
    if args.width_delta:
        model.grow_width(delta=args.width_delta)

    meta.update(
        {
            "blocks": len(model.blocks),
            "heads": model.n_heads,
            "d_model": model.d_model,
        }
    )
    save_checkpoint(args.checkpoint, model, codec, kdb, mut, meta)
    print(f"blocks {before_blocks}->{len(model.blocks)} heads {before_heads}->{model.n_heads} "
          f"d_model {before_width}->{model.d_model} saved {args.checkpoint}")
def inspect_main(args: argparse.Namespace) -> None:
    """Print a JSON summary of a checkpoint, loaded on the CPU."""
    model, _, kdb, mut, meta = load_checkpoint(args.checkpoint, "cpu")
    param_count = sum(p.numel() for p in model.parameters())
    report = {
        "d_model": model.d_model,
        "block_size": model.block_size,
        "blocks": len(model.blocks),
        "heads": model.n_heads,
        "params": param_count,
        # Two 4-bit values per byte.
        "approx_4bit_bytes": math.ceil(param_count / 2),
        "kernel_db_entries": len(kdb.entries),
        "kernel_db_stats": kdb.stats,
        "luminosity_mutator": mut.state(),
        "meta": meta,
    }
    print(json.dumps(report, indent=2))
def seed_main(args: argparse.Namespace) -> None:
    """Write the built-in seed corpus to ``args.out``, or print it when no path is given."""
    if not args.out:
        print(DEFAULT_CORPUS)
        return
    Path(args.out).write_text(DEFAULT_CORPUS, encoding="utf-8")
    print(f"wrote seed -> {args.out}")
def crawl_train_main(args: argparse.Namespace) -> None:
    """Fetch one URL, strip it to text, and run stream training on it.

    With ``args.resume`` set and an existing checkpoint file, the model,
    codec, kernel DB, mutator and metadata are restored and training
    continues from ``meta["last_step"] + 1``. Otherwise a new model is built
    from the CLI hyper-parameters and training starts at step 1.
    """
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    codec = ByteCodec()

    resuming = args.resume and Path(args.checkpoint).exists()
    if resuming:
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        start_step = meta.get("last_step", 0) + 1
        print(
            f"resumed at step {start_step}, "
            f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}"
        )
    else:
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {}
        start_step = 1

    print(f"fetching {args.url} ...")
    fetch_started = time.time()
    corpus = fetch_and_strip(args.url)
    print(f" {len(corpus):,} chars in {time.time()-fetch_started:.1f}s")

    # Optionally keep the stripped plaintext on disk.
    if getattr(args, "corpus_out", None):
        Path(args.corpus_out).write_text(corpus, encoding="utf-8")

    stream = StreamCorpus(codec).from_text(corpus)
    meta["url"] = args.url

    run_stream_training(
        model=model,
        stream=stream,
        args=args,
        kdb=kdb,
        mut=mut,
        device=device,
        codec=codec,
        start_step=start_step,
        total_steps=args.total_steps,
        epoch_steps=args.epoch_steps,
        checkpoint_path=args.checkpoint,
        meta=meta,
    )
def stream_train_main(args: argparse.Namespace) -> None:
    """Train on a HuggingFace streaming dataset or URL list.

    Source selection, in priority order: ``args.hf_dataset``, then the
    comma-separated ``args.stream_urls``, then the built-in seed corpus.
    The chosen source is recorded under ``meta["stream_source"]``.
    """
    use_cuda = torch.cuda.is_available() and not args.cpu
    device = "cuda" if use_cuda else "cpu"
    codec = ByteCodec()

    resuming = args.resume and Path(args.checkpoint).exists()
    if resuming:
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        start_step = meta.get("last_step", 0) + 1
        print(
            f"resumed at step {start_step}, "
            f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}"
        )
    else:
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {}
        start_step = 1

    stream = StreamCorpus(
        codec,
        buffer_min=getattr(args, "stream_buffer_min", 100_000),
        buffer_max=getattr(args, "stream_buffer_max", 2_000_000),
    )

    hf_name = getattr(args, "hf_dataset", None)
    url_spec = getattr(args, "stream_urls", None)

    if hf_name:
        print(
            f"streaming HuggingFace: {args.hf_dataset} "
            f"({args.hf_config or 'default'}) split={args.hf_split} "
            f"field={args.hf_text_field}"
        )
        stream.from_hf(
            args.hf_dataset,
            config=args.hf_config or None,
            split=args.hf_split,
            text_field=args.hf_text_field,
        )
        meta["stream_source"] = args.hf_dataset
    elif url_spec:
        # Drop blank entries produced by stray commas or whitespace.
        urls = []
        for piece in args.stream_urls.split(","):
            piece = piece.strip()
            if piece:
                urls.append(piece)
        print(f"streaming {len(urls)} URL(s)")
        stream.from_urls(urls)
        meta["stream_source"] = args.stream_urls
    else:
        print("no stream source given — using built-in corpus")
        stream.from_text(DEFAULT_CORPUS)
        meta["stream_source"] = "built-in"

    run_stream_training(
        model=model,
        stream=stream,
        args=args,
        kdb=kdb,
        mut=mut,
        device=device,
        codec=codec,
        start_step=start_step,
        total_steps=args.total_steps,
        epoch_steps=args.epoch_steps,
        checkpoint_path=args.checkpoint,
        meta=meta,
    )
def _split_command(line: str) -> Optional[List[str]]:
    """Tokenise a shell line with shlex; report and return None on bad quoting."""
    try:
        return shlex.split(line)
    except ValueError as exc:
        # shlex raises ValueError for e.g. an unclosed quotation mark.
        print(f"could not parse command: {exc}")
        return None


def io_main(args: argparse.Namespace) -> None:
    """Run the interactive talk / train shell.

    Loads ``args.checkpoint`` when that file exists, otherwise builds a new
    model from the CLI hyper-parameters. Each input line is either one of the
    slash commands listed in the banner or a free-form message, which is sent
    to the model as a ``User:`` turn; the resulting exchange is appended to
    the in-memory corpus used by ``/train``.

    Malformed command arguments (non-numeric counts, unbalanced quotes) are
    reported and the loop continues, so a typo does not terminate the session
    and throw away the unsaved model and corpus.
    """
    device = "cuda" if torch.cuda.is_available() and not args.cpu else "cpu"
    codec = ByteCodec()

    if args.checkpoint and Path(args.checkpoint).exists():
        model, codec, kdb, mut, meta = load_checkpoint(args.checkpoint, device)
        if args.text or args.data_url:
            corpus, _ = load_text(args.text, args.data_url)
        else:
            corpus = DEFAULT_CORPUS
        print(f"loaded {args.checkpoint} "
              f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
    else:
        corpus, source = load_text(args.text, args.data_url)
        model = MicroAccordion(
            vocab_size=codec.vocab_size,
            block_size=args.block_size,
            d_model=args.d_model,
            n_blocks=args.n_blocks,
            n_heads=args.n_heads,
            dropout=args.dropout,
        ).to(device)
        kdb = _make_kdb(args)
        mut = _make_mutator(args)
        meta = {"data_source": source}
        print("initialised new model")

    print("Commands: /help /train [n] /grow block|head|width [n] "
          "/mutate [pr] /stats /save [path] /corpus /append TEXT /fetch URL /quit")
    turn_count = 0

    while True:
        try:
            line = input("you> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nbye")
            break
        if not line:
            continue
        if line in {"/quit", "/exit"}:
            break

        if line == "/help":
            print(textwrap.dedent("""
                /train [n] /grow block|head|width [n] /mutate [plateau] /stats /save [path] /corpus /append TEXT /fetch URL /quit
            """).strip())
            continue

        if line.startswith("/corpus"):
            print(corpus)
            continue

        if line.startswith("/append "):
            added = line[8:]
            corpus += added + "\n"
            print(f"appended {len(added)} chars")
            continue

        if line.startswith("/fetch "):
            url = line[7:].strip()
            # Deliberately broad: any fetch failure is reported and the
            # session keeps going.
            try:
                fetched = fetch_and_strip(url)
                corpus += "\n" + fetched
                print(f"fetched {len(fetched):,} chars from {url}")
            except Exception as exc:
                print(f"fetch failed: {exc}")
            continue

        if line.startswith("/train"):
            parts = _split_command(line)
            if parts is None:
                continue
            try:
                steps = int(parts[1]) if len(parts) > 1 else args.steps
            except ValueError:
                print("usage: /train [n]  (n must be an integer)")
                continue
            data = prepare_data_tensor(corpus)
            # NOTE(review): start_step keeps advancing across turns while
            # total_steps is this turn's count. If run_training treats
            # total_steps as an absolute end step, later /train calls would
            # do little or no work — confirm against run_training.
            _, nm = run_training(model, data, args, kdb, mut, device,
                                 codec=codec,
                                 start_step=turn_count + 1,
                                 total_steps=steps)
            meta.update(nm)
            turn_count += steps
            print(f"trained {steps} steps "
                  f"d={model.d_model} B={len(model.blocks)} H={model.n_heads}")
            continue

        if line.startswith("/grow"):
            parts = _split_command(line)
            if parts is None:
                continue
            if len(parts) < 2:
                print("usage: /grow block|head|width [n]")
                continue
            try:
                n = int(parts[2]) if len(parts) > 2 else 1
            except ValueError:
                print("usage: /grow block|head|width [n]  (n must be an integer)")
                continue
            if parts[1] == "block":
                print(f"blocks -> {model.grow_blocks(n)}")
            elif parts[1] == "head":
                # Advance one step at a time and stop silently once the model
                # reports it cannot grow heads any further.
                for _ in range(n):
                    if model.can_grow_heads():
                        model.grow_heads(1)
                print(f"heads -> {model.n_heads}")
            elif parts[1] == "width":
                nd, nh = model.grow_width(delta=n)
                print(f"d_model->{nd} heads->{nh} "
                      f"params={sum(p.numel() for p in model.parameters()):,}")
            else:
                print("usage: /grow block|head|width [n]")
            continue

        if line.startswith("/mutate"):
            parts = _split_command(line)
            if parts is None:
                continue
            try:
                pr = float(parts[1]) if len(parts) > 1 else 0.0
            except ValueError:
                print("usage: /mutate [plateau]  (plateau must be a number)")
                continue
            mi = mut.mutate(model, plateau_ratio=pr)
            print(f"[mutator] strategy={mi['strategy']} "
                  f"strength={mi['strength']:.5f} "
                  f"params_hit={mi['params_mutated']}")
            continue

        if line.startswith("/stats"):
            n = sum(p.numel() for p in model.parameters())
            print(json.dumps(dict(
                d_model=model.d_model,
                blocks=len(model.blocks),
                heads=model.n_heads,
                params=n,
                approx_4bit_bytes=math.ceil(n / 2),
                kernel_db_entries=len(kdb.entries),
                kernel_db_stats=kdb.stats,
                luminosity_mutator=mut.state(),
                corpus_chars=len(corpus),
            ), indent=2))
            continue

        if line.startswith("/save"):
            parts = _split_command(line)
            if parts is None:
                continue
            p = parts[1] if len(parts) > 1 else (args.checkpoint or "microaccordion.pt")
            meta.update(dict(turn_count=turn_count, corpus_chars=len(corpus)))
            save_checkpoint(p, model, codec, kdb, mut, meta)
            print(f"saved -> {p}")
            continue

        # Anything else is a chat turn: sample a reply and remember the
        # exchange so later /train calls can learn from it.
        prompt = f"User: {line}\nAssistant:"
        reply = sample_text(model, codec, prompt,
                            args.max_new_tokens, device,
                            args.temperature, args.top_k, args.self_calls,
                            kdb if args.enable_cache else None)
        answer = reply.split("Assistant:", 1)[-1].strip()
        print(f"bot> {answer}")
        corpus += f"User: {line}\nAssistant: {answer}\n"
def build_parser() -> argparse.ArgumentParser:
    """Assemble the MicroAccordion command-line interface.

    Registers the sub-commands train, query, grow, inspect, seed, io,
    crawl-train and stream-train. Each sub-parser stores its handler under
    ``func`` via ``set_defaults`` so the caller can dispatch on it.
    """
    root = argparse.ArgumentParser(
        description="MicroAccordion — self-growing byte transformer.",
    )
    commands = root.add_subparsers(dest="cmd", required=True)

    def _add_training(sp: argparse.ArgumentParser, io_mode: bool = False) -> None:
        """Arguments common to all training sub-commands."""
        steps_default = 120 if io_mode else 600
        log_default = 20 if io_mode else 50
        flag = {"action": "store_true"}

        # (option string, add_argument keyword arguments), in help order.
        training_options = (
            ("--text", {"default": None}),
            ("--data-url", {"default": None}),
            ("--download-out", {"default": None}),
            ("--d-model", {"type": int, "default": 32}),
            ("--n-blocks", {"type": int, "default": 1}),
            ("--n-heads", {"type": int, "default": 1}),
            ("--block-size", {"type": int, "default": 48}),
            ("--dropout", {"type": float, "default": 0.0}),
            ("--lr", {"type": float, "default": 3e-3}),
            ("--weight-decay", {"type": float, "default": 0.01}),
            ("--grad-clip", {"type": float, "default": 1.0}),
            ("--batch-size", {"type": int, "default": 16}),
            ("--steps", {"type": int, "default": steps_default}),
            ("--log-every", {"type": int, "default": log_default}),
            ("--sample-prompt",
             {"type": str, "default": "Luminosity and Gpteus"}),
            ("--sample-tokens", {"type": int, "default": 48}),
            ("--max-new-tokens", {"type": int, "default": 120}),
            ("--temperature", {"type": float, "default": 0.9}),
            ("--top-k", {"type": int, "default": 16}),
            ("--self-calls", {"type": int, "default": 1}),
            ("--enable-cache", dict(flag)),
            ("--allow-train-bypass", dict(flag)),
            ("--cache-ttl", {"type": int, "default": 48}),
            ("--cache-high-confidence", {"type": float, "default": 0.995}),
            ("--cache-divergence-tol", {"type": float, "default": 1e-3}),
            ("--cache-entries", {"type": int, "default": 4096}),
            ("--cache-revision-span", {"type": int, "default": 32}),
            ("--auto-grow",
             {"action": "store_true", "help": "grow heads and blocks"}),
            ("--auto-grow-heads", dict(flag)),
            ("--auto-grow-blocks", dict(flag)),
            ("--grow-every", {"type": int, "default": 150}),
            ("--grow-patience-steps", {"type": int, "default": 120}),
            ("--grow-patience-delta", {"type": float, "default": 1e-4}),
            ("--width-grow-every",
             {"type": int, "default": 0,
              "help": "expand d_model every N steps"}),
            ("--width-grow-delta",
             {"type": int, "default": 16,
              "help": "dims added per width growth"}),
            ("--width-grow-max",
             {"type": int, "default": 2048, "help": "d_model cap"}),
            ("--mutate-every",
             {"type": int, "default": 0,
              "help": "fire mutator every N steps"}),
            ("--mutation-rate", {"type": float, "default": 0.02}),
            ("--mutation-strength", {"type": float, "default": 0.05}),
            ("--mutation-strategy",
             {"type": str, "default": "luminosity",
              "choices": LuminosityMutator.STRATEGIES}),
            ("--mutation-plateau-amp", {"type": float, "default": 3.0}),
            ("--cpu", dict(flag)),
        )
        for option, settings in training_options:
            sp.add_argument(option, **settings)

    def _add_stream(sp: argparse.ArgumentParser) -> None:
        """Extra arguments for streaming sub-commands."""
        _add_training(sp)
        sp.add_argument("--checkpoint", type=str, default="microaccordion.pt")
        sp.add_argument("--resume", action="store_true")
        sp.add_argument("--total-steps", type=int, default=3000)
        sp.add_argument("--epoch-steps", type=int, default=300)
        sp.add_argument("--stream-buffer-min", type=int, default=100_000)
        sp.add_argument("--stream-buffer-max", type=int, default=2_000_000)

    # train
    train_cmd = commands.add_parser("train", help="train on a static corpus")
    train_cmd.add_argument("--checkpoint", type=str,
                           default="microaccordion.pt")
    _add_training(train_cmd)
    train_cmd.set_defaults(func=train_main)

    # query
    query_cmd = commands.add_parser(
        "query", help="sample text from a checkpoint")
    query_cmd.add_argument("--checkpoint", type=str, required=True)
    query_cmd.add_argument("--prompt", type=str,
                           default="Luminosity and Gpteus")
    query_cmd.add_argument("--max-new-tokens", type=int, default=120)
    query_cmd.add_argument("--temperature", type=float, default=0.9)
    query_cmd.add_argument("--top-k", type=int, default=16)
    query_cmd.add_argument("--self-calls", type=int, default=1)
    query_cmd.add_argument("--enable-cache", action="store_true")
    query_cmd.add_argument("--cpu", action="store_true")
    query_cmd.set_defaults(func=query_main)

    # grow
    grow_cmd = commands.add_parser(
        "grow", help="manually grow a saved checkpoint")
    grow_cmd.add_argument("--checkpoint", type=str, required=True)
    grow_cmd.add_argument("--blocks", type=int, default=0,
                          help="add this many blocks")
    grow_cmd.add_argument("--head-steps", type=int, default=0,
                          help="advance heads this many divisor steps")
    grow_cmd.add_argument("--width-delta", type=int, default=0,
                          help="expand d_model by this many dims")
    grow_cmd.add_argument("--cpu", action="store_true")
    grow_cmd.set_defaults(func=grow_main)

    # inspect
    inspect_cmd = commands.add_parser(
        "inspect", help="inspect checkpoint stats")
    inspect_cmd.add_argument("--checkpoint", type=str, required=True)
    inspect_cmd.set_defaults(func=inspect_main)

    # seed
    seed_cmd = commands.add_parser(
        "seed", help="print or export built-in seed corpus")
    seed_cmd.add_argument("--out", type=str, default=None)
    seed_cmd.set_defaults(func=seed_main)

    # io
    io_cmd = commands.add_parser("io", help="interactive talk / train shell")
    io_cmd.add_argument("--checkpoint", type=str, default="microaccordion.pt")
    _add_training(io_cmd, io_mode=True)
    io_cmd.set_defaults(func=io_main)

    # crawl-train
    crawl_cmd = commands.add_parser(
        "crawl-train",
        help="fetch one URL, strip HTML, train with full growth schedule",
    )
    crawl_cmd.add_argument("--url", type=str, required=True)
    crawl_cmd.add_argument("--corpus-out", type=str, default=None,
                           help="save stripped plaintext here")
    _add_stream(crawl_cmd)
    crawl_cmd.set_defaults(func=crawl_train_main)

    # stream-train
    stream_cmd = commands.add_parser(
        "stream-train",
        help="train on a HuggingFace stream or URL list",
    )
    stream_cmd.add_argument("--hf-dataset", type=str, default=None,
                            help="HuggingFace dataset name")
    stream_cmd.add_argument("--hf-config", type=str, default=None,
                            help="dataset config / subset")
    stream_cmd.add_argument("--hf-split", type=str, default="train")
    stream_cmd.add_argument("--hf-text-field", type=str, default="text")
    stream_cmd.add_argument("--stream-urls", type=str, default=None,
                            help="comma-separated URL list")
    _add_stream(stream_cmd)
    stream_cmd.set_defaults(func=stream_train_main)

    return root
def main() -> None:
    """Parse the command line, fix thread count and seeds, then dispatch.

    Torch is limited to at most four threads, and both the torch and
    ``random`` generators are seeded with 1337 before the selected
    sub-command handler (``args.func``) runs.
    """
    cli = build_parser()
    args = cli.parse_args()

    thread_cap = min(4, os.cpu_count() or 1)
    torch.set_num_threads(thread_cap)

    seed = 1337
    torch.manual_seed(seed)
    random.seed(seed)

    handler = args.func
    handler(args)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()