This note sketches a concrete first layout for a Python-facing agent stack on top of weaveback.

The design goals are narrow on purpose:

  • Rust remains the source of truth for project state, tracing, and mutation.

  • Python becomes the typed orchestration layer for agent loops.

  • Mutations go through a typed ChangePlan, not arbitrary file writes.

  • The core system stays usable without shell-first workflows or editor-specific glue.

The result is a three-layer stack:

  • weaveback-agent-core — pure Rust application API over existing weaveback primitives

  • weaveback-py — a thin PyO3 extension module

  • python/weaveback-agent — a modern Python package with Pydantic models and agent loop

Why This Split

The main risk in adding Python is accidentally creating a second, weaker source of truth. If Python can patch files directly, it will bypass the very thing that makes weaveback interesting: edits are traceable back to literate sources and can be oracle-verified against generated output.
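To make "oracle-verified" concrete, here is a deliberately simplified, self-contained sketch of the check. The real pipeline expands literate sources through the weaveback evaluator; `patch_lines` and `oracle_ok` are illustrative names, not part of any API, and the "expansion" step here is the identity function.

```rust
/// Replace the 1-based line range [start..=end] in `src` with `new_lines`.
fn patch_lines(src: &str, start: usize, end: usize, new_lines: &[&str]) -> String {
    let lines: Vec<&str> = src.lines().collect();
    let mut out: Vec<&str> = lines[..start - 1].to_vec();
    out.extend_from_slice(new_lines);
    out.extend_from_slice(&lines[end..]);
    out.join("\n")
}

/// Oracle check: after patching and re-expanding, the anchored output
/// line must still read exactly as the plan said it would.
fn oracle_ok(expanded: &str, anchor_line: usize, expected: &str) -> bool {
    expanded.lines().nth(anchor_line - 1) == Some(expected)
}

fn main() {
    let src = "fn main() {\n    println!(\"hi\");\n}";
    let patched = patch_lines(src, 2, 2, &["    println!(\"hello\");"]);
    // The edit lands where the plan claimed it would.
    assert!(oracle_ok(&patched, 2, "    println!(\"hello\");"));
    // A stale expectation is rejected rather than silently applied.
    assert!(!oracle_ok(&patched, 2, "    println!(\"hi\");"));
    println!("oracle check passed");
}
```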

That is why the split is asymmetric:

  • Rust owns reads from the database, source maps, chunk context, and edit application.

  • Python owns typed planning, orchestration, and model integration.

  • The PyO3 boundary is intentionally thin so it is easy to audit.

This also keeps the future Helix side clean. Helix can render plans, previews, and diagnostics, but it does not need to become the execution substrate.

Why maturin + uv

For a new Python extension in 2026, the practical default is:

  • maturin for building and publishing the PyO3 extension

  • uv for environment management, dependency locking, and task execution

  • src/ layout for the Python package

This avoids the older split where Rust packaging and Python packaging drift apart: maturin keeps the extension story standard, and uv keeps the Python tooling reproducible without turning the project into a nest of shell scripts.
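A minimal pyproject.toml sketch for python/weaveback-agent under this setup. The crate path, version bounds, and the `_native` module name are assumptions, not settled decisions:

```toml
# Illustrative only; names and bounds are placeholders, not canon.
[build-system]
requires = ["maturin>=1.5,<2.0"]
build-backend = "maturin"

[project]
name = "weaveback-agent"
requires-python = ">=3.11"
dependencies = ["pydantic>=2"]

[tool.maturin]
# Build the PyO3 extension from the sibling Rust crate.
manifest-path = "../../crates/weaveback-py/Cargo.toml"
module-name = "weaveback_agent._native"
python-source = "src"
```

With a layout like this, `uv sync` can build and install the extension through the maturin backend while locking the pure-Python dependencies alongside it.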

System Diagram

The architecture is easiest to reason about if reads and writes are shown as separate channels.

D2 diagram

Mutation Sequence

The write path should stay boring and explicit.

PlantUML diagram

Workspace Shape

The first pass should add two Rust crates and one Python project:

crates/
  weaveback-agent-core/
    Cargo.toml
    src/
      lib.rs
      workspace.rs
      read_api.rs
      change_plan.rs
      apply.rs
  weaveback-py/
    Cargo.toml
    src/
      lib.rs
python/
  weaveback-agent/
    pyproject.toml
    README.md
    src/
      weaveback_agent/
        __init__.py
        models.py
        agent.py

The separation between weaveback-agent-core and weaveback-py matters for one reason: the Rust API should still be callable from Rust tests, from a future local HTTP service, or from the existing MCP server without Python being on the critical path.

Canonical Sources

This page is an architecture note. It explains the design and shows representative code excerpts, but it does not assemble files.

The canonical literate sources live with the crates they explain:

The Python package under python/weaveback-agent/ is ordinary source rather than a literate subtree. The snippets below describe that public surface; they are not another source of truth.

Rust Core Crate

The core crate is where the agent-facing API becomes explicit. It should not contain PyO3 types, and it should not know anything about Pydantic.

Cargo.toml

[package]
name = "weaveback-agent-core"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
ureq.workspace = true

weaveback-core.workspace = true
weaveback-lsp.workspace = true
weaveback-macro.workspace = true
weaveback-tangle.workspace = true

Public Surface

The public surface should look like an application service, not like a bag of free functions. A Workspace can cache configuration and reusable handles.

pub mod apply;
pub mod change_plan;
pub mod read_api;
pub mod workspace;

pub use apply::{ApplyResult, ChangePreview, PlanValidation};
pub use change_plan::{ChangePlan, ChangeTarget, OutputAnchor, PlannedEdit};
pub use read_api::{ChunkContext, SearchHit, TraceResult};
pub use workspace::{Session, Workspace, WorkspaceConfig};

Workspace API

The Workspace type gives Python one stable object to hold. It prevents the extension layer from growing a sprawling function namespace, and it gives Rust a place to manage db paths, gen_dir, project root, and future caches.

use crate::apply::{apply_change_plan, preview_change_plan, validate_change_plan, ApplyResult, ChangePreview, PlanValidation};
use crate::change_plan::ChangePlan;
use crate::read_api::{ChunkContext, SearchHit, TraceResult};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceConfig {
    pub project_root: PathBuf,
    pub db_path: PathBuf,
    pub gen_dir: PathBuf,
}

#[derive(Debug, Clone)]
pub struct Workspace {
    config: WorkspaceConfig,
}

#[derive(Debug, Clone)]
pub struct Session {
    config: WorkspaceConfig,
}

impl Workspace {
    pub fn open(config: WorkspaceConfig) -> Self {
        Self { config }
    }

    pub fn session(&self) -> Session {
        Session {
            config: self.config.clone(),
        }
    }
}

impl Session {
    pub fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchHit>, String> {
        crate::read_api::search(&self.config, query, limit)
    }

    pub fn trace(&self, out_file: &str, out_line: u32, out_col: u32) -> Result<Option<TraceResult>, String> {
        crate::read_api::trace(&self.config, out_file, out_line, out_col)
    }

    pub fn chunk_context(&self, file: &str, name: &str, nth: u32) -> Result<ChunkContext, String> {
        crate::read_api::chunk_context(&self.config, file, name, nth)
    }

    pub fn validate_change_plan(&self, plan: &ChangePlan) -> Result<PlanValidation, String> {
        validate_change_plan(&self.config, plan)
    }

    pub fn preview_change_plan(&self, plan: &ChangePlan) -> Result<ChangePreview, String> {
        preview_change_plan(&self.config, plan)
    }

    pub fn apply_change_plan(&self, plan: &ChangePlan) -> Result<ApplyResult, String> {
        apply_change_plan(&self.config, plan)
    }
}

Change Plan Types

The edit model has to be strict enough that Rust can reject unsafe plans before the system mutates anything. The main design choice is that the plan targets source ranges but also carries an output anchor for oracle verification.

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangePlan {
    pub plan_id: String,
    pub goal: String,
    pub constraints: Vec<String>,
    pub edits: Vec<PlannedEdit>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannedEdit {
    pub edit_id: String,
    pub rationale: String,
    pub target: ChangeTarget,
    pub new_src_lines: Vec<String>,
    pub anchor: OutputAnchor,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeTarget {
    pub src_file: String,
    pub src_line: usize,
    pub src_line_end: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputAnchor {
    pub out_file: String,
    pub out_line: u32,
    pub expected_output: String,
}
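With the serde derives above, a ChangePlan round-trips as plain JSON, which is also the shape the Python layer will produce. An illustrative single-edit plan (all values are hypothetical):

```json
{
  "plan_id": "plan-0001",
  "goal": "Rename the retry limit constant",
  "constraints": ["do not change behavior"],
  "edits": [
    {
      "edit_id": "edit-1",
      "rationale": "MAX_TRIES is clearer than N",
      "target": { "src_file": "notes/retry.adoc", "src_line": 42, "src_line_end": 42 },
      "new_src_lines": ["const MAX_TRIES: u32 = 3;"],
      "anchor": {
        "out_file": "src/retry.rs",
        "out_line": 7,
        "expected_output": "const MAX_TRIES: u32 = 3;"
      }
    }
  ]
}
```

The anchor is what keeps the plan honest: after the source-range edit is applied and re-expanded, out_line of out_file must read exactly expected_output.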

Read Models

The read API should return plain serde models. That keeps the PyO3 layer thin and lets the same types flow through tests and future transports.

use crate::workspace::WorkspaceConfig;
use serde::{Deserialize, Serialize};
use weaveback_core::PathResolver;
use weaveback_macro::evaluator::output::{PreciseTracingOutput, SourceSpan, SpanKind, SpanRange};
use weaveback_macro::evaluator::{EvalConfig, Evaluator};
use weaveback_macro::macro_api::process_string_precise;
use weaveback_tangle::db::WeavebackDb;
use weaveback_tangle::lookup::{find_best_noweb_entry, find_line_col};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchHit {
    pub src_file: String,
    pub block_type: String,
    pub line_start: usize,
    pub line_end: usize,
    pub snippet: String,
    pub tags: Vec<String>,
    pub score: f64,
    pub channels: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceResult {
    pub out_file: String,
    pub out_line: u32,
    pub src_file: Option<String>,
    pub src_line: Option<u32>,
    pub src_col: Option<u32>,
    pub kind: Option<String>,
    pub macro_name: Option<String>,
    pub param_name: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkContext {
    pub file: String,
    pub name: String,
    pub nth: u32,
    pub section_breadcrumb: Vec<String>,
    pub prose: String,
    pub body: String,
    pub direct_dependencies: Vec<String>,
    pub outputs: Vec<String>,
}

fn open_db(config: &WorkspaceConfig) -> Result<WeavebackDb, String> {
    if !config.db_path.exists() {
        return Err(format!(
            "Database not found at {}. Run weaveback on your source files first.",
            config.db_path.display()
        ));
    }
    WeavebackDb::open_read_only(&config.db_path).map_err(|e| e.to_string())
}

fn build_eval_config() -> EvalConfig {
    EvalConfig::default()
}

fn span_at_line<'a>(
    expanded: &str,
    ranges: &'a [SpanRange],
    line_0: u32,
    col_char_0: u32,
) -> Option<&'a SourceSpan> {
    let line_start = if line_0 == 0 {
        0usize
    } else {
        let mut count = 0u32;
        let mut found = None;
        for (idx, byte) in expanded.bytes().enumerate() {
            if byte == b'\n' {
                count += 1;
                if count == line_0 {
                    found = Some(idx + 1);
                    break;
                }
            }
        }
        found?
    };

    // Clamp to the current line so an out-of-range column cannot spill
    // into the following lines of the expanded output.
    let line_end = expanded[line_start..]
        .find('\n')
        .map(|idx| line_start + idx)
        .unwrap_or(expanded.len());
    let line_text = &expanded[line_start..line_end];
    let byte_col = line_text
        .char_indices()
        .nth(col_char_0 as usize)
        .map(|(idx, _)| idx)
        .unwrap_or(line_text.len());

    PreciseTracingOutput::span_at_byte(ranges, line_start + byte_col)
}

fn trace_result_from_span(result: &mut TraceResult, span: &SourceSpan, evaluator: &Evaluator) {
    let source_manager = evaluator.sources();
    let Some(src_path) = source_manager.source_files().get(span.src as usize) else {
        return;
    };
    let Some(src_bytes) = source_manager.get_source(span.src) else {
        return;
    };
    let src_content = String::from_utf8_lossy(src_bytes);
    let (src_line, src_col) = find_line_col(&src_content, span.pos);

    result.src_file = Some(src_path.to_string_lossy().into_owned());
    result.src_line = Some(src_line);
    result.src_col = Some(src_col);
    result.kind = Some(match &span.kind {
        SpanKind::Literal => "Literal",
        SpanKind::MacroBody { .. } => "MacroBody",
        SpanKind::MacroArg { .. } => "MacroArg",
        SpanKind::VarBinding { .. } => "VarBinding",
        SpanKind::Computed => "Computed",
    }
    .to_string());

    match &span.kind {
        SpanKind::MacroBody { macro_name } => {
            result.macro_name = Some(macro_name.clone());
        }
        SpanKind::MacroArg { macro_name, param_name } => {
            result.macro_name = Some(macro_name.clone());
            result.param_name = Some(param_name.clone());
        }
        SpanKind::VarBinding { .. } | SpanKind::Literal | SpanKind::Computed => {}
    }
}

/// Parse an AsciiDoc-style heading line (`= Title`, `== Title`, ...)
/// and return its level, if any.
fn heading_level(line: &str) -> Option<usize> {
    let trimmed = line.trim_end();
    if trimmed.is_empty() {
        return None;
    }
    let count = trimmed.bytes().take_while(|&b| b == b'=').count();
    if count > 0 && trimmed.len() > count && trimmed.as_bytes()[count] == b' ' {
        Some(count)
    } else {
        None
    }
}

fn section_range(lines: &[&str], def_start: usize) -> (usize, usize) {
    let mut sec_start = 0usize;
    let mut sec_level = 1usize;
    for idx in (0..def_start).rev() {
        if let Some(level) = heading_level(lines[idx]) {
            sec_start = idx;
            sec_level = level;
            break;
        }
    }

    let sec_end = lines[def_start..]
        .iter()
        .enumerate()
        .skip(1)
        .find(|(_, line)| heading_level(line).is_some_and(|level| level <= sec_level))
        .map(|(idx, _)| def_start + idx)
        .unwrap_or(lines.len());

    (sec_start, sec_end)
}

fn title_chain(lines: &[&str], def_start: usize) -> Vec<String> {
    let mut chain = Vec::new();
    for line in lines.iter().take(def_start) {
        if let Some(level) = heading_level(line) {
            let title = line[level + 1..].trim().to_string();
            chain.retain(|(existing_level, _): &(usize, String)| *existing_level < level);
            chain.push((level, title));
        }
    }
    chain.into_iter().map(|(_, title)| title).collect()
}

/// Collect section prose, skipping code listings fenced by `----` lines.
fn extract_prose(lines: &[&str], start: usize, end: usize) -> String {
    let end = end.min(lines.len());
    let mut in_fence = false;
    let mut out = Vec::new();

    for line in lines.iter().take(end).skip(start) {
        if line.trim() == "----" {
            in_fence = !in_fence;
            continue;
        }
        if !in_fence {
            out.push(*line);
        }
    }

    while out.first().is_some_and(|line| line.trim().is_empty()) {
        out.remove(0);
    }
    while out.last().is_some_and(|line| line.trim().is_empty()) {
        out.pop();
    }

    out.join("\n")
}

/// Pass through queries that already use quotes or boolean operators;
/// otherwise quote any token whose punctuation the full-text parser
/// would treat as query syntax.
fn prepare_fts_query(query: &str) -> String {
    if query.contains('"')
        || query.contains(" AND ")
        || query.contains(" OR ")
        || query.contains(" NOT ")
    {
        return query.to_owned();
    }

    query
        .split_whitespace()
        .map(|token| {
            let safe = token
                .chars()
                .all(|char| char.is_alphanumeric() || char == '*' || char == '^');
            if safe {
                token.to_owned()
            } else {
                format!("\"{}\"", token.replace('"', "\"\""))
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}

/// Reciprocal-rank fusion contribution for a 1-based rank, with the
/// conventional damping constant k = 60.
fn reciprocal_rank(rank: usize) -> f64 {
    1.0 / (60.0 + rank as f64)
}

fn call_openai_embeddings(
    api_key: Option<&str>,
    base_url: &str,
    model: &str,
    inputs: &[String],
) -> Result<Vec<Vec<f32>>, String> {
    let url = format!("{}/embeddings", base_url.trim_end_matches('/'));
    let body = serde_json::json!({
        "model": model,
        "input": inputs,
    });
    let mut req = ureq::AgentBuilder::new()
        .build()
        .post(&url)
        .set("content-type", "application/json");
    if let Some(key) = api_key {
        req = req.set("Authorization", &format!("Bearer {key}"));
    }
    let resp = req.send_json(&body).map_err(|e| e.to_string())?;
    let value: serde_json::Value = resp.into_json().map_err(|e| e.to_string())?;
    let Some(items) = value.get("data").and_then(|v| v.as_array()) else {
        return Err(format!("unexpected embedding response: {value}"));
    };
    items.iter()
        .map(|item| {
            let Some(embedding) = item.get("embedding").and_then(|v| v.as_array()) else {
                return Err(format!("missing embedding array: {item}"));
            };
            embedding.iter()
                .map(|v| v.as_f64().map(|n| n as f32).ok_or_else(|| format!("invalid embedding value: {v}")))
                .collect()
        })
        .collect()
}

fn call_gemini_embeddings(
    api_key: &str,
    model: &str,
    inputs: &[String],
) -> Result<Vec<Vec<f32>>, String> {
    let url = format!(
        "https://generativelanguage.googleapis.com/v1beta/models/{}:batchEmbedContents?key={}",
        model,
        api_key,
    );
    let requests: Vec<serde_json::Value> = inputs
        .iter()
        .map(|text| {
            serde_json::json!({
                "model": format!("models/{model}"),
                "content": {
                    "parts": [{ "text": text }]
                }
            })
        })
        .collect();
    let body = serde_json::json!({ "requests": requests });
    let resp = ureq::AgentBuilder::new()
        .build()
        .post(&url)
        .set("content-type", "application/json")
        .send_json(&body)
        .map_err(|e| e.to_string())?;
    let value: serde_json::Value = resp.into_json().map_err(|e| e.to_string())?;
    let Some(items) = value.get("embeddings").and_then(|v| v.as_array()) else {
        return Err(format!("unexpected Gemini embedding response: {value}"));
    };
    items.iter()
        .map(|item| {
            let Some(values) = item.get("values").and_then(|v| v.as_array()) else {
                return Err(format!("missing Gemini embedding values: {item}"));
            };
            values.iter()
                .map(|v| v.as_f64().map(|n| n as f32).ok_or_else(|| format!("invalid embedding value: {v}")))
                .collect()
        })
        .collect()
}

fn embed_query(db: &WeavebackDb, query: &str) -> Result<Option<Vec<f32>>, String> {
    let Some(model) = db.get_run_config("semantic.model").map_err(|e| e.to_string())? else {
        return Ok(None);
    };
    let backend = db
        .get_run_config("semantic.backend")
        .map_err(|e| e.to_string())?
        .unwrap_or_else(|| "openai".to_string());
    let endpoint = db.get_run_config("semantic.endpoint").map_err(|e| e.to_string())?;
    let query_vec = match backend.as_str() {
        "gemini" => {
            let key = std::env::var("GOOGLE_API_KEY")
                .map_err(|_| "GOOGLE_API_KEY not set".to_string())?;
            call_gemini_embeddings(&key, &model, &[query.to_string()])?
        }
        "ollama" => {
            let base = endpoint.as_deref().filter(|v| !v.is_empty()).unwrap_or("http://localhost:11434/v1");
            call_openai_embeddings(None, base, &model, &[query.to_string()])?
        }
        "anthropic" => return Ok(None),
        _ => {
            let key = std::env::var("OPENAI_API_KEY").ok();
            let base = endpoint.as_deref().filter(|v| !v.is_empty()).unwrap_or("https://api.openai.com/v1");
            call_openai_embeddings(key.as_deref(), base, &model, &[query.to_string()])?
        }
    };
    Ok(query_vec.into_iter().next())
}

pub fn search(config: &WorkspaceConfig, query: &str, limit: usize) -> Result<Vec<SearchHit>, String> {
    let db = open_db(config)?;
    let fts_query = prepare_fts_query(query);
    let lexical = db.search_prose(&fts_query, limit.saturating_mul(4)).map_err(|e| e.to_string())?;
    let semantic = embed_query(&db, query)
        .ok()
        .flatten()
        .and_then(|query_embedding| db.search_prose_by_embedding(&query_embedding, limit.saturating_mul(4)).ok())
        .unwrap_or_default();

    let mut merged: std::collections::BTreeMap<(String, String, usize, usize), SearchHit> = std::collections::BTreeMap::new();

    for (idx, result) in lexical.into_iter().enumerate() {
        let key = (
            result.src_file.clone(),
            result.block_type.clone(),
            result.line_start as usize,
            result.line_end as usize,
        );
        let entry = merged.entry(key).or_insert_with(|| SearchHit {
            src_file: result.src_file.clone(),
            block_type: result.block_type.clone(),
            line_start: result.line_start as usize,
            line_end: result.line_end as usize,
            snippet: result.snippet.clone(),
            tags: result
                .tags
                .split(',')
                .map(str::trim)
                .filter(|tag| !tag.is_empty())
                .map(str::to_string)
                .collect(),
            score: 0.0,
            channels: Vec::new(),
        });
        entry.score += reciprocal_rank(idx + 1);
        if !entry.channels.iter().any(|channel| channel == "fts") {
            entry.channels.push("fts".to_string());
        }
    }

    for (idx, result) in semantic.into_iter().enumerate() {
        let key = (
            result.src_file.clone(),
            result.block_type.clone(),
            result.line_start as usize,
            result.line_end as usize,
        );
        let entry = merged.entry(key).or_insert_with(|| SearchHit {
            src_file: result.src_file.clone(),
            block_type: result.block_type.clone(),
            line_start: result.line_start as usize,
            line_end: result.line_end as usize,
            snippet: result.snippet.clone(),
            tags: result
                .tags
                .split(',')
                .map(str::trim)
                .filter(|tag| !tag.is_empty())
                .map(str::to_string)
                .collect(),
            score: 0.0,
            channels: Vec::new(),
        });
        entry.score += reciprocal_rank(idx + 1) + f64::from(result.score.max(0.0)) * 0.25;
        if !entry.channels.iter().any(|channel| channel == "semantic") {
            entry.channels.push("semantic".to_string());
        }
    }

    let mut hits: Vec<SearchHit> = merged.into_values().collect();
    hits.sort_by(|lhs, rhs| {
        rhs.score
            .partial_cmp(&lhs.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| lhs.src_file.cmp(&rhs.src_file))
            .then_with(|| lhs.line_start.cmp(&rhs.line_start))
    });
    hits.truncate(limit);
    Ok(hits)
}

pub fn trace(
    config: &WorkspaceConfig,
    out_file: &str,
    out_line: u32,
    out_col: u32,
) -> Result<Option<TraceResult>, String> {
    if out_line == 0 {
        return Err("out_line must be >= 1".to_string());
    }

    let db = open_db(config)?;
    let resolver = PathResolver::new(config.project_root.clone(), config.gen_dir.clone());
    let eval_config = build_eval_config();
    let out_line_0 = out_line - 1;
    let nw_entry = match find_best_noweb_entry(&db, out_file, out_line_0, &resolver)
        .map_err(|e| e.to_string())?
    {
        Some(entry) => entry,
        None => return Ok(None),
    };

    let mut result = TraceResult {
        out_file: out_file.to_string(),
        out_line,
        src_file: None,
        src_line: None,
        src_col: None,
        kind: None,
        macro_name: None,
        param_name: None,
    };

    let src_path = resolver.resolve_src(&nw_entry.src_file);
    let src_content = if let Ok(Some(bytes)) = db.get_src_snapshot(&nw_entry.src_file) {
        String::from_utf8_lossy(&bytes).into_owned()
    } else {
        match std::fs::read_to_string(&src_path) {
            Ok(content) => content,
            Err(_) => return Ok(Some(result)),
        }
    };

    let mut evaluator = Evaluator::new(eval_config);
    if let Ok((expanded, ranges)) = process_string_precise(&src_content, Some(&src_path), &mut evaluator) {
        let indent_char_len = nw_entry.indent.chars().count() as u32;
        let col_1 = out_col.max(1);
        if col_1 > indent_char_len {
            let adjusted_col_0 = col_1 - 1 - indent_char_len;
            if let Some(span) = span_at_line(&expanded, &ranges, nw_entry.src_line, adjusted_col_0) {
                trace_result_from_span(&mut result, span, &evaluator);
            }
        }
    }

    Ok(Some(result))
}

pub fn chunk_context(
    config: &WorkspaceConfig,
    file: &str,
    name: &str,
    nth: u32,
) -> Result<ChunkContext, String> {
    let db = open_db(config)?;
    let entry = db
        .get_chunk_def(file, name, nth)
        .map_err(|e| e.to_string())?
        .ok_or_else(|| format!("Chunk not found: {file}#{name}[{nth}]"))?;

    let src_path = config.project_root.join(file);
    let src_text = std::fs::read_to_string(&src_path)
        .map_err(|e| format!("Cannot read {}: {e}", src_path.display()))?;
    let src_lines: Vec<&str> = src_text.lines().collect();
    let def_start = entry.def_start as usize;
    let def_end = entry.def_end as usize;

    let body = if def_start < src_lines.len() && def_end <= src_lines.len() && def_end > 0 {
        src_lines[def_start..def_end - 1].join("\n")
    } else {
        String::new()
    };

    let section_breadcrumb = title_chain(&src_lines, def_start);
    let (sec_start, sec_end) = section_range(&src_lines, def_start);
    let prose = extract_prose(&src_lines, sec_start, sec_end);
    let raw_deps = db.query_chunk_deps(name).map_err(|e| e.to_string())?;
    let direct_dependencies = raw_deps.into_iter().map(|(name, _)| name).collect();
    let outputs = db.query_chunk_output_files(name).map_err(|e| e.to_string())?;

    Ok(ChunkContext {
        file: file.to_string(),
        name: name.to_string(),
        nth,
        section_breadcrumb,
        prose,
        body,
        direct_dependencies,
        outputs,
    })
}
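The merging in search is reciprocal-rank fusion (RRF) with the conventional constant k = 60, plus a small bonus scaled by the semantic similarity score. Stripped of the database types, the core fusion rule looks like this (`rrf_fuse` is an illustrative helper, not part of the crate):

```rust
use std::collections::BTreeMap;

/// Reciprocal-rank fusion: each channel contributes 1 / (60 + rank)
/// for every document it returns, and scores add across channels.
fn rrf_fuse(channels: &[Vec<&str>]) -> Vec<(String, f64)> {
    let mut scores: BTreeMap<String, f64> = BTreeMap::new();
    for hits in channels {
        for (idx, doc) in hits.iter().enumerate() {
            *scores.entry((*doc).to_string()).or_insert(0.0) +=
                1.0 / (60.0 + (idx + 1) as f64);
        }
    }
    let mut ranked: Vec<(String, f64)> = scores.into_iter().collect();
    ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    ranked
}

fn main() {
    // "b" is ranked second lexically but first semantically, and its
    // combined score beats "a", which only one channel ranks highly.
    let fts = vec!["a", "b", "c"];
    let semantic = vec!["b", "d", "a"];
    let fused = rrf_fuse(&[fts, semantic]);
    assert_eq!(fused[0].0, "b");
    println!("{fused:?}");
}
```

Documents found by only one channel still score, which is why search over-fetches (limit * 4) per channel before truncating the merged list.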

Validation and Apply Path

The validation step is where the system enforces policy. The write path should be impossible to reach without a valid plan.

use crate::change_plan::ChangePlan;
use crate::workspace::WorkspaceConfig;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
use weaveback_core::PathResolver;
use weaveback_macro::evaluator::{EvalConfig, Evaluator};
use weaveback_macro::macro_api::process_string;
use weaveback_tangle::db::WeavebackDb;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanValidation {
    pub ok: bool,
    pub issues: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreviewedEdit {
    pub edit_id: String,
    pub oracle_ok: bool,
    pub src_before: Vec<String>,
    pub src_after: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangePreview {
    pub plan_id: String,
    pub edits: Vec<PreviewedEdit>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApplyResult {
    pub plan_id: String,
    pub applied: bool,
    pub applied_edit_ids: Vec<String>,
    pub failed_edit_ids: Vec<String>,
}

struct ApplyFixRequest<'a> {
    src_file: &'a str,
    src_line_1: usize,
    src_line_end_1: usize,
    new_lines: &'a [String],
    out_file: &'a str,
    out_line_1: u32,
    expected: &'a str,
    write_changes: bool,
}

fn open_db(config: &WorkspaceConfig) -> Result<WeavebackDb, String> {
    if !config.db_path.exists() {
        return Err(format!(
            "Database not found at {}. Run weaveback on your source files first.",
            config.db_path.display()
        ));
    }
    WeavebackDb::open_read_only(&config.db_path).map_err(|e| e.to_string())
}

fn build_eval_config() -> EvalConfig {
    EvalConfig::default()
}

/// Order edits per file from the bottom of the file upward, so applying
/// one edit never shifts the line numbers of edits that are still pending.
fn sorted_plan_edit_ids(plan: &ChangePlan) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..plan.edits.len()).collect();
    indices.sort_by(|left, right| {
        let left_edit = &plan.edits[*left];
        let right_edit = &plan.edits[*right];
        left_edit
            .target
            .src_file
            .cmp(&right_edit.target.src_file)
            .then_with(|| right_edit.target.src_line.cmp(&left_edit.target.src_line))
            .then_with(|| right_edit.target.src_line_end.cmp(&left_edit.target.src_line_end))
    });
    indices
}

fn apply_fix_impl(
    request: ApplyFixRequest<'_>,
    db: &WeavebackDb,
    resolver: &PathResolver,
    eval_config: &EvalConfig,
) -> Result<PreviewedEdit, String> {
    let db_path = resolver.normalize(request.out_file);
    let nw_entry = db
        .get_noweb_entry(&db_path, request.out_line_1 - 1)
        .map_err(|e| format!("db error: {e}"))?
        .ok_or_else(|| format!("No noweb map entry for {}:{}", request.out_file, request.out_line_1))?;

    let expanded_line_1 = nw_entry.src_line as usize + 1;
    let content = std::fs::read_to_string(request.src_file)
        .map_err(|e| format!("Cannot read {}: {e}", request.src_file))?;
    let orig_lines: Vec<&str> = content.lines().collect();
    let file_len = orig_lines.len();

    if request.src_line_1 == 0 || request.src_line_1 > file_len {
        return Err(format!("src_line {} out of range (file has {file_len} lines)", request.src_line_1));
    }
    if request.src_line_end_1 > file_len {
        return Err(format!("src_line_end {} out of range (file has {file_len} lines)", request.src_line_end_1));
    }
    if request.src_line_end_1 < request.src_line_1 {
        return Err("src_line_end must be >= src_line".to_string());
    }

    let lo = request.src_line_1 - 1;
    let hi = request.src_line_end_1 - 1;
    let src_before: Vec<String> = orig_lines[lo..=hi].iter().map(|line| (*line).to_string()).collect();

    let patched_lines: Vec<&str> = orig_lines[..lo]
        .iter().copied()
        .chain(request.new_lines.iter().map(String::as_str))
        .chain(orig_lines[hi + 1..].iter().copied())
        .collect();

    let had_trailing_newline = content.ends_with('\n');
    let mut patched = patched_lines.join("\n");
    if had_trailing_newline {
        patched.push('\n');
    }

    let oracle_path = std::path::Path::new(request.src_file).with_file_name("<oracle>");
    let mut oracle_config = eval_config.clone();
    if let Ok(Some(cfg)) = weaveback_tangle::lookup::find_best_source_config(db, request.src_file) {
        oracle_config.sigil = cfg.sigil;
    }

    let mut evaluator = Evaluator::new(oracle_config);
    let expanded_bytes = process_string(&patched, Some(&oracle_path), &mut evaluator)
        .map_err(|e| format!("Evaluation error: {e:?}"))?;
    let expanded = String::from_utf8_lossy(&expanded_bytes);

    let actual_line = expanded.lines().nth(expanded_line_1 - 1)
        .ok_or_else(|| format!("Expanded output has fewer than {expanded_line_1} lines"))?;

    if actual_line != request.expected {
        return Err(format!(
            "Oracle check failed for {}:{}-{}: got {:?}, expected {:?}",
            request.src_file, request.src_line_1, request.src_line_end_1, actual_line, request.expected
        ));
    }

    if request.write_changes {
        std::fs::write(request.src_file, &patched)
            .map_err(|e| format!("Cannot write {}: {e}", request.src_file))?;
    }

    Ok(PreviewedEdit {
        // The caller owns plan identity and fills in the real edit_id.
        edit_id: String::new(),
        oracle_ok: true,
        src_before,
        src_after: request.new_lines.to_vec(),
    })
}

pub fn validate_change_plan(_config: &WorkspaceConfig, plan: &ChangePlan) -> Result<PlanValidation, String> {
    let mut issues = Vec::new();
    let mut ids = HashSet::new();
    let mut per_file_ranges: BTreeMap<&str, Vec<(usize, usize, &str)>> = BTreeMap::new();

    if plan.edits.is_empty() {
        issues.push("plan must contain at least one edit".to_string());
    }

    for edit in &plan.edits {
        if !ids.insert(edit.edit_id.as_str()) {
            issues.push(format!("duplicate edit_id: {}", edit.edit_id));
        }
        if edit.target.src_line == 0 || edit.target.src_line_end < edit.target.src_line {
            issues.push(format!("{} has an invalid source range", edit.edit_id));
        }

        if edit.anchor.out_line == 0 {
            issues.push(format!("{} has an invalid output anchor", edit.edit_id));
        }

        per_file_ranges
            .entry(edit.target.src_file.as_str())
            .or_default()
            .push((edit.target.src_line, edit.target.src_line_end, edit.edit_id.as_str()));
    }

    for (src_file, ranges) in &mut per_file_ranges {
        ranges.sort_by_key(|(start, end, _)| (*start, *end));
        for pair in ranges.windows(2) {
            let (left_start, left_end, left_id) = pair[0];
            let (right_start, right_end, right_id) = pair[1];
            if right_start <= left_end {
                issues.push(format!(
                    "overlapping edits in {}: {} ({}-{}) overlaps {} ({}-{})",
                    src_file, left_id, left_start, left_end, right_id, right_start, right_end
                ));
            }
        }
    }

    Ok(PlanValidation {
        ok: issues.is_empty(),
        issues,
    })
}

pub fn preview_change_plan(config: &WorkspaceConfig, plan: &ChangePlan) -> Result<ChangePreview, String> {
    let validation = validate_change_plan(config, plan)?;
    if !validation.ok {
        return Err(validation.issues.join("\n"));
    }

    let db = open_db(config)?;
    let resolver = PathResolver::new(config.project_root.clone(), config.gen_dir.clone());
    let eval_config = build_eval_config();
    let mut previews = Vec::new();

    for idx in sorted_plan_edit_ids(plan) {
        let edit = &plan.edits[idx];
        let mut preview = apply_fix_impl(
            ApplyFixRequest {
                src_file: &edit.target.src_file,
                src_line_1: edit.target.src_line,
                src_line_end_1: edit.target.src_line_end,
                new_lines: &edit.new_src_lines,
                out_file: &edit.anchor.out_file,
                out_line_1: edit.anchor.out_line,
                expected: &edit.anchor.expected_output,
                write_changes: false,
            },
            &db,
            &resolver,
            &eval_config,
        )?;
        preview.edit_id = edit.edit_id.clone();
        previews.push(preview);
    }

    Ok(ChangePreview {
        plan_id: plan.plan_id.clone(),
        edits: previews,
    })
}

pub fn apply_change_plan(config: &WorkspaceConfig, plan: &ChangePlan) -> Result<ApplyResult, String> {
    let validation = validate_change_plan(config, plan)?;
    if !validation.ok {
        return Err(validation.issues.join("\n"));
    }

    let db = open_db(config)?;
    let resolver = PathResolver::new(config.project_root.clone(), config.gen_dir.clone());
    let eval_config = build_eval_config();
    let mut applied_edit_ids = Vec::new();
    let mut failed_edit_ids = Vec::new();

    for idx in sorted_plan_edit_ids(plan) {
        let edit = &plan.edits[idx];
        match apply_fix_impl(
            ApplyFixRequest {
                src_file: &edit.target.src_file,
                src_line_1: edit.target.src_line,
                src_line_end_1: edit.target.src_line_end,
                new_lines: &edit.new_src_lines,
                out_file: &edit.anchor.out_file,
                out_line_1: edit.anchor.out_line,
                expected: &edit.anchor.expected_output,
                write_changes: true,
            },
            &db,
            &resolver,
            &eval_config,
        ) {
            Ok(_) => applied_edit_ids.push(edit.edit_id.clone()),
            Err(_) => failed_edit_ids.push(edit.edit_id.clone()),
        }
    }

    Ok(ApplyResult {
        plan_id: plan.plan_id.clone(),
        applied: failed_edit_ids.is_empty(),
        applied_edit_ids,
        failed_edit_ids,
    })
}
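The subtle part of validate_change_plan is the overlap rule: within each source file, ranges are sorted by (start, end) and any neighbor whose start is less than or equal to the previous end overlaps, because the line ranges are inclusive. A stdlib Python sketch of the same check (function and variable names are illustrative, not part of the API):

```python
def overlapping_pairs(ranges: list[tuple[int, int, str]]) -> list[str]:
    """ranges: (start, end, edit_id) tuples with inclusive 1-based lines.

    Mirrors the sort-then-compare-neighbors overlap check used by
    validate_change_plan: after sorting, only adjacent pairs can overlap.
    """
    issues: list[str] = []
    ordered = sorted(ranges, key=lambda r: (r[0], r[1]))
    for (left_start, left_end, left_id), (right_start, right_end, right_id) in zip(
        ordered, ordered[1:]
    ):
        # Inclusive ranges: a right range starting on the left range's last
        # line still overlaps it.
        if right_start <= left_end:
            issues.append(
                f"{left_id} ({left_start}-{left_end}) overlaps "
                f"{right_id} ({right_start}-{right_end})"
            )
    return issues
```

Sorting first means each range only has to be compared with its immediate neighbor, which keeps the check linear after the sort.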

PyO3 Crate

The PyO3 layer should be as dumb as possible. It should convert Python inputs into Rust structs, call weaveback-agent-core, and convert serde outputs back into Python values.

This is the wrong place for planning logic, prompt policy, or change-plan semantics.

Because the Python floor here is 3.14, the extension crate should track the current PyO3 line rather than pinning an older compatibility window.

Cargo.toml

[package]
name = "weaveback-py"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true

[lib]
name = "_weaveback"
crate-type = ["cdylib"]

[dependencies]
pyo3.workspace = true
pythonize.workspace = true
serde.workspace = true
serde_json.workspace = true
weaveback-agent-core.workspace = true

Extension Module

The methods return Python-native structures, but the real schemas live in the Python package. That gives Python room to evolve the public Pydantic models without forcing Rust-level ABI churn for every small wrapper change.

use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pythonize::{depythonize, pythonize};
use weaveback_agent_core::{ChangePlan, Workspace, WorkspaceConfig};

#[pyclass]
struct PyWorkspace {
    inner: Workspace,
}

#[pymethods]
impl PyWorkspace {
    #[new]
    fn new(project_root: String, db_path: String, gen_dir: String) -> Self {
        let config = WorkspaceConfig {
            project_root: project_root.into(),
            db_path: db_path.into(),
            gen_dir: gen_dir.into(),
        };

        Self {
            inner: Workspace::open(config),
        }
    }

    fn search(&self, py: Python<'_>, query: &str, limit: usize) -> PyResult<Py<PyAny>> {
        let value = self.inner.session().search(query, limit)
            .map_err(PyRuntimeError::new_err)?;
        pythonize(py, &value)
            .map(|value| value.unbind())
            .map_err(Into::into)
    }

    fn trace(&self, py: Python<'_>, out_file: &str, out_line: u32, out_col: u32) -> PyResult<Py<PyAny>> {
        let value = self.inner.session().trace(out_file, out_line, out_col)
            .map_err(PyRuntimeError::new_err)?;
        pythonize(py, &value)
            .map(|value| value.unbind())
            .map_err(Into::into)
    }

    fn validate_change_plan(&self, py: Python<'_>, plan: Bound<'_, PyAny>) -> PyResult<Py<PyAny>> {
        let plan: ChangePlan = depythonize(&plan)?;
        let value = self.inner.session().validate_change_plan(&plan)
            .map_err(PyRuntimeError::new_err)?;
        pythonize(py, &value)
            .map(|value| value.unbind())
            .map_err(Into::into)
    }

    fn preview_change_plan(&self, py: Python<'_>, plan: Bound<'_, PyAny>) -> PyResult<Py<PyAny>> {
        let plan: ChangePlan = depythonize(&plan)?;
        let value = self.inner.session().preview_change_plan(&plan)
            .map_err(PyRuntimeError::new_err)?;
        pythonize(py, &value)
            .map(|value| value.unbind())
            .map_err(Into::into)
    }

    fn apply_change_plan(&self, py: Python<'_>, plan: Bound<'_, PyAny>) -> PyResult<Py<PyAny>> {
        let plan: ChangePlan = depythonize(&plan)?;
        let value = self.inner.session().apply_change_plan(&plan)
            .map_err(PyRuntimeError::new_err)?;
        pythonize(py, &value)
            .map(|value| value.unbind())
            .map_err(Into::into)
    }
}

#[pymodule]
fn _weaveback(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> {
    module.add_class::<PyWorkspace>()?;
    Ok(())
}

Python Package

The Python package is where agent code should actually live: model wrappers, Pydantic validation, prompt assembly, and the high-level loop that returns a structured plan.

pyproject.toml

[build-system]
requires = ["maturin>=1.8,<2"]
build-backend = "maturin"

[project]
name = "weaveback-agent"
version = "0.1.0"
description = "Typed Python agent APIs for weaveback"
readme = "python/weaveback-agent/README.md"
requires-python = ">=3.14"
dependencies = [
  "pydantic>=2.11",
]

[dependency-groups]
dev = [
  "maturin>=1.13.1",
  "pyright>=1.1.400",
  "pytest>=8.3",
  "ruff>=0.11",
]

[tool.maturin]
module-name = "weaveback_agent._weaveback"
python-source = "python/weaveback-agent/src"
manifest-path = "crates/weaveback-py/Cargo.toml"

[tool.ruff]
line-length = 100

[tool.pyright]
pythonVersion = "3.14"
typeCheckingMode = "strict"

README.md

The Python project should explain that Rust owns project mutation and that the package is an orchestration layer over weaveback rather than a separate editor backend.

# weaveback-agent

Typed Python bindings and agent orchestration helpers for weaveback.

This package is intentionally thin:

- Rust owns tracing, context gathering, and verified source edits.
- Python owns typed planning, Pydantic models, and agent-loop composition.
- Edits are expected to go through a typed `ChangePlan`, not arbitrary file writes.

Package Entry Point

The package root should expose the typed surface, not the raw extension.

from weaveback_agent.agent import AgentLoop, AgentResponse
from weaveback_agent.models import (
    ApplyResult,
    ChangePlan,
    ChangePreview,
    ChangeTarget,
    OutputAnchor,
    PlanValidation,
    PlannedEdit,
    PreviewedEdit,
    TraceResult,
    WorkspaceConfig,
)

__all__ = [
    "AgentLoop",
    "AgentResponse",
    "ApplyResult",
    "ChangePlan",
    "ChangePreview",
    "ChangeTarget",
    "OutputAnchor",
    "PlanValidation",
    "PlannedEdit",
    "PreviewedEdit",
    "TraceResult",
    "WorkspaceConfig",
]

Pydantic Models

These models are the public contract for Python callers and for any PydanticAI-style loop that asks the LLM to produce typed data.

from typing import Annotated, Self

from pydantic import BaseModel, ConfigDict, Field, StringConstraints, model_validator

StrictModelConfig = ConfigDict(extra="forbid", strict=True, validate_assignment=True)

NonEmptyStr = Annotated[str, StringConstraints(strip_whitespace=True, min_length=1)]
SourceLine = Annotated[int, Field(ge=1)]
OutputLine = Annotated[int, Field(ge=1)]
SourceLines = Annotated[list[str], Field(min_length=1)]
IssueList = Annotated[list[str], Field(default_factory=list)]


class StrictModel(BaseModel):
    model_config = StrictModelConfig


class WorkspaceConfig(StrictModel):
    project_root: NonEmptyStr
    db_path: NonEmptyStr = "weaveback.db"
    gen_dir: NonEmptyStr = "gen"


class ChangeTarget(StrictModel):
    src_file: NonEmptyStr
    src_line: SourceLine
    src_line_end: SourceLine

    @model_validator(mode="after")
    def validate_line_range(self) -> Self:
        if self.src_line_end < self.src_line:
            msg = "src_line_end must be greater than or equal to src_line"
            raise ValueError(msg)
        return self


class OutputAnchor(StrictModel):
    out_file: NonEmptyStr
    out_line: OutputLine
    expected_output: str


class PlannedEdit(StrictModel):
    edit_id: NonEmptyStr
    rationale: NonEmptyStr
    target: ChangeTarget
    new_src_lines: SourceLines
    anchor: OutputAnchor


class ChangePlan(StrictModel):
    plan_id: NonEmptyStr
    goal: NonEmptyStr
    constraints: list[NonEmptyStr] = Field(default_factory=list)
    edits: Annotated[list[PlannedEdit], Field(min_length=1)]

    @model_validator(mode="after")
    def validate_edit_ids(self) -> Self:
        edit_ids = [edit.edit_id for edit in self.edits]
        if len(edit_ids) != len(set(edit_ids)):
            msg = "edit_id values must be unique within a ChangePlan"
            raise ValueError(msg)
        return self


class PlanValidation(StrictModel):
    ok: bool
    issues: IssueList


class PreviewedEdit(StrictModel):
    edit_id: NonEmptyStr
    oracle_ok: bool
    src_before: SourceLines
    src_after: SourceLines


class ChangePreview(StrictModel):
    plan_id: NonEmptyStr
    edits: Annotated[list[PreviewedEdit], Field(min_length=1)]


class ApplyResult(StrictModel):
    plan_id: NonEmptyStr
    applied: bool
    applied_edit_ids: list[NonEmptyStr]
    failed_edit_ids: list[NonEmptyStr]


class TraceResult(StrictModel):
    out_file: NonEmptyStr
    out_line: OutputLine
    src_file: NonEmptyStr | None = None
    src_line: SourceLine | None = None
    src_col: SourceLine | None = None
    kind: NonEmptyStr | None = None
    macro_name: NonEmptyStr | None = None
    param_name: NonEmptyStr | None = None

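When prompting a planner for typed output, it helps to show the model one concrete payload that satisfies these schemas. A hand-written ChangePlan-shaped example (all file names, line numbers, and values are illustrative, not from a real project):

```python
# A minimal payload in the shape the ChangePlan models expect.
# Every concrete value here is made up for illustration.
minimal_plan = {
    "plan_id": "plan-0001",
    "goal": "Fix a misspelled key in the literate source",
    "constraints": ["touch only one source file"],
    "edits": [
        {
            "edit_id": "edit-1",
            "rationale": "Correct the key so generated output matches",
            "target": {"src_file": "docs/config.md", "src_line": 42, "src_line_end": 42},
            "new_src_lines": ["max_retries = 3"],
            "anchor": {
                "out_file": "gen/config.toml",
                "out_line": 17,
                "expected_output": "max_retries = 3",
            },
        }
    ],
}

# The structural invariants the models (and the Rust validator) enforce,
# checked by hand here: 1-based lines, ordered range, non-empty edits.
edit = minimal_plan["edits"][0]
assert minimal_plan["edits"], "a plan must carry at least one edit"
assert edit["target"]["src_line"] >= 1
assert edit["target"]["src_line_end"] >= edit["target"]["src_line"]
```

Passing a dict like this through ChangePlan.model_validate is also a cheap way to smoke-test the schema contract before any model integration exists.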
Agent Loop

The loop below is intentionally small. It reads with weaveback primitives, asks a planner for a typed ChangePlan, validates and previews the plan in Rust, and returns structured results instead of applying changes by default.

from typing import Annotated

from pydantic import Field

from weaveback_agent._weaveback import PyWorkspace
from weaveback_agent.models import (
    ChangePlan,
    ChangePreview,
    PlanValidation,
    StrictModel,
    TraceResult,
    WorkspaceConfig,
)


class AgentResponse(StrictModel):
    summary: Annotated[str, Field(min_length=1)]
    plan: ChangePlan | None = None
    validation: PlanValidation | None = None
    preview: ChangePreview | None = None
    traces: list[TraceResult] = Field(default_factory=list)


class AgentLoop:
    def __init__(self, config: WorkspaceConfig) -> None:
        self._workspace = PyWorkspace(
            project_root=config.project_root,
            db_path=config.db_path,
            gen_dir=config.gen_dir,
        )

    def inspect_trace(self, out_file: str, out_line: int, out_col: int = 1) -> TraceResult | None:
        raw = self._workspace.trace(out_file, out_line, out_col)
        if raw is None:
            return None
        return TraceResult.model_validate(raw)

    def validate_plan(self, plan: ChangePlan) -> PlanValidation:
        raw = self._workspace.validate_change_plan(plan.model_dump(mode="python"))
        return PlanValidation.model_validate(raw)

    def preview_plan(self, plan: ChangePlan) -> ChangePreview:
        raw = self._workspace.preview_change_plan(plan.model_dump(mode="python"))
        return ChangePreview.model_validate(raw)

    def run_once(self, task: str, planner: object) -> AgentResponse:
        del task
        del planner
        return AgentResponse(
            summary="Planner integration belongs here; the Rust boundary stays narrow.",
        )
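run_once deliberately leaves the planner abstract. One way to type that seam without committing to a model SDK is a structural protocol; the sketch below is one possible shape (Planner, EchoPlanner, and run_planner are illustrative names, not part of the package):

```python
from typing import Any, Protocol


class Planner(Protocol):
    """Anything with a plan() method that returns ChangePlan-shaped data."""

    def plan(self, task: str, context: str) -> dict[str, Any]: ...


class EchoPlanner:
    """Trivial planner used only to exercise the protocol shape."""

    def plan(self, task: str, context: str) -> dict[str, Any]:
        return {"plan_id": "p1", "goal": task, "constraints": [], "edits": []}


def run_planner(planner: Planner, task: str) -> dict[str, Any]:
    # A real loop would gather trace/search context from Rust first;
    # here the context is left empty to keep the seam visible.
    return planner.plan(task, context="")
```

Because the protocol is structural, any model wrapper that exposes a matching plan() method satisfies it without inheriting from the package's types.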

Python Integration Tests

These tests validate the supported path for the PyO3 extension: install it with maturin develop, then import it from Python and exercise the thin agent wrapper over a real temporary workspace.

from pathlib import Path
import pytest

from weaveback_agent import AgentLoop, WorkspaceConfig
from weaveback_agent._weaveback import PyWorkspace


def test_pyworkspace_basic(tmp_path: Path) -> None:
    db_path = tmp_path / "wb.db"
    gen_dir = tmp_path / "gen"
    gen_dir.mkdir()

    ws = PyWorkspace(str(tmp_path), str(db_path), str(gen_dir))

    with pytest.raises(RuntimeError, match="Database not found"):
        ws.search("test", 10)

    with pytest.raises(RuntimeError, match="Database not found"):
        ws.trace("nonexistent.rs", 1, 1)


def test_agent_loop_trace_and_summary(tmp_path: Path) -> None:
    db_path = tmp_path / "wb.db"
    gen_dir = tmp_path / "gen"
    gen_dir.mkdir()

    loop = AgentLoop(
        WorkspaceConfig(
            project_root=str(tmp_path),
            db_path=str(db_path),
            gen_dir=str(gen_dir),
        )
    )

    with pytest.raises(RuntimeError, match="Database not found"):
        loop.inspect_trace("nonexistent.rs", 1, 1)

    response = loop.run_once("do nothing", planner=object())
    assert "Planner integration belongs here" in response.summary
    assert response.plan is None
    assert response.preview is None

Next Step

The next implementation step should not be "add more agent features". It should be:

  1. extract reusable read/apply logic out of crates/weaveback/src/mcp.rs into a Rust library surface

  2. make weaveback-agent-core call that surface

  3. make weaveback-py call weaveback-agent-core

That keeps one mutation engine and one tracing engine, which is the main thing worth preserving.