ETL Pipeline
Comprehensive skill for designing and automating Extract, Transform, Load data pipelines.
Pipeline Architecture
Core ETL Flow
DATA PIPELINE ARCHITECTURE:

┌─────────────────────────────────────────────────────────┐
│                         EXTRACT                         │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │ Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │             │
     └─────────┴─────────┴────┬────┴─────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                        TRANSFORM                        │
│  • Clean & Validate       • Aggregate & Join            │
│  • Normalize              • Calculate Metrics           │
│  • Deduplicate            • Apply Business Rules        │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                          LOAD                           │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │   Data Lake   │
└─────────────┴─────────────┴─────────────┴───────────────┘
Source Connectors
Database Connections
sources:
  postgres:
    type: postgresql
    host: db.example.com
    port: 5432
    database: production
    ssl: true
    extraction:
      method: incremental
      key: updated_at
      batch_size: 10000

  mysql:
    type: mysql
    host: mysql.example.com
    port: 3306
    database: analytics
    extraction:
      method: cdc
      binlog: true

  mongodb:
    type: mongodb
    connection_string: mongodb+srv://...
    database: app_data
    extraction:
      method: change_streams
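Driving the incremental method above means tracking a high-water mark per table. A minimal Python sketch, assuming a psycopg2 connection and an in-memory watermark store (a real pipeline would persist the watermark between runs):

# Minimal incremental-extraction sketch keyed on updated_at.
# Assumes `conn` is a psycopg2 connection; the watermark store is
# simplified to an in-memory dict for illustration.
WATERMARKS = {}  # table -> last updated_at seen; persist this in production

def extract_incremental(conn, table, key="updated_at", batch_size=10000):
    """Yield only rows changed since the last recorded watermark."""
    last_seen = WATERMARKS.get(table, "1970-01-01")
    with conn.cursor(name="etl_cursor") as cur:  # server-side cursor streams results
        cur.itersize = batch_size                # fetch in batches of 10,000
        cur.execute(
            f"SELECT * FROM {table} WHERE {key} > %s ORDER BY {key}",
            (last_seen,),
        )
        for row in cur:
            yield row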
API Sources
api_sources:
  stripe:
    type: rest_api
    base_url: https://api.stripe.com/v1
    auth: bearer_token
    endpoints:
      - /charges
      - /customers
      - /subscriptions
    pagination: cursor
    rate_limit: 100/second

  salesforce:
    type: salesforce
    instance_url: https://company.salesforce.com
    auth: oauth2
    objects:
      - Account
      - Opportunity
      - Contact
    bulk_api: true
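Cursor pagination for a source like the Stripe config above can be sketched with requests; the starting_after/has_more fields follow Stripe's list-endpoint conventions, but treat the details as illustrative:

import requests

def fetch_all(endpoint, token, base_url="https://api.stripe.com/v1"):
    """Walk a cursor-paginated list endpoint such as /charges."""
    params = {"limit": 100}
    while True:
        resp = requests.get(
            f"{base_url}{endpoint}",
            headers={"Authorization": f"Bearer {token}"},
            params=params,
            timeout=30,
        )
        resp.raise_for_status()
        page = resp.json()
        yield from page["data"]
        if not page.get("has_more"):
            break
        # Cursor = id of the last object on the current page
        params["starting_after"] = page["data"][-1]["id"]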
Transformation Layer
Common Transformations
# Data Cleaning
transformations = {
    "clean_nulls": {
        "operation": "fill_null",
        "columns": ["email", "phone"],
        "value": "unknown",
    },
    "standardize_dates": {
        "operation": "date_parse",
        "columns": ["created_at", "updated_at"],
        "format": "ISO8601",
    },
    "normalize_currency": {
        "operation": "convert_currency",
        "source_column": "amount",
        "currency_column": "currency",
        "target": "USD",
    },
    "deduplicate": {
        "operation": "distinct",
        "key_columns": ["customer_id", "transaction_id"],
        "keep": "latest",
    },
}
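One way these declarative rules might execute, sketched with pandas 2.x (the rate table is a stand-in; a real pipeline would look up daily exchange rates):

import pandas as pd

RATES_TO_USD = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27}  # illustrative static rates

def apply_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # clean_nulls: fill missing contact fields with a sentinel
    df[["email", "phone"]] = df[["email", "phone"]].fillna("unknown")
    # standardize_dates: parse ISO 8601 timestamps
    for col in ("created_at", "updated_at"):
        df[col] = pd.to_datetime(df[col], format="ISO8601")
    # normalize_currency: convert amounts to USD via the rate table
    df["amount_usd"] = df["amount"] * df["currency"].map(RATES_TO_USD)
    # deduplicate: keep the latest row per (customer_id, transaction_id)
    df = df.sort_values("updated_at").drop_duplicates(
        ["customer_id", "transaction_id"], keep="last"
    )
    return df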
Aggregation Rules
-- Daily Revenue Aggregation
SELECT
    DATE(created_at) AS date,
    product_category,
    COUNT(*) AS transactions,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE created_at >= '${start_date}'
GROUP BY 1, 2
Join Operations
joins:
  - name: enrich_orders
    left: orders
    right: customers
    type: left
    on:
      - left: customer_id
        right: id
    select:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value

  - name: add_product_details
    left: enriched_orders
    right: products
    type: left
    on:
      - left: product_id
        right: id
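For comparison, the enrich_orders join expressed directly in pandas (column names taken from the config above):

import pandas as pd

def enrich_orders(orders: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    # Left join keeps every order, even if the customer lookup misses
    cols = customers[["id", "email", "segment", "lifetime_value"]]
    return orders.merge(cols, how="left", left_on="customer_id", right_on="id")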
Load Strategies
BigQuery Load
bigquery_load:
  project: my-project
  dataset: analytics
  table: fact_orders
  schema:
    - name: order_id
      type: STRING
      mode: REQUIRED
    - name: customer_id
      type: STRING
    - name: amount
      type: NUMERIC
    - name: created_at
      type: TIMESTAMP
  load_config:
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    clustering_fields: [customer_id]
    time_partitioning:
      field: created_at
      type: DAY
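A sketch of issuing this load with the google-cloud-bigquery client; the project, table, and partitioning settings mirror the config, and df stands for an already-transformed DataFrame:

from google.cloud import bigquery

def load_fact_orders(df):
    client = bigquery.Client(project="my-project")
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
        clustering_fields=["customer_id"],
        time_partitioning=bigquery.TimePartitioning(
            field="created_at",
            type_=bigquery.TimePartitioningType.DAY,
        ),
    )
    job = client.load_table_from_dataframe(
        df, "my-project.analytics.fact_orders", job_config=job_config
    )
    job.result()  # block until the load job completes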
Snowflake Load
snowflake_load:
  warehouse: ETL_WH
  database: ANALYTICS
  schema: PUBLIC
  table: FACT_ORDERS
  staging:
    stage: '@MY_STAGE'
    file_format: JSON
  copy_options:
    on_error: CONTINUE
    purge: true
    match_by_column_name: CASE_INSENSITIVE
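The staged copy above corresponds to a COPY INTO statement; a sketch using snowflake-connector-python with placeholder credentials:

import snowflake.connector

conn = snowflake.connector.connect(
    user="...", password="...", account="...",  # placeholder credentials
    warehouse="ETL_WH", database="ANALYTICS", schema="PUBLIC",
)
conn.cursor().execute("""
    COPY INTO FACT_ORDERS
    FROM @MY_STAGE
    FILE_FORMAT = (TYPE = 'JSON')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = CONTINUE
    PURGE = TRUE
""")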
Pipeline Orchestration
DAG Definition
pipeline:
  name: daily_analytics_etl
  schedule: "0 2 * * *"  # 2 AM daily
  tasks:
    - id: extract_orders
      type: extract
      source: postgres
      query: "SELECT * FROM orders WHERE date = '${execution_date}'"

    - id: extract_customers
      type: extract
      source: postgres
      query: "SELECT * FROM customers"

    - id: transform_data
      type: transform
      dependencies: [extract_orders, extract_customers]
      operations:
        - join_customers
        - calculate_metrics
        - apply_business_rules

    - id: load_warehouse
      type: load
      dependencies: [transform_data]
      target: bigquery
      table: fact_orders

    - id: notify_complete
      type: notification
      dependencies: [load_warehouse]
      channel: slack
      message: "Daily ETL completed successfully"
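One possible realization of this DAG in Apache Airflow 2.x; the task callables are stubs, and the notification task is omitted for brevity:

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="daily_analytics_etl",
    schedule="0 2 * * *",  # 2 AM daily, matching the config above
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    # Each callable would invoke the matching extract/transform/load step
    extract_orders = PythonOperator(task_id="extract_orders", python_callable=lambda: None)
    extract_customers = PythonOperator(task_id="extract_customers", python_callable=lambda: None)
    transform_data = PythonOperator(task_id="transform_data", python_callable=lambda: None)
    load_warehouse = PythonOperator(task_id="load_warehouse", python_callable=lambda: None)

    [extract_orders, extract_customers] >> transform_data >> load_warehouse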
Error Handling
error_handling:
  retry:
    max_attempts: 3
    delay_seconds: 300
    exponential_backoff: true
  on_failure:
    - log_error
    - send_alert
    - save_failed_records
  dead_letter:
    enabled: true
    destination: s3://etl-errors/
    retention_days: 30
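The retry policy above, sketched as a generic Python decorator (not tied to any particular orchestrator):

import functools
import time

def with_retries(max_attempts=3, delay_seconds=300, exponential_backoff=True):
    """Retry a task, doubling the delay after each failed attempt."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # retries exhausted; on_failure handlers take over
                    backoff = 2 ** (attempt - 1) if exponential_backoff else 1
                    time.sleep(delay_seconds * backoff)
        return wrapper
    return decorator

@with_retries(max_attempts=3, delay_seconds=300)
def load_warehouse():
    ...  # the actual load step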
Data Quality
Validation Rules
quality_checks:
  - name: null_check
    column: customer_id
    rule: not_null
    severity: error

  - name: range_check
    column: amount
    rule: between
    min: 0
    max: 100000
    severity: warning

  - name: uniqueness
    columns: [order_id]
    rule: unique
    severity: error

  - name: referential_integrity
    column: product_id
    reference_table: products
    reference_column: id
    severity: error

  - name: freshness
    column: updated_at
    rule: max_age_hours
    value: 24
    severity: warning
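A sketch of enforcing a few of these checks on a pandas DataFrame, failing the run only on error-severity violations:

import pandas as pd

def run_quality_checks(df: pd.DataFrame):
    """Return (name, passed, severity) tuples; raise on error-level failures."""
    results = [
        ("null_check", df["customer_id"].notna().all(), "error"),
        ("range_check", df["amount"].between(0, 100000).all(), "warning"),
        ("uniqueness", df["order_id"].is_unique, "error"),
    ]
    blocking = [name for name, passed, sev in results
                if not passed and sev == "error"]
    if blocking:
        raise ValueError(f"Blocking quality checks failed: {blocking}")
    return results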
Quality Metrics Dashboard
DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)

ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │  Count │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │  1,200 │ Warning  │
│ Invalid format  │    850 │ Error    │
│ Out of range    │    300 │ Warning  │
│ Duplicates      │    150 │ Error    │
└─────────────────┴────────┴──────────┘
Monitoring & Alerting
Pipeline Metrics
metrics:
  - name: pipeline_duration
    type: gauge
    labels: [pipeline_name, status]

  - name: records_processed
    type: counter
    labels: [pipeline_name, source, destination]

  - name: error_count
    type: counter
    labels: [pipeline_name, error_type]

  - name: data_freshness
    type: gauge
    labels: [table_name]
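These metrics map naturally onto prometheus_client counters and gauges; a sketch with the exposition/push setup omitted:

from prometheus_client import Counter, Gauge

PIPELINE_DURATION = Gauge(
    "pipeline_duration", "Pipeline run duration in seconds",
    ["pipeline_name", "status"])
RECORDS_PROCESSED = Counter(
    "records_processed", "Records moved through the pipeline",
    ["pipeline_name", "source", "destination"])
ERROR_COUNT = Counter(
    "error_count", "Errors observed, by type",
    ["pipeline_name", "error_type"])
DATA_FRESHNESS = Gauge(
    "data_freshness", "Hours since the table was last refreshed",
    ["table_name"])

# Example: record a batch of loaded rows
RECORDS_PROCESSED.labels("daily_analytics_etl", "postgres", "bigquery").inc(10000)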
Alert Configuration
alerts:
  - name: pipeline_failed
    condition: status == 'failed'
    channels: [pagerduty, slack]

  - name: high_error_rate
    condition: error_rate > 0.05
    channels: [slack]

  - name: slow_pipeline
    condition: duration > 2 * avg_duration
    channels: [slack]

  - name: data_freshness
    condition: freshness_hours > 24
    channels: [email]
Best Practices
- Incremental Loading: Use incremental extraction when possible
- Idempotency: Ensure pipelines can be re-run safely (see the sketch after this list)
- Partitioning: Partition large tables by date
- Monitoring: Track pipeline health metrics
- Documentation: Document all transformations
- Testing: Test with sample data before production
- Version Control: Track pipeline changes in git
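To illustrate the idempotency point, a delete-then-insert load keyed on the run's execution date can be repeated without duplicating rows. A sketch against any DB-API-style connection (table and column names come from the examples above):

def load_partition(conn, rows, execution_date):
    """Replace (not append) the partition for execution_date, so re-runs are safe."""
    cur = conn.cursor()
    # Remove anything a previous run of the same date already wrote...
    cur.execute(
        "DELETE FROM fact_orders WHERE DATE(created_at) = %s",
        (execution_date,),
    )
    # ...then insert the freshly transformed rows
    cur.executemany(
        "INSERT INTO fact_orders (order_id, customer_id, amount, created_at) "
        "VALUES (%s, %s, %s, %s)",
        rows,
    )
    conn.commit()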