<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 --> <h2 id="publish-subscribe-messaging" class="position-relative d-flex align-items-center group"> <span>Publish-Subscribe Messaging</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="publish-subscribe-messaging" aria-haspopup="dialog" aria-label="Share link: Publish-Subscribe Messaging"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h2><div id="headingShareModal" class="heading-share-modal" role="dialog" aria-modal="true" aria-labelledby="headingShareTitle" hidden> <div class="hsm-dialog" role="document"> <div class="hsm-header"> <h2 id="headingShareTitle" class="h6 mb-0 fw-bold">Share this section</h2> <button type="button" class="hsm-close" aria-label="Close"> <i class="fa-solid fa-xmark"></i> </button> </div> <div class="hsm-body"> <label for="headingShareInput" class="form-label small text-muted mb-1 text-uppercase fw-bold" style="font-size: 0.7rem; letter-spacing: 0.5px;">Permalink</label> <div class="input-group mb-4 hsm-url-group"> <input id="headingShareInput" type="text" class="form-control font-monospace" readonly aria-readonly="true" style="font-size: 0.85rem;" /> <button class="btn btn-primary hsm-copy" type="button" aria-label="Copy" title="Copy"> <i class="fa-duotone fa-clipboard" aria-hidden="true"></i> </button> </div> <div class="small fw-bold mb-2 text-muted text-uppercase" style="font-size: 0.7rem; letter-spacing: 0.5px;">Share via</div> <div class="hsm-share-grid"> <a id="share-twitter" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-twitter me-2"></i>Twitter </a> <a id="share-linkedin" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-linkedin me-2"></i>LinkedIn </a> <a id="share-facebook" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer"> <i class="fa-brands fa-facebook me-2"></i>Facebook </a> </div> </div> </div> </div> <style> .heading-share-modal { position: fixed; inset: 0; display: flex; justify-content: center; align-items: center; background: rgba(0, 0, 0, 0.6); z-index: 1050; padding: 1rem; backdrop-filter: blur(4px); -webkit-backdrop-filter: blur(4px); } .heading-share-modal[hidden] { display: none !important; } .hsm-dialog { max-width: 420px; width: 100%; background: var(--bs-body-bg, #fff); color: var(--bs-body-color, #212529); border: 1px solid var(--bs-border-color, rgba(0,0,0,0.1)); border-radius: 1rem; box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25); overflow: hidden; animation: hsm-fade-in 0.2s ease-out; } @keyframes hsm-fade-in { from { opacity: 0; transform: scale(0.95); } to { opacity: 1; transform: scale(1); } } [data-bs-theme="dark"] .hsm-dialog { background: #1e293b; border-color: rgba(255,255,255,0.1); color: #f8f9fa; } .hsm-header { display: flex; justify-content: space-between; align-items: center; padding: 1rem 1.5rem; border-bottom: 1px solid var(--bs-border-color, rgba(0,0,0,0.1)); background: rgba(0,0,0,0.02); } [data-bs-theme="dark"] .hsm-header { background: rgba(255,255,255,0.02); border-color: rgba(255,255,255,0.1); } .hsm-close { background: transparent; border: none; color: inherit; opacity: 0.5; padding: 0.25rem 0.5rem; border-radius: 0.25rem; font-size: 1.2rem; line-height: 1; transition: opacity 0.2s; } .hsm-close:hover { opacity: 1; } .hsm-body { padding: 1.5rem; } .hsm-url-group { display: flex !important; align-items: stretch; } .hsm-url-group .form-control { flex: 1; min-width: 0; margin: 0; background: var(--bs-secondary-bg, #f8f9fa); border-color: var(--bs-border-color, #dee2e6); border-top-right-radius: 0; border-bottom-right-radius: 0; height: 42px; } .hsm-url-group .btn { flex: 0 0 auto; margin: 0; margin-left: -1px; border-top-left-radius: 0; border-bottom-left-radius: 0; height: 42px; display: flex; align-items: center; justify-content: center; padding: 0 1.25rem; z-index: 2; } [data-bs-theme="dark"] .hsm-url-group .form-control { background: #0f172a; border-color: #334155; color: #e2e8f0; } .hsm-share-grid { display: flex; flex-direction: column; gap: 0.5rem; } .hsm-share-grid .btn { display: flex; align-items: center; justify-content: center; font-size: 0.9rem; padding: 0.6rem; border-color: var(--bs-border-color); width: 100%; } [data-bs-theme="dark"] .hsm-share-grid .btn { color: #e2e8f0; border-color: #475569; } [data-bs-theme="dark"] .hsm-share-grid .btn:hover { background: #334155; border-color: #cbd5e1; } </style> <script> (function(){ const modal = document.getElementById('headingShareModal'); if(!modal) return; const input = modal.querySelector('#headingShareInput'); const copyBtn = modal.querySelector('.hsm-copy'); const twitter = modal.querySelector('#share-twitter'); const linkedin = modal.querySelector('#share-linkedin'); const facebook = modal.querySelector('#share-facebook'); const closeBtn = modal.querySelector('.hsm-close'); let lastFocus=null; let trapBound=false; function buildUrl(id){ return window.location.origin + window.location.pathname + '#' + id; } function isOpen(){ return !modal.hasAttribute('hidden'); } function hydrate(id){ const url=buildUrl(id); input.value=url; const enc=encodeURIComponent(url); const text=encodeURIComponent(document.title); if(twitter) twitter.href=`https://twitter.com/intent/tweet?url=${enc}&text=${text}`; if(linkedin) linkedin.href=`https://www.linkedin.com/sharing/share-offsite/?url=${enc}`; if(facebook) facebook.href=`https://www.facebook.com/sharer/sharer.php?u=${enc}`; } function openModal(id){ lastFocus=document.activeElement; hydrate(id); if(!isOpen()){ modal.removeAttribute('hidden'); } requestAnimationFrame(()=>{ input.focus(); }); trapFocus(); } function closeModal(){ if(!isOpen()) return; modal.setAttribute('hidden',''); if(lastFocus && typeof lastFocus.focus==='function') lastFocus.focus(); } function copyCurrent(){ try{ navigator.clipboard.writeText(input.value).then(()=>feedback(true),()=>fallback()); } catch(e){ fallback(); } } function fallback(){ input.select(); try{ document.execCommand('copy'); feedback(true);}catch(e){ feedback(false);} } function feedback(ok){ if(!copyBtn) return; const icon=copyBtn.querySelector('i'); if(!icon) return; const prev=copyBtn.getAttribute('data-prev')||icon.className; if(!copyBtn.getAttribute('data-prev')) copyBtn.setAttribute('data-prev',prev); icon.className= ok ? 'fa-duotone fa-clipboard-check':'fa-duotone fa-circle-exclamation'; setTimeout(()=>{ icon.className=prev; },1800); } function handleShareClick(e){ e.preventDefault(); const btn=e.currentTarget; const id=btn.getAttribute('data-share-target'); if(id) openModal(id); } function bindShareButtons(){ document.querySelectorAll('.h-share').forEach(btn=>{ if(!btn.dataset.hShareBound){ btn.addEventListener('click', handleShareClick); btn.dataset.hShareBound='1'; } }); } bindShareButtons(); if(document.readyState==='loading'){ document.addEventListener('DOMContentLoaded', bindShareButtons); } else { requestAnimationFrame(bindShareButtons); } document.addEventListener('click', function(e){ const shareBtn=e.target.closest && e.target.closest('.h-share'); if(shareBtn && !shareBtn.dataset.hShareBound){ handleShareClick.call(shareBtn, e); } }, true); document.addEventListener('click', e=>{ if(e.target===modal) closeModal(); if(e.target.closest && e.target.closest('.hsm-close')){ e.preventDefault(); closeModal(); } if(copyBtn && (e.target===copyBtn || (e.target.closest && e.target.closest('.hsm-copy')))) { e.preventDefault(); copyCurrent(); } }); document.addEventListener('keydown', e=>{ if(e.key==='Escape' && isOpen()) closeModal(); }); function trapFocus(){ if(trapBound) return; trapBound=true; modal.addEventListener('keydown', f=>{ if(f.key==='Tab' && isOpen()){ const focusable=[...modal.querySelectorAll('a[href],button,input,textarea,select,[tabindex]:not([tabindex="-1"])')].filter(el=>!el.hasAttribute('disabled')); if(!focusable.length) return; const first=focusable[0]; const last=focusable[focusable.length-1]; if(f.shiftKey && document.activeElement===first){ f.preventDefault(); last.focus(); } else if(!f.shiftKey && document.activeElement===last){ f.preventDefault(); first.focus(); } } }); } if(closeBtn) closeBtn.addEventListener('click', e=>{ e.preventDefault(); closeModal(); }); })(); </script><p>Publish-Subscribe (Pub/Sub) is a messaging pattern where publishers send messages to topics without knowing who will receive them, and subscribers receive messages from topics they&rsquo;re interested in without knowing who sent them. This decoupling enables scalable, flexible, event-driven architectures.</p> <h3 id="pubsub-fundamentals" class="position-relative d-flex align-items-center group"> <span>Pub/Sub Fundamentals</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="pubsub-fundamentals" aria-haspopup="dialog" aria-label="Share link: Pub/Sub Fundamentals"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="core-concepts" class="position-relative d-flex align-items-center group"> <span>Core Concepts</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="core-concepts" aria-haspopup="dialog" aria-label="Share link: Core Concepts"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p><strong>Publishers</strong> - Send messages to topics <strong>Subscribers</strong> - Receive messages from topics <strong>Topics</strong> - Named channels for message distribution <strong>Messages</strong> - Data packets being exchanged</p> <h4 id="benefits" class="position-relative d-flex align-items-center group"> <span>Benefits</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="benefits" aria-haspopup="dialog" aria-label="Share link: Benefits"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li><strong>Decoupling</strong> - Publishers and subscribers are independent</li> <li><strong>Scalability</strong> - Add subscribers without affecting publishers</li> <li><strong>Flexibility</strong> - Dynamic subscription management</li> <li><strong>Fan-Out</strong> - One message reaches multiple subscribers</li> </ul> <h3 id="geode-pubsub" class="position-relative d-flex align-items-center group"> <span>Geode Pub/Sub</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="geode-pubsub" aria-haspopup="dialog" aria-label="Share link: Geode Pub/Sub"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="creating-topics" class="position-relative d-flex align-items-center group"> <span>Creating Topics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="creating-topics" aria-haspopup="dialog" aria-label="Share link: Creating Topics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Define topics for message distribution:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">topic</span><span class="w"> </span><span class="py">for</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">user_events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">OPTIONS</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">retention_period</span><span class="p">:</span><span class="w"> </span><span class="err">&#39;</span><span class="nc">7</span><span class="w"> </span><span class="py">days</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">message_ttl</span><span class="p">:</span><span class="w"> </span><span class="err">&#39;</span><span class="nc">24</span><span class="w"> </span><span class="py">hours</span><span class="err">&#39;</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">max_subscribers</span><span class="p">:</span><span class="w"> </span><span class="nc">100</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">topic</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">message</span><span class="w"> </span><span class="kd">schema</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">order_notifications</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kc">SCHEMA</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">order_id</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">status</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">timestamp</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">details</span><span class="w"> </span><span class="py">MAP</span><span class="err">&lt;</span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span><span class="py">ANY</span><span class="err">&gt;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="publishing-messages" class="position-relative d-flex align-items-center group"> <span>Publishing Messages</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="publishing-messages" aria-haspopup="dialog" aria-label="Share link: Publishing Messages"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Publish to topics:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_user_event</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">user_id</span><span class="p">,</span> <span class="n">event_type</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Publish user event to topic&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">message</span> <span class="o">=</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;event_type&#34;</span><span class="p">:</span> <span class="n">event_type</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;timestamp&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;metadata&#34;</span><span class="p">:</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;source&#34;</span><span class="p">:</span> <span class="s2">&#34;user-service&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;version&#34;</span><span class="p">:</span> <span class="s2">&#34;1.0&#34;</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Published: </span><span class="si">{</span><span class="n">event_type</span><span class="si">}</span><span class="s2"> for user </span><span class="si">{</span><span class="n">user_id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="subscribing-to-topics" class="position-relative d-flex align-items-center group"> <span>Subscribing to Topics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="subscribing-to-topics" aria-haspopup="dialog" aria-label="Share link: Subscribing to Topics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Subscribe to receive messages:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">subscribe_to_user_events</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Subscribe to user event topic&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">subscription</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">subscription</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Process user event&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">event_type</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">&#34;event_type&#34;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">&#34;user_id&#34;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;signup&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_welcome_email</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;purchase&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">update_analytics</span><span class="p">(</span><span class="n">user_id</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;churn&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">trigger_retention_campaign</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="messaging-patterns" class="position-relative d-flex align-items-center group"> <span>Messaging Patterns</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="messaging-patterns" aria-haspopup="dialog" aria-label="Share link: Messaging Patterns"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="fan-out-pattern" class="position-relative d-flex align-items-center group"> <span>Fan-Out Pattern</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="fan-out-pattern" aria-haspopup="dialog" aria-label="Share link: Fan-Out Pattern"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>One message to many subscribers:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publisher</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">notify_all</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">notification</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Send notification to all subscribers&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;notifications&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;title&#34;</span><span class="p">:</span> <span class="n">notification</span><span class="o">.</span><span class="n">title</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;body&#34;</span><span class="p">:</span> <span class="n">notification</span><span class="o">.</span><span class="n">body</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;priority&#34;</span><span class="p">:</span> <span class="s2">&#34;high&#34;</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Multiple subscribers receive the same message</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">email_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;notifications&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_email</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">sms_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;notifications&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_sms</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">push_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;notifications&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_push_notification</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="topic-filtering" class="position-relative d-flex align-items-center group"> <span>Topic Filtering</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="topic-filtering" aria-haspopup="dialog" aria-label="Share link: Topic Filtering"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Subscribe with filters:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Subscribe</span><span class="w"> </span><span class="kd">on</span><span class="py">ly</span><span class="w"> </span><span class="py">to</span><span class="w"> </span><span class="py">high</span><span class="err">-</span><span class="py">priority</span><span class="w"> </span><span class="py">orders</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SUBSCRIBE</span><span class="w"> </span><span class="py">TO</span><span class="w"> </span><span class="py">order_notifications</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">priority</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">high</span><span class="err">&#39;;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Subscribe</span><span class="w"> </span><span class="py">to</span><span class="w"> </span><span class="py">specific</span><span class="w"> </span><span class="py">user</span><span class="err">&#39;</span><span class="py">s</span><span class="w"> </span><span class="py">events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SUBSCRIBE</span><span class="w"> </span><span class="py">TO</span><span class="w"> </span><span class="py">user_events</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">&#39;</span><span class="py">user123</span><span class="err">&#39;;</span><span class="w"> </span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">filtered_subscription</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Subscribe with filter&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;order_notifications&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="nb">filter</span><span class="o">=</span><span class="s2">&#34;priority = &#39;high&#39;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">subscription</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">subscription</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_urgent_order</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="request-reply-pattern" class="position-relative d-flex align-items-center group"> <span>Request-Reply Pattern</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="request-reply-pattern" aria-haspopup="dialog" aria-label="Share link: Request-Reply Pattern"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Implement RPC over pub/sub:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">uuid</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">RPCClient</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span> <span class="o">=</span> <span class="p">{}</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">&#34;replies_</span><span class="si">{</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()</span><span class="si">}</span><span class="s2">&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">method</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mf">5.0</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Make RPC call&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">request_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Set up reply handler</span> </span></span><span class="line"><span class="cl"> <span class="n">reply_future</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">Future</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">reply_future</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish request</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;rpc_requests&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">request_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;method&#34;</span><span class="p">:</span> <span class="n">method</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;params&#34;</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;reply_to&#34;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Wait for reply</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">wait_for</span><span class="p">(</span><span class="n">reply_future</span><span class="p">,</span> <span class="n">timeout</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">result</span> </span></span><span class="line"><span class="cl"> <span class="k">finally</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">listen_for_replies</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Listen for RPC replies&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">request_id</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">&#34;request_id&#34;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">request_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span><span class="o">.</span><span class="n">set_result</span><span class="p">(</span><span class="n">message</span><span class="p">[</span><span class="s2">&#34;result&#34;</span><span class="p">])</span> </span></span></code></pre></div> <h4 id="topic-hierarchies" class="position-relative d-flex align-items-center group"> <span>Topic Hierarchies</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="topic-hierarchies" aria-haspopup="dialog" aria-label="Share link: Topic Hierarchies"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Organize topics hierarchically:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-fallback" data-lang="fallback"><span class="line"><span class="cl">users/ </span></span><span class="line"><span class="cl"> users/created </span></span><span class="line"><span class="cl"> users/updated </span></span><span class="line"><span class="cl"> users/deleted </span></span><span class="line"><span class="cl"> users/verified </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl">orders/ </span></span><span class="line"><span class="cl"> orders/placed </span></span><span class="line"><span class="cl"> orders/shipped </span></span><span class="line"><span class="cl"> orders/delivered </span></span><span class="line"><span class="cl"> orders/canceled </span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Subscribe to all user events</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;users/*&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Subscribe to specific event type</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;users/created&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">handle_new_user</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="message-delivery-guarantees" class="position-relative d-flex align-items-center group"> <span>Message Delivery Guarantees</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-delivery-guarantees" aria-haspopup="dialog" aria-label="Share link: Message Delivery Guarantees"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="at-most-once" class="position-relative d-flex align-items-center group"> <span>At-Most-Once</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="at-most-once" aria-haspopup="dialog" aria-label="Share link: At-Most-Once"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Message delivered zero or one time:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">notifications</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">at_most_once</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><ul> <li>Fastest performance</li> <li>No retries</li> <li>Suitable for non-critical notifications</li> </ul> <h4 id="at-least-once" class="position-relative d-flex align-items-center group"> <span>At-Least-Once</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="at-least-once" aria-haspopup="dialog" aria-label="Share link: At-Least-Once"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Message delivered one or more times:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">orders</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">at_least_once</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><ul> <li>Messages may be duplicated</li> <li>Subscribers must be idempotent</li> <li>Suitable for most applications</li> </ul> <h4 id="exactly-once" class="position-relative d-flex align-items-center group"> <span>Exactly-Once</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="exactly-once" aria-haspopup="dialog" aria-label="Share link: Exactly-Once"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Message delivered exactly one time:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">transactions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">exactly_once</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><ul> <li>Highest latency</li> <li>No duplicates</li> <li>Suitable for financial transactions</li> </ul> <h3 id="advanced-features" class="position-relative d-flex align-items-center group"> <span>Advanced Features</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="advanced-features" aria-haspopup="dialog" aria-label="Share link: Advanced Features"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="message-priorities" class="position-relative d-flex align-items-center group"> <span>Message Priorities</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-priorities" aria-haspopup="dialog" aria-label="Share link: Message Priorities"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Prioritize message delivery:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with priority</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;tasks&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;task_id&#34;</span><span class="p">:</span> <span class="s2">&#34;task123&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;action&#34;</span><span class="p">:</span> <span class="s2">&#34;process_payment&#34;</span> </span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">priority</span><span class="o">=</span><span class="s2">&#34;high&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Subscribe to high-priority messages first</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;tasks&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">priority_order</span><span class="o">=</span><span class="kc">True</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_task</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="message-expiration" class="position-relative d-flex align-items-center group"> <span>Message Expiration</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-expiration" aria-haspopup="dialog" aria-label="Share link: Message Expiration"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Set message TTL:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with TTL</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;flash_sales&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;sale_id&#34;</span><span class="p">:</span> <span class="s2">&#34;sale123&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;discount&#34;</span><span class="p">:</span> <span class="mf">0.5</span> </span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">ttl_seconds</span><span class="o">=</span><span class="mi">3600</span><span class="p">)</span> <span class="c1"># Expire after 1 hour</span> </span></span></code></pre></div> <h4 id="dead-letter-queues" class="position-relative d-flex align-items-center group"> <span>Dead Letter Queues</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="dead-letter-queues" aria-haspopup="dialog" aria-label="Share link: Dead Letter Queues"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Handle failed messages:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">payment_processing</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">OPTIONS</span><span class="w"> </span><span class="p">(</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">max_retries</span><span class="p">:</span><span class="w"> </span><span class="nc">3</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">dead_letter_topic</span><span class="p">:</span><span class="w"> </span><span class="err">&#39;</span><span class="nc">payment_failures</span><span class="err">&#39;</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">process_failed_payments</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Handle messages that exceeded retry limit&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;payment_failures&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">failed_message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">manual_review</span><span class="p">(</span><span class="n">failed_message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">notify_support</span><span class="p">(</span><span class="n">failed_message</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="message-batching" class="position-relative d-flex align-items-center group"> <span>Message Batching</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-batching" aria-haspopup="dialog" aria-label="Share link: Message Batching"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Improve throughput with batching:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_publisher</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Publish messages in batches&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">messages</span> <span class="o">=</span> <span class="p">[</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="sa">f</span><span class="s2">&#34;user</span><span class="si">{</span><span class="n">i</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> <span class="s2">&#34;action&#34;</span><span class="p">:</span> <span class="s2">&#34;login&#34;</span><span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">1000</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Batch publish</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_batch</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">,</span> <span class="n">messages</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="monitoring-and-observability" class="position-relative d-flex align-items-center group"> <span>Monitoring and Observability</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="monitoring-and-observability" aria-haspopup="dialog" aria-label="Share link: Monitoring and Observability"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="topic-metrics" class="position-relative d-flex align-items-center group"> <span>Topic Metrics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="topic-metrics" aria-haspopup="dialog" aria-label="Share link: Topic Metrics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Monitor topic health:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">topic_name</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">messages_published</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">messages_consumed</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">pending_messages</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">subscriber_count</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">publish_rate</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consume_rate</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">topics</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">pending_messages</span><span class="w"> </span><span class="err">&gt;</span><span class="w"> </span><span class="py">10000</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h4 id="subscriber-lag" class="position-relative d-flex align-items-center group"> <span>Subscriber Lag</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="subscriber-lag" aria-haspopup="dialog" aria-label="Share link: Subscriber Lag"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Track consumer lag:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">subscriber_id</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">topic_name</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">lag_messages</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">lag_seconds</span><span class="p">,</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_message_time</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">subscribers</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">lag_messages</span><span class="w"> </span><span class="err">&gt;</span><span class="w"> </span><span class="py">1000</span><span class="err">;</span><span class="w"> </span></span></span></code></pre></div> <h3 id="best-practices" class="position-relative d-flex align-items-center group"> <span>Best Practices</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="best-practices" aria-haspopup="dialog" aria-label="Share link: Best Practices"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="message-design" class="position-relative d-flex align-items-center group"> <span>Message Design</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-design" aria-haspopup="dialog" aria-label="Share link: Message Design"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li>Keep messages small and focused</li> <li>Include timestamp and metadata</li> <li>Version message schemas</li> <li>Make messages self-contained</li> <li>Use consistent naming conventions</li> </ul> <h4 id="subscription-management" class="position-relative d-flex align-items-center group"> <span>Subscription Management</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="subscription-management" aria-haspopup="dialog" aria-label="Share link: Subscription Management"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><ul> <li>Clean up unused subscriptions</li> <li>Monitor subscriber lag</li> <li>Implement backpressure handling</li> <li>Use appropriate delivery guarantees</li> <li>Handle redelivery idempotently</li> </ul> <h4 id="error-handling" class="position-relative d-flex align-items-center group"> <span>Error Handling</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="error-handling" aria-haspopup="dialog" aria-label="Share link: Error Handling"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">robust_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Handle errors gracefully&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;events&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Error processing message: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_to_dlq</span><span class="p">(</span><span class="n">message</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">ConnectionError</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">5</span><span class="p">)</span> <span class="c1"># Reconnect delay</span> </span></span></code></pre></div> <h3 id="related-topics" class="position-relative d-flex align-items-center group"> <span>Related Topics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="related-topics" aria-haspopup="dialog" aria-label="Share link: Related Topics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ul> <li><a href="/tags/events" >Events</a> - Event-driven architecture</li> <li><a href="/tags/streaming" >Streaming</a> - Event streaming</li> <li><a href="/tags/cdc" >CDC</a> - Change data capture</li> <li><a href="/tags/websockets" >WebSockets</a> - Real-time communication</li> </ul> <h3 id="production-patterns" class="position-relative d-flex align-items-center group"> <span>Production Patterns</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="production-patterns" aria-haspopup="dialog" aria-label="Share link: Production Patterns"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="multi-region-pubsub" class="position-relative d-flex align-items-center group"> <span>Multi-Region Pub/Sub</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="multi-region-pubsub" aria-haspopup="dialog" aria-label="Share link: Multi-Region Pub/Sub"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Distribute messages across geographic regions:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-yaml" data-lang="yaml"><span class="line"><span class="cl"><span class="nt">topics</span><span class="p">:</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">global_notifications</span><span class="p">:</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">replicas</span><span class="p">:</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">us-east</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">primary</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">eu-west</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">secondary</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">ap-south</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">secondary</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">replication</span><span class="p">:</span><span class="w"> </span><span class="l">async</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">message_routing</span><span class="p">:</span><span class="w"> </span><span class="l">geo_nearest </span><span class="w"> </span><span class="c"># Route to nearest region</span><span class="w"> </span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publisher in US publishes to local topic</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;global_notifications&#34;</span><span class="p">,</span> <span class="n">message</span><span class="p">,</span> <span class="n">region</span><span class="o">=</span><span class="s2">&#34;us-east&#34;</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">pass</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Subscribers in EU/AP receive via replication</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;global_notifications&#34;</span><span class="p">,</span> <span class="n">region</span><span class="o">=</span><span class="s2">&#34;eu-west&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Receives messages from US with replication lag</span> </span></span><span class="line"><span class="cl"> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="message-acknowledgment-strategies" class="position-relative d-flex align-items-center group"> <span>Message Acknowledgment Strategies</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-acknowledgment-strategies" aria-haspopup="dialog" aria-label="Share link: Message Acknowledgment Strategies"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Control message delivery and acknowledgment:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">manual_acknowledgment</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Fine-grained control over message acknowledgment&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;orders&#34;</span><span class="p">,</span> <span class="n">auto_ack</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Process message</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_order</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">data</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Explicitly acknowledge</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">ack</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Processed and acked: </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="n">ProcessingError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Negative acknowledgment (requeue)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">nack</span><span class="p">(</span><span class="n">requeue</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Failed to process </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="n">FatalError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Dead letter (don&#39;t requeue)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">nack</span><span class="p">(</span><span class="n">requeue</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">critical</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Fatal error on </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="consumer-groups" class="position-relative d-flex align-items-center group"> <span>Consumer Groups</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="consumer-groups" aria-haspopup="dialog" aria-label="Share link: Consumer Groups"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Load balance message processing across consumers:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Consumer group for parallel processing</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consumer_group_worker</span><span class="p">(</span><span class="n">worker_id</span><span class="p">,</span> <span class="n">group_id</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Worker in consumer group&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;order_processing&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer_group</span><span class="o">=</span><span class="n">group_id</span><span class="p">,</span> <span class="c1"># Messages distributed across group</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer_id</span><span class="o">=</span><span class="n">worker_id</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;Worker </span><span class="si">{</span><span class="n">worker_id</span><span class="si">}</span><span class="s2"> processing </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_order</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">data</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">ack</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Start multiple workers (messages distributed among them)</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">main</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="n">workers</span> <span class="o">=</span> <span class="p">[</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer_group_worker</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;worker-</span><span class="si">{</span><span class="n">i</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> <span class="s2">&#34;order-processors&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">10</span><span class="p">)</span> <span class="c1"># 10 workers share the load</span> </span></span><span class="line"><span class="cl"> <span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">workers</span><span class="p">)</span> </span></span></code></pre></div> <h4 id="ordered-message-processing" class="position-relative d-flex align-items-center group"> <span>Ordered Message Processing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="ordered-message-processing" aria-haspopup="dialog" aria-label="Share link: Ordered Message Processing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Maintain message ordering when required:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with partition key (same key → same partition → ordered)</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="s2">&#34;user123&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;action&#34;</span><span class="p">:</span> <span class="s2">&#34;login&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;timestamp&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> </span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">partition_key</span><span class="o">=</span><span class="s2">&#34;user123&#34;</span><span class="p">)</span> <span class="c1"># All events for user123 ordered</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Subscribe with ordering guarantee</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_events&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">ordering</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">partition_key_filter</span><span class="o">=</span><span class="s2">&#34;user123&#34;</span> <span class="c1"># Process only this user&#39;s events in order</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Events for user123 arrive in published order</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_user_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="advanced-messaging-patterns" class="position-relative d-flex align-items-center group"> <span>Advanced Messaging Patterns</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="advanced-messaging-patterns" aria-haspopup="dialog" aria-label="Share link: Advanced Messaging Patterns"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="saga-pattern-with-pubsub" class="position-relative d-flex align-items-center group"> <span>Saga Pattern with Pub/Sub</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="saga-pattern-with-pubsub" aria-haspopup="dialog" aria-label="Share link: Saga Pattern with Pub/Sub"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Coordinate distributed transactions:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">OrderSaga</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Distributed transaction using pub/sub&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">start_order</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">order_data</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Initiate saga&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">saga_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish order created event</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;saga_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;saga_id&#34;</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;ORDER_CREATED&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;data&#34;</span><span class="p">:</span> <span class="n">order_data</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">saga_id</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">saga_orchestrator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Orchestrate saga steps&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;saga_events&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">saga_id</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;saga_id&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="n">event_type</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;ORDER_CREATED&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Step 1: Reserve inventory</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">reserve_inventory</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;INVENTORY_RESERVED&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Step 2: Process payment</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_payment</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;PAYMENT_PROCESSED&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Step 3: Ship order</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">ship_order</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">&#34;SHIPPING_CONFIRMED&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Saga complete</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">complete_saga</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s2">&#34;_FAILED&#34;</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Compensating actions</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">rollback_saga</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event_type</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">reserve_inventory</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">saga_id</span><span class="p">,</span> <span class="n">order</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Reserve inventory (Step 1)&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Business logic</span> </span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="k">await</span> <span class="n">inventory_service</span><span class="o">.</span><span class="n">reserve</span><span class="p">(</span><span class="n">order</span><span class="p">[</span><span class="s1">&#39;items&#39;</span><span class="p">])</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish success event</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;saga_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;saga_id&#34;</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;INVENTORY_RESERVED&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;data&#34;</span><span class="p">:</span> <span class="n">result</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish failure event</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;saga_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;saga_id&#34;</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;INVENTORY_RESERVATION_FAILED&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;error&#34;</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">rollback_saga</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">saga_id</span><span class="p">,</span> <span class="n">failed_step</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Compensating transactions&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_step</span> <span class="ow">in</span> <span class="p">[</span><span class="s2">&#34;PAYMENT_FAILED&#34;</span><span class="p">,</span> <span class="s2">&#34;SHIPPING_FAILED&#34;</span><span class="p">]:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Release inventory reservation</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">release_inventory</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_step</span> <span class="o">==</span> <span class="s2">&#34;SHIPPING_FAILED&#34;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Refund payment</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">refund_payment</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Mark saga as rolled back</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;saga_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;saga_id&#34;</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;SAGA_ROLLED_BACK&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;reason&#34;</span><span class="p">:</span> <span class="n">failed_step</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span></code></pre></div> <h4 id="event-sourcing" class="position-relative d-flex align-items-center group"> <span>Event Sourcing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="event-sourcing" aria-haspopup="dialog" aria-label="Share link: Event Sourcing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Store state as sequence of events:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">EventSourcedEntity</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Entity rebuilt from event stream&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">entity_id</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span> <span class="o">=</span> <span class="n">entity_id</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="p">{}</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">=</span> <span class="mi">0</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Rebuild state from events&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Subscribe to entity&#39;s event stream</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">&#34;entity_</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">from_beginning</span><span class="o">=</span><span class="kc">True</span> <span class="c1"># Replay all events</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">apply_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+=</span> <span class="mi">1</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">apply_event</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Apply event to state&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;ACCOUNT_CREATED&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;balance&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;initial_balance&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;created_at&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;timestamp&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;DEPOSIT&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;balance&#39;</span><span class="p">]</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;amount&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;WITHDRAWAL&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;balance&#39;</span><span class="p">]</span> <span class="o">-=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;amount&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">deposit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">amount</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Command: deposit money&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Validate</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">amount</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&#34;Amount must be positive&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish event</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;entity_</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;DEPOSIT&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;amount&#34;</span><span class="p">:</span> <span class="n">amount</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;timestamp&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">(),</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;version&#34;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+</span> <span class="mi">1</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Apply to local state</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">&#39;balance&#39;</span><span class="p">]</span> <span class="o">+=</span> <span class="n">amount</span> </span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+=</span> <span class="mi">1</span> </span></span></code></pre></div> <h4 id="cqrs-with-pubsub" class="position-relative d-flex align-items-center group"> <span>CQRS with Pub/Sub</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="cqrs-with-pubsub" aria-haspopup="dialog" aria-label="Share link: CQRS with Pub/Sub"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Separate read and write models:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Write side: publish events</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">create_user</span><span class="p">(</span><span class="n">user_data</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Command: create user (write model)&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Store in write database</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">&#34;&#34;&#34; </span></span></span><span class="line"><span class="cl"><span class="s2"> CREATE (u:User {id: $id, email: $email, created_at: datetime()}) </span></span></span><span class="line"><span class="cl"><span class="s2"> &#34;&#34;&#34;</span><span class="p">,</span> <span class="p">{</span><span class="s2">&#34;id&#34;</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span> <span class="s2">&#34;email&#34;</span><span class="p">:</span> <span class="n">user_data</span><span class="p">[</span><span class="s1">&#39;email&#39;</span><span class="p">]})</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">rollback</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="k">raise</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Publish event</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">pubsub_client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">,</span> <span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;type&#34;</span><span class="p">:</span> <span class="s2">&#34;USER_CREATED&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;data&#34;</span><span class="p">:</span> <span class="n">user_data</span> </span></span><span class="line"><span class="cl"> <span class="p">})</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">user_id</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Read side: consume events and update read model</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">user_projection_updater</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Update read-optimized view&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">pubsub_client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">&#34;user_events&#34;</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;USER_CREATED&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Update read database (denormalized for fast queries)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hset</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">&#34;user:</span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">mapping</span><span class="o">=</span><span class="p">{</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;email&#34;</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">][</span><span class="s1">&#39;email&#39;</span><span class="p">],</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;created_at&#34;</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">][</span><span class="s1">&#39;created_at&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="p">}</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;USER_UPDATED&#39;</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Update read model</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hset</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">&#34;user:</span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;user_id&#39;</span><span class="p">]</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">mapping</span><span class="o">=</span><span class="n">event</span><span class="p">[</span><span class="s1">&#39;data&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Read from read model (fast)</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">get_user</span><span class="p">(</span><span class="n">user_id</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Query: get user (read model)&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hgetall</span><span class="p">(</span><span class="sa">f</span><span class="s2">&#34;user:</span><span class="si">{</span><span class="n">user_id</span><span class="si">}</span><span class="s2">&#34;</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="performance-tuning" class="position-relative d-flex align-items-center group"> <span>Performance Tuning</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="performance-tuning" aria-haspopup="dialog" aria-label="Share link: Performance Tuning"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="batch-publishing" class="position-relative d-flex align-items-center group"> <span>Batch Publishing</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="batch-publishing" aria-haspopup="dialog" aria-label="Share link: Batch Publishing"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Improve throughput with batching:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_publish_example</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Publish messages in batches&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">messages</span> <span class="o">=</span> <span class="p">[</span> </span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">&#34;user_id&#34;</span><span class="p">:</span> <span class="n">i</span><span class="p">,</span> <span class="s2">&#34;action&#34;</span><span class="p">:</span> <span class="s2">&#34;click&#34;</span><span class="p">,</span> <span class="s2">&#34;timestamp&#34;</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()}</span> </span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">10000</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="p">]</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Batch publish (100x faster than individual publishes)</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_batch</span><span class="p">(</span><span class="s2">&#34;user_actions&#34;</span><span class="p">,</span> <span class="n">messages</span><span class="p">,</span> <span class="n">batch_size</span><span class="o">=</span><span class="mi">100</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Publishes 100 messages per network round-trip</span> </span></span></code></pre></div> <h4 id="message-compression" class="position-relative d-flex align-items-center group"> <span>Message Compression</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-compression" aria-haspopup="dialog" aria-label="Share link: Message Compression"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Reduce network bandwidth:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Enable compression for large messages</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;analytics_data&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">large_payload</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">compression</span><span class="o">=</span><span class="s2">&#34;lz4&#34;</span> <span class="c1"># Fast compression algorithm</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Compression reduces bandwidth by 70-90% for JSON payloads</span> </span></span></code></pre></div> <h4 id="topic-sharding" class="position-relative d-flex align-items-center group"> <span>Topic Sharding</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="topic-sharding" aria-haspopup="dialog" aria-label="Share link: Topic Sharding"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Scale topics horizontally:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-yaml" data-lang="yaml"><span class="line"><span class="cl"><span class="c"># Shard topic across multiple partitions</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="nt">topics</span><span class="p">:</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">high_volume_events</span><span class="p">:</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">partitions</span><span class="p">:</span><span class="w"> </span><span class="m">64</span><span class="w"> </span><span class="c"># 64 shards for parallel processing</span><span class="w"> </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">partition_key</span><span class="p">:</span><span class="w"> </span><span class="l">user_id </span><span class="w"> </span><span class="c"># Partition by user ID</span><span class="w"> </span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Messages automatically sharded by partition key</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;high_volume_events&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">message</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">partition_key</span><span class="o">=</span><span class="n">user_id</span> <span class="c1"># Determines which partition</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># Subscribers process partitions in parallel</span> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_partition</span><span class="p">(</span><span class="n">partition_id</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;high_volume_events&#34;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">partition</span><span class="o">=</span><span class="n">partition_id</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="c1"># 64 workers, each processing one partition</span> </span></span><span class="line"><span class="cl"><span class="n">workers</span> <span class="o">=</span> <span class="p">[</span><span class="n">consume_partition</span><span class="p">(</span><span class="n">i</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">64</span><span class="p">)]</span> </span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">workers</span><span class="p">)</span> </span></span></code></pre></div> <h3 id="monitoring-and-operations" class="position-relative d-flex align-items-center group"> <span>Monitoring and Operations</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="monitoring-and-operations" aria-haspopup="dialog" aria-label="Share link: Monitoring and Operations"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3> <h4 id="message-flow-metrics" class="position-relative d-flex align-items-center group"> <span>Message Flow Metrics</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="message-flow-metrics" aria-haspopup="dialog" aria-label="Share link: Message Flow Metrics"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Track message flow through system:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">prometheus_client</span> <span class="kn">import</span> <span class="n">Counter</span><span class="p">,</span> <span class="n">Histogram</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">messages_published</span> <span class="o">=</span> <span class="n">Counter</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;pubsub_messages_published_total&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;Total messages published&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">&#39;topic&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">messages_consumed</span> <span class="o">=</span> <span class="n">Counter</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;pubsub_messages_consumed_total&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;Total messages consumed&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">&#39;topic&#39;</span><span class="p">,</span> <span class="s1">&#39;consumer_group&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="n">message_latency</span> <span class="o">=</span> <span class="n">Histogram</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;pubsub_message_latency_seconds&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="s1">&#39;Message end-to-end latency&#39;</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">&#39;topic&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_with_metrics</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Publish with metrics tracking&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> <span class="n">message</span><span class="p">[</span><span class="s1">&#39;_published_at&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">start_time</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="n">messages_published</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span><span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">)</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_with_metrics</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">consumer_group</span><span class="p">):</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Consume with metrics tracking&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">consumer_group</span><span class="o">=</span><span class="n">consumer_group</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Track latency</span> </span></span><span class="line"><span class="cl"> <span class="n">latency</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">-</span> <span class="n">msg</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">&#39;_published_at&#39;</span><span class="p">]</span> </span></span><span class="line"><span class="cl"> <span class="n">message_latency</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span><span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">)</span><span class="o">.</span><span class="n">observe</span><span class="p">(</span><span class="n">latency</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Process</span> </span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="n">messages_consumed</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span> </span></span><span class="line"><span class="cl"> <span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">,</span> </span></span><span class="line"><span class="cl"> <span class="n">consumer_group</span><span class="o">=</span><span class="n">consumer_group</span> </span></span><span class="line"><span class="cl"> <span class="p">)</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span> </span></span></code></pre></div> <h4 id="health-checks" class="position-relative d-flex align-items-center group"> <span>Health Checks</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="health-checks" aria-haspopup="dialog" aria-label="Share link: Health Checks"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h4><p>Monitor pub/sub system health:</p> <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">health_check</span><span class="p">():</span> </span></span><span class="line"><span class="cl"> <span class="s2">&#34;&#34;&#34;Check pub/sub system health&#34;&#34;&#34;</span> </span></span><span class="line"><span class="cl"> <span class="c1"># Check topic lag</span> </span></span><span class="line"><span class="cl"> <span class="n">lag</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_consumer_lag</span><span class="p">(</span><span class="s2">&#34;critical_events&#34;</span><span class="p">,</span> <span class="s2">&#34;processors&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">lag</span> <span class="o">&gt;</span> <span class="mi">10000</span><span class="p">:</span> </span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">&#34;High consumer lag: </span><span class="si">{}</span><span class="s2">&#34;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">lag</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Check error rates</span> </span></span><span class="line"><span class="cl"> <span class="n">error_rate</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_error_rate</span><span class="p">(</span><span class="s2">&#34;critical_events&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">error_rate</span> <span class="o">&gt;</span> <span class="mf">0.01</span><span class="p">:</span> <span class="c1"># &gt;1% error rate</span> </span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">&#34;High error rate: </span><span class="si">{:.2%}</span><span class="s2">&#34;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">error_rate</span><span class="p">))</span> </span></span><span class="line"><span class="cl"> </span></span><span class="line"><span class="cl"> <span class="c1"># Check throughput</span> </span></span><span class="line"><span class="cl"> <span class="n">throughput</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_throughput</span><span class="p">(</span><span class="s2">&#34;critical_events&#34;</span><span class="p">)</span> </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">throughput</span> <span class="o">&lt;</span> <span class="mi">100</span><span class="p">:</span> <span class="c1"># &lt;100 msgs/sec</span> </span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">&#34;Low throughput: </span><span class="si">{}</span><span class="s2"> msgs/sec&#34;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">throughput</span><span class="p">))</span> </span></span></code></pre></div> <h3 id="learn-more" class="position-relative d-flex align-items-center group"> <span>Learn More</span> <button type="button" class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1" data-share-target="learn-more" aria-haspopup="dialog" aria-label="Share link: Learn More"> <i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i> <span class="visually-hidden">Share link</span> </button> </h3><ul> <li><a href="https://docs.microsoft.com/en-us/azure/architecture/patterns/publisher-subscriber" aria-label="Pub/Sub Patterns – opens in new window" target="_blank" rel="noopener noreferrer" >Pub/Sub Patterns <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://cloud.google.com/pubsub/docs" aria-label="Google Cloud Pub/Sub – opens in new window" target="_blank" rel="noopener noreferrer" >Google Cloud Pub/Sub <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://kafka.apache.org/" aria-label="Apache Kafka – opens in new window" target="_blank" rel="noopener noreferrer" >Apache Kafka <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://martinfowler.com/articles/201701-event-driven.html" aria-label="Event-Driven Architecture – opens in new window" target="_blank" rel="noopener noreferrer" >Event-Driven Architecture <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://microservices.io/patterns/data/saga.html" aria-label="Saga Pattern – opens in new window" target="_blank" rel="noopener noreferrer" >Saga Pattern <span aria-hidden="true" class="external-icon">↗</span> </a> </li> <li><a href="https://martinfowler.com/eaaDev/EventSourcing.html" aria-label="Event Sourcing – opens in new window" target="_blank" rel="noopener noreferrer" >Event Sourcing <span aria-hidden="true" class="external-icon">↗</span> </a> </li> </ul>

Related Articles

No articles found with this tag yet.

Back to Home