Lineagentic-Flow

Agentic AI solution for building end-to-end data lineage across diverse types of data processing pipelines.

AI-AgentsPythonSparkSQLAirflowNeo4jJava

Lineagentic-Flow

Lineagentic-flow is an agentic AI solution for building end-to-end data lineage across diverse types of data processing scripts across different platforms. It is designed to be modular and customizable, and can be extended to support new data processing script types. In a nutshell, this is what it does:

┌─────────────┐    ┌───────────────────────────────┐    ┌────────────---───┐
│ source-code │───▶│   lineagentic-flow-algorithm  │───▶│  lineage output  │
│             │    │                               │    │                  │
└─────────────┘    └───────────────────────────────┘    └──────────────---─┘

Features

  • Plugin-based design pattern, simple to extend and customize
  • Command line interface for quick analysis
  • Support for multiple data processing script types (SQL, Python, Airflow, Spark, etc.)
  • Simple demo server to run locally and in Hugging Face Spaces

Quick Start

Installation

Install the package from PyPI:

pip install lineagentic-flow

Basic Usage

import asyncio
from lf_algorithm.framework_agent import FrameworkAgent
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

async def main():
    # Create an agent for SQL lineage extraction
    agent = FrameworkAgent(
        agent_name="sql-lineage-agent",
        model_name="gpt-4o-mini",
        source_code="SELECT id, name FROM users WHERE active = true"
    )
    
    # Run the agent to extract lineage
    result = await agent.run_agent()
    print(result)

# Run the example
asyncio.run(main())

Environment Variables

Set your API keys:

export OPENAI_API_KEY="your-openai-api-key"
export HF_TOKEN="your-huggingface-token"  # Optional

Supported Agents

Following table shows the current development agents in Lineagentic-flow algorithm:

Agent NameDoneUnder DevelopmentIn BacklogComment
python-lineage_agent
airflow_lineage_agent
java_lineage_agent
spark_lineage_agent
sql_lineage_agent
flink_lineage_agent
beam_lineage_agent
shell_lineage_agent
scala_lineage_agent
dbt_lineage_agent

Components

Algorithm Module

This is the brain of the Lineagentic-flow. It contains agents, which are implemented as plugins and acting as chain of thought process to extract lineage from different types of data processing scripts. The module is built using a plugin-based design pattern, allowing you to easily develop and integrate your own custom agents.

CLI Module

The CLI module provides a command line interface around the algorithm API and connects to a unified service layer.

Demo Module

The demo module is for teams who want to demo Lineagentic-flow in a fast and simple way, deployable into Hugging Face Spaces.

Command Line Interface (CLI)

Lineagentic-flow provides a powerful CLI tool for quick analysis:

# Basic SQL query analysis
lineagentic analyze --agent-name sql-lineage-agent --query "SELECT user_id, name FROM users WHERE active = true" --verbose

# Analyze with lineage configuration
lineagentic analyze --agent-name python-lineage-agent --query-file "my_script.py" --verbose

For more details, see the CLI documentation.

Architecture

The following figure illustrates the architecture behind the Lineagentic-flow, which is essentially a multi-layer architecture of backend and agentic AI algorithm that leverages a chain-of-thought process to construct lineage across various script types.

Mathematical Foundation

Following shows the mathematical foundation behind each layer of the algorithm.

Agent Framework

The agent framework does IO operations, memory management, and prompt engineering according to the script type (T) and its content (C).

P := f(T, C)

Runtime Orchestration Agent

The runtime orchestration agent orchestrates the execution of the required agents provided by the agent framework (P) by selecting the appropriate agent (A) and its corresponding task (T).

G = h([(A₁, T₁), (A₂, T₂), (A₃, T₃), (A₄, T₄)], P)

Syntax Analysis Agent

The Syntax Analysis agent analyzes the syntactic structure of the raw script to identify subqueries and nested structures and decompose the script into multiple subscripts.

sa₁, ⋯, saₙ := h([A₁, T₁], P)

Field Derivation Agent

The Field Derivation agent processes each subscript from the syntax analysis agent to derive field-level mapping relationships and processing logic.

fd₁, ⋯, fdₙ := h([A₂, T₂], sa₁, ⋯, saₙ)

Operation Tracing Agent

The Operation Tracing agent analyzes the complex conditions within each subscript identified in the syntax analysis agent, including filter conditions, join conditions, grouping conditions, and sorting conditions.

ot₁, ⋯, otₙ := h([A₃, T₃], sa₁, ⋯, saₙ)

Event Composer Agent

The Event Composer agent consolidates the results from the syntax analysis agent, the field derivation agent, and the operation tracing agent to generate the final lineage result.

A := h([A₄, T₄], sa₁, ⋯, saₙ, fd₁, ⋯, fdₙ, ot₁, ⋯, otₙ)

Activation and Deployment

To simplify the usage of Lineagentic-flow, a Makefile has been created to manage various activation and deployment tasks. You can explore the available targets directly within the Makefile.

  1. To start demo server:

    make start-demo-server
    
  2. To run all tests:

    make test
    
  3. To build package and publish to PyPI:

    make build-package
    make publish-pypi
    
  4. To clean all stack:

    make clean-all-stack
    
  5. To deploy Lineagentic-flow to Hugging Face Spaces:

    make gradio-deploy
    

    (You need to have a Hugging Face account and put secret keys there if you are going to use paid models)

Getting Started

Visit the GitHub repository to start using Lineagentic-flow.