Accounting Job Queue System

The accounting job queue provides a durable, PostgreSQL-backed queue for processing accounting impacts. Jobs are persisted in the database, leased atomically using FOR UPDATE SKIP LOCKED, and processed by dedicated Temporal workers.

Architecture

Components

  1. Queue Table (accounting_work_queue): PostgreSQL table storing job state
  2. Queue Functions (@cona/core): Functions to enqueue and manage jobs
  3. Temporal Activities: Fetch and batch process jobs
  4. Temporal Workflow: Continuous polling loop that processes jobs
  5. Temporal Worker: Dedicated worker pool for the queue

Job States

  • pending: Job is ready to be processed
  • leased: Job has been claimed by a worker
  • completed: Job successfully processed
  • failed: Job failed after max retries
  • cancelled: Job cancelled (e.g., document deleted)
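
The states above can be read as a small state machine. The sketch below is illustrative only: `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names, and the transitions are inferred from the behavior described in this document (leasing, retries, stale-lease recovery, manual reset), not taken from @cona/core. Re-enqueueing a finished job is not modeled.

```typescript
// The five job states listed above.
type JobState = "pending" | "leased" | "completed" | "failed" | "cancelled";

// Assumed transitions, inferred from this document rather than from the real code.
const ALLOWED_TRANSITIONS: Record<JobState, JobState[]> = {
  pending: ["leased"], // a worker claims the job
  leased: [
    "completed", // processed successfully
    "pending", // rescheduled for a retry
    "failed", // retries exhausted
    "cancelled", // e.g. document deleted
    "leased", // stale lease picked up again by another fetch
  ],
  completed: [],
  failed: ["pending"], // manual reset after investigation
  cancelled: [],
};

// Returns true when moving a job from `from` to `to` is allowed by the map above.
function canTransition(from: JobState, to: JobState): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

For example, `canTransition("failed", "pending")` is true under these assumptions, while `canTransition("completed", "leased")` is false.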

Usage

Enqueuing Jobs

import { enqueueAccountingJob } from "@cona/core";

const result = await enqueueAccountingJob({
  organizationId: "org_123",
  documentId: "doc_456",
  versionHint: BigInt(1),
  readyAt: new Date(), // Optional, defaults to now
});

Document Readiness

Before enqueuing, documents must be marked as ready:
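// Run inside the same transaction (tx) that writes the document.
await tx.documents.update({
  where: { id: documentId },
  data: {
    accounting_ready_at: new Date(), // a null value means "not ready" to the queue worker
    accounting_ready_version: { increment: 1 }, // bump the version on every change
  },
});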

Retry Behavior

Failed jobs are automatically retried with exponential backoff. The delay before the next retry is 5 minutes × 3^(attempt − 1), capped at 6 hours:
  • Attempt 1: 5 minutes (5 × 3⁰)
  • Attempt 2: 15 minutes (5 × 3¹)
  • Attempt 3: 45 minutes (5 × 3²)
  • Attempt 4: 2 hours 15 minutes (5 × 3³)
  • Attempt 5: 6 hours (5 × 3⁴ = 6 hours 45 minutes, capped)
  • Attempt 6+: 6 hours (capped)
After 6 attempts, jobs are marked as failed and require manual intervention.
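
The schedule above can be reproduced with a few lines of arithmetic. This is a minimal sketch, not the actual @cona/core implementation: the constant and function names are hypothetical, and it assumes that the sixth failed attempt marks the job as failed rather than scheduling another retry.

```typescript
const RETRY_BASE_SECONDS = 300; // 5 minutes
const RETRY_FACTOR = 3; // each retry waits three times longer
const RETRY_CAP_SECONDS = 6 * 60 * 60; // never wait longer than 6 hours
const MAX_ATTEMPTS = 6; // assumed: the 6th failure is final

// Delay before the next retry, given the number of the attempt that just failed.
function retryDelaySeconds(attempt: number): number {
  const uncapped = RETRY_BASE_SECONDS * Math.pow(RETRY_FACTOR, attempt - 1);
  return Math.min(uncapped, RETRY_CAP_SECONDS);
}

type FailureOutcome =
  | { state: "pending"; delaySeconds: number }
  | { state: "failed" };

// Decide what happens to a job after its `attempt`-th failure.
function outcomeAfterFailure(attempt: number): FailureOutcome {
  if (attempt >= MAX_ATTEMPTS) {
    return { state: "failed" };
  }
  return { state: "pending", delaySeconds: retryDelaySeconds(attempt) };
}
```

Under these assumptions, `retryDelaySeconds(4)` yields 8100 seconds (2 hours 15 minutes) and `retryDelaySeconds(5)` is clamped from 24300 to 21600 seconds (6 hours).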

Performance

Batch Processing

  • Batch size: 50 jobs per poll
  • Concurrency: 3 jobs processed in parallel
  • Throughput: ~22 jobs/minute average, ~50 jobs/minute peak

Adaptive Polling

The queue worker adapts its polling interval to queue activity:
  • Jobs found: Poll again immediately (no sleep, minimal latency)
  • No jobs found: Sleep 10 seconds before the next poll (avoids database load)
This lets the queue drain quickly when active while keeping database load low when idle.
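
The polling rule reduces to a single decision per poll. The following sketch uses hypothetical helper names (`nextSleepSeconds`, `totalSleepSeconds`) and does not reproduce the real workflow loop; it only shows how the sleep time depends on whether the last fetch returned jobs.

```typescript
// Sleep time after a poll: none while jobs keep arriving, the idle interval otherwise.
function nextSleepSeconds(jobsFound: number, pollIntervalSeconds: number = 10): number {
  return jobsFound > 0 ? 0 : pollIntervalSeconds;
}

// Total sleep accumulated over a sequence of polls, given the jobs fetched by each poll.
function totalSleepSeconds(jobsPerPoll: number[], pollIntervalSeconds: number = 10): number {
  let total = 0;
  for (let i = 0; i < jobsPerPoll.length; i++) {
    total += nextSleepSeconds(jobsPerPoll[i], pollIntervalSeconds);
  }
  return total;
}
```

For a burst that returns 50, 50 and 12 jobs followed by two empty polls, `totalSleepSeconds([50, 50, 12, 0, 0])` gives 20 seconds: the worker only sleeps once the queue is empty.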

Monitoring

Key Metrics

  • Pending Jobs: Count of jobs in pending state
  • Processing Time: Time from ready_at to completion
  • Failure Rate: Percentage of jobs reaching failed state
  • Stale Jobs: Jobs in leased state for more than 15 minutes

SQL Queries

-- Pending jobs count
SELECT COUNT(*) FROM accounting_work_queue WHERE state = 'pending';

-- Failed jobs requiring attention
SELECT * FROM accounting_work_queue
WHERE state = 'failed'
ORDER BY updated_at DESC;

-- Stale leased jobs
SELECT * FROM accounting_work_queue
WHERE state = 'leased'
  AND last_attempted_at < NOW() - INTERVAL '15 minutes';

-- Processing stats
SELECT state, COUNT(*) FROM accounting_work_queue GROUP BY state;
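
The output of the per-state count query can be turned into the key metrics with a small helper. This is a sketch under stated assumptions: `summarizeQueue` and its types are hypothetical, the counts are assumed to be already converted to numbers, and the failure rate is assumed to be measured over jobs that reached completed or failed.

```typescript
type JobState = "pending" | "leased" | "completed" | "failed" | "cancelled";

// One row of: SELECT state, COUNT(*) FROM accounting_work_queue GROUP BY state;
interface StateCount {
  state: JobState;
  count: number;
}

interface QueueSummary {
  pending: number;
  leased: number;
  failureRatePercent: number;
}

function summarizeQueue(rows: StateCount[]): QueueSummary {
  // States missing from the query result simply count as zero.
  const counts: Record<JobState, number> = {
    pending: 0,
    leased: 0,
    completed: 0,
    failed: 0,
    cancelled: 0,
  };
  for (let i = 0; i < rows.length; i++) {
    counts[rows[i].state] += rows[i].count;
  }
  // Assumed definition: failed jobs as a share of completed + failed jobs.
  const finished = counts.completed + counts.failed;
  const failureRatePercent = finished === 0 ? 0 : (counts.failed * 100) / finished;
  return { pending: counts.pending, leased: counts.leased, failureRatePercent };
}
```

With 90 completed and 10 failed jobs, the helper reports a failure rate of 10 percent.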

Configuration

Environment Variables

  • ACCOUNTING_QUEUE_BATCH_SIZE: Number of jobs to fetch per poll (default: 50)
  • ACCOUNTING_QUEUE_POLL_INTERVAL_SECONDS: Seconds between polls when idle (default: 10)
  • ACCOUNTING_QUEUE_RETRY_BASE_SECONDS: Base retry delay in seconds (default: 300)
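
One way to read these variables with their documented defaults is sketched below. `loadQueueConfig` and `readPositiveInt` are hypothetical helpers, not part of the codebase described here, and rejecting invalid values with an error is a design assumption.

```typescript
interface QueueConfig {
  batchSize: number;
  pollIntervalSeconds: number;
  retryBaseSeconds: number;
}

type Env = { [key: string]: string | undefined };

// Parse a positive integer, using the fallback when the variable is unset or empty.
function readPositiveInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }
  const parsed = Number(raw);
  if (!isFinite(parsed) || Math.floor(parsed) !== parsed || parsed <= 0) {
    throw new Error(name + " must be a positive integer, got: " + raw);
  }
  return parsed;
}

// Defaults mirror the values documented above.
function loadQueueConfig(env: Env): QueueConfig {
  return {
    batchSize: readPositiveInt(env, "ACCOUNTING_QUEUE_BATCH_SIZE", 50),
    pollIntervalSeconds: readPositiveInt(env, "ACCOUNTING_QUEUE_POLL_INTERVAL_SECONDS", 10),
    retryBaseSeconds: readPositiveInt(env, "ACCOUNTING_QUEUE_RETRY_BASE_SECONDS", 300),
  };
}
```

In a Node.js process this would typically be called as `loadQueueConfig(process.env)`. Failing fast on a malformed value avoids silently running with an unintended batch size.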

Worker Configuration

Located in apps/temporal-workers/src/worker.ts:
const accountingQueueWorker = await Worker.create({
  taskQueue: ACCOUNTING_QUEUE_TASK_QUEUE,
  maxConcurrentActivityTaskExecutions: 3,
  maxConcurrentWorkflowTaskExecutions: 5,
  // ...
});

Error Handling

Transient Errors

Jobs with transient errors (network issues, temporary database unavailability) are automatically retried with exponential backoff.

Document State Errors

  • Document deleted: Job cancelled immediately
  • Document not ready (accounting_ready_at is null): Job rescheduled with a 30-second delay
  • Version mismatch: Job rescheduled with the updated version after a 5-second delay
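
The three cases above can be expressed as one decision function. The sketch is illustrative: `decideForDocument` and its types are hypothetical, a version mismatch is assumed to mean that the job's version hint differs from the document's accounting_ready_version, and versions are plain numbers here although the real column is a BigInt.

```typescript
interface DocumentSnapshot {
  accountingReadyAt: Date | null;
  accountingReadyVersion: number; // BigInt in the real schema
}

interface QueueJob {
  versionHint: number;
}

type Decision =
  | { action: "cancel" }
  | { action: "reschedule"; delaySeconds: number; versionHint: number }
  | { action: "process" };

// `doc` is null when the document has been deleted.
function decideForDocument(doc: DocumentSnapshot | null, job: QueueJob): Decision {
  if (doc === null) {
    return { action: "cancel" };
  }
  if (doc.accountingReadyAt === null) {
    // Not ready yet: try again in 30 seconds with the same version hint.
    return { action: "reschedule", delaySeconds: 30, versionHint: job.versionHint };
  }
  if (doc.accountingReadyVersion !== job.versionHint) {
    // The document changed since the job was enqueued: retry soon with the new version.
    return { action: "reschedule", delaySeconds: 5, versionHint: doc.accountingReadyVersion };
  }
  return { action: "process" };
}
```

Checking deletion first and readiness second means a mismatched version is only acted on for documents that can actually be processed.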

Permanent Errors

Jobs that fail 6 times are marked as failed. These require:
  1. Investigation of the last_error field
  2. Manual correction of the underlying issue
  3. Resetting the job to pending state:
import { markAccountingJobPending } from "@cona/core";

await markAccountingJobPending({
  organizationId: "org_123",
  documentId: "doc_456",
  versionHint: BigInt(currentVersion),
  delaySeconds: 0,
});

Stale Job Recovery

Jobs stuck in the leased state for more than 15 minutes are automatically recovered and included in the next fetch, so a job held by a crashed worker is picked up again without manual intervention. The fetch query includes:
WHERE (
  (state = 'pending' AND ready_at <= NOW())
  OR (state = 'leased' AND last_attempted_at < NOW() - INTERVAL '15 minutes')
)
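
To show how this condition could combine with FOR UPDATE SKIP LOCKED, the sketch below embeds one possible lease statement as a string and mirrors the eligibility rule in plain TypeScript. It is an assumption, not the actual query: the ordering, the `$1` batch-size parameter, the run_count increment at lease time, and the names `LEASE_JOBS_SQL` and `isFetchable` are all illustrative.

```typescript
// Assumed lease statement: select eligible rows, skip rows locked by other
// workers, and mark the selected rows as leased in the same statement.
const LEASE_JOBS_SQL = `
WITH candidates AS (
  SELECT id
  FROM accounting_work_queue
  WHERE (
    (state = 'pending' AND ready_at <= NOW())
    OR (state = 'leased' AND last_attempted_at < NOW() - INTERVAL '15 minutes')
  )
  ORDER BY ready_at
  LIMIT $1
  FOR UPDATE SKIP LOCKED
)
UPDATE accounting_work_queue AS q
SET state = 'leased',
    last_attempted_at = NOW(),
    run_count = q.run_count + 1
FROM candidates
WHERE q.id = candidates.id
RETURNING q.*;
`;

const STALE_LEASE_MS = 15 * 60 * 1000;

interface QueueRow {
  state: "pending" | "leased" | "completed" | "failed" | "cancelled";
  readyAt: Date;
  lastAttemptedAt: Date | null;
}

// In-memory mirror of the WHERE clause, useful for unit tests and reasoning.
function isFetchable(row: QueueRow, now: Date): boolean {
  if (row.state === "pending") {
    return row.readyAt.getTime() <= now.getTime();
  }
  if (row.state === "leased") {
    // A NULL last_attempted_at never matches, as in SQL.
    return (
      row.lastAttemptedAt !== null &&
      now.getTime() - row.lastAttemptedAt.getTime() > STALE_LEASE_MS
    );
  }
  return false;
}
```

Because locked rows are skipped instead of waited on, two workers running the statement at the same time receive disjoint batches.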

Troubleshooting

Jobs Not Processing

  1. Check that the worker is running: ps aux | grep temporal-worker
  2. Check the workflow status in the Temporal UI at https://cloud.temporal.io
  3. Check the pending jobs count in the database

High Failure Rate

  1. Review last_error field in failed jobs
  2. Check database connection pool availability
  3. Check for schema mismatches or missing object types
  4. Verify accounting rules are configured

Growing Queue

If the queue is growing faster than it can be processed:
  1. Increase ACCOUNTING_QUEUE_BATCH_SIZE (default: 50)
  2. Scale worker instances horizontally
  3. Check for slow createFinancialEntries calls
  4. Verify database performance

Starting the Queue Worker

Start the workflow via Temporal CLI or UI:
temporal workflow start \
  --task-queue accounting-queue \
  --type syncAccountingQueueWorkflow \
  --workflow-id accounting-queue-worker-production \
  --input '{"batchSize": 50, "pollIntervalSeconds": 10}'

Implementation Details

Database Schema

model accounting_work_queue {
  id                String               @id @default(cuid())
  created_at        DateTime             @default(now())
  updated_at        DateTime             @updatedAt
  org_id            String
  document_id       String
  version_hint      BigInt
  ready_at          DateTime             @default(now())
  state             ACCOUNTING_JOB_STATE @default(pending)
  run_count         Int                  @default(0)
  last_attempted_at DateTime?
  last_error        String?

  organization organization @relation(fields: [org_id], references: [id])
  document     documents    @relation(fields: [document_id], references: [id])

  @@unique([org_id, document_id])
  @@index([state, ready_at])
  @@index([org_id])
}
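
The unique constraint on (org_id, document_id) means the table holds at most one row per document. The in-memory sketch below illustrates what that implies for enqueueing. It is an assumption, not the behavior of enqueueAccountingJob: the `enqueue` helper is hypothetical, and resetting an existing row to pending with the new version hint is a guess at upsert semantics.

```typescript
type JobState = "pending" | "leased" | "completed" | "failed" | "cancelled";

interface QueueRow {
  orgId: string;
  documentId: string;
  versionHint: number; // BigInt in the real schema; number keeps the sketch simple
  readyAt: Date;
  state: JobState;
}

// In-memory stand-in for the table, keyed like @@unique([org_id, document_id]).
const table: { [key: string]: QueueRow } = {};

function enqueue(orgId: string, documentId: string, versionHint: number, readyAt: Date): QueueRow {
  const key = orgId + ":" + documentId;
  const existing = table[key];
  if (existing) {
    // Assumed upsert behavior: reuse the row and make it pending again.
    existing.versionHint = versionHint;
    existing.readyAt = readyAt;
    existing.state = "pending";
    return existing;
  }
  const row: QueueRow = { orgId, documentId, versionHint, readyAt, state: "pending" };
  table[key] = row;
  return row;
}
```

Enqueueing the same document twice therefore updates one row instead of creating a second one, which keeps the queue size bounded by the number of documents.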

Key Files

  • Schema: packages/database/prisma/schema.prisma
  • Core Queue: packages/core/src/accounting/queue/
  • Activities: packages/temporal-workflows/src/activities/accounting/
  • Workflows: packages/temporal-workflows/src/workflows/accounting/
  • Worker: apps/temporal-workers/src/worker.ts

Migration from Old System

The old system used flag-based locking on accounting_impact_created_at. The new queue system:
  • ✅ Survives worker crashes (durable in PostgreSQL)
  • ✅ Supports safe concurrent processing (FOR UPDATE SKIP LOCKED)
  • ✅ Provides observability (query queue state)
  • ✅ Handles retries with exponential backoff
  • ✅ Decouples document creation from accounting processing
  • ✅ Enables horizontal scaling
The accounting_impact_created_at flag is still set for backwards compatibility but is no longer used for locking.