Use Case GuidesWebhook Processor

Building a Webhook Processor with Aerostack

Learn how to build a robust webhook processor that receives, validates, processes, and routes webhooks from third-party services (Stripe, GitHub, Shopify, etc.).

Use Case: Multi-Service Webhook Handler

Requirements:

  • Receive webhooks from multiple services
  • Validate webhook signatures
  • Process events asynchronously
  • Route to appropriate handlers
  • Retry failed processing
  • Store webhook history

Traditional Approach: 2-3 days
Aerostack Approach: 20-30 minutes


Architecture

Webhook → Aerostack API → Validate → Enqueue → Background Worker → Process

                                                            Cache + Database

Step 1: Generic Webhook Receiver

Endpoint: POST /custom/webhook-receiver
Settings: Public access (no API key), Rate limit: 1000/min

export default async function(sdk, event) {
  const { provider, payload, headers } = event.data;
  
  // Supported providers
  const validProviders = ['stripe', 'github', 'shopify', 'sendgrid'];
  
  if (!validProviders.includes(provider)) {
    throw new Error('Unsupported webhook provider');
  }
  
  // Log webhook receipt
  const timestamp = Date.now();
  const webhookId = `${provider}_${timestamp}_${Math.random().toString(36).substr(2, 9)}`;
  
  // Store in database for audit trail
  await sdk.db.query(`
    INSERT INTO webhook_logs (id, provider, payload, headers, status, created_at)
    VALUES (?, ?, ?, ?, 'received', ?)
  `, [webhookId, provider, JSON.stringify(payload), JSON.stringify(headers), timestamp]);
  
  // Validate signature based on provider
  const isValid = await validateWebhookSignature(sdk, provider, payload, headers);
  
  if (!isValid) {
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ? WHERE id = ?',
      ['invalid_signature', webhookId]
    );
    throw new Error('Invalid webhook signature');
  }
  
  // Enqueue for async processing
  await sdk.queue.enqueue(`process-${provider}-webhook`, {
    webhookId,
    provider,
    payload,
    headers
  });
  
  // Update status
  await sdk.db.query(
    'UPDATE webhook_logs SET status = ? WHERE id = ?',
    ['queued', webhookId]
  );
  
  return { 
    success: true, 
    webhook_id: webhookId,
    message: 'Webhook received and queued for processing'
  };
}
 
async function validateWebhookSignature(sdk, provider, payload, headers) {
  switch (provider) {
    case 'stripe':
      return await validateStripeSignature(sdk, payload, headers);
    case 'github':
      return await validateGitHubSignature(sdk, payload, headers);
    case 'shopify':
      return await validateShopifySignature(sdk, payload, headers);
    default:
      return true;
  }
}
 
async function validateStripeSignature(sdk, payload, headers) {
  const signature = headers['stripe-signature'];
  const secret = await sdk.secrets.get('STRIPE_WEBHOOK_SECRET');
  
  // Stripe signature validation logic
  const expectedSignature = await createHmacSignature(
    `${headers['stripe-timestamp']}.${JSON.stringify(payload)}`,
    secret
  );
  
  return signature.includes(expectedSignature);
}
 
async function validateGitHubSignature(sdk, payload, headers) {
  const signature = headers['x-hub-signature-256'];
  const secret = await sdk.secrets.get('GITHUB_WEBHOOK_SECRET');
  
  const expectedSig = 'sha256=' + await createHmacSignature(
    JSON.stringify(payload),
    secret
  );
  
  return signature === expectedSig;
}
 
async function createHmacSignature(data, secret) {
  // Use crypto API or similar
  const encoder = new TextEncoder();
  const keyData = encoder.encode(secret);
  const key = await crypto.subtle.importKey(
    'raw',
    keyData,
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['sign']
  );
  
  const sigBuffer = await crypto.subtle.sign(
    'HMAC',
    key,
    encoder.encode(data)
  );
  
  return Array.from(new Uint8Array(sigBuffer))
    .map(b => b.toString(16).padStart(2, '0'))
    .join('');
}

Step 2: Stripe Webhook Processor

Worker: process-stripe-webhook

export async function processStripeWebhook(sdk, job) {
  const { webhookId, payload } = job.data;
  const { type, data } = payload;
  
  try {
    switch (type) {
      case 'payment_intent.succeeded':
        await handlePaymentSuccess(sdk, data.object);
        break;
        
      case 'payment_intent.payment_failed':
        await handlePaymentFailure(sdk, data.object);
        break;
        
      case 'customer.subscription.created':
        await handleSubscriptionCreated(sdk, data.object);
        break;
        
      case 'customer.subscription.deleted':
        await handleSubscriptionCanceled(sdk, data.object);
        break;
        
      default:
        console.log(`Unhandled Stripe event: ${type}`);
    }
    
    // Update webhook log
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
      ['processed', Date.now(), webhookId]
    );
    
    return { success: true, event: type };
  } catch (error) {
    // Mark as failed
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, webhookId]
    );
    
    throw error; // Triggers retry
  }
}
 
async function handlePaymentSuccess(sdk, payment) {
  const { id, amount, customer, metadata } = payment;
  
  // Update order status
  if (metadata.order_id) {
    await sdk.db.query(`
      UPDATE orders 
      SET status = 'paid', 
          stripe_payment_id = ?,
          paid_at = ?
      WHERE id = ?
    `, [id, Date.now(), metadata.order_id]);
    
    // Clear order cache
    await sdk.cache.delete(`order:${metadata.order_id}`);
    
    // Send confirmation email
    await sdk.queue.enqueue('send-payment-confirmation', {
      order_id: metadata.order_id,
      amount: amount / 100,
      customer
    });
  }
}
 
async function handlePaymentFailure(sdk, payment) {
  const { id, last_payment_error, metadata } = payment;
  
  // Log failed payment
  await sdk.db.query(`
    INSERT INTO payment_failures (payment_id, order_id, error, created_at)
    VALUES (?, ?, ?, ?)
  `, [id, metadata.order_id, last_payment_error?.message, Date.now()]);
  
  // Notify user
  await sdk.queue.enqueue('send-payment-failed-email', {
    order_id: metadata.order_id,
    error: last_payment_error?.message
  });
}

Step 3: GitHub Webhook Processor

Worker: process-github-webhook

export async function processGitHubWebhook(sdk, job) {
  const { webhookId, payload } = job.data;
  const { action, repository, sender } = payload;
  
  try {
    // Handle different GitHub events
    if (payload.pull_request) {
      await handlePullRequest(sdk, payload);
    } else if (payload.issue) {
      await handleIssue(sdk, payload);
    } else if (payload.push) {
      await handlePush(sdk, payload);
    }
    
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
      ['processed', Date.now(), webhookId]
    );
    
    return { success: true };
  } catch (error) {
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, webhookId]
    );
    throw error;
  }
}
 
async function handlePullRequest(sdk, payload) {
  const { action, pull_request, repository } = payload;
  
  if (action === 'opened') {
    // Use AI to analyze PR and suggest reviewers
    const prDiff = await sdk.fetch(pull_request.diff_url);
    const analysis = await sdk.ai.chat('@cf/meta/llama-3-8b-instruct', [
      {
        role: 'system',
        content: 'Analyze this PR and suggest appropriate reviewers based on changed files.'
      },
      {
        role: 'user',
        content: `PR: ${pull_request.title}\n\nFiles changed: ${pull_request.changed_files}`
      }
    ]);
    
    // Store analysis
    await sdk.cache.set(
      `pr:${repository.id}:${pull_request.number}`,
      { analysis, timestamp: Date.now() },
      86400 // 24 hours
    );
  }
}

Step 4: Webhook Dashboard API

Endpoint: GET /custom/webhook-stats

export default async function(sdk, event) {
  const { provider, timeframe = '24h' } = event.data;
  const cacheKey = `webhook-stats:${provider}:${timeframe}`;
  
  // Check cache (5 minute TTL)
  const cached = await sdk.cache.get(cacheKey);
  if (cached) return cached;
  
  const milliseconds = timeframe === '24h' ? 86400000 : 3600000;
  const since = Date.now() - milliseconds;
  
  // Get webhook statistics
  const stats = await sdk.db.query(`
    SELECT 
      status,
      COUNT(*) as count,
      AVG(processed_at - created_at) as avg_processing_time
    FROM webhook_logs
    WHERE provider = ? AND created_at > ?
    GROUP BY status
  `, [provider, since]);
  
  const result = {
    provider,
    timeframe,
    total: stats.reduce((sum, s) => sum + s.count, 0),
    by_status: stats,
    updated_at: Date.now()
  };
  
  // Cache for 5 minutes
  await sdk.cache.set(cacheKey, result, 300);
  
  return result;
}

Database Schema

CREATE TABLE webhook_logs (
  id TEXT PRIMARY KEY,
  provider TEXT NOT NULL,
  payload TEXT NOT NULL,
  headers TEXT NOT NULL,
  status TEXT NOT NULL,
  error TEXT,
  created_at INTEGER NOT NULL,
  processed_at INTEGER
);
 
CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);
CREATE INDEX idx_webhooks_created ON webhook_logs(created_at);

What You Built

In 20-30 minutes:

Generic webhook receiver - Supports multiple providers
Signature validation - Stripe, GitHub, Shopify
Async processing - Queue-based workers
Audit trail - All webhooks logged
Error handling - Automatic retries
Analytics - Webhook stats with caching
AI integration - Smart PR analysis

No Redis. No RabbitMQ. No separate workers. Just code.


Production Features Included

  • Rate limiting - 1000 webhooks/min
  • Retry logic - Automatic on failure
  • Monitoring - Built-in logs
  • Security - Signature validation
  • Scalability - Auto-scaling workers

Ready to process webhooks at scale?

Get Started →