Temporal Workflow Architecture

This document explains CONA’s event-driven architecture for importing financial data and processing accounting impacts using Temporal workflows.

Core Concept

CONA uses a two-phase approach to financial data processing:
  1. Ingestion Phase - Import data from external sources (Shopify, PayPal) and create documents
  2. Accounting Phase - Generate accounting impact (general ledger entries) for those documents
These phases are completely decoupled through a PostgreSQL queue, enabling independent scaling, resilient failure handling, and idempotent operations.

Architecture Overview


Phase 1: Data Ingestion

Purpose

Transform external financial data into standardized CONA documents with line items.

How It Works

Shopify Integration:
  • Fetches orders and payments via Shopify API
  • Creates sales invoices, credit notes, and payment documents
  • Processes in chunks for memory efficiency
  • Handles customers, addresses, line items (products, shipping, discounts)
PayPal Integration:
  • Reads transactions from CSV uploads
  • Creates payment documents (main payment + optional fee documents)
  • Tracks processing status per CSV record
  • Handles duplicate detection via paypal_payment_id
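The chunked processing used for memory efficiency can be sketched as a small helper. This is an illustrative sketch, assuming a TypeScript codebase; the function name and chunk size are not CONA's actual code.

```typescript
// Hypothetical helper: split a large batch of external records into
// fixed-size chunks so each chunk can be fetched, transformed into
// documents, and persisted before the next chunk is loaded.
function* chunked<T>(items: T[], size: number): Generator<T[]> {
  for (let i = 0; i < items.length; i += size) {
    yield items.slice(i, i + size);
  }
}

// e.g. 250 Shopify order IDs processed in chunks of 100
const orderIds = Array.from({ length: 250 }, (_, i) => i + 1);
const chunks = [...chunked(orderIds, 100)];
```

The point of the pattern is that memory usage is bounded by the chunk size rather than the total import size.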

Key Operations

Result: Documents are created in the database with:
  • Complete line item breakdown
  • accounting_ready_at timestamp (signals ready for accounting)
  • accounting_ready_version = 1 (version tracking for changes)
  • Corresponding job in accounting_work_queue
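The result of ingestion can be modeled as follows. The field names come from this document; the surrounding types and the `finishIngestion` helper are assumptions for illustration, not CONA's actual schema or code.

```typescript
// Illustrative model of what ingestion produces for each document.
interface DocumentRecord {
  id: string;
  accounting_ready_at: Date;        // signals the document is ready for accounting
  accounting_ready_version: number; // starts at 1, bumped when the document changes
}

interface QueueJob {
  document_id: string;
  state: "pending" | "leased" | "completed" | "failed";
  version: number; // snapshot of the document version at enqueue time
}

// In the real system this happens inside one database transaction:
// the document, its line items, and its queue job are created together.
function finishIngestion(id: string): { doc: DocumentRecord; job: QueueJob } {
  const doc: DocumentRecord = {
    id,
    accounting_ready_at: new Date(),
    accounting_ready_version: 1,
  };
  const job: QueueJob = { document_id: id, state: "pending", version: 1 };
  return { doc, job };
}
```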

Phase 2: Accounting Processing

Purpose

Convert business documents into accounting entries (debits and credits) using posting matrix rules.

How It Works

The syncAccountingQueueWorkflow runs as an infinite loop:
  1. Polls the queue for pending jobs (batch of 50)
  2. Atomically leases jobs using FOR UPDATE SKIP LOCKED
  3. Processes 3 jobs concurrently
  4. Creates general ledger entries based on posting rules
  5. Marks jobs as completed
  6. Uses continueAsNew to reset workflow history and loop forever
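The leasing step above can be approximated with an in-memory model. The real implementation uses a SQL query with FOR UPDATE SKIP LOCKED so concurrent workers never lease the same job; this sketch only mimics that outcome and is not CONA's actual code.

```typescript
// In-memory approximation of:
//   SELECT ... FROM accounting_work_queue
//   WHERE state = 'pending' LIMIT 50 FOR UPDATE SKIP LOCKED
// Take up to `batchSize` pending jobs and mark them leased atomically.
interface Job {
  id: number;
  state: "pending" | "leased" | "completed" | "failed";
}

function leaseBatch(queue: Job[], batchSize = 50): Job[] {
  const leased = queue.filter((j) => j.state === "pending").slice(0, batchSize);
  for (const j of leased) j.state = "leased"; // jobs already leased are skipped
  return leased;
}

const queue: Job[] = Array.from({ length: 120 }, (_, i) => ({
  id: i,
  state: "pending" as const,
}));
const first = leaseBatch(queue);  // leases jobs 0-49
const second = leaseBatch(queue); // skips leased jobs, takes 50-99
```

In the real workflow each leased batch is then processed (3 jobs at a time) before the loop polls again or calls continueAsNew.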

Job State Machine
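The state transitions described in this document (pending, leased, completed, failed) can be summarized as a small transition table. This is a sketch reconstructed from the surrounding text, not CONA's actual implementation.

```typescript
// Job states and their legal transitions, as described in this document.
type JobState = "pending" | "leased" | "completed" | "failed";

const transitions: Record<JobState, JobState[]> = {
  pending: ["leased"],                        // a worker leases the job
  leased: ["completed", "pending", "failed"], // success / retry or stale recovery / permanent failure
  completed: [],                              // terminal
  failed: ["pending"],                        // manual reset after investigation
};

function canTransition(from: JobState, to: JobState): boolean {
  return transitions[from].includes(to);
}
```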

Processing Flow


Version Tracking

Why It Matters

Documents can be modified after creation (e.g., adding line items). Version tracking ensures accounting is always processed with the latest data.

How It Works

Key Points:
  • Documents start at version 1
  • Adding line items to a document with accounting increments the version
  • Workers check version before processing
  • Mismatches cause automatic rescheduling with the correct version
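The version check a worker performs before processing can be sketched like this. Names and shapes are assumptions for illustration, not CONA's actual code.

```typescript
// A queue job carries the document version it was enqueued with.
interface VersionedJob {
  documentId: string;
  version: number;
}

// Before processing, the worker compares the job's version against the
// document's current accounting_ready_version. A mismatch means the
// document changed after enqueue, so the job is rescheduled with the
// latest version instead of producing stale accounting entries.
function checkVersion(
  job: VersionedJob,
  currentDocVersion: number
): { action: "process" } | { action: "reschedule"; version: number } {
  if (job.version === currentDocVersion) {
    return { action: "process" };
  }
  return { action: "reschedule", version: currentDocVersion };
}
```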

Key Design Patterns

1. Idempotency

Every operation can be safely retried without duplicating side effects:
  • Duplicate documents detected by external IDs (shopify_order_id, paypal_payment_id)
  • Jobs use unique constraint: one job per document per organization
  • Accounting entries can be deleted and recreated
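The "one job per document per organization" constraint can be illustrated with a simple in-memory key. In the real system this is a database unique constraint; the helper names here are hypothetical.

```typescript
// The unique constraint is effectively (organization_id, document_id):
// enqueueing the same pair twice is a no-op, which is what makes
// retrying ingestion safe.
function jobKey(organizationId: string, documentId: string): string {
  return `${organizationId}:${documentId}`;
}

function enqueueOnce(seen: Set<string>, orgId: string, docId: string): boolean {
  const key = jobKey(orgId, docId);
  if (seen.has(key)) return false; // job already exists, skip
  seen.add(key);
  return true;
}
```

The same idea applies to documents themselves, keyed by external IDs such as shopify_order_id or paypal_payment_id.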

2. Atomic Operations

All state changes happen within database transactions:
  • Document + line items created atomically
  • Job state changes within transactions
  • GL entries created atomically per document

3. Decoupled Phases

Ingestion and accounting are completely independent:
  • Ingestion workflows can run without accounting being ready
  • Accounting queue can catch up asynchronously
  • Failures in one phase don’t block the other

4. Resilient Failure Handling

Transient errors: Automatic retry with exponential backoff (5min, 15min, 45min, 2h, 6h)
Permanent errors: Jobs marked as failed after 6 attempts and require manual investigation
Worker crashes: Stale jobs (leased > 15min) are automatically recovered
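The retry schedule above can be written as a small lookup. This is a sketch of the schedule as documented here, not CONA's actual retry code.

```typescript
// Backoff schedule from this document: 5min, 15min, 45min, 2h, 6h,
// then permanent failure after the 6th attempt.
const RETRY_DELAYS_MIN = [5, 15, 45, 120, 360];
const MAX_ATTEMPTS = 6;

// `attempt` is 1-based: after attempt 1 fails, RETRY_DELAYS_MIN[0] applies.
// Returns null when the job should be marked failed instead of retried.
function nextRetryDelayMinutes(attempt: number): number | null {
  if (attempt >= MAX_ATTEMPTS) return null;
  return RETRY_DELAYS_MIN[attempt - 1];
}
```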

Configuration

Ingestion Workers

  • Shopify: 10 concurrent activities, 20 concurrent workflows
  • PayPal: 10 concurrent activities, 20 concurrent workflows
  • Optimized for high-throughput batch processing

Accounting Queue Worker

  • Concurrency: 3 activities, 5 workflows
  • Batch size: 50 jobs per poll
  • Poll interval: 10 seconds when idle, immediate when active
  • Optimized for consistent, controlled processing
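Collected as configuration, the settings above might look like the following. The option names mirror Temporal's TypeScript worker tuning options, but the exact shape here is an assumption, not CONA's actual configuration files.

```typescript
// High-throughput ingestion workers (Shopify, PayPal)
const ingestionWorkerConfig = {
  maxConcurrentActivityTaskExecutions: 10,
  maxConcurrentWorkflowTaskExecutions: 20,
};

// Deliberately throttled accounting queue worker
const accountingQueueWorkerConfig = {
  maxConcurrentActivityTaskExecutions: 3,
  maxConcurrentWorkflowTaskExecutions: 5,
  batchSize: 50,               // jobs leased per poll
  idlePollIntervalSeconds: 10, // polls immediately while jobs remain
};
```

Keeping the accounting worker's concurrency low is a deliberate trade-off: it throttles GL-entry creation so it stays consistent and controlled while ingestion scales independently.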

Performance Characteristics

  • Shopify sync: ~250 orders/minute
  • PayPal CSV: ~100 transactions/minute
  • Accounting queue: ~22 jobs/minute average, ~50 jobs/minute peak

Monitoring

Quick Health Checks

-- Pending jobs count
SELECT COUNT(*) FROM accounting_work_queue WHERE state = 'pending';

-- Failed jobs requiring attention
SELECT * FROM accounting_work_queue
WHERE state = 'failed'
ORDER BY updated_at DESC;

-- Stale jobs (potential worker crash)
SELECT * FROM accounting_work_queue
WHERE state = 'leased'
  AND last_attempted_at < NOW() - INTERVAL '15 minutes';

Temporal UI

  • View active workflow executions at https://cloud.temporal.io
  • Track workflow history and activity retries
  • Monitor continue-as-new chains
  • Debug activity failures

When Things Go Wrong

Common Issues

Queue growing faster than processing:
  1. Check worker is running
  2. Increase batch size or concurrency
  3. Scale horizontally (add more workers)
Jobs stuck in failed state:
  1. Check last_error field for error message
  2. Fix underlying issue (missing rules, bad data)
  3. Reset job to pending state
Documents not getting accounting impact:
  1. Check accounting_ready_at is set
  2. Verify job exists in accounting_work_queue
  3. Check posting rules exist for document type