Webhook Processor
Receive, validate, and process webhooks from Stripe, GitHub, Shopify, and other services. Events are validated, logged, and processed asynchronously via Queue.
Demonstrates: Queue, Database, Cache, Secrets, HMAC signature validation
Time to build: 20–30 minutes
Architecture
Section titled “Architecture”
Incoming webhook
  → POST /api/webhook/:provider
  → Validate HMAC signature
  → Log to database (audit trail)
  → Enqueue for async processing
  → Return 200 immediately
        ↓
Queue worker picks up job
  → Route to provider handler (Stripe, GitHub...)
  → Update order/subscription state
  → Send notification emails
  → Mark webhook as processed

Database schema
-- Section titled “Database schema”

-- Audit trail: one row per webhook delivery accepted by the receiver.
CREATE TABLE webhook_logs (
  id           TEXT    PRIMARY KEY,                 -- generated by the receiver, prefixed with the provider name
  provider     TEXT    NOT NULL,                    -- 'stripe' | 'github' | 'shopify'
  event_type   TEXT    NOT NULL,                    -- provider event name, or 'unknown'
  payload      TEXT    NOT NULL,                    -- raw request body as received
  status       TEXT    NOT NULL DEFAULT 'received', -- the handlers write 'received', 'queued', 'processed', 'failed'
  error        TEXT,                                -- message of the last processing error, if any
  created_at   INTEGER NOT NULL,                    -- Date.now() (epoch milliseconds) at receipt
  processed_at INTEGER                              -- Date.now() when the worker finished; NULL until then
);

-- Lookups by provider and by processing state.
CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);

-- Step 1: Webhook receiver
// Section titled “Step 1: Webhook receiver”

/**
 * Work out the event name to store in the audit log.
 *
 * Only Stripe carries the event name in the JSON body (`type`). GitHub sends
 * it in the `X-GitHub-Event` header and Shopify in `X-Shopify-Topic`, so
 * reading `payload.type` for those providers would always log 'unknown'.
 *
 * @param {string} provider - 'stripe' | 'github' | 'shopify'
 * @param {object} payload - Parsed JSON body.
 * @param {Record<string, string>} headers - Request headers, lower-cased keys.
 * @returns {string} The event name, or 'unknown' when none is present.
 */
function resolveEventType(provider, payload, headers) {
  if (provider === 'github') return headers['x-github-event'] ?? 'unknown'
  if (provider === 'shopify') return headers['x-shopify-topic'] ?? 'unknown'
  return payload?.type ?? 'unknown'
}

// POST /api/webhook/:provider
// Validates the signature, writes an audit row, enqueues the event for async
// processing and returns immediately.
app.post('/api/webhook/:provider', async (c) => {
  const provider = c.req.param('provider')
  const validProviders = ['stripe', 'github', 'shopify']

  if (!validProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }

  // Read the body as text: signatures are computed over the exact bytes sent,
  // so the body must not be parsed and re-serialized before validation.
  const rawBody = await c.req.text()
  const headers = Object.fromEntries(c.req.raw.headers)

  // Validate signature
  const isValid = await validateSignature(provider, rawBody, headers, c.env)
  if (!isValid) {
    return c.json({ error: 'Invalid signature' }, 401)
  }

  // A malformed body is a client error, not a server crash.
  let payload
  try {
    payload = JSON.parse(rawBody)
  } catch {
    return c.json({ error: 'Invalid JSON body' }, 400)
  }

  // randomUUID avoids primary-key collisions between deliveries that arrive
  // in the same millisecond.
  const webhookId = `${provider}_${crypto.randomUUID()}`
  const eventType = resolveEventType(provider, payload, headers)

  // Log to DB
  await sdk.db.query(
    'INSERT INTO webhook_logs (id, provider, event_type, payload, status, created_at) VALUES (?, ?, ?, ?, ?, ?)',
    [webhookId, provider, eventType, rawBody, 'received', Date.now()]
  )

  // Queue for processing
  try {
    await sdk.queue.send(`webhook:${provider}`, { webhookId, provider, payload })
  } catch (err) {
    // Record the failure so the row does not sit at 'received' forever, then
    // answer 500 so the provider redelivers.
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', String(err?.message ?? err), webhookId]
    )
    return c.json({ error: 'Failed to enqueue webhook' }, 500)
  }

  await sdk.db.query(
    'UPDATE webhook_logs SET status = ? WHERE id = ?',
    ['queued', webhookId]
  )

  return c.json({ ok: true, webhookId })
})
/**
 * Compare two strings without stopping at the first differing character, so
 * the time taken does not reveal how much of a forged signature was correct.
 *
 * @param {unknown} a
 * @param {unknown} b
 * @returns {boolean} True only when both are strings with identical contents.
 */
function constantTimeEqual(a, b) {
  if (typeof a !== 'string' || typeof b !== 'string') return false
  // Digest lengths are fixed and public, so bailing out on length is safe.
  if (a.length !== b.length) return false

  let diff = 0
  for (let i = 0; i < a.length; i += 1) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i)
  }
  return diff === 0
}

/**
 * Re-encode a hex digest as base64.
 *
 * @param {string} hex - Even-length hex string.
 * @returns {string} The same bytes, base64-encoded.
 */
function hexToBase64(hex) {
  const pairs = hex.match(/../g) ?? []
  const bytes = pairs.map((pair) => Number.parseInt(pair, 16))
  return btoa(String.fromCharCode(...bytes))
}

/**
 * Verify that a webhook request was signed with the provider's shared secret.
 *
 * Fails closed: a missing header, a missing secret, a malformed signature or
 * an unrecognised provider all yield `false`.
 *
 * @param {string} provider - 'stripe' | 'github' | 'shopify'
 * @param {string} rawBody - Request body exactly as received.
 * @param {Record<string, string>} headers - Request headers, lower-cased keys.
 * @param {object} env - Bindings holding STRIPE_WEBHOOK_SECRET,
 *   GITHUB_WEBHOOK_SECRET and SHOPIFY_WEBHOOK_SECRET.
 * @returns {Promise<boolean>} Whether the signature is valid.
 */
async function validateSignature(provider, rawBody, headers, env) {
  if (provider === 'stripe') {
    const header = headers['stripe-signature']
    const secret = env.STRIPE_WEBHOOK_SECRET
    if (!header || !secret) return false

    // Header format: "t=<unix seconds>,v1=<hex>[,v1=<hex>...][,v0=<hex>]".
    // Only v1 entries are real signatures; several may be present while a
    // secret is being rolled.
    const parts = header.split(',').map((part) => part.trim().split('='))
    const timestamp = parts.find(([key]) => key === 't')?.[1]
    const candidates = parts.filter(([key]) => key === 'v1').map(([, value]) => value)
    if (!/^\d+$/.test(timestamp ?? '') || candidates.length === 0) return false

    // Reject old timestamps so a captured request cannot be replayed later.
    const toleranceSeconds = 300
    const ageSeconds = Math.abs(Date.now() / 1000 - Number(timestamp))
    if (ageSeconds > toleranceSeconds) return false

    const expected = await hmac(`${timestamp}.${rawBody}`, secret)
    return candidates.some((candidate) => constantTimeEqual(candidate, expected))
  }

  if (provider === 'github') {
    const header = headers['x-hub-signature-256']
    const secret = env.GITHUB_WEBHOOK_SECRET
    if (!header || !secret) return false

    const expected = `sha256=${await hmac(rawBody, secret)}`
    return constantTimeEqual(header, expected)
  }

  if (provider === 'shopify') {
    const header = headers['x-shopify-hmac-sha256']
    const secret = env.SHOPIFY_WEBHOOK_SECRET
    if (!header || !secret) return false

    // Shopify sends the HMAC-SHA256 of the raw body, base64-encoded.
    const expected = hexToBase64(await hmac(rawBody, secret))
    return constantTimeEqual(header, expected)
  }

  // Never accept a provider we do not know how to verify.
  return false
}
/**
 * Compute an HMAC-SHA256 of `data` keyed with `secret` using Web Crypto.
 *
 * Both inputs are encoded as UTF-8 before signing.
 *
 * @param {string} data - Message to authenticate.
 * @param {string} secret - Shared secret used as the HMAC key.
 * @param {'hex'|'base64'} [encoding='hex'] - Output encoding. Hex is
 *   lower-case; base64 is for providers that send base64 digests.
 * @returns {Promise<string>} The encoded digest.
 * @throws {TypeError} If `encoding` is not 'hex' or 'base64'.
 */
async function hmac(data, secret, encoding = 'hex') {
  if (encoding !== 'hex' && encoding !== 'base64') {
    throw new TypeError(`Unsupported HMAC encoding: ${encoding}`)
  }

  const encoder = new TextEncoder()
  const key = await crypto.subtle.importKey(
    'raw',
    encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false, // not extractable: the key is only ever used to sign
    ['sign']
  )
  const digest = new Uint8Array(await crypto.subtle.sign('HMAC', key, encoder.encode(data)))

  if (encoding === 'base64') {
    return btoa(String.fromCharCode(...digest))
  }
  return Array.from(digest, (byte) => byte.toString(16).padStart(2, '0')).join('')
}

// Step 2: Stripe queue worker
// Section titled “Step 2: Stripe queue worker”

/**
 * Route one Stripe event to its handler. Event types without a handler are
 * deliberately a no-op.
 *
 * @param {{ type: string, data: { object: object } }} payload - Stripe event.
 * @returns {Promise<void>}
 */
async function dispatchStripeEvent(payload) {
  switch (payload.type) {
    case 'payment_intent.succeeded':
      await handlePaymentSuccess(payload.data.object)
      break
    case 'customer.subscription.created':
      await handleSubscriptionCreated(payload.data.object)
      break
    case 'customer.subscription.deleted':
      await handleSubscriptionCancelled(payload.data.object)
      break
    default:
      break
  }
}

// Worker: process 'webhook:stripe' queue messages
/**
 * Process a batch of queued Stripe webhooks one message at a time.
 *
 * On success the audit row is marked 'processed' and the message is acked.
 * On failure the row is marked 'failed' and the message is retried after
 * 60 seconds. A failure in one message never stops the rest of the batch.
 *
 * @param {{ messages: Array<object> }} batch - Queue batch; each message has
 *   `body`, `ack()` and `retry()`.
 * @param {object} env - Worker bindings (unused here).
 * @returns {Promise<void>}
 */
export async function stripeWebhookWorker(batch, env) {
  for (const message of batch.messages) {
    const { webhookId, payload } = message.body

    try {
      await dispatchStripeEvent(payload)

      await sdk.db.query(
        'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
        ['processed', Date.now(), webhookId]
      )

      message.ack()
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err)

      // Recording the failure is best-effort: if the database itself is the
      // problem, we must still reach retry() and keep going with the batch.
      try {
        await sdk.db.query(
          'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
          ['failed', reason, webhookId]
        )
      } catch (logErr) {
        console.error(`Could not record failure for webhook ${webhookId}`, logErr)
      }

      message.retry({ delaySeconds: 60 }) // retry in 1 min
    }
  }
}
// Stripe reports amounts in the currency's smallest unit, and not every
// currency has two decimal places. Codes are lower-case, as Stripe sends them.
// NOTE(review): lists taken from Stripe's currency documentation; confirm
// against the current list before relying on them for a new currency.
const ZERO_DECIMAL_CURRENCIES = new Set([
  'bif', 'clp', 'djf', 'gnf', 'jpy', 'kmf', 'krw', 'mga',
  'pyg', 'rwf', 'vnd', 'vuv', 'xaf', 'xof', 'xpf',
])
const THREE_DECIMAL_CURRENCIES = new Set(['bhd', 'jod', 'kwd', 'omr', 'tnd'])

/**
 * Convert a smallest-unit amount into major units for display.
 *
 * @param {number} amount - Amount in the currency's smallest unit.
 * @param {string} [currency] - ISO currency code; two decimals are assumed
 *   when it is missing or not in one of the special-case lists.
 * @returns {number} Amount in major units.
 */
function toMajorUnits(amount, currency) {
  const code = String(currency ?? '').toLowerCase()
  if (ZERO_DECIMAL_CURRENCIES.has(code)) return amount
  if (THREE_DECIMAL_CURRENCIES.has(code)) return amount / 1000
  return amount / 100
}

/**
 * Mark the order referenced by a succeeded payment as paid, drop its cached
 * copy and queue a confirmation email.
 *
 * Payments without `metadata.order_id` are ignored.
 *
 * @param {{ id: string, amount: number, currency?: string, metadata?: object }} payment
 *   The payment object from the Stripe event.
 * @returns {Promise<void>}
 */
async function handlePaymentSuccess(payment) {
  const { id, amount, currency, metadata } = payment
  const orderId = metadata?.order_id
  if (!orderId) return

  await sdk.db.query(
    'UPDATE orders SET status = ?, stripe_payment_id = ?, paid_at = ? WHERE id = ?',
    ['paid', id, Date.now(), orderId]
  )

  // Invalidate after the write so readers cannot re-cache the unpaid order.
  await sdk.cache.delete(`order:${orderId}`)

  await sdk.queue.send('email:payment-confirmation', {
    orderId,
    amount: toMajorUnits(amount, currency),
    currency,
  })
}

// Step 3: Webhook stats API
// Section titled “Step 3: Webhook stats API”

// Look-back windows the stats endpoint understands, in milliseconds.
const STATS_TIMEFRAMES_MS = new Map([
  ['1h', 3_600_000],
  ['24h', 86_400_000],
])
// Providers the stats endpoint will report on.
const STATS_PROVIDERS = ['stripe', 'github', 'shopify']

// GET /api/webhook-stats?provider=stripe&timeframe=24h
// Returns per-status counts and average processing latency for one provider,
// cached for 5 minutes. `provider` defaults to 'stripe', `timeframe` to '24h'.
app.get('/api/webhook-stats', async (c) => {
  const provider = c.req.query('provider') ?? 'stripe'
  const timeframe = c.req.query('timeframe') ?? '24h'

  // Validate before touching the cache: both values become part of the cache
  // key, so unchecked input would let callers create unbounded cache entries,
  // and an unrecognised timeframe would be answered with mislabelled data.
  if (!STATS_PROVIDERS.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }
  const windowMs = STATS_TIMEFRAMES_MS.get(timeframe)
  if (windowMs === undefined) {
    return c.json({ error: 'Unsupported timeframe' }, 400)
  }

  const cacheKey = `webhook-stats:${provider}:${timeframe}`

  const cached = await sdk.cache.get(cacheKey)
  if (cached) return c.json(cached)

  const since = Date.now() - windowMs

  // AVG skips NULLs, so rows not yet processed do not distort avg_ms.
  const { results } = await sdk.db.query(
    `
    SELECT status, COUNT(*) as count, AVG(processed_at - created_at) as avg_ms
    FROM webhook_logs
    WHERE provider = ? AND created_at > ?
    GROUP BY status
  `,
    [provider, since]
  )

  const stats = { provider, timeframe, by_status: results, updated_at: Date.now() }

  await sdk.cache.set(cacheKey, stats, { ttl: 300 })

  return c.json(stats)
})

// Summary
Section titled “Summary”
In 20–30 minutes, you have:
- Webhook receiver for Stripe, GitHub, Shopify with HMAC validation
- Full audit trail in D1 with status tracking
- Async queue processing with automatic retries
- Payment/subscription state updates
- Stats API with 5-minute cache