
Gateway Processing Architecture

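The gateway processing system provides a modular, extensible architecture for handling conversations across messaging platforms: Telegram, email, WhatsApp, Slack, and Discord. Telegram and email processors are implemented; the WhatsApp, Slack, and Discord processors are still marked TODO (see File Structure).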

Overview

```
┌─────────────────────────────────────────────────────────────────┐
│                      Incoming Webhook                           │
│              /api/agents/{id}/gateways/{gid}/webhook            │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                      GatewayPipeline                            │
├─────────────────────────────────────────────────────────────────┤
│ 1. parseInput()         → Platform-specific parsing             │
│ 2. resolveContact()     → Address book lookup/create            │
│ 3. resolveConversation()→ Conversation lookup/create            │
│ 4. storeMessage()       → Store user message                    │
│ 5. generateResponse()   → Agent LLM processing                  │
│ 6. formatOutput()       → Platform-specific formatting          │
│ 7. send()               → Platform API delivery                 │
│ 8. storeMessage()       → Store assistant message               │
└─────────────────────────────────────────────────────────────────┘
                                │
           ┌────────────────────┼────────────────────┐
           ▼                    ▼                    ▼
   ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
   │   Telegram   │     │    Email     │     │   WhatsApp   │
   │  Processor   │     │  Processor   │     │  Processor   │
   └──────────────┘     └──────────────┘     └──────────────┘
```
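The eight stages run in order for every incoming webhook. The following is a minimal sketch of that orchestration, not the actual `index.ts` code: `PipelineDeps`, `runPipeline`, and their simplified signatures are hypothetical, and storing the assistant message only after a successful send is an assumption of this sketch.

```typescript
// Hypothetical, simplified stage signatures (not the real index.ts API).
interface PipelineDeps {
  parseInput(raw: unknown): Promise<{ message: string; platformId: string } | null>;
  resolveContact(platformId: string): Promise<string>;     // returns a contact ID
  resolveConversation(contactId: string): Promise<string>; // returns a conversation ID
  storeMessage(conversationId: string, role: "user" | "assistant", text: string): Promise<void>;
  generateResponse(conversationId: string, message: string): Promise<string>;
  formatOutput(reply: string): Promise<string>;
  send(formatted: string, platformId: string): Promise<boolean>;
}

// Runs the eight pipeline stages in order. Returns false when the input is
// ignored (parseInput returned null) or delivery failed.
async function runPipeline(raw: unknown, deps: PipelineDeps): Promise<boolean> {
  const input = await deps.parseInput(raw);                                 // 1
  if (!input) return false;
  const contactId = await deps.resolveContact(input.platformId);            // 2
  const conversationId = await deps.resolveConversation(contactId);         // 3
  await deps.storeMessage(conversationId, "user", input.message);           // 4
  const reply = await deps.generateResponse(conversationId, input.message); // 5
  const formatted = await deps.formatOutput(reply);                         // 6
  const sent = await deps.send(formatted, input.platformId);                // 7
  if (sent) await deps.storeMessage(conversationId, "assistant", reply);    // 8
  return sent;
}
```

Passing the stages in as a dependency object keeps the ordering logic testable with in-memory stubs, independent of any platform API.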

Data Flow

1. Input Processing

After parsing and contact/conversation resolution (pipeline steps 1-3), raw webhook data from external platforms is normalized into a standard GatewayInput format:

```typescript
interface GatewayInput {
  gatewayId: string;
  gatewayType: "telegram" | "email" | "whatsapp" | "slack" | "discord";

  // Normalized message
  message: string;
  messageId?: string;
  attachments?: GatewayAttachment[];

  // Resolved contact (from address book)
  contact: {
    id: string;
    name: string;
    platformId: string;  // telegram chat ID, email address, etc.
    username?: string;
  };

  // Conversation context
  conversationId: string;
  threadId?: string;
}
```
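As a concrete example of step 1, the sketch below extracts the platform-derived fields of `GatewayInput` from a Telegram Bot API `Update`. The `TelegramUpdate` type models only the Bot API fields used here; `ParsedTelegramInput` and `parseTelegramUpdate` are hypothetical names, and the contact and conversation IDs are left for the pipeline to fill in later.

```typescript
// Subset of the Telegram Bot API Update/Message/User objects used here.
interface TelegramUpdate {
  update_id: number;
  message?: {
    message_id: number;
    chat: { id: number };
    from?: { id: number; is_bot: boolean; first_name: string; last_name?: string; username?: string };
    text?: string;
    caption?: string;
    message_thread_id?: number;
  };
}

// Hypothetical parser output: the platform-derived half of GatewayInput.
interface ParsedTelegramInput {
  message: string;
  messageId: string;
  platformId: string; // chat ID, stored as a string like other platform IDs
  displayName: string;
  username?: string;
  threadId?: string;
}

function parseTelegramUpdate(update: TelegramUpdate): ParsedTelegramInput | null {
  const msg = update.message;
  // Ignore non-message updates and messages sent by bots.
  if (!msg || msg.from?.is_bot) return null;
  // Photos and documents carry their text in `caption` instead of `text`.
  const text = msg.text ?? msg.caption;
  if (!text) return null;
  const from = msg.from;
  return {
    message: text,
    messageId: String(msg.message_id),
    platformId: String(msg.chat.id),
    displayName: from ? [from.first_name, from.last_name].filter(Boolean).join(" ") : "Unknown",
    username: from?.username,
    threadId: msg.message_thread_id !== undefined ? String(msg.message_thread_id) : undefined,
  };
}
```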

2. Agent Processing

The normalized input is passed to the agentic endpoint, which:

  • Retrieves conversation history
  • Builds context with RAG
  • Executes LLM with tools
  • Returns response with any generated images/files

3. Output Processing

The agent's response is normalized into a GatewayOutput:

```typescript
interface GatewayOutput {
  text: string;
  images: Array<{
    data: string;     // base64 or URL
    caption?: string;
    mimeType?: string;
  }>;
  files?: Array<{
    data: string;
    filename: string;
    mimeType: string;
  }>;
  actions?: Array<{
    type: "button" | "link";
    label: string;
    value: string;
  }>;
}
```

4. Platform Delivery

Each processor formats and sends the output using platform-specific APIs:

  • Telegram: HTML formatting, inline keyboards, photo/document upload
  • Email: HTML email with attachments
  • WhatsApp: Template messages, media messages
  • Slack: Block Kit formatting, file uploads

File Structure

```
src/agent/gateways/
├── types.ts        # Core interfaces and types
├── index.ts        # Registry and pipeline orchestrator
├── telegram.ts     # Telegram Bot API processor
├── email.ts        # Email processor (Resend, MailChannels)
├── whatsapp.ts     # WhatsApp Business API processor (TODO)
├── slack.ts        # Slack API processor (TODO)
└── discord.ts      # Discord API processor (TODO)
```

Processor Interface

Each platform implements the GatewayProcessor interface:

```typescript
interface GatewayProcessor {
  // Parse raw webhook into standardized input
  parseInput(rawData: unknown, config: GatewayConfig): Promise<ParsedInput | null>;

  // Format output for this platform
  formatOutput(output: GatewayOutput): Promise<FormattedOutput>;

  // Send formatted output via platform API
  send(output: FormattedOutput, destination: GatewayDestination): Promise<SendResult>;

  // Optional: Verify webhook signature
  verifyWebhook?(request: Request, config: GatewayConfig): Promise<boolean>;

  // Optional: Register webhook URL with platform
  registerWebhook?(webhookUrl: string, config: GatewayConfig): Promise<WebhookResult>;

  // Optional: Verify credentials
  verifyCredentials?(config: GatewayConfig): Promise<VerifyResult>;
}
```

Adding a New Platform

1. Create Processor File

```typescript
// src/agent/gateways/slack.ts
import type { GatewayProcessor, SlackConfig, ... } from "./types";

// getUserName() and buildBlocks() are private helpers omitted from this example.
export class SlackProcessor implements GatewayProcessor {
  async parseInput(rawData: unknown, config: GatewayConfig): Promise<ParsedInput | null> {
    const event = rawData as SlackEvent;

    // Skip bot messages (including our own replies) to avoid reply loops
    if (event.bot_id) return null;

    return {
      message: event.text,
      messageId: event.ts,
      sender: {
        platformId: event.channel,
        username: event.user, // Slack user ID, not a display handle
        displayName: await this.getUserName(event.user, config),
      },
      // Reply in the existing thread, or start one under the user's message
      threadId: event.thread_ts || event.ts,
      raw: event,
    };
  }

  async formatOutput(output: GatewayOutput): Promise<FormattedOutput> {
    // Convert to Slack Block Kit format
    const blocks = this.buildBlocks(output);
    return { messages: [{ type: "text", content: JSON.stringify(blocks) }] };
  }

  async send(output: FormattedOutput, destination: GatewayDestination): Promise<SendResult> {
    const config = destination.config as SlackConfig;

    const response = await fetch("https://slack.com/api/chat.postMessage", {
      method: "POST",
      headers: {
        "Authorization": `Bearer ${config.botToken}`,
        "Content-Type": "application/json; charset=utf-8",
      },
      body: JSON.stringify({
        channel: destination.targetId,
        blocks: JSON.parse(output.messages[0].content),
      }),
    });

    // Slack signals rate limiting with HTTP 429 and a Retry-After header (seconds)
    if (response.status === 429) {
      return {
        success: false,
        messageIds: [],
        rateLimited: true,
        retryAfter: Number(response.headers.get("Retry-After") ?? 1),
      };
    }

    // API errors come back as HTTP 200 with ok: false, so check the body
    const data = await response.json();
    return { success: data.ok, messageIds: data.ok ? [data.ts] : [] };
  }
}

export const slackProcessor = new SlackProcessor();
```

2. Register Processor

```typescript
// src/agent/gateways/index.ts
import { slackProcessor } from "./slack";

const processors: GatewayProcessorRegistry = {
  telegram: telegramProcessor,
  email: emailProcessor,
  slack: slackProcessor,  // Add here
};
```
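The registry is a keyed lookup, so resolving a processor for an incoming webhook reduces to reading the gateway row's `type` column. A sketch, assuming the registry is keyed by the `gatewayType` values from `GatewayInput` and that planned platforms simply have no entry yet; `ProcessorLike` and `getProcessor` are hypothetical names.

```typescript
type GatewayType = "telegram" | "email" | "whatsapp" | "slack" | "discord";

// Hypothetical stand-in for GatewayProcessor; only the lookup matters here.
interface ProcessorLike {
  name: string;
}

// Planned platforms have no entry yet, hence Partial.
type Registry = Partial<Record<GatewayType, ProcessorLike>>;

const GATEWAY_TYPES: readonly string[] = ["telegram", "email", "whatsapp", "slack", "discord"];

function isGatewayType(value: string): value is GatewayType {
  return GATEWAY_TYPES.includes(value);
}

// Resolves the processor for the `type` column of a gateways row.
function getProcessor(registry: Registry, type: string): ProcessorLike {
  if (!isGatewayType(type)) throw new Error(`Unknown gateway type: ${type}`);
  const processor = registry[type];
  if (!processor) throw new Error(`No processor registered for "${type}"`);
  return processor;
}
```

Distinguishing "unknown type" from "known but not registered" makes misconfigured gateway rows easier to diagnose.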

3. Add Config Type

```typescript
// src/agent/gateways/types.ts
export interface SlackConfig extends GatewayConfig {
  botToken: string;
  signingSecret: string;
  appId?: string;
}
```
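The `gateways.config` column stores this configuration as JSON text (default `'{}'`). A sketch of turning that text into typed Slack settings with a hypothetical `parseSlackConfig` helper that fails fast on missing fields; since the base `GatewayConfig` fields are not shown in this document, it validates only the Slack-specific ones.

```typescript
// Slack-specific fields only; the base GatewayConfig fields are not shown here.
interface SlackConfigFields {
  botToken: string;
  signingSecret: string;
  appId?: string;
}

// Parses the JSON text stored in gateways.config and validates the Slack fields.
function parseSlackConfig(raw: string): SlackConfigFields {
  const value: unknown = JSON.parse(raw);
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    throw new Error("Gateway config must be a JSON object");
  }
  const obj = value as Record<string, unknown>;
  const botToken = obj["botToken"];
  const signingSecret = obj["signingSecret"];
  const appId = obj["appId"];
  if (typeof botToken !== "string" || botToken === "") {
    throw new Error("Slack config is missing botToken");
  }
  if (typeof signingSecret !== "string" || signingSecret === "") {
    throw new Error("Slack config is missing signingSecret");
  }
  if (appId !== undefined && typeof appId !== "string") {
    throw new Error("Slack config appId must be a string");
  }
  return { botToken, signingSecret, appId: typeof appId === "string" ? appId : undefined };
}
```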

Platform-Specific Details

Telegram

  • Uses Bot API with HTML parse mode
  • Supports inline keyboards for actions
  • Handles photos via multipart form upload
  • Message length limit: 4096 characters
  • Caption length limit: 1024 characters
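Replies longer than the 4096-character limit must be split before sending. A sketch of a hypothetical `splitMessage` helper that prefers to break at a newline, then at a space, and hard-cuts otherwise. It counts UTF-16 code units via `String.length`, which only approximates Telegram's counting. A hard cut could also split a surrogate pair or an HTML tag, which a production version using HTML parse mode would need to avoid.

```typescript
// Splits text into chunks of at most `limit` UTF-16 code units, preferring
// to break at a newline, then at a space, and hard-cutting otherwise.
function splitMessage(text: string, limit = 4096): string[] {
  const chunks: string[] = [];
  let rest = text;
  while (rest.length > limit) {
    const head = rest.slice(0, limit);
    let cut = head.lastIndexOf("\n");
    if (cut <= 0) cut = head.lastIndexOf(" ");
    if (cut <= 0) cut = limit; // no usable break point
    chunks.push(rest.slice(0, cut));
    // Drop the single separator we split on so the next chunk doesn't start with it.
    rest = rest.slice(cut).replace(/^[\n ]/, "");
  }
  if (rest.length > 0) chunks.push(rest);
  return chunks;
}
```

Calling the same helper with `limit = 1024` covers photo captions.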

Email

  • Supports Resend and MailChannels providers
  • Converts markdown to HTML
  • Handles attachments as base64
  • Supports reply threading via In-Reply-To header
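Threading works by pointing the reply at the original message's `Message-ID`. Per RFC 5322 (section 3.6.4), `In-Reply-To` carries the parent's `Message-ID`, and `References` carries the parent's `References` (or, failing that, its `In-Reply-To`) followed by the parent's `Message-ID`. A simplified sketch; `ParentEmail` is a hypothetical stand-in for whatever the email processor stores about the inbound message, and the RFC's condition that the `In-Reply-To` fallback hold a single identifier is not checked.

```typescript
// Hypothetical shape of the stored inbound email being replied to.
interface ParentEmail {
  messageId: string;   // e.g. "<abc123@mail.example.com>"
  inReplyTo?: string;
  references?: string; // space-separated message IDs
}

// Threading headers for a reply, following RFC 5322 section 3.6.4.
function replyThreadingHeaders(parent: ParentEmail): { "In-Reply-To": string; References: string } {
  // Prefer the parent's References; fall back to its In-Reply-To.
  const prior = parent.references ?? parent.inReplyTo ?? "";
  const references = [prior.trim(), parent.messageId].filter((id) => id.length > 0).join(" ");
  return { "In-Reply-To": parent.messageId, References: references };
}
```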

WhatsApp (Planned)

  • Meta Business API support
  • Template messages for notifications
  • Session messages for conversations
  • 24-hour session window handling
  • Media message support (images, documents)
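WhatsApp allows free-form (session) replies only within 24 hours of the user's last inbound message; outside that window a pre-approved template message is required. A sketch of that decision with a hypothetical `whatsappMessageKind` helper, assuming the timestamp of the user's last inbound message is available (whether `contacts.last_interaction_at` tracks only inbound messages is an assumption).

```typescript
const SESSION_WINDOW_MS = 24 * 60 * 60 * 1000;

type WhatsAppMessageKind = "session" | "template";

// Chooses a free-form session message inside the 24-hour window, else a template.
// A missing or future timestamp conservatively falls back to a template.
function whatsappMessageKind(lastInboundAt: Date | null, now: Date = new Date()): WhatsAppMessageKind {
  if (!lastInboundAt) return "template";
  const elapsed = now.getTime() - lastInboundAt.getTime();
  return elapsed >= 0 && elapsed < SESSION_WINDOW_MS ? "session" : "template";
}
```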

Slack (Planned)

  • Block Kit for rich formatting
  • Thread support via thread_ts
  • File uploads via files.getUploadURLExternal and files.completeUploadExternal (files.upload is deprecated)
  • Webhook signature verification
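Slack signs each request with the app's signing secret: `X-Slack-Signature` is `v0=` followed by the hex HMAC-SHA256 of `v0:{timestamp}:{raw body}`, where the timestamp comes from `X-Slack-Request-Timestamp`. A sketch of that check with a hypothetical `verifySlackSignature` function, assuming a runtime that provides `node:crypto`; the five-minute replay window follows Slack's guidance.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Verifies Slack's X-Slack-Signature header against the raw request body.
function verifySlackSignature(
  signingSecret: string,
  timestamp: string, // X-Slack-Request-Timestamp (Unix seconds)
  rawBody: string,   // exact body as received, before any JSON/form parsing
  signature: string, // X-Slack-Signature, e.g. "v0=ab12..."
  nowSeconds: number = Math.floor(Date.now() / 1000),
): boolean {
  const ts = Number(timestamp);
  // Reject stale or malformed timestamps to limit replay attacks.
  if (!Number.isFinite(ts) || Math.abs(nowSeconds - ts) > 60 * 5) return false;
  const expected =
    "v0=" + createHmac("sha256", signingSecret).update(`v0:${timestamp}:${rawBody}`).digest("hex");
  const a = Buffer.from(expected, "utf8");
  const b = Buffer.from(signature, "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return a.length === b.length && timingSafeEqual(a, b);
}
```

In the processor this would sit behind `verifyWebhook`, reading the body once with `await request.text()` and parsing it only after the signature checks out.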

Discord (Planned)

  • Interaction-based commands
  • Embed formatting
  • Attachment support
  • Webhook signature verification (Ed25519)
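Discord sends `X-Signature-Ed25519` (hex signature) and `X-Signature-Timestamp` headers, and the signed message is the timestamp concatenated with the raw body; Discord's documentation asks for a `401` response when verification fails. A sketch with a hypothetical `verifyDiscordSignature` function, assuming `node:crypto` is available. The application's hex public key is wrapped in a fixed DER prefix so Node can load it as an SPKI key. The trailing lines round-trip a throwaway key pair purely for illustration.

```typescript
import { createPublicKey, generateKeyPairSync, sign, verify } from "node:crypto";

// DER prefix of an Ed25519 SubjectPublicKeyInfo; the 32-byte raw key follows it.
const ED25519_SPKI_PREFIX = Buffer.from("302a300506032b6570032100", "hex");

// Verifies a Discord interaction request signature.
function verifyDiscordSignature(
  publicKeyHex: string, // application public key (hex) from the developer portal
  timestamp: string,    // X-Signature-Timestamp header
  rawBody: string,      // raw request body, unparsed
  signatureHex: string, // X-Signature-Ed25519 header
): boolean {
  try {
    const key = createPublicKey({
      key: Buffer.concat([ED25519_SPKI_PREFIX, Buffer.from(publicKeyHex, "hex")]),
      format: "der",
      type: "spki",
    });
    // Ed25519 has no separate digest algorithm, hence `null`.
    return verify(null, Buffer.from(timestamp + rawBody), key, Buffer.from(signatureHex, "hex"));
  } catch {
    return false; // malformed key or signature
  }
}

// Illustration only: sign locally with a throwaway key (Discord signs real requests).
const demo = generateKeyPairSync("ed25519");
const demoPublicHex = demo.publicKey.export({ format: "der", type: "spki" }).subarray(12).toString("hex");
const demoTimestamp = "1700000000";
const demoBody = '{"type":1}';
const demoSig = sign(null, Buffer.from(demoTimestamp + demoBody), demo.privateKey).toString("hex");
```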

Database Schema

Contacts (Address Book)

```sql
CREATE TABLE contacts (
  id TEXT PRIMARY KEY,
  agent_id TEXT NOT NULL,
  name TEXT NOT NULL,
  email TEXT,
  phone TEXT,
  telegram_id TEXT,
  telegram_username TEXT,
  whatsapp_id TEXT,
  slack_id TEXT,
  discord_id TEXT,
  last_interaction_at TEXT,
  interaction_count INTEGER DEFAULT 0,
  status TEXT DEFAULT 'active'
);
```
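Because each platform keeps its identifier in a different column, the contact lookup in pipeline step 2 has to pick the column from the gateway type. A sketch with a hypothetical `contactLookupQuery` builder that takes the column name only from a fixed map (never from request data) and binds the values as parameters; the type-to-column mapping is inferred from the column names above, with email contacts keyed on the `email` column as an assumption.

```typescript
type GatewayType = "telegram" | "email" | "whatsapp" | "slack" | "discord";

// Column in `contacts` that holds each platform's identifier.
const CONTACT_ID_COLUMN: Record<GatewayType, string> = {
  telegram: "telegram_id",
  email: "email",
  whatsapp: "whatsapp_id",
  slack: "slack_id",
  discord: "discord_id",
};

// Builds a parameterized lookup; only the column name is interpolated, and it
// always comes from the fixed map above.
function contactLookupQuery(agentId: string, type: GatewayType, platformId: string) {
  const column = CONTACT_ID_COLUMN[type];
  return {
    sql: `SELECT id, name FROM contacts WHERE agent_id = ? AND ${column} = ? LIMIT 1`,
    params: [agentId, platformId],
  };
}
```

If no row comes back, the pipeline creates the contact and fills the same column.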

Gateways

```sql
CREATE TABLE gateways (
  id TEXT PRIMARY KEY,
  agent_id TEXT NOT NULL,
  name TEXT NOT NULL,
  type TEXT NOT NULL,
  config TEXT DEFAULT '{}',
  webhook_url TEXT,
  webhook_secret TEXT,
  status TEXT DEFAULT 'inactive',
  verified INTEGER DEFAULT 0,
  event_count INTEGER DEFAULT 0,
  last_event_at TEXT,
  last_error TEXT
);
```

Conversations

```sql
CREATE TABLE conversations (
  id TEXT PRIMARY KEY,
  agent_id TEXT NOT NULL,
  contact_id TEXT,
  gateway_id TEXT,
  channel_type TEXT,
  external_thread_id TEXT,
  message_count INTEGER DEFAULT 0,
  last_message_at TEXT
);
```

Error Handling

The pipeline handles errors at each stage:

  1. Parse errors: return 200 OK so the platform does not redeliver the webhook
  2. Contact/conversation errors: log and continue with defaults
  3. Agent errors: send an error message to the user via the gateway
  4. Send errors: log, update the gateway's error tracking (e.g. last_error), and retry if rate-limited
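The per-stage policy fits in a small lookup. A sketch with hypothetical `Stage`, `ErrorAction`, and `errorAction` names, assuming the pipeline knows which stage failed and, for send failures, whether the platform reported rate limiting:

```typescript
type Stage = "parse" | "contact" | "conversation" | "agent" | "send";

type ErrorAction =
  | "ack"            // reply 200 OK and stop, so the platform does not redeliver
  | "continue"       // log and carry on with default contact/conversation values
  | "notifyUser"     // send an error message to the user through the gateway
  | "retry"          // rate-limited send: schedule another attempt
  | "recordFailure"; // log and update the gateway's error tracking

// Policy for every stage except send, whose outcome depends on rate limiting.
const STAGE_POLICY: Record<Exclude<Stage, "send">, ErrorAction> = {
  parse: "ack",
  contact: "continue",
  conversation: "continue",
  agent: "notifyUser",
};

// Maps the stage that failed to the handling policy listed above.
function errorAction(stage: Stage, rateLimited = false): ErrorAction {
  if (stage === "send") return rateLimited ? "retry" : "recordFailure";
  return STAGE_POLICY[stage];
}
```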

Rate Limiting

Each processor handles platform-specific rate limits:

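  • Telegram: about 30 messages/second per bot overall (Telegram also advises roughly 1 message/second within a single chat)
  • Email: provider-specific (Resend free tier: 100 emails/day)
  • Slack: tier-based, per-method limits; chat.postMessage allows roughly 1 message/second per channel
  • Discord: 50 requests/second (global)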

SendResult includes rateLimited and retryAfter fields so callers can decide whether, and when, to retry a failed send.
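A sketch of how a caller might turn those two fields into a delay, with a hypothetical `retryDelayMs` helper. It assumes `retryAfter` is in seconds (the unit is not specified here), falls back to capped exponential backoff when the platform gives no hint, and uses `SendResultLike` as a hypothetical subset of `SendResult`.

```typescript
interface SendResultLike {
  success: boolean;
  rateLimited?: boolean;
  retryAfter?: number; // assumed to be seconds
}

// Returns the delay in ms before the next attempt, or null to stop.
// `attempt` is the number of attempts already made (1-based).
function retryDelayMs(result: SendResultLike, attempt: number, maxAttempts = 3): number | null {
  if (result.success || !result.rateLimited || attempt >= maxAttempts) return null;
  if (result.retryAfter !== undefined && result.retryAfter >= 0) {
    return result.retryAfter * 1000;
  }
  // No hint from the platform: back off 1s, 2s, 4s, ... capped at 30s.
  return Math.min(1000 * 2 ** (attempt - 1), 30000);
}
```

Non-rate-limited failures return `null` here; they fall under the "log and update error tracking" path rather than automatic retry.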