Executive Summary

User-triggered accounting impact recreations (clicking “Recreate” in the UI) are currently stuck behind bulk sync backlogs in the global FIFO accounting queue. A user who triggers a single-document recreation may wait several minutes before seeing any effect because hundreds of sync-originated jobs are ahead of them. This RFC proposes two targeted fixes:
  1. Priority column on accounting_work_queue: user-triggered jobs get high priority and are fetched before normal priority sync jobs.
  2. Direct fast-path: for small recreations (≤ 10 documents), skip the queue entirely and process the accounting impact directly within the recreation workflow, which runs on a dedicated higher-capacity worker.
These changes are fully backward-compatible (no breaking changes, no data migration) and can ship in a single PR. Not in scope: migrating to native Temporal task queues (tracked in CONA-862).

Problem

Current Architecture

UI click "Recreate"
  → Server action
  → Temporal recreateAccountingImpactViaQueueWorkflow
  → prepareAndEnqueueAccountingRecreationActivity
      (soft-delete GL entries, bump accounting_ready_version)
  → batchEnqueueAccountingJobs
      → INSERT into accounting_work_queue (ready_at = now)

Separately, singleton poller (syncAccountingQueueWorkflow):
  → fetchAccountingJobsActivity
      SELECT ... ORDER BY ready_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED
  → batchProcessAccountingJobsActivity
      (process 6 jobs concurrently)
  → poll again immediately if work exists, else sleep 10s

Why Users Wait

The queue is globally ordered by ready_at ASC. A bulk sync (e.g. a Shopify order sync) can enqueue hundreds of jobs with ready_at ≈ now, all timestamped moments before the user's recreation job, so the user's job goes to the back of the line. With 6 concurrent slots per batch and each GL creation taking 2-10 seconds (and up to 3 minutes for complex deferred revenue documents), a 500-job backlog means the user waits roughly 3 to 14 minutes (500 ÷ 6 batches of 2-10 s each), and far longer if complex documents dominate the backlog.
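The arithmetic behind that range, using only the figures above (purely illustrative, not production code):

```typescript
// Illustrative wait-time estimate from the figures in this section.
const backlogJobs = 500; // sync-originated jobs ahead of the user
const concurrentSlots = 6; // jobs processed concurrently per batch
const secsPerJob = [2, 10]; // typical GL creation time range, in seconds

// The backlog drains in waves of 6; each wave takes roughly one job-duration.
const waitMinutes = secsPerJob.map(
  (s) => ((backlogJobs / concurrentSlots) * s) / 60,
);
// ≈ 2.8 minutes at 2 s/job, ≈ 13.9 minutes at 10 s/job
```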

Solution

Fix 1: Priority Column

Add an integer priority column to accounting_work_queue. The fetch query orders by priority first, so high-priority jobs always leapfrog the normal backlog.

Schema Change

model accounting_work_queue {
  // ... existing fields ...
  priority   Int  @default(1)  // 1 = normal, 2 = high (user-triggered)

  @@index([state, priority, ready_at])  // replaces @@index([state, ready_at])
  @@index([org_id])
}
Priority levels:

| Value | Meaning | Set by |
|---|---|---|
| 1 | Normal — default for all automated/sync work | All existing callers |
| 2 | High — user-triggered recreations | prepareAndEnqueueAccountingRecreationActivity |
Integer is preferred over an enum because:
  • No ALTER TYPE migration needed to add levels later (e.g. a 3 = urgent tier in future)
  • ORDER BY priority DESC is clean and obvious
  • Existing Int columns in the table (run_count, version_hint) follow the same pattern
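The generated migration would look roughly like this (a sketch: index names are Prisma's choice and shown here as assumptions):

```sql
-- Hypothetical migration sketch; Prisma generates the real one.
ALTER TABLE "accounting_work_queue"
  ADD COLUMN "priority" INTEGER NOT NULL DEFAULT 1;

-- Replace the old (state, ready_at) index with the priority-aware one.
DROP INDEX "accounting_work_queue_state_ready_at_idx";
CREATE INDEX "accounting_work_queue_state_priority_ready_at_idx"
  ON "accounting_work_queue" ("state", "priority", "ready_at");
```

Because the column has a default, the ALTER is metadata-only on modern Postgres and requires no backfill.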

Fetch Query Change

In fetch-accounting-jobs-activity.ts, change the ORDER BY clause:
-- Before
ORDER BY ready_at ASC

-- After
ORDER BY priority DESC, ready_at ASC
This is the only change needed to the hot path. All existing jobs default to normal priority — no data migration required.
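In context, the full fetch would read roughly as follows (the ORDER BY, LIMIT, and locking come from this doc; the WHERE clause and state value are assumptions about the existing query):

```sql
-- Sketch of the updated fetch query; WHERE clause details assumed.
SELECT *
FROM accounting_work_queue
WHERE state = 'pending'
  AND ready_at <= now()
ORDER BY priority DESC, ready_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED;
```

SKIP LOCKED keeps concurrent fetchers from blocking on each other's leased rows, exactly as today; only the sort order changes.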

Where to Set Priority

Only user-triggered recreations should be 2. Everything else stays at the default 1:
| Caller | Priority |
|---|---|
| prepareAndEnqueueAccountingRecreationActivity (UI recreation) | 2 |
| enqueueAccountingJobsActivity (sync chunk processor) | 1 (default) |
| enqueueAccountingJob in finalizeInvoice | 1 (default) |
| enqueueAccountingJob in create-payment | 1 (default) |
The batchEnqueueAccountingJobs and enqueueAccountingJob functions accept an optional priority param (defaults to 1). Only the recreation activity passes priority: 2.
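A minimal sketch of the optional param, assuming row field names (the real functions also set state and run the existing insert path):

```typescript
// Hypothetical sketch of the enqueue change; field names are assumptions.
interface AccountingJobInput {
  orgId: string;
  documentId: string;
  versionHint: number;
}

// priority defaults to 1 (normal); only the recreation activity passes 2.
function buildQueueRows(jobs: AccountingJobInput[], priority: number = 1) {
  const readyAt = new Date();
  return jobs.map((job) => ({
    org_id: job.orgId,
    document_id: job.documentId,
    version_hint: job.versionHint,
    priority,
    ready_at: readyAt,
  }));
}

// Existing callers are unchanged: buildQueueRows(jobs) → priority 1.
// The recreation activity opts in:  buildQueueRows(jobs, 2) → priority 2.
```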

Fix 2: Direct Fast-Path for Small Recreations

For recreations of ≤ 10 documents, skip the queue entirely. The recreateAccountingImpactViaQueueWorkflow already runs on the recreate-accounting-impact-batch worker which has higher capacity (10 concurrent activities, 20/sec) than the queue worker (6 concurrent, 6/sec).

Flow Comparison

Large batch (> 10 docs):           Small batch (≤ 10 docs):
  Prepare (delete GL, bump ver)      Prepare (delete GL, bump ver)
    ↓                                  ↓
  Enqueue with priority=high         processAccountingImpactDirectActivity
    ↓                                  ↓  runs on recreate worker directly
  Wait for singleton poller          Done in seconds

  Process (may still wait for
  other high-priority jobs)

New Activity

// packages/temporal-workflows/src/activities/accounting/process-accounting-impact-direct-activity.ts

export async function processAccountingImpactDirectActivity({
  jobs,               // { orgId, documentId, versionHint, documentType }[]
  concurrencyLimit,
}: ProcessAccountingImpactDirectParams): Promise<ProcessAccountingImpactDirectResult> {
  // Same pre-resolution of system actors and status IDs as batchProcessAccountingJobsActivity
  // Same per-job transaction logic (version check, createAccountingImpact, status update)
  // Difference: no queue row to lease/delete — jobs are passed directly as params
}
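The concurrencyLimit param can be honored with a small worker pool. This standalone sketch (no queue, no Temporal; helper name is hypothetical) shows the intended shape:

```typescript
// Minimal concurrency-limited map, standalone for illustration.
async function processWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let nextIndex = 0;

  // Each worker claims the next unprocessed index until the list drains.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await fn(items[i]);
    }
  }

  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, () => worker()),
  );
  return results;
}
```

The `nextIndex++` claim is safe without locking because JavaScript's event loop is single-threaded; only the awaited `fn` calls overlap.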

Workflow Branching

// In recreateAccountingImpactViaQueueWorkflow:

const DIRECT_PROCESSING_THRESHOLD = 10;

const result = await prepareAndEnqueueActivity({ ... });

if (result.jobsEnqueued <= DIRECT_PROCESSING_THRESHOLD) {
  // Fast path: process directly on this worker
  await processAccountingImpactDirectActivity({
    jobs: result.jobs,  // prepareAndEnqueue returns prepared job descriptors
    concurrencyLimit: 6,
  });
} else {
  // Queue path: high-priority enqueue, processed by singleton poller
  // (prepareAndEnqueue already enqueued with priority=high)
}
Note: prepareAndEnqueueAccountingRecreationActivity needs to return the job descriptors (not just counts) so the workflow can pass them directly to processAccountingImpactDirectActivity. This is a minor addition to the existing return type.
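The extended return type could look like this (shapes assumed; only the jobs field is new relative to the current return type):

```typescript
// Assumed shapes for illustration; only `jobs` is the new field.
interface PreparedAccountingJob {
  orgId: string;
  documentId: string;
  versionHint: number;
  documentType: string;
}

interface PrepareAndEnqueueRecreationResult {
  jobsEnqueued: number; // existing count field
  jobs: PreparedAccountingJob[]; // new: descriptors for the direct fast-path
}

const DIRECT_PROCESSING_THRESHOLD = 10;

// The workflow's branching condition, extracted for clarity.
function shouldProcessDirectly(
  result: PrepareAndEnqueueRecreationResult,
): boolean {
  return result.jobsEnqueued <= DIRECT_PROCESSING_THRESHOLD;
}
```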

Files Changed

| File | Change |
|---|---|
| packages/database/prisma/schema.prisma | Add priority Int column + updated index |
| packages/database/prisma/migrations/... | Generated migration |
| packages/core/src/accounting/queue/enqueue-accounting-job.ts | Accept optional priority param |
| packages/core/src/accounting/queue/batch-enqueue-accounting-jobs.ts | Accept optional priority param |
| packages/temporal-workflows/src/activities/accounting/prepare-and-enqueue-accounting-recreation.ts | Pass priority: 2; return job descriptors in result |
| packages/temporal-workflows/src/activities/accounting/fetch-accounting-jobs-activity.ts | Update ORDER BY in raw SQL |
| packages/temporal-workflows/src/activities/accounting/process-accounting-impact-direct-activity.ts | New — direct fast-path processor |
| packages/temporal-workflows/src/activities/accounting/index.ts | Export new activity |
| packages/temporal-workflows/src/workflows/accounting/recreate-accounting-impact-via-queue.ts | Add fast-path branching (threshold = 10 docs) |
| apps/temporal-workers/src/worker.ts | Register new activity on recreateAccountingImpactWorker |
| apps/internal-docs/core-cona-logic/accounting-job-queue.mdx | Document priority column and fast-path |

What Doesn’t Change

  • The accounting_work_queue table structure (only adds a column with a default)
  • The syncAccountingQueueWorkflow poller — only its fetch query changes
  • The batchProcessAccountingJobsActivity — unchanged
  • All existing callers of enqueueAccountingJob and batchEnqueueAccountingJobs — unchanged (default to normal)
  • Retry behavior, stale lease recovery, version coalescing — unchanged

Edge Cases

What if a high-priority job arrives while processing a batch of normal jobs?

The current batch completes, then the next fetchAccountingJobsActivity call picks up high-priority jobs first. Maximum wait is the time to finish the current batch of 6 concurrent jobs: typically under 30 seconds, though a complex deferred-revenue document can hold a batch open for up to 3 minutes. If that latency proves too high, the poller could be enhanced to check for high-priority jobs mid-batch and preempt, but that is out of scope for this RFC.

Fair-share across organizations

Not addressed in this RFC. A single org running a large bulk sync at high priority could still delay another org's high-priority jobs. A round-robin fetch (one job per org per batch) is documented as a future improvement in CONA-862.

Fast-path failure handling

If processAccountingImpactDirectActivity fails, the Temporal workflow fails (subject to Temporal's own retry policy). This is acceptable for small batches: the user sees an error and can simply retry. If more robustness is needed later, failed jobs can be re-enqueued on failure (future improvement).

Future Work

  • CONA-862: Migrate to native Temporal task queues — eliminates the Postgres queue entirely, replaces the singleton poller with per-document workflows on accounting-high / accounting-normal task queues
  • Per-org fair scheduling: Round-robin fetch to prevent one org’s backlog from monopolizing the queue
  • pg_notify wake-up: Notify the poller immediately on high-priority enqueue instead of waiting for the next poll cycle