Data Export Guide

This guide covers strategies for exporting data from Geode to CSV, JSON, and JSONL, as well as streaming exports for analytics pipelines. It shows how to perform selective and incremental exports and how to handle large datasets efficiently.

Overview

Geode supports multiple export methods:

Method               Best For                     Output Format
CSV Export           Spreadsheets, BI tools       Tabular data
JSON Export          APIs, web applications       Structured data
JSONL Export         Streaming, log processing    Line-delimited JSON
Streaming Export     Large datasets               Continuous stream
Incremental Export   Change tracking              Delta updates

Exporting to CSV

Basic CSV Export

Export all nodes with a specific label to CSV format.

GQL Query:

MATCH (u:User)
RETURN u.id, u.name, u.email, u.age, u.city
ORDER BY u.id

Multi-Language CSV Export Examples

Go:

package main

import (
    "context"
    "database/sql"
    "encoding/csv"
    "log"
    "os"
    "strconv"

    _ "geodedb.com/geode"
)

func main() {
    db, err := sql.Open("geode", "localhost:3141")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    // Query data
    rows, err := db.QueryContext(ctx, `
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city
        ORDER BY u.id
    `)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    // Create CSV file
    file, err := os.Create("users_export.csv")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    writer := csv.NewWriter(file)
    defer writer.Flush()

    // Write header
    writer.Write([]string{"id", "name", "email", "age", "city"})

    // Write rows
    rowCount := 0
    for rows.Next() {
        var id, age int
        var name, email, city string

        if err := rows.Scan(&id, &name, &email, &age, &city); err != nil {
            log.Printf("Error scanning row: %v", err)
            continue
        }

        writer.Write([]string{
            strconv.Itoa(id),
            name,
            email,
            strconv.Itoa(age),
            city,
        })
        rowCount++
    }

    log.Printf("Exported %d users to users_export.csv", rowCount)
}

Python:

import asyncio
import csv
from geode_client import Client

async def export_to_csv(filename: str):
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        # Query all users
        page, has_more = await conn.query("""
            MATCH (u:User)
            RETURN u.id, u.name, u.email, u.age, u.city
            ORDER BY u.id
        """)

        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)

            # Write header
            writer.writerow(['id', 'name', 'email', 'age', 'city'])

            # Write first page
            for row in page.rows:
                writer.writerow([
                    row['u.id'].as_int,
                    row['u.name'].as_string,
                    row['u.email'].as_string,
                    row['u.age'].as_int,
                    row['u.city'].as_string
                ])

            # Fetch remaining pages
            while has_more:
                page, has_more = await conn.pull()
                for row in page.rows:
                    writer.writerow([
                        row['u.id'].as_int,
                        row['u.name'].as_string,
                        row['u.email'].as_string,
                        row['u.age'].as_int,
                        row['u.city'].as_string
                    ])

        print(f"Exported to {filename}")

asyncio.run(export_to_csv("users_export.csv"))

Rust:

use geode_client::Client;
use std::fs::File;
use std::io::Write;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new("127.0.0.1", 3141).skip_verify(true);
    let mut conn = client.connect().await?;

    // Query data
    let (page, _) = conn.query(r#"
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city
        ORDER BY u.id
    "#).await?;

    // Create CSV file
    let mut file = File::create("users_export.csv")?;

    // Write header
    writeln!(file, "id,name,email,age,city")?;

    // Write rows
    let mut count = 0;
    for row in &page.rows {
        let id = row.get("u.id").unwrap().as_int()?;
        let name = row.get("u.name").unwrap().as_string()?;
        let email = row.get("u.email").unwrap().as_string()?;
        let age = row.get("u.age").unwrap().as_int()?;
        let city = row.get("u.city").unwrap().as_string()?;

        // Escape CSV fields properly
        writeln!(file, "{},\"{}\",\"{}\",{},\"{}\"",
            id,
            name.replace("\"", "\"\""),
            email.replace("\"", "\"\""),
            age,
            city.replace("\"", "\"\"")
        )?;
        count += 1;
    }

    println!("Exported {} users to users_export.csv", count);
    Ok(())
}

TypeScript:

import { createClient } from '@geodedb/client';
import { createWriteStream } from 'fs';

async function exportToCSV(filename: string) {
    const client = await createClient('quic://localhost:3141');

    const writeStream = createWriteStream(filename);

    // Write header
    writeStream.write('id,name,email,age,city\n');

    // Query and stream results
    const rows = await client.queryAll(`
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city
        ORDER BY u.id
    `);

    let count = 0;
    for (const row of rows) {
        const id = row.get('u.id')?.asNumber;
        const name = row.get('u.name')?.asString?.replace(/"/g, '""');
        const email = row.get('u.email')?.asString?.replace(/"/g, '""');
        const age = row.get('u.age')?.asNumber;
        const city = row.get('u.city')?.asString?.replace(/"/g, '""');

        writeStream.write(`${id},"${name}","${email}",${age},"${city}"\n`);
        count++;
    }

    writeStream.end();
    console.log(`Exported ${count} users to ${filename}`);

    await client.close();
}

exportToCSV('users_export.csv');

Zig:

const std = @import("std");
const geode = @import("geode_client");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var client = geode.GeodeClient.init(allocator, "localhost", 3141, true);
    defer client.deinit();

    try client.connect();
    try client.sendHello("csv-export", "1.0.0");
    _ = try client.receiveMessage(30000);

    // Query data
    try client.sendRunGql(1,
        \\MATCH (u:User)
        \\RETURN u.id, u.name, u.email, u.age, u.city
        \\ORDER BY u.id
    , null);
    _ = try client.receiveMessage(30000);

    // Pull results
    try client.sendPull(1, 10000);
    const result = try client.receiveMessage(30000);
    defer allocator.free(result);

    // Create output file
    const file = try std.fs.cwd().createFile("users_export.csv", .{});
    defer file.close();

    // Write header
    try file.writeAll("id,name,email,age,city\n");

    // Parse JSON result and write CSV rows
    const parsed = try std.json.parseFromSlice(std.json.Value, allocator, result, .{});
    defer parsed.deinit();

    if (parsed.value.object.get("rows")) |rows| {
        for (rows.array.items) |row| {
            // Extract fields and write CSV line
            const id = row.object.get("u.id").?.integer;
            const name = row.object.get("u.name").?.string;
            const email = row.object.get("u.email").?.string;
            const age = row.object.get("u.age").?.integer;
            const city = row.object.get("u.city").?.string;

            var buf: [1024]u8 = undefined;
            const line = try std.fmt.bufPrint(&buf, "{d},\"{s}\",\"{s}\",{d},\"{s}\"\n", .{
                id, name, email, age, city,
            });
            try file.writeAll(line);
        }
    }

    std.debug.print("Export complete\n", .{});
}

Exporting Relationships to CSV

Export relationships with their properties:

MATCH (u1:User)-[r:KNOWS]->(u2:User)
RETURN u1.id AS from_id,
       u2.id AS to_id,
       r.since AS since,
       r.strength AS strength
ORDER BY u1.id, u2.id

Python:

import asyncio
import csv
from geode_client import Client

async def export_relationships():
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        page, _ = await conn.query("""
            MATCH (u1:User)-[r:KNOWS]->(u2:User)
            RETURN u1.id AS from_id,
                   u2.id AS to_id,
                   r.since AS since,
                   r.strength AS strength
            ORDER BY u1.id, u2.id
        """)

        with open('relationships_export.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['from_id', 'to_id', 'since', 'strength'])

            for row in page.rows:
                writer.writerow([
                    row['from_id'].as_int,
                    row['to_id'].as_int,
                    row['since'].as_string if row['since'] else '',
                    row['strength'].as_float if row['strength'] else ''
                ])

        print(f"Exported {len(page.rows)} relationships")

asyncio.run(export_relationships())

Exporting to JSON/JSONL

Full JSON Export

Export data as a complete JSON document:

Go:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "os"

    _ "geodedb.com/geode"
)

type User struct {
    ID    int      `json:"id"`
    Name  string   `json:"name"`
    Email string   `json:"email"`
    Age   int      `json:"age"`
    City  string   `json:"city"`
    Tags  []string `json:"tags,omitempty"`
}

type ExportData struct {
    ExportedAt string `json:"exported_at"`
    Count      int    `json:"count"`
    Users      []User `json:"users"`
}

func main() {
    db, err := sql.Open("geode", "localhost:3141")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    rows, err := db.QueryContext(ctx, `
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city, u.tags
        ORDER BY u.id
    `)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    var users []User
    for rows.Next() {
        var u User
        var tagsJSON string

        if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.City, &tagsJSON); err != nil {
            continue
        }

        if tagsJSON != "" {
            json.Unmarshal([]byte(tagsJSON), &u.Tags)
        }

        users = append(users, u)
    }

    export := ExportData{
        ExportedAt: "2026-01-28T10:00:00Z",
        Count:      len(users),
        Users:      users,
    }

    file, err := os.Create("users_export.json")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    encoder := json.NewEncoder(file)
    encoder.SetIndent("", "  ")
    encoder.Encode(export)

    log.Printf("Exported %d users to users_export.json", len(users))
}

Python:

import asyncio
import json
from datetime import datetime
from geode_client import Client

async def export_to_json(filename: str):
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        page, _ = await conn.query("""
            MATCH (u:User)
            RETURN u.id, u.name, u.email, u.age, u.city, u.tags
            ORDER BY u.id
        """)

        users = []
        for row in page.rows:
            user = {
                "id": row['u.id'].as_int,
                "name": row['u.name'].as_string,
                "email": row['u.email'].as_string,
                "age": row['u.age'].as_int,
                "city": row['u.city'].as_string,
            }

            # Handle optional tags
            if row.get('u.tags'):
                user["tags"] = row['u.tags'].as_list

            users.append(user)

        export_data = {
            "exported_at": datetime.utcnow().isoformat() + "Z",
            "count": len(users),
            "users": users
        }

        with open(filename, 'w') as f:
            json.dump(export_data, f, indent=2)

        print(f"Exported {len(users)} users to {filename}")

asyncio.run(export_to_json("users_export.json"))

Rust:

use geode_client::Client;
use serde::Serialize;
use std::fs::File;
use std::io::Write;
use chrono::Utc;

#[derive(Serialize)]
struct User {
    id: i64,
    name: String,
    email: String,
    age: i64,
    city: String,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    tags: Vec<String>,
}

#[derive(Serialize)]
struct ExportData {
    exported_at: String,
    count: usize,
    users: Vec<User>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new("127.0.0.1", 3141).skip_verify(true);
    let mut conn = client.connect().await?;

    let (page, _) = conn.query(r#"
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city, u.tags
        ORDER BY u.id
    "#).await?;

    let mut users = Vec::new();
    for row in &page.rows {
        let tags: Vec<String> = row.get("u.tags")
            .map(|v| v.as_list().unwrap_or_default()
                .iter()
                .map(|t| t.as_string().unwrap_or_default())
                .collect())
            .unwrap_or_default();

        users.push(User {
            id: row.get("u.id").unwrap().as_int()?,
            name: row.get("u.name").unwrap().as_string()?,
            email: row.get("u.email").unwrap().as_string()?,
            age: row.get("u.age").unwrap().as_int()?,
            city: row.get("u.city").unwrap().as_string()?,
            tags,
        });
    }

    let export = ExportData {
        exported_at: Utc::now().to_rfc3339(),
        count: users.len(),
        users,
    };

    let mut file = File::create("users_export.json")?;
    let json = serde_json::to_string_pretty(&export)?;
    file.write_all(json.as_bytes())?;

    println!("Exported {} users", export.count);
    Ok(())
}

TypeScript:

import { createClient } from '@geodedb/client';
import { writeFileSync } from 'fs';

interface User {
    id: number;
    name: string;
    email: string;
    age: number;
    city: string;
    tags?: string[];
}

interface ExportData {
    exported_at: string;
    count: number;
    users: User[];
}

async function exportToJSON(filename: string) {
    const client = await createClient('quic://localhost:3141');

    const rows = await client.queryAll(`
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city, u.tags
        ORDER BY u.id
    `);

    const users: User[] = [];
    for (const row of rows) {
        const user: User = {
            id: row.get('u.id')?.asNumber!,
            name: row.get('u.name')?.asString!,
            email: row.get('u.email')?.asString!,
            age: row.get('u.age')?.asNumber!,
            city: row.get('u.city')?.asString!,
        };

        const tags = row.get('u.tags')?.asList;
        if (tags && tags.length > 0) {
            user.tags = tags;
        }

        users.push(user);
    }

    const exportData: ExportData = {
        exported_at: new Date().toISOString(),
        count: users.length,
        users,
    };

    writeFileSync(filename, JSON.stringify(exportData, null, 2));
    console.log(`Exported ${users.length} users to ${filename}`);

    await client.close();
}

exportToJSON('users_export.json');

JSONL Export (Streaming)

JSONL format is ideal for streaming large datasets:

Python:

import asyncio
import json
from geode_client import Client

async def export_to_jsonl(filename: str, batch_size: int = 1000):
    """Export data to JSONL format with streaming."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        total_exported = 0

        with open(filename, 'w') as file:
            # Query with pagination
            page, has_more = await conn.query(
                """
                MATCH (u:User)
                RETURN u.id, u.name, u.email, u.age, u.city
                ORDER BY u.id
                """,
                fetch_size=batch_size
            )

            while True:
                # Write current page
                for row in page.rows:
                    record = {
                        "id": row['u.id'].as_int,
                        "name": row['u.name'].as_string,
                        "email": row['u.email'].as_string,
                        "age": row['u.age'].as_int,
                        "city": row['u.city'].as_string
                    }
                    file.write(json.dumps(record) + '\n')
                    total_exported += 1

                if not has_more:
                    break

                # Fetch next page
                page, has_more = await conn.pull(batch_size)

                if total_exported % 10000 == 0:
                    print(f"Exported {total_exported} records...")

        print(f"Total exported: {total_exported} records to {filename}")

asyncio.run(export_to_jsonl("users_export.jsonl"))

Go:

package main

import (
    "bufio"
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "os"

    _ "geodedb.com/geode"
)

func main() {
    db, err := sql.Open("geode", "localhost:3141")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    rows, err := db.QueryContext(ctx, `
        MATCH (u:User)
        RETURN u.id, u.name, u.email, u.age, u.city
        ORDER BY u.id
    `)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    file, err := os.Create("users_export.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    count := 0
    for rows.Next() {
        var id, age int
        var name, email, city string

        rows.Scan(&id, &name, &email, &age, &city)

        record := map[string]interface{}{
            "id":    id,
            "name":  name,
            "email": email,
            "age":   age,
            "city":  city,
        }

        jsonLine, _ := json.Marshal(record)
        writer.Write(jsonLine)
        writer.WriteString("\n")
        count++

        if count%10000 == 0 {
            writer.Flush()
            log.Printf("Exported %d records...", count)
        }
    }

    log.Printf("Total exported: %d records", count)
}

Selective Exports with Queries

Filtering by Properties

// Export users from a specific city
MATCH (u:User)
WHERE u.city = "New York"
RETURN u.id, u.name, u.email

// Export users within age range
MATCH (u:User)
WHERE u.age >= 25 AND u.age <= 35
RETURN u.id, u.name, u.age

// Export users with specific tags
MATCH (u:User)
WHERE "developer" IN u.tags
RETURN u.id, u.name, u.tags
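
To run one of these filtered exports from a client, pass the filter value as a query parameter rather than interpolating it into the GQL string. The sketch below is a minimal example using the Python client shown elsewhere in this guide; the city value and output filename are illustrative.

import asyncio
import csv
from geode_client import Client

async def export_city(city: str, filename: str):
    """Export users from a single city to CSV, with the city passed as a parameter."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        # Parameterized filter: the value is bound by the server, not concatenated
        page, _ = await conn.query(
            """
            MATCH (u:User)
            WHERE u.city = $city
            RETURN u.id, u.name, u.email
            ORDER BY u.id
            """,
            {"city": city}
        )

        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['id', 'name', 'email'])
            for row in page.rows:
                writer.writerow([
                    row['u.id'].as_int,
                    row['u.name'].as_string,
                    row['u.email'].as_string
                ])

        print(f"Exported {len(page.rows)} users from {city}")

asyncio.run(export_city("New York", "new_york_users.csv"))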

Exporting Graph Subsets

// Export users and their immediate friends
MATCH (u:User)-[:KNOWS]->(friend:User)
WHERE u.city = "San Francisco"
RETURN u.id AS user_id,
       u.name AS user_name,
       collect(friend.name) AS friends

// Export 2-hop network
MATCH path = (u:User {name: "Alice"})-[:KNOWS*1..2]->(connected:User)
RETURN DISTINCT connected.id, connected.name, length(path) AS distance
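
Subset results like the friends query above export naturally to JSONL, since collected values map directly to JSON arrays. A minimal sketch, assuming the collected friends column can be read with as_list in the same way u.tags is read in the JSON export above:

import asyncio
import json
from geode_client import Client

async def export_friend_lists(filename: str):
    """Export each San Francisco user with the collected list of their friends."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        page, _ = await conn.query("""
            MATCH (u:User)-[:KNOWS]->(friend:User)
            WHERE u.city = "San Francisco"
            RETURN u.id AS user_id,
                   u.name AS user_name,
                   collect(friend.name) AS friends
        """)

        with open(filename, 'w') as f:
            for row in page.rows:
                f.write(json.dumps({
                    "user_id": row['user_id'].as_int,
                    "user_name": row['user_name'].as_string,
                    "friends": row['friends'].as_list
                }) + '\n')

        print(f"Exported {len(page.rows)} users with friend lists")

asyncio.run(export_friend_lists("sf_friend_lists.jsonl"))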

Complex Filtering Example

import asyncio
import json
from geode_client import Client
from datetime import datetime, timedelta

async def export_active_users(days: int = 30):
    """Export users active in the last N days."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    cutoff_date = (datetime.utcnow() - timedelta(days=days)).isoformat()

    async with client.connection() as conn:
        page, _ = await conn.query(
            """
            MATCH (u:User)
            WHERE u.last_active > $cutoff
            RETURN u.id, u.name, u.email, u.last_active
            ORDER BY u.last_active DESC
            """,
            {"cutoff": cutoff_date}
        )

        users = []
        for row in page.rows:
            users.append({
                "id": row['u.id'].as_int,
                "name": row['u.name'].as_string,
                "email": row['u.email'].as_string,
                "last_active": row['u.last_active'].as_string
            })

        with open(f"active_users_{days}d.json", 'w') as f:
            json.dump({
                "exported_at": datetime.utcnow().isoformat(),
                "filter": f"active in last {days} days",
                "count": len(users),
                "users": users
            }, f, indent=2)

        print(f"Exported {len(users)} active users")

asyncio.run(export_active_users(30))

Incremental Exports

Timestamp-Based Incremental Export

Track changes using timestamp properties:

Python:

import asyncio
import json
from datetime import datetime
from geode_client import Client

class IncrementalExporter:
    def __init__(self, state_file: str = "export_state.json"):
        self.state_file = state_file
        self.state = self._load_state()

    def _load_state(self) -> dict:
        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {"last_export": "1970-01-01T00:00:00Z"}

    def _save_state(self):
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f)

    async def export_changes(self, output_file: str):
        """Export records changed since last export."""
        client = Client(host="localhost", port=3141, skip_verify=True)

        async with client.connection() as conn:
            page, _ = await conn.query(
                """
                MATCH (u:User)
                WHERE u.updated_at > $since
                RETURN u.id, u.name, u.email, u.updated_at
                ORDER BY u.updated_at
                """,
                {"since": self.state["last_export"]}
            )

            if not page.rows:
                print("No changes since last export")
                return

            changes = []
            latest_timestamp = self.state["last_export"]

            for row in page.rows:
                updated_at = row['u.updated_at'].as_string

                changes.append({
                    "id": row['u.id'].as_int,
                    "name": row['u.name'].as_string,
                    "email": row['u.email'].as_string,
                    "updated_at": updated_at
                })

                if updated_at > latest_timestamp:
                    latest_timestamp = updated_at

            # Write changes
            with open(output_file, 'w') as f:
                json.dump({
                    "export_type": "incremental",
                    "since": self.state["last_export"],
                    "until": latest_timestamp,
                    "count": len(changes),
                    "changes": changes
                }, f, indent=2)

            # Update state
            self.state["last_export"] = latest_timestamp
            self._save_state()

            print(f"Exported {len(changes)} changed records")

async def main():
    exporter = IncrementalExporter()
    await exporter.export_changes("incremental_export.json")

asyncio.run(main())

Go:

package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "os"
    "time"

    _ "geodedb.com/geode"
)

type ExportState struct {
    LastExport string `json:"last_export"`
}

type Change struct {
    ID        int    `json:"id"`
    Name      string `json:"name"`
    Email     string `json:"email"`
    UpdatedAt string `json:"updated_at"`
}

func loadState(filename string) ExportState {
    data, err := os.ReadFile(filename)
    if err != nil {
        return ExportState{LastExport: "1970-01-01T00:00:00Z"}
    }
    var state ExportState
    json.Unmarshal(data, &state)
    return state
}

func saveState(filename string, state ExportState) {
    data, _ := json.Marshal(state)
    os.WriteFile(filename, data, 0644)
}

func main() {
    db, err := sql.Open("geode", "localhost:3141")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()
    state := loadState("export_state.json")

    rows, err := db.QueryContext(ctx, `
        MATCH (u:User)
        WHERE u.updated_at > ?
        RETURN u.id, u.name, u.email, u.updated_at
        ORDER BY u.updated_at
    `, state.LastExport)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    var changes []Change
    var latestTimestamp string = state.LastExport

    for rows.Next() {
        var c Change
        rows.Scan(&c.ID, &c.Name, &c.Email, &c.UpdatedAt)
        changes = append(changes, c)

        if c.UpdatedAt > latestTimestamp {
            latestTimestamp = c.UpdatedAt
        }
    }

    if len(changes) == 0 {
        log.Println("No changes since last export")
        return
    }

    export := map[string]interface{}{
        "export_type": "incremental",
        "since":       state.LastExport,
        "until":       latestTimestamp,
        "count":       len(changes),
        "changes":     changes,
    }

    file, err := os.Create("incremental_export.json")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    json.NewEncoder(file).Encode(export)

    state.LastExport = latestTimestamp
    saveState("export_state.json", state)

    log.Printf("Exported %d changed records", len(changes))
}

Change Data Capture Pattern

import asyncio
import json
from datetime import datetime
from geode_client import Client

async def capture_changes():
    """Capture and export changes with operation type."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        # Query change log (assuming you have a change tracking mechanism)
        page, _ = await conn.query("""
            MATCH (c:ChangeLog)
            WHERE c.timestamp > $since
            RETURN c.operation, c.entity_type, c.entity_id,
                   c.old_values, c.new_values, c.timestamp
            ORDER BY c.timestamp
        """, {"since": "2026-01-27T00:00:00Z"})

        changes = []
        for row in page.rows:
            changes.append({
                "operation": row['c.operation'].as_string,  # CREATE, UPDATE, DELETE
                "entity_type": row['c.entity_type'].as_string,
                "entity_id": row['c.entity_id'].as_int,
                "old_values": json.loads(row['c.old_values'].as_string) if row['c.old_values'] else None,
                "new_values": json.loads(row['c.new_values'].as_string) if row['c.new_values'] else None,
                "timestamp": row['c.timestamp'].as_string
            })

        with open('cdc_export.jsonl', 'w') as f:
            for change in changes:
                f.write(json.dumps(change) + '\n')

        print(f"Captured {len(changes)} changes")

asyncio.run(capture_changes())

Streaming Large Datasets

Paginated Export

Python:

import asyncio
import json
from geode_client import Client

async def stream_export(output_file: str, page_size: int = 10000):
    """Stream large dataset export with pagination."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        total_exported = 0
        offset = 0

        with open(output_file, 'w') as f:
            while True:
                page, _ = await conn.query(
                    """
                    MATCH (u:User)
                    RETURN u.id, u.name, u.email, u.age
                    ORDER BY u.id
                    SKIP $offset
                    LIMIT $limit
                    """,
                    {"offset": offset, "limit": page_size}
                )

                if not page.rows:
                    break

                for row in page.rows:
                    record = {
                        "id": row['u.id'].as_int,
                        "name": row['u.name'].as_string,
                        "email": row['u.email'].as_string,
                        "age": row['u.age'].as_int
                    }
                    f.write(json.dumps(record) + '\n')

                total_exported += len(page.rows)
                offset += page_size

                print(f"Exported {total_exported} records...")

                # Break if we got fewer records than page size
                if len(page.rows) < page_size:
                    break

        print(f"Stream export complete: {total_exported} total records")

asyncio.run(stream_export("large_export.jsonl"))

Go:

package main

import (
    "bufio"
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "os"

    _ "geodedb.com/geode"
)

func main() {
    db, err := sql.Open("geode", "localhost:3141")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()

    file, err := os.Create("large_export.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()
    writer := bufio.NewWriter(file)
    defer writer.Flush()

    pageSize := 10000
    offset := 0
    totalExported := 0

    for {
        rows, err := db.QueryContext(ctx, `
            MATCH (u:User)
            RETURN u.id, u.name, u.email, u.age
            ORDER BY u.id
            SKIP ?
            LIMIT ?
        `, offset, pageSize)
        if err != nil {
            log.Fatal(err)
        }

        count := 0
        for rows.Next() {
            var id, age int
            var name, email string
            rows.Scan(&id, &name, &email, &age)

            record := map[string]interface{}{
                "id":    id,
                "name":  name,
                "email": email,
                "age":   age,
            }

            jsonLine, _ := json.Marshal(record)
            writer.Write(jsonLine)
            writer.WriteString("\n")
            count++
        }
        rows.Close()

        totalExported += count
        log.Printf("Exported %d records...", totalExported)

        if count < pageSize {
            break
        }
        offset += pageSize
    }

    log.Printf("Stream export complete: %d total records", totalExported)
}

Cursor-Based Export

For very large datasets, use cursor-based pagination:

import asyncio
import json
from geode_client import Client

async def cursor_export(output_file: str, batch_size: int = 10000):
    """Export using cursor-based pagination for better performance."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        last_id = 0
        total = 0

        with open(output_file, 'w') as f:
            while True:
                page, _ = await conn.query(
                    """
                    MATCH (u:User)
                    WHERE u.id > $last_id
                    RETURN u.id, u.name, u.email
                    ORDER BY u.id
                    LIMIT $limit
                    """,
                    {"last_id": last_id, "limit": batch_size}
                )

                if not page.rows:
                    break

                for row in page.rows:
                    record = {
                        "id": row['u.id'].as_int,
                        "name": row['u.name'].as_string,
                        "email": row['u.email'].as_string
                    }
                    f.write(json.dumps(record) + '\n')
                    last_id = record['id']

                total += len(page.rows)
                print(f"Exported {total} records (last_id: {last_id})")

        print(f"Export complete: {total} records")

asyncio.run(cursor_export("cursor_export.jsonl"))

Export for Analytics Pipelines

Apache Parquet Export

For analytics workloads, export to Parquet format:

import asyncio
import pyarrow as pa
import pyarrow.parquet as pq
from geode_client import Client

async def export_to_parquet(output_file: str):
    """Export data to Apache Parquet format for analytics."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        page, _ = await conn.query("""
            MATCH (u:User)
            RETURN u.id, u.name, u.email, u.age, u.city, u.created_at
            ORDER BY u.id
        """)

        # Collect data
        ids = []
        names = []
        emails = []
        ages = []
        cities = []
        created_ats = []

        for row in page.rows:
            ids.append(row['u.id'].as_int)
            names.append(row['u.name'].as_string)
            emails.append(row['u.email'].as_string)
            ages.append(row['u.age'].as_int)
            cities.append(row['u.city'].as_string)
            created_ats.append(row['u.created_at'].as_string)

        # Create PyArrow table
        table = pa.table({
            'id': pa.array(ids, type=pa.int64()),
            'name': pa.array(names, type=pa.string()),
            'email': pa.array(emails, type=pa.string()),
            'age': pa.array(ages, type=pa.int32()),
            'city': pa.array(cities, type=pa.string()),
            'created_at': pa.array(created_ats, type=pa.string())
        })

        # Write to Parquet
        pq.write_table(table, output_file, compression='snappy')

        print(f"Exported {len(ids)} records to {output_file}")

asyncio.run(export_to_parquet("users.parquet"))

Data Lake Export

Export data partitioned by date for data lake ingestion:

import asyncio
import json
import os
from datetime import datetime, timedelta
from geode_client import Client

async def export_partitioned(base_path: str, start_date: str, end_date: str):
    """Export data partitioned by date for data lake."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    start = datetime.fromisoformat(start_date)
    end = datetime.fromisoformat(end_date)
    current = start

    async with client.connection() as conn:
        while current <= end:
            date_str = current.strftime("%Y-%m-%d")
            next_date = current + timedelta(days=1)

            page, _ = await conn.query(
                """
                MATCH (e:Event)
                WHERE e.timestamp >= $start AND e.timestamp < $end
                RETURN e.id, e.type, e.user_id, e.timestamp, e.data
                ORDER BY e.timestamp
                """,
                {
                    "start": current.isoformat(),
                    "end": next_date.isoformat()
                }
            )

            if page.rows:
                # Create partition directory
                partition_path = os.path.join(
                    base_path,
                    f"year={current.year}",
                    f"month={current.month:02d}",
                    f"day={current.day:02d}"
                )
                os.makedirs(partition_path, exist_ok=True)

                # Write data
                output_file = os.path.join(partition_path, "events.jsonl")
                with open(output_file, 'w') as f:
                    for row in page.rows:
                        record = {
                            "id": row['e.id'].as_int,
                            "type": row['e.type'].as_string,
                            "user_id": row['e.user_id'].as_int,
                            "timestamp": row['e.timestamp'].as_string,
                            "data": row['e.data'].as_string
                        }
                        f.write(json.dumps(record) + '\n')

                print(f"Exported {len(page.rows)} events for {date_str}")

            current = next_date

asyncio.run(export_partitioned(
    "/data/events",
    "2026-01-01",
    "2026-01-28"
))

Streaming to Message Queue

import asyncio
import json
from geode_client import Client
# from kafka import KafkaProducer  # Uncomment for actual Kafka usage

async def export_to_kafka(topic: str):
    """Stream export to Kafka for real-time analytics."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    # producer = KafkaProducer(
    #     bootstrap_servers=['localhost:9092'],
    #     value_serializer=lambda v: json.dumps(v).encode('utf-8')
    # )

    async with client.connection() as conn:
        page, has_more = await conn.query(
            """
            MATCH (u:User)
            RETURN u.id, u.name, u.email, u.updated_at
            ORDER BY u.updated_at DESC
            """,
            fetch_size=1000
        )

        total = 0
        while True:
            for row in page.rows:
                message = {
                    "id": row['u.id'].as_int,
                    "name": row['u.name'].as_string,
                    "email": row['u.email'].as_string,
                    "updated_at": row['u.updated_at'].as_string
                }

                # producer.send(topic, value=message, key=str(message['id']).encode())
                print(f"Would send to Kafka: {message}")  # Demo output
                total += 1

            if not has_more:
                break

            page, has_more = await conn.pull(1000)

        # producer.flush()
        print(f"Streamed {total} records to Kafka topic '{topic}'")

asyncio.run(export_to_kafka("user-updates"))

Export Utilities

Export Progress Tracking

import asyncio
import json
import time
from geode_client import Client

class ExportProgress:
    def __init__(self, total_estimate: int = None):
        self.total_estimate = total_estimate
        self.exported = 0
        self.start_time = time.time()

    def update(self, count: int):
        self.exported += count
        elapsed = time.time() - self.start_time
        rate = self.exported / elapsed if elapsed > 0 else 0

        if self.total_estimate:
            pct = (self.exported / self.total_estimate) * 100
            eta = (self.total_estimate - self.exported) / rate if rate > 0 else 0
            print(f"\rExported: {self.exported:,} / {self.total_estimate:,} "
                  f"({pct:.1f}%) | Rate: {rate:.0f}/s | ETA: {eta:.0f}s", end='')
        else:
            print(f"\rExported: {self.exported:,} | Rate: {rate:.0f}/s", end='')

    def finish(self):
        elapsed = time.time() - self.start_time
        rate = self.exported / elapsed if elapsed > 0 else 0
        print(f"\nExport complete: {self.exported:,} records in {elapsed:.1f}s ({rate:.0f}/s)")

async def export_with_progress(output_file: str):
    client = Client(host="localhost", port=3141, skip_verify=True)

    async with client.connection() as conn:
        # Get total count estimate
        count_result, _ = await conn.query("MATCH (u:User) RETURN count(u) AS total")
        total = count_result.rows[0]['total'].as_int if count_result.rows else None

        progress = ExportProgress(total)

        with open(output_file, 'w') as f:
            page, has_more = await conn.query(
                "MATCH (u:User) RETURN u.id, u.name, u.email ORDER BY u.id",
                fetch_size=5000
            )

            while True:
                for row in page.rows:
                    f.write(json.dumps({
                        "id": row['u.id'].as_int,
                        "name": row['u.name'].as_string,
                        "email": row['u.email'].as_string
                    }) + '\n')

                progress.update(len(page.rows))

                if not has_more:
                    break

                page, has_more = await conn.pull(5000)

        progress.finish()

asyncio.run(export_with_progress("export_with_progress.jsonl"))

Export Validation

import asyncio
import json
from geode_client import Client

async def validate_export(export_file: str):
    """Validate exported data against source."""
    client = Client(host="localhost", port=3141, skip_verify=True)

    # Read export file
    exported_ids = set()
    with open(export_file, 'r') as f:
        for line in f:
            record = json.loads(line)
            exported_ids.add(record['id'])

    print(f"Exported records: {len(exported_ids)}")

    # Compare with source
    async with client.connection() as conn:
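        # Note: this reads a single result page; for very large graphs,
        # pull the remaining pages as in the JSONL streaming examples above.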
        page, _ = await conn.query("MATCH (u:User) RETURN u.id")

        source_ids = {row['u.id'].as_int for row in page.rows}

        print(f"Source records: {len(source_ids)}")

        missing = source_ids - exported_ids
        extra = exported_ids - source_ids

        if missing:
            print(f"Missing from export: {len(missing)} records")
            print(f"  Sample: {list(missing)[:10]}")

        if extra:
            print(f"Extra in export: {len(extra)} records")
            print(f"  Sample: {list(extra)[:10]}")

        if not missing and not extra:
            print("Validation passed: Export matches source")

asyncio.run(validate_export("users_export.jsonl"))

Next Steps

Questions? Discuss data export strategies in our forum.