Literate source for Python scripts and the weaveback-graph prototype. Tangle pass: dir = "scripts/", gen = "scripts/", << >> delimiters.

install.py

Cross-platform installer. Downloads the current split binaries from GitHub releases (or builds them from source with --source). With --diagrams it also installs a JDK for PlantUML rendering.

// <<@file install.py>>=
#!/usr/bin/env python3
"""
install.py — weaveback installer

Installs the public weaveback binaries from GitHub releases (or builds them
from source). With --diagrams, also installs a JDK so you can use
--plantuml-jar.

Usage:
  python3 install.py [options]

  --diagrams          Also install JDK (for PlantUML diagram rendering)
  --source            Build from source instead of downloading a release
  --prefix DIR        Install binary to DIR
                        default: ~/.local/bin  (Unix)
                                 %LOCALAPPDATA%\\Programs\\weaveback  (Windows)
  --version VER       Install a specific release tag (default: latest)
"""

import argparse
import json
import os
import platform
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib.request
from pathlib import Path

REPO = "giannifer7/weaveback"

# ── Output helpers ─────────────────────────────────────────────────────────────

def info(msg):
    """Print an informational line, indented to match installer output."""
    print(f"  {msg}")


def ok(msg):
    """Print a success line with a leading check mark."""
    print(f"  \u2713 {msg}")


def warn(msg):
    """Print a warning line to stderr."""
    print(f"  ! {msg}", file=sys.stderr)


def die(msg):
    """Abort the installer with an error message (raises SystemExit)."""
    sys.exit(f"\nError: {msg}")


def run(cmd, *, check=True, **kwargs):
    """Echo *cmd* (shell-style) and execute it via subprocess.run.

    Returns the CompletedProcess; raises CalledProcessError when check=True
    and the command fails.
    """
    shown = " ".join(str(part) for part in cmd)
    print(f"  $ {shown}")
    return subprocess.run(cmd, check=check, **kwargs)


def which(name):
    """Return True when *name* resolves to an executable on PATH."""
    found = shutil.which(name)
    return found is not None


def fetch_json(url):
    """GET *url* and decode the response body as JSON."""
    headers = {"User-Agent": "weaveback-installer/1"}
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=30) as resp:
        payload = resp.read()
    return json.loads(payload)


def download(url, dest: Path):
    """Stream *url* into *dest*, printing a percentage progress indicator."""
    info(f"Downloading {url}")
    headers = {"User-Agent": "weaveback-installer/1"}
    req = urllib.request.Request(url, headers=headers)
    bytes_done = 0
    with urllib.request.urlopen(req, timeout=120) as resp, open(dest, "wb") as out:
        # Content-Length may be absent; a size of 0 disables the progress line.
        size = int(resp.headers.get("Content-Length", 0))
        while True:
            chunk = resp.read(65536)
            if not chunk:
                break
            out.write(chunk)
            bytes_done += len(chunk)
            if size:
                print(f"\r  {bytes_done * 100 // size:3d}%", end="", flush=True)
    print()
    ok(f"Saved {dest.name} ({bytes_done // 1024} KB)")


# ── Platform detection ─────────────────────────────────────────────────────────

def detect_platform():
    """Probe OS, CPU architecture, package manager and Linux distro identity.

    Returns a dict with keys: system, arch, pkg_manager, distro_id,
    distro_like.  pkg_manager is None when no supported manager is found;
    distro fields are empty strings outside Linux.
    """
    system = platform.system()      # 'Linux', 'Darwin', 'Windows'
    machine = platform.machine()    # 'x86_64', 'aarch64', 'AMD64', ...
    arch = "x86_64" if machine in ("x86_64", "AMD64") else machine.lower()

    pkg_manager = None
    distro_id = distro_like = ""

    if system == "Linux":
        try:
            fields = {}
            with open("/etc/os-release") as fh:
                for raw in fh:
                    key, _, value = raw.strip().partition("=")
                    fields[key] = value.strip('"')
            distro_id = fields.get("ID", "").lower()
            distro_like = fields.get("ID_LIKE", "").lower()
        except FileNotFoundError:
            pass  # minimal containers may lack /etc/os-release

        # AUR helpers first, then the common native managers.
        for pm in ("paru", "yay", "pacman", "apt-get", "dnf", "zypper", "apk"):
            if which(pm):
                pkg_manager = "apt" if pm == "apt-get" else pm
                break

    elif system == "Darwin":
        if which("brew"):
            pkg_manager = "brew"

    elif system == "Windows":
        for pm in ("winget", "choco", "scoop"):
            if which(pm):
                pkg_manager = pm
                break

    return {
        "system": system,
        "arch": arch,
        "pkg_manager": pkg_manager,
        "distro_id": distro_id,
        "distro_like": distro_like,
    }


# ── System dependency tables ───────────────────────────────────────────────────

# JDK package name(s) per package manager, used only for --diagrams.
# Values are lists so a manager could require several packages.
_JDK = {
    "paru":   ["jdk-openjdk"],
    "yay":    ["jdk-openjdk"],
    "pacman": ["jdk-openjdk"],
    "apt":    ["default-jdk-headless"],
    "dnf":    ["java-latest-openjdk-headless"],
    "zypper": ["java-21-openjdk-headless"],
    "brew":   ["openjdk"],
    "winget": ["Microsoft.OpenJDK.21"],
    "choco":  ["openjdk"],
    "scoop":  ["openjdk"],
}


def _pkg_install(pm, packages):
    """Install *packages* non-interactively via package manager *pm*.

    Calls die() for a manager this table doesn't know about.
    """
    # Command prefixes for managers that take all packages in one invocation.
    prefixes = {
        "paru":   ["paru", "-S", "--needed", "--noconfirm"],
        "yay":    ["yay", "-S", "--needed", "--noconfirm"],
        "pacman": ["sudo", "pacman", "-S", "--needed", "--noconfirm"],
        "apt":    ["sudo", "apt-get", "install", "-y"],
        "dnf":    ["sudo", "dnf", "install", "-y"],
        "zypper": ["sudo", "zypper", "install", "-y"],
        "brew":   ["brew", "install"],
        "choco":  ["choco", "install", "-y"],
        "scoop":  ["scoop", "install"],
    }
    if pm == "winget":
        # winget installs one package id per invocation.
        for pkg in packages:
            run(["winget", "install", "--accept-source-agreements",
                 "--accept-package-agreements", "-e", "--id", pkg])
    elif pm in prefixes:
        run(prefixes[pm] + packages)
    else:
        die(f"Unhandled package manager: {pm}")


def install_system_deps(pf, diagrams):
    """Install optional system packages (currently only a JDK for --diagrams)."""
    if not diagrams:
        return
    print("\n\u2500\u2500 System packages \u2500\u2500")
    pm = pf["pkg_manager"]
    if not pm:
        warn("No supported package manager found \u2014 install JDK manually.")
        return
    if which("java"):
        ok("JDK already installed")
        return
    pkgs = _JDK.get(pm)
    if not pkgs:
        warn(f"Don\u2019t know how to install JDK via {pm} \u2014 install manually.")
        return
    _pkg_install(pm, pkgs)


# ── Binary installation ────────────────────────────────────────────────────────

# Binaries that form the public CLI surface (checked by verify()).
PUBLIC_BINS = ["wb-tangle", "wb-query", "wb-serve", "wb-mcp"]
# Everything shipped in a release: public bins plus internal tools.
ALL_RELEASE_BINS = PUBLIC_BINS + ["weaveback-macro", "weaveback-tangle", "weaveback-docgen"]


def _asset_spec(pf):
    """Return the preferred release asset type for this platform, or None."""
    system, arch = pf["system"], pf["arch"]

    if system == "Windows" and arch == "x86_64":
        return "windows-mingw"

    if system == "Darwin":
        return None   # no macOS binaries yet

    if system == "Linux" and arch == "x86_64":
        return "linux-glibc-tarball"

    return None


def _get_release(version):
    """Fetch GitHub release metadata for *version* (or the latest release)."""
    suffix = "latest" if version is None else f"tags/{version}"
    url = f"https://api.github.com/repos/{REPO}/releases/{suffix}"
    try:
        return fetch_json(url)
    except Exception as exc:
        die(f"Could not fetch release info from GitHub: {exc}")


def install_binary_from_release(pf, prefix: Path, version=None):
    """Download and install pre-built weaveback binaries from a GitHub release.

    pf      -- platform dict from detect_platform()
    prefix  -- directory the binaries are copied into (created if missing)
    version -- release tag such as "v0.4.1", or None for the latest release
    """
    print("\n\u2500\u2500 weaveback binaries \u2500\u2500")

    # Arch: prefer the AUR package when an AUR helper is available.
    if pf["distro_id"] in ("arch", "manjaro", "endeavouros", "garuda") \
            and pf["pkg_manager"] in ("paru", "yay"):
        pm = pf["pkg_manager"]
        info(f"Arch Linux \u2014 installing via AUR ({pm} -S weaveback-bin)")
        run([pm, "-S", "--needed", "--noconfirm", "weaveback-bin"])
        return

    want = _asset_spec(pf)
    if want is None:
        if pf["system"] == "Darwin":
            warn("No macOS binary in releases \u2014 build from source:")
            warn("  cargo build --release --workspace")
        else:
            warn(f"No pre-built binary for {pf['system']}/{pf['arch']} \u2014 use --source.")
        return

    release = _get_release(version)
    tag     = release["tag_name"]
    assets  = {a["name"]: a["browser_download_url"] for a in release.get("assets", [])}
    prefix.mkdir(parents=True, exist_ok=True)

    with tempfile.TemporaryDirectory() as tmp:
        if want == "linux-glibc-tarball":
            asset = "weaveback-x86_64-linux.tar.gz"
            if asset not in assets:
                die(f"Expected asset \u2018{asset}\u2019 not found in release {tag}.\n"
                    f"Available: {sorted(assets)}")
            dest = Path(tmp) / asset
            download(assets[asset], dest)
            extract_dir = Path(tmp) / "untar"
            extract_dir.mkdir()
            with tarfile.open(dest, "r:gz") as tar:
                # The archive is downloaded content: refuse path-traversal and
                # other hostile members (CVE-2007-4559).  The "data" filter
                # exists from Python 3.12 (backported to 3.11.4 / 3.10.12);
                # fall back to the old behavior on runtimes without it.
                try:
                    tar.extractall(extract_dir, filter="data")
                except TypeError:
                    tar.extractall(extract_dir)
            for name in ALL_RELEASE_BINS:
                src = extract_dir / name
                if src.exists():  # a release may not ship every binary
                    target = prefix / name
                    shutil.copy2(src, target)
                    target.chmod(0o755)
                    ok(f"Installed to {target}")
            _unix_path_hint(prefix)
        elif want == "windows-mingw":
            for name in ALL_RELEASE_BINS:
                asset = f"{name}-mingw64.exe"
                if asset not in assets:
                    die(f"Expected asset \u2018{asset}\u2019 not found in release {tag}.\n"
                        f"Available: {sorted(assets)}")
                target = prefix / f"{name}.exe"
                download(assets[asset], target)
                ok(f"Installed to {target}")
            _windows_add_to_path(prefix)
        else:
            die(f"Unhandled asset selection mode: {want}")


def install_from_source(prefix: Path):
    """Build all release binaries with cargo and copy them into *prefix*."""
    print("\n\u2500\u2500 Building from source \u2500\u2500")
    if not which("cargo"):
        die("cargo not found \u2014 install Rust from https://rustup.rs")
    run(["cargo", "build", "--release", "--workspace"])
    prefix.mkdir(parents=True, exist_ok=True)

    on_windows = platform.system() == "Windows"
    suffix = ".exe" if on_windows else ""
    for name in ALL_RELEASE_BINS:
        built = Path("target/release") / f"{name}{suffix}"
        if not built.exists():
            die(f"Build succeeded but binary not found at {built}")
        target = prefix / built.name
        shutil.copy2(built, target)
        if not on_windows:
            target.chmod(0o755)
        ok(f"Installed to {target}")

    if on_windows:
        _windows_add_to_path(prefix)
    else:
        _unix_path_hint(prefix)


def _unix_path_hint(prefix: Path):
    try:
        in_path = any(
            Path(p).resolve() == prefix.resolve()
            for p in os.environ.get("PATH", "").split(":")
            if p
        )
    except Exception:
        in_path = False
    if not in_path:
        warn(f"{prefix} is not in PATH.")
        warn(f"  Add to your shell profile:  export PATH=\"$PATH:{prefix}\"")


def _windows_add_to_path(prefix: Path):
    """Append *prefix* to the per-user PATH via the Windows registry.

    Best-effort: any failure — including running on a non-Windows platform,
    where importing winreg raises — falls through to a warning with manual
    instructions instead of aborting the install.
    """
    try:
        import winreg  # type: ignore[import-untyped]
        # HKCU\Environment holds the user-level PATH; open for read + write.
        key = winreg.OpenKey(
            winreg.HKEY_CURRENT_USER, "Environment",
            access=winreg.KEY_READ | winreg.KEY_WRITE,
        )
        try:
            current, _ = winreg.QueryValueEx(key, "Path")
        except FileNotFoundError:
            # No user PATH value exists yet — start from empty.
            current = ""
        s = str(prefix)
        # Case-insensitive substring check: Windows paths compare case-insensitively.
        if s.lower() not in current.lower():
            # REG_EXPAND_SZ keeps any %VAR% references in PATH expandable.
            winreg.SetValueEx(key, "Path", 0, winreg.REG_EXPAND_SZ,
                              f"{current};{s}" if current else s)
            ok(f"Added {prefix} to user PATH \u2014 restart your terminal")
        else:
            ok(f"{prefix} already in PATH")
        winreg.CloseKey(key)
    except Exception as exc:
        warn(f"Could not update PATH automatically: {exc}")
        warn(f"  Add manually: {prefix}")


# ── Verify ────────────────────────────────────────────────────────────────────

def verify():
    """Run each public binary with --version; return True when all succeed."""
    print("\n\u2500\u2500 Verification \u2500\u2500")
    all_ok = True
    for name in ("wb-tangle", "wb-query", "wb-serve", "wb-mcp"):
        cmd = [name, "--version"]
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        except FileNotFoundError:
            warn(f"{name}: not found in PATH")
            all_ok = False
            continue
        if r.returncode == 0:
            ok(f"{name}: {r.stdout.splitlines()[0][:72]}")
        else:
            warn(f"{name}: exited with code {r.returncode}")
            all_ok = False
    return all_ok


# ── Main ──────────────────────────────────────────────────────────────────────

def default_prefix():
    """Return the platform-appropriate default install directory."""
    if platform.system() != "Windows":
        return Path.home() / ".local" / "bin"
    fallback = str(Path.home() / "AppData" / "Local")
    base = os.environ.get("LOCALAPPDATA", fallback)
    return Path(base) / "Programs" / "weaveback"


def main():
    """CLI entry point: parse options, install deps and binaries, then verify."""
    parser = argparse.ArgumentParser(
        description="Install weaveback split binaries.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
Examples:
  python3 install.py                   # install binary from latest release
  python3 install.py --diagrams        # also install JDK for PlantUML
  python3 install.py --source          # build from source (needs cargo)
  python3 install.py --version v0.4.1  # pin to a specific release
""",
    )
    parser.add_argument("--diagrams", action="store_true",
                        help="Install JDK (for PlantUML support via --plantuml-jar)")
    parser.add_argument("--source", action="store_true",
                        help="Build weaveback binaries from source (requires Rust/cargo)")
    parser.add_argument("--prefix", type=Path, default=None, metavar="DIR",
                        help="Directory to install the weaveback binaries")
    parser.add_argument("--version", default=None, metavar="VER",
                        help="Release tag to install, e.g. v0.4.1 (default: latest)")
    args = parser.parse_args()

    prefix = args.prefix or default_prefix()
    pf = detect_platform()

    # Summary banner before any work happens.
    distro_note = f"  ({pf['distro_id']})" if pf["distro_id"] else ""
    print(f"Platform : {pf['system']} {pf['arch']}{distro_note}")
    print(f"Pkg mgr  : {pf['pkg_manager'] or 'none detected'}")
    print(f"Prefix   : {prefix}")
    print(f"Diagrams : {'yes' if args.diagrams else 'no'}")

    install_system_deps(pf, args.diagrams)

    if args.source:
        install_from_source(prefix)
    else:
        install_binary_from_release(pf, prefix, version=args.version)

    all_good = verify()
    print()
    final = ("Installation complete." if all_good
             else "Installation finished with warnings \u2014 check PATH messages above.")
    print(final)


if __name__ == "__main__":
    main()
// @

gliner_experiment.py

Experimental script: uses the GLiNER NLP model to extract semantic entities (constraints, intents, dependencies) from architecture documentation.

// <<@file gliner_experiment.py>>=
# scripts/gliner_experiment.py
import sys
import os
from gliner import GLiNER

def main():
    """Extract semantic entities from the architecture docs with GLiNER.

    Loads a pretrained GLiNER model, splits docs/architecture.adoc into
    paragraphs, tags each paragraph against a fixed label set, and prints
    the deduplicated entities per label, sorted by confidence.
    """
    # Load GLiNER model (using a widely supported base model for verification)
    print("Loading GLiNER model (numind/gliner-base-v1)...", file=sys.stderr)
    model = GLiNER.from_pretrained("numind/gliner-base-v1")

    # Path to architecture docs
    arch_doc_path = "docs/architecture.adoc"
    if not os.path.exists(arch_doc_path):
        print(f"Error: {arch_doc_path} not found.", file=sys.stderr)
        return

    # Explicit encoding so results don't depend on the platform default.
    with open(arch_doc_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Semantic labels based on "Intent and Constraint" extraction
    labels = [
        "Constraint",
        "Invariant",
        "Dependency Relation",
        "Intent",
        "Architectural Component",
    ]

    # Process in smaller chunks (paragraphs) for higher precision
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

    # found[label][entity_text] -> best confidence seen for that text
    found = {}
    print(f"Extracting semantic links from {len(paragraphs)} paragraphs...", file=sys.stderr)

    for paragraph in paragraphs:
        # Lower threshold for exploration, but we'll print confidence
        entities = model.predict_entities(paragraph, labels, threshold=0.3)
        for entity in entities:
            label = entity["label"]
            # Fix: the original rebound the outer `text` (the whole document)
            # here; use a dedicated name for the entity span instead.
            span = entity["text"].strip()
            score = entity["score"]

            per_label = found.setdefault(label, {})
            # Keep the highest score for each unique entity text
            if span not in per_label or score > per_label[span]:
                per_label[span] = score

    print("\n--- GLiNER Semantic Extraction Results ---\n")
    for label in sorted(found):
        print(f"[{label}]")
        # Sort by confidence, highest first
        by_confidence = sorted(found[label].items(), key=lambda kv: kv[1], reverse=True)
        for span, score in by_confidence:
            print(f"  - {span:<50} (conf: {score:.2f})")
        print()
// @

weaveback-graph

A prototype knowledge-graph explorer for the weaveback workspace using the real-ladybug graph database.

pyproject.toml

// <<@file weaveback-graph/pyproject.toml>>=
[project]
name = "weaveback-graph"
version = "0.1.0"
description = "Prototype knowledge-graph explorer for the weaveback workspace"
authors = [
    { name = "giannifer7", email = "gianni.ferrarotti@gmail.com" }
]
requires-python = ">=3.14"
dependencies = [
    "real-ladybug>=0.15.2",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
// @

.python-version

// <<@file weaveback-graph/.python-version>>=
3.14
// @

src/weaveback_graph/__init__.py

// <<@file weaveback-graph/src/weaveback_graph/__init__.py>>=
def hello() -> str:
    """Return the package's greeting string."""
    greeting = "Hello from weaveback-graph!"
    return greeting
// @

src/weaveback_graph/py.typed

PEP 561 marker file — empty, signals to type checkers that this package's inline type annotations may be used.

// <<@file weaveback-graph/src/weaveback_graph/py.typed>>=
// @

src/weaveback_graph/main.py

// <<@file weaveback-graph/src/weaveback_graph/main.py>>=
import real_ladybug as lbug
import os
import shutil
from pathlib import Path

def setup_schema(conn: lbug.Connection) -> None:
    """Create the node and relationship tables of the knowledge graph."""
    statements = (
        # Node tables
        "CREATE NODE TABLE Crate(name STRING, PRIMARY KEY (name))",
        "CREATE NODE TABLE File(path STRING, PRIMARY KEY (path))",
        "CREATE NODE TABLE Chunk(name STRING, PRIMARY KEY (name))",
        # Relationship tables
        "CREATE REL TABLE DEPENDS_ON(FROM Crate TO Crate)",
        "CREATE REL TABLE OWNS(FROM Crate TO File)",
        "CREATE REL TABLE DEFINES(FROM File TO Chunk)",
    )
    for ddl in statements:
        conn.execute(ddl)

def populate_graph(conn: lbug.Connection, root: Path) -> None:
    """Populate the graph by scanning Weaveback workspace.

    Walks root/crates creating one Crate node per directory, one File node
    per .adoc file under each crate's src/, OWNS edges between them, and a
    hardcoded set of DEPENDS_ON edges between crates.
    """
    crates_dir = root / "crates"
    if not crates_dir.exists():
        print(f"DEBUG: crates dir not found at {crates_dir}")
        return

    # 1. Add Crates
    # NOTE(review): values are interpolated straight into the Cypher text.
    # Acceptable for trusted local directory names, but names containing a
    # single quote would break the query — parameterized queries would be safer.
    for crate_path in crates_dir.iterdir():
        if crate_path.is_dir():
            name = crate_path.name
            conn.execute(f"CREATE (c:Crate {{name: '{name}'}})")

            # 2. Add Files owned by Crate
            src_dir = crate_path / "src"
            if src_dir.exists():
                for file_path in src_dir.rglob("*.adoc"):
                    # as_posix() keeps path separators stable across platforms
                    rel_path = file_path.relative_to(root).as_posix()
                    conn.execute(f"CREATE (f:File {{path: '{rel_path}'}})")
                    conn.execute(f"MATCH (c:Crate), (f:File) WHERE c.name = '{name}' AND f.path = '{rel_path}' CREATE (c)-[:OWNS]->(f)")

    # 3. Simple dependency modeling (hardcoded for this example)
    deps = [
        ("weaveback", "weaveback-macro"),
        ("weaveback", "weaveback-tangle"),
        ("weaveback", "weaveback-lsp"),
        ("weaveback-lsp", "weaveback-core"),
        ("weaveback-macro", "weaveback-core"),
        ("weaveback-tangle", "weaveback-core"),
    ]
    for src, dst in deps:
        conn.execute(f"MATCH (a:Crate), (b:Crate) WHERE a.name = '{src}' AND b.name = '{dst}' CREATE (a)-[:DEPENDS_ON]->(b)")

def run_queries(conn: lbug.Connection) -> None:
    """Run example Cypher queries to show the graph's power."""
    def show_all(result):
        # Drain the result cursor, printing the first column of each row.
        while result.has_next():
            print(f"  - {result.get_next()[0]}")

    print("\n--- Query 1: Which files are owned by the 'weaveback' crate? ---")
    show_all(conn.execute("MATCH (c:Crate)-[:OWNS]->(f:File) WHERE c.name = 'weaveback' RETURN f.path"))

    print("\n--- Query 2: Transitively find everything that depends on 'weaveback-core' ---")
    show_all(conn.execute("MATCH (a:Crate)-[:DEPENDS_ON*]->(b:Crate) WHERE b.name = 'weaveback-core' RETURN DISTINCT a.name"))

def main() -> None:
    """Build a fresh knowledge graph of the workspace and run demo queries."""
    db_path = "weaveback_memory.lbdb"

    # Remove any leftover database, whether stored as a directory or a file.
    if os.path.isdir(db_path):
        shutil.rmtree(db_path)
    elif os.path.exists(db_path):
        os.remove(db_path)

    db = lbug.Database(db_path)
    conn = lbug.Connection(db)

    setup_schema(conn)

    # Walk upward from this file until the 'weaveback' workspace root
    # (or the filesystem root, whichever comes first).
    here = Path(__file__).resolve()
    while here.name != "weaveback" and here.parent != here:
        here = here.parent
    project_root = here

    print(f"Populating graph from workspace: {project_root}...")
    populate_graph(conn, project_root)

    run_queries(conn)

if __name__ == "__main__":
    main()
// @