# The Problem with Fragile AI Workflows
Hey there, fellow AI builder! Ever poured hours into crafting a slick AI agent, only for it to crumble when the API hiccups, a task times out, or the whole system scales up? In the world of business workflows—like processing e-commerce orders or automating customer support—**stateful AI** is the holy grail. But without proper orchestration, you're stuck with flaky scripts that lose context, retry manually, or just plain fail.
Traditional tools like Airflow or Celery handle scheduling, but they falter on **durable execution**: what if your server restarts midway through a Claude-powered analysis? Enter **Temporal**, the open-source workflow engine that persists every step so a workflow can pick up where it left off after a failure. Paired with the **Claude API**, it unlocks resilient, scalable AI pipelines. Today, we'll build one for e-commerce order processing, complete with retries, state management, and real Python code.
## Why Stateful AI Workflows Need Temporal
AI isn't like a simple cron job. Claude models (Opus for deep reasoning, Sonnet for speed) shine in complex tasks: intent classification, content generation, decision-making. But:
- **Non-determinism**: Claude might hallucinate or vary outputs.
- **External dependencies**: Inventory checks, payments. These APIs fail intermittently, and your pipeline has to expect it.
- **Long-running flows**: An order might need human approval, taking hours or days.
- **State loss**: Stateless functions forget history on crashes.
Temporal solves this with **workflows** (stateful coordinators) and **activities** (the tasks that do the actual I/O, ideally idempotent because they may be retried). Key perks:
- **Durable**: Events are persisted; replay on failure.
- **Retries & timeouts**: Built-in backoff for Claude rate limits.
- **Scalability**: Horizontal workers handle millions of workflows.
- **Python SDK**: Workflows and activities are plain async Python, so Anthropic's Python SDK can be called directly inside an activity.
 *(Conceptual diagram: Workflow orchestrates Activities like Claude calls.)*
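To make the "persist events, replay on failure" idea concrete, here is a toy model in plain Python. It is not the Temporal SDK, just a sketch of the principle: each step's result is recorded in a history, and after a crash, completed steps are served from that history instead of being run again. The names `run_with_history`, `validate`, and `charge` are invented for this illustration.

```python
from typing import Any, Callable


def run_with_history(
    steps: list[tuple[str, Callable[[], Any]]],
    history: dict[str, Any],
) -> dict[str, Any]:
    """Run steps in order, skipping any whose result is already recorded."""
    for name, step in steps:
        if name in history:
            continue  # completed before the crash: reuse the recorded result
        history[name] = step()  # record the result before moving on
    return history


calls: list[str] = []          # tracks which steps actually executed
state = {"crashed": False}     # lets the fake payment step fail exactly once


def validate() -> str:
    calls.append("validate")
    return "approved"


def charge() -> str:
    calls.append("charge")
    if not state["crashed"]:
        state["crashed"] = True
        raise RuntimeError("worker crashed mid-payment")
    return "paid"


steps = [("validate", validate), ("charge", charge)]
history: dict[str, Any] = {}

try:
    run_with_history(steps, history)  # first run dies during "charge"
except RuntimeError:
    pass

run_with_history(steps, history)      # "restart": validate is not re-run
print(calls)    # validate ran once, charge ran twice
print(history)
```

Temporal does the same thing at scale: the history lives in the Temporal service rather than in a local dict, which is why a restarted worker can continue a workflow it has never seen before.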
## Getting Started: Setup Temporal + Claude
First, install Temporal locally (Docker) or use Temporal Cloud. Then, pip install:
```bash
pip install temporalio anthropic
```
You'll need:
- A Temporal namespace (the local default is `default`)
- An Anthropic API key from console.anthropic.com
Start a worker:
```python
# worker.py
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from your_workflows import OrderProcessingWorkflow


async def main():
    # Connect to a locally running Temporal server
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="order-queue",
        workflows=[OrderProcessingWorkflow],
        activities=[...],  # Placeholder: list the activity functions defined later
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
Run with `python worker.py`. Now, let's define activities.
## Building Activities: Claude Meets Real Work
Activities are the workhorses: ordinary functions where all the I/O and side effects live, with retries handled by Temporal. Wrap Claude calls here for fault tolerance.
### Example 1: Validate Order with Claude
Claude classifies if an order is fraudulent or legit.
```python
# activities.py
import json

from anthropic import AsyncAnthropic
from temporalio import activity

# Reads ANTHROPIC_API_KEY from the environment instead of hardcoding the key.
claude = AsyncAnthropic()


@activity.defn
async def validate_order(order_data: dict) -> dict:
    prompt = f"""
Analyze this e-commerce order for fraud risk:
{json.dumps(order_data, indent=2)}
Respond with JSON only: {{"risk_score": 0-100, "reason": "str", "approved": bool}}
"""
    # Use the async client so the call does not block the worker's event loop.
    msg = await claude.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}],
    )
    # Malformed JSON raises here, which fails the activity and triggers a retry.
    return json.loads(msg.content[0].text)
```
If the call raises (for example on a Claude 429 rate limit or a timeout), Temporal retries the activity according to its retry policy.
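Because Claude's output can vary, it helps to check the reply before trusting it. Below is a minimal sketch of a defensive parser for the fraud-check response. The helper name `parse_validation` is hypothetical, and the validation rules are assumptions drawn from the prompt's schema; raising `ValueError` inside an activity would make Temporal treat the attempt as failed and retry it.

```python
import json


def parse_validation(text: str) -> dict:
    """Extract and validate the fraud-check JSON from a model reply."""
    # Tolerate extra prose around the JSON object.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model output")
    data = json.loads(text[start : end + 1])  # JSONDecodeError is a ValueError

    score = data.get("risk_score")
    is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
    if not is_number or not 0 <= score <= 100:
        raise ValueError(f"risk_score missing or out of range: {score!r}")
    if not isinstance(data.get("approved"), bool):
        raise ValueError("approved must be a boolean")

    return {
        "risk_score": score,
        "reason": str(data.get("reason", "")),
        "approved": data["approved"],
    }


reply = 'Here is my assessment:\n{"risk_score": 12, "reason": "Known customer", "approved": true}'
print(parse_validation(reply))
```

Swapping this in for the bare `json.loads` call is a design choice: it turns a subtly wrong answer into a loud, retryable failure instead of letting a bad `approved` value flow into the payment step.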
### Example 2: Check Inventory (Non-AI Activity)
```python
# activities.py (continued)
import asyncio


@activity.defn
async def check_inventory(items: list) -> dict:
    # Simulate an external inventory API
    await asyncio.sleep(0.1)  # Network delay
    return {"item1": 10, "item2": 0}  # Stock levels
```
## Orchestrating with Workflows: The Brain
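Workflows maintain state across activities. Ordinary local variables and instance attributes are fine, because Temporal rebuilds them by replaying the recorded history; the catch is that workflow code must be deterministic, so I/O, randomness, and clock reads belong in activities. Use **Temporal signals/queries** for dynamic updates from outside.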
```python
# workflows.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# The activity module imports third-party libraries, so pass it through the workflow sandbox.
with workflow.unsafe.imports_passed_through():
    from .activities import (
        check_inventory,
        generate_confirmation,
        process_payment,
        validate_order,
    )


@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order_data: dict) -> str:
        # Step 1: AI validation
        validation = await workflow.execute_activity(
            validate_order,
            order_data,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_attempts=3,
            ),
        )
        if not validation["approved"]:
            return f"Order rejected: {validation['reason']}"

        # Step 2: Inventory check with retry
        inventory = await workflow.execute_activity(
            check_inventory,
            order_data["items"],
            # A timeout is required; 30 seconds is an assumed value.
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=5),
        )
        out_of_stock = [item for item, stock in inventory.items() if stock == 0]
        if out_of_stock:
            # A fuller version would wait here for a human-sent signal (advanced).
            workflow.logger.warning("Out of stock: %s", out_of_stock)
            return "Pending stock resolution"

        # Step 3: Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            order_data["payment_info"],
            schedule_to_close_timeout=timedelta(minutes=10),
        )

        # Step 4: Claude confirmation email
        confirmation = await workflow.execute_activity(
            generate_confirmation,
            args=[order_data, payment_result],  # multiple arguments go through args=
            # A timeout is required; 5 minutes is an assumed value.
            start_to_close_timeout=timedelta(minutes=5),
        )
        return confirmation
```
**Key wins**:
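- **State preserved**: If a worker dies during payment, the validation and inventory results are replayed from history rather than re-run, and execution resumes at the payment step.
- **Timeouts**: A stuck Claude call won't hang forever.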
- **Custom retries**: Exponential backoff for flaky APIs.
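To see what exponential backoff means in practice, here is a small sketch that computes the wait before each retry. It is plain Python, not SDK code. The function name `backoff_schedule` is made up, and the defaults (coefficient 2.0, maximum interval of 100 times the initial interval) are my reading of Temporal's documented retry defaults, so treat them as assumptions.

```python
from datetime import timedelta
from typing import Optional


def backoff_schedule(
    initial: timedelta,
    coefficient: float = 2.0,
    maximum: Optional[timedelta] = None,
    attempts: int = 5,
) -> list[timedelta]:
    """Return the wait before each retry; `attempts` includes the first try."""
    if maximum is None:
        maximum = initial * 100  # assumed default cap
    waits = []
    for retry in range(attempts - 1):
        wait = initial * (coefficient ** retry)  # grows geometrically
        waits.append(min(wait, maximum))         # never exceed the cap
    return waits


# Five attempts means four waits: 1s, 2s, 4s, 8s.
for wait in backoff_schedule(timedelta(seconds=1), attempts=5):
    print(wait.total_seconds())
```

With `maximum_attempts=5` and a one-second initial interval, the inventory step would spend at most about 15 seconds waiting between tries before the workflow sees the failure.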
## Real-World E-Commerce Pipeline
Imagine a Shopify order webhook triggers this:
```python
# starter.py
from temporalio.client import Client

from your_workflows import OrderProcessingWorkflow


async def start_order_workflow(order_json: dict):
    client = await Client.connect("localhost:7233")
    # Pass the workflow's run method once; the order ID makes the workflow ID stable.
    handle = await client.start_workflow(
        OrderProcessingWorkflow.run,
        order_json,
        id=f"order-{order_json['id']}",
        task_queue="order-queue",
    )
    result = await handle.result()  # Wait for the workflow to finish
    print(result)
```
**Full flow**:
1. Webhook -> `start_order_workflow`.
2. Claude validates: "High risk? Reject."
3. Inventory: Retry 5x on stock API outage.
4. Payment: Times out after 10 minutes (`schedule_to_close_timeout`); hook your team alert onto that failure.
5. Claude crafts personalized email: "Thanks for your order of artisanal coffee!"
**Fault scenarios handled**:
- Claude rate limit: Auto-retry.
- Inventory down: Backoff + eventual success.
- Power outage: Workflow resumes exactly where left off.
Test it: `temporal workflow show --workflow-id order-123` for history (the `temporal` CLI replaces the older `tctl`).
## Scaling & Advanced Tricks
**Production scaling**:
- Deploy workers on Kubernetes.
- Use Temporal Cloud if you'd rather not run the Temporal service yourself; check its current SLA for uptime guarantees.
- **Child workflows**: Nest for sub-orders.
**Claude-Specific Best Practices**:
- **Prompt engineering**: Use XML tags for structured output.
```prompt
<order>{json}</order>
<response><risk_score>num</risk_score>...</response>
```
- **Model selection**: Sonnet for speed, Opus for complex fraud detection.
- **Cost optimization**: Cache common validations as workflow state.
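If you go the XML-tag route, the reply needs parsing on the way back. Here is a minimal sketch using the standard library's `re` module; the helper name `extract_tag` and the sample reply are invented for illustration, and the tag names follow the prompt fragment above.

```python
import re


def extract_tag(text: str, tag: str) -> str:
    """Return the text inside the first <tag>...</tag> pair, stripped."""
    pattern = rf"<{re.escape(tag)}>(.*?)</{re.escape(tag)}>"
    match = re.search(pattern, text, re.DOTALL)
    if match is None:
        raise ValueError(f"missing <{tag}> in model output")
    return match.group(1).strip()


# Hypothetical model reply shaped like the prompt's <response> template
reply = "<response><risk_score> 17 </risk_score><reason>Known customer</reason></response>"

risk_score = int(extract_tag(reply, "risk_score"))
reason = extract_tag(reply, "reason")
print(risk_score, reason)
```

A regex is enough here because the tags are flat and you control the template; for nested or attribute-heavy output, a real XML parser would be the safer pick.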
**Queries & Signals**:
Update mid-workflow:
```python
# In starter
await handle.query("get_status")
await handle.signal("cancel_order")
```
**Monitoring**: Temporal UI dashboard + Prometheus.
## Wrapping Up: Your Turn to Build
Temporal + Claude isn't hype; it's production-ready resilience. We've covered setup, code, and an e-commerce example, solving real pains like retries and state. Fork the [GitHub repo](https://github.com/example/claude-temporal-ecom) (imagine one), tweak prompts, and deploy.
Questions? Drop 'em in comments. Next: AI agents with MCP servers. Stay tuned!