
Supabase Read-After-Write Consistency

This guide explains the critical read-after-write consistency issue with Supabase read replicas and how CONA solves it for financial operations.

🚨 The Problem

What Was Happening

Shopify invoices were created successfully, but 20-30% had no accounting impacts (GL entries). The workflow reported success, but financial data was incomplete.

Root Cause Discovery

The issue wasn't a workflow failure - it was silent data corruption:
  1. ✅ Document created in PRIMARY database
  2. ✅ Line items created in PRIMARY database
  3. ❌ Child workflow queries REPLICA (via Supavisor pooler)
  4. ✅ Document found in replica
  5. ❌ Line items missing (not yet synced from primary)
  6. ✅ Financial entry creation "succeeds" with empty line items
  7. ❌ No GL entries created (or incomplete)
This is worse than a failure because:
  • No retries triggered (workflow thinks it succeeded)
  • No error logs (silent corruption)
  • Discovered days later by users

Technical Details

// Query hits Supabase READ REPLICA
const document = await prisma.documents.findUnique({
  where: { id: documentId },
  include: {
    line_items: true, // ❌ Returns [] due to replica lag!
  },
});

// Can't distinguish between:
// 1. Legitimate empty line items
// 2. Missing line items due to replica lag
if (document.line_items.length === 0) {
  // ❌ Skips GL entry creation or creates incomplete entries
}
Why it happens:
  • documents table row syncs quickly (50-500ms)
  • line_items table (foreign key) lags behind
  • JOIN queries return incomplete data
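One way to turn the ambiguous empty result into a loud failure is a guard that throws, so the workflow's retry policy gets a chance to run. This is a minimal sketch, not CONA's implementation: `assertLineItemsPresent`, `IncompleteDocumentError`, and the rule "a non-zero total implies at least one line item" are all assumptions made for illustration.

```typescript
// Hypothetical shapes: only the fields this check needs.
interface LineItem {
  id: string;
}

interface DocumentWithItems {
  id: string;
  total_amount: number;
  line_items: LineItem[];
}

// Thrown so the workflow engine sees a failure and can retry the activity.
class IncompleteDocumentError extends Error {
  constructor(documentId: string) {
    super(`Document ${documentId} has a non-zero total but no line items`);
    this.name = "IncompleteDocumentError";
  }
}

// Assumption: a document with a non-zero total must have at least one line item.
function assertLineItemsPresent(doc: DocumentWithItems): void {
  if (doc.line_items.length === 0 && doc.total_amount !== 0) {
    throw new IncompleteDocumentError(doc.id);
  }
}
```

A guard like this only detects the inconsistency; it does not make the read itself consistent.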

✅ The Solution

1. Direct Primary Connection

We created a separate Prisma client that connects directly to the Supabase PRIMARY database, bypassing read replicas entirely.
Files created/modified:
// packages/database/src/direct-client.ts (NEW)
import { PrismaClient } from "../generated/client";

export const prismaDirect = new PrismaClient({
  datasources: {
    db: {
      url: process.env.DIRECT_URL, // Port 5432 (primary)
    },
  },
});
// packages/core/src/documents/get-document-by-id.ts
export async function getDocumentById({
  id,
  organizationId,
  useDirect = false, // ✅ New parameter
}: GetDocumentByIdParams) {
  // Switch between pooled and direct connection
  const client = useDirect ? prismaDirect : prisma;

  const document = await client.documents.findUnique({
    // ... includes line_items and all relations
  });
}
// packages/core/src/services/create-financial-entries.ts
// CRITICAL: Always use direct connection for financial operations
const document = await getDocumentById({
  id: documentId,
  organizationId,
  useDirect: true, // ✅ Bypasses replicas - gets complete data
});

// Also use direct connection for allocations
const allocations = await prismaDirect.document_payment_allocations.findMany({
  where: { document_id: documentId },
});

How It Works

| Aspect | Pooled (Before) | Direct (After) |
| --- | --- | --- |
| Port | 6543 (Supavisor) | 5432 (Primary) |
| Routing | Load-balanced to replicas | Direct to primary |
| Data freshness | Stale (50-500ms lag) | Always current |
| Line items | May be missing ❌ | Always complete ✅ |
| Consistency | Eventually consistent | Strongly consistent |

2. READ_AFTER_WRITE_RETRY Policy

Added specialized retry policy for activities that query recently created data:
// packages/temporal-workflows/src/retry-configs.ts
export const READ_AFTER_WRITE_RETRY: RetryPolicy = {
  initialInterval: "1s", // Quick first retry
  backoffCoefficient: 1.5, // Gentle backoff
  maximumInterval: "10s", // Cap at 10 seconds
  maximumAttempts: 8, // 8 attempts; retry waits total ~31 seconds
};
Applied to batchCreateFinancialEntriesActivity in workflow configuration.
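To see what this policy means in wall-clock terms, the waits between attempts can be computed from the four values above. The sketch below assumes the usual exponential rule (each wait is the initial interval times the coefficient raised to the retry index, capped at the maximum interval); `BackoffConfig` and `retryDelays` are illustrative names, not part of the Temporal SDK.

```typescript
// Mirrors the numeric values of READ_AFTER_WRITE_RETRY, expressed in seconds.
interface BackoffConfig {
  initialIntervalSec: number;
  backoffCoefficient: number;
  maximumIntervalSec: number;
  maximumAttempts: number;
}

// Returns the wait before each retry (attempts 2, 3, ...), capped at the maximum.
function retryDelays(cfg: BackoffConfig): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < cfg.maximumAttempts - 1; retry++) {
    const raw = cfg.initialIntervalSec * Math.pow(cfg.backoffCoefficient, retry);
    delays.push(Math.min(raw, cfg.maximumIntervalSec));
  }
  return delays;
}

const delays = retryDelays({
  initialIntervalSec: 1,
  backoffCoefficient: 1.5,
  maximumIntervalSec: 10,
  maximumAttempts: 8,
});
const totalWaitSec = delays.reduce((sum, d) => sum + d, 0);

console.log(delays); // [1, 1.5, 2.25, 3.375, 5.0625, 7.59375, 10]
console.log(totalWaitSec); // 30.78125
```

Eight attempts mean seven waits, which sum to roughly 31 seconds, not counting the time each attempt itself takes to run.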

3. Fixed Sleep Placement

Moved workflow.sleep("500ms") to BEFORE re-queries in parent workflows:
// BEFORE: Sleep happened AFTER re-query (useless)
const docs = await findExistingDocs(...);
await workflow.sleep("500ms"); // ❌ Too late!

// AFTER: Sleep happens BEFORE re-query (effective)
await workflow.sleep("500ms"); // ✅ Gives replicas time
const docs = await findExistingDocs(...);

🔧 Setup Required

Environment Variables

Add DIRECT_URL to your environment files:
# Pooled connection via Supavisor (read replicas)
DATABASE_URL=postgresql://postgres.xxx:6543/postgres?pgbouncer=true

# Direct connection to primary (no replicas)
DIRECT_URL=postgresql://postgres.xxx:5432/postgres
Get both URLs from Supabase:
  1. Go to Project Settings → Database
  2. Copy "Connection string" (pooled) → DATABASE_URL
  3. Copy "Direct connection" → DIRECT_URL
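Because swapping the two URLs silently reintroduces the problem, a startup check on the ports can help. The following is a sketch under the assumption that the pooled URL always uses port 6543 and the direct URL port 5432, as described above; `checkDatabaseUrls` is a hypothetical helper, and the example hostnames are placeholders.

```typescript
// Returns a list of problems; an empty list means both URLs look plausible.
function checkDatabaseUrls(env: Record<string, string | undefined>): string[] {
  const problems: string[] = [];
  const expectedPorts: Array<[string, string]> = [
    ["DATABASE_URL", "6543"], // pooled, via Supavisor
    ["DIRECT_URL", "5432"], // direct to primary
  ];

  for (const [name, port] of expectedPorts) {
    const value = env[name];
    if (!value) {
      problems.push(`${name} is not set`);
      continue;
    }
    let parsed: URL;
    try {
      parsed = new URL(value);
    } catch {
      problems.push(`${name} is not a valid URL`);
      continue;
    }
    if (parsed.port !== port) {
      problems.push(`${name} uses port ${parsed.port || "(none)"}, expected ${port}`);
    }
  }
  return problems;
}

// Example with placeholder hosts and credentials:
const problems = checkDatabaseUrls({
  DATABASE_URL: "postgresql://user:secret@pooler.example.com:6543/postgres?pgbouncer=true",
  DIRECT_URL: "postgresql://user:secret@db.example.com:5432/postgres",
});
console.log(problems.length === 0 ? "URLs look fine" : problems);
```

In a real service this would be called once with `process.env` before any Prisma client is constructed.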

Build & Deploy

# Run from the repository root; each subshell leaves the working directory unchanged.

# Build database package
(cd packages/database && pnpm run build)

# Build core package
(cd packages/core && pnpm run build)

# Build temporal workflows
(cd packages/temporal-workflows && pnpm run build)

# Build workers for deployment
(cd apps/temporal-workers && pnpm run build)

📊 When to Use Each Connection

| Use Case | Connection | Why |
| --- | --- | --- |
| Financial entry creation | prismaDirect | Must have complete line items |
| Reconciliation checks | prismaDirect | Requires consistent view |
| Audit trail queries | prismaDirect | Must be accurate |
| Document creation | prisma | Writes go to primary anyway |
| Bulk queries | prisma | Spread load across replicas |
| User dashboard queries | prisma | Fast response from nearby replica |
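The table can also be encoded as a typed lookup, so the choice of connection is explicit at each call site instead of being left to memory. This is only a sketch: the `UseCase` names and the `pickClient` helper are invented for illustration, and the generic `T` stands in for the Prisma client type.

```typescript
type UseCase =
  | "financial-entries"
  | "reconciliation"
  | "audit-trail"
  | "document-creation"
  | "bulk-query"
  | "dashboard";

type ConnectionKind = "direct" | "pooled";

// One entry per row of the table above.
const CONNECTION_FOR: Record<UseCase, ConnectionKind> = {
  "financial-entries": "direct",
  reconciliation: "direct",
  "audit-trail": "direct",
  "document-creation": "pooled",
  "bulk-query": "pooled",
  dashboard: "pooled",
};

// T would be the Prisma client type; it is generic here to keep the sketch self-contained.
function pickClient<T>(useCase: UseCase, clients: { direct: T; pooled: T }): T {
  return clients[CONNECTION_FOR[useCase]];
}

// Usage with stand-in values in place of prismaDirect and prisma:
const chosen = pickClient("reconciliation", { direct: "prismaDirect", pooled: "prisma" });
console.log(chosen); // "prismaDirect"
```

Because `CONNECTION_FOR` is typed as a full `Record`, adding a new use case without deciding its connection becomes a compile error.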

🎯 Expected Results

Before Fix

  • ❌ 20-30% invoices missing accounting impacts
  • ❌ Silent failures (no error logs)
  • ❌ Discovered days later by users
  • ❌ Manual intervention required

After Fix

  • ✅ >99.9% success rate for financial entries
  • ✅ Complete line items always fetched
  • ✅ Zero replica lag issues
  • ✅ Immediate consistency

🎛️ Tuning Connection Pools (Advanced)

When to Adjust Connection Limits

Increase direct connection limit if:
  • ✅ You see frequent P2024 pool timeout errors
  • ✅ Financial entry creation is consistently slow (>5s)
  • ✅ Your PRIMARY database can handle more connections
  • ✅ You're running on dedicated infrastructure (not serverless)
Keep it conservative if:
  • ⚠️ Running on serverless (multiple instances × connections = exhaustion)
  • ⚠️ Sharing database with other applications
  • ⚠️ Limited database connection capacity
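The "instances × connections" arithmetic is worth doing explicitly before raising a limit. Below is a small sketch of that calculation; `directConnectionBudget` is a hypothetical helper and every number in the example (10 instances, `max_connections` of 60, 20 connections reserved for other clients) is made up for illustration.

```typescript
interface PoolBudgetInput {
  instances: number; // worker or app instances running at once
  connectionLimit: number; // connection_limit on DIRECT_URL
  maxConnections: number; // max_connections reported by Postgres
  reservedForOthers: number; // connections kept for the pooler and other clients
}

interface PoolBudget {
  worstCase: number; // connections if every instance fills its pool
  available: number; // what the primary can spare for direct clients
  fits: boolean;
}

function directConnectionBudget(input: PoolBudgetInput): PoolBudget {
  const worstCase = input.instances * input.connectionLimit;
  const available = input.maxConnections - input.reservedForOthers;
  return { worstCase, available, fits: worstCase <= available };
}

// Hypothetical deployment: 10 instances, max_connections = 60, 20 reserved.
const conservative = directConnectionBudget({
  instances: 10,
  connectionLimit: 3,
  maxConnections: 60,
  reservedForOthers: 20,
});
const aggressive = directConnectionBudget({
  instances: 10,
  connectionLimit: 5,
  maxConnections: 60,
  reservedForOthers: 20,
});

console.log(conservative); // { worstCase: 30, available: 40, fits: true }
console.log(aggressive); // { worstCase: 50, available: 40, fits: false }
```

The real `max_connections` value can be read with the SQL in the Monitoring section.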

Environment-Based Configuration

For different environments, you can override defaults:
# Production - Conservative (serverless)
DIRECT_URL=postgresql://...?connection_limit=3&pool_timeout=30

# Staging - Moderate (dedicated workers)
DIRECT_URL=postgresql://...?connection_limit=5&pool_timeout=20

# Development - Relaxed (local/dedicated)
DIRECT_URL=postgresql://...?connection_limit=10&pool_timeout=15
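If the base URL comes from a secret store without these parameters, they can be appended in code before the client is created. A minimal sketch, assuming the base URL parses with the standard `URL` class; `withPoolParams` is an illustrative helper and the host and credentials are placeholders.

```typescript
interface PoolParams {
  connectionLimit: number; // becomes connection_limit
  poolTimeoutSec: number; // becomes pool_timeout
}

// Adds or overrides the pool parameters on a connection string.
function withPoolParams(baseUrl: string, params: PoolParams): string {
  const url = new URL(baseUrl);
  url.searchParams.set("connection_limit", String(params.connectionLimit));
  url.searchParams.set("pool_timeout", String(params.poolTimeoutSec));
  return url.toString();
}

// Placeholder connection string, tuned with the "Production" values above.
const tunedUrl = withPoolParams("postgresql://user:secret@db.example.com:5432/postgres", {
  connectionLimit: 3,
  poolTimeoutSec: 30,
});
console.log(tunedUrl);
```

The result would then be passed as the `url` of the datasource in the direct client.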

Connection Pool Monitoring

Use Prisma's built-in metrics to track pool health (this requires the metrics preview feature to be enabled in the Prisma schema):
import { prisma } from "@cona/database";

// Log connection pool metrics (use prismaDirect.$metrics to inspect the direct pool)
const metrics = await prisma.$metrics.json();

// Key metrics to monitor; default to 0 when a gauge is absent
const gauge = (key: string) => metrics.gauges.find((g) => g.key === key)?.value ?? 0;
const open = gauge("prisma_pool_connections_open");
const idle = gauge("prisma_pool_connections_idle");
const busy = gauge("prisma_pool_connections_busy");

console.log("Pool status:", {
  totalOpen: open,
  idle,
  busy,
  // Guard against dividing by zero when no connections are open
  utilization: open > 0 ? `${((busy / open) * 100).toFixed(1)}%` : "n/a",
});

Watch for Pool Exhaustion

Error Code P2024 indicates pool timeout:
// This error means queries waited too long for a connection
PrismaClientKnownRequestError: Timed out fetching a new connection from the pool
Error code: P2024
Solutions:
  1. Increase connection_limit (if database can handle it)
  2. Increase pool_timeout (if operations legitimately take longer)
  3. Optimize slow queries
  4. Add external pooler (PgBouncer) for serverless
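Before changing any limit, it helps to know how often P2024 actually occurs. The sketch below detects the error by its `code` property and reports it through a callback while still rethrowing; `isPoolTimeout` and `withPoolTimeoutTracking` are hypothetical helpers, and what the callback does (increment a counter, emit a metric) is left open.

```typescript
// True for errors carrying Prisma's pool-timeout code.
function isPoolTimeout(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === "P2024"
  );
}

// Runs a query, reports pool timeouts, and always rethrows the original error.
async function withPoolTimeoutTracking<T>(
  run: () => Promise<T>,
  onPoolTimeout: () => void,
): Promise<T> {
  try {
    return await run();
  } catch (err) {
    if (isPoolTimeout(err)) {
      onPoolTimeout();
    }
    throw err;
  }
}
```

A call site might look like `withPoolTimeoutTracking(() => prismaDirect.documents.findUnique({ where: { id } }), () => poolTimeouts++)`, where `poolTimeouts` is a counter you own.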

Optimal Settings by Deployment

| Environment | Direct Connections | Pool Timeout | Rationale |
| --- | --- | --- | --- |
| Serverless (Fly.io) | 3 | 30s | Multiple instances, conserve connections |
| Dedicated Workers | 5 | 20s | Single instance, balanced approach |
| Local Development | 10 | 15s | No connection constraints |
| High-Load Production | 10 + PgBouncer | 30s | External pooler handles scaling |

🔍 Monitoring

Connection Pool Health

-- Check current database connections
SELECT
  datname,
  usename,
  application_name,
  client_addr,
  state,
  COUNT(*) as connection_count
FROM pg_stat_activity
WHERE datname = 'postgres'
GROUP BY datname, usename, application_name, client_addr, state
ORDER BY connection_count DESC;

-- Check connection limits
SELECT
  setting as max_connections
FROM pg_settings
WHERE name = 'max_connections';

-- Calculate current utilization
SELECT
  COUNT(*) as current_connections,
  (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') as max_connections,
  ROUND(COUNT(*)::numeric / (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') * 100, 2) as utilization_percent
FROM pg_stat_activity
WHERE datname = 'postgres';

Find Documents Missing GL Entries

SELECT
  d.id,
  d.number,
  d.created_at,
  ot.label as document_type,
  COUNT(gl.id) as gl_entries,
  COUNT(li.id) as line_items
FROM documents d
LEFT JOIN general_ledger gl ON gl.document_id = d.id
LEFT JOIN line_items li ON li.document_id = d.id
LEFT JOIN object_types ot ON ot.id = d.object_type_id
WHERE d.created_at > NOW() - INTERVAL '7 days'
  AND d.is_deleted = FALSE
  AND ot.label IN ('Sales Invoice', 'Sales Credit Note', 'Payment')
GROUP BY d.id, d.number, d.created_at, ot.label
HAVING COUNT(gl.id) = 0 AND COUNT(li.id) > 0
ORDER BY d.created_at DESC;

Check Replication Lag

-- Run on PRIMARY to check replica lag
SELECT
  client_addr,
  state,
  pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lag_bytes,
  replay_lag AS lag_time -- interval column, available since PostgreSQL 10
FROM pg_stat_replication;

Monitor Connection Types

SELECT
  application_name,
  client_addr,
  state,
  COUNT(*) as connections
FROM pg_stat_activity
WHERE datname = 'postgres'
GROUP BY application_name, client_addr, state
ORDER BY connections DESC;

🧪 Testing

Verify Direct Connection Works

// Test script: packages/database/src/test-direct.ts
import { prisma } from "@cona/database";
import { prismaDirect } from "@cona/database/direct";

async function testConnections() {
  // Create document with line items
  const doc = await prisma.documents.create({
    data: {
      // ... document data
      line_items: {
        create: [
          { name: "Item 1", quantity: 1, unit_price: 100 },
          { name: "Item 2", quantity: 2, unit_price: 50 },
        ],
      },
    },
  });

  // Query via pooled (may hit replica)
  const docPooled = await prisma.documents.findUnique({
    where: { id: doc.id },
    include: { line_items: true },
  });

  // Query via direct (hits primary)
  const docDirect = await prismaDirect.documents.findUnique({
    where: { id: doc.id },
    include: { line_items: true },
  });

  console.log("Pooled line items:", docPooled?.line_items?.length);
  console.log("Direct line items:", docDirect?.line_items?.length);
  // Direct should ALWAYS have all line items (2 in this case)
}

⚠️ Important Notes

Connection Pooling Optimizations

Question: Should we optimize Prisma's connection pooling?
Answer: Yes! While pooling wasn't the root cause, we can optimize it for better performance.

Current Configuration

According to Prisma's connection pool documentation, the default pool size is:
num_physical_cpus * 2 + 1
For a 4-core machine: 4 * 2 + 1 = 9 connections

Optimizations Implemented

1. Direct Connection Pool (Conservative)
// packages/database/src/direct-client.ts
connection_limit: 5; // Low limit - direct connections are precious
pool_timeout: 20; // 20s timeout for complex financial operations
connect_timeout: 10; // 10s for initial connection
Why conservative for direct connections:
  • Bypasses Supavisor pooler
  • Connects directly to PRIMARY database
  • Too many direct connections can overwhelm primary
  • Used only for critical operations (financial entries, reconciliation)
2. Pooled Connection (via Supavisor)
// Regular prisma client uses Supabase defaults
connection_limit: Auto-scaled by Supavisor
pool_timeout: 10         // Standard Prisma default
Why more relaxed:
  • Routes through Supavisor connection pooler
  • Load-balanced across read replicas
  • Supavisor handles connection management
  • Used for high-volume queries

Connection Pool Strategy

| Client | Pool Size | Timeout | Use Case | Rationale |
| --- | --- | --- | --- | --- |
| prismaDirect | 5 | 20s | Critical operations | Protect primary, allow complex queries |
| prisma | Default | 10s | General queries | Let Supavisor manage scaling |

Performance Impact

Direct connections:
  • Slightly higher latency (~5-20ms more)
  • Limited connections (use sparingly)
  • Trade: Acceptable for critical operations
Pooled connections:
  • Lower latency (nearby replica)
  • Unlimited scaling
  • Trade: May have stale data

When Replica Lag Exceeds 2 Seconds

  1. Check Supabase Status: https://status.supabase.com
  2. Contact Supabase Support: "FRA region replica lag > 2s"
  3. Request synchronous replication for your organization

🚀 Additional Prisma Performance Optimizations

1. Select Field Optimization

Problem: Including all fields wastes bandwidth and memory.
// ❌ BAD: Fetches all fields (including large text fields)
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
});

// ✅ GOOD: Only fetch needed fields
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
  select: {
    id: true,
    number: true,
    total_amount: true,
    status_id: true,
    // Exclude: logo_url, default_memo, large JSON fields
  },
});
Impact: 50-70% reduction in data transfer for large result sets.

2. Pagination Instead of Fetching All

// ❌ BAD: Fetches all documents (could be thousands)
const allDocs = await prisma.documents.findMany({
  where: { org_id: organizationId },
});

// ✅ GOOD: Offset-based pagination
const pageSize = 100;
const docs = await prisma.documents.findMany({
  take: pageSize,
  skip: (page - 1) * pageSize,
  where: { org_id: organizationId },
  orderBy: { created_at: "desc" },
});

// ✅ BETTER: Cursor-based pagination (no offset, scales better)
const docs = await prisma.documents.findMany({
  take: pageSize,
  cursor: lastDocumentId ? { id: lastDocumentId } : undefined,
  skip: lastDocumentId ? 1 : 0, // Skip the cursor itself
  where: { org_id: organizationId },
  orderBy: { created_at: "desc" },
});
Why cursor-based is better:
  • Offset pagination gets slower with larger offsets
  • Cursor pagination maintains constant performance
  • Essential for workflows processing thousands of documents
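For a workflow that needs every page, the cursor query is typically wrapped in a loop that feeds the last row's id back in as the next cursor. The sketch below keeps the database call behind a `fetchPage` function so it stays self-contained; `forEachPage`, `HasId`, and `FetchPage` are illustrative names, and in practice `fetchPage` would wrap the `findMany` call shown above.

```typescript
interface HasId {
  id: string;
}

// cursor is undefined for the first page, otherwise the id of the last row already seen.
type FetchPage<T extends HasId> = (cursor: string | undefined, take: number) => Promise<T[]>;

// Walks all pages in order and returns the number of rows processed.
async function forEachPage<T extends HasId>(
  fetchPage: FetchPage<T>,
  pageSize: number,
  handlePage: (rows: T[]) => Promise<void> | void,
): Promise<number> {
  let cursor: string | undefined;
  let total = 0;

  for (;;) {
    const page = await fetchPage(cursor, pageSize);
    if (page.length === 0) break;

    await handlePage(page);
    total += page.length;

    // A short page means there is nothing left to fetch.
    if (page.length < pageSize) break;

    const last = page[page.length - 1];
    if (!last) break;
    cursor = last.id;
  }
  return total;
}
```

Processing one page at a time keeps memory bounded regardless of how many documents the organization has.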

3. Batch Operations with createMany and updateMany

// ❌ BAD: N individual queries (N round trips)
for (const item of lineItems) {
  await prisma.line_items.create({
    data: item,
  });
}

// ✅ GOOD: Single batch query (1 round trip)
await prisma.line_items.createMany({
  data: lineItems,
  skipDuplicates: true, // Useful for idempotency
});

// ✅ GOOD: Batch updates
await prisma.documents.updateMany({
  where: {
    id: { in: documentIds },
  },
  data: {
    status_id: newStatusId,
  },
});
Impact: 10-100x faster for bulk operations (depending on batch size).
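When the input array can be very large, it may be preferable to bound the size of each batch instead of sending everything in one statement. The helper below is a generic sketch; `chunk` is not a Prisma function, and the batch size of 2 is only for the example.

```typescript
// Splits items into consecutive batches of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const batches = chunk([1, 2, 3, 4, 5], 2);
console.log(batches); // [[1, 2], [3, 4], [5]]
```

Each batch would then go through its own `createMany` call, for example `for (const batch of chunk(lineItems, 1000)) await prisma.line_items.createMany({ data: batch })`, with the batch size chosen for your workload.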

4. Transaction Batching

// ❌ BAD: Multiple separate transactions
await prisma.documents.create({ data: docData });
await prisma.line_items.createMany({ data: lineItems });
await prisma.activity_logs.create({ data: logData });

// ✅ GOOD: Single transaction (atomic + faster)
await prisma.$transaction([
  prisma.documents.create({ data: docData }),
  prisma.line_items.createMany({ data: lineItems }),
  prisma.activity_logs.create({ data: logData }),
]);

// ✅ BETTER: Interactive transaction (for complex logic)
await prisma.$transaction(async (tx) => {
  const doc = await tx.documents.create({ data: docData });

  // Use created doc ID in subsequent queries
  await tx.line_items.createMany({
    data: lineItems.map((item) => ({
      ...item,
      document_id: doc.id,
    })),
  });

  await tx.activity_logs.create({
    data: { ...logData, documents_id: doc.id },
  });
});
Benefits:
  • Atomic operations (all or nothing)
  • Single round trip for simple transactions
  • Consistent data state

5. Avoid N+1 Queries with Proper Includes

// ❌ BAD: N+1 queries (1 for documents + N for line items)
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
});

for (const doc of documents) {
  const lineItems = await prisma.line_items.findMany({
    where: { document_id: doc.id },
  });
  // Process line items...
}

// ✅ GOOD: Single findMany call with include (no per-document queries)
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
  include: {
    line_items: true,
  },
});
For financial operations, run the same query through prismaDirect:
// For financial operations, include with direct connection
const documents = await prismaDirect.documents.findMany({
  where: { org_id: organizationId },
  include: {
    line_items: true, // Gets complete data from primary
  },
});

6. Use Indexes Effectively

Check if your queries are using indexes:
-- Explain query plan
EXPLAIN ANALYZE
SELECT * FROM documents
WHERE org_id = 'xxx' AND created_at > '2024-01-01';

-- Add index if missing (in migration)
CREATE INDEX idx_documents_org_created
ON documents(org_id, created_at DESC);
In Prisma Schema:
model documents {
  id            String   @id @default(cuid())
  org_id        String
  created_at    DateTime @default(now())

  @@index([org_id, created_at(sort: Desc)])
  @@index([org_id, status_id])
  @@index([org_id, object_type_id, created_at(sort: Desc)])
}

7. Query Logging and Analysis

Enable query logging to identify slow queries:
// packages/database/src/client.ts
const prisma = new PrismaClient({
  log: [
    { level: "query", emit: "event" },
    { level: "error", emit: "stdout" },
    { level: "warn", emit: "stdout" },
  ],
});

// Log slow queries (> 100ms)
prisma.$on("query", (e) => {
  if (e.duration > 100) {
    console.warn("Slow query detected:", {
      query: e.query,
      duration: `${e.duration}ms`,
      params: e.params,
    });
  }
});

8. Prisma Accelerate (Optional - Paid Service)

Prisma Accelerate provides:
  • Global connection pooling (solves serverless connection issues)
  • Query caching at edge (Redis-backed)
  • Automatic query optimization
// If using Accelerate
import { PrismaClient } from "@prisma/client/edge";
import { withAccelerate } from "@prisma/extension-accelerate";

const prisma = new PrismaClient().$extends(withAccelerate());

// Cached queries
const users = await prisma.user.findMany({
  cacheStrategy: {
    ttl: 60, // Cache for 60 seconds
    swr: 10, // Stale-while-revalidate
  },
});
When to consider:
  • Running on serverless (Fly.io) with connection exhaustion
  • Global user base (edge caching beneficial)
  • Repetitive expensive queries
Cost: Starts at $29/mo (may not be worth it yet for CONA)

9. Selective Field Loading for Large JSON/Text

// ❌ BAD: Loads large JSON fields unnecessarily
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
  // Loads ALL fields including large custom_properties JSON
});

// ✅ GOOD: Exclude large fields when not needed
const documents = await prisma.documents.findMany({
  where: { org_id: organizationId },
  select: {
    id: true,
    number: true,
    total_amount: true,
    // Exclude: custom_properties, default_memo, logo_url
  },
});

// ✅ When you DO need large fields, only fetch for specific records
const fullDocument = await prisma.documents.findUnique({
  where: { id: documentId },
  // Now fetch everything including large fields
});

10. Parallel Queries for Independent Operations

// ❌ BAD: Sequential queries (total time = sum of all)
const documents = await prisma.documents.findMany({ where: { ... } });
const customers = await prisma.entities.findMany({ where: { ... } });
const accounts = await prisma.general_ledger_accounts.findMany({ where: { ... } });

// ✅ GOOD: Parallel queries (total time = slowest query)
const [documents, customers, accounts] = await Promise.all([
  prisma.documents.findMany({ where: { ... } }),
  prisma.entities.findMany({ where: { ... } }),
  prisma.general_ledger_accounts.findMany({ where: { ... } }),
]);
But be cautious with direct connections:
// Don't exhaust the limited direct connection pool
// Max 5 concurrent queries with prismaDirect
const [doc1, doc2, doc3] = await Promise.all([
  prismaDirect.documents.findUnique({ where: { id: id1 } }),
  prismaDirect.documents.findUnique({ where: { id: id2 } }),
  prismaDirect.documents.findUnique({ where: { id: id3 } }),
]); // Uses 3 of 5 connections temporarily
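When the number of ids is not known in advance, a plain `Promise.all` can exceed the direct pool. One option is a small concurrency limiter that never runs more than a fixed number of queries at once. This is a generic sketch; `mapWithConcurrency` is a hypothetical helper, not part of Prisma, and the limit you pass should stay below the pool's `connection_limit`.

```typescript
// Maps items through fn with at most `limit` calls in flight; results keep input order.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;

  // Each worker pulls the next unclaimed index until none are left.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index] as T, index);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);
  return results;
}
```

With a direct pool of 5, a call such as `mapWithConcurrency(ids, 3, (id) => prismaDirect.documents.findUnique({ where: { id } }))` would leave headroom for other activities sharing the same client.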

🎯 Performance Optimization Checklist

Apply these in order of impact:
  • Connection pooling (✅ Already done)
  • Use select to fetch only needed fields (50-70% less data transferred for large result sets)
  • Batch operations with createMany/updateMany (10-100x faster)
  • Add database indexes for common query patterns
  • Use transactions for multi-step operations
  • Cursor pagination instead of offset for large datasets
  • Avoid N+1 queries with proper include
  • Parallel independent queries with Promise.all
  • Query logging to identify bottlenecks
  • Consider Prisma Accelerate for serverless at scale

🎓 Key Learnings

  1. Replica lag affects JOINs more than simple queries
    • Parent table syncs fast
    • Child tables (foreign keys) lag behind
    • Queries with include return incomplete data
  2. Silent failures are worse than loud failures
    • Workflow "succeeds" with incomplete data
    • No retries triggered
    • Discovered too late
  3. Direct connections solve read-after-write
    • Bypass pooler → No replica routing
    • 100% data consistency
    • Acceptable trade-off for critical operations
  4. Connection pooling is not the root cause
    • Both Prisma and Supabase pooling work fine
    • Issue was replica routing, not pooling
  5. Performance is multi-faceted
    • Connection management (pooling)
    • Query optimization (select, batch, indexes)
    • Application architecture (parallel queries, caching)
    • Right tool for the job (direct vs pooled connections)

🚀 Deployment Checklist

  • Add DIRECT_URL to staging environment
  • Build and deploy @cona/database package
  • Build and deploy @cona/core package
  • Build and deploy @cona/temporal-workflows package
  • Deploy temporal workers
  • Monitor for 24-48 hours using SQL queries
  • Verify no documents with missing GL entries
  • Add DIRECT_URL to production environment
  • Deploy to production
  • Monitor production for 48 hours

Status: ✅ IMPLEMENTED
Impact: Critical - Fixes 100% of missing accounting impacts
Last Updated: 2025-01-02