Pipeline · Memory · Event Bus

Bộ Máy Vận Hành

Từ monolithic 804 dòng tới pipeline 8 stage có thể test riêng —
cùng Memory 3 tầng và Event Bus phân tách phụ thuộc.

0
Pipeline
Stages
0
Run
Substates
0
Pipeline
Callbacks

Hình dung một nhà bếp mà đầu bếp vừa nấu, vừa rửa bát, vừa nhận order, vừa mua nguyên liệu — tất cả trong 1 hàm 804 dòng. Đó là runLoop() v2. Không thể test riêng từng bước, không thể thay thế một công đoạn mà không ảnh hưởng toàn bộ.

v2 — Trước

runLoop() — 804 dòng

Tất cả logic gộp chung: gọi LLM, thực thi tool, prune context, lưu message, xử lý lỗi — tất cả trong một hàm duy nhất. Bug ở bất kỳ bước nào cũng đòi hỏi debug toàn bộ hàm.

v3 — Sau

Pipeline — 8 Stage Độc Lập

Mỗi stage là một unit riêng biệt: có input/output rõ ràng, có thể test độc lập, có thể thay thế mà không ảnh hưởng stage khác. Bug khoanh vùng chính xác trong vài giây.

3 Pha của Pipeline

Setup một lần → Lặp nhiều vòng → Kết thúc dọn dẹp.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#262220', 'primaryTextColor': '#f5f0eb', 'primaryBorderColor': '#3d3835', 'lineColor': '#e07030', 'secondaryColor': '#1f1c18', 'tertiaryColor': '#1a1714', 'background': '#1a1714', 'mainBkg': '#262220', 'nodeBorder': '#4a4543', 'clusterBkg': '#1a1714', 'titleColor': '#f5f0eb', 'edgeLabelBackground': '#262220', 'fontFamily': 'system-ui'} }}%% flowchart TD Start(["▶ RunRequest"]) --> CS subgraph SETUP["Pha 1: Setup"] CS["ContextStage\n⚙️ Build Context"] end CS --> LOOP subgraph LOOP["Pha 2: Iteration Loop"] direction TB TS["ThinkStage\n🧠 Gọi LLM"] --> PS["PruneStage\n✂️ Kiểm tra token"] PS --> TOS["ToolStage\n🔧 Thực thi tools"] TOS --> OS["ObserveStage\n👁️ Thu thập kết quả"] OS --> CPS["CheckpointStage\n💾 Lưu mỗi 5 vòng"] CPS -->|"Continue"| TS end TS -->|"BreakLoop\n(không có tool calls)"| FS PS -->|"AbortRun\n(vẫn vượt budget)"| FS TOS -->|"BreakLoop\n(loop kill / read-only / hết budget)"| FS CPS -->|"BreakLoop"| FS subgraph FINALIZE["Pha 3: Finalize"] FS["FinalizeStage\n🏁 Dọn dẹp & flush"] end FS --> End(["✅ RunResult"]) style SETUP fill:#1a1714,stroke:#3d3835 style LOOP fill:#1a1714,stroke:#3d3835 style FINALIZE fill:#1a1714,stroke:#3d3835

8 Stage — Từng Bước Một

Nhấn vào mỗi stage để xem chi tiết cách hoạt động.

Stage khởi tạo toàn bộ context cần thiết cho phiên làm việc. Chạy một lần duy nhất trước khi vòng lặp bắt đầu.

  • Inject identity: render SOUL.md + IDENTITY.md vào system prompt
  • Resolve workspace: xác định working directory dựa theo loại agent (open/predefined)
  • Load context files: đọc agent_context_files + user_context_files
  • Load history: lấy conversation history từ DB, đặt vào MessageBuffer
  • Build messages: ghép system prompt + history + user message mới
  • Count tokens: đếm token chính xác từ lần trả lời LLM gần nhất
  • Enrich media: resolve media attachments, chuyển URL thành base64 nếu cần
  • Inject reminders: thêm reminder messages đặc biệt theo agent config
  • Auto-inject L0 memory: nếu V3RetrievalEnabled && AutoInject != nil && có message — inject tóm tắt ngắn từ episodic memory vào system prompt (context_stage.go:97)

Tim đập của pipeline — stage này chịu trách nhiệm giao tiếp với LLM provider và xử lý kết quả trả về.

  • Build filtered tools: lọc danh sách tool theo agent config và quyền hạn
  • Gọi LLM (streaming hoặc sync) tùy provider — Anthropic native SSE, OpenAI-compat, DashScope
  • Accumulate tokens: cộng dồn token usage từ response vào RunState
  • Truncation retry: nếu LLM trả về lỗi context too long → trim nhẹ và retry, tối đa 3 lần
  • Budget nudge: phát cảnh báo nudge vào system prompt khi đạt 70% và 90% token budget
  • BreakLoop nếu response không có tool calls — nghĩa là LLM đã hoàn thành, chuẩn bị trả kết quả
Lưu ý Khi BreakLoop, pipeline nhảy thẳng sang FinalizeStage — bỏ qua PruneStage, ToolStage, ObserveStage, CheckpointStage của vòng đó.

Quản lý context window — đảm bảo không bao giờ vượt ngưỡng token của model, với chiến lược 2 pha tốc độ khác nhau.

  • Pha 1 — Soft Trim (70% budget): loại bỏ messages cũ nhất từ history, giữ lại context gần đây nhất. Nhanh, không cần LLM call thêm
  • Pha 2 — Hard Compaction (100% budget): khi soft trim không đủ → gọi LLM tóm tắt toàn bộ conversation thành một đoạn ngắn. Đắt hơn nhưng giữ được ý nghĩa
  • Sau hard compaction → trigger MemoryFlushStage để lưu episodic summary
  • AbortRun nếu sau cả 2 pha mà vẫn vượt budget — trường hợp cực hiếm, thường do context file quá lớn
Tại sao 2 pha? Soft trim bảo toàn context gần đây (quan trọng nhất cho reasoning). Hard compact chỉ kích hoạt khi thực sự cần — tiết kiệm cost và latency trong 90%+ trường hợp.

Stage thực thi danh sách tool calls từ ThinkStage — với chiến lược song song hoặc tuần tự tùy loại tool.

  • Parallel mode: ExecuteToolRaw chạy song song tất cả tools cùng lúc → sau đó ProcessToolResult xử lý kết quả tuần tự để tránh race condition
  • Sequential fallback: một số tool (write file, update DB) yêu cầu tuần tự — được đánh dấu qua metadata
  • Check ReadOnly: nếu agent ở chế độ read-only → skip tools có side effect → BreakLoop
  • MaxToolCalls: nếu vượt giới hạn tool calls per run → BreakLoop
  • Loop detection: phát hiện tool call loop (gọi cùng tool với cùng input nhiều lần) → BreakLoop
  • BreakLoop khi: loop kill trigger / read-only violation / hết budget tool calls

Stage quan sát — tổng hợp tất cả kết quả trong vòng lặp hiện tại và chuẩn bị cho vòng tiếp theo.

  • Drain InjectCh (non-blocking): nhận các message được inject từ bên ngoài (ví dụ: user interrupt, system event)
  • Đếm BlockReplies: theo dõi số lần reply bị chặn để phát hiện deadlock
  • Tích luỹ FinalContent: ghép nối content từ các vòng lặp để build response cuối
  • Tích luỹ FinalThinking: ghép nội dung thinking (cho extended thinking mode)
  • Luôn trả về Continue — quyết định dừng vòng lặp thuộc về ThinkStage và ToolStage
Thiết kế ObserveStage không bao giờ quyết định BreakLoop hay AbortRun. Trách nhiệm duy nhất là thu thập và tích luỹ — phân tách rõ ràng giữa observation và control flow.

Bảo vệ dữ liệu trong long-running sessions — flush messages định kỳ để chống mất dữ liệu khi crash.

  • Flush pending messages vào DB mỗi 5 vòng lặp (iteration % 5 == 0)
  • Lỗi checkpoint không fatal — log warning và tiếp tục vòng lặp, không dừng agent
  • Hỗ trợ crash recovery: nếu server restart, agent có thể resume từ checkpoint gần nhất
  • Chỉ flush messages đang pending — không re-flush những gì đã lưu trước đó
Chiến lược lỗi Checkpoint lỗi không được phép kill session — worst case mất 5 vòng cuối, tốt hơn nhiều so với mất toàn bộ run.

Chuyển kiến thức từ Working Memory (context window) xuống Episodic Memory (database) — kích hoạt khi PruneStage thực hiện hard compaction.

  • Gọi RunMemoryFlush để xử lý nội dung đang bị compact
  • Tạo episodic summary: tóm tắt ngắn gọn conversation bị compact
  • Tính embedding vector cho summary → lưu vào episodic_summaries table
  • Phát sự kiện episodic.created → trigger consolidation worker bất đồng bộ
  • Không block vòng lặp chính — consolidation xảy ra async qua Event Bus

Stage cuối cùng — đảm bảo tất cả dữ liệu được lưu đúng cách và cleanup hoàn toàn, ngay cả khi context bị cancel.

  • Sanitize content: làm sạch output trước khi trả về client
  • Dedup media: loại bỏ media attachment trùng lặp
  • Flush messages còn lại: đảm bảo không có pending message nào bị bỏ sót
  • Update metadata: cập nhật session stats, token usage, run duration
  • Bootstrap cleanup: dọn dẹp temporary context files nếu có
  • Maybe summarize (async): nếu session đủ dài → trigger async summarization cho L1 memory
  • Dùng context.WithoutCancel() — đảm bảo finalize hoàn thành kể cả khi client đã ngắt kết nối
Vì sao WithoutCancel? Khi user cancel request, Go context bị cancel. Nếu FinalizeStage dùng context gốc → mọi DB write đều fail → mất dữ liệu. WithoutCancel tạo context mới không bao giờ cancel, chỉ cho phần cleanup.

V3 Tool Inventory

4 tool mới được thêm trong v3 — mở rộng khả năng agent với memory, vault, và delegation.

Khác với subagent (clone chính mình), delegate gửi task cho một agent khác đã được liên kết qua agent_links.

Tham số agent_key (required), task (required), mode (async|sync), timeout (max 600s)
Async Fire-and-forget — agent chính tiếp tục ngay, không chờ kết quả
Sync Chờ kết quả trả về trong timeout giây, kết quả inject vào conversation
Permission Phải có agent_links record giữa 2 agent, cùng tenant. Emit event delegate.completed

File: internal/tools/delegate_tool.go

Tool tìm kiếm chính — tìm đồng thời trên vault docs, episodic memory, và knowledge graph. Kết quả xếp hạng theo relevance.

Tham số query (required), scope (personal|team|shared), types (context,memory,note,skill,episodic), maxResults
Hybrid search FTS (BM25) + vector similarity — kết quả combine từ cả hai phương pháp
So với memory_search chỉ tìm trong memory store, kg_search chỉ tìm trong graph. vault_search tìm tất cả

File: internal/tools/vault_search.go

Giống [[wikilinks]] trong markdown — tạo liên kết hai chiều giữa 2 tài liệu. Auto-register nếu tài liệu chưa tồn tại trong vault.

Tham số from (required), to (required), context (optional — mô tả mối quan hệ)
Auto-register Nếu source/target chưa trong vault → tự đăng ký dựa trên đường dẫn workspace-relative

File: internal/tools/vault_link.go

Lấy toàn bộ nội dung episodic memory theo ID — dùng khi agent cần reconstruct lịch sử chi tiết từ kết quả memory_search.

Tham số id (required — episodic memory ID từ kết quả memory_search)
Output Full summary + metadata (session key, created date, turn count)
Yêu cầu V3 episodic memory phải enabled — trả lỗi nếu không có

File: internal/tools/memory_expand.go

Cấu Trúc Nội Bộ

RunState, MessageBuffer, Flow Control và Adapter Pattern.

RunState là struct trung tâm truyền qua mọi stage — chứa tất cả thông tin về một lần thực thi agent.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#262220', 'primaryTextColor': '#f5f0eb', 'primaryBorderColor': '#3d3835', 'lineColor': '#e07030', 'secondaryColor': '#1f1c18', 'tertiaryColor': '#1a1714', 'background': '#1a1714', 'mainBkg': '#262220', 'nodeBorder': '#4a4543', 'clusterBkg': '#1a1714', 'titleColor': '#f5f0eb', 'edgeLabelBackground': '#262220', 'fontFamily': 'system-ui'} }}%% classDiagram class RunState { +Input: string +Model: string +Provider: LLMProvider +Workspace: WorkspaceInfo +Messages: MessageBuffer +Iteration: int +ExitReason: string +TokenUsage: TokenUsage +ToolCallCount: int +FinalContent: string +FinalThinking: string } class SubStates { ContextState ThinkState PruneState ToolState ObserveState CompactState EvolutionState } RunState --> SubStates : contains

7 substates phản ánh trạng thái hiện tại của agent — được emit ra ngoài qua events để UI hiển thị spinner đúng loại.

MessageBuffer không phải một mảng đơn giản — nó chia messages thành 3 vùng với semantics khác nhau.

🔒

System (Bất biến)

System prompt đã render — không bao giờ bị prune. Chứa identity, workspace info, memory injection.

📚

History (Đã lưu)

Messages đã được flush vào DB. PruneStage có thể trim vùng này. ReplaceHistory() khi compact.

✏️

Pending (Mới)

Messages trong vòng lặp hiện tại chưa lưu. FlushPending() tại checkpoint. All() ghép cả 3 vùng.

Mỗi stage trả về một trong 3 signal để điều khiển vòng lặp pipeline.

Signal Ý nghĩa Hành động Pipeline
Continue Stage hoàn thành, tiếp tục vòng lặp Chạy stage tiếp theo trong vòng lặp
BreakLoop Kết thúc vòng lặp bình thường Thoát khỏi iteration loop → chạy FinalizeStage
AbortRun Dừng khẩn cấp do lỗi nghiêm trọng Dừng ngay lập tức — ghi lỗi, không qua FinalizeStage

Thay vì rewrite toàn bộ 804 dòng code v2, team dùng Adapter Pattern: closures wrap các callbacks của v2 Loop thành interface mà Pipeline stages có thể gọi.

  • loop_pipeline_adapter.go: bridge từ Loop methods sang Pipeline callbacks
  • loop_pipeline_callbacks.go: 30+ callback implementations (sendMessage, handleTool, emitEvent, v.v.)
  • Closures capture Loop context tự nhiên — không cần truyền struct phức tạp
  • Adapter layer = một lớp indirection nhỏ, đánh đổi lấy zero rewrite cost
Lưu ý Closure phải cẩn thận không capture stale state khi reuse giữa các vòng lặp. Mỗi vòng lặp nên tạo closure mới hoặc reset state rõ ràng.

Working memory = bảng trắng trên bàn làm việc — thao tác nhanh, nhưng tắt đèn là mất. Episodic memory = nhật ký ghi chép hàng ngày — đọc lại được, nhưng phải tóm lược. Semantic memory = bách khoa toàn thư cá nhân — tri thức có cấu trúc, tra cứu theo chủ đề.

Tầng 1
Working Memory
"Bảng trắng trên bàn"
Nằm trong context window của model. Không lưu trữ riêng — tự động tồn tại qua history messages trong MessageBuffer. Khi vượt budget → PruneStage cắt tỉa hoặc compact.
Auto-managed Volatile
Tầng 2
Episodic Memory
"Nhật ký hàng ngày"
Bảng episodic_summaries — tóm tắt từng phiên hội thoại + embedding vector. Truy cập: L0 auto-inject vào system prompt, L1 qua memory_search tool, L2 semantic search sâu hơn.
pgvector Persistent
Tầng 3
Semantic Memory
"Bách khoa toàn thư cá nhân"
Bảng kg_entities + kg_relations với valid_from/valid_until. Tri thức có thời hạn — entity hết hạn tự động lỗi thời. Truy cập qua kg_search tool.
Temporal Structured

Từ Context đến Knowledge Graph

Tri thức chảy từ tầng 1 lên tầng 3 — và ngược lại được inject vào tầng 1.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#262220', 'primaryTextColor': '#f5f0eb', 'primaryBorderColor': '#3d3835', 'lineColor': '#e07030', 'secondaryColor': '#1f1c18', 'tertiaryColor': '#1a1714', 'background': '#1a1714', 'mainBkg': '#262220', 'nodeBorder': '#4a4543', 'clusterBkg': '#1a1714', 'titleColor': '#f5f0eb', 'edgeLabelBackground': '#262220', 'fontFamily': 'system-ui'} }}%% flowchart LR WM["🗒️ Working Memory\n(Context Window)"] EM["📖 Episodic Memory\n(episodic_summaries)"] KG["🧠 Semantic Memory\n(kg_entities / kg_relations)"] WM -->|"session.completed\nprune.compacted"| EM EM -->|"episodic.created"| KG EM -->|"L0 auto-inject\n(ContextStage)"| WM EM -->|"L1/L2 memory_search\n(ToolStage)"| WM KG -->|"kg_search tool\n(ToolStage)"| WM style WM fill:#262220,stroke:#e07030 style EM fill:#262220,stroke:#c89030 style KG fill:#262220,stroke:#2ea85a

ContextStage tự động inject memory vào system prompt trước mỗi vòng lặp — agent "nhớ" mà không cần user nhắc lại.

  • Điều kiện kích hoạt: V3RetrievalEnabled && AutoInject != nil && Message != "" (context_stage.go:97)
  • L0 — Tự động: tóm tắt ngắn các phiên trước (top-3 relevant), inject vào system prompt. Luôn chạy nếu retrieval enabled
  • L1 — Theo yêu cầu: chi tiết hơn L0, agent gọi memory_search khi cần thêm context. Vector similarity search trên embedding
  • L2 — Sâu: toàn bộ nội dung episodic, không tóm lược. Dùng khi agent cần reconstruct lịch sử đầy đủ

Sau mỗi session kết thúc hoặc memory được compact, hệ thống tự động consolidate — tất cả bất đồng bộ, không block user.

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#262220', 'primaryTextColor': '#f5f0eb', 'primaryBorderColor': '#3d3835', 'lineColor': '#e07030', 'secondaryColor': '#1f1c18', 'tertiaryColor': '#1a1714', 'background': '#1a1714', 'mainBkg': '#262220', 'nodeBorder': '#4a4543', 'clusterBkg': '#1a1714', 'titleColor': '#f5f0eb', 'edgeLabelBackground': '#262220', 'fontFamily': 'system-ui'} }}%% sequenceDiagram participant S as Session / PruneStage participant B as DomainEventBus participant W1 as Worker 1 participant W2 as Worker 2 participant W3 as Worker 3 S->>B: session.completed / prune.compacted B->>W1: dispatch event W1->>W1: tạo episodic summary + embedding W1->>B: episodic.created B->>W2: dispatch event W2->>W2: trích xuất entities từ summary W2->>B: entities.extracted B->>W3: dispatch event W3->>W3: xây dựng relations giữa entities

Ba worker chạy tuần tự qua Event Bus — mỗi worker chờ kết quả của worker trước qua event, không phải direct call.

Không phải mọi tri thức đều vĩnh viễn — KG v3 thêm chiều thời gian vào mỗi entity và relation.

v2 — Knowledge Graph

Entities và relations vĩnh viễn — một khi tạo ra không bao giờ tự động lỗi thời. "Việt dùng React" mãi mãi đúng dù thực tế đã thay đổi.

v3 — Temporal KG

Mỗi entity/relation có valid_fromvalid_until. Tri thức tự động lỗi thời theo thời gian — phản ánh thực tế thay đổi.

Ví dụ thực tế Entity "Việt dùng React 18" → valid_from: 2025-01 / valid_until: 2026-03. Sau tháng 3/2026, kg_search sẽ không trả về entity này nữa — tránh context lỗi thời làm agent trả lời sai.

Hệ thống bưu điện nội bộ — bạn gửi thư bỏ vào hòm thư, không cần biết ai nhận, không cần chờ họ đọc xong. Người cần thì tự đến lấy. Module không biết nhau, chỉ biết sự kiện.

Producer → Bus → Consumer

%%{init: {'theme': 'dark', 'themeVariables': {'primaryColor': '#262220', 'primaryTextColor': '#f5f0eb', 'primaryBorderColor': '#3d3835', 'lineColor': '#e07030', 'secondaryColor': '#1f1c18', 'tertiaryColor': '#1a1714', 'background': '#1a1714', 'mainBkg': '#262220', 'nodeBorder': '#4a4543', 'clusterBkg': '#1a1714', 'titleColor': '#f5f0eb', 'edgeLabelBackground': '#262220', 'fontFamily': 'system-ui'} }}%% flowchart LR subgraph P["Producers"] P1["Pipeline\nsession.completed\nprune.compacted"] P2["Memory\nepisodic.created\nentities.extracted"] P3["Evolution\nmetrics.recorded"] P4["Tools\ndelegate.completed"] end subgraph B["DomainEventBus"] Q["Queue\n(bounded channel)"] D["Dedup\n(UUID cache)"] WP["Worker Pool\n(N goroutines)"] R["Retry\n(backoff)"] Q --> D --> WP --> R end subgraph C["Consumers"] C1["Consolidation\nWorkers"] C2["Evolution\nEngine"] C3["Notification\n(Telegram, Webhook)"] C4["Tracing\n(OTel)"] end P1 --> Q P2 --> Q P3 --> Q P4 --> Q WP --> C1 WP --> C2 WP --> C3 WP --> C4

3 Cơ Chế Bảo Vệ

🔑
Deduplication
Mỗi event có UUID duy nhất. Bus cache lại các ID đã xử lý — event trùng bị bỏ qua ngay tại bước dedup, không bao giờ tới consumer. Đảm bảo idempotency trong mọi trường hợp retry.
Worker Pool
N goroutines chạy đồng thời xử lý events. Producer phát event xong đi tiếp ngay — non-blocking. Bounded channel tạo backpressure tự nhiên khi consumer chậm hơn producer.
🔄
Retry với Backoff
Consumer lỗi → event quay lại queue để retry. Backoff tăng dần (1s → 2s → 4s...) tránh storm khi downstream service gặp sự cố. Tối đa N lần retry trước khi dead-letter.

Chuỗi Sự Kiện

Sự Kiện Producer Consumer Mục Đích
session.completed FinalizeStage Consolidation Worker 1 Tạo episodic summary sau khi session kết thúc
prune.compacted PruneStage MemoryFlushStage Flush memory khi hard compaction xảy ra
episodic.created Consolidation W1 Consolidation Worker 2 Trích xuất entities từ episodic summary mới
entities.extracted Consolidation W2 Consolidation Worker 3 Xây dựng relations giữa các entities vừa trích xuất
metrics.recorded Pipeline stages Evolution Engine Thu thập performance metrics sau mỗi run
delegate.completed Delegate Tool Evolution Engine Ghi nhận kết quả uỷ quyền cho sub-agent

Vấn đề: Memory module cần trigger Evolution, Evolution cần đọc Memory metrics, Consolidation cần notify cả hai — tạo vòng import circular không thể compile trong Go.

  • Direct call: memory.Notify(evolution)evolution.import memory → circular
  • Event Bus: Memory phát metrics.recorded, Evolution subscribe → không có import nào giữa hai module
  • Strongly-typed events (struct, không phải string) → compile-time safety
  • Async by default — phù hợp với nature của consolidation và evolution (không cần realtime)
Trade-off Debug khó hơn vì không có call stack trực tiếp. Event có thể đến sau một chút delay (milliseconds). Cần log event ID để trace xuyên suốt pipeline.