<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 -->
<h2 id="event-streaming" class="position-relative d-flex align-items-center group">
<span>Event Streaming</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-streaming"
aria-haspopup="dialog"
aria-label="Share link: Event Streaming">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h2><div id="headingShareModal" class="heading-share-modal" role="dialog" aria-modal="true" aria-labelledby="headingShareTitle" hidden>
  <!-- Single shared "share this section" dialog. It starts hidden; the inline script
       fills in the permalink field and the share-link hrefs each time it is opened.
       Icon glyphs are decorative, so each one is hidden from assistive technology and
       the accessible name comes from the adjacent text or aria-label. -->
  <div class="hsm-dialog" role="document">
    <div class="hsm-header">
      <h2 id="headingShareTitle" class="h6 mb-0 fw-bold">Share this section</h2>
      <button type="button" class="hsm-close" aria-label="Close">
        <i class="fa-solid fa-xmark" aria-hidden="true"></i>
      </button>
    </div>
    <div class="hsm-body">
      <label for="headingShareInput" class="form-label small text-muted mb-1 text-uppercase fw-bold" style="font-size: 0.7rem; letter-spacing: 0.5px;">Permalink</label>
      <div class="input-group mb-4 hsm-url-group">
        <!-- The native readonly attribute already exposes the read-only state. -->
        <input id="headingShareInput" type="text" class="form-control font-monospace" readonly style="font-size: 0.85rem;">
        <button class="btn btn-primary hsm-copy" type="button" aria-label="Copy" title="Copy">
          <i class="fa-duotone fa-clipboard" aria-hidden="true"></i>
        </button>
      </div>
      <div class="small fw-bold mb-2 text-muted text-uppercase" style="font-size: 0.7rem; letter-spacing: 0.5px;">Share via</div>
      <div class="hsm-share-grid">
        <!-- href is assigned by the script when the dialog opens. -->
        <a id="share-twitter" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
          <i class="fa-brands fa-twitter me-2" aria-hidden="true"></i>Twitter<span class="visually-hidden"> (opens in new tab)</span>
        </a>
        <a id="share-linkedin" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
          <i class="fa-brands fa-linkedin me-2" aria-hidden="true"></i>LinkedIn<span class="visually-hidden"> (opens in new tab)</span>
        </a>
        <a id="share-facebook" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
          <i class="fa-brands fa-facebook me-2" aria-hidden="true"></i>Facebook<span class="visually-hidden"> (opens in new tab)</span>
        </a>
      </div>
    </div>
  </div>
</div>
<style>
/* Full-viewport overlay behind the share dialog: pinned to all four edges,
   centres its single child with flexbox, and dims and blurs the page underneath. */
.heading-share-modal {
position: fixed;
inset: 0;
display: flex;
justify-content: center;
align-items: center;
background: rgba(0, 0, 0, 0.6);
z-index: 1050;
padding: 1rem;
backdrop-filter: blur(4px);
-webkit-backdrop-filter: blur(4px);
}
/* The display: flex above would otherwise win over the browser's built-in
   [hidden] rule, so the hidden state is forced explicitly. */
.heading-share-modal[hidden] { display: none !important; }
/* Dialog panel: a width-capped card that takes its colours from the Bootstrap
   theme variables (with literal fallbacks) and fades/scales in when shown. */
.hsm-dialog {
  max-width: 420px;
  width: 100%;
  background: var(--bs-body-bg, #fff);
  color: var(--bs-body-color, #212529);
  border: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
  border-radius: 1rem;
  box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25);
  overflow: hidden;
  animation: hsm-fade-in 0.2s ease-out;
}
@keyframes hsm-fade-in {
  from { opacity: 0; transform: scale(0.95); }
  to { opacity: 1; transform: scale(1); }
}
/* Honour the user's reduced-motion preference: show the panel immediately
   instead of playing the scale/fade entrance. */
@media (prefers-reduced-motion: reduce) {
  .hsm-dialog { animation: none; }
}
/* Dark theme: fixed slate palette instead of the theme variables. */
[data-bs-theme="dark"] .hsm-dialog {
  background: #1e293b;
  border-color: rgba(255,255,255,0.1);
  color: #f8f9fa;
}
/* Header strip: title on the left, close button on the right, separated from
   the body by a hairline border and a faint tint. */
.hsm-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 1rem 1.5rem;
border-bottom: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
background: rgba(0,0,0,0.02);
}
/* Dark theme: the tint and border switch from black-based to white-based alpha. */
[data-bs-theme="dark"] .hsm-header {
background: rgba(255,255,255,0.02);
border-color: rgba(255,255,255,0.1);
}
/* Close button: a chrome-less icon button that inherits the dialog text colour
   and sits at half opacity until the user interacts with it. */
.hsm-close {
  background: transparent;
  border: none;
  color: inherit;
  opacity: 0.5;
  padding: 0.25rem 0.5rem;
  border-radius: 0.25rem;
  font-size: 1.2rem;
  line-height: 1;
  transition: opacity 0.2s;
}
/* Keyboard focus gets the same full-opacity treatment as pointer hover, so the
   control is not left dimmed while it is the active element. */
.hsm-close:hover,
.hsm-close:focus-visible {
  opacity: 1;
}
/* Padding for the dialog's content area. */
.hsm-body {
padding: 1.5rem;
}
/* Permalink row: the read-only field and the copy button laid out side by side
   at equal height. */
.hsm-url-group {
display: flex !important;
align-items: stretch;
}
/* The field takes all remaining width; min-width: 0 lets it shrink below its
   content width so a long URL cannot push the button out of the row. Its
   right-hand corners are squared where it meets the button. */
.hsm-url-group .form-control {
flex: 1;
min-width: 0;
margin: 0;
background: var(--bs-secondary-bg, #f8f9fa);
border-color: var(--bs-border-color, #dee2e6);
border-top-right-radius: 0;
border-bottom-right-radius: 0;
height: 42px;
}
/* The copy button keeps its natural width and matches the field height.
   margin-left must follow the margin shorthand to take effect; the -1px pulls
   the button over the field's right border so the two borders overlap. */
.hsm-url-group .btn {
flex: 0 0 auto;
margin: 0;
margin-left: -1px;
border-top-left-radius: 0;
border-bottom-left-radius: 0;
height: 42px;
display: flex;
align-items: center;
justify-content: center;
padding: 0 1.25rem;
z-index: 2;
}
/* Dark theme colours for the permalink field. */
[data-bs-theme="dark"] .hsm-url-group .form-control {
background: #0f172a;
border-color: #334155;
color: #e2e8f0;
}
/* "Share via" list: one full-width button per network, stacked vertically. */
.hsm-share-grid {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
/* Each share link centres its icon and label. */
.hsm-share-grid .btn {
display: flex;
align-items: center;
justify-content: center;
font-size: 0.9rem;
padding: 0.6rem;
border-color: var(--bs-border-color);
width: 100%;
}
/* Dark theme: lighter text and border for the share links... */
[data-bs-theme="dark"] .hsm-share-grid .btn {
color: #e2e8f0;
border-color: #475569;
}
/* ...and a filled background with a brighter border on hover. */
[data-bs-theme="dark"] .hsm-share-grid .btn:hover {
background: #334155;
border-color: #cbd5e1;
}
</style>
<script>
(function(){
// Look up the share dialog and the controls inside it once; the helpers in this
// IIFE all close over these references.
const modal = document.getElementById('headingShareModal');
// No dialog markup on the page means there is nothing to wire up: leave the IIFE.
if(!modal) return;
const input = modal.querySelector('#headingShareInput');
const copyBtn = modal.querySelector('.hsm-copy');
const twitter = modal.querySelector('#share-twitter');
const linkedin = modal.querySelector('#share-linkedin');
const facebook = modal.querySelector('#share-facebook');
const closeBtn = modal.querySelector('.hsm-close');
// Element that held focus when the dialog was opened; closeModal() hands focus back to it.
let lastFocus=null;
// Flipped by trapFocus() so the Tab-wrapping keydown listener is attached only once.
let trapBound=false;
/**
 * Build the permalink for a heading: the current page's origin and path with
 * the heading id as the fragment. Any query string or existing fragment on the
 * current URL is not carried over.
 * @param {string} id - Heading element id.
 * @returns {string} Absolute URL ending in "#<id>".
 */
function buildUrl(id){
  const { origin, pathname } = window.location;
  const pageUrl = origin + pathname;
  return pageUrl + '#' + id;
}
/**
 * Report whether the share dialog is currently showing. Visibility is tracked
 * solely through the presence of the `hidden` attribute on the dialog element.
 * @returns {boolean} true when the dialog has no `hidden` attribute.
 */
function isOpen(){
  const concealed = modal.hasAttribute('hidden');
  return concealed === false;
}
/**
 * Populate the dialog for one heading: write the permalink into the read-only
 * field and point each share link that exists at its network's share endpoint.
 * @param {string} id - Heading element id to share.
 */
function hydrate(id){
  const permalink = buildUrl(id);
  input.value = permalink;

  // Both values are embedded in query strings, so they are percent-encoded first.
  const encodedUrl = encodeURIComponent(permalink);
  const encodedTitle = encodeURIComponent(document.title);

  const destinations = [
    [twitter, 'https://twitter.com/intent/tweet?url=' + encodedUrl + '&text=' + encodedTitle],
    [linkedin, 'https://www.linkedin.com/sharing/share-offsite/?url=' + encodedUrl],
    [facebook, 'https://www.facebook.com/sharer/sharer.php?u=' + encodedUrl]
  ];
  for (const [anchor, shareHref] of destinations) {
    // A share link missing from the markup is simply skipped.
    if (anchor) anchor.href = shareHref;
  }
}
/**
 * Show the share dialog for the given heading.
 * Records the currently focused element for later restoration, fills in the
 * dialog, reveals it, moves focus to the permalink field on the next frame,
 * and makes sure the Tab trap is installed.
 * @param {string} id - Heading element id to share.
 */
function openModal(id){
  lastFocus = document.activeElement;
  hydrate(id);
  if (modal.hasAttribute('hidden')) {
    modal.removeAttribute('hidden');
  }
  // Focus is deferred by one frame rather than applied synchronously.
  requestAnimationFrame(function(){
    input.focus();
  });
  trapFocus();
}
/**
 * Hide the share dialog and hand focus back to whatever was focused before it
 * opened. Does nothing when the dialog is already hidden.
 */
function closeModal(){
  if (isOpen()) {
    modal.setAttribute('hidden', '');
    const returnTo = lastFocus;
    if (returnTo && typeof returnTo.focus === 'function') {
      returnTo.focus();
    }
  }
}
/**
 * Copy the permalink field's value to the clipboard.
 * Tries the asynchronous Clipboard API first; if that call throws synchronously
 * (for example because the API is absent) or its promise rejects, the
 * selection-based fallback() is used instead.
 */
function copyCurrent(){
  try {
    const pendingWrite = navigator.clipboard.writeText(input.value);
    pendingWrite.then(
      function(){ feedback(true); },
      function(){ fallback(); }
    );
  } catch (err) {
    fallback();
  }
}
/**
 * Legacy copy path used when the asynchronous Clipboard API is unavailable or
 * rejects: select the permalink field and ask the browser to copy the selection.
 *
 * document.execCommand('copy') reports failure by returning false rather than
 * by throwing, so its return value decides which feedback icon is shown.
 * A thrown exception is treated as a failed copy as well.
 */
function fallback(){
  input.select();
  let copied = false;
  try {
    copied = document.execCommand('copy') === true;
  } catch (e) {
    copied = false;
  }
  feedback(copied);
}
/**
 * Flash a success or failure icon on the copy button, then restore the idle icon.
 *
 * The idle icon class is captured once in the button's data-prev attribute, so a
 * call made while feedback is already showing still restores the right icon.
 * Any pending reset from an earlier call is cancelled first; otherwise that
 * older timer would fire early and cut the newer feedback short.
 *
 * @param {boolean} ok - true shows the "copied" icon, false the error icon.
 */
function feedback(ok){
  if (!copyBtn) return;
  const icon = copyBtn.querySelector('i');
  if (!icon) return;

  let idleClass = copyBtn.getAttribute('data-prev');
  if (!idleClass) {
    idleClass = icon.className;
    copyBtn.setAttribute('data-prev', idleClass);
  }

  icon.className = ok ? 'fa-duotone fa-clipboard-check' : 'fa-duotone fa-circle-exclamation';

  // The timer id lives on the function itself so the block needs no extra outer state.
  clearTimeout(feedback.resetTimer);
  feedback.resetTimer = setTimeout(() => { icon.className = idleClass; }, 1800);
}
/**
 * Click handler for a heading's share button: opens the dialog for the heading
 * named in the button's data-share-target attribute.
 *
 * The handler is reached two ways: as a listener bound directly to the button,
 * and from a capture-phase listener on `document`. In the second case
 * e.currentTarget is the document, which has no getAttribute(), so the button
 * is resolved from the event target first and e.currentTarget is only used
 * when it is actually an element.
 *
 * @param {Event} e - The click event.
 */
function handleShareClick(e){
  e.preventDefault();
  const origin = e.target;
  const viaTarget = origin && typeof origin.closest === 'function' ? origin.closest('.h-share') : null;
  const current = e.currentTarget;
  const viaCurrent = current && typeof current.getAttribute === 'function' ? current : null;
  const btn = viaTarget || viaCurrent;
  if (!btn) return;
  const id = btn.getAttribute('data-share-target');
  if (id) openModal(id);
}
/**
 * Attach the click handler to every .h-share button currently in the document.
 * A button is flagged through its data-h-share-bound attribute once wired, so
 * calling this again never registers a second listener on the same button.
 */
function bindShareButtons(){
  for (const btn of document.querySelectorAll('.h-share')) {
    if (btn.dataset.hShareBound) continue;
    btn.addEventListener('click', handleShareClick);
    btn.dataset.hShareBound = '1';
  }
}
// Wire up the share buttons that already exist when this script runs...
bindShareButtons();
// ...then run the binding once more: on DOMContentLoaded if the document is still
// loading, otherwise on the next animation frame. Buttons already flagged as
// bound are skipped, so the repeat is harmless.
if(document.readyState==='loading'){
document.addEventListener('DOMContentLoaded', bindShareButtons);
} else {
requestAnimationFrame(bindShareButtons);
}
// Capture-phase safety net: a click inside a .h-share button that carries no
// data-h-share-bound flag is routed to handleShareClick with that button as `this`.
document.addEventListener('click', function(e){
const shareBtn=e.target.closest && e.target.closest('.h-share');
if(shareBtn && !shareBtn.dataset.hShareBound){ handleShareClick.call(shareBtn, e); }
}, true);
// Delegated click handling for the dialog's own controls.
document.addEventListener('click', e=>{
// A click that lands on the overlay element itself (not on anything inside it) dismisses the dialog.
if(e.target===modal) closeModal();
// A click on, or inside, the close button.
if(e.target.closest && e.target.closest('.hsm-close')){ e.preventDefault(); closeModal(); }
// A click on, or inside, the copy button.
if(copyBtn && (e.target===copyBtn || (e.target.closest && e.target.closest('.hsm-copy')))) { e.preventDefault(); copyCurrent(); }
});
// Escape closes the dialog from anywhere in the document, but only while it is open.
document.addEventListener('keydown', e=>{ if(e.key==='Escape' && isOpen()) closeModal(); });
/**
 * Install (once) a keydown listener on the dialog that keeps Tab navigation
 * inside it: Tab on the last focusable control wraps to the first, and
 * Shift+Tab on the first wraps to the last. The list of focusable controls is
 * recomputed on every Tab press, and disabled controls are left out.
 */
function trapFocus(){
  if (trapBound) return;
  trapBound = true;

  const focusableSelector = 'a[href],button,input,textarea,select,[tabindex]:not([tabindex="-1"])';

  modal.addEventListener('keydown', function(evt){
    if (evt.key !== 'Tab' || !isOpen()) return;

    const stops = Array.from(modal.querySelectorAll(focusableSelector))
      .filter(function(el){ return !el.hasAttribute('disabled'); });
    if (stops.length === 0) return;

    const head = stops[0];
    const tail = stops[stops.length - 1];
    const active = document.activeElement;

    if (evt.shiftKey) {
      // Moving backwards off the first control wraps to the last one.
      if (active === head) {
        evt.preventDefault();
        tail.focus();
      }
    } else if (active === tail) {
      // Moving forwards off the last control wraps to the first one.
      evt.preventDefault();
      head.focus();
    }
  });
}
// Direct listener on the close button, when the dialog has one: cancel the
// default action and dismiss the dialog.
if (closeBtn) {
  closeBtn.addEventListener('click', function(evt){
    evt.preventDefault();
    closeModal();
  });
}
})();
</script><p>Event streaming is a paradigm for capturing, storing, and processing continuous flows of events in real time. Geode’s event streaming capabilities enable building reactive, scalable applications that respond to changes as they happen, powering use cases from real-time analytics to complex event processing.</p>
<h3 id="event-streaming-fundamentals" class="position-relative d-flex align-items-center group">
<span>Event Streaming Fundamentals</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-streaming-fundamentals"
aria-haspopup="dialog"
aria-label="Share link: Event Streaming Fundamentals">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="what-is-event-streaming" class="position-relative d-flex align-items-center group">
<span>What is Event Streaming?</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="what-is-event-streaming"
aria-haspopup="dialog"
aria-label="Share link: What is Event Streaming?">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Event streaming treats data as a continuous flow of events:</p>
<ul>
<li><strong>Events</strong> - Immutable facts about what happened</li>
<li><strong>Streams</strong> - Ordered sequences of events</li>
<li><strong>Producers</strong> - Systems that emit events</li>
<li><strong>Consumers</strong> - Systems that process events</li>
</ul>
<h4 id="event-stream-characteristics" class="position-relative d-flex align-items-center group">
<span>Event Stream Characteristics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-stream-characteristics"
aria-haspopup="dialog"
aria-label="Share link: Event Stream Characteristics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Key properties of event streams:</p>
<ul>
<li><strong>Ordered</strong> - Events maintain temporal ordering</li>
<li><strong>Immutable</strong> - Events cannot be changed once written</li>
<li><strong>Append-Only</strong> - New events added to stream end</li>
<li><strong>Retained</strong> - Events stored for configurable duration</li>
<li><strong>Replayable</strong> - Consumers can replay historical events</li>
</ul>
<h3 id="geode-event-streaming" class="position-relative d-flex align-items-center group">
<span>Geode Event Streaming</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="geode-event-streaming"
aria-haspopup="dialog"
aria-label="Share link: Geode Event Streaming">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="creating-event-streams" class="position-relative d-flex align-items-center group">
<span>Creating Event Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="creating-event-streams"
aria-haspopup="dialog"
aria-label="Share link: Creating Event Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Define streams to capture graph events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">stream</span><span class="w"> </span><span class="py">for</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">activities</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">user_activities</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_time</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">action_type</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">properties</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_actions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">stream</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">windowing</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">page_views</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">TUMBLING_WINDOW</span><span class="p">(</span><span class="py">5</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">page_url</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">view_count</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">page_view_events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">page_url</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="publishing-events" class="position-relative d-flex align-items-center group">
<span>Publishing Events</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="publishing-events"
aria-haspopup="dialog"
aria-label="Share link: Publishing Events">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Publish events to streams:</p>
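<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span>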
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_user_action</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">user_id</span><span class="p">,</span> <span class="n">action</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Publish user action event"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">event</span> <span class="o">=</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"event_time"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_id"</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"action_type"</span><span class="p">:</span> <span class="n">action</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"properties"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"page"</span><span class="p">:</span> <span class="s2">"/products"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"duration_ms"</span><span class="p">:</span> <span class="mi">1500</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s2">"user_activities"</span><span class="p">,</span> <span class="n">event</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="consuming-events" class="position-relative d-flex align-items-center group">
<span>Consuming Events</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="consuming-events"
aria-haspopup="dialog"
aria-label="Share link: Consuming Events">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Subscribe to stream events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">process_user_activities</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Process user activity stream"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="s2">"user_activities"</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"User </span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">]</span><span class="si">}</span><span class="s2"> performed </span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">'action_type'</span><span class="p">]</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># React to specific actions</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'action_type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'purchase'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_purchase</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'action_type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'signup'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_welcome_email</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">])</span>
</span></span></code></pre></div>
<h3 id="stream-processing-patterns" class="position-relative d-flex align-items-center group">
<span>Stream Processing Patterns</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stream-processing-patterns"
aria-haspopup="dialog"
aria-label="Share link: Stream Processing Patterns">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="filter-streams" class="position-relative d-flex align-items-center group">
<span>Filter Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="filter-streams"
aria-haspopup="dialog"
aria-label="Share link: Filter Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Filter events based on criteria:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Stream</span><span class="w"> </span><span class="py">of</span><span class="w"> </span><span class="py">high</span><span class="err">-</span><span class="py">value</span><span class="w"> </span><span class="py">purchases</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">high_value_purchases</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w"> </span><span class="err">*</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">amount</span><span class="w"> </span><span class="err">></span><span class="w"> </span><span class="py">1000</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="transform-streams" class="position-relative d-flex align-items-center group">
<span>Transform Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="transform-streams"
aria-haspopup="dialog"
aria-label="Share link: Transform Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Transform event data:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Enrich</span><span class="w"> </span><span class="py">events</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">data</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">enriched_purchases</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">purchase_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">amount</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">name</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">customer_name</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">tier</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">customer_tier</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span><span class="py">p</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">JOIN</span><span class="w"> </span><span class="py">users</span><span class="w"> </span><span class="py">u</span><span class="w"> </span><span class="py">ON</span><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="py">u</span><span class="err">.</span><span class="py">id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="aggregate-streams" class="position-relative d-flex align-items-center group">
<span>Aggregate Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="aggregate-streams"
aria-haspopup="dialog"
aria-label="Share link: Aggregate Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Compute aggregations over time:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Real</span><span class="err">-</span><span class="py">time</span><span class="w"> </span><span class="py">revenue</span><span class="w"> </span><span class="py">by</span><span class="w"> </span><span class="py">product</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">revenue_by_product</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">product_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SUM</span><span class="p">(</span><span class="py">amount</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">total_revenue</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">purchase_count</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">purchases</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">product_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WINDOW</span><span class="w"> </span><span class="py">TUMBLING</span><span class="w"> </span><span class="p">(</span><span class="py">SIZE</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">HOUR</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="join-streams" class="position-relative d-flex align-items-center group">
<span>Join Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="join-streams"
aria-haspopup="dialog"
aria-label="Share link: Join Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Combine multiple streams:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Join</span><span class="w"> </span><span class="py">page</span><span class="w"> </span><span class="py">views</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">purchases</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">conversion_events</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">user_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">page_url</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">purchase_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">amount</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="py">p</span><span class="err">.</span><span class="py">event_time</span><span class="w"> </span><span class="err">-</span><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">time_to_purchase</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">page_views</span><span class="w"> </span><span class="py">v</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">JOIN</span><span class="w"> </span><span class="py">purchases</span><span class="w"> </span><span class="py">p</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">ON</span><span class="w"> </span><span class="py">v</span><span class="err">.</span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="py">p</span><span class="err">.</span><span class="py">user_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WITHIN</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">HOUR</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h3 id="windowing-operations" class="position-relative d-flex align-items-center group">
<span>Windowing Operations</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="windowing-operations"
aria-haspopup="dialog"
aria-label="Share link: Windowing Operations">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="tumbling-windows" class="position-relative d-flex align-items-center group">
<span>Tumbling Windows</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="tumbling-windows"
aria-haspopup="dialog"
aria-label="Share link: Tumbling Windows">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Fixed-size, non-overlapping windows:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Count</span><span class="w"> </span><span class="py">events</span><span class="w"> </span><span class="py">per</span><span class="w"> </span><span class="py">5</span><span class="err">-</span><span class="py">minute</span><span class="w"> </span><span class="py">window</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">event_counts</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">TUMBLING_WINDOW</span><span class="p">(</span><span class="py">5</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_type</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">count</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">event_type</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><p>Tumbling windows:</p>
<ul>
<li>Fixed size (e.g., 5 minutes)</li>
<li>No overlap between windows</li>
<li>Each event belongs to exactly one window</li>
</ul>
<h4 id="hopping-windows" class="position-relative d-flex align-items-center group">
<span>Hopping Windows</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="hopping-windows"
aria-haspopup="dialog"
aria-label="Share link: Hopping Windows">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Fixed-size windows with overlap:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Hopping</span><span class="w"> </span><span class="py">10</span><span class="err">-</span><span class="py">minute</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w"> </span><span class="py">advancing</span><span class="w"> </span><span class="py">every</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">minute</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">sliding_averages</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">HOPPING_WINDOW</span><span class="p">(</span><span class="py">SIZE</span><span class="w"> </span><span class="py">10</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">,</span><span class="w"> </span><span class="py">ADVANCE</span><span class="w"> </span><span class="py">1</span><span class="w"> </span><span class="py">MINUTE</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">window</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">AVG</span><span class="p">(</span><span class="py">response_time_ms</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">avg_response_time</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">api_requests</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><p>Hopping windows:</p>
<ul>
<li>Fixed size with configurable advance</li>
<li>Windows overlap</li>
<li>Each event may appear in multiple windows</li>
</ul>
<h4 id="session-windows" class="position-relative d-flex align-items-center group">
<span>Session Windows</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="session-windows"
aria-haspopup="dialog"
aria-label="Share link: Session Windows">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Dynamic windows based on inactivity:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">User</span><span class="w"> </span><span class="py">sessions</span><span class="w"> </span><span class="p">(</span><span class="py">15</span><span class="err">-</span><span class="py">minute</span><span class="w"> </span><span class="py">timeout</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">user_sessions</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SESSION_WINDOW</span><span class="p">(</span><span class="py">15</span><span class="w"> </span><span class="py">MINUTES</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">session</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COUNT</span><span class="p">(</span><span class="err">*</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">actions_in_session</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COLLECT_LIST</span><span class="p">(</span><span class="py">action_type</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">actions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_activities</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">user_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><p>Session windows:</p>
<ul>
<li>Variable size based on activity</li>
<li>Gap threshold defines session boundary</li>
<li>Useful for user behavior analysis</li>
</ul>
<h3 id="stream-processing" class="position-relative d-flex align-items-center group">
<span>Stream Processing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stream-processing"
aria-haspopup="dialog"
aria-label="Share link: Stream Processing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="stateless-processing" class="position-relative d-flex align-items-center group">
<span>Stateless Processing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stateless-processing"
aria-haspopup="dialog"
aria-label="Share link: Stateless Processing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Process each event independently:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">stateless_processor</span><span class="p">(</span><span class="n">event</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Process event without state"""</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Validate event</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="ow">not</span> <span class="n">validate_event</span><span class="p">(</span><span class="n">event</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Transform event</span>
</span></span><span class="line"><span class="cl"> <span class="n">transformed</span> <span class="o">=</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"id"</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">'id'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"timestamp"</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">'timestamp'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"value"</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">'value'</span><span class="p">]</span> <span class="o">*</span> <span class="mf">1.5</span> <span class="c1"># Apply transformation</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish to output stream</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">publish</span><span class="p">(</span><span class="n">transformed</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="stateful-processing" class="position-relative d-flex align-items-center group">
<span>Stateful Processing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stateful-processing"
aria-haspopup="dialog"
aria-label="Share link: Stateful Processing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Maintain state across events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">StatefulProcessor</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="p">{}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Process event with state"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Retrieve user state</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">user_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">]</span> <span class="o">=</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"total_purchases"</span><span class="p">:</span> <span class="mi">0</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"total_spent"</span><span class="p">:</span> <span class="mi">0</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Update state</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">"total_purchases"</span><span class="p">]</span> <span class="o">+=</span> <span class="mi">1</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">"total_spent"</span><span class="p">]</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'amount'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Check for milestone</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="n">user_id</span><span class="p">][</span><span class="s2">"total_spent"</span><span class="p">]</span> <span class="o">></span> <span class="mi">10000</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">notify_vip_status</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="complex-event-processing-cep" class="position-relative d-flex align-items-center group">
<span>Complex Event Processing (CEP)</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="complex-event-processing-cep"
aria-haspopup="dialog"
aria-label="Share link: Complex Event Processing (CEP)">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Detect patterns across multiple events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Detect</span><span class="w"> </span><span class="py">suspicious</span><span class="w"> </span><span class="py">behavior</span><span class="w"> </span><span class="py">pattern</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">STREAM</span><span class="w"> </span><span class="py">suspicious_activity</span><span class="w"> </span><span class="py">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">user_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">COLLECT_LIST</span><span class="p">(</span><span class="py">action_type</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">pattern</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">user_activities</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">MATCH_RECOGNIZE</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">PARTITION</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">user_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">ORDER</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">event_time</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">MEASURES</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">FIRST</span><span class="p">(</span><span class="py">A</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">start_time</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">LAST</span><span class="p">(</span><span class="py">B</span><span class="err">.</span><span class="py">event_time</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">end_time</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">PATTERN</span><span class="w"> </span><span class="p">(</span><span class="py">A</span><span class="w"> </span><span class="py">B</span><span class="err">+</span><span class="w"> </span><span class="py">C</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">DEFINE</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">A</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">A</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">login_failure</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">B</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">B</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">login_failure</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">C</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">C</span><span class="err">.</span><span class="py">action_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">login_success</span><span class="err">'</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">AND</span><span class="w"> </span><span class="py">C</span><span class="err">.</span><span class="py">ip_address</span><span class="w"> </span><span class="p">!=</span><span class="w"> </span><span class="py">A</span><span class="err">.</span><span class="py">ip_address</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">EMIT</span><span class="w"> </span><span class="py">CHANGES</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h3 id="event-sourcing" class="position-relative d-flex align-items-center group">
<span>Event Sourcing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-sourcing"
aria-haspopup="dialog"
aria-label="Share link: Event Sourcing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="event-store" class="position-relative d-flex align-items-center group">
<span>Event Store</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-store"
aria-haspopup="dialog"
aria-label="Share link: Event Store">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Store all events as the source of truth:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Event</span><span class="err">-</span><span class="py">sourced</span><span class="w"> </span><span class="py">account</span><span class="w"> </span><span class="py">balance</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">EVENT_STORE</span><span class="w"> </span><span class="py">account_transactions</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">amount</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">balance</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">event_time</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="py">PARTITION</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">account_id</span><span class="err">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Publish</span><span class="w"> </span><span class="py">transaction</span><span class="w"> </span><span class="py">events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">INSERT</span><span class="w"> </span><span class="py">INTO</span><span class="w"> </span><span class="py">account_transactions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">VALUES</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="err">'</span><span class="py">acct123</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="err">'</span><span class="py">deposit</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">500</span><span class="mf">.00</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">1500</span><span class="mf">.00</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">NOW</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="event-replay" class="position-relative d-flex align-items-center group">
<span>Event Replay</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-replay"
aria-haspopup="dialog"
aria-label="Share link: Event Replay">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Rebuild state from events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">rebuild_account_state</span><span class="p">(</span><span class="n">account_id</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="s2">"""Rebuild account state from events"""</span>
</span></span><span class="line"><span class="cl">    <span class="n">balance</span> <span class="o">=</span> <span class="mi">0</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="c1"># Replay all events</span>
</span></span><span class="line"><span class="cl">    <span class="n">events</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
</span></span><span class="line"><span class="cl">        <span class="s2">"SELECT * FROM account_transactions WHERE account_id = $1 ORDER BY event_time"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl">        <span class="p">[</span><span class="n">account_id</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">    <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">events</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">        <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'event_type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'deposit'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">balance</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'amount'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">        <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'event_type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'withdrawal'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">balance</span> <span class="o">-=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'amount'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">return</span> <span class="n">balance</span>
</span></span></code></pre></div>
<h4 id="snapshots" class="position-relative d-flex align-items-center group">
<span>Snapshots</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="snapshots"
aria-haspopup="dialog"
aria-label="Share link: Snapshots">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Optimize replay with periodic snapshots:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">snapshot</span><span class="w"> </span><span class="py">table</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TABLE</span><span class="w"> </span><span class="py">account_snapshots</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="w"> </span><span class="py">STRING</span><span class="w"> </span><span class="py">PRIMARY</span><span class="w"> </span><span class="py">KEY</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">balance</span><span class="w"> </span><span class="py">DECIMAL</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">snapshot_time</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_event_id</span><span class="w"> </span><span class="py">STRING</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Periodically</span><span class="w"> </span><span class="py">save</span><span class="w"> </span><span class="py">snapshots</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">INSERT</span><span class="w"> </span><span class="py">INTO</span><span class="w"> </span><span class="py">account_snapshots</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">account_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">SUM</span><span class="p">(</span><span class="py">CASE</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WHEN</span><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">deposit</span><span class="err">'</span><span class="w"> </span><span class="py">THEN</span><span class="w"> </span><span class="py">amount</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">WHEN</span><span class="w"> </span><span class="py">event_type</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">withdrawal</span><span class="err">'</span><span class="w"> </span><span class="py">THEN</span><span class="w"> </span><span class="err">-</span><span class="py">amount</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">END</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">balance</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">NOW</span><span class="p">()</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">snapshot_time</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">MAX</span><span class="p">(</span><span class="py">event_id</span><span class="p">)</span><span class="w"> </span><span class="py">AS</span><span class="w"> </span><span class="py">last_event_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">account_transactions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">GROUP</span><span class="w"> </span><span class="py">BY</span><span class="w"> </span><span class="py">account_id</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h3 id="stream-integration" class="position-relative d-flex align-items-center group">
<span>Stream Integration</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stream-integration"
aria-haspopup="dialog"
aria-label="Share link: Stream Integration">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="kafka-integration" class="position-relative d-flex align-items-center group">
<span>Kafka Integration</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="kafka-integration"
aria-haspopup="dialog"
aria-label="Share link: Kafka Integration">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Integrate with Apache Kafka:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">kafka</span> <span class="kn">import</span> <span class="n">KafkaProducer</span><span class="p">,</span> <span class="n">KafkaConsumer</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Publish Geode events to Kafka</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_to_kafka</span><span class="p">(</span><span class="n">geode_event</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="n">producer</span> <span class="o">=</span> <span class="n">KafkaProducer</span><span class="p">(</span>
</span></span><span class="line"><span class="cl">        <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">'localhost:9092'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl">        <span class="n">value_serializer</span><span class="o">=</span><span class="k">lambda</span> <span class="n">v</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">v</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">    <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="n">producer</span><span class="o">.</span><span class="n">send</span><span class="p">(</span><span class="s1">'user-activities'</span><span class="p">,</span> <span class="n">geode_event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">    <span class="n">producer</span><span class="o">.</span><span class="n">flush</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Consume Kafka events to Geode</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_from_kafka</span><span class="p">():</span>
</span></span><span class="line"><span class="cl">    <span class="n">consumer</span> <span class="o">=</span> <span class="n">KafkaConsumer</span><span class="p">(</span>
</span></span><span class="line"><span class="cl">        <span class="s1">'user-activities'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl">        <span class="n">bootstrap_servers</span><span class="o">=</span><span class="p">[</span><span class="s1">'localhost:9092'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl">        <span class="n">value_deserializer</span><span class="o">=</span><span class="k">lambda</span> <span class="n">m</span><span class="p">:</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span>
</span></span><span class="line"><span class="cl">    <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">consumer</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">        <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s1">'user_activities'</span><span class="p">,</span> <span class="n">message</span><span class="o">.</span><span class="n">value</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="pulsar-integration" class="position-relative d-flex align-items-center group">
<span>Pulsar Integration</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="pulsar-integration"
aria-haspopup="dialog"
aria-label="Share link: Pulsar Integration">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Connect with Apache Pulsar:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">json</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">pulsar</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">pulsar_to_geode</span><span class="p">():</span>
</span></span><span class="line"><span class="cl">    <span class="n">client</span> <span class="o">=</span> <span class="n">pulsar</span><span class="o">.</span><span class="n">Client</span><span class="p">(</span><span class="s1">'pulsar://localhost:6650'</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">    <span class="n">consumer</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s1">'user-events'</span><span class="p">,</span> <span class="s1">'geode-subscription'</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">        <span class="n">msg</span> <span class="o">=</span> <span class="n">consumer</span><span class="o">.</span><span class="n">receive</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">        <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">event</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">msg</span><span class="o">.</span><span class="n">data</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">            <span class="k">await</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">publish_event</span><span class="p">(</span><span class="s1">'user_activities'</span><span class="p">,</span> <span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">            <span class="n">consumer</span><span class="o">.</span><span class="n">acknowledge</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">        <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">consumer</span><span class="o">.</span><span class="n">negative_acknowledge</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="performance-optimization" class="position-relative d-flex align-items-center group">
<span>Performance Optimization</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="performance-optimization"
aria-haspopup="dialog"
aria-label="Share link: Performance Optimization">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="batching" class="position-relative d-flex align-items-center group">
<span>Batching</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="batching"
aria-haspopup="dialog"
aria-label="Share link: Batching">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Improve throughput with batching:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_processor</span><span class="p">(</span><span class="n">stream_name</span><span class="p">,</span> <span class="n">batch_size</span><span class="o">=</span><span class="mi">100</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="s2">"""Process events in batches"""</span>
</span></span><span class="line"><span class="cl">    <span class="n">batch</span> <span class="o">=</span> <span class="p">[]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="n">stream_name</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">        <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">batch</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">            <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span> <span class="o">>=</span> <span class="n">batch_size</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">                <span class="k">await</span> <span class="n">process_batch</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">                <span class="n">batch</span> <span class="o">=</span> <span class="p">[]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">        <span class="c1"># Flush the final partial batch when the stream ends</span>
</span></span><span class="line"><span class="cl">        <span class="k">if</span> <span class="n">batch</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="k">await</span> <span class="n">process_batch</span><span class="p">(</span><span class="n">batch</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="parallel-processing" class="position-relative d-flex align-items-center group">
<span>Parallel Processing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="parallel-processing"
aria-haspopup="dialog"
aria-label="Share link: Parallel Processing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Scale with parallel consumers:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">parallel_consumer</span><span class="p">(</span><span class="n">stream_name</span><span class="p">,</span> <span class="n">num_workers</span><span class="o">=</span><span class="mi">4</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="s2">"""Process stream with multiple workers"""</span>
</span></span><span class="line"><span class="cl">    <span class="k">async</span> <span class="k">def</span> <span class="nf">worker</span><span class="p">(</span><span class="n">worker_id</span><span class="p">,</span> <span class="n">partition</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">        <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span>
</span></span><span class="line"><span class="cl">            <span class="n">stream_name</span><span class="p">,</span>
</span></span><span class="line"><span class="cl">            <span class="n">partition</span><span class="o">=</span><span class="n">partition</span>
</span></span><span class="line"><span class="cl">        <span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">                <span class="k">await</span> <span class="n">process_event</span><span class="p">(</span><span class="n">event</span><span class="p">,</span> <span class="n">worker_id</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="c1"># Start workers for each partition</span>
</span></span><span class="line"><span class="cl">    <span class="n">tasks</span> <span class="o">=</span> <span class="p">[</span>
</span></span><span class="line"><span class="cl">        <span class="n">worker</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">i</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">        <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">num_workers</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">    <span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">tasks</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="backpressure" class="position-relative d-flex align-items-center group">
<span>Backpressure</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="backpressure"
aria-haspopup="dialog"
aria-label="Share link: Backpressure">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Handle slow consumers:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">asyncio</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">backpressure_handler</span><span class="p">(</span><span class="n">stream_name</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="s2">"""Handle backpressure gracefully"""</span>
</span></span><span class="line"><span class="cl">    <span class="n">buffer</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">Queue</span><span class="p">(</span><span class="n">maxsize</span><span class="o">=</span><span class="mi">1000</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">async</span> <span class="k">def</span> <span class="nf">producer</span><span class="p">():</span>
</span></span><span class="line"><span class="cl">        <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe_stream</span><span class="p">(</span><span class="n">stream_name</span><span class="p">)</span> <span class="k">as</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">stream</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">                <span class="k">await</span> <span class="n">buffer</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">async</span> <span class="k">def</span> <span class="nf">consumer</span><span class="p">():</span>
</span></span><span class="line"><span class="cl">        <span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
</span></span><span class="line"><span class="cl">            <span class="n">event</span> <span class="o">=</span> <span class="k">await</span> <span class="n">buffer</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">            <span class="k">await</span> <span class="n">process_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">            <span class="n">buffer</span><span class="o">.</span><span class="n">task_done</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="n">producer</span><span class="p">(),</span> <span class="n">consumer</span><span class="p">())</span>
</span></span></code></pre></div>
<h3 id="monitoring-streams" class="position-relative d-flex align-items-center group">
<span>Monitoring Streams</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="monitoring-streams"
aria-haspopup="dialog"
aria-label="Share link: Monitoring Streams">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<!--
  Stream Metrics subsection (anchor: #stream-metrics).
  The heading embeds a share button whose data-share-target equals the heading id.
  The click handler is not part of this markup; presumably it opens the share dialog (confirm in the page script).
-->
<h4 id="stream-metrics" class="position-relative d-flex align-items-center group">
<span>Stream Metrics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="stream-metrics"
aria-haspopup="dialog"
aria-label="Share link: Stream Metrics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Track stream health:</p>
<!--
  Highlighted GQL sample: selects stream_name, events_per_second, bytes_per_second, consumer_lag
  and oldest_event_age from SYSTEM.streams, filtered to consumer_lag > 10000.
  Everything inside the pre element is whitespace-sensitive; do not reformat or re-indent it.
  NOTE(review): ".", ">" and ";" are emitted with class="err", i.e. the highlighter did not recognise
  them. Presumably the fence language in the page source does not match this query dialect; verify.
-->
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">stream_name</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">events_per_second</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">bytes_per_second</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consumer_lag</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">oldest_event_age</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">streams</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">consumer_lag</span><span class="w"> </span><span class="err">></span><span class="w"> </span><span class="py">10000</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<!--
  Consumer Monitoring subsection (anchor: #consumer-monitoring).
  The heading embeds a share button whose data-share-target equals the heading id.
  The click handler is not part of this markup; presumably it opens the share dialog (confirm in the page script).
-->
<h4 id="consumer-monitoring" class="position-relative d-flex align-items-center group">
<span>Consumer Monitoring</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="consumer-monitoring"
aria-haspopup="dialog"
aria-label="Share link: Consumer Monitoring">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Monitor consumer performance:</p>
<!--
  Highlighted GQL sample: selects consumer_id, stream_name, throughput, error_rate and last_heartbeat
  from SYSTEM.stream_consumers, filtered to error_rate > 0.01.
  Everything inside the pre element is whitespace-sensitive; do not reformat or re-indent it.
  NOTE(review): the literal 0.01 is split across two token spans ("0" and ".01"), and ".", ">" and ";"
  carry class="err". Presumably the highlighter lexer does not match this query dialect; verify.
-->
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consumer_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">stream_name</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">throughput</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">error_rate</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_heartbeat</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">stream_consumers</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">error_rate</span><span class="w"> </span><span class="err">></span><span class="w"> </span><span class="py">0</span><span class="mf">.01</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<!-- "Best Practices" section heading; the inline button carries the share hook for this anchor. -->
<h3 class="position-relative d-flex align-items-center group" id="best-practices">
  <span>Best Practices</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="best-practices"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Best Practices"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h3>
<!-- "Event Design" subsection: heading with share hook, followed by a five-item checklist. -->
<h4 class="position-relative d-flex align-items-center group" id="event-design">
  <span>Event Design</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="event-design"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Event Design"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h4>
<ul>
  <li>Use consistent event schemas</li>
  <li>Include sufficient context</li>
  <li>Version events for evolution</li>
  <li>Keep events immutable</li>
  <li>Use timestamps for ordering</li>
</ul>
<!-- "Stream Management" subsection: heading with share hook, followed by a five-item checklist. -->
<h4 class="position-relative d-flex align-items-center group" id="stream-management">
  <span>Stream Management</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="stream-management"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Stream Management"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h4>
<ul>
  <li>Choose appropriate retention</li>
  <li>Partition for scalability</li>
  <li>Monitor and alert on lag</li>
  <li>Implement backpressure handling</li>
  <li>Test failure scenarios</li>
</ul>
<!-- "Processing Guarantees" subsection: heading with share hook, followed by a five-item checklist. -->
<h4 class="position-relative d-flex align-items-center group" id="processing-guarantees">
  <span>Processing Guarantees</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="processing-guarantees"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Processing Guarantees"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h4>
<ul>
  <li>Implement idempotent processing</li>
  <li>Use checkpointing for progress</li>
  <li>Handle exactly-once semantics</li>
  <li>Design for eventual consistency</li>
  <li>Monitor processing lag</li>
</ul>
<!-- "Related Topics" section: heading with share hook, then root-relative links to sibling tag pages. -->
<h3 class="position-relative d-flex align-items-center group" id="related-topics">
  <span>Related Topics</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="related-topics"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Related Topics"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h3>
<ul>
  <li><a href="/tags/cdc">CDC</a> - Change data capture</li>
  <li><a href="/tags/pubsub">Pub/Sub</a> - Publish-subscribe messaging</li>
  <li><a href="/tags/events">Events</a> - Event-driven architecture</li>
  <li><a href="/tags/real-time">Real-Time</a> - Real-time processing</li>
</ul>
<!--
  "Learn More" section: heading with share hook, then external reading links.
  Each link opens in a new window (target="_blank" with rel="noopener noreferrer"); the aria-label
  repeats the visible title and announces the new window, and the arrow glyph is hidden from assistive tech.
-->
<h3 class="position-relative d-flex align-items-center group" id="learn-more">
  <span>Learn More</span>
  <button
    class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
    data-share-target="learn-more"
    type="button"
    aria-haspopup="dialog"
    aria-label="Share link: Learn More"
  >
    <i class="fa-sharp-duotone fa-solid fa-share-nodes" style="font-size: 0.8em;" aria-hidden="true"></i>
    <span class="visually-hidden">Share link</span>
  </button>
</h3>
<ul>
  <li>
    <a
      href="https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying"
      target="_blank"
      rel="noopener noreferrer"
      aria-label="The Log: What every software engineer should know – opens in new window"
    >The Log: What every software engineer should know
      <span class="external-icon" aria-hidden="true">↗</span>
    </a>
  </li>
  <li>
    <a
      href="https://www.confluent.io/blog/event-streaming-platform-1"
      target="_blank"
      rel="noopener noreferrer"
      aria-label="Event Streaming Patterns – opens in new window"
    >Event Streaming Patterns
      <span class="external-icon" aria-hidden="true">↗</span>
    </a>
  </li>
  <li>
    <a
      href="https://www.confluent.io/resources/kafka-the-definitive-guide/"
      target="_blank"
      rel="noopener noreferrer"
      aria-label="Kafka: The Definitive Guide – opens in new window"
    >Kafka: The Definitive Guide
      <span class="external-icon" aria-hidden="true">↗</span>
    </a>
  </li>
</ul>
<h2>Related Articles</h2>
<p>No articles found with this tag yet.</p>
<p><a href="/">Back to Home</a></p>