Realtime Infrastructure

Status: ✅ Production (Migrations 199-202 - February 2026)


Overview

The Realtime Infrastructure provides a unified abstraction layer for pub/sub messaging, supporting both Ably (production) and mock providers (testing). It replaces the legacy SSE-only approach with a scalable, provider-agnostic system.

Key Features

  • ✅ Provider abstraction: swap Ably, Pusher, or mock providers without code changes
  • ✅ SSE heartbeat system: keep-alive pings with configurable intervals
  • ✅ React hooks: useRealtimeConnection, useRealtimeChannel, useRealtimePresence
  • ✅ Event bus integration: bridge unified events to realtime channels
  • ✅ Connection monitoring: track connections, heartbeats, message counts
  • ✅ Database-backed configuration: channel registry, heartbeat config, connection stats


Ably Migration

Why Ably?

Problems with SSE-only:

  • No server→client push outside HTTP response
  • No connection persistence across requests
  • No presence tracking
  • No message history
  • No scalability across multiple servers

Ably Benefits:

  • True bidirectional messaging
  • Built-in presence
  • Message history and replay
  • Automatic reconnection
  • Horizontal scaling
  • 99.999% uptime SLA

Migration Timeline

  • Phase 1: Abstraction layer (DONE)
  • Phase 2: Frontend migration (DONE)
  • Phase 3: Backend cleanup (DONE)
  • Phase 4: Chat endpoint fixes (DONE)
  • Phase 5: Discovery streaming (DONE)


Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                      REALTIME INFRASTRUCTURE ARCHITECTURE                   │
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                          FRONTEND (React)                            │  │
│  │                                                                       │  │
│  │  ┌────────────────────┐  ┌─────────────────────┐                    │  │
│  │  │ useRealtimeChannel │  │ useRealtimePresence │                    │  │
│  │  └─────────┬──────────┘  └──────────┬──────────┘                    │  │
│  │            │                        │                                │  │
│  │            └────────────┬───────────┘                                │  │
│  │                         ▼                                            │  │
│  │              ┌──────────────────────┐                                │  │
│  │              │ useRealtimeConnection│                                │  │
│  │              └──────────┬───────────┘                                │  │
│  │                         │                                            │  │
│  └─────────────────────────┼──────────────────────────────────────────────┘
│                            │
│  ┌─────────────────────────┼──────────────────────────────────────────────┐
│  │             ABSTRACTION LAYER (src/lib/realtime/)                    │  │
│  │                         │                                            │  │
│  │                         ▼                                            │  │
│  │            ┌─────────────────────────┐                               │  │
│  │            │ RealtimeClientFactory   │                               │  │
│  │            └──────────┬──────────────┘                               │  │
│  │                       │                                              │  │
│  │                       │ creates                                      │  │
│  │                       ▼                                              │  │
│  │      ┌─────────────────────────────────────────┐                    │  │
│  │      │      RealtimeProvider Interface         │                    │  │
│  │      │                                          │                    │  │
│  │      │  - connect()                             │                    │  │
│  │      │  - disconnect()                          │                    │  │
│  │      │  - subscribe(channel)                    │                    │  │
│  │      │  - publish(channel, event)               │                    │  │
│  │      │  - presence.enter/leave                  │                    │  │
│  │      └───────┬──────────────────────────────────┘                    │  │
│  │              │                                                       │  │
│  │              │ implemented by                                        │  │
│  │              │                                                       │  │
│  │       ┌──────┴──────┬──────────────┬────────────────┐               │  │
│  │       ▼             ▼              ▼                ▼               │  │
│  │  ┌────────┐   ┌─────────┐   ┌──────────┐   ┌──────────────┐       │  │
│  │  │  Ably  │   │ Pusher  │   │   Mock   │   │   Future     │       │  │
│  │  │Provider│   │Provider │   │ Provider │   │  Providers   │       │  │
│  │  └────────┘   └─────────┘   └──────────┘   └──────────────┘       │  │
│  │                                                                     │  │
│  └─────────────────────────────────────────────────────────────────────┘

│  ┌─────────────────────────────────────────────────────────────────────┐
│  │                        BACKEND (Node.js)                            │  │
│  │                                                                     │  │
│  │  ┌──────────────────────┐        ┌──────────────────────────┐     │  │
│  │  │  Unified Event Bus   │───────▶│  Realtime Event Bridge   │     │  │
│  │  │                      │        │                          │     │  │
│  │  │  Events:             │        │  Publishes to:           │     │  │
│  │  │  - project.updated   │        │  - project:{id}          │     │  │
│  │  │  - session.message   │        │  - session:{id}          │     │  │
│  │  │  - obligation.changed│        │  - organization:{id}     │     │  │
│  │  └──────────────────────┘        └──────────────────────────┘     │  │
│  │                                                                     │  │
│  │  ┌──────────────────────────────────────────────────────────────┐  │  │
│  │  │              SSE HEARTBEAT SYSTEM                           │  │  │
│  │  │                                                              │  │  │
│  │  │  Every 30s (configurable):                                  │  │  │
│  │  │    data: {"type":"heartbeat","timestamp":1234567890}        │  │  │
│  │  │                                                              │  │  │
│  │  │  Timeout: 90s (3 missed heartbeats = disconnected)          │  │  │
│  │  └──────────────────────────────────────────────────────────────┘  │  │
│  └─────────────────────────────────────────────────────────────────────┘

│  ┌─────────────────────────────────────────────────────────────────────┐
│  │                    DATABASE (PostgreSQL)                            │  │
│  │                                                                     │  │
│  │  ┌──────────────────────┐  ┌──────────────────────────┐           │  │
│  │  │ realtime_channels    │  │ realtime_heartbeat_config│           │  │
│  │  │                      │  │                          │           │  │
│  │  │ - tenant_id          │  │ - channel_type           │           │  │
│  │  │ - channel_name       │  │ - heartbeat_interval_ms  │           │  │
│  │  │ - channel_type       │  │ - timeout_ms             │           │  │
│  │  │ - is_active          │  └──────────────────────────┘           │  │
│  │  └──────────────────────┘                                         │  │
│  │                                                                     │  │
│  │  ┌──────────────────────┐  ┌──────────────────────────┐           │  │
│  │  │ realtime_events_log  │  │ realtime_connection_stats│           │  │
│  │  │                      │  │                          │           │  │
│  │  │ - channel_id         │  │ - channel_id             │           │  │
│  │  │ - event_type         │  │ - connection_id          │           │  │
│  │  │ - event_data         │  │ - last_heartbeat_at      │           │  │
│  │  │ - published_at       │  │ - message_count          │           │  │
│  │  └──────────────────────┘  └──────────────────────────┘           │  │
│  └─────────────────────────────────────────────────────────────────────┘

└─────────────────────────────────────────────────────────────────────────────┘

Realtime Abstraction Layer

Location: src/lib/realtime/

Core Interface

// src/lib/realtime/types.ts

export interface RealtimeProvider {
  // Connection
  connect(apiKey: string, clientId: string): Promise<void>;
  disconnect(): Promise<void>;
  isConnected(): boolean;

  // Channels
  subscribe(
    channelName: string,
    callback: (message: RealtimeMessage) => void
  ): Promise<RealtimeChannel>;
  unsubscribe(channelName: string): Promise<void>;
  publish(channelName: string, event: string, data: any): Promise<void>;

  // Presence
  presence: {
    enter(channelName: string, data?: any): Promise<void>;
    leave(channelName: string): Promise<void>;
    get(channelName: string): Promise<PresenceMember[]>;
  };
}

export interface RealtimeChannel {
  name: string;
  subscribe(event: string, callback: (message: any) => void): void;
  unsubscribe(event?: string): void;
  publish(event: string, data: any): Promise<void>;
}

export interface RealtimeMessage {
  name: string;
  data: any;
  timestamp: number;
  clientId: string;
}
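
Because `data` is typed as `any`, consumers may want to validate a payload before trusting it. The following is a minimal sketch of a runtime type guard, assuming the `RealtimeMessage` shape above; `isRealtimeMessage` is a hypothetical helper and not part of the documented module.

```typescript
// Mirrors RealtimeMessage from src/lib/realtime/types.ts,
// using `unknown` instead of `any` for the payload.
interface RealtimeMessage {
  name: string;
  data: unknown;
  timestamp: number;
  clientId: string;
}

// Hypothetical helper: checks that an untyped value has the RealtimeMessage shape.
function isRealtimeMessage(value: unknown): value is RealtimeMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  const timestamp = candidate['timestamp'];
  return (
    typeof candidate['name'] === 'string' &&
    typeof timestamp === 'number' &&
    Number.isFinite(timestamp) &&
    typeof candidate['clientId'] === 'string' &&
    'data' in candidate
  );
}
```

A subscriber callback could call the guard first and drop anything that fails it, rather than passing malformed payloads on to UI code.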

Provider System

Ably Provider

File: src/lib/realtime/client.ts

import * as Ably from 'ably';

export class AblyRealtimeProvider implements RealtimeProvider {
  private client: Ably.Realtime | null = null;
  private channels = new Map<string, Ably.RealtimeChannel>();

  async connect(apiKey: string, clientId: string): Promise<void> {
    this.client = new Ably.Realtime({
      key: apiKey,
      clientId,
      autoConnect: true,
      echoMessages: false,
    });

    return new Promise((resolve, reject) => {
      this.client!.connection.once('connected', () => resolve());
      this.client!.connection.once('failed', reject);
    });
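  }

  async disconnect(): Promise<void> {
    // close() ends the connection; drop cached channels so a later connect() starts clean
    this.client?.close();
    this.client = null;
    this.channels.clear();
  }

  isConnected(): boolean {
    return this.client?.connection.state === 'connected';
  }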

  async subscribe(
    channelName: string,
    callback: (message: RealtimeMessage) => void
  ): Promise<RealtimeChannel> {
    if (!this.client) throw new Error('Not connected');

    const channel = this.client.channels.get(channelName);
    this.channels.set(channelName, channel);

    channel.subscribe((message: Ably.Message) => {
      callback({
        name: message.name,
        data: message.data,
        timestamp: message.timestamp,
        clientId: message.clientId,
      });
    });

    return {
      name: channelName,
      subscribe: (event, cb) => channel.subscribe(event, cb),
      unsubscribe: (event) => channel.unsubscribe(event),
      publish: (event, data) => channel.publish(event, data),
    };
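  }

  async unsubscribe(channelName: string): Promise<void> {
    const channel = this.channels.get(channelName);
    if (!channel) return;
    channel.unsubscribe(); // removes every listener registered on this channel
    this.channels.delete(channelName);
  }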

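  async publish(channelName: string, event: string, data: any): Promise<void> {
    if (!this.client) throw new Error('Not connected');
    // channels.get() returns the existing channel or creates it, so publishing
    // does not require a prior subscribe(). The server-side event bridge only
    // publishes and would otherwise always hit a "Not subscribed" error.
    const channel = this.client.channels.get(channelName);
    await channel.publish(event, data);
  }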

  presence = {
    enter: async (channelName: string, data?: any) => {
      const channel = this.channels.get(channelName);
      if (!channel) throw new Error(`Not subscribed to ${channelName}`);
      await channel.presence.enter(data);
    },

    leave: async (channelName: string) => {
      const channel = this.channels.get(channelName);
      if (!channel) throw new Error(`Not subscribed to ${channelName}`);
      await channel.presence.leave();
    },

    get: async (channelName: string) => {
      const channel = this.channels.get(channelName);
      if (!channel) throw new Error(`Not subscribed to ${channelName}`);
      const members = await channel.presence.get();
      return members.map(m => ({
        clientId: m.clientId,
        data: m.data,
      }));
    },
  };
}

Mock Provider

File: src/lib/realtime/client.ts

Used for testing. Keeps subscriptions in memory and delivers each published message synchronously to the callbacks registered for that channel:

export class MockRealtimeProvider implements RealtimeProvider {
  private connected = false;
  private subscriptions = new Map<string, Set<(msg: RealtimeMessage) => void>>();
  private presenceMembers = new Map<string, Set<string>>();

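  async connect(): Promise<void> {
    this.connected = true;
  }

  async disconnect(): Promise<void> {
    this.connected = false;
    this.subscriptions.clear();
  }

  isConnected(): boolean {
    return this.connected;
  }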

  async subscribe(
    channelName: string,
    callback: (message: RealtimeMessage) => void
  ): Promise<RealtimeChannel> {
    if (!this.subscriptions.has(channelName)) {
      this.subscriptions.set(channelName, new Set());
    }
    this.subscriptions.get(channelName)!.add(callback);

    return {
      name: channelName,
      subscribe: (event, cb) => { /* mock */ },
      unsubscribe: () => { /* mock */ },
      publish: async (event, data) => {
        this.publish(channelName, event, data);
      },
    };
  }

  async publish(channelName: string, event: string, data: any): Promise<void> {
    const callbacks = this.subscriptions.get(channelName);
    if (callbacks) {
      const message: RealtimeMessage = {
        name: event,
        data,
        timestamp: Date.now(),
        clientId: 'mock-client',
      };
      callbacks.forEach(cb => cb(message));
    }
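  }

  async unsubscribe(channelName: string): Promise<void> {
    this.subscriptions.delete(channelName);
  }

  // In-memory presence; every member is reported as 'mock-client'
  presence = {
    enter: async (channelName: string) => {
      if (!this.presenceMembers.has(channelName)) {
        this.presenceMembers.set(channelName, new Set());
      }
      this.presenceMembers.get(channelName)!.add('mock-client');
    },
    leave: async (channelName: string) => {
      this.presenceMembers.get(channelName)?.delete('mock-client');
    },
    get: async (channelName: string) =>
      [...(this.presenceMembers.get(channelName) ?? [])].map(clientId => ({
        clientId,
        data: undefined,
      })),
  };
}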
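
A test against the mock might look like the sketch below. To stay self-contained it declares a trimmed-down stand-in, `TinyMockProvider` (hypothetical), with the same subscribe and publish behaviour instead of importing the real class. It illustrates that a message reaches only the callbacks registered for its channel.

```typescript
interface RealtimeMessage {
  name: string;
  data: unknown;
  timestamp: number;
  clientId: string;
}

type Listener = (message: RealtimeMessage) => void;

// Hypothetical stand-in for MockRealtimeProvider: channel name -> listeners.
class TinyMockProvider {
  private subscriptions = new Map<string, Set<Listener>>();

  async subscribe(channelName: string, callback: Listener): Promise<void> {
    const listeners = this.subscriptions.get(channelName) ?? new Set<Listener>();
    listeners.add(callback);
    this.subscriptions.set(channelName, listeners);
  }

  async publish(channelName: string, event: string, data: unknown): Promise<void> {
    const message: RealtimeMessage = {
      name: event,
      data,
      timestamp: Date.now(),
      clientId: 'mock-client',
    };
    // Deliver synchronously to every listener on this channel only.
    this.subscriptions.get(channelName)?.forEach(cb => cb(message));
  }
}

// Subscribes to one channel, publishes to two, and returns what was received.
async function runMockExample(): Promise<RealtimeMessage[]> {
  const provider = new TinyMockProvider();
  const received: RealtimeMessage[] = [];

  await provider.subscribe('project:abc123', msg => {
    received.push(msg);
  });
  await provider.publish('project:abc123', 'project.updated', { status: 'completed' });
  await provider.publish('session:def456', 'session.message', { text: 'not delivered' });

  return received;
}
```

Only the `project.updated` message ends up in `received`, because nothing subscribed to `session:def456`.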

Client Factory

File: src/lib/realtime/client.ts

export function createRealtimeClient(
  provider: 'ably' | 'mock',
  config?: RealtimeConfig
): RealtimeProvider {
  switch (provider) {
    case 'ably':
      return new AblyRealtimeProvider();
    case 'mock':
      return new MockRealtimeProvider();
    default:
      throw new Error(`Unknown provider: ${provider}`);
  }
}

Usage:

import { createRealtimeClient } from '@/lib/realtime';

// Production
const client = createRealtimeClient('ably');
await client.connect(process.env.ABLY_API_KEY!, userId);

// Testing
const mockClient = createRealtimeClient('mock');
await mockClient.connect('', 'test-user');
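
Rather than hard-coding the provider name at each call site, it could be derived from the environment. This is a minimal sketch under stated assumptions: `resolveProviderName`, the `REALTIME_PROVIDER` variable, and the `NODE_ENV === 'test'` fallback are all hypothetical and do not appear in the codebase described here.

```typescript
type ProviderName = 'ably' | 'mock';

// Hypothetical helper: an explicit REALTIME_PROVIDER wins; otherwise tests get
// the mock and everything else gets Ably.
function resolveProviderName(env: Record<string, string | undefined>): ProviderName {
  const explicit = env['REALTIME_PROVIDER'];
  if (explicit === 'ably' || explicit === 'mock') return explicit;
  if (explicit !== undefined && explicit !== '') {
    // Fail fast on typos instead of silently falling back.
    throw new Error(`Unknown provider: ${explicit}`);
  }
  return env['NODE_ENV'] === 'test' ? 'mock' : 'ably';
}
```

The result can be passed straight to the factory, for example `createRealtimeClient(resolveProviderName(process.env))`.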

React Hooks

Location: testing-ui/src/hooks/

useRealtimeConnection

Manages connection to realtime provider.

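// testing-ui/src/hooks/useRealtimeConnection.ts
import { useEffect, useRef, useState } from 'react';
import { createRealtimeClient } from '@/lib/realtime';
// Types path assumed to mirror src/lib/realtime/types.ts
import type { RealtimeProvider } from '@/lib/realtime/types';

export function useRealtimeConnection(userId: string) {
  const [isConnected, setIsConnected] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  const clientRef = useRef<RealtimeProvider | null>(null);

  useEffect(() => {
    let cancelled = false;
    const client = createRealtimeClient('ably');

    client.connect(process.env.NEXT_PUBLIC_ABLY_KEY!, userId)
      .then(() => {
        if (cancelled) return; // unmounted or userId changed while connecting
        clientRef.current = client;
        setIsConnected(true);
      })
      .catch(err => {
        if (!cancelled) setError(err);
      });

    return () => {
      cancelled = true;
      // Do not keep handing out a client that is being closed
      clientRef.current = null;
      setIsConnected(false);
      client.disconnect();
    };
  }, [userId]);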

  return { client: clientRef.current, isConnected, error };
}

useRealtimeChannel

Subscribe to a specific channel.

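// testing-ui/src/hooks/useRealtimeChannel.ts
import { useCallback, useEffect, useRef, useState } from 'react';
// Types path assumed to mirror src/lib/realtime/types.ts
import type {
  RealtimeChannel,
  RealtimeMessage,
  RealtimeProvider,
} from '@/lib/realtime/types';

export function useRealtimeChannel(
  client: RealtimeProvider | null,
  channelName: string,
  onMessage: (message: RealtimeMessage) => void
) {
  const [channel, setChannel] = useState<RealtimeChannel | null>(null);

  // Keep the latest handler in a ref: the subscription always calls the
  // current callback without resubscribing when its identity changes
  const onMessageRef = useRef(onMessage);
  useEffect(() => {
    onMessageRef.current = onMessage;
  });

  useEffect(() => {
    if (!client) return;

    let subscribed = true;

    client.subscribe(channelName, (msg) => {
      if (subscribed) {
        onMessageRef.current(msg);
      }
    }).then(ch => {
      if (subscribed) {
        setChannel(ch);
      }
    });

    return () => {
      subscribed = false;
      setChannel(null);
      client.unsubscribe(channelName);
    };
  }, [client, channelName]);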

  const publish = useCallback(
    (event: string, data: any) => {
      if (channel) {
        return channel.publish(event, data);
      }
      return Promise.reject(new Error('Channel not ready'));
    },
    [channel]
  );

  return { channel, publish };
}

useRealtimePresence

Track who's online in a channel.

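// testing-ui/src/hooks/useRealtimePresence.ts
import { useEffect, useState } from 'react';
// Types path assumed to mirror src/lib/realtime/types.ts
import type { PresenceMember, RealtimeProvider } from '@/lib/realtime/types';

export function useRealtimePresence(
  client: RealtimeProvider | null,
  channelName: string
) {
  const [members, setMembers] = useState<PresenceMember[]>([]);

  useEffect(() => {
    if (!client) return;

    let active = true;
    const refresh = () =>
      client.presence.get(channelName)
        .then(list => {
          if (active) setMembers(list);
        })
        .catch(console.error);

    // Enter presence, then load the member list once instead of
    // waiting for the first poll
    client.presence.enter(channelName, { name: 'User' })
      .then(refresh)
      .catch(console.error);

    // Poll for members (Ably also has presence.subscribe for real-time updates)
    const interval = setInterval(refresh, 5000);

    return () => {
      active = false;
      clearInterval(interval);
      client.presence.leave(channelName).catch(console.error);
    };
  }, [client, channelName]);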

  return { members };
}
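
The polling comment above mentions that presence changes can also arrive as events. If the hook were switched to event-driven updates, the member list could be maintained by a small pure reducer. The sketch below assumes a simplified event shape; `PresenceEvent` and `applyPresenceEvent` are hypothetical and do not mirror any provider's exact types.

```typescript
interface PresenceMember {
  clientId: string;
  data?: unknown;
}

// Hypothetical, simplified event shape for illustration only.
type PresenceEvent =
  | { action: 'enter' | 'update'; member: PresenceMember }
  | { action: 'leave'; clientId: string };

// Returns a new member list; the input array is never mutated,
// so the result is safe to pass to a React state setter.
function applyPresenceEvent(
  members: readonly PresenceMember[],
  event: PresenceEvent
): PresenceMember[] {
  if (event.action === 'leave') {
    const leavingId = event.clientId;
    return members.filter(m => m.clientId !== leavingId);
  }
  // enter/update: replace any existing entry for the same client.
  const incoming = event.member;
  const others = members.filter(m => m.clientId !== incoming.clientId);
  return [...others, incoming];
}
```

Inside the hook this would be used as `setMembers(prev => applyPresenceEvent(prev, event))`, which avoids the up-to-five-second lag of polling.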

SSE Heartbeat

Purpose: Keep SSE connections alive with periodic pings.

Configuration

Database:

CREATE TABLE realtime_heartbeat_config (
  id UUID PRIMARY KEY,
  channel_type TEXT UNIQUE CHECK (channel_type IN (
    'project', 'session', 'organization', 'user', 'system'
  )),
  heartbeat_interval_ms INTEGER DEFAULT 30000,  -- 30s
  timeout_ms INTEGER DEFAULT 90000,              -- 90s
  created_at TIMESTAMPTZ,
  updated_at TIMESTAMPTZ
);

Default Config:

Channel Type    Heartbeat Interval    Timeout
project         30s                   90s
session         30s                   90s
organization    60s                   180s
user            30s                   90s
system          60s                   180s
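
The defaults above can be expressed as a typed lookup for code paths that need a value before the database row is loaded. This is a sketch only: `DEFAULT_HEARTBEAT_CONFIG` and `resolveHeartbeatConfig` are hypothetical names, and the values are copied from the table.

```typescript
type ChannelType = 'project' | 'session' | 'organization' | 'user' | 'system';

interface HeartbeatConfig {
  heartbeatIntervalMs: number;
  timeoutMs: number;
}

// Values taken from the default config table (timeout = 3 x interval).
const DEFAULT_HEARTBEAT_CONFIG: Record<ChannelType, HeartbeatConfig> = {
  project: { heartbeatIntervalMs: 30_000, timeoutMs: 90_000 },
  session: { heartbeatIntervalMs: 30_000, timeoutMs: 90_000 },
  organization: { heartbeatIntervalMs: 60_000, timeoutMs: 180_000 },
  user: { heartbeatIntervalMs: 30_000, timeoutMs: 90_000 },
  system: { heartbeatIntervalMs: 60_000, timeoutMs: 180_000 },
};

// Hypothetical helper: merges an optional override (for example a row from
// realtime_heartbeat_config) over the defaults and sanity-checks the result.
function resolveHeartbeatConfig(
  channelType: ChannelType,
  override: Partial<HeartbeatConfig> = {}
): HeartbeatConfig {
  const merged: HeartbeatConfig = { ...DEFAULT_HEARTBEAT_CONFIG[channelType], ...override };
  if (merged.timeoutMs <= merged.heartbeatIntervalMs) {
    // A timeout at or below the interval would flag every connection as dead.
    throw new Error('timeoutMs must be greater than heartbeatIntervalMs');
  }
  return merged;
}
```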

Implementation

Backend:

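// src/api/helpers/sse-heartbeat.ts
import type { ServerResponse } from 'node:http';

export function startSSEHeartbeat(
  response: ServerResponse,
  channelType: string,
  intervalMs = 30000
): NodeJS.Timeout {
  const interval = setInterval(() => {
    // Stop once the stream has been ended or the socket is gone
    if (response.writableEnded || response.destroyed) {
      clearInterval(interval);
      return;
    }

    const heartbeat = {
      type: 'heartbeat',
      timestamp: Date.now(),
    };

    response.write(`data: ${JSON.stringify(heartbeat)}\n\n`);
  }, intervalMs);

  // Clear the timer as soon as the client disconnects
  response.on('close', () => clearInterval(interval));

  return interval;
}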

Frontend:

// testing-ui/src/hooks/useSSEHeartbeat.ts
import { useCallback, useEffect, useRef } from 'react';

export function useSSEHeartbeat(
  onTimeout: () => void,
  timeoutMs = 90000
) {
  const lastHeartbeatRef = useRef<number>(Date.now());

  const handleHeartbeat = useCallback(() => {
    lastHeartbeatRef.current = Date.now();
  }, []);

  useEffect(() => {
    const checkInterval = setInterval(() => {
      const elapsed = Date.now() - lastHeartbeatRef.current;
      if (elapsed > timeoutMs) {
        // Restart the window so onTimeout fires once per missed window,
        // not on every 10s check while the stream stays silent
        lastHeartbeatRef.current = Date.now();
        onTimeout();
      }
    }, 10000); // Check every 10s

    return () => clearInterval(checkInterval);
  }, [timeoutMs, onTimeout]);

  return { handleHeartbeat };
}
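
The hook only tracks timing; something still has to recognise a heartbeat frame in the stream. A minimal sketch of that step follows, assuming the `{"type":"heartbeat","timestamp":...}` payload written by the backend helper. `parseHeartbeat` is a hypothetical name.

```typescript
interface HeartbeatFrame {
  type: 'heartbeat';
  timestamp: number;
}

// Returns the parsed heartbeat, or null for any other (or malformed) payload.
function parseHeartbeat(raw: string): HeartbeatFrame | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // not JSON, so certainly not a heartbeat
  }
  if (typeof parsed !== 'object' || parsed === null) return null;

  const frame = parsed as Record<string, unknown>;
  const timestamp = frame['timestamp'];
  if (frame['type'] !== 'heartbeat' || typeof timestamp !== 'number') return null;

  return { type: 'heartbeat', timestamp };
}
```

In an `EventSource` `onmessage` handler, a non-null result from `parseHeartbeat(event.data)` would trigger `handleHeartbeat()`, and everything else would go to the normal message handler.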

Database Tables

realtime_channels

CREATE TABLE realtime_channels (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  channel_name TEXT NOT NULL,
  channel_type TEXT CHECK (channel_type IN (
    'project', 'session', 'organization', 'user', 'system'
  )),
  entity_id UUID,
  config JSONB DEFAULT '{}',
  is_active BOOLEAN DEFAULT true,
  last_activity_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ,
  updated_at TIMESTAMPTZ
);

CREATE UNIQUE INDEX idx_realtime_channels_name
  ON realtime_channels(tenant_id, channel_name);

Example Channels:

  • project:abc123 → All updates for project abc123
  • session:def456 → Discovery session def456 chat
  • organization:org789 → Organization-wide events
  • user:user001 → User-specific notifications
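
The `type:id` naming convention can be captured in a pair of helpers so channel names are never assembled by hand. This is a sketch; `buildChannelName` and `parseChannelName` are hypothetical, and the list of types is taken from the `channel_type` CHECK constraint.

```typescript
const CHANNEL_TYPES = ['project', 'session', 'organization', 'user', 'system'] as const;
type ChannelType = (typeof CHANNEL_TYPES)[number];

// Builds a name such as "project:abc123".
function buildChannelName(type: ChannelType, entityId: string): string {
  return `${type}:${entityId}`;
}

// Splits on the first colon only, so an entity id may itself contain colons.
// Returns null for unknown types or a missing id.
function parseChannelName(name: string): { type: ChannelType; entityId: string } | null {
  const separator = name.indexOf(':');
  if (separator <= 0 || separator === name.length - 1) return null;

  const rawType = name.slice(0, separator);
  const entityId = name.slice(separator + 1);
  const type = CHANNEL_TYPES.find(t => t === rawType);
  return type ? { type, entityId } : null;
}
```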

realtime_events_log

CREATE TABLE realtime_events_log (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  channel_id UUID REFERENCES realtime_channels(id),
  event_type TEXT NOT NULL,
  event_data JSONB NOT NULL,
  published_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_realtime_events_log_channel
  ON realtime_events_log(channel_id, published_at);

Usage: Audit trail of all realtime events.


realtime_connection_stats

CREATE TABLE realtime_connection_stats (
  id UUID PRIMARY KEY,
  tenant_id UUID NOT NULL,
  channel_id UUID REFERENCES realtime_channels(id),
  connection_id TEXT NOT NULL,
  client_info JSONB,
  connected_at TIMESTAMPTZ DEFAULT NOW(),
  disconnected_at TIMESTAMPTZ,
  last_heartbeat_at TIMESTAMPTZ,
  message_count INTEGER DEFAULT 0,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_realtime_connection_stats_active
  ON realtime_connection_stats(disconnected_at) WHERE disconnected_at IS NULL;

Usage: Monitor active connections, detect stale connections.
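
Stale-connection detection over rows from this table could be sketched as below. The field names are camelCase versions of the columns; `ConnectionStat` and `findStaleConnections` are hypothetical, and treating `connected_at` as the fallback when no heartbeat has been recorded is an assumption.

```typescript
interface ConnectionStat {
  connectionId: string;
  connectedAt: Date;
  disconnectedAt: Date | null;
  lastHeartbeatAt: Date | null;
}

// A connection is stale when it is still open but has been silent
// for longer than timeoutMs.
function findStaleConnections(
  stats: readonly ConnectionStat[],
  timeoutMs: number,
  now: Date = new Date()
): ConnectionStat[] {
  return stats.filter(stat => {
    if (stat.disconnectedAt !== null) return false; // already closed
    // Assumption: with no heartbeat yet, measure silence from connect time.
    const lastSeen = stat.lastHeartbeatAt ?? stat.connectedAt;
    return now.getTime() - lastSeen.getTime() > timeoutMs;
  });
}
```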


Event Bus Integration

Unified Event Bus → Realtime Bridge

File: src/services/events/conversation-event-bridge.ts

Bridges unified events to realtime channels:

import { EventBus } from './unified-event-bus.js';
import { createRealtimeClient } from '../../lib/realtime/index.js';
import type { RealtimeProvider } from '../../lib/realtime/types.js';

export class ConversationEventBridge {
  private eventBus: EventBus;
  private realtimeClient: RealtimeProvider;

  constructor() {
    this.eventBus = EventBus.getInstance();
    this.realtimeClient = createRealtimeClient('ably');
  }

  async start() {
    await this.realtimeClient.connect(
      process.env.ABLY_API_KEY!,
      'server'
    );

    // Bridge project events
    this.eventBus.on('project.updated', async (event) => {
      await this.realtimeClient.publish(
        `project:${event.payload.projectId}`,
        'project.updated',
        event.payload
      );
    });

    // Bridge session events
    this.eventBus.on('session.message', async (event) => {
      await this.realtimeClient.publish(
        `session:${event.payload.sessionId}`,
        'session.message',
        event.payload
      );
    });

    // Bridge obligation events
    this.eventBus.on('obligation.changed', async (event) => {
      await this.realtimeClient.publish(
        `session:${event.payload.sessionId}`,
        'obligation.changed',
        event.payload
      );
    });
  }
}

Usage:

// src/index.ts
import { ConversationEventBridge } from './services/events/conversation-event-bridge.js';

const bridge = new ConversationEventBridge();
await bridge.start();
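
The three handlers in the bridge differ only in how they derive the channel name, so the mapping could be held in a table. The sketch below reproduces the routing shown in `start()` as a pure function; `BridgeEvent`, `CHANNEL_ROUTES`, and `resolveChannel` are hypothetical names.

```typescript
interface BridgeEvent {
  type: string;
  payload: Record<string, unknown>;
}

type ChannelResolver = (payload: Record<string, unknown>) => string;

// Event type -> channel name, matching the handlers in ConversationEventBridge.
// A Map is used so inherited object keys such as "toString" never match.
const CHANNEL_ROUTES = new Map<string, ChannelResolver>([
  ['project.updated', p => `project:${String(p['projectId'])}`],
  ['session.message', p => `session:${String(p['sessionId'])}`],
  ['obligation.changed', p => `session:${String(p['sessionId'])}`],
]);

// Returns the target channel, or null when the event is not bridged.
function resolveChannel(event: BridgeEvent): string | null {
  const resolver = CHANNEL_ROUTES.get(event.type);
  return resolver ? resolver(event.payload) : null;
}
```

With a table like this, bridging a new event type becomes a one-line addition, and the routing can be unit tested without connecting to a realtime provider.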

Frontend Integration

Provider Setup

File: testing-ui/src/providers/RealtimeProvider.tsx

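import { createContext, useContext, useEffect, useState, type ReactNode } from 'react';
import { createRealtimeClient } from '@/lib/realtime';
// Aliased because the component below is also named RealtimeProvider.
// Types path assumed to mirror src/lib/realtime/types.ts
import type { RealtimeProvider as RealtimeClient } from '@/lib/realtime/types';

interface Props {
  children: ReactNode;
  userId: string;
}

const RealtimeContext = createContext<RealtimeClient | null>(null);

export function RealtimeProvider({ children, userId }: Props) {
  const [client, setClient] = useState<RealtimeClient | null>(null);

  useEffect(() => {
    let cancelled = false;
    const realtimeClient = createRealtimeClient('ably');

    realtimeClient.connect(
      process.env.NEXT_PUBLIC_ABLY_KEY!,
      userId
    ).then(() => {
      // Ignore a connection that completes after unmount or a userId change
      if (!cancelled) setClient(realtimeClient);
    }).catch(console.error);

    return () => {
      cancelled = true;
      setClient(null);
      realtimeClient.disconnect();
    };
  }, [userId]);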

  return (
    <RealtimeContext.Provider value={client}>
      {children}
    </RealtimeContext.Provider>
  );
}

export function useRealtime() {
  return useContext(RealtimeContext);
}

App Wrapper

File: testing-ui/src/app/layout.tsx

import { RealtimeProvider } from '@/providers/RealtimeProvider';

export default function RootLayout({ children }) {
  const { userId } = useAuth();

  return (
    <html>
      <body>
        <RealtimeProvider userId={userId}>
          {children}
        </RealtimeProvider>
      </body>
    </html>
  );
}

Migration Guide

From SSE to Ably

Old Code (SSE):

// OLD: testing-ui/src/hooks/useSSE.ts
const eventSource = new EventSource('/api/stream');
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  handleMessage(data);
};

New Code (Ably):

// NEW: testing-ui/src/hooks/useProjectEvents.ts
const client = useRealtime();
const { channel } = useRealtimeChannel(
  client,
  `project:${projectId}`,
  (message) => {
    if (message.name === 'project.updated') {
      handleMessage(message.data);
    }
  }
);

Backend: Publish Events

Old Code:

// OLD: Inline SSE write
response.write(`data: ${JSON.stringify(event)}\n\n`);

New Code:

// NEW: Use event bus
import { EventBus } from '@/services/events/unified-event-bus';

EventBus.getInstance().emit('project.updated', {
  projectId,
  changes: { status: 'completed' }
});

// Event bridge automatically publishes to realtime channel

Testing

Unit Tests

File: tests/unit/lib/realtime/mock-provider.test.ts

Tests mock provider functionality.

File: tests/unit/lib/realtime/client-factory.test.ts

Tests client factory creation.

File: tests/unit/api/helpers/sse-heartbeat.test.ts

Tests SSE heartbeat logic.


E2E Tests

File: testing-ui/tests/e2e/features/realtime-events.spec.ts

Tests end-to-end realtime event flow.



Implemented: February 2026 (Migrations 199-202)
Last Updated: February 2026
