creating-openlineage-extractors
astronomer/agents · updated Apr 8, 2026
Custom OpenLineage extractors for unsupported Airflow operators and complex lineage scenarios.
- Two approaches: add OpenLineage methods directly to operators you own (recommended), or create custom extractors for third-party operators you cannot modify
- Extractors intercept operator execution at three points: before execution for static lineage, after success for runtime-determined outputs, and optionally after failure for partial lineage
- Register extractors via airflow.cfg or environment variable
Creating OpenLineage Extractors
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.
When to Use Each Approach
| Scenario | Approach |
|---|---|
| Operator you own/maintain | OpenLineage Methods (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |
Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.
On Astro
Astro includes built-in OpenLineage integration — no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.
Two Approaches
1. OpenLineage Methods (Recommended)
Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.
2. Custom Extractors
Use when you need lineage from third-party or provider operators that you cannot modify.
Approach 1: OpenLineage Methods (Recommended)
When you own the operator, add OpenLineage methods directly:
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None
OpenLineage Methods Reference
| Method | When Called | Required |
|---|---|---|
| `get_openlineage_facets_on_start()` | Task enters RUNNING | No |
| `get_openlineage_facets_on_complete(ti)` | Task succeeds | No |
| `get_openlineage_facets_on_failure(ti)` | Task fails | No |
Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.
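As a rough illustration of "implement only the methods you need", the sketch below uses plain Python stand-ins (no Airflow import) to show how a caller could probe an operator for the optional methods and signal fall-through when one is missing. `PartialOperator` and `collect_lineage` are hypothetical names invented for this example, and the dictionaries stand in for real lineage objects; the provider's actual lookup logic may differ.

```python
from typing import Any, Optional


class PartialOperator:
    """Hypothetical operator that implements only the on-complete method."""

    def get_openlineage_facets_on_complete(self, task_instance: Any) -> dict:
        # Plain dict used as a stand-in for a real lineage object
        return {"inputs": ["public.orders"], "outputs": ["public.orders_daily"]}


def collect_lineage(operator: Any, event: str, task_instance: Any = None) -> Optional[dict]:
    """Return lineage from the method matching `event`, or None to signal fall-through.

    Assumption: `event` is one of "start", "complete", or "failure".
    """
    method = getattr(operator, f"get_openlineage_facets_on_{event}", None)
    if method is None:
        # A caller would now fall through to hook-level lineage or inlets/outlets
        return None
    # The on-start method takes no arguments; the other two receive the task instance
    return method() if event == "start" else method(task_instance)


op = PartialOperator()
start_lineage = collect_lineage(op, "start")
complete_lineage = collect_lineage(op, "complete", task_instance=object())
```

Here `start_lineage` is `None` because the operator does not define the on-start method, while `complete_lineage` carries the datasets returned by the one method it does implement.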
Approach 2: Custom Extractors
Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).
Basic Structure
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None
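The extractor above repeats the same namespace and schema prefix for every dataset. One way to keep those strings consistent across inputs and outputs is a small helper, sketched here with the standard library only. `DatasetRef` and `postgres_dataset` are hypothetical names for this illustration (not part of OpenLineage or Airflow), and the `postgres://host:port` and `schema.table` shapes are simply copied from the example above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetRef:
    """Hypothetical stand-in holding the two strings a dataset is identified by."""

    namespace: str
    name: str


def postgres_dataset(host: str, port: int, schema: str, table: str) -> DatasetRef:
    """Build namespace and name in the same shape as the extractor example."""
    return DatasetRef(
        namespace=f"postgres://{host}:{port}",
        name=f"{schema}.{table}",
    )


source = postgres_dataset("mydb", 5432, "public", "orders")
target = postgres_dataset("mydb", 5432, "public", "orders_daily")
```

The resulting `namespace` and `name` values could then be passed to `Dataset(namespace=..., name=...)` inside `_execute_extraction`, so that both sides of the lineage edge are spelled the same way.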
OperatorLineage Structure
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],  # Input datasets
    outputs=[Dataset(namespace="...", name="...")],  # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},  # Job metadata
)
Extraction Methods
| Method | When Called | Use For |
|---|---|---|
| `_execute_extraction()` | Before operator runs | Static/known lineage |
| `extract_on_complete(task_instance)` | After success | Runtime-determined lineage |
| `extract_on_failure(task_instance)` | After failure | Partial lineage on errors |
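The timing in the table above can be simulated without Airflow. The following is a minimal sketch, not the provider's implementation: `SketchExtractor`, `FakeOperator`, and `run_with_lineage` are hypothetical stand-ins, and plain dictionaries replace `OperatorLineage`. It only shows which method would be consulted at which point in a task's life.

```python
from typing import Any, List, Optional, Tuple


class FakeOperator:
    """Hypothetical operator; rows_processed is only known after execute()."""

    def __init__(self, fail: bool = False) -> None:
        self.source_table = "orders"
        self.target_table = "orders_daily"
        self.rows_processed = 0
        self.fail = fail

    def execute(self) -> None:
        if self.fail:
            raise RuntimeError("simulated task failure")
        self.rows_processed = 42


class SketchExtractor:
    """Stand-in using the three method names from the table (not the Airflow base class)."""

    def __init__(self, operator: FakeOperator) -> None:
        self.operator = operator

    def _execute_extraction(self) -> Optional[dict]:
        # Static lineage: known from operator arguments before it runs
        return {"inputs": [self.operator.source_table],
                "outputs": [self.operator.target_table]}

    def extract_on_complete(self, task_instance: Any) -> Optional[dict]:
        # Runtime lineage: add a value that only exists after execution
        lineage = self._execute_extraction()
        lineage["row_count"] = self.operator.rows_processed
        return lineage

    def extract_on_failure(self, task_instance: Any) -> Optional[dict]:
        # Partial lineage: inputs were read, outputs may not have been written
        return {"inputs": [self.operator.source_table], "outputs": []}


def run_with_lineage(operator: FakeOperator) -> List[Tuple[str, Optional[dict]]]:
    """Run the operator and record which extraction method fired at each stage."""
    extractor = SketchExtractor(operator)
    events = [("start", extractor._execute_extraction())]
    try:
        operator.execute()
    except RuntimeError:
        events.append(("failure", extractor.extract_on_failure(None)))
    else:
        events.append(("complete", extractor.extract_on_complete(None)))
    return events


success_events = run_with_lineage(FakeOperator())
failure_events = run_with_lineage(FakeOperator(fail=True))
```

In the success case the second event includes the row count set during `execute()`; in the failure case it reports the inputs only, which is the kind of partial lineage the last table row describes.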
Registering Extractors
Option 1: Configuration file (airflow.cfg)
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
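The value is a semicolon-separated list of fully qualified class paths, and the same string can be supplied through the `AIRFLOW__OPENLINEAGE__EXTRACTORS` environment variable. As a small sketch, the helper below assembles that string from a list and does a basic sanity check. `build_extractors_value` is a hypothetical name for this example, and setting `os.environ` here only affects the current Python process, so a real deployment would set the variable in its own environment configuration.

```python
import os
from typing import List


def build_extractors_value(class_paths: List[str]) -> str:
    """Join fully qualified extractor class paths with semicolons."""
    cleaned = [path.strip() for path in class_paths if path.strip()]
    for path in cleaned:
        # A bare class name without a module prefix cannot be imported by path
        if "." not in path:
            raise ValueError(f"expected a dotted module path, got {path!r}")
    return ";".join(cleaned)


value = build_extractors_value(
    [
        "mypackage.extractors.MyOperatorExtractor",
        "mypackage.extractors.AnotherExtractor",
    ]
)

# Applies to this process only; shown to illustrate the variable name
os.environ["AIRFLOW__OPENLINEAGE__EXTRACTORS"] = value
```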
Option 2: Environment variable
AIRFLOW__OPENLINEAGE__EXTRACTORS
How to use creating-openlineage-extractors on Cursor
AI-first code editor with Composer
1. Prerequisites
Before installing skills in Cursor, ensure your development environment meets these requirements:
- Cursor installed and configured on your development machine
- Node.js version 16.0+ with npm package manager (verify with node --version)
- Active project directory or workspace where you want to add creating-openlineage-extractors
2. Execute installation command
Execute the skills CLI command in your project's root directory to begin installation:
$ npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors
The skills CLI fetches creating-openlineage-extractors from GitHub repository astronomer/agents and configures it for Cursor.
3. Select Cursor when prompted
The CLI will show a list of available agents. Use arrow keys to navigate and space to select Cursor:
◆ Which agents do you want to install to?
│
│ ── Universal (.agents/skills) ── always included ────
│ • Amp
│ • Antigravity
│ • Cline
│ • Codex
│ ● Cursor (selected)
│ • Cursor
│ • Windsurf
4. Verify installation
Confirm successful installation by checking the skill directory location:
.cursor/skills/creating-openlineage-extractors
Reload or restart Cursor to activate creating-openlineage-extractors. Access the skill through slash commands (e.g., /creating-openlineage-extractors) or your agent's skill management interface.
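The directory check in the verification step can also be scripted. This is a minimal sketch using only the standard library; `skill_installed` is a hypothetical helper, and the demonstration runs against a throwaway temporary directory rather than a real project.

```python
import tempfile
from pathlib import Path

SKILL_NAME = "creating-openlineage-extractors"


def skill_installed(project_root: Path) -> bool:
    """Return True if the skill directory exists under .cursor/skills."""
    return (project_root / ".cursor" / "skills" / SKILL_NAME).is_dir()


# Demonstration: the check flips once the expected directory is present
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    before = skill_installed(root)
    (root / ".cursor" / "skills" / SKILL_NAME).mkdir(parents=True)
    after = skill_installed(root)
```

In a real project you would call `skill_installed(Path.cwd())` from the project root after running the installation command.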
⚠Security & Verification Notice
We perform automated surface-level scans (Gen AI Scanner, Socket, Snyk) during installation. These checks detect common vulnerabilities but do not guarantee complete security. Always review skill source code and verify the publisher's reputation before production use.
Skills execute code in your development environment. Always verify the publisher's identity, review recent commits, and test in isolated environments before production deployment.