Realtime Infrastructure
Status: ✅ Production (Migrations 199-202 - February 2026)
Realtime Infrastructure
Table of Contents
- Overview
- Ably Migration
- Architecture
- Realtime Abstraction Layer
- Provider System
- React Hooks
- SSE Heartbeat
- Database Tables
- Event Bus Integration
- Frontend Integration
- Migration Guide
Overview
Status: ✅ Production (Migrations 199-202 - February 2026)
The Realtime Infrastructure provides a unified abstraction layer for pub/sub messaging, supporting both Ably (production) and mock providers (testing). It replaces the legacy SSE-only approach with a scalable, provider-agnostic system.
Key Features
✅ Provider abstraction — Swap Ably, Pusher, or mock providers without code changes
✅ SSE heartbeat system — Keep-alive pings with configurable intervals
✅ React hooks — useRealtimeConnection, useRealtimeChannel, useRealtimePresence
✅ Event bus integration — Bridge unified events to realtime channels
✅ Connection monitoring — Track connections, heartbeats, message counts
✅ Database-backed configuration — Channel registry, heartbeat config, connection stats
Ably Migration
Why Ably?
Problems with SSE-only:
- No server→client push outside HTTP response
- No connection persistence across requests
- No presence tracking
- No message history
- No scalability across multiple servers
Ably Benefits:
- True bidirectional messaging
- Built-in presence
- Message history and replay
- Automatic reconnection
- Horizontal scaling
- 99.999% uptime SLA
Migration Timeline
Phase 1: Abstraction layer (DONE) Phase 2: Frontend migration (DONE) Phase 3: Backend cleanup (DONE) Phase 4: Chat endpoint fixes (DONE) Phase 5: Discovery streaming (DONE)
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ REALTIME INFRASTRUCTURE ARCHITECTURE │
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ FRONTEND (React) │ │
│ │ │ │
│ │ ┌────────────────────┐ ┌─────────────────────┐ │ │
│ │ │ useRealtimeChannel │ │ useRealtimePresence │ │ │
│ │ └─────────┬──────────┘ └──────────┬──────────┘ │ │
│ │ │ │ │ │
│ │ └────────────┬───────────┘ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ useRealtimeConnection│ │ │
│ │ └──────────┬───────────┘ │ │
│ │ │ │ │
│ └─────────────────────────┼──────────────────────────────────────────────┘
│ │
│ ┌─────────────────────────┼──────────────────────────────────────────────┐
│ │ ABSTRACTION LAYER (src/lib/realtime/) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ RealtimeClientFactory │ │ │
│ │ └──────────┬──────────────┘ │ │
│ │ │ │ │
│ │ │ creates │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ RealtimeProvider Interface │ │ │
│ │ │ │ │ │
│ │ │ - connect() │ │ │
│ │ │ - disconnect() │ │ │
│ │ │ - subscribe(channel) │ │ │
│ │ │ - publish(channel, event) │ │ │
│ │ │ - presence.enter/leave │ │ │
│ │ └───────┬──────────────────────────────────┘ │ │
│ │ │ │ │
│ │ │ implemented by │ │
│ │ │ │ │
│ │ ┌──────┴──────┬──────────────┬────────────────┐ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌────────┐ ┌─────────┐ ┌──────────┐ ┌──────────────┐ │ │
│ │ │ Ably │ │ Pusher │ │ Mock │ │ Future │ │ │
│ │ │Provider│ │Provider │ │ Provider │ │ Providers │ │ │
│ │ └────────┘ └─────────┘ └──────────┘ └──────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘
│
│ ┌─────────────────────────────────────────────────────────────────────┐
│ │ BACKEND (Node.js) │ │
│ │ │ │
│ │ ┌──────────────────────┐ ┌──────────────────────────┐ │ │
│ │ │ Unified Event Bus │───────▶│ Realtime Event Bridge │ │ │
│ │ │ │ │ │ │ │
│ │ │ Events: │ │ Publishes to: │ │ │
│ │ │ - project.updated │ │ - project:{id} │ │ │
│ │ │ - session.message │ │ - session:{id} │ │ │
│ │ │ - obligation.changed│ │ - organization:{id} │ │ │
│ │ └──────────────────────┘ └──────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ SSE HEARTBEAT SYSTEM │ │ │
│ │ │ │ │ │
│ │ │ Every 30s (configurable): │ │ │
│ │ │ data: {"type":"heartbeat","timestamp":1234567890} │ │ │
│ │ │ │ │ │
│ │ │ Timeout: 90s (3 missed heartbeats = disconnected) │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘
│
│ ┌─────────────────────────────────────────────────────────────────────┐
│ │ DATABASE (PostgreSQL) │ │
│ │ │ │
│ │ ┌──────────────────────┐ ┌──────────────────────────┐ │ │
│ │ │ realtime_channels │ │ realtime_heartbeat_config│ │ │
│ │ │ │ │ │ │ │
│ │ │ - tenant_id │ │ - channel_type │ │ │
│ │ │ - channel_name │ │ - heartbeat_interval_ms │ │ │
│ │ │ - channel_type │ │ - timeout_ms │ │ │
│ │ │ - is_active │ └──────────────────────────┘ │ │
│ │ └──────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────┐ ┌──────────────────────────┐ │ │
│ │ │ realtime_events_log │ │ realtime_connection_stats│ │ │
│ │ │ │ │ │ │ │
│ │ │ - channel_id │ │ - channel_id │ │ │
│ │ │ - event_type │ │ - connection_id │ │ │
│ │ │ - event_data │ │ - last_heartbeat_at │ │ │
│ │ │ - published_at │ │ - message_count │ │ │
│ │ └──────────────────────┘ └──────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘
│
└─────────────────────────────────────────────────────────────────────────────┘Realtime Abstraction Layer
Location: src/lib/realtime/
Core Interface
// src/lib/realtime/types.ts
export interface RealtimeProvider {
// Connection
connect(apiKey: string, clientId: string): Promise<void>;
disconnect(): Promise<void>;
isConnected(): boolean;
// Channels
subscribe(
channelName: string,
callback: (message: RealtimeMessage) => void
): Promise<RealtimeChannel>;
unsubscribe(channelName: string): Promise<void>;
publish(channelName: string, event: string, data: any): Promise<void>;
// Presence
presence: {
enter(channelName: string, data?: any): Promise<void>;
leave(channelName: string): Promise<void>;
get(channelName: string): Promise<PresenceMember[]>;
};
}
export interface RealtimeChannel {
name: string;
subscribe(event: string, callback: (message: any) => void): void;
unsubscribe(event?: string): void;
publish(event: string, data: any): Promise<void>;
}
export interface RealtimeMessage {
name: string;
data: any;
timestamp: number;
clientId: string;
}Provider System
Ably Provider
File: src/lib/realtime/client.ts
import * as Ably from 'ably';
export class AblyRealtimeProvider implements RealtimeProvider {
private client: Ably.Realtime | null = null;
private channels = new Map<string, Ably.RealtimeChannel>();
async connect(apiKey: string, clientId: string): Promise<void> {
this.client = new Ably.Realtime({
key: apiKey,
clientId,
autoConnect: true,
echoMessages: false,
});
return new Promise((resolve, reject) => {
this.client!.connection.once('connected', () => resolve());
this.client!.connection.once('failed', reject);
});
}
async subscribe(
channelName: string,
callback: (message: RealtimeMessage) => void
): Promise<RealtimeChannel> {
if (!this.client) throw new Error('Not connected');
const channel = this.client.channels.get(channelName);
this.channels.set(channelName, channel);
channel.subscribe((message: Ably.Message) => {
callback({
name: message.name,
data: message.data,
timestamp: message.timestamp,
clientId: message.clientId,
});
});
return {
name: channelName,
subscribe: (event, cb) => channel.subscribe(event, cb),
unsubscribe: (event) => channel.unsubscribe(event),
publish: (event, data) => channel.publish(event, data),
};
}
async publish(channelName: string, event: string, data: any): Promise<void> {
const channel = this.channels.get(channelName);
if (!channel) throw new Error(`Not subscribed to ${channelName}`);
await channel.publish(event, data);
}
presence = {
enter: async (channelName: string, data?: any) => {
const channel = this.channels.get(channelName);
if (!channel) throw new Error(`Not subscribed to ${channelName}`);
await channel.presence.enter(data);
},
leave: async (channelName: string) => {
const channel = this.channels.get(channelName);
if (!channel) throw new Error(`Not subscribed to ${channelName}`);
await channel.presence.leave();
},
get: async (channelName: string) => {
const channel = this.channels.get(channelName);
if (!channel) throw new Error(`Not subscribed to ${channelName}`);
const members = await channel.presence.get();
return members.map(m => ({
clientId: m.clientId,
data: m.data,
}));
},
};
}Mock Provider
File: src/lib/realtime/client.ts
Used for testing. Stores messages in-memory:
export class MockRealtimeProvider implements RealtimeProvider {
private connected = false;
private subscriptions = new Map<string, Set<(msg: RealtimeMessage) => void>>();
private presenceMembers = new Map<string, Set<string>>();
async connect(): Promise<void> {
this.connected = true;
}
async subscribe(
channelName: string,
callback: (message: RealtimeMessage) => void
): Promise<RealtimeChannel> {
if (!this.subscriptions.has(channelName)) {
this.subscriptions.set(channelName, new Set());
}
this.subscriptions.get(channelName)!.add(callback);
return {
name: channelName,
subscribe: (event, cb) => { /* mock */ },
unsubscribe: () => { /* mock */ },
publish: async (event, data) => {
this.publish(channelName, event, data);
},
};
}
async publish(channelName: string, event: string, data: any): Promise<void> {
const callbacks = this.subscriptions.get(channelName);
if (callbacks) {
const message: RealtimeMessage = {
name: event,
data,
timestamp: Date.now(),
clientId: 'mock-client',
};
callbacks.forEach(cb => cb(message));
}
}
}Client Factory
File: src/lib/realtime/client.ts
export function createRealtimeClient(
provider: 'ably' | 'mock',
config?: RealtimeConfig
): RealtimeProvider {
switch (provider) {
case 'ably':
return new AblyRealtimeProvider();
case 'mock':
return new MockRealtimeProvider();
default:
throw new Error(`Unknown provider: ${provider}`);
}
}Usage:
import { createRealtimeClient } from '@/lib/realtime';
// Production
const client = createRealtimeClient('ably');
await client.connect(process.env.ABLY_API_KEY, userId);
// Testing
const mockClient = createRealtimeClient('mock');
await mockClient.connect('', 'test-user');React Hooks
Location: testing-ui/src/hooks/
useRealtimeConnection
Manages connection to realtime provider.
// testing-ui/src/hooks/useRealtimeConnection.ts
export function useRealtimeConnection(userId: string) {
const [isConnected, setIsConnected] = useState(false);
const [error, setError] = useState<Error | null>(null);
const clientRef = useRef<RealtimeProvider | null>(null);
useEffect(() => {
const client = createRealtimeClient('ably');
client.connect(process.env.NEXT_PUBLIC_ABLY_KEY, userId)
.then(() => {
clientRef.current = client;
setIsConnected(true);
})
.catch(err => {
setError(err);
});
return () => {
client.disconnect();
};
}, [userId]);
return { client: clientRef.current, isConnected, error };
}useRealtimeChannel
Subscribe to a specific channel.
// testing-ui/src/hooks/useRealtimeChannel.ts
export function useRealtimeChannel(
client: RealtimeProvider | null,
channelName: string,
onMessage: (message: RealtimeMessage) => void
) {
const [channel, setChannel] = useState<RealtimeChannel | null>(null);
useEffect(() => {
if (!client) return;
let subscribed = true;
client.subscribe(channelName, (msg) => {
if (subscribed) {
onMessage(msg);
}
}).then(ch => {
if (subscribed) {
setChannel(ch);
}
});
return () => {
subscribed = false;
client.unsubscribe(channelName);
};
}, [client, channelName]);
const publish = useCallback(
(event: string, data: any) => {
if (channel) {
return channel.publish(event, data);
}
return Promise.reject(new Error('Channel not ready'));
},
[channel]
);
return { channel, publish };
}useRealtimePresence
Track who's online in a channel.
// testing-ui/src/hooks/useRealtimePresence.ts
export function useRealtimePresence(
client: RealtimeProvider | null,
channelName: string
) {
const [members, setMembers] = useState<PresenceMember[]>([]);
useEffect(() => {
if (!client) return;
// Enter presence
client.presence.enter(channelName, { name: 'User' });
// Poll for members (Ably also has presence.subscribe for real-time updates)
const interval = setInterval(() => {
client.presence.get(channelName).then(setMembers);
}, 5000);
return () => {
clearInterval(interval);
client.presence.leave(channelName);
};
}, [client, channelName]);
return { members };
}SSE Heartbeat
Purpose: Keep SSE connections alive with periodic pings.
Configuration
Database:
CREATE TABLE realtime_heartbeat_config (
id UUID PRIMARY KEY,
channel_type TEXT UNIQUE CHECK (channel_type IN (
'project', 'session', 'organization', 'user', 'system'
)),
heartbeat_interval_ms INTEGER DEFAULT 30000, -- 30s
timeout_ms INTEGER DEFAULT 90000, -- 90s
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
);Default Config:
| Channel Type | Heartbeat Interval | Timeout |
|---|---|---|
| project | 30s | 90s |
| session | 30s | 90s |
| organization | 60s | 180s |
| user | 30s | 90s |
| system | 60s | 180s |
Implementation
Backend:
// src/api/helpers/sse-heartbeat.ts
export function startSSEHeartbeat(
response: ServerResponse,
channelType: string,
intervalMs = 30000
): NodeJS.Timeout {
const interval = setInterval(() => {
if (response.writableEnded) {
clearInterval(interval);
return;
}
const heartbeat = {
type: 'heartbeat',
timestamp: Date.now(),
};
response.write(`data: ${JSON.stringify(heartbeat)}\n\n`);
}, intervalMs);
return interval;
}Frontend:
// testing-ui/src/hooks/useSSEHeartbeat.ts
export function useSSEHeartbeat(
onTimeout: () => void,
timeoutMs = 90000
) {
const lastHeartbeatRef = useRef<number>(Date.now());
const handleHeartbeat = useCallback(() => {
lastHeartbeatRef.current = Date.now();
}, []);
useEffect(() => {
const checkInterval = setInterval(() => {
const elapsed = Date.now() - lastHeartbeatRef.current;
if (elapsed > timeoutMs) {
onTimeout();
}
}, 10000); // Check every 10s
return () => clearInterval(checkInterval);
}, [timeoutMs, onTimeout]);
return { handleHeartbeat };
}Database Tables
realtime_channels
CREATE TABLE realtime_channels (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
channel_name TEXT NOT NULL,
channel_type TEXT CHECK (channel_type IN (
'project', 'session', 'organization', 'user', 'system'
)),
entity_id UUID,
config JSONB DEFAULT '{}',
is_active BOOLEAN DEFAULT true,
last_activity_at TIMESTAMPTZ,
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
);
CREATE UNIQUE INDEX idx_realtime_channels_name
ON realtime_channels(tenant_id, channel_name);Example Channels:
project:abc123→ All updates for project abc123session:def456→ Discovery session def456 chatorganization:org789→ Organization-wide eventsuser:user001→ User-specific notifications
realtime_events_log
CREATE TABLE realtime_events_log (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
channel_id UUID REFERENCES realtime_channels(id),
event_type TEXT NOT NULL,
event_data JSONB NOT NULL,
published_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_realtime_events_log_channel
ON realtime_events_log(channel_id, published_at);Usage: Audit trail of all realtime events.
realtime_connection_stats
CREATE TABLE realtime_connection_stats (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL,
channel_id UUID REFERENCES realtime_channels(id),
connection_id TEXT NOT NULL,
client_info JSONB,
connected_at TIMESTAMPTZ DEFAULT NOW(),
disconnected_at TIMESTAMPTZ,
last_heartbeat_at TIMESTAMPTZ,
message_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_realtime_connection_stats_active
ON realtime_connection_stats(disconnected_at) WHERE disconnected_at IS NULL;Usage: Monitor active connections, detect stale connections.
Event Bus Integration
Unified Event Bus → Realtime Bridge
File: src/services/events/conversation-event-bridge.ts
Bridges unified events to realtime channels:
import { EventBus } from './unified-event-bus.js';
import { createRealtimeClient } from '../../lib/realtime/index.js';
export class ConversationEventBridge {
private eventBus: EventBus;
private realtimeClient: RealtimeProvider;
constructor() {
this.eventBus = EventBus.getInstance();
this.realtimeClient = createRealtimeClient('ably');
}
async start() {
await this.realtimeClient.connect(
process.env.ABLY_API_KEY!,
'server'
);
// Bridge project events
this.eventBus.on('project.updated', async (event) => {
await this.realtimeClient.publish(
`project:${event.payload.projectId}`,
'project.updated',
event.payload
);
});
// Bridge session events
this.eventBus.on('session.message', async (event) => {
await this.realtimeClient.publish(
`session:${event.payload.sessionId}`,
'session.message',
event.payload
);
});
// Bridge obligation events
this.eventBus.on('obligation.changed', async (event) => {
await this.realtimeClient.publish(
`session:${event.payload.sessionId}`,
'obligation.changed',
event.payload
);
});
}
}Usage:
// src/index.ts
import { ConversationEventBridge } from './services/events/conversation-event-bridge.js';
const bridge = new ConversationEventBridge();
await bridge.start();Frontend Integration
Provider Setup
File: testing-ui/src/providers/RealtimeProvider.tsx
import { createContext, useContext, useEffect, useState } from 'react';
import { createRealtimeClient } from '@/lib/realtime';
const RealtimeContext = createContext<RealtimeProvider | null>(null);
export function RealtimeProvider({ children, userId }: Props) {
const [client, setClient] = useState<RealtimeProvider | null>(null);
useEffect(() => {
const realtimeClient = createRealtimeClient('ably');
realtimeClient.connect(
process.env.NEXT_PUBLIC_ABLY_KEY!,
userId
).then(() => {
setClient(realtimeClient);
});
return () => {
realtimeClient.disconnect();
};
}, [userId]);
return (
<RealtimeContext.Provider value={client}>
{children}
</RealtimeContext.Provider>
);
}
export function useRealtime() {
return useContext(RealtimeContext);
}App Wrapper
File: testing-ui/src/app/layout.tsx
import { RealtimeProvider } from '@/providers/RealtimeProvider';
export default function RootLayout({ children }) {
const { userId } = useAuth();
return (
<html>
<body>
<RealtimeProvider userId={userId}>
{children}
</RealtimeProvider>
</body>
</html>
);
}Migration Guide
From SSE to Ably
Old Code (SSE):
// OLD: testing-ui/src/hooks/useSSE.ts
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
handleMessage(data);
};New Code (Ably):
// NEW: testing-ui/src/hooks/useProjectEvents.ts
const client = useRealtime();
const { channel } = useRealtimeChannel(
client,
`project:${projectId}`,
(message) => {
if (message.name === 'project.updated') {
handleMessage(message.data);
}
}
);Backend: Publish Events
Old Code:
// OLD: Inline SSE write
response.write(`data: ${JSON.stringify(event)}\n\n`);New Code:
// NEW: Use event bus
import { EventBus } from '@/services/events/unified-event-bus';
EventBus.getInstance().emit('project.updated', {
projectId,
changes: { status: 'completed' }
});
// Event bridge automatically publishes to realtime channelTesting
Unit Tests
File: tests/unit/lib/realtime/mock-provider.test.ts
Tests mock provider functionality.
File: tests/unit/lib/realtime/client-factory.test.ts
Tests client factory creation.
File: tests/unit/api/helpers/sse-heartbeat.test.ts
Tests SSE heartbeat logic.
E2E Tests
File: testing-ui/tests/e2e/features/realtime-events.spec.ts
Tests end-to-end realtime event flow.
Related Documentation
- 02-product-architecture.md — System architecture
- 11-database-schema.md — Database schema
- 04-features-current.md — Features overview
Implemented: February 2026 (Migrations 199-202) Last Updated: February 2026