Skip to main content

Executive Summary

As CONA scales to support 100+ e-commerce and payment integrations, we need a more maintainable architecture. This RFC proposes an Adapter Pattern that separates integration-specific transformation logic from shared workflow orchestration, reducing code duplication while preserving the unique business logic each integration requires. Key Goals:
  • Reduce time-to-implement new integrations from ~2 weeks to ~2-3 days
  • Minimize code duplication across similar integration workflows
  • Maintain flexibility for integrations with unique business logic (Shopify refunds, PayPal fees, Amazon settlements)
  • Provide clear contracts for integration developers
  • Enable easier testing and maintenance
  • Establish a fixture-based approach for test data and schema discovery
Not Goals:
  • Forcing all integrations into a single generic workflow
  • Removing integration-specific business logic
  • Breaking existing integrations during migration

Problem Statement

Current Architecture

Each integration currently has its own complete Temporal workflow:
packages/temporal-workflows/src/workflows/
├── shopify/
│   ├── sync-shopify-shop.ts          (1,600+ lines)
│   ├── sync-shopify-payments.ts       (400+ lines)
│   └── helpers/
│       ├── order-processing.ts        (1,000+ lines)
│       └── payment-processing.ts      (300+ lines)
├── paypal/
│   ├── sync-paypal-payments.ts        (500+ lines)
│   └── helpers/
│       ├── paypal-document-processing.ts
│       └── paypal-batch-processing.ts
├── amazon/
│   ├── sync-amazon-orders.ts          (1,800+ lines)
│   ├── sync-amazon-settlements.ts     (1,200+ lines)
│   └── helpers/
│       ├── order-processing.ts
│       └── settlement-processing.ts

Problems at Scale

ProblemImpact
Code duplicationSame patterns (dedupe, number allocation, accounting) reimplemented per integration
Inconsistent patternsDifferent approaches to error handling, progress tracking, continue-as-new
High onboarding costNew integrations require understanding entire workflow structure
Maintenance burdenBug fixes or improvements need replication across all workflows
Testing complexityEach 1000+ line workflow needs comprehensive test coverage

Integration Complexity Matrix

Each integration has unique requirements that cannot be abstracted away:
IntegrationUnique Business Logic
Shopify OrdersCreates Sales Order + Invoice + Credit Note per order; Credit notes link back to invoices; Refunds update open amounts on original sales orders
Shopify PaymentsCreates payment + fee documents; Links payments to orders via gateway transaction ID
PayPalCreates 2 documents per transaction (payment + fee); Working Capital Loan detection; CSV and API dual-mode support
Amazon OrdersVAT Tax Report parsing; B2B customer detection; Multiple line item types (shipping, promo, gift wrap); NET vs GROSS price handling
Amazon SettlementsSettlement as virtual bank account; Multiple transaction types (Principal, Tax, Commission, FBA fees); Payout tracking; Reconciliation via Order ID

Proposed Solution: Integration Adapter Pattern

Core Concept

An adapter is a translator that converts data from one format (Shopify, PayPal, Amazon) into a common format (CONA documents). It separates:
  1. “What data to transform” → Integration Adapter (unique per integration)
  2. “How to process documents” → Generic Workflow Orchestrator (shared)
┌─────────────────────────────────────────────────────────────────┐
│                 GENERIC WORKFLOW ORCHESTRATOR                   │
│     (Sequences steps: dedupe → customers → addresses →          │
│      numbers → periods → create docs → accounting)              │
└──────────────────────────┬──────────────────────────────────────┘

    ┌──────────────────────┼──────────────────────┐
    │                      │                      │
    ▼                      ▼                      ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  SHOPIFY    │    │   PAYPAL    │    │   AMAZON    │
│   ADAPTER   │    │   ADAPTER   │    │   ADAPTER   │
│             │    │             │    │             │
│ transform() │    │ transform() │    │ transform() │
│ postProcess()│   │             │    │ postProcess()│
└─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       ▼                  ▼                  ▼
  Shopify JSON       PayPal CSV         Amazon TSV

Adapter Interface

// packages/temporal-workflows/src/adapters/types.ts

/**
 * Integration Adapter Interface
 *
 * Each integration implements this interface to define how its data
 * is transformed into CONA documents.
 *
 * @template TRawData - The raw data type from the integration (e.g., ShopifyOrderEdge)
 * @template TResources - The resources type needed for processing (e.g., ShopifyOrderProcessingResources)
 */
export interface IntegrationAdapter<TRawData, TResources> {
  /** Integration identifier (matches integration.slug) */
  slug: string;

  /** Human-readable name for logging */
  displayName: string;

  // =========================================
  // FEATURE FLAGS
  // =========================================

  /** Declares what features this integration needs */
  features: {
    /** Does this integration create customers? (Shopify: yes, PayPal: no) */
    createsCustomers: boolean;

    /** Does this integration create addresses? (Shopify: yes, PayPal: no) */
    createsAddresses: boolean;

    /** Does this integration create fee documents? (PayPal: yes, Shopify payments: yes) */
    createsFeeDocuments: boolean;

    /** Does this integration link documents together? (Shopify: invoice→order→credit note) */
    hasDocumentConnections: boolean;

    /** Does this integration have special post-processing? (Shopify: update sales order open amounts) */
    hasPostProcessing: boolean;

    /** Does this integration use payment allocations? (PayPal, Amazon: yes for reconciliation) */
    hasPaymentAllocations: boolean;
  };

  // =========================================
  // DATA SOURCE CONFIGURATION
  // =========================================

  /** How to fetch data from this integration */
  fetchConfig: {
    /** Data source type */
    type: "graphql-paginated" | "rest-api" | "csv-batch" | "report-download";

    /** For time-based syncing, how to chunk requests */
    chunkByTime?: boolean;
    defaultChunkHours?: number;

    /** For CSV imports, which parser to use */
    csvParserSlug?: string;
  };

  // =========================================
  // CORE ADAPTER METHODS
  // =========================================

  /**
   * Initialize integration resources (object types, statuses, actors, etc.)
   * This is called once at the start of the workflow.
   */
  initializeResources(params: {
    organizationId: string;
    integrationId: string;
  }): Promise<TResources>;

  /**
   * Transform raw integration data into unified document inputs.
   * This is the main "translation" function.
   *
   * @param rawData - Array of raw records from the integration
   * @param resources - Initialized resources (object types, statuses, etc.)
   * @param context - Workflow context (start time, source type, etc.)
   * @returns Transformed documents and any special instructions (connections, updates)
   */
  transformToDocuments(
    rawData: TRawData[],
    resources: TResources,
    context: TransformContext
  ): TransformResult;

  /**
   * Optional: Integration-specific post-processing after documents are created.
   *
   * Examples:
   * - Shopify: Update sales orders with refund line items, change status to "partially refunded"
   * - Amazon: Update settlement tracking
   *
   * @param createdDocuments - Documents that were successfully created
   * @param rawData - Original raw data for reference
   * @param resources - Integration resources
   */
  postProcess?(
    createdDocuments: CreatedDocument[],
    rawData: TRawData[],
    resources: TResources
  ): Promise<PostProcessResult>;
}

// =========================================
// SUPPORTING TYPES
// =========================================

export interface TransformContext {
  /** Deterministic workflow start time (for Temporal replay safety) */
  workflowStartTime: Date;

  /** Data source type for this sync */
  sourceType: "api" | "csv";

  /** Organization ID */
  organizationId: string;
}

export interface TransformResult {
  /** Documents to create */
  documents: UnifiedDocumentInput[];

  /** Optional: Document connections to establish (parent→child relationships) */
  documentConnections?: DocumentConnection[];

  /** Optional: Updates to apply to existing documents (e.g., Shopify refund→order linking) */
  existingDocumentUpdates?: ExistingDocumentUpdate[];
}

export interface UnifiedDocumentInput {
  // === IDENTIFICATION ===
  /** External ID from integration (Shopify order ID, PayPal txn ID, etc.) */
  externalId: string;

  /** Object type slug (sales_order, invoice, payment, etc.) */
  objectTypeSlug: string;

  // === CORE DOCUMENT DATA ===
  date: Date;
  orderTotal: number;
  currency: string;
  lineItems: UnifiedLineItem[];

  // === OPTIONAL FIELDS ===
  customProperties?: Record<string, unknown>;
  paymentAllocations?: PaymentAllocation[];
  customerInput?: CustomerInput;
  addressInputs?: AddressInput[];

  /** Reference to parent document (for credit notes linking to invoices) */
  parentDocumentRef?: string;

  /** Default memo text */
  defaultMemo?: string;

  /** Notes field */
  notes?: string;
}

export interface UnifiedLineItem {
  name: string;
  description: string;
  quantity: number;
  unitPrice: number;
  taxRate: number;
  type: "product" | "shipping" | "fee" | "discount" | "payment";
  order: number;
  customProperties?: Record<string, unknown>;
}

export interface DocumentConnection {
  parentExternalId: string;
  childExternalId: string;
  relationshipType: string;
}

export interface ExistingDocumentUpdate {
  externalId: string;
  updateType: "add_refund_line_items" | "update_status" | "add_payment_allocation";
  data: unknown;
}

export interface CreatedDocument {
  id: string;
  externalId: string;
  documentNumber: string;
  objectTypeSlug: string;
}

export interface PostProcessResult {
  success: boolean;
  updatedCount?: number;
  errors?: string[];
}

Example: Shopify Orders Adapter

// packages/temporal-workflows/src/adapters/shopify/orders-adapter.ts

import type { IntegrationAdapter, TransformResult, UnifiedDocumentInput } from "../types.js";
import type { ShopifyOrderEdge, ShopifyRefund } from "@cona/types";
import type { ShopifyOrderProcessingResources } from "../../activities/shopify/types.js";

// Reuse existing helper functions!
import {
  createLineItemsFromOrder,
  createLineItemsFromRefund,
  prepareCustomerInputs,
  prepareAddressInputs,
} from "../../workflows/shopify/helpers/order-processing.js";

export const shopifyOrdersAdapter: IntegrationAdapter<
  ShopifyOrderEdge,
  ShopifyOrderProcessingResources
> = {
  slug: "shopify",
  displayName: "Shopify Orders",

  features: {
    createsCustomers: true,
    createsAddresses: true,
    createsFeeDocuments: false,
    hasDocumentConnections: true, // Invoice → Sales Order, Credit Note → Invoice
    hasPostProcessing: true, // Update sales orders with refund amounts
    hasPaymentAllocations: false,
  },

  fetchConfig: {
    type: "graphql-paginated",
    chunkByTime: true,
    defaultChunkHours: 4,
  },

  async initializeResources(params) {
    // Use existing activity - no changes needed!
    return initializeOrderResourcesActivity(params);
  },

  transformToDocuments(orderEdges, resources, context): TransformResult {
    const documents: UnifiedDocumentInput[] = [];
    const documentConnections: DocumentConnection[] = [];
    const existingDocumentUpdates: ExistingDocumentUpdate[] = [];

    for (const edge of orderEdges) {
      const order = edge.node;

      // === SALES ORDER ===
      documents.push({
        externalId: order.id,
        objectTypeSlug: "sales_order",
        date: new Date(order.createdAt),
        orderTotal: parseFloat(order.totalPriceSet.shopMoney.amount),
        currency: order.totalPriceSet.shopMoney.currencyCode,
        lineItems: createLineItemsFromOrder(order), // Reuse existing helper!
        customProperties: {
          shopify_order_id: order.id,
          shopify_order_name: order.name,
          shopify_order_number: order.orderNumber,
        },
        customerInput: prepareCustomerInputs([edge], resources)[0],
        addressInputs: prepareAddressInputs([edge], resources),
      });

      // === SALES INVOICE ===
      documents.push({
        externalId: `${order.id}-invoice`,
        objectTypeSlug: "sales_invoice",
        date: new Date(order.createdAt),
        orderTotal: parseFloat(order.totalPriceSet.shopMoney.amount),
        currency: order.totalPriceSet.shopMoney.currencyCode,
        lineItems: createLineItemsFromOrder(order),
        customProperties: {
          shopify_order_id: order.id,
          shopify_invoice_for_order: order.name,
        },
        parentDocumentRef: order.id, // Links to sales order
      });

      // Connect invoice to sales order
      documentConnections.push({
        parentExternalId: order.id,
        childExternalId: `${order.id}-invoice`,
        relationshipType: "invoice_for_order",
      });

      // === CREDIT NOTES (for refunds) ===
      if (order.refunds?.edges?.length > 0) {
        for (const refundEdge of order.refunds.edges) {
          const refund = refundEdge.node;

          documents.push({
            externalId: refund.id,
            objectTypeSlug: "sales_credit_note",
            date: new Date(refund.createdAt),
            orderTotal: parseFloat(refund.totalRefundedSet?.shopMoney?.amount || "0"),
            currency: order.totalPriceSet.shopMoney.currencyCode,
            lineItems: createLineItemsFromRefund(refund), // Reuse existing helper!
            customProperties: {
              shopify_refund_id: refund.id,
              shopify_order_id: order.id,
            },
            parentDocumentRef: `${order.id}-invoice`, // Links to invoice
          });

          // Connect credit note to invoice
          documentConnections.push({
            parentExternalId: `${order.id}-invoice`,
            childExternalId: refund.id,
            relationshipType: "refund",
          });

          // Track refund to update sales order
          existingDocumentUpdates.push({
            externalId: order.id,
            updateType: "add_refund_line_items",
            data: refund,
          });
        }
      }
    }

    return { documents, documentConnections, existingDocumentUpdates };
  },

  async postProcess(createdDocuments, rawData, resources) {
    // Shopify-specific: Update sales orders with refund line items
    // This uses existing batchUpdateSalesOrdersWithRefundsActivity

    const refundsToProcess = extractRefundsFromRawData(rawData);

    if (refundsToProcess.length > 0) {
      const result = await batchUpdateSalesOrdersWithRefundsActivity({
        refunds: refundsToProcess,
        organizationId: resources.organizationId,
        actorId: resources.actor.id,
        statusIds: {
          refunded: resources.statuses.salesOrder.refunded.id,
          partiallyRefunded: resources.statuses.salesOrder.partiallyRefunded.id,
        },
      });

      return { success: true, updatedCount: result.results.length };
    }

    return { success: true, updatedCount: 0 };
  },
};

Example: PayPal Payments Adapter

// packages/temporal-workflows/src/adapters/paypal/payments-adapter.ts

export const paypalPaymentsAdapter: IntegrationAdapter<
  PaypalTransactionData,
  PaypalPaymentResources
> = {
  slug: "paypal",
  displayName: "PayPal Payments",

  features: {
    createsCustomers: false, // PayPal doesn't give us customer data
    createsAddresses: false,
    createsFeeDocuments: true, // Payment + Fee documents
    hasDocumentConnections: false,
    hasPostProcessing: false,
    hasPaymentAllocations: true, // For reconciliation
  },

  fetchConfig: {
    type: "csv-batch", // Also supports API mode
    csvParserSlug: "paypal-account-statement",
  },

  transformToDocuments(transactions, resources, context): TransformResult {
    const documents: UnifiedDocumentInput[] = [];

    for (const txn of transactions) {
      const amount = parseFloat(txn.grossTransactionAmount || "0");
      const fee = txn.feeAmount ? parseFloat(txn.feeAmount) : 0;

      // === MAIN PAYMENT DOCUMENT ===
      documents.push({
        externalId: txn.transactionId,
        objectTypeSlug: "payment",
        date: new Date(txn.transactionInitiationDate),
        orderTotal: amount,
        currency: txn.transactionCurrency,
        lineItems: [
          {
            name: "PayPal Payment",
            description: txn.transactionSubject || "PayPal transaction",
            quantity: 1,
            unitPrice: amount,
            taxRate: 0,
            type: "payment",
            order: 0,
          },
        ],
        paymentAllocations: [
          {
            gid: txn.relatedTransactionCode || txn.transactionId,
            gateway: "paypal",
            amount: Math.abs(amount),
            ratio: 1,
            currency: txn.transactionCurrency,
          },
        ],
        customProperties: {
          paypal_transaction_id: txn.transactionId,
          paypal_payer_email: txn.payerEmail,
        },
      });

      // === FEE DOCUMENT (PayPal-specific) ===
      if (fee !== 0) {
        documents.push({
          externalId: `${txn.transactionId}-fee`,
          objectTypeSlug: "payment_fee",
          date: new Date(txn.transactionInitiationDate),
          orderTotal: Math.abs(fee),
          currency: txn.transactionCurrency,
          lineItems: [
            {
              name: "PayPal Processing Fee",
              description: `Processing fee for ${txn.transactionId}`,
              quantity: 1,
              unitPrice: Math.abs(fee),
              taxRate: 0,
              type: "fee",
              order: 0,
            },
          ],
          customProperties: {
            paypal_transaction_id: txn.transactionId,
            paypal_fee_type: "processing",
          },
        });
      }
    }

    return { documents };
  },

  // No postProcess needed for PayPal
};

Generic Workflow Orchestrator

// packages/temporal-workflows/src/workflows/generic/sync-integration-workflow.ts

import * as workflow from "@temporalio/workflow";
import type { IntegrationAdapter, TransformResult } from "../../adapters/types.js";

export interface SyncIntegrationParams<TRaw, TRes> {
  integrationId: string;
  organizationId: string;
  adapterSlug: string;
  csvBatchId?: string;
  state?: WorkflowState;
}

/**
 * Generic workflow that processes any integration using the adapter pattern.
 *
 * The adapter defines HOW to transform data.
 * This workflow defines WHEN to run each step.
 */
export async function syncIntegrationWorkflow<TRaw, TRes>(
  params: SyncIntegrationParams<TRaw, TRes>
): Promise<void> {
  const { integrationId, organizationId, adapterSlug, csvBatchId, state } = params;

  // Get adapter from registry
  const adapter = getAdapterFromRegistry(adapterSlug);

  // Setup progress tracking
  const { updateProgress } = setupProgressTracking(integrationId, organizationId);

  try {
    // =========================================
    // STEP 1: INITIALIZE RESOURCES
    // =========================================
    await updateProgress({ status: "initializing", currentStep: "Initializing resources" });

    const resources = await adapter.initializeResources({
      organizationId,
      integrationId,
    });

    // =========================================
    // STEP 2: FETCH DATA
    // =========================================
    await updateProgress({ status: "processing", currentStep: "Fetching data" });

    const rawData = await fetchDataForAdapter(adapter.fetchConfig, params);

    if (rawData.length === 0) {
      await updateProgress({ status: "completed", currentStep: "No new data to sync" });
      return;
    }

    // =========================================
    // STEP 3: TRANSFORM (Adapter does this)
    // =========================================
    await updateProgress({ currentStep: "Transforming data" });

    const transformResult = adapter.transformToDocuments(rawData, resources, {
      workflowStartTime: workflow.workflowInfo().startTime,
      sourceType: csvBatchId ? "csv" : "api",
      organizationId,
    });

    // =========================================
    // STEP 4: DEDUPLICATE
    // =========================================
    await updateProgress({ currentStep: "Checking for duplicates" });

    const { newDocuments, existingDocIds } = await batchFindExistingDocumentsActivity({
      documents: transformResult.documents.map((d) => ({
        documentId: d.externalId,
        objectTypeId: getObjectTypeId(d.objectTypeSlug, resources),
      })),
      organizationId,
    });

    const documentsToCreate = transformResult.documents.filter(
      (d) => !existingDocIds.includes(d.externalId)
    );

    if (documentsToCreate.length === 0) {
      await updateProgress({ status: "completed", currentStep: "All documents already exist" });
      return;
    }

    // =========================================
    // STEP 5: CREATE CUSTOMERS (if adapter needs it)
    // =========================================
    let customerMapping = {};
    if (adapter.features.createsCustomers) {
      await updateProgress({ currentStep: "Processing customers" });

      const customerInputs = documentsToCreate
        .filter((d) => d.customerInput)
        .map((d) => d.customerInput!);

      if (customerInputs.length > 0) {
        customerMapping = await batchProcessCustomersActivity({
          customers: customerInputs,
          organizationId,
          modifiedByActorId: resources.actor.id,
        });
      }
    }

    // =========================================
    // STEP 6: CREATE ADDRESSES (if adapter needs it)
    // =========================================
    let addressMapping = {};
    if (adapter.features.createsAddresses) {
      await updateProgress({ currentStep: "Processing addresses" });

      const addressInputs = documentsToCreate.flatMap((d) => d.addressInputs || []);

      if (addressInputs.length > 0) {
        addressMapping = await batchProcessAddressesActivity({
          addresses: addressInputs,
          organizationId,
          modifiedByActorId: resources.actor.id,
        });
      }
    }

    // =========================================
    // STEP 7: ALLOCATE DOCUMENT NUMBERS
    // =========================================
    await updateProgress({ currentStep: "Allocating document numbers" });

    const numbersResult = await batchAllocateNumbersActivity({
      documentsToCreate: documentsToCreate.map((d) => ({
        documentId: d.externalId,
        date: d.date.toISOString(),
        objectTypeId: getObjectTypeId(d.objectTypeSlug, resources),
      })),
      organizationId,
    });

    // =========================================
    // STEP 8: PROCESS ACCOUNTING PERIODS
    // =========================================
    await updateProgress({ currentStep: "Processing accounting periods" });

    const periodsResult = await batchProcessAccountingPeriodsActivity({
      documentsToCreate: documentsToCreate.map((d) => ({
        documentId: d.externalId,
        date: d.date.toISOString(),
        objectTypeId: getObjectTypeId(d.objectTypeSlug, resources),
      })),
      organizationId,
    });

    // =========================================
    // STEP 9: CREATE DOCUMENTS
    // =========================================
    await updateProgress({ currentStep: "Creating documents" });

    const createResult = await batchCreateDocumentsActivity({
      documents: prepareDocumentsForCreation(
        documentsToCreate,
        resources,
        numbersResult,
        periodsResult,
        customerMapping,
        addressMapping
      ),
    });

    // =========================================
    // STEP 10: CONNECT DOCUMENTS (if adapter needs it)
    // =========================================
    if (adapter.features.hasDocumentConnections && transformResult.documentConnections) {
      await updateProgress({ currentStep: "Connecting documents" });

      await batchConnectDocumentsActivity({
        connections: transformResult.documentConnections.map((c) => ({
          parentId: createResult.documentMap[c.parentExternalId],
          childId: createResult.documentMap[c.childExternalId],
          relationshipType: c.relationshipType,
        })),
      });
    }

    // =========================================
    // STEP 11: POST-PROCESSING (if adapter needs it)
    // =========================================
    if (adapter.features.hasPostProcessing && adapter.postProcess) {
      await updateProgress({ currentStep: "Post-processing" });

      await adapter.postProcess(createResult.createdDocuments, rawData, resources);
    }

    // =========================================
    // STEP 12: ENQUEUE ACCOUNTING
    // =========================================
    await updateProgress({ currentStep: "Enqueuing accounting jobs" });

    await enqueueAccountingJobsActivity({
      documentIds: createResult.createdDocuments.map((d) => d.id),
      organizationId,
    });

    // =========================================
    // STEP 13: UPDATE LAST SYNC
    // =========================================
    await updateIntegrationLastSyncActivity({
      integrationId,
      dataType: adapter.slug,
      organizationId,
      utcNow: workflow.workflowInfo().startTime.toISOString(),
    });

    await updateProgress({
      status: "completed",
      currentStep: `Synced ${createResult.createdDocuments.length} documents`,
    });
  } catch (error) {
    await updateProgress({
      status: "failed",
      currentStep: `Error: ${error instanceof Error ? error.message : "Unknown error"}`,
    });
    throw error;
  }
}

Directory Structure

packages/temporal-workflows/src/
├── adapters/
│   ├── types.ts                    # IntegrationAdapter interface
│   ├── registry.ts                 # Maps slug → adapter
│   ├── shopify/
│   │   ├── orders-adapter.ts       # ~300 lines (uses existing helpers)
│   │   └── payments-adapter.ts
│   ├── paypal/
│   │   └── payments-adapter.ts     # ~200 lines
│   ├── amazon/
│   │   ├── orders-adapter.ts       # ~400 lines
│   │   └── settlements-adapter.ts  # Custom workflow (too unique)
│   └── index.ts
├── workflows/
│   ├── generic/
│   │   └── sync-integration-workflow.ts  # Generic orchestrator (~300 lines)
│   ├── shopify/                    # Keep existing (migration path)
│   │   └── helpers/                # Helpers become adapter internals
│   ├── paypal/
│   ├── amazon/
│   │   └── sync-amazon-settlements.ts   # Stays custom (too complex)
│   └── index.ts
├── activities/                     # UNCHANGED - shared activities
│   ├── documents/
│   ├── entities/
│   └── accounting/
└── utils/

Migration Strategy

Phase 1: Define Adapter Interface (Week 1)

  • Create packages/temporal-workflows/src/adapters/types.ts
  • Create adapter registry
  • No changes to existing workflows

Phase 2: Extract PayPal Adapter (Week 2)

  • Create paypal-payments-adapter.ts
  • PayPal is simplest (no refund tracking, no document connections)
  • Run both old workflow and new adapter-based workflow in parallel
  • Compare outputs for parity

Phase 3: Extract Shopify Orders Adapter (Weeks 3-4)

  • Create shopify-orders-adapter.ts
  • Reuse existing helper functions (no rewrite needed)
  • Handle unique Shopify logic (refunds, connections) in postProcess
  • Validate with real Shopify data

Phase 4: Extract Amazon Orders Adapter (Week 5)

  • Create amazon-orders-adapter.ts
  • Amazon Settlements stays as custom workflow (too unique)

Phase 5: New Integrations Use Adapter Pattern (Ongoing)

  • Etsy, WooCommerce, Stripe → implement adapter only
  • Estimated: 2-3 days per integration

Phase 6: Deprecate Old Workflows (After Validation)

  • Mark old workflows as deprecated
  • Eventually remove after 3+ months of adapter stability

Trade-offs & Risks

Benefits

BenefitImpact
Faster new integrations2 weeks → 2-3 days
Code reduction~1500 lines/integration → ~300 lines
Single fix, all integrationsBug in dedupe logic? Fix once
Clear contractsNew developers know exactly what to implement
TestableAdapters are pure functions, easy to unit test

Risks & Mitigations

RiskMitigation
Generic workflow doesn’t fit edge caseAdapters can use custom workflows via registry
Migration introduces bugsRun old/new in parallel, compare outputs
Team learning curveDocument pattern thoroughly, provide examples
Over-abstractionOnly add features to interface when 2+ integrations need them

When NOT to Use an Adapter

Some integrations are too unique for the generic workflow:
  1. Amazon Settlements - Settlement as virtual bank, complex payout tracking
  2. QuickBooks Sync - Two-way sync with conflict resolution
  3. Bank statement reconciliation - Fundamentally different flow
These should keep custom workflows. The registry supports both:
const integrationRegistry = {
  // Adapter-based integrations
  shopify: { type: "adapter", adapter: shopifyOrdersAdapter },
  paypal: { type: "adapter", adapter: paypalPaymentsAdapter },

  // Custom workflow integrations
  amazon_settlements: { type: "custom", workflow: syncAmazonSettlementsWorkflow },
};

Success Metrics

MetricCurrentTarget
Lines of code per integration~1,500~300
Time to implement new integration2 weeks2-3 days
Shared code coverage~40%~80%
Unique code per integration~60%~20%

Test Data & Schema Discovery Strategy

One of the hardest parts of building integrations is understanding incoming data schemas and obtaining realistic test data. This section defines our approach.

The Core Challenge

You want to build: Etsy Integration
You need to know: What does Etsy order JSON actually look like?
Reality: Documentation says "order object" with 50 fields,
         but which ones are actually populated?
         What are the edge cases?

Fixture Directory Structure

packages/core/src/integrations/
├── fixtures/                       # Test data for all integrations
│   ├── README.md                   # How to add new fixtures
│   ├── shopify/
│   │   ├── orders/
│   │   │   ├── basic-order.json
│   │   │   ├── order-with-refund.json
│   │   │   ├── order-with-multiple-currencies.json
│   │   │   ├── order-with-gift-cards.json
│   │   │   └── _schema.ts          # Zod schema derived from fixtures
│   │   └── payments/
│   │       ├── payout-basic.json
│   │       └── payout-with-fees.json
│   ├── paypal/
│   │   ├── api/
│   │   │   └── transaction.json
│   │   └── csv/
│   │       ├── account-statement-de.csv
│   │       ├── account-statement-us.csv
│   │       └── working-capital-loan.csv
│   └── amazon/
│       ├── vat-report/
│       │   ├── basic-sale.tsv
│       │   ├── refund.tsv
│       │   └── b2b-order.tsv
│       └── settlement/
│           ├── basic-settlement.tsv
│           └── multi-marketplace.tsv

Approaches by Data Source Type

Source TypeTest Data ApproachTools
REST/GraphQL APIsSandbox accounts + Record & ReplayShopify Partner stores, PayPal Sandbox
CSV/TSV ImportsCustomer-provided samples (anonymized)Manual export from platform
Report DownloadsSandbox reports + sample filesAmazon Seller Central test reports

Fixture Capture Script

// packages/core/src/integrations/fixtures/capture.ts

/**
 * Capture real API responses for fixture creation.
 * Run manually when adding a new integration or discovering edge cases.
 *
 * Usage:
 *   SHOPIFY_STORE=xxx SHOPIFY_TOKEN=xxx pnpm capture:fixtures shopify orders
 */

import { writeFileSync } from "fs";
import { anonymizeData } from "./anonymize";

export async function captureShopifyOrders() {
  const response = await shopifyApi.getOrders({ limit: 10 });

  // Anonymize sensitive data
  const anonymized = anonymizeData(response, {
    fields: ["email", "phone", "address1", "name", "company"],
    strategy: "faker", // Replace with realistic fake data
  });

  writeFileSync(
    `fixtures/shopify/orders/captured-${Date.now()}.json`,
    JSON.stringify(anonymized, null, 2)
  );

  console.log("Captured and anonymized. Review before committing.");
}

Schema Contract Testing

Use Zod schemas to define the MINIMUM fields we expect from each API:
// packages/core/src/integrations/shopify/contracts/order.contract.ts

import { z } from "zod";

/**
 * Shopify Order Contract
 *
 * Defines the MINIMUM fields we expect from Shopify.
 * If Shopify changes their API and breaks this, tests fail.
 */
export const ShopifyOrderContract = z
  .object({
    id: z.string(),
    name: z.string(), // e.g., "#1001"
    createdAt: z.string().datetime(),
    totalPriceSet: z.object({
      shopMoney: z.object({
        amount: z.string(),
        currencyCode: z.string(),
      }),
    }),
    lineItems: z.object({
      edges: z.array(
        z.object({
          node: z.object({
            title: z.string(),
            quantity: z.number(),
          }),
        })
      ),
    }),
    // Use .passthrough() to allow extra fields we don't care about
  })
  .passthrough();

// Validate fixtures match contract on CI
import fixture from "../fixtures/orders/basic-order.json";
ShopifyOrderContract.parse(fixture); // Throws if fixture is invalid

Integration Testing Against Fixtures

// packages/core/src/integrations/shopify/__tests__/adapter.test.ts

import { describe, it, expect } from "vitest";
import basicOrder from "../fixtures/orders/basic-order.json";
import orderWithRefund from "../fixtures/orders/order-with-refund.json";
import { shopifyOrdersAdapter } from "../adapters/shopify/orders-adapter";

describe("Shopify Orders Adapter", () => {
  const mockResources = createMockResources();

  it("transforms basic order correctly", () => {
    const result = shopifyOrdersAdapter.transformToDocuments(
      [{ node: basicOrder }],
      mockResources,
      mockContext
    );

    expect(result.documents).toHaveLength(2); // Order + Invoice
    expect(result.documents[0].orderTotal).toBe(100.0);
    expect(result.documents[0].objectTypeSlug).toBe("sales_order");
  });

  it("handles refunds correctly", () => {
    const result = shopifyOrdersAdapter.transformToDocuments(
      [{ node: orderWithRefund }],
      mockResources,
      mockContext
    );

    expect(result.documents).toHaveLength(3); // Order + Invoice + Credit Note
    expect(result.documentConnections).toContainEqual({
      parentExternalId: expect.any(String),
      childExternalId: expect.any(String),
      relationshipType: "refund",
    });
  });
});

Discovery Documentation

For each integration, create a DISCOVERY.md file:
<!-- packages/core/src/integrations/amazon/DISCOVERY.md -->

# Amazon SP-API Discovery Notes

## VAT Tax Report (SC_VAT_TAX_REPORT)

### How to get sample data

1. Go to Seller Central → Reports → Tax Document Library
2. Download any "VAT Invoice" report
3. Anonymize and save as `fixtures/amazon/vat-report/sample-YYYY-MM-DD.tsv`

### Fields we use

| Field                       | Type   | Notes                       |
| --------------------------- | ------ | --------------------------- |
| TRANSACTION_TYPE            | string | "SALE" or "REFUND"          |
| ORDER_ID                    | string | Amazon order ID             |
| PRICE_OF_ITEMS_AMT_VAT_INCL | number | Gross price (VAT inclusive) |
| VAT_RATE                    | number | e.g., 0.19 for 19%          |

### Edge cases discovered

- [ ] B2B orders have `BUYER_VAT_NUMBER` field populated
- [ ] Gift wrap appears as separate line item with own VAT rate
- [ ] RETURN ≠ REFUND (return is movement of goods, not money)
- [ ] Multi-item orders have one row per item, same ORDER_ID

### Fields we ignore

- MARKETPLACE_ID (we get from integration settings)
- SELLER_VAT_NUMBER (we get from org settings)

Test Data Workflow

Data Anonymization Strategy

Always redact sensitive information in test fixtures before committing to git. This is critical for CONA since we handle financial and customer data.

What to Redact vs. Preserve

Data TypeExample FieldsRedaction Strategy
PIINames, emails, phone numbersFaker.js / deterministic hashing
AddressesStreet, city, postal codeGeneric placeholders
FinancialBank accounts, card numbersMasked format (e.g., ****1234)
CredentialsAPI keys, tokensRemove entirely or use [REDACTED]
Business IDsOrder IDs, customer IDsHash or sequential replacement
StructureField names, nesting, array sizesKEEP - Safe to preserve
TypesNumbers stay numbers, dates datesKEEP - Safe to preserve
PatternsCurrency codes, country codesKEEP - Safe to preserve
AmountsOrder totals, line item pricesKEEP (or slightly randomize)
DatesOrder dates, timestampsKEEP (maybe shift by fixed offset)

Anonymization Utility

// packages/core/src/test-utils/anonymize-fixture.ts
import { createHash } from "crypto";

interface AnonymizeOptions {
  /** Preserve monetary amounts (usually safe) */
  preserveAmounts?: boolean;
  /** Shift dates by this many days (0 = keep original) */
  dateShiftDays?: number;
}

/**
 * Deterministic hash - same input always gives same output.
 * This ensures relationships between entities remain intact.
 */
function hashId(id: string, prefix: string): string {
  const hash = createHash("sha256").update(id).digest("hex").slice(0, 12);
  return `gid://${prefix}/${hash}`;
}

export function anonymizeShopifyOrder(
  order: ShopifyOrder,
  options: AnonymizeOptions = {}
): ShopifyOrder {
  return {
    ...order,
    // Keep structure, replace identifying values
    id: hashId(order.id, "shopify_order"),
    name: `#${Math.abs(hashCode(order.id)) % 10000}`, // e.g., #1234

    customer: order.customer
      ? {
          ...order.customer,
          email: `customer_${hashId(order.customer.id, "customer")}@example.com`,
          firstName: "Test",
          lastName: "Customer",
          phone: null,
        }
      : null,

    shippingAddress: order.shippingAddress
      ? {
          ...order.shippingAddress,
          address1: "123 Test Street",
          address2: null,
          city: "Test City",
          zip: "12345",
          name: "Test Customer",
        }
      : null,

    // Keep amounts (usually safe for testing)
    totalPriceSet: order.totalPriceSet,

    // Shift date if configured
    createdAt: options.dateShiftDays
      ? shiftDate(order.createdAt, options.dateShiftDays)
      : order.createdAt,
  };
}

Capture Script with Auto-Anonymization

// scripts/capture-and-anonymize-fixture.ts
import { writeFile } from "fs/promises";
import { anonymizeShopifyOrder, assertNoPII } from "@cona/core/test-utils";

async function captureAndAnonymize(orderId: string) {
  // 1. Fetch real data from sandbox
  const realOrder = await shopifyClient.getOrder(orderId);

  // 2. Anonymize immediately - never store raw PII
  const anonymizedOrder = anonymizeShopifyOrder(realOrder);

  // 3. Verify no PII leaked (throws if patterns detected)
  assertNoPII(anonymizedOrder);

  // 4. Save anonymized version only
  await writeFile(
    `fixtures/shopify/orders/order-${orderId.slice(-6)}.json`,
    JSON.stringify(anonymizedOrder, null, 2)
  );

  console.log(`✅ Saved anonymized fixture for order ${orderId}`);
}

/**
 * Scans JSON for common PII patterns.
 * Add to CI to prevent accidental commits.
 */
export function assertNoPII(data: unknown): void {
  const json = JSON.stringify(data);
  const piiPatterns = [
    // Real email addresses (exclude @example.com which is safe)
    /[a-zA-Z0-9._%+-]+@(?!example\.com)[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g,
    // Phone numbers (10+ digits)
    /\+?\d{10,}/g,
    // Credit card patterns
    /\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/g,
    // German postal codes with street names (common in CONA)
    /\b\d{5}\s+[A-Z][a-zäöü]+/g,
  ];

  for (const pattern of piiPatterns) {
    const match = json.match(pattern);
    if (match) {
      throw new Error(`PII detected in fixture: "${match[0]}" (pattern: ${pattern})`);
    }
  }
}

Fixture Storage Layers

LayerAction
CaptureAnonymize immediately on save
StorageOnly commit anonymized fixtures to git
CIAdd PII detection to prevent accidental commits
Local devCan use raw data locally (add to .gitignore)

CI PII Detection Job

# .github/workflows/ci.yml
- name: Check fixtures for PII
  run: |
    pnpm run test:fixtures:pii-check
// packages/core/src/test-utils/check-all-fixtures.ts
import { glob } from "glob";
import { readFile } from "fs/promises";
import { assertNoPII } from "./anonymize-fixture";

async function checkAllFixtures() {
  const fixtures = await glob("**/fixtures/**/*.json");

  for (const file of fixtures) {
    const content = await readFile(file, "utf-8");
    try {
      assertNoPII(JSON.parse(content));
    } catch (error) {
      console.error(`❌ PII found in ${file}:`, error.message);
      process.exit(1);
    }
  }

  console.log(`✅ Checked ${fixtures.length} fixtures - no PII detected`);
}

checkAllFixtures();

Quick Reference: Platform Sandbox Access

PlatformSandbox TypeHow to Access
ShopifyPartner Development Storepartners.shopify.com → Create dev store
PayPalSandbox Accountsdeveloper.paypal.com → Sandbox accounts
AmazonDraft App + Sandbox Modesellercentral.amazon.com → Developer portal
StripeTest ModeUse sk_test_ API keys
WooCommerceLocal WordPress + WooCommerceDocker or LocalWP
EtsySandbox Environmentdevelopers.etsy.com

Decisions Made

1. Continue-as-New Handling

Decision: The generic workflow handles continue-as-new automatically using Temporal’s built-in suggestion. The shouldContinueAsNew() utility checks workflowInfo().continueAsNewSuggested which Temporal sets when the workflow history is getting too large. This is adapter-agnostic and requires no configuration.
// packages/temporal-workflows/src/utils/continue-as-new.ts

import * as workflow from "@temporalio/workflow";

export function shouldContinueAsNew(
  unitIndex: number,
  params: ContinueAsNewWorkflowParams,
  config: ContinueAsNewConfig = {}
): boolean {
  // Check Temporal's built-in suggestion - primary mechanism
  if (workflow.workflowInfo().continueAsNewSuggested) {
    return true;
  }

  // Test hook for easier testing (optional)
  if (params.testContinueAsNew && unitIndex >= config.testMaxUnits) {
    return true;
  }

  return false;
}
Usage in generic workflow:
// In the generic workflow orchestrator
for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
  await processChunk(chunks[chunkIndex]);

  // Check after each chunk - Temporal tells us when history is too large
  if (shouldContinueAsNew(chunkIndex, params)) {
    await continueAsNew<typeof genericIntegrationWorkflow>({
      ...params,
      resumeFromChunk: chunkIndex + 1,
    });
  }
}
Why this approach:
  • No adapter configuration needed
  • Temporal knows best when history is too large
  • Consistent behavior across all integrations
  • Test hook available for CI testing (testContinueAsNew: true)

2. Adapter Packaging Strategy

Decision: Adapters are NOT separate npm packages. They live in the monorepo. All adapters will be co-located in packages/core/src/integrations/adapters/:
packages/core/src/integrations/
├── adapters/
│   ├── shopify/
│   │   ├── orders-adapter.ts
│   │   ├── payments-adapter.ts
│   │   ├── fixtures/
│   │   └── index.ts
│   ├── paypal/
│   │   ├── payments-adapter.ts
│   │   ├── fixtures/
│   │   └── index.ts
│   ├── amazon/
│   │   ├── orders-adapter.ts
│   │   ├── settlements-adapter.ts
│   │   ├── fixtures/
│   │   └── index.ts
│   └── index.ts  ← Registry export
└── index.ts
Why this approach:
  • Simpler dependency management - No version mismatches between adapters
  • Faster iteration - Change adapter + workflow in one PR
  • Shared utilities - Adapters can easily share common transformation helpers
  • Unified testing - Run all adapter tests in one CI job
  • Build time is acceptable - With tree-shaking, unused adapters don’t bloat bundles
When to reconsider:
  • If build times exceed 5+ minutes due to adapter count
  • If external teams need to contribute adapters without monorepo access
  • If adapters need independent release cycles

Open Questions

  1. Where should fixtures live?
    • Option A: packages/core/src/integrations/adapters/{integration}/fixtures/ (close to code)
    • Option B: Separate packages/test-fixtures/ package (shared across packages)
    • Option C: Top-level fixtures/ directory in monorepo root
  2. How do we keep fixtures up-to-date when APIs change?
    • Scheduled CI job to validate fixtures against live sandbox
    • Contract tests that fail on schema drift
    • Manual review during integration maintenance

References


Appendix A: Full Adapter Interface

See the complete TypeScript interface in the “Adapter Interface” section above.

Appendix B: Example Adapters

Complete example adapters for Shopify and PayPal are provided in the “Example” sections above.