<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 -->
<h2 id="etl-pipelines--data-integration" class="position-relative d-flex align-items-center group">
<span>ETL Pipelines &amp; Data Integration</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="etl-pipelines--data-integration"
aria-haspopup="dialog"
aria-label="Share link: ETL Pipelines &amp; Data Integration">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h2><div id="headingShareModal" class="heading-share-modal" role="dialog" aria-modal="true" aria-labelledby="headingShareTitle" hidden>
<div class="hsm-dialog" role="document">
<div class="hsm-header">
<h2 id="headingShareTitle" class="h6 mb-0 fw-bold">Share this section</h2>
<button type="button" class="hsm-close" aria-label="Close">
<i class="fa-solid fa-xmark"></i>
</button>
</div>
<div class="hsm-body">
<label for="headingShareInput" class="form-label small text-muted mb-1 text-uppercase fw-bold" style="font-size: 0.7rem; letter-spacing: 0.5px;">Permalink</label>
<div class="input-group mb-4 hsm-url-group">
<input id="headingShareInput" type="text" class="form-control font-monospace" readonly aria-readonly="true" style="font-size: 0.85rem;" />
<button class="btn btn-primary hsm-copy" type="button" aria-label="Copy" title="Copy">
<i class="fa-duotone fa-clipboard" aria-hidden="true"></i>
</button>
</div>
<div class="small fw-bold mb-2 text-muted text-uppercase" style="font-size: 0.7rem; letter-spacing: 0.5px;">Share via</div>
<div class="hsm-share-grid">
<a id="share-twitter" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-twitter me-2"></i>Twitter
</a>
<a id="share-linkedin" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-linkedin me-2"></i>LinkedIn
</a>
<a id="share-facebook" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-facebook me-2"></i>Facebook
</a>
</div>
</div>
</div>
</div>
<style>
.heading-share-modal {
position: fixed;
inset: 0;
display: flex;
justify-content: center;
align-items: center;
background: rgba(0, 0, 0, 0.6);
z-index: 1050;
padding: 1rem;
backdrop-filter: blur(4px);
-webkit-backdrop-filter: blur(4px);
}
.heading-share-modal[hidden] { display: none !important; }
.hsm-dialog {
max-width: 420px;
width: 100%;
background: var(--bs-body-bg, #fff);
color: var(--bs-body-color, #212529);
border: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
border-radius: 1rem;
box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25);
overflow: hidden;
animation: hsm-fade-in 0.2s ease-out;
}
@keyframes hsm-fade-in {
from { opacity: 0; transform: scale(0.95); }
to { opacity: 1; transform: scale(1); }
}
[data-bs-theme="dark"] .hsm-dialog {
background: #1e293b;
border-color: rgba(255,255,255,0.1);
color: #f8f9fa;
}
.hsm-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 1rem 1.5rem;
border-bottom: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
background: rgba(0,0,0,0.02);
}
[data-bs-theme="dark"] .hsm-header {
background: rgba(255,255,255,0.02);
border-color: rgba(255,255,255,0.1);
}
.hsm-close {
background: transparent;
border: none;
color: inherit;
opacity: 0.5;
padding: 0.25rem 0.5rem;
border-radius: 0.25rem;
font-size: 1.2rem;
line-height: 1;
transition: opacity 0.2s;
}
.hsm-close:hover {
opacity: 1;
}
.hsm-body {
padding: 1.5rem;
}
.hsm-url-group {
display: flex !important;
align-items: stretch;
}
.hsm-url-group .form-control {
flex: 1;
min-width: 0;
margin: 0;
background: var(--bs-secondary-bg, #f8f9fa);
border-color: var(--bs-border-color, #dee2e6);
border-top-right-radius: 0;
border-bottom-right-radius: 0;
height: 42px;
}
.hsm-url-group .btn {
flex: 0 0 auto;
margin: 0;
margin-left: -1px;
border-top-left-radius: 0;
border-bottom-left-radius: 0;
height: 42px;
display: flex;
align-items: center;
justify-content: center;
padding: 0 1.25rem;
z-index: 2;
}
[data-bs-theme="dark"] .hsm-url-group .form-control {
background: #0f172a;
border-color: #334155;
color: #e2e8f0;
}
.hsm-share-grid {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.hsm-share-grid .btn {
display: flex;
align-items: center;
justify-content: center;
font-size: 0.9rem;
padding: 0.6rem;
border-color: var(--bs-border-color);
width: 100%;
}
[data-bs-theme="dark"] .hsm-share-grid .btn {
color: #e2e8f0;
border-color: #475569;
}
[data-bs-theme="dark"] .hsm-share-grid .btn:hover {
background: #334155;
border-color: #cbd5e1;
}
</style>
<script>
(function(){
const modal = document.getElementById('headingShareModal');
if(!modal) return;
const input = modal.querySelector('#headingShareInput');
const copyBtn = modal.querySelector('.hsm-copy');
const twitter = modal.querySelector('#share-twitter');
const linkedin = modal.querySelector('#share-linkedin');
const facebook = modal.querySelector('#share-facebook');
const closeBtn = modal.querySelector('.hsm-close');
let lastFocus=null;
let trapBound=false;
function buildUrl(id){ return window.location.origin + window.location.pathname + '#' + id; }
function isOpen(){ return !modal.hasAttribute('hidden'); }
function hydrate(id){
const url=buildUrl(id);
input.value=url;
const enc=encodeURIComponent(url);
const text=encodeURIComponent(document.title);
if(twitter) twitter.href=`https://twitter.com/intent/tweet?url=${enc}&text=${text}`;
if(linkedin) linkedin.href=`https://www.linkedin.com/sharing/share-offsite/?url=${enc}`;
if(facebook) facebook.href=`https://www.facebook.com/sharer/sharer.php?u=${enc}`;
}
function openModal(id){
lastFocus=document.activeElement;
hydrate(id);
if(!isOpen()){
modal.removeAttribute('hidden');
}
requestAnimationFrame(()=>{ input.focus(); });
trapFocus();
}
function closeModal(){
if(!isOpen()) return;
modal.setAttribute('hidden','');
if(lastFocus && typeof lastFocus.focus==='function') lastFocus.focus();
}
function copyCurrent(){
try{ navigator.clipboard.writeText(input.value).then(()=>feedback(true),()=>fallback()); }
catch(e){ fallback(); }
}
function fallback(){ input.select(); try{ document.execCommand('copy'); feedback(true);}catch(e){ feedback(false);} }
function feedback(ok){ if(!copyBtn) return; const icon=copyBtn.querySelector('i'); if(!icon) return; const prev=copyBtn.getAttribute('data-prev')||icon.className; if(!copyBtn.getAttribute('data-prev')) copyBtn.setAttribute('data-prev',prev); icon.className= ok ? 'fa-duotone fa-clipboard-check':'fa-duotone fa-circle-exclamation'; setTimeout(()=>{ icon.className=prev; },1800); }
function handleShareClick(e){ e.preventDefault(); const btn=e.currentTarget; const id=btn.getAttribute('data-share-target'); if(id) openModal(id); }
function bindShareButtons(){
document.querySelectorAll('.h-share').forEach(btn=>{
if(!btn.dataset.hShareBound){ btn.addEventListener('click', handleShareClick); btn.dataset.hShareBound='1'; }
});
}
bindShareButtons();
if(document.readyState==='loading'){
document.addEventListener('DOMContentLoaded', bindShareButtons);
} else {
requestAnimationFrame(bindShareButtons);
}
document.addEventListener('click', function(e){
const shareBtn=e.target.closest && e.target.closest('.h-share');
if(shareBtn && !shareBtn.dataset.hShareBound){ handleShareClick.call(shareBtn, e); }
}, true);
document.addEventListener('click', e=>{
if(e.target===modal) closeModal();
if(e.target.closest && e.target.closest('.hsm-close')){ e.preventDefault(); closeModal(); }
if(copyBtn && (e.target===copyBtn || (e.target.closest && e.target.closest('.hsm-copy')))) { e.preventDefault(); copyCurrent(); }
});
document.addEventListener('keydown', e=>{ if(e.key==='Escape' && isOpen()) closeModal(); });
function trapFocus(){
if(trapBound) return;
trapBound=true;
modal.addEventListener('keydown', f=>{ if(f.key==='Tab' && isOpen()){ const focusable=[...modal.querySelectorAll('a[href],button,input,textarea,select,[tabindex]:not([tabindex="-1"])')].filter(el=>!el.hasAttribute('disabled')); if(!focusable.length) return; const first=focusable[0]; const last=focusable[focusable.length-1]; if(f.shiftKey && document.activeElement===first){ f.preventDefault(); last.focus(); } else if(!f.shiftKey && document.activeElement===last){ f.preventDefault(); first.focus(); } } });
}
if(closeBtn) closeBtn.addEventListener('click', e=>{ e.preventDefault(); closeModal(); });
})();
</script><p>ETL (Extract, Transform, Load) pipelines are critical for integrating data from diverse sources into your Geode graph database. Whether migrating from a relational database, aggregating data from multiple APIs, or streaming real-time events, well-designed ETL processes ensure data arrives in Geode accurately, efficiently, and reliably.</p>
<h3 id="understanding-etl-in-graph-context" class="position-relative d-flex align-items-center group">
<span>Understanding ETL in Graph Context</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="understanding-etl-in-graph-context"
aria-haspopup="dialog"
aria-label="Share link: Understanding ETL in Graph Context">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><p>Traditional ETL processes are designed for tabular data moving between relational databases. Graph ETL requires additional considerations:</p>
<ul>
<li><strong>Relationship Discovery</strong>: Identifying connections between entities during transformation</li>
<li><strong>Node Deduplication</strong>: Ensuring the same entity isn’t created multiple times</li>
<li><strong>Edge Creation</strong>: Linking nodes based on foreign key relationships or business logic</li>
<li><strong>Graph Schema Mapping</strong>: Converting normalized tables to denormalized graph properties</li>
<li><strong>Traversal Optimization</strong>: Organizing data for efficient graph queries</li>
</ul>
<h3 id="the-etl-process" class="position-relative d-flex align-items-center group">
<span>The ETL Process</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="the-etl-process"
aria-haspopup="dialog"
aria-label="Share link: The ETL Process">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="extract-pulling-data-from-sources" class="position-relative d-flex align-items-center group">
<span>Extract: Pulling Data from Sources</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="extract-pulling-data-from-sources"
aria-haspopup="dialog"
aria-label="Share link: Extract: Pulling Data from Sources">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Extract phase reads data from source systems without modifying the original data.</p>
<p><strong>Common Sources</strong>:</p>
<ul>
<li>Relational databases (PostgreSQL, MySQL, Oracle)</li>
<li>NoSQL databases (MongoDB, Cassandra)</li>
<li>REST APIs</li>
<li>CSV/JSON files</li>
<li>Message queues (Kafka, RabbitMQ)</li>
<li>Data warehouses (Snowflake, BigQuery)</li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Extract from PostgreSQL</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncpg</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Extract users and orders from PostgreSQL."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">pg_pool</span><span class="o">.</span><span class="n">acquire</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Extract users</span>
</span></span><span class="line"><span class="cl"> <span class="n">users</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">"""
</span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT
</span></span></span><span class="line"><span class="cl"><span class="s2"> user_id,
</span></span></span><span class="line"><span class="cl"><span class="s2"> email,
</span></span></span><span class="line"><span class="cl"><span class="s2"> first_name,
</span></span></span><span class="line"><span class="cl"><span class="s2"> last_name,
</span></span></span><span class="line"><span class="cl"><span class="s2"> created_at,
</span></span></span><span class="line"><span class="cl"><span class="s2"> country,
</span></span></span><span class="line"><span class="cl"><span class="s2"> status
</span></span></span><span class="line"><span class="cl"><span class="s2"> FROM users
</span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at > $1
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Extract orders</span>
</span></span><span class="line"><span class="cl"> <span class="n">orders</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">"""
</span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT
</span></span></span><span class="line"><span class="cl"><span class="s2"> order_id,
</span></span></span><span class="line"><span class="cl"><span class="s2"> user_id,
</span></span></span><span class="line"><span class="cl"><span class="s2"> total_amount,
</span></span></span><span class="line"><span class="cl"><span class="s2"> currency,
</span></span></span><span class="line"><span class="cl"><span class="s2"> status,
</span></span></span><span class="line"><span class="cl"><span class="s2"> created_at
</span></span></span><span class="line"><span class="cl"><span class="s2"> FROM orders
</span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at > $1
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Extract order items (relationships)</span>
</span></span><span class="line"><span class="cl"> <span class="n">order_items</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">"""
</span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT
</span></span></span><span class="line"><span class="cl"><span class="s2"> order_id,
</span></span></span><span class="line"><span class="cl"><span class="s2"> product_id,
</span></span></span><span class="line"><span class="cl"><span class="s2"> quantity,
</span></span></span><span class="line"><span class="cl"><span class="s2"> unit_price
</span></span></span><span class="line"><span class="cl"><span class="s2"> FROM order_items
</span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at > $1
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"users"</span><span class="p">:</span> <span class="n">users</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"orders"</span><span class="p">:</span> <span class="n">orders</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"order_items"</span><span class="p">:</span> <span class="n">order_items</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span></code></pre></div><p>Go example with database/sql:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-go" data-lang="go"><span class="line"><span class="cl"><span class="c1">// Go - Extract from MySQL
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="kn">package</span> <span class="nx">main</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s">"context"</span>
</span></span><span class="line"><span class="cl"> <span class="s">"database/sql"</span>
</span></span><span class="line"><span class="cl"> <span class="s">"time"</span>
</span></span><span class="line"><span class="cl"> <span class="nx">_</span> <span class="s">"github.com/go-sql-driver/mysql"</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">type</span> <span class="nx">User</span> <span class="kd">struct</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="nx">ID</span> <span class="kt">string</span>
</span></span><span class="line"><span class="cl"> <span class="nx">Email</span> <span class="kt">string</span>
</span></span><span class="line"><span class="cl"> <span class="nx">FirstName</span> <span class="kt">string</span>
</span></span><span class="line"><span class="cl"> <span class="nx">LastName</span> <span class="kt">string</span>
</span></span><span class="line"><span class="cl"> <span class="nx">CreatedAt</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span>
</span></span><span class="line"><span class="cl"><span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">func</span> <span class="nf">extractUsers</span><span class="p">(</span><span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">db</span> <span class="o">*</span><span class="nx">sql</span><span class="p">.</span><span class="nx">DB</span><span class="p">,</span> <span class="nx">since</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="p">([]</span><span class="nx">User</span><span class="p">,</span> <span class="kt">error</span><span class="p">)</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="nx">rows</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">db</span><span class="p">.</span><span class="nf">QueryContext</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="s">`
</span></span></span><span class="line"><span class="cl"><span class="s"> SELECT user_id, email, first_name, last_name, created_at
</span></span></span><span class="line"><span class="cl"><span class="s"> FROM users
</span></span></span><span class="line"><span class="cl"><span class="s"> WHERE updated_at > ?
</span></span></span><span class="line"><span class="cl"><span class="s"> ORDER BY updated_at ASC
</span></span></span><span class="line"><span class="cl"><span class="s"> `</span><span class="p">,</span> <span class="nx">since</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="kc">nil</span><span class="p">,</span> <span class="nx">err</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="k">defer</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Close</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">var</span> <span class="nx">users</span> <span class="p">[]</span><span class="nx">User</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Next</span><span class="p">()</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">var</span> <span class="nx">u</span> <span class="nx">User</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Scan</span><span class="p">(</span><span class="o">&</span><span class="nx">u</span><span class="p">.</span><span class="nx">ID</span><span class="p">,</span> <span class="o">&</span><span class="nx">u</span><span class="p">.</span><span class="nx">Email</span><span class="p">,</span> <span class="o">&</span><span class="nx">u</span><span class="p">.</span><span class="nx">FirstName</span><span class="p">,</span> <span class="o">&</span><span class="nx">u</span><span class="p">.</span><span class="nx">LastName</span><span class="p">,</span> <span class="o">&</span><span class="nx">u</span><span class="p">.</span><span class="nx">CreatedAt</span><span class="p">);</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="kc">nil</span><span class="p">,</span> <span class="nx">err</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="nx">users</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">users</span><span class="p">,</span> <span class="nx">u</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="nx">users</span><span class="p">,</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Err</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"><span class="p">}</span>
</span></span></code></pre></div>
<h4 id="transform-converting-to-graph-structure" class="position-relative d-flex align-items-center group">
<span>Transform: Converting to Graph Structure</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="transform-converting-to-graph-structure"
aria-haspopup="dialog"
aria-label="Share link: Transform: Converting to Graph Structure">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Transform phase converts source data into nodes, edges, and properties suitable for Geode.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Transform relational data to graph structure</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Any</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_user</span><span class="p">(</span><span class="n">raw_user</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Transform PostgreSQL user record to Geode Person node."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"labels"</span><span class="p">:</span> <span class="p">[</span><span class="s2">"Person"</span><span class="p">,</span> <span class="s2">"Customer"</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"properties"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"id"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"email"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'email'</span><span class="p">]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">strip</span><span class="p">(),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"name"</span><span class="p">:</span> <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="n">raw_user</span><span class="p">[</span><span class="s1">'first_name'</span><span class="p">]</span><span class="si">}</span><span class="s2"> </span><span class="si">{</span><span class="n">raw_user</span><span class="p">[</span><span class="s1">'last_name'</span><span class="p">]</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"first_name"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'first_name'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"last_name"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'last_name'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"country"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'country'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"status"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'status'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"created_at"</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">'created_at'</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"imported_at"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order</span><span class="p">(</span><span class="n">raw_order</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Transform order record to Geode Order node."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"labels"</span><span class="p">:</span> <span class="p">[</span><span class="s2">"Order"</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"properties"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"id"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'order_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"total"</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">raw_order</span><span class="p">[</span><span class="s1">'total_amount'</span><span class="p">]),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"currency"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'currency'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"status"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'status'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"created_at"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'created_at'</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order_relationship</span><span class="p">(</span><span class="n">raw_order</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Create PLACED edge between Person and Order."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"PLACED"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"from_id"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"from_label"</span><span class="p">:</span> <span class="s2">"Person"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"to_id"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'order_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"to_label"</span><span class="p">:</span> <span class="s2">"Order"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"properties"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"placed_at"</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">'created_at'</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order_item_relationship</span><span class="p">(</span><span class="n">raw_item</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-></span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Create CONTAINS edge between Order and Product."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"CONTAINS"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"from_id"</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">'order_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"from_label"</span><span class="p">:</span> <span class="s2">"Order"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"to_id"</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">'product_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"to_label"</span><span class="p">:</span> <span class="s2">"Product"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"properties"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"quantity"</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">'quantity'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"unit_price"</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">raw_item</span><span class="p">[</span><span class="s1">'unit_price'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span></code></pre></div><p>Rust example with type safety:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-rust" data-lang="rust"><span class="line"><span class="cl"><span class="c1">// Rust - Transform with type-safe builders
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">use</span><span class="w"> </span><span class="n">serde</span>::<span class="p">{</span><span class="n">Deserialize</span><span class="p">,</span><span class="w"> </span><span class="n">Serialize</span><span class="p">};</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">use</span><span class="w"> </span><span class="n">chrono</span>::<span class="p">{</span><span class="n">DateTime</span><span class="p">,</span><span class="w"> </span><span class="n">Utc</span><span class="p">};</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Deserialize)]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">RawUser</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">user_id</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">first_name</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">last_name</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nc">DateTime</span><span class="o"><</span><span class="n">Utc</span><span class="o">></span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Serialize)]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">labels</span>: <span class="nb">Vec</span><span class="o"><</span><span class="nb">String</span><span class="o">></span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">properties</span>: <span class="nc">PersonProperties</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Serialize)]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">PersonProperties</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">name</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nb">String</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">fn</span> <span class="nf">transform_user</span><span class="p">(</span><span class="n">raw</span>: <span class="nc">RawUser</span><span class="p">)</span><span class="w"> </span>-> <span class="nc">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">labels</span>: <span class="nc">vec</span><span class="o">!</span><span class="p">[</span><span class="s">"Person"</span><span class="p">.</span><span class="n">into</span><span class="p">(),</span><span class="w"> </span><span class="s">"Customer"</span><span class="p">.</span><span class="n">into</span><span class="p">()],</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">properties</span>: <span class="nc">PersonProperties</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">user_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">email</span><span class="p">.</span><span class="n">to_lowercase</span><span class="p">().</span><span class="n">trim</span><span class="p">().</span><span class="n">to_string</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">name</span>: <span class="nc">format</span><span class="o">!</span><span class="p">(</span><span class="s">"{} {}"</span><span class="p">,</span><span class="w"> </span><span class="n">raw</span><span class="p">.</span><span class="n">first_name</span><span class="p">,</span><span class="w"> </span><span class="n">raw</span><span class="p">.</span><span class="n">last_name</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">created_at</span><span class="p">.</span><span class="n">to_rfc3339</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">},</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="load-inserting-into-geode" class="position-relative d-flex align-items-center group">
<span>Load: Inserting into Geode</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="load-inserting-into-geode"
aria-haspopup="dialog"
aria-label="Share link: Load: Inserting into Geode">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Load phase efficiently inserts transformed data into Geode with proper error handling and retry logic.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Batch load with transaction support</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">load_to_geode</span><span class="p">(</span><span class="n">client</span><span class="p">:</span> <span class="n">Client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Dict</span><span class="p">],</span> <span class="n">edges</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Dict</span><span class="p">]):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Load nodes and edges to Geode in batches."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch_size</span> <span class="o">=</span> <span class="mi">1000</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">tx</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Load nodes in batches</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">nodes</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Use MERGE to handle duplicates idempotently</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""UNWIND $batch AS row
</span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (n:Person {id: row.id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> SET n.email = row.email,
</span></span></span><span class="line"><span class="cl"><span class="s2"> n.name = row.name,
</span></span></span><span class="line"><span class="cl"><span class="s2"> n.created_at = row.created_at,
</span></span></span><span class="line"><span class="cl"><span class="s2"> n.updated_at = NOW()
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"batch"</span><span class="p">:</span> <span class="p">[</span><span class="n">node</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">]</span> <span class="k">for</span> <span class="n">node</span> <span class="ow">in</span> <span class="n">batch</span><span class="p">]}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Loaded </span><span class="si">{</span><span class="n">i</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span><span class="si">}</span><span class="s2">/</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">)</span><span class="si">}</span><span class="s2"> nodes"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Load edges after all nodes exist</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">edges</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""UNWIND $batch AS row
</span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (from:Person {id: row.from_id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (to:Order {id: row.to_id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (from)-[r:PLACED]->(to)
</span></span></span><span class="line"><span class="cl"><span class="s2"> SET r.placed_at = row.placed_at
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"batch"</span><span class="p">:</span> <span class="p">[</span><span class="n">edge</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">]</span> <span class="k">for</span> <span class="n">edge</span> <span class="ow">in</span> <span class="n">batch</span><span class="p">]}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Loaded </span><span class="si">{</span><span class="n">i</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span><span class="si">}</span><span class="s2">/</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">)</span><span class="si">}</span><span class="s2"> edges"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
</span></span></code></pre></div>
<h3 id="incremental-vs-full-load" class="position-relative d-flex align-items-center group">
<span>Incremental vs Full Load</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="incremental-vs-full-load"
aria-haspopup="dialog"
aria-label="Share link: Incremental vs Full Load">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="full-load" class="position-relative d-flex align-items-center group">
<span>Full Load</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="full-load"
aria-haspopup="dialog"
aria-label="Share link: Full Load">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Complete replacement of data, typically used for initial migration:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">full_load_etl</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Full load: replace all data."""</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Extract all data</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_all_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Transform</span>
</span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">user</span><span class="p">)</span> <span class="k">for</span> <span class="n">user</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">order</span><span class="p">)</span> <span class="k">for</span> <span class="n">order</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'orders'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Clear existing data (optional)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">"MATCH (n:Person) DELETE n"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Load all</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">,</span> <span class="n">edges</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="incremental-load-change-data-capture" class="position-relative d-flex align-items-center group">
<span>Incremental Load (Change Data Capture)</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="incremental-load-change-data-capture"
aria-haspopup="dialog"
aria-label="Share link: Incremental Load (Change Data Capture)">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Only load changed records since last sync:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pathlib</span> <span class="kn">import</span> <span class="n">Path</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">IncrementalETL</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state_file</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="s2">"etl_state.json"</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span> <span class="o">=</span> <span class="n">Path</span><span class="p">(</span><span class="n">state_file</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">get_last_sync_time</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-></span> <span class="n">datetime</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Get timestamp of last successful sync."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">exists</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="n">state</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">read_text</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromisoformat</span><span class="p">(</span><span class="n">state</span><span class="p">[</span><span class="s1">'last_sync'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">min</span> <span class="c1"># First run, sync all data</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">save_sync_time</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">:</span> <span class="n">datetime</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Save successful sync timestamp."""</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">({</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'last_sync'</span><span class="p">:</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="p">}))</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">incremental_load</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Load only changed records since last sync."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">sync_start</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="n">last_sync</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_last_sync_time</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Syncing changes since </span><span class="si">{</span><span class="n">last_sync</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Extract only changed records</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">since</span><span class="o">=</span><span class="n">last_sync</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Extracted </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">])</span><span class="si">}</span><span class="s2"> users, </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">'orders'</span><span class="p">])</span><span class="si">}</span><span class="s2"> orders"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Transform</span>
</span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">user</span><span class="p">)</span> <span class="k">for</span> <span class="n">user</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">order</span><span class="p">)</span> <span class="k">for</span> <span class="n">order</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'orders'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Load (MERGE handles inserts and updates)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">,</span> <span class="n">edges</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Save state only after successful load</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">save_sync_time</span><span class="p">(</span><span class="n">sync_start</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Sync completed successfully"</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="orchestration-with-apache-airflow" class="position-relative d-flex align-items-center group">
<span>Orchestration with Apache Airflow</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="orchestration-with-apache-airflow"
aria-haspopup="dialog"
aria-label="Share link: Orchestration with Apache Airflow">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><p>Apache Airflow is industry-standard for scheduling and monitoring ETL pipelines.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Airflow DAG for daily ETL</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">DAG</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow.operators.python</span> <span class="kn">import</span> <span class="n">PythonOperator</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="kn">import</span> <span class="n">days_ago</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'owner'</span><span class="p">:</span> <span class="s1">'data-team'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'depends_on_past'</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'email'</span><span class="p">:</span> <span class="p">[</span><span class="s1">'[email protected]'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'email_on_failure'</span><span class="p">:</span> <span class="kc">True</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'email_on_retry'</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'retries'</span><span class="p">:</span> <span class="mi">3</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'retry_delay'</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">5</span><span class="p">),</span>
</span></span><span class="line"><span class="cl"><span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'geode_etl_daily'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">description</span><span class="o">=</span><span class="s1">'Daily ETL from PostgreSQL to Geode'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">schedule_interval</span><span class="o">=</span><span class="s1">'0 2 * * *'</span><span class="p">,</span> <span class="c1"># Run at 2 AM daily</span>
</span></span><span class="line"><span class="cl"> <span class="n">start_date</span><span class="o">=</span><span class="n">days_ago</span><span class="p">(</span><span class="mi">1</span><span class="p">),</span>
</span></span><span class="line"><span class="cl"> <span class="n">tags</span><span class="o">=</span><span class="p">[</span><span class="s1">'etl'</span><span class="p">,</span> <span class="s1">'geode'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="n">catchup</span><span class="o">=</span><span class="kc">False</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">extract_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Extract data from source."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">))</span>
</span></span><span class="line"><span class="cl"> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">'extracted_data'</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">data</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">"Extracted </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">])</span><span class="si">}</span><span class="s2"> users"</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Transform extracted data."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">'extracted_data'</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">u</span><span class="p">)</span> <span class="k">for</span> <span class="n">u</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">o</span><span class="p">)</span> <span class="k">for</span> <span class="n">o</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'orders'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl"> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">'transformed_data'</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="p">{</span><span class="s1">'nodes'</span><span class="p">:</span> <span class="n">nodes</span><span class="p">,</span> <span class="s1">'edges'</span><span class="p">:</span> <span class="n">edges</span><span class="p">})</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">"Transformed </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">)</span><span class="si">}</span><span class="s2"> nodes, </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">)</span><span class="si">}</span><span class="s2"> edges"</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">load_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Load data to Geode."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">'transformed_data'</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">data</span><span class="p">[</span><span class="s1">'nodes'</span><span class="p">],</span> <span class="n">data</span><span class="p">[</span><span class="s1">'edges'</span><span class="p">]))</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">"Loaded successfully"</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">validate_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Validate data quality after load."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">geode_client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">"""
</span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (p:Person)
</span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE p.email IS NULL OR p.email !~ '^[^@]+@[^@]+\.[^@]+$'
</span></span></span><span class="line"><span class="cl"><span class="s2"> RETURN count(p) AS invalid_count
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">))</span>
</span></span><span class="line"><span class="cl"> <span class="n">invalid</span> <span class="o">=</span> <span class="n">result</span><span class="o">.</span><span class="n">bindings</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">'invalid_count'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">invalid</span> <span class="o">></span> <span class="mi">0</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Found </span><span class="si">{</span><span class="n">invalid</span><span class="si">}</span><span class="s2"> persons with invalid emails"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">"Validation passed"</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Define task dependencies</span>
</span></span><span class="line"><span class="cl"><span class="n">extract</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'extract'</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">extract_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="n">transform</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'transform'</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">transform_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="n">load</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'load'</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">load_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="n">validate</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'validate'</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">validate_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Pipeline: extract -> transform -> load -> validate</span>
</span></span><span class="line"><span class="cl"><span class="n">extract</span> <span class="o">>></span> <span class="n">transform</span> <span class="o">>></span> <span class="n">load</span> <span class="o">>></span> <span class="n">validate</span>
</span></span></code></pre></div>
<h3 id="real-time-streaming-etl" class="position-relative d-flex align-items-center group">
<span>Real-Time Streaming ETL</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="real-time-streaming-etl"
aria-haspopup="dialog"
aria-label="Share link: Real-Time Streaming ETL">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><p>For real-time data integration, use streaming ETL with Kafka or similar:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Streaming ETL with Kafka</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">kafka</span> <span class="kn">import</span> <span class="n">KafkaConsumer</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">streaming_etl</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Process events from Kafka in real-time."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer</span> <span class="o">=</span> <span class="n">KafkaConsumer</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'user-events'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">'localhost:9092'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="n">value_deserializer</span><span class="o">=</span><span class="k">lambda</span> <span class="n">m</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)),</span>
</span></span><span class="line"><span class="cl"> <span class="n">group_id</span><span class="o">=</span><span class="s1">'geode-etl'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">auto_offset_reset</span><span class="o">=</span><span class="s1">'earliest'</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="n">client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="n">host</span><span class="o">=</span><span class="s2">"localhost"</span><span class="p">,</span> <span class="n">port</span><span class="o">=</span><span class="mi">3141</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">consumer</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="n">value</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'user.created'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">node</span> <span class="o">=</span> <span class="n">transform_user</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""MERGE (p:Person {id: $id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> SET p += $properties"""</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"id"</span><span class="p">:</span> <span class="n">node</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">][</span><span class="s1">'id'</span><span class="p">],</span> <span class="s2">"properties"</span><span class="p">:</span> <span class="n">node</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">]}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'order.placed'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">order_node</span> <span class="o">=</span> <span class="n">transform_order</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl"> <span class="n">relationship</span> <span class="o">=</span> <span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""MERGE (o:Order {id: $order_id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> SET o += $order_props
</span></span></span><span class="line"><span class="cl"><span class="s2"> WITH o
</span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (p:Person {id: $user_id})
</span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (p)-[r:PLACED]->(o)
</span></span></span><span class="line"><span class="cl"><span class="s2"> SET r.placed_at = $placed_at"""</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"order_id"</span><span class="p">:</span> <span class="n">order_node</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">][</span><span class="s1">'id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"order_props"</span><span class="p">:</span> <span class="n">order_node</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_id"</span><span class="p">:</span> <span class="n">relationship</span><span class="p">[</span><span class="s1">'from_id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"placed_at"</span><span class="p">:</span> <span class="n">relationship</span><span class="p">[</span><span class="s1">'properties'</span><span class="p">][</span><span class="s1">'placed_at'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Commit offset after successful processing</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Error processing event: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Don't commit offset, retry on next poll</span>
</span></span></code></pre></div>
<h3 id="error-handling-and-recovery" class="position-relative d-flex align-items-center group">
<span>Error Handling and Recovery</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="error-handling-and-recovery"
aria-haspopup="dialog"
aria-label="Share link: Error Handling and Recovery">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Robust error handling</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">logging</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">tenacity</span> <span class="kn">import</span> <span class="n">retry</span><span class="p">,</span> <span class="n">stop_after_attempt</span><span class="p">,</span> <span class="n">wait_exponential</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">(</span><span class="n">level</span><span class="o">=</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="n">logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="nd">@retry</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="n">stop</span><span class="o">=</span><span class="n">stop_after_attempt</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span>
</span></span><span class="line"><span class="cl"> <span class="n">wait</span><span class="o">=</span><span class="n">wait_exponential</span><span class="p">(</span><span class="n">multiplier</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> <span class="nb">min</span><span class="o">=</span><span class="mi">4</span><span class="p">,</span> <span class="nb">max</span><span class="o">=</span><span class="mi">10</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">load_batch_with_retry</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">batch</span><span class="p">,</span> <span class="n">batch_num</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Load batch with exponential backoff retry."""</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"UNWIND $batch AS row</span><span class="se">\n</span><span class="s2">"</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"MERGE (n:Person {id: row.id})</span><span class="se">\n</span><span class="s2">"</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"SET n += row.properties"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"batch"</span><span class="p">:</span> <span class="n">batch</span><span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> loaded successfully"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> failed: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">raise</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">resilient_etl</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""ETL with comprehensive error handling."""</span>
</span></span><span class="line"><span class="cl"> <span class="n">failed_batches</span> <span class="o">=</span> <span class="p">[]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Extract failed: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">send_alert</span><span class="p">(</span><span class="s2">"ETL Extract Failure"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
</span></span><span class="line"><span class="cl"> <span class="k">raise</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">u</span><span class="p">)</span> <span class="k">for</span> <span class="n">u</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">'users'</span><span class="p">]]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Process batches with individual error handling</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch_size</span> <span class="o">=</span> <span class="mi">1000</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">nodes</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="n">batch_num</span> <span class="o">=</span> <span class="n">i</span> <span class="o">//</span> <span class="n">batch_size</span> <span class="o">+</span> <span class="mi">1</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_batch_with_retry</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">batch</span><span class="p">,</span> <span class="n">batch_num</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> failed after retries: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">failed_batches</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">batch_num</span><span class="p">,</span> <span class="n">batch</span><span class="p">))</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Continue processing other batches</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Report failures</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_batches</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">failed_batches</span><span class="p">)</span><span class="si">}</span><span class="s2"> batches failed"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">send_alert</span><span class="p">(</span><span class="sa">f</span><span class="s2">"ETL Partial Failure"</span><span class="p">,</span> <span class="sa">f</span><span class="s2">"</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">failed_batches</span><span class="p">)</span><span class="si">}</span><span class="s2"> batches failed"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">else</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"ETL completed successfully"</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="best-practices" class="position-relative d-flex align-items-center group">
<span>Best Practices</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="best-practices"
aria-haspopup="dialog"
aria-label="Share link: Best Practices">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><ol>
<li><strong>Idempotent Operations</strong>: Use MERGE instead of INSERT to handle re-runs safely</li>
<li><strong>Incremental Loading</strong>: Only process changed data to reduce load time</li>
<li><strong>Batch Processing</strong>: Load data in batches (1000-10000 records) for efficiency</li>
<li><strong>Transaction Management</strong>: Use transactions to ensure atomicity</li>
<li><strong>Error Recovery</strong>: Implement retry logic with exponential backoff</li>
<li><strong>Data Validation</strong>: Validate both before and after loading</li>
<li><strong>Monitoring</strong>: Track ETL metrics (records processed, duration, errors)</li>
<li><strong>State Management</strong>: Track last sync time for incremental loads</li>
<li><strong>Deduplication</strong>: Handle duplicate records gracefully</li>
<li><strong>Schema Versioning</strong>: Version your transformation logic</li>
</ol>
<h3 id="performance-optimization" class="position-relative d-flex align-items-center group">
<span>Performance Optimization</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="performance-optimization"
aria-haspopup="dialog"
aria-label="Share link: Performance Optimization">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><p><strong>Parallel Processing</strong>: Load independent batches in parallel</p>
<p><strong>Connection Pooling</strong>: Reuse database connections</p>
<p><strong>Bulk Operations</strong>: Use batch inserts instead of individual queries</p>
<p><strong>Index Management</strong>: Ensure target nodes have indexes on lookup keys</p>
<h3 id="troubleshooting" class="position-relative d-flex align-items-center group">
<span>Troubleshooting</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="troubleshooting"
aria-haspopup="dialog"
aria-label="Share link: Troubleshooting">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><p><strong>Slow extraction</strong>: Add indexes on source database timestamp columns</p>
<p><strong>Memory issues</strong>: Reduce batch size or use streaming approach</p>
<p><strong>Duplicate nodes</strong>: Use MERGE instead of INSERT, ensure unique constraints</p>
<p><strong>Missing relationships</strong>: Verify all referenced nodes exist before creating edges</p>
<h3 id="related-topics" class="position-relative d-flex align-items-center group">
<span>Related Topics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="related-topics"
aria-haspopup="dialog"
aria-label="Share link: Related Topics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><ul>
<li><a
href="/tags/import"
>Import</a>
- Data import utilities</li>
<li><a
href="/tags/migration"
>Migration</a>
- Schema and data migration</li>
<li><a
href="/tags/data-quality"
>Data Quality</a>
- Quality checks in ETL</li>
<li><a
href="/tags/import/"
>Batch Processing</a>
- Batch operation patterns</li>
<li><a
href="/tags/transactions"
>Transactions</a>
- Transactional loading</li>
<li><a
href="/tags/performance"
>Performance</a>
- ETL performance optimization</li>
</ul>
Tag
2 articles
ETL Pipelines & Data Integration
Extract Transform Load pipelines for integrating data into Geode from relational databases, APIs, files, and streaming sources