День 5: Memory и финальный пайплайн

Цель: Добавить память агентам и собрать полный self-improving pipeline

Время изучения: 2-3 часа
Код: src/day5_memory_pipeline.py
Тесты: tests/test_day5_memory_pipeline.py


🤖 Связь с агентами

Вчера: Дали агентам инструменты (tools)
Сегодня: Даём агентам память — они становятся по-настоящему умными!

Что мы строим?

В А К ч г а е е ж р н д а т ы : ы й б з е а з п р п о а с м я с т и н у л я А П - - - С г о е е м П К Р г н н р о е о т я е н з д ы т д у н : ы л я с д е ь : у к т п щ с а а и т т м е ы я р т д а р ь е з а ю й г б с о о т в т в о ы и р я а

Зачем это нужно?

Реальный сценарий: Ассистент для работы

П А П А П А о г о г о г л е л е л е ь н ь н ь н з т з т з т о : о : о : в в в а [ а [ а [ т И т И т П е щ е с е о л е л п л м ь т ь о ь н : , : л : и ь т " с " з " Н о Т у Д к а х е е о о й р п т б н д а е а т и н р и в е я ь н ь к и е ф с н т с о п т ф о р р о в з м и п р д а м р м п а ц е е а а й и р з ц м ю ы е и я п н ю т р и к т ь е з о а о ] з д ц е п а и L н а " и a т м , n а я g ц т д G и и о r ю , б a а p п н в h о е л " я э и е т щ т о е й т р е т з л е а е м н в е о а " в н о т ! н ] ы е п р и м е р ы ]

Memory = способность агента помнить контекст и учиться!


Часть 1: Что такое Memory?

Типы памяти

# 1. Short-term Memory (Краткосрочная)
# Хранится в state, живёт только во время выполнения
state = {
    "messages": ["User: Hello", "Agent: Hi!"],
    "current_context": "Discussing LangGraph"
}

# 2. Long-term Memory (Долгосрочная)
# Сохраняется между сессиями (checkpointing)
checkpoint = {
    "session_id": "user_123",
    "history": [...],
    "learned_facts": [...]
}

# 3. Working Memory (Рабочая)
# Активная информация для текущей задачи
working_memory = {
    "current_task": "Write article",
    "gathered_info": [...],
    "progress": 0.5
}

Часть 2: Реализация Memory в State

Расширенная State-схема

from typing import TypedDict, List, Dict, Optional
from datetime import datetime


class Message(TypedDict):
    """Сообщение в истории."""
    role: str  # "user", "agent", "system"
    content: str
    timestamp: str
    metadata: Optional[Dict]


class AgentState(TypedDict, total=False):
    # Основные поля
    task: str
    plan: List[str]
    current_step: int
    step_result: str
    status: str
    iteration: int
    max_iterations: int
    final_result: str
    
    # 🆕 Memory поля
    messages: List[Message]           # История сообщений
    context: Dict[str, any]           # Контекст разговора
    learned_facts: List[str]          # Изученные факты
    tools_history: List[Dict]         # История использования tools
    session_id: Optional[str]         # ID сессии для checkpointing

Работа с историей сообщений

def add_message(state: AgentState, role: str, content: str) -> AgentState:
    """Добавляет сообщение в историю."""
    updated = dict(state)
    messages = updated.get("messages", [])
    
    message: Message = {
        "role": role,
        "content": content,
        "timestamp": datetime.now().isoformat(),
        "metadata": {}
    }
    
    messages.append(message)
    updated["messages"] = messages
    
    return updated


def get_recent_messages(state: AgentState, n: int = 5) -> List[Message]:
    """Получает последние N сообщений."""
    messages = state.get("messages", [])
    return messages[-n:]


def get_context_summary(state: AgentState) -> str:
    """Создаёт краткое резюме контекста."""
    messages = state.get("messages", [])
    learned_facts = state.get("learned_facts", [])
    
    summary = f"Messages: {len(messages)}, Facts learned: {len(learned_facts)}"
    return summary

Часть 3: Checkpointing

Что такое Checkpointing?

Checkpointing — сохранение состояния графа для возможности:

  • Прод��лжить выполнение позже
  • Откатиться к предыдущему состоянию
  • Сохранить историю между сессиями

Простой Checkpointer

import json
from pathlib import Path


class SimpleCheckpointer:
    """Простой checkpointer для сохранения состояния."""
    
    def __init__(self, checkpoint_dir: str = ".checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
    
    def save(self, session_id: str, state: AgentState) -> None:
        """Сохраняет состояние."""
        filepath = self.checkpoint_dir / f"{session_id}.json"
        
        # Конвертируем state в JSON-сериализуемый формат
        serializable_state = self._make_serializable(state)
        
        with open(filepath, 'w') as f:
            json.dump(serializable_state, f, indent=2)
        
        print(f"💾 Checkpoint saved: {session_id}")
    
    def load(self, session_id: str) -> Optional[AgentState]:
        """Загружает состояние."""
        filepath = self.checkpoint_dir / f"{session_id}.json"
        
        if not filepath.exists():
            return None
        
        with open(filepath, 'r') as f:
            state = json.load(f)
        
        print(f"📂 Checkpoint loaded: {session_id}")
        return state
    
    def _make_serializable(self, obj):
        """Делает объект JSON-сериализуемым."""
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._make_serializable(item) for item in obj]
        else:
            return obj

Использование Checkpointer

checkpointer = SimpleCheckpointer()

# Сохранение
state = {
    "task": "Write article",
    "messages": [...],
    "session_id": "user_123"
}
checkpointer.save("user_123", state)

# Загрузка
loaded_state = checkpointer.load("user_123")
if loaded_state:
    # Продолжаем с сохранённого состояния
    workflow.invoke(loaded_state)

Часть 4: Агенты с Memory

Planner с Memory

def planner_agent_with_memory(state: AgentState) -> AgentState:
    """Planner Agent с учётом памяти."""
    updated = dict(state)
    task = state.get("task", "")
    
    # Проверяем, есть ли релевантная информация в памяти
    learned_facts = state.get("learned_facts", [])
    context = state.get("context", {})
    
    print(f"\n📋 Planner: Planning with memory")
    print(f"   Learned facts: {len(learned_facts)}")
    print(f"   Context keys: {list(context.keys())}")
    
    # Создаём план с учётом контекста
    plan = [
        f"Step 1: Review learned facts about '{task}'",
        f"Step 2: Research new information about '{task}'",
        f"Step 3: Synthesize with existing knowledge"
    ]
    
    updated["plan"] = plan
    updated["current_step"] = 0
    updated["status"] = "executing"
    
    # Добавляем сообщение в историю
    updated = add_message(updated, "agent", f"Created plan for: {task}")
    
    return updated

Executor с Memory

def executor_agent_with_memory(state: AgentState) -> AgentState:
    """Executor Agent с сохранением в память."""
    updated = dict(state)
    plan = state.get("plan", [])
    current_step = state.get("current_step", 0)
    
    if current_step >= len(plan):
        updated["status"] = "done"
        updated["final_result"] = state.get("step_result", "Task completed")
        return updated
    
    step = plan[current_step]
    
    # Выполняем шаг
    result = f"Completed: {step}"
    
    # Сохраняем результат в память
    learned_facts = updated.get("learned_facts", [])
    learned_facts.append(f"From step {current_step + 1}: {result}")
    updated["learned_facts"] = learned_facts
    
    # Обновляем контекст
    context = updated.get("context", {})
    context[f"step_{current_step}"] = result
    updated["context"] = context
    
    # Сохраняем в историю tools
    tools_history = updated.get("tools_history", [])
    tools_history.append({
        "step": current_step,
        "action": step,
        "result": result,
        "timestamp": datetime.now().isoformat()
    })
    updated["tools_history"] = tools_history
    
    updated["step_result"] = result
    updated["status"] = "critiquing"
    updated["iteration"] = updated.get("iteration", 0) + 1
    
    # Добавляем в историю сообщений
    updated = add_message(updated, "agent", f"Executed: {step}")
    
    print(f"\n⚙️  Executor: Step {current_step + 1} (with memory)")
    print(f"   Learned facts: {len(learned_facts)}")
    print(f"   Context size: {len(context)}")
    
    return updated

Critic с Memory

def critic_agent_with_memory(state: AgentState) -> AgentState:
    """Critic Agent с анализом истории."""
    updated = dict(state)
    step_result = state.get("step_result", "")
    current_step = state.get("current_step", 0)
    
    # Анализируем историю для лучшей оценки
    tools_history = state.get("tools_history", [])
    learned_facts = state.get("learned_facts", [])
    
    print(f"\n🔍 Critic: Evaluating with memory")
    print(f"   Previous actions: {len(tools_history)}")
    print(f"   Knowledge base: {len(learned_facts)} facts")
    
    # Оценка с учётом истории
    if len(tools_history) > 3:
        critique = "Good progress, moving to next step"
        decision = "next"
        updated["current_step"] = current_step + 1
    else:
        critique = "Need more information"
        decision = "retry"
    
    updated["critique"] = critique
    
    # Определяем статус
    plan = state.get("plan", [])
    if decision == "retry":
        updated["status"] = "executing"
    elif updated["current_step"] < len(plan):
        updated["status"] = "executing"
    else:
        updated["status"] = "done"
        updated["final_result"] = step_result
    
    # Добавляем в ��сторию
    updated = add_message(updated, "agent", f"Critique: {critique}")
    
    return updated

Часть 5: Финальный Pipeline

Полная система

from langgraph.graph import StateGraph, END


def create_full_pipeline_with_memory():
    """Создаёт полный pipeline с memory и checkpointing."""
    
    graph = StateGraph(AgentState)
    
    # Добавляем агентов с memory
    graph.add_node("planner", planner_agent_with_memory)
    graph.add_node("executor", executor_agent_with_memory)
    graph.add_node("critic", critic_agent_with_memory)
    
    # Связываем
    graph.set_entry_point("planner")
    graph.add_edge("planner", "executor")
    graph.add_edge("executor", "critic")
    
    graph.add_conditional_edges(
        "critic",
        should_continue,
        {
            "execute": "executor",
            "critique": "critic",
            "end": END
        }
    )
    
    return graph.compile()


def run_with_checkpointing(task: str, session_id: str):
    """Запуск с checkpointing."""
    
    checkpointer = SimpleCheckpointer()
    workflow = create_full_pipeline_with_memory()
    
    # Пытаемся загрузить предыдущее состояние
    state = checkpointer.load(session_id)
    
    if state:
        print(f"📂 Resuming session: {session_id}")
        # Продолжаем с сохранённого состояния
        state["task"] = task  # Обновляем задачу
    else:
        print(f"🆕 Starting new session: {session_id}")
        # Новая сессия
        state = {
            "task": task,
            "iteration": 0,
            "max_iterations": 10,
            "status": "planning",
            "session_id": session_id,
            "messages": [],
            "learned_facts": [],
            "tools_history": [],
            "context": {}
        }
    
    # Выполняем
    result = workflow.invoke(state)
    
    # Сохраняем checkpoint
    checkpointer.save(session_id, result)
    
    return result

Пример использования

def main():
    """Пример полного pipeline с memory."""
    
    print("="*60)
    print("🚀 Full Pipeline with Memory & Checkpointing")
    print("="*60)
    
    # Первый запуск
    result1 = run_with_checkpointing(
        task="Research LangGraph",
        session_id="session_001"
    )
    
    print("\n" + "="*60)
    print("📊 Session 1 Result")
    print("="*60)
    print(f"Messages: {len(result1.get('messages', []))}")
    print(f"Learned facts: {len(result1.get('learned_facts', []))}")
    print(f"Tools history: {len(result1.get('tools_history', []))}")
    
    # Второй запуск (продолжение)
    print("\n" + "="*60)
    print("🔄 Continuing session...")
    print("="*60)
    
    result2 = run_with_checkpointing(
        task="Create presentation about LangGraph",
        session_id="session_001"  # Тот же session_id!
    )
    
    print("\n" + "="*60)
    print("📊 Session 2 Result")
    print("="*60)
    print(f"Messages: {len(result2.get('messages', []))}")
    print(f"Learned facts: {len(result2.get('learned_facts', []))}")
    print(f"Context preserved: {len(result2.get('context', {}))}")

Ключевые выводы

Memory — способность агентов помнить контекст
Messages history — история всех сообщений
Learned facts — накопление знаний
Checkpointing — сохранение состояния между сессиями
Context — активная рабочая память
Tools history — история использования инструментов

🤖 Связь с агентами

Сегодня вы создали:

  • ✅ Агентов с памятью
  • ✅ Систему сохранения состояния
  • ✅ Полный self-improving pipeline
  • ✅ Deliverable Week 1 — готовая система!

Следующий шаг: Неделя 2 — Multi-Agent системы с Supervisor и Router!


Следующий шаг: День 6

В следующей статье мы изучим:

  • Subgraphs — вложенные графы
  • Supervisor — координатор агентов
  • Иерархическая архитектура

👉 День 6: Subgraphs и Supervisor (скоро)


Ресурсы


Прогресс: 5/10 дней ✅
🎉 Deliverable Week 1 Complete!