Webhook Processor
Receive, validate, and process webhooks from Stripe, GitHub, Shopify, and other services. Events are validated, logged, and processed asynchronously via Queue.
Demonstrates: Queue, Database, Cache, Secrets, HMAC signature validation
Time to build: 20–30 minutes
Architecture
Incoming webhook → POST /api/webhook/:provider
→ Validate HMAC signature
→ Log to database (audit trail)
→ Enqueue for async processing
→ Return 200 immediately
↓
Queue worker picks up job
→ Route to provider handler (Stripe, GitHub...)
→ Update order/subscription state
→ Send notification emails
→ Mark webhook as processed
Database schema
-- Audit log: one row per webhook delivery accepted by the receiver.
CREATE TABLE webhook_logs (
  id           TEXT PRIMARY KEY,                  -- provider-prefixed id generated by the receiver
  provider     TEXT NOT NULL,                     -- 'stripe', 'github' or 'shopify'
  event_type   TEXT NOT NULL,                     -- event name reported by the provider, or 'unknown'
  payload      TEXT NOT NULL,                     -- raw request body exactly as received
  status       TEXT NOT NULL DEFAULT 'received',  -- 'received' -> 'queued' -> 'processed' | 'failed'
  error        TEXT,                              -- message of the most recent failure, if any
  created_at   INTEGER NOT NULL,                  -- Date.now() (epoch milliseconds) at receipt
  processed_at INTEGER                            -- Date.now() when the worker finished; NULL until then
);

-- Support the per-provider and per-status lookups used by the stats API.
CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);

-- Step 1: Webhook receiver
// POST /api/webhook/:provider
//
// Receives a webhook, verifies its signature, records it in webhook_logs and
// hands it to the provider's queue for asynchronous processing.
//
// Responses:
//   200 { ok, webhookId }  - accepted and enqueued
//   400                    - unknown provider or body is not valid JSON
//   401                    - signature verification failed
//   500                    - the job could not be enqueued (the sender should retry)
app.post('/api/webhook/:provider', async (c) => {
  const provider = c.req.param('provider')
  const validProviders = ['stripe', 'github', 'shopify']
  if (!validProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }

  // Read the body as text: signatures are computed over the exact bytes sent,
  // so the payload must not be parsed and re-serialised before verification.
  const rawBody = await c.req.text()
  // Header names produced by iterating a Headers object are lower-case.
  const headers = Object.fromEntries(c.req.raw.headers)

  const isValid = await validateSignature(provider, rawBody, headers, c.env)
  if (!isValid) {
    return c.json({ error: 'Invalid signature' }, 401)
  }

  // A correctly signed body can still be malformed; answer 400 rather than
  // letting JSON.parse throw and surface as a 500.
  let payload
  try {
    payload = JSON.parse(rawBody)
  } catch {
    return c.json({ error: 'Invalid JSON payload' }, 400)
  }

  // Stripe names the event in the body; GitHub and Shopify send it in a header.
  const eventTypeByProvider = {
    stripe: payload?.type,
    github: headers['x-github-event'],
    shopify: headers['x-shopify-topic'],
  }
  const eventType = eventTypeByProvider[provider] ?? payload?.type ?? 'unknown'

  // A UUID cannot realistically collide on the primary key, unlike a
  // timestamp plus a few Math.random() characters.
  const webhookId = `${provider}_${crypto.randomUUID()}`

  // Log to DB before enqueueing so every accepted delivery has an audit row.
  await sdk.db.query(
    'INSERT INTO webhook_logs (id, provider, event_type, payload, status, created_at) VALUES (?, ?, ?, ?, ?, ?)',
    [webhookId, provider, eventType, rawBody, 'received', Date.now()]
  )

  // Queue for processing. On failure, record why and return 500 so the
  // provider redelivers instead of the event being silently dropped.
  try {
    await sdk.queue.send(`webhook:${provider}`, { webhookId, provider, payload })
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err)
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', `enqueue failed: ${reason}`, webhookId]
    )
    return c.json({ error: 'Failed to enqueue webhook' }, 500)
  }

  // The worker may already have picked the job up, so only move
  // 'received' -> 'queued' and never overwrite a later status. The job is
  // enqueued at this point, so a bookkeeping failure must not turn into an
  // error response (that would make the provider send a duplicate).
  try {
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ? WHERE id = ? AND status = ?',
      ['queued', webhookId, 'received']
    )
  } catch (err) {
    console.error(`webhook ${webhookId}: could not mark as queued`, err)
  }

  return c.json({ ok: true, webhookId })
})
/** Largest accepted gap, in seconds, between now and a Stripe signature timestamp. */
const STRIPE_TOLERANCE_SECONDS = 300

/**
 * Check that a webhook request was signed with the provider's shared secret.
 *
 * Fails closed: a missing header, a missing secret, a malformed signature or
 * an unrecognised provider all yield `false` rather than throwing or passing.
 *
 * @param {string} provider - 'stripe', 'github' or 'shopify'.
 * @param {string} rawBody - Request body exactly as received.
 * @param {Record<string, string>} headers - Request headers keyed by lower-case name.
 * @param {object} env - Bindings holding STRIPE_WEBHOOK_SECRET, GITHUB_WEBHOOK_SECRET
 *   and SHOPIFY_WEBHOOK_SECRET (the Shopify binding name follows the other two;
 *   adjust it if your deployment stores the secret under another name).
 * @returns {Promise<boolean>} true only when the signature matches.
 */
async function validateSignature(provider, rawBody, headers, env) {
  if (provider === 'stripe') {
    return validateStripeSignature(rawBody, headers['stripe-signature'], env.STRIPE_WEBHOOK_SECRET)
  }

  if (provider === 'github') {
    const sig = headers['x-hub-signature-256']
    const secret = env.GITHUB_WEBHOOK_SECRET
    if (!sig || !secret) return false
    const expected = `sha256=${await hmac(rawBody, secret)}`
    return constantTimeEqual(sig, expected)
  }

  if (provider === 'shopify') {
    // Shopify sends the digest base64-encoded rather than as hex.
    const sig = headers['x-shopify-hmac-sha256']
    const secret = env.SHOPIFY_WEBHOOK_SECRET
    if (!sig || !secret) return false
    const expected = await hmac(rawBody, secret, 'base64')
    return constantTimeEqual(sig, expected)
  }

  // Never accept a provider there is no verification scheme for.
  return false
}

/**
 * Verify a Stripe-Signature header of the form
 * "t=<unix seconds>,v1=<hex>[,v1=<hex>...][,v0=<hex>]".
 * The signed content is "<t>.<rawBody>"; any one matching v1 entry is enough.
 */
async function validateStripeSignature(rawBody, header, secret) {
  if (!header || !secret) return false

  let timestamp = null
  const candidates = []
  for (const part of header.split(',')) {
    const separator = part.indexOf('=')
    if (separator === -1) continue
    const key = part.slice(0, separator).trim()
    const value = part.slice(separator + 1).trim()
    if (key === 't') timestamp = value
    else if (key === 'v1') candidates.push(value)
  }
  if (timestamp === null || !/^\d+$/.test(timestamp) || candidates.length === 0) {
    return false
  }

  // Reject stale (or far-future) timestamps so a captured request cannot be replayed.
  const skewSeconds = Math.abs(Date.now() / 1000 - Number(timestamp))
  if (skewSeconds > STRIPE_TOLERANCE_SECONDS) return false

  const expected = await hmac(`${timestamp}.${rawBody}`, secret)
  // Compare against every candidate without stopping early on a match.
  let matched = false
  for (const candidate of candidates) {
    if (constantTimeEqual(candidate, expected)) matched = true
  }
  return matched
}

/**
 * Compare two strings without exiting at the first differing character, so
 * response timing does not reveal how much of a signature was correct.
 */
function constantTimeEqual(a, b) {
  if (typeof a !== 'string' || typeof b !== 'string' || a.length !== b.length) {
    return false
  }
  let diff = 0
  for (let i = 0; i < a.length; i++) {
    diff |= a.charCodeAt(i) ^ b.charCodeAt(i)
  }
  return diff === 0
}

/**
 * Compute HMAC-SHA256 of `data` keyed with `secret` (both encoded as UTF-8).
 *
 * @param {string} data - Message to authenticate.
 * @param {string} secret - Shared secret.
 * @param {'hex'|'base64'} [encoding='hex'] - Output encoding of the digest.
 * @returns {Promise<string>} The digest as lower-case hex, or as base64.
 */
async function hmac(data, secret, encoding = 'hex') {
  const encoder = new TextEncoder()
  const key = await crypto.subtle.importKey(
    'raw', encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' }, false, ['sign']
  )
  const digest = new Uint8Array(await crypto.subtle.sign('HMAC', key, encoder.encode(data)))
  if (encoding === 'base64') {
    return btoa(String.fromCharCode(...digest))
  }
  return Array.from(digest, (b) => b.toString(16).padStart(2, '0')).join('')
}

// Step 2: Stripe queue worker
// Worker: process 'webhook:stripe' queue messages
/**
 * Queue consumer for Stripe events.
 *
 * Each message is routed to a handler by `payload.type`; event types without
 * a handler are acknowledged without doing anything. A handler failure marks
 * the log row 'failed' and schedules a retry in 60 seconds. Messages are
 * handled independently: one failure never stops the rest of the batch.
 *
 * @param {{ messages: Array<{ body: { webhookId: string, payload: object }, ack: Function, retry: Function }> }} batch
 * @param {object} env - Unused here; kept so the consumer signature is unchanged.
 * @returns {Promise<void>}
 */
export async function stripeWebhookWorker(batch, env) {
  for (const message of batch.messages) {
    const { webhookId, payload } = message.body

    try {
      switch (payload.type) {
        case 'payment_intent.succeeded':
          await handlePaymentSuccess(payload.data.object)
          break
        case 'customer.subscription.created':
          await handleSubscriptionCreated(payload.data.object)
          break
        case 'customer.subscription.deleted':
          await handleSubscriptionCancelled(payload.data.object)
          break
      }
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err)
      await updateWebhookLog(
        'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
        ['failed', reason, webhookId]
      )
      message.retry({ delaySeconds: 60 }) // retry in 1 min
      continue
    }

    // The handler has already taken effect. If this bookkeeping write fails,
    // still ack: retrying would run the handler's side effects a second time.
    await updateWebhookLog(
      'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
      ['processed', Date.now(), webhookId]
    )
    message.ack()
  }
}

/**
 * Best-effort write to webhook_logs. Failures are reported on the console
 * instead of being thrown, so the caller can always ack or retry its message.
 */
async function updateWebhookLog(sql, params) {
  try {
    await sdk.db.query(sql, params)
  } catch (err) {
    console.error('webhook_logs update failed', err)
  }
}
/**
 * Apply a successful Stripe payment to the order named in its metadata.
 *
 * Marks the order 'paid', drops the cached copy of the order and enqueues a
 * confirmation e-mail. Payments that carry no `metadata.order_id` are ignored.
 *
 * @param {{ id: string, amount: number, metadata?: { order_id?: string } | null }} payment
 *   The object delivered with a `payment_intent.succeeded` event.
 * @returns {Promise<void>}
 */
async function handlePaymentSuccess(payment) {
  const { id, amount, metadata } = payment
  // `metadata` may be missing or null; such a payment is not tied to an order.
  const orderId = metadata?.order_id
  if (!orderId) return

  await sdk.db.query(
    'UPDATE orders SET status = ?, stripe_payment_id = ?, paid_at = ? WHERE id = ?',
    ['paid', id, Date.now(), orderId]
  )
  // Invalidate the cached order so later reads see the 'paid' status.
  await sdk.cache.delete(`order:${orderId}`)
  await sdk.queue.send('email:payment-confirmation', {
    orderId,
    // NOTE(review): dividing by 100 assumes a two-decimal currency; amounts in
    // zero-decimal currencies (e.g. JPY) would be reported 100x too small.
    amount: amount / 100,
  })
}

// Step 3: Webhook stats API
// GET /api/webhook-stats?provider=stripe&timeframe=24h
//
// Returns, for one provider and time window, the number of logged webhooks
// per status and the average created -> processed latency in milliseconds.
// Results are cached per (provider, timeframe).
//
// Query parameters:
//   provider   'stripe' (default), 'github' or 'shopify'
//   timeframe  '24h' (default) or '1h'
// Any other value is answered with 400.
app.get('/api/webhook-stats', async (c) => {
  // Both parameters become part of the cache key, so only known values are
  // accepted; arbitrary input would otherwise create unbounded cache entries.
  const knownProviders = ['stripe', 'github', 'shopify']
  const windowMsByTimeframe = new Map([
    ['1h', 3_600_000],
    ['24h', 86_400_000],
  ])

  const provider = c.req.query('provider') ?? 'stripe'
  const timeframe = c.req.query('timeframe') ?? '24h'

  if (!knownProviders.includes(provider)) {
    return c.json({ error: 'Unknown provider' }, 400)
  }
  // Reject unknown windows rather than silently answering with 1h of data
  // labelled with whatever string the caller sent.
  const windowMs = windowMsByTimeframe.get(timeframe)
  if (windowMs === undefined) {
    return c.json({ error: 'Unsupported timeframe (use 1h or 24h)' }, 400)
  }

  const cacheKey = `webhook-stats:${provider}:${timeframe}`
  const cached = await sdk.cache.get(cacheKey)
  if (cached) return c.json(cached)

  const since = Date.now() - windowMs
  // AVG skips NULLs, so statuses whose rows have no processed_at report a null avg_ms.
  const { results } = await sdk.db.query(`
    SELECT status, COUNT(*) as count, AVG(processed_at - created_at) as avg_ms
    FROM webhook_logs
    WHERE provider = ? AND created_at > ?
    GROUP BY status
  `, [provider, since])

  const stats = { provider, timeframe, by_status: results, updated_at: Date.now() }
  // ttl is presumably in seconds (300 = the 5-minute cache the summary mentions) - confirm against the SDK.
  await sdk.cache.set(cacheKey, stats, { ttl: 300 })
  return c.json(stats)
})

// Summary
In 20–30 minutes, you have:
- Webhook receiver for Stripe, GitHub, Shopify with HMAC validation
- Full audit trail in D1 with status tracking
- Async queue processing with automatic retries
- Payment/subscription state updates
- Stats API with 5-minute cache