# Webhook Processor

> Receive and process webhooks from Stripe, GitHub, and Shopify. Validated, logged, and processed asynchronously via Aerostack Queue.

Receive, validate, and process webhooks from Stripe, GitHub, Shopify, and other services. Events are validated, logged, and processed asynchronously via Queue.

**Demonstrates:** Queue, Database, Cache, Secrets, HMAC signature validation

**Time to build:** 20–30 minutes

---

## Architecture

```
Incoming webhook → POST /webhook-receiver
  → Validate HMAC signature
  → Log to database (audit trail)
  → Enqueue for async processing
  → Return 200 immediately
        ↓
Queue worker picks up job
  → Route to provider handler (Stripe, GitHub...)
  → Update order/subscription state
  → Send notification emails
  → Mark webhook as processed
```

---

## Database schema

```sql
CREATE TABLE webhook_logs (
  id TEXT PRIMARY KEY,
  provider TEXT NOT NULL,
  event_type TEXT NOT NULL,
  payload TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'received',
  error TEXT,
  created_at INTEGER NOT NULL,
  processed_at INTEGER
);

CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);
```

---

## Step 1: Webhook receiver

```ts
// POST /api/webhook/:provider
app.post('/api/webhook/:provider', async (c) => {
  const provider = c.req.param('provider')
  const validProviders = ['stripe', 'github', 'shopify']

  if (!validProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }

  const rawBody = await c.req.text()
  const headers = Object.fromEntries(c.req.raw.headers)

  // Validate signature
  const isValid = await validateSignature(provider, rawBody, headers, c.env)
  if (!isValid) {
    return c.json({ error: 'Invalid signature' }, 401)
  }

  const payload = JSON.parse(rawBody)
  const webhookId = `${provider}_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`

  // Log to DB
  await sdk.db.query(
    'INSERT INTO webhook_logs (id, provider, event_type, payload, status, created_at) VALUES (?, ?, ?, ?, ?, ?)',
    [webhookId, provider, payload.type ?? 'unknown', rawBody, 'received', Date.now()]
  )

  // Queue for processing
  await sdk.queue.send(`webhook:${provider}`, { webhookId, provider, payload })

  await sdk.db.query(
    'UPDATE webhook_logs SET status = ? WHERE id = ?',
    ['queued', webhookId]
  )

  return c.json({ ok: true, webhookId })
})

async function validateSignature(provider, rawBody, headers, env) {
  if (provider === 'stripe') {
    const sig = headers['stripe-signature']
    const secret = env.STRIPE_WEBHOOK_SECRET
    const timestamp = sig.match(/t=(\d+)/)?.[1]
    const expected = await hmac(`${timestamp}.${rawBody}`, secret)
    return sig.includes(expected)
  }

  if (provider === 'github') {
    const sig = headers['x-hub-signature-256']
    const secret = env.GITHUB_WEBHOOK_SECRET
    const expected = 'sha256=' + await hmac(rawBody, secret)
    return sig === expected
  }

  return true
}

async function hmac(data, secret) {
  const encoder = new TextEncoder()
  const key = await crypto.subtle.importKey(
    'raw', encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' }, false, ['sign']
  )
  const sig = await crypto.subtle.sign('HMAC', key, encoder.encode(data))
  return Array.from(new Uint8Array(sig)).map(b => b.toString(16).padStart(2, '0')).join('')
}
```

---

## Step 2: Stripe queue worker

```ts
// Worker: process 'webhook:stripe' queue messages

  for (const message of batch.messages) {
    const { webhookId, payload } = message.body

    try {
      switch (payload.type) {
        case 'payment_intent.succeeded':
          await handlePaymentSuccess(payload.data.object)
          break
        case 'customer.subscription.created':
          await handleSubscriptionCreated(payload.data.object)
          break
        case 'customer.subscription.deleted':
          await handleSubscriptionCancelled(payload.data.object)
          break
      }

      await sdk.db.query(
        'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
        ['processed', Date.now(), webhookId]
      )

      message.ack()
    } catch (err) {
      await sdk.db.query(
        'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
        ['failed', err.message, webhookId]
      )
      message.retry({ delaySeconds: 60 }) // retry in 1 min
    }
  }
}

async function handlePaymentSuccess(payment) {
  const { id, amount, metadata } = payment
  if (!metadata.order_id) return

  await sdk.db.query(
    'UPDATE orders SET status = ?, stripe_payment_id = ?, paid_at = ? WHERE id = ?',
    ['paid', id, Date.now(), metadata.order_id]
  )

  await sdk.cache.delete(`order:${metadata.order_id}`)

  await sdk.queue.send('email:payment-confirmation', {
    orderId: metadata.order_id,
    amount: amount / 100,
  })
}
```

---

## Step 3: Webhook stats API

```ts
// GET /api/webhook-stats?provider=stripe&timeframe=24h
app.get('/api/webhook-stats', async (c) => {
  const provider = c.req.query('provider') ?? 'stripe'
  const timeframe = c.req.query('timeframe') ?? '24h'
  const cacheKey = `webhook-stats:${provider}:${timeframe}`

  const cached = await sdk.cache.get(cacheKey)
  if (cached) return c.json(cached)

  const since = Date.now() - (timeframe === '24h' ? 86_400_000 : 3_600_000)

  const { results } = await sdk.db.query(`
    SELECT status, COUNT(*) as count, AVG(processed_at - created_at) as avg_ms
    FROM webhook_logs
    WHERE provider = ? AND created_at > ?
    GROUP BY status
  `, [provider, since])

  const stats = { provider, timeframe, by_status: results, updated_at: Date.now() }

  await sdk.cache.set(cacheKey, stats, { ttl: 300 })

  return c.json(stats)
})
```

---

## Summary

In 20–30 minutes, you have:

- Webhook receiver for Stripe, GitHub, Shopify with HMAC validation
- Full audit trail in D1 with status tracking
- Async queue processing with automatic retries
- Payment/subscription state updates
- Stats API with 5-minute cache
