Client Library Development

Geode provides official client libraries for multiple languages, all built on the QUIC+TLS protocol for secure, high-performance connectivity. This page covers client library architecture, development patterns, and integration guidelines.

Official Client Libraries

LanguagePackageProtocolAsync Model
Gogeodedb.com/geodeQUIC/gRPCGoroutines
Pythongeode-clientQUIC/gRPCasyncio
Rustgeode-clientQUIC/gRPCtokio
Ziggeode-client-zigQUIC/gRPCNative
Node.js@geodedb/clientQUIC/gRPCPromises

Wire Protocol

All Geode clients communicate using a Protobuf wire protocol over QUIC+TLS (default) or gRPC:

Message Types

Client → Server:

  • HelloRequest
  • ExecuteRequest
  • PullRequest
  • BeginRequest / CommitRequest / RollbackRequest
  • PingRequest

Server → Client:

  • ExecutionResponse (payloads: SchemaDefinition, DataPage, Error, ExplainPayload, ProfilePayload)

Connection Lifecycle

Client                          Server
  |                               |
  |------ HelloRequest ---------->|
  |<----- HelloResponse ----------|
  |                               |
  |------ ExecuteRequest -------->|
  |<----- ExecutionResponse ------|
  |------ PullRequest ----------->|
  |<----- ExecutionResponse ------|
  |                               |
  |------ PingRequest ----------->|
  |<----- PingResponse -----------|

Client Architecture

Core Components

Every Geode client library implements these components:

┌─────────────────────────────────────────────┐
│              Application Code               │
├─────────────────────────────────────────────┤
│              Query Builder                  │
│         (Fluent API for GQL)               │
├─────────────────────────────────────────────┤
│           Connection Pool                   │
│    (Connection lifecycle management)        │
├─────────────────────────────────────────────┤
│              Connection                     │
│     (Single QUIC stream wrapper)           │
├─────────────────────────────────────────────┤
│            Wire Protocol                    │
│    (JSON serialization/parsing)            │
├─────────────────────────────────────────────┤
│              QUIC + TLS                     │
│      (Transport encryption)                │
└─────────────────────────────────────────────┘

Connection Pool Design

class ConnectionPool:
    """Connection pool architecture pattern"""

    def __init__(self, config):
        self.min_connections = config.min_connections  # Warm pool
        self.max_connections = config.max_connections  # Hard limit
        self.max_idle_time = config.max_idle_time      # Reap idle
        self.health_check_interval = config.health_check

        self._available = Queue()  # Ready connections
        self._in_use = Set()       # Active connections
        self._lock = Lock()

    async def acquire(self, timeout=None):
        """Get connection from pool"""
        # Try available pool first
        # Create new if under max
        # Wait with timeout if at max
        pass

    async def release(self, conn):
        """Return connection to pool"""
        # Validate connection health
        # Return to available or close
        pass

Query Builder Pattern

class QueryBuilder:
    """Fluent query builder pattern"""

    def __init__(self):
        self._clauses = []
        self._params = {}

    def match(self, pattern: str) -> 'QueryBuilder':
        self._clauses.append(f"MATCH {pattern}")
        return self

    def where(self, condition: str) -> 'QueryBuilder':
        self._clauses.append(f"WHERE {condition}")
        return self

    def with_param(self, name: str, value: Any) -> 'QueryBuilder':
        self._params[name] = value
        return self

    def return_(self, *expressions: str) -> 'QueryBuilder':
        self._clauses.append(f"RETURN {', '.join(expressions)}")
        return self

    def build(self) -> Tuple[str, Dict]:
        return '\n'.join(self._clauses), self._params

# Usage
query, params = (QueryBuilder()
    .match("(u:User {id: $id})")
    .with_param("id", user_id)
    .return_("u.name", "u.email")
    .build())

Implementing a Client

Step 1: QUIC Connection

// Rust example using Quinn
use quinn::{ClientConfig, Endpoint};

pub struct GeodeConnection {
    endpoint: Endpoint,
    connection: quinn::Connection,
}

impl GeodeConnection {
    pub async fn connect(addr: &str, tls_config: ClientConfig) -> Result<Self> {
        let endpoint = Endpoint::client("0.0.0.0:0".parse()?)?;
        let connection = endpoint.connect_with(tls_config, addr, "geode")?.await?;

        Ok(Self { endpoint, connection })
    }
}

Step 2: Wire Protocol

use serde::{Deserialize, Serialize};

#[derive(Serialize)]
#[serde(tag = "type")]
enum ClientMessage {
    HELLO { version: u32, client: String },
    RUN_GQL { query: String, params: HashMap<String, Value> },
    PULL { n: i64 },
    BEGIN { isolation: Option<String> },
    COMMIT,
    ROLLBACK,
    PING,
}

#[derive(Deserialize)]
#[serde(tag = "type")]
enum ServerMessage {
    SUCCESS { stats: Option<QueryStats> },
    SCHEMA { columns: Vec<String>, types: Vec<String> },
    BINDINGS { data: Vec<HashMap<String, Value>> },
    ERROR { code: String, message: String },
    PONG,
}

Step 3: Connection Pool

use tokio::sync::{Semaphore, Mutex};

pub struct ConnectionPool {
    config: PoolConfig,
    available: Mutex<Vec<GeodeConnection>>,
    semaphore: Semaphore,
}

impl ConnectionPool {
    pub async fn acquire(&self) -> Result<PooledConnection> {
        let permit = self.semaphore.acquire().await?;

        let conn = {
            let mut available = self.available.lock().await;
            available.pop()
        };

        let conn = match conn {
            Some(c) => c,
            None => GeodeConnection::connect(&self.config.dsn).await?,
        };

        Ok(PooledConnection { conn, permit, pool: self })
    }
}

Step 4: Transaction Support

pub struct Transaction<'a> {
    conn: &'a mut GeodeConnection,
    active: bool,
}

impl<'a> Transaction<'a> {
    pub async fn begin(conn: &'a mut GeodeConnection) -> Result<Self> {
        conn.send(ClientMessage::BEGIN { isolation: None }).await?;
        conn.expect_success().await?;
        Ok(Self { conn, active: true })
    }

    pub async fn commit(mut self) -> Result<()> {
        self.conn.send(ClientMessage::COMMIT).await?;
        self.conn.expect_success().await?;
        self.active = false;
        Ok(())
    }

    pub async fn rollback(mut self) -> Result<()> {
        self.conn.send(ClientMessage::ROLLBACK).await?;
        self.conn.expect_success().await?;
        self.active = false;
        Ok(())
    }
}

impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        if self.active {
            // Automatic rollback on drop
            // (spawn rollback task)
        }
    }
}

Error Handling

GQL Error Codes

class GqlError(Exception):
    """Base GQL error with ISO status code"""

    def __init__(self, code: str, message: str):
        self.code = code
        self.message = message
        self.category = code[:2]  # First two digits

    @property
    def is_retryable(self) -> bool:
        """Transient errors that may succeed on retry"""
        return self.category in ('08', '40', '57')

# Error categories
# 02xxx - No data
# 08xxx - Connection exception (retryable)
# 22xxx - Data exception
# 23xxx - Constraint violation
# 40xxx - Transaction rollback (retryable)
# 42xxx - Syntax error
# 57xxx - Resource limit (retryable)

Retry Logic

async def execute_with_retry(
    pool: ConnectionPool,
    query: str,
    params: dict,
    max_retries: int = 3
) -> Result:
    """Execute query with automatic retry for transient errors"""

    last_error = None

    for attempt in range(max_retries):
        try:
            async with pool.acquire() as conn:
                return await conn.execute(query, params)

        except GqlError as e:
            last_error = e
            if not e.is_retryable:
                raise

            # Exponential backoff
            await asyncio.sleep(0.1 * (2 ** attempt))

        except ConnectionError as e:
            last_error = e
            await asyncio.sleep(0.1 * (2 ** attempt))

    raise last_error

Testing Clients

Unit Tests

import pytest
from unittest.mock import AsyncMock

@pytest.fixture
def mock_connection():
    conn = AsyncMock()
    conn.execute.return_value = Result(
        columns=['name'],
        rows=[{'name': 'Alice'}]
    )
    return conn

async def test_query_builder(mock_connection):
    builder = QueryBuilder(mock_connection)

    result = await (builder
        .match("(u:User)")
        .where("u.active = true")
        .return_("u.name")
        .execute())

    assert result.rows[0]['name'] == 'Alice'
    mock_connection.execute.assert_called_once()

Integration Tests

import pytest
from testcontainers.geode import GeodeContainer

@pytest.fixture(scope="module")
async def geode():
    """Start Geode container for integration tests"""
    with GeodeContainer() as container:
        yield container.get_connection_url()

async def test_real_query(geode):
    pool = ConnectionPool(geode)

    async with pool.acquire() as conn:
        # Create test data
        await conn.execute(
            "CREATE (u:User {name: $name})",
            {"name": "Test User"}
        )

        # Query it back
        result = await conn.execute(
            "MATCH (u:User {name: $name}) RETURN u.name",
            {"name": "Test User"}
        )

        assert result.rows[0]['u.name'] == 'Test User'

Performance Optimization

Connection Reuse

# Configure pool for workload
pool = ConnectionPool(
    dsn="quic://localhost:3141",
    min_connections=5,      # Keep warm
    max_connections=50,     # Scale limit
    max_idle_time=300,      # 5 minutes
    connection_timeout=5,   # Fast fail
)

# Reuse connections
async def handler(request):
    async with pool.acquire() as conn:
        # Connection returned to pool after use
        return await conn.execute(query, params)

Prepared Statements

// Go example - prepare once, execute many
stmt, err := db.Prepare("MATCH (u:User {id: $1}) RETURN u")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()

// Execute prepared statement
for _, id := range userIDs {
    rows, err := stmt.Query(id)
    // Process rows...
}

Batch Operations

# Batch inserts for better throughput
async def batch_insert(conn, users):
    await conn.execute("""
        UNWIND $users AS user
        CREATE (u:User {
            id: user.id,
            name: user.name,
            email: user.email
        })
    """, {"users": users})

# Insert 1000 users in single round-trip
await batch_insert(conn, user_list)

Further Reading


Related Articles

No articles found with this tag yet.

Back to Home