# Python + Claude SDK: ETL Pipelines for Data Warehouses
## Introduction
Extract-Transform-Load (ETL) pipelines are the backbone of modern data engineering, powering analytics in data warehouses like Snowflake and BigQuery. But the transform step often bogs teams down with tedious cleaning, deduplication, and enrichment tasks. Enter the Anthropic Python SDK (the `anthropic` package), Anthropic's Python library for calling Claude models.
Claude excels at reasoning over structured and unstructured data, generating cleaning logic, inferring schemas, and even suggesting business rules—all via natural language prompts. In this guide, we'll build a production-ready ETL pipeline that:
- Extracts data from CSV or APIs
- Uses Claude-3.5-Sonnet to transform (clean, enrich, validate)
- Loads into Snowflake or BigQuery
By the end, you'll have a scalable script handling real-world data messiness. Perfect for developers, data engineers, and business analysts automating workflows.
**Why Claude for ETL?**
- **Superior reasoning**: Handles edge cases like fuzzy matching or anomaly detection better than rigid rules.
- **Cost-effective**: Pay-per-token, no infrastructure needed.
- **Flexible**: Prompt-engineer for domain-specific transforms (e.g., HR data normalization).
Let's dive in.
## Prerequisites
Before coding, set up:
- Python 3.10+
- Anthropic API key (from [console.anthropic.com](https://console.anthropic.com))
- Snowflake or BigQuery account with credentials
- Libraries: `pip install anthropic pandas requests "snowflake-connector-python[pandas]" google-cloud-bigquery pyarrow`
Store secrets securely:
```bash
export ANTHROPIC_API_KEY='your-key'
export SNOWFLAKE_ACCOUNT='your-account'
export SNOWFLAKE_USER='your-user'
export SNOWFLAKE_PASSWORD='your-password'
```
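A missing variable otherwise only surfaces when the first API or warehouse call fails. As a minimal sketch, a startup check could look like the following; `REQUIRED_VARS` and `missing_env_vars` are hypothetical names, and the list should match whatever variables your pipeline actually reads.

```python
import os

# Hypothetical list: adjust to the variables your pipeline actually reads.
REQUIRED_VARS = [
    "ANTHROPIC_API_KEY",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
]

def missing_env_vars(required, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]
```

Calling `missing_env_vars(REQUIRED_VARS)` at the top of the script and exiting when the result is non-empty keeps the failure close to its cause.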
## Step 1: Project Setup
Create `etl_claude.py`:
```python
import os
import json
import pandas as pd
from typing import List, Dict, Any
import anthropic
from snowflake.connector import connect # For Snowflake
# from google.cloud import bigquery # Uncomment for BigQuery
client = anthropic.Anthropic(api_key=os.getenv('ANTHROPIC_API_KEY'))
# Config
MODEL = 'claude-3-5-sonnet-20241022'
MAX_TOKENS = 4000
```
This initializes the Claude client. Claude-3.5-Sonnet is ideal for data tasks due to its 200K context window.
## Step 2: Extract Data
Start simple: Load a messy sales CSV.
```python
def extract_data(file_path: str) -> pd.DataFrame:
    df = pd.read_csv(file_path)
    print(f"Extracted {len(df)} rows")
    return df

# Example usage
df_raw = extract_data('sales_data.csv')
```
Assume `sales_data.csv` has issues: inconsistent dates, duplicate emails, missing categories.
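If you do not have such a file at hand, a small fixture is enough to exercise the pipeline. The sketch below writes four invented rows that reproduce those issues; the column names follow the output format used in Step 3, and `SAMPLE_ROWS` and `write_sample_csv` are hypothetical names.

```python
import csv

# Invented rows that reproduce the problems described above.
SAMPLE_ROWS = [
    {"email": "ana@example.com", "date": "2023-02-01", "product_type": "electronics", "revenue": "120.50"},
    # Duplicate email with a different date format
    {"email": "ana@example.com", "date": "01/02/23", "product_type": "electronics", "revenue": "120.50"},
    # Missing category and revenue
    {"email": "bo@example.com", "date": "Feb 3, 2023", "product_type": "", "revenue": ""},
    # Unusually large revenue
    {"email": "cy@example.com", "date": "2023-02-04", "product_type": "furniture", "revenue": "25000"},
]

def write_sample_csv(path):
    """Write SAMPLE_ROWS to `path` and return the number of data rows written."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(SAMPLE_ROWS[0]))
        writer.writeheader()
        writer.writerows(SAMPLE_ROWS)
    return len(SAMPLE_ROWS)
```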
For APIs (e.g., Stripe), adapt with `requests`:
```python
import requests
def extract_from_api(url: str, params: Dict) -> pd.DataFrame:
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()  # Fail loudly on HTTP errors instead of parsing an error body
    return pd.DataFrame(resp.json())
```
## Step 3: Transform with Claude
Here's the magic. Serialize data to JSON, prompt Claude for transforms.
```python
def claude_transform(df: pd.DataFrame, task: str) -> pd.DataFrame:
    # Only the first 100 rows are sent, to cap cost; pass batches of at most 100 rows
    sample = df.head(100).to_json(orient='records', lines=True)
    prompt = f"""You are a data engineer. Transform this JSON sales data for warehouse loading.
Task: {task}
Data (first 100 rows):
{sample}
Respond ONLY with a valid JSON array of transformed records. Fix:
- Standardize dates to YYYY-MM-DD
- Deduplicate by email
- Categorize 'product_type' (e.g., 'electronics' -> 'Tech')
- Fill missing revenue with median
- Flag anomalies (e.g., revenue > 10000)
Output format: [{{"email": str, "date": str, "product_type": str, "revenue": float, "anomaly_flag": bool}}]"""
    response = client.messages.create(
        model=MODEL,
        max_tokens=MAX_TOKENS,
        messages=[{"role": "user", "content": prompt}]
    )
    # json.loads raises if the reply contains anything besides the JSON array
    transformed_json = json.loads(response.content[0].text)
    return pd.DataFrame(transformed_json)
```
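Note that the prompt above includes raw `email` values. One way to avoid sending them is to swap each address for a stable token before serializing, which still lets "deduplicate by email" work because identical addresses map to identical tokens. This is a sketch only: `pseudonymize_email`, `pseudonymize_records`, and the `secret` key are hypothetical, and keyed hashing is one of several possible approaches.

```python
import hashlib
import hmac

def pseudonymize_email(email, secret):
    """Return a stable keyed-hash token for an email address."""
    normalized = email.strip().lower()
    digest = hmac.new(
        secret.encode("utf-8"), normalized.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    return f"user_{digest[:16]}"

def pseudonymize_records(records, secret, field="email"):
    """Copy `records` with `field` replaced by a token.

    Also returns a token -> original value map so the real
    addresses can be restored after the transform.
    """
    lookup = {}
    safe = []
    for rec in records:
        rec = dict(rec)  # do not mutate the caller's data
        value = rec.get(field)
        if value:
            token = pseudonymize_email(value, secret)
            lookup[token] = value
            rec[field] = token
        safe.append(rec)
    return safe, lookup
```

With a DataFrame, `df.to_dict("records")` produces the input and `pd.DataFrame(safe)` turns the result back into a frame.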
**Key Prompt Tips**:
- Be specific: List exact fixes.
- Output JSON only: Use `Respond ONLY with...` to parse easily.
- Context: Include schema or examples.
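Even with a "JSON only" instruction, a reply can arrive wrapped in a Markdown code fence or a short preamble, which makes a bare `json.loads` fail. A tolerant parser is a cheap safeguard. The helper below is a heuristic sketch (`parse_json_array` is a hypothetical name): it takes everything from the first `[` to the last `]` and will still fail if the surrounding prose contains brackets of its own.

```python
import json

def parse_json_array(text):
    """Extract and parse the JSON array embedded in a model reply."""
    start = text.find("[")
    end = text.rfind("]")
    if start == -1 or end < start:
        raise ValueError("no JSON array found in reply")
    # json.JSONDecodeError is a ValueError subclass, so callers can catch one type
    return json.loads(text[start:end + 1])
```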
Test it:
```python
df_clean = claude_transform(df_raw, "Clean sales data as described")
print(df_clean.head())
```
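Eyeballing `head()` catches only obvious problems. Since the prompt fixes an output format, each returned record can be checked against it before loading. The following is a minimal sketch under that assumption; `EXPECTED_TYPES` and `validate_record` are hypothetical names, and the field list mirrors the output format in the prompt.

```python
from datetime import datetime

# Field names and types taken from the prompt's "Output format" line.
EXPECTED_TYPES = {
    "email": str,
    "date": str,
    "product_type": str,
    "revenue": (int, float),
    "anomaly_flag": bool,
}

def validate_record(rec):
    """Return a list of problems found in one transformed record (empty if none)."""
    problems = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in rec:
            problems.append(f"missing field: {field}")
            continue
        value = rec[field]
        # bool is a subclass of int, so reject it explicitly for revenue
        wrong_type = not isinstance(value, expected) or (
            field == "revenue" and isinstance(value, bool)
        )
        if wrong_type:
            problems.append(f"wrong type for {field}: {type(value).__name__}")
    if isinstance(rec.get("date"), str):
        try:
            datetime.strptime(rec["date"], "%Y-%m-%d")
        except ValueError:
            problems.append(f"date not in YYYY-MM-DD: {rec['date']}")
    return problems
```

Running it over `df_clean.to_dict("records")` and logging any non-empty result gives a quick quality gate before the load step.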
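Claude infers rules and handles format variations (e.g., '01/02/23' vs '2023-02-01'), although a genuinely ambiguous date such as '01/02/23' needs an explicit day-first or month-first rule in the prompt. For full datasets, batch in chunks. Deduplication and the median fill then apply within each chunk only, not across the whole dataset: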
```python
def batch_transform(df: pd.DataFrame, batch_size: int = 100) -> pd.DataFrame:
    # Keep batch_size <= 100: claude_transform only sends the first 100 rows it receives
    results = []
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        results.append(claude_transform(batch, "..."))
    return pd.concat(results, ignore_index=True)
```
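Because each batch is sent in its own request, the model can only remove duplicates it sees within that batch. A final pass over the combined output closes the gap. A minimal sketch on plain records follows; `dedupe_by_email` is a hypothetical helper, and keeping the first occurrence is an assumption about the desired policy.

```python
def dedupe_by_email(records):
    """Keep the first record seen for each email, compared case-insensitively.

    Records without an email are kept as-is, since there is nothing to match on.
    """
    seen = set()
    unique = []
    for rec in records:
        key = str(rec.get("email") or "").strip().lower()
        if key:
            if key in seen:
                continue
            seen.add(key)
        unique.append(rec)
    return unique
```

On a DataFrame, `df.drop_duplicates(subset="email", keep="first")` does the same job for exact (case-sensitive) matches.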
## Step 4: Enrich Data with Claude Insights
Go beyond cleaning—generate features.
```python
def claude_enrich(df: pd.DataFrame) -> pd.DataFrame:
    # include='all' adds non-numeric columns (e.g. product_type) to the summary,
    # which the model needs in order to name a top category
    agg_prompt = f"""Analyze this sales data summary and generate enrichments:
{df.describe(include='all').to_json()}
Output JSON: [{{"avg_revenue": float, "top_category": str, "churn_risk": str}}] (one row total)"""
    resp = client.messages.create(
        model=MODEL,
        max_tokens=1000,
        messages=[{"role": "user", "content": agg_prompt}]
    )
    enrichment = json.loads(resp.content[0].text)[0]
    # Every row receives the same dataset-level values
    df['avg_revenue'] = enrichment['avg_revenue']
    df['top_category'] = enrichment['top_category']
    df['churn_risk'] = enrichment['churn_risk']
    return df
```
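This adds dataset-level columns without training a model. Treat them as model-generated estimates: `churn_risk` in particular is inferred from summary statistics alone, so verify it before relying on it downstream.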
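Values that are plain arithmetic, such as the average revenue and the most frequent category, can also be computed locally and used to cross-check the model's answer. The sketch below works on plain records; `local_enrichment` is a hypothetical name, and the field names follow the transform output format.

```python
from collections import Counter
from statistics import mean

def local_enrichment(records):
    """Compute avg_revenue and top_category directly from the records."""
    revenues = [
        r["revenue"] for r in records
        if isinstance(r.get("revenue"), (int, float))
    ]
    categories = Counter(
        r["product_type"] for r in records if r.get("product_type")
    )
    return {
        "avg_revenue": mean(revenues) if revenues else None,
        "top_category": categories.most_common(1)[0][0] if categories else None,
    }
```

A large gap between these numbers and the model's reply is a signal to inspect the prompt or the data summary.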
## Step 5: Load to Data Warehouse
### Snowflake
```python
def load_snowflake(df: pd.DataFrame, table: str):
    # write_pandas needs the pandas extra: pip install "snowflake-connector-python[pandas]"
    from snowflake.connector.pandas_tools import write_pandas
    conn = connect(
        user=os.getenv('SNOWFLAKE_USER'),
        password=os.getenv('SNOWFLAKE_PASSWORD'),
        account=os.getenv('SNOWFLAKE_ACCOUNT'),
        warehouse='COMPUTE_WH',
        database='MY_DB',
        schema='PUBLIC'
    )
    try:
        # Bulk-loads through a stage and COPY INTO; creates or replaces the table
        write_pandas(conn, df, table, auto_create_table=True, overwrite=True)
    finally:
        conn.close()
    print(f"Loaded {len(df)} rows to Snowflake.{table}")
```
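If a table name ever ends up inside a hand-built SQL string (a `CREATE TABLE` statement, for example), it is worth validating it first, since a name taken from config or user input could otherwise inject SQL. A minimal sketch, assuming only plain unquoted identifiers are allowed; `safe_identifier` is a hypothetical helper.

```python
import re

# Plain unquoted identifier: letter or underscore first, then letters, digits, _ or $.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")

def safe_identifier(name):
    """Return `name` unchanged if it is a plain identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe table name: {name!r}")
    return name
```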
### BigQuery
```python
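def load_bigquery(df: pd.DataFrame, dataset_id: str, table_id: str):
    from google.cloud import bigquery
    # Named bq_client so it is not confused with the Anthropic `client` from Step 1
    bq_client = bigquery.Client()
    # A "project.dataset.table" string is accepted as the load destination
    table_ref = f"{bq_client.project}.{dataset_id}.{table_id}"
    job = bq_client.load_table_from_dataframe(df, table_ref)  # Requires pyarrow
    job.result()  # Wait for the load job to finish
    print(f"Loaded {len(df)} rows to BigQuery.{dataset_id}.{table_id}")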
```
## Full Pipeline
Tie it together:
```python
def run_etl(file_path: str, warehouse: str = 'snowflake', table: str = 'sales_clean'):
    df = extract_data(file_path)
    df = batch_transform(df)
    df = claude_enrich(df)
    if warehouse == 'snowflake':
        load_snowflake(df, table)
    elif warehouse == 'bigquery':
        load_bigquery(df, 'my_dataset', table)
    else:
        raise ValueError(f"Unsupported warehouse: {warehouse}")

# Run
run_etl('sales_data.csv')
```
## Error Handling & Best Practices
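- **Rate Limits**: Claude API limits vary by usage tier (for example, 50 requests per minute on lower tiers). Pace batch loops with `time.sleep(1)` and retry when a request is rejected with HTTP 429.
- **Cost Optimization**: Use Claude on a sample to infer the schema and cleaning rules, then apply those rules to the bulk data with plain pandas.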
- **Validation**: Post-Claude, run `df.isnull().sum()`.
- **Logging**: Use `logging` module.
- **Prompt Engineering**: Chain prompts—first infer schema, then transform.
Example chain:
```python
schema_prompt = "Infer schema from this data: ... Output JSON schema."
# Use schema in next prompt
```
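Spelled out a little further, the chain might look like the sketch below. To keep it independent of any one client, the model call is passed in as `ask`, a hypothetical callable that takes a prompt string and returns the reply text; `infer_then_transform` and the prompt wording are also illustrative assumptions.

```python
import json

def infer_then_transform(rows, ask):
    """Two-step chain: ask for a schema first, then reuse it in the transform prompt.

    `ask` is any callable that takes a prompt string and returns the model's text.
    """
    sample = json.dumps(rows[:20])  # a small sample is enough for schema inference
    schema_text = ask(
        f"Infer a JSON schema from this data. Output JSON only.\n{sample}"
    )
    schema = json.loads(schema_text)  # fail early if the reply is not valid JSON
    transform_prompt = (
        "Transform the records to match this schema. "
        "Respond ONLY with a JSON array.\n"
        f"Schema: {json.dumps(schema)}\n"
        f"Records: {json.dumps(rows)}"
    )
    return json.loads(ask(transform_prompt))
```

With the client from Step 1, `ask` could be `lambda p: client.messages.create(model=MODEL, max_tokens=MAX_TOKENS, messages=[{"role": "user", "content": p}]).content[0].text`.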
- **Scalability**: Integrate with Airflow/Dagster for orchestration.
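- **Security**: Never send PII to Claude without anonymization. The `email` column in the sales example counts as PII, so hash or tokenize it before building the prompt.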
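For the rate-limit point above, a fixed `time.sleep(1)` is the simplest option; exponential backoff with jitter is a common alternative that waits longer after each failure. The wrapper below is a generic sketch (`call_with_backoff` and its parameters are hypothetical) and is not part of the Anthropic SDK, which also has its own configurable retry behavior.

```python
import random
import time

def call_with_backoff(fn, retry_on=(Exception,), max_attempts=5,
                      base_delay=1.0, sleep=time.sleep):
    """Call fn(); on a retryable error wait base_delay * 2**attempt plus jitter, then retry.

    The last failure is re-raised once max_attempts is reached.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
```

In the pipeline this would wrap each batch call, for example `call_with_backoff(lambda: claude_transform(batch, task), retry_on=(anthropic.RateLimitError,))`.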
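**Token Estimation**: As a rough guide, 100 narrow rows come to about 1K input tokens (wider rows cost more), at $0.003 per 1K input tokens (Sonnet). Output tokens are billed separately at a higher rate, and the transform step returns every row, so budget for both.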
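To turn that estimate into a budget, the arithmetic can be wrapped in a small function. The input figures come from the estimate above; the output price of $0.015 per 1K tokens and the assumption that output size roughly equals input size are not from this guide, so check them against current pricing and your own data. `estimate_cost` is a hypothetical helper.

```python
INPUT_USD_PER_1K = 0.003    # input price quoted above (Sonnet)
OUTPUT_USD_PER_1K = 0.015   # assumption: verify against current pricing

def estimate_cost(rows, input_tokens_per_100_rows=1000,
                  output_tokens_per_100_rows=1000):
    """Rough USD cost of transforming `rows` rows in batches of 100.

    Ignores the fixed instruction text repeated in every prompt.
    """
    batches = rows / 100
    input_cost = batches * input_tokens_per_100_rows / 1000 * INPUT_USD_PER_1K
    output_cost = batches * output_tokens_per_100_rows / 1000 * OUTPUT_USD_PER_1K
    return round(input_cost + output_cost, 4)
```

Under these assumptions the output side dominates the bill, which is one more reason to move bulk transforms to pandas once the rules are known.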
## Real-World Example: HR Data
Adapt for employee onboarding CSV: Claude normalizes names, extracts skills from resumes, flags compliance issues.
Prompt tweak: "Parse resumes, extract skills, categorize as 'Python', 'Leadership'..."
## Conclusion
You've built a Claude-powered ETL pipeline that's smarter than traditional scripts. Iterate prompts for your domain—marketing leads, legal docs, engineering metrics. Deploy via Docker/Lambda for automation.
Next steps:
- Monitor with LangSmith or custom logging.
- Experiment with Claude Opus for complex reasoning.
- Check Anthropic docs for streaming responses in long jobs.
Fork this on GitHub, share your variants in comments!