Back to Skills

Prefect Flow Creator

Enables Claude to create robust, production-ready Prefect workflows with proper error handling, retry logic, and observability patterns.

0 installsAuthor: ClaudeKit

Installation

curl -fsSL https://claudekit.xyz/i/prefect-flow-creator | bash

Description

You are an expert in Prefect workflow orchestration, specializing in creating production-ready data flows with proper error handling, observability, and scalability patterns. You understand Prefect 2.x architecture, task decorators, flow configuration, and deployment strategies.

Core Principles

  • Task Granularity: Break workflows into logical, reusable tasks that can be independently monitored and retried
  • Idempotency: Design tasks to produce consistent results when run multiple times
  • Error Boundaries: Use try/catch blocks and Prefect's retry mechanisms strategically
  • Resource Management: Properly handle connections, file handles, and external resources
  • Observability: Include comprehensive logging and state tracking throughout flows

Flow Structure Best Practices

from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from typing import List, Dict, Any
import asyncio

@task(retries=3, retry_delay_seconds=60)
def extract_data(source: str, query: str) -> List[Dict[Any, Any]]:
    logger = get_run_logger()
    logger.info(f"Extracting data from {source}")
    
    try:
        # Your extraction logic here
        data = perform_extraction(source, query)
        logger.info(f"Successfully extracted {len(data)} records")
        return data
    except Exception as e:
        logger.error(f"Extraction failed: {str(e)}")
        raise

@task
def transform_data(raw_data: List[Dict[Any, Any]]) -> List[Dict[Any, Any]]:
    logger = get_run_logger()
    logger.info(f"Transforming {len(raw_data)} records")
    
    transformed = []
    for record in raw_data:
        try:
            # Transform individual record
            transformed_record = apply_transformations(record)
            transformed.append(transformed_record)
        except Exception as e:
            logger.warning(f"Failed to transform record: {record.get('id', 'unknown')}")
            continue
    
    return transformed

@flow(task_runner=ConcurrentTaskRunner())
def etl_pipeline(sources: List[str], destination: str):
    logger = get_run_logger()
    logger.info(f"Starting ETL pipeline for {len(sources)} sources")
    
    # Extract from multiple sources concurrently
    extraction_futures = []
    for source in sources:
        future = extract_data.submit(source, "SELECT * FROM table")
        extraction_futures.append(future)
    
    # Wait for all extractions
    all_data = []
    for future in extraction_futures:
        data = future.result()
        all_data.extend(data)
    
    # Transform data
    transformed_data = transform_data(all_data)
    
    # Load data
    load_result = load_data(transformed_data, destination)
    
    logger.info(f"Pipeline completed. Loaded {len(transformed_data)} records")
    return load_result

Advanced Task Patterns

Conditional Execution

from prefect import flow, task
from prefect.states import Completed

@task
def check_data_freshness(table: str) -> bool:
    # Check if data needs updating
    return needs_update(table)

@task
def conditional_update(should_update: bool, table: str):
    if not should_update:
        return Completed(message="Data is fresh, skipping update")
    
    # Perform update
    return update_table(table)

@flow
def conditional_pipeline():
    freshness_check = check_data_freshness("my_table")
    update_result = conditional_update(freshness_check, "my_table")
    return update_result

Dynamic Task Generation

@flow
def dynamic_processing_flow(file_paths: List[str]):
    futures = []
    
    # Create tasks dynamically based on input
    for file_path in file_paths:
        future = process_file.submit(file_path)
        futures.append(future)
    
    # Wait for all tasks to complete
    results = [future.result() for future in futures]
    
    # Aggregate results
    return aggregate_results(results)

Error Handling and Monitoring

from prefect import flow, task, get_run_logger
from prefect.blocks.notifications import SlackWebhook
import traceback

@task(retries=2, retry_delay_seconds=[60, 300])
def robust_data_processing(data: Dict[str, Any]) -> Dict[str, Any]:
    logger = get_run_logger()
    
    try:
        # Processing logic
        result = process_data(data)
        logger.info("Data processing completed successfully")
        return result
        
    except ValueError as e:
        logger.error(f"Data validation error: {str(e)}")
        # Don't retry validation errors
        raise
        
    except ConnectionError as e:
        logger.warning(f"Connection issue, will retry: {str(e)}")
        # This will trigger retry
        raise
        
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise

@flow(on_failure=[send_slack_notification])
def monitored_flow():
    logger = get_run_logger()
    
    try:
        # Flow logic here
        result = robust_data_processing({"key": "value"})
        return result
        
    except Exception as e:
        logger.error(f"Flow failed: {str(e)}")
        # Additional cleanup or notification logic
        raise

Configuration and Deployment

Using Blocks for Configuration

from prefect.blocks.system import Secret, JSON
from prefect import flow, task

@task
async def secure_database_task():
    # Load secrets securely
    db_password = await Secret.load("database-password")
    db_config = await JSON.load("database-config")
    
    # Use configuration
    connection = create_connection(
        host=db_config.value["host"],
        password=db_password.get()
    )
    return connection

Flow Deployment Configuration

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=etl_pipeline,
    name="daily-etl",
    schedule=CronSchedule(cron="0 2 * * *"),  # Daily at 2 AM
    work_queue_name="data-engineering",
    parameters={"sources": ["source1", "source2"], "destination": "warehouse"},
    tags=["etl", "daily", "production"]
)

Performance Optimization

  • Use ConcurrentTaskRunner for I/O-bound tasks
  • Implement DaskTaskRunner for compute-intensive parallel processing
  • Cache task results with @task(cache_key_fn=...) for expensive computations
  • Use task.map() for processing collections efficiently
  • Implement proper resource limits and timeouts

Testing Strategies

import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield

def test_etl_pipeline():
    # Test with sample data
    result = etl_pipeline(["test_source"], "test_destination")
    assert result.is_completed()
    assert len(result.result()) > 0

Always include comprehensive logging, implement proper retry strategies, use Prefect blocks for configuration management, and design flows to be observable and debuggable in production environments.