Data Migration

Data migration is the process of transferring data from one system to another, often involving format transformation, schema mapping, and data quality validation. Geode provides comprehensive tools and strategies for smooth, reliable data migrations.

Migration Strategies

Big Bang Migration

Move all data at once:

Pros:

  • Simple and straightforward
  • Immediate cutover
  • No sync period

Cons:

  • Higher risk
  • Longer downtime
  • Difficult rollback

# Stop old system
# Export all data
geode export --source legacy_db --output data.json

# Import to Geode
geode import --input data.json --graph production

# Start new system

Incremental Migration

Migrate data in phases:

Pros:

  • Lower risk
  • Phased validation
  • Easier rollback

Cons:

  • More complex
  • Requires sync
  • Longer timeline

# Phase 1: Users
geode import --input users.json --graph production

# Phase 2: Relationships
geode import --input relationships.json --graph production

# Phase 3: Activity data
geode import --input activities.json --graph production
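
The phase order matters: referenced entities must land before the data that points at them. With more datasets, the order can be derived by topologically sorting the dependency graph. A minimal sketch (the dataset names are illustrative, not part of any Geode API):

```python
from graphlib import TopologicalSorter

def import_order(dependencies):
    """Order migration phases so dependencies load first.

    `dependencies` maps each dataset to the datasets it references;
    TopologicalSorter yields an order in which every dataset appears
    after everything it depends on.
    """
    return list(TopologicalSorter(dependencies).static_order())

# Relationships reference users; activities reference both
order = import_order({
    "users": set(),
    "relationships": {"users"},
    "activities": {"users", "relationships"},
})
# → ["users", "relationships", "activities"]
```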

Parallel Run

Run old and new systems simultaneously:

Pros:

  • Lowest risk
  • Validate in production
  • Gradual transition

Cons:

  • Most complex
  • Highest cost
  • Sync overhead
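
A parallel run is usually validated with shadow reads: serve traffic from the old system while replaying the same queries against Geode and comparing results. A minimal sketch of the comparison step, assuming both systems return rows keyed by a shared primary key (the record shape here is an assumption, not a Geode API):

```python
def diff_records(old_rows, new_rows, key="id"):
    """Compare result sets from the old and new systems by primary key.

    Returns ids missing from the new system and ids whose field values
    disagree, so discrepancies can be logged during the parallel run.
    """
    old_by_key = {r[key]: r for r in old_rows}
    new_by_key = {r[key]: r for r in new_rows}

    missing = sorted(set(old_by_key) - set(new_by_key))
    mismatched = sorted(
        k for k in set(old_by_key) & set(new_by_key)
        if old_by_key[k] != new_by_key[k]
    )
    return missing, mismatched

# Example: one record missing, one field out of sync
old = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]
new = [{"id": 1, "name": "Ada Lovelace"}]
missing, mismatched = diff_records(old, new)
# → missing == [2], mismatched == [1]
```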

Schema Migration

Schema Versioning

Track schema changes:

-- Create schema version table
CREATE TABLE schema_versions (
    version INTEGER PRIMARY KEY,
    description STRING,
    applied_at TIMESTAMP,
    applied_by STRING
);

-- Record migration
INSERT INTO schema_versions VALUES (
    1,
    'Initial schema',
    NOW(),
    CURRENT_USER()
);
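
The version table is typically driven by a small runner that compares recorded versions against the migrations shipped with the application and applies only the pending ones, in order. A sketch of that selection logic (the migration numbering and descriptions are assumptions for illustration):

```python
def pending_migrations(applied_versions, available):
    """Return migrations not yet recorded in schema_versions, in order.

    `available` maps version number -> description; `applied_versions`
    is the set of versions already present in the table.
    """
    return [
        (version, available[version])
        for version in sorted(available)
        if version not in applied_versions
    ]

available = {
    1: "Initial schema",
    2: "Add middle_name to Person",
    3: "Drop full_name from Person",
}
# Version 1 is already applied, so only 2 and 3 remain
todo = pending_migrations({1}, available)
```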

Schema Evolution

Handle breaking changes:

-- Add new property
ALTER PROPERTY TYPE Person
ADD COLUMN middle_name STRING;

-- Populate from existing data
UPDATE Person
SET middle_name = SPLIT_PART(full_name, ' ', 2)
WHERE full_name LIKE '% % %';

-- Remove old property
ALTER PROPERTY TYPE Person
DROP COLUMN full_name;

Data Transformation

Type Conversion

Convert data types:

-- String to date
UPDATE Person
SET birth_date = TO_DATE(birth_date_string, 'YYYY-MM-DD')
WHERE birth_date IS NULL;

-- JSON to structured
UPDATE Event
SET metadata_parsed = JSON_PARSE(metadata_string);

Data Cleansing

Clean and normalize data:

-- Trim whitespace
UPDATE Person
SET email = TRIM(LOWER(email));

-- Standardize phone numbers
UPDATE Contact
SET phone = REGEXP_REPLACE(phone, '[^0-9]', '');

-- Remove duplicates
DELETE FROM Person p1
WHERE EXISTS (
    SELECT 1 FROM Person p2
    WHERE p1.email = p2.email
      AND p1.id > p2.id
);

Migration Tools

Geode CLI

Command-line migration tools:

# Export from SQL database
geode export \
  --source postgresql://localhost/mydb \
  --output backup.json \
  --format json

# Import to Geode
geode import \
  --input backup.json \
  --graph production \
  --batch-size 1000 \
  --parallel 4

# Verify migration
geode validate \
  --graph production \
  --check-constraints \
  --check-references

Python Migration Script

Custom migration logic:

from geode_client import Client
import asyncio

async def migrate_users(source_conn, geode_client):
    """Migrate users from SQL to Geode"""
    # Extract from source
    users = await source_conn.fetch("SELECT * FROM users")
    
    batch = []
    for user in users:
        # Transform
        node = {
            "labels": ["User"],
            "properties": {
                "id": user['id'],
                "name": user['name'],
                "email": user['email'].lower(),
                "created_at": user['created_at'].isoformat()
            }
        }
        
        batch.append(node)
        
        # Load in batches
        if len(batch) >= 1000:
            await geode_client.create_nodes_batch(batch)
            batch = []
    
    # Load remaining
    if batch:
        await geode_client.create_nodes_batch(batch)

async def main():
    source = await connect_source()
    geode = Client("localhost", 3141)
    
    await migrate_users(source, geode)
    await migrate_relationships(source, geode)
    await validate_migration(source, geode)

asyncio.run(main())

Validation

Data Quality Checks

Verify migration completeness:

-- Count comparison
SELECT 'source' AS system, COUNT(*) AS user_count
FROM source_users
UNION ALL
SELECT 'target' AS system, COUNT(*) AS user_count
FROM Person;

-- Sample verification
SELECT *
FROM Person p
WHERE NOT EXISTS (
    SELECT 1 FROM source_users s
    WHERE s.id = p.id
);

Referential Integrity

Validate relationships:

-- Check for orphaned edges
SELECT e.*
FROM FRIEND e
WHERE NOT EXISTS (
    SELECT 1 FROM Person p WHERE p.id = e.from_id
)
OR NOT EXISTS (
    SELECT 1 FROM Person p WHERE p.id = e.to_id
);

Best Practices

Planning

  • Document current state
  • Define success criteria
  • Create rollback plan
  • Test on copy first
  • Schedule during low-traffic periods

Execution

  • Use transactions
  • Batch operations
  • Monitor progress
  • Validate continuously
  • Keep audit trail
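
The execution checklist above can be sketched as a batched loader that writes each batch in its own transaction and keeps an audit entry per batch (the `write_batch` callback stands in for a transactional Geode write and is an assumption, not part of the client API):

```python
def load_in_batches(records, write_batch, batch_size=1000):
    """Load records in fixed-size batches with an audit trail.

    `write_batch` is expected to write one batch transactionally and
    raise on failure; the returned audit list records each attempt so
    progress can be monitored and reviewed after the run.
    """
    audit = []
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        try:
            write_batch(batch)
            audit.append({"offset": start, "count": len(batch), "ok": True})
        except Exception as exc:
            audit.append({"offset": start, "count": len(batch),
                          "ok": False, "error": str(exc)})
            break  # stop here so the failed batch can be retried
    return audit

loaded = []
audit = load_in_batches(list(range(2500)), loaded.extend, batch_size=1000)
# → three batches of 1000, 1000, and 500 records
```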

Post-Migration

  • Verify data quality
  • Update documentation
  • Monitor performance
  • Gather feedback
  • Archive old data

Learn More

  • Import - Data import operations
  • Export - Data export operations
  • ETL - Extract Transform Load pipelines

Advanced Migration Patterns

Zero-Downtime Migration Strategy

Deploy changes without service interruption:

from geode_client import Client
import asyncio

class ZeroDowntimeMigration:
    """Manage migrations with zero downtime using dual-write pattern."""

    def __init__(self, old_db, new_db):
        self.old_db = old_db
        self.new_db = new_db
        self.migration_complete = False

    async def phase1_dual_write(self):
        """Phase 1: Write to both old and new systems."""
        print("Phase 1: Starting dual-write mode")

        # All new writes go to both systems
        async def write_user(user_data):
            # Write to old system
            await self.old_db.execute("""
                INSERT INTO users_table VALUES ($id, $name, $email)
            """, user_data)

            # Write to new system (Geode)
            await self.new_db.execute("""
                CREATE (u:User {
                    id: $id,
                    name: $name,
                    email: $email,
                    migrated_at: datetime()
                })
            """, user_data)

        return write_user

    async def phase2_backfill(self, batch_size=1000):
        """Phase 2: Backfill historical data."""
        print("Phase 2: Backfilling historical data")

        offset = 0
        while True:
            # Read batch from old system
            old_users = await self.old_db.fetch(f"""
                SELECT id, name, email, created_at
                FROM users_table
                ORDER BY id
                LIMIT {batch_size} OFFSET {offset}
            """)

            if not old_users:
                break

            # Write to new system
            async with self.new_db.transaction():
                await self.new_db.execute("""
                    UNWIND $users AS user
                    MERGE (u:User {id: user.id})
                    ON CREATE SET
                        u.name = user.name,
                        u.email = user.email,
                        u.created_at = user.created_at,
                        u.backfilled = true
                """, {'users': old_users})

            offset += batch_size
            print(f"Backfilled {offset} users")

    async def phase3_verify(self):
        """Phase 3: Verify data consistency."""
        print("Phase 3: Verifying data consistency")

        # Count records
        old_count = await self.old_db.fetch_val("SELECT COUNT(*) FROM users_table")
        new_result = await self.new_db.execute("MATCH (u:User) RETURN count(u) AS count")
        new_count = new_result.rows[0]['count'] if new_result.rows else 0

        if old_count != new_count:
            raise MigrationError(f"Count mismatch: {old_count} vs {new_count}")

        # Sample verification
        sample_ids = await self.old_db.fetch("SELECT id FROM users_table ORDER BY RANDOM() LIMIT 1000")

        for sample in sample_ids:
            old_user = await self.old_db.fetch_one("SELECT * FROM users_table WHERE id = $id", {'id': sample['id']})
            new_user = await self.new_db.execute("MATCH (u:User {id: $id}) RETURN u", {'id': sample['id']})
            new_user = new_user.rows[0] if new_user.rows else None

            if not new_user or old_user['name'] != new_user['u']['name']:
                raise MigrationError(f"Data mismatch for user {sample['id']}")

        print("Verification complete")

    async def phase4_cutover(self):
        """Phase 4: Switch reads to new system."""
        print("Phase 4: Cutting over reads to new system")
        self.migration_complete = True

# Usage
async def run_zero_downtime_migration():
    old_postgres = PostgresClient("old-db:5432")
    new_geode = Client("new-geode:3141")

    migration = ZeroDowntimeMigration(old_postgres, new_geode)

    # Run migration phases
    await migration.phase1_dual_write()
    await migration.phase2_backfill(batch_size=10000)
    await migration.phase3_verify()
    await migration.phase4_cutover()

    print("Migration complete!")

asyncio.run(run_zero_downtime_migration())

Schema Transformation Patterns

Transform relational schema to graph model:

class RelationalToGraphTransformer:
    """Transform relational database schema to graph model."""

    def __init__(self, source_db, target_graph):
        self.source = source_db
        self.target = target_graph

    async def migrate_entities(self, table_name, label):
        """Migrate table rows to graph nodes."""
        print(f"Migrating {table_name} → :{label}")

        # Read table schema
        columns = await self.source.get_columns(table_name)

        # Migrate data in batches
        offset = 0
        batch_size = 5000

        while True:
            rows = await self.source.fetch(f"""
                SELECT * FROM {table_name}
                ORDER BY id
                LIMIT {batch_size} OFFSET {offset}
            """)

            if not rows:
                break

            # Transform and insert
            nodes = [
                {col: row[col] for col in columns}
                for row in rows
            ]

            await self.target.execute(f"""
                UNWIND $nodes AS node
                CREATE (n:{label})
                SET n = node
            """, {'nodes': nodes})

            offset += batch_size
            print(f"  Migrated {offset} {label} nodes")

    async def migrate_foreign_keys(self, fk_constraint):
        """Transform foreign keys to relationships."""
        table = fk_constraint['table']
        ref_table = fk_constraint['referenced_table']
        fk_column = fk_constraint['column']
        ref_column = fk_constraint['referenced_column']

        rel_type = self.infer_relationship_type(table, ref_table)

        print(f"Creating {rel_type} relationships: {table} → {ref_table}")

        # Node labels are assumed to be the singular, title-cased table
        # names (orders → Order), matching the migrate_entities calls below
        from_label = table.rstrip('s').title()
        to_label = ref_table.rstrip('s').title()

        await self.target.execute(f"""
            MATCH (a:{from_label})
            MATCH (b:{to_label} {{{ref_column}: a.{fk_column}}})
            CREATE (a)-[:{rel_type}]->(b)
        """)

    def infer_relationship_type(self, from_table, to_table):
        """Infer relationship type from table names."""
        # Simple heuristic - customize for your schema
        return f"HAS_{to_table.upper()}"

    async def migrate_join_tables(self, join_table):
        """Transform many-to-many join tables to relationships."""
        left_col = join_table['left_column']
        right_col = join_table['right_column']
        left_entity = join_table['left_entity']
        right_entity = join_table['right_entity']

        rel_type = f"{left_entity}_TO_{right_entity}".upper()

        print(f"Migrating join table {join_table['name']} → {rel_type}")

        rows = await self.source.fetch(f"""
            SELECT * FROM {join_table['name']}
        """)

        for row in rows:
            await self.target.execute(f"""
                MATCH (a:{left_entity} {{id: $left_id}})
                MATCH (b:{right_entity} {{id: $right_id}})
                CREATE (a)-[r:{rel_type}]->(b)
                SET r = $properties
            """, {
                'left_id': row[left_col],
                'right_id': row[right_col],
                'properties': {k: v for k, v in row.items() if k not in [left_col, right_col]}
            })

# Example usage
async def migrate_ecommerce_schema():
    transformer = RelationalToGraphTransformer(postgres, geode)

    # Migrate entities
    await transformer.migrate_entities('users', 'User')
    await transformer.migrate_entities('products', 'Product')
    await transformer.migrate_entities('orders', 'Order')

    # Migrate foreign keys
    await transformer.migrate_foreign_keys({
        'table': 'orders',
        'referenced_table': 'users',
        'column': 'user_id',
        'referenced_column': 'id'
    })

    # Migrate join tables
    await transformer.migrate_join_tables({
        'name': 'order_items',
        'left_column': 'order_id',
        'right_column': 'product_id',
        'left_entity': 'Order',
        'right_entity': 'Product'
    })

Data Quality Validation

Comprehensive validation during migration:

class MigrationValidator:
    """Validate data quality during and after migration."""

    def __init__(self, source_db, target_graph):
        self.source = source_db
        self.target = target_graph
        self.issues = []

    async def validate_counts(self, table_name, label):
        """Verify row/node counts match."""
        source_count = await self.source.fetch_val(f"SELECT COUNT(*) FROM {table_name}")

        result = await self.target.execute(f"MATCH (n:{label}) RETURN count(n) AS count")
        target_count = result.rows[0]['count'] if result.rows else 0

        if source_count != target_count:
            self.issues.append({
                'type': 'count_mismatch',
                'entity': label,
                'source': source_count,
                'target': target_count
            })

        return source_count == target_count

    async def validate_referential_integrity(self):
        """Check for orphaned relationships."""
        # Stored relationships always have endpoints, so instead flag
        # relationships whose endpoints lost their id during migration
        result = await self.target.execute("""
            MATCH (a)-[r]->(b)
            WHERE a.id IS NULL OR b.id IS NULL
            RETURN count(r) AS orphaned_relationships
        """)

        orphaned = result.rows[0]['orphaned_relationships'] if result.rows else 0

        if orphaned > 0:
            self.issues.append({
                'type': 'orphaned_relationships',
                'count': orphaned
            })

        return orphaned == 0

    async def validate_data_types(self, label, property_name, expected_type):
        """Verify property data types."""
        result = await self.target.execute(f"""
            MATCH (n:{label})
            WHERE n.{property_name} IS NOT NULL
            WITH n.{property_name} AS value, valueType(n.{property_name}) AS actual_type
            WHERE actual_type <> $expected_type
            RETURN count(*) AS type_mismatches
        """, {'expected_type': expected_type})

        mismatches = result.rows[0]['type_mismatches'] if result.rows else 0

        if mismatches > 0:
            self.issues.append({
                'type': 'data_type_mismatch',
                'entity': label,
                'property': property_name,
                'count': mismatches
            })

        return mismatches == 0

    async def validate_unique_constraints(self, label, property_name):
        """Check for duplicate values in unique properties."""
        result = await self.target.execute(f"""
            MATCH (n:{label})
            WITH n.{property_name} AS value, count(n) AS occurrences
            WHERE occurrences > 1
            RETURN count(*) AS duplicates
        """)

        duplicates = result.rows[0]['duplicates'] if result.rows else 0

        if duplicates > 0:
            self.issues.append({
                'type': 'unique_constraint_violation',
                'entity': label,
                'property': property_name,
                'count': duplicates
            })

        return duplicates == 0

    def generate_report(self):
        """Generate validation report."""
        if not self.issues:
            return "✓ All validations passed"

        report = "Migration Validation Issues:\n"
        for issue in self.issues:
            report += f"\n{issue['type']}:\n"
            for key, value in issue.items():
                if key != 'type':
                    report += f"  - {key}: {value}\n"

        return report

Incremental Migration with Change Data Capture

Sync ongoing changes captured in a change log:

class CDCMigration:
    """Migrate data incrementally using change data capture."""

    def __init__(self, source_db, target_graph):
        self.source = source_db
        self.target = target_graph
        self.last_sync_timestamp = None

    async def capture_changes(self, since_timestamp=None):
        """Capture changes from source database."""
        timestamp = since_timestamp or self.last_sync_timestamp or '1970-01-01'

        changes = await self.source.fetch("""
            SELECT table_name, operation, record_id, new_data, changed_at
            FROM change_log
            WHERE changed_at > $timestamp
            ORDER BY changed_at
        """, {'timestamp': timestamp})

        return changes

    async def apply_changes(self, changes):
        """Apply changes to target graph database."""
        for change in changes:
            table = change['table_name']
            operation = change['operation']
            data = change['new_data']

            if operation == 'INSERT' or operation == 'UPDATE':
                await self.target.execute(f"""
                    MERGE (n:{table.title()} {{id: $id}})
                    SET n = $data,
                        n.synced_at = datetime()
                """, {'id': change['record_id'], 'data': data})

            elif operation == 'DELETE':
                await self.target.execute(f"""
                    MATCH (n:{table.title()} {{id: $id}})
                    DETACH DELETE n
                """, {'id': change['record_id']})

        if changes:
            self.last_sync_timestamp = changes[-1]['changed_at']

    async def continuous_sync(self, interval_seconds=60):
        """Continuously sync changes."""
        print("Starting continuous sync...")

        while True:
            try:
                changes = await self.capture_changes()
                if changes:
                    await self.apply_changes(changes)
                    print(f"Synced {len(changes)} changes")

                await asyncio.sleep(interval_seconds)

            except Exception as e:
                print(f"Sync error: {e}")
                await asyncio.sleep(interval_seconds)

Migration Rollback Strategy

Checkpoint and roll back a failed migration:

from datetime import datetime

class MigrationRollback:
    """Provide rollback capability for failed migrations."""

    def __init__(self, target_graph):
        self.target = target_graph
        self.migration_id = None
        self.checkpoints = []

    async def create_checkpoint(self, name):
        """Create a recovery checkpoint."""
        checkpoint = {
            'name': name,
            'timestamp': datetime.now(),
            'node_count': await self.get_node_count(),
            'relationship_count': await self.get_relationship_count()
        }

        self.checkpoints.append(checkpoint)
        print(f"Checkpoint created: {name}")

        return checkpoint

    async def get_node_count(self):
        """Get current node count."""
        result = await self.target.execute("MATCH (n) RETURN count(n) AS count")
        return result.rows[0]['count'] if result.rows else 0

    async def get_relationship_count(self):
        """Get current relationship count."""
        result = await self.target.execute("MATCH ()-[r]->() RETURN count(r) AS count")
        return result.rows[0]['count'] if result.rows else 0

    async def rollback_to_checkpoint(self, checkpoint_name):
        """Rollback to a specific checkpoint."""
        checkpoint = next((c for c in self.checkpoints if c['name'] == checkpoint_name), None)

        if not checkpoint:
            raise ValueError(f"Checkpoint not found: {checkpoint_name}")

        print(f"Rolling back to checkpoint: {checkpoint_name}")

        # Delete nodes created after checkpoint
        await self.target.execute("""
            MATCH (n)
            WHERE n.migrated_at > $checkpoint_time
            DETACH DELETE n
        """, {'checkpoint_time': checkpoint['timestamp']})

        print("Rollback complete")

Performance Optimization

Parallel Migration Processing

Split large tables across parallel workers:

class ParallelMigrator:
    """Migrate data using parallel workers."""

    def __init__(self, source_db, target_graph, num_workers=4):
        self.source = source_db
        self.target = target_graph
        self.num_workers = num_workers

    async def migrate_table_parallel(self, table_name, label):
        """Migrate table using parallel workers."""
        # Get total row count
        total_rows = await self.source.fetch_val(f"SELECT COUNT(*) FROM {table_name}")

        chunk_size = total_rows // self.num_workers + 1

        # Create worker tasks
        tasks = []
        for worker_id in range(self.num_workers):
            offset = worker_id * chunk_size
            task = self.migrate_chunk(table_name, label, offset, chunk_size, worker_id)
            tasks.append(task)

        # Execute in parallel
        results = await asyncio.gather(*tasks)

        total_migrated = sum(results)
        print(f"Migrated {total_migrated} rows from {table_name}")

    async def migrate_chunk(self, table_name, label, offset, limit, worker_id):
        """Migrate a chunk of data (worker task)."""
        rows = await self.source.fetch(f"""
            SELECT * FROM {table_name}
            ORDER BY id
            LIMIT {limit} OFFSET {offset}
        """)

        if not rows:
            return 0

        # Create connection for this worker
        worker_client = Client(host="localhost", port=3141)
        async with worker_client.connection() as conn:
            await conn.begin()
            try:
                await conn.execute(f"""
                    UNWIND $rows AS row
                    CREATE (n:{label})
                    SET n = row
                """, {'rows': rows})
                await conn.commit()
                print(f"Worker {worker_id}: Migrated {len(rows)} rows")
                return len(rows)
            except Exception:
                await conn.rollback()
                raise
