# Patterns & Recipes

> Production-ready code patterns for REST APIs, queue consumers, cron jobs, webhook receivers, RAG pipelines, and data enrichment functions.

Copy-paste-ready patterns for common function architectures. Each pattern is a complete, working function.

---

## REST API Handler

Route requests by method and pathname. This is the most common function pattern — a lightweight API server with no framework required.

```typescript
interface Env {
  DB: Database
  CACHE: Cache
}

  async fetch(request: Request, env: Env): Promise {
    const url = new URL(request.url)
    const path = url.pathname
    const method = request.method

    // CORS preflight
    if (method === 'OPTIONS') {
      return new Response(null, {
        headers: {
          'Access-Control-Allow-Origin': '*',
          'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE',
          'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        }
      })
    }

    try {
      // GET /api/items
      if (method === 'GET' && path === '/api/items') {
        const { results } = await env.DB
          .prepare('SELECT * FROM items ORDER BY created_at DESC LIMIT 50')
          .all()
        return json(results)
      }

      // GET /api/items/:id
      if (method === 'GET' && path.match(/^\/api\/items\/\d+$/)) {
        const id = path.split('/').pop()
        const item = await env.DB
          .prepare('SELECT * FROM items WHERE id = ?')
          .bind(id)
          .first()

        if (!item) return json({ error: 'Not found' }, 404)
        return json(item)
      }

      // POST /api/items
      if (method === 'POST' && path === '/api/items') {
        const body = await request.json<{ name: string; description: string }>()

        if (!body.name) return json({ error: 'name is required' }, 400)

        const item = await env.DB
          .prepare('INSERT INTO items (name, description) VALUES (?, ?) RETURNING *')
          .bind(body.name, body.description || '')
          .first()

        return json(item, 201)
      }

      // DELETE /api/items/:id
      if (method === 'DELETE' && path.match(/^\/api\/items\/\d+$/)) {
        const id = path.split('/').pop()
        await env.DB.prepare('DELETE FROM items WHERE id = ?').bind(id).run()
        return json({ deleted: true })
      }

      return json({ error: 'Not found' }, 404)
    } catch (err) {
      console.error('Request failed:', err)
      return json({ error: 'Internal server error' }, 500)
    }
  }
}

function json(data: unknown, status = 200) {
  return Response.json(data, {
    status,
    headers: {
      'Access-Control-Allow-Origin': '*',
      'Content-Type': 'application/json'
    }
  })
}
```

**Use case:** Any API backend — user management, content APIs, form submissions, internal tools.

---

## Queue Consumer

Accept work via HTTP, enqueue it, and process it asynchronously. The HTTP response returns immediately while the job runs in the background.

```typescript
interface Env {
  DB: Database
  QUEUE: Queue
}

interface Job {
  type: string
  payload: Record<string, unknown>
  createdAt: number
}

  async fetch(request: Request, env: Env): Promise {
    if (request.method !== 'POST') {
      return Response.json({ error: 'POST only' }, { status: 405 })
    }

    const body = await request.json<{ type: string; payload: Record<string, unknown> }>()

    // Validate
    if (!body.type) {
      return Response.json({ error: 'type is required' }, { status: 400 })
    }

    // Record job in DB
    const job = await env.DB
      .prepare('INSERT INTO jobs (type, payload, status) VALUES (?, ?, ?) RETURNING id')
      .bind(body.type, JSON.stringify(body.payload), 'queued')
      .first<{ id: number }>()

    // Enqueue for async processing
    await env.QUEUE.send({
      type: body.type,
      payload: body.payload,
      jobId: job!.id,
      createdAt: Date.now()
    })

    return Response.json({ jobId: job!.id, status: 'queued' }, { status: 202 })
  },

  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const { type, payload, jobId } = msg.body as Job & { jobId: number }

      try {
        // Update status to processing
        await env.DB
          .prepare('UPDATE jobs SET status = ? WHERE id = ?')
          .bind('processing', jobId)
          .run()

        // Do the actual work
        switch (type) {
          case 'generate-report':
            await generateReport(env, payload)
            break
          case 'send-notification':
            await sendNotification(payload)
            break
          case 'process-upload':
            await processUpload(env, payload)
            break
          default:
            throw new Error(`Unknown job type: ${type}`)
        }

        // Mark complete
        await env.DB
          .prepare('UPDATE jobs SET status = ?, completed_at = datetime(\'now\') WHERE id = ?')
          .bind('completed', jobId)
          .run()

        msg.ack()
      } catch (err) {
        console.error(`Job ${jobId} failed:`, err)

        await env.DB
          .prepare('UPDATE jobs SET status = ?, error = ? WHERE id = ?')
          .bind('failed', String(err), jobId)
          .run()

        msg.retry()
      }
    }
  }
}

async function generateReport(env: Env, payload: Record<string, unknown>) {
  // Your report generation logic
}

async function sendNotification(payload: Record<string, unknown>) {
  // Your notification logic
}

async function processUpload(env: Env, payload: Record<string, unknown>) {
  // Your upload processing logic
}
```

**Use case:** Email sending, report generation, image processing, any work that should not block the user's HTTP request.

---

## Cron / Scheduled Function

Run on a schedule. No HTTP trigger needed — Cloudflare invokes `scheduled()` based on your cron expression.

```typescript
interface Env {
  DB: Database
  CACHE: Cache
  QUEUE: Queue
}

  // Optional: also handle HTTP for health checks
  async fetch(request: Request, env: Env): Promise {
    return Response.json({ status: 'ok', next: 'runs every hour' })
  },

  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext): Promise<void> {
    // Clean up expired sessions
    const deleted = await env.DB
      .prepare('DELETE FROM sessions WHERE expires_at < datetime(\'now\')')
      .run()
    console.log(`Cleaned ${deleted.meta.changes} expired sessions`)

    // Compute and cache daily stats
    const stats = await env.DB
      .prepare(`
        SELECT
          COUNT(*) as total_users,
          COUNT(CASE WHEN created_at > datetime('now', '-1 day') THEN 1 END) as new_today,
          COUNT(CASE WHEN last_active > datetime('now', '-1 hour') THEN 1 END) as active_now
        FROM users
      `)
      .first()

    await env.CACHE.put('stats:daily', JSON.stringify(stats), { expirationTtl: 7200 })

    // Queue a summary notification
    await env.QUEUE.send({
      type: 'daily-digest',
      stats,
      timestamp: Date.now()
    })
  }
}
```

Configure the schedule in `aerostack.toml`:

```toml
[triggers]
crons = ["0 * * * *"]    # Every hour
# crons = ["*/5 * * * *"] # Every 5 minutes
```

**Use case:** Cleanup jobs, analytics aggregation, report generation, monitoring checks, periodic sync.

---

## Webhook Receiver

Accept incoming webhooks, validate their signature, and process them reliably.

```typescript
interface Env {
  DB: Database
  QUEUE: Queue
  CACHE: Cache
}

  async fetch(request: Request, env: Env): Promise {
    if (request.method !== 'POST') {
      return new Response('Method not allowed', { status: 405 })
    }

    const url = new URL(request.url)
    const body = await request.text()

    // --- Stripe webhook ---
    if (url.pathname === '/webhooks/stripe') {
      const signature = request.headers.get('stripe-signature')
      if (!signature) return new Response('Missing signature', { status: 401 })

      // Verify signature (simplified — use stripe SDK in production)
      const isValid = await verifyStripeSignature(body, signature, env)
      if (!isValid) return new Response('Invalid signature', { status: 401 })

      const event = JSON.parse(body)

      // Deduplicate using event ID
      const seen = await env.CACHE.get(`webhook:stripe:${event.id}`)
      if (seen) return Response.json({ received: true, deduplicated: true })

      // Mark as seen (24h TTL)
      await env.CACHE.put(`webhook:stripe:${event.id}`, '1', { expirationTtl: 86400 })

      // Log and enqueue for processing
      await env.DB
        .prepare('INSERT INTO webhook_events (source, event_id, type, payload) VALUES (?, ?, ?, ?)')
        .bind('stripe', event.id, event.type, body)
        .run()

      await env.QUEUE.send({ source: 'stripe', event })

      return Response.json({ received: true })
    }

    // --- GitHub webhook ---
    if (url.pathname === '/webhooks/github') {
      const signature = request.headers.get('x-hub-signature-256')
      if (!signature) return new Response('Missing signature', { status: 401 })

      const isValid = await verifyGitHubSignature(body, signature, env)
      if (!isValid) return new Response('Invalid signature', { status: 401 })

      const event = JSON.parse(body)
      const eventType = request.headers.get('x-github-event') || 'unknown'

      await env.QUEUE.send({ source: 'github', eventType, event })

      return Response.json({ received: true })
    }

    return new Response('Unknown webhook endpoint', { status: 404 })
  }
}

async function verifyStripeSignature(body: string, signature: string, env: Env): Promise<boolean> {
  // Implement Stripe webhook signature verification
  // See: https://stripe.com/docs/webhooks/signatures
  return true // Replace with actual verification
}

async function verifyGitHubSignature(body: string, signature: string, env: Env): Promise<boolean> {
  // Implement GitHub HMAC-SHA256 verification
  // See: https://docs.github.com/en/webhooks/using-webhooks/validating-webhook-deliveries
  return true // Replace with actual verification
}
```

**Use case:** Payment processors, CI/CD pipelines, form submissions, third-party integrations — any external system that sends events to your endpoint.

---

## RAG Pipeline

A complete retrieval-augmented generation pipeline: ingest documents, chunk them, generate embeddings, store in vector search, and query with AI-powered answers.

```mermaid
flowchart LR
    A["Ingest\nDocument"] --> B["Chunk\nText"]
    B --> C["Embed\n(AI)"]
    C --> D["Store\n(Vector Search)"]

    E["User\nQuery"] --> F["Embed\nQuery"]
    F --> G["Search\n(Vector Search)"]
    G --> H["Build\nPrompt"]
    H --> I["Generate\nAnswer (AI)"]

    style A fill:#1e293b,stroke:#6b7280,color:#fff
    style B fill:#1e293b,stroke:#6b7280,color:#fff
    style C fill:#3b82f6,stroke:#2563eb,color:#fff
    style D fill:#1e293b,stroke:#6b7280,color:#fff
    style E fill:#1e293b,stroke:#6b7280,color:#fff
    style F fill:#3b82f6,stroke:#2563eb,color:#fff
    style G fill:#1e293b,stroke:#6b7280,color:#fff
    style H fill:#1e293b,stroke:#6b7280,color:#fff
    style I fill:#3b82f6,stroke:#2563eb,color:#fff
```

```typescript
interface Env {
  DB: Database
  AI: AI
  VECTORIZE: VectorSearch
  STORAGE: Storage
}

  async fetch(request: Request, env: Env): Promise {
    const url = new URL(request.url)

    // --- Ingest a document ---
    if (request.method === 'POST' && url.pathname === '/ingest') {
      const { title, content } = await request.json<{ title: string; content: string }>()

      // Store original document
      const docId = crypto.randomUUID()
      await env.DB
        .prepare('INSERT INTO documents (id, title, content) VALUES (?, ?, ?)')
        .bind(docId, title, content)
        .run()

      // Chunk the content (simple fixed-size chunking)
      const chunks = chunkText(content, 500, 50)

      // Embed and store each chunk
      for (let i = 0; i < chunks.length; i++) {
        const embedding = await env.AI.run('@cf/baai/bge-base-en-v1.5', {
          text: chunks[i]
        })

        await env.VECTORIZE.upsert([{
          id: `${docId}-chunk-${i}`,
          values: embedding.data[0],
          metadata: {
            docId,
            title,
            chunkIndex: i,
            text: chunks[i]
          }
        }])
      }

      return Response.json({ docId, chunks: chunks.length })
    }

    // --- Query with RAG ---
    if (request.method === 'POST' && url.pathname === '/query') {
      const { question } = await request.json<{ question: string }>()

      // 1. Embed the question
      const queryEmbedding = await env.AI.run('@cf/baai/bge-base-en-v1.5', {
        text: question
      })

      // 2. Find relevant chunks
      const results = await env.VECTORIZE.query(queryEmbedding.data[0], {
        topK: 5,
        returnMetadata: 'all'
      })

      // 3. Build context from retrieved chunks
      const context = results.matches
        .map(m => m.metadata?.text as string)
        .filter(Boolean)
        .join('\n\n---\n\n')

      // 4. Generate answer with context
      const answer = await env.AI.run('@cf/meta/llama-3.1-8b-instruct', {
        messages: [
          {
            role: 'system',
            content: `Answer the user's question based on the following context. If the context does not contain enough information, say so. Do not make up information.\n\nContext:\n${context}`
          },
          { role: 'user', content: question }
        ],
        max_tokens: 500
      })

      return Response.json({
        answer: answer.response,
        sources: results.matches.map(m => ({
          docId: m.metadata?.docId,
          title: m.metadata?.title,
          score: m.score
        }))
      })
    }

    return Response.json({ error: 'Not found' }, { status: 404 })
  }
}

/**
 * Split text into overlapping chunks.
 * @param text - The full text to chunk
 * @param maxLength - Maximum characters per chunk
 * @param overlap - Characters of overlap between chunks
 */
function chunkText(text: string, maxLength: number, overlap: number): string[] {
  const chunks: string[] = []
  let start = 0

  while (start < text.length) {
    const end = Math.min(start + maxLength, text.length)
    chunks.push(text.slice(start, end))
    start += maxLength - overlap
  }

  return chunks
}
```

**Use case:** Knowledge bases, documentation search, customer support bots, any application where you need AI answers grounded in your own data.

---

## Data Enrichment

Query your database, analyze results with AI, cache the output. A common pattern for dashboards, reports, and intelligent notifications.

```typescript
interface Env {
  DB: Database
  AI: AI
  CACHE: Cache
  QUEUE: Queue
}

  async fetch(request: Request, env: Env): Promise {
    const url = new URL(request.url)

    if (url.pathname === '/api/insights') {
      // Check cache first (insights are expensive to generate)
      const cached = await env.CACHE.get('insights:latest', 'json')
      if (cached) return Response.json(cached)

      // Pull raw data from DB
      const { results: recentOrders } = await env.DB
        .prepare(`
          SELECT o.*, u.name as customer_name
          FROM orders o
          JOIN users u ON o.user_id = u.id
          WHERE o.created_at > datetime('now', '-7 days')
          ORDER BY o.total DESC
        `)
        .all()

      // Analyze with AI
      const analysis = await env.AI.run('@cf/meta/llama-3.1-8b-instruct', {
        messages: [{
          role: 'user',
          content: `Analyze these orders from the past week and provide 3 key insights about trends, top customers, and recommendations:\n\n${JSON.stringify(recentOrders.slice(0, 50))}`
        }],
        max_tokens: 500
      })

      const result = {
        orderCount: recentOrders.length,
        totalRevenue: recentOrders.reduce((sum, o: any) => sum + o.total, 0),
        analysis: analysis.response,
        generatedAt: new Date().toISOString()
      }

      // Cache for 1 hour
      await env.CACHE.put('insights:latest', JSON.stringify(result), {
        expirationTtl: 3600
      })

      // Queue notification if revenue threshold hit
      if (result.totalRevenue > 10000) {
        await env.QUEUE.send({
          type: 'revenue-alert',
          revenue: result.totalRevenue,
          period: '7d'
        })
      }

      return Response.json(result)
    }

    return Response.json({ error: 'Not found' }, { status: 404 })
  }
}
```

**Use case:** Business intelligence dashboards, automated reports, anomaly detection, any workflow that combines data retrieval with AI analysis.
