How to Build a Queryable ERP Sales Knowledge Graph with Apache AGE and Python ETL

This article shows how to batch import relational data such as customers and orders from an ERP sales module into a knowledge graph using Python, CSV, and Apache AGE. The goal is to eliminate data silos and reduce the complexity of multi-table relationship queries. Keywords: Apache AGE, ERP, Knowledge Graph.

The technical specification snapshot outlines the solution

Core scenario: Graph modeling for ERP sales order data
Primary languages: Python, SQL
Graph query language: Cypher (via Apache AGE)
Data interchange layer: CSV
Data source types: MySQL, PostgreSQL, and other RDBMS
Core dependencies: pandas, psycopg, Apache AGE
Graph entities: Customer, SalesOrder
Graph relationships: PLACED_ORDER

Relational ERP data should be refactored into a graph model

ERP sales systems typically split customers, orders, and products across multiple tables and rely on foreign keys to maintain relationships. This structure works well for transaction processing, but it is not ideal for tracing business relationships across layers or for enabling AI to perform complex path reasoning.

After you map relational tables into a graph, customers become nodes, orders become nodes, and customer ordering behavior becomes edges. The value of this approach is not just “changing databases.” It makes the business logic hidden in table structures explicit.

Figure: mapping ERP relational tables to a graph model. Customers and orders become nodes, and foreign key fields are reconstructed into traversable edges, which is what later enables Cypher queries and graph reasoning.

The minimum viable graph model should focus on the core order flow first

For the first modeling phase, keep only the highest-value objects: Customer, SalesOrder, and the PLACED_ORDER relationship. This lets you validate the approach quickly and avoids model bloat caused by pulling in products, inventory, payments, and logistics too early.

graph_model = {
    "Customer": ["customer_id", "name", "region"],  # Customer node properties
    "SalesOrder": ["order_id", "amount", "status"],  # Order node properties
    "PLACED_ORDER": ["order_date"]  # Relationship properties from customer to order
}

This code defines a minimum viable graph model that is suitable for validating the sales workflow first.

Python and CSV together create a stable batch ETL pipeline

In production environments, writing directly to a graph database one record at a time usually provides limited throughput. A more efficient strategy is to let Python handle extraction and transformation, use CSV as the intermediate format, and let Apache AGE handle bulk loading.

This layered design provides clear separation of responsibilities. Business rules live in Python, while import performance is handled by built-in database functions. That makes the process easier to debug, replay, and rerun incrementally.
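The three layers can be sketched as a pipeline skeleton. The function names and stub data below are illustrative only; the real extraction and loading calls appear in the following sections.

```python
import csv
import tempfile
from pathlib import Path

def extract():
    """Extraction layer: pull customers and orders from the ERP source (stubbed here)."""
    customers = [{"customer_id": "CUST001", "name": "Some Technology Company"}]
    orders = [{"order_id": "SO20260501", "customer_id": "CUST001"}]
    return customers, orders

def transform_to_csv(customers, orders, out_dir):
    """Transformation layer: apply business rules and write intermediate CSV files."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for name, rows in (("customers", customers), ("orders", orders)):
        path = out / f"{name}.csv"
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
        paths.append(path)
    return paths

def load(paths):
    """Loading layer: hand the CSV files to the database's bulk loader (stubbed here)."""
    return [p.name for p in paths]

customers, orders = extract()
csv_paths = transform_to_csv(customers, orders, tempfile.mkdtemp(prefix="etl_demo_"))
loaded = load(csv_paths)
```

Because each layer only passes files or plain data to the next, any stage can be rerun in isolation when a batch fails.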

Start by extracting the core sales data from the relational database

import pandas as pd

# Simulate extracting customer data from the ERP database
customers_data = {
    "customer_id": ["CUST001", "CUST002"],
    "name": ["Some Technology Company", "Future Trading Group"],
    "region": ["East China", "North China"]
}

# Simulate extracting order data from the ERP database
orders_data = {
    "order_id": ["SO20260501", "SO20260502"],
    "customer_id": ["CUST001", "CUST002"],  # The foreign key will later be converted into an edge
    "amount": [50000, 120000],
    "status": ["Pending Shipment", "Completed"]
}

df_customers = pd.DataFrame(customers_data)
df_orders = pd.DataFrame(orders_data)

This code simulates source data extraction. The key is to preserve both entity fields and foreign key fields.
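In a real job, the dictionaries above would come from the source database rather than being hard-coded. A sketch using pandas.read_sql, with an in-memory SQLite database standing in for the ERP's MySQL/PostgreSQL source (the table and column names are assumptions):

```python
import sqlite3
import pandas as pd

# In-memory SQLite stands in for the ERP source database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id TEXT, name TEXT, region TEXT)")
conn.execute(
    "INSERT INTO customers VALUES ('CUST001', 'Some Technology Company', 'East China')"
)
conn.commit()

# The extraction step is then a plain SELECT pulled straight into a DataFrame
df_customers = pd.read_sql("SELECT customer_id, name, region FROM customers", conn)
conn.close()
```

Swapping in the real driver (for example a psycopg or MySQL connection) changes only the connection line; the SELECT-into-DataFrame pattern stays the same.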

Convert order foreign keys into graph relationship files

import os
from datetime import datetime

etl_dir = f"/tmp/age_etl_{datetime.now().strftime('%Y%m%d%H%M')}"
os.makedirs(etl_dir, exist_ok=True)

# Export customer node CSV
customer_csv_path = f"{etl_dir}/customers.csv"
df_customers.to_csv(customer_csv_path, index=False)

# Export order node CSV
order_csv_path = f"{etl_dir}/orders.csv"
df_orders.to_csv(order_csv_path, index=False)

# Generate the edge CSV from the order foreign key.
# AGE's edge loader matches endpoints to the node files by id plus label,
# so the file also needs start_vertex_type / end_vertex_type columns.
edges_data = {
    "start_id": df_orders["customer_id"],  # The source is the customer ID
    "start_vertex_type": "Customer",       # Label of the source node
    "end_id": df_orders["order_id"],       # The destination is the order ID
    "end_vertex_type": "SalesOrder",       # Label of the destination node
    "order_date": datetime.now().date()    # Relationship property: order date
}

edge_csv_path = f"{etl_dir}/placed_order_edges.csv"
pd.DataFrame(edges_data).to_csv(edge_csv_path, index=False)

This code transforms foreign key relationships from the relational database into edge files that the graph database can load directly.
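Before handing these files to the loader, a cheap referential-integrity check catches dangling foreign keys early. A self-contained sketch of the idea (the frames are rebuilt inline so the check reads on its own):

```python
import pandas as pd

# Rebuild the node and edge frames so the check is self-contained
df_customers = pd.DataFrame({"customer_id": ["CUST001", "CUST002"]})
df_orders = pd.DataFrame({"order_id": ["SO20260501", "SO20260502"]})
df_edges = pd.DataFrame({
    "start_id": ["CUST001", "CUST002"],
    "end_id": ["SO20260501", "SO20260502"],
})

# Every edge endpoint must exist in the corresponding node file;
# otherwise the bulk load would create dangling references
bad_starts = set(df_edges["start_id"]) - set(df_customers["customer_id"])
bad_ends = set(df_edges["end_id"]) - set(df_orders["order_id"])
assert not bad_starts and not bad_ends, f"dangling edges: {bad_starts | bad_ends}"
```

Running this on the exported CSVs (via pd.read_csv) before each load is far cheaper than debugging a broken graph afterwards.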

Apache AGE native loading functions support high-throughput graph ingestion

Apache AGE runs on top of PostgreSQL, so you can connect with a standard Python database driver and use its bulk import functions to load nodes and edges. This approach is better suited than row-by-row MERGE operations for initial graph construction or offline synchronization.

There are two key steps: load the AGE extension and set the search_path, then call the node-loading and edge-loading functions separately. Note that the exact signatures of these loading functions vary across AGE versions, so check the documentation for your release.

Use psycopg to execute bulk imports for nodes and edges

import psycopg

DB_CONFIG = {
    "dbname": "erp_graph_db",
    "user": "postgres",
    "password": "your_password",
    "host": "localhost",
    "port": 5432
}

GRAPH_NAME = "erp_sales_graph"

def load_data_to_age(customer_csv_path, order_csv_path, edge_csv_path):
    with psycopg.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cur:
            cur.execute("LOAD 'age';")  # Load the AGE extension
            cur.execute('SET search_path = ag_catalog, "$user", public;')  # Set the graph search path

            # Bulk import customer nodes
            cur.execute(
                "SELECT load_labels_from_file(%s, %s, %s, true, true);",
                (GRAPH_NAME, "Customer", customer_csv_path)
            )

            # Bulk import order nodes
            cur.execute(
                "SELECT load_labels_from_file(%s, %s, %s, true, true);",
                (GRAPH_NAME, "SalesOrder", order_csv_path)
            )

            # Bulk import customer-order relationships
            cur.execute(
                "SELECT load_edges_from_file(%s, %s, %s, true);",
                (GRAPH_NAME, "PLACED_ORDER", edge_csv_path)
            )

        conn.commit()  # Commit once; psycopg's connection context manager also commits on clean exit

This code completes the core Apache AGE bulk import loop and is well suited for offline ETL jobs and scheduled synchronization tasks.

Production-grade ETL must include incremental sync and fault tolerance

If you rebuild the entire graph every time, the cost will rise quickly. A more practical approach is to add an update_time field to the ERP source tables, extract only the data changed since the last synchronization point, and then write the sync timestamp into a metadata table.

In addition, corrupted CSV files, network instability, and database lock waits can all cause import failures. You should add retries, logging, and failed-file retention to the loading step so that you can support precise replay.
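A minimal retry wrapper for the loading step might look like the sketch below. The retry count, delay, and flaky_load stub are illustrative, and in practice you would catch the driver's specific exception (for example psycopg.OperationalError) rather than Exception:

```python
import time
import logging

def load_with_retry(load_fn, csv_path, retries=3, delay_s=2.0):
    """Call load_fn(csv_path), retrying on failure and keeping the file for replay."""
    for attempt in range(1, retries + 1):
        try:
            return load_fn(csv_path)
        except Exception as exc:
            logging.warning("load of %s failed (attempt %d/%d): %s",
                            csv_path, attempt, retries, exc)
            if attempt == retries:
                # Leave the CSV on disk so the failed batch can be replayed precisely
                raise
            time.sleep(delay_s)

# Usage: a flaky loader that fails once, then succeeds on the retry
calls = {"n": 0}
def flaky_load(path):
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient lock wait")
    return "loaded"

result = load_with_retry(flaky_load, "/tmp/age_etl/orders.csv", delay_s=0.01)
```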

A minimal incremental synchronization control pattern can look like this

def get_incremental_sql(last_sync_time):
    # Extract only orders changed since the last sync.
    # Use a bound parameter instead of string interpolation to avoid SQL injection.
    sql = """
    SELECT order_id, customer_id, amount, status, update_time
    FROM sales_orders
    WHERE update_time > %s
    ORDER BY update_time ASC;
    """
    return sql, (last_sync_time,)

This code demonstrates the core idea behind incremental extraction: control the synchronization scope with a time-based watermark.
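The watermark itself needs durable storage. A sketch of the metadata table, shown with SQLite for brevity (the table and column names are assumptions; in production this would live in the same PostgreSQL instance as the graph):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE sync_metadata (
    job_name TEXT PRIMARY KEY,
    last_sync_time TEXT NOT NULL
)""")

def get_last_sync_time(conn, job):
    row = conn.execute(
        "SELECT last_sync_time FROM sync_metadata WHERE job_name = ?", (job,)
    ).fetchone()
    # First run: fall back to an epoch-like floor so everything is extracted
    return row[0] if row else "1970-01-01 00:00:00"

def set_last_sync_time(conn, job, ts):
    conn.execute(
        "INSERT INTO sync_metadata (job_name, last_sync_time) VALUES (?, ?) "
        "ON CONFLICT(job_name) DO UPDATE SET last_sync_time = excluded.last_sync_time",
        (job, ts),
    )
    conn.commit()

first = get_last_sync_time(conn, "orders_to_graph")
set_last_sync_time(conn, "orders_to_graph", "2026-05-01 12:00:00")
second = get_last_sync_time(conn, "orders_to_graph")
```

Writing the watermark only after a batch commits successfully guarantees that a failed run is simply re-extracted on the next schedule.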

A graph-enabled ERP is what truly makes the system useful for AI applications

Once orders, customers, products, and regions are organized as a graph structure, query logic shifts from multi-table joins to path traversal. For example, identifying high-value incomplete orders in East China or tracing which customer orders were affected by an abnormal product issue becomes much more direct.
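With AGE, such a query is issued as SQL that wraps a Cypher body in the cypher() function. A sketch of the "high-value incomplete orders in East China" query as a string builder (the status value and amount threshold are assumptions; in production the region and amount should be validated or bound rather than interpolated):

```python
def high_value_open_orders_sql(graph_name, region, min_amount):
    """Build an AGE query: SQL on the outside, Cypher inside the cypher() call."""
    return f"""
SELECT * FROM cypher('{graph_name}', $$
    MATCH (c:Customer {{region: '{region}'}})-[:PLACED_ORDER]->(o:SalesOrder)
    WHERE o.amount >= {min_amount} AND o.status <> 'Completed'
    RETURN c.name, o.order_id, o.amount
$$) AS (customer_name agtype, order_id agtype, amount agtype);
""".strip()

sql = high_value_open_orders_sql("erp_sales_graph", "East China", 100000)
```

The same traversal in relational SQL would require joining the customer and order tables and filtering on both; in the graph form the business path is the query.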

This is also a key step in evolving ERP into an AI-native system. The goal is not simply to store more data, but to give data a structure that supports connectivity, reasoning, and question answering.

FAQ answers the most common implementation questions

1. Why not use PostgreSQL relational tables directly for complex queries?

Relational tables can handle analytical and statistical queries, but analysis across multiple entities and multi-hop relationships becomes complex very quickly. A graph model is better suited for expressing path relationships among customers, orders, products, and regions.

2. Is Apache AGE better suited for real-time writes or offline synchronization?

It supports both, but the approach in this article is more oriented toward offline batch synchronization. In ERP scenarios, teams typically start with scheduled batch ETL and then gradually evolve toward near-real-time synchronization.

3. Which nodes and edges should be modeled in phase one?

Start with only the most critical path: customer nodes, order nodes, and order-placement relationship edges. After the query scenarios stabilize, you can extend the model to include products, invoices, warehousing, collections, and other entities.

The core takeaway summarizes the implementation path

This article reconstructs a practical implementation path from ERP relational tables to an Apache AGE graph database. It covers graph model design, Python-plus-CSV batch ETL, AGE native loading functions, and optimizations for incremental synchronization and fault tolerance. It is well suited for engineering teams that want to upgrade sales orders, customers, products, and other business data into AI-understandable knowledge graphs.