Fraud Detection Guide
This guide demonstrates how to use Geode for fraud detection. You’ll learn to model fraud-relevant data, identify suspicious patterns, score risk in real time, and build investigation workflows.
Overview
Graph databases excel at fraud detection because fraudulent activity often involves hidden relationships and patterns that are difficult to detect in traditional databases:
- Account networks - Fraudsters share devices, addresses, or payment methods
- Transaction rings - Money laundering through circular transactions
- Identity fraud - Multiple identities with shared attributes
- Collusion patterns - Coordinated fraudulent behavior
Geode’s graph traversal capabilities make it easy to uncover these patterns in real time.
Data Model
Core Entities
// User accounts
(:Account {
id: STRING,
email: STRING,
phone: STRING,
created_at: TIMESTAMP,
status: STRING, // "active", "suspended", "flagged"
risk_score: FLOAT,
last_activity: TIMESTAMP,
verification_level: STRING
})
// Personal identity information
(:Identity {
id: STRING,
ssn_hash: STRING, // Hashed SSN
name: STRING,
date_of_birth: DATE,
address_hash: STRING
})
// Devices used for access
(:Device {
id: STRING,
fingerprint: STRING,
device_type: STRING,
os: STRING,
browser: STRING,
first_seen: TIMESTAMP,
last_seen: TIMESTAMP,
risk_score: FLOAT
})
// IP Addresses
(:IPAddress {
ip: STRING,
asn: STRING,
country: STRING,
city: STRING,
is_proxy: BOOLEAN,
is_vpn: BOOLEAN,
is_tor: BOOLEAN,
risk_score: FLOAT
})
// Payment methods
(:PaymentMethod {
id: STRING,
type: STRING, // "credit_card", "bank_account", "crypto"
last_four: STRING,
bin: STRING, // Bank identification number
issuer: STRING,
country: STRING,
created_at: TIMESTAMP,
risk_score: FLOAT
})
// Transactions
(:Transaction {
id: STRING,
amount: FLOAT,
currency: STRING,
type: STRING, // "purchase", "transfer", "withdrawal"
status: STRING, // "pending", "completed", "failed", "flagged"
timestamp: TIMESTAMP,
risk_score: FLOAT,
fraud_flags: LIST<STRING>
})
// Merchants
(:Merchant {
id: STRING,
name: STRING,
category: STRING,
risk_category: STRING,
country: STRING
})
// Fraud cases
(:FraudCase {
id: STRING,
type: STRING,
status: STRING, // "open", "investigating", "confirmed", "closed"
created_at: TIMESTAMP,
resolved_at: TIMESTAMP,
amount_at_risk: FLOAT,
investigator: STRING
})
Relationship Types
// Account relationships
(:Account)-[:HAS_IDENTITY]->(:Identity)
(:Account)-[:USES_DEVICE {first_used: TIMESTAMP, last_used: TIMESTAMP, count: INTEGER}]->(:Device)
(:Account)-[:LOGGED_IN_FROM {timestamp: TIMESTAMP}]->(:IPAddress)
(:Account)-[:HAS_PAYMENT_METHOD {added_at: TIMESTAMP}]->(:PaymentMethod)
(:Account)-[:REFERRED_BY]->(:Account)
// Transaction relationships
(:Account)-[:INITIATED {timestamp: TIMESTAMP}]->(:Transaction)
(:Transaction)-[:PAID_TO]->(:Account)
(:Transaction)-[:USED {timestamp: TIMESTAMP}]->(:PaymentMethod)
(:Transaction)-[:FROM_IP]->(:IPAddress)
(:Transaction)-[:FROM_DEVICE]->(:Device)
(:Transaction)-[:AT]->(:Merchant)
// Fraud relationships
(:Account)-[:FLAGGED_IN]->(:FraudCase)
(:Transaction)-[:FLAGGED_IN]->(:FraudCase)
(:FraudCase)-[:RELATED_TO]->(:FraudCase)
Schema Setup
// Constraints
CREATE CONSTRAINT account_id_unique ON :Account(id) ASSERT UNIQUE
CREATE CONSTRAINT account_email_unique ON :Account(email) ASSERT UNIQUE
CREATE CONSTRAINT device_id_unique ON :Device(id) ASSERT UNIQUE
CREATE CONSTRAINT device_fingerprint_unique ON :Device(fingerprint) ASSERT UNIQUE
CREATE CONSTRAINT transaction_id_unique ON :Transaction(id) ASSERT UNIQUE
CREATE CONSTRAINT payment_method_id_unique ON :PaymentMethod(id) ASSERT UNIQUE
// Indexes for fraud queries
CREATE INDEX account_risk ON :Account(risk_score)
CREATE INDEX account_status ON :Account(status)
CREATE INDEX device_risk ON :Device(risk_score)
CREATE INDEX device_fingerprint ON :Device(fingerprint)
CREATE INDEX ip_address ON :IPAddress(ip)
CREATE INDEX transaction_timestamp ON :Transaction(timestamp)
CREATE INDEX transaction_risk ON :Transaction(risk_score)
CREATE INDEX identity_ssn ON :Identity(ssn_hash)
Fraud Graph Patterns
Pattern 1: Shared Device Network
Multiple accounts using the same device is a strong fraud indicator.
// Find accounts sharing devices
MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
WITH device, collect(account) AS accounts, count(account) AS account_count
WHERE account_count > 1
RETURN
device.fingerprint AS device,
device.device_type AS type,
account_count,
[a IN accounts | a.email] AS accounts,
[a IN accounts | a.risk_score] AS risk_scores
ORDER BY account_count DESC
LIMIT 20
package main
import (
"context"
"database/sql"
"log"
_ "geodedb.com/geode"
)
// SharedDeviceAlert describes a device fingerprint used by more than one
// account, along with the accounts involved — a strong fraud indicator.
type SharedDeviceAlert struct {
	DeviceFingerprint string    // Device.fingerprint from the graph
	DeviceType        string    // Device.device_type
	AccountCount      int       // number of accounts using this device
	Accounts          []string  // emails of the accounts sharing the device
	RiskScores        []float64 // per-account risk_score, same order as Accounts
}
// FindSharedDevices returns devices used by more than minAccounts accounts,
// ordered most-shared first and capped at 100 rows. Devices shared across
// accounts are a strong fraud signal, so callers typically feed these
// alerts into a review queue.
func FindSharedDevices(ctx context.Context, db *sql.DB, minAccounts int) ([]SharedDeviceAlert, error) {
	rows, err := db.QueryContext(ctx, `
		MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
		WITH device, collect(account) AS accounts, count(account) AS account_count
		WHERE account_count > ?
		RETURN
			device.fingerprint AS device_fingerprint,
			device.device_type AS device_type,
			account_count,
			[a IN accounts | a.email] AS account_emails,
			[a IN accounts | a.risk_score] AS risk_scores
		ORDER BY account_count DESC
		LIMIT 100
	`, minAccounts)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var alerts []SharedDeviceAlert
	for rows.Next() {
		var alert SharedDeviceAlert
		if err := rows.Scan(
			&alert.DeviceFingerprint,
			&alert.DeviceType,
			&alert.AccountCount,
			&alert.Accounts,
			&alert.RiskScores,
		); err != nil {
			return nil, err
		}
		alerts = append(alerts, alert)
	}
	// rows.Next returning false can mean either "done" or "iteration failed";
	// rows.Err distinguishes the two, so it must be checked after the loop.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return alerts, nil
}
// Entry point: connect to a local Geode server and log every device that is
// shared by at least two accounts.
func main() {
	db, err := sql.Open("geode", "localhost:3141")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	shared, err := FindSharedDevices(context.Background(), db, 2)
	if err != nil {
		log.Fatal(err)
	}
	for _, a := range shared {
		log.Printf("Device %s shared by %d accounts: %v",
			a.DeviceFingerprint, a.AccountCount, a.Accounts)
	}
}
import asyncio
from dataclasses import dataclass
from typing import List
from geode_client import Client
@dataclass
class SharedDeviceAlert:
    """A device fingerprint shared by multiple accounts (a fraud signal)."""

    device_fingerprint: str   # Device.fingerprint from the graph
    device_type: str          # Device.device_type
    account_count: int        # number of accounts using this device
    accounts: List[str]       # emails of the accounts sharing the device
    risk_scores: List[float]  # per-account risk_score, same order as accounts
async def find_shared_devices(
    client,
    min_accounts: int = 2
) -> List[SharedDeviceAlert]:
    """Return devices used by more than ``min_accounts`` accounts.

    Devices shared across several accounts are a strong fraud indicator;
    results are ordered most-shared first and capped at 100 rows.
    """
    async with client.connection() as conn:
        result, _ = await conn.query("""
            MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
            WITH device, collect(account) AS accounts, count(account) AS account_count
            WHERE account_count > $min_accounts
            RETURN
              device.fingerprint AS device_fingerprint,
              device.device_type AS device_type,
              account_count,
              [a IN accounts | a.email] AS account_emails,
              [a IN accounts | a.risk_score] AS risk_scores
            ORDER BY account_count DESC
            LIMIT 100
        """, {"min_accounts": min_accounts})
        alerts: List[SharedDeviceAlert] = []
        for row in result.rows:
            alerts.append(
                SharedDeviceAlert(
                    device_fingerprint=row['device_fingerprint'].as_string,
                    device_type=row['device_type'].as_string,
                    account_count=row['account_count'].as_int,
                    accounts=row['account_emails'].as_list,
                    risk_scores=row['risk_scores'].as_list,
                )
            )
        return alerts
async def main():
    # Connect to a local Geode server and print shared-device alerts.
    # skip_verify=True disables certificate verification — presumably for
    # local development only; do not use in production.
    client = Client(host="localhost", port=3141, skip_verify=True)
    alerts = await find_shared_devices(client, min_accounts=2)
    for alert in alerts:
        print(f"Device {alert.device_fingerprint} shared by {alert.account_count} accounts:")
        for account in alert.accounts:
            print(f"  - {account}")

asyncio.run(main())
use geode_client::{Client, Value};
use std::collections::HashMap;
/// A device fingerprint shared by multiple accounts (a fraud signal).
#[derive(Debug)]
struct SharedDeviceAlert {
    device_fingerprint: String, // Device.fingerprint from the graph
    device_type: String,        // Device.device_type
    account_count: i64,         // number of accounts using this device
    accounts: Vec<String>,      // emails of the accounts sharing the device
    risk_scores: Vec<f64>,      // per-account risk_score, same order as accounts
}
/// Fetch devices used by more than `min_accounts` accounts (top 100,
/// most-shared first). Shared devices are a strong fraud indicator.
///
/// Missing result columns are reported as errors rather than panicking,
/// so a schema mismatch surfaces to the caller instead of aborting.
async fn find_shared_devices(
    conn: &mut geode_client::Connection,
    min_accounts: i64,
) -> Result<Vec<SharedDeviceAlert>, Box<dyn std::error::Error>> {
    let mut params = HashMap::new();
    params.insert("min_accounts".to_string(), Value::int(min_accounts));
    let (page, _) = conn.query_with_params(r#"
        MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
        WITH device, collect(account) AS accounts, count(account) AS account_count
        WHERE account_count > $min_accounts
        RETURN
          device.fingerprint AS device_fingerprint,
          device.device_type AS device_type,
          account_count,
          [a IN accounts | a.email] AS account_emails,
          [a IN accounts | a.risk_score] AS risk_scores
        ORDER BY account_count DESC
        LIMIT 100
    "#, &params).await?;
    let mut alerts = Vec::new();
    for row in &page.rows {
        // `ok_or` turns a missing column into a recoverable error (the
        // original `unwrap()` would panic the whole process here).
        alerts.push(SharedDeviceAlert {
            device_fingerprint: row.get("device_fingerprint").ok_or("missing column device_fingerprint")?.as_string()?,
            device_type: row.get("device_type").ok_or("missing column device_type")?.as_string()?,
            account_count: row.get("account_count").ok_or("missing column account_count")?.as_int()?,
            accounts: row.get("account_emails").ok_or("missing column account_emails")?.as_string_list()?,
            risk_scores: row.get("risk_scores").ok_or("missing column risk_scores")?.as_float_list()?,
        });
    }
    Ok(alerts)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // skip_verify(true) disables certificate verification — presumably for
    // local development only; confirm before using outside dev.
    let client = Client::new("127.0.0.1", 3141).skip_verify(true);
    let mut conn = client.connect().await?;
    // Report every device shared by more than 2 accounts.
    let alerts = find_shared_devices(&mut conn, 2).await?;
    for alert in alerts {
        println!("Device {} shared by {} accounts: {:?}",
            alert.device_fingerprint, alert.account_count, alert.accounts);
    }
    Ok(())
}
import { createClient, Client } from '@geodedb/client';
/** A device fingerprint shared by multiple accounts (a fraud signal). */
interface SharedDeviceAlert {
  deviceFingerprint: string; // Device.fingerprint from the graph
  deviceType: string;        // Device.device_type
  accountCount: number;      // number of accounts using this device
  accounts: string[];        // emails of the accounts sharing the device
  riskScores: number[];      // per-account risk_score, same order as accounts
}
/**
 * Fetch devices used by more than `minAccounts` accounts (top 100,
 * most-shared first). Shared devices are a strong fraud indicator.
 */
async function findSharedDevices(
  client: Client,
  minAccounts: number = 2
): Promise<SharedDeviceAlert[]> {
  const resultRows = await client.queryAll(`
    MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
    WITH device, collect(account) AS accounts, count(account) AS account_count
    WHERE account_count > $min_accounts
    RETURN
      device.fingerprint AS device_fingerprint,
      device.device_type AS device_type,
      account_count,
      [a IN accounts | a.email] AS account_emails,
      [a IN accounts | a.risk_score] AS risk_scores
    ORDER BY account_count DESC
    LIMIT 100
  `, { params: { min_accounts: minAccounts } });

  const alerts: SharedDeviceAlert[] = [];
  for (const row of resultRows) {
    alerts.push({
      deviceFingerprint: row.get('device_fingerprint')?.asString ?? '',
      deviceType: row.get('device_type')?.asString ?? '',
      accountCount: row.get('account_count')?.asNumber ?? 0,
      accounts: row.get('account_emails')?.asList?.map(v => v.asString ?? '') ?? [],
      riskScores: row.get('risk_scores')?.asList?.map(v => v.asNumber ?? 0) ?? [],
    });
  }
  return alerts;
}
/**
 * Connect to a local Geode server and print shared-device alerts.
 * The client is closed in `finally` so the connection is released even when
 * the query fails; the top-level `.catch` reports rejections that would
 * otherwise go unhandled.
 */
async function main() {
  const client = await createClient('quic://localhost:3141');
  try {
    const alerts = await findSharedDevices(client, 2);
    for (const alert of alerts) {
      console.log(`Device ${alert.deviceFingerprint} shared by ${alert.accountCount} accounts:`);
      alert.accounts.forEach(account => console.log(`  - ${account}`));
    }
  } finally {
    await client.close();
  }
}
main().catch(console.error);
const std = @import("std");
const geode = @import("geode_client");
/// A device fingerprint shared by multiple accounts (a fraud signal).
const SharedDeviceAlert = struct {
    device_fingerprint: []const u8, // Device.fingerprint from the graph
    device_type: []const u8, // Device.device_type
    account_count: i64, // number of accounts using this device
    accounts: std.ArrayList([]const u8), // emails of the sharing accounts
    risk_scores: std.ArrayList(f64), // per-account risk_score
};
/// Run the shared-device query and return one alert per device used by more
/// than `min_accounts` accounts.
///
/// NOTE(review): result parsing is not implemented yet — this currently
/// always returns an empty list (see TODO below).
pub fn findSharedDevices(
    client: *geode.GeodeClient,
    allocator: std.mem.Allocator,
    min_accounts: i64,
) !std.ArrayList(SharedDeviceAlert) {
    // Query parameters are passed to the server as a JSON object.
    var params = std.json.ObjectMap.init(allocator);
    defer params.deinit();
    try params.put("min_accounts", .{ .integer = min_accounts });
    try client.sendRunGql(1,
        \\MATCH (device:Device)<-[:USES_DEVICE]-(account:Account)
        \\WITH device, collect(account) AS accounts, count(account) AS account_count
        \\WHERE account_count > $min_accounts
        \\RETURN
        \\  device.fingerprint AS device_fingerprint,
        \\  device.device_type AS device_type,
        \\  account_count,
        \\  [a IN accounts | a.email] AS account_emails,
        \\  [a IN accounts | a.risk_score] AS risk_scores
        \\ORDER BY account_count DESC
        \\LIMIT 100
    , .{ .object = params });
    // Acknowledge the RUN response, then pull up to 1000 result rows
    // (30-second timeout on each receive).
    _ = try client.receiveMessage(30000);
    try client.sendPull(1, 1000);
    const result = try client.receiveMessage(30000);
    defer allocator.free(result);
    var alerts = std.ArrayList(SharedDeviceAlert).init(allocator);
    // TODO: Parse `result` and populate alerts
    return alerts;
}
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    // NOTE(review): the trailing `true` presumably skips TLS verification
    // (dev only) — confirm against GeodeClient.init's signature.
    var client = geode.GeodeClient.init(allocator, "localhost", 3141, true);
    defer client.deinit();
    try client.connect();
    // Handshake: identify this application and version to the server.
    try client.sendHello("fraud-detection", "1.0.0");
    _ = try client.receiveMessage(30000);
    const alerts = try findSharedDevices(&client, allocator, 2);
    defer alerts.deinit();
    for (alerts.items) |alert| {
        std.debug.print("Device {s} shared by {d} accounts\n", .{
            alert.device_fingerprint,
            alert.account_count,
        });
    }
}
Pattern 2: Transaction Rings
Circular transaction patterns often indicate money laundering.
// Find circular transaction patterns (money laundering rings)
MATCH ring = (start:Account)-[:INITIATED]->(:Transaction)-[:PAID_TO]->
(a2:Account)-[:INITIATED]->(:Transaction)-[:PAID_TO]->
(a3:Account)-[:INITIATED]->(:Transaction)-[:PAID_TO]->(start)
WHERE start <> a2 AND start <> a3 AND a2 <> a3
RETURN
[n IN nodes(ring) WHERE n:Account | n.email] AS ring_accounts,
[n IN nodes(ring) WHERE n:Transaction | n.amount] AS amounts,
[n IN nodes(ring) WHERE n:Transaction | n.timestamp] AS timestamps
LIMIT 20
Pattern 3: Velocity Patterns
Unusual activity spikes indicate potential fraud.
// Find accounts with unusual transaction velocity
MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
WHERE tx.timestamp > timestamp() - duration('PT1H')
WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
WHERE tx_count > 10 OR total_amount > 10000
// Compare to historical average
OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
WHERE historical.timestamp > timestamp() - duration('P30D')
AND historical.timestamp < timestamp() - duration('PT1H')
WITH account, tx_count, total_amount,
count(historical) / 720.0 AS avg_hourly_count, // 720 hours in 30 days
sum(historical.amount) / 720.0 AS avg_hourly_amount
WHERE tx_count > avg_hourly_count * 5
OR total_amount > avg_hourly_amount * 5
RETURN
account.email,
tx_count AS recent_transactions,
total_amount AS recent_amount,
avg_hourly_count AS typical_hourly_count,
avg_hourly_amount AS typical_hourly_amount,
tx_count / avg_hourly_count AS velocity_multiplier
ORDER BY velocity_multiplier DESC
// VelocityAlert describes an account whose last-hour transaction volume is
// well above its 30-day hourly average — a potential takeover/automation signal.
type VelocityAlert struct {
	AccountEmail        string  // Account.email
	RecentTxCount       int     // transactions in the last hour
	RecentAmount        float64 // total amount in the last hour
	TypicalHourlyCount  float64 // 30-day average transactions per hour
	TypicalHourlyAmount float64 // 30-day average amount per hour
	VelocityMultiplier  float64 // recent count divided by typical hourly count
}
// FindVelocityAnomalies returns accounts whose last-hour transaction count or
// amount exceeds both an absolute threshold and 5x their 30-day hourly
// average, ordered by how far above their norm they are (top 50).
func FindVelocityAnomalies(ctx context.Context, db *sql.DB) ([]VelocityAlert, error) {
	rows, err := db.QueryContext(ctx, `
		MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
		WHERE tx.timestamp > timestamp() - duration('PT1H')
		WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
		WHERE tx_count > 10 OR total_amount > 10000
		OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
		WHERE historical.timestamp > timestamp() - duration('P30D')
		  AND historical.timestamp < timestamp() - duration('PT1H')
		WITH account, tx_count, total_amount,
		  count(historical) / 720.0 AS avg_hourly_count,
		  sum(historical.amount) / 720.0 AS avg_hourly_amount
		WHERE tx_count > avg_hourly_count * 5
		  OR total_amount > avg_hourly_amount * 5
		RETURN
		  account.email AS email,
		  tx_count AS recent_transactions,
		  total_amount AS recent_amount,
		  avg_hourly_count AS typical_count,
		  avg_hourly_amount AS typical_amount,
		  CASE WHEN avg_hourly_count > 0
		    THEN tx_count / avg_hourly_count
		    ELSE 999 END AS velocity_multiplier
		ORDER BY velocity_multiplier DESC
		LIMIT 50
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var alerts []VelocityAlert
	for rows.Next() {
		var alert VelocityAlert
		if err := rows.Scan(
			&alert.AccountEmail,
			&alert.RecentTxCount,
			&alert.RecentAmount,
			&alert.TypicalHourlyCount,
			&alert.TypicalHourlyAmount,
			&alert.VelocityMultiplier,
		); err != nil {
			return nil, err
		}
		alerts = append(alerts, alert)
	}
	// rows.Next returning false can also mean iteration failed mid-stream;
	// rows.Err must be checked after the loop.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return alerts, nil
}
@dataclass
class VelocityAlert:
    """An account whose recent transaction velocity far exceeds its norm."""

    account_email: str            # Account.email
    recent_tx_count: int          # transactions in the last hour
    recent_amount: float          # total amount in the last hour
    typical_hourly_count: float   # 30-day average transactions per hour
    typical_hourly_amount: float  # 30-day average amount per hour
    velocity_multiplier: float    # recent count / typical hourly count
async def find_velocity_anomalies(client) -> List[VelocityAlert]:
    """Find accounts whose last-hour activity is far above their 30-day norm.

    Returns the top 50 accounts ordered by velocity multiplier (how many
    times their usual hourly rate they are currently running at).
    """
    async with client.connection() as conn:
        result, _ = await conn.query("""
            MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
            WHERE tx.timestamp > timestamp() - duration('PT1H')
            WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
            WHERE tx_count > 10 OR total_amount > 10000
            OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
            WHERE historical.timestamp > timestamp() - duration('P30D')
              AND historical.timestamp < timestamp() - duration('PT1H')
            WITH account, tx_count, total_amount,
              count(historical) / 720.0 AS avg_hourly_count,
              sum(historical.amount) / 720.0 AS avg_hourly_amount
            WHERE tx_count > avg_hourly_count * 5
              OR total_amount > avg_hourly_amount * 5
            RETURN
              account.email AS email,
              tx_count AS recent_transactions,
              total_amount AS recent_amount,
              avg_hourly_count AS typical_count,
              avg_hourly_amount AS typical_amount,
              CASE WHEN avg_hourly_count > 0
                THEN tx_count / avg_hourly_count
                ELSE 999 END AS velocity_multiplier
            ORDER BY velocity_multiplier DESC
            LIMIT 50
        """)
        alerts: List[VelocityAlert] = []
        for row in result.rows:
            alerts.append(
                VelocityAlert(
                    account_email=row['email'].as_string,
                    recent_tx_count=row['recent_transactions'].as_int,
                    recent_amount=row['recent_amount'].as_float,
                    typical_hourly_count=row['typical_count'].as_float,
                    typical_hourly_amount=row['typical_amount'].as_float,
                    velocity_multiplier=row['velocity_multiplier'].as_float,
                )
            )
        return alerts
/// An account whose recent transaction velocity far exceeds its 30-day norm.
#[derive(Debug)]
struct VelocityAlert {
    account_email: String,      // Account.email
    recent_tx_count: i64,       // transactions in the last hour
    recent_amount: f64,         // total amount in the last hour
    typical_hourly_count: f64,  // 30-day average transactions per hour
    typical_hourly_amount: f64, // 30-day average amount per hour
    velocity_multiplier: f64,   // recent count / typical hourly count
}
/// Find accounts whose last-hour activity is far above their 30-day norm
/// (top 50, highest velocity multiplier first).
///
/// Missing result columns are reported as errors rather than panicking.
async fn find_velocity_anomalies(
    conn: &mut geode_client::Connection,
) -> Result<Vec<VelocityAlert>, Box<dyn std::error::Error>> {
    let (page, _) = conn.query(r#"
        MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
        WHERE tx.timestamp > timestamp() - duration('PT1H')
        WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
        WHERE tx_count > 10 OR total_amount > 10000
        OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
        WHERE historical.timestamp > timestamp() - duration('P30D')
          AND historical.timestamp < timestamp() - duration('PT1H')
        WITH account, tx_count, total_amount,
          count(historical) / 720.0 AS avg_hourly_count,
          sum(historical.amount) / 720.0 AS avg_hourly_amount
        WHERE tx_count > avg_hourly_count * 5
          OR total_amount > avg_hourly_amount * 5
        RETURN
          account.email AS email,
          tx_count AS recent_transactions,
          total_amount AS recent_amount,
          avg_hourly_count AS typical_count,
          avg_hourly_amount AS typical_amount,
          CASE WHEN avg_hourly_count > 0
            THEN tx_count / avg_hourly_count
            ELSE 999 END AS velocity_multiplier
        ORDER BY velocity_multiplier DESC
        LIMIT 50
    "#).await?;
    let mut alerts = Vec::new();
    for row in &page.rows {
        // `ok_or` turns a missing column into a recoverable error (the
        // original `unwrap()` would panic the whole process here).
        alerts.push(VelocityAlert {
            account_email: row.get("email").ok_or("missing column email")?.as_string()?,
            recent_tx_count: row.get("recent_transactions").ok_or("missing column recent_transactions")?.as_int()?,
            recent_amount: row.get("recent_amount").ok_or("missing column recent_amount")?.as_float()?,
            typical_hourly_count: row.get("typical_count").ok_or("missing column typical_count")?.as_float()?,
            typical_hourly_amount: row.get("typical_amount").ok_or("missing column typical_amount")?.as_float()?,
            velocity_multiplier: row.get("velocity_multiplier").ok_or("missing column velocity_multiplier")?.as_float()?,
        });
    }
    Ok(alerts)
}
/** An account whose recent transaction velocity far exceeds its 30-day norm. */
interface VelocityAlert {
  accountEmail: string;        // Account.email
  recentTxCount: number;       // transactions in the last hour
  recentAmount: number;        // total amount in the last hour
  typicalHourlyCount: number;  // 30-day average transactions per hour
  typicalHourlyAmount: number; // 30-day average amount per hour
  velocityMultiplier: number;  // recent count / typical hourly count
}
/**
 * Find accounts whose last-hour activity is far above their 30-day norm
 * (top 50, highest velocity multiplier first).
 */
async function findVelocityAnomalies(client: Client): Promise<VelocityAlert[]> {
  const resultRows = await client.queryAll(`
    MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
    WHERE tx.timestamp > timestamp() - duration('PT1H')
    WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
    WHERE tx_count > 10 OR total_amount > 10000
    OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
    WHERE historical.timestamp > timestamp() - duration('P30D')
      AND historical.timestamp < timestamp() - duration('PT1H')
    WITH account, tx_count, total_amount,
      count(historical) / 720.0 AS avg_hourly_count,
      sum(historical.amount) / 720.0 AS avg_hourly_amount
    WHERE tx_count > avg_hourly_count * 5
      OR total_amount > avg_hourly_amount * 5
    RETURN
      account.email AS email,
      tx_count AS recent_transactions,
      total_amount AS recent_amount,
      avg_hourly_count AS typical_count,
      avg_hourly_amount AS typical_amount,
      CASE WHEN avg_hourly_count > 0
        THEN tx_count / avg_hourly_count
        ELSE 999 END AS velocity_multiplier
    ORDER BY velocity_multiplier DESC
    LIMIT 50
  `);

  const alerts: VelocityAlert[] = [];
  for (const row of resultRows) {
    alerts.push({
      accountEmail: row.get('email')?.asString ?? '',
      recentTxCount: row.get('recent_transactions')?.asNumber ?? 0,
      recentAmount: row.get('recent_amount')?.asNumber ?? 0,
      typicalHourlyCount: row.get('typical_count')?.asNumber ?? 0,
      typicalHourlyAmount: row.get('typical_amount')?.asNumber ?? 0,
      velocityMultiplier: row.get('velocity_multiplier')?.asNumber ?? 0,
    });
  }
  return alerts;
}
/// An account whose recent transaction velocity far exceeds its 30-day norm.
const VelocityAlert = struct {
    account_email: []const u8, // Account.email
    recent_tx_count: i64, // transactions in the last hour
    recent_amount: f64, // total amount in the last hour
    typical_hourly_count: f64, // 30-day average transactions per hour
    typical_hourly_amount: f64, // 30-day average amount per hour
    velocity_multiplier: f64, // recent count / typical hourly count
};
/// Run the velocity-anomaly query: accounts whose last-hour activity far
/// exceeds their 30-day hourly average.
///
/// NOTE(review): like findSharedDevices, result parsing is not implemented
/// yet — this currently always returns an empty list.
pub fn findVelocityAnomalies(
    client: *geode.GeodeClient,
    allocator: std.mem.Allocator,
) !std.ArrayList(VelocityAlert) {
    // No parameters: the thresholds are baked into the query text.
    try client.sendRunGql(1,
        \\MATCH (account:Account)-[:INITIATED]->(tx:Transaction)
        \\WHERE tx.timestamp > timestamp() - duration('PT1H')
        \\WITH account, count(tx) AS tx_count, sum(tx.amount) AS total_amount
        \\WHERE tx_count > 10 OR total_amount > 10000
        \\OPTIONAL MATCH (account)-[:INITIATED]->(historical:Transaction)
        \\WHERE historical.timestamp > timestamp() - duration('P30D')
        \\  AND historical.timestamp < timestamp() - duration('PT1H')
        \\WITH account, tx_count, total_amount,
        \\  count(historical) / 720.0 AS avg_hourly_count,
        \\  sum(historical.amount) / 720.0 AS avg_hourly_amount
        \\WHERE tx_count > avg_hourly_count * 5
        \\  OR total_amount > avg_hourly_amount * 5
        \\RETURN
        \\  account.email AS email,
        \\  tx_count AS recent_transactions,
        \\  total_amount AS recent_amount,
        \\  avg_hourly_count AS typical_count,
        \\  avg_hourly_amount AS typical_amount,
        \\  CASE WHEN avg_hourly_count > 0
        \\    THEN tx_count / avg_hourly_count
        \\    ELSE 999 END AS velocity_multiplier
        \\ORDER BY velocity_multiplier DESC
        \\LIMIT 50
    , null);
    // Acknowledge RUN, then pull up to 1000 rows (30-second timeouts).
    _ = try client.receiveMessage(30000);
    try client.sendPull(1, 1000);
    const result = try client.receiveMessage(30000);
    defer allocator.free(result);
    var alerts = std.ArrayList(VelocityAlert).init(allocator);
    // TODO: parse `result` into VelocityAlert entries.
    return alerts;
}
Pattern 4: Identity Fraud
Multiple accounts sharing identity attributes.
// Find accounts sharing identity information
MATCH (a1:Account)-[:HAS_IDENTITY]->(i1:Identity)
MATCH (a2:Account)-[:HAS_IDENTITY]->(i2:Identity)
WHERE a1 <> a2
AND (i1.ssn_hash = i2.ssn_hash
OR i1.address_hash = i2.address_hash
OR (i1.name = i2.name AND i1.date_of_birth = i2.date_of_birth))
RETURN
a1.email AS account1,
a2.email AS account2,
i1.name AS name1,
i2.name AS name2,
CASE
WHEN i1.ssn_hash = i2.ssn_hash THEN 'SSN'
WHEN i1.address_hash = i2.address_hash THEN 'Address'
ELSE 'Name+DOB'
END AS match_type
Pattern 5: Cross-Border Suspicious Activity
// Find suspicious cross-border transactions
MATCH (sender:Account)-[:INITIATED]->(tx:Transaction)-[:PAID_TO]->(receiver:Account)
MATCH (tx)-[:FROM_IP]->(ip:IPAddress)
MATCH (tx)-[:USED]->(pm:PaymentMethod)
WHERE ip.country <> pm.country
OR (ip.is_vpn = true OR ip.is_tor = true)
WITH sender, receiver, tx, ip, pm,
CASE
WHEN ip.is_tor = true THEN 50
WHEN ip.is_vpn = true THEN 30
WHEN ip.country <> pm.country THEN 20
ELSE 0
END AS risk_points
WHERE risk_points > 0
RETURN
sender.email AS sender,
receiver.email AS receiver,
tx.amount AS amount,
ip.country AS ip_country,
pm.country AS payment_country,
ip.is_vpn AS is_vpn,
ip.is_tor AS is_tor,
risk_points
ORDER BY risk_points DESC, tx.amount DESC
Real-Time Fraud Scoring
Transaction Risk Score
// Calculate real-time risk score for a transaction
MATCH (account:Account {id: $account_id})
// Factor 1: Account risk (0-25 points)
WITH account,
account.risk_score * 25 AS account_risk
// Factor 2: Device risk (0-25 points)
OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: $device_id})
WITH account, account_risk,
COALESCE(device.risk_score * 25, 25) AS device_risk // Unknown device = max risk
// Factor 3: IP risk (0-25 points)
OPTIONAL MATCH (ip:IPAddress {ip: $ip_address})
WITH account, account_risk, device_risk,
CASE
WHEN ip IS NULL THEN 15 // Unknown IP
WHEN ip.is_tor = true THEN 25
WHEN ip.is_vpn = true THEN 20
WHEN ip.is_proxy = true THEN 15
ELSE ip.risk_score * 25
END AS ip_risk
// Factor 4: Transaction pattern risk (0-25 points)
OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
WHERE recent.timestamp > timestamp() - duration('PT1H')
WITH account, account_risk, device_risk, ip_risk,
count(recent) AS recent_count,
sum(recent.amount) AS recent_total
WITH account_risk, device_risk, ip_risk,
CASE
WHEN recent_count > 20 THEN 25
WHEN recent_count > 10 THEN 15
WHEN recent_total > 10000 THEN 20
WHEN recent_total > 5000 THEN 10
ELSE 0
END AS pattern_risk
// Calculate total risk score (0-100)
RETURN
account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
account_risk,
device_risk,
ip_risk,
pattern_risk
// RiskScore is the four-factor fraud score for a prospective transaction.
// Each factor contributes 0-25 points; TotalRisk is their sum (0-100).
type RiskScore struct {
	TotalRisk   float64 // sum of the four factors below (0-100)
	AccountRisk float64 // from Account.risk_score (0-25)
	DeviceRisk  float64 // from Device.risk_score; unknown device scores 25
	IPRisk      float64 // from IP reputation: Tor/VPN/proxy (0-25)
	PatternRisk float64 // from last-hour transaction velocity (0-25)
}
// CalculateTransactionRisk computes a 0-100 risk score for a prospective
// transaction from four 0-25 factors: account, device, IP, and recent
// velocity pattern. An unknown account scores maximum risk, matching the
// behavior of the other client examples in this guide.
//
// NOTE(review): amount is part of the signature for symmetry with the other
// clients but is not used by the scoring query itself.
func CalculateTransactionRisk(
	ctx context.Context,
	db *sql.DB,
	accountID, deviceID, ipAddress string,
	amount float64,
) (*RiskScore, error) {
	row := db.QueryRowContext(ctx, `
		MATCH (account:Account {id: ?})
		WITH account,
		  account.risk_score * 25 AS account_risk
		OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: ?})
		WITH account, account_risk,
		  COALESCE(device.risk_score * 25, 25) AS device_risk
		OPTIONAL MATCH (ip:IPAddress {ip: ?})
		WITH account, account_risk, device_risk,
		  CASE
		    WHEN ip IS NULL THEN 15
		    WHEN ip.is_tor = true THEN 25
		    WHEN ip.is_vpn = true THEN 20
		    WHEN ip.is_proxy = true THEN 15
		    ELSE ip.risk_score * 25
		  END AS ip_risk
		OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
		WHERE recent.timestamp > timestamp() - duration('PT1H')
		WITH account, account_risk, device_risk, ip_risk,
		  count(recent) AS recent_count,
		  sum(recent.amount) AS recent_total
		WITH account_risk, device_risk, ip_risk,
		  CASE
		    WHEN recent_count > 20 THEN 25
		    WHEN recent_count > 10 THEN 15
		    WHEN recent_total > 10000 THEN 20
		    WHEN recent_total > 5000 THEN 10
		    ELSE 0
		  END AS pattern_risk
		RETURN
		  account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
		  account_risk,
		  device_risk,
		  ip_risk,
		  pattern_risk
	`, accountID, deviceID, ipAddress)
	var score RiskScore
	err := row.Scan(
		&score.TotalRisk,
		&score.AccountRisk,
		&score.DeviceRisk,
		&score.IPRisk,
		&score.PatternRisk,
	)
	if err == sql.ErrNoRows {
		// Unknown account: every factor at its maximum, consistent with the
		// Python/Rust/TypeScript examples in this guide.
		return &RiskScore{TotalRisk: 100, AccountRisk: 25, DeviceRisk: 25, IPRisk: 25, PatternRisk: 25}, nil
	}
	if err != nil {
		return nil, err
	}
	return &score, nil
}
// ShouldBlockTransaction decides whether a transaction should be blocked,
// returning the decision and a human-readable reason when blocked.
func ShouldBlockTransaction(score *RiskScore, amount float64) (bool, string) {
	switch {
	case score.TotalRisk >= 80:
		return true, "High risk score"
	case score.TotalRisk >= 50 && amount >= 1000:
		return true, "Elevated risk with high amount"
	default:
		return false, ""
	}
}
@dataclass
class RiskScore:
    """Four-factor fraud score; each factor is 0-25, total_risk their sum."""

    total_risk: float    # sum of the four factors below (0-100)
    account_risk: float  # from Account.risk_score (0-25)
    device_risk: float   # from Device.risk_score; unknown device scores 25
    ip_risk: float       # from IP reputation: Tor/VPN/proxy (0-25)
    pattern_risk: float  # from last-hour transaction velocity (0-25)
async def calculate_transaction_risk(
    client,
    account_id: str,
    device_id: str,
    ip_address: str,
    amount: float
) -> RiskScore:
    """Compute a 0-100 risk score for a prospective transaction.

    The score is the sum of four 0-25 factors: account, device, IP, and
    recent velocity pattern. An unknown account scores maximum risk.

    Note: ``amount`` is kept in the signature for symmetry with the other
    clients but is not used by the scoring query itself.
    """
    async with client.connection() as conn:
        result, _ = await conn.query("""
            MATCH (account:Account {id: $account_id})
            WITH account,
              account.risk_score * 25 AS account_risk
            OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: $device_id})
            WITH account, account_risk,
              COALESCE(device.risk_score * 25, 25) AS device_risk
            OPTIONAL MATCH (ip:IPAddress {ip: $ip_address})
            WITH account, account_risk, device_risk,
              CASE
                WHEN ip IS NULL THEN 15
                WHEN ip.is_tor = true THEN 25
                WHEN ip.is_vpn = true THEN 20
                WHEN ip.is_proxy = true THEN 15
                ELSE ip.risk_score * 25
              END AS ip_risk
            OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
            WHERE recent.timestamp > timestamp() - duration('PT1H')
            WITH account, account_risk, device_risk, ip_risk,
              count(recent) AS recent_count,
              sum(recent.amount) AS recent_total
            WITH account_risk, device_risk, ip_risk,
              CASE
                WHEN recent_count > 20 THEN 25
                WHEN recent_count > 10 THEN 15
                WHEN recent_total > 10000 THEN 20
                WHEN recent_total > 5000 THEN 10
                ELSE 0
              END AS pattern_risk
            RETURN
              account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
              account_risk,
              device_risk,
              ip_risk,
              pattern_risk
        """, {
            "account_id": account_id,
            "device_id": device_id,
            "ip_address": ip_address,
        })
        if not result.rows:
            # Unknown account: every factor at its maximum (25 points each).
            return RiskScore(100, 25, 25, 25, 25)
        first = result.rows[0]
        return RiskScore(
            total_risk=first['total_risk'].as_float,
            account_risk=first['account_risk'].as_float,
            device_risk=first['device_risk'].as_float,
            ip_risk=first['ip_risk'].as_float,
            pattern_risk=first['pattern_risk'].as_float,
        )
def should_block_transaction(score: RiskScore, amount: float) -> tuple[bool, str]:
    """Decide whether to block a transaction.

    Returns ``(blocked, reason)``; ``reason`` is empty when not blocked.
    Rules are checked in order of severity: outright high score, elevated
    score on a high-value transaction, then a risky device.
    """
    high_value = amount >= 1000
    if score.total_risk >= 80:
        return True, "High risk score"
    if score.total_risk >= 50 and high_value:
        return True, "Elevated risk with high amount"
    if score.total_risk >= 60 and score.device_risk >= 20:
        return True, "Risky device"
    return False, ""
async def process_transaction(
    client,
    account_id: str,
    device_id: str,
    ip_address: str,
    amount: float
) -> dict:
    """Score a transaction and return an approve/decline decision payload."""
    risk = await calculate_transaction_risk(
        client, account_id, device_id, ip_address, amount
    )
    blocked, block_reason = should_block_transaction(risk, amount)
    decision = {
        "approved": not blocked,
        "risk_score": risk.total_risk,
        "reason": block_reason if blocked else None,
        "breakdown": {
            "account": risk.account_risk,
            "device": risk.device_risk,
            "ip": risk.ip_risk,
            "pattern": risk.pattern_risk,
        },
    }
    return decision
/// Four-factor fraud score; each factor is 0-25, `total_risk` their sum (0-100).
#[derive(Debug)]
struct RiskScore {
    total_risk: f64,   // sum of the four factors below (0-100)
    account_risk: f64, // from Account.risk_score (0-25)
    device_risk: f64,  // from Device.risk_score; unknown device scores 25
    ip_risk: f64,      // from IP reputation: Tor/VPN (0-25)
    pattern_risk: f64, // from last-hour transaction velocity (0-25)
}
/// Compute a 0-100 risk score for a prospective transaction from four 0-25
/// factors: account, device, IP, and recent velocity pattern. An unknown
/// account scores maximum risk.
///
/// The query now matches the canonical scoring query in this guide,
/// including the `is_proxy` and mid-tier amount branches that the Go and
/// Python examples use; missing columns are reported as errors rather than
/// panicking.
async fn calculate_transaction_risk(
    conn: &mut geode_client::Connection,
    account_id: &str,
    device_id: &str,
    ip_address: &str,
) -> Result<RiskScore, Box<dyn std::error::Error>> {
    let mut params = HashMap::new();
    params.insert("account_id".to_string(), Value::string(account_id));
    params.insert("device_id".to_string(), Value::string(device_id));
    params.insert("ip_address".to_string(), Value::string(ip_address));
    let (page, _) = conn.query_with_params(r#"
        MATCH (account:Account {id: $account_id})
        WITH account, account.risk_score * 25 AS account_risk
        OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: $device_id})
        WITH account, account_risk,
          COALESCE(device.risk_score * 25, 25) AS device_risk
        OPTIONAL MATCH (ip:IPAddress {ip: $ip_address})
        WITH account, account_risk, device_risk,
          CASE
            WHEN ip IS NULL THEN 15
            WHEN ip.is_tor = true THEN 25
            WHEN ip.is_vpn = true THEN 20
            WHEN ip.is_proxy = true THEN 15
            ELSE ip.risk_score * 25
          END AS ip_risk
        OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
        WHERE recent.timestamp > timestamp() - duration('PT1H')
        WITH account_risk, device_risk, ip_risk,
          count(recent) AS recent_count,
          sum(recent.amount) AS recent_total
        WITH account_risk, device_risk, ip_risk,
          CASE
            WHEN recent_count > 20 THEN 25
            WHEN recent_count > 10 THEN 15
            WHEN recent_total > 10000 THEN 20
            WHEN recent_total > 5000 THEN 10
            ELSE 0
          END AS pattern_risk
        RETURN
          account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
          account_risk, device_risk, ip_risk, pattern_risk
    "#, &params).await?;
    if let Some(row) = page.rows.first() {
        Ok(RiskScore {
            total_risk: row.get("total_risk").ok_or("missing column total_risk")?.as_float()?,
            account_risk: row.get("account_risk").ok_or("missing column account_risk")?.as_float()?,
            device_risk: row.get("device_risk").ok_or("missing column device_risk")?.as_float()?,
            ip_risk: row.get("ip_risk").ok_or("missing column ip_risk")?.as_float()?,
            pattern_risk: row.get("pattern_risk").ok_or("missing column pattern_risk")?.as_float()?,
        })
    } else {
        // Unknown account: every factor at its maximum.
        Ok(RiskScore {
            total_risk: 100.0,
            account_risk: 25.0,
            device_risk: 25.0,
            ip_risk: 25.0,
            pattern_risk: 25.0,
        })
    }
}
/** Four-factor fraud score; each factor is 0-25, totalRisk their sum (0-100). */
interface RiskScore {
  totalRisk: number;   // sum of the four factors below (0-100)
  accountRisk: number; // from Account.risk_score (0-25)
  deviceRisk: number;  // from Device.risk_score; unknown device scores 25
  ipRisk: number;      // from IP reputation: Tor/VPN (0-25)
  patternRisk: number; // from last-hour transaction velocity (0-25)
}
/**
 * Computes a transaction risk score from four components (account, device,
 * IP, and recent-activity pattern), each contributing up to 25 points.
 *
 * The query mirrors the Rust implementation of the same function, including
 * the recent-total and intermediate velocity thresholds the original
 * TypeScript variant omitted.
 *
 * Fails closed: an unknown account (no rows) or a missing column yields
 * maximum risk rather than silently approving.
 *
 * @param client    Geode client used to run the scoring query.
 * @param accountId Account initiating the transaction.
 * @param deviceId  Device fingerprint id used for the session.
 * @param ipAddress Source IP address of the request.
 * @returns Component scores and their total.
 */
async function calculateTransactionRisk(
  client: Client,
  accountId: string,
  deviceId: string,
  ipAddress: string
): Promise<RiskScore> {
  const rows = await client.queryAll(`
    MATCH (account:Account {id: $account_id})
    WITH account, account.risk_score * 25 AS account_risk
    OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: $device_id})
    WITH account, account_risk,
         COALESCE(device.risk_score * 25, 25) AS device_risk
    OPTIONAL MATCH (ip:IPAddress {ip: $ip_address})
    WITH account, account_risk, device_risk,
         CASE
           WHEN ip IS NULL THEN 15
           WHEN ip.is_tor = true THEN 25
           WHEN ip.is_vpn = true THEN 20
           ELSE ip.risk_score * 25
         END AS ip_risk
    OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
    WHERE recent.timestamp > timestamp() - duration('PT1H')
    WITH account_risk, device_risk, ip_risk,
         count(recent) AS recent_count,
         sum(recent.amount) AS recent_total
    WITH account_risk, device_risk, ip_risk,
         CASE
           WHEN recent_count > 20 THEN 25
           WHEN recent_count > 10 THEN 15
           WHEN recent_total > 10000 THEN 20
           ELSE 0
         END AS pattern_risk
    RETURN
      account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
      account_risk, device_risk, ip_risk, pattern_risk
  `, { params: { account_id: accountId, device_id: deviceId, ip_address: ipAddress } });
  // Unknown account: fail closed with maximum risk.
  if (rows.length === 0) {
    return { totalRisk: 100, accountRisk: 25, deviceRisk: 25, ipRisk: 25, patternRisk: 25 };
  }
  const row = rows[0];
  // Per-column fallbacks also fail closed if a value is missing.
  return {
    totalRisk: row.get('total_risk')?.asNumber ?? 100,
    accountRisk: row.get('account_risk')?.asNumber ?? 25,
    deviceRisk: row.get('device_risk')?.asNumber ?? 25,
    ipRisk: row.get('ip_risk')?.asNumber ?? 25,
    patternRisk: row.get('pattern_risk')?.asNumber ?? 25,
  };
}
// Breakdown of a transaction risk assessment: four component scores
// plus their sum, mirroring the RiskScore types in the other language
// examples above.
const RiskScore = struct {
// Sum of the four component scores below.
total_risk: f64,
// Component derived from the account's stored risk_score.
account_risk: f64,
// Component derived from the device's stored risk_score.
device_risk: f64,
// Component derived from the source IP address.
ip_risk: f64,
// Component derived from recent transaction velocity.
pattern_risk: f64,
};
/// Computes a transaction risk score from four components (account, device,
/// IP, and recent-activity pattern), each contributing up to 25 points.
///
/// The query now includes the `is_vpn` IP case so it matches the Rust and
/// TypeScript variants of this function.
pub fn calculateTransactionRisk(
    client: *geode.GeodeClient,
    allocator: std.mem.Allocator,
    account_id: []const u8,
    device_id: []const u8,
    ip_address: []const u8,
) !RiskScore {
    var params = std.json.ObjectMap.init(allocator);
    defer params.deinit();
    try params.put("account_id", .{ .string = account_id });
    try params.put("device_id", .{ .string = device_id });
    try params.put("ip_address", .{ .string = ip_address });
    try client.sendRunGql(1,
        \\MATCH (account:Account {id: $account_id})
        \\WITH account, account.risk_score * 25 AS account_risk
        \\OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device {id: $device_id})
        \\WITH account, account_risk,
        \\     COALESCE(device.risk_score * 25, 25) AS device_risk
        \\OPTIONAL MATCH (ip:IPAddress {ip: $ip_address})
        \\WITH account, account_risk, device_risk,
        \\     CASE WHEN ip IS NULL THEN 15
        \\          WHEN ip.is_tor = true THEN 25
        \\          WHEN ip.is_vpn = true THEN 20
        \\          ELSE ip.risk_score * 25 END AS ip_risk
        \\OPTIONAL MATCH (account)-[:INITIATED]->(recent:Transaction)
        \\WHERE recent.timestamp > timestamp() - duration('PT1H')
        \\WITH account_risk, device_risk, ip_risk, count(recent) AS recent_count
        \\WITH account_risk, device_risk, ip_risk,
        \\     CASE WHEN recent_count > 20 THEN 25 ELSE 0 END AS pattern_risk
        \\RETURN account_risk + device_risk + ip_risk + pattern_risk AS total_risk,
        \\       account_risk, device_risk, ip_risk, pattern_risk
    , .{ .object = params });
    // Free the RUN acknowledgement. receiveMessage results are
    // allocator-owned (the PULL result below is freed the same way); the
    // original discarded this one with `_ =` and leaked its buffer.
    const run_ack = try client.receiveMessage(30000);
    allocator.free(run_ack);
    try client.sendPull(1, 1);
    const result = try client.receiveMessage(30000);
    defer allocator.free(result);
    // TODO: parse `result` into the component scores; this placeholder
    // returns zeros (matching the original example's behavior).
    return RiskScore{
        .total_risk = 0,
        .account_risk = 0,
        .device_risk = 0,
        .ip_risk = 0,
        .pattern_risk = 0,
    };
}
Investigation Workflows
Create Fraud Case
// Create a new fraud case
CREATE (case:FraudCase {
id: $case_id,
type: $fraud_type,
status: 'open',
created_at: timestamp(),
amount_at_risk: $amount,
investigator: $investigator
})
// Link flagged accounts to the case
WITH case
UNWIND $account_ids AS account_id
MATCH (account:Account {id: account_id})
CREATE (account)-[:FLAGGED_IN]->(case)
// Collapse back to a single row per case. Without DISTINCT, one row per
// matched account survives here, and the transaction UNWIND below would
// create a duplicate FLAGGED_IN relationship for every account matched.
// NOTE(review): if an account_id or transaction_id matches nothing, the
// inner MATCH drops that row entirely — verify ids before calling.
WITH DISTINCT case
UNWIND $transaction_ids AS tx_id
MATCH (tx:Transaction {id: tx_id})
CREATE (tx)-[:FLAGGED_IN]->(case)
RETURN case
Investigate Account Network
// Get full network for investigation
MATCH (account:Account {id: $account_id})
// Get connected accounts via shared attributes. These four OPTIONAL
// MATCHes form a cross product of their result rows; the DISTINCT
// collects below deduplicate each list independently.
OPTIONAL MATCH (account)-[:USES_DEVICE]->(:Device)<-[:USES_DEVICE]-(device_linked:Account)
OPTIONAL MATCH (account)-[:HAS_PAYMENT_METHOD]->(:PaymentMethod)<-[:HAS_PAYMENT_METHOD]-(payment_linked:Account)
OPTIONAL MATCH (account)-[:HAS_IDENTITY]->(:Identity)<-[:HAS_IDENTITY]-(identity_linked:Account)
OPTIONAL MATCH (account)-[:LOGGED_IN_FROM]->(:IPAddress)<-[:LOGGED_IN_FROM]-(ip_linked:Account)
WITH account,
collect(DISTINCT device_linked) AS device_linked_accounts,
collect(DISTINCT payment_linked) AS payment_linked_accounts,
collect(DISTINCT identity_linked) AS identity_linked_accounts,
collect(DISTINCT ip_linked) AS ip_linked_accounts
// Get transactions sent by and received by the account under investigation.
OPTIONAL MATCH (account)-[:INITIATED]->(sent:Transaction)-[:PAID_TO]->(recipient:Account)
OPTIONAL MATCH (sender:Account)-[:INITIATED]->(received:Transaction)-[:PAID_TO]->(account)
// The collects in the map below aggregate over the transaction rows,
// grouped by account and the four linked-account lists.
// NOTE(review): when an OPTIONAL MATCH above finds nothing, the collected
// map is {recipient: null, amount: null, ...} rather than being omitted —
// confirm downstream consumers tolerate the null-filled entry.
RETURN {
account: account,
device_linked: [a IN device_linked_accounts | {id: a.id, email: a.email, risk: a.risk_score}],
payment_linked: [a IN payment_linked_accounts | {id: a.id, email: a.email, risk: a.risk_score}],
identity_linked: [a IN identity_linked_accounts | {id: a.id, email: a.email, risk: a.risk_score}],
ip_linked: [a IN ip_linked_accounts | {id: a.id, email: a.email, risk: a.risk_score}],
sent_to: collect(DISTINCT {recipient: recipient.email, amount: sent.amount, timestamp: sent.timestamp}),
received_from: collect(DISTINCT {sender: sender.email, amount: received.amount, timestamp: received.timestamp})
} AS investigation_data
Timeline Analysis
// Build activity timeline for investigation: merge transaction and login
// events into one stream, newest first.
// (The previous version was invalid: its first UNION ALL branch ended in
// WITH instead of RETURN, the collected tx_events were never returned, and
// the two branches produced different columns. UNION ALL branches must
// return identically named columns, so each branch returns the same
// type/timestamp/details triple here.)
CALL {
MATCH (account:Account {id: $account_id})-[:INITIATED]->(tx:Transaction)
RETURN 'transaction' AS type,
tx.timestamp AS timestamp,
{amount: tx.amount, status: tx.status, risk: tx.risk_score} AS details
UNION ALL
MATCH (account:Account {id: $account_id})-[r:LOGGED_IN_FROM]->(ip:IPAddress)
RETURN 'login' AS type,
r.timestamp AS timestamp,
{ip: ip.ip, country: ip.country, is_vpn: ip.is_vpn} AS details
}
// Sort the combined stream (ORDER BY after a bare UNION would only apply
// to the final branch; the CALL subquery makes it apply to all events).
RETURN {type: type, timestamp: timestamp, details: details} AS event
ORDER BY timestamp DESC
LIMIT 100
Alert Generation
Real-Time Alert Rules
// Rule 1: flag pending transactions scored above 80 within the last
// five minutes and raise a critical alert for each.
MATCH (a:Account)-[:INITIATED]->(t:Transaction)
WHERE t.status = 'pending'
AND t.risk_score > 80
AND t.timestamp > timestamp() - duration('PT5M')
CREATE (alert:Alert {
id: randomUUID(),
type: 'high_risk_transaction',
severity: 'critical',
account_id: a.id,
transaction_id: t.id,
risk_score: t.risk_score,
amount: t.amount,
created_at: timestamp()
})
RETURN alert
// Rule 2: New device with high-value transaction.
// Fires when an account has a device first seen in the last 24h AND any
// transaction over 5000 in the last hour.
// NOTE(review): the transaction is not linked to the new device — the data
// model shown has no Transaction-to-Device relationship — so a transaction
// made from a long-trusted device still triggers this alert if any new
// device exists on the account. Confirm whether a session/device link can
// tighten this rule.
MATCH (account:Account)-[:USES_DEVICE]->(device:Device)
WHERE device.first_seen > timestamp() - duration('PT24H')
MATCH (account)-[:INITIATED]->(tx:Transaction)
WHERE tx.timestamp > timestamp() - duration('PT1H')
AND tx.amount > 5000
CREATE (alert:Alert {
id: randomUUID(),
type: 'new_device_high_value',
severity: 'high',
account_id: account.id,
device_id: device.id,
transaction_id: tx.id,
amount: tx.amount,
created_at: timestamp()
})
RETURN alert
// Rule 3: velocity spike — alert any account with more than 20
// transactions, or more than 50,000 in volume, inside one hour.
MATCH (a:Account)-[:INITIATED]->(t:Transaction)
WHERE t.timestamp > timestamp() - duration('PT1H')
WITH a, count(t) AS tx_count, sum(t.amount) AS total_amount
WHERE tx_count > 20 OR total_amount > 50000
CREATE (alert:Alert {
id: randomUUID(),
type: 'velocity_spike',
severity: 'high',
account_id: a.id,
transaction_count: tx_count,
total_amount: total_amount,
created_at: timestamp()
})
RETURN alert
Compliance Considerations
Audit Trail
// Log all fraud-related actions
CREATE (audit:AuditLog {
id: $audit_id,
action: $action, // "flag_account", "block_transaction", etc.
actor: $actor, // Who performed the action
target_type: $target_type, // "account", "transaction"
target_id: $target_id,
reason: $reason,
timestamp: timestamp(),
metadata: $metadata
})
// Link to the entity acted on. Restricting to the labels that
// $target_type can name avoids a full node scan (an unlabeled MATCH
// cannot use an index) and prevents the audit record from attaching to
// an unrelated node type that happens to reuse the same id.
WITH audit
MATCH (target)
WHERE target.id = $target_id
AND (target:Account OR target:Transaction)
CREATE (audit)-[:ACTED_ON]->(target)
Regulatory Reporting
// Generate SAR (Suspicious Activity Report) data
MATCH (case:FraudCase {id: $case_id})
MATCH (account:Account)-[:FLAGGED_IN]->(case)
OPTIONAL MATCH (account)-[:HAS_IDENTITY]->(identity:Identity)
OPTIONAL MATCH (tx:Transaction)-[:FLAGGED_IN]->(case)
WITH case, account, identity, collect(tx) AS transactions
RETURN {
case_id: case.id,
case_type: case.type,
created_at: case.created_at,
accounts: collect({
id: account.id,
email: account.email,
name: identity.name,
risk_score: account.risk_score
}),
transactions: [t IN transactions | {
id: t.id,
amount: t.amount,
currency: t.currency,
timestamp: t.timestamp,
risk_score: t.risk_score
}],
// sum() is a row aggregate in Cypher, not a list function — applying it
// to a list comprehension is invalid. reduce() totals a list.
total_amount_at_risk: reduce(total = 0.0, t IN transactions | total + t.amount)
} AS sar_data
Data Retention
// Archive fraud cases closed more than seven years ago (retention
// policy compliance), copying the fields we must keep into an
// ArchivedCase node before removing the original.
MATCH (closed:FraudCase)
WHERE closed.status = 'closed'
AND closed.resolved_at < timestamp() - duration('P7Y')
CREATE (:ArchivedCase {
original_id: closed.id,
type: closed.type,
amount_at_risk: closed.amount_at_risk,
created_at: closed.created_at,
resolved_at: closed.resolved_at,
archived_at: timestamp()
})
// Remove the original case and every relationship attached to it.
DETACH DELETE closed
Performance Optimization
Indexing for Fraud Queries
// Essential indexes: single-property lookups used by the detection and
// alerting queries above (status filters, risk thresholds, device/IP
// entity lookups, and the time-window scans on transactions).
CREATE INDEX account_status ON :Account(status)
CREATE INDEX account_risk ON :Account(risk_score)
CREATE INDEX device_fingerprint ON :Device(fingerprint)
CREATE INDEX ip_address ON :IPAddress(ip)
CREATE INDEX transaction_timestamp ON :Transaction(timestamp)
CREATE INDEX transaction_status ON :Transaction(status)
// Composite indexes for common queries that filter on both properties
// at once (e.g. recent transactions above a risk threshold).
CREATE INDEX transaction_timestamp_risk ON :Transaction(timestamp, risk_score)
CREATE INDEX account_status_risk ON :Account(status, risk_score)
Batch Processing
// Batch update risk scores for accounts not refreshed in the last hour.
CALL {
MATCH (account:Account)
WHERE account.last_risk_update < timestamp() - duration('PT1H')
OR account.last_risk_update IS NULL
// Count confirmed fraud cases. The case node must be named and counted
// explicitly: count(*) after an OPTIONAL MATCH counts the null row too,
// which would make fraud_cases >= 1 for EVERY account and set every
// risk_score to 1.0.
OPTIONAL MATCH (account)-[:FLAGGED_IN]->(fc:FraudCase {status: 'confirmed'})
WITH account, count(fc) AS fraud_cases
OPTIONAL MATCH (account)-[:USES_DEVICE]->(device:Device)
WHERE device.risk_score > 0.5
WITH account, fraud_cases, count(device) AS risky_devices
SET account.risk_score = CASE
WHEN fraud_cases > 0 THEN 1.0
WHEN risky_devices > 2 THEN 0.8
WHEN risky_devices > 0 THEN 0.5
ELSE account.risk_score
END,
account.last_risk_update = timestamp()
RETURN count(account) AS updated
}
// A query ending in a returning CALL subquery must itself RETURN.
RETURN updated
Monitoring Dashboard
Key Metrics
// Fraud detection metrics for the trailing 24 hours.
MATCH (tx:Transaction)
WHERE tx.timestamp > timestamp() - duration('P1D')
WITH count(tx) AS total_transactions,
count(CASE WHEN tx.status = 'flagged' THEN 1 END) AS flagged_transactions,
sum(tx.amount) AS total_volume,
sum(CASE WHEN tx.status = 'flagged' THEN tx.amount END) AS flagged_volume
// OPTIONAL MATCH so a day with zero new cases (or zero alerts, below)
// still produces one metrics row — a plain MATCH would eliminate every
// row and the dashboard would render nothing.
OPTIONAL MATCH (case:FraudCase)
WHERE case.created_at > timestamp() - duration('P1D')
WITH total_transactions, flagged_transactions, total_volume, flagged_volume,
count(case) AS new_cases,
COALESCE(sum(case.amount_at_risk), 0) AS amount_at_risk
OPTIONAL MATCH (alert:Alert)
WHERE alert.created_at > timestamp() - duration('P1D')
WITH total_transactions, flagged_transactions, total_volume, flagged_volume,
new_cases, amount_at_risk,
count(alert) AS total_alerts,
count(CASE WHEN alert.severity = 'critical' THEN 1 END) AS critical_alerts
RETURN {
total_transactions: total_transactions,
flagged_transactions: flagged_transactions,
// Guard against division by zero on a day with no transactions.
flag_rate: CASE WHEN total_transactions = 0 THEN 0.0
ELSE toFloat(flagged_transactions) / total_transactions END,
total_volume: total_volume,
flagged_volume: flagged_volume,
new_cases: new_cases,
amount_at_risk: amount_at_risk,
total_alerts: total_alerts,
critical_alerts: critical_alerts
} AS daily_metrics
Next Steps
- Knowledge Graph Guide - Entity resolution for fraud
- Social Network Guide - Network analysis techniques
- Query Performance Guide - Optimize fraud queries
- Security Guide - Secure your fraud data
Resources
Questions? Join our community forum to discuss fraud detection strategies.