Examples / Webhook Processor

Webhook Processor

Receive, validate, and process webhooks from Stripe, GitHub, Shopify, and other services. Events are validated, logged, and processed asynchronously via Queue.

Demonstrates: Queue, Database, Cache, Secrets, HMAC signature validation

Time to build: 20–30 minutes


Architecture

Incoming webhook → POST /api/webhook/:provider
  → Validate HMAC signature
  → Log to database (audit trail)
  → Enqueue for async processing
  → Return 200 immediately

Queue worker picks up job
  → Route to provider handler (Stripe, GitHub...)
  → Update order/subscription state
  → Send notification emails
  → Mark webhook as processed

Database schema

-- Audit trail: one row per webhook accepted by the receiver.
CREATE TABLE webhook_logs (
  -- Receiver-generated id: "<provider>_<epoch ms>_<random suffix>"
  id TEXT PRIMARY KEY,
  -- Provider taken from the URL path: 'stripe', 'github' or 'shopify'
  provider TEXT NOT NULL,
  -- Event name from the payload's `type` field, or 'unknown' when absent
  event_type TEXT NOT NULL,
  -- Raw request body exactly as received (unparsed JSON text)
  payload TEXT NOT NULL,
  -- Lifecycle written by the handlers: 'received' -> 'queued' -> 'processed' | 'failed'
  status TEXT NOT NULL DEFAULT 'received',
  -- Message of the error raised during processing; set when status = 'failed'
  error TEXT,
  -- Receipt time, milliseconds since the Unix epoch (Date.now())
  created_at INTEGER NOT NULL,
  -- Completion time in epoch milliseconds; NULL until the worker marks the row processed
  processed_at INTEGER
);
 
-- The stats endpoint filters by provider and groups by status
CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);

Step 1: Webhook receiver

// POST /api/webhook/:provider
//
// Receives a webhook, verifies its signature, records it in webhook_logs and
// enqueues it on `webhook:<provider>` for asynchronous processing.
//
// Responses:
//   200 { ok, webhookId }   accepted and queued
//   400 { error }           unknown provider, or the body is not valid JSON
//   401 { error }           signature validation failed
app.post('/api/webhook/:provider', async (c) => {
  const provider = c.req.param('provider')
  const validProviders = ['stripe', 'github', 'shopify']

  if (!validProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }

  // Read the body as text: signatures are computed over the exact text that was sent,
  // so it must not be parsed and re-serialized before validation.
  const rawBody = await c.req.text()
  const headers = Object.fromEntries(c.req.raw.headers)

  // Validate signature
  const isValid = await validateSignature(provider, rawBody, headers, c.env)
  if (!isValid) {
    return c.json({ error: 'Invalid signature' }, 401)
  }

  // A malformed body is a client error, not a server crash
  let payload
  try {
    payload = JSON.parse(rawBody)
  } catch {
    return c.json({ error: 'Invalid JSON body' }, 400)
  }

  // Stripe carries the event name in the payload; GitHub and Shopify send it in a header.
  // Header names are lower-case because they come from iterating a Headers object.
  const eventTypeHeaders = { github: 'x-github-event', shopify: 'x-shopify-topic' }
  const eventType = payload?.type ?? headers[eventTypeHeaders[provider]] ?? 'unknown'

  const webhookId = `${provider}_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`

  // Log to DB before enqueueing so every accepted webhook has an audit row
  await sdk.db.query(
    'INSERT INTO webhook_logs (id, provider, event_type, payload, status, created_at) VALUES (?, ?, ?, ?, ?, ?)',
    [webhookId, provider, eventType, rawBody, 'received', Date.now()]
  )

  // Queue for processing
  await sdk.queue.send(`webhook:${provider}`, { webhookId, provider, payload })

  // The worker may already have picked the job up and written 'processed' or 'failed';
  // only advance rows that are still 'received' so that result is never overwritten.
  await sdk.db.query(
    'UPDATE webhook_logs SET status = ? WHERE id = ? AND status = ?',
    ['queued', webhookId, 'received']
  )

  return c.json({ ok: true, webhookId })
})
 
/**
 * Check that a webhook body was signed with the shared secret configured for `provider`.
 *
 * Fails closed: a missing header, a missing secret, a malformed header or an
 * unrecognised provider all yield `false` rather than throwing or accepting.
 *
 * - stripe:  `stripe-signature` header (`t=<unix seconds>,v1=<hex>,...`); the signed
 *            string is `<t>.<rawBody>`. Timestamps more than five minutes away from
 *            the current time are rejected to limit replay.
 * - github:  `x-hub-signature-256` header, `sha256=<hex>` over the raw body.
 * - shopify: `x-shopify-hmac-sha256` header, base64 HMAC-SHA256 over the raw body.
 *
 * @param {string} provider - 'stripe', 'github' or 'shopify'
 * @param {string} rawBody - request body exactly as received
 * @param {Object<string, string>} headers - request headers keyed by lower-case name
 * @param {object} env - bindings holding STRIPE_WEBHOOK_SECRET, GITHUB_WEBHOOK_SECRET
 *   and SHOPIFY_WEBHOOK_SECRET
 * @returns {Promise<boolean>} true only when the signature matches
 */
async function validateSignature(provider, rawBody, headers, env) {
  if (provider === 'stripe') {
    const toleranceSeconds = 300
    const header = headers['stripe-signature']
    const secret = env.STRIPE_WEBHOOK_SECRET
    if (!header || !secret) return false

    // Split "t=...,v1=...,v0=..." into [key, value] pairs
    const parts = header.split(',').map((part) => part.trim().split('='))
    const timestamp = parts.find(([key]) => key === 't')?.[1]
    const signatures = parts
      .filter(([key, value]) => key === 'v1' && value)
      .map(([, value]) => value)

    if (!timestamp || !/^\d+$/.test(timestamp) || signatures.length === 0) return false

    // Reject events whose signed timestamp is too far from now (replay protection)
    const driftSeconds = Math.abs(Date.now() / 1000 - Number(timestamp))
    if (driftSeconds > toleranceSeconds) return false

    const expected = await hmac(`${timestamp}.${rawBody}`, secret)
    // Several v1 entries can be present; any exact match is sufficient
    return signatures.some((candidate) => signaturesMatch(candidate, expected))
  }

  if (provider === 'github') {
    const header = headers['x-hub-signature-256']
    const secret = env.GITHUB_WEBHOOK_SECRET
    if (!header || !secret) return false

    const expected = `sha256=${await hmac(rawBody, secret)}`
    return signaturesMatch(header, expected)
  }

  if (provider === 'shopify') {
    const header = headers['x-shopify-hmac-sha256']
    const secret = env.SHOPIFY_WEBHOOK_SECRET
    if (!header || !secret) return false

    // Shopify sends the digest base64-encoded, while hmac() returns hex
    const expected = hexDigestToBase64(await hmac(rawBody, secret))
    return signaturesMatch(header, expected)
  }

  // Unknown provider: never accept an unverified payload
  return false
}

// Compare two signature strings without exiting early on the first differing character.
function signaturesMatch(received, expected) {
  if (received.length !== expected.length) return false

  let diff = 0
  for (let i = 0; i < expected.length; i += 1) {
    diff |= received.charCodeAt(i) ^ expected.charCodeAt(i)
  }
  return diff === 0
}

// Re-encode a hex digest string as base64.
function hexDigestToBase64(hex) {
  const bytes = hex.match(/../g).map((pair) => Number.parseInt(pair, 16))
  return btoa(String.fromCharCode(...bytes))
}
 
/**
 * Compute an HMAC-SHA256 digest using the Web Crypto API.
 *
 * @param {string} data - message to sign; UTF-8 encoded before signing
 * @param {string} secret - shared secret; its UTF-8 bytes are used as the raw key
 * @returns {Promise<string>} the digest as a lower-case hex string
 */
async function hmac(data, secret) {
  const utf8 = new TextEncoder()
  const algorithm = { name: 'HMAC', hash: 'SHA-256' }

  // Import as a non-extractable key that may only be used for signing
  const signingKey = await crypto.subtle.importKey('raw', utf8.encode(secret), algorithm, false, ['sign'])
  const digest = await crypto.subtle.sign('HMAC', signingKey, utf8.encode(data))

  // Hex-encode byte by byte, zero-padding each byte to two characters
  let hex = ''
  for (const byte of new Uint8Array(digest)) {
    hex += byte.toString(16).padStart(2, '0')
  }
  return hex
}

Step 2: Stripe queue worker

// Worker: process 'webhook:stripe' queue messages
/**
 * Handle a batch of queued Stripe webhooks, one message at a time.
 *
 * Each message is dispatched on `payload.type`; on success its webhook_logs row is
 * marked 'processed' and the message is acked. On any error the row is marked
 * 'failed' with the error text and the message is scheduled for retry in 60 seconds.
 * A failure in one message never stops the rest of the batch.
 *
 * @param {{ messages: Array<{ body: { webhookId: string, payload: object }, ack: Function, retry: Function }> }} batch
 * @param {object} env - worker bindings (not used here)
 * @returns {Promise<void>}
 */
export async function stripeWebhookWorker(batch, env) {
  for (const message of batch.messages) {
    const { webhookId, payload } = message.body

    try {
      switch (payload.type) {
        case 'payment_intent.succeeded':
          await handlePaymentSuccess(payload.data.object)
          break
        case 'customer.subscription.created':
          await handleSubscriptionCreated(payload.data.object)
          break
        case 'customer.subscription.deleted':
          await handleSubscriptionCancelled(payload.data.object)
          break
        default:
          // Event types without a handler are acknowledged without further work
          break
      }

      // Clear any error left behind by an earlier failed attempt
      await sdk.db.query(
        'UPDATE webhook_logs SET status = ?, error = NULL, processed_at = ? WHERE id = ?',
        ['processed', Date.now(), webhookId]
      )

      message.ack()
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err)

      // Recording the failure is best-effort: if the database itself is the problem,
      // the message must still be retried and the rest of the batch must still run.
      try {
        await sdk.db.query(
          'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
          ['failed', reason, webhookId]
        )
      } catch (logErr) {
        console.error(`Could not record failure for webhook ${webhookId}`, logErr)
      }

      message.retry({ delaySeconds: 60 }) // retry in 1 min
    }
  }
}
 
/**
 * Apply a succeeded payment to the order named in the payment's metadata.
 *
 * Does nothing when `metadata.order_id` is missing or empty. Otherwise, in order:
 * marks the order paid, drops the cached copy of the order, and enqueues a
 * confirmation e-mail job carrying the amount divided by 100.
 *
 * @param {{ id: string, amount: number, metadata: { order_id?: string } }} payment
 * @returns {Promise<void>}
 */
async function handlePaymentSuccess(payment) {
  const orderId = payment.metadata.order_id

  if (orderId) {
    const markPaidSql =
      'UPDATE orders SET status = ?, stripe_payment_id = ?, paid_at = ? WHERE id = ?'
    await sdk.db.query(markPaidSql, ['paid', payment.id, Date.now(), orderId])

    // The cached order is now stale
    await sdk.cache.delete(`order:${orderId}`)

    const confirmation = { orderId, amount: payment.amount / 100 }
    await sdk.queue.send('email:payment-confirmation', confirmation)
  }
}

Step 3: Webhook stats API

// GET /api/webhook-stats?provider=stripe&timeframe=24h
//
// Returns, per status, the number of webhooks logged for a provider within the
// timeframe and the mean of (processed_at - created_at) in milliseconds.
// Results are cached for 300 seconds per provider/timeframe pair.
//
// Query parameters:
//   provider   'stripe' (default), 'github' or 'shopify'
//   timeframe  '24h' (default) or '1h'
//
// Responds 400 for any other provider or timeframe, so the response never reports
// a window that was not actually queried and cache keys stay bounded.
app.get('/api/webhook-stats', async (c) => {
  const knownProviders = ['stripe', 'github', 'shopify']
  const windowMs = new Map([
    ['1h', 3_600_000],
    ['24h', 86_400_000],
  ])

  const provider = c.req.query('provider') ?? 'stripe'
  const timeframe = c.req.query('timeframe') ?? '24h'

  if (!knownProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }

  const lookbackMs = windowMs.get(timeframe)
  if (lookbackMs === undefined) {
    return c.json({ error: 'Unsupported timeframe' }, 400)
  }

  const cacheKey = `webhook-stats:${provider}:${timeframe}`

  const cached = await sdk.cache.get(cacheKey)
  if (cached) return c.json(cached)

  const since = Date.now() - lookbackMs

  const { results } = await sdk.db.query(`
    SELECT status, COUNT(*) as count, AVG(processed_at - created_at) as avg_ms
    FROM webhook_logs
    WHERE provider = ? AND created_at > ?
    GROUP BY status
  `, [provider, since])

  const stats = { provider, timeframe, by_status: results, updated_at: Date.now() }

  await sdk.cache.set(cacheKey, stats, { ttl: 300 })

  return c.json(stats)
})

Summary

In 20–30 minutes, you have:

  • Webhook receiver for Stripe, GitHub, Shopify with HMAC validation
  • Full audit trail in D1 with status tracking
  • Async queue processing with automatic retries
  • Payment/subscription state updates
  • Stats API with 5-minute cache