Install
Install via the SkillsCat registry: `npx skillscat add smouj/auto-orchestrator-skill`
name: auto-orchestrator
version: 2.1.0
description: AI-powered orchestration engine for automated data workflows, ETL pipelines, and multi-agent task coordination
author: Kilocode Team
tags: [data, ai, automation, orchestration, pipeline, etl, workflow]
maintainer: ops@kilo.ai
homepage: https://kilo.ai/docs/skills/auto-orchestrator
repository: https://github.com/Kilo-Org/kilocode-skills/tree/main/skills/auto-orchestrator
license: MIT
platforms: [linux, darwin]
requirements:
  python: ">=3.9"
  system:
    - name: git
      purpose: Version control for pipeline definitions
    - name: docker
      purpose: Containerized task execution
    - name: kubectl
      purpose: Kubernetes cluster orchestration (optional)
    - name: aws-cli
      purpose: AWS service integration (optional)
  python_packages:
    - name: pyyaml
      version: ">=6.0"
    - name: croniter
      version: ">=1.4.0"
    - name: pydantic
      version: ">=2.0"
    - name: redis
      version: ">=4.5"
    - name: boto3
      version: ">=1.28"
      optional: true
    - name: kubernetes
      version: ">=26.0"
      optional: true
env_vars:
  - name: AUTO_ORCHESTRATOR_CONFIG_PATH
    default: "~/.openclaw/config/auto-orchestrator.yaml"
    purpose: Path to main configuration file
  - name: ORCHESTRATION_WORKDIR
    default: "/tmp/orchestration"
    purpose: Working directory for pipeline execution
  - name: REDIS_URL
    default: "redis://localhost:6379/0"
    purpose: Redis connection for state management
  - name: ORCHESTRATION_LOG_LEVEL
    default: "INFO"
    purpose: Logging level (DEBUG, INFO, WARNING, ERROR)
  - name: K8S_NAMESPACE
    default: "orchestration"
    purpose: Kubernetes namespace for pod deployments
  - name: DRY_RUN
    default: "false"
    purpose: Set to true to validate without executing
capabilities:
  - name: pipeline_execution
    description: Execute multi-stage data pipelines with dependencies
  - name: schedule_management
    description: Create, update, and manage scheduled workflows
  - name: resource_allocation
    description: Dynamic CPU/memory allocation based on task requirements
  - name: failure_recovery
    description: Automatic retry with exponential backoff
  - name: parallel_execution
    description: Run independent tasks concurrently
  - name: state_tracking
    description: Persistent state management with Redis
  - name: monitoring
    description: Real-time metrics and alerting
  - name: rollback
    description: Point-in-time recovery to previous pipeline states
# Auto Orchestrator Skill
## Purpose
Auto Orchestrator is an AI-powered orchestration engine designed to automate complex data workflows, ETL pipelines, and multi-agent task coordination. It enables declarative pipeline definitions with intelligent scheduling, dependency management, and fault tolerance.
### Real Use Cases
1. **Daily Sales ETL Pipeline**: Automatically extract sales data from 5 different APIs, transform and load into a data warehouse, then trigger downstream analytics jobs. Handles API rate limits, retries failed extracts, and sends alerts on data quality issues.
2. **ML Model Training Workflow**: Coordinate data preprocessing, feature engineering, model training, evaluation, and deployment stages. Automatically scales compute resources based on dataset size, tracks experiments, and rolls back to previous model if performance degrades.
3. **Data Quality Validation Suite**: Run daily validation checks across 100+ data sources, execute custom validation rules, quarantine bad data, and notify stakeholders. Parallelizes validation tasks to complete in under 15 minutes.
4. **Cross-System Data Synchronization**: Keep customer data synchronized between CRM, marketing platform, and support ticket system with conflict resolution, bidirectional sync, and change data capture.
5. **Report Generation Pipeline**: Aggregate data from multiple sources, apply business logic, generate PDF/Excel reports, and distribute to stakeholders via email/Slack on a schedule.
6. **Infrastructure Provisioning Workflow**: Orchestrate Terraform runs, database migrations, and service deployments in defined order with health checks between stages.
## Scope
### Core Commands
#### `orchestrate pipeline execute`
Execute a pipeline definition with real-time monitoring.
**Flags:**
- `--pipeline <file>`: Path to YAML/JSON pipeline definition (required)
- `--vars <file>`: JSON file with variable overrides
- `--env <key=value>`: Environment variable overrides (can be repeated)
- `--dry-run`: Validate without executing
- `--parallel`: Override parallel execution setting
- `--timeout <duration>`: Global timeout (e.g., 2h, 30m)
- `--priority <level>`: Priority level (low, normal, high, critical)
- `--trace-id <uuid>`: Custom trace ID for tracking
- `--log-format <format>`: json, text, or colored
- `--monitor`: Start monitoring dashboard after execution
- `--notify <webhook>`: Post completion notification
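The `--timeout` flag takes durations such as `2h` or `30m`. The CLI's actual duration grammar is not documented here, so the following is only a sketch, assuming a single integer followed by one unit suffix (`s`, `m`, `h`, or `d`); `parse_duration` is a hypothetical helper, not part of the tool.

```python
import re
from datetime import timedelta

# Assumed unit suffixes; the real CLI may accept more (or compound values).
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse a single-unit duration like '2h' or '30m' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})
```

Under these assumptions, `parse_duration("2h")` yields a two-hour `timedelta`, and anything outside the assumed grammar (for example `1h30m`) raises `ValueError` rather than being silently misread.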
**Examples:**
```bash
orchestrate pipeline execute \
  --pipeline ./etl/daily_sales.yaml \
  --vars ./config/overrides.json \
  --env EXTRACTION_BATCH_SIZE=500 \
  --timeout 3h \
  --priority high \
  --trace-id $(uuidgen) \
  --monitor
```
#### `orchestrate pipeline create`
Create a new pipeline definition from template.
**Flags:**
- `--name <string>`: Pipeline name (required)
- `--type <type>`: etl, ml, sync, validation, custom
- `--stages <count>`: Number of stages (default: 3)
- `--schedule <cron>`: Optional cron schedule
- `--template <file>`: Custom template file
- `--output <file>`: Output file path (default: ./.yaml)
- `--description <string>`: Pipeline description
- `--tags <list>`: Comma-separated tags
**Example:**
```bash
orchestrate pipeline create \
  --name customer_sync \
  --type sync \
  --stages 5 \
  --schedule "0 */6 * * *" \
  --description "Bidirectional CRM sync every 6 hours" \
  --output ./pipelines/customer_sync.yaml
```
#### `orchestrate schedule add`
Add a scheduled execution to the orchestrator.
**Flags:**
- `--pipeline <file>`: Pipeline file path (required)
- `--cron <expression>`: Cron expression (required)
- `--name <string>`: Schedule name
- `--vars <file>`: Default variable overrides
- `--max-retries <count>`: Failed execution retries (default: 3)
- `--concurrency <count>`: Max concurrent runs (default: 1)
- `--timezone <tz>`: Timezone for schedule (default: UTC)
- `--start <datetime>`: When to start scheduling
- `--end <datetime>`: When to stop scheduling
- `--skip-if-running`: Skip if previous run still active
- `--notify-on-failure <webhook>`: Failure notification URL
**Example:**
```bash
orchestrate schedule add \
  --pipeline ./pipelines/daily_etl.yaml \
  --cron "0 2 * * *" \
  --name "Daily ETL 2AM UTC" \
  --max-retries 2 \
  --concurrency 1 \
  --timezone America/New_York \
  --notify-on-failure https://hooks.slack.com/services/...
```
#### `orchestrate schedule list`
List all scheduled pipelines.
**Flags:**
- `--active-only`: Only show enabled schedules
- `--format <format>`: table, json, yaml
- `--next <count>`: Show next N executions
- `--filter <expr>`: Filter by name/tag/pipeline
**Example:**
```bash
orchestrate schedule list \
  --active-only \
  --format table \
  --next 10
```
#### `orchestrate state get`
Retrieve execution state from Redis.
**Flags:**
- `--execution-id <uuid>`: Execution ID (required)
- `--output <file>`: Write to file instead of stdout
- `--format <format>`: json, yaml, summary
- `--include-logs`: Include task logs
- `--include-artifacts`: Include artifact metadata
- `--compact`: Remove whitespace from JSON output
**Example:**
```bash
orchestrate state get \
  --execution-id abc123-def456 \
  --format json \
  --include-logs \
  --output ./state_abc123.json
```
#### `orchestrate state set`
Manually set execution state (emergency use).
**Flags:**
- `--execution-id <uuid>`: Execution ID (required)
- `--status <status>`: running, succeeded, failed, cancelled
- `--task <id>`: Task ID to update
- `--message <string>`: Status message
- `--force`: Override validation checks
- `--dry-run`: Preview changes without applying
**Example:**
```bash
orchestrate state set \
  --execution-id abc123-def456 \
  --status failed \
  --task extract_orders \
  --message "Manual failover - API endpoint down" \
  --force
```
#### `orchestrate resource allocate`
Allocate compute resources for a pipeline.
**Flags:**
- `--pipeline <name>`: Pipeline name (required)
- `--stage <id>`: Specific stage (optional, defaults to all)
- `--cpu <cores>`: CPU cores (e.g., 2, 0.5)
- `--memory <size>`: Memory (e.g., 4Gi, 8192Mi)
- `--gpu <count>`: GPU count
- `--disk <size>`: Disk space (e.g., 100Gi)
- `--priority-class <class>`: Kubernetes priority class
- `--node-selector <label>`: Kubernetes node selector
- `--persist`: Persist allocations for future runs
- `--dry-run`: Show allocation plan without applying
**Example:**
```bash
orchestrate resource allocate \
  --pipeline ml_training \
  --stage feature_engineering \
  --cpu 4 \
  --memory 16Gi \
  --gpu 1 \
  --persist
```
#### `orchestrate rollback execute`
Rollback a pipeline execution to a previous state.
**Flags:**
- `--execution-id <uuid>`: Current execution ID (required)
- `--to-execution-id <uuid>`: Target execution to roll back to
- `--to-timestamp <datetime>`: Alternatively, roll back to a specific time
- `--strategy <strategy>`: checkpoint, restart, revert (default: checkpoint)
- `--include-artifacts`: Also roll back artifacts
- `--force`: Skip validation warnings
- `--verify`: Run verification checks after rollback
- `--notify <webhook>`: Notify on completion
- `--dry-run`: Show rollback plan without executing
**Examples:**
```bash
# Rollback to specific previous execution
orchestrate rollback execute \
  --execution-id abc123 \
  --to-execution-id xyz789 \
  --strategy checkpoint \
  --verify \
  --force
# Rollback to state from 30 minutes ago (GNU date syntax)
orchestrate rollback execute \
  --execution-id abc123 \
  --to-timestamp "$(date -d '30 minutes ago' -Iseconds)" \
  --include-artifacts \
  --notify https://alerts.example.com/rollback
```
#### `orchestrate monitor start`
Start real-time monitoring dashboard.
**Flags:**
- `--execution-id <uuid>`: Specific execution to monitor (optional)
- `--all`: Monitor all active executions
- `--port <number>`: Dashboard port (default: 9090)
- `--host <interface>`: Bind address (default: 0.0.0.0)
- `--refresh <seconds>`: Refresh interval (default: 5)
- `--metrics <endpoint>`: Prometheus metrics endpoint
- `--open`: Automatically open browser
- `--tls-cert <file>`: TLS certificate path
- `--tls-key <file>`: TLS key path
**Example:**
```bash
orchestrate monitor start \
  --execution-id abc123-def456 \
  --port 9090 \
  --refresh 2 \
  --open
```
#### `orchestrate validate pipeline`
Validate a pipeline definition without executing.
**Flags:**
- `--pipeline <file>`: Pipeline file (required)
- `--strict`: Enable strict validation
- `--schema <version>`: Schema version to validate against
- `--check-connections`: Verify external connections
- `--check-resources`: Verify resource availability
- `--benchmark`: Estimate execution time
- `--output <format>`: json, sarif, human
**Example:**
```bash
orchestrate validate pipeline \
  --pipeline ./new_pipeline.yaml \
  --strict \
  --check-connections \
  --output sarif > validation_results.sarif
```
### Configuration Commands
#### `orchestrate config init`
Initialize orchestrator configuration.
**Flags:**
- `--redis-url <url>`: Redis connection URL
- `--workdir <path>`: Working directory
- `--log-level <level>`: Logging level
- `--k8s-namespace <name>`: Kubernetes namespace
- `--executor <type>`: local, docker, k8s, aws-batch
- `--dry-run`: Generate config without writing
- `--force`: Overwrite existing config
**Example:**
```bash
orchestrate config init \
  --redis-url redis://redis-cluster:6379/0 \
  --workdir /var/lib/orchestration \
  --log-level INFO \
  --executor k8s \
  --k8s-namespace data-pipelines
```
#### `orchestrate config show`
Display current configuration.
**Flags:**
- `--format <format>`: json, yaml, env
- `--secrets`: Include secrets (redacted by default)
- `--show-origin`: Show source of each config value
**Example:**
```bash
orchestrate config show \
  --format yaml \
  --show-origin
```
#### `orchestrate config set`
Set configuration values.
**Flags:**
- `--key <path>`: Configuration key (dot notation)
- `--value <value>`: New value
- `--type <type>`: string, int, bool, float, json
- `--global`: Set in global config vs user config
- `--dry-run`: Preview without setting
**Example:**
```bash
orchestrate config set \
  --key executor.docker.network_mode \
  --value "bridge" \
  --type string
```
### State Management Commands
#### `orchestrate state cleanup`
Clean up old execution states.
**Flags:**
- `--older-than <duration>`: Delete states older than the given duration (e.g., 30d, 90d)
- `--status <list>`: Only clean up specific statuses (succeeded,failed)
- `--keep-count <n>`: Keep N most recent per pipeline
- `--dry-run`: Show what would be deleted
- `--force`: Skip interactive confirmation
- `--compact`: Compact Redis storage after cleanup
**Example:**
```bash
orchestrate state cleanup \
  --older-than 90d \
  --status succeeded,failed \
  --keep-count 10 \
  --force
```
#### `orchestrate state archive`
Archive execution state to S3 or filesystem.
**Flags:**
- `--execution-id <uuid>`: Execution to archive
- `--destination <path>`: S3 path or local directory
- `--format <format>`: json, parquet, csv
- `--compress`: Gzip compression
- `--include-logs`: Include full task logs
- `--include-artifacts`: Include artifact files
- `--dry-run`: Preview without archiving
**Example:**
```bash
orchestrate state archive \
  --execution-id abc123 \
  --destination s3://orchestration-archives/2024/ \
  --format parquet \
  --compress \
  --include-logs
```
## Detailed Work Process
### Pipeline Definition Structure
```yaml
version: "2.1"
name: "daily_sales_etl"
description: "Extract sales data from multiple sources, transform, and load to data warehouse"
tags: [etl, daily, sales]
# Global configuration
config:
  timeout: 3h
  retry_policy:
    max_attempts: 3
    backoff: exponential
    initial_delay: 30s
    max_delay: 10m
  parallel: true
  max_concurrent: 10
# Variable definitions with defaults
variables:
  EXTRACTION_DATE: "{{ ds }}"  # ds is execution date
  SALES_API_ENDPOINT: "https://api.sales.com/v1"
  # Credentials come from the environment, never from the definition itself
  DATAWAREHOUSE_CONN: "postgresql://{{ env.DW_USER }}:{{ env.DW_PASSWORD }}@dw.example.com:5432/analytics"
  RETRY_FLAG: false
# Resource requirements per stage (can be overridden per task)
resources:
  default:
    cpu: 1.0
    memory: 2Gi
    disk: 10Gi
  extract:
    cpu: 2.0
    memory: 4Gi
  transform:
    cpu: 4.0
    memory: 8Gi
  load:
    cpu: 2.0
    memory: 4Gi
# Stage definitions (executed sequentially unless parallel enabled)
stages:
  - name: extract
    description: "Extract data from all source systems"
    tasks:
      - name: extract_orders
        type: python_script
        script: ./tasks/extract_orders.py
        dependencies: []
        env:
          API_ENDPOINT: "{{ SALES_API_ENDPOINT }}"
          BATCH_SIZE: 1000
        resources:
          cpu: 2.0
          memory: 4Gi
        timeout: 1h
        retry: true
        on_failure:
          - type: notify
            config:
              webhook: "https://alerts.example.com/failure"
              template: ./templates/alert_failure.json
        output_artifacts:
          - path: /tmp/orders_{{ EXTRACTION_DATE }}.json
            type: dataset
            upload_to: s3://raw-data/orders/
      - name: extract_customers
        type: python_script
        script: ./tasks/extract_customers.py
        dependencies: []  # Runs in parallel with extract_orders
        env:
          API_ENDPOINT: "{{ SALES_API_ENDPOINT }}/customers"
        resources:
          cpu: 1.0
          memory: 2Gi
        timeout: 30m
        retry: false  # No retry for this task
  - name: transform
    description: "Clean and transform extracted data"
    tasks:
      - name: transform_orders
        type: spark_job
        application: ./spark/transform_orders.py
        dependencies:
          - extract.extract_orders
          - extract.extract_customers  # Wait for both extracts
        resources:
          cpu: 4.0
          memory: 8Gi
          executor_instances: 2
        conf:
          spark.sql.shuffle.partitions: 4
          spark.executor.memoryOverhead: 1g
        input_artifacts:
          - s3://raw-data/orders/{{ EXTRACTION_DATE }}/*.json
          - s3://raw-data/customers/{{ EXTRACTION_DATE }}/*.json
        output_artifacts:
          - path: /tmp/transformed_orders.parquet
            type: dataset
            upload_to: s3://transformed/orders/
  - name: validate
    description: "Validate transformed data quality"
    tasks:
      - name: data_quality_checks
        type: python_script
        script: ./tasks/data_quality.py
        dependencies:
          - transform.transform_orders
        env:
          DW_CONNECTION: "{{ DATAWAREHOUSE_CONN }}"
          MIN_ROWS_EXPECTED: 10000
        resources:
          cpu: 1.0
          memory: 2Gi
        timeout: 15m
        retry:
          max_attempts: 2
        validation_rules:
          - rule: unique_check
            column: order_id
            threshold: 0.99
          - rule: not_null
            columns: [customer_id, order_date, total_amount]
            threshold: 1.0
        on_failure:
          - type: quarantine
            config:
              destination: s3://quarantine/{{ EXTRACTION_DATE }}/
          - type: notify
            config:
              webhook: "https://alerts.example.com/data_quality"
              severity: critical
  - name: load
    description: "Load validated data to data warehouse"
    tasks:
      - name: load_to_dw
        type: sql_script
        script: ./sql/load_orders.sql
        dependencies:
          - validate.data_quality_checks
        env:
          DW_CONNECTION: "{{ DATAWAREHOUSE_CONN }}"
          SOURCE_PATH: "s3://transformed/orders/{{ EXTRACTION_DATE }}/"
        resources:
          cpu: 2.0
          memory: 4Gi
        timeout: 45m
        retry: true
        post_hooks:
          - type: sql
            script: ./sql/update_stats.sql
          - type: notify
            config:
              webhook: "https://alerts.example.com/success"
              template: ./templates/success_notification.json
# Notifications and hooks
notifications:
  on_success:
    - type: email
      config:
        to: ["data-team@example.com"]
        subject: "Pipeline {{ pipeline.name }} succeeded"
    - type: slack
      config:
        channel: "#data-pipelines"
        template: ./templates/slack_success.json
  on_failure:
    - type: pagerduty
      config:
        service_key: "{{ PD_SERVICE_KEY }}"
        severity: "{{ 'critical' if task.critical else 'error' }}"
  on_retry:
    - type: log
      config:
        level: warning
# Scheduling (optional, also managed via `orchestrate schedule add`)
schedule:
  cron: "0 2 * * *"
  timezone: "America/New_York"
  concurrency: 1
  skip_if_running: true
```
### Execution Flow
1. **Validation Phase:**
   - Parse pipeline definition
   - Validate syntax against JSON schema
   - Check all referenced files exist (scripts, templates)
   - Verify resource availability
   - Test external connections if the `--check-connections` flag is set
   - Generate execution plan with estimated duration
2. **Initialization Phase:**
   - Create unique execution ID (UUID)
   - Initialize Redis state with `PENDING` status
   - Resolve all variable substitutions (including from `--vars` and `--env`)
   - Allocate compute resources (Docker containers, K8s pods, batch jobs)
   - Create execution working directory
   - Download input artifacts if needed
   - Record start timestamp
3. **Execution Phase** (stages run sequentially unless `parallel: true`):
   - For each stage:
     - Set stage status to `RUNNING`
     - For each task in stage:
       - Check dependencies (blocking tasks until dependencies succeed)
       - Allocate task-specific resources
       - Prepare execution environment:
         - Pull Docker images if needed
         - Create K8s pod specification
         - Mount volumes and secrets
         - Set environment variables
       - Start task execution:
         - Local: subprocess with resource limits (cgroups)
         - Docker: `docker run` with resource constraints
         - K8s: Create pod, wait for Ready state
         - AWS Batch: Submit job, poll for completion
       - Stream logs to stdout and Redis
       - Monitor resource usage (CPU, memory)
       - On task completion:
         - Check exit code
         - Upload output artifacts if specified
         - Update task status (`SUCCEEDED` or `FAILED`)
         - Trigger `post_hooks` if specified
       - On task failure:
         - Check retry policy
         - If retries remain, wait with backoff and retry
         - If no retries or critical failure:
           - Execute `on_failure` hooks
           - Set stage and pipeline status to `FAILED`
           - Trigger pipeline-level failure notifications
           - Abort subsequent stages
     - If stage succeeded, proceed to next stage
   - Record final pipeline status (`SUCCEEDED` or `FAILED`)
   - Calculate total execution time
   - Compress and archive logs
4. **Completion Phase:**
   - Store final state in Redis with TTL (default: 90 days)
   - Upload artifacts to persistent storage
   - Send success/failure notifications
   - Generate execution report (HTML/JSON)
   - Clean up temporary resources (unless the `--preserve` flag is set)
   - Exit with code 0 for success, 1 for failure
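The dependency gating in the Execution Phase can be modelled in a few lines. This is an illustrative sketch, not the orchestrator's implementation: `run_pipeline`, the shape of the `tasks` mapping, the `run_task` callback, and the `SKIPPED` label are all assumptions made for the example, and retries, hooks, and resource handling are left out. It uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) to release a task only after every dependency has succeeded.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


def run_pipeline(
    tasks: Dict[str, List[str]],
    run_task: Callable[[str], bool],
) -> Dict[str, str]:
    """Run tasks in dependency order and stop scheduling after a failure.

    `tasks` maps a task ID to the IDs it depends on (hypothetical shape).
    `run_task` returns True on success and False on failure.
    """
    status = {name: "PENDING" for name in tasks}
    sorter = TopologicalSorter(tasks)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    failed = False
    while sorter.is_active() and not failed:
        for name in sorter.get_ready():
            status[name] = "RUNNING"
            if run_task(name):
                status[name] = "SUCCEEDED"
                sorter.done(name)  # unblocks tasks waiting on this one
            else:
                status[name] = "FAILED"
                failed = True
                break

    # Tasks that never started are reported as skipped (label assumed here).
    for name, state in status.items():
        if state == "PENDING":
            status[name] = "SKIPPED"
    return status
```

With the task names from the pipeline definition above, a failure in `data_quality_checks` leaves both extracts and the transform as `SUCCEEDED` and `load_to_dw` as `SKIPPED`, which mirrors the "abort subsequent stages" step. The sketch runs ready tasks one at a time; a real executor would dispatch them concurrently.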
### State Schema (Redis)
```json
{
  "execution_id": "uuid",
  "pipeline": "pipeline_name",
  "status": "running|succeeded|failed|cancelled",
  "started_at": "2024-01-15T10:30:00Z",
  "completed_at": "2024-01-15T11:45:00Z",
  "duration_seconds": 4500,
  "triggered_by": "user@example.com|schedule|api",
  "variables": {
    "EXTRACTION_DATE": "2024-01-15"
  },
  "stages": [
    {
      "name": "extract",
      "status": "succeeded",
      "started_at": "2024-01-15T10:30:15Z",
      "completed_at": "2024-01-15T10:45:30Z",
      "tasks": [
        {
          "id": "extract_orders",
          "status": "succeeded",
          "attempt": 1,
          "started_at": "2024-01-15T10:30:15Z",
          "completed_at": "2024-01-15T10:42:00Z",
          "exit_code": 0,
          "resource_usage": {
            "cpu_seconds": 720,
            "memory_mb_peak": 3800,
            "disk_io_mb": 150
          },
          "artifacts": [
            {
              "path": "/tmp/orders_2024-01-15.json",
              "type": "dataset",
              "size_bytes": 52428800,
              "uploaded_to": "s3://raw-data/orders/2024-01-15/abc123.json"
            }
          ]
        }
      ]
    }
  ],
  "notifications": [
    {
      "type": "slack",
      "status": "sent",
      "timestamp": "2024-01-15T11:46:00Z"
    }
  ],
  "logs_url": "s3://pipeline-logs/abc123/combined.log.gz"
}
```
### Resource Allocation Logic
```python
# Pseudo-code for resource management (the helper functions are not defined here)
def allocate_resources(task, pipeline_resources, executor):
    # Start with pipeline defaults
    resources = pipeline_resources['default'].copy()
    # Override with task-specific resources
    if 'resources' in task:
        resources.update(task['resources'])
    # Dynamic scaling based on historical usage; applied before formatting
    # so that every executor receives the scaled values
    if task.get('auto_scale', False):
        historical = get_historical_usage(task['name'])
        resources = scale_resources(resources, historical)
    # Apply executor-specific formatting
    if executor == 'docker':
        return format_docker_resources(resources)
    elif executor == 'k8s':
        return format_k8s_resources(resources)
    elif executor == 'local':
        return apply_cgroups(resources)
    return resources
```
### Retry Policy with Exponential Backoff
```python
import random
import time


def should_retry(task, failure_reason, attempt):
    # Give up once the attempt budget is spent
    if attempt >= task.get('max_retries', 3):
        return False
    # Check if failure is retryable
    retryable_errors = [
        'ConnectionError',
        'TimeoutError',
        'HTTP 5xx',
        'K8s PodFailed',
        'EC2 Spot Instance Interruption'
    ]
    if failure_reason not in retryable_errors:
        return False
    # Calculate backoff: initial_delay * 2^(attempt - 1), capped at max_delay
    base_delay = task.get('retry_delay', 30)  # seconds
    max_delay = task.get('max_retry_delay', 600)  # 10 minutes
    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
    # Add jitter (±25%)
    jitter = delay * 0.25 * random.uniform(-1, 1)
    final_delay = delay + jitter
    # Block until the backoff has elapsed, then tell the caller to retry
    time.sleep(final_delay)
    return True
```
## Golden Rules
1. **Pipeline Definitions Must Be Immutable**: Once a pipeline executes, its definition is stored with the execution. For reproducibility, never modify definitions of completed executions. Create new versions instead.
2. **External Dependencies Must Have Timeouts**: All external calls (APIs, databases) must specify explicit timeouts. Relying on OS defaults leads to hung tasks that block entire pipelines.
3. **Artifacts Must Be Immutable and Versioned**: Once an artifact is created, never modify it. Store in write-once storage (S3 with object lock, GCS with retention policy). Include execution ID in artifact paths.
4. **No Secrets in Pipeline Definitions**: Secrets must come from environment variables, Kubernetes secrets, or HashiCorp Vault. Never hardcode credentials. Use `{{ env.VAR_NAME }}` substitution.
5. **Each Task Must Be Atomic and Self-Contained**: Tasks should do one thing well and not depend on local state from previous runs. All inputs come from artifacts or variables; all outputs are artifacts.
6. **Failure Is Expected, Not Exceptional**: Design pipelines assuming tasks will fail. Include validation, retries, quarantines, and compensation transactions. A pipeline without error handling is broken by design.
7. **Resource Requests Must Be Realistic**: Over-allocating wastes money; under-allocating causes OOM kills. Use historical metrics to set CPU/memory requests. Always set memory limits.
8. **State Must Be Redis-Backed, Not Local**: Never store execution state on local filesystem. All state (including task progress) must be in Redis for crash recovery and multi-node coordination.
9. **Schedules Must Be Timezone-Aware**: Cron expressions without explicit timezone default to UTC. Always specify timezone to avoid DST surprises.
10. **Monitoring Is Not Optional**: Every pipeline must have:
    - Resource metrics (CPU, memory, I/O)
    - Duration metrics per task/stage
    - Failure rate tracking
    - Alerting on failures and SLA breaches
    - Integration with centralized logging (Loki/ELK)
11. **Rollback Must Be Fast and Tested**: For production pipelines, rollback procedures must be documented and tested quarterly. Rollback should complete in <50% of pipeline execution time.
12. **Parallelism Requires Clear Dependencies**: Use DAGs, not free-for-all parallel execution. Tasks that run in parallel must be truly independent; shared state without locking causes race conditions.
13. **Variables Must Be Validated**: All user-supplied variables (from `--vars` or `--env`) must have schema validation. Reject pipeline start if required variables are missing or have invalid types.
14. **Logs Must Be Structured and Centralized**: Use JSON logging. All logs must flow to a centralized store with execution ID for traceability. Never rely on local log files.
15. **Changes Through PRs Only**: Pipeline definitions are code. All changes require a pull request with review. No direct edits to production pipelines.
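The variable-validation rule could be enforced along the following lines before a pipeline is allowed to start. This is a sketch under assumptions: the `SCHEMA` mapping, its type choices, and `validate_variables` are illustrative and do not describe the orchestrator's actual schema format. The skill lists `pydantic` as a dependency, which would be a natural fit in practice; only the standard library is used here to keep the example self-contained.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical schema: variable name -> (expected type, required?)
SCHEMA: Dict[str, Tuple[type, bool]] = {
    "EXTRACTION_DATE": (str, True),
    "BATCH_SIZE": (int, False),
    "RETRY_FLAG": (bool, False),
}


def validate_variables(
    variables: Dict[str, Any],
    schema: Dict[str, Tuple[type, bool]] = SCHEMA,
) -> List[str]:
    """Return a list of problems; an empty list means the input is valid."""
    errors = []
    for name, (expected, required) in schema.items():
        if name not in variables:
            if required:
                errors.append(f"missing required variable: {name}")
            continue
        value = variables[name]
        # bool is a subclass of int, so reject True/False where int is expected
        if isinstance(value, bool) and expected is not bool:
            errors.append(f"{name}: expected {expected.__name__}, got bool")
        elif not isinstance(value, expected):
            errors.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return errors
```

A caller would refuse to start the run whenever the returned list is non-empty, and report every problem at once rather than failing on the first.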
## Examples
### Example 1: Daily Batch ETL
**Command:**
```bash
orchestrate pipeline execute \
  --pipeline ./pipelines/sales_etl_daily.yaml \
  --env EXTRACTION_DATE=$(date -d 'yesterday' +%Y-%m-%d) \
  --vars ./config/feature_flags.json \
  --timeout 4h \
  --priority high \
  --trace-id $(uuidgen) \
  --monitor
```
**Pipeline Definition (sales_etl_daily.yaml):**
```yaml
version: "2.1"
name: "sales_etl_daily"
variables:
  EXTRACTION_DATE: "{{ ds }}"
  SOURCE_DB: "prod-sales-readonly"
  TARGET_DW: "analytics-prod"
  BATCH_SIZE: 10000
stages:
  - name: extract
    tasks:
      - name: extract_orders
        type: python_script
        script: ./src/extract_orders.py
        env:
          SOURCE_DB: "{{ SOURCE_DB }}"
          DATE: "{{ EXTRACTION_DATE }}"
          BATCH_SIZE: "{{ BATCH_SIZE }}"
        resources:
          cpu: 2.0
          memory: 4Gi
        timeout: 2h
        retry:
          max_attempts: 3
          backoff: exponential
        output_artifacts:
          - path: /tmp/orders_{{ EXTRACTION_DATE }}.jsonl
            type: dataset
            upload_to: s3://raw-sales-data/orders/date={{ EXTRACTION_DATE }}/
      - name: extract_customers
        type: python_script
        script: ./src/extract_customers.py
        dependencies: []
        env:
          SOURCE_DB: "{{ SOURCE_DB }}"
        resources:
          cpu: 1.0
          memory: 2Gi
        timeout: 1h
        output_artifacts:
          - path: /tmp/customers_{{ EXTRACTION_DATE }}.jsonl
            type: reference
            upload_to: s3://raw-sales-data/customers/date={{ EXTRACTION_DATE }}/
  - name: transform
    tasks:
      - name: transform_orders
        type: spark_job
        application: ./spark/transform_orders.py
        dependencies:
          - extract.extract_orders
          - extract.extract_customers
        resources:
          cpu: 8.0
          memory: 16Gi
          executor_instances: 4
        conf:
          spark.sql.adaptive.enabled: true
          spark.sql.adaptive.coalescePartitions.enabled: true
        input_artifacts:
          - s3://raw-sales-data/orders/date={{ EXTRACTION_DATE }}/*.jsonl
          - s3://raw-sales-data/customers/date={{ EXTRACTION_DATE }}/*.jsonl
        output_artifacts:
          - path: /tmp/transformed_orders_{{ EXTRACTION_DATE }}.parquet
            type: dataset
            upload_to: s3://transformed-sales-data/orders/date={{ EXTRACTION_DATE }}/
  - name: load
    tasks:
      - name: load_to_dw
        type: sql_script
        script: ./sql/load_orders.sql
        dependencies:
          - transform.transform_orders
        env:
          DW_CONNECTION: "postgresql://{{ env.DW_USER }}:{{ env.DW_PASSWORD }}@{{ TARGET_DW }}:5432/analytics"
          SOURCE_PATH: "s3://transformed-sales-data/orders/date={{ EXTRACTION_DATE }}/"
        resources:
          cpu: 2.0
          memory: 4Gi
        timeout: 1h
        post_hooks:
          - type: sql
            script: ./sql/refresh_materialized_views.sql
```
**Expected Output:**
```
[INFO] 2024-01-15T10:00:00Z - Starting orchestration execution: abc123-def456
[INFO] Pipeline: sales_etl_daily, Triggered by: user@example.com
[INFO] Validating pipeline definition...
[PASS] Validation succeeded (3 stages, 4 tasks)
[INFO] Allocating resources: extract (2 CPUs, 4Gi RAM each task)
[RUN] Stage 1/3: extract (parallel tasks: 2)
[RUN] Task extract_orders (attempt 1/3) started at 10:00:05
[INFO] Extracting orders for 2024-01-14, batch size: 10000
[RUN] Task extract_customers (attempt 1/1) started at 10:00:06
[INFO] Extracting customers (full refresh)
[DONE] extract_orders succeeded (1h 12m 34s) - 2.3M orders extracted
[DONE] extract_customers succeeded (42m 18s) - 458K customers extracted
[RUN] Stage 2/3: transform
[RUN] Task transform_orders (attempt 1/1) started at 11:45:00
[INFO] Spark job started with 4 executors
[DONE] transform_orders succeeded (35m 12s) - Parquet written to s3://...
[RUN] Stage 3/3: load
[RUN] Task load_to_dw (attempt 1/3) started at 12:20:15
[DONE] load_to_dw succeeded (18m 45s) - 2.3M rows loaded
[INFO] Pipeline sales_etl_daily succeeded in 3h 6m 44s
[NOTIFY] Sending success notifications to: data-team@example.com, #data-pipelines
[ARCHIVE] Execution state saved to redis with 90d TTL
[ARCHIVE] Logs uploaded to: s3://pipeline-logs/abc123/combined.log.gz
[SUCCESS] Execution abc123-def456 completed successfully
```
### Example 2: ML Training Pipeline with Auto-Scaling
**Command:**
```bash
orchestrate pipeline execute \
  --pipeline ./ml/training_pipeline.yaml \
  --env MODEL_TYPE=resnet50 \
  --env TRAIN_DATA_PATH=s3://ml-data/training/2024-Q1/ \
  --env VAL_DATA_PATH=s3://ml-data/validation/2024-Q1/ \
  --vars ./config/hyperparameters.json \
  --resources '{"train_model": {"gpu": 4, "memory": "64Gi"}}' \
  --priority critical
```
**Pipeline Definition (training_pipeline.yaml):**
```yaml
version: "2.1"
name: "ml_training_resnet"
description: "Train ResNet50 model on image dataset"
variables:
  MODEL_TYPE: "{{ env.MODEL_TYPE }}"
  TRAIN_DATA_PATH: "{{ env.TRAIN_DATA_PATH }}"
  VAL_DATA_PATH: "{{ env.VAL_DATA_PATH }}"
  EPOCHS: 50
  BATCH_SIZE: 256
  LEARNING_RATE: 0.001
stages:
  - name: preprocessing
    tasks:
      - name: data_validation
        type: python_script
        script: ./ml/validate_data.py
        env:
          DATA_PATH: "{{ TRAIN_DATA_PATH }}"
          MIN_IMAGES: 100000
        resources:
          cpu: 4.0
          memory: 8Gi
        timeout: 2h
        retry: false
        validation_rules:
          - rule: image_count
            min: 100000
          - rule: class_distribution
            min_ratio: 0.01
      - name: feature_engineering
        type: spark_job
        application: ./spark/preprocess_images.py
        dependencies:
          - preprocessing.data_validation
        env:
          INPUT_PATH: "{{ TRAIN_DATA_PATH }}"
          OUTPUT_PATH: "s3://ml-preprocessed/train/$(date +%Y%m%d)/"
        resources:
          cpu: 16.0
          memory: 32Gi
          executor_instances: 8
        timeout: 6h
  - name: training
    tasks:
      - name: train_model
        type: kubernetes_job
        image: "registry.example.com/ml/trainer:{{ MODEL_TYPE }}-v2.1"
        dependencies:
          - preprocessing.feature_engineering
        env:
          TRAIN_DATA: "s3://ml-preprocessed/train/$(date +%Y%m%d)/"
          VAL_DATA: "{{ VAL_DATA_PATH }}"
          EPOCHS: "{{ EPOCHS }}"
          BATCH_SIZE: "{{ BATCH_SIZE }}"
          LEARNING_RATE: "{{ LEARNING_RATE }}"
          WANDB_API_KEY: "{{ secrets.WANDB_API_KEY }}"
          WANDB_PROJECT: "resnet-training"
        resources:
          cpu: 16.0  # Request for pod
          memory: 64Gi
          gpu: 4  # Must match node capacity
          limits:
            memory: 68Gi  # Allow 4Gi burst
        timeout: 48h
        retry:
          max_attempts: 2
        checkpoint:
          path: "s3://ml-checkpoints/{{ execution_id }}/"
          interval: "every 5 epochs"
        tensorboard:
          log_dir: "s3://ml-logs/tensorboard/{{ execution_id }}/"
        on_success:
          - type: promote
            config:
              model_bucket: "s3://ml-models/production/"
      - name: evaluate_model
        type: python_script
        script: ./ml/evaluate.py
        dependencies:
          - training.train_model
        env:
          MODEL_PATH: "s3://ml-models/staging/{{ MODEL_TYPE }}/latest/"
          VAL_DATA_PATH: "{{ VAL_DATA_PATH }}"
          METRICS_OUTPUT: "./metrics.json"
        resources:
          cpu: 4.0
          memory: 16Gi
        timeout: 2h
        validation_rules:
          - rule: accuracy
            min: 0.85
            comparator: ">="
          - rule: inference_latency_p99
            max: 100  # ms
            comparator: "<="
        on_failure:
          - type: reject
            config:
              reason: "Model did not meet accuracy threshold"
              model_path: "s3://ml-models/staging/{{ MODEL_TYPE }}/latest/"
  - name: deployment
    tasks:
      - name: canary_deploy
        type: kubernetes_manifest
        manifest: ./k8s/canary-deployment.yaml
        dependencies:
          - training.evaluate_model
        template_vars:
          MODEL_VERSION: "{{ execution_id }}"
          IMAGE_TAG: "{{ MODEL_TYPE }}-{{ execution_id }}"
        resources:
          cpu: 1.0
          memory: 512Mi
        health_check:
          endpoint: "/health"
          timeout: 30s
          min_healthy: 90  # Percentage
        canary:
          initial_traffic: 10
          ramp_up: "every 5m, +10"
          rollback_on_failure: true
        timeout: 1h
```
**Output Sample:**
[INFO] Execution: abc789-xyz123 (ML Training ResNet50)
[VALIDATE] Pipeline structure valid
[ALLOCATE] GPU resources: requesting 4x NVIDIA A100 on GPU-enabled nodes
[RUN] Stage 1/3: preprocessing
[DONE] data_validation: 1.2M images validated (45m 12s)
[RUN] feature_engineering: Spark job starting on 8 executors (32 cores, 256Gi RAM total)
[PROGRESS] Spark UI: http://spark-master:4040 (stages: 15/23 complete)
[DONE] feature_engineering: Preprocessing complete (4h 22m) - 1.2M images written
[RUN] Stage 2/3: training
[RUN] train_model: Creating K8s job in namespace "ml-training"
[INFO] Pod ml-abc789-xyz123-0 created on node gpu-pool-5
[STREAM] Training logs:
Epoch 1/50, Loss: 4.256, Acc: 0.123
Epoch 5/50, Loss: 2.891, Acc: 0.456, Checkpoint saved
...
Epoch 50/50, Loss: 0.234, Acc: 0.923
[CHECKPOINT] Saved to s3://ml-checkpoints/abc789-xyz123/epoch_50.pt
[DONE] train_model: Training complete (47h 15m)
[RUN] evaluate_model: Running inference on validation set (50K images)
[DONE] evaluate_model: Accuracy: 0.923, Latency p99: 87ms (1h 33m)
[PASS] Validation rules passed (accuracy > 0.85)
[RUN] Stage 3/3: deployment
[RUN] canary_deploy: Creating K8s canary deployment
[PROGRESS] Rolling out canary: 10% → 20% → 30% (monitoring error rate)
[INFO] Error rate stable at 0.02%, continuing ramp-up
[DONE] canary_deploy: Canary at 100%, traffic shifted (45m)
[SUCCESS] Pipeline succeeded in 54h 10m
[NOTIFY] Slack: ":white_check_mark: ResNet50 model trained and deployed (acc: 92.3%)"
[ARCHIVE] Model artifacts: s3://ml-models/production/resnet50/abc789-xyz123/
[PROMOTE] Latest model pointer updated
Example 3: Schedule Management
Add a Schedule:
orchestrate schedule add \
--pipeline ./pipelines/customer_sync.yaml \
--cron "0 */6 * * *" \
--name "Customer Sync Every 6 Hours" \
--max-retries 2 \
--concurrency 1 \
--timezone America/New_York \
--skip-if-running \
--notify-on-failure https://hooks.slack.com/services/...
List Schedules:
$ orchestrate schedule list --active-only --format table --next 5
NAME                         PIPELINE         SCHEDULE     NEXT RUN              STATUS
Customer Sync Every 6 Hours  customer_sync    0 */6 * * *  2024-01-16 00:00 EST  Active
Daily ETL 2AM UTC            daily_sales_etl  0 2 * * *    2024-01-16 02:00 UTC  Active
Hourly Metrics               hourly_metrics   0 * * * *    2024-01-15 13:00 UTC  Active
Manually Trigger Scheduled Run (with different variables):
orchestrate pipeline execute \
--pipeline ./pipelines/customer_sync.yaml \
--env FORCE_FULL_SYNC=true \
--priority high \
--trace-id manual-$(date +%s)
Example 4: State Recovery and Rollback
Scenario: A pipeline fails because of a transient error; once the cause is fixed, resume from the stage that failed instead of re-running the stages that already succeeded.
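The resume point in a scenario like this is the first stage that did not succeed. As a minimal sketch, assuming a state document with a `stages` list whose entries carry `name` and `status` fields (the shape printed by `orchestrate state get` in this example), it could be located as follows. The function name `first_unfinished_stage` is hypothetical and not part of the orchestrator CLI.

```python
import json


def first_unfinished_stage(state):
    """Return the name of the first stage whose status is not "succeeded", or None."""
    for stage in state.get("stages", []):
        if stage.get("status") != "succeeded":
            return stage["name"]
    return None


# Trimmed copy of the state document used in this example.
state = json.loads("""
{
  "execution_id": "abc123-def456",
  "status": "failed",
  "stages": [
    {"name": "extract", "status": "succeeded"},
    {"name": "transform", "status": "succeeded"},
    {"name": "validate", "status": "failed"}
  ]
}
""")

print(first_unfinished_stage(state))  # prints: validate
```

The returned name is the value one would pass to `--to-stage` when restarting.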
Check State:
$ orchestrate state get --execution-id abc123-def456 --format json --compact
{
"execution_id": "abc123-def456",
"status": "failed",
"pipeline": "daily_sales_etl",
"started_at": "2024-01-15T02:00:00Z",
"completed_at": "2024-01-15T04:30:00Z",
"stages": [
{"name": "extract", "status": "succeeded", "tasks": [...]},
{"name": "transform", "status": "succeeded", "tasks": [...]},
{"name": "validate", "status": "failed",
"tasks": [{"id": "data_quality", "status": "failed", "error": "API timeout"}]}
]
}
Rollback to Checkpoint (Before Failure Stage):
# The run ended in failure at the validate stage; restart from that stage
orchestrate rollback execute \
--execution-id abc123-def456 \
--to-stage validate \
--strategy restart \
--include-artifacts \
--verify \
--notify https://alerts.example.com/rollback_complete
Or roll back to the previous successful execution:
# Find previous successful execution ID
$ orchestrate history list --pipeline daily_sales_etl --status succeeded --limit 5 | head -1
xyz789-uvw456 2024-01-14 02:00 UTC succeeded 2h 45m
# Rollback state to match that execution
orchestrate rollback execute \
--execution-id abc123-def456 \
--to-execution-id xyz789-uvw456 \
--strategy checkpoint \
--force \
--dry-run
Dry-run output:
[Dry Run] Rollback Plan for execution abc123-def456:
1. Restore Redis state from execution xyz789-uvw456
2. Download artifacts:
- s3://transformed-sales-data/orders/date=2024-01-13/xyz789_artifacts.tar.gz
-> /tmp/orchestration/artifacts/
3. Reset stage status: load → PENDING
4. Keep execution ID: abc123-def456 (new run)
5. Verification checks:
- Validate artifact integrity (SHA256)
- Confirm dependency stage outputs exist
6. Estimated rollback time: 15m
[DRY-RUN] No changes applied (remove --dry-run to execute)
Dependencies and Requirements
System Dependencies
- Python 3.9+: Core runtime
- Redis 6.2+: State persistence and pub/sub
- Docker 20.10+: Container execution (if using Docker executor)
- Kubernetes 1.24+: K8s job orchestration (if using K8s executor)
- AWS CLI 2.x: S3 artifact storage and Batch jobs (optional)
- Git 2.30+: Pipeline definition version control
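A quick preflight check for the list above can be sketched in Python. This is a minimal illustration, not part of the orchestrator: the function name `check_dependencies` is hypothetical, it only tests that each tool is on `PATH` and that the interpreter is at least 3.9, and it does not verify the tool versions (Docker 20.10+, Git 2.30+, and so on).

```python
import shutil
import sys


def check_dependencies(tools, min_python=(3, 9), which=shutil.which, version=None):
    """Return a list of human-readable problems; an empty list means all checks passed.

    `which` and `version` are injectable so the check can be tested without
    depending on the machine it runs on.
    """
    version = sys.version_info[:2] if version is None else version
    problems = []
    if version < min_python:
        problems.append(
            f"Python {min_python[0]}.{min_python[1]}+ required, "
            f"found {version[0]}.{version[1]}"
        )
    for tool in tools:
        # shutil.which returns None when the executable is not on PATH.
        if which(tool) is None:
            problems.append(f"{tool} not found on PATH")
    return problems
```

For example, `check_dependencies(["git", "docker", "kubectl", "aws", "redis-cli"])` reports any of those executables that are missing; the optional ones (`kubectl`, `aws`) can be dropped from the list when the K8s or AWS integrations are not used.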
Python Packages
pip install \
pyyaml==6.0 \
croniter==1.4.7 \
pydantic==2.4.2 \
redis==4.6.0 \
boto3==1.28.62 \
kubernetes==26.1.0 \
pyspark==3.5.0 \
sqlalchemy==2.0.23 \
requests==2.31.0 \
click==8.1.7
Configuration File (~/.openclaw/config/auto-orchestrator.yaml)
orchestrator:
redis_url: "redis://localhost:6379/0"
workdir: "/var/lib/orchestration"
log_level: "INFO"
executor: "k8s" # local, docker, k8s, aws-batch
state:
ttl_days: 90
compression: true
archive_after_days: 30
resources:
default_cpu: 1.0
default_memory: "2Gi"
max_concurrent_tasks: 100
retry:
default_max_attempts: 3
default_backoff: "exponential"
initial_delay_seconds: 30
max_delay_seconds: 600
notifications:
smtp:
host: "smtp.example.com"
port: 587
from: "orchestrator@example.com"
slack_webhook: "https://hooks.slack.com/services/..."
pagerduty_service_key: "{{ env.PAGERDUTY_KEY }}"
storage:
artifact_store: "s3" # s3, gcs, azure, local
s3_bucket: "orchestration-artifacts"
s3_region: "us-east-1"
local_path: "/data/orchestration/artifacts"
monitoring:
prometheus_endpoint: "http://prometheus:9090"
grafana_dashboard: "orchestration-overview"
alert_webhooks:
- "https://alerts.example.com/pipelines"
executor_config:
docker:
network_mode: "bridge"
default_image: "python:3.9-slim"
registry: "registry.example.com"
k8s:
namespace: "data-pipelines"
service_account: "orchestrator"
image_pull_secrets: ["registry-creds"]
node_selectors:
gpu: "nvidia.com/gpu.present"
tolerations:
- key: "dedicated"
operator: "Equal"
value: "orchestration"
effect: "NoSchedule"
aws_batch:
job_queue: "orchestration-queue"
job_definition: "orchestration-job:1"
compute_environment: "orchestration-env"
Security Requirements
Principle of Least Privilege:
- Docker containers run as non-root users (UID 1000+)
- K8s pods use dedicated service accounts with minimal RBAC
- AWS IAM roles scoped to specific S3 buckets and DynamoDB tables
Secrets Management:
- Never store secrets in pipeline definitions
- Use Kubernetes secrets, AWS Secrets Manager, or HashiCorp Vault
- Rotate secrets automatically (30-day rotation)
- Audit all secret access
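The rule against storing secrets in pipeline definitions can be checked mechanically before a pipeline is committed. The sketch below is a heuristic under stated assumptions: it takes an already-parsed pipeline definition (nested dicts and lists, for example from `yaml.safe_load`), treats keys whose names contain words such as SECRET or TOKEN as sensitive, and accepts only values that are template references like `{{ secrets.AWS_SECRET_KEY }}`. The function name and the key patterns are illustrative, not part of the orchestrator.

```python
import re

# Key names that usually hold credentials (heuristic, not exhaustive).
SENSITIVE_KEY = re.compile(r"(SECRET|PASSWORD|TOKEN|ACCESS_KEY|API_KEY)", re.IGNORECASE)
# Values that consist only of a template reference such as "{{ secrets.NAME }}".
TEMPLATE_REF = re.compile(r"^\s*\{\{.*\}\}\s*$")


def find_hardcoded_secrets(node, path=""):
    """Walk a parsed pipeline definition and return paths of suspicious literals."""
    findings = []
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{path}.{key}" if path else str(key)
            if isinstance(value, str):
                # A sensitive key with a literal (non-template) value is flagged.
                if SENSITIVE_KEY.search(str(key)) and not TEMPLATE_REF.match(value):
                    findings.append(child)
            else:
                findings.extend(find_hardcoded_secrets(value, child))
    elif isinstance(node, list):
        for index, item in enumerate(node):
            findings.extend(find_hardcoded_secrets(item, f"{path}[{index}]"))
    return findings


pipeline = {
    "tasks": [
        {
            "name": "extract_orders",
            "env": {
                "AWS_ACCESS_KEY_ID": "AKIAEXAMPLE",  # literal value: flagged
                "AWS_SECRET_ACCESS_KEY": "{{ secrets.AWS_SECRET_KEY }}",
                "AWS_DEFAULT_REGION": "us-east-1",
            },
        }
    ]
}
print(find_hardcoded_secrets(pipeline))  # prints: ['tasks[0].env.AWS_ACCESS_KEY_ID']
```

A check like this catches only obvious cases; it complements, and does not replace, a dedicated secret scanner.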
Network Policies:
- K8s: Restrict pod-to-pod communication via NetworkPolicy
- Docker: Use custom bridge networks with firewall rules
- Egress only to whitelisted external endpoints
Resource Quotas:
- Per-namespace CPU/memory limits in K8s
- Docker daemon resource limits
- AWS Batch compute environment limits
Artifact Encryption:
- S3: SSE-S3 or SSE-KMS with rotation
- Transit: TLS 1.3 for all network traffic
- At-rest: Full-disk encryption for local storage
Verification Steps
After Pipeline Execution
- Check Execution State:
# Verify execution record
orchestrate state get --execution-id abc123-def456 --format json | jq '.status'
# Check all stages completed
orchestrate state get --execution-id abc123-def456 --format json | jq '.stages[].status'
- Validate Artifact Integrity:
# Compare checksums
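# Assumes the uploader stored a SHA-256 in the object's "checksum" metadata
# and that a local copy of the artifact is in the current directory
EXPECTED_SUM=$(aws s3api head-object \
--bucket orchestration-artifacts \
--key "executions/abc123-def456/artifacts/transformed_orders.parquet" \
--query 'Metadata.checksum' --output text)
echo "$EXPECTED_SUM  transformed_orders.parquet" | sha256sum -c -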
# Verify all expected artifacts exist
EXPECTED=$(yq '.stages[].tasks[].output_artifacts[].upload_to' pipeline.yaml | wc -l)
ACTUAL=$(aws s3 ls s3://transformed-sales-data/ | grep abc123-def456 | wc -l)
test $EXPECTED -eq $ACTUAL && echo "All artifacts uploaded" || echo "Artifact count mismatch"
- Confirm Data Quality:
# If data quality task ran, check its metrics
orchestrate state get --execution-id abc123-def456 \
--format json | jq '.stages[] | select(.name=="validate") | .tasks[] | select(.name=="data_quality_checks") | .outputs.metrics'
- Verify Notifications Sent:
# Check notification log (if using Slack)
grep "abc123-def456" /var/log/orchestrator/notifications.log | tail -5
- Review Resource Usage:
# Query Prometheus metrics (-G with --data-urlencode keeps the braces and quotes of the PromQL selector intact)
curl -sG "http://prometheus:9090/api/v1/query" --data-urlencode 'query=orchestrator_task_duration_seconds{execution_id="abc123-def456"}' | jq '.data.result[] | {task: .metric.task, duration: .value[1]}'
# Check for over-provisioning
curl -sG "http://prometheus:9090/api/v1/query" --data-urlencode 'query=orchestrator_task_cpu_usage_percent{execution_id="abc123-def456"}' | jq -r '.data.result[] | select(.value[1] | tonumber < 50) | "Underutilized: \(.metric.task) \(.value[1])%"'
Pre-Execution Validation Checklist
# 1. Validate pipeline syntax
orchestrate validate pipeline --pipeline ./new_pipeline.yaml --strict
if [ $? -ne 0 ]; then exit 1; fi
# 2. Check all scripts exist
while read -r script; do
test -f "$script" || { echo "Missing script: $script"; exit 1; }
done < <(yq '.stages[].tasks[].script' new_pipeline.yaml | grep -v null)
# 3. Verify external connections
orchestrate validate pipeline \
--pipeline ./new_pipeline.yaml \
--check-connections \
--timeout 30s
# 4. Check resource quotas
orchestrate resource check \
--pipeline ./new_pipeline.yaml \
--executor k8s
# 5. Test variable substitution
export EXTRACTION_DATE="2024-01-15"
orchestrate validate pipeline \
--pipeline ./new_pipeline.yaml \
--env EXTRACTION_DATE \
--format json | jq '.variables'
Health Check for Orchestrator Service
#!/bin/bash
# Check orchestrator daemon status
# 1. Check Redis connectivity
redis-cli -h localhost -p 6379 ping || { echo "Redis down"; exit 1; }
# 2. Check orchestrator process
systemctl is-active --quiet orchestrator || { echo "Orchestrator service stopped"; exit 1; }
# 3. Check recent executions haven't all failed
FAILED_24H=$(orchestrate history list --last 24h --status failed | wc -l)
SUCCESS_24H=$(orchestrate history list --last 24h --status succeeded | wc -l)
TOTAL_24H=$(( FAILED_24H + SUCCESS_24H ))
# Skip the ratio when there were no runs, to avoid dividing by zero
if [ "$TOTAL_24H" -gt 0 ]; then
RATIO=$(( FAILED_24H * 100 / TOTAL_24H ))
if [ "$RATIO" -gt 20 ]; then
echo "High failure rate: ${RATIO}% in last 24h"
exit 2
fi
fi
# 4. Check disk space for artifacts
DISK_USAGE=$(df --output=pcent /var/lib/orchestration | tail -1 | tr -d '%')
if [ $DISK_USAGE -gt 85 ]; then
echo "Disk usage critical: ${DISK_USAGE}%"
exit 3
fi
echo "Orchestrator healthy"
Troubleshooting
Common Issues and Solutions
Issue 1: "Pipeline validation failed: stage tasks reference non-existent script"
Cause: Script path in pipeline definition doesn't exist or is relative to wrong directory.
Fix:
# Ensure you're in pipeline directory
cd /path/to/pipeline/directory
# Verify all script paths
yq '.stages[].tasks[].script' pipeline.yaml | grep -v null | while read -r script; do
test -f "$script" || echo "Missing: $script"
done
# Use absolute paths or ensure pipeline is executed from correct workdir
orchestrate pipeline execute \
--pipeline /absolute/path/to/pipeline.yaml \
--workdir /absolute/path/to/
Issue 2: "Kubernetes pod failed: OOMKilled"
Cause: Task memory request too low; container exceeded limit.
Fix:
# Check actual memory usage from logs
kubectl logs pod/ml-abc123 --previous | grep " Memory "
# Adjust resources in pipeline
# Increase memory request AND limit (limit should be ~10-20% higher than request)
resources:
  memory: "16Gi" # Request
  limits:
    memory: "18Gi" # Allow burst
Prevention: Use historical metrics to size memory automatically:
resources:
  memory: "auto" # Uses 95th percentile from historical runs
  limits:
    memory: "auto+2Gi"
Issue 3: "Task stuck in RUNNING state for > timeout"
Cause: Task exceeded its timeout or hung waiting for external resource.
Diagnosis:
# View current task state
orchestrate state get --execution-id abc123 --format json | jq '.stages[] | select(.status=="running") | .tasks[] | select(.status=="running")'
# Check logs via Redis (or S3 if uploaded)
redis-cli HGET orchestration:execution:abc123 tasks:extract_orders:logs | head -50
# If K8s pod
kubectl describe pod/ml-abc123 --namespace data-pipelines
kubectl logs pod/ml-abc123 --previous # Previous container if restarted
Fix:
# Increase timeout in pipeline
tasks:
- name: long_running_task
timeout: 6h # Instead of 2h
# Or manually cancel if stuck
orchestrate state set \
--execution-id abc123 \
--task stuck_task_id \
--status failed \
--message "Manual cancel - exceeded SLA" \
--force
Issue 4: "Redis connection refused"
Cause: Redis not running or wrong connection string.
Fix:
# Check Redis status
systemctl status redis
sudo redis-cli ping
# Fix Redis URL in config
orchestrate config set \
--key redis_url \
--value "redis://localhost:6379/0" \
--type string
# If Redis needs authentication (the rediss:// scheme also requires TLS; use redis:// for a plain connection)
orchestrate config set \
--key redis_url \
--value "rediss://:password@redis-host:6379/0" \
--type string
Issue 5: "Pipeline failed: variable substitution error - undefined variable"
Cause: Required variable not provided via --env or --vars.
Fix:
# Check required variables
REQUIRED=$(yq '.variables | keys | .[]' pipeline.yaml) # One variable name per line, so comm can compare
PROVIDED=$(env | grep -E '^(EXTRACTION_DATE|...)=' | cut -d= -f1)
echo "Missing: $(comm -23 <(echo "$REQUIRED" | sort) <(echo "$PROVIDED" | sort))"
# Provide via vars file
cat > overrides.json << 'EOF'
{
"EXTRACTION_DATE": "2024-01-15",
"SALES_API_ENDPOINT": "https://api.sales.com/v2"
}
EOF
orchestrate pipeline execute \
--pipeline pipeline.yaml \
--vars overrides.json
Issue 6: "Task failed: permission denied on S3 bucket"
Cause: IAM role or credentials don't have access to S3 bucket.
Fix:
- Check IAM policy attached to role:
aws iam get-role --role-name orchestration-task-role | jq '.Role.AssumeRolePolicyDocument'
aws iam list-attached-role-policies --role-name orchestration-task-role
- Verify bucket policy allows access:
aws s3api get-bucket-policy --bucket raw-sales-data --query Policy --output text | jq .
- Test with AWS CLI (using same credentials):
aws s3api list-objects-v2 --bucket raw-sales-data --max-items 1
- Update pipeline to use correct credentials:
tasks:
- name: extract_orders
env:
AWS_ACCESS_KEY_ID: "{{ secrets.AWS_ACCESS_KEY }}"
AWS_SECRET_ACCESS_KEY: "{{ secrets.AWS_SECRET_KEY }}"
AWS_DEFAULT_REGION: "us-east-1"
# OR use IAM role if running on EC2/EKS
Issue 7: "Stage failed: task retry limit exceeded"
Cause: Retry policy exhausted without success.
Diagnosis:
# Check error pattern
orchestrate state get --execution-id abc123 --format json | jq '.stages[].tasks[] | select(.status=="failed") | {id: .id, error: .error, attempt: .attempt}'
# Look at logs to determine if transient or permanent
orchestrate logs --execution-id abc123 --task stuck_task --lines 50
Fix:
- Transient errors (network, API rate limits): Increase retry count or backoff.
- Permanent errors (bad data, invalid query): Fix pipeline code and re-run.
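To see what a larger retry count or a different backoff means in wall-clock terms, the wait schedule can be computed up front. The sketch below uses the defaults from the configuration file (`initial_delay_seconds: 30`, `max_delay_seconds: 600`, `default_backoff: "exponential"`). Two points are assumptions, since the source does not state them: that the delay doubles on every retry, and that the first attempt counts toward the maximum number of attempts.

```python
def backoff_delays(max_attempts, initial_delay, max_delay, factor=2):
    """Delays in seconds to wait before each retry.

    Assumes the first attempt is not a retry, so max_attempts attempts
    produce max_attempts - 1 delays. Each delay is capped at max_delay.
    """
    delays = []
    for retry in range(max(max_attempts - 1, 0)):
        delays.append(min(initial_delay * factor ** retry, max_delay))
    return delays


# Default policy: 3 attempts, so two waits.
print(backoff_delays(3, 30, 600))  # prints: [30, 60]
# Raising the limit to 7 attempts reaches the 600-second cap on the last wait.
print(backoff_delays(7, 30, 600))  # prints: [30, 60, 120, 240, 480, 600]
```

Summing the list gives the extra time a task may spend waiting before it is finally marked failed, which is worth comparing against the task's `timeout`.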
To skip failed task and continue (if safe):
orchestrate pipeline execute \
--pipeline pipeline.yaml \
--skip-tasks stuck_task_id,another_failed_task \
--force
Issue 8: "K8s pod pending indefinitely"
Cause: Insufficient cluster resources or node selector mismatch.
Diagnosis:
kubectl describe pod/ml-abc123 --namespace data-pipelines | grep -A 10 "Events:"
kubectl get nodes -o json | jq '.items[] | {name: .metadata.name, allocatable: .status.allocatable}'
Fix:
# Adjust resource requests to match cluster capacity
resources:
cpu: "2.0" # Not "16.0" if cluster has limited cores
memory: "4Gi"
# Or remove restrictive node selectors
# node_selector: {} # Allow any node
Temporary workaround: Increase cluster size or use spot instances.
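Whether a request can fit on any node at all can be checked by comparing it with the allocatable values printed by the `kubectl get nodes` command above. The following is a minimal sketch: the helper names are hypothetical, only the common Kubernetes quantity suffixes are handled, and allocatable capacity is an upper bound because it ignores pods already scheduled on the node.

```python
from decimal import Decimal

# Binary and decimal suffixes for memory quantities (larger ones omitted).
_MEMORY_UNITS = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}


def parse_cpu(quantity):
    """Convert a CPU quantity ("500m", "2", "2.0") to cores."""
    text = str(quantity)
    if text.endswith("m"):  # millicores
        return Decimal(text[:-1]) / 1000
    return Decimal(text)


def parse_memory(quantity):
    """Convert a memory quantity ("4Gi", "512Mi", "1000000") to bytes."""
    text = str(quantity)
    for suffix, multiplier in _MEMORY_UNITS.items():
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * multiplier
    return Decimal(text)  # plain number of bytes


def nodes_that_fit(request, nodes):
    """Names of nodes whose allocatable CPU and memory cover the request."""
    need_cpu = parse_cpu(request["cpu"])
    need_mem = parse_memory(request["memory"])
    return [
        name
        for name, alloc in nodes.items()
        if parse_cpu(alloc["cpu"]) >= need_cpu
        and parse_memory(alloc["memory"]) >= need_mem
    ]


# Hypothetical allocatable values in the format kubectl reports them.
nodes = {
    "small": {"cpu": "3920m", "memory": "16393292Ki"},
    "big": {"cpu": "32", "memory": "128Gi"},
}
print(nodes_that_fit({"cpu": "16.0", "memory": "4Gi"}, nodes))  # prints: ['big']
print(nodes_that_fit({"cpu": "2.0", "memory": "4Gi"}, nodes))   # prints: ['small', 'big']
```

An empty result means no node can ever host the pod, so the request has to shrink or the cluster has to grow.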
Issue 9: "Artifact upload failed: AccessDenied"
Cause: Orchestrator service account lacks write permissions to storage bucket.
Fix:
# Check current IAM role
aws sts get-caller-identity
# Update bucket policy to allow orchestration role
cat > bucket-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:role/orchestration-task-role"
},
"Action": ["s3:PutObject", "s3:GetObject"],
"Resource": "arn:aws:s3:::orchestration-artifacts/*"
}
]
}
EOF
aws s3api put-bucket-policy --bucket orchestration-artifacts --policy file://bucket-policy.json
Issue 10: "Pipeline succeeded but data looks stale/incomplete"
Cause: Logic error in extraction/transformation code, not caught by validation.
Debugging:
# Compare row counts with previous run
PREV_COUNT=$(orchestrate state get --execution-id previous_run --format json | jq -r '.stages[].tasks[] | select(.name=="load_to_dw") | .outputs.row_count')
CUR_COUNT=$(orchestrate state get --execution-id abc123 --format json | jq -r '.stages[].tasks[] | select(.name=="load_to_dw") | .outputs.row_count')
echo "Previous: $PREV_COUNT, Current: $CUR_COUNT"
# If using data quality validation, check its metrics
orchestrate state get --execution-id abc123 --format json | jq '.stages[] | select(.name=="validate") | .tasks[] | .validation_results'
# Run data diff manually (if Delta Lake/Hudi)
spark-submit --class DataDiff ./tools/data-diff.jar \
--base s3://dw/table/date=2024-01-13/ \
--target s3://dw/table/date=2024-01-14/ \
--output diff_report.html
Performance Tuning
Slow Task:
- Profile code: add timing logs or use cProfile
- Increase allocated CPU/memory
- Parallelize within task (multi-threading, Spark partitions)
- Consider splitting into multiple pipeline tasks
Resource Waste:
- Enable auto-scaling based on historical metrics:
resources:
  cpu: "auto" # Uses 80th percentile from last 10 runs
  memory: "auto*1.2" # 20% buffer
Long Pipeline Duration:
- Increase max_concurrent in pipeline config
- Identify and eliminate sequential dependencies
- Use parallel: true at stage level
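The "auto" sizing described under Resource Waste (80th percentile of the last 10 runs, optionally with a 20% buffer) can be reproduced by hand to sanity-check what the orchestrator would request. This is a sketch under an assumption: the source does not say which percentile definition is used, so the nearest-rank method is chosen here, and the function names are illustrative.

```python
def nearest_rank_percentile(values, percent):
    """Nearest-rank percentile for an integer percent between 1 and 100."""
    if not values:
        raise ValueError("need at least one sample")
    ordered = sorted(values)
    # Integer ceiling of percent * n / 100, clamped to a valid 1-based rank.
    rank = max(1, (percent * len(ordered) + 99) // 100)
    return ordered[rank - 1]


def auto_size(history, percent=80, window=10, buffer=1.0):
    """Size a resource from the most recent `window` observations plus a buffer."""
    recent = history[-window:]
    return nearest_rank_percentile(recent, percent) * buffer


# Hypothetical peak CPU usage (cores) from the last ten runs of one task.
cpu_history = [1.0, 1.2, 0.9, 1.1, 1.5, 1.3, 1.0, 1.4, 2.0, 1.2]
print(auto_size(cpu_history))  # prints: 1.4
```

For memory with the 20% buffer shown above, the same call would be `auto_size(memory_history, buffer=1.2)`. Note that one unusually heavy run (the 2.0 above) does not move the result, which is the point of sizing by percentile and not by maximum.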
Log Debugging Commands
# Stream live logs (if running)
orchestrate logs --execution-id abc123 --follow
# Get logs from completed task
orchestrate logs --execution-id abc123 --task extract_orders --full > extract_orders.log
# Search logs for error pattern
orchestrate logs --execution-id abc123 --task extract_orders | grep -i "timeout\|error\|exception" | head -20
# Unified log tail with timestamps
orchestrate logs --execution-id abc123 --timestamps --format json | jq -r '"\(.timestamp) \(.task) \(.level): \(.message)"'
Rollback Commands
Point-in-Time Rollback
Restore pipeline state to a previous successful execution or checkpoint:
# Rollback to previous successful execution (full state restore)
orchestrate rollback execute \
--execution-id abc123-def456 \
--to-execution-id xyz789-uvw456 \
--strategy checkpoint \
--include-artifacts \
--verify \
--force
# Rollback to specific timestamp (e.g., before bad data loaded)
orchestrate rollback execute \
--execution-id abc123-def456 \
--to-timestamp "2024-01-15T14:30:00Z" \
--strategy revert \
--dry-run
# Rollback specific stage only (partial rollback)
orchestrate rollback execute \
--execution-id abc123-def456 \
--to-execution-id xyz789-uvw456 \
--stage validate \
--strategy restart \
--force
Artifact Recovery
Manually restore artifacts from archive without affecting execution state:
# Download artifact from specific execution
orchestrate artifact download \
--execution-id xyz789-uvw456 \
--artifact transformed_orders.parquet \
--output ./recovered/transformed_orders.parquet
# Or download entire artifact set
orchestrate artifact download-all \
--execution-id xyz789-uvw456 \
--output ./recovered/
Database Rollback (if using SQL scripts with transactions)
# Orchestrator can automatically wrap SQL tasks in transactions if configured
# Manual database rollback (PostgreSQL example):
orchestrate db rollback \
--target-dw analytics-prod \
--to-timestamp "2024-01-15 04:30:00 UTC" \
--tables orders,customers,order_items \
--dry-run
# Output:
# [DRY-RUN] Would execute:
# BEGIN;
# DELETE FROM order_items WHERE order_id IN (SELECT id FROM orders WHERE ...);
# DELETE FROM orders WHERE created_at >= '2024-01-15 04:30:00';
# DELETE FROM customers WHERE updated_at >= '2024-01-15 04:30:00';
# COMMIT;
# [DRY-RUN] Rows to delete: orders=2.3M, order_items=5.1M, customers=458K
State Cleanup After Failed Rollback
If rollback leaves inconsistent state:
# Force cleanup partial execution
orchestrate state cleanup \
--execution-id abc123-def456 \
--force
# Reset specific task to retry
orchestrate state set \
--execution-id abc123-def456 \
--task load_to_dw \
--status pending \
--message "Retry after manual fix" \
--force
Cluster/Infrastructure Rollback
If pipeline deployed infrastructure that needs teardown:
# Tear down K8s resources created by pipeline
orchestrate infra destroy \
--execution-id abc123-def456 \
--resources deployment,service,configmap \
--namespace data-pipelines \
--dry-run
# Rollback Terraform state (if pipeline applied Terraform)
orchestrate terraform rollback \
--execution-id abc123-def456 \
--state-file ./terraform.tfstate \
--backup-file ./terraform.tfstate.backup \
--plan ./rollback_plan
Full System Rollback (Emergency)
In case of catastrophic failure affecting multiple executions:
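# Restore Redis from backup (last 24h)
# 1. Fetch an RDB snapshot from the backup instance
redis-cli -h backup-redis -p 6379 --rdb /tmp/restore.rdb
# 2. Stop Redis, replace its dump file with /tmp/restore.rdb (location set by "dir" and "dbfilename" in redis.conf), then start it again
systemctl stop redis
systemctl start redis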
# Replay execution from archival state
orchestrate state restore-archive \
--archive s3://orchestration-archives/2024-01-15/abc123.parquet \
--dry-run
Rollback Verification Checklist
After any rollback, verify:
# 1. Execution state is consistent
orchestrate state get --execution-id abc123 --format json | jq '.status' # Should be succeeded or running
# 2. All expected artifacts exist
EXPECTED_ARTIFACTS=$(yq '.stages[].tasks[].output_artifacts[].upload_to' pipeline.yaml | wc -l)
ACTUAL_ARTIFACTS=$(aws s3 ls s3://transformed-sales-data/ | grep abc123 | wc -l)
test $EXPECTED_ARTIFACTS -eq $ACTUAL_ARTIFACTS || echo "Artifact count mismatch!"
# 3. Downstream systems received data
curl -s https://dw-healthcheck.example.com/status | jq '.last_updated > "2024-01-15T04:00:00Z"'
# 4. No orphaned resources
kubectl get pods --namespace data-pipelines --field-selector status.phase=Running | grep abc123 || echo "No orphaned pods"
# 5. Notifications sent
grep "Rollback completed" /var/log/orchestrator/notifications.log | tail -1
Rollback Strategy Comparison
| Strategy | Use Case | Duration | Risk |
|---|---|---|---|
| checkpoint | Hardware failure mid-pipeline | Low (15m) | Minimal - restores exact state |
| restart | Logical error in last stage | Medium (full stage runtime) | Medium - re-runs from stage |
| revert | Bad data landed in warehouse | High (full pipeline) | High - may need manual cleanup |
Recommendation: Prefer checkpoint when available. Use revert only as a last resort, once the scope of the data contamination is understood.
Rollback Dry-Run Preview
Always preview rollback impacts:
orchestrate rollback execute \
--execution-id abc123 \
--to-execution-id xyz789 \
--dry-run \
--verbose
# Typical output:
[DRY-RUN] Rollback plan:
1. Restore Redis keys: 147 keys from execution xyz789
2. Delete Redis keys for abc123: 89 keys
3. Download artifacts (2.3GB):
- s3://.../transformed_orders.parquet (1.8GB)
- s3://.../data_quality_report.json (15MB)
4. Delete S3 artifacts created by abc123: 47 objects
5. Delete K8s pods: ml-abc123-*
6. NOT destroying: persistent volumes, RDS instances
7. Verification steps will run after restore
[DRY-RUN] Estimated time: 12m
[DRY-RUN] Risk level: LOW (no schema changes detected)
Important: Always test rollback procedures in non-production environments quarterly. Document pipeline-specific rollback steps in the pipeline's README.
```