# Restep Job Execution SDK Documentation
This document describes the REST API endpoints available for job executors (workers and serverless functions) to report progress, status, and logs back to the Restep server.
## Base URL
All API endpoints are relative to your Restep server URL:
```
https://your-restep-server.com/api
```
## Authentication
All requests must include an API key in the `Authorization` header:
```
Authorization: Bearer YOUR_API_KEY
```
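
Every call combines the base URL with a bearer token. As a minimal sketch using only Python's standard library, the hypothetical `build_request` helper below assembles, but does not send, an authenticated `POST` request; pass the result to `urllib.request.urlopen()` to send it. The base URL is the placeholder from this document.

```python
import json
import urllib.request
from typing import Optional

# Placeholder base URL from this document; replace with your own Restep server.
RESTEP_API_URL = "https://your-restep-server.com/api"


def build_request(api_key: str, path: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a POST request with bearer-token auth.

    Hypothetical helper: `path` is relative to the API base, e.g.
    "/executions/<execution_id>/heartbeat".
    """
    # Endpoints without a body (heartbeat, cancel) are sent with no payload.
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(
        url=f"{RESTEP_API_URL}{path}",
        data=data,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```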
## Execution Lifecycle
When a job is triggered, an execution record is created with:
- `execution_id`: Unique identifier for this execution instance
- `job_id`: The job being executed
- `status`: starts as `pending`, moves to `running`, and ends as `completed`, `failed`, `timeout`, or `cancelled`
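
A client can use this lifecycle to avoid reporting against an execution that has already finished. The sketch below encodes it as a transition table. It assumes the path is strictly linear (for example, that a `pending` execution is never cancelled directly) and does not model the automatic retries described under Fail Execution, so treat the table as illustrative rather than as the server's rules.

```python
from typing import Dict, Set

# Assumed transitions, following the linear lifecycle above.
# Automatic retries after a failure are not modeled.
ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "pending": {"running"},
    "running": {"completed", "failed", "timeout", "cancelled"},
    "completed": set(),
    "failed": set(),
    "timeout": set(),
    "cancelled": set(),
}

# Statuses with no outgoing transitions: stop sending heartbeats, logs, and progress.
TERMINAL_STATUSES = {status for status, nxt in ALLOWED_TRANSITIONS.items() if not nxt}


def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` follows the assumed lifecycle."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```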
## Core SDK Methods
### 1. Heartbeat
Send periodic heartbeats to indicate the execution is still running. If heartbeats stop for more than 5 minutes, the execution will be marked as timed out.
**Endpoint:** `POST /api/executions/:execution_id/heartbeat`
**Request:**
```http
POST /api/executions/123e4567-e89b-12d3-a456-426614174000/heartbeat
Authorization: Bearer YOUR_API_KEY
```
**Response:**
```json
{
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "running",
    "last_heartbeat_at": "2025-12-23T22:00:00Z",
    ...
  },
  "message": "Heartbeat updated"
}
```
**Usage Pattern (Worker Mode):**
- Send every 30-60 seconds while execution is running
- Maintains WebSocket or long-polling connection
**Usage Pattern (Serverless Mode):**
- Send whenever making progress updates
- Less frequent since serverless functions are typically shorter-lived
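
In worker mode, heartbeats belong on a background timer so that long-running work cannot starve them. The sketch below uses Python's `threading` module; `HeartbeatThread` and its `send` callback (any function that calls the heartbeat endpoint) are hypothetical names. It sends one heartbeat immediately, then one per `interval` (30 seconds by default, the low end of the recommended range), and ignores individual failures so that a transient network error does not end the loop. Call `stop()` before completing or failing the execution.

```python
import threading
from typing import Callable


class HeartbeatThread:
    """Call `send` now and then every `interval` seconds until stop() is called."""

    def __init__(self, send: Callable[[], None], interval: float = 30.0):
        self._send = send
        self._interval = interval
        self._stop_event = threading.Event()
        # Daemon thread: it will not keep the process alive on its own.
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while True:
            try:
                self._send()
            except Exception:
                pass  # log locally in real code; the next tick retries
            # wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(self._interval):
                break

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()
```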
---
### 2. Update Progress
Update the overall execution progress percentage and current step information.
**Endpoint:** `POST /api/executions/:execution_id/progress`
**Request Body:**
```json
{
  "progress_percent": 45,
  "current_step_name": "Processing batch 3",
  "current_step_index": 2,
  "metadata": {
    "records_processed": 3000,
    "records_total": 10000
  }
}
```
**Response:**
```json
{
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "progress_percent": 45,
    "current_step_name": "Processing batch 3",
    "current_step_index": 2,
    ...
  },
  "message": "Progress updated"
}
```
**Field Descriptions:**
- `progress_percent` (optional): Integer 0-100
- `current_step_name` (optional): Human-readable current step
- `current_step_index` (optional): Zero-based step index
- `metadata` (optional): Custom metadata object
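
Because every field is optional, a client can validate values locally and send only the keys it sets. The hypothetical `build_progress_payload` helper below enforces the ranges listed above. How the server itself responds to out-of-range values (for example, with `422`) is not specified here, so this check is purely client-side.

```python
from typing import Any, Dict, Optional


def build_progress_payload(
    progress_percent: Optional[int] = None,
    current_step_name: Optional[str] = None,
    current_step_index: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Validate fields against the descriptions above and drop unset ones."""
    if progress_percent is not None:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(progress_percent, bool) or not isinstance(progress_percent, int):
            raise TypeError("progress_percent must be an integer")
        if not 0 <= progress_percent <= 100:
            raise ValueError("progress_percent must be between 0 and 100")
    if current_step_index is not None and current_step_index < 0:
        raise ValueError("current_step_index is zero-based and cannot be negative")
    payload = {
        "progress_percent": progress_percent,
        "current_step_name": current_step_name,
        "current_step_index": current_step_index,
        "metadata": metadata,
    }
    # Every field is optional, so only send the ones that were provided.
    return {key: value for key, value in payload.items() if value is not None}
```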
---
### 3. Log Messages
Create log entries for an execution. Supports both single and batch logging.
**Endpoint:** `POST /api/executions/:execution_id/log`
#### Single Log Entry
**Request Body:**
```json
{
  "level": "info",
  "message": "Started processing user records",
  "log_data": {
    "batch_size": 100,
    "starting_offset": 0
  },
  "progress_percent": 10,
  "execution_step_id": "step-uuid-here"
}
```
**Response:**
```json
{
  "status": "ok",
  "log_id": "log-uuid-here"
}
```
#### Batch Log Entries
**Request Body:**
```json
{
  "logs": [
    {
      "level": "info",
      "message": "Connecting to database"
    },
    {
      "level": "debug",
      "message": "Query executed successfully",
      "log_data": {
        "duration_ms": 45,
        "rows_affected": 150
      }
    },
    {
      "level": "warn",
      "message": "Retry attempt 2 of 3",
      "progress_percent": 30
    }
  ]
}
```
**Response:**
```json
{
  "status": "ok",
  "count": 3
}
```
**Field Descriptions:**
- `level`: One of `"debug"`, `"info"`, `"warn"`, `"error"`
- `message`: Log message (required)
- `log_data` (optional): Structured data/context
- `progress_percent` (optional): Progress at time of log (0-100)
- `execution_step_id` (optional): Associate log with specific step
**Usage Patterns:**
- **Worker Mode**: Stream logs in real-time as they occur
- **Serverless Mode**: Batch logs and send periodically or at completion
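
In serverless mode, buffered entries can be split into several batch requests. This sketch validates each entry against the field descriptions above and yields `{"logs": [...]}` request bodies. `batch_log_payloads` is a hypothetical helper, and the default batch size of 50 is an assumed client-side choice, since no maximum batch size is documented.

```python
from typing import Any, Dict, Iterable, Iterator, List

VALID_LEVELS = {"debug", "info", "warn", "error"}


def batch_log_payloads(
    entries: Iterable[Dict[str, Any]], batch_size: int = 50
) -> Iterator[Dict[str, List[Dict[str, Any]]]]:
    """Yield {"logs": [...]} bodies holding at most `batch_size` entries each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[Dict[str, Any]] = []
    for entry in entries:
        if entry.get("level") not in VALID_LEVELS:
            raise ValueError(f"invalid level: {entry.get('level')!r}")
        if not entry.get("message"):
            raise ValueError("message is required")
        batch.append(entry)
        if len(batch) == batch_size:
            yield {"logs": batch}
            batch = []
    if batch:  # send whatever is left over
        yield {"logs": batch}
```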
---
### 4. Complete Execution
Mark an execution as successfully completed.
**Endpoint:** `POST /api/executions/:execution_id/complete`
**Request Body:**
```json
{
  "result_data": {
    "records_processed": 10000,
    "output_file": "s3://bucket/output.csv",
    "summary": "Successfully processed all records"
  }
}
```
**Response:**
```json
{
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "completed",
    "completed_at": "2025-12-23T22:30:00Z",
    "duration_ms": 1800000,
    "result_data": { ... }
  },
  "message": "Execution marked as completed"
}
```
---
### 5. Fail Execution
Mark an execution as failed with error details.
**Endpoint:** `POST /api/executions/:execution_id/fail`
**Request Body:**
```json
{
  "error_message": "Database connection timeout",
  "error_type": "TimeoutError",
  "error_stack": "Error: Database connection timeout\n at Database.connect (db.js:45)\n ..."
}
```
**Response:**
```json
{
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "failed",
    "error_message": "Database connection timeout",
    "error_type": "TimeoutError",
    "retry_at": "2025-12-23T22:35:00Z",
    ...
  },
  "message": "Execution marked as failed"
}
```
**Field Descriptions:**
- `error_message` (required): Human-readable error description
- `error_type` (optional): Error class/type (e.g., "ValidationError", "NetworkError")
- `error_stack` (optional): Full stack trace
**Note:** If the execution has remaining retry attempts, it will be automatically scheduled for retry with exponential backoff.
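
The fail body maps directly onto a caught exception. The sketch below builds it with Python's standard `traceback` module; `fail_payload` is a hypothetical helper. The step failure endpoint takes the same three fields, so the same body can be sent there.

```python
import traceback
from typing import Dict


def fail_payload(exc: BaseException) -> Dict[str, str]:
    """Build a /fail request body from a caught exception."""
    return {
        # error_message is required, so fall back to the class name if str(exc) is empty.
        "error_message": str(exc) or type(exc).__name__,
        "error_type": type(exc).__name__,
        # Full formatted traceback, including the final "Type: message" line.
        "error_stack": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }
```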
---
### 6. Cancel Execution
Cancel a running execution.
**Endpoint:** `POST /api/executions/:execution_id/cancel`
**Request:** No body required
**Response:**
```json
{
  "data": {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "cancelled",
    "completed_at": "2025-12-23T22:15:00Z"
  },
  "message": "Execution cancelled"
}
```
---
## Step Management
For jobs with multiple steps, you can track progress of individual steps.
### 7. Mark Step as Running
**Endpoint:** `POST /api/executions/:execution_id/steps/:step_id/running`
**Request:** No body required
**Response:**
```json
{
  "status": "ok",
  "step": {
    "id": "step-uuid",
    "status": "running"
  }
}
```
---
### 8. Mark Step as Completed
**Endpoint:** `POST /api/executions/:execution_id/steps/:step_id/completed`
**Request Body:**
```json
{
  "result_data": {
    "rows_processed": 1500,
    "output": "Step completed successfully"
  },
  "artifacts": {
    "user_ids": [123, 456, 789],
    "processing_summary": "Processed 1500 records",
    "output_file": "s3://bucket/output.csv"
  }
}
```
**Field Descriptions:**
- `result_data` (optional): Internal step results for logging
- `artifacts` (required for multi-step workflows): Outputs that become inputs to dependent steps. Automatically saved as checkpoint for workflow resumability.
**Response:**
```json
{
  "status": "ok",
  "step": {
    "id": "step-uuid",
    "status": "completed"
  }
}
```
**Important:** When completing a step:
- Artifacts are automatically saved as checkpoint data
- Dependent steps receive artifacts as input_params
- Checkpoint size is calculated for monitoring
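
The exact method Restep uses to calculate checkpoint size is not documented. One way to estimate it on the client, assuming artifacts are stored as JSON, is the length of their compact UTF-8 encoding. This helps spot oversized artifacts before they are passed on to dependent steps as `input_params`. `checkpoint_size_bytes` is a hypothetical helper.

```python
import json
from typing import Any, Dict


def checkpoint_size_bytes(artifacts: Dict[str, Any]) -> int:
    """Estimate checkpoint size as the byte length of compact UTF-8 JSON.

    Assumption: the server's own measurement may differ (e.g. storage overhead).
    """
    encoded = json.dumps(artifacts, separators=(",", ":"), ensure_ascii=False)
    return len(encoded.encode("utf-8"))
```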
---
### 9. Mark Step as Failed
**Endpoint:** `POST /api/executions/:execution_id/steps/:step_id/failed`
**Request Body:**
```json
{
  "error_message": "Validation failed for input data",
  "error_type": "ValidationError",
  "error_stack": "Error: Validation failed\n at validate (validator.js:12)\n ..."
}
```
**Response:**
```json
{
  "status": "ok",
  "step": {
    "id": "step-uuid",
    "status": "failed"
  }
}
```
---
### 10. Update Step Progress
**Endpoint:** `POST /api/executions/:execution_id/steps/:step_id/progress`
**Request Body:**
```json
{
  "progress_percent": 75,
  "metadata": {
    "current_record": 7500,
    "total_records": 10000
  },
  "checkpoint_data": {
    "last_processed_id": "user-7500",
    "cursor": "offset-7500"
  }
}
```
**Response:**
```json
{
  "status": "ok",
  "step": {
    "id": "step-uuid",
    "progress_percent": 75
  }
}
```
**Field Descriptions:**
- `progress_percent` (optional): Integer 0-100
- `metadata` (optional): Custom step metadata
- `checkpoint_data` (optional): Data for resuming step if interrupted
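
`checkpoint_data` lets an interrupted step pick up where it left off. The sketch below processes a list of records starting from a stored offset and emits a request body for this endpoint every `every` records. The `offset` checkpoint key, `run_resumable`, and the `handle`/`report` callbacks are hypothetical: in practice `report` would POST the body to the step progress endpoint, and `checkpoint` would come from the step's last saved `checkpoint_data`.

```python
from typing import Any, Callable, Dict, List, Optional


def run_resumable(
    records: List[Any],
    handle: Callable[[Any], None],
    report: Callable[[Dict[str, Any]], None],
    checkpoint: Optional[Dict[str, Any]] = None,
    every: int = 100,
) -> None:
    """Process `records`, skipping those already covered by `checkpoint`."""
    start = checkpoint["offset"] if checkpoint else 0
    total = len(records)
    for i in range(start, total):
        handle(records[i])
        done = i + 1
        # Report periodically and always after the final record.
        if done % every == 0 or done == total:
            report({
                "progress_percent": done * 100 // total,
                "metadata": {"current_record": done, "total_records": total},
                # If interrupted, the next run resumes from this offset.
                "checkpoint_data": {"offset": done},
            })
```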
---
## Parameter Passing and Workflow Orchestration
### Input Parameters
Every execution and step can receive input parameters that control their behavior.
#### Job-Level Parameters (input_params)
When triggering a job, you can pass parameters that are accessible to all steps:
**Example: Triggering a job with parameters**
```http
POST /api/jobs/:job_id/trigger
Authorization: Bearer YOUR_API_KEY
Content-Type: application/json

{
  "input_params": {
    "user_id": 12345,
    "batch_size": 100,
    "environment": "production"
  }
}
The execution receives these as `execution.input_params` and they're available to the first step.
#### Step-Level Parameters
Each step receives its `input_params` from one of:
1. **Job input** (for the first step)
2. **Previous step artifacts** (for subsequent steps)
3. **Input mapping** (defined in step configuration)
### Artifacts and Data Flow
Artifacts are step outputs that flow to dependent steps as inputs.
**Example workflow:**
```
Step 1: Extract Users
  input_params: { batch_size: 100 }  ← from job
  artifacts:    { user_ids: [1,2,3], count: 150 }

Step 2: Process Users
  input_params: { user_ids: [1,2,3], count: 150 }  ← from Step 1 artifacts
  artifacts:    { processed: 150, output_file: "s3://..." }

Step 3: Send Notifications
  input_params: { processed: 150, output_file: "s3://..." }  ← from Step 2 artifacts
```
### Input Mapping
Steps define how to construct their inputs from previous steps using `input_mapping`:
```json
{
  "name": "process_users",
  "step_index": 1,
  "depends_on": ["extract_users"],
  "input_mapping": {
    "user_ids": "extract_users.artifacts.user_ids",
    "config": "job.input_params.config",
    "batch_size": "job.input_params.batch_size"
  }
}
```
**Mapping syntax:**
- `"step_name.artifacts.field"` - Reference another step's artifacts
- `"job.input_params.field"` - Reference job-level parameters
- `"step_name.result_data.field"` - Reference step's result data
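
The mapping syntax can be illustrated with a small resolver. This is a sketch of how the documented paths could be evaluated, not Restep's implementation: it assumes each completed step's outputs are available as a dict with `artifacts` and `result_data` keys, treats extra path segments as nested keys, raises `KeyError` for a missing reference, and assumes no step is named `job`, since that prefix is reserved for job-level parameters. `resolve_input_mapping` is a hypothetical name.

```python
from typing import Any, Dict


def resolve_input_mapping(
    mapping: Dict[str, str],
    job_input_params: Dict[str, Any],
    step_outputs: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Build a step's input_params from its input_mapping.

    step_outputs maps step name -> {"artifacts": {...}, "result_data": {...}}.
    """
    resolved: Dict[str, Any] = {}
    for param, ref in mapping.items():
        source, section, *path = ref.split(".")
        value: Any
        if source == "job":
            if section != "input_params":
                raise ValueError(f"unsupported job reference: {ref}")
            value = job_input_params
        else:
            if section not in ("artifacts", "result_data"):
                raise ValueError(f"unsupported step reference: {ref}")
            value = step_outputs[source][section]
        for key in path:  # walk nested fields one level at a time
            value = value[key]
        resolved[param] = value
    return resolved
```

With the `process_users` mapping above, `user_ids` comes from the `extract_users` artifacts while `config` and `batch_size` come from the job's `input_params`.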
### Automatic Checkpointing
When you complete a step with artifacts, they're automatically saved as checkpoint data:
```javascript
// Complete step with artifacts
await sdk.completeStep(stepId, {
  result_data: { logs: "..." },
  artifacts: {
    processed_records: 1500,
    last_id: "user-1500",
    output_location: "s3://bucket/batch1.csv"
  }
});
```
**What happens:**
1. Artifacts are saved as `checkpoint_data`
2. Checkpoint size is calculated and stored
3. If workflow fails, it can resume from this checkpoint
4. Next steps receive artifacts as their `input_params`
### Resuming from Checkpoints
If a workflow fails mid-execution, you can resume from the last completed step:
```javascript
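// Fetch the execution with its steps (getExecution is a placeholder for your own API call)
const execution = await getExecution(executionId);
// Order completed steps by step_index so "last" means the furthest step, not the last array element
const completedSteps = execution.steps
  .filter(s => s.status === 'completed')
  .sort((a, b) => a.step_index - b.step_index);
const lastCheckpoint = completedSteps[completedSteps.length - 1];
// Resume from that checkpoint; if no step has completed, start again from the job's input_params
const nextStepInputs = lastCheckpoint
  ? lastCheckpoint.checkpoint_data // Same as artifacts
  : execution.input_params;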
```
### Complete Workflow Example
**Job Definition:**
```json
{
  "name": "user_data_pipeline",
  "steps": [
    {
      "name": "extract",
      "step_index": 0,
      "step_type": "script",
      "depends_on": []
    },
    {
      "name": "transform",
      "step_index": 1,
      "step_type": "script",
      "depends_on": ["extract"],
      "input_mapping": {
        "data": "extract.artifacts.data",
        "config": "job.input_params.transform_config"
      }
    },
    {
      "name": "load",
      "step_index": 2,
      "step_type": "script",
      "depends_on": ["transform"],
      "input_mapping": {
        "transformed_data": "transform.artifacts.output",
        "destination": "job.input_params.destination"
      }
    }
  ]
}
```
**Execution Flow:**
```javascript
// 1. Trigger job with parameters (triggerJob stands in for POST /api/jobs/:job_id/trigger)
const execution = await triggerJob(jobId, {
  input_params: {
    transform_config: { format: "csv" },
    destination: "s3://bucket/final"
  }
});

// 2. Step 1: Extract
// Receives: execution.input_params
await sdk.completeStep(step1Id, {
  artifacts: {
    data: extractedRecords, // records produced by this step
    record_count: 1000
  }
});
// Checkpoint created automatically

// 3. Step 2: Transform
// Receives: { data: [...], config: { format: "csv" } }
await sdk.completeStep(step2Id, {
  artifacts: {
    output: "s3://bucket/transformed.csv",
    rows: 1000
  }
});
// Checkpoint created automatically

// 4. Step 3: Load
// Receives: { transformed_data: "s3://...", destination: "s3://bucket/final" }
await sdk.completeStep(step3Id, {
  artifacts: {
    loaded: true,
    location: "s3://bucket/final/data.csv"
  }
});
```
**Key Points:**
- First step gets job's `input_params`
- Each step completion creates automatic checkpoint
- Artifacts flow to dependent steps as `input_params`
- Input mapping controls how parameters are composed
- Workflow can resume from any checkpoint
---
## Complete SDK Implementation Example
### Worker Mode (Stateful, Long-Running)
```javascript
// Requires a runtime with a global fetch (Node.js 18+ or a browser)
class RestepWorkerSDK {
  constructor(apiUrl, apiKey, executionId) {
    this.apiUrl = apiUrl;
    this.apiKey = apiKey;
    this.executionId = executionId;
    this.heartbeatInterval = null;
  }

  // Send a heartbeat now, then every 30 seconds (no-op if already started)
  startHeartbeat() {
    if (this.heartbeatInterval) return;
    const beat = () =>
      this.heartbeat().catch((err) => {
        // A failed heartbeat must not crash the worker; the next tick retries
        console.error('Heartbeat failed:', err.message);
      });
    beat();
    this.heartbeatInterval = setInterval(beat, 30000);
  }

  stopHeartbeat() {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  async heartbeat() {
    await this.request('POST', `/executions/${this.executionId}/heartbeat`);
  }

  async updateProgress(progressPercent, currentStepName, metadata = {}) {
    await this.request('POST', `/executions/${this.executionId}/progress`, {
      progress_percent: progressPercent,
      current_step_name: currentStepName,
      metadata
    });
  }

  async log(level, message, logData = null) {
    await this.request('POST', `/executions/${this.executionId}/log`, {
      level,
      message,
      log_data: logData
    });
  }

  // body: { result_data, artifacts }, as accepted by the step "completed" endpoint
  async completeStep(stepId, body = {}) {
    await this.request(
      'POST',
      `/executions/${this.executionId}/steps/${stepId}/completed`,
      body
    );
  }

  async complete(resultData = null) {
    this.stopHeartbeat();
    await this.request('POST', `/executions/${this.executionId}/complete`, {
      result_data: resultData
    });
  }

  async fail(errorMessage, errorType = null, errorStack = null) {
    this.stopHeartbeat();
    await this.request('POST', `/executions/${this.executionId}/fail`, {
      error_message: errorMessage,
      error_type: errorType,
      error_stack: errorStack
    });
  }

  async request(method, path, body = null) {
    const response = await fetch(`${this.apiUrl}${path}`, {
      method,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: body ? JSON.stringify(body) : null
    });
    if (!response.ok) {
      throw new Error(`API request failed: ${response.status} ${response.statusText}`);
    }
    return await response.json();
  }
}

// Usage example
async function executeJob(executionId, jobConfig) {
  const sdk = new RestepWorkerSDK(
    'https://your-restep-server.com/api',
    process.env.RESTEP_API_KEY,
    executionId
  );
  try {
    // Start heartbeat
    sdk.startHeartbeat();
    // Log start
    await sdk.log('info', 'Job execution started');
    // Do work with progress updates (processChunk is your own work function)
    for (let i = 0; i < 10; i++) {
      await processChunk(i);
      await sdk.updateProgress((i + 1) * 10, `Processing chunk ${i + 1}/10`);
      await sdk.log('info', `Completed chunk ${i + 1}`);
    }
    // Mark as complete (also stops the heartbeat)
    await sdk.complete({ chunks_processed: 10 });
  } catch (error) {
    // fail() stops the heartbeat before reporting the error
    await sdk.fail(error.message, error.constructor.name, error.stack);
  }
}
```
### Serverless Mode (Stateless, Short-Running)
```python
import os
import traceback
from typing import Dict, List, Optional

import requests


class RestepServerlessSDK:
    def __init__(self, api_url: str, api_key: str, execution_id: str):
        self.api_url = api_url
        self.api_key = api_key
        self.execution_id = execution_id
        self.logs_buffer: List[Dict] = []

    def _request(self, method: str, path: str, body: Optional[Dict] = None):
        url = f"{self.api_url}{path}"
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }
        # A timeout keeps a hung request from consuming the function's time budget
        response = requests.request(method, url, json=body, headers=headers, timeout=10)
        response.raise_for_status()
        return response.json()

    def heartbeat(self):
        self._request('POST', f'/executions/{self.execution_id}/heartbeat')

    def update_progress(self, progress_percent: int, metadata: Optional[Dict] = None):
        self._request('POST', f'/executions/{self.execution_id}/progress', {
            'progress_percent': progress_percent,
            'metadata': metadata or {}
        })

    def log(self, level: str, message: str, log_data: Optional[Dict] = None):
        """Buffer a log entry; it is sent by flush_logs()."""
        self.logs_buffer.append({
            'level': level,
            'message': message,
            'log_data': log_data
        })

    def flush_logs(self):
        """Send all buffered logs in one batch request."""
        if self.logs_buffer:
            self._request('POST', f'/executions/{self.execution_id}/log', {
                'logs': self.logs_buffer
            })
            self.logs_buffer.clear()

    def complete(self, result_data: Optional[Dict] = None):
        self.flush_logs()  # Send any remaining logs
        self._request('POST', f'/executions/{self.execution_id}/complete', {
            'result_data': result_data
        })

    def fail(self, error_message: str, error_type: Optional[str] = None,
             error_stack: Optional[str] = None):
        self.flush_logs()  # Send any remaining logs
        self._request('POST', f'/executions/{self.execution_id}/fail', {
            'error_message': error_message,
            'error_type': error_type,
            'error_stack': error_stack
        })


# Usage example (AWS Lambda)
def lambda_handler(event, context):
    execution_id = event['execution_id']
    sdk = RestepServerlessSDK(
        os.environ['RESTEP_API_URL'],
        os.environ['RESTEP_API_KEY'],
        execution_id
    )
    try:
        sdk.log('info', 'Lambda function started')
        # Do work (process_batch is your own work function)
        for i in range(10):
            process_batch(i)
            sdk.log('info', f'Processed batch {i + 1}')
            sdk.update_progress((i + 1) * 10)
            sdk.heartbeat()  # Serverless mode: heartbeat alongside progress updates
        # Flush logs and complete
        sdk.complete({'batches_processed': 10})
    except Exception as e:
        sdk.fail(str(e), type(e).__name__, traceback.format_exc())
        raise
```
---
## Error Handling
All endpoints return standard HTTP status codes:
- `200 OK`: Request successful
- `400 Bad Request`: Invalid request body or parameters
- `401 Unauthorized`: Missing or invalid API key
- `404 Not Found`: Execution or step not found
- `422 Unprocessable Entity`: Validation errors
- `500 Internal Server Error`: Server error
Error responses have the format:
```json
{
  "errors": {
    "field_name": ["error message"]
  }
}
```
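
Clients usually retry transient failures and surface the rest. The sketch below assumes `5xx` responses are transient and the documented `4xx` codes are not, and yields exponential backoff delays; `should_retry`, `backoff_delays`, and their base delay, cap, and attempt count are hypothetical defaults. Heartbeat and progress calls are natural candidates for retries. Retrying `/log` after a timeout may duplicate entries, and whether repeating `/complete` or `/fail` is safe is not documented.

```python
from typing import Iterator


def should_retry(status: int) -> bool:
    """Assumed policy: retry server errors, never client errors (400/401/404/422)."""
    return 500 <= status < 600


def backoff_delays(attempts: int = 4, base: float = 1.0, cap: float = 30.0) -> Iterator[float]:
    """Yield delays in seconds: base, 2*base, 4*base, ..., each capped at `cap`."""
    for attempt in range(attempts):
        yield min(cap, base * (2 ** attempt))
```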
---
## Best Practices
### Worker Mode
1. **Start heartbeat immediately** when execution begins
2. **Send heartbeat every 30-60 seconds** to prevent timeout
3. **Stream logs in real-time** for better observability
4. **Update progress frequently** to show activity
5. **Always stop heartbeat** when completing/failing
### Serverless Mode
1. **Batch logs** and send periodically to reduce API calls
2. **Update progress at milestone points** (every 10%, every step, etc.)
3. **Always flush logs before completion/failure**
4. **Pass `execution_id` to the function** in the trigger payload (the Lambda example reads it from `event['execution_id']`)
5. **Set appropriate timeout** on serverless function
### Both Modes
1. **Use structured log_data** instead of putting everything in message
2. **Include error stack traces** for failures
3. **Store meaningful result_data** on completion
4. **Use checkpoint_data** for resumable steps
5. **Validate execution_id** before starting work
---
## Rate Limits
No rate limits are currently enforced, but we recommend staying within these limits:
- Heartbeat: Max 1 per 10 seconds
- Progress updates: Max 10 per minute
- Logs: Max 100 per minute (use batch logging)
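
Since these limits are advisory, a client can enforce them itself. The sketch below is a sliding-window limiter with an injectable clock for testing; `SlidingWindowLimiter` is a hypothetical name. For the recommended limits, use `SlidingWindowLimiter(1, 10.0)` for heartbeats and `SlidingWindowLimiter(10, 60.0)` for progress updates, and skip or delay a call when `try_acquire()` returns `False`.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` within any `window` seconds (client-side only)."""

    def __init__(self, max_calls: int, window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window = window
        self._clock = clock
        self._calls: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Record a call and return True if it fits the budget, else return False."""
        now = self._clock()
        # Forget calls that have slid out of the window.
        while self._calls and now - self._calls[0] >= self.window:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```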
---
## Changelog
### Version 1.0 (2025-12-23)
- Initial SDK release
- Core execution lifecycle methods
- Step management support
- Progress tracking and logging