# Accounting Job Queue System

The accounting job queue provides a durable, PostgreSQL-backed queue for processing accounting impacts. Jobs are persisted in the database, leased atomically using `FOR UPDATE SKIP LOCKED`, and processed by dedicated Temporal workers.
## Architecture

### Components

- **Queue Table** (`accounting_work_queue`): PostgreSQL table storing job state
- **Queue Functions** (`@cona/core`): Functions to enqueue and manage jobs
- **Temporal Activities**: Fetch and batch process jobs
- **Temporal Workflow**: Continuous polling loop that processes jobs
- **Temporal Worker**: Dedicated worker pool for the queue
### Job States

- `pending`: Job is ready to be processed
- `leased`: Job has been claimed by a worker
- `completed`: Job successfully processed
- `failed`: Job failed after max retries
- `cancelled`: Job cancelled (e.g., document deleted)
## Usage

### Enqueuing Jobs
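
This document doesn't pin down the exact API, so the following is a sketch; `enqueueAccountingJob` and its options are assumed names — see `packages/core/src/accounting/queue/` for the real functions.

```typescript
// Sketch only — enqueueAccountingJob and its options are assumed names;
// check packages/core/src/accounting/queue/ for the actual API.
import { enqueueAccountingJob } from '@cona/core';

// Inserts a `pending` row into accounting_work_queue for a document.
await enqueueAccountingJob({
  documentId: 'doc_123',  // document whose accounting impact to process
  documentVersion: 4,     // used for version-mismatch rescheduling
  readyAt: new Date(),    // job becomes eligible at this time
});
```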
### Document Readiness

Before enqueuing, documents must be marked as ready:
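
A sketch, assuming a hypothetical `markDocumentAccountingReady` helper that sets the `accounting_ready_at` timestamp checked under Error Handling below:

```typescript
// Sketch only — markDocumentAccountingReady is an assumed helper that sets
// accounting_ready_at. Jobs whose document has a null accounting_ready_at
// are rescheduled rather than processed (see Error Handling).
import { markDocumentAccountingReady } from '@cona/core';

await markDocumentAccountingReady('doc_123');
```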
### Retry Behavior

Failed jobs are automatically retried with exponential backoff:

- Attempt 1: 5 minutes
- Attempt 2: 15 minutes (5 × 3¹)
- Attempt 3: 45 minutes (5 × 3²)
- Attempt 4: 2 hours 15 minutes (5 × 3³)
- Attempt 5: 6 hours (5 × 3⁴ exceeds the cap)
- Attempt 6+: 6 hours (capped)

After 6 failed attempts, jobs are marked `failed` and require manual intervention.
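
The schedule follows delay = base × 3^(attempt − 1), capped at 6 hours, where the base comes from `ACCOUNTING_QUEUE_RETRY_BASE_SECONDS` (default 300 s). A minimal sketch of that calculation:

```typescript
// Backoff: base × 3^(attempt - 1) seconds, capped at 6 hours.
// The base defaults to 300 s (ACCOUNTING_QUEUE_RETRY_BASE_SECONDS).
const MAX_DELAY_SECONDS = 6 * 60 * 60;

function retryDelaySeconds(attempt: number, baseSeconds = 300): number {
  const delay = baseSeconds * Math.pow(3, attempt - 1);
  return Math.min(delay, MAX_DELAY_SECONDS);
}

// retryDelaySeconds(1) === 300    (5 minutes)
// retryDelaySeconds(4) === 8100   (2 h 15 min)
// retryDelaySeconds(5) === 21600  (capped: 5 × 3⁴ = 405 min > 6 h)
```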
## Performance

### Batch Processing

- Batch size: 50 jobs per poll
- Concurrency: 3 jobs processed in parallel
- Throughput: ~22 jobs/minute average, ~50 jobs/minute peak
### Adaptive Polling

The queue worker uses adaptive polling:

- Jobs found: Immediately poll again (zero sleep, minimal latency)
- No jobs found: Sleep 10 seconds before the next poll (avoids DB load)
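
A sketch of how this loop might look as a Temporal workflow, assuming a `processAccountingQueueBatch` activity that leases one batch and returns the number of jobs handled:

```typescript
import { proxyActivities, sleep } from '@temporalio/workflow';
import type * as activities from '../activities'; // assumed module path

const { processAccountingQueueBatch } = proxyActivities<typeof activities>({
  startToCloseTimeout: '5 minutes',
});

// Adaptive loop: poll again immediately while work exists, otherwise
// sleep for the idle interval (default 10 s) before re-polling.
export async function accountingQueueWorkflow(): Promise<void> {
  for (;;) {
    const processed = await processAccountingQueueBatch();
    if (processed === 0) {
      await sleep('10 seconds'); // idle: back off to avoid DB load
    }
    // jobs found: loop again with zero sleep
  }
}
```

A production loop like this would also periodically `continueAsNew` to keep the workflow history bounded.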
## Monitoring

### Key Metrics

- **Pending Jobs**: Count of jobs in `pending` state
- **Processing Time**: Time from `ready_at` to completion
- **Failure Rate**: Percentage of jobs reaching `failed` state
- **Stale Jobs**: Jobs in `leased` state > 15 minutes
### SQL Queries
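
Example queries for the metrics above. Column names (`status`, `ready_at`, `completed_at`, `leased_at`) are assumed from context; check `schema.prisma` for the authoritative names.

```sql
-- Pending jobs (column names assumed; see schema.prisma)
SELECT count(*) FROM accounting_work_queue WHERE status = 'pending';

-- Stale jobs: leased more than 15 minutes ago
SELECT count(*)
FROM accounting_work_queue
WHERE status = 'leased'
  AND leased_at < now() - interval '15 minutes';

-- Average processing time over the last day
SELECT avg(completed_at - ready_at)
FROM accounting_work_queue
WHERE status = 'completed'
  AND completed_at > now() - interval '1 day';
```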
## Configuration

### Environment Variables

- `ACCOUNTING_QUEUE_BATCH_SIZE`: Number of jobs to fetch per poll (default: 50)
- `ACCOUNTING_QUEUE_POLL_INTERVAL_SECONDS`: Seconds between polls when idle (default: 10)
- `ACCOUNTING_QUEUE_RETRY_BASE_SECONDS`: Base retry delay in seconds (default: 300)
### Worker Configuration

Located in `apps/temporal-workers/src/worker.ts`:
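
That file isn't reproduced here; the following is a sketch of what the registration looks like with the Temporal TypeScript SDK. The task queue name, import paths, and tuning values are assumptions.

```typescript
// Sketch — task queue name, paths, and tuning values are assumptions,
// not the actual contents of apps/temporal-workers/src/worker.ts.
import { Worker } from '@temporalio/worker';
import * as activities from '@cona/temporal-workflows/activities';

async function main() {
  const worker = await Worker.create({
    taskQueue: 'accounting-queue',                 // dedicated pool for the queue
    workflowsPath: require.resolve('./workflows'), // the polling workflow
    activities,
    maxConcurrentActivityTaskExecutions: 3,        // assumed tuning knob
  });
  await worker.run();
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```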
## Error Handling

### Transient Errors

Jobs with transient errors (network issues, temporary database unavailability) are automatically retried with exponential backoff.

### Document State Errors
- Document deleted: Job cancelled immediately
- Document not ready (`accounting_ready_at` is null): Job rescheduled for 30 seconds
- Version mismatch: Job rescheduled with updated version (5-second delay)
### Permanent Errors

Jobs that fail 6 times are marked as `failed`. These require:

- Investigation of the `last_error` field
- Manual correction of the underlying issue
- Resetting the job to `pending` state, as shown below:
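
For example (column names assumed; verify against the Prisma schema):

```sql
-- Reset a failed job so the worker picks it up again.
-- Column names are assumed; verify against schema.prisma.
UPDATE accounting_work_queue
SET status = 'pending',
    attempts = 0,
    last_error = NULL,
    ready_at = now()
WHERE id = '<job-id>'
  AND status = 'failed';
```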
### Stale Job Recovery

Jobs stuck in `leased` state for > 15 minutes are automatically recovered and included in the next fetch. This handles worker crashes gracefully.

The fetch query includes:
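
A sketch of that query's shape, with column names assumed — a single statement that leases due `pending` jobs and reclaims stale leases in the same pass:

```sql
-- Sketch of the fetch/lease query shape (column names assumed).
-- Picks up both pending jobs that are due and leases older than 15 minutes.
UPDATE accounting_work_queue
SET status = 'leased', leased_at = now()
WHERE id IN (
  SELECT id
  FROM accounting_work_queue
  WHERE (status = 'pending' AND ready_at <= now())
     OR (status = 'leased' AND leased_at < now() - interval '15 minutes')
  ORDER BY ready_at
  LIMIT 50                 -- ACCOUNTING_QUEUE_BATCH_SIZE
  FOR UPDATE SKIP LOCKED   -- concurrent workers skip locked rows
)
RETURNING *;
```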
## Troubleshooting

### Jobs Not Processing

- Check the worker is running: `ps aux | grep temporal-worker`
- Check the Temporal UI for workflow status at https://cloud.temporal.io
- Check the database for the pending jobs count
High Failure Rate
- Review
last_error
field in failed jobs - Check database connection pool availability
- Check for schema mismatches or missing object types
- Verify accounting rules are configured
### Growing Queue

If the queue is growing faster than it can be processed:

- Increase `ACCOUNTING_QUEUE_BATCH_SIZE` (current: 50)
- Scale worker instances horizontally
- Check for slow `createFinancialEntries` calls
- Verify database performance
## Starting the Queue Worker

Start the workflow via the Temporal CLI or UI:
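
For example, with the Temporal CLI (the workflow type, task queue, and workflow ID below are placeholders; use the values registered in the worker):

```bash
# Names below are illustrative — substitute the actual workflow type,
# task queue, and a stable workflow ID for this deployment.
temporal workflow start \
  --type accountingQueueWorkflow \
  --task-queue accounting-queue \
  --workflow-id accounting-queue-poller
```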
## Implementation Details

### Database Schema
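
The authoritative definition lives in `packages/database/prisma/schema.prisma`. As an illustration, here is an SQL equivalent reconstructed from the fields this document references; everything beyond those fields is an assumption.

```sql
-- Illustrative only — reconstructed from fields mentioned in this doc
-- (status, attempts, ready_at, leased_at, last_error, document version).
-- The real definition is the Prisma schema.
CREATE TABLE accounting_work_queue (
  id               uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  document_id      uuid NOT NULL,
  document_version integer NOT NULL,
  status           text NOT NULL DEFAULT 'pending',  -- pending | leased | completed | failed | cancelled
  attempts         integer NOT NULL DEFAULT 0,
  ready_at         timestamptz NOT NULL,
  leased_at        timestamptz,
  completed_at     timestamptz,
  last_error       text,
  created_at       timestamptz NOT NULL DEFAULT now()
);

-- Partial index to keep the hot fetch path fast (assumption).
CREATE INDEX accounting_work_queue_pending_idx
  ON accounting_work_queue (ready_at)
  WHERE status IN ('pending', 'leased');
```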
### Key Files

- Schema: `packages/database/prisma/schema.prisma`
- Core Queue: `packages/core/src/accounting/queue/`
- Activities: `packages/temporal-workflows/src/activities/accounting/`
- Workflows: `packages/temporal-workflows/src/workflows/accounting/`
- Worker: `apps/temporal-workers/src/worker.ts`
## Migration from Old System

The old system used `accounting_impact_created_at` flag-based locking. The new queue system:

- ✅ Survives worker crashes (durable in PostgreSQL)
- ✅ Supports safe concurrent processing (`FOR UPDATE SKIP LOCKED`)
- ✅ Provides observability (query queue state)
- ✅ Handles retries with exponential backoff
- ✅ Decouples document creation from accounting processing
- ✅ Enables horizontal scaling

The `accounting_impact_created_at` flag is still set for backwards compatibility but is no longer used for locking.