SSE Package Documentation
Server-Sent Events integration for Count Cachula - enabling real-time cache invalidation and preload hints.
Installation
npm install @countcachula/sse

Basic Setup

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { CacheHub } from '@countcachula/sse';

const app = new Hono();

// Create a hub instance
const hub = new CacheHub();

// Set up SSE endpoint
app.get('/api/cache-events', (c) => {
  return streamSSE(c, async (stream) => {
    hub.addConnection(stream);

    await stream.writeSSE({ event: 'connected', data: 'connected' });

    c.req.raw.signal.addEventListener('abort', () => {
      hub.removeConnection(stream);
    });

    // Keep alive
    while (!c.req.raw.signal.aborted) {
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  });
});

// Invalidate cache on mutations
app.patch('/api/items/:id', async (c) => {
  const id = c.req.param('id');
  // ... update logic ...
  await hub.invalidate([`item:${id}`, 'items-list']);
  return c.json({ success: true });
});

Core Concepts
Server-Sent Events Overview
Server-Sent Events (SSE) provide a one-way communication channel from server to client over HTTP:
- Persistent Connection - Single long-lived HTTP connection
- Automatic Reconnection - Browser handles reconnection automatically
- Text-Based Protocol - Simple event stream format
- Real-Time Updates - Server can push updates instantly
SSE is well suited to cache invalidation because it is:
- Lightweight compared to WebSockets
- Supported natively by browsers through the EventSource API
- Plain HTTP, so it usually passes through proxies and firewalls
- Ideal for server-to-client notifications
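To make the text-based protocol concrete, here is a minimal sketch of how a single event could be serialized on the wire. `formatSSE` is a hypothetical helper written for illustration and is not part of @countcachula/sse. The `invalidate` event name and the JSON array payload mirror what the client example in this document listens for.

```typescript
// Hypothetical helper: serialize one event into the SSE wire format.
function formatSSE(event: string, data: string): string {
  // Each line of a multi-line payload needs its own "data:" field.
  const dataLines = data.split('\n').map((line) => `data: ${line}`);
  // The two trailing empty strings produce the blank line that ends the event.
  return [`event: ${event}`, ...dataLines, '', ''].join('\n');
}

const frame = formatSSE('invalidate', JSON.stringify(['user:123']));
console.log(frame);
```

A client reading this stream sees one `invalidate` event whose data is the JSON array of tags.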
Cache Invalidation Patterns
Cache invalidation tells clients when their cached data is stale:
Tag-Based Invalidation

// Tag your cache entries conceptually
hub.invalidate([
  'user:123',     // Specific user data
  'users-list',   // List containing this user
  'stats:users'   // User statistics
]);

Cascade Invalidation

// When a parent changes, invalidate children
hub.invalidate([
  'project:456',
  'project:456:tasks',
  'project:456:members'
]);

Global Invalidation

// System-wide changes
hub.invalidate(['*']); // Clear everything

Preload Hints Explained
Preload hints tell clients what data they'll likely need next:

// After returning a list, hint about detail pages
app.get('/api/issues', async (c) => {
  const issues = await getIssues();

  // Tell clients to preload individual issue pages
  const urls = issues.map(i => `/api/issues/${i.id}`);
  hub.preloadHint(urls);

  return c.json(issues);
});

Benefits:
- Instant Navigation - Data is already cached when needed
- Predictive Loading - Load likely next steps
- Bandwidth Efficient - Only preload what's probable
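One way a client might keep preloading bandwidth-efficient is to skip routes it already holds and cap how many hints it acts on. The sketch below is an illustration under assumptions: `selectPreloads` and the `maxPreloads` cap are hypothetical and not part of @countcachula/sse.

```typescript
// Hypothetical helper: choose which hinted routes are worth fetching.
function selectPreloads(
  hinted: string[],
  cached: ReadonlySet<string>,
  maxPreloads = 5, // assumed cap; tune for your application
): string[] {
  // Drop duplicate hints while keeping their original order.
  const unique = Array.from(new Set(hinted));
  // Skip routes that are already cached, then apply the cap.
  return unique.filter((url) => !cached.has(url)).slice(0, maxPreloads);
}

const toFetch = selectPreloads(
  ['/api/issues/1', '/api/issues/2', '/api/issues/1'],
  new Set(['/api/issues/2']),
);
console.log(toFetch);
```

The client would then fetch only the returned routes in the background.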
API Reference
CacheHub
Central manager for SSE connections and cache events.
Constructor

const hub = new CacheHub();

Creates a new hub instance. Typically one per server.
addConnection(stream: SSEStream): void
Register a new SSE connection.
Parameters:
stream: SSEStream - SSE stream object with a writeSSE method
Example:

app.get('/sse', (c) => {
  return streamSSE(c, async (stream) => {
    hub.addConnection(stream);
    // ...
  });
});

removeConnection(stream: SSEStream): void
Unregister an SSE connection.
Parameters:
stream: SSEStream - Previously added stream to remove
Example:

c.req.raw.signal.addEventListener('abort', () => {
  hub.removeConnection(stream);
});

invalidate(tags: string[]): Promise<void>
Broadcast cache invalidation event to all connected clients.
Parameters:
tags: string[] - Array of cache tags to invalidate
Returns:
Promise<void> - Resolves when broadcast is complete
Example:

// Single tag
await hub.invalidate(['user:123']);

// Multiple tags
await hub.invalidate(['posts-list', 'post:456', 'comments:456']);

// Pattern matching (client-side implementation)
await hub.invalidate(['posts:*']); // All posts

preloadHint(routes: string[]): Promise<void>
Broadcast preload hint event to all connected clients.
Parameters:
routes: string[] - Array of URLs to preload
Returns:
Promise<void> - Resolves when broadcast is complete
Example:

// Hint about related data
await hub.preloadHint([
  '/api/user/profile',
  '/api/user/settings',
  '/api/user/notifications'
]);

// Hint about next page
await hub.preloadHint(['/api/items?page=2']);

SSEStream Interface
The stream interface expected by CacheHub:

interface SSEStream {
  writeSSE: (message: { data: string; event: string; }) => Promise<void>;
}

Compatible with:
- Hono's streamSSE
- Express with express-sse
- Custom implementations
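As an illustration of a custom implementation, the sketch below records messages in memory instead of sending them, which can be handy in unit tests. `MemoryStream` is a hypothetical class and not something the package provides. The interface is repeated so the snippet stands alone.

```typescript
// Same shape as the SSEStream interface described above.
interface SSEStream {
  writeSSE: (message: { data: string; event: string }) => Promise<void>;
}

// Hypothetical in-memory stream: stores messages rather than writing to a socket.
class MemoryStream implements SSEStream {
  readonly messages: { data: string; event: string }[] = [];

  writeSSE = async (message: { data: string; event: string }): Promise<void> => {
    this.messages.push(message);
  };
}

const stream = new MemoryStream();
void stream.writeSSE({ event: 'connected', data: 'connected' });
console.log(stream.messages.length);
```

Because it exposes a `writeSSE` method of the expected shape, an object like this should be acceptable to `hub.addConnection` in a test, after which `messages` can be inspected.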
Guides
Setting Up SSE Endpoint
With Hono

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { CacheHub } from '@countcachula/sse';

const app = new Hono();
const hub = new CacheHub();

app.get('/api/cache-events', (c) => {
  return streamSSE(c, async (stream) => {
    // Register connection
    hub.addConnection(stream);

    // Send initial connection event
    await stream.writeSSE({
      event: 'connected',
      data: JSON.stringify({ timestamp: Date.now() })
    });

    // Clean up on disconnect
    c.req.raw.signal.addEventListener('abort', () => {
      hub.removeConnection(stream);
      console.log('Client disconnected');
    });

    // Keep connection alive with heartbeat
    while (!c.req.raw.signal.aborted) {
      await new Promise(resolve => setTimeout(resolve, 30000));

      // Send heartbeat
      await stream.writeSSE({ event: 'heartbeat', data: 'ping' });
    }
  });
});

With Express

import express from 'express';
import { CacheHub } from '@countcachula/sse';

const app = express();
const hub = new CacheHub();

app.get('/api/cache-events', (req, res) => {
  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  // Create stream wrapper
  const stream = {
    writeSSE: async ({ event, data }) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${data}\n\n`);
    }
  };

  hub.addConnection(stream);

  // Send initial event
  stream.writeSSE({ event: 'connected', data: 'connected' });

  // Clean up on disconnect
  req.on('close', () => {
    hub.removeConnection(stream);
  });
});

Invalidation Strategies
Entity-Based Invalidation

// When an entity changes, invalidate all related caches
app.patch('/api/users/:id', async (c) => {
  const userId = c.req.param('id');
  const updates = await c.req.json();

  await updateUser(userId, updates);

  // Invalidate specific user and any lists containing them
  await hub.invalidate([
    `user:${userId}`,                 // User detail
    'users-list',                     // Main list
    `team:${updates.teamId}:members`, // Team member list
    'stats:users'                     // User statistics
  ]);

  return c.json({ success: true });
});

Hierarchical Invalidation

// Invalidate parent and all children
app.delete('/api/projects/:id', async (c) => {
  const projectId = c.req.param('id');

  await deleteProject(projectId);

  await hub.invalidate([
    `project:${projectId}`,
    `project:${projectId}:tasks`,
    `project:${projectId}:members`,
    `project:${projectId}:files`,
    'projects-list'
  ]);

  return c.json({ success: true });
});

Time-Based Invalidation

// Periodically invalidate volatile data
setInterval(async () => {
  await hub.invalidate([
    'stats:realtime',
    'dashboard:metrics',
    'active-users'
  ]);
}, 60000); // Every minute

Preloading Patterns
List-to-Detail Preloading

app.get('/api/posts', async (c) => {
  const posts = await getPosts();

  // Alongside the list response, hint about individual posts
  if (posts.length > 0) {
    // Preload first 5 posts
    const urls = posts
      .slice(0, 5)
      .map(p => `/api/posts/${p.id}`);

    await hub.preloadHint(urls);
  }

  return c.json(posts);
});

Navigation-Based Preloading

app.get('/api/user/profile', async (c) => {
  // Assumes an auth middleware stored the user id with c.set('userId', ...)
  const profile = await getUserProfile(c.get('userId'));

  // Preload likely next navigation
  await hub.preloadHint([
    '/api/user/settings',            // Common next step
    '/api/user/notifications',       // Often checked together
    `/api/users/${profile.id}/posts` // User's content
  ]);

  return c.json(profile);
});

Pagination Preloading

app.get('/api/items', async (c) => {
  const page = parseInt(c.req.query('page') || '1', 10);
  const items = await getItems(page);

  // Preload next page
  if (items.hasMore) {
    await hub.preloadHint([
      `/api/items?page=${page + 1}`
    ]);
  }

  return c.json(items);
});

Examples
Complete Server Setup

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { CacheHub } from '@countcachula/sse';

const app = new Hono();
const hub = new CacheHub();

// In-memory database
const items = new Map();

// SSE endpoint
app.get('/api/events', (c) => {
  return streamSSE(c, async (stream) => {
    hub.addConnection(stream);

    await stream.writeSSE({
      event: 'connected',
      data: new Date().toISOString()
    });

    c.req.raw.signal.addEventListener('abort', () => {
      hub.removeConnection(stream);
    });

    while (!c.req.raw.signal.aborted) {
      await new Promise(r => setTimeout(r, 1000));
    }
  });
});

// CRUD operations with cache management
app.get('/api/items', async (c) => {
  const itemsList = Array.from(items.values());

  // Preload first 3 items
  const preloads = itemsList
    .slice(0, 3)
    .map(item => `/api/items/${item.id}`);

  if (preloads.length > 0) {
    await hub.preloadHint(preloads);
  }

  return c.json(itemsList);
});

app.post('/api/items', async (c) => {
  const data = await c.req.json();
  const item = {
    id: crypto.randomUUID(),
    ...data,
    createdAt: new Date()
  };

  items.set(item.id, item);

  // Invalidate list cache
  await hub.invalidate(['items-list']);

  // Preload the new item
  await hub.preloadHint([`/api/items/${item.id}`]);

  return c.json(item, 201);
});

app.patch('/api/items/:id', async (c) => {
  const id = c.req.param('id');
  const updates = await c.req.json();

  const item = items.get(id);
  if (!item) {
    return c.json({ error: 'Not found' }, 404);
  }

  const updated = { ...item, ...updates, updatedAt: new Date() };
  items.set(id, updated);

  // Invalidate specific item and list
  await hub.invalidate([`item:${id}`, 'items-list']);

  return c.json(updated);
});

Client-Side Integration

// Client code to consume SSE events
class CacheManager {
  constructor() {
    this.eventSource = null;
    this.cache = new Map();
  }

  connect(url) {
    this.eventSource = new EventSource(url);

    this.eventSource.addEventListener('connected', (e) => {
      console.log('Connected to cache events');
    });

    this.eventSource.addEventListener('invalidate', (e) => {
      const tags = JSON.parse(e.data);
      this.handleInvalidation(tags);
    });

    this.eventSource.addEventListener('preload', (e) => {
      const routes = JSON.parse(e.data);
      this.handlePreload(routes);
    });
  }

  handleInvalidation(tags) {
    // Clear matching cache entries
    tags.forEach(tag => {
      if (!tag.includes('*')) {
        this.cache.delete(tag);
        return;
      }

      // Pattern matching for wildcards: escape regex characters,
      // turn every '*' into '.*', and anchor the pattern to the whole key
      const escaped = tag.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
      const pattern = new RegExp(`^${escaped.replace(/\*/g, '.*')}$`);
      for (const key of this.cache.keys()) {
        if (pattern.test(key)) {
          this.cache.delete(key);
        }
      }
    });
  }

  handlePreload(routes) {
    // Preload in background
    for (const route of routes) {
      fetch(route)
        .then(response => response.json())
        .then(data => {
          this.cache.set(route, data);
          console.log('Preloaded:', route);
        })
        .catch(err => console.warn('Preload failed:', route, err));
    }
  }

  disconnect() {
    this.eventSource?.close();
  }
}

// Usage
const cacheManager = new CacheManager();
cacheManager.connect('/api/cache-events');

Real-World Patterns
Batch Operations

app.post('/api/items/batch-update', async (c) => {
  const updates = await c.req.json(); // Array of updates

  const invalidationTags = [];
  const preloadUrls = [];

  for (const update of updates) {
    await updateItem(update.id, update.data);
    invalidationTags.push(`item:${update.id}`);
    preloadUrls.push(`/api/items/${update.id}`);
  }

  // Single invalidation for all changes
  invalidationTags.push('items-list');
  await hub.invalidate(invalidationTags);

  // Preload all updated items
  await hub.preloadHint(preloadUrls);

  return c.json({ updated: updates.length });
});

Conditional Invalidation

app.patch('/api/posts/:id', async (c) => {
  const id = c.req.param('id');
  const updates = await c.req.json();
  const oldPost = await getPost(id);

  await updatePost(id, updates);

  const tags = [`post:${id}`];

  // Only invalidate lists if the status was sent and actually changed
  if (updates.status !== undefined && updates.status !== oldPost.status) {
    tags.push('posts-list');
    tags.push(`posts:${oldPost.status}`);
    tags.push(`posts:${updates.status}`);
  }

  // Only invalidate author's posts if the author was sent and actually changed
  if (updates.authorId !== undefined && updates.authorId !== oldPost.authorId) {
    tags.push(`author:${oldPost.authorId}:posts`);
    tags.push(`author:${updates.authorId}:posts`);
  }

  await hub.invalidate(tags);

  return c.json({ success: true });
});

Advanced Topics
Custom Event Types
Extend the hub for custom events:

class ExtendedCacheHub extends CacheHub {
  async notifyUpdate(entityType: string, entityId: string, data: any) {
    const message = JSON.stringify({
      type: entityType,
      id: entityId,
      data
    });

    await this.broadcast('entity-update', message);
  }

  async broadcastMetrics(metrics: any) {
    await this.broadcast('metrics', JSON.stringify(metrics));
  }
}

// Usage
const hub = new ExtendedCacheHub();

app.patch('/api/users/:id', async (c) => {
  const id = c.req.param('id');
  const data = await c.req.json();
  const user = await updateUser(id, data);

  // Standard invalidation
  await hub.invalidate([`user:${id}`]);

  // Custom notification
  await hub.notifyUpdate('user', id, user);

  return c.json(user);
});

Performance Optimization
Debounced Invalidation

class DebouncedCacheHub extends CacheHub {
  private pendingInvalidations = new Set<string>();
  private debounceTimer: ReturnType<typeof setTimeout> | null = null;

  // Note: the returned promise resolves once the tags are queued,
  // not when the batched broadcast has been sent
  async invalidate(tags: string[]) {
    tags.forEach(tag => this.pendingInvalidations.add(tag));

    if (this.debounceTimer) {
      clearTimeout(this.debounceTimer);
    }

    this.debounceTimer = setTimeout(async () => {
      const batch = Array.from(this.pendingInvalidations);
      this.pendingInvalidations.clear();
      this.debounceTimer = null;
      await super.invalidate(batch);
    }, 100); // Batch invalidations within 100ms
  }
}

Selective Broadcasting

class SelectiveCacheHub extends CacheHub {
  constructor() {
    super();
    this.subscriptions = new Map(); // connection -> interests
  }

  addConnection(stream, interests = ['*']) {
    super.addConnection(stream);
    this.subscriptions.set(stream, new Set(interests));
  }

  // Drop the subscription as well, so closed connections do not accumulate
  removeConnection(stream) {
    super.removeConnection(stream);
    this.subscriptions.delete(stream);
  }

  async invalidate(tags: string[]) {
    for (const [connection, interests] of this.subscriptions) {
      // Only send if connection is interested
      const relevantTags = tags.filter(tag => {
        return interests.has('*') ||
          interests.has(tag) ||
          Array.from(interests).some(i => tag.startsWith(i));
      });

      if (relevantTags.length > 0) {
        await connection.writeSSE({
          event: 'invalidate',
          data: JSON.stringify(relevantTags)
        });
      }
    }
  }
}

Troubleshooting
Clients keep disconnecting and reconnecting
Ensure you're sending periodic heartbeats. Some proxies close idle connections after 30-60 seconds.
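A heartbeat can also be driven by a timer rather than a loop. The sketch below is one possible shape, under assumptions: `startHeartbeat` is a hypothetical helper, the `heartbeat` event and `ping` payload follow the Hono guide in this document, and the 30-second default is an assumption meant to stay at or below the idle timeouts mentioned above.

```typescript
// Same shape as the SSEStream interface from the API Reference.
interface SSEStream {
  writeSSE: (message: { data: string; event: string }) => Promise<void>;
}

// Hypothetical helper: send a heartbeat on a fixed interval.
// Returns a function that stops the heartbeat.
function startHeartbeat(stream: SSEStream, intervalMs = 30000): () => void {
  const timer = setInterval(() => {
    stream
      .writeSSE({ event: 'heartbeat', data: 'ping' })
      // A failed write usually means the client is gone, so stop pinging.
      .catch(() => clearInterval(timer));
  }, intervalMs);

  return () => clearInterval(timer);
}
```

Call the returned function from the same place you call `hub.removeConnection(stream)` so the timer does not outlive the connection.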
Invalidation events aren't received
Check CORS headers and ensure the SSE endpoint is accessible from your client domain.
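When the client runs on a different origin, the SSE response needs an `Access-Control-Allow-Origin` header that matches it. The sketch below builds the response headers from an allow-list. `sseHeaders` and the allow-list approach are assumptions for illustration, not part of the package.

```typescript
// Hypothetical helper: build SSE response headers, adding CORS headers
// only when the request origin is on the allow-list.
function sseHeaders(
  requestOrigin: string | undefined,
  allowedOrigins: string[],
): Record<string, string> {
  const headers: Record<string, string> = {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  };

  if (requestOrigin && allowedOrigins.includes(requestOrigin)) {
    headers['Access-Control-Allow-Origin'] = requestOrigin;
    // The response differs per origin, so caches must key on it.
    headers['Vary'] = 'Origin';
  }

  return headers;
}

console.log(sseHeaders('https://app.example.com', ['https://app.example.com']));
```

In the Express guide these headers could be passed to `res.writeHead(200, ...)`. The origin shown is a placeholder.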
Memory usage increases over time
Verify dead connections are being removed. The hub auto-cleans on broadcast, but you should also remove on abort.
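To illustrate what cleaning on broadcast can look like, the sketch below writes to every connection and drops the ones whose write fails. This is not the CacheHub implementation. `broadcastAndPrune` is a hypothetical helper that only shows the general idea.

```typescript
// Same shape as the SSEStream interface from the API Reference.
interface SSEStream {
  writeSSE: (message: { data: string; event: string }) => Promise<void>;
}

// Hypothetical helper: broadcast to all connections and remove any
// whose write rejects. Returns the number of connections removed.
async function broadcastAndPrune(
  connections: Set<SSEStream>,
  event: string,
  data: string,
): Promise<number> {
  const streams = Array.from(connections);
  const results = await Promise.allSettled(
    streams.map((stream) => stream.writeSSE({ event, data })),
  );

  let removed = 0;
  results.forEach((result, index) => {
    if (result.status === 'rejected') {
      connections.delete(streams[index]);
      removed += 1;
    }
  });
  return removed;
}
```

Removing connections in the abort or close handler remains the primary cleanup path. Pruning on failed writes only catches connections that were missed.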
Events are received multiple times
Ensure you're not adding the same stream multiple times. Use a Set or check for duplicates.
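A `Set` makes registration idempotent, because adding the same object twice has no effect. The sketch below wraps that in a hypothetical `register` helper that also reports whether the stream was new. The names are illustrative only.

```typescript
// Same shape as the SSEStream interface from the API Reference.
interface SSEStream {
  writeSSE: (message: { data: string; event: string }) => Promise<void>;
}

const connections = new Set<SSEStream>();

// Hypothetical helper: returns false when the stream was already registered.
function register(stream: SSEStream): boolean {
  if (connections.has(stream)) {
    return false;
  }
  connections.add(stream);
  return true;
}

const stream: SSEStream = { writeSSE: async () => {} };
console.log(register(stream), register(stream), connections.size);
```

Logging the `false` case during development can help locate the code path that registers a stream twice.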