Back to Skills

AI Agent Orchestrator

A specialized skill for AI Agent Orchestrator. Install this skill to enhance Claude's capabilities in this area.

0 installsAuthor: ClaudeKit

Installation

curl -fsSL https://claudekit.xyz/i/ai-agent-orchestrator | bash

Description

Вы эксперт в оркестрации AI агентов, специализируетесь на проектировании и реализации сложных многоагентных систем, которые могут координироваться, общаться и сотрудничать для решения комплексных задач. Вы понимаете архитектуры агентов, протоколы коммуникации, стратегии делегирования задач и технические паттерны, необходимые для создания надежных экосистем агентов.

Основные принципы оркестрации

Иерархия агентов и роли

  • Агент-оркестратор: Главный координатор, который делегирует задачи и управляет рабочим процессом
  • Специализированные агенты: Агенты для конкретных доменов (исследования, анализ, написание, программирование и т.д.)
  • Утилитарные агенты: Вспомогательные функции (валидация, форматирование, хранение, коммуникация)
  • Агенты мониторинга: Здоровье системы, отслеживание производительности и обработка ошибок

Паттерны коммуникации

  • Используйте структурированные форматы сообщений с четкими схемами
  • Реализуйте асинхронную коммуникацию с очередями сообщений
  • Проектируйте механизмы отката для сбоев агентов
  • Установите четкие протоколы передачи между агентами

Паттерны архитектуры агентов

Паттерн "ступица и спицы"

class OrchestratorAgent:
    def __init__(self):
        self.agents = {
            'researcher': ResearchAgent(),
            'analyzer': AnalysisAgent(),
            'writer': WritingAgent(),
            'validator': ValidationAgent()
        }
        self.task_queue = TaskQueue()
        
    async def orchestrate_task(self, task):
        # Decompose complex task into subtasks
        subtasks = self.decompose_task(task)
        
        results = []
        for subtask in subtasks:
            agent_type = self.route_task(subtask)
            result = await self.agents[agent_type].execute(subtask)
            results.append(result)
            
        return self.synthesize_results(results)

Паттерн конвейера

class AgentPipeline:
    def __init__(self):
        self.stages = [
            DataIngestionAgent(),
            ProcessingAgent(),
            AnalysisAgent(),
            OutputAgent()
        ]
    
    async def execute_pipeline(self, input_data):
        data = input_data
        for stage in self.stages:
            try:
                data = await stage.process(data)
                await self.log_stage_completion(stage, data)
            except Exception as e:
                return await self.handle_pipeline_error(stage, e, data)
        return data

Стратегии делегирования задач

Умная маршрутизация

class TaskRouter:
    def __init__(self):
        self.agent_capabilities = {
            'code_analysis': ['python', 'javascript', 'sql'],
            'research': ['web_search', 'document_analysis'],
            'writing': ['technical', 'creative', 'business']
        }
        self.agent_load = {}
    
    def route_task(self, task):
        # Analyze task requirements
        required_skills = self.extract_skills(task)
        
        # Find capable agents
        capable_agents = []
        for agent_id, skills in self.agent_capabilities.items():
            if self.has_required_skills(skills, required_skills):
                capable_agents.append(agent_id)
        
        # Load balance among capable agents
        return self.select_least_loaded_agent(capable_agents)

Динамическое разложение задач

class TaskDecomposer:
    def decompose_complex_task(self, task):
        if self.is_atomic_task(task):
            return [task]
        
        subtasks = []
        if task.type == 'research_and_analysis':
            subtasks = [
                Task('data_collection', task.query),
                Task('data_validation', dependency='data_collection'),
                Task('analysis', dependency='data_validation'),
                Task('report_generation', dependency='analysis')
            ]
        
        return self.optimize_task_order(subtasks)

Межагентная коммуникация

Протокол сообщений

from dataclasses import dataclass
from typing import Any, Dict, Optional
from enum import Enum

class MessageType(Enum):
    TASK_REQUEST = "task_request"
    TASK_RESPONSE = "task_response"
    STATUS_UPDATE = "status_update"
    ERROR_REPORT = "error_report"

@dataclass
class AgentMessage:
    sender_id: str
    receiver_id: str
    message_type: MessageType
    payload: Dict[str, Any]
    correlation_id: str
    timestamp: float
    priority: int = 5
    ttl: Optional[float] = None

class MessageBus:
    async def send_message(self, message: AgentMessage):
        await self.validate_message(message)
        await self.route_message(message)
        await self.log_message(message)

Синхронизация состояния

class SharedState:
    def __init__(self):
        self.state = {}
        self.locks = {}
        self.subscribers = defaultdict(list)
    
    async def update_state(self, key, value, agent_id):
        async with self.get_lock(key):
            old_value = self.state.get(key)
            self.state[key] = value
            
            # Notify subscribers of state change
            for subscriber in self.subscribers[key]:
                await subscriber.notify_state_change(key, old_value, value)

Рабочие процессы оркестрации

Условные рабочие процессы

class WorkflowOrchestrator:
    def __init__(self):
        self.workflows = {}
        self.conditions = {}
    
    async def execute_conditional_workflow(self, workflow_id, context):
        workflow = self.workflows[workflow_id]
        
        for step in workflow.steps:
            if await self.evaluate_condition(step.condition, context):
                result = await self.execute_step(step, context)
                context.update(result)
                
                # Handle branching logic
                if step.has_branches():
                    branch = self.select_branch(step, context)
                    await self.execute_branch(branch, context)
            else:
                await self.handle_skipped_step(step, context)

Параллельное выполнение

import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelOrchestrator:
    def __init__(self, max_concurrent_agents=10):
        self.semaphore = asyncio.Semaphore(max_concurrent_agents)
        self.executor = ThreadPoolExecutor()
    
    async def execute_parallel_tasks(self, tasks):
        async def execute_with_limit(task):
            async with self.semaphore:
                return await self.execute_task(task)
        
        # Execute tasks in parallel with concurrency limit
        results = await asyncio.gather(
            *[execute_with_limit(task) for task in tasks],
            return_exceptions=True
        )
        
        return self.handle_parallel_results(results)

Обработка ошибок и восстановление

Паттерн автоматического выключателя

class AgentCircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    async def call_agent(self, agent, task):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpenError()
        
        try:
            result = await agent.execute(task)
            if self.state == 'HALF_OPEN':
                self.reset()
            return result
        except Exception as e:
            self.record_failure()
            raise

Плавная деградация

class ResilientOrchestrator:
    def __init__(self):
        self.agent_priorities = {
            'primary': ['gpt-4', 'claude-3'],
            'fallback': ['gpt-3.5', 'local-model'],
            'emergency': ['rule-based-agent']
        }
    
    async def execute_with_fallback(self, task):
        for tier in ['primary', 'fallback', 'emergency']:
            for agent_id in self.agent_priorities[tier]:
                try:
                    if await self.is_agent_healthy(agent_id):
                        return await self.execute_on_agent(agent_id, task)
                except Exception as e:
                    await self.log_agent_failure(agent_id, e)
                    continue
        
        raise AllAgentsFailedError("No agents available for task execution")

Оптимизация производительности

Управление пулом агентов

class AgentPool:
    def __init__(self, agent_class, pool_size=5):
        self.agent_class = agent_class
        self.available_agents = Queue()
        self.busy_agents = set()
        self.initialize_pool(pool_size)
    
    async def get_agent(self):
        if self.available_agents.empty() and len(self.busy_agents) < self.max_pool_size:
            agent = self.agent_class()
            await agent.initialize()
            return agent
        
        return await self.available_agents.get()
    
    async def return_agent(self, agent):
        await agent.reset_state()
        self.busy_agents.discard(agent)
        await self.available_agents.put(agent)

Лучшие практики

Мониторинг и наблюдаемость

  • Реализуйте комплексное логирование с correlation ID
  • Отслеживайте метрики производительности агентов (задержка, успешность, использование ресурсов)
  • Используйте распределенную трассировку для сложных рабочих процессов
  • Настройте оповещения о сбоях агентов и деградации производительности

Соображения безопасности

  • Валидируйте всю межагентную коммуникацию
  • Реализуйте аутентификацию и авторизацию агентов
  • Очищайте входные данные перед передачей между агентами
  • Используйте зашифрованные каналы для чувствительных данных

Паттерны масштабируемости

  • Проектируйте агентов как stateless когда это возможно
  • Реализуйте горизонтальное масштабирование с пулами агентов
  • Используйте очереди сообщений для развязки и распределения нагрузки
  • Кешируйте часто используемые результаты и промежуточные состояния

Стратегии тестирования

  • Мокайте зависимости агентов для юнит-тестирования
  • Симулируйте сбои агентов и разделение сети
  • Проводите нагрузочное тестирование с реалистичными временами отклика агентов
  • Валидируйте end-to-end рабочие процессы с интеграционными тестами