<!-- CANARY: REQ=REQ-DOCS-001; FEATURE="Docs"; ASPECT=Documentation; STATUS=TESTED; OWNER=docs; UPDATED=2026-01-15 -->
<h2 id="publish-subscribe-messaging" class="position-relative d-flex align-items-center group">
<span>Publish-Subscribe Messaging</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="publish-subscribe-messaging"
aria-haspopup="dialog"
aria-label="Share link: Publish-Subscribe Messaging">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h2><div id="headingShareModal" class="heading-share-modal" role="dialog" aria-modal="true" aria-labelledby="headingShareTitle" hidden>
<div class="hsm-dialog" role="document">
<div class="hsm-header">
<h2 id="headingShareTitle" class="h6 mb-0 fw-bold">Share this section</h2>
<button type="button" class="hsm-close" aria-label="Close">
<i class="fa-solid fa-xmark"></i>
</button>
</div>
<div class="hsm-body">
<label for="headingShareInput" class="form-label small text-muted mb-1 text-uppercase fw-bold" style="font-size: 0.7rem; letter-spacing: 0.5px;">Permalink</label>
<div class="input-group mb-4 hsm-url-group">
<input id="headingShareInput" type="text" class="form-control font-monospace" readonly aria-readonly="true" style="font-size: 0.85rem;" />
<button class="btn btn-primary hsm-copy" type="button" aria-label="Copy" title="Copy">
<i class="fa-duotone fa-clipboard" aria-hidden="true"></i>
</button>
</div>
<div class="small fw-bold mb-2 text-muted text-uppercase" style="font-size: 0.7rem; letter-spacing: 0.5px;">Share via</div>
<div class="hsm-share-grid">
<a id="share-twitter" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-twitter me-2"></i>Twitter
</a>
<a id="share-linkedin" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-linkedin me-2"></i>LinkedIn
</a>
<a id="share-facebook" class="btn btn-outline-secondary w-100" target="_blank" rel="noopener noreferrer">
<i class="fa-brands fa-facebook me-2"></i>Facebook
</a>
</div>
</div>
</div>
</div>
<style>
.heading-share-modal {
position: fixed;
inset: 0;
display: flex;
justify-content: center;
align-items: center;
background: rgba(0, 0, 0, 0.6);
z-index: 1050;
padding: 1rem;
backdrop-filter: blur(4px);
-webkit-backdrop-filter: blur(4px);
}
.heading-share-modal[hidden] { display: none !important; }
.hsm-dialog {
max-width: 420px;
width: 100%;
background: var(--bs-body-bg, #fff);
color: var(--bs-body-color, #212529);
border: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
border-radius: 1rem;
box-shadow: 0 25px 50px -12px rgba(0, 0, 0, 0.25);
overflow: hidden;
animation: hsm-fade-in 0.2s ease-out;
}
@keyframes hsm-fade-in {
from { opacity: 0; transform: scale(0.95); }
to { opacity: 1; transform: scale(1); }
}
[data-bs-theme="dark"] .hsm-dialog {
background: #1e293b;
border-color: rgba(255,255,255,0.1);
color: #f8f9fa;
}
.hsm-header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 1rem 1.5rem;
border-bottom: 1px solid var(--bs-border-color, rgba(0,0,0,0.1));
background: rgba(0,0,0,0.02);
}
[data-bs-theme="dark"] .hsm-header {
background: rgba(255,255,255,0.02);
border-color: rgba(255,255,255,0.1);
}
.hsm-close {
background: transparent;
border: none;
color: inherit;
opacity: 0.5;
padding: 0.25rem 0.5rem;
border-radius: 0.25rem;
font-size: 1.2rem;
line-height: 1;
transition: opacity 0.2s;
}
.hsm-close:hover {
opacity: 1;
}
.hsm-body {
padding: 1.5rem;
}
.hsm-url-group {
display: flex !important;
align-items: stretch;
}
.hsm-url-group .form-control {
flex: 1;
min-width: 0;
margin: 0;
background: var(--bs-secondary-bg, #f8f9fa);
border-color: var(--bs-border-color, #dee2e6);
border-top-right-radius: 0;
border-bottom-right-radius: 0;
height: 42px;
}
.hsm-url-group .btn {
flex: 0 0 auto;
margin: 0;
margin-left: -1px;
border-top-left-radius: 0;
border-bottom-left-radius: 0;
height: 42px;
display: flex;
align-items: center;
justify-content: center;
padding: 0 1.25rem;
z-index: 2;
}
[data-bs-theme="dark"] .hsm-url-group .form-control {
background: #0f172a;
border-color: #334155;
color: #e2e8f0;
}
.hsm-share-grid {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.hsm-share-grid .btn {
display: flex;
align-items: center;
justify-content: center;
font-size: 0.9rem;
padding: 0.6rem;
border-color: var(--bs-border-color);
width: 100%;
}
[data-bs-theme="dark"] .hsm-share-grid .btn {
color: #e2e8f0;
border-color: #475569;
}
[data-bs-theme="dark"] .hsm-share-grid .btn:hover {
background: #334155;
border-color: #cbd5e1;
}
</style>
<script>
(function(){
const modal = document.getElementById('headingShareModal');
if(!modal) return;
const input = modal.querySelector('#headingShareInput');
const copyBtn = modal.querySelector('.hsm-copy');
const twitter = modal.querySelector('#share-twitter');
const linkedin = modal.querySelector('#share-linkedin');
const facebook = modal.querySelector('#share-facebook');
const closeBtn = modal.querySelector('.hsm-close');
let lastFocus=null;
let trapBound=false;
function buildUrl(id){ return window.location.origin + window.location.pathname + '#' + id; }
function isOpen(){ return !modal.hasAttribute('hidden'); }
function hydrate(id){
const url=buildUrl(id);
input.value=url;
const enc=encodeURIComponent(url);
const text=encodeURIComponent(document.title);
if(twitter) twitter.href=`https://twitter.com/intent/tweet?url=${enc}&text=${text}`;
if(linkedin) linkedin.href=`https://www.linkedin.com/sharing/share-offsite/?url=${enc}`;
if(facebook) facebook.href=`https://www.facebook.com/sharer/sharer.php?u=${enc}`;
}
function openModal(id){
lastFocus=document.activeElement;
hydrate(id);
if(!isOpen()){
modal.removeAttribute('hidden');
}
requestAnimationFrame(()=>{ input.focus(); });
trapFocus();
}
function closeModal(){
if(!isOpen()) return;
modal.setAttribute('hidden','');
if(lastFocus && typeof lastFocus.focus==='function') lastFocus.focus();
}
function copyCurrent(){
try{ navigator.clipboard.writeText(input.value).then(()=>feedback(true),()=>fallback()); }
catch(e){ fallback(); }
}
function fallback(){ input.select(); try{ document.execCommand('copy'); feedback(true);}catch(e){ feedback(false);} }
function feedback(ok){ if(!copyBtn) return; const icon=copyBtn.querySelector('i'); if(!icon) return; const prev=copyBtn.getAttribute('data-prev')||icon.className; if(!copyBtn.getAttribute('data-prev')) copyBtn.setAttribute('data-prev',prev); icon.className= ok ? 'fa-duotone fa-clipboard-check':'fa-duotone fa-circle-exclamation'; setTimeout(()=>{ icon.className=prev; },1800); }
function handleShareClick(e){ e.preventDefault(); const btn=e.currentTarget; const id=btn.getAttribute('data-share-target'); if(id) openModal(id); }
function bindShareButtons(){
document.querySelectorAll('.h-share').forEach(btn=>{
if(!btn.dataset.hShareBound){ btn.addEventListener('click', handleShareClick); btn.dataset.hShareBound='1'; }
});
}
bindShareButtons();
if(document.readyState==='loading'){
document.addEventListener('DOMContentLoaded', bindShareButtons);
} else {
requestAnimationFrame(bindShareButtons);
}
document.addEventListener('click', function(e){
const shareBtn=e.target.closest && e.target.closest('.h-share');
if(shareBtn && !shareBtn.dataset.hShareBound){ handleShareClick.call(shareBtn, e); }
}, true);
document.addEventListener('click', e=>{
if(e.target===modal) closeModal();
if(e.target.closest && e.target.closest('.hsm-close')){ e.preventDefault(); closeModal(); }
if(copyBtn && (e.target===copyBtn || (e.target.closest && e.target.closest('.hsm-copy')))) { e.preventDefault(); copyCurrent(); }
});
document.addEventListener('keydown', e=>{ if(e.key==='Escape' && isOpen()) closeModal(); });
function trapFocus(){
if(trapBound) return;
trapBound=true;
modal.addEventListener('keydown', f=>{ if(f.key==='Tab' && isOpen()){ const focusable=[...modal.querySelectorAll('a[href],button,input,textarea,select,[tabindex]:not([tabindex="-1"])')].filter(el=>!el.hasAttribute('disabled')); if(!focusable.length) return; const first=focusable[0]; const last=focusable[focusable.length-1]; if(f.shiftKey && document.activeElement===first){ f.preventDefault(); last.focus(); } else if(!f.shiftKey && document.activeElement===last){ f.preventDefault(); first.focus(); } } });
}
if(closeBtn) closeBtn.addEventListener('click', e=>{ e.preventDefault(); closeModal(); });
})();
</script><p>Publish-Subscribe (Pub/Sub) is a messaging pattern where publishers send messages to topics without knowing who will receive them, and subscribers receive messages from topics they’re interested in without knowing who sent them. This decoupling enables scalable, flexible, event-driven architectures.</p>
<h3 id="pubsub-fundamentals" class="position-relative d-flex align-items-center group">
<span>Pub/Sub Fundamentals</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="pubsub-fundamentals"
aria-haspopup="dialog"
aria-label="Share link: Pub/Sub Fundamentals">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="core-concepts" class="position-relative d-flex align-items-center group">
<span>Core Concepts</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="core-concepts"
aria-haspopup="dialog"
aria-label="Share link: Core Concepts">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p><strong>Publishers</strong> - Send messages to topics
<strong>Subscribers</strong> - Receive messages from topics
<strong>Topics</strong> - Named channels for message distribution
<strong>Messages</strong> - Data packets being exchanged</p>
<h4 id="benefits" class="position-relative d-flex align-items-center group">
<span>Benefits</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="benefits"
aria-haspopup="dialog"
aria-label="Share link: Benefits">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><ul>
<li><strong>Decoupling</strong> - Publishers and subscribers are independent</li>
<li><strong>Scalability</strong> - Add subscribers without affecting publishers</li>
<li><strong>Flexibility</strong> - Dynamic subscription management</li>
<li><strong>Fan-Out</strong> - One message reaches multiple subscribers</li>
</ul>
<h3 id="geode-pubsub" class="position-relative d-flex align-items-center group">
<span>Geode Pub/Sub</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="geode-pubsub"
aria-haspopup="dialog"
aria-label="Share link: Geode Pub/Sub">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="creating-topics" class="position-relative d-flex align-items-center group">
<span>Creating Topics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="creating-topics"
aria-haspopup="dialog"
aria-label="Share link: Creating Topics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Define topics for message distribution:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">topic</span><span class="w"> </span><span class="py">for</span><span class="w"> </span><span class="py">user</span><span class="w"> </span><span class="py">events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">user_events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">OPTIONS</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">retention_period</span><span class="p">:</span><span class="w"> </span><span class="err">'</span><span class="nc">7</span><span class="w"> </span><span class="py">days</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">message_ttl</span><span class="p">:</span><span class="w"> </span><span class="err">'</span><span class="nc">24</span><span class="w"> </span><span class="py">hours</span><span class="err">'</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">max_subscribers</span><span class="p">:</span><span class="w"> </span><span class="nc">100</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Create</span><span class="w"> </span><span class="py">topic</span><span class="w"> </span><span class="py">with</span><span class="w"> </span><span class="py">message</span><span class="w"> </span><span class="kd">schema</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">order_notifications</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kc">SCHEMA</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">order_id</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">status</span><span class="w"> </span><span class="py">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">timestamp</span><span class="w"> </span><span class="py">TIMESTAMP</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">details</span><span class="w"> </span><span class="py">MAP</span><span class="err"><</span><span class="py">STRING</span><span class="p">,</span><span class="w"> </span><span class="py">ANY</span><span class="err">></span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="publishing-messages" class="position-relative d-flex align-items-center group">
<span>Publishing Messages</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="publishing-messages"
aria-haspopup="dialog"
aria-label="Share link: Publishing Messages">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Publish to topics:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">geode_client</span> <span class="kn">import</span> <span class="n">Client</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_user_event</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">user_id</span><span class="p">,</span> <span class="n">event_type</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Publish user event to topic"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">message</span> <span class="o">=</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_id"</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"event_type"</span><span class="p">:</span> <span class="n">event_type</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"timestamp"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">(),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"metadata"</span><span class="p">:</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"source"</span><span class="p">:</span> <span class="s2">"user-service"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"version"</span><span class="p">:</span> <span class="s2">"1.0"</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="nb">print</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Published: </span><span class="si">{</span><span class="n">event_type</span><span class="si">}</span><span class="s2"> for user </span><span class="si">{</span><span class="n">user_id</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="subscribing-to-topics" class="position-relative d-flex align-items-center group">
<span>Subscribing to Topics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="subscribing-to-topics"
aria-haspopup="dialog"
aria-label="Share link: Subscribing to Topics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Subscribe to receive messages:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">subscribe_to_user_events</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Subscribe to user event topic"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">)</span> <span class="k">as</span> <span class="n">subscription</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">subscription</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Process user event"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">event_type</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">"event_type"</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">"user_id"</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"signup"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_welcome_email</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"purchase"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">update_analytics</span><span class="p">(</span><span class="n">user_id</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"churn"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">trigger_retention_campaign</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="messaging-patterns" class="position-relative d-flex align-items-center group">
<span>Messaging Patterns</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="messaging-patterns"
aria-haspopup="dialog"
aria-label="Share link: Messaging Patterns">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="fan-out-pattern" class="position-relative d-flex align-items-center group">
<span>Fan-Out Pattern</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="fan-out-pattern"
aria-haspopup="dialog"
aria-label="Share link: Fan-Out Pattern">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>One message to many subscribers:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publisher</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">notify_all</span><span class="p">(</span><span class="n">client</span><span class="p">,</span> <span class="n">notification</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Send notification to all subscribers"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"notifications"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"title"</span><span class="p">:</span> <span class="n">notification</span><span class="o">.</span><span class="n">title</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"body"</span><span class="p">:</span> <span class="n">notification</span><span class="o">.</span><span class="n">body</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"priority"</span><span class="p">:</span> <span class="s2">"high"</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Multiple subscribers receive the same message</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">email_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"notifications"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_email</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">sms_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"notifications"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_sms</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">push_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"notifications"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_push_notification</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="topic-filtering" class="position-relative d-flex align-items-center group">
<span>Topic Filtering</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="topic-filtering"
aria-haspopup="dialog"
aria-label="Share link: Topic Filtering">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Subscribe with filters:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="err">--</span><span class="w"> </span><span class="py">Subscribe</span><span class="w"> </span><span class="kd">on</span><span class="py">ly</span><span class="w"> </span><span class="py">to</span><span class="w"> </span><span class="py">high</span><span class="err">-</span><span class="py">priority</span><span class="w"> </span><span class="py">orders</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SUBSCRIBE</span><span class="w"> </span><span class="py">TO</span><span class="w"> </span><span class="py">order_notifications</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">priority</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">high</span><span class="err">';</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="err">--</span><span class="w"> </span><span class="py">Subscribe</span><span class="w"> </span><span class="py">to</span><span class="w"> </span><span class="py">specific</span><span class="w"> </span><span class="py">user</span><span class="err">'</span><span class="py">s</span><span class="w"> </span><span class="py">events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">SUBSCRIBE</span><span class="w"> </span><span class="py">TO</span><span class="w"> </span><span class="py">user_events</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">user_id</span><span class="w"> </span><span class="p">=</span><span class="w"> </span><span class="err">'</span><span class="py">user123</span><span class="err">';</span><span class="w">
</span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">filtered_subscription</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Subscribe with filter"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"order_notifications"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nb">filter</span><span class="o">=</span><span class="s2">"priority = 'high'"</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">subscription</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">subscription</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">handle_urgent_order</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="request-reply-pattern" class="position-relative d-flex align-items-center group">
<span>Request-Reply Pattern</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="request-reply-pattern"
aria-haspopup="dialog"
aria-label="Share link: Request-Reply Pattern">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Implement RPC over pub/sub:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">uuid</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">RPCClient</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span> <span class="o">=</span> <span class="p">{}</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span> <span class="o">=</span> <span class="sa">f</span><span class="s2">"replies_</span><span class="si">{</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">()</span><span class="si">}</span><span class="s2">"</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">call</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">method</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="n">timeout</span><span class="o">=</span><span class="mf">5.0</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Make RPC call"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">request_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Set up reply handler</span>
</span></span><span class="line"><span class="cl"> <span class="n">reply_future</span> <span class="o">=</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">Future</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">reply_future</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish request</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"rpc_requests"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"id"</span><span class="p">:</span> <span class="n">request_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"method"</span><span class="p">:</span> <span class="n">method</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"params"</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"reply_to"</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Wait for reply</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">wait_for</span><span class="p">(</span><span class="n">reply_future</span><span class="p">,</span> <span class="n">timeout</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">result</span>
</span></span><span class="line"><span class="cl"> <span class="k">finally</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">listen_for_replies</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Listen for RPC replies"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">reply_topic</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">request_id</span> <span class="o">=</span> <span class="n">message</span><span class="p">[</span><span class="s2">"request_id"</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">request_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">pending_requests</span><span class="p">[</span><span class="n">request_id</span><span class="p">]</span><span class="o">.</span><span class="n">set_result</span><span class="p">(</span><span class="n">message</span><span class="p">[</span><span class="s2">"result"</span><span class="p">])</span>
</span></span></code></pre></div>
<h4 id="topic-hierarchies" class="position-relative d-flex align-items-center group">
<span>Topic Hierarchies</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="topic-hierarchies"
aria-haspopup="dialog"
aria-label="Share link: Topic Hierarchies">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Organize topics hierarchically:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-fallback" data-lang="fallback"><span class="line"><span class="cl">users/
</span></span><span class="line"><span class="cl"> users/created
</span></span><span class="line"><span class="cl"> users/updated
</span></span><span class="line"><span class="cl"> users/deleted
</span></span><span class="line"><span class="cl"> users/verified
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">orders/
</span></span><span class="line"><span class="cl"> orders/placed
</span></span><span class="line"><span class="cl"> orders/shipped
</span></span><span class="line"><span class="cl"> orders/delivered
</span></span><span class="line"><span class="cl"> orders/canceled
</span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Subscribe to all user events</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"users/*"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">handle_user_event</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Subscribe to specific event type</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"users/created"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">handle_new_user</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="message-delivery-guarantees" class="position-relative d-flex align-items-center group">
<span>Message Delivery Guarantees</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-delivery-guarantees"
aria-haspopup="dialog"
aria-label="Share link: Message Delivery Guarantees">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="at-most-once" class="position-relative d-flex align-items-center group">
<span>At-Most-Once</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="at-most-once"
aria-haspopup="dialog"
aria-label="Share link: At-Most-Once">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Message delivered zero or one time:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">notifications</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">at_most_once</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>Fastest performance</li>
<li>No retries</li>
<li>Suitable for non-critical notifications</li>
</ul>
<h4 id="at-least-once" class="position-relative d-flex align-items-center group">
<span>At-Least-Once</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="at-least-once"
aria-haspopup="dialog"
aria-label="Share link: At-Least-Once">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Message delivered one or more times:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">orders</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">at_least_once</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>Messages may be duplicated</li>
<li>Subscribers must be idempotent</li>
<li>Suitable for most applications</li>
</ul>
<h4 id="exactly-once" class="position-relative d-flex align-items-center group">
<span>Exactly-Once</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="exactly-once"
aria-haspopup="dialog"
aria-label="Share link: Exactly-Once">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Message delivered exactly one time:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">transactions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">DELIVERY</span><span class="w"> </span><span class="py">exactly_once</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>Highest latency</li>
<li>No duplicates</li>
<li>Suitable for financial transactions</li>
</ul>
<h3 id="advanced-features" class="position-relative d-flex align-items-center group">
<span>Advanced Features</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="advanced-features"
aria-haspopup="dialog"
aria-label="Share link: Advanced Features">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="message-priorities" class="position-relative d-flex align-items-center group">
<span>Message Priorities</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-priorities"
aria-haspopup="dialog"
aria-label="Share link: Message Priorities">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Prioritize message delivery:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with priority</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"tasks"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"task_id"</span><span class="p">:</span> <span class="s2">"task123"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"action"</span><span class="p">:</span> <span class="s2">"process_payment"</span>
</span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">priority</span><span class="o">=</span><span class="s2">"high"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Subscribe to high-priority messages first</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"tasks"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">priority_order</span><span class="o">=</span><span class="kc">True</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_task</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="message-expiration" class="position-relative d-flex align-items-center group">
<span>Message Expiration</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-expiration"
aria-haspopup="dialog"
aria-label="Share link: Message Expiration">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Set message TTL:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with TTL</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"flash_sales"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"sale_id"</span><span class="p">:</span> <span class="s2">"sale123"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"discount"</span><span class="p">:</span> <span class="mf">0.5</span>
</span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">ttl_seconds</span><span class="o">=</span><span class="mi">3600</span><span class="p">)</span> <span class="c1"># Expire after 1 hour</span>
</span></span></code></pre></div>
<h4 id="dead-letter-queues" class="position-relative d-flex align-items-center group">
<span>Dead Letter Queues</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="dead-letter-queues"
aria-haspopup="dialog"
aria-label="Share link: Dead Letter Queues">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Handle failed messages:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">CREATE</span><span class="w"> </span><span class="py">TOPIC</span><span class="w"> </span><span class="py">payment_processing</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WITH</span><span class="w"> </span><span class="py">OPTIONS</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">max_retries</span><span class="p">:</span><span class="w"> </span><span class="nc">3</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">dead_letter_topic</span><span class="p">:</span><span class="w"> </span><span class="err">'</span><span class="nc">payment_failures</span><span class="err">'</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">process_failed_payments</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Handle messages that exceeded retry limit"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"payment_failures"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">failed_message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">manual_review</span><span class="p">(</span><span class="n">failed_message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">notify_support</span><span class="p">(</span><span class="n">failed_message</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="message-batching" class="position-relative d-flex align-items-center group">
<span>Message Batching</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-batching"
aria-haspopup="dialog"
aria-label="Share link: Message Batching">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Improve throughput with batching:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_publisher</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Publish messages in batches"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">messages</span> <span class="o">=</span> <span class="p">[</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"user_id"</span><span class="p">:</span> <span class="sa">f</span><span class="s2">"user</span><span class="si">{</span><span class="n">i</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span> <span class="s2">"action"</span><span class="p">:</span> <span class="s2">"login"</span><span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">1000</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Batch publish</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_batch</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">,</span> <span class="n">messages</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="monitoring-and-observability" class="position-relative d-flex align-items-center group">
<span>Monitoring and Observability</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="monitoring-and-observability"
aria-haspopup="dialog"
aria-label="Share link: Monitoring and Observability">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="topic-metrics" class="position-relative d-flex align-items-center group">
<span>Topic Metrics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="topic-metrics"
aria-haspopup="dialog"
aria-label="Share link: Topic Metrics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Monitor topic health:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">topic_name</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">messages_published</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">messages_consumed</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">pending_messages</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">subscriber_count</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">publish_rate</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">consume_rate</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">topics</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">pending_messages</span><span class="w"> </span><span class="err">></span><span class="w"> </span><span class="py">10000</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h4 id="subscriber-lag" class="position-relative d-flex align-items-center group">
<span>Subscriber Lag</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="subscriber-lag"
aria-haspopup="dialog"
aria-label="Share link: Subscriber Lag">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Track consumer lag:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-gql" data-lang="gql"><span class="line"><span class="cl"><span class="py">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">subscriber_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">topic_name</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">lag_messages</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">lag_seconds</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="py">last_message_time</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">FROM</span><span class="w"> </span><span class="py">SYSTEM</span><span class="err">.</span><span class="py">subscribers</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="py">WHERE</span><span class="w"> </span><span class="py">lag_messages</span><span class="w"> </span><span class="err">></span><span class="w"> </span><span class="py">1000</span><span class="err">;</span><span class="w">
</span></span></span></code></pre></div>
<h3 id="best-practices" class="position-relative d-flex align-items-center group">
<span>Best Practices</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="best-practices"
aria-haspopup="dialog"
aria-label="Share link: Best Practices">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="message-design" class="position-relative d-flex align-items-center group">
<span>Message Design</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-design"
aria-haspopup="dialog"
aria-label="Share link: Message Design">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><ul>
<li>Keep messages small and focused</li>
<li>Include timestamp and metadata</li>
<li>Version message schemas</li>
<li>Make messages self-contained</li>
<li>Use consistent naming conventions</li>
</ul>
<h4 id="subscription-management" class="position-relative d-flex align-items-center group">
<span>Subscription Management</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="subscription-management"
aria-haspopup="dialog"
aria-label="Share link: Subscription Management">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><ul>
<li>Clean up unused subscriptions</li>
<li>Monitor subscriber lag</li>
<li>Implement backpressure handling</li>
<li>Use appropriate delivery guarantees</li>
<li>Handle redelivery idempotently</li>
</ul>
<h4 id="error-handling" class="position-relative d-flex align-items-center group">
<span>Error Handling</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="error-handling"
aria-haspopup="dialog"
aria-label="Share link: Error Handling">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">robust_subscriber</span><span class="p">(</span><span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Handle errors gracefully"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"events"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Error processing message: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">send_to_dlq</span><span class="p">(</span><span class="n">message</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">))</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">ConnectionError</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">5</span><span class="p">)</span> <span class="c1"># Reconnect delay</span>
</span></span></code></pre></div>
<h3 id="related-topics" class="position-relative d-flex align-items-center group">
<span>Related Topics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="related-topics"
aria-haspopup="dialog"
aria-label="Share link: Related Topics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><ul>
<li><a
href="/tags/events"
>Events</a>
- Event-driven architecture</li>
<li><a
href="/tags/streaming"
>Streaming</a>
- Event streaming</li>
<li><a
href="/tags/cdc"
>CDC</a>
- Change data capture</li>
<li><a
href="/tags/websockets"
>WebSockets</a>
- Real-time communication</li>
</ul>
<h3 id="production-patterns" class="position-relative d-flex align-items-center group">
<span>Production Patterns</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="production-patterns"
aria-haspopup="dialog"
aria-label="Share link: Production Patterns">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="multi-region-pubsub" class="position-relative d-flex align-items-center group">
<span>Multi-Region Pub/Sub</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="multi-region-pubsub"
aria-haspopup="dialog"
aria-label="Share link: Multi-Region Pub/Sub">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Distribute messages across geographic regions:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-yaml" data-lang="yaml"><span class="line"><span class="cl"><span class="nt">topics</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">global_notifications</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">replicas</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">us-east</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">primary</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">eu-west</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">secondary</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span>- <span class="nt">region</span><span class="p">:</span><span class="w"> </span><span class="l">ap-south</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">priority</span><span class="p">:</span><span class="w"> </span><span class="l">secondary</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">replication</span><span class="p">:</span><span class="w"> </span><span class="l">async</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">message_routing</span><span class="p">:</span><span class="w"> </span><span class="l">geo_nearest </span><span class="w"> </span><span class="c"># Route to nearest region</span><span class="w">
</span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publisher in US publishes to local topic</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"global_notifications"</span><span class="p">,</span> <span class="n">message</span><span class="p">,</span> <span class="n">region</span><span class="o">=</span><span class="s2">"us-east"</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">pass</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Subscribers in EU/AP receive via replication</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"global_notifications"</span><span class="p">,</span> <span class="n">region</span><span class="o">=</span><span class="s2">"eu-west"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Receives messages from US with replication lag</span>
</span></span><span class="line"><span class="cl"> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="message-acknowledgment-strategies" class="position-relative d-flex align-items-center group">
<span>Message Acknowledgment Strategies</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-acknowledgment-strategies"
aria-haspopup="dialog"
aria-label="Share link: Message Acknowledgment Strategies">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Control message delivery and acknowledgment:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">manual_acknowledgment</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Fine-grained control over message acknowledgment"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"orders"</span><span class="p">,</span> <span class="n">auto_ack</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Process message</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_order</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">data</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Explicitly acknowledge</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">ack</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Processed and acked: </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="n">ProcessingError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Negative acknowledgment (requeue)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">nack</span><span class="p">(</span><span class="n">requeue</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Failed to process </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="n">FatalError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Dead letter (don't requeue)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">nack</span><span class="p">(</span><span class="n">requeue</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">critical</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Fatal error on </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">: </span><span class="si">{</span><span class="n">e</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="consumer-groups" class="position-relative d-flex align-items-center group">
<span>Consumer Groups</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="consumer-groups"
aria-haspopup="dialog"
aria-label="Share link: Consumer Groups">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Load balance message processing across consumers:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Consumer group for parallel processing</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consumer_group_worker</span><span class="p">(</span><span class="n">worker_id</span><span class="p">,</span> <span class="n">group_id</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Worker in consumer group"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"order_processing"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer_group</span><span class="o">=</span><span class="n">group_id</span><span class="p">,</span> <span class="c1"># Messages distributed across group</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer_id</span><span class="o">=</span><span class="n">worker_id</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">message</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="sa">f</span><span class="s2">"Worker </span><span class="si">{</span><span class="n">worker_id</span><span class="si">}</span><span class="s2"> processing </span><span class="si">{</span><span class="n">message</span><span class="o">.</span><span class="n">id</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_order</span><span class="p">(</span><span class="n">message</span><span class="o">.</span><span class="n">data</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">message</span><span class="o">.</span><span class="n">ack</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Start multiple workers (messages distributed among them)</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">main</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="n">workers</span> <span class="o">=</span> <span class="p">[</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer_group_worker</span><span class="p">(</span><span class="sa">f</span><span class="s2">"worker-</span><span class="si">{</span><span class="n">i</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span> <span class="s2">"order-processors"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">10</span><span class="p">)</span> <span class="c1"># 10 workers share the load</span>
</span></span><span class="line"><span class="cl"> <span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">workers</span><span class="p">)</span>
</span></span></code></pre></div>
<h4 id="ordered-message-processing" class="position-relative d-flex align-items-center group">
<span>Ordered Message Processing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="ordered-message-processing"
aria-haspopup="dialog"
aria-label="Share link: Ordered Message Processing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Maintain message ordering when required:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Publish with partition key (same key → same partition → ordered)</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_id"</span><span class="p">:</span> <span class="s2">"user123"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"action"</span><span class="p">:</span> <span class="s2">"login"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"timestamp"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"><span class="p">},</span> <span class="n">partition_key</span><span class="o">=</span><span class="s2">"user123"</span><span class="p">)</span> <span class="c1"># All events for user123 ordered</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Subscribe with ordering guarantee</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_events"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">ordering</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">partition_key_filter</span><span class="o">=</span><span class="s2">"user123"</span> <span class="c1"># Process only this user's events in order</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Events for user123 arrive in published order</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_user_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="advanced-messaging-patterns" class="position-relative d-flex align-items-center group">
<span>Advanced Messaging Patterns</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="advanced-messaging-patterns"
aria-haspopup="dialog"
aria-label="Share link: Advanced Messaging Patterns">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="saga-pattern-with-pubsub" class="position-relative d-flex align-items-center group">
<span>Saga Pattern with Pub/Sub</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="saga-pattern-with-pubsub"
aria-haspopup="dialog"
aria-label="Share link: Saga Pattern with Pub/Sub">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Coordinate distributed transactions:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">OrderSaga</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Distributed transaction using pub/sub"""</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">start_order</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">order_data</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Initiate saga"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">saga_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish order created event</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"saga_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"saga_id"</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"ORDER_CREATED"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"data"</span><span class="p">:</span> <span class="n">order_data</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">saga_id</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">saga_orchestrator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Orchestrate saga steps"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"saga_events"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">saga_id</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'saga_id'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="n">event_type</span> <span class="o">=</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"ORDER_CREATED"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Step 1: Reserve inventory</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">reserve_inventory</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"INVENTORY_RESERVED"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Step 2: Process payment</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_payment</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"PAYMENT_PROCESSED"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Step 3: Ship order</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">ship_order</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'data'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span> <span class="o">==</span> <span class="s2">"SHIPPING_CONFIRMED"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Saga complete</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">complete_saga</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event_type</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="s2">"_FAILED"</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Compensating actions</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">rollback_saga</span><span class="p">(</span><span class="n">saga_id</span><span class="p">,</span> <span class="n">event_type</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">reserve_inventory</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">saga_id</span><span class="p">,</span> <span class="n">order</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Reserve inventory (Step 1)"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Business logic</span>
</span></span><span class="line"><span class="cl"> <span class="n">result</span> <span class="o">=</span> <span class="k">await</span> <span class="n">inventory_service</span><span class="o">.</span><span class="n">reserve</span><span class="p">(</span><span class="n">order</span><span class="p">[</span><span class="s1">'items'</span><span class="p">])</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish success event</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"saga_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"saga_id"</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"INVENTORY_RESERVED"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"data"</span><span class="p">:</span> <span class="n">result</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish failure event</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"saga_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"saga_id"</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"INVENTORY_RESERVATION_FAILED"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"error"</span><span class="p">:</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">rollback_saga</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">saga_id</span><span class="p">,</span> <span class="n">failed_step</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Compensating transactions"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_step</span> <span class="ow">in</span> <span class="p">[</span><span class="s2">"PAYMENT_FAILED"</span><span class="p">,</span> <span class="s2">"SHIPPING_FAILED"</span><span class="p">]:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Release inventory reservation</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">release_inventory</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">failed_step</span> <span class="o">==</span> <span class="s2">"SHIPPING_FAILED"</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Refund payment</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">refund_payment</span><span class="p">(</span><span class="n">saga_id</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Mark saga as rolled back</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"saga_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"saga_id"</span><span class="p">:</span> <span class="n">saga_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"SAGA_ROLLED_BACK"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"reason"</span><span class="p">:</span> <span class="n">failed_step</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span></code></pre></div>
<h4 id="event-sourcing" class="position-relative d-flex align-items-center group">
<span>Event Sourcing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="event-sourcing"
aria-haspopup="dialog"
aria-label="Share link: Event Sourcing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Store state as sequence of events:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">EventSourcedEntity</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Entity rebuilt from event stream"""</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">entity_id</span><span class="p">,</span> <span class="n">client</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span> <span class="o">=</span> <span class="n">entity_id</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="p">{}</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">=</span> <span class="mi">0</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Rebuild state from events"""</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Subscribe to entity's event stream</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">"entity_</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">from_beginning</span><span class="o">=</span><span class="kc">True</span> <span class="c1"># Replay all events</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">apply_event</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+=</span> <span class="mi">1</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="nf">apply_event</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Apply event to state"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'ACCOUNT_CREATED'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">'balance'</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'initial_balance'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">'created_at'</span><span class="p">]</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'timestamp'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'DEPOSIT'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">'balance'</span><span class="p">]</span> <span class="o">+=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'amount'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'WITHDRAWAL'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">'balance'</span><span class="p">]</span> <span class="o">-=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'amount'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">def</span> <span class="nf">deposit</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">amount</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Command: deposit money"""</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Validate</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">amount</span> <span class="o"><=</span> <span class="mi">0</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"Amount must be positive"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish event</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="sa">f</span><span class="s2">"entity_</span><span class="si">{</span><span class="bp">self</span><span class="o">.</span><span class="n">entity_id</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"DEPOSIT"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"amount"</span><span class="p">:</span> <span class="n">amount</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"timestamp"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">(),</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"version"</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+</span> <span class="mi">1</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Apply to local state</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">[</span><span class="s1">'balance'</span><span class="p">]</span> <span class="o">+=</span> <span class="n">amount</span>
</span></span><span class="line"><span class="cl"> <span class="bp">self</span><span class="o">.</span><span class="n">version</span> <span class="o">+=</span> <span class="mi">1</span>
</span></span></code></pre></div>
<h4 id="cqrs-with-pubsub" class="position-relative d-flex align-items-center group">
<span>CQRS with Pub/Sub</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="cqrs-with-pubsub"
aria-haspopup="dialog"
aria-label="Share link: CQRS with Pub/Sub">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Separate read and write models:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Write side: publish events</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">create_user</span><span class="p">(</span><span class="n">user_data</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Command: create user (write model)"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">user_id</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">uuid</span><span class="o">.</span><span class="n">uuid4</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Store in write database</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">geode_client</span><span class="o">.</span><span class="n">connection</span><span class="p">()</span> <span class="k">as</span> <span class="n">conn</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">begin</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">"""
</span></span></span><span class="line"><span class="cl"><span class="s2"> CREATE (u:User {id: $id, email: $email, created_at: datetime()})
</span></span></span><span class="line"><span class="cl"><span class="s2"> """</span><span class="p">,</span> <span class="p">{</span><span class="s2">"id"</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span> <span class="s2">"email"</span><span class="p">:</span> <span class="n">user_data</span><span class="p">[</span><span class="s1">'email'</span><span class="p">]})</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">conn</span><span class="o">.</span><span class="n">rollback</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="k">raise</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Publish event</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">pubsub_client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">,</span> <span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"type"</span><span class="p">:</span> <span class="s2">"USER_CREATED"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"user_id"</span><span class="p">:</span> <span class="n">user_id</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"data"</span><span class="p">:</span> <span class="n">user_data</span>
</span></span><span class="line"><span class="cl"> <span class="p">})</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">user_id</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Read side: consume events and update read model</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">user_projection_updater</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Update read-optimized view"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">pubsub_client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="s2">"user_events"</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'USER_CREATED'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Update read database (denormalized for fast queries)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hset</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">"user:</span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">]</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">mapping</span><span class="o">=</span><span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"email"</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">][</span><span class="s1">'email'</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"created_at"</span><span class="p">:</span> <span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">][</span><span class="s1">'created_at'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="p">}</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">elif</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'USER_UPDATED'</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Update read model</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hset</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="sa">f</span><span class="s2">"user:</span><span class="si">{</span><span class="n">event</span><span class="p">[</span><span class="s1">'user_id'</span><span class="p">]</span><span class="si">}</span><span class="s2">"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">mapping</span><span class="o">=</span><span class="n">event</span><span class="p">[</span><span class="s1">'data'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Read from read model (fast)</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">get_user</span><span class="p">(</span><span class="n">user_id</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Query: get user (read model)"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="k">await</span> <span class="n">redis</span><span class="o">.</span><span class="n">hgetall</span><span class="p">(</span><span class="sa">f</span><span class="s2">"user:</span><span class="si">{</span><span class="n">user_id</span><span class="si">}</span><span class="s2">"</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="performance-tuning" class="position-relative d-flex align-items-center group">
<span>Performance Tuning</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="performance-tuning"
aria-haspopup="dialog"
aria-label="Share link: Performance Tuning">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="batch-publishing" class="position-relative d-flex align-items-center group">
<span>Batch Publishing</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="batch-publishing"
aria-haspopup="dialog"
aria-label="Share link: Batch Publishing">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Improve throughput with batching:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">batch_publish_example</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Publish messages in batches"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">messages</span> <span class="o">=</span> <span class="p">[</span>
</span></span><span class="line"><span class="cl"> <span class="p">{</span><span class="s2">"user_id"</span><span class="p">:</span> <span class="n">i</span><span class="p">,</span> <span class="s2">"action"</span><span class="p">:</span> <span class="s2">"click"</span><span class="p">,</span> <span class="s2">"timestamp"</span><span class="p">:</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()}</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">10000</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="p">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Batch publish (100x faster than individual publishes)</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish_batch</span><span class="p">(</span><span class="s2">"user_actions"</span><span class="p">,</span> <span class="n">messages</span><span class="p">,</span> <span class="n">batch_size</span><span class="o">=</span><span class="mi">100</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Publishes 100 messages per network round-trip</span>
</span></span></code></pre></div>
<h4 id="message-compression" class="position-relative d-flex align-items-center group">
<span>Message Compression</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-compression"
aria-haspopup="dialog"
aria-label="Share link: Message Compression">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Reduce network bandwidth:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Enable compression for large messages</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"analytics_data"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">large_payload</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">compression</span><span class="o">=</span><span class="s2">"lz4"</span> <span class="c1"># Fast compression algorithm</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Compression reduces bandwidth by 70-90% for JSON payloads</span>
</span></span></code></pre></div>
<h4 id="topic-sharding" class="position-relative d-flex align-items-center group">
<span>Topic Sharding</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="topic-sharding"
aria-haspopup="dialog"
aria-label="Share link: Topic Sharding">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Scale topics horizontally:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-yaml" data-lang="yaml"><span class="line"><span class="cl"><span class="c"># Shard topic across multiple partitions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="nt">topics</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">high_volume_events</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">partitions</span><span class="p">:</span><span class="w"> </span><span class="m">64</span><span class="w"> </span><span class="c"># 64 shards for parallel processing</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nt">partition_key</span><span class="p">:</span><span class="w"> </span><span class="l">user_id </span><span class="w"> </span><span class="c"># Partition by user ID</span><span class="w">
</span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Messages automatically sharded by partition key</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"high_volume_events"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">message</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">partition_key</span><span class="o">=</span><span class="n">user_id</span> <span class="c1"># Determines which partition</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Subscribers process partitions in parallel</span>
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_partition</span><span class="p">(</span><span class="n">partition_id</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"high_volume_events"</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">partition</span><span class="o">=</span><span class="n">partition_id</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># 64 workers, each processing one partition</span>
</span></span><span class="line"><span class="cl"><span class="n">workers</span> <span class="o">=</span> <span class="p">[</span><span class="n">consume_partition</span><span class="p">(</span><span class="n">i</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">64</span><span class="p">)]</span>
</span></span><span class="line"><span class="cl"><span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">gather</span><span class="p">(</span><span class="o">*</span><span class="n">workers</span><span class="p">)</span>
</span></span></code></pre></div>
<h3 id="monitoring-and-operations" class="position-relative d-flex align-items-center group">
<span>Monitoring and Operations</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="monitoring-and-operations"
aria-haspopup="dialog"
aria-label="Share link: Monitoring and Operations">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3>
<h4 id="message-flow-metrics" class="position-relative d-flex align-items-center group">
<span>Message Flow Metrics</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="message-flow-metrics"
aria-haspopup="dialog"
aria-label="Share link: Message Flow Metrics">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Track message flow through system:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">prometheus_client</span> <span class="kn">import</span> <span class="n">Counter</span><span class="p">,</span> <span class="n">Histogram</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">messages_published</span> <span class="o">=</span> <span class="n">Counter</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'pubsub_messages_published_total'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'Total messages published'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">'topic'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">messages_consumed</span> <span class="o">=</span> <span class="n">Counter</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'pubsub_messages_consumed_total'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'Total messages consumed'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">'topic'</span><span class="p">,</span> <span class="s1">'consumer_group'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">message_latency</span> <span class="o">=</span> <span class="n">Histogram</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'pubsub_message_latency_seconds'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="s1">'Message end-to-end latency'</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="p">[</span><span class="s1">'topic'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">publish_with_metrics</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">message</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Publish with metrics tracking"""</span>
</span></span><span class="line"><span class="cl"> <span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="n">message</span><span class="p">[</span><span class="s1">'_published_at'</span><span class="p">]</span> <span class="o">=</span> <span class="n">start_time</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">publish</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">message</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="n">messages_published</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span><span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">)</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">consume_with_metrics</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">consumer_group</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Consume with metrics tracking"""</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">with</span> <span class="n">client</span><span class="o">.</span><span class="n">subscribe</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span> <span class="n">consumer_group</span><span class="o">=</span><span class="n">consumer_group</span><span class="p">)</span> <span class="k">as</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="k">async</span> <span class="k">for</span> <span class="n">msg</span> <span class="ow">in</span> <span class="n">sub</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Track latency</span>
</span></span><span class="line"><span class="cl"> <span class="n">latency</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">-</span> <span class="n">msg</span><span class="o">.</span><span class="n">data</span><span class="p">[</span><span class="s1">'_published_at'</span><span class="p">]</span>
</span></span><span class="line"><span class="cl"> <span class="n">message_latency</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span><span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">)</span><span class="o">.</span><span class="n">observe</span><span class="p">(</span><span class="n">latency</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Process</span>
</span></span><span class="line"><span class="cl"> <span class="k">await</span> <span class="n">process_message</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="n">messages_consumed</span><span class="o">.</span><span class="n">labels</span><span class="p">(</span>
</span></span><span class="line"><span class="cl"> <span class="n">topic</span><span class="o">=</span><span class="n">topic</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">consumer_group</span><span class="o">=</span><span class="n">consumer_group</span>
</span></span><span class="line"><span class="cl"> <span class="p">)</span><span class="o">.</span><span class="n">inc</span><span class="p">()</span>
</span></span></code></pre></div>
<h4 id="health-checks" class="position-relative d-flex align-items-center group">
<span>Health Checks</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="health-checks"
aria-haspopup="dialog"
aria-label="Share link: Health Checks">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h4><p>Monitor pub/sub system health:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="k">async</span> <span class="k">def</span> <span class="nf">health_check</span><span class="p">():</span>
</span></span><span class="line"><span class="cl"> <span class="s2">"""Check pub/sub system health"""</span>
</span></span><span class="line"><span class="cl"> <span class="c1"># Check topic lag</span>
</span></span><span class="line"><span class="cl"> <span class="n">lag</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_consumer_lag</span><span class="p">(</span><span class="s2">"critical_events"</span><span class="p">,</span> <span class="s2">"processors"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">lag</span> <span class="o">></span> <span class="mi">10000</span><span class="p">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">"High consumer lag: </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">lag</span><span class="p">))</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Check error rates</span>
</span></span><span class="line"><span class="cl"> <span class="n">error_rate</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_error_rate</span><span class="p">(</span><span class="s2">"critical_events"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">error_rate</span> <span class="o">></span> <span class="mf">0.01</span><span class="p">:</span> <span class="c1"># >1% error rate</span>
</span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">"High error rate: </span><span class="si">{:.2%}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">error_rate</span><span class="p">))</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1"># Check throughput</span>
</span></span><span class="line"><span class="cl"> <span class="n">throughput</span> <span class="o">=</span> <span class="k">await</span> <span class="n">client</span><span class="o">.</span><span class="n">get_throughput</span><span class="p">(</span><span class="s2">"critical_events"</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="n">throughput</span> <span class="o"><</span> <span class="mi">100</span><span class="p">:</span> <span class="c1"># <100 msgs/sec</span>
</span></span><span class="line"><span class="cl"> <span class="n">alert</span><span class="p">(</span><span class="s2">"Low throughput: </span><span class="si">{}</span><span class="s2"> msgs/sec"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">throughput</span><span class="p">))</span>
</span></span></code></pre></div>
<h3 id="learn-more" class="position-relative d-flex align-items-center group">
<span>Learn More</span>
<button type="button"
class="h-share btn btn-link p-0 text-decoration-none link-secondary opacity-50 hover-opacity-100 transition-all ms-1"
data-share-target="learn-more"
aria-haspopup="dialog"
aria-label="Share link: Learn More">
<i class="fa-sharp-duotone fa-solid fa-share-nodes" aria-hidden="true" style="font-size: 0.8em;"></i>
<span class="visually-hidden">Share link</span>
</button>
</h3><ul>
<li><a
href="https://docs.microsoft.com/en-us/azure/architecture/patterns/publisher-subscriber"
aria-label="Pub/Sub Patterns – opens in new window"
target="_blank" rel="noopener noreferrer"
>Pub/Sub Patterns
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
<li><a
href="https://cloud.google.com/pubsub/docs"
aria-label="Google Cloud Pub/Sub – opens in new window"
target="_blank" rel="noopener noreferrer"
>Google Cloud Pub/Sub
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
<li><a
href="https://kafka.apache.org/"
aria-label="Apache Kafka – opens in new window"
target="_blank" rel="noopener noreferrer"
>Apache Kafka
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
<li><a
href="https://martinfowler.com/articles/201701-event-driven.html"
aria-label="Event-Driven Architecture – opens in new window"
target="_blank" rel="noopener noreferrer"
>Event-Driven Architecture
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
<li><a
href="https://microservices.io/patterns/data/saga.html"
aria-label="Saga Pattern – opens in new window"
target="_blank" rel="noopener noreferrer"
>Saga Pattern
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
<li><a
href="https://martinfowler.com/eaaDev/EventSourcing.html"
aria-label="Event Sourcing – opens in new window"
target="_blank" rel="noopener noreferrer"
>Event Sourcing
<span aria-hidden="true" class="external-icon">↗</span>
</a>
</li>
</ul>
Related Articles
No articles found with this tag yet.
Back to Home