SSE Package Documentation

Server-Sent Events integration for Count Cachula - enabling real-time cache invalidation and preload hints.

Installation

Terminal window
npm install @countcachula/sse

Basic Setup

import { CacheHub } from '@countcachula/sse';
import { streamSSE } from 'hono/streaming';
// Create a hub instance
const hub = new CacheHub();
// Set up SSE endpoint
app.get('/api/cache-events', (c) => {
return streamSSE(c, async (stream) => {
hub.addConnection(stream);
await stream.writeSSE({
event: 'connected',
data: 'connected'
});
c.req.raw.signal.addEventListener('abort', () => {
hub.removeConnection(stream);
});
// Keep alive
while (!c.req.raw.signal.aborted) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
});
});
// Invalidate cache on mutations
app.patch('/api/items/:id', async (c) => {
// ... update logic ...
hub.invalidate([`item:${id}`, 'items-list']);
return c.json({ success: true });
});

Core Concepts

Server-Sent Events Overview

Server-Sent Events (SSE) provide a one-way communication channel from server to client over HTTP:

  • Persistent Connection - Single long-lived HTTP connection
  • Automatic Reconnection - Browser handles reconnection automatically
  • Text-Based Protocol - Simple event stream format
  • Real-Time Updates - Server can push updates instantly

SSE is perfect for cache invalidation because:

  • • Lightweight compared to WebSockets
  • • Built-in browser support
  • • Works through proxies and firewalls
  • • Ideal for server-to-client notifications

Cache Invalidation Patterns

Cache invalidation tells clients when their cached data is stale:

Tag-Based Invalidation

// Tag your cache entries conceptually
hub.invalidate([
'user:123', // Specific user data
'users-list', // List containing this user
'stats:users' // User statistics
]);

Cascade Invalidation

// When a parent changes, invalidate children
hub.invalidate([
'project:456',
'project:456:tasks',
'project:456:members'
]);

Global Invalidation

// System-wide changes
hub.invalidate(['*']); // Clear everything

Preload Hints Explained

Preload hints tell clients what data they'll likely need next:

// After returning a list, hint about detail pages
app.get('/api/issues', async (c) => {
const issues = await getIssues();
// Tell clients to preload individual issue pages
const urls = issues.map(i => `/api/issues/${i.id}`);
hub.preloadHint(urls);
return c.json(issues);
});

Benefits:

  • Instant Navigation - Data is already cached when needed
  • Predictive Loading - Load likely next steps
  • Bandwidth Efficient - Only preload what's probable

API Reference

CacheHub

Central manager for SSE connections and cache events.

Constructor

const hub = new CacheHub();

Creates a new hub instance. Typically one per server.

addConnection(stream: SSEStream): void

Register a new SSE connection.

Parameters:
  • stream: SSEStream - SSE stream object with writeSSE method
Example:
app.get('/sse', (c) => {
return streamSSE(c, async (stream) => {
hub.addConnection(stream);
// ...
});
});

removeConnection(stream: SSEStream): void

Unregister an SSE connection.

Parameters:
  • stream: SSEStream - Previously added stream to remove
Example:
c.req.raw.signal.addEventListener('abort', () => {
hub.removeConnection(stream);
});

invalidate(tags: string[]): Promise<void>

Broadcast cache invalidation event to all connected clients.

Parameters:
  • tags: string[] - Array of cache tags to invalidate
Returns:
  • Promise<void> - Resolves when broadcast is complete
Example:
// Single tag
await hub.invalidate(['user:123']);
// Multiple tags
await hub.invalidate(['posts-list', 'post:456', 'comments:456']);
// Pattern matching (client-side implementation)
await hub.invalidate(['posts:*']); // All posts

preloadHint(routes: string[]): Promise<void>

Broadcast preload hint event to all connected clients.

Parameters:
  • routes: string[] - Array of URLs to preload
Returns:
  • Promise<void> - Resolves when broadcast is complete
Example:
// Hint about related data
await hub.preloadHint([
'/api/user/profile',
'/api/user/settings',
'/api/user/notifications'
]);
// Hint about next page
await hub.preloadHint(['/api/items?page=2']);

SSEStream Interface

The stream interface expected by CacheHub:

interface SSEStream {
writeSSE: (message: {
data: string;
event: string;
}) => Promise<void>;
}

Compatible with:

  • • Hono's streamSSE
  • • Express with express-sse
  • • Custom implementations

Guides

Setting Up SSE Endpoint

With Hono

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { CacheHub } from '@countcachula/sse';
const app = new Hono();
const hub = new CacheHub();
app.get('/api/cache-events', (c) => {
return streamSSE(c, async (stream) => {
// Register connection
hub.addConnection(stream);
// Send initial connection event
await stream.writeSSE({
event: 'connected',
data: JSON.stringify({ timestamp: Date.now() })
});
// Clean up on disconnect
c.req.raw.signal.addEventListener('abort', () => {
hub.removeConnection(stream);
console.log('Client disconnected');
});
// Keep connection alive with heartbeat
while (!c.req.raw.signal.aborted) {
await new Promise(resolve => setTimeout(resolve, 30000));
// Send heartbeat
await stream.writeSSE({
event: 'heartbeat',
data: 'ping'
});
}
});
});

With Express

import express from 'express';
import { CacheHub } from '@countcachula/sse';
const app = express();
const hub = new CacheHub();
app.get('/api/cache-events', (req, res) => {
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// Create stream wrapper
const stream = {
writeSSE: async ({ event, data }) => {
res.write(`event: ${event}\n`);
res.write(`data: ${data}\n\n`);
}
};
hub.addConnection(stream);
// Send initial event
stream.writeSSE({
event: 'connected',
data: 'connected'
});
// Clean up on disconnect
req.on('close', () => {
hub.removeConnection(stream);
});
});

Invalidation Strategies

Entity-Based Invalidation

// When an entity changes, invalidate all related caches
app.patch('/api/users/:id', async (c) => {
const userId = c.req.param('id');
const updates = await c.req.json();
await updateUser(userId, updates);
// Invalidate specific user and any lists containing them
await hub.invalidate([
`user:${userId}`, // User detail
'users-list', // Main list
`team:${updates.teamId}:members`, // Team member list
'stats:users' // User statistics
]);
return c.json({ success: true });
});

Hierarchical Invalidation

// Invalidate parent and all children
app.delete('/api/projects/:id', async (c) => {
const projectId = c.req.param('id');
await deleteProject(projectId);
await hub.invalidate([
`project:${projectId}`,
`project:${projectId}:tasks`,
`project:${projectId}:members`,
`project:${projectId}:files`,
'projects-list'
]);
return c.json({ success: true });
});

Time-Based Invalidation

// Periodically invalidate volatile data
setInterval(async () => {
await hub.invalidate([
'stats:realtime',
'dashboard:metrics',
'active-users'
]);
}, 60000); // Every minute

Preloading Patterns

List-to-Detail Preloading

app.get('/api/posts', async (c) => {
const posts = await getPosts();
// After sending list, hint about individual posts
if (posts.length > 0) {
// Preload first 5 posts
const urls = posts
.slice(0, 5)
.map(p => `/api/posts/${p.id}`);
await hub.preloadHint(urls);
}
return c.json(posts);
});

Navigation-Based Preloading

app.get('/api/user/profile', async (c) => {
const profile = await getUserProfile(c.userId);
// Preload likely next navigation
await hub.preloadHint([
'/api/user/settings', // Common next step
'/api/user/notifications', // Often checked together
`/api/users/${profile.id}/posts` // User's content
]);
return c.json(profile);
});

Pagination Preloading

app.get('/api/items', async (c) => {
const page = parseInt(c.req.query('page') || '1');
const items = await getItems(page);
// Preload next page
if (items.hasMore) {
await hub.preloadHint([
`/api/items?page=${page + 1}`
]);
}
return c.json(items);
});

Examples

Complete Server Setup

import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { CacheHub } from '@countcachula/sse';
const app = new Hono();
const hub = new CacheHub();
// In-memory database
const items = new Map();
// SSE endpoint
app.get('/api/events', (c) => {
return streamSSE(c, async (stream) => {
hub.addConnection(stream);
await stream.writeSSE({
event: 'connected',
data: new Date().toISOString()
});
c.req.raw.signal.addEventListener('abort', () => {
hub.removeConnection(stream);
});
while (!c.req.raw.signal.aborted) {
await new Promise(r => setTimeout(r, 1000));
}
});
});
// CRUD operations with cache management
app.get('/api/items', (c) => {
const itemsList = Array.from(items.values());
// Preload first 3 items
const preloads = itemsList
.slice(0, 3)
.map(item => `/api/items/${item.id}`);
if (preloads.length > 0) {
hub.preloadHint(preloads);
}
return c.json(itemsList);
});
app.post('/api/items', async (c) => {
const data = await c.req.json();
const item = {
id: crypto.randomUUID(),
...data,
createdAt: new Date()
};
items.set(item.id, item);
// Invalidate list cache
await hub.invalidate(['items-list']);
// Preload the new item
await hub.preloadHint([`/api/items/${item.id}`]);
return c.json(item, 201);
});
app.patch('/api/items/:id', async (c) => {
const id = c.req.param('id');
const updates = await c.req.json();
const item = items.get(id);
if (!item) {
return c.json({ error: 'Not found' }, 404);
}
const updated = { ...item, ...updates, updatedAt: new Date() };
items.set(id, updated);
// Invalidate specific item and list
await hub.invalidate([`item:${id}`, 'items-list']);
return c.json(updated);
});

Client-Side Integration

// Client code to consume SSE events
class CacheManager {
constructor() {
this.eventSource = null;
this.cache = new Map();
}
connect(url) {
this.eventSource = new EventSource(url);
this.eventSource.addEventListener('connected', (e) => {
console.log('Connected to cache events');
});
this.eventSource.addEventListener('invalidate', (e) => {
const tags = JSON.parse(e.data);
this.handleInvalidation(tags);
});
this.eventSource.addEventListener('preload', (e) => {
const routes = JSON.parse(e.data);
this.handlePreload(routes);
});
}
handleInvalidation(tags) {
// Clear matching cache entries
tags.forEach(tag => {
this.cache.delete(tag);
// Pattern matching for wildcards
if (tag.includes('*')) {
const pattern = new RegExp(tag.replace('*', '.*'));
for (const key of this.cache.keys()) {
if (pattern.test(key)) {
this.cache.delete(key);
}
}
}
});
}
async handlePreload(routes) {
// Preload in background
for (const route of routes) {
fetch(route)
.then(response => response.json())
.then(data => {
this.cache.set(route, data);
console.log('Preloaded:', route);
});
}
}
disconnect() {
this.eventSource?.close();
}
}
// Usage
const cacheManager = new CacheManager();
cacheManager.connect('/api/cache-events');

Real-World Patterns

Batch Operations

app.post('/api/items/batch-update', async (c) => {
const updates = await c.req.json(); // Array of updates
const invalidationTags = [];
const preloadUrls = [];
for (const update of updates) {
await updateItem(update.id, update.data);
invalidationTags.push(`item:${update.id}`);
preloadUrls.push(`/api/items/${update.id}`);
}
// Single invalidation for all changes
invalidationTags.push('items-list');
await hub.invalidate(invalidationTags);
// Preload all updated items
await hub.preloadHint(preloadUrls);
return c.json({ updated: updates.length });
});

Conditional Invalidation

app.patch('/api/posts/:id', async (c) => {
const id = c.req.param('id');
const updates = await c.req.json();
const oldPost = await getPost(id);
await updatePost(id, updates);
const tags = [`post:${id}`];
// Only invalidate list if visibility changed
if (updates.status !== oldPost.status) {
tags.push('posts-list');
tags.push(`posts:${updates.status}`);
}
// Only invalidate author's posts if author changed
if (updates.authorId !== oldPost.authorId) {
tags.push(`author:${oldPost.authorId}:posts`);
tags.push(`author:${updates.authorId}:posts`);
}
await hub.invalidate(tags);
return c.json({ success: true });
});

Advanced Topics

Custom Event Types

Extend the hub for custom events:

class ExtendedCacheHub extends CacheHub {
async notifyUpdate(entityType: string, entityId: string, data: any) {
const message = JSON.stringify({
type: entityType,
id: entityId,
data
});
await this.broadcast('entity-update', message);
}
async broadcastMetrics(metrics: any) {
await this.broadcast('metrics', JSON.stringify(metrics));
}
}
// Usage
const hub = new ExtendedCacheHub();
app.patch('/api/users/:id', async (c) => {
const user = await updateUser(id, data);
// Standard invalidation
await hub.invalidate([`user:${id}`]);
// Custom notification
await hub.notifyUpdate('user', id, user);
return c.json(user);
});

Performance Optimization

Debounced Invalidation

class DebouncedCacheHub extends CacheHub {
constructor() {
super();
this.pendingInvalidations = new Set();
this.debounceTimer = null;
}
async invalidate(tags: string[]) {
tags.forEach(tag => this.pendingInvalidations.add(tag));
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
}
this.debounceTimer = setTimeout(async () => {
const tags = Array.from(this.pendingInvalidations);
this.pendingInvalidations.clear();
await super.invalidate(tags);
}, 100); // Batch invalidations within 100ms
}
}

Selective Broadcasting

class SelectiveCacheHub extends CacheHub {
constructor() {
super();
this.subscriptions = new Map(); // connection -> interests
}
addConnection(stream, interests = ['*']) {
super.addConnection(stream);
this.subscriptions.set(stream, new Set(interests));
}
async invalidate(tags: string[]) {
for (const [connection, interests] of this.subscriptions) {
// Only send if connection is interested
const relevantTags = tags.filter(tag => {
return interests.has('*') ||
interests.has(tag) ||
Array.from(interests).some(i => tag.startsWith(i));
});
if (relevantTags.length > 0) {
await connection.writeSSE({
event: 'invalidate',
data: JSON.stringify(relevantTags)
});
}
}
}
}

Troubleshooting

Clients keep disconnecting and reconnecting

Ensure you're sending periodic heartbeats. Some proxies close idle connections after 30-60 seconds.

Invalidation events aren't received

Check CORS headers and ensure the SSE endpoint is accessible from your client domain.

Memory usage increases over time

Verify dead connections are being removed. The hub auto-cleans on broadcast, but you should also remove on abort.

Events are received multiple times

Ensure you're not adding the same stream multiple times. Use a Set or check for duplicates.

Next Steps