Building a Data Synchronization System
Learn how to build a bidirectional sync system between your app and third-party services (CRM, e-commerce platforms, analytics tools).
Use Case: CRM Data Sync
Requirements:
- Sync contacts between your app and HubSpot/Salesforce
- Real-time updates (bidirectional)
- Conflict resolution
- Batch import/export
- Change tracking
Traditional Approach: 4-5 days
Aerostack Approach: 40-60 minutes
Architecture
Your App ←→ Aerostack API ←→ Queue Workers ←→ External CRM API
↓
Cache + Database
Step 1: Contact Sync API
Endpoint: POST /custom/sync-contact
/**
 * POST /custom/sync-contact: queue a one-way sync job for a single contact.
 *
 * @param {object} sdk - Platform SDK; this handler calls `sdk.db.query`,
 *   `sdk.cache.get/set/delete` and `sdk.queue.enqueue`.
 * @param {object} event - Request event. `event.data.contact_id` selects the
 *   contact; `event.data.direction` defaults to 'to_crm', and any other value
 *   queues a pull from the CRM instead.
 * @returns {Promise<object>} `{ status: 'queued', contact_id, direction }`, or
 *   `{ status: 'already_syncing', message }` while a lock is held.
 * @throws {Error} 'Contact not found' when no row matches; database and queue
 *   failures are rethrown after the lock has been released.
 */
export default async function syncContact(sdk, event) {
  const { contact_id, direction = 'to_crm' } = event.data;

  // Get contact from database
  const [contact] = await sdk.db.query(
    'SELECT * FROM contacts WHERE id = ?',
    [contact_id]
  );
  if (!contact) {
    throw new Error('Contact not found');
  }

  // Check if already syncing
  const lockKey = `sync:contact:${contact_id}`;
  if (await sdk.cache.get(lockKey)) {
    return {
      status: 'already_syncing',
      message: 'This contact is currently being synced'
    };
  }

  // NOTE(review): get-then-set is not atomic, so two simultaneous requests can
  // both pass the check above. Use an atomic set-if-absent if the cache has one.
  await sdk.cache.set(lockKey, true, 300); // TTL in seconds (5 minutes)

  try {
    // Mark the row pending BEFORE enqueueing: a fast worker could otherwise
    // write 'synced'/'failed' first and have it overwritten by this update.
    await sdk.db.query(`
      UPDATE contacts
      SET sync_status = 'pending', last_sync_attempt = ?
      WHERE id = ?
    `, [Date.now(), contact_id]);

    if (direction === 'to_crm') {
      // Sync to external CRM
      await sdk.queue.enqueue('sync-to-crm', {
        contact_id,
        contact_data: contact
      });
    } else {
      // Pull from external CRM
      await sdk.queue.enqueue('sync-from-crm', {
        contact_id,
        external_id: contact.external_id
      });
    }
  } catch (error) {
    // Nothing usable was queued, so free the contact for an immediate retry.
    try {
      await sdk.cache.delete(lockKey);
    } catch {
      // Best effort: the 5-minute TTL still bounds the lock.
    }
    throw error;
  }

  // Release the lock after a short delay by shrinking its TTL to 5 seconds.
  // This replaces an un-awaited setTimeout whose rejection nobody handled.
  try {
    await sdk.cache.set(lockKey, true, 5);
  } catch {
    // Best effort: the job is already queued and the original TTL still applies.
  }

  return {
    status: 'queued',
    contact_id,
    direction
  };
}
// Step 2: Sync to CRM Worker
Worker: sync-to-crm
/**
 * Worker `sync-to-crm`: push one contact to the configured CRM and record the
 * outcome on the local `contacts` row.
 *
 * @param {object} sdk - Platform SDK; uses `secrets.get`, `db.query` and `cache.delete`.
 * @param {object} job - Queue job; `job.data` carries `contact_id` and `contact_data`.
 * @returns {Promise<{success: true, external_id: *}>} The CRM-side id that was stored.
 * @throws {Error} On an unsupported `CRM_TYPE`, when the CRM call fails, or when
 *   it yields no id. The error is rethrown after the row is marked 'failed'.
 */
export async function syncToCRM(sdk, job) {
  const { contact_id, contact_data } = job.data;
  try {
    // The two secrets are independent, so read them in parallel.
    const [crmType, apiKey] = await Promise.all([
      sdk.secrets.get('CRM_TYPE'), // 'hubspot' or 'salesforce'
      sdk.secrets.get('CRM_API_KEY'),
    ]);

    let externalId;
    if (crmType === 'hubspot') {
      externalId = await syncToHubSpot(sdk, contact_data, apiKey);
    } else if (crmType === 'salesforce') {
      externalId = await syncToSalesforce(sdk, contact_data, apiKey);
    } else {
      throw new Error(`Unsupported CRM: ${crmType}`);
    }

    // Never mark a contact 'synced' with a NULL external_id: that would hide
    // a push that did not actually produce a CRM record.
    if (externalId === undefined || externalId === null || externalId === '') {
      throw new Error(`CRM sync returned no external id for contact ${contact_id}`);
    }

    // Update local record
    await sdk.db.query(`
      UPDATE contacts
      SET
        external_id = ?,
        sync_status = 'synced',
        last_synced_at = ?,
        sync_error = NULL
      WHERE id = ?
    `, [externalId, Date.now(), contact_id]);

    // Clear cache
    await sdk.cache.delete(`contact:${contact_id}`);

    return { success: true, external_id: externalId };
  } catch (error) {
    // Record the failure, but never let a logging problem replace the
    // original error that the queue needs to see.
    try {
      await sdk.db.query(`
        UPDATE contacts
        SET
          sync_status = 'failed',
          sync_error = ?,
          last_sync_attempt = ?
        WHERE id = ?
      `, [error.message, Date.now(), contact_id]);
    } catch (loggingError) {
      console.error('Failed to record sync failure for contact', contact_id, loggingError);
    }
    throw error; // Triggers retry
  }
}
/**
 * Create or update a contact in HubSpot.
 *
 * Sends PATCH to the existing record when `contact.external_id` is set,
 * otherwise POSTs a new contact.
 *
 * @param {object} sdk - Platform SDK; uses `sdk.fetch`.
 * @param {object} contact - Local contact row (`external_id`, `email`,
 *   `first_name`, `last_name`, `phone`, `company`).
 * @param {string} apiKey - Token sent as a Bearer credential.
 * @returns {Promise<string>} The HubSpot id from the response body.
 * @throws {Error} When HubSpot answers with a non-success status or the
 *   response carries no id.
 */
async function syncToHubSpot(sdk, contact, apiKey) {
  const { external_id, email, first_name, last_name, phone, company } = contact;
  const baseUrl = 'https://api.hubapi.com/crm/v3/objects/contacts';
  const method = external_id ? 'PATCH' : 'POST';
  // Encode the id so an unexpected character cannot alter the request path.
  const url = external_id
    ? `${baseUrl}/${encodeURIComponent(external_id)}`
    : baseUrl;

  const response = await sdk.fetch(url, {
    method,
    headers: {
      'Authorization': `Bearer ${apiKey}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      properties: {
        email,
        firstname: first_name,
        lastname: last_name,
        phone,
        company
      }
    })
  });

  // NOTE(review): assumes sdk.fetch resolves to a fetch-style Response
  // exposing `ok`/`status`. Confirm against the SDK.
  if (response.ok === false) {
    let detail = '';
    try {
      const errorBody = await response.json();
      if (errorBody?.message) {
        detail = `: ${errorBody.message}`;
      }
    } catch {
      // Error body was not JSON; the status code alone is reported.
    }
    throw new Error(`HubSpot ${method} request failed with status ${response.status}${detail}`);
  }

  const data = await response.json();
  if (data?.id == null) {
    throw new Error('HubSpot response did not include a contact id');
  }
  return data.id;
}
/**
 * Placeholder for the Salesforce counterpart of the HubSpot sync.
 *
 * Throws instead of silently resolving `undefined`: the calling worker stores
 * the returned value as `external_id` and marks the contact 'synced', so an
 * empty stub would report success without sending anything.
 *
 * @param {object} sdk - Platform SDK.
 * @param {object} contact - Local contact row to push.
 * @param {*} credentials - Salesforce credentials.
 * @returns {Promise<never>} Always rejects until implemented.
 * @throws {Error} Always, until the Salesforce flow is implemented.
 */
async function syncToSalesforce(sdk, contact, credentials) {
  // Salesforce OAuth flow and API calls
  // ...similar pattern
  throw new Error('Salesforce sync is not implemented yet');
}
// Step 3: Webhook Receiver for CRM Updates
Endpoint: POST /custom/crm-webhook
Settings: Public access
/**
 * POST /custom/crm-webhook: apply a contact change pushed by the CRM.
 *
 * NOTE(review): this endpoint is configured for public access and nothing here
 * verifies that the request really came from the CRM. Add signature or
 * shared-secret verification before using this outside a demo.
 *
 * @param {object} sdk - Platform SDK; uses `db.query`, `queue.enqueue`, `cache.delete`.
 * @param {object} event - Request event; `event.data` carries `external_id`,
 *   `updates` (CRM-style keys: firstname, lastname, email, phone, company) and
 *   `timestamp` (compared against the local `updated_at`).
 * @returns {Promise<object>} `{ status: 'skipped', message }` for an invalid
 *   payload or an unknown contact, `{ status: 'conflict', message }` when the
 *   local row is newer, or `{ status: 'updated', contact_id }`.
 */
export default async function crmWebhook(sdk, event) {
  const { external_id, updates, timestamp } = event.data ?? {};

  // Reject malformed payloads up front; a missing timestamp would otherwise
  // skip the conflict check and write an undefined updated_at.
  const remoteTime = Number(timestamp);
  const hasUpdates = updates !== null && typeof updates === 'object';
  if (!external_id || !hasUpdates || timestamp == null || !Number.isFinite(remoteTime)) {
    return {
      status: 'skipped',
      message: 'Invalid webhook payload'
    };
  }

  // Find local contact by external ID
  const [contact] = await sdk.db.query(
    'SELECT * FROM contacts WHERE external_id = ?',
    [external_id]
  );
  if (!contact) {
    return {
      status: 'skipped',
      message: 'Contact not found locally'
    };
  }

  // Check for conflicts
  if (contact.updated_at > remoteTime) {
    // Local version is newer - need conflict resolution.
    // The resolver reads remote_data.timestamp, so send it along.
    await sdk.queue.enqueue('resolve-sync-conflict', {
      contact_id: contact.id,
      local_data: contact,
      remote_data: { ...updates, timestamp: remoteTime },
      conflict_time: Date.now()
    });
    return {
      status: 'conflict',
      message: 'Conflict detected, queued for resolution'
    };
  }

  // A webhook may carry only the changed fields: keep the stored value for
  // anything the payload omits instead of overwriting it with NULL.
  const valueFor = (remoteKey, localKey) =>
    (updates[remoteKey] !== undefined ? updates[remoteKey] : contact[localKey]);

  // Remote is newer - update local
  await sdk.db.query(`
    UPDATE contacts
    SET
      first_name = ?,
      last_name = ?,
      email = ?,
      phone = ?,
      company = ?,
      updated_at = ?,
      sync_status = 'synced',
      last_synced_at = ?
    WHERE id = ?
  `, [
    valueFor('firstname', 'first_name'),
    valueFor('lastname', 'last_name'),
    valueFor('email', 'email'),
    valueFor('phone', 'phone'),
    valueFor('company', 'company'),
    remoteTime,
    Date.now(),
    contact.id
  ]);

  // Clear cache
  await sdk.cache.delete(`contact:${contact.id}`);

  return {
    status: 'updated',
    contact_id: contact.id
  };
}
// Step 4: Conflict Resolution with AI
Worker: resolve-sync-conflict
/**
 * Pull the JSON object out of an AI reply, tolerating code fences or prose
 * around it.
 *
 * @param {*} reply - Raw value returned by the AI call.
 * @returns {object} The parsed object.
 * @throws {Error} When the reply holds no parseable JSON object.
 */
function parseMergedContact(reply) {
  const text = String(reply);
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end <= start) {
    throw new Error('AI conflict resolution returned no JSON object');
  }
  try {
    return JSON.parse(text.slice(start, end + 1));
  } catch (cause) {
    throw new Error('AI conflict resolution returned invalid JSON', { cause });
  }
}

/**
 * Format a timestamp for the prompt without throwing on missing values.
 *
 * @param {*} value - Epoch milliseconds, or anything `Date` accepts.
 * @returns {string} ISO-8601 text, or 'unknown' when the value is unusable.
 */
function describeTimestamp(value) {
  if (value == null) {
    return 'unknown';
  }
  const date = new Date(value);
  return Number.isNaN(date.getTime()) ? 'unknown' : date.toISOString();
}

/**
 * Worker `resolve-sync-conflict`: ask the AI model to merge the local and
 * remote versions of a contact, store the result, push it back to the CRM
 * and log the resolution.
 *
 * @param {object} sdk - Platform SDK; uses `ai.chat`, `db.query`, `queue.enqueue`.
 * @param {object} job - Queue job; `job.data` carries `contact_id`,
 *   `local_data` (local row) and `remote_data` (CRM-side fields).
 * @returns {Promise<{success: true, merged: object}>} The merged contact fields.
 * @throws {Error} When the AI reply cannot be parsed as a JSON object.
 */
export async function resolveSyncConflict(sdk, job) {
  const { contact_id, local_data, remote_data } = job.data;

  // Use AI to intelligently merge conflicting data
  const resolution = await sdk.ai.chat('@cf/meta/llama-3-8b-instruct', [
    {
      role: 'system',
      content: 'You are a data conflict resolver. Merge two versions of contact data intelligently, preferring the most recent or complete information. Return merged data as JSON.'
    },
    {
      role: 'user',
      content: [
        '',
        `Local version (updated ${describeTimestamp(local_data.updated_at)}):`,
        JSON.stringify(local_data),
        `Remote version (updated ${describeTimestamp(remote_data.timestamp)}):`,
        JSON.stringify(remote_data),
        'Return the merged contact object.',
        ''
      ].join('\n')
    }
  ]);

  // Parse AI response
  const aiMerged = parseMergedContact(resolution);

  // The model sees both local (first_name) and CRM (firstname) key styles and
  // may answer in either; anything it leaves out falls back to the local row.
  const pick = (localKey, remoteKey) =>
    aiMerged[localKey] ?? aiMerged[remoteKey] ?? local_data[localKey] ?? null;
  const merged = {
    first_name: pick('first_name', 'firstname'),
    last_name: pick('last_name', 'lastname'),
    email: pick('email', 'email'),
    phone: pick('phone', 'phone'),
    company: pick('company', 'company')
  };
  const now = Date.now();

  // Update local with merged data
  await sdk.db.query(`
    UPDATE contacts
    SET
      first_name = ?,
      last_name = ?,
      email = ?,
      phone = ?,
      company = ?,
      updated_at = ?,
      sync_status = 'synced',
      last_synced_at = ?
    WHERE id = ?
  `, [
    merged.first_name,
    merged.last_name,
    merged.email,
    merged.phone,
    merged.company,
    now,
    now,
    contact_id
  ]);

  // Sync merged version back to CRM. Carry the existing external_id so the
  // sync worker updates the CRM record instead of creating a duplicate.
  await sdk.queue.enqueue('sync-to-crm', {
    contact_id,
    contact_data: { ...merged, external_id: local_data.external_id }
  });

  // Log resolution
  await sdk.db.query(`
    INSERT INTO sync_conflicts (
      contact_id, local_version, remote_version,
      resolution, resolved_at
    )
    VALUES (?, ?, ?, ?, ?)
  `, [
    contact_id,
    JSON.stringify(local_data),
    JSON.stringify(remote_data),
    JSON.stringify(merged),
    now
  ]);

  return { success: true, merged };
}
// Step 5: Batch Import from CRM
Endpoint: POST /custom/import-from-crm
/**
 * POST /custom/import-from-crm: register a batch import job and queue it.
 *
 * @param {object} sdk - Platform SDK; uses `db.query` and `queue.enqueue`.
 * @param {object} event - Request event; optional `event.data.since_timestamp`
 *   is forwarded to the worker as `since` (0 when absent).
 * @returns {Promise<{job_id: string, status: 'started', message: string}>}
 * @throws {Error} When the job row cannot be written or the job cannot be
 *   queued; in the latter case the row is marked 'failed' first.
 */
export default async function importFromCRM(sdk, event) {
  const { since_timestamp } = event.data ?? {};

  // Date.now() alone collides for two requests in the same millisecond and
  // the id is a PRIMARY KEY, so add a random suffix.
  const jobId = `import_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;

  // Create the import job record FIRST so the worker's progress updates
  // always have a row to write to.
  await sdk.db.query(`
    INSERT INTO import_jobs (id, status, started_at)
    VALUES (?, 'running', ?)
  `, [jobId, Date.now()]);

  // Enqueue batch import job
  try {
    await sdk.queue.enqueue('batch-import-crm', {
      job_id: jobId,
      since: since_timestamp || 0
    });
  } catch (error) {
    // Do not leave a job that will never run stuck in 'running'.
    try {
      await sdk.db.query(
        'UPDATE import_jobs SET status = ?, error = ? WHERE id = ?',
        ['failed', error.message, jobId]
      );
    } catch (loggingError) {
      console.error('Failed to mark import job as failed', jobId, loggingError);
    }
    throw error;
  }

  return {
    job_id: jobId,
    status: 'started',
    message: 'Batch import has been queued'
  };
}
// Worker: batch-import-crm
/**
 * Worker `batch-import-crm`: page through the CRM's contacts and upsert them
 * into the local `contacts` table, tracking progress on the `import_jobs` row.
 *
 * @param {object} sdk - Platform SDK; uses `secrets.get`, `fetch` and `db.query`.
 * @param {object} job - Queue job; `job.data.job_id` names the `import_jobs`
 *   row and `job.data.since` (epoch ms, 0 = everything) skips contacts whose
 *   `updatedAt` is older.
 * @returns {Promise<{success: true, imported: number}>} Count of upserted contacts.
 * @throws {Error} Any fetch or database failure, rethrown after the job row is
 *   marked 'failed'.
 */
export async function batchImportCRM(sdk, job) {
  const { job_id, since } = job.data;
  const apiKey = await sdk.secrets.get('CRM_API_KEY');
  const pageSize = 100;
  const sinceMs = Number(since) || 0;
  let imported = 0;
  // HubSpot paginates with an opaque cursor returned as paging.next.after;
  // it is not a numeric offset.
  let after;

  try {
    while (true) {
      // Fetch page of contacts from CRM
      const url = new URL('https://api.hubapi.com/crm/v3/objects/contacts');
      url.searchParams.set('limit', String(pageSize));
      // phone and company are not among the default returned properties.
      url.searchParams.set('properties', 'firstname,lastname,email,phone,company');
      if (after != null) {
        url.searchParams.set('after', String(after));
      }
      const response = await sdk.fetch(url.toString(), {
        headers: { 'Authorization': `Bearer ${apiKey}` }
      });
      // NOTE(review): assumes a fetch-style Response with `ok`/`status`.
      if (response.ok === false) {
        throw new Error(`HubSpot contact list request failed with status ${response.status}`);
      }
      const data = await response.json();
      const results = data?.results ?? [];
      if (results.length === 0) break;

      // Import each contact
      for (const crmContact of results) {
        // Honor `since`; contacts without a parseable updatedAt are imported.
        if (sinceMs > 0 && Date.parse(crmContact.updatedAt) < sinceMs) {
          continue;
        }
        const props = crmContact.properties ?? {};
        // Bind NULL rather than undefined for properties the CRM left out.
        const fields = [
          props.firstname ?? null,
          props.lastname ?? null,
          props.email ?? null,
          props.phone ?? null,
          props.company ?? null
        ];
        const now = Date.now();

        // Check if already exists
        const [existing] = await sdk.db.query(
          'SELECT id FROM contacts WHERE external_id = ?',
          [crmContact.id]
        );
        if (existing) {
          // Update
          await sdk.db.query(`
            UPDATE contacts
            SET first_name = ?, last_name = ?, email = ?, phone = ?,
                company = ?, updated_at = ?
            WHERE external_id = ?
          `, [...fields, now, crmContact.id]);
        } else {
          // Insert
          await sdk.db.query(`
            INSERT INTO contacts (
              external_id, first_name, last_name, email,
              phone, company, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
          `, [crmContact.id, ...fields, now, now]);
        }
        imported++;
      }

      // Update progress
      await sdk.db.query(
        'UPDATE import_jobs SET imported_count = ? WHERE id = ?',
        [imported, job_id]
      );

      after = data.paging?.next?.after;
      if (after == null) break; // no further pages
    }

    // Mark as complete
    await sdk.db.query(`
      UPDATE import_jobs
      SET status = 'completed', completed_at = ?, imported_count = ?
      WHERE id = ?
    `, [Date.now(), imported, job_id]);

    return { success: true, imported };
  } catch (error) {
    await sdk.db.query(
      'UPDATE import_jobs SET status = ?, error = ? WHERE id = ?',
      ['failed', error.message, job_id]
    );
    throw error;
  }
}
// What You Built
In 40-60 minutes:
✅ Bidirectional sync - Your app ↔ CRM
✅ Real-time updates - Webhook-based
✅ Conflict resolution - AI-powered intelligent merging
✅ Batch import - Handle thousands of contacts
✅ Change tracking - Full audit trail
✅ Lock mechanism - Prevent duplicate syncs
✅ Error handling - Automatic retries
No complex queue setup. No Redis locks. No conflict resolution library. Just code.
Database Schema
-- Local contacts that are mirrored to and from the external CRM.
-- The handlers and workers write Date.now() values (epoch milliseconds) into
-- the INTEGER time columns.
CREATE TABLE contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
external_id TEXT UNIQUE, -- CRM-side record id; used to look contacts up in the webhook and batch import
first_name TEXT,
last_name TEXT,
email TEXT,
phone TEXT,
company TEXT,
sync_status TEXT DEFAULT 'pending', -- the code writes 'pending', 'synced' or 'failed'
sync_error TEXT, -- message of the last failed push; reset to NULL on success
last_sync_attempt INTEGER, -- set when a sync is queued and when a push fails
last_synced_at INTEGER, -- set whenever a sync or merge completes
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL -- compared with the webhook timestamp to detect conflicts
);
-- One row per batch import started through POST /custom/import-from-crm.
CREATE TABLE import_jobs (
id TEXT PRIMARY KEY, -- job id generated by the import endpoint
status TEXT NOT NULL, -- the code writes 'running', 'completed' or 'failed'
imported_count INTEGER DEFAULT 0, -- updated by the worker after each page
error TEXT, -- error message when the worker fails
started_at INTEGER NOT NULL, -- Date.now() value (epoch milliseconds)
completed_at INTEGER -- set when the worker finishes successfully
);
-- Log of conflicts handled by the resolve-sync-conflict worker.
-- local_version, remote_version and resolution hold JSON.stringify output;
-- resolved_at is a Date.now() value (epoch milliseconds).
CREATE TABLE sync_conflicts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id INTEGER NOT NULL,
local_version TEXT NOT NULL,
remote_version TEXT NOT NULL,
resolution TEXT NOT NULL,
resolved_at INTEGER NOT NULL
);Production Features
- ✅ Conflict detection - Timestamp-based
- ✅ AI resolution - Smart data merging
- ✅ Batch processing - Paginated imports
- ✅ Retry logic - Automatic on failure
- ✅ Monitoring - Import job tracking
- ✅ Audit trail - All conflicts logged
Ready to sync your data?
Start Building →