Patterns & Recipes
Copy-paste-ready patterns for common function architectures. Each pattern is a complete, working function.
REST API Handler
Route requests by method and pathname. This is the most common function pattern — a lightweight API server with no framework required.
/** Bindings the REST API handler receives as `env`. */
interface Env {
  /** SQL database the routes query through `prepare()`. */
  DB: Database
  /** Cache binding; declared here but not read by the handler in this recipe. */
  CACHE: Cache
}
export default {
  /**
   * Routes `/api/items` requests by HTTP method and pathname.
   *
   * - `OPTIONS *`             → CORS preflight response
   * - `GET /api/items`        → the 50 most recently created items
   * - `GET /api/items/:id`    → one item, or a JSON 404
   * - `POST /api/items`       → creates an item (`name` is required), 201
   * - `DELETE /api/items/:id` → deletes an item
   *
   * Any other route returns a JSON 404. Unexpected failures are logged and
   * reported as a JSON 500; a malformed request body is reported as a 400.
   *
   * @param request - Incoming HTTP request
   * @param env - Bindings; only `env.DB` is used
   * @returns The JSON (or preflight) response for the matched route
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url)
    const path = url.pathname
    const method = request.method

    // CORS preflight
    if (method === 'OPTIONS') {
      return new Response(null, {
        headers: {
          'Access-Control-Allow-Origin': '*',
          'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
          'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        }
      })
    }

    // Numeric id captured from /api/items/:id, or undefined for any other path.
    const itemId = /^\/api\/items\/(\d+)$/.exec(path)?.[1]

    try {
      // GET /api/items
      if (method === 'GET' && path === '/api/items') {
        const { results } = await env.DB
          .prepare('SELECT * FROM items ORDER BY created_at DESC LIMIT 50')
          .all()
        return json(results)
      }

      // GET /api/items/:id
      if (method === 'GET' && itemId !== undefined) {
        const item = await env.DB
          .prepare('SELECT * FROM items WHERE id = ?')
          .bind(itemId)
          .first()

        if (!item) return json({ error: 'Not found' }, 404)
        return json(item)
      }

      // POST /api/items
      if (method === 'POST' && path === '/api/items') {
        // A body that is not valid JSON is the client's fault: answer 400
        // here instead of letting it fall through to the generic 500 below.
        let parsed: unknown
        try {
          parsed = await request.json()
        } catch {
          return json({ error: 'Invalid JSON body' }, 400)
        }

        // `?? {}` keeps a literal `null` body from throwing on destructuring.
        const { name, description } = (parsed ?? {}) as { name?: unknown; description?: unknown }

        if (typeof name !== 'string' || name === '') {
          return json({ error: 'name is required' }, 400)
        }

        const item = await env.DB
          .prepare('INSERT INTO items (name, description) VALUES (?, ?) RETURNING *')
          .bind(name, typeof description === 'string' ? description : '')
          .first()

        return json(item, 201)
      }

      // DELETE /api/items/:id
      if (method === 'DELETE' && itemId !== undefined) {
        await env.DB.prepare('DELETE FROM items WHERE id = ?').bind(itemId).run()
        return json({ deleted: true })
      }

      return json({ error: 'Not found' }, 404)
    } catch (err) {
      console.error('Request failed:', err)
      return json({ error: 'Internal server error' }, 500)
    }
  }
}
/**
 * Builds a JSON response carrying the permissive CORS header that every
 * route in this recipe returns.
 *
 * @param data - Value to serialize as the response body
 * @param status - HTTP status code (defaults to 200)
 * @returns The response produced by `Response.json`
 */
function json(data: unknown, status = 200) {
  const headers = {
    'Access-Control-Allow-Origin': '*',
    'Content-Type': 'application/json'
  }
  const init = { status, headers }
  return Response.json(data, init)
}

// Use case: Any API backend — user management, content APIs, form submissions, internal tools.
Queue Consumer
Accept work via HTTP, enqueue it, and process it asynchronously. The HTTP response returns immediately while the job runs in the background.
/** Bindings the queue producer and consumer receive as `env`. */
interface Env {
  /** SQL database holding the `jobs` table that both handlers write to. */
  DB: Database
  /** Queue the `fetch` handler sends jobs to. */
  QUEUE: Queue
}
/**
 * Message body placed on the queue by `fetch` and read back by `queue`.
 *
 * `jobId` is part of every message the producer sends, so it is declared
 * here instead of being patched on with a cast at the consumer.
 */
interface Job {
  /** Job kind the consumer switches on (e.g. `generate-report`). */
  type: string
  /** Job-specific data supplied by the caller. */
  payload: Record<string, unknown>
  /** Id of the matching row in the `jobs` table. */
  jobId: number
  /** Enqueue time, as returned by `Date.now()`. */
  createdAt: number
}
export default {
  /**
   * Producer: validates a POSTed job, records it in the `jobs` table with
   * status `queued`, enqueues it, and answers 202 without waiting for the work.
   *
   * @param request - Incoming HTTP request; only POST is accepted
   * @param env - Bindings (`DB`, `QUEUE`)
   * @returns 202 with the job id, 400 for a bad body, 405 for other methods,
   *          500 if the job row could not be created
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method !== 'POST') {
      return Response.json({ error: 'POST only' }, { status: 405 })
    }

    // A body that is not valid JSON is a client error, not a crash.
    let parsed: unknown
    try {
      parsed = await request.json()
    } catch {
      return Response.json({ error: 'Invalid JSON body' }, { status: 400 })
    }
    const { type, payload } = (parsed ?? {}) as { type?: unknown; payload?: unknown }

    // Validate
    if (typeof type !== 'string' || type === '') {
      return Response.json({ error: 'type is required' }, { status: 400 })
    }
    const jobPayload = (payload ?? {}) as Record<string, unknown>

    // Record job in DB
    const job = await env.DB
      .prepare('INSERT INTO jobs (type, payload, status) VALUES (?, ?, ?) RETURNING id')
      .bind(type, JSON.stringify(jobPayload), 'queued')
      .first<{ id: number }>()

    // `first()` can yield nothing; fail explicitly instead of dereferencing it.
    if (!job) {
      return Response.json({ error: 'Failed to record job' }, { status: 500 })
    }

    // Enqueue for async processing
    await env.QUEUE.send({
      type,
      payload: jobPayload,
      jobId: job.id,
      createdAt: Date.now()
    })

    return Response.json({ jobId: job.id, status: 'queued' }, { status: 202 })
  },

  /**
   * Consumer: processes each message of the batch in order, tracking the
   * job's status (`processing` → `completed` / `failed`) in the `jobs` table.
   * Successful messages are acknowledged; failed ones are marked for retry.
   *
   * @param batch - Batch of queued jobs
   * @param env - Bindings (`DB`)
   */
  async queue(batch: MessageBatch<Job>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const { type, payload, jobId } = msg.body as Job & { jobId: number }

      try {
        // Update status to processing
        await env.DB
          .prepare('UPDATE jobs SET status = ? WHERE id = ?')
          .bind('processing', jobId)
          .run()

        // Do the actual work
        switch (type) {
          case 'generate-report':
            await generateReport(env, payload)
            break
          case 'send-notification':
            await sendNotification(payload)
            break
          case 'process-upload':
            await processUpload(env, payload)
            break
          default:
            throw new Error(`Unknown job type: ${type}`)
        }

        // Mark complete
        await env.DB
          .prepare('UPDATE jobs SET status = ?, completed_at = datetime(\'now\') WHERE id = ?')
          .bind('completed', jobId)
          .run()

        msg.ack()
      } catch (err) {
        console.error(`Job ${jobId} failed:`, err)

        // Recording the failure is best-effort: if this write throws too, the
        // message must still be retried and the rest of the batch processed.
        try {
          await env.DB
            .prepare('UPDATE jobs SET status = ?, error = ? WHERE id = ?')
            .bind('failed', String(err), jobId)
            .run()
        } catch (dbErr) {
          console.error(`Could not record failure for job ${jobId}:`, dbErr)
        }

        msg.retry()
      }
    }
  }
}
/**
 * Placeholder worker for `generate-report` jobs; does nothing until filled in.
 *
 * @param env - Bindings passed through from the queue handler
 * @param payload - Job-specific data from the queued message
 */
async function generateReport(env: Env, payload: Record<string, unknown>): Promise<void> {
  // Your report generation logic
}
/**
 * Placeholder worker for `send-notification` jobs; does nothing until filled in.
 *
 * @param payload - Job-specific data from the queued message
 */
async function sendNotification(payload: Record<string, unknown>): Promise<void> {
  // Your notification logic
}
/**
 * Placeholder worker for `process-upload` jobs; does nothing until filled in.
 *
 * @param env - Bindings passed through from the queue handler
 * @param payload - Job-specific data from the queued message
 */
async function processUpload(env: Env, payload: Record<string, unknown>): Promise<void> {
  // Your upload processing logic
}

// Use case: Email sending, report generation, image processing, any work that should not block the user’s HTTP request.
Cron / Scheduled Function
Run on a schedule. No HTTP trigger needed — Cloudflare invokes scheduled() based on your cron expression.
/** Bindings the scheduled handler receives as `env`. */
interface Env {
  /** SQL database holding the `sessions` and `users` tables queried on each run. */
  DB: Database
  /** Cache the computed stats are written to. */
  CACHE: Cache
  /** Queue the digest message is sent to. */
  QUEUE: Queue
}
export default {
  /**
   * Optional HTTP entry point used as a health check; it always answers with
   * the same static status payload.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const health = { status: 'ok', next: 'runs every hour' }
    return Response.json(health)
  },

  /**
   * Cron entry point. On every invocation it, in order:
   * 1. deletes expired rows from `sessions` and logs how many were removed,
   * 2. computes user counts from `users` and caches them under `stats:daily`,
   * 3. enqueues a `daily-digest` message carrying those stats.
   *
   * @param event - The scheduled event (not read here)
   * @param env - Bindings (`DB`, `CACHE`, `QUEUE`)
   * @param ctx - Execution context (not read here)
   */
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    // Step 1: purge sessions whose expiry is in the past.
    const purge = env.DB.prepare("DELETE FROM sessions WHERE expires_at < datetime('now')")
    const purged = await purge.run()
    console.log(`Cleaned ${purged.meta.changes} expired sessions`)

    // Step 2: aggregate user counts in a single query and cache the result.
    const statsSql =
      " SELECT COUNT(*) as total_users," +
      " COUNT(CASE WHEN created_at > datetime('now', '-1 day') THEN 1 END) as new_today," +
      " COUNT(CASE WHEN last_active > datetime('now', '-1 hour') THEN 1 END) as active_now" +
      " FROM users "
    const stats = await env.DB.prepare(statsSql).first()

    const cacheOptions = { expirationTtl: 7200 }
    await env.CACHE.put('stats:daily', JSON.stringify(stats), cacheOptions)

    // Step 3: hand the stats to the queue for a summary notification.
    const digest = { type: 'daily-digest', stats, timestamp: Date.now() }
    await env.QUEUE.send(digest)
  }
}

// Configure the schedule in aerostack.toml:
[triggers]
crons = ["0 * * * *"]      # Every hour
# crons = ["0 9 * * 1"]    # Every Monday at 9 AM
# crons = ["*/5 * * * *"]  # Every 5 minutes

Use case: Cleanup jobs, analytics aggregation, report generation, monitoring checks, periodic sync.
Webhook Receiver
Accept incoming webhooks, validate their signature, and process them reliably.
/** Bindings the webhook receiver receives as `env`. */
interface Env {
  /** SQL database holding the `webhook_events` log table. */
  DB: Database
  /** Queue that verified events are forwarded to for processing. */
  QUEUE: Queue
  /** Cache used to remember already-seen Stripe event ids. */
  CACHE: Cache
}
export default {
  /**
   * Receives POSTed webhooks on `/webhooks/stripe` and `/webhooks/github`,
   * checks their signature, and forwards the event to the queue.
   *
   * Stripe events are additionally logged to `webhook_events` and
   * deduplicated by event id for 24 hours.
   *
   * @param request - Incoming HTTP request; only POST is accepted
   * @param env - Bindings (`DB`, `QUEUE`, `CACHE`)
   * @returns 200 on acceptance, 400 for an unparseable payload, 401 for a
   *          missing/invalid signature, 404 for unknown paths, 405 otherwise
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    if (request.method !== 'POST') {
      return new Response('Method not allowed', { status: 405 })
    }

    const url = new URL(request.url)
    // Read the raw text: signature checks must see the exact bytes sent.
    const body = await request.text()

    // --- Stripe webhook ---
    if (url.pathname === '/webhooks/stripe') {
      const signature = request.headers.get('stripe-signature')
      if (!signature) return new Response('Missing signature', { status: 401 })

      // Verify signature (simplified — use stripe SDK in production)
      const isValid = await verifyStripeSignature(body, signature, env)
      if (!isValid) return new Response('Invalid signature', { status: 401 })

      const event = parseJsonObject(body)
      // Without a usable id every such event would share one dedupe key.
      if (!event || typeof event.id !== 'string' || event.id === '') {
        return new Response('Invalid event payload', { status: 400 })
      }

      // Deduplicate using event ID
      const dedupeKey = `webhook:stripe:${event.id}`
      const seen = await env.CACHE.get(dedupeKey)
      if (seen) return Response.json({ received: true, deduplicated: true })

      // Log and enqueue for processing
      await env.DB
        .prepare('INSERT INTO webhook_events (source, event_id, type, payload) VALUES (?, ?, ?, ?)')
        .bind('stripe', event.id, event.type, body)
        .run()

      await env.QUEUE.send({ source: 'stripe', event })

      // Mark as seen (24h TTL) only AFTER the event is logged and enqueued.
      // Marking it first would make the sender's retry look like a duplicate
      // whenever the insert or enqueue fails, silently dropping the event.
      await env.CACHE.put(dedupeKey, '1', { expirationTtl: 86400 })

      return Response.json({ received: true })
    }

    // --- GitHub webhook ---
    if (url.pathname === '/webhooks/github') {
      const signature = request.headers.get('x-hub-signature-256')
      if (!signature) return new Response('Missing signature', { status: 401 })

      const isValid = await verifyGitHubSignature(body, signature, env)
      if (!isValid) return new Response('Invalid signature', { status: 401 })

      const event = parseJsonObject(body)
      if (!event) return new Response('Invalid event payload', { status: 400 })
      const eventType = request.headers.get('x-github-event') || 'unknown'

      await env.QUEUE.send({ source: 'github', eventType, event })

      return Response.json({ received: true })
    }

    return new Response('Unknown webhook endpoint', { status: 404 })
  }
}

/** Parses `text` as JSON; returns the value only if it is a non-array object, else null. */
function parseJsonObject(text: string): Record<string, unknown> | null {
  try {
    const value: unknown = JSON.parse(text)
    const isObject = typeof value === 'object' && value !== null && !Array.isArray(value)
    return isObject ? (value as Record<string, unknown>) : null
  } catch {
    return null
  }
}
/**
 * Placeholder for Stripe webhook signature verification.
 *
 * WARNING: as written this accepts every request — it must be replaced with
 * a real check before the endpoint is exposed.
 *
 * @param body - Raw request body text
 * @param signature - Value of the `stripe-signature` header
 * @param env - Bindings (unused by the placeholder)
 * @returns Always `true` until real verification is implemented
 */
async function verifyStripeSignature(body: string, signature: string, env: Env): Promise<boolean> {
  // Implement Stripe webhook signature verification
  // See: https://stripe.com/docs/webhooks/signatures
  const verified = true // Replace with actual verification
  return verified
}
/**
 * Placeholder for GitHub HMAC-SHA256 webhook verification.
 *
 * WARNING: as written this accepts every request — it must be replaced with
 * a real check before the endpoint is exposed.
 *
 * @param body - Raw request body text
 * @param signature - Value of the `x-hub-signature-256` header
 * @param env - Bindings (unused by the placeholder)
 * @returns Always `true` until real verification is implemented
 */
async function verifyGitHubSignature(body: string, signature: string, env: Env): Promise<boolean> {
  // Implement GitHub HMAC-SHA256 verification
  // See: https://docs.github.com/en/webhooks/using-webhooks/validating-webhook-deliveries
  const verified = true // Replace with actual verification
  return verified
}

// Use case: Payment processors, CI/CD pipelines, form submissions, third-party integrations — any external system that sends events to your endpoint.
RAG Pipeline
A complete retrieval-augmented generation pipeline: ingest documents, chunk them, generate embeddings, store in vector search, and query with AI-powered answers.
/** Bindings the RAG pipeline receives as `env`. */
interface Env {
  /** SQL database holding the original `documents`. */
  DB: Database
  /** Model runner used for both embeddings and answer generation. */
  AI: AI
  /** Vector index the chunk embeddings are upserted into and queried from. */
  VECTORIZE: VectorSearch
  /** Storage binding; declared here but not read by the handler in this recipe. */
  STORAGE: Storage
}
export default {
  /**
   * Two-route RAG service.
   *
   * - `POST /ingest` `{ title, content }` — stores the document, splits it
   *   into overlapping chunks, embeds each chunk and upserts it into the
   *   vector index. Responds with the new `docId` and the chunk count.
   * - `POST /query` `{ question }` — embeds the question, retrieves the 5
   *   closest chunks, and asks the chat model to answer from that context.
   *
   * Request bodies are validated up front (400 on failure), matching the
   * other recipes, so bad input never reaches the database or the models.
   *
   * @param request - Incoming HTTP request
   * @param env - Bindings (`DB`, `AI`, `VECTORIZE`)
   * @returns JSON response for the matched route, or a JSON 404
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url)

    // --- Ingest a document ---
    if (request.method === 'POST' && url.pathname === '/ingest') {
      const body = await readJsonObject(request)
      const title = body?.title
      const content = body?.content
      if (typeof title !== 'string' || typeof content !== 'string' || content.trim() === '') {
        return Response.json({ error: 'title and content are required' }, { status: 400 })
      }

      // Store original document
      const docId = crypto.randomUUID()
      await env.DB
        .prepare('INSERT INTO documents (id, title, content) VALUES (?, ?, ?)')
        .bind(docId, title, content)
        .run()

      // Chunk the content (simple fixed-size chunking)
      const chunks = chunkText(content, 500, 50)

      // Embed and store each chunk
      for (const [i, chunk] of chunks.entries()) {
        const embedding = await env.AI.run('@cf/baai/bge-base-en-v1.5', {
          text: chunk
        })

        await env.VECTORIZE.upsert([{
          id: `${docId}-chunk-${i}`,
          values: embedding.data[0],
          // The chunk text travels with the vector so /query can rebuild context.
          metadata: { docId, title, chunkIndex: i, text: chunk }
        }])
      }

      return Response.json({ docId, chunks: chunks.length })
    }

    // --- Query with RAG ---
    if (request.method === 'POST' && url.pathname === '/query') {
      const body = await readJsonObject(request)
      const question = body?.question
      if (typeof question !== 'string' || question.trim() === '') {
        return Response.json({ error: 'question is required' }, { status: 400 })
      }

      // 1. Embed the question
      const queryEmbedding = await env.AI.run('@cf/baai/bge-base-en-v1.5', {
        text: question
      })

      // 2. Find relevant chunks
      const results = await env.VECTORIZE.query(queryEmbedding.data[0], {
        topK: 5,
        returnMetadata: 'all'
      })

      // 3. Build context from retrieved chunks
      const context = results.matches
        .map(m => m.metadata?.text as string)
        .filter(Boolean)
        .join('\n\n---\n\n')

      // 4. Generate answer with context
      const answer = await env.AI.run('@cf/meta/llama-3.1-8b-instruct', {
        messages: [
          {
            role: 'system',
            content: `Answer the user's question based on the following context. If the context does not contain enough information, say so. Do not make up information.\n\nContext:\n${context}`
          },
          { role: 'user', content: question }
        ],
        max_tokens: 500
      })

      return Response.json({
        answer: answer.response,
        sources: results.matches.map(m => ({
          docId: m.metadata?.docId,
          title: m.metadata?.title,
          score: m.score
        }))
      })
    }

    return Response.json({ error: 'Not found' }, { status: 404 })
  }
}

/** Reads the request body as JSON; returns it only if it is a non-array object, else null. */
async function readJsonObject(request: Request): Promise<Record<string, unknown> | null> {
  try {
    const value: unknown = await request.json()
    const isObject = typeof value === 'object' && value !== null && !Array.isArray(value)
    return isObject ? (value as Record<string, unknown>) : null
  } catch {
    return null
  }
}
/**
 * Split text into overlapping chunks.
 *
 * Consecutive chunks share `overlap` characters. Chunking stops as soon as a
 * chunk reaches the end of the text, so the last chunk is never just a
 * repeat of the previous chunk's tail.
 *
 * @param text - The full text to chunk
 * @param maxLength - Maximum characters per chunk; must be greater than 0
 * @param overlap - Characters of overlap between chunks; must satisfy
 *                  `0 <= overlap < maxLength`
 * @returns The chunks in document order; empty when `text` is empty
 * @throws RangeError if `maxLength` or `overlap` is outside the ranges above
 */
function chunkText(text: string, maxLength: number, overlap: number): string[] {
  // Written as negated comparisons so that NaN is rejected as well.
  if (!(maxLength > 0)) {
    throw new RangeError('maxLength must be greater than 0')
  }
  // overlap >= maxLength would give a step <= 0 and the loop would never end.
  if (!(overlap >= 0 && overlap < maxLength)) {
    throw new RangeError('overlap must be >= 0 and smaller than maxLength')
  }

  const step = maxLength - overlap
  const chunks: string[] = []

  for (let start = 0; start < text.length; start += step) {
    const end = Math.min(start + maxLength, text.length)
    chunks.push(text.slice(start, end))
    // Once a chunk touches the end, any further chunk would lie entirely
    // inside this one — stop instead of emitting a duplicate tail.
    if (end === text.length) break
  }

  return chunks
}

// Use case: Knowledge bases, documentation search, customer support bots, any application where you need AI answers grounded in your own data.
Data Enrichment
Query your database, analyze results with AI, cache the output. A common pattern for dashboards, reports, and intelligent notifications.
/** Bindings the insights handler receives as `env`. */
interface Env {
  /** SQL database holding the `orders` and `users` tables. */
  DB: Database
  /** Model runner used to analyze the orders. */
  AI: AI
  /** Cache holding the latest generated insights. */
  CACHE: Cache
  /** Queue that receives the revenue alert. */
  QUEUE: Queue
}
export default {
  /**
   * Serves `/api/insights`: returns the cached insights when present,
   * otherwise loads the last 7 days of orders, asks the model to analyze
   * them, caches the result for one hour, and enqueues a `revenue-alert`
   * when total revenue exceeds 10000. Every other path gets a JSON 404.
   *
   * @param request - Incoming HTTP request
   * @param env - Bindings (`DB`, `AI`, `CACHE`, `QUEUE`)
   * @returns The insights payload, or a JSON 404
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const { pathname } = new URL(request.url)

    // Guard clause: this handler only knows one route.
    if (pathname !== '/api/insights') {
      return Response.json({ error: 'Not found' }, { status: 404 })
    }

    // Insights are expensive to generate, so serve the cached copy if any.
    const cachedInsights = await env.CACHE.get('insights:latest', 'json')
    if (cachedInsights) {
      return Response.json(cachedInsights)
    }

    // Pull raw data from DB: last week's orders, largest first.
    const ordersSql =
      " SELECT o.*, u.name as customer_name" +
      " FROM orders o" +
      " JOIN users u ON o.user_id = u.id" +
      " WHERE o.created_at > datetime('now', '-7 days')" +
      " ORDER BY o.total DESC "
    const { results: recentOrders } = await env.DB.prepare(ordersSql).all()

    // Analyze with AI; only the first 50 orders go into the prompt.
    const sample = JSON.stringify(recentOrders.slice(0, 50))
    const analysis = await env.AI.run('@cf/meta/llama-3.1-8b-instruct', {
      messages: [{
        role: 'user',
        content: `Analyze these orders from the past week and provide 3 key insights about trends, top customers, and recommendations:\n\n${sample}`
      }],
      max_tokens: 500
    })

    // Revenue is summed over ALL recent orders, not just the prompt sample.
    let totalRevenue = 0
    for (const order of recentOrders) {
      totalRevenue += (order as { total: number }).total
    }

    const result = {
      orderCount: recentOrders.length,
      totalRevenue,
      analysis: analysis.response,
      generatedAt: new Date().toISOString()
    }

    // Cache for 1 hour
    await env.CACHE.put('insights:latest', JSON.stringify(result), { expirationTtl: 3600 })

    // Queue notification if revenue threshold hit
    if (result.totalRevenue > 10000) {
      await env.QUEUE.send({
        type: 'revenue-alert',
        revenue: result.totalRevenue,
        period: '7d'
      })
    }

    return Response.json(result)
  }
}

// Use case: Business intelligence dashboards, automated reports, anomaly detection, any workflow that combines data retrieval with AI analysis.