День 5: Memory и финальный пайплайн
Цель: Добавить память агентам и собрать полный self-improving pipeline
Время изучения: 2-3 часа
Код: src/day5_memory_pipeline.py
Тесты: tests/test_day5_memory_pipeline.py
🤖 Связь с агентами
Вчера: Дали агентам инструменты (tools)
Сегодня: Даём агентам память — они становятся по-настоящему умными!
Что мы строим?
Зачем это нужно?
Реальный сценарий: Ассистент для работы
Memory = способность агента помнить контекст и учиться!
Часть 1: Что такое Memory?
Типы памяти
# 1. Short-term Memory (Краткосрочная)
# Хранится в state, живёт только во время выполнения
state = {
"messages": ["User: Hello", "Agent: Hi!"],
"current_context": "Discussing LangGraph"
}
# 2. Long-term Memory (Долгосрочная)
# Сохраняется между сессиями (checkpointing)
checkpoint = {
"session_id": "user_123",
"history": [...],
"learned_facts": [...]
}
# 3. Working Memory (Рабочая)
# Активная информация для текущей задачи
working_memory = {
"current_task": "Write article",
"gathered_info": [...],
"progress": 0.5
}
Часть 2: Реализация Memory в State
Расширенная State-схема
from typing import TypedDict, List, Dict, Optional
from datetime import datetime
class Message(TypedDict):
"""Сообщение в истории."""
role: str # "user", "agent", "system"
content: str
timestamp: str
metadata: Optional[Dict]
class AgentState(TypedDict, total=False):
# Основные поля
task: str
plan: List[str]
current_step: int
step_result: str
status: str
iteration: int
max_iterations: int
final_result: str
# 🆕 Memory поля
messages: List[Message] # История сообщений
context: Dict[str, any] # Контекст разговора
learned_facts: List[str] # Изученные факты
tools_history: List[Dict] # История использования tools
session_id: Optional[str] # ID сессии для checkpointing
Работа с историей сообщений
def add_message(state: AgentState, role: str, content: str) -> AgentState:
"""Добавляет сообщение в историю."""
updated = dict(state)
messages = updated.get("messages", [])
message: Message = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": {}
}
messages.append(message)
updated["messages"] = messages
return updated
def get_recent_messages(state: AgentState, n: int = 5) -> List[Message]:
"""Получает последние N сообщений."""
messages = state.get("messages", [])
return messages[-n:]
def get_context_summary(state: AgentState) -> str:
"""Создаёт краткое резюме контекста."""
messages = state.get("messages", [])
learned_facts = state.get("learned_facts", [])
summary = f"Messages: {len(messages)}, Facts learned: {len(learned_facts)}"
return summary
Часть 3: Checkpointing
Что такое Checkpointing?
Checkpointing — сохранение состояния графа для возможности:
- Прод��лжить выполнение позже
- Откатиться к предыдущему состоянию
- Сохранить историю между сессиями
Простой Checkpointer
import json
from pathlib import Path
class SimpleCheckpointer:
"""Простой checkpointer для сохранения состояния."""
def __init__(self, checkpoint_dir: str = ".checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def save(self, session_id: str, state: AgentState) -> None:
"""Сохраняет состояние."""
filepath = self.checkpoint_dir / f"{session_id}.json"
# Конвертируем state в JSON-сериализуемый формат
serializable_state = self._make_serializable(state)
with open(filepath, 'w') as f:
json.dump(serializable_state, f, indent=2)
print(f"💾 Checkpoint saved: {session_id}")
def load(self, session_id: str) -> Optional[AgentState]:
"""Загружает состояние."""
filepath = self.checkpoint_dir / f"{session_id}.json"
if not filepath.exists():
return None
with open(filepath, 'r') as f:
state = json.load(f)
print(f"📂 Checkpoint loaded: {session_id}")
return state
def _make_serializable(self, obj):
"""Делает объект JSON-сериализуемым."""
if isinstance(obj, dict):
return {k: self._make_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._make_serializable(item) for item in obj]
else:
return obj
Использование Checkpointer
checkpointer = SimpleCheckpointer()
# Сохранение
state = {
"task": "Write article",
"messages": [...],
"session_id": "user_123"
}
checkpointer.save("user_123", state)
# Загрузка
loaded_state = checkpointer.load("user_123")
if loaded_state:
# Продолжаем с сохранённого состояния
workflow.invoke(loaded_state)
Часть 4: Агенты с Memory
Planner с Memory
def planner_agent_with_memory(state: AgentState) -> AgentState:
"""Planner Agent с учётом памяти."""
updated = dict(state)
task = state.get("task", "")
# Проверяем, есть ли релевантная информация в памяти
learned_facts = state.get("learned_facts", [])
context = state.get("context", {})
print(f"\n📋 Planner: Planning with memory")
print(f" Learned facts: {len(learned_facts)}")
print(f" Context keys: {list(context.keys())}")
# Создаём план с учётом контекста
plan = [
f"Step 1: Review learned facts about '{task}'",
f"Step 2: Research new information about '{task}'",
f"Step 3: Synthesize with existing knowledge"
]
updated["plan"] = plan
updated["current_step"] = 0
updated["status"] = "executing"
# Добавляем сообщение в историю
updated = add_message(updated, "agent", f"Created plan for: {task}")
return updated
Executor с Memory
def executor_agent_with_memory(state: AgentState) -> AgentState:
"""Executor Agent с сохранением в память."""
updated = dict(state)
plan = state.get("plan", [])
current_step = state.get("current_step", 0)
if current_step >= len(plan):
updated["status"] = "done"
updated["final_result"] = state.get("step_result", "Task completed")
return updated
step = plan[current_step]
# Выполняем шаг
result = f"Completed: {step}"
# Сохраняем результат в память
learned_facts = updated.get("learned_facts", [])
learned_facts.append(f"From step {current_step + 1}: {result}")
updated["learned_facts"] = learned_facts
# Обновляем контекст
context = updated.get("context", {})
context[f"step_{current_step}"] = result
updated["context"] = context
# Сохраняем в историю tools
tools_history = updated.get("tools_history", [])
tools_history.append({
"step": current_step,
"action": step,
"result": result,
"timestamp": datetime.now().isoformat()
})
updated["tools_history"] = tools_history
updated["step_result"] = result
updated["status"] = "critiquing"
updated["iteration"] = updated.get("iteration", 0) + 1
# Добавляем в историю сообщений
updated = add_message(updated, "agent", f"Executed: {step}")
print(f"\n⚙️ Executor: Step {current_step + 1} (with memory)")
print(f" Learned facts: {len(learned_facts)}")
print(f" Context size: {len(context)}")
return updated
Critic с Memory
def critic_agent_with_memory(state: AgentState) -> AgentState:
"""Critic Agent с анализом истории."""
updated = dict(state)
step_result = state.get("step_result", "")
current_step = state.get("current_step", 0)
# Анализируем историю для лучшей оценки
tools_history = state.get("tools_history", [])
learned_facts = state.get("learned_facts", [])
print(f"\n🔍 Critic: Evaluating with memory")
print(f" Previous actions: {len(tools_history)}")
print(f" Knowledge base: {len(learned_facts)} facts")
# Оценка с учётом истории
if len(tools_history) > 3:
critique = "Good progress, moving to next step"
decision = "next"
updated["current_step"] = current_step + 1
else:
critique = "Need more information"
decision = "retry"
updated["critique"] = critique
# Определяем статус
plan = state.get("plan", [])
if decision == "retry":
updated["status"] = "executing"
elif updated["current_step"] < len(plan):
updated["status"] = "executing"
else:
updated["status"] = "done"
updated["final_result"] = step_result
# Добавляем в ��сторию
updated = add_message(updated, "agent", f"Critique: {critique}")
return updated
Часть 5: Финальный Pipeline
Полная система
from langgraph.graph import StateGraph, END
def create_full_pipeline_with_memory():
"""Создаёт полный pipeline с memory и checkpointing."""
graph = StateGraph(AgentState)
# Добавляем агентов с memory
graph.add_node("planner", planner_agent_with_memory)
graph.add_node("executor", executor_agent_with_memory)
graph.add_node("critic", critic_agent_with_memory)
# Связываем
graph.set_entry_point("planner")
graph.add_edge("planner", "executor")
graph.add_edge("executor", "critic")
graph.add_conditional_edges(
"critic",
should_continue,
{
"execute": "executor",
"critique": "critic",
"end": END
}
)
return graph.compile()
def run_with_checkpointing(task: str, session_id: str):
"""Запуск с checkpointing."""
checkpointer = SimpleCheckpointer()
workflow = create_full_pipeline_with_memory()
# Пытаемся загрузить предыдущее состояние
state = checkpointer.load(session_id)
if state:
print(f"📂 Resuming session: {session_id}")
# Продолжаем с сохранённого состояния
state["task"] = task # Обновляем задачу
else:
print(f"🆕 Starting new session: {session_id}")
# Новая сессия
state = {
"task": task,
"iteration": 0,
"max_iterations": 10,
"status": "planning",
"session_id": session_id,
"messages": [],
"learned_facts": [],
"tools_history": [],
"context": {}
}
# Выполняем
result = workflow.invoke(state)
# Сохраняем checkpoint
checkpointer.save(session_id, result)
return result
Пример использования
def main():
"""Пример полного pipeline с memory."""
print("="*60)
print("🚀 Full Pipeline with Memory & Checkpointing")
print("="*60)
# Первый запуск
result1 = run_with_checkpointing(
task="Research LangGraph",
session_id="session_001"
)
print("\n" + "="*60)
print("📊 Session 1 Result")
print("="*60)
print(f"Messages: {len(result1.get('messages', []))}")
print(f"Learned facts: {len(result1.get('learned_facts', []))}")
print(f"Tools history: {len(result1.get('tools_history', []))}")
# Второй запуск (продолжение)
print("\n" + "="*60)
print("🔄 Continuing session...")
print("="*60)
result2 = run_with_checkpointing(
task="Create presentation about LangGraph",
session_id="session_001" # Тот же session_id!
)
print("\n" + "="*60)
print("📊 Session 2 Result")
print("="*60)
print(f"Messages: {len(result2.get('messages', []))}")
print(f"Learned facts: {len(result2.get('learned_facts', []))}")
print(f"Context preserved: {len(result2.get('context', {}))}")
Ключевые выводы
✅ Memory — способность агентов помнить контекст
✅ Messages history — история всех сообщений
✅ Learned facts — накопление знаний
✅ Checkpointing — сохранение состояния между сессиями
✅ Context — активная рабочая память
✅ Tools history — история использования инструментов
🤖 Связь с агентами
Сегодня вы создали:
- ✅ Агентов с памятью
- ✅ Систему сохранения состояния
- ✅ Полный self-improving pipeline
- ✅ Deliverable Week 1 — готовая система!
Следующий шаг: Неделя 2 — Multi-Agent системы с Supervisor и Router!
Следующий шаг: День 6
В следующей статье мы изучим:
- Subgraphs — вложенные графы
- Supervisor — координатор агентов
- Иерархическая архитектура
👉 День 6: Subgraphs и Supervisor (скоро)
Ресурсы
Прогресс: 5/10 дней ✅
🎉 Deliverable Week 1 Complete!