creating-openlineage-extractors

astronomer/agents · updated Apr 8, 2026

MDX-style export adds YAML metadata + attribution linking explainx.ai and this canonical listing URL.

$npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors
0 commentsdiscussion
summary

Custom OpenLineage extractors for unsupported Airflow operators and complex lineage scenarios.

  • Two approaches: add OpenLineage methods directly to operators you own (recommended), or create custom extractors for third-party operators you cannot modify
  • Extractors intercept operator execution at three points: before execution for static lineage, after success for runtime-determined outputs, and optionally after failure for partial lineage
  • Register extractors via airflow.cfg or environm
skill.md

Creating OpenLineage Extractors

This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.

Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.

When to Use Each Approach

Scenario Approach
Operator you own/maintain OpenLineage Methods (recommended, simplest)
Third-party operator you can't modify Custom Extractor
Need column-level lineage OpenLineage Methods or Custom Extractor
Complex extraction logic OpenLineage Methods or Custom Extractor
Simple table-level lineage Inlets/Outlets (simplest, but lowest priority)

Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.

On Astro

Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.


Two Approaches

1. OpenLineage Methods (Recommended)

Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.

2. Custom Extractors

Use when you need lineage from third-party or provider operators that you cannot modify.


Approach 1: OpenLineage Methods (Recommended)

When you own the operator, add OpenLineage methods directly:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

OpenLineage Methods Reference

Method When Called Required
get_openlineage_facets_on_start() Task enters RUNNING No
get_openlineage_facets_on_complete(ti) Task succeeds No
get_openlineage_facets_on_failure(ti) Task fails No

Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.


Approach 2: Custom Extractors

Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).

Basic Structure

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

OperatorLineage Structure

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

Extraction Methods

Method When Called Use For
_execute_extraction() Before operator runs Static/known lineage
extract_on_complete(task_instance) After success Runtime-determined lineage
extract_on_failure(task_instance) After failure Partial lineage on errors

Registering Extractors

Option 1: Configuration file (airflow.cfg)

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Option 2: Environment variable

AIRFLOW__OPENLINEAGE__EXTRACTORS
how to use creating-openlineage-extractors

How to use creating-openlineage-extractors on Cursor

AI-first code editor with Composer

1

Prerequisites

Before installing skills in Cursor, ensure your development environment meets these requirements:

  • Cursor installed and configured on your development machine
  • Node.js version 16.0+ with npm package manager (verify with node --version)
  • Active project directory or workspace where you want to add creating-openlineage-extractors
2

Execute installation command

Execute the skills CLI command in your project's root directory to begin installation:

$npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors

The skills CLI fetches creating-openlineage-extractors from GitHub repository astronomer/agents and configures it for Cursor.

3

Select Cursor when prompted

The CLI will show a list of available agents. Use arrow keys to navigate and space to select Cursor:

◆ Which agents do you want to install to?
│ ── Universal (.agents/skills) ── always included ────
│ • Amp
│ • Antigravity
│ • Cline
│ • Codex
│ ●Cursor(selected)
│ • Cursor
│ • Windsurf
4

Verify installation

Confirm successful installation by checking the skill directory location:

.cursor/skills/creating-openlineage-extractors

Reload or restart Cursor to activate creating-openlineage-extractors. Access the skill through slash commands (e.g., /creating-openlineage-extractors) or your agent's skill management interface.

Security & Verification Notice

We perform automated surface-level scans (Gen AI Scanner, Socket, Snyk) during installation. These checks detect common vulnerabilities but do not guarantee complete security. Always review skill source code and verify the publisher's reputation before production use.

Skills execute code in your development environment. Always verify the publisher's identity, review recent commits, and test in isolated environments before production deployment.

List & Monetize Your Skill

Submit your Claude Code skill and start earning

GET_STARTED →

Use Cases

User Story & Requirements Generation

Create detailed user stories, acceptance criteria, and feature specs

Example

Generate user stories for 'password reset feature' with acceptance criteria, edge cases, and test scenarios

Reduce spec writing time by 50%, ensure comprehensive coverage

Competitive Analysis

Research competitors, compare features, identify gaps

Example

Analyze 5 competitor products, create feature comparison matrix, suggest differentiation opportunities

Complete competitive research in 2 hours instead of 2 days

Roadmap Prioritization

Evaluate features using frameworks (RICE, ICE, Kano) and create prioritized backlogs

Example

Score 20 feature ideas using RICE framework, generate prioritized roadmap with rationale

Make data-driven prioritization decisions faster

Stakeholder Communication

Draft PRDs, status updates, and stakeholder presentations

Example

Create executive summary of Q3 roadmap, monthly progress report, feature launch announcement

Save 3-5 hours/week on communication overhead

Implementation Guide

Prerequisites

  • Claude Desktop or compatible AI client
  • Access to product documentation and roadmap tools (Jira, Notion, etc.)
  • Understanding of product management frameworks (RICE, Jobs-to-be-Done, etc.)
  • Stakeholder contact information and communication channels

Time Estimate

30-60 minutes to see productivity improvements

Installation Steps

  1. 1.Install product management skill
  2. 2.Start with user story generation for known feature
  3. 3.Progress to competitive analysis: research 2-3 competitors
  4. 4.Use for roadmap prioritization: apply RICE/ICE scoring
  5. 5.Draft stakeholder communications and refine based on feedback
  6. 6.Build template library for recurring PM tasks
  7. 7.Share effective prompts with product team

Common Pitfalls

  • Not validating competitive research—verify facts before sharing
  • Accepting user stories without involving engineering team
  • Over-relying on frameworks without qualitative judgment
  • Not customizing outputs to company culture and communication style
  • Skipping stakeholder validation of generated requirements

Best Practices

✓ Do

  • +Validate research and competitive analysis with real data
  • +Collaborate with engineering when generating technical requirements
  • +Customize frameworks and templates to your company context
  • +Use skill for first drafts, refine with stakeholder input
  • +Document successful prompt patterns for PM tasks
  • +Combine AI efficiency with human judgment and intuition

✗ Don't

  • Don't publish competitive analysis without fact-checking
  • Don't finalize user stories without engineering review
  • Don't make prioritization decisions solely on AI scoring
  • Don't skip customer validation of generated requirements
  • Don't ignore company-specific context and culture

💡 Pro Tips

  • Provide context: company goals, constraints, customer feedback
  • Ask for alternatives: 'Show 3 ways to prioritize this roadmap'
  • Request stakeholder-specific formatting: 'Executive summary vs. engineering spec'
  • Use skill for 70% generation + 30% customization to company needs

When to Use This

✓ Use When

Use for user story writing, competitive research, roadmap prioritization, stakeholder communication, and PRD drafting. Best for reducing repetitive documentation and research work.

✗ Avoid When

Avoid for strategic product vision (requires deep customer empathy), pricing decisions (needs market and financial expertise), or when face-to-face customer discovery is more valuable than speed.

Learning Path

  1. 1Basic: user stories, feature specs, status updates
  2. 2Intermediate: competitive analysis, prioritization frameworks, PRDs
  3. 3Advanced: product strategy, go-to-market planning, OKR setting
  4. 4Expert: product vision, market positioning, business model innovation

Discussion

Product Hunt–style comments (not star reviews)
  • No comments yet — start the thread.
general reviews

Ratings

4.827 reviews
  • Shikha Mishra· Dec 20, 2024

    creating-openlineage-extractors has been reliable in day-to-day use. Documentation quality is above average for community skills.

  • Harper Zhang· Dec 16, 2024

    creating-openlineage-extractors reduced setup friction for our internal harness; good balance of opinion and flexibility.

  • Mateo Rahman· Dec 16, 2024

    creating-openlineage-extractors is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.

  • Rahul Santra· Nov 11, 2024

    creating-openlineage-extractors reduced setup friction for our internal harness; good balance of opinion and flexibility.

  • Carlos Gill· Nov 7, 2024

    creating-openlineage-extractors has been reliable in day-to-day use. Documentation quality is above average for community skills.

  • Mateo Gonzalez· Nov 7, 2024

    Keeps context tight: creating-openlineage-extractors is the kind of skill you can hand to a new teammate without a long onboarding doc.

  • Carlos Bansal· Oct 26, 2024

    creating-openlineage-extractors fits our agent workflows well — practical, well scoped, and easy to wire into existing repos.

  • Pratham Ware· Oct 2, 2024

    We added creating-openlineage-extractors from the explainx registry; install was straightforward and the SKILL.md answered most questions upfront.

  • Alexander Tandon· Sep 9, 2024

    Registry listing for creating-openlineage-extractors matched our evaluation — installs cleanly and behaves as described in the markdown.

  • Neel Robinson· Sep 5, 2024

    creating-openlineage-extractors reduced setup friction for our internal harness; good balance of opinion and flexibility.

showing 1-10 of 27

1 / 3