Use Case Guides › Data Synchronization

Building a Data Synchronization System

Learn how to build a bidirectional sync system between your app and third-party services (CRM, e-commerce platforms, analytics tools).

Use Case: CRM Data Sync

Requirements:

  • Sync contacts between your app and HubSpot/Salesforce
  • Real-time updates (bidirectional)
  • Conflict resolution
  • Batch import/export
  • Change tracking

Traditional Approach: 4-5 days
Aerostack Approach: 40-60 minutes


Architecture

Your App ←→ Aerostack API ←→ Queue Workers ←→ External CRM API
                  ↕
          Cache + Database

Step 1: Contact Sync API

Endpoint: POST /custom/sync-contact

/**
 * POST /custom/sync-contact — queue a sync job for a single contact.
 *
 * The contact is marked `pending` BEFORE the job is enqueued, so a fast worker
 * can never have its final `synced`/`failed` status overwritten by this handler.
 * The per-contact lock is managed purely through cache TTLs; no timer is left
 * running after the handler returns.
 *
 * @param {object} sdk - Platform SDK; this handler uses `sdk.db.query`,
 *   `sdk.cache.get/set/delete` and `sdk.queue.enqueue`.
 * @param {object} event - Request event. `event.data.contact_id` selects the
 *   contact; `event.data.direction` defaults to `'to_crm'`, any other value
 *   requests a pull from the CRM.
 * @returns {Promise<object>} `{ status: 'queued', contact_id, direction }`, or
 *   `{ status: 'already_syncing', message }` while a lock is held.
 * @throws {Error} If the contact does not exist, if a pull is requested for a
 *   contact without `external_id`, or if the database/queue call fails.
 */
export default async function syncContact(sdk, event) {
  const { contact_id, direction = 'to_crm' } = event.data;
  const pushToCrm = direction === 'to_crm';

  // Cleanup steps must never replace the error that triggered them.
  const bestEffort = async (task) => {
    try {
      await task();
    } catch (cleanupError) {
      // Intentionally ignored: the lock still expires through its TTL and the
      // original failure is the one the caller needs to see.
    }
  };

  // Get contact from database
  const [contact] = await sdk.db.query(
    'SELECT * FROM contacts WHERE id = ?',
    [contact_id]
  );

  if (!contact) {
    throw new Error('Contact not found');
  }

  // A pull job is keyed by the CRM id; without one the worker has nothing to fetch.
  if (!pushToCrm && !contact.external_id) {
    throw new Error('Contact has no external_id to pull from the CRM');
  }

  // Check if already syncing.
  // NOTE(review): get-then-set is not atomic, so two simultaneous requests can
  // both pass this check; use an atomic set-if-absent if the cache offers one.
  const lockKey = `sync:contact:${contact_id}`;
  const isLocked = await sdk.cache.get(lockKey);

  if (isLocked) {
    return {
      status: 'already_syncing',
      message: 'This contact is currently being synced'
    };
  }

  // Set lock (5 minutes) as an upper bound in case this handler dies mid-way.
  await sdk.cache.set(lockKey, true, 300);

  try {
    // Update sync status first so the worker's result always wins.
    await sdk.db.query(`
      UPDATE contacts 
      SET sync_status = 'pending', last_sync_attempt = ?
      WHERE id = ?
    `, [Date.now(), contact_id]);

    if (pushToCrm) {
      // Sync to external CRM
      await sdk.queue.enqueue('sync-to-crm', {
        contact_id,
        contact_data: contact
      });
    } else {
      // Pull from external CRM
      await sdk.queue.enqueue('sync-from-crm', {
        contact_id,
        external_id: contact.external_id
      });
    }
  } catch (error) {
    // Nothing was queued: record the failure and free the lock right away so
    // the caller can retry without waiting for the TTL.
    await bestEffort(() => sdk.db.query(`
      UPDATE contacts 
      SET sync_status = 'failed', sync_error = ?, last_sync_attempt = ?
      WHERE id = ?
    `, [error.message, Date.now(), contact_id]));
    await bestEffort(() => sdk.cache.delete(lockKey));
    throw error;
  }

  // Release lock after a short delay: shrink the TTL to 5 seconds instead of
  // scheduling a timer that may never fire once the response has been sent.
  await bestEffort(() => sdk.cache.set(lockKey, true, 5));

  return {
    status: 'queued',
    contact_id,
    direction
  };
}

Step 2: Sync to CRM Worker

Worker: sync-to-crm

/**
 * Queue worker `sync-to-crm`: pushes one contact to the configured CRM and
 * records the outcome on the local row.
 *
 * @param {object} sdk - Platform SDK (`secrets`, `db`, `cache`).
 * @param {object} job - Queue job; `job.data` carries `contact_id` and `contact_data`.
 * @returns {Promise<{success: boolean, external_id: *}>} The id reported by the CRM.
 * @throws {Error} Any failure is written to the contact row and rethrown so
 *   the queue can retry the job.
 */
export async function syncToCRM(sdk, job) {
  const { contact_id: contactId, contact_data: contactData } = job.data;

  // Reads the CRM settings and hands the contact to the matching provider.
  const pushToProvider = async () => {
    const provider = await sdk.secrets.get('CRM_TYPE'); // 'hubspot' or 'salesforce'
    const token = await sdk.secrets.get('CRM_API_KEY');

    switch (provider) {
      case 'hubspot':
        return syncToHubSpot(sdk, contactData, token);
      case 'salesforce':
        return syncToSalesforce(sdk, contactData, token);
      default:
        throw new Error(`Unsupported CRM: ${provider}`);
    }
  };

  try {
    const remoteId = await pushToProvider();

    // Persist the CRM id and clear any previous error.
    await sdk.db.query(`
      UPDATE contacts 
      SET 
        external_id = ?,
        sync_status = 'synced',
        last_synced_at = ?,
        sync_error = NULL
      WHERE id = ?
    `, [remoteId, Date.now(), contactId]);

    // Drop the cached copy of this contact.
    await sdk.cache.delete(`contact:${contactId}`);

    return { success: true, external_id: remoteId };
  } catch (failure) {
    // Store the failure on the row before handing it back to the queue.
    await sdk.db.query(`
      UPDATE contacts 
      SET 
        sync_status = 'failed',
        sync_error = ?,
        last_sync_attempt = ?
      WHERE id = ?
    `, [failure.message, Date.now(), contactId]);

    throw failure; // Triggers retry
  }
}
 
/**
 * Creates or updates a HubSpot contact and returns its HubSpot id.
 *
 * A contact that already has `external_id` is updated with PATCH; otherwise a
 * new contact is created with POST.
 *
 * @param {object} sdk - Platform SDK; only `sdk.fetch` is used.
 * @param {object} contact - Local contact row (`external_id`, `email`,
 *   `first_name`, `last_name`, `phone`, `company`).
 * @param {string} apiKey - Token sent as a Bearer credential.
 * @returns {Promise<string>} The `id` field of the HubSpot response.
 * @throws {Error} If HubSpot reports a failure or the response carries no id.
 *   Without this check the caller would mark the contact as synced with an
 *   undefined external id.
 */
async function syncToHubSpot(sdk, contact, apiKey) {
  const { external_id, email, first_name, last_name, phone, company } = contact;

  const collectionUrl = 'https://api.hubapi.com/crm/v3/objects/contacts';
  const method = external_id ? 'PATCH' : 'POST';
  const url = external_id
    ? `${collectionUrl}/${encodeURIComponent(external_id)}`
    : collectionUrl;

  const response = await sdk.fetch(url, {
    method,
    headers: {
      'Authorization': `Bearer ${apiKey}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      properties: {
        email,
        firstname: first_name,
        lastname: last_name,
        phone,
        company
      }
    })
  });

  // An error response may not be JSON at all; a parse failure must not hide
  // the HTTP status, so it is folded into the "no id" case below.
  let data = null;
  try {
    data = await response.json();
  } catch (parseError) {
    data = null;
  }

  // Assumes sdk.fetch resolves to a Fetch-style Response (`ok`, `status`);
  // a response without an id is treated as a failure either way.
  if (response.ok === false || data?.id == null) {
    const detail = data?.message ?? 'response did not include a contact id';
    throw new Error(
      `HubSpot contact sync failed (status ${response.status ?? 'unknown'}): ${detail}`
    );
  }

  return data.id;
}
 
/**
 * Placeholder for the Salesforce push (OAuth flow + API calls), which follows
 * the same pattern as the HubSpot implementation.
 *
 * Until it is implemented it rejects instead of resolving `undefined`: the
 * calling worker treats any resolved value as the CRM id and would otherwise
 * mark the contact as synced without anything having been sent.
 *
 * @param {object} sdk - Platform SDK.
 * @param {object} contact - Local contact row to push.
 * @param {*} credentials - Salesforce credentials.
 * @returns {Promise<string>} The Salesforce record id once implemented.
 * @throws {Error} Always, until the Salesforce integration is written.
 */
async function syncToSalesforce(sdk, contact, credentials) {
  // Salesforce OAuth flow and API calls
  // ...similar pattern
  throw new Error('Salesforce sync is not implemented');
}

Step 3: Webhook Receiver for CRM Updates

Endpoint: POST /custom/crm-webhook
Settings: Public access

/**
 * POST /custom/crm-webhook — applies a contact change pushed by the CRM.
 *
 * If the local row is newer than the incoming change, the change is queued for
 * conflict resolution. Otherwise the row is updated. Fields missing from a
 * partial `updates` payload keep their current local value instead of being
 * overwritten with NULL.
 *
 * NOTE(review): the endpoint is configured for public access and nothing here
 * authenticates the sender; verify the CRM's webhook signature before trusting
 * `event.data`.
 *
 * @param {object} sdk - Platform SDK (`db`, `queue`, `cache`).
 * @param {object} event - Request event; `event.data` carries `external_id`,
 *   `updates` (CRM-style keys such as `firstname`) and a numeric `timestamp`
 *   that is compared against `contacts.updated_at`.
 * @returns {Promise<object>} `{ status: 'skipped' | 'conflict' | 'updated', ... }`.
 * @throws {Error} If a known contact receives a payload without an `updates`
 *   object or without a finite numeric `timestamp`.
 */
export default async function handleCrmWebhook(sdk, event) {
  const { external_id, updates, timestamp } = event.data;

  // Find local contact by external ID
  const [contact] = await sdk.db.query(
    'SELECT * FROM contacts WHERE external_id = ?',
    [external_id]
  );

  if (!contact) {
    return {
      status: 'skipped',
      message: 'Contact not found locally'
    };
  }

  // Without these the comparison below is meaningless and the UPDATE would
  // write undefined into updated_at and every profile column.
  if (updates === null || typeof updates !== 'object' || !Number.isFinite(timestamp)) {
    throw new Error('Invalid webhook payload: expected an updates object and a numeric timestamp');
  }

  // Check for conflicts
  if (contact.updated_at > timestamp) {
    // Local version is newer - need conflict resolution.
    // The resolver reads remote_data.timestamp, so carry the envelope
    // timestamp along unless the payload already provides its own.
    await sdk.queue.enqueue('resolve-sync-conflict', {
      contact_id: contact.id,
      local_data: contact,
      remote_data: { timestamp, ...updates },
      conflict_time: Date.now()
    });

    return {
      status: 'conflict',
      message: 'Conflict detected, queued for resolution'
    };
  }

  // Remote is newer - update local, keeping current values for absent fields.
  await sdk.db.query(`
    UPDATE contacts 
    SET 
      first_name = ?,
      last_name = ?,
      email = ?,
      phone = ?,
      company = ?,
      updated_at = ?,
      sync_status = 'synced',
      last_synced_at = ?
    WHERE id = ?
  `, [
    updates.firstname ?? contact.first_name,
    updates.lastname ?? contact.last_name,
    updates.email ?? contact.email,
    updates.phone ?? contact.phone,
    updates.company ?? contact.company,
    timestamp,
    Date.now(),
    contact.id
  ]);

  // Clear cache
  await sdk.cache.delete(`contact:${contact.id}`);

  return {
    status: 'updated',
    contact_id: contact.id
  };
}

Step 4: Conflict Resolution with AI

Worker: resolve-sync-conflict

/**
 * Queue worker `resolve-sync-conflict`: asks the model to merge the local and
 * remote versions of a contact, stores the result, pushes it back to the CRM
 * and logs the resolution.
 *
 * NOTE(review): both versions are interpolated into the prompt unescaped, and
 * the remote one originates from a public webhook; treat the model output as
 * untrusted data (only the five profile fields are read from it).
 *
 * @param {object} sdk - Platform SDK (`ai`, `db`, `queue`).
 * @param {object} job - Queue job; `job.data` carries `contact_id`,
 *   `local_data` (local contact row) and `remote_data` (CRM-side values).
 * @returns {Promise<{success: boolean, merged: object}>} The merged contact.
 * @throws {Error} If the model reply contains no parsable JSON object, or a
 *   database/queue call fails.
 */
export async function resolveSyncConflict(sdk, job) {
  const { contact_id, local_data, remote_data } = job.data;

  // Use AI to intelligently merge conflicting data
  const resolution = await sdk.ai.chat('@cf/meta/llama-3-8b-instruct', [
    {
      role: 'system',
      content: 'You are a data conflict resolver. Merge two versions of contact data intelligently, preferring the most recent or complete information. Return merged data as JSON.'
    },
    {
      role: 'user',
      content: `
        Local version (updated ${describeConflictTimestamp(local_data.updated_at)}):
        ${JSON.stringify(local_data)}
        
        Remote version (updated ${describeConflictTimestamp(remote_data.timestamp)}):
        ${JSON.stringify(remote_data)}
        
        Return the merged contact object.
      `
    }
  ]);

  // Parse AI response
  const parsed = parseMergedContact(resolution);

  // The model may answer with local column names or CRM property names, and
  // may omit fields entirely; never let that blank out existing data. The CRM
  // id always comes from the local row so the push below updates the existing
  // CRM contact instead of creating a duplicate.
  const merged = {
    ...parsed,
    external_id: local_data.external_id,
    first_name: parsed.first_name ?? parsed.firstname ?? local_data.first_name ?? null,
    last_name: parsed.last_name ?? parsed.lastname ?? local_data.last_name ?? null,
    email: parsed.email ?? local_data.email ?? null,
    phone: parsed.phone ?? local_data.phone ?? null,
    company: parsed.company ?? local_data.company ?? null
  };

  // Update local with merged data
  const resolvedAt = Date.now();
  await sdk.db.query(`
    UPDATE contacts 
    SET 
      first_name = ?,
      last_name = ?,
      email = ?,
      phone = ?,
      company = ?,
      updated_at = ?,
      sync_status = 'synced',
      last_synced_at = ?
    WHERE id = ?
  `, [
    merged.first_name,
    merged.last_name,
    merged.email,
    merged.phone,
    merged.company,
    resolvedAt,
    resolvedAt,
    contact_id
  ]);

  // Sync merged version back to CRM
  await sdk.queue.enqueue('sync-to-crm', {
    contact_id,
    contact_data: merged
  });

  // Log resolution
  await sdk.db.query(`
    INSERT INTO sync_conflicts (
      contact_id, local_version, remote_version, 
      resolution, resolved_at
    )
    VALUES (?, ?, ?, ?, ?)
  `, [
    contact_id,
    JSON.stringify(local_data),
    JSON.stringify(remote_data),
    JSON.stringify(merged),
    Date.now()
  ]);

  return { success: true, merged };
}

// Formats a timestamp for the prompt; a missing or invalid value must not
// throw (Date#toISOString raises RangeError on an invalid date).
function describeConflictTimestamp(value) {
  if (value == null) {
    return 'unknown';
  }
  const date = new Date(value);
  return Number.isNaN(date.getTime()) ? 'unknown' : date.toISOString();
}

// Extracts the JSON object from a model reply that may wrap it in prose or a
// Markdown code fence.
function parseMergedContact(resolution) {
  // The return shape of sdk.ai.chat is not visible here: accept a plain
  // string, an object exposing the text as `response`, or an already-parsed
  // object.
  let text;
  if (typeof resolution === 'string') {
    text = resolution;
  } else if (typeof resolution?.response === 'string') {
    text = resolution.response;
  } else {
    text = JSON.stringify(resolution) ?? '';
  }

  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end <= start) {
    throw new Error('AI conflict resolution did not return a JSON object');
  }

  let parsed;
  try {
    parsed = JSON.parse(text.slice(start, end + 1));
  } catch (cause) {
    throw new Error('AI conflict resolution returned invalid JSON', { cause });
  }

  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    throw new Error('AI conflict resolution did not return a JSON object');
  }
  return parsed;
}

Step 5: Batch Import from CRM

Endpoint: POST /custom/import-from-crm

/**
 * POST /custom/import-from-crm — starts a batch import from the CRM.
 *
 * The `import_jobs` row is created BEFORE the job is enqueued. The worker
 * updates that row by id, so if it started first its progress/completion
 * updates would match nothing and the row inserted afterwards would stay
 * `running` forever.
 *
 * @param {object} sdk - Platform SDK (`db`, `queue`).
 * @param {object} event - Request event; `event.data.since_timestamp` is
 *   optional and is forwarded to the worker as `since` (0 when absent).
 * @returns {Promise<{job_id: string, status: string, message: string}>}
 * @throws {Error} If the job row cannot be created or the job cannot be
 *   enqueued (the row is then marked `failed`).
 */
export default async function startCrmImport(sdk, event) {
  const { since_timestamp } = event.data ?? {};

  // The millisecond timestamp alone collides when two imports start in the
  // same millisecond (id is the primary key), so add a random suffix.
  const jobId = `import_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;

  // Create import job record
  await sdk.db.query(`
    INSERT INTO import_jobs (id, status, started_at)
    VALUES (?, 'running', ?)
  `, [jobId, Date.now()]);

  // Enqueue batch import job
  try {
    await sdk.queue.enqueue('batch-import-crm', {
      job_id: jobId,
      since: since_timestamp || 0
    });
  } catch (error) {
    // Nothing will ever process this job: record that instead of leaving a
    // row that claims to be running.
    try {
      await sdk.db.query(
        'UPDATE import_jobs SET status = ?, error = ? WHERE id = ?',
        ['failed', error.message, jobId]
      );
    } catch (cleanupError) {
      // Intentionally ignored so the enqueue failure is the error reported.
    }
    throw error;
  }

  return {
    job_id: jobId,
    status: 'started',
    message: 'Batch import has been queued'
  };
}

Worker: batch-import-crm

/**
 * Queue worker `batch-import-crm`: pages through HubSpot contacts and upserts
 * them into the local `contacts` table, tracking progress on `import_jobs`.
 *
 * Paging follows the cursor HubSpot returns in `paging.next.after`; the
 * `after` parameter is an opaque cursor, not a numeric offset. `phone` and
 * `company` are requested explicitly because the list endpoint only returns a
 * default property set otherwise.
 *
 * NOTE(review): existing rows are overwritten without the timestamp conflict
 * check the webhook performs, and the CRM is always HubSpot regardless of the
 * `CRM_TYPE` secret.
 *
 * @param {object} sdk - Platform SDK (`secrets`, `fetch`, `db`).
 * @param {object} job - Queue job; `job.data` carries `job_id` and `since`.
 *   `since` is assumed to be epoch milliseconds like the other timestamps in
 *   this file; contacts whose HubSpot `updatedAt` is older are skipped.
 * @returns {Promise<{success: boolean, imported: number}>}
 * @throws {Error} On any HTTP or database failure; the job row is marked
 *   `failed` first and the original error is rethrown.
 */
export async function batchImportCRM(sdk, job) {
  const { job_id, since } = job.data;

  const apiKey = await sdk.secrets.get('CRM_API_KEY');
  const pageSize = 100;
  const sinceMs = Number(since) || 0;
  let imported = 0;
  let cursor; // undefined requests the first page

  try {
    while (true) {
      // Fetch page of contacts from CRM
      const url = new URL('https://api.hubapi.com/crm/v3/objects/contacts');
      url.searchParams.set('limit', String(pageSize));
      url.searchParams.set('properties', 'firstname,lastname,email,phone,company');
      if (cursor != null) {
        url.searchParams.set('after', String(cursor));
      }

      const response = await sdk.fetch(url.toString(), {
        headers: { 'Authorization': `Bearer ${apiKey}` }
      });

      // An error body has no `results`; without this check a rejected token
      // would end the loop and report a completed import of zero contacts.
      if (response.ok === false) {
        throw new Error(`HubSpot contact list request failed with status ${response.status}`);
      }

      const data = await response.json();

      if (!data.results || data.results.length === 0) break;

      // Import each contact
      for (const crmContact of data.results) {
        const props = crmContact.properties ?? {};

        // Honour `since` client-side. A contact without a parsable
        // `updatedAt` is imported rather than silently dropped.
        const remoteUpdatedAt = Date.parse(crmContact.updatedAt);
        if (sinceMs > 0 && Number.isFinite(remoteUpdatedAt) && remoteUpdatedAt < sinceMs) {
          continue;
        }

        const now = Date.now();

        // Check if already exists
        const [existing] = await sdk.db.query(
          'SELECT id FROM contacts WHERE external_id = ?',
          [crmContact.id]
        );

        if (existing) {
          // Update
          await sdk.db.query(`
            UPDATE contacts 
            SET first_name = ?, last_name = ?, email = ?, phone = ?,
                company = ?, updated_at = ?
            WHERE external_id = ?
          `, [
            props.firstname ?? null, props.lastname ?? null, props.email ?? null,
            props.phone ?? null, props.company ?? null, now, crmContact.id
          ]);
        } else {
          // Insert
          await sdk.db.query(`
            INSERT INTO contacts (
              external_id, first_name, last_name, email, 
              phone, company, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
          `, [
            crmContact.id, props.firstname ?? null, props.lastname ?? null,
            props.email ?? null, props.phone ?? null, props.company ?? null,
            now, now
          ]);
        }

        imported++;
      }

      // Update progress
      await sdk.db.query(
        'UPDATE import_jobs SET imported_count = ? WHERE id = ?',
        [imported, job_id]
      );

      // HubSpot omits `paging.next` on the last page.
      cursor = data.paging?.next?.after;
      if (cursor == null) break;
    }

    // Mark as complete
    await sdk.db.query(`
      UPDATE import_jobs 
      SET status = 'completed', completed_at = ?, imported_count = ?
      WHERE id = ?
    `, [Date.now(), imported, job_id]);

    return { success: true, imported };

  } catch (error) {
    // Recording the failure is best-effort: the original error must reach
    // the queue even if this update fails too.
    try {
      await sdk.db.query(
        'UPDATE import_jobs SET status = ?, error = ? WHERE id = ?',
        ['failed', error.message, job_id]
      );
    } catch (cleanupError) {
      // Intentionally ignored; see above.
    }
    throw error;
  }
}

What You Built

In 40-60 minutes:

Bidirectional sync - Your app ↔ CRM
Real-time updates - Webhook-based
Conflict resolution - AI-powered intelligent merging
Batch import - Handle thousands of contacts
Change tracking - Full audit trail
Lock mechanism - Prevent duplicate syncs
Error handling - Automatic retries

No complex queue setup. No Redis locks. No conflict resolution library. Just code.


Database Schema

-- Local contact records that are synchronised with the external CRM.
-- All *_at / *_attempt columns are written by the handlers with Date.now(),
-- i.e. epoch milliseconds.
CREATE TABLE contacts (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  -- Id of the matching contact in the external CRM; used to look rows up
  -- when a webhook or batch import arrives.
  external_id TEXT UNIQUE,
  first_name TEXT,
  last_name TEXT,
  email TEXT,
  phone TEXT,
  company TEXT,
  -- Values written by the handlers: 'pending', 'synced', 'failed'.
  sync_status TEXT DEFAULT 'pending',
  -- Message of the last sync error; reset to NULL on a successful push.
  sync_error TEXT,
  last_sync_attempt INTEGER,
  last_synced_at INTEGER,
  created_at INTEGER NOT NULL,
  -- Compared against the webhook timestamp to detect conflicts.
  updated_at INTEGER NOT NULL
);
 
-- One row per batch import started through the import endpoint; the worker
-- updates it as pages are processed.
CREATE TABLE import_jobs (
  -- Generated by the import endpoint as a string starting with 'import_'.
  id TEXT PRIMARY KEY,
  -- Values written by the handlers: 'running', 'completed', 'failed'.
  status TEXT NOT NULL,
  -- Running total, updated after every imported page.
  imported_count INTEGER DEFAULT 0,
  -- Error message recorded when the worker fails.
  error TEXT,
  -- Date.now() values (epoch milliseconds).
  started_at INTEGER NOT NULL,
  completed_at INTEGER
);
 
-- Audit log of conflicts handled by the resolve-sync-conflict worker.
CREATE TABLE sync_conflicts (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  -- contacts.id of the affected contact (no foreign key is declared).
  contact_id INTEGER NOT NULL,
  -- The three version columns hold JSON.stringify() output.
  local_version TEXT NOT NULL,
  remote_version TEXT NOT NULL,
  resolution TEXT NOT NULL,
  -- Date.now() value (epoch milliseconds).
  resolved_at INTEGER NOT NULL
);

Production Features

  • Conflict detection - Timestamp-based
  • AI resolution - Smart data merging
  • Batch processing - Paginated imports
  • Retry logic - Automatic on failure
  • Monitoring - Import job tracking
  • Audit trail - All conflicts logged

Ready to sync your data?

Start Building →