Migrating from PostgreSQL to Geode

This guide provides a comprehensive approach to migrating from PostgreSQL to Geode. Unlike migrating between similar databases, moving from a relational model to a graph model requires rethinking how you structure and query your data.

Migration Overview

When to Use a Graph Database

Graph databases excel when:

  • Relationships are first-class citizens: Your queries traverse multiple relationships
  • Schema is flexible: New relationship types emerge over time
  • Many-to-many relationships dominate: Junction tables become unwieldy
  • Path queries are common: Finding connections between entities

When to Keep PostgreSQL

Consider keeping PostgreSQL for:

  • Simple CRUD operations with fixed schemas
  • Heavy aggregation/reporting workloads
  • Strong ACID requirements with complex constraints
  • Well-defined, stable relational structures

Hybrid Approach

Many organizations run both:

  • PostgreSQL for transactional data and reporting
  • Geode for relationship-heavy queries and traversals

Relational to Graph Model Conversion

Core Concepts Mapping

Relational Concept    Graph Equivalent
------------------    ----------------
Table                 Node Label
Row                   Node
Column                Property
Primary Key           Node ID property
Foreign Key           Relationship
Junction Table        Relationship (with properties)
View                  Stored Query

Example: E-Commerce Schema

PostgreSQL Schema:

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    category_id INTEGER REFERENCES categories(id)
);

CREATE TABLE categories (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    parent_id INTEGER REFERENCES categories(id)
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    order_date TIMESTAMP DEFAULT NOW(),
    status VARCHAR(50)
);

CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INTEGER REFERENCES orders(id),
    product_id INTEGER REFERENCES products(id),
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL
);

CREATE TABLE reviews (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    product_id INTEGER REFERENCES products(id),
    rating INTEGER CHECK (rating BETWEEN 1 AND 5),
    comment TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

Geode Graph Model:

// Nodes
(:Customer {id, name, email, created_at})
(:Product {id, name, price})
(:Category {id, name})
(:Order {id, order_date, status})

// Relationships
(:Customer)-[:PLACED]->(:Order)
(:Order)-[:CONTAINS {quantity, unit_price}]->(:Product)
(:Product)-[:IN_CATEGORY]->(:Category)
(:Category)-[:PARENT]->(:Category)
(:Customer)-[:REVIEWED {rating, comment, created_at}]->(:Product)

Visual Representation:

                                                   +-----------+
                                                   | Category  |
                                                   +-----------+
                                                         ^
                                                         | :PARENT (self-referential)
                                                         |
                                                   +-----------+
                                                   | Category  |
                                                   +-----------+
                                                         ^
                                                         | :IN_CATEGORY
                                                         |
+-----------+  :PLACED   +-----------+  :CONTAINS  +-----------+
| Customer  |----------->|   Order   |------------>|  Product  |
+-----------+            +-----------+             +-----------+
      |                                                  ^
      |                     :REVIEWED                    |
      +--------------------------------------------------+

Table to Node Mapping

Simple Tables

PostgreSQL:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(100) UNIQUE,
    email VARCHAR(255),
    age INTEGER,
    active BOOLEAN DEFAULT true
);

GQL Node Creation:

CREATE (:User {
    id: 1,
    username: 'alice',
    email: 'alice@example.com',
    age: 30,
    active: true
})

Tables with Enums

PostgreSQL:

CREATE TYPE user_role AS ENUM ('admin', 'user', 'guest');

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    role user_role
);

GQL:

// Option 1: Store as string property
CREATE (:User {id: 1, name: 'Alice', role: 'admin'})

// Option 2: Use labels for roles
CREATE (:User:Admin {id: 1, name: 'Alice'})

Tables with Arrays

PostgreSQL:

CREATE TABLE articles (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255),
    tags TEXT[]
);

GQL:

// Option 1: Store as list property
CREATE (:Article {id: 1, title: 'Graph Databases', tags: ['database', 'graph', 'nosql']})

// Option 2: Create Tag nodes with relationships (run as separate statements)
CREATE (:Article {id: 1, title: 'Graph Databases'})
CREATE (:Tag {name: 'database'})
CREATE (:Tag {name: 'graph'})

MATCH (a:Article {id: 1}), (t:Tag {name: 'database'})
CREATE (a)-[:TAGGED]->(t)

Tables with JSON

PostgreSQL:

CREATE TABLE events (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    metadata JSONB
);

GQL:

// Option 1: Store as map property
CREATE (:Event {
    id: 1,
    name: 'PageView',
    metadata: {browser: 'Chrome', os: 'Windows', version: '120.0'}
})

// Option 2: Flatten commonly queried fields
CREATE (:Event {
    id: 1,
    name: 'PageView',
    browser: 'Chrome',
    os: 'Windows',
    metadata: {version: '120.0'}
})

Foreign Keys to Relationships

One-to-Many Relationships

PostgreSQL:

CREATE TABLE departments (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255)
);

CREATE TABLE employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    department_id INTEGER REFERENCES departments(id)
);

GQL:

// Create nodes
CREATE (:Department {id: 1, name: 'Engineering'})
CREATE (:Employee {id: 1, name: 'Alice'})

// Create relationship
MATCH (e:Employee {id: 1})
MATCH (d:Department {id: 1})
CREATE (e)-[:WORKS_IN]->(d)

Many-to-Many Relationships (Junction Tables)

PostgreSQL:

CREATE TABLE students (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255)
);

CREATE TABLE courses (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255)
);

CREATE TABLE enrollments (
    student_id INTEGER REFERENCES students(id),
    course_id INTEGER REFERENCES courses(id),
    enrolled_at TIMESTAMP DEFAULT NOW(),
    grade VARCHAR(2),
    PRIMARY KEY (student_id, course_id)
);

GQL:

// Junction table becomes relationship with properties
CREATE (:Student {id: 1, name: 'Alice'})
CREATE (:Course {id: 1, name: 'Database Systems'})

MATCH (s:Student {id: 1})
MATCH (c:Course {id: 1})
CREATE (s)-[:ENROLLED_IN {enrolled_at: timestamp(), grade: 'A'}]->(c)

Self-Referential Relationships

PostgreSQL:

CREATE TABLE employees (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    manager_id INTEGER REFERENCES employees(id)
);

GQL:

CREATE (:Employee {id: 1, name: 'CEO'})
CREATE (:Employee {id: 2, name: 'VP Engineering'})
CREATE (:Employee {id: 3, name: 'Developer'})

MATCH (m:Employee {id: 1}), (e:Employee {id: 2})
CREATE (e)-[:REPORTS_TO]->(m)

MATCH (m:Employee {id: 2}), (e:Employee {id: 3})
CREATE (e)-[:REPORTS_TO]->(m)

SQL to GQL Translation

Basic Queries

SELECT:

-- SQL
SELECT name, email FROM customers WHERE age > 25;
-- GQL
MATCH (c:Customer)
WHERE c.age > 25
RETURN c.name, c.email

INSERT:

-- SQL
INSERT INTO customers (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
-- GQL
CREATE (:Customer {name: 'Alice', email: 'alice@example.com', age: 30})

UPDATE:

-- SQL
UPDATE customers SET age = 31 WHERE name = 'Alice';
-- GQL
MATCH (c:Customer {name: 'Alice'})
SET c.age = 31

DELETE:

-- SQL
DELETE FROM customers WHERE id = 1;
-- GQL (DELETE fails if the node still has relationships; use DETACH DELETE to remove those as well)
MATCH (c:Customer {id: 1})
DELETE c

Joins to Pattern Matching

Inner Join:

-- SQL
SELECT o.id, c.name
FROM orders o
INNER JOIN customers c ON o.customer_id = c.id;
-- GQL
MATCH (c:Customer)-[:PLACED]->(o:Order)
RETURN o.id, c.name

Left Join:

-- SQL
SELECT c.name, o.id
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id;
-- GQL
MATCH (c:Customer)
OPTIONAL MATCH (c)-[:PLACED]->(o:Order)
RETURN c.name, o.id

Multiple Joins:

-- SQL
SELECT c.name, p.name, oi.quantity
FROM customers c
JOIN orders o ON c.id = o.customer_id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id;
-- GQL
MATCH (c:Customer)-[:PLACED]->(o:Order)-[item:CONTAINS]->(p:Product)
RETURN c.name, p.name, item.quantity

Subqueries

Correlated Subquery:

-- SQL
SELECT c.name,
       (SELECT COUNT(*) FROM orders WHERE customer_id = c.id) AS order_count
FROM customers c;
-- GQL
MATCH (c:Customer)
OPTIONAL MATCH (c)-[:PLACED]->(o:Order)
RETURN c.name, count(o) AS order_count

EXISTS Subquery:

-- SQL
SELECT * FROM customers c
WHERE EXISTS (
    SELECT 1 FROM orders WHERE customer_id = c.id
);
-- GQL
MATCH (c:Customer)-[:PLACED]->(:Order)
RETURN DISTINCT c

Aggregations

GROUP BY:

-- SQL
SELECT category_id, COUNT(*), AVG(price)
FROM products
GROUP BY category_id;
-- GQL
MATCH (p:Product)-[:IN_CATEGORY]->(c:Category)
RETURN c.id, count(p), avg(p.price)

HAVING:

-- SQL
SELECT customer_id, COUNT(*) as order_count
FROM orders
GROUP BY customer_id
HAVING COUNT(*) > 5;
-- GQL
MATCH (c:Customer)-[:PLACED]->(o:Order)
WITH c, count(o) AS order_count
WHERE order_count > 5
RETURN c.id, order_count

Window Functions

ROW_NUMBER:

-- SQL
SELECT name, price,
       ROW_NUMBER() OVER (ORDER BY price DESC) as rank
FROM products;
-- GQL (using collect and unwind)
MATCH (p:Product)
WITH p ORDER BY p.price DESC
WITH collect(p) AS products
UNWIND range(0, size(products)-1) AS idx
RETURN products[idx].name, products[idx].price, idx + 1 AS rank

Common Table Expressions (CTEs)

-- SQL
WITH high_value_customers AS (
    SELECT customer_id, SUM(total) as total_spent
    FROM orders
    GROUP BY customer_id
    HAVING SUM(total) > 1000
)
SELECT c.name, hvc.total_spent
FROM customers c
JOIN high_value_customers hvc ON c.id = hvc.customer_id;
-- GQL
MATCH (c:Customer)-[:PLACED]->(o:Order)
WITH c, sum(o.total) AS total_spent
WHERE total_spent > 1000
RETURN c.name, total_spent

Recursive Queries

PostgreSQL Recursive CTE:

-- SQL: Find all subordinates
WITH RECURSIVE subordinates AS (
    SELECT id, name, manager_id
    FROM employees
    WHERE id = 1

    UNION ALL

    SELECT e.id, e.name, e.manager_id
    FROM employees e
    JOIN subordinates s ON e.manager_id = s.id
)
SELECT * FROM subordinates;

GQL Variable-Length Path:

-- GQL: Much simpler!
MATCH (manager:Employee {id: 1})<-[:REPORTS_TO*]-(subordinate)
RETURN subordinate.id, subordinate.name

ETL Pipeline Setup

Architecture Overview

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ PostgreSQL  │────▶│   ETL Job   │────▶│   Geode     │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │
       │            ┌──────┴──────┐
       │            │             │
       ▼            ▼             ▼
  ┌─────────┐  ┌─────────┐  ┌─────────┐
  │ Extract │  │Transform│  │  Load   │
  └─────────┘  └─────────┘  └─────────┘

Python ETL Script

import asyncio
from datetime import datetime
from decimal import Decimal

import psycopg
from geode_client import Client

# Configuration
PG_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "password",
    "dbname": "ecommerce"
}

GEODE_CONFIG = {
    "host": "localhost",
    "port": 3141,
    "skip_verify": True
}

class PostgresToGeodeETL:
    def __init__(self):
        self.pg_conn = None
        self.geode_client = None

    async def connect(self):
        # Connect to PostgreSQL
        self.pg_conn = await psycopg.AsyncConnection.connect(
            **PG_CONFIG,
            autocommit=True
        )

        # Connect to Geode
        self.geode_client = Client(**GEODE_CONFIG)

    async def extract_table(self, table_name, batch_size=1000):
        """Extract data from PostgreSQL table in batches."""
        async with self.pg_conn.cursor() as cur:
            await cur.execute(f"SELECT COUNT(*) FROM {table_name}")
            total = (await cur.fetchone())[0]

            for offset in range(0, total, batch_size):
                await cur.execute(
                    f"SELECT * FROM {table_name} ORDER BY id LIMIT %s OFFSET %s",
                    (batch_size, offset)
                )
                columns = [desc[0] for desc in cur.description]
                rows = await cur.fetchall()

                for row in rows:
                    yield dict(zip(columns, row))

    async def load_nodes(self, label, data_generator, id_field='id'):
        """Load nodes into Geode."""
        async with self.geode_client.connection() as conn:
            count = 0
            await conn.begin()

            async for row in data_generator:
                # Convert types as needed
                props = self.transform_properties(row)

                await conn.execute(
                    f"CREATE (n:{label} $props)",
                    {"props": props}
                )
                count += 1

                # Commit in batches
                if count % 1000 == 0:
                    await conn.commit()
                    await conn.begin()
                    print(f"Loaded {count} {label} nodes...")

            await conn.commit()
            print(f"Total: {count} {label} nodes loaded")

    async def load_relationships(self, rel_type, from_label, to_label,
                                  from_field, to_field, data_generator):
        """Load relationships into Geode."""
        async with self.geode_client.connection() as conn:
            count = 0
            await conn.begin()

            async for row in data_generator:
                props = {k: v for k, v in row.items()
                        if k not in [from_field, to_field]}
                props = self.transform_properties(props)

                await conn.execute(f"""
                    MATCH (a:{from_label} {{id: $from_id}})
                    MATCH (b:{to_label} {{id: $to_id}})
                    CREATE (a)-[:{rel_type} $props]->(b)
                """, {
                    "from_id": row[from_field],
                    "to_id": row[to_field],
                    "props": props
                })
                count += 1

                if count % 1000 == 0:
                    await conn.commit()
                    await conn.begin()
                    print(f"Loaded {count} {rel_type} relationships...")

            await conn.commit()
            print(f"Total: {count} {rel_type} relationships loaded")

    def transform_properties(self, row):
        """Transform PostgreSQL types to Geode-compatible types."""
        result = {}
        for key, value in row.items():
            if value is None:
                continue  # Skip null values
            elif isinstance(value, datetime):
                result[key] = value.isoformat()
            elif isinstance(value, Decimal):
                result[key] = float(value)
            elif isinstance(value, (list, dict)):
                result[key] = value  # Lists and maps are supported
            else:
                result[key] = value
        return result

    async def run_migration(self):
        """Run the complete migration."""
        await self.connect()

        # Step 1: Create indexes for efficient relationship creation
        async with self.geode_client.connection() as conn:
            await conn.execute("CREATE INDEX customer_id ON :Customer(id)")
            await conn.execute("CREATE INDEX product_id ON :Product(id)")
            await conn.execute("CREATE INDEX category_id ON :Category(id)")
            await conn.execute("CREATE INDEX order_id ON :Order(id)")

        # Step 2: Load nodes
        print("\n=== Loading Nodes ===")

        await self.load_nodes('Customer',
            self.extract_table('customers'))

        await self.load_nodes('Category',
            self.extract_table('categories'))

        await self.load_nodes('Product',
            self.extract_table('products'))

        await self.load_nodes('Order',
            self.extract_table('orders'))

        # Step 3: Load relationships
        print("\n=== Loading Relationships ===")

        # Product -> Category
        async with self.pg_conn.cursor() as cur:
            await cur.execute("""
                SELECT id as product_id, category_id
                FROM products
                WHERE category_id IS NOT NULL
            """)
            columns = [desc[0] for desc in cur.description]
            rows = await cur.fetchall()

        async def product_category_gen():
            for row in rows:
                yield dict(zip(columns, row))

        await self.load_relationships(
            'IN_CATEGORY', 'Product', 'Category',
            'product_id', 'category_id',
            product_category_gen()
        )

        # Category -> Parent Category
        async with self.pg_conn.cursor() as cur:
            await cur.execute("""
                SELECT id as child_id, parent_id
                FROM categories
                WHERE parent_id IS NOT NULL
            """)
            columns = [desc[0] for desc in cur.description]
            rows = await cur.fetchall()

        async def category_parent_gen():
            for row in rows:
                yield dict(zip(columns, row))

        await self.load_relationships(
            'PARENT', 'Category', 'Category',
            'child_id', 'parent_id',
            category_parent_gen()
        )

        # Customer -> Order
        async with self.pg_conn.cursor() as cur:
            await cur.execute("""
                SELECT id as order_id, customer_id
                FROM orders
            """)
            columns = [desc[0] for desc in cur.description]
            rows = await cur.fetchall()

        async def customer_order_gen():
            for row in rows:
                yield dict(zip(columns, row))

        await self.load_relationships(
            'PLACED', 'Customer', 'Order',
            'customer_id', 'order_id',
            customer_order_gen()
        )

        # Order -> Product (from order_items)
        await self.load_relationships(
            'CONTAINS', 'Order', 'Product',
            'order_id', 'product_id',
            self.extract_table('order_items')
        )

        # Customer -> Product (reviews)
        await self.load_relationships(
            'REVIEWED', 'Customer', 'Product',
            'customer_id', 'product_id',
            self.extract_table('reviews')
        )

        print("\n=== Migration Complete ===")

# Run migration
async def main():
    etl = PostgresToGeodeETL()
    await etl.run_migration()

asyncio.run(main())
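The extract_table generator above pages with LIMIT/OFFSET, which forces PostgreSQL to scan and discard every skipped row on each batch, so later batches get progressively slower on large tables. A keyset (seek) variant avoids that. The sketch below is illustrative rather than part of the script above: the fetch_after callback and its signature are assumptions, chosen so the paging logic stays independently testable.

```python
from typing import Callable, Iterator, Sequence


def keyset_batches(
    fetch_after: Callable[[int, int], Sequence[dict]],
    batch_size: int = 1000,
) -> Iterator[dict]:
    """Yield rows in id order using keyset (seek) pagination.

    fetch_after(last_id, limit) is expected to run something like:
        SELECT * FROM <table> WHERE id > %s ORDER BY id LIMIT %s
    Each batch starts from an index seek on id, so cost stays
    proportional to the batch size rather than the offset.
    """
    last_id = 0
    while True:
        rows = fetch_after(last_id, batch_size)
        if not rows:
            return  # no rows past last_id: extraction is done
        for row in rows:
            yield row
        last_id = rows[-1]["id"]  # resume after the last row seen
```

Wiring this into the ETL class would mean replacing the OFFSET loop in extract_table with a query of the `WHERE id > %s ORDER BY id LIMIT %s` shape; it assumes a monotonically increasing id column, which SERIAL primary keys provide.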

Go ETL Script

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/lib/pq"
    _ "geodedb.com/geode"
)

type ETL struct {
    pg    *sql.DB
    geode *sql.DB
}

func NewETL(pgConn, geodeConn string) (*ETL, error) {
    pg, err := sql.Open("postgres", pgConn)
    if err != nil {
        return nil, err
    }

    geode, err := sql.Open("geode", geodeConn)
    if err != nil {
        return nil, err
    }

    return &ETL{pg: pg, geode: geode}, nil
}

func (e *ETL) MigrateTable(ctx context.Context, table, label string) error {
    // Count rows
    var count int
    err := e.pg.QueryRowContext(ctx,
        fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count)
    if err != nil {
        return err
    }
    log.Printf("Migrating %d rows from %s to %s", count, table, label)

    // Query source data
    rows, err := e.pg.QueryContext(ctx,
        fmt.Sprintf("SELECT * FROM %s", table))
    if err != nil {
        return err
    }
    defer rows.Close()

    columns, err := rows.Columns()
    if err != nil {
        return err
    }

    // Begin transaction
    tx, err := e.geode.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    migrated := 0
    for rows.Next() {
        // Scan row
        values := make([]interface{}, len(columns))
        valuePtrs := make([]interface{}, len(columns))
        for i := range values {
            valuePtrs[i] = &values[i]
        }
        if err := rows.Scan(valuePtrs...); err != nil {
            return err
        }

        // Build property map
        props := make(map[string]interface{})
        for i, col := range columns {
            if values[i] != nil {
                props[col] = values[i]
            }
        }

        // Insert into Geode
        _, err = tx.ExecContext(ctx,
            fmt.Sprintf("CREATE (n:%s $props)", label),
            props,
        )
        if err != nil {
            tx.Rollback()
            return err
        }

        migrated++
        if migrated%1000 == 0 {
            log.Printf("Migrated %d/%d rows", migrated, count)
        }
    }

    if err := tx.Commit(); err != nil {
        return err
    }

    log.Printf("Completed: %d rows migrated", migrated)
    return nil
}

func main() {
    etl, err := NewETL(
        "postgres://user:pass@localhost/db?sslmode=disable",
        "localhost:3141",
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    // Migrate tables
    tables := map[string]string{
        "customers":  "Customer",
        "products":   "Product",
        "categories": "Category",
        "orders":     "Order",
    }

    for table, label := range tables {
        if err := etl.MigrateTable(ctx, table, label); err != nil {
            log.Fatalf("Failed to migrate %s: %v", table, err)
        }
    }

    log.Println("Migration complete!")
}

Incremental Migration

Change Data Capture (CDC) Setup

Use PostgreSQL logical replication to capture changes:

-- Enable logical replication in postgresql.conf
-- wal_level = logical

-- Create replication slot
SELECT pg_create_logical_replication_slot('geode_slot', 'pgoutput');

-- Create publication
CREATE PUBLICATION geode_pub FOR TABLE customers, products, orders;

CDC Consumer

import asyncio
import psycopg
from psycopg import sql
from geode_client import Client

class CDCConsumer:
    def __init__(self, pg_config, geode_config):
        self.pg_config = pg_config
        self.geode_config = geode_config
        self.geode_client = None

    async def connect(self):
        self.geode_client = Client(**self.geode_config)

    async def process_change(self, change):
        """Process a single CDC change event."""
        async with self.geode_client.connection() as conn:
            table = change['table']
            operation = change['operation']
            data = change['data']

            # Map table to label explicitly; naive singularization such as
            # table.title().rstrip('s') breaks on names like 'categories'
            labels = {'customers': 'Customer', 'products': 'Product', 'orders': 'Order'}
            label = labels[table]

            if operation == 'INSERT':
                await conn.execute(
                    f"CREATE (n:{label} $props)",
                    {"props": data}
                )

            elif operation == 'UPDATE':
                set_clause = ', '.join(f"n.{k} = ${k}" for k in data.keys())
                await conn.execute(
                    f"MATCH (n:{label} {{id: $id}}) SET {set_clause}",
                    data
                )

            elif operation == 'DELETE':
                await conn.execute(
                    f"MATCH (n:{label} {{id: $id}}) DELETE n",
                    {"id": data['id']}
                )

    async def start_replication(self):
        """Start consuming CDC events."""
        await self.connect()

        conn = await psycopg.AsyncConnection.connect(
            **self.pg_config,
            autocommit=True
        )

        async with conn.cursor() as cur:
            # Start replication
            await cur.execute(
                "SELECT * FROM pg_logical_slot_get_changes('geode_slot', NULL, NULL)"
            )

            async for row in cur:
                change = self.parse_change(row)
                await self.process_change(change)

    def parse_change(self, row):
        """Parse PostgreSQL logical replication message."""
        # Implementation depends on output plugin format
        pass

# Run CDC consumer
async def main():
    consumer = CDCConsumer(
        pg_config={"host": "localhost", "user": "postgres", "dbname": "ecommerce"},
        geode_config={"host": "localhost", "port": 3141, "skip_verify": True}
    )
    await consumer.start_replication()

asyncio.run(main())
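parse_change is left as a stub above because the message format depends on the output plugin. If the replication slot is created with the text-based test_decoding plugin (rather than pgoutput), each change arrives as a single line such as `table public.customers: INSERT: id[integer]:1 name[character varying]:'Alice'`, and a minimal parser becomes feasible. The sketch below is a starting point under that assumption, not a complete implementation: it ignores arrays, composite types, and values that span lines.

```python
import re

# Matches test_decoding change lines, e.g.
#   table public.customers: INSERT: id[integer]:1 name[character varying]:'Alice'
CHANGE_RE = re.compile(r"^table (\w+)\.(\w+): (INSERT|UPDATE|DELETE):(.*)$")
# Matches one column: name[type]:value, where value is quoted text or a bare token
COLUMN_RE = re.compile(r"(\w+)\[[^\]]+\]:('(?:[^']|'')*'|\S+)")


def parse_change(line: str):
    """Parse one test_decoding line into the dict process_change expects.

    Returns None for transaction markers (BEGIN/COMMIT) and unrecognized lines.
    """
    m = CHANGE_RE.match(line)
    if m is None:
        return None
    _schema, table, operation, cols = m.groups()
    data = {}
    for name, raw in COLUMN_RE.findall(cols):
        if raw.startswith("'"):
            # Strip quotes and undo test_decoding's '' escaping
            data[name] = raw[1:-1].replace("''", "'")
        elif raw == "null":
            data[name] = None
        else:
            try:
                data[name] = int(raw)
            except ValueError:
                data[name] = raw
    return {"table": table, "operation": operation, "data": data}
```

A production consumer would more likely use pgoutput or wal2json with a proper client library; this text parsing is only practical for the SQL-level `pg_logical_slot_get_changes` interface shown above.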

Dual-Write Pattern

During migration, write to both databases:

class DualWriteRepository:
    def __init__(self, pg_conn, geode_client):
        self.pg = pg_conn
        self.geode = geode_client

    async def create_customer(self, name, email, age):
        """Write to both PostgreSQL and Geode."""
        # Write to PostgreSQL (primary)
        async with self.pg.cursor() as cur:
            await cur.execute(
                "INSERT INTO customers (name, email, age) VALUES (%s, %s, %s) RETURNING id",
                (name, email, age)
            )
            customer_id = (await cur.fetchone())[0]

        # Write to Geode (secondary)
        try:
            async with self.geode.connection() as conn:
                await conn.execute(
                    "CREATE (:Customer {id: $id, name: $name, email: $email, age: $age})",
                    {"id": customer_id, "name": name, "email": email, "age": age}
                )
        except Exception as e:
            # Log error but don't fail - PostgreSQL is source of truth
            logging.error(f"Failed to write to Geode: {e}")

        return customer_id

    async def query_customer_network(self, customer_id, depth=3):
        """Query from Geode for graph traversals."""
        async with self.geode.connection() as conn:
            page, _ = await conn.query(f"""
                MATCH (c:Customer {{id: $id}})-[:KNOWS*1..{depth}]->(friend)
                RETURN DISTINCT friend
            """, {"id": customer_id})
            return page.rows

Hybrid Operation Period

Query Routing

Route queries to the appropriate database:

class QueryRouter:
    def __init__(self, pg_conn, geode_client):
        self.pg = pg_conn
        self.geode = geode_client

        # Define which queries go where
        self.graph_patterns = [
            'KNOWS', 'FRIENDS', 'FOLLOWS',  # Social relationships
            'PURCHASED', 'VIEWED',           # Customer behavior
            'REPORTS_TO', 'MANAGES',         # Hierarchies
        ]

    def should_use_graph(self, query):
        """Determine if query should go to graph database."""
        query_upper = query.upper()

        # Check for relationship patterns
        for pattern in self.graph_patterns:
            if f':{pattern}' in query_upper or f'-[:{pattern}' in query_upper:
                return True

        # Check for path queries
        if '*' in query and ('-[' in query or '->' in query):
            return True

        return False

    async def execute(self, query, params=None):
        """Route and execute query."""
        if self.should_use_graph(query):
            async with self.geode.connection() as conn:
                return await conn.query(query, params or {})
        else:
            # Translate to SQL if needed
            sql_query = self.translate_to_sql(query)
            async with self.pg.cursor() as cur:
                await cur.execute(sql_query, params)
                return await cur.fetchall()
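Because should_use_graph is pure string inspection, it is easy to exercise outside the class. The standalone sketch below mirrors the same checks (the `-[:PATTERN` test in the class is folded into the `:PATTERN` test, which already covers it) and shows how the routing decisions fall out:

```python
# Relationship types that signal graph-shaped queries (same list as QueryRouter)
GRAPH_PATTERNS = ['KNOWS', 'FRIENDS', 'FOLLOWS',
                  'PURCHASED', 'VIEWED',
                  'REPORTS_TO', 'MANAGES']


def should_use_graph(query: str) -> bool:
    """Standalone version of QueryRouter.should_use_graph for testing."""
    query_upper = query.upper()

    # Any known relationship type routes to the graph database
    if any(f':{pattern}' in query_upper for pattern in GRAPH_PATTERNS):
        return True

    # Variable-length path syntax ("*" inside a pattern) is graph-only
    if '*' in query and ('-[' in query or '->' in query):
        return True

    return False
```

Keyword sniffing like this is deliberately coarse; in practice most teams route by call site (repository method) rather than by inspecting query text, but the heuristic is a workable default during a hybrid period.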

Data Consistency Checks

async def verify_data_consistency(pg_conn, geode_client):
    """Verify data is consistent between PostgreSQL and Geode."""
    discrepancies = []

    # Check node counts
    tables_labels = [
        ('customers', 'Customer'),
        ('products', 'Product'),
        ('orders', 'Order'),
    ]

    for table, label in tables_labels:
        # PostgreSQL count
        async with pg_conn.cursor() as cur:
            await cur.execute(f"SELECT COUNT(*) FROM {table}")
            pg_count = (await cur.fetchone())[0]

        # Geode count
        async with geode_client.connection() as conn:
            page, _ = await conn.query(f"MATCH (n:{label}) RETURN count(n)")
            geode_count = page.rows[0]['count(n)'].as_int

        if pg_count != geode_count:
            discrepancies.append({
                'type': 'count_mismatch',
                'entity': label,
                'postgresql': pg_count,
                'geode': geode_count
            })

    # Sample data verification
    async with pg_conn.cursor() as cur:
        await cur.execute("SELECT id, name, email FROM customers ORDER BY RANDOM() LIMIT 100")
        samples = await cur.fetchall()

    async with geode_client.connection() as conn:
        for pg_id, pg_name, pg_email in samples:
            page, _ = await conn.query(
                "MATCH (c:Customer {id: $id}) RETURN c.name, c.email",
                {"id": pg_id}
            )
            if not page.rows:
                discrepancies.append({
                    'type': 'missing_node',
                    'entity': 'Customer',
                    'id': pg_id
                })
            else:
                row = page.rows[0]
                if row['c.name'].as_string != pg_name or row['c.email'].as_string != pg_email:
                    discrepancies.append({
                        'type': 'data_mismatch',
                        'entity': 'Customer',
                        'id': pg_id,
                        'postgresql': {'name': pg_name, 'email': pg_email},
                        'geode': {'name': row['c.name'].as_string, 'email': row['c.email'].as_string}
                    })

    return discrepancies

Migration Validation

Comprehensive Validation Script

async def run_migration_validation(pg_conn, geode_client):
    """Run comprehensive migration validation."""
    print("=" * 60)
    print("MIGRATION VALIDATION REPORT")
    print("=" * 60)

    # 1. Count validation
    print("\n1. NODE COUNT VALIDATION")
    print("-" * 40)

    entities = [
        ('customers', 'Customer'),
        ('products', 'Product'),
        ('categories', 'Category'),
        ('orders', 'Order'),
    ]

    all_counts_match = True
    for table, label in entities:
        async with pg_conn.cursor() as cur:
            await cur.execute(f"SELECT COUNT(*) FROM {table}")
            pg_count = (await cur.fetchone())[0]

        async with geode_client.connection() as conn:
            page, _ = await conn.query(f"MATCH (n:{label}) RETURN count(n) AS c")
            geode_count = page.rows[0]['c'].as_int

        status = "PASS" if pg_count == geode_count else "FAIL"
        if status == "FAIL":
            all_counts_match = False
        print(f"  {label}: PG={pg_count}, Geode={geode_count} [{status}]")

    # 2. Relationship count validation
    print("\n2. RELATIONSHIP COUNT VALIDATION")
    print("-" * 40)

    relationships = [
        ("SELECT COUNT(*) FROM order_items", "MATCH ()-[r:CONTAINS]->() RETURN count(r)"),
        ("SELECT COUNT(*) FROM reviews", "MATCH ()-[r:REVIEWED]->() RETURN count(r)"),
        ("SELECT COUNT(*) FROM products WHERE category_id IS NOT NULL",
         "MATCH ()-[r:IN_CATEGORY]->() RETURN count(r)"),
    ]

    for pg_query, gql_query in relationships:
        async with pg_conn.cursor() as cur:
            await cur.execute(pg_query)
            pg_count = (await cur.fetchone())[0]

        async with geode_client.connection() as conn:
            page, _ = await conn.query(gql_query)
            geode_count = page.rows[0]['count(r)'].as_int

        rel_type = gql_query.split(':')[1].split(']')[0]
        status = "PASS" if pg_count == geode_count else "FAIL"
        print(f"  {rel_type}: PG={pg_count}, Geode={geode_count} [{status}]")

    # 3. Data integrity validation
    print("\n3. DATA INTEGRITY VALIDATION")
    print("-" * 40)

    # Check referential integrity
    async with geode_client.connection() as conn:
        # Orders should have customers
        page, _ = await conn.query("""
            MATCH (o:Order)
            WHERE NOT (o)<-[:PLACED]-(:Customer)
            RETURN count(o) AS orphaned
        """)
        orphaned_orders = page.rows[0]['orphaned'].as_int
        print(f"  Orphaned orders: {orphaned_orders} [{'FAIL' if orphaned_orders > 0 else 'PASS'}]")

        # Order items should have products
        page, _ = await conn.query("""
            MATCH (o:Order)-[c:CONTAINS]->()
            WHERE NOT ()-[c]->(:Product)
            RETURN count(c) AS orphaned
        """)
        orphaned_items = page.rows[0]['orphaned'].as_int
        print(f"  Orphaned order items: {orphaned_items} [{'FAIL' if orphaned_items > 0 else 'PASS'}]")

    # 4. Query equivalence validation
    print("\n4. QUERY EQUIVALENCE VALIDATION")
    print("-" * 40)

    test_queries = [
        {
            'name': 'Top customers by order count',
            'sql': """
                SELECT c.name, COUNT(o.id) as order_count
                FROM customers c
                LEFT JOIN orders o ON c.id = o.customer_id
                GROUP BY c.id, c.name
                ORDER BY order_count DESC
                LIMIT 5
            """,
            'gql': """
                MATCH (c:Customer)
                OPTIONAL MATCH (c)-[:PLACED]->(o:Order)
                RETURN c.name, count(o) AS order_count
                ORDER BY order_count DESC
                LIMIT 5
            """
        },
        {
            'name': 'Products by category',
            'sql': """
                SELECT cat.name, COUNT(p.id)
                FROM categories cat
                LEFT JOIN products p ON p.category_id = cat.id
                GROUP BY cat.id, cat.name
            """,
            'gql': """
                MATCH (cat:Category)
                OPTIONAL MATCH (p:Product)-[:IN_CATEGORY]->(cat)
                RETURN cat.name, count(p)
            """
        }
    ]

    for test in test_queries:
        async with pg_conn.cursor() as cur:
            await cur.execute(test['sql'])
            pg_results = await cur.fetchall()

        async with geode_client.connection() as conn:
            page, _ = await conn.query(test['gql'])
            geode_results = [(r[0].as_string, r[1].as_int) for r in
                            [(row.values()[0], row.values()[1]) for row in page.rows]]

        # Compare first few results
        match = len(pg_results) == len(geode_results)
        print(f"  {test['name']}: [{'PASS' if match else 'FAIL'}]")

    print("\n" + "=" * 60)
    print("VALIDATION COMPLETE")
    print("=" * 60)

Common Pitfalls

1. Losing Referential Integrity

Problem: Foreign key constraints aren’t automatically enforced.

Solution: Create application-level validation or use Geode constraints.

// Create constraints
CREATE CONSTRAINT customer_id_unique ON :Customer(id) ASSERT UNIQUE
CREATE CONSTRAINT order_customer_exists ON :Order(customer_id) ASSERT EXISTS

2. N+1 Query Pattern

Problem: Fetching related data with multiple queries.

Solution: Use graph patterns to fetch all related data at once.

// Instead of multiple queries
// Query 1: Get customer
// Query 2: Get orders
// Query 3: Get products for each order

// Use a single query
MATCH (c:Customer {id: $id})-[:PLACED]->(o:Order)-[:CONTAINS]->(p:Product)
RETURN c, o, p

3. Missing Indexes

Problem: Slow queries due to missing indexes.

Solution: Create indexes for frequently queried properties.

CREATE INDEX customer_email ON :Customer(email)
CREATE INDEX product_name ON :Product(name)
CREATE INDEX order_date ON :Order(order_date)

4. Over-normalization

Problem: Creating too many node types for simple values.

Solution: Use properties for simple values, nodes for entities.

// Bad: Over-normalized
(:Customer)-[:HAS_EMAIL]->(:Email {address: 'alice@example.com'})

// Good: Property
(:Customer {email: 'alice@example.com'})

5. Ignoring NULL Semantics

Problem: Different NULL handling between SQL and GQL.

Solution: Use COALESCE and explicit NULL checks.

MATCH (c:Customer)
WHERE c.middle_name IS NOT NULL
RETURN c.name, COALESCE(c.nickname, c.name) AS display_name

Migration Checklist

Pre-Migration

  • Analyze PostgreSQL schema complexity
  • Identify tables suitable for graph modeling
  • Document all foreign key relationships
  • Assess query patterns and performance requirements
  • Plan hybrid operation period

Schema Migration

  • Design node labels and properties
  • Design relationship types and properties
  • Create Geode schema (indexes, constraints)
  • Document schema mapping

Data Migration

  • Create ETL pipeline
  • Test with sample data
  • Run full data migration
  • Validate row/node counts
  • Validate relationship counts

Application Migration

  • Update data access layer
  • Translate SQL queries to GQL
  • Implement dual-write if needed
  • Update connection configuration
  • Test all CRUD operations

Validation

  • Run data consistency checks
  • Execute query equivalence tests
  • Perform load testing
  • Verify backup/restore procedures

Cutover

  • Plan maintenance window
  • Prepare rollback procedure
  • Execute final data sync
  • Switch application to Geode
  • Monitor for errors
  • Decommission PostgreSQL integration

Resources

Getting Help

If you encounter issues during migration: