Skip to content

markdown-flow-agent-py

Python backend agent for processing MarkdownFlow templates with AI integration. Built on FastAPI for high performance and async support.

Installation

pip install markdown-flow-agent
# or with poetry
poetry add markdown-flow-agent
# or with pipenv
pipenv install markdown-flow-agent

Quick Start

from fastapi import FastAPI
from markdown_flow_agent import FlowAgent, FlowConfig

app = FastAPI()
agent = FlowAgent(config=FlowConfig(
    llm_provider="openai",
    api_key="your-api-key"
))

@app.post("/process")
async def process_template(template: str, variables: dict):
    result = await agent.process(template, variables)
    return {"content": result.content, "metadata": result.metadata}

# Run with: uvicorn main:app --reload

Configuration

Basic Configuration

from markdown_flow_agent import FlowConfig

config = FlowConfig(
    # LLM Configuration
    llm_provider="openai",  # or "anthropic", "cohere", "local"
    api_key="your-api-key",
    model="gpt-4",          # or "claude-3", etc.

    # Processing Options
    temperature=0.7,
    max_tokens=2000,
    timeout=30,             # seconds

    # Caching
    enable_cache=True,
    cache_ttl=3600,         # seconds

    # Safety
    enable_content_filter=True,
    max_recursion_depth=5
)

agent = FlowAgent(config=config)

Environment Variables

# .env file
MARKDOWNFLOW_LLM_PROVIDER=openai
MARKDOWNFLOW_API_KEY=sk-...
MARKDOWNFLOW_MODEL=gpt-4
MARKDOWNFLOW_TEMPERATURE=0.7
MARKDOWNFLOW_CACHE_ENABLED=true
MARKDOWNFLOW_CACHE_REDIS_URL=redis://localhost:6379
from markdown_flow_agent import FlowAgent

# Automatically loads from environment
agent = FlowAgent()

Core Features

Template Processing

@app.post("/process")
async def process_template(request: ProcessRequest):
    """Process a MarkdownFlow template with variables."""

    result = await agent.process(
        template=request.template,
        variables=request.variables,
        context=request.context  # Optional additional context
    )

    return {
        "content": result.content,
        "variables_used": result.variables_used,
        "ai_calls": result.ai_calls,
        "processing_time": result.processing_time
    }

Variable Management

from markdown_flow_agent import VariableManager

var_manager = VariableManager()

# Extract variables from template
variables = var_manager.extract(template)
# Returns: ['user_name', 'skill_level', ...]

# Validate variables
validation = var_manager.validate(template, provided_variables)
# Returns: {'valid': True/False, 'missing': [...], 'extra': [...]}

# Interpolate variables
content = var_manager.interpolate(template, variables)

User Input Handling

from markdown_flow_agent import InputHandler

input_handler = InputHandler()

# Parse user inputs from template
inputs = input_handler.parse(template)
# Returns: [
#   {'variable': 'choice', 'options': ['A', 'B', 'C']},
#   ...
# ]

# Process user selection
result = input_handler.process_selection(
    variable="choice",
    value="B",
    template=template,
    variables=current_variables
)

AI Integration

# Configure multiple LLM providers
from markdown_flow_agent import MultiProviderAgent

agent = MultiProviderAgent({
    "primary": {
        "provider": "openai",
        "api_key": "sk-...",
        "model": "gpt-4"
    },
    "fallback": {
        "provider": "anthropic",
        "api_key": "sk-ant-...",
        "model": "claude-3"
    }
})

# Process with automatic fallback
result = await agent.process_with_fallback(template, variables)

Advanced Usage

Streaming Responses

from fastapi import StreamingResponse
import asyncio

@app.post("/stream")
async def stream_processing(request: ProcessRequest):
    async def generate():
        async for chunk in agent.stream_process(
            template=request.template,
            variables=request.variables
        ):
            yield f"data: {chunk.json()}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Batch Processing

@app.post("/batch")
async def batch_process(requests: List[ProcessRequest]):
    """Process multiple templates concurrently."""

    tasks = [
        agent.process(req.template, req.variables)
        for req in requests
    ]

    results = await asyncio.gather(*tasks)
    return {"results": results}

Caching

from markdown_flow_agent import CacheManager
import redis

# Redis caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache_manager = CacheManager(backend=redis_client)

agent = FlowAgent(
    config=FlowConfig(enable_cache=True),
    cache_manager=cache_manager
)

# In-memory caching
from markdown_flow_agent.cache import InMemoryCache

cache = InMemoryCache(max_size=1000, ttl=3600)
agent = FlowAgent(cache=cache)

Middleware

from markdown_flow_agent import Middleware

class LoggingMiddleware(Middleware):
    async def process(self, template, variables, next_handler):
        print(f"Processing template with {len(variables)} variables")
        result = await next_handler(template, variables)
        print(f"Generated {len(result.content)} characters")
        return result

class RateLimitMiddleware(Middleware):
    def __init__(self, max_requests=100):
        self.max_requests = max_requests
        self.requests = {}

    async def process(self, template, variables, next_handler):
        # Implement rate limiting logic
        return await next_handler(template, variables)

# Apply middleware
agent.use(LoggingMiddleware())
agent.use(RateLimitMiddleware(max_requests=100))

Custom Processors

from markdown_flow_agent import Processor

class CustomProcessor(Processor):
    """Custom processor for specific template patterns."""

    def can_process(self, template):
        """Check if this processor can handle the template."""
        return "CUSTOM:" in template

    async def process(self, template, variables):
        """Process the template."""
        # Custom processing logic
        return ProcessResult(
            content="Processed content",
            metadata={"processor": "custom"}
        )

# Register custom processor
agent.register_processor(CustomProcessor())

API Endpoints

Complete FastAPI Application

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List, Dict
import asyncio

app = FastAPI(title="MarkdownFlow API")

class ProcessRequest(BaseModel):
    template: str
    variables: Dict[str, any]
    context: Optional[Dict] = None
    stream: bool = False

class ProcessResponse(BaseModel):
    content: str
    variables_used: List[str]
    processing_time: float
    cached: bool = False

@app.post("/process", response_model=ProcessResponse)
async def process_template(request: ProcessRequest):
    try:
        result = await agent.process(
            template=request.template,
            variables=request.variables,
            context=request.context
        )
        return ProcessResponse(**result.dict())
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/validate")
async def validate_template(template: str):
    """Validate MarkdownFlow syntax."""
    validation = agent.validate(template)
    return {
        "valid": validation.is_valid,
        "errors": validation.errors,
        "warnings": validation.warnings
    }

@app.get("/variables")
async def extract_variables(template: str):
    """Extract variables from template."""
    variables = agent.extract_variables(template)
    return {"variables": variables}

@app.post("/batch")
async def batch_process(requests: List[ProcessRequest]):
    """Process multiple templates."""
    tasks = [
        agent.process(req.template, req.variables)
        for req in requests
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return {
        "results": [
            r.dict() if not isinstance(r, Exception) else {"error": str(r)}
            for r in results
        ]
    }

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket for real-time processing."""
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()

            async for chunk in agent.stream_process(
                template=data["template"],
                variables=data.get("variables", {})
            ):
                await websocket.send_json({
                    "type": "chunk",
                    "content": chunk.content
                })

            await websocket.send_json({"type": "complete"})
    except WebSocketDisconnect:
        pass

Testing

Unit Tests

import pytest
from markdown_flow_agent import FlowAgent

@pytest.fixture
def agent():
    return FlowAgent(config=FlowConfig(
        llm_provider="mock",  # Use mock provider for testing
        enable_cache=False
    ))

@pytest.mark.asyncio
async def test_process_template(agent):
    template = "Hello {{name}}!"
    variables = {"name": "Test"}

    result = await agent.process(template, variables)

    assert "Test" in result.content
    assert "name" in result.variables_used

@pytest.mark.asyncio
async def test_user_input_parsing(agent):
    template = "?[%{{choice}}Yes|No]"

    inputs = agent.parse_inputs(template)

    assert len(inputs) == 1
    assert inputs[0]["variable"] == "choice"
    assert inputs[0]["options"] == ["Yes", "No"]

@pytest.mark.asyncio
async def test_conditional_processing(agent):
    template = """
    If {{level}} is "beginner":
        Start with basics.
    If {{level}} is "advanced":
        Dive into complex topics.
    """

    result = await agent.process(template, {"level": "beginner"})
    assert "basics" in result.content.lower()

Integration Tests

from fastapi.testclient import TestClient

client = TestClient(app)

def test_process_endpoint():
    response = client.post("/process", json={
        "template": "Hello {{name}}!",
        "variables": {"name": "World"}
    })

    assert response.status_code == 200
    assert "World" in response.json()["content"]

def test_websocket():
    with client.websocket_connect("/ws") as websocket:
        websocket.send_json({
            "template": "Generate story for {{character}}",
            "variables": {"character": "Alice"}
        })

        chunks = []
        while True:
            data = websocket.receive_json()
            if data["type"] == "complete":
                break
            chunks.append(data["content"])

        assert len(chunks) > 0

Performance Optimization

Async Processing

# Process multiple templates concurrently
async def process_many(templates_and_vars):
    tasks = [
        agent.process(t, v)
        for t, v in templates_and_vars
    ]
    return await asyncio.gather(*tasks)

Connection Pooling

import httpx

# Configure connection pooling for LLM API calls
client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100
    )
)

agent = FlowAgent(
    config=FlowConfig(http_client=client)
)

Response Caching

from functools import lru_cache
import hashlib

@lru_cache(maxsize=1000)
def get_cached_result(template_hash, variables_hash):
    # Cache lookup logic
    pass

async def process_with_cache(template, variables):
    template_hash = hashlib.md5(template.encode()).hexdigest()
    variables_hash = hashlib.md5(str(variables).encode()).hexdigest()

    cached = get_cached_result(template_hash, variables_hash)
    if cached:
        return cached

    result = await agent.process(template, variables)
    # Store in cache
    return result

Deployment

Docker

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

version: "3.8"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MARKDOWNFLOW_API_KEY=${API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

Production Settings

from markdown_flow_agent import FlowConfig

production_config = FlowConfig(
    # Performance
    worker_count=4,
    connection_pool_size=20,

    # Reliability
    retry_attempts=3,
    retry_delay=1.0,
    timeout=30,

    # Security
    enable_content_filter=True,
    allowed_domains=["api.openai.com", "api.anthropic.com"],

    # Monitoring
    enable_metrics=True,
    metrics_port=9090,

    # Logging
    log_level="INFO",
    log_format="json"
)

Monitoring

Prometheus Metrics

from prometheus_client import Counter, Histogram, start_http_server

# Define metrics
request_count = Counter('markdownflow_requests_total', 'Total requests')
request_duration = Histogram('markdownflow_request_duration_seconds', 'Request duration')
error_count = Counter('markdownflow_errors_total', 'Total errors')

# Start metrics server
start_http_server(9090)

# Track metrics
@request_duration.time()
async def process_with_metrics(template, variables):
    request_count.inc()
    try:
        return await agent.process(template, variables)
    except Exception as e:
        error_count.inc()
        raise

Logging

import logging
from markdown_flow_agent import setup_logging

# Configure logging
setup_logging(
    level=logging.INFO,
    format="json",
    output_file="markdownflow.log"
)

# Use structured logging
logger = logging.getLogger("markdownflow")

@app.post("/process")
async def process_template(request: ProcessRequest):
    logger.info("Processing template", extra={
        "template_length": len(request.template),
        "variable_count": len(request.variables)
    })

    result = await agent.process(request.template, request.variables)

    logger.info("Processing complete", extra={
        "content_length": len(result.content),
        "processing_time": result.processing_time
    })

    return result