Building a Webhook Processor with Aerostack
Learn how to build a robust webhook processor that receives, validates, processes, and routes webhooks from third-party services (Stripe, GitHub, Shopify, etc.).
Use Case: Multi-Service Webhook Handler
Requirements:
- Receive webhooks from multiple services
- Validate webhook signatures
- Process events asynchronously
- Route to appropriate handlers
- Retry failed processing
- Store webhook history
Traditional Approach: 2-3 days
Aerostack Approach: 20-30 minutes
Architecture
Webhook → Aerostack API → Validate → Enqueue → Background Worker → Process
↓
Cache + Database
Step 1: Generic Webhook Receiver
Endpoint: POST /custom/webhook-receiver
Settings: Public access (no API key), Rate limit: 1000/min
/**
 * Generic webhook receiver (POST /custom/webhook-receiver).
 *
 * Flow: reject unknown providers -> write an audit row to `webhook_logs` ->
 * verify the provider signature -> enqueue a job on
 * `process-<provider>-webhook` -> mark the row as `queued`.
 * The audit row is written BEFORE validation so rejected deliveries are
 * still visible in the history.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.db.query` and `sdk.queue.enqueue`.
 * @param {object} event - Request event; `event.data` carries `provider`,
 *   `payload` and `headers`.
 * @returns {Promise<{success: boolean, webhook_id: string, message: string}>}
 * @throws {Error} `Unsupported webhook provider`, `Invalid webhook signature`,
 *   or whatever `sdk.queue.enqueue` threw (after the row is marked `failed`).
 */
export default async function webhookReceiver(sdk, event) {
  // Default `headers` to {} so header lookups never dereference undefined and
  // the audit row always gets a JSON string for the NOT NULL `headers` column.
  const { provider, payload, headers = {} } = event.data;

  // Supported providers
  const validProviders = ['stripe', 'github', 'shopify', 'sendgrid'];
  if (!validProviders.includes(provider)) {
    throw new Error('Unsupported webhook provider');
  }

  const timestamp = Date.now();
  // randomUUID replaces Math.random()/substr: the suffix is the row's primary
  // key, so it must not collide when many webhooks arrive in the same ms.
  const webhookId = `${provider}_${timestamp}_${crypto.randomUUID()}`;

  // Store in database for audit trail
  const insertSql = [
    '',
    'INSERT INTO webhook_logs (id, provider, payload, headers, status, created_at)',
    "VALUES (?, ?, ?, ?, 'received', ?)",
    '',
  ].join('\n');
  await sdk.db.query(insertSql, [
    webhookId,
    provider,
    JSON.stringify(payload),
    JSON.stringify(headers),
    timestamp,
  ]);

  // Validate signature based on provider
  const isValid = await validateWebhookSignature(sdk, provider, payload, headers);
  if (!isValid) {
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ? WHERE id = ?',
      ['invalid_signature', webhookId]
    );
    throw new Error('Invalid webhook signature');
  }

  // Enqueue for async processing. If the queue rejects the job, record the
  // failure (same status/error shape the workers use) instead of leaving the
  // row stuck at 'received', then surface the error to the caller.
  try {
    await sdk.queue.enqueue(`process-${provider}-webhook`, {
      webhookId,
      provider,
      payload,
      headers,
    });
  } catch (error) {
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, webhookId]
    );
    throw error;
  }

  // Update status
  await sdk.db.query(
    'UPDATE webhook_logs SET status = ? WHERE id = ?',
    ['queued', webhookId]
  );

  return {
    success: true,
    webhook_id: webhookId,
    message: 'Webhook received and queued for processing',
  };
}
/**
 * Dispatches signature verification to the validator for `provider`.
 *
 * @param {object} sdk - Platform SDK, forwarded to the validator.
 * @param {string} provider - Provider name, e.g. 'stripe', 'github', 'shopify'.
 * @param {object} payload - Parsed webhook body, forwarded to the validator.
 * @param {object} headers - Request headers, forwarded to the validator.
 * @returns {Promise<*>} The validator's verdict, or `true` when the provider
 *   has no validator registered here.
 */
async function validateWebhookSignature(sdk, provider, payload, headers) {
  // Thunks keep validator names unresolved until the matching provider is
  // actually requested.
  const verifiers = new Map([
    ['stripe', () => validateStripeSignature(sdk, payload, headers)],
    ['github', () => validateGitHubSignature(sdk, payload, headers)],
    ['shopify', () => validateShopifySignature(sdk, payload, headers)],
  ]);

  const verify = verifiers.get(provider);
  if (verify === undefined) {
    // NOTE(review): providers without a validator are accepted unverified
    // (fail-open). Confirm this is intended for a public endpoint.
    return true;
  }
  return await verify();
}
/**
 * Verifies a Stripe webhook signature.
 *
 * Stripe sends a single `Stripe-Signature` header of the form
 * `t=<unix seconds>,v1=<hex hmac>[,v1=...][,v0=...]`. The signed string is
 * `<t>.<body>` and the scheme is HMAC-SHA256 keyed with the endpoint secret.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.secrets.get`.
 * @param {object} payload - Parsed webhook body.
 * @param {object} headers - Request headers with lower-case keys.
 * @returns {Promise<boolean>} `true` only when a `v1` signature matches and
 *   the timestamp is within the tolerance window.
 */
async function validateStripeSignature(sdk, payload, headers) {
  const TOLERANCE_SECONDS = 300; // replay window recommended by Stripe

  const header = headers?.['stripe-signature'];
  if (typeof header !== 'string' || header === '') {
    return false;
  }

  // Split "t=...,v1=...,v1=..." into the timestamp and candidate signatures.
  let parsedTimestamp;
  const candidates = [];
  for (const part of header.split(',')) {
    const sep = part.indexOf('=');
    if (sep === -1) continue;
    const key = part.slice(0, sep).trim();
    const value = part.slice(sep + 1).trim();
    if (key === 't') {
      parsedTimestamp = value;
    } else if (key === 'v1') {
      candidates.push(value);
    }
  }

  // Fall back to the separate header the previous implementation read, in
  // case an upstream proxy provides it.
  const timestamp = parsedTimestamp ?? headers['stripe-timestamp'];
  if (timestamp === undefined || candidates.length === 0) {
    return false;
  }

  // Reject stale or far-future timestamps so a captured request cannot be replayed.
  const ageSeconds = Math.abs(Date.now() / 1000 - Number(timestamp));
  if (!Number.isFinite(ageSeconds) || ageSeconds > TOLERANCE_SECONDS) {
    return false;
  }

  const secret = await sdk.secrets.get('STRIPE_WEBHOOK_SECRET');
  // NOTE(review): Stripe signs the raw request bytes. Re-serialising the
  // parsed payload only matches if it round-trips byte-for-byte; prefer the
  // raw body here if the platform exposes it - TODO confirm.
  const expectedSignature = await createHmacSignature(
    `${timestamp}.${JSON.stringify(payload)}`,
    secret
  );

  // Constant-time comparison: do not leak how many leading characters matched.
  const constantTimeEquals = (a, b) => {
    if (a.length !== b.length) return false;
    let diff = 0;
    for (let i = 0; i < a.length; i += 1) {
      diff |= a.charCodeAt(i) ^ b.charCodeAt(i);
    }
    return diff === 0;
  };

  let matched = false;
  for (const candidate of candidates) {
    if (constantTimeEquals(candidate, expectedSignature)) {
      matched = true;
    }
  }
  return matched;
}
/**
 * Verifies a GitHub webhook signature from the `X-Hub-Signature-256` header
 * (`sha256=<hex hmac>` keyed with the webhook secret).
 *
 * @param {object} sdk - Platform SDK; uses `sdk.secrets.get`.
 * @param {object} payload - Parsed webhook body.
 * @param {object} headers - Request headers with lower-case keys.
 * @returns {Promise<boolean>} `true` when the header matches the expected HMAC.
 */
async function validateGitHubSignature(sdk, payload, headers) {
  const signature = headers?.['x-hub-signature-256'];
  if (typeof signature !== 'string') {
    return false;
  }

  const secret = await sdk.secrets.get('GITHUB_WEBHOOK_SECRET');
  // NOTE(review): GitHub signs the raw request bytes. Re-serialising the
  // parsed payload only matches if it round-trips byte-for-byte; prefer the
  // raw body here if the platform exposes it - TODO confirm.
  const digest = await createHmacSignature(JSON.stringify(payload), secret);
  const expectedSig = `sha256=${digest}`;

  // Constant-time comparison instead of `===`, so response timing does not
  // reveal how much of a forged signature was correct.
  if (signature.length !== expectedSig.length) {
    return false;
  }
  let diff = 0;
  for (let i = 0; i < expectedSig.length; i += 1) {
    diff |= signature.charCodeAt(i) ^ expectedSig.charCodeAt(i);
  }
  return diff === 0;
}
/**
 * Computes HMAC-SHA256 of `data` keyed with `secret` using the Web Crypto
 * API (`crypto.subtle`) and returns it as a lower-case hex string.
 *
 * @param {string} data - Message to sign; encoded as UTF-8.
 * @param {string} secret - Shared secret; encoded as UTF-8 and used as the raw key.
 * @returns {Promise<string>} 64-character hex digest.
 */
async function createHmacSignature(data, secret) {
  const encoder = new TextEncoder();

  // Import the secret as a non-extractable, sign-only HMAC key.
  const key = await crypto.subtle.importKey(
    'raw',
    encoder.encode(secret),
    { name: 'HMAC', hash: 'SHA-256' },
    false,
    ['sign']
  );

  const sigBuffer = await crypto.subtle.sign('HMAC', key, encoder.encode(data));

  // Hex-encode: two lower-case digits per byte.
  return Array.from(new Uint8Array(sigBuffer), (byte) =>
    byte.toString(16).padStart(2, '0')
  ).join('');
}
// Step 2: Stripe Webhook Processor
Worker: process-stripe-webhook
/**
 * Queue worker for `process-stripe-webhook`.
 *
 * Routes the Stripe event to its handler by `payload.type`, then records the
 * outcome on the `webhook_logs` row. Unknown event types are logged and still
 * marked as processed.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.db.query` and is passed to handlers.
 * @param {object} job - Queue job; `job.data` carries `webhookId` and `payload`.
 * @returns {Promise<{success: boolean, event: string}>}
 * @throws Rethrows any handler/database error after marking the row `failed`.
 */
export async function processStripeWebhook(sdk, job) {
  const { webhookId, payload } = job.data;
  const { type, data } = payload;

  // Event type -> handler. Thunks ensure `data.object` is only read for a
  // type that is actually handled.
  const routes = new Map([
    ['payment_intent.succeeded', () => handlePaymentSuccess(sdk, data.object)],
    ['payment_intent.payment_failed', () => handlePaymentFailure(sdk, data.object)],
    ['customer.subscription.created', () => handleSubscriptionCreated(sdk, data.object)],
    ['customer.subscription.deleted', () => handleSubscriptionCanceled(sdk, data.object)],
  ]);

  try {
    const route = routes.get(type);
    if (route === undefined) {
      console.log(`Unhandled Stripe event: ${type}`);
    } else {
      await route();
    }

    // Record successful processing on the audit row.
    const processedAt = Date.now();
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
      ['processed', processedAt, webhookId]
    );

    return { success: true, event: type };
  } catch (error) {
    // Record the failure, then rethrow so the caller sees the original error.
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, webhookId]
    );
    throw error;
  }
}
/**
 * Handles a succeeded payment: marks the linked order as paid, invalidates
 * its cache entry and queues a confirmation email. Does nothing when the
 * payment carries no `metadata.order_id`.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.db.query`, `sdk.cache.delete`, `sdk.queue.enqueue`.
 * @param {object} payment - Payment object from the event (`id`, `amount`, `customer`, `metadata`).
 * @returns {Promise<void>}
 */
async function handlePaymentSuccess(sdk, payment) {
  const { id: paymentId, amount, customer, metadata } = payment;

  const orderId = metadata.order_id;
  if (!orderId) {
    return;
  }

  // Update order status
  const markPaidSql = [
    '',
    'UPDATE orders',
    "SET status = 'paid',",
    'stripe_payment_id = ?,',
    'paid_at = ?',
    'WHERE id = ?',
    '',
  ].join('\n');
  const paidAt = Date.now();
  await sdk.db.query(markPaidSql, [paymentId, paidAt, orderId]);

  // Clear order cache so readers do not see the pre-payment status.
  await sdk.cache.delete(`order:${orderId}`);

  // Send confirmation email.
  // NOTE(review): dividing by 100 assumes a two-decimal currency - confirm
  // zero-decimal currencies are not in use.
  const confirmation = {
    order_id: orderId,
    amount: amount / 100,
    customer,
  };
  await sdk.queue.enqueue('send-payment-confirmation', confirmation);
}
/**
 * Handles a failed payment: records it in `payment_failures` and queues a
 * notification email.
 *
 * Absent values (`metadata`, `metadata.order_id`, `last_payment_error`) are
 * stored and forwarded as `null` rather than `undefined`, so the SQL bind
 * parameters and the queued job always have a defined shape.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.db.query` and `sdk.queue.enqueue`.
 * @param {object} payment - Payment object from the event (`id`, `last_payment_error`, `metadata`).
 * @returns {Promise<void>}
 */
async function handlePaymentFailure(sdk, payment) {
  const { id, last_payment_error, metadata } = payment;
  const orderId = metadata?.order_id ?? null;
  const errorMessage = last_payment_error?.message ?? null;

  // Log failed payment
  const insertSql = [
    '',
    'INSERT INTO payment_failures (payment_id, order_id, error, created_at)',
    'VALUES (?, ?, ?, ?)',
    '',
  ].join('\n');
  await sdk.db.query(insertSql, [id, orderId, errorMessage, Date.now()]);

  // Notify user
  await sdk.queue.enqueue('send-payment-failed-email', {
    order_id: orderId,
    error: errorMessage,
  });
}
// Step 3: GitHub Webhook Processor
Worker: process-github-webhook
/**
 * Queue worker for `process-github-webhook`.
 *
 * Routes the delivery to a handler and records the outcome on the
 * `webhook_logs` row. Pull-request and issue deliveries are recognised by
 * their payload keys. Push deliveries carry no `push` key, so they are
 * recognised by the `X-GitHub-Event` header forwarded in `job.data.headers`,
 * or by the `ref` + `commits` shape when headers are unavailable.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.db.query` and is passed to handlers.
 * @param {object} job - Queue job; `job.data` carries `webhookId`, `payload`
 *   and (when enqueued by the receiver) `headers`.
 * @returns {Promise<{success: boolean}>}
 * @throws Rethrows any handler/database error after marking the row `failed`.
 */
export async function processGitHubWebhook(sdk, job) {
  const { webhookId, payload, headers } = job.data;

  try {
    const eventName = headers?.['x-github-event'];
    const isPush =
      eventName === 'push' ||
      Boolean(payload.push) || // kept for callers that add this key themselves
      (typeof payload.ref === 'string' && Array.isArray(payload.commits));

    // Handle different GitHub events
    if (payload.pull_request) {
      await handlePullRequest(sdk, payload);
    } else if (payload.issue) {
      await handleIssue(sdk, payload);
    } else if (isPush) {
      await handlePush(sdk, payload);
    }

    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, processed_at = ? WHERE id = ?',
      ['processed', Date.now(), webhookId]
    );
    return { success: true };
  } catch (error) {
    // Record the failure, then rethrow so the caller sees the original error.
    await sdk.db.query(
      'UPDATE webhook_logs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, webhookId]
    );
    throw error;
  }
}
/**
 * Handles pull-request deliveries. For newly opened PRs it fetches the diff,
 * asks the AI model for reviewer suggestions and caches the analysis for
 * 24 hours under `pr:<repository.id>:<pull_request.number>`. Other actions
 * are ignored.
 *
 * The diff was previously fetched but never passed to the model; it is now
 * appended (truncated) to the user message so the "changed files" analysis
 * has the file-level changes to work with.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.fetch`, `sdk.ai.chat`, `sdk.cache.set`.
 * @param {object} payload - Webhook payload (`action`, `pull_request`, `repository`).
 * @returns {Promise<void>}
 */
async function handlePullRequest(sdk, payload) {
  const { action, pull_request, repository } = payload;
  if (action !== 'opened') {
    return;
  }

  // Upper bound on diff characters sent to the model so large PRs do not
  // blow up the prompt. Judgment call - tune for the model's context size.
  const MAX_DIFF_CHARS = 8000;

  // Use AI to analyze PR and suggest reviewers
  const prDiff = await sdk.fetch(pull_request.diff_url);

  // NOTE(review): the return shape of sdk.fetch is not visible here; accept
  // either a plain string or a Response-like object with text() - confirm.
  let diffText = '';
  if (typeof prDiff === 'string') {
    diffText = prDiff;
  } else if (typeof prDiff?.text === 'function') {
    diffText = String(await prDiff.text());
  }

  const summary = `PR: ${pull_request.title}\n\nFiles changed: ${pull_request.changed_files}`;
  const userContent = diffText
    ? `${summary}\n\nDiff:\n${diffText.slice(0, MAX_DIFF_CHARS)}`
    : summary;

  const analysis = await sdk.ai.chat('@cf/meta/llama-3-8b-instruct', [
    {
      role: 'system',
      content: 'Analyze this PR and suggest appropriate reviewers based on changed files.',
    },
    {
      role: 'user',
      content: userContent,
    },
  ]);

  // Store analysis
  await sdk.cache.set(
    `pr:${repository.id}:${pull_request.number}`,
    { analysis, timestamp: Date.now() },
    86400 // 24 hours
  );
}
// Step 4: Webhook Dashboard API
Endpoint: GET /custom/webhook-stats
/**
 * Webhook dashboard endpoint (GET /custom/webhook-stats).
 *
 * Returns per-status counts and average processing time for one provider
 * over a time window, cached for 5 minutes.
 *
 * Supported `timeframe` values are '1h', '24h' (default) and '7d'. Any other
 * value falls back to the 1-hour window, as before, but the response and the
 * cache key now carry the window that was actually used ('1h') instead of
 * echoing the unsupported label.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.cache.get/set` and `sdk.db.query`.
 * @param {object} event - Request event; `event.data` carries `provider` and optional `timeframe`.
 * @returns {Promise<{provider: string, timeframe: string, total: number, by_status: Array, updated_at: number}>}
 */
export default async function webhookStats(sdk, event) {
  const { provider, timeframe = '24h' } = event.data;

  const WINDOWS_MS = new Map([
    ['1h', 3600000],
    ['24h', 86400000],
    ['7d', 604800000],
  ]);
  const effectiveTimeframe = WINDOWS_MS.has(timeframe) ? timeframe : '1h';

  const cacheKey = `webhook-stats:${provider}:${effectiveTimeframe}`;

  // Check cache (5 minute TTL)
  const cached = await sdk.cache.get(cacheKey);
  if (cached) return cached;

  const since = Date.now() - WINDOWS_MS.get(effectiveTimeframe);

  // Get webhook statistics
  const statsSql = [
    '',
    'SELECT',
    'status,',
    'COUNT(*) as count,',
    'AVG(processed_at - created_at) as avg_processing_time',
    'FROM webhook_logs',
    'WHERE provider = ? AND created_at > ?',
    'GROUP BY status',
    '',
  ].join('\n');
  const stats = await sdk.db.query(statsSql, [provider, since]);

  const result = {
    provider,
    timeframe: effectiveTimeframe,
    total: stats.reduce((sum, s) => sum + s.count, 0),
    by_status: stats,
    updated_at: Date.now(),
  };

  // Cache for 5 minutes
  await sdk.cache.set(cacheKey, result, 300);
  return result;
}
// Database Schema
-- Audit log: one row per webhook accepted by the receiver endpoint.
-- The receiver inserts the row with status 'received'; later steps update it.
CREATE TABLE webhook_logs (
-- Identifier generated by the receiver and returned to the caller as webhook_id.
id TEXT PRIMARY KEY,
-- Provider name as sent in the request (e.g. 'stripe', 'github').
provider TEXT NOT NULL,
-- JSON.stringify() of the webhook payload.
payload TEXT NOT NULL,
-- JSON.stringify() of the request headers.
headers TEXT NOT NULL,
-- Values written by the code in this guide:
-- 'received', 'invalid_signature', 'queued', 'processed', 'failed'.
status TEXT NOT NULL,
-- error.message of the failure; set when a worker marks the row 'failed'.
error TEXT,
-- Date.now() at receipt (milliseconds since the Unix epoch).
created_at INTEGER NOT NULL,
-- Date.now() when a worker finished successfully; NULL until then.
processed_at INTEGER
);
-- The stats endpoint filters on provider and created_at and groups by status.
CREATE INDEX idx_webhooks_provider ON webhook_logs(provider);
CREATE INDEX idx_webhooks_status ON webhook_logs(status);
CREATE INDEX idx_webhooks_created ON webhook_logs(created_at);
What You Built
In 20-30 minutes:
✅ Generic webhook receiver - Supports multiple providers
✅ Signature validation - Stripe, GitHub, Shopify
✅ Async processing - Queue-based workers
✅ Audit trail - All webhooks logged
✅ Error handling - Automatic retries
✅ Analytics - Webhook stats with caching
✅ AI integration - Smart PR analysis
No Redis. No RabbitMQ. No separate workers. Just code.
Production Features Included
- ✅ Rate limiting - 1000 webhooks/min
- ✅ Retry logic - Automatic on failure
- ✅ Monitoring - Built-in logs
- ✅ Security - Signature validation
- ✅ Scalability - Auto-scaling workers
Ready to process webhooks at scale?
Get Started →