# Accounting Job Queue System

The accounting job queue provides a durable, PostgreSQL-backed queue for processing accounting impacts. Jobs are persisted in the database, leased atomically using `FOR UPDATE SKIP LOCKED`, and processed by dedicated Temporal workers.
## Architecture

### Components
- Queue Table (`accounting_work_queue`): PostgreSQL table storing job state
- Queue Functions (`@cona/core`): Functions to enqueue and manage jobs
- Temporal Activities: Fetch and batch-process jobs
- Temporal Workflow: Continuous polling loop that processes jobs
- Temporal Worker: Dedicated worker pool for the queue
### Job States

- `pending`: Job is ready to be processed
- `leased`: Job has been claimed by a worker
- `completed`: Job successfully processed
- `failed`: Job failed after max retries
- `cancelled`: Job cancelled (e.g., document deleted)
## Usage

### Enqueuing Jobs
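The real enqueue helpers live in `@cona/core` (`packages/core/src/accounting/queue/`) and their exact signatures aren't reproduced here. The self-contained sketch below only illustrates the shape of the job record that enqueuing creates; all field names are assumptions:

```typescript
// Sketch only: field names are assumptions, not the actual @cona/core API.
type JobStatus = "pending" | "leased" | "completed" | "failed" | "cancelled";

interface AccountingJob {
  documentId: string;
  documentVersion: number; // compared against the document to detect version mismatches
  status: JobStatus;
  attempts: number;        // incremented on each retry
  readyAt: Date;           // earliest time the job may be leased
}

function buildAccountingJob(documentId: string, documentVersion: number): AccountingJob {
  return {
    documentId,
    documentVersion,
    status: "pending", // new jobs start out pending
    attempts: 0,
    readyAt: new Date(),
  };
}
```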
### Document Readiness

Before enqueuing, documents must be marked as ready by setting `accounting_ready_at`.

### Retry Behavior

Failed jobs are automatically retried with exponential backoff:

- Attempt 1: 5 minutes
- Attempt 2: 15 minutes (5 × 3¹)
- Attempt 3: 45 minutes (5 × 3²)
- Attempt 4: 2 hours 15 minutes (5 × 3³)
- Attempt 5: 6 hours (5 × 3⁴, capped)
- Attempt 6+: 6 hours (capped)
After 6 failed attempts, jobs are marked `failed` and require manual intervention.
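The schedule above can be expressed as a small delay function. This is an illustrative reconstruction, not the actual implementation; the constants mirror the documented defaults:

```typescript
// Illustrative reconstruction of the retry schedule; not the actual implementation.
const RETRY_BASE_SECONDS = 300;   // ACCOUNTING_QUEUE_RETRY_BASE_SECONDS default (5 minutes)
const RETRY_CAP_SECONDS = 21600;  // 6-hour cap

function retryDelaySeconds(attempt: number): number {
  // attempt is 1-based: base * 3^(attempt - 1), capped at 6 hours
  return Math.min(RETRY_BASE_SECONDS * Math.pow(3, attempt - 1), RETRY_CAP_SECONDS);
}
```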
## Performance

### Batch Processing
- Batch size: 50 jobs per poll
- Concurrency: 3 jobs processed in parallel
- Throughput: ~22 jobs/minute average, ~50 jobs/minute peak
### Adaptive Polling

The queue worker uses intelligent polling:

- Jobs found: Immediately poll again (zero sleep, minimal latency)
- No jobs found: Sleep 10 seconds before next poll (avoid DB load)
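The polling decision can be sketched as follows (an assumed structure; the real loop runs inside the Temporal workflow):

```typescript
// Sketch of the adaptive polling decision; the real loop lives in the Temporal workflow.
const IDLE_SLEEP_MS = 10000; // ACCOUNTING_QUEUE_POLL_INTERVAL_SECONDS default (10 s)

function nextPollDelayMs(jobsFetched: number): number {
  // Jobs found: poll again immediately; no jobs: back off to avoid hammering the DB.
  return jobsFetched > 0 ? 0 : IDLE_SLEEP_MS;
}
```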
## Monitoring

### Key Metrics

- Pending Jobs: Count of jobs in `pending` state
- Processing Time: Time from `ready_at` to completion
- Failure Rate: Percentage of jobs reaching `failed` state
- Stale Jobs: Jobs in `leased` state > 15 minutes

### SQL Queries
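Hedged starting points for the metrics above; column names such as `completed_at` and `leased_at` are assumptions to adapt to the actual schema:

```sql
-- Pending jobs
SELECT count(*) FROM accounting_work_queue WHERE status = 'pending';

-- Average processing time (ready_at to completion; completed_at is assumed)
SELECT avg(completed_at - ready_at)
FROM accounting_work_queue
WHERE status = 'completed';

-- Failure rate
SELECT count(*) FILTER (WHERE status = 'failed')::float
       / NULLIF(count(*), 0) AS failure_rate
FROM accounting_work_queue;

-- Stale jobs (leased > 15 minutes; leased_at is assumed)
SELECT count(*)
FROM accounting_work_queue
WHERE status = 'leased'
  AND leased_at < now() - interval '15 minutes';
```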
## Configuration

### Environment Variables

- `ACCOUNTING_QUEUE_BATCH_SIZE`: Number of jobs to fetch per poll (default: 50)
- `ACCOUNTING_QUEUE_POLL_INTERVAL_SECONDS`: Seconds between polls when idle (default: 10)
- `ACCOUNTING_QUEUE_RETRY_BASE_SECONDS`: Base retry delay in seconds (default: 300)
### Worker Configuration

Located in `apps/temporal-workers/src/worker.ts`.
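A minimal sketch of what the worker setup might look like, assuming the standard `@temporalio/worker` API; the task-queue name and import paths are placeholders, not the actual values from `worker.ts`:

```typescript
// Sketch only: task-queue name and import paths are assumptions.
import { Worker } from "@temporalio/worker";
import * as activities from "./activities"; // hypothetical path

async function run() {
  const worker = await Worker.create({
    taskQueue: "accounting-queue",               // assumed task-queue name
    workflowsPath: require.resolve("./workflows"), // assumed workflows entry point
    activities,
  });
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
```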
## Error Handling

### Transient Errors

Jobs with transient errors (network issues, temporary database unavailability) are automatically retried with exponential backoff.

### Document State Errors
- Document deleted: Job cancelled immediately
- Document not ready (`accounting_ready_at` is null): Job rescheduled for 30 seconds
- Version mismatch: Job rescheduled with updated version (5-second delay)
### Permanent Errors

Jobs that fail 6 times are marked as `failed`. These require:

- Investigation of the `last_error` field
- Manual correction of the underlying issue
- Resetting the job to `pending` state
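A hedged example of the reset; `attempts` and the identifier column are assumed names, to be adapted to the actual schema:

```sql
-- Reset a failed job for reprocessing (column names are assumptions)
UPDATE accounting_work_queue
SET status = 'pending',
    attempts = 0,
    last_error = NULL
WHERE id = '<job-id>'
  AND status = 'failed';
```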
### Stale Job Recovery

Jobs stuck in the `leased` state for > 15 minutes are automatically recovered and included in the next fetch. This handles worker crashes gracefully.

The fetch query includes a stale-lease condition alongside the normal `pending` filter.
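A sketch of what such a lease query could look like (column names and ordering are assumptions; the real query lives in `packages/core/src/accounting/queue/`):

```sql
-- Lease a batch of jobs atomically; stale leases are reclaimed in the same query.
SELECT *
FROM accounting_work_queue
WHERE (status = 'pending' AND ready_at <= now())
   OR (status = 'leased' AND leased_at < now() - interval '15 minutes') -- stale recovery
ORDER BY ready_at
LIMIT 50                 -- ACCOUNTING_QUEUE_BATCH_SIZE
FOR UPDATE SKIP LOCKED;  -- safe concurrent leasing across workers
```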
## Troubleshooting

### Jobs Not Processing

- Check the worker is running: `ps aux | grep temporal-worker`
- Check the Temporal UI for workflow status at https://cloud.temporal.io
- Check the database for pending jobs count
### High Failure Rate

- Review the `last_error` field in failed jobs
- Check database connection pool availability
- Check for schema mismatches or missing object types
- Verify accounting rules are configured
### Growing Queue

If the queue is growing faster than it can be processed:

- Increase `ACCOUNTING_QUEUE_BATCH_SIZE` (current: 50)
- Scale worker instances horizontally
- Check for slow `createFinancialEntries` calls
- Verify database performance
## Starting the Queue Worker

Start the workflow via the Temporal CLI or UI.

## Implementation Details
### Database Schema
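The authoritative schema lives in `packages/database/prisma/schema.prisma`; the sketch below is a plausible reconstruction from the fields mentioned in this document, not the actual DDL:

```sql
-- Plausible reconstruction only; see schema.prisma for the real definition.
CREATE TABLE accounting_work_queue (
  id               uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  document_id      uuid NOT NULL,
  document_version integer NOT NULL,
  status           text NOT NULL DEFAULT 'pending', -- pending|leased|completed|failed|cancelled
  attempts         integer NOT NULL DEFAULT 0,
  ready_at         timestamptz NOT NULL DEFAULT now(),
  leased_at        timestamptz,
  completed_at     timestamptz,
  last_error       text
);
```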
### Key Files

- Schema: `packages/database/prisma/schema.prisma`
- Core Queue: `packages/core/src/accounting/queue/`
- Activities: `packages/temporal-workflows/src/activities/accounting/`
- Workflows: `packages/temporal-workflows/src/workflows/accounting/`
- Worker: `apps/temporal-workers/src/worker.ts`
## Migration from Old System

The old system used `accounting_impact_created_at` flag-based locking. The new queue system:
- ✅ Survives worker crashes (durable in PostgreSQL)
- ✅ Supports safe concurrent processing (`FOR UPDATE SKIP LOCKED`)
- ✅ Provides observability (query queue state)
- ✅ Handles retries with exponential backoff
- ✅ Decouples document creation from accounting processing
- ✅ Enables horizontal scaling
The `accounting_impact_created_at` flag is still set for backwards compatibility but is no longer used for locking.