Unit Testing in Geode
Unit testing is fundamental to Geode’s development methodology, ensuring code correctness, preventing regressions, and enabling confident refactoring. Geode maintains 97.4% test coverage with 1,644 passing tests out of 1,688 total tests.
Geode’s Testing Philosophy
Geode follows evidence-based development with CANARY markers tracking all implementations. The testing strategy emphasizes:
No Mocks for Database Logic: Tests run against real Geode server instances, not mocks. This ensures tests validate actual behavior rather than simulated responses.
Comprehensive Coverage: Every feature implementation must include corresponding tests. The GeodeTestLab suite provides comprehensive validation.
Fast Feedback: Unit tests complete in 2-3 minutes, providing rapid developer feedback.
ISO Conformance: 100% compliance for ISO/IEC 39075:2024 (see conformance profile).
Unit Test Structure in Geode
Server Tests (Zig)
Geode’s server is implemented in Zig with built-in test framework:
// geode/src/parser/lexer_test.zig
const std = @import("std");
const testing = std.testing;
const Lexer = @import("lexer.zig").Lexer;
test "lexer tokenizes simple GQL query" {
const allocator = testing.allocator;
const input = "MATCH (n:User) RETURN n";
var lexer = try Lexer.init(allocator, input);
defer lexer.deinit();
const tokens = try lexer.tokenize();
defer allocator.free(tokens);
try testing.expectEqual(@as(usize, 7), tokens.len);
try testing.expectEqual(Token.Type.MATCH, tokens[0].type);
try testing.expectEqual(Token.Type.LPAREN, tokens[1].type);
try testing.expectEqualStrings("n", tokens[2].value);
}
test "lexer handles Unicode identifiers" {
const allocator = testing.allocator;
const input = "MATCH (user:用户) RETURN user.名前";
var lexer = try Lexer.init(allocator, input);
defer lexer.deinit();
const tokens = try lexer.tokenize();
defer allocator.free(tokens);
// Verify Unicode support
try testing.expectEqualStrings("用户", tokens[3].value);
try testing.expectEqualStrings("名前", tokens[8].value);
}
test "lexer error handling - unterminated string" {
const allocator = testing.allocator;
const input = "MATCH (n) WHERE n.name = 'unterminated";
var lexer = try Lexer.init(allocator, input);
defer lexer.deinit();
const result = lexer.tokenize();
try testing.expectError(error.UnterminatedString, result);
}
Run Tests:
cd geode
# Run all tests
zig build test
# Run specific test file
zig build test -Dtest-filter="lexer_test"
# Run with coverage
zig build test -Dcoverage=true
Query Planner Tests
// geode/src/planner/query_planner_test.zig
const QueryPlanner = @import("query_planner.zig").QueryPlanner;
test "planner chooses index for equality filter" {
const allocator = testing.allocator;
var db = try TestDatabase.init(allocator);
defer db.deinit();
// Create index
try db.exec("CREATE INDEX user_email ON User(email)");
// Parse query
const query = "MATCH (u:User {email: $email}) RETURN u";
var planner = try QueryPlanner.init(allocator, &db);
defer planner.deinit();
const plan = try planner.plan(query);
defer plan.deinit();
// Verify planner chose index seek
try testing.expect(plan.operations[0].type == .IndexSeek);
try testing.expectEqualStrings("user_email", plan.operations[0].index_name);
}
test "planner avoids cartesian product" {
const allocator = testing.allocator;
var db = try TestDatabase.init(allocator);
defer db.deinit();
// Query that could create cartesian product
const query = "MATCH (a:User), (b:Product) WHERE a.pref = b.cat RETURN a, b";
var planner = try QueryPlanner.init(allocator, &db);
defer planner.deinit();
const plan = try planner.plan(query);
defer plan.deinit();
// Verify planner adds relationship or index to avoid cartesian product
try testing.expect(!plan.hasCartesianProduct());
}
Transaction Tests
// geode/src/storage/transaction_test.zig
test "transaction ACID properties" {
const allocator = testing.allocator;
var db = try TestDatabase.init(allocator);
defer db.deinit();
// Test Atomicity
{
var tx = try db.beginTransaction();
errdefer tx.rollback();
try tx.exec("CREATE (u:User {name: 'Alice'})");
try tx.exec("CREATE (u:User {name: 'Bob'})");
// Simulate error
const result = tx.exec("INVALID QUERY");
try testing.expectError(error.ParseError, result);
// Rollback
try tx.rollback();
// Verify no data written
const count = try db.exec("MATCH (u:User) RETURN count(u)");
try testing.expectEqual(@as(i64, 0), count);
}
// Test Isolation
{
var tx1 = try db.beginTransaction();
defer tx1.rollback();
var tx2 = try db.beginTransaction();
defer tx2.rollback();
// TX1 writes
try tx1.exec("CREATE (u:User {name: 'Charlie'})");
// TX2 should not see TX1's changes
const count = try tx2.exec("MATCH (u:User) RETURN count(u)");
try testing.expectEqual(@as(i64, 0), count);
}
}
Client Library Tests
Go Client Tests:
// geode-client-go/client_test.go
package geode
import (
"context"
"database/sql"
"testing"
)
func TestBasicQuery(t *testing.T) {
db, err := sql.Open("geode", "quic://localhost:3141")
if err != nil {
t.Fatal(err)
}
defer db.Close()
// Execute query
rows, err := db.Query("MATCH (n) RETURN count(n) AS cnt")
if err != nil {
t.Fatal(err)
}
defer rows.Close()
var count int64
if rows.Next() {
if err := rows.Scan(&count); err != nil {
t.Fatal(err)
}
}
if count < 0 {
t.Errorf("Expected non-negative count, got %d", count)
}
}
func TestPreparedStatement(t *testing.T) {
db, err := sql.Open("geode", "quic://localhost:3141")
if err != nil {
t.Fatal(err)
}
defer db.Close()
// Prepare statement
stmt, err := db.Prepare("MATCH (u:User {email: $1}) RETURN u.name")
if err != nil {
t.Fatal(err)
}
defer stmt.Close()
// Execute with parameter
var name string
err = stmt.QueryRow("[email protected]").Scan(&name)
if err != nil && err != sql.ErrNoRows {
t.Fatal(err)
}
}
func TestTransaction(t *testing.T) {
db, err := sql.Open("geode", "quic://localhost:3141")
if err != nil {
t.Fatal(err)
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
t.Fatal(err)
}
// Execute in transaction
_, err = tx.Exec("CREATE (u:User {name: 'Test User'})")
if err != nil {
tx.Rollback()
t.Fatal(err)
}
// Commit
if err := tx.Commit(); err != nil {
t.Fatal(err)
}
// Verify data persisted
var count int
err = db.QueryRow("MATCH (u:User {name: 'Test User'}) RETURN count(u)").Scan(&count)
if err != nil {
t.Fatal(err)
}
if count != 1 {
t.Errorf("Expected 1 user, got %d", count)
}
}
Python Client Tests:
# geode-client-python/tests/test_client.py
import pytest
from geode_client import Client
@pytest.mark.asyncio
async def test_basic_query():
client = Client(host="localhost", port=3141)
async with client.connection() as conn:
result, _ = await conn.query("MATCH (n) RETURN count(n) AS cnt")
assert len(result.bindings) == 1
assert result.bindings[0]['cnt'] >= 0
@pytest.mark.asyncio
async def test_parameterized_query():
client = Client(host="localhost", port=3141)
async with client.connection() as conn:
result, _ = await conn.query(
"MATCH (u:User {email: $email}) RETURN u.name",
{"email": "[email protected]"}
)
# Should not raise exception
@pytest.mark.asyncio
async def test_transaction():
client = Client(host="localhost", port=3141)
async with client.connection() as conn:
async with client.connection() as tx:
await tx.begin()
await tx.execute("CREATE (u:User {name: 'Test'})")
await tx.execute("CREATE (u:User {name: 'Test2'})")
# Auto-commits on exit
# Verify committed
result, _ = await conn.query(
"MATCH (u:User) WHERE u.name STARTS WITH 'Test' RETURN count(u)"
)
assert result.bindings[0]['count(u)'] == 2
@pytest.mark.asyncio
async def test_transaction_rollback():
client = Client(host="localhost", port=3141)
async with client.connection() as conn:
try:
async with client.connection() as tx:
await tx.begin()
await tx.execute("CREATE (u:User {name: 'Rollback Test'})")
raise Exception("Simulated error")
except:
pass
# Verify rolled back
result, _ = await conn.query(
"MATCH (u:User {name: 'Rollback Test'}) RETURN count(u)"
)
assert result.bindings[0]['count(u)'] == 0
Test Coverage
Geode tracks test coverage meticulously:
# Generate coverage report
cd geode
zig build test -Dcoverage=true
# View coverage
zig build coverage-report
# Geode Coverage Stats:
# - Overall: 97.4% (1644/1688 tests passing)
# - Core modules: 95%+
# - Security code: 100%
# - GQL parser: 95%+
Coverage Requirements:
# scripts/check-coverage.py
def check_coverage_requirements(coverage_data):
requirements = {
'overall': 90.0,
'core': 95.0,
'security': 100.0,
'parser': 95.0,
}
for module, threshold in requirements.items():
actual = coverage_data.get_module_coverage(module)
if actual < threshold:
print(f"FAILED: {module} coverage {actual}% < {threshold}%")
return False
return True
Best Practices
1. Test Naming Convention
// Use descriptive test names
test "parser correctly handles nested parentheses" { }
test "transaction isolation prevents dirty reads" { }
test "index seek optimization for equality filters" { }
2. Arrange-Act-Assert Pattern
test "user creation with valid data" {
// Arrange
const allocator = testing.allocator;
var db = try TestDatabase.init(allocator);
defer db.deinit();
// Act
const result = try db.exec(
"CREATE (u:User {name: 'Alice', email: '[email protected]'}) RETURN u"
);
// Assert
try testing.expectEqual(@as(usize, 1), result.len);
try testing.expectEqualStrings("Alice", result[0].get("name"));
}
3. Test Data Management
const TestDatabase = struct {
db: *Database,
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator) !TestDatabase {
// Create temporary database
var db = try Database.initTemp(allocator);
return TestDatabase{ .db = db, .allocator = allocator };
}
pub fn deinit(self: *TestDatabase) void {
self.db.deinit();
}
};
4. Parallel Test Execution
# Run tests in parallel (8 shards)
zig build test -Dparallel=8
# Results: 8-minute suite runs in 1-2 minutes
Advanced Testing Techniques
Property-Based Testing
// Property-based testing for parser
const testing = std.testing;
const fuzzing = @import("fuzzing");
test "parser handles all valid GQL syntax" {
const allocator = testing.allocator;
// Generate random valid GQL queries
var generator = fuzzing.GqlGenerator.init(allocator);
defer generator.deinit();
var i: usize = 0;
while (i < 1000) : (i += 1) {
const query = try generator.generate_valid_query();
defer allocator.free(query);
var parser = try Parser.init(allocator, query);
defer parser.deinit();
// Should not crash or error on valid input
const ast = parser.parse() catch |err| {
std.debug.print("Failed on query: {s}\n", .{query});
return err;
};
defer ast.deinit();
}
}
test "lexer rejects all invalid input" {
const allocator = testing.allocator;
const invalid_inputs = [_][]const u8{
"MATCH (u:User {unterminated string",
"MATCH () ) ((",
"RETURN 12.34.56", // Invalid number
"MATCH $$$$$", // Invalid parameter
};
for (invalid_inputs) |input| {
var lexer = try Lexer.init(allocator, input);
defer lexer.deinit();
const result = lexer.tokenize();
try testing.expectError(error.ParseError, result);
}
}
Parameterized Tests
# geode-client-python/tests/test_parameterized.py
import pytest
@pytest.mark.parametrize("value,expected_type", [
(42, "integer"),
(3.14, "float"),
("hello", "string"),
(True, "boolean"),
(None, "null"),
([1, 2, 3], "list"),
({"key": "value"}, "map"),
])
async def test_data_type_handling(client, value, expected_type):
"""Test all data types are handled correctly"""
result, _ = await client.query(
"CREATE (n:Test {value: $value}) RETURN n.value, type(n.value)",
{"value": value}
)
assert result.bindings[0]['type(n.value)'] == expected_type
@pytest.mark.parametrize("query,should_error", [
("MATCH (n) RETURN n", False),
("INVALID SYNTAX", True),
("MATCH (n WHERE n.id = 1", True), # Missing parenthesis
("RETURN 1 + 'string'", True), # Type mismatch
])
async def test_query_validation(client, query, should_error):
"""Test query validation"""
if should_error:
with pytest.raises(Exception):
await client.execute(query)
else:
result, _ = await client.query(query)
assert result is not None
Performance Tests
// Benchmark tests
const std = @import("std");
const testing = std.testing;
test "query execution performance benchmark" {
const allocator = testing.allocator;
var db = try TestDatabase.init(allocator);
defer db.deinit();
// Create test data
var i: usize = 0;
while (i < 10000) : (i += 1) {
try db.exec("CREATE (u:User {id: $id, name: 'User $id'})",
.{.id = i});
}
// Benchmark query
const iterations = 1000;
const start = std.time.nanoTimestamp();
var j: usize = 0;
while (j < iterations) : (j += 1) {
const result = try db.exec("MATCH (u:User {id: 5000}) RETURN u");
result.deinit();
}
const end = std.time.nanoTimestamp();
const duration_ms = @intCast(u64, end - start) / 1_000_000;
const qps = iterations * 1000 / duration_ms;
std.debug.print("Query performance: {} QPS\n", .{qps});
// Assert minimum performance
const baseline_qps: u64 = 0; // Set based on your benchmark baseline
try testing.expect(qps >= baseline_qps);
}
Mutation Testing
# mutation_test.py
"""
Mutation testing verifies test quality by intentionally breaking code
"""
def run_mutation_tests():
mutations = [
("==", "!="), # Flip equality
(">", ">="), # Change boundary
("and", "or"), # Flip logic
("true", "false"),
]
results = []
for original, mutated in mutations:
# Apply mutation
apply_mutation(original, mutated)
# Run tests
test_result = run_tests()
if test_result.all_passed:
# Mutation survived - tests didn't catch the bug!
results.append({
'mutation': f"{original} -> {mutated}",
'survived': True,
'weakness': 'Tests do not cover this logic'
})
else:
# Mutation caught - tests are working
results.append({
'mutation': f"{original} -> {mutated}",
'survived': False
})
# Revert mutation
revert_mutation()
return results
Snapshot Testing
// geode-client-rust/tests/snapshot_tests.rs
use insta::assert_snapshot;
use geode_client::Client;
#[tokio::test]
async fn test_query_plan_snapshot() -> geode_client::Result<()> {
let client = Client::from_dsn("localhost:3141")?;
let mut conn = client.connect().await?;
let plan = conn
.explain("MATCH (u:User)-[:KNOWS]->(friend) WHERE u.country = 'USA' RETURN friend.name")
.await?;
// Compare against saved snapshot
assert_snapshot!(format!("{:#?}", plan));
Ok(())
}
#[tokio::test]
async fn test_error_message_snapshot() {
let client = Client::from_dsn("localhost:3141").unwrap();
let mut conn = client.connect().await.unwrap();
let error = conn.query("INVALID SYNTAX").await.unwrap_err();
// Ensure error messages stay consistent
assert_snapshot!(error.to_string());
}
Test Infrastructure
Continuous Integration Pipeline
# .github/workflows/test.yml
name: Test Suite
on: [push, pull_request]
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Zig
uses: goto-bus-stop/setup-zig@v2
with:
version: 0.1.0
- name: Run Geode unit tests
run: |
cd geode
zig build test --summary all
- name: Generate coverage report
run: |
cd geode
zig build test -Dcoverage=true
zig build coverage-report
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
files: ./geode/coverage/lcov.info
client-tests:
runs-on: ubuntu-latest
strategy:
matrix:
client: [go, python, rust, zig]
steps:
- uses: actions/checkout@v3
- name: Run ${{ matrix.client }} tests
run: |
cd geode-client-${{ matrix.client }}
make test
integration-tests:
runs-on: ubuntu-latest
needs: [unit-tests, client-tests]
steps:
- uses: actions/checkout@v3
- name: Start Geode server
run: |
cd geode
zig build
./zig-out/bin/geode serve &
sleep 5
- name: Run integration tests
run: |
cd geode-test-harness
make test-all
Test Fixture Management
# conftest.py - Shared test fixtures
import pytest
import asyncio
from geode_client import Client
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def geode_server():
"""Start Geode server for testing"""
import subprocess
process = subprocess.Popen(
["./zig-out/bin/geode", "serve", "--test-mode"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Wait for server to be ready
await asyncio.sleep(2)
yield process
# Cleanup
process.terminate()
process.wait(timeout=10)
@pytest.fixture
async def client(geode_server):
"""Provide test client"""
client = Client("localhost", 3141)
async with client.connection() as c:
yield c
@pytest.fixture
async def clean_db(client):
"""Clean database before each test"""
await c.execute("MATCH (n) DETACH DELETE n")
yield
await c.execute("MATCH (n) DETACH DELETE n")
@pytest.fixture
def sample_graph():
"""Sample graph data"""
return {
"nodes": [
{"id": 1, "label": "User", "name": "Alice"},
{"id": 2, "label": "User", "name": "Bob"},
],
"edges": [
{"from": 1, "to": 2, "type": "KNOWS"},
]
}
Test Quality Metrics
Coverage Analysis
# coverage_analyzer.py
class CoverageAnalyzer:
def analyze_coverage(self, coverage_file):
"""Analyze test coverage and identify gaps"""
with open(coverage_file) as f:
coverage_data = json.load(f)
critical_paths = self.identify_critical_paths()
uncovered_critical = []
for path in critical_paths:
if coverage_data.get(path, {}).get('coverage', 0) < 100:
uncovered_critical.append({
'path': path,
'coverage': coverage_data.get(path, {}).get('coverage', 0),
'priority': 'high'
})
return {
'overall_coverage': coverage_data['summary']['coverage'],
'critical_gaps': uncovered_critical,
'recommendations': self.generate_recommendations(uncovered_critical)
}
def identify_critical_paths(self):
"""Identify critical code paths that must be tested"""
return [
'src/parser/parser.zig',
'src/storage/transaction.zig',
'src/security/auth.zig',
'src/query/executor.zig',
]
Related Topics
- Integration Tests : Integration testing strategies
- CI/CD : Continuous integration
- Continuous Integration : Build automation
- Deployment : Deployment strategies
- Testing : General testing practices
Further Reading
- Testing Guide:
/docs/development/testing-guide/ - Test Coverage:
/docs/development/test-coverage/ - GeodeTestLab:
/docs/development/geodetestlab/ - CANARY Markers:
/docs/development/canary-markers/ - Test Patterns:
/docs/development/test-patterns/ - Property-Based Testing:
/docs/development/property-based-testing/