<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 --> <h2 id="etl-pipelines--data-integration" class="position-relative d-flex align-items-center group"> <span>ETL Pipelines &amp; Data Integration</span> </h2><p>ETL (Extract, Transform, Load) pipelines are critical for integrating data from diverse sources into your Geode graph database. Whether migrating from a relational database, aggregating data from multiple APIs, or streaming real-time events, well-designed ETL processes ensure data arrives in Geode accurately, efficiently, and reliably.</p> <h3 id="understanding-etl-in-graph-context" class="position-relative d-flex align-items-center group"> <span>Understanding ETL in Graph Context</span> </h3><p>Traditional ETL processes are designed for tabular data moving between relational databases.
Graph ETL requires additional considerations:</p> <ul> <li><strong>Relationship Discovery</strong>: Identifying connections between entities during transformation</li> <li><strong>Node Deduplication</strong>: Ensuring the same entity isn&rsquo;t created multiple times</li> <li><strong>Edge Creation</strong>: Linking nodes based on foreign-key relationships or business logic</li> <li><strong>Graph Schema Mapping</strong>: Converting normalized tables to denormalized graph properties</li> <li><strong>Traversal Optimization</strong>: Organizing data for efficient graph queries</li> </ul> <h3 id="the-etl-process" class="position-relative d-flex align-items-center group"> <span>The ETL Process</span> </h3> <h4 id="extract-pulling-data-from-sources" class="position-relative d-flex align-items-center group"> <span>Extract: Pulling Data from Sources</span> </h4><p>The extract phase reads data from source systems without modifying the source data.</p> <p><strong>Common Sources</strong>:</p> <ul> <li>Relational databases (PostgreSQL, MySQL, Oracle)</li> <li>NoSQL databases (MongoDB, Cassandra)</li> <li>REST APIs</li> <li>CSV/JSON files</li> <li>Message
queues (Kafka, RabbitMQ)</li> <li>Data warehouses (Snowflake, BigQuery)</li> </ul> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Extract from PostgreSQL</span> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncpg</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Extract users, orders, and order items updated after last_sync_time.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">pg_pool</span><span class="o">.</span><span class="n">acquire</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Extract users</span> </span></span><span class="line"><span class="cl"> <span class="n">users</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">&#34;&#34;&#34; </span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT </span></span></span><span class="line"><span class="cl"><span class="s2"> user_id, </span></span></span><span class="line"><span class="cl"><span class="s2"> email, </span></span></span><span class="line"><span class="cl"><span class="s2"> first_name,
</span></span></span><span class="line"><span class="cl"><span class="s2"> last_name, </span></span></span><span class="line"><span class="cl"><span class="s2"> created_at, </span></span></span><span class="line"><span class="cl"><span class="s2"> country, </span></span></span><span class="line"><span class="cl"><span class="s2"> status </span></span></span><span class="line"><span class="cl"><span class="s2"> FROM users </span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at &gt; $1 </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Extract orders</span> </span></span><span class="line"><span class="cl"> <span class="n">orders</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">&#34;&#34;&#34; </span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT </span></span></span><span class="line"><span class="cl"><span class="s2"> order_id, </span></span></span><span class="line"><span class="cl"><span class="s2"> user_id, </span></span></span><span class="line"><span class="cl"><span class="s2"> total_amount, </span></span></span><span class="line"><span class="cl"><span class="s2"> currency, </span></span></span><span class="line"><span class="cl"><span class="s2"> status, </span></span></span><span class="line"><span class="cl"><span class="s2"> created_at </span></span></span><span class="line"><span class="cl"><span class="s2"> FROM orders </span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at &gt; $1 </span></span></span><span class="line"><span class="cl"><span class="s2"> 
&#34;&#34;&#34;</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Extract order items (relationships)</span> </span></span><span class="line"><span class="cl"> <span class="n">order_items</span> <span class="o">=</span> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">fetch</span><span class="p">(</span><span class="s2">&#34;&#34;&#34; </span></span></span><span class="line"><span class="cl"><span class="s2"> SELECT </span></span></span><span class="line"><span class="cl"><span class="s2"> order_id, </span></span></span><span class="line"><span class="cl"><span class="s2"> product_id, </span></span></span><span class="line"><span class="cl"><span class="s2"> quantity, </span></span></span><span class="line"><span class="cl"><span class="s2"> unit_price </span></span></span><span class="line"><span class="cl"><span class="s2"> FROM order_items </span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE updated_at &gt; $1 </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">,</span> <span class="n">last_sync_time</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;users&#34;</span><span class="p">:</span> <span class="n">users</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;orders&#34;</span><span class="p">:</span> <span class="n">orders</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;order_items&#34;</span><span class="p">:</span> <span 
class="n">order_items</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span></code></pre></div><p>Go example with database/sql:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-go" data-lang="go"><span class="line"><span class="cl"><span class="c1">// Go - Extract from MySQL </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="kn">package</span> <span class="nx">main</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s">&#34;context&#34;</span> </span></span><span class="line"><span class="cl"> <span class="s">&#34;database/sql&#34;</span> </span></span><span class="line"><span class="cl"> <span class="s">&#34;time&#34;</span> </span></span><span class="line"><span class="cl"> <span class="nx">_</span> <span class="s">&#34;github.com/go-sql-driver/mysql&#34;</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="kd">type</span> <span class="nx">User</span> <span class="kd">struct</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="nx">ID</span> <span class="kt">string</span> </span></span><span class="line"><span class="cl"> <span class="nx">Email</span> <span class="kt">string</span> </span></span><span class="line"><span class="cl"> <span class="nx">FirstName</span> <span class="kt">string</span> </span></span><span class="line"><span class="cl"> <span class="nx">LastName</span> <span class="kt">string</span> </span></span><span class="line"><span class="cl"> <span class="nx">CreatedAt</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span> </span></span><span 
class="line"><span class="cl"><span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="kd">func</span> <span class="nf">extractUsers</span><span class="p">(</span><span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">db</span> <span class="o">*</span><span class="nx">sql</span><span class="p">.</span><span class="nx">DB</span><span class="p">,</span> <span class="nx">since</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="p">([]</span><span class="nx">User</span><span class="p">,</span> <span class="kt">error</span><span class="p">)</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="nx">rows</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">db</span><span class="p">.</span><span class="nf">QueryContext</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="s">` </span></span></span><span class="line"><span class="cl"><span class="s"> SELECT user_id, email, first_name, last_name, created_at </span></span></span><span class="line"><span class="cl"><span class="s"> FROM users </span></span></span><span class="line"><span class="cl"><span class="s"> WHERE updated_at &gt; ? 
</span></span></span><span class="line"><span class="cl"><span class="s"> ORDER BY updated_at ASC </span></span></span><span class="line"><span class="cl"><span class="s"> `</span><span class="p">,</span> <span class="nx">since</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="kc">nil</span><span class="p">,</span> <span class="nx">err</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="k">defer</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Close</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="kd">var</span> <span class="nx">users</span> <span class="p">[]</span><span class="nx">User</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Next</span><span class="p">()</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="kd">var</span> <span class="nx">u</span> <span class="nx">User</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Scan</span><span class="p">(</span><span class="o">&amp;</span><span class="nx">u</span><span class="p">.</span><span class="nx">ID</span><span class="p">,</span> <span class="o">&amp;</span><span class="nx">u</span><span class="p">.</span><span class="nx">Email</span><span class="p">,</span> <span class="o">&amp;</span><span class="nx">u</span><span 
class="p">.</span><span class="nx">FirstName</span><span class="p">,</span> <span class="o">&amp;</span><span class="nx">u</span><span class="p">.</span><span class="nx">LastName</span><span class="p">,</span> <span class="o">&amp;</span><span class="nx">u</span><span class="p">.</span><span class="nx">CreatedAt</span><span class="p">);</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="kc">nil</span><span class="p">,</span> <span class="nx">err</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="nx">users</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">users</span><span class="p">,</span> <span class="nx">u</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="nx">users</span><span class="p">,</span> <span class="nx">rows</span><span class="p">.</span><span class="nf">Err</span><span class="p">()</span> </span></span><span class="line"><span class="cl"><span class="p">}</span> </span></span></code></pre></div> <h4 id="transform-converting-to-graph-structure" class="position-relative d-flex align-items-center group"> <span>Transform: Converting to Graph Structure</span>
</h4><p>The transform phase converts source data into nodes, edges, and properties suitable for Geode.</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Transform relational data to graph structure</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">List</span><span class="p">,</span> <span class="n">Any</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timezone</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_user</span><span class="p">(</span><span class="n">raw_user</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Transform PostgreSQL user record to Geode Person node.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;labels&#34;</span><span class="p">:</span> <span class="p">[</span><span class="s2">&#34;Person&#34;</span><span class="p">,</span> <span class="s2">&#34;Customer&#34;</span><span class="p">],</span> </span></span><span class="line"><span class="cl">
<span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;email&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;email&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">strip</span><span class="p">(),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;name&#34;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&#34;</span><span class="si">{</span><span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;first_name&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2"> </span><span class="si">{</span><span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;last_name&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;first_name&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;first_name&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;last_name&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;last_name&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;country&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span
class="s1">&#39;country&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;status&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;created_at&#34;</span><span class="p">:</span> <span class="n">raw_user</span><span class="p">[</span><span class="s1">&#39;created_at&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;imported_at&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">(</span><span class="n">timezone</span><span class="o">.</span><span class="n">utc</span><span class="p">)</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order</span><span class="p">(</span><span class="n">raw_order</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Transform order record to Geode Order node.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span
class="s2">&#34;labels&#34;</span><span class="p">:</span> <span class="p">[</span><span class="s2">&#34;Order&#34;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;order_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;total&#34;</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;total_amount&#39;</span><span class="p">]),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;currency&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;currency&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;status&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;created_at&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;created_at&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order_relationship</span><span class="p">(</span><span 
class="n">raw_order</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Create PLACED edge between Person and Order.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;PLACED&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;from_id&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;from_label&#34;</span><span class="p">:</span> <span class="s2">&#34;Person&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;to_id&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span class="s1">&#39;order_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;to_label&#34;</span><span class="p">:</span> <span class="s2">&#34;Order&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;placed_at&#34;</span><span class="p">:</span> <span class="n">raw_order</span><span class="p">[</span><span 
class="s1">&#39;created_at&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_order_item_relationship</span><span class="p">(</span><span class="n">raw_item</span><span class="p">:</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Dict</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">Any</span><span class="p">]:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Create CONTAINS edge between Order and Product.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;CONTAINS&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;from_id&#34;</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">&#39;order_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;from_label&#34;</span><span class="p">:</span> <span class="s2">&#34;Order&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;to_id&#34;</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">&#39;product_id&#39;</span><span 
class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;to_label&#34;</span><span class="p">:</span> <span class="s2">&#34;Product&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;quantity&#34;</span><span class="p">:</span> <span class="n">raw_item</span><span class="p">[</span><span class="s1">&#39;quantity&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;unit_price&#34;</span><span class="p">:</span> <span class="nb">float</span><span class="p">(</span><span class="n">raw_item</span><span class="p">[</span><span class="s1">&#39;unit_price&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span></code></pre></div><p>Rust example with type safety:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-rust" data-lang="rust"><span class="line"><span class="cl"><span class="c1">// Rust - Transform with type-safe builders </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">use</span><span class="w"> </span><span class="n">serde</span>::<span class="p">{</span><span class="n">Deserialize</span><span class="p">,</span><span class="w"> </span><span class="n">Serialize</span><span class="p">};</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">use</span><span class="w"> </span><span class="n">chrono</span>::<span class="p">{</span><span class="n">DateTime</span><span class="p">,</span><span class="w"> </span><span class="n">Utc</span><span class="p">};</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Deserialize)]</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">RawUser</span><span class="w"> </span><span class="p">{</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">user_id</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">first_name</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">last_name</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nc">DateTime</span><span class="o">&lt;</span><span class="n">Utc</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Serialize)]</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">labels</span>: <span class="nb">Vec</span><span class="o">&lt;</span><span class="nb">String</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">properties</span>: <span class="nc">PersonProperties</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="cp">#[derive(Serialize)]</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">struct</span> <span class="nc">PersonProperties</span><span class="w"> </span><span class="p">{</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">name</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nb">String</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span 
class="w"></span><span class="k">fn</span> <span class="nf">transform_user</span><span class="p">(</span><span class="n">raw</span>: <span class="nc">RawUser</span><span class="p">)</span><span class="w"> </span>-&gt; <span class="nc">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">PersonNode</span><span class="w"> </span><span class="p">{</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">labels</span>: <span class="nc">vec</span><span class="o">!</span><span class="p">[</span><span class="s">&#34;Person&#34;</span><span class="p">.</span><span class="n">into</span><span class="p">(),</span><span class="w"> </span><span class="s">&#34;Customer&#34;</span><span class="p">.</span><span class="n">into</span><span class="p">()],</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">properties</span>: <span class="nc">PersonProperties</span><span class="w"> </span><span class="p">{</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">user_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">email</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">email</span><span class="p">.</span><span class="n">to_lowercase</span><span class="p">().</span><span class="n">trim</span><span class="p">().</span><span class="n">to_string</span><span class="p">(),</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">name</span>: <span class="nc">format</span><span class="o">!</span><span 
class="p">(</span><span class="s">&#34;{} {}&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">raw</span><span class="p">.</span><span class="n">first_name</span><span class="p">,</span><span class="w"> </span><span class="n">raw</span><span class="p">.</span><span class="n">last_name</span><span class="p">),</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">created_at</span>: <span class="nc">raw</span><span class="p">.</span><span class="n">created_at</span><span class="p">.</span><span class="n">to_rfc3339</span><span class="p">(),</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">},</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> </span></span></span></code></pre></div> <h4 id="load-inserting-into-geode" class="position-relative d-flex align-items-center group"> <span>Load: Inserting into Geode</span> </h4><p>The load phase writes the transformed data into Geode in batches inside a single transaction, using MERGE so that reruns are idempotent. The example below does not show error handling or retries; wrap the transaction in your own retry logic as needed.</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Batch load with transaction support</span> </span></span><span class="line"><span 
class="cl"><span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">List</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">load_to_geode</span><span class="p">(</span><span class="n">client</span><span class="p">:</span> <span class="n">Client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Dict</span><span class="p">],</span> <span class="n">edges</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Dict</span><span class="p">]):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Load nodes and edges to Geode in batches.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">batch_size</span> <span class="o">=</span> <span class="mi">1000</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">tx</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Load nodes in batches</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span 
class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">nodes</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Use MERGE to handle duplicates idempotently</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;UNWIND $batch AS row </span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (n:Person {id: row.id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> SET n.email = row.email, </span></span></span><span class="line"><span class="cl"><span class="s2"> n.name = row.name, </span></span></span><span class="line"><span class="cl"><span class="s2"> n.created_at = row.created_at, </span></span></span><span class="line"><span class="cl"><span class="s2"> n.updated_at = NOW() </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">&#34;batch&#34;</span><span class="p">:</span> <span class="p">[</span><span class="n">node</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">]</span> <span class="k">for</span> <span class="n">node</span> <span class="ow">in</span> <span 
class="n">batch</span><span class="p">]}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Loaded </span><span class="si">{</span><span class="n">i</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span><span class="si">}</span><span class="s2">/</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">)</span><span class="si">}</span><span class="s2"> nodes&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Load edges after all nodes exist</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">edges</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;UNWIND $batch AS row 
</span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (from:Person {id: row.from_id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (to:Order {id: row.to_id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (from)-[r:PLACED]-&gt;(to) </span></span></span><span class="line"><span class="cl"><span class="s2"> SET r.placed_at = row.placed_at </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Each row needs the endpoint ids as well as the edge properties</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">&#34;batch&#34;</span><span class="p">:</span> <span class="p">[{</span><span class="s2">&#34;from_id&#34;</span><span class="p">:</span> <span class="n">edge</span><span class="p">[</span><span class="s1">&#39;from_id&#39;</span><span class="p">],</span> <span class="s2">&#34;to_id&#34;</span><span class="p">:</span> <span class="n">edge</span><span class="p">[</span><span class="s1">&#39;to_id&#39;</span><span class="p">],</span> <span class="o">**</span><span class="n">edge</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">]}</span> <span class="k">for</span> <span class="n">edge</span> <span class="ow">in</span> <span class="n">batch</span><span class="p">]}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Loaded </span><span class="si">{</span><span class="n">i</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span><span class="si">}</span><span class="s2">/</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">)</span><span class="si">}</span><span class="s2"> edges&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">tx</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> </span></span></code></pre></div> <h3 
id="incremental-vs-full-load" class="position-relative d-flex align-items-center group"> <span>Incremental vs Full Load</span> </h3> <h4 id="full-load" class="position-relative d-flex align-items-center group"> <span>Full Load</span> </h4><p>A full load replaces all data in the target and is typically used for the initial migration:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">full_load_etl</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Full load: replace all data.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Extract all data</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_all_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">)</span> 
</span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Transform</span> </span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">user</span><span class="p">)</span> <span class="k">for</span> <span class="n">user</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;users&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">order</span><span class="p">)</span> <span class="k">for</span> <span class="n">order</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;orders&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Clear existing data (optional); DETACH also removes edges attached to the deleted nodes</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">&#34;MATCH (n:Person) DETACH DELETE n&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Load all</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">,</span> <span class="n">edges</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="incremental-load-change-data-capture" 
class="position-relative d-flex align-items-center group"> <span>Incremental Load (Change Data Capture)</span> </h4><p>Load only the records that changed since the last successful sync. This example tracks the sync point as a timestamp saved in a local state file:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pathlib</span> <span class="kn">import</span> <span class="n">Path</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">IncrementalETL</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state_file</span><span class="p">:</span> <span class="nb">str</span> <span class="o">=</span> <span class="s2">&#34;etl_state.json&#34;</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span> <span class="o">=</span> <span class="n">Path</span><span class="p">(</span><span 
class="n">state_file</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">get_last_sync_time</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">datetime</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Get timestamp of last successful sync.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">exists</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="n">state</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">read_text</span><span class="p">())</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromisoformat</span><span class="p">(</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;last_sync&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">datetime</span><span class="o">.</span><span class="n">min</span> <span class="c1"># First run, sync all data</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">save_sync_time</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">:</span> <span 
class="n">datetime</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Save successful sync timestamp.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state_file</span><span class="o">.</span><span class="n">write_text</span><span class="p">(</span><span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">({</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;last_sync&#39;</span><span class="p">:</span> <span class="n">timestamp</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="p">}))</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">incremental_load</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Load only changed records since last sync.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">sync_start</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="n">last_sync</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_last_sync_time</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span 
class="s2">&#34;Syncing changes since </span><span class="si">{</span><span class="n">last_sync</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Extract only changed records</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">since</span><span class="o">=</span><span class="n">last_sync</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Extracted </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;users&#39;</span><span class="p">])</span><span class="si">}</span><span class="s2"> users, </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;orders&#39;</span><span class="p">])</span><span class="si">}</span><span class="s2"> orders&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Transform</span> </span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">user</span><span class="p">)</span> <span class="k">for</span> <span class="n">user</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span 
class="s1">&#39;users&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">order</span><span class="p">)</span> <span class="k">for</span> <span class="n">order</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;orders&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Load (MERGE handles inserts and updates)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">nodes</span><span class="p">,</span> <span class="n">edges</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Save state only after successful load</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">save_sync_time</span><span class="p">(</span><span class="n">sync_start</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Sync completed successfully&#34;</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="orchestration-with-apache-airflow" class="position-relative d-flex align-items-center group"> <span>Orchestration with Apache Airflow</span>
</h3><p>Apache Airflow is a widely used orchestrator for scheduling and monitoring ETL pipelines. The DAG below runs the extract, transform, load, and validate steps as separate tasks once a day.</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Airflow DAG for daily ETL</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">DAG</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow.operators.python</span> <span class="kn">import</span> <span class="n">PythonOperator</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="kn">import</span> <span class="n">days_ago</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;owner&#39;</span><span class="p">:</span> <span class="s1">&#39;data-team&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;depends_on_past&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;email&#39;</span><span 
class="p">:</span> <span class="p">[</span><span class="s1">&#39;[email protected]&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;email_on_failure&#39;</span><span class="p">:</span> <span class="kc">True</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;email_on_retry&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;retries&#39;</span><span class="p">:</span> <span class="mi">3</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;retry_delay&#39;</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">5</span><span class="p">),</span> </span></span><span class="line"><span class="cl"><span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;geode_etl_daily&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">description</span><span class="o">=</span><span class="s1">&#39;Daily ETL from PostgreSQL to Geode&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">schedule_interval</span><span class="o">=</span><span class="s1">&#39;0 2 * * *&#39;</span><span class="p">,</span> <span class="c1"># Run at 2 AM daily</span> </span></span><span class="line"><span class="cl"> <span 
class="n">start_date</span><span class="o">=</span><span class="n">days_ago</span><span class="p">(</span><span class="mi">1</span><span class="p">),</span> </span></span><span class="line"><span class="cl"> <span class="n">tags</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;etl&#39;</span><span class="p">,</span> <span class="s1">&#39;geode&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="n">catchup</span><span class="o">=</span><span class="kc">False</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">extract_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Extract data from source.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task_instance&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">&#39;extracted_data&#39;</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">data</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">&#34;Extracted </span><span 
class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;users&#39;</span><span class="p">])</span><span class="si">}</span><span class="s2"> users&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">transform_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Transform extracted data.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task_instance&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">&#39;extracted_data&#39;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">u</span><span class="p">)</span> <span class="k">for</span> <span class="n">u</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;users&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> <span class="n">edges</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">o</span><span class="p">)</span> <span class="k">for</span> <span class="n">o</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;orders&#39;</span><span class="p">]]</span> </span></span><span 
class="line"><span class="cl"> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task_instance&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">&#39;transformed_data&#39;</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="p">{</span><span class="s1">&#39;nodes&#39;</span><span class="p">:</span> <span class="n">nodes</span><span class="p">,</span> <span class="s1">&#39;edges&#39;</span><span class="p">:</span> <span class="n">edges</span><span class="p">})</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">&#34;Transformed </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">)</span><span class="si">}</span><span class="s2"> nodes, </span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">edges</span><span class="p">)</span><span class="si">}</span><span class="s2"> edges&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">load_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Load data to Geode.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task_instance&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="s1">&#39;transformed_data&#39;</span><span 
class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">load_to_geode</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;nodes&#39;</span><span class="p">],</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;edges&#39;</span><span class="p">]))</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">&#34;Loaded successfully&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">validate_task</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Validate data quality after load.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Raw string so the regex backslash is passed through without an invalid-escape warning</span> </span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">run</span><span class="p">(</span><span class="n">geode_client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="sa">r</span><span class="s2">&#34;&#34;&#34; </span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (p:Person) </span></span></span><span class="line"><span class="cl"><span class="s2"> WHERE p.email IS NULL OR p.email !~ &#39;^[^@]+@[^@]+\.[^@]+$&#39; </span></span></span><span class="line"><span class="cl"><span class="s2"> RETURN count(p) AS invalid_count </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="n">invalid</span> 
<span class="o">=</span> <span class="n">result</span><span class="o">.</span><span class="n">bindings</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="s1">&#39;invalid_count&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">invalid</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Found </span><span class="si">{</span><span class="n">invalid</span><span class="si">}</span><span class="s2"> persons with invalid emails&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="sa">f</span><span class="s2">&#34;Validation passed&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Define task dependencies</span> </span></span><span class="line"><span class="cl"><span class="n">extract</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;extract&#39;</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">extract_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="n">transform</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;transform&#39;</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span 
class="n">transform_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="n">load</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;load&#39;</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">load_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="n">validate</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">&#39;validate&#39;</span><span class="p">,</span> <span class="n">python_callable</span><span class="o">=</span><span class="n">validate_task</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Pipeline: extract -&gt; transform -&gt; load -&gt; validate</span> </span></span><span class="line"><span class="cl"><span class="n">extract</span> <span class="o">&gt;&gt;</span> <span class="n">transform</span> <span class="o">&gt;&gt;</span> <span class="n">load</span> <span class="o">&gt;&gt;</span> <span class="n">validate</span> </span></span></code></pre></div> <h3 id="real-time-streaming-etl" class="position-relative d-flex align-items-center group"> <span>Real-Time Streaming ETL</span>
</h3><p>For real-time data integration, consume events from Kafka or a similar message broker and apply each one to Geode as it arrives:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Streaming ETL with Kafka</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">kafka</span> <span class="kn">import</span> <span class="n">KafkaConsumer</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">streaming_etl</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process events from Kafka in real-time.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span> <span class="o">=</span> <span class="n">KafkaConsumer</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;user-events&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;localhost:9092&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="n">value_deserializer</span><span class="o">=</span><span class="k">lambda</span> <span 
class="n">m</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)),</span> </span></span><span class="line"><span class="cl"> <span class="n">group_id</span><span class="o">=</span><span class="s1">&#39;geode-etl&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">enable_auto_commit</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># Offsets are committed manually after each processed event</span> </span></span><span class="line"><span class="cl"> <span class="n">auto_offset_reset</span><span class="o">=</span><span class="s1">&#39;earliest&#39;</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="n">client</span> <span class="o">=</span> <span class="n">Client</span><span class="p">(</span><span class="n">host</span><span class="o">=</span><span class="s2">&#34;localhost&#34;</span><span class="p">,</span> <span class="n">port</span><span class="o">=</span><span class="mi">3141</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">consumer</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="n">message</span><span class="o">.</span><span class="n">value</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span 
class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;user.created&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">node</span> <span class="o">=</span> <span class="n">transform_user</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;MERGE (p:Person {id: $id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> SET p += $properties&#34;&#34;&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">node</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">][</span><span class="s1">&#39;id&#39;</span><span class="p">],</span> <span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="n">node</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">]}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;order.placed&#39;</span><span class="p">:</span> </span></span><span 
class="line"><span class="cl"> <span class="n">order_node</span> <span class="o">=</span> <span class="n">transform_order</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> <span class="n">relationship</span> <span class="o">=</span> <span class="n">transform_order_relationship</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;MERGE (o:Order {id: $order_id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> SET o += $order_props </span></span></span><span class="line"><span class="cl"><span class="s2"> WITH o </span></span></span><span class="line"><span class="cl"><span class="s2"> MATCH (p:Person {id: $user_id}) </span></span></span><span class="line"><span class="cl"><span class="s2"> MERGE (p)-[r:PLACED]-&gt;(o) </span></span></span><span class="line"><span class="cl"><span class="s2"> SET r.placed_at = $placed_at&#34;&#34;&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;order_id&#34;</span><span class="p">:</span> <span class="n">order_node</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">][</span><span class="s1">&#39;id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;order_props&#34;</span><span class="p">:</span> <span 
class="n">order_node</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="n">relationship</span><span class="p">[</span><span class="s1">&#39;from_id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;placed_at&#34;</span><span class="p">:</span> <span class="n">relationship</span><span class="p">[</span><span class="s1">&#39;properties&#39;</span><span class="p">][</span><span class="s1">&#39;placed_at&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Commit offset after successful processing</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Error processing event: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="c1"># No commit here, but the consumer has already moved past this event,</span> </span></span><span class="line"><span class="cl"> <span class="c1"># so it is not retried automatically; log it or send it to a dead-letter topic</span> </span></span></code></pre></div> <h3 id="error-handling-and-recovery" class="position-relative d-flex align-items-center group"> <span>Error Handling and Recovery</span>
</h3><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Python - Robust error handling</span> </span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">logging</span> </span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">tenacity</span> <span class="kn">import</span> <span class="n">retry</span><span class="p">,</span> <span class="n">stop_after_attempt</span><span class="p">,</span> <span class="n">wait_exponential</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">logging</span><span class="o">.</span><span class="n">basicConfig</span><span class="p">(</span><span class="n">level</span><span class="o">=</span><span class="n">logging</span><span class="o">.</span><span class="n">INFO</span><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="n">logger</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="vm">__name__</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="nd">@retry</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="n">stop</span><span class="o">=</span><span 
class="n">stop_after_attempt</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span> </span></span><span class="line"><span class="cl"> <span class="n">wait</span><span class="o">=</span><span class="n">wait_exponential</span><span class="p">(</span><span class="n">multiplier</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> <span class="nb">min</span><span class="o">=</span><span class="mi">4</span><span class="p">,</span> <span class="nb">max</span><span class="o">=</span><span class="mi">10</span><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">load_batch_with_retry</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">batch</span><span class="p">,</span> <span class="n">batch_num</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Load batch with exponential backoff retry.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;UNWIND $batch AS row</span><span class="se">\n</span><span class="s2">&#34;</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;MERGE (n:Person {id: row.id})</span><span class="se">\n</span><span class="s2">&#34;</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;SET n += row.properties&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span 
class="s2">&#34;batch&#34;</span><span class="p">:</span> <span class="n">batch</span><span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> loaded successfully&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> failed: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">raise</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">resilient_etl</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">,</span> <span class="n">geode_client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;ETL with comprehensive error handling.&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">failed_batches</span> <span class="o">=</span> <span class="p">[]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span 
class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">data</span> <span class="o">=</span> <span class="k">await</span> <span class="n">extract_from_postgres</span><span class="p">(</span><span class="n">pg_pool</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Extract failed: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">send_alert</span><span class="p">(</span><span class="s2">&#34;ETL Extract Failure&#34;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="k">raise</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="n">nodes</span> <span class="o">=</span> <span class="p">[</span><span class="n">transform_user</span><span class="p">(</span><span class="n">u</span><span class="p">)</span> <span class="k">for</span> <span class="n">u</span> <span class="ow">in</span> <span class="n">data</span><span class="p">[</span><span class="s1">&#39;users&#39;</span><span class="p">]]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Process batches with individual error handling</span> </span></span><span class="line"><span class="cl"> <span class="n">batch_size</span> <span class="o">=</span> 
<span class="mi">1000</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="n">nodes</span><span class="p">),</span> <span class="n">batch_size</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="n">nodes</span><span class="p">[</span><span class="n">i</span><span class="p">:</span><span class="n">i</span> <span class="o">+</span> <span class="n">batch_size</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="n">batch_num</span> <span class="o">=</span> <span class="n">i</span> <span class="o">//</span> <span class="n">batch_size</span> <span class="o">+</span> <span class="mi">1</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">load_batch_with_retry</span><span class="p">(</span><span class="n">geode_client</span><span class="p">,</span> <span class="n">batch</span><span class="p">,</span> <span class="n">batch_num</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Batch </span><span class="si">{</span><span class="n">batch_num</span><span class="si">}</span><span class="s2"> failed after retries: </span><span 
class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">failed_batches</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">batch_num</span><span class="p">,</span> <span class="n">batch</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Continue processing other batches</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Report failures</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_batches</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">failed_batches</span><span class="p">)</span><span class="si">}</span><span class="s2"> batches failed&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">send_alert</span><span class="p">(</span><span class="s2">&#34;ETL Partial Failure&#34;</span><span class="p">,</span> <span class="sa">f</span><span class="s2">&#34;</span><span class="si">{</span><span class="nb">len</span><span class="p">(</span><span class="n">failed_batches</span><span class="p">)</span><span class="si">}</span><span class="s2"> batches failed&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">else</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span 
class="s2">&#34;ETL completed successfully&#34;</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="best-practices" class="position-relative d-flex align-items-center group"> <span>Best Practices</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="best-practices" aria-haspopup="dialog" aria-label="Share link: Best Practices"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ol> <li><strong>Idempotent Operations</strong>: Use MERGE instead of INSERT to handle re-runs safely</li> <li><strong>Incremental Loading</strong>: Only process changed data to reduce load time</li> <li><strong>Batch Processing</strong>: Load data in batches (1000-10000 records) for efficiency</li> <li><strong>Transaction Management</strong>: Use transactions to ensure atomicity</li> <li><strong>Error Recovery</strong>: Implement retry logic with exponential backoff</li> <li><strong>Data Validation</strong>: Validate both before and after loading</li> <li><strong>Monitoring</strong>: Track ETL metrics (records processed, duration, errors)</li> <li><strong>State Management</strong>: Track last sync time for incremental loads</li> <li><strong>Deduplication</strong>: Handle duplicate records gracefully</li> <li><strong>Schema Versioning</strong>: Version your transformation logic</li> </ol> <h3 id="performance-optimization" class="position-relative d-flex align-items-center group"> <span>Performance Optimization</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="performance-optimization" aria-haspopup="dialog" aria-label="Share link: Performance Optimization"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" 
style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><p><strong>Parallel Processing</strong>: Load independent batches in parallel</p> <p><strong>Connection Pooling</strong>: Reuse database connections</p> <p><strong>Bulk Operations</strong>: Use batch inserts instead of individual queries</p> <p><strong>Index Management</strong>: Ensure target nodes have indexes on lookup keys</p> <h3 id="troubleshooting" class="position-relative d-flex align-items-center group"> <span>Troubleshooting</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="troubleshooting" aria-haspopup="dialog" aria-label="Share link: Troubleshooting"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><p><strong>Slow extraction</strong>: Add indexes on source database timestamp columns</p> <p><strong>Memory issues</strong>: Reduce batch size or use streaming approach</p> <p><strong>Duplicate nodes</strong>: Use MERGE instead of INSERT, ensure unique constraints</p> <p><strong>Missing relationships</strong>: Verify all referenced nodes exist before creating edges</p> <h3 id="related-topics" class="position-relative d-flex align-items-center group"> <span>Related Topics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="related-topics" aria-haspopup="dialog" aria-label="Share link: Related Topics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ul> <li><a href="/tags/import" >Import</a> - Data import utilities</li> <li><a href="/tags/migration" >Migration</a> - Schema and data migration</li> <li><a 
href="/tags/data-quality" >Data Quality</a> - Quality checks in ETL</li> <li><a href="/tags/import/" >Batch Processing</a> - Batch operation patterns</li> <li><a href="/tags/transactions" >Transactions</a> - Transactional loading</li> <li><a href="/tags/performance" >Performance</a> - ETL performance optimization</li> </ul>
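<p>The batching and parallel-processing advice under Best Practices and Performance Optimization can be sketched roughly as follows. This is a minimal illustration, not part of any client library: <code>chunked</code>, <code>load_batches_parallel</code>, and the <code>fake_load</code> stand-in are hypothetical names, and a real pipeline would pass a loader such as <code>load_batch_with_retry</code> bound to its client. It assumes the batches are independent of each other.</p>

```python
import asyncio
from typing import Awaitable, Callable, Iterator, Sequence

Batch = Sequence[dict]
# Hypothetical loader signature: receives (batch, batch_num) and raises on failure.
Loader = Callable[[Batch, int], Awaitable[None]]


def chunked(rows: Sequence[dict], size: int) -> Iterator[Batch]:
    """Yield consecutive slices of `rows` holding at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


async def load_batches_parallel(
    rows: Sequence[dict],
    load: Loader,
    batch_size: int = 1000,
    max_concurrency: int = 4,
) -> list[int]:
    """Load batches concurrently; return the sorted numbers of failed batches."""
    # The semaphore caps how many loads run at the same time.
    semaphore = asyncio.Semaphore(max_concurrency)
    failed: list[int] = []

    async def worker(batch: Batch, batch_num: int) -> None:
        async with semaphore:
            try:
                await load(batch, batch_num)
            except Exception:
                # Record the failure and let the other batches continue.
                failed.append(batch_num)

    await asyncio.gather(
        *(worker(batch, num)
          for num, batch in enumerate(chunked(rows, batch_size), start=1))
    )
    return sorted(failed)


async def fake_load(batch: Batch, batch_num: int) -> None:
    """Stand-in loader for the demo: fails for batch 2, succeeds otherwise."""
    await asyncio.sleep(0)
    if batch_num == 2:
        raise RuntimeError("simulated load failure")


if __name__ == "__main__":
    demo_rows = [{"id": i} for i in range(2500)]
    print(asyncio.run(load_batches_parallel(demo_rows, fake_load)))
```

<p>The concurrency cap keeps the target database from being flooded with simultaneous writes, and returning the failed batch numbers lets the caller decide whether to retry them or raise an alert.</p>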
