<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 --> <h2 id="event-streaming" class="position-relative d-flex align-items-center group"> <span>Event Streaming</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-streaming" aria-haspopup="dialog" aria-label="Share link: Event Streaming"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h2><div id="headingShareModal" class="heading-share-modal" role="dialog" aria-modal="true" aria-labelledby="headingShareTitle" hidden> <div class="hsm-dialog" role="document"> <div class="hsm-header"> <h2 id="headingShareTitle" class="h6 mb-0 fw-bold">Share this section</h2> <button type="button" class="hsm-close" aria-label="Close"> <i class="fa-solid fa-xmark"></i> </button> </div> <div class="hsm-body"> <label for="headingShareInput" class="form-label small text-muted mb-1 text-uppercase fw-bold" style="font-size: 0.7rem; letter-spacing: 0.5px;">Permalink</label> <div class="input-group mb-4 hsm-url-group"> <input id="headingShareInput" type="text" class="form-control font-monospace" readonly aria-readonly="true" style="font-size: 0.85rem;" /> <button class="btn btn-primary hsm-copy" type="button" aria-label="Copy" title="Copy"> <i class="fa-duotone fa-clipboard" aria-hidden="true"></i> </button> </div> <div class="small fw-bold mb-2 text-muted text-uppercase" style="font-size: 0.7rem; letter-spacing: 0.5px;">Share via</div> <div class="hsm-share-grid"> <a id="share-twitter" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-twitter me-2"></i>Twitter </a> <a id="share-linkedin" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-linkedin me-2"></i>LinkedIn </a> <a id="share-facebook" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-facebook me-2"></i>Facebook </a> </div> </div> </div> </div> <style> .heading-share-modal { position: fixed; inset: 0; display: flex; justify-content: center; align-items: center; background: rgba(0, 0, 0, 0.6); z-index: 1050; padding: 1rem; backdrop-filter: blur(4px); -webkit-backdrop-filter: blur(4px); } .heading-share-modal[hidden] { display: none !important; } .hsm-dialog { max-width: 420px; width: 100%; background: var(--bs-body-bg, #fff); color: var(--bs-body-color, #212529); border: 1px solid var(--bs-border-color, rgba(0,0,0,0.1)); border-radius: 1rem; box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25); overflow: hidden; animation: hsm-fade-in 0.2s ease-out; } @keyframes hsm-fade-in { from { opacity: 0; transform: scale(0.95); } to { opacity: 1; transform: scale(1); } } [data-bs-theme="dark"] .hsm-dialog { background: #1e293b; border-color: rgba(255,255,255,0.1); color: #f8f9fa; } .hsm-header { display: flex; justify-content: space-between; align-items: center; padding: 1rem 1.5rem; border-bottom: 1px solid var(--bs-border-color, rgba(0,0,0,0.1)); background: rgba(0,0,0,0.02); } [data-bs-theme="dark"] .hsm-header { background: rgba(255,255,255,0.02); border-color: rgba(255,255,255,0.1); } .hsm-close { background: transparent; border: none; color: inherit; opacity: 0.5; padding: 0.25rem 0.5rem; border-radius: 0.25rem; font-size: 1.2rem; line-height: 1; transition: opacity 0.2s; } .hsm-close:hover { opacity: 1; } .hsm-body { padding: 1.5rem; } .hsm-url-group { display: flex !important; align-items: stretch; } .hsm-url-group .form-control { flex: 1; min-width: 0; margin: 0; background: var(--bs-secondary-bg, #f8f9fa); border-color: var(--bs-border-color, #dee2e6); border-top-right-radius: 0; border-bottom-right-radius: 0; height: 42px; } .hsm-url-group .btn { flex: 0 0 auto; margin: 0; margin-left: -1px; border-top-left-radius: 0; border-bottom-left-radius: 0; height: 42px; display: flex; align-items: center; justify-content: center; padding: 0 1.25rem; z-index: 2; } [data-bs-theme="dark"] .hsm-url-group .form-control { background: #0f172a; border-color: #334155; color: #e2e8f0; } .hsm-share-grid { display: flex; flex-direction: column; gap: 0.5rem; } .hsm-share-grid .btn { display: flex; align-items: center; justify-content: center; font-size: 0.9rem; padding: 0.6rem; border-color: var(--bs-border-color); width: 100%; } [data-bs-theme="dark"] .hsm-share-grid .btn { color: #e2e8f0; border-color: #475569; } [data-bs-theme="dark"] .hsm-share-grid .btn:hover { background: #334155; border-color: #cbd5e1; } </style> <script> (function(){ const modal = document.getElementById('headingShareModal'); if(!modal) return; const input = modal.querySelector('#headingShareInput'); const copyBtn = modal.querySelector('.hsm-copy'); const twitter = modal.querySelector('#share-twitter'); const linkedin = modal.querySelector('#share-linkedin'); const facebook = modal.querySelector('#share-facebook'); const closeBtn = modal.querySelector('.hsm-close'); let lastFocus=null; let trapBound=false; function buildUrl(id){ return window.location.origin + window.location.pathname + '#' + id; } function isOpen(){ return !modal.hasAttribute('hidden'); } function hydrate(id){ const url=buildUrl(id); input.value=url; const enc=encodeURIComponent(url); const text=encodeURIComponent(document.title); if(twitter) twitter.href=`https://twitter.com/intent/tweet?url=${enc}&text=${text}`; if(linkedin) linkedin.href=`https://www.linkedin.com/sharing/share-offsite/?url=${enc}`; if(facebook) facebook.href=`https://www.facebook.com/sharer/sharer.php?u=${enc}`; } function openModal(id){ lastFocus=document.activeElement; hydrate(id); if(!isOpen()){ modal.removeAttribute('hidden'); } requestAnimationFrame(()=>{ input.focus(); }); trapFocus(); } function closeModal(){ if(!isOpen()) return; modal.setAttribute('hidden',''); if(lastFocus && typeof lastFocus.focus==='function') lastFocus.focus(); } function copyCurrent(){ try{ navigator.clipboard.writeText(input.value).then(()=>feedback(true),()=>fallback()); } catch(e){ fallback(); } } function fallback(){ input.select(); try{ document.execCommand('copy'); feedback(true);}catch(e){ feedback(false);} } function feedback(ok){ if(!copyBtn) return; const icon=copyBtn.querySelector('i'); if(!icon) return; const prev=copyBtn.getAttribute('data-prev')||icon.className; if(!copyBtn.getAttribute('data-prev')) copyBtn.setAttribute('data-prev',prev); icon.className= ok ? 'fa-duotone fa-clipboard-check':'fa-duotone fa-circle-exclamation'; setTimeout(()=>{ icon.className=prev; },1800); } function handleShareClick(e){ e.preventDefault(); const btn=e.currentTarget; const id=btn.getAttribute('data-share-target'); if(id) openModal(id); } function bindShareButtons(){ document.querySelectorAll('.h-share').forEach(btn=>{ if(!btn.dataset.hShareBound){ btn.addEventListener('click', handleShareClick); btn.dataset.hShareBound='1'; } }); } bindShareButtons(); if(document.readyState==='loading'){ document.addEventListener('DOMContentLoaded', bindShareButtons); } else { requestAnimationFrame(bindShareButtons); } document.addEventListener('click', function(e){ const shareBtn=e.target.closest && e.target.closest('.h-share'); if(shareBtn && !shareBtn.dataset.hShareBound){ handleShareClick.call(shareBtn, e); } }, true); document.addEventListener('click', e=>{ if(e.target===modal) closeModal(); if(e.target.closest && e.target.closest('.hsm-close')){ e.preventDefault(); closeModal(); } if(copyBtn && (e.target===copyBtn || (e.target.closest && e.target.closest('.hsm-copy')))) { e.preventDefault(); copyCurrent(); } }); document.addEventListener('keydown', e=>{ if(e.key==='Escape' && isOpen()) closeModal(); }); function trapFocus(){ if(trapBound) return; trapBound=true; modal.addEventListener('keydown', f=>{ if(f.key==='Tab' && isOpen()){ const focusable=[...modal.querySelectorAll('a[href],button,input,textarea,select,[tabindex]:not([tabindex="-1"])')].filter(el=>!el.hasAttribute('disabled')); if(!focusable.length) return; const first=focusable[0]; const last=focusable[focusable.length-1]; if(f.shiftKey && document.activeElement===first){ f.preventDefault(); last.focus(); } else if(!f.shiftKey && document.activeElement===last){ f.preventDefault(); first.focus(); } } }); } if(closeBtn) closeBtn.addEventListener('click', e=>{ e.preventDefault(); closeModal(); }); })(); </script><p>Event streaming is a paradigm for capturing, storing, and processing continuous flows of events in real-time. Geode&rsquo;s event streaming capabilities enable building reactive, scalable applications that respond to changes as they happen, powering use cases from real-time analytics to complex event processing.</p> <h3 id="event-streaming-fundamentals" class="position-relative d-flex align-items-center group"> <span>Event Streaming Fundamentals</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-streaming-fundamentals" aria-haspopup="dialog" aria-label="Share link: Event Streaming Fundamentals"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="what-is-event-streaming" class="position-relative d-flex align-items-center group"> <span>What is Event Streaming?</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="what-is-event-streaming" aria-haspopup="dialog" aria-label="Share link: What is Event Streaming?"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Event streaming treats data as a continuous flow of events:</p> <p><strong>Events</strong> - Immutable facts about what happened <strong>Streams</strong> - Ordered sequences of events <strong>Producers</strong> - Systems that emit events <strong>Consumers</strong> - Systems that process events</p> <h4 id="event-stream-characteristics" class="position-relative d-flex align-items-center group"> <span>Event Stream Characteristics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-stream-characteristics" aria-haspopup="dialog" aria-label="Share link: Event Stream Characteristics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Key properties of event streams:</p> <ul> <li><strong>Ordered</strong> - Events maintain temporal ordering</li> <li><strong>Immutable</strong> - Events cannot be changed once written</li> <li><strong>Append-Only</strong> - New events added to stream end</li> <li><strong>Retained</strong> - Events stored for configurable duration</li> <li><strong>Replayable</strong> - Consumers can replay historical events</li> </ul> <h3 id="geode-event-streaming" class="position-relative d-flex align-items-center group"> <span>Geode Event Streaming</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="geode-event-streaming" aria-haspopup="dialog" aria-label="Share link: Geode Event Streaming"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="creating-event-streams" class="position-relative d-flex align-items-center group"> <span>Creating Event Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="creating-event-streams" aria-haspopup="dialog" aria-label="Share link: Creating Event Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Define streams to capture graph events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">stream</span><span class="w"> </span><span class="py">for</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">activities</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">user_activities</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_time</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">action_type</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">properties</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_actions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">stream</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">windowing</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">page_views</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">TUMBLING_WINDOW</span><span class="p">(</span><span class="py">5</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">page_url</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">view_count</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">page_view_events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">page_url</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="publishing-events" class="position-relative d-flex align-items-center group"> <span>Publishing Events</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="publishing-events" aria-haspopup="dialog" aria-label="Share link: Publishing Events"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Publish events to streams:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_user_action</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">user_id</span><span class="p">,</span> <span class="n">action</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Publish user action event&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;event_time&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;action_type&#34;</span><span class="p">:</span> <span class="n">action</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;properties&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;page&#34;</span><span class="p">:</span> <span class="s2">&#34;/products&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;duration_ms&#34;</span><span class="p">:</span> <span class="mi">1500</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s2">&#34;user_activities&#34;</span><span class="p">,</span> <span class="n">event</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="consuming-events" class="position-relative d-flex align-items-center group"> <span>Consuming Events</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="consuming-events" aria-haspopup="dialog" aria-label="Share link: Consuming Events"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Subscribe to stream events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">process_user_activities</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process user activity stream&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="s2">&#34;user_activities&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;User </span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2"> performed </span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;action_type&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># React to specific actions</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;action_type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;purchase&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_purchase</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;action_type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;signup&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_welcome_email</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">])</span> </span></span></code></pre></div> <h3 id="stream-processing-patterns" class="position-relative d-flex align-items-center group"> <span>Stream Processing Patterns</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stream-processing-patterns" aria-haspopup="dialog" aria-label="Share link: Stream Processing Patterns"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="filter-streams" class="position-relative d-flex align-items-center group"> <span>Filter Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="filter-streams" aria-haspopup="dialog" aria-label="Share link: Filter Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Filter events based on criteria:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Stream</span><span class="w"> </span><span class="py">of</span><span class="w"> </span><span class="py">high</span><span class="err">-</span><span class="py">value</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">high_value_purchases</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span><span class="err">*</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">amount</span><span class="w"> </span><span class="err">&gt;</span><span class="w"> </span><span class="py">1000</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="transform-streams" class="position-relative d-flex align-items-center group"> <span>Transform Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="transform-streams" aria-haspopup="dialog" aria-label="Share link: Transform Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Transform event data:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Enrich</span><span class="w"> </span><span class="py">events</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">data</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">enriched_purchases</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">purchase_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">amount</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">name</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">customer_name</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">tier</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">customer_tier</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span><span class="py">p</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">JOIN</span><span class="w"> </span><span class="py">users</span><span class="w"> </span><span class="py">u</span><span class="w"> </span><span class="py">ON</span><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="aggregate-streams" class="position-relative d-flex align-items-center group"> <span>Aggregate Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="aggregate-streams" aria-haspopup="dialog" aria-label="Share link: Aggregate Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Compute aggregations over time:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Real</span><span class="err">-</span><span class="py">time</span><span class="w"> </span><span class="py">revenue</span><span class="w"> </span><span class="py">by</span><span class="w"> </span><span class="py">product</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">revenue_by_product</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">product_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SUM</span><span class="p">(</span><span class="py">amount</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">total_revenue</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">purchase_count</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">product_id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WINDOW</span><span class="w"> </span><span class="py">TUMBLING</span><span class="w"> </span><span class="p">(</span><span class="py">SIZE</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">HOUR</span><span class="p">)</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="join-streams" class="position-relative d-flex align-items-center group"> <span>Join Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="join-streams" aria-haspopup="dialog" aria-label="Share link: Join Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Combine multiple streams:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Join</span><span class="w"> </span><span class="py">page</span><span class="w"> </span><span class="py">views</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">conversion_events</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">user_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">page_url</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">purchase_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">amount</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="py">p</span><span class="err">.</span><span class="py">event_time</span><span class="w"> </span><span class="err">-</span><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">time_to_purchase</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">page_views</span><span class="w"> </span><span class="py">v</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">JOIN</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span><span class="py">p</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">ON</span><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">user_id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WITHIN</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">HOUR</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h3 id="windowing-operations" class="position-relative d-flex align-items-center group"> <span>Windowing Operations</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="windowing-operations" aria-haspopup="dialog" aria-label="Share link: Windowing Operations"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="tumbling-windows" class="position-relative d-flex align-items-center group"> <span>Tumbling Windows</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="tumbling-windows" aria-haspopup="dialog" aria-label="Share link: Tumbling Windows"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Fixed-size, non-overlapping windows:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Count</span><span class="w"> </span><span class="py">events</span><span class="w"> </span><span class="py">per</span><span class="w"> </span><span class="py">5</span><span class="err">-</span><span class="py">minute</span><span class="w"> </span><span class="py">window</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">event_counts</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">TUMBLING_WINDOW</span><span class="p">(</span><span class="py">5</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_type</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">count</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">event_type</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><p>Tumbling windows:</p> <ul> <li>Fixed size (e.g., 5 minutes)</li> <li>No overlap between windows</li> <li>Each event belongs to exactly one window</li> </ul> <h4 id="hopping-windows" class="position-relative d-flex align-items-center group"> <span>Hopping Windows</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="hopping-windows" aria-haspopup="dialog" aria-label="Share link: Hopping Windows"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Fixed-size windows with overlap:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Sliding</span><span class="w"> </span><span class="py">10</span><span class="err">-</span><span class="py">minute</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w"> </span><span class="py">advancing</span><span class="w"> </span><span class="py">every</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">minute</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">sliding_averages</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">HOPPING_WINDOW</span><span class="p">(</span><span class="py">SIZE</span><span class="w"> </span><span class="py">10</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">,</span><span class="w"> </span><span class="py">ADVANCE</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">MINUTE</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">AVG</span><span class="p">(</span><span class="py">response_time_ms</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">avg_response_time</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">api_requests</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><p>Hopping windows:</p> <ul> <li>Fixed size with configurable advance</li> <li>Windows overlap</li> <li>Each event may appear in multiple windows</li> </ul> <h4 id="session-windows" class="position-relative d-flex align-items-center group"> <span>Session Windows</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="session-windows" aria-haspopup="dialog" aria-label="Share link: Session Windows"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Dynamic windows based on inactivity:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">User</span><span class="w"> </span><span class="py">sessions</span><span class="w"> </span><span class="p">(</span><span class="py">15</span><span class="w"> </span><span class="py">minute</span><span class="w"> </span><span class="py">timeout</span><span class="p">)</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">user_sessions</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SESSION_WINDOW</span><span class="p">(</span><span class="py">15</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">session</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">actions_in_session</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COLLECT_LIST</span><span class="p">(</span><span class="py">action_type</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">actions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_activities</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">user_id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><p>Session windows:</p> <ul> <li>Variable size based on activity</li> <li>Gap threshold defines session boundary</li> <li>Useful for user behavior analysis</li> </ul> <h3 id="stream-processing" class="position-relative d-flex align-items-center group"> <span>Stream Processing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stream-processing" aria-haspopup="dialog" aria-label="Share link: Stream Processing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="stateless-processing" class="position-relative d-flex align-items-center group"> <span>Stateless Processing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stateless-processing" aria-haspopup="dialog" aria-label="Share link: Stateless Processing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Process each event independently:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">stateless_processor</span><span class="p">(</span><span class="n">event</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process event without state&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Validate event</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="ow">not</span> <span class="n">validate_event</span><span class="p">(</span><span class="n">event</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Transform event</span> </span></span><span class="line"><span class="cl"> <span class="n">transformed</span> <span class="o">=</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;id&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;timestamp&#34;</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;timestamp&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;value&#34;</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;value&#39;</span><span class="p">]</span> <span class="o">*</span> <span class="mf">1.5</span> <span class="c1"># Apply transformation</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish to output stream</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">publish</span><span class="p">(</span><span class="n">transformed</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="stateful-processing" class="position-relative d-flex align-items-center group"> <span>Stateful Processing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stateful-processing" aria-haspopup="dialog" aria-label="Share link: Stateful Processing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Maintain state across events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">StatefulProcessor</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="p">{}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process event with state&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Retrieve user state</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">user_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">]</span> <span class="o">=</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;total_purchases&#34;</span><span class="p">:</span> <span class="mi">0</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;total_spent&#34;</span><span class="p">:</span> <span class="mi">0</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Update state</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">&#34;total_purchases&#34;</span><span class="p">]</span> <span class="o">+=</span> <span class="mi">1</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">&#34;total_spent&#34;</span><span class="p">]</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;amount&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Check for milestone</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">&#34;total_spent&#34;</span><span class="p">]</span> <span class="o">&gt;</span> <span class="mi">10000</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">notify_vip_status</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="complex-event-processing-cep" class="position-relative d-flex align-items-center group"> <span>Complex Event Processing (CEP)</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="complex-event-processing-cep" aria-haspopup="dialog" aria-label="Share link: Complex Event Processing (CEP)"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Detect patterns across multiple events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Detect</span><span class="w"> </span><span class="py">suspicious</span><span class="w"> </span><span class="py">behavior</span><span class="w"> </span><span class="py">pattern</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">suspicious_activity</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COLLECT_LIST</span><span class="p">(</span><span class="py">action_type</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">pattern</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_activities</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">MATCH_RECOGNIZE</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">PARTITION</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">user_id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">ORDER</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">event_time</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">MEASURES</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">FIRST</span><span class="p">(</span><span class="py">A</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">start_time</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">LAST</span><span class="p">(</span><span class="py">B</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">end_time</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">PATTERN</span><span class="w"> </span><span class="p">(</span><span class="py">A</span><span class="w"> </span><span class="py">B</span><span class="err">+</span><span class="w"> </span><span class="py">C</span><span class="p">)</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">DEFINE</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">A</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">A</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">login_failure</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">B</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">B</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">login_failure</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">C</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">C</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">login_success</span><span class="err">&#39;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">AND</span><span class="w"> </span><span class="py">C</span><span class="err">.</span><span class="py">ip_address</span><span class="w"> </span><span class="p">!=</span><span class="w"> </span><span class="py">A</span><span class="err">.</span><span class="py">ip_address</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h3 id="event-sourcing" class="position-relative d-flex align-items-center group"> <span>Event Sourcing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-sourcing" aria-haspopup="dialog" aria-label="Share link: Event Sourcing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="event-store" class="position-relative d-flex align-items-center group"> <span>Event Store</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-store" aria-haspopup="dialog" aria-label="Share link: Event Store"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Store all events as source of truth:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Event</span><span class="err">-</span><span class="py">sourced</span><span class="w"> </span><span class="py">account</span><span class="w"> </span><span class="py">balance</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">EVENT_STORE</span><span class="w"> </span><span class="py">account_transactions</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">amount</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">balance</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_time</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="py">PARTITION</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">account_id</span><span class="err">;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Publish</span><span class="w"> </span><span class="py">transaction</span><span class="w"> </span><span class="py">events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">INSERT</span><span class="w"> </span><span class="py">INTO</span><span class="w"> </span><span class="py">account_transactions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">VALUES</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="err">&#39;</span><span class="py">acct123</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="err">&#39;</span><span class="py">deposit</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">500</span><span class="mf">.00</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">1500</span><span class="mf">.00</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">NOW</span><span class="p">()</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="event-replay" class="position-relative d-flex align-items-center group"> <span>Event Replay</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-replay" aria-haspopup="dialog" aria-label="Share link: Event Replay"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Rebuild state from events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">rebuild_account_state</span><span class="p">(</span><span class="n">account_id</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Rebuild account state from events&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">balance</span> <span class="o">=</span> <span class="mi">0</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Replay all events</span> </span></span><span class="line"><span class="cl"> <span class="n">events</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;SELECT * FROM account_transactions WHERE account_id = $1 ORDER BY event_time&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="n">account_id</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">events</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;event_type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;deposit&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">balance</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;amount&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;event_type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;withdrawal&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">balance</span> <span class="o">-=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;amount&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">balance</span> </span></span></code></pre></div> <h4 id="snapshots" class="position-relative d-flex align-items-center group"> <span>Snapshots</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="snapshots" aria-haspopup="dialog" aria-label="Share link: Snapshots"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Optimize replay with periodic snapshots:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">snapshot</span><span class="w"> </span><span class="py">table</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TABLE</span><span class="w"> </span><span class="py">account_snapshots</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="w"> </span><span class="py">STRING</span><span class="w"> </span><span class="py">PRIMARY</span><span class="w"> </span><span class="py">KEY</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">balance</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">snapshot_time</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_event_id</span><span class="w"> </span><span class="py">STRING</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Periodically</span><span class="w"> </span><span class="py">save</span><span class="w"> </span><span class="py">snapshots</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">INSERT</span><span class="w"> </span><span class="py">INTO</span><span class="w"> </span><span class="py">account_snapshots</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SUM</span><span class="p">(</span><span class="py">CASE</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WHEN</span><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">deposit</span><span class="err">&#39;</span><span class="w"> </span><span class="py">THEN</span><span class="w"> </span><span class="py">amount</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WHEN</span><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">withdrawal</span><span class="err">&#39;</span><span class="w"> </span><span class="py">THEN</span><span class="w"> </span><span class="err">-</span><span class="py">amount</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">END</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">balance</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">NOW</span><span class="p">()</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">snapshot_time</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">MAX</span><span class="p">(</span><span class="py">event_id</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">last_event_id</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">account_transactions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">account_id</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h3 id="stream-integration" class="position-relative d-flex align-items-center group"> <span>Stream Integration</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stream-integration" aria-haspopup="dialog" aria-label="Share link: Stream Integration"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="kafka-integration" class="position-relative d-flex align-items-center group"> <span>Kafka Integration</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="kafka-integration" aria-haspopup="dialog" aria-label="Share link: Kafka Integration"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Integrate with Apache Kafka:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">kafka</span> <span class="kn">import</span> <span class="n">KafkaProducer</span><span class="p">,</span> <span class="n">KafkaConsumer</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Publish Geode events to Kafka</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_to_kafka</span><span class="p">(</span><span class="n">geode_event</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="n">producer</span> <span class="o">=</span> <span class="n">KafkaProducer</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;localhost:9092&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="n">value_serializer</span><span class="o">=</span><span class="k">lambda</span> <span class="n">v</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">v</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="n">producer</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="s1">&#39;user-activities&#39;</span><span class="p">,</span> <span class="n">geode_event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">producer</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Consume Kafka events to Geode</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_from_kafka</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span> <span class="o">=</span> <span class="n">KafkaConsumer</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;user-activities&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">&#39;localhost:9092&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="n">value_deserializer</span><span class="o">=</span><span class="k">lambda</span> <span class="n">m</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">consumer</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s1">&#39;user_activities&#39;</span><span class="p">,</span> <span class="n">message</span><span class="o">.</span><span class="n">value</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="pulsar-integration" class="position-relative d-flex align-items-center group"> <span>Pulsar Integration</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="pulsar-integration" aria-haspopup="dialog" aria-label="Share link: Pulsar Integration"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Connect with Apache Pulsar:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">pulsar</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">pulsar_to_geode</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="n">client</span> <span class="o">=</span> <span class="n">pulsar</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s1">&#39;pulsar://localhost:6650&#39;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s1">&#39;user-events&#39;</span><span class="p">,</span> <span class="s1">&#39;geode-subscription&#39;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">msg</span> <span class="o">=</span> <span class="n">consumer</span><span class="o">.</span><span class="n">receive</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">msg</span><span class="o">.</span><span class="n">data</span><span class="p">())</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s1">&#39;user_activities&#39;</span><span class="p">,</span> <span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span><span class="o">.</span><span class="n">acknowledge</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer</span><span class="o">.</span><span class="n">negative_acknowledge</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="performance-optimization" class="position-relative d-flex align-items-center group"> <span>Performance Optimization</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="performance-optimization" aria-haspopup="dialog" aria-label="Share link: Performance Optimization"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="batching" class="position-relative d-flex align-items-center group"> <span>Batching</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="batching" aria-haspopup="dialog" aria-label="Share link: Batching"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Improve throughput with batching:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_processor</span><span class="p">(</span><span class="n">stream_name</span><span class="p">,</span> <span class="n">batch_size</span><span class="o">=</span><span class="mi">100</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process events in batches&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="p">[]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="n">stream_name</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="n">batch_size</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_batch</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">batch</span> <span class="o">=</span> <span class="p">[]</span> </span></span></code></pre></div> <h4 id="parallel-processing" class="position-relative d-flex align-items-center group"> <span>Parallel Processing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="parallel-processing" aria-haspopup="dialog" aria-label="Share link: Parallel Processing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Scale with parallel consumers:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">parallel_consumer</span><span class="p">(</span><span class="n">stream_name</span><span class="p">,</span> <span class="n">num_workers</span><span class="o">=</span><span class="mi">4</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process stream with multiple workers&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">worker</span><span class="p">(</span><span class="n">worker_id</span><span class="p">,</span> <span class="n">partition</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="n">stream_name</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">partition</span><span class="o">=</span><span class="n">partition</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_event</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">worker_id</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Start workers for each partition</span> </span></span><span class="line"><span class="cl"> <span class="n">tasks</span> <span class="o">=</span> <span class="p">[</span> </span></span><span class="line"><span class="cl"> <span class="n">worker</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">i</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">num_workers</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">tasks</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="backpressure" class="position-relative d-flex align-items-center group"> <span>Backpressure</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="backpressure" aria-haspopup="dialog" aria-label="Share link: Backpressure"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Handle slow consumers:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">backpressure_handler</span><span class="p">(</span><span class="n">stream_name</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Handle backpressure gracefully&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">buffer</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">Queue</span><span class="p">(</span><span class="n">maxsize</span><span class="o">=</span><span class="mi">1000</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">producer</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="n">stream_name</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">buffer</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">consumer</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="k">await</span> <span class="n">buffer</span><span class="o">.</span><span class="n">get</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">buffer</span><span class="o">.</span><span class="n">task_done</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="n">producer</span><span class="p">(),</span> <span class="n">consumer</span><span class="p">())</span> </span></span></code></pre></div> <h3 id="monitoring-streams" class="position-relative d-flex align-items-center group"> <span>Monitoring Streams</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="monitoring-streams" aria-haspopup="dialog" aria-label="Share link: Monitoring Streams"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="stream-metrics" class="position-relative d-flex align-items-center group"> <span>Stream Metrics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stream-metrics" aria-haspopup="dialog" aria-label="Share link: Stream Metrics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Track stream health:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">stream_name</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">events_per_second</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">bytes_per_second</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consumer_lag</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">oldest_event_age</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">streams</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">consumer_lag</span><span class="w"> </span><span class="err">&gt;</span><span class="w"> </span><span class="py">10000</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="consumer-monitoring" class="position-relative d-flex align-items-center group"> <span>Consumer Monitoring</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="consumer-monitoring" aria-haspopup="dialog" aria-label="Share link: Consumer Monitoring"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Monitor consumer performance:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consumer_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">stream_name</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">throughput</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">error_rate</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_heartbeat</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">stream_consumers</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">error_rate</span><span class="w"> </span><span class="err">&gt;</span><span class="w"> </span><span class="py">0</span><span class="mf">.01</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h3 id="best-practices" class="position-relative d-flex align-items-center group"> <span>Best Practices</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="best-practices" aria-haspopup="dialog" aria-label="Share link: Best Practices"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="event-design" class="position-relative d-flex align-items-center group"> <span>Event Design</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-design" aria-haspopup="dialog" aria-label="Share link: Event Design"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li>Use consistent event schemas</li> <li>Include sufficient context</li> <li>Version events for evolution</li> <li>Keep events immutable</li> <li>Use timestamps for ordering</li> </ul> <h4 id="stream-management" class="position-relative d-flex align-items-center group"> <span>Stream Management</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="stream-management" aria-haspopup="dialog" aria-label="Share link: Stream Management"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li>Choose appropriate retention</li> <li>Partition for scalability</li> <li>Monitor and alert on lag</li> <li>Implement backpressure handling</li> <li>Test failure scenarios</li> </ul> <h4 id="processing-guarantees" class="position-relative d-flex align-items-center group"> <span>Processing Guarantees</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="processing-guarantees" aria-haspopup="dialog" aria-label="Share link: Processing Guarantees"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li>Implement idempotent processing</li> <li>Use checkpointing for progress</li> <li>Handle exactly-once semantics</li> <li>Design for eventual consistency</li> <li>Monitor processing lag</li> </ul> <h3 id="related-topics" class="position-relative d-flex align-items-center group"> <span>Related Topics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="related-topics" aria-haspopup="dialog" aria-label="Share link: Related Topics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ul> <li><a href="/tags/cdc" >CDC</a> - Change data capture</li> <li><a href="/tags/pubsub" >Pub/Sub</a> - Publish-subscribe messaging</li> <li><a href="/tags/events" >Events</a> - Event-driven architecture</li> <li><a href="/tags/real-time" >Real-Time</a> - Real-time processing</li> </ul> <h3 id="learn-more" class="position-relative d-flex align-items-center group"> <span>Learn More</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="learn-more" aria-haspopup="dialog" aria-label="Share link: Learn More"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ul> <li><a href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying" aria-label="The Log: What every software engineer should know – opens in new window" target="_blank" rel="noopener noreferrer" >The Log: What every software engineer should know <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://www.confluent.io/blog/event-streaming-platform-1" aria-label="Event Streaming Patterns – opens in new window" target="_blank" rel="noopener noreferrer" >Event Streaming Patterns <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://www.confluent.io/resources/kafka-the-definitive-guide/" aria-label="Kafka: The Definitive Guide – opens in new window" target="_blank" rel="noopener noreferrer" >Kafka: The Definitive Guide <span aria-hidden="true" class="external-icon">↗</span> </a> </li> </ul>

Related Articles

No articles found with this tag yet.

Back to Home