Lineagentic-Flow
Lineagentic-flow is an agentic AI solution for building end-to-end data lineage across diverse types of data processing scripts across different platforms. It is designed to be modular and customizable, and can be extended to support new data processing script types. In a nutshell, this is what it does:
┌─────────────┐ ┌───────────────────────────────┐ ┌────────────---───┐
│ source-code │───▶│ lineagentic-flow-algorithm │───▶│ lineage output │
│ │ │ │ │ │
└─────────────┘ └───────────────────────────────┘ └──────────────---─┘
Features
- Plugin-based design pattern, simple to extend and customize
- Command line interface for quick analysis
- Support for multiple data processing script types (SQL, Python, Airflow, Spark, etc.)
- Simple demo server to run locally and in Hugging Face Spaces
Quick Start
Installation
Install the package from PyPI:
pip install lineagentic-flow
Basic Usage
import asyncio from lf_algorithm.framework_agent import FrameworkAgent import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) async def main(): # Create an agent for SQL lineage extraction agent = FrameworkAgent( agent_name="sql-lineage-agent", model_name="gpt-4o-mini", source_code="SELECT id, name FROM users WHERE active = true" ) # Run the agent to extract lineage result = await agent.run_agent() print(result) # Run the example asyncio.run(main())
Environment Variables
Set your API keys:
export OPENAI_API_KEY="your-openai-api-key" export HF_TOKEN="your-huggingface-token" # Optional
Supported Agents
Following table shows the current development agents in Lineagentic-flow algorithm:
| Agent Name | Done | Under Development | In Backlog | Comment |
|---|---|---|---|---|
| python-lineage_agent | ✓ | |||
| airflow_lineage_agent | ✓ | |||
| java_lineage_agent | ✓ | |||
| spark_lineage_agent | ✓ | |||
| sql_lineage_agent | ✓ | |||
| flink_lineage_agent | ✓ | |||
| beam_lineage_agent | ✓ | |||
| shell_lineage_agent | ✓ | |||
| scala_lineage_agent | ✓ | |||
| dbt_lineage_agent | ✓ |
Components
Algorithm Module
This is the brain of the Lineagentic-flow. It contains agents, which are implemented as plugins and acting as chain of thought process to extract lineage from different types of data processing scripts. The module is built using a plugin-based design pattern, allowing you to easily develop and integrate your own custom agents.
CLI Module
The CLI module provides a command line interface around the algorithm API and connects to a unified service layer.
Demo Module
The demo module is for teams who want to demo Lineagentic-flow in a fast and simple way, deployable into Hugging Face Spaces.
Command Line Interface (CLI)
Lineagentic-flow provides a powerful CLI tool for quick analysis:
# Basic SQL query analysis lineagentic analyze --agent-name sql-lineage-agent --query "SELECT user_id, name FROM users WHERE active = true" --verbose # Analyze with lineage configuration lineagentic analyze --agent-name python-lineage-agent --query-file "my_script.py" --verbose
For more details, see the CLI documentation.
Architecture
The following figure illustrates the architecture behind the Lineagentic-flow, which is essentially a multi-layer architecture of backend and agentic AI algorithm that leverages a chain-of-thought process to construct lineage across various script types.
Mathematical Foundation
Following shows the mathematical foundation behind each layer of the algorithm.
Agent Framework
The agent framework does IO operations, memory management, and prompt engineering according to the script type (T) and its content (C).
P := f(T, C)
Runtime Orchestration Agent
The runtime orchestration agent orchestrates the execution of the required agents provided by the agent framework (P) by selecting the appropriate agent (A) and its corresponding task (T).
G = h([(A₁, T₁), (A₂, T₂), (A₃, T₃), (A₄, T₄)], P)
Syntax Analysis Agent
The Syntax Analysis agent analyzes the syntactic structure of the raw script to identify subqueries and nested structures and decompose the script into multiple subscripts.
sa₁, ⋯, saₙ := h([A₁, T₁], P)
Field Derivation Agent
The Field Derivation agent processes each subscript from the syntax analysis agent to derive field-level mapping relationships and processing logic.
fd₁, ⋯, fdₙ := h([A₂, T₂], sa₁, ⋯, saₙ)
Operation Tracing Agent
The Operation Tracing agent analyzes the complex conditions within each subscript identified in the syntax analysis agent, including filter conditions, join conditions, grouping conditions, and sorting conditions.
ot₁, ⋯, otₙ := h([A₃, T₃], sa₁, ⋯, saₙ)
Event Composer Agent
The Event Composer agent consolidates the results from the syntax analysis agent, the field derivation agent, and the operation tracing agent to generate the final lineage result.
A := h([A₄, T₄], sa₁, ⋯, saₙ, fd₁, ⋯, fdₙ, ot₁, ⋯, otₙ)
Activation and Deployment
To simplify the usage of Lineagentic-flow, a Makefile has been created to manage various activation and deployment tasks. You can explore the available targets directly within the Makefile.
-
To start demo server:
make start-demo-server -
To run all tests:
make test -
To build package and publish to PyPI:
make build-package make publish-pypi -
To clean all stack:
make clean-all-stack -
To deploy Lineagentic-flow to Hugging Face Spaces:
make gradio-deploy(You need to have a Hugging Face account and put secret keys there if you are going to use paid models)
Getting Started
Visit the GitHub repository to start using Lineagentic-flow.