Productivity

data-pipeline

claude-office-skills/skills · updated Apr 8, 2026

$ npx skills add https://github.com/claude-office-skills/skills --skill data-pipeline
summary

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

skill.md

Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

Overview

This skill covers:

  • Data extraction from multiple sources
  • Transformation and cleaning
  • Loading to destinations
  • Scheduling and monitoring
  • Error handling and alerts

ETL Patterns

Basic ETL Flow

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
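The same three stages can be wired together in plain code. Below is a minimal sketch in JavaScript, assuming a generic REST source; the endpoint URL and the `loadRows` helper are placeholders, not part of any specific connector.

```javascript
// Minimal ETL sketch: extract yesterday's orders, reshape them, hand them to a loader.
// The endpoint URL and loadRows() are placeholders for whatever source/destination you use.
async function runDailyEtl() {
  // EXTRACT: pull records created in the last 24 hours
  const since = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
  const res = await fetch(`https://api.example.com/orders?created_after=${since}`);
  const orders = await res.json();

  // TRANSFORM: keep only the fields the report needs
  const rows = orders.map(o => ({
    date: o.created_at.split('T')[0],
    order_id: o.id,
    total: parseFloat(o.total_price) || 0,
  }));

  // LOAD: placeholder loader for a database, warehouse, or file destination
  await loadRows('sales_daily', rows);
}
```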

n8n ETL Workflow

workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
    
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
    
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
    
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));
      
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
    
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"

Data Sources

Common Extractors

extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
        
    - mysql:
        connection: connection_string
        query: custom_sql
        
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
        
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
        
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
        
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
        
    - excel:
        source: file_path
        sheet: "Sheet1"
        
    - json:
        source: api/file
        path: "data.items"
        
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
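As a sketch of what `pagination: handle_automatically` implies for the REST extractor above: loop until the API stops returning a next cursor. The `data` and `next_cursor` field names are assumptions for illustration, not a guaranteed API shape.

```javascript
// Extract all pages from a cursor-paginated REST API (field names are illustrative).
async function extractAll(baseUrl, token) {
  const items = [];
  let cursor = null;
  do {
    const url = cursor ? `${baseUrl}?cursor=${encodeURIComponent(cursor)}` : baseUrl;
    const res = await fetch(url, { headers: { Authorization: `Bearer ${token}` } });
    if (!res.ok) throw new Error(`Extract failed: ${res.status}`);
    const page = await res.json();
    items.push(...page.data);          // assumes results live under `data`
    cursor = page.next_cursor || null; // assumes a `next_cursor` field when more pages exist
  } while (cursor);
  return items;
}
```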

Transformations

Common Transformations

transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
    
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
    
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
    
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
    
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage

Code Transform Examples

// Clean and normalize data
// statusMap: example lookup from status codes to names (values match allowed_values in the quality checks)
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    
    // Parse dates
    date: new Date(item.created_at).toISOString().split('T')[0],
    
    // Convert types
    amount: parseFloat(item.amount) || 0,
    
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    
    // Calculate fields
    total: item.quantity * item.unit_price,
    
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });
  
  return Object.values(grouped);
}

Data Destinations

Common Loaders

loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
        
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
        
    - redshift:
        cluster: cluster_id
        database: analytics
        
  databases:
    - postgresql:
        upsert: on_conflict_update
        
    - mysql:
        batch_insert: 1000_rows
        
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
        
    - google_cloud_storage:
        bucket: data-bucket
        
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
        
    - airtable:
        base: base_id
        table: table_name
        
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
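Settings like `batch_size: 100` and `batch_insert: 1000_rows` come down to chunking rows before each write. A minimal sketch, assuming a webhook-style destination that accepts a JSON array (the URL is a placeholder):

```javascript
// Load rows to a destination in fixed-size batches; stop and surface the error on the first failed batch.
async function loadInBatches(rows, destinationUrl, batchSize = 100) {
  for (let i = 0; i < rows.length; i += batchSize) {
    const batch = rows.slice(i, i + batchSize);
    const res = await fetch(destinationUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(batch),
    });
    if (!res.ok) throw new Error(`Load failed on batch ${i / batchSize}: ${res.status}`);
  }
}
```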

Scheduling & Monitoring

Pipeline Scheduling

scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
      
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
      
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
      
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
      
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
    
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
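The retry policy above (3 attempts, exponential backoff, alert only on final failure) can be expressed as a small wrapper around any pipeline stage; `sendAlert` below is a stand-in for whatever alerting channel you use.

```javascript
// Retry a pipeline stage with exponential backoff; alert only after the last attempt fails.
async function withRetries(stage, maxAttempts = 3, baseDelayMs = 1000) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await stage();
    } catch (err) {
      if (attempt === maxAttempts) {
        await sendAlert(`Pipeline failed after ${maxAttempts} attempts: ${err.message}`); // placeholder alerter
        throw err;
      }
      const delay = baseDelayMs * 2 ** (attempt - 1); // 1s, 2s, 4s, ...
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
```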

Monitoring & Alerts

monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
    
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        
        [View Logs]({logs_url})
        
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
        
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
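The `stale_data` alert reduces to comparing the last load timestamp against a freshness threshold. A sketch, assuming you can query that timestamp and post to a Slack incoming webhook (`getLastUpdateTime` and the webhook URL are placeholders):

```javascript
// Alert if the table has not been updated within the freshness threshold.
async function checkFreshness(table, thresholdHours = 2) {
  const lastUpdate = await getLastUpdateTime(table); // placeholder: returns a Date of the last successful load
  const ageHours = (Date.now() - lastUpdate.getTime()) / (1000 * 60 * 60);
  if (ageHours > thresholdHours) {
    await fetch('https://hooks.slack.com/services/XXX', { // placeholder webhook URL
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ text: `⚠️ ${table} is stale: last update ${ageHours.toFixed(1)}h ago` }),
    });
  }
}
```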

Data Quality

Quality Checks

data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
        
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
        
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
    
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
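Most of these checks map to simple assertions over the extracted rows. A minimal sketch, using the required fields, value range, and thresholds from the config above:

```javascript
// Run basic data-quality checks before loading; returns a list of violations (empty = pass).
function runQualityChecks(rows) {
  const violations = [];

  // Schema validation: required fields and value range
  for (const row of rows) {
    if (row.id == null || row.date == null || row.amount == null) {
      violations.push(`Missing required field in row ${JSON.stringify(row)}`);
    }
    if (typeof row.amount === 'number' && (row.amount < 0 || row.amount > 1_000_000)) {
      violations.push(`amount out of range: ${row.amount}`);
    }
  }

  // Statistical check: null rate < 5%
  const nullCount = rows.filter(r => r.amount == null).length;
  if (rows.length > 0 && nullCount / rows.length >= 0.05) {
    violations.push(`Null rate too high: ${(100 * nullCount / rows.length).toFixed(1)}%`);
  }

  // Statistical check: duplicate rate < 1% (by id)
  const dupCount = rows.length - new Set(rows.map(r => r.id)).size;
  if (rows.length > 0 && dupCount / rows.length >= 0.01) {
    violations.push(`Duplicate rate too high: ${dupCount} duplicate ids`);
  }

  return violations;
}
```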

Output Example

Request: "Create a daily sales data pipeline"

Output:

# Daily Sales Data Pipeline

## Pipeline Overview

Shopify + Stripe → Transform → BigQuery + Sheets


## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform

```javascript
// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}
```

## Load

### BigQuery
- Table: analytics.sales_daily
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- Row count > 0
- No null order_ids
- Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team

---

*Data Pipeline Skill - Part of Claude Office Skills*
general reviews

Ratings

4.8 · 29 reviews
  • Pratham Ware · Dec 28, 2024

    Useful defaults in data-pipeline — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

  • Soo Bhatia · Dec 28, 2024

    Useful defaults in data-pipeline — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

  • Mateo Haddad · Dec 20, 2024

    We added data-pipeline from the explainx registry; install was straightforward and the SKILL.md answered most questions upfront.

  • Sakshi Patil · Nov 19, 2024

    data-pipeline has been reliable in day-to-day use. Documentation quality is above average for community skills.

  • Carlos Jain · Nov 19, 2024

    data-pipeline has been reliable in day-to-day use. Documentation quality is above average for community skills.

  • Benjamin Haddad · Nov 11, 2024

    Keeps context tight: data-pipeline is the kind of skill you can hand to a new teammate without a long onboarding doc.

  • Chaitanya Patil · Oct 10, 2024

    Solid pick for teams standardizing on skills: data-pipeline is focused, and the summary matches what you get after install.

  • Nia Khan · Oct 10, 2024

    Solid pick for teams standardizing on skills: data-pipeline is focused, and the summary matches what you get after install.

  • Benjamin Garcia · Oct 2, 2024

    data-pipeline is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.

  • Olivia Liu · Sep 21, 2024

    Useful defaults in data-pipeline — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

showing 1-10 of 29
