Two approaches: add OpenLineage methods directly to operators you own (recommended), or create custom extractors for third-party operators you cannot modify
Extractors intercept operator execution at three points: before execution for static lineage, after success for runtime-determined outputs, and optionally after failure for partial lineage
Confirm successful installation by checking the skill directory location:
.cursor/skills/creating-openlineage-extractors
Restart Cursor to activate creating-openlineage-extractors. Access via /creating-openlineage-extractors in your agent's command palette.
β
Security Notice
We perform automated surface-level scans (Gen AI Scanner, Socket, Snyk) during installation. These checks detect common vulnerabilities but do not guarantee complete security. Always review skill source code and verify the publisher's reputation before production use.
Skills execute code in your environment. Always review source, verify the publisher, and test in isolation before production.
Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.
On Astro
Astro includes built-in OpenLineage integration β no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's Lineage tab. Custom extractors deployed to an Astro project are automatically picked up, so you only need to register them in airflow.cfg or via environment variable and deploy.
Two Approaches
1. OpenLineage Methods (Recommended)
Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.
2. Custom Extractors
Use when you need lineage from third-party or provider operators that you cannot modify.
Approach 1: OpenLineage Methods (Recommended)
When you own the operator, add OpenLineage methods directly:
from airflow.models import BaseOperator
classMyCustomOperator(BaseOperator):"""Custom operator with built-in OpenLineage support."""def__init__(self, source_table:str, target_table:str,**kwargs):super().__init__(**kwargs) self.source_table = source_table
self.target_table = target_table
self._rows_processed =0# Set during executiondefexecute(self, context):# Do the actual work self._rows_processed = self._process_data()return self._rows_processed
defget_openlineage_facets_on_start(self):"""Called when task starts. Return known inputs/outputs."""# Import locally to avoid circular importsfrom openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage( inputs=[Dataset(namespace="postgres://db", name=self.source_table)], outputs=[Dataset(namespace="postgres://db", name=self.target_table)],)defget_openlineage_facets_on_complete(self, task_instance):"""Called after success. Add runtime metadata."""from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage( inputs=[Dataset(namespace="postgres://db", name=self.source_table)], outputs=[ Dataset( namespace="postgres://db", name=self.target_table, facets={"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet( rowCount=self._rows_processed
)},)],)defget_openlineage_facets_on_failure(self, task_instance):"""Called after failure. Optional - for partial lineage."""returnNone
OpenLineage Methods Reference
Method
When Called
Required
get_openlineage_facets_on_start()
Task enters RUNNING
No
get_openlineage_facets_on_complete(ti)
Task succeeds
No
get_openlineage_facets_on_failure(ti)
Task fails
No
Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.
Approach 2: Custom Extractors
Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).
Basic Structure
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
classMyOperatorExtractor(BaseExtractor):"""Extract lineage from MyCustomOperator."""@classmethoddefget_operator_classnames(cls)->list[str]:"""Return operator class names this extractor handles."""return["MyCustomOperator"]def_execute_extraction(self)-> OperatorLineage |None:"""Called BEFORE operator executes. Use for known inputs/outputs."""# Access operator properties via self.operator source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage( inputs=[ Dataset( namespace="postgres://mydb:5432", name=f"public.{source_table}",)], outputs=[ Dataset( namespace="postgres://mydb:5432", name=f"public.{target_table}",)],)defextract_on_complete(self, task_instance)-> OperatorLineage |None:"""Called AFTER operator executes. Use for runtime-determined lineage."""# Access properties set during execution# Useful for operators that determine outputs at runtimereturnNone
OperatorLineage Structure
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
lineage = OperatorLineage( inputs=[Dataset(namespace="...", name="...")],# Input datasets outputs=[Dataset(namespace="...", name="...")],# Output datasets run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},# Run metadata job_facets={},# Job metadata)
βΊAccess to product documentation and roadmap tools (Jira, Notion, etc.)
βΊUnderstanding of product management frameworks (RICE, Jobs-to-be-Done, etc.)
βΊStakeholder contact information and communication channels
Time Estimate
30-60 minutes to see productivity improvements
Steps
1Install product management skill
2Start with user story generation for known feature
3Progress to competitive analysis: research 2-3 competitors
4Use for roadmap prioritization: apply RICE/ICE scoring
5Draft stakeholder communications and refine based on feedback
6Build template library for recurring PM tasks
7Share effective prompts with product team
Common Pitfalls
β Not validating competitive researchβverify facts before sharing
β Accepting user stories without involving engineering team
β Over-relying on frameworks without qualitative judgment
β Not customizing outputs to company culture and communication style
β Skipping stakeholder validation of generated requirements
Best Practices
β Do
+Validate research and competitive analysis with real data
+Collaborate with engineering when generating technical requirements
+Customize frameworks and templates to your company context
+Use skill for first drafts, refine with stakeholder input
+Document successful prompt patterns for PM tasks
+Combine AI efficiency with human judgment and intuition
β Don't
βDon't publish competitive analysis without fact-checking
βDon't finalize user stories without engineering review
βDon't make prioritization decisions solely on AI scoring
βDon't skip customer validation of generated requirements
βDon't ignore company-specific context and culture
π‘ Pro Tips
β Provide context: company goals, constraints, customer feedback
β Ask for alternatives: 'Show 3 ways to prioritize this roadmap'
β Request stakeholder-specific formatting: 'Executive summary vs. engineering spec'
β Use skill for 70% generation + 30% customization to company needs
When to Use This
β Use when
Use for user story writing, competitive research, roadmap prioritization, stakeholder communication, and PRD drafting. Best for reducing repetitive documentation and research work.
β Avoid when
Avoid for strategic product vision (requires deep customer empathy), pricing decisions (needs market and financial expertise), or when face-to-face customer discovery is more valuable than speed.
Learning Path
1Basic: user stories, feature specs, status updates