
Executive Summary

This RFC proposes refactoring the sync-amazon-orders.ts workflow (1,836 lines) as preparation for the Integration Adapter Pattern migration. Rather than creating Amazon-specific abstractions that will be discarded, we’ll refactor toward the patterns already proven in PayPal and aligned with the planned adapter interface. Goals:
  • Reduce sync-amazon-orders.ts from 1,836 lines to ~400 lines
  • Eliminate DRY violations between Invoice/Credit Note processing
  • Eliminate duplication between API mode and CSV mode
  • Create helper functions that map directly to the adapter transformToDocuments() pattern
  • Make the eventual adapter extraction trivial (copy-paste ready)
Non-Goals:
  • Creating Amazon-specific abstractions that don’t align with the adapter pattern
  • Changing the workflow’s external behavior
  • Modifying activity interfaces

Problem Statement

Current State

The Amazon orders workflow has the worst DRY violations of all integrations:
| Metric | Shopify | PayPal | Amazon |
|---|---|---|---|
| Main workflow LOC | 1,627 | 589 | 1,836 |
| Has batch processing helper | ❌ (inline) | processPaypalDocumentsBatch | ❌ (inline) |
| Document type pattern | createSalesDocuments(type) | N/A (single type) | ❌ Duplicated functions |
| CSV/API mode | Separate workflows | ✅ Unified via helpers | ❌ Duplicated ~500 lines |

Specific DRY Violations in Amazon

| Violation | Lines Affected | Duplication Factor |
|---|---|---|
| Invoice vs Credit Note duplicate checking | 701-760 | ~2x |
| Invoice vs Credit Note customer processing | 785-849 | ~2x |
| Invoice vs Credit Note address processing | 851-915 | ~2x |
| Invoice vs Credit Note number allocation | 917-990 | ~2x |
| Invoice vs Credit Note document creation | 992-1144 | ~2x |
| Invoice vs Credit Note accounting jobs | 1156-1192 | ~2x |
| API mode vs CSV mode processing | 353-1250 vs 1261-1835 | ~2x |
| Helper functions (VATCustomerInputs vs VATRefundCustomerInputs) | order-processing.ts | ~2x |

Total estimated duplicated code: ~800+ lines

Alignment with Integration Adapter Pattern RFC

The adapter RFC (see integration-adapter-pattern.mdx) defines this interface:
interface IntegrationAdapter<TRawData, TResources> {
  transformToDocuments(
    rawData: TRawData[],
    resources: TResources,
    context: TransformContext
  ): TransformResult;
}

interface TransformResult {
  documents: UnifiedDocumentInput[];
  documentConnections?: DocumentConnection[];
  existingDocumentUpdates?: ExistingDocumentUpdate[];
}
The PayPal workflow already follows this pattern via helpers:
// PayPal's pattern (already in codebase)
const paypalTransactions = convertCsvRecordToPaypalTransaction(records);
const documents = convertPaypalTransactionToDocuments(paypalTransactions, resources);
const result = await processPaypalDocumentsBatch(documents, resources, organizationId, activities);
Amazon should adopt the same pattern so the adapter extraction is trivial.
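The sketch below shows one way a generic orchestrator might consume an adapter once extraction is done. The adapter only transforms raw records. Persistence goes through a shared batch processor, much as processPaypalDocumentsBatch does for PayPal today. The trimmed-down types and the `runAdapterSync` and `processBatch` names are hypothetical stand-ins; the real definitions belong to the adapter RFC.

```typescript
// Trimmed stand-ins for the adapter RFC types (hypothetical subset of fields).
interface UnifiedDocumentInput {
  externalId: string;
  objectTypeSlug: string;
}

interface TransformContext {
  organizationId: string; // assumed field, for illustration only
}

interface TransformResult {
  documents: UnifiedDocumentInput[];
}

interface IntegrationAdapter<TRawData, TResources> {
  transformToDocuments(
    rawData: TRawData[],
    resources: TResources,
    context: TransformContext
  ): TransformResult;
}

// Hypothetical orchestrator step: the adapter is a pure transform, and the
// injected batch processor persists whatever documents it returns.
async function runAdapterSync<TRawData, TResources>(
  adapter: IntegrationAdapter<TRawData, TResources>,
  rawData: TRawData[],
  resources: TResources,
  context: TransformContext,
  processBatch: (documents: UnifiedDocumentInput[]) => Promise<number>
): Promise<number> {
  const { documents } = adapter.transformToDocuments(rawData, resources, context);
  // Skip the batch processor entirely when there is nothing to create.
  if (documents.length === 0) return 0;
  return processBatch(documents);
}
```

Because the transform step is synchronous and side-effect free, it can be unit-tested without mocking any activities.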

Proposed Solution

Phase 1: Consolidate Document Type Processing

Create a configuration-driven approach (like Shopify’s createSalesDocuments):
// workflows/amazon/helpers/document-type-config.ts

export interface AmazonDocumentTypeConfig {
  type: "invoice" | "creditNote";
  objectTypeKey: "salesInvoice" | "salesCreditNote";
  customIdField: "amazon_transaction_id" | "amazon_refund_transaction_id";
  accountingJobType: "sales-invoice" | "sales-credit-note";
  filterOrders: (orders: ParsedVATTaxOrder[]) => ParsedVATTaxOrder[];
}

export const INVOICE_CONFIG: AmazonDocumentTypeConfig = {
  type: "invoice",
  objectTypeKey: "salesInvoice",
  customIdField: "amazon_transaction_id",
  accountingJobType: "sales-invoice",
  filterOrders: (orders) => orders.filter((o) => o.transactionType === "SALE"),
};

export const CREDIT_NOTE_CONFIG: AmazonDocumentTypeConfig = {
  type: "creditNote",
  objectTypeKey: "salesCreditNote",
  customIdField: "amazon_refund_transaction_id",
  accountingJobType: "sales-credit-note",
  filterOrders: (orders) => orders.filter((o) => o.transactionType === "REFUND"),
};
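More call sites may need to pick a config from a document type string, as the adapter sketch later does with a ternary. In that case, the two constants could also live in a record keyed by type. This is a sketch that assumes the same field shapes as above. `DOCUMENT_TYPE_CONFIGS` and `getDocumentTypeConfig` are hypothetical names, and `ParsedVATTaxOrder` is reduced to the two fields the filters read.

```typescript
type DocumentType = "invoice" | "creditNote";

// Reduced to the fields the filters read.
interface ParsedVATTaxOrder {
  eventId: string;
  transactionType: string; // "SALE" or "REFUND" in this RFC
}

interface AmazonDocumentTypeConfig {
  type: DocumentType;
  objectTypeKey: "salesInvoice" | "salesCreditNote";
  customIdField: "amazon_transaction_id" | "amazon_refund_transaction_id";
  accountingJobType: "sales-invoice" | "sales-credit-note";
  filterOrders: (orders: ParsedVATTaxOrder[]) => ParsedVATTaxOrder[];
}

// Hypothetical registry: the Record type requires one entry per DocumentType.
const DOCUMENT_TYPE_CONFIGS: Record<DocumentType, AmazonDocumentTypeConfig> = {
  invoice: {
    type: "invoice",
    objectTypeKey: "salesInvoice",
    customIdField: "amazon_transaction_id",
    accountingJobType: "sales-invoice",
    filterOrders: (orders) => orders.filter((o) => o.transactionType === "SALE"),
  },
  creditNote: {
    type: "creditNote",
    objectTypeKey: "salesCreditNote",
    customIdField: "amazon_refund_transaction_id",
    accountingJobType: "sales-credit-note",
    filterOrders: (orders) => orders.filter((o) => o.transactionType === "REFUND"),
  },
};

// Replaces `type === "invoice" ? INVOICE_CONFIG : CREDIT_NOTE_CONFIG` ternaries.
function getDocumentTypeConfig(type: DocumentType): AmazonDocumentTypeConfig {
  return DOCUMENT_TYPE_CONFIGS[type];
}
```

Typing the record as `Record<DocumentType, AmazonDocumentTypeConfig>` means that adding a third document type fails to compile until its config exists.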
Consolidate helper functions:
// workflows/amazon/helpers/order-processing.ts (consolidated)

// BEFORE: Two nearly identical functions
export function prepareVATCustomerInputs(orders) { ... }
export function prepareVATRefundCustomerInputs(orders) { ... }

// AFTER: Single parameterized function
export function prepareCustomerInputs(
  orders: ParsedVATTaxOrder[],
  config: AmazonDocumentTypeConfig
): CustomerInput[] {
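  // filterOrders operates on the whole array, so call it directly instead of
  // passing it to Array.prototype.filter as a per-order predicate.
  return config.filterOrders(orders).map((order) => ({
    customerId: order.eventId,
    firstName: "Amazon",
    lastName: "Customer",
    email: undefined,
    additionalCustomProperties: {
      amazon_is_b2b: order.isB2B ? "true" : "false",
    },
  }));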
}

// Same for addresses, document creation, etc.
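Phase 1 leaves the main workflow untouched, so existing call sites still expect the old function names. One option is to keep the old names as thin deprecated wrappers over the parameterized version until Phase 4 removes their callers. This assumes the two old functions differed only in which transaction type they selected. The types are trimmed to the fields used here, and `DocumentFilterConfig` is a hypothetical narrowing of `AmazonDocumentTypeConfig`.

```typescript
// Trimmed types: only the fields this sketch touches.
interface ParsedVATTaxOrder {
  eventId: string;
  transactionType: string;
  isB2B: boolean;
}

interface CustomerInput {
  customerId: string;
  firstName: string;
  lastName: string;
  email?: string;
  additionalCustomProperties: Record<string, string>;
}

interface DocumentFilterConfig {
  filterOrders: (orders: ParsedVATTaxOrder[]) => ParsedVATTaxOrder[];
}

const INVOICE_CONFIG: DocumentFilterConfig = {
  filterOrders: (orders) => orders.filter((o) => o.transactionType === "SALE"),
};
const CREDIT_NOTE_CONFIG: DocumentFilterConfig = {
  filterOrders: (orders) => orders.filter((o) => o.transactionType === "REFUND"),
};

function prepareCustomerInputs(
  orders: ParsedVATTaxOrder[],
  config: DocumentFilterConfig
): CustomerInput[] {
  return config.filterOrders(orders).map((order) => ({
    customerId: order.eventId,
    firstName: "Amazon",
    lastName: "Customer",
    additionalCustomProperties: { amazon_is_b2b: order.isB2B ? "true" : "false" },
  }));
}

/** @deprecated Use prepareCustomerInputs(orders, INVOICE_CONFIG). */
function prepareVATCustomerInputs(orders: ParsedVATTaxOrder[]): CustomerInput[] {
  return prepareCustomerInputs(orders, INVOICE_CONFIG);
}

/** @deprecated Use prepareCustomerInputs(orders, CREDIT_NOTE_CONFIG). */
function prepareVATRefundCustomerInputs(orders: ParsedVATTaxOrder[]): CustomerInput[] {
  return prepareCustomerInputs(orders, CREDIT_NOTE_CONFIG);
}
```

Most editors surface the `@deprecated` tag as a strikethrough, which makes the remaining old call sites easy to spot during Phases 2 through 4.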

Phase 2: Create Batch Processing Pipeline (Follow PayPal Pattern)

// workflows/amazon/helpers/amazon-batch-processing.ts

export interface AmazonBatchProcessingResult {
  processedCount: number;
  duplicateCount: number;
  failedCount: number;
  createdDocumentIds: string[];
}

/**
 * Process Amazon documents through the standard pipeline.
 * Mirrors PayPal's processPaypalDocumentsBatch for consistency.
 */
export async function processAmazonDocumentsBatch(
  orders: ParsedVATTaxOrder[],
  config: AmazonDocumentTypeConfig,
  resources: AmazonOrderResources,
  organizationId: string,
  activities: AmazonBatchProcessingActivities
): Promise<AmazonBatchProcessingResult> {
  // Step 1: Filter by document type
  const filteredOrders = config.filterOrders(orders);
  if (filteredOrders.length === 0) {
    return { processedCount: 0, duplicateCount: 0, failedCount: 0, createdDocumentIds: [] };
  }

  // Step 2: Duplicate check
  const transactionIds = filteredOrders.map((o) => o.eventId);
  const dupCheck = await activities.batchFindExistingDocumentsActivity({
    customPropertyIds: transactionIds,
    organizationId,
    objectTypeId: resources.objectTypes[config.objectTypeKey].id,
    customIdField: config.customIdField,
  });

  const newOrders = filteredOrders.filter((o) => dupCheck.newCustomPropertyIds.includes(o.eventId));
  const duplicateCount = filteredOrders.length - newOrders.length;

  if (newOrders.length === 0) {
    return { processedCount: 0, duplicateCount, failedCount: 0, createdDocumentIds: [] };
  }

  // Step 3: Process customers
  const customerInputs = prepareCustomerInputs(newOrders, config);
  const customerResult =
    customerInputs.length > 0
      ? await activities.batchProcessCustomersActivity({
          customers: customerInputs,
          organizationId,
          objectTypeId: resources.objectTypes.customer.id,
          modifiedByActorId: resources.actor.id,
          customIdField: config.type === "invoice" ? "amazon_order_id" : "amazon_refund_id",
        })
      : { customerMap: {}, customersProcessed: 0 };

  // Step 4: Process addresses
  const addressInputs = prepareAddressInputs(newOrders, customerResult.customerMap, config);
  const addressResult =
    addressInputs.length > 0
      ? await activities.batchProcessAddressesActivity({
          addresses: addressInputs,
          organizationId,
          modifiedByActorId: resources.actor.id,
          updateEntityDefaultAddresses: true,
        })
      : { addressMap: {}, addressesProcessed: 0 };

  // Step 5: Allocate numbers and periods
  const docsToCreate = prepareDocumentsForProcessing(newOrders, config, resources);
  const [numberResult, periodResult] = await Promise.all([
    activities.batchAllocateNumbersActivity({ documentsToCreate: docsToCreate, organizationId }),
    activities.batchProcessAccountingPeriodsActivity({
      documentsToCreate: docsToCreate,
      organizationId,
    }),
  ]);

  // Step 6: Create documents
  const documents = createDocuments(
    newOrders,
    config,
    resources,
    periodResult,
    customerResult.customerMap,
    addressResult.addressMap
  );
  const documentsWithNumbers = attachAllocatedNumbers(documents, numberResult, config, resources);

  const createResult = await createDocumentsInBatches(
    documentsWithNumbers,
    organizationId,
    config,
    periodResult,
    addressResult,
    resources,
    activities
  );

  // Step 7: Enqueue accounting jobs
  if (createResult.documentIds.length > 0) {
    await enqueueAccountingJobsInBatches(
      createResult.documentIds,
      organizationId,
      config.accountingJobType,
      activities
    );
  }

  return {
    processedCount: createResult.documentIds.length,
    duplicateCount,
    failedCount: createResult.failedDocuments?.length || 0,
    createdDocumentIds: createResult.documentIds,
  };
}
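`createDocumentsInBatches` and `enqueueAccountingJobsInBatches` are referenced above but not defined in this RFC. The sketch below shows a chunk-and-loop helper that both could share. It also rewrites the Step 2 duplicate split to look IDs up in a `Set` instead of calling `includes` once per order. `partitionNewOrders`, `chunk`, and `runInBatches` are hypothetical names. The batch size is left to the caller because the RFC does not specify one.

```typescript
interface OrderWithEventId {
  eventId: string;
}

// Step 2 split: `newIds` are the IDs the duplicate-check activity reported as new.
// A Set gives constant-time lookups instead of an `includes` scan per order.
function partitionNewOrders<T extends OrderWithEventId>(
  orders: T[],
  newIds: string[]
): { newOrders: T[]; duplicateCount: number } {
  const newIdSet = new Set(newIds);
  const newOrders = orders.filter((o) => newIdSet.has(o.eventId));
  return { newOrders, duplicateCount: orders.length - newOrders.length };
}

// Splits items into chunks of at most `size`; the last chunk may be shorter.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error(`Batch size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Calls `run` once per chunk, one at a time, so each activity invocation
// receives a bounded payload. Results are returned in chunk order.
async function runInBatches<T, R>(
  items: T[],
  batchSize: number,
  run: (batch: T[]) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (const batch of chunk(items, batchSize)) {
    results.push(await run(batch));
  }
  return results;
}
```

Here, `run` would wrap the document-creation or job-enqueue activity. The per-chunk results can then be flattened into the `documentIds` and `failedDocuments` arrays the helper returns.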

Phase 3: Simplify Main Workflow

After Phases 1 and 2, together with the unified fetchAmazonOrders helper from Phase 4 below, the main workflow becomes:
// workflows/amazon/sync-amazon-orders.ts (simplified to ~400 lines)

export async function syncAmazonOrdersWorkflow(params: SyncAmazonOrdersParams): Promise<void> {
  const { integrationId, organizationId, csvBatchId } = params;

  // ... progress tracking setup (~50 lines)

  try {
    // Initialize resources
    const resources = await initializeAmazonResourcesActivity({ organizationId, integrationId });

    // Fetch data (API or CSV mode - unified helper)
    const orders = await fetchAmazonOrders(params, resources, activities);

    if (orders.length === 0) {
      updateProgress({ status: "completed", currentStep: "No orders to process" });
      return;
    }

    // Process invoices (uses batch processing helper)
    const invoiceResult = await processAmazonDocumentsBatch(
      orders,
      INVOICE_CONFIG,
      resources,
      organizationId,
      activities
    );

    // Process credit notes (uses same batch processing helper!)
    const creditNoteResult = await processAmazonDocumentsBatch(
      orders,
      CREDIT_NOTE_CONFIG,
      resources,
      organizationId,
      activities
    );

    // Update last sync
    await updateIntegrationLastSyncActivity({
      /* ... */
    });

    updateProgress({
      status: "completed",
      currentStep: `Created ${invoiceResult.processedCount} invoices, ${creditNoteResult.processedCount} credit notes`,
    });
  } catch (error) {
    // ... error handling (~20 lines)
  }
}
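The two processAmazonDocumentsBatch calls differ only in their config, so the workflow could also iterate over a list of configs. This sketch assumes results are keyed by `config.type`. `processAllDocumentTypes`, `formatSummary`, and `DocumentTypeConfigLike` are hypothetical. `processBatch` stands in for a call to processAmazonDocumentsBatch with the other arguments already bound.

```typescript
type DocumentType = "invoice" | "creditNote";

// Minimal view of AmazonDocumentTypeConfig needed for this sketch.
interface DocumentTypeConfigLike {
  type: DocumentType;
}

interface BatchResult {
  processedCount: number;
  duplicateCount: number;
  failedCount: number;
}

// Processes each document type in the order given and keys results by type.
async function processAllDocumentTypes(
  configs: readonly DocumentTypeConfigLike[],
  processBatch: (config: DocumentTypeConfigLike) => Promise<BatchResult>
): Promise<Partial<Record<DocumentType, BatchResult>>> {
  const results: Partial<Record<DocumentType, BatchResult>> = {};
  for (const config of configs) {
    results[config.type] = await processBatch(config);
  }
  return results;
}

// Builds the same progress message as the workflow above; missing types count as 0.
function formatSummary(results: Partial<Record<DocumentType, BatchResult>>): string {
  const invoices = results.invoice?.processedCount ?? 0;
  const creditNotes = results.creditNote?.processedCount ?? 0;
  return `Created ${invoices} invoices, ${creditNotes} credit notes`;
}
```

The loop awaits each type in turn, which preserves the invoice-then-credit-note order of the workflow above.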

Phase 4: Unified CSV/API Data Fetching

// workflows/amazon/helpers/data-fetching.ts

/**
 * Unified data fetching for both API and CSV modes.
 * Returns the same ParsedVATTaxOrder[] format regardless of source.
 */
export async function fetchAmazonOrders(
  params: SyncAmazonOrdersParams,
  resources: AmazonOrderResources,
  activities: AmazonDataFetchingActivities
): Promise<ParsedVATTaxOrder[]> {
  if (params.csvBatchId) {
    // CSV mode: fetch from batch records
    return await fetchFromCsvBatch(params.csvBatchId, activities);
  } else {
    // API mode: fetch from Amazon Reports API
    return await fetchFromReportsApi(params, resources, activities);
  }
}

async function fetchFromCsvBatch(
  batchId: string,
  activities: AmazonDataFetchingActivities
): Promise<ParsedVATTaxOrder[]> {
  const records = await activities.getCsvBatchRecordsActivity({ batchId, unprocessedOnly: true });
  return convertCsvRecordsToVATTaxOrders(records);
}

async function fetchFromReportsApi(
  params: SyncAmazonOrdersParams,
  resources: AmazonOrderResources,
  activities: AmazonDataFetchingActivities
): Promise<ParsedVATTaxOrder[]> {
  // ... existing API mode logic
  const reportId = await activities.requestAmazonReportActivity({
    /* ... */
  });
  await pollForReportCompletion(reportId, activities);
  const downloadResult = await activities.downloadAmazonReportActivity({ reportId });
  return downloadResult.orders;
}
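pollForReportCompletion is called above but not shown. The hypothetical sketch below injects the status check and the delay as functions rather than reading them from `activities`, so its signature differs from that call site. The terminal statuses are the `processingStatus` values documented for the Amazon SP-API Reports API. The interval and the attempt cap are assumptions. If the workflow runs on a durable engine such as Temporal, `sleep` should be the engine's workflow timer rather than `setTimeout`.

```typescript
// Report processing statuses from the Amazon SP-API Reports API.
type ReportProcessingStatus = "IN_QUEUE" | "IN_PROGRESS" | "DONE" | "CANCELLED" | "FATAL";

interface PollOptions {
  intervalMs: number; // assumed delay between status checks
  maxAttempts: number; // assumed cap before giving up
}

// Hypothetical pollForReportCompletion: resolves on DONE, rejects on a
// terminal failure status or when the attempt budget is exhausted.
async function pollForReportCompletion(
  reportId: string,
  getStatus: (reportId: string) => Promise<ReportProcessingStatus>,
  sleep: (ms: number) => Promise<void>,
  { intervalMs, maxAttempts }: PollOptions
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const status = await getStatus(reportId);
    if (status === "DONE") return;
    if (status === "CANCELLED" || status === "FATAL") {
      throw new Error(`Report ${reportId} ended with status ${status}`);
    }
    // Wait before the next check, but not after the final attempt.
    if (attempt < maxAttempts) await sleep(intervalMs);
  }
  throw new Error(`Report ${reportId} not ready after ${maxAttempts} attempts`);
}
```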

Alignment: How This Enables Adapter Migration

After this refactoring, creating the Amazon adapter becomes trivial copy-paste:
// Future: adapters/amazon/orders-adapter.ts

export const amazonOrdersAdapter: IntegrationAdapter<ParsedVATTaxOrder, AmazonOrderResources> = {
  slug: "amazon",
  displayName: "Amazon VAT Orders",

  features: {
    createsCustomers: true,
    createsAddresses: true,
    createsFeeDocuments: false,
    hasDocumentConnections: false, // Invoices/Credit notes are independent
    hasPostProcessing: false,
    hasPaymentAllocations: true,
  },

  fetchConfig: {
    type: "report-download",
    csvParserSlug: "amazon-vat-tax-report",
  },

  async initializeResources(params) {
    return initializeAmazonResourcesActivity(params);
  },

  transformToDocuments(orders, resources, context): TransformResult {
    // REUSE the refactored helpers directly!
    const documents: UnifiedDocumentInput[] = [];

    // Process sales orders (reuses the Phase 1 config filter)
    for (const order of INVOICE_CONFIG.filterOrders(orders)) {
      documents.push(transformVATOrderToUnifiedDocument(order, resources, "invoice"));
    }

    // Process refunds (reuses the Phase 1 config filter)
    for (const order of CREDIT_NOTE_CONFIG.filterOrders(orders)) {
      documents.push(transformVATOrderToUnifiedDocument(order, resources, "creditNote"));
    }

    return { documents };
  },
};

// This function is ALREADY written after Phase 1 refactoring
function transformVATOrderToUnifiedDocument(
  order: ParsedVATTaxOrder,
  resources: AmazonOrderResources,
  documentType: "invoice" | "creditNote"
): UnifiedDocumentInput {
  const config = documentType === "invoice" ? INVOICE_CONFIG : CREDIT_NOTE_CONFIG;

  return {
    externalId: order.eventId,
    objectTypeSlug: config.objectTypeKey,
    date: order.transactionDate,
    orderTotal: order.totalGross,
    currency: order.currency,
    lineItems: createLineItemsFromVATTaxOrder(order), // Existing helper
    paymentAllocations: [
      {
        gid: order.eventId,
        gateway: "amazon",
        amount: Math.abs(order.totalGross),
        ratio: 1,
        currency: order.currency,
      },
    ],
    customerInput: {
      customerId: order.eventId,
      firstName: "Amazon",
      lastName: "Customer",
    },
    addressInputs: prepareAddressInputs([order], {}, config),
    customProperties: {
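      // amazon_transaction_id for invoices, amazon_refund_transaction_id for credit notes,
      // matching the field the duplicate check queries
      [config.customIdField]: order.eventId,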
      amazon_marketplace: order.marketplace,
      amazon_is_b2b: order.isB2B ? "true" : "false",
      // ... other properties
    },
  };
}

File Structure After Refactoring

workflows/amazon/
├── sync-amazon-orders.ts              # ~400 lines (down from 1,836)
├── sync-amazon-settlements.ts         # Unchanged (stays custom per adapter RFC)
├── types.ts                           # Unchanged
└── helpers/
    ├── document-type-config.ts        # NEW: Invoice/CreditNote configs (~50 lines)
    ├── amazon-batch-processing.ts     # NEW: processAmazonDocumentsBatch (~200 lines)
    ├── data-fetching.ts               # NEW: Unified API/CSV fetching (~150 lines)
    ├── order-processing.ts            # REFACTORED: Consolidated helpers (~400 lines, down from 854)
    └── csv-conversion.ts              # Unchanged

Migration Plan

Phase 1: Document Type Config (Week 1)

  • Create document-type-config.ts
  • Consolidate duplicate helper functions in order-processing.ts
  • No changes to main workflow yet
  • Add unit tests for consolidated helpers

Phase 2: Batch Processing Helper (Week 2)

  • Create amazon-batch-processing.ts following PayPal pattern
  • Extract batch processing from main workflow
  • Parallel testing: verify that the old and new code paths produce the same results
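For the Phase 2 parallel test, one approach is to run both code paths against the same input and compare their outputs by external ID. This sketch assumes each path can emit a plain-object snapshot of the documents it would create, keyed by `externalId`. `DocumentSnapshot`, `stableStringify`, and `compareDocumentSets` are hypothetical. Key order is normalized before comparing, so two paths that build the same object with fields in a different order are not reported as mismatches.

```typescript
interface DocumentSnapshot {
  externalId: string;
  [field: string]: unknown;
}

interface ParityReport {
  missingInNew: string[];
  extraInNew: string[];
  mismatched: string[];
}

// JSON serialization with object keys sorted recursively.
function stableStringify(value: unknown): string {
  if (Array.isArray(value)) return `[${value.map(stableStringify).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const record = value as Record<string, unknown>;
    const entries = Object.keys(record)
      .sort()
      .map((k) => `${JSON.stringify(k)}:${stableStringify(record[k])}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

function indexById(docs: DocumentSnapshot[]): Map<string, DocumentSnapshot> {
  const byId = new Map<string, DocumentSnapshot>();
  for (const doc of docs) byId.set(doc.externalId, doc);
  return byId;
}

// Reports IDs only the old path produced, only the new path produced,
// and IDs both produced with differing content.
function compareDocumentSets(oldDocs: DocumentSnapshot[], newDocs: DocumentSnapshot[]): ParityReport {
  const oldById = indexById(oldDocs);
  const newById = indexById(newDocs);
  const report: ParityReport = { missingInNew: [], extraInNew: [], mismatched: [] };
  oldById.forEach((oldDoc, id) => {
    const newDoc = newById.get(id);
    if (!newDoc) report.missingInNew.push(id);
    else if (stableStringify(oldDoc) !== stableStringify(newDoc)) report.mismatched.push(id);
  });
  newById.forEach((_doc, id) => {
    if (!oldById.has(id)) report.extraInNew.push(id);
  });
  return report;
}
```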

Phase 3: Unified Data Fetching (Week 3)

  • Create data-fetching.ts
  • Unify CSV/API mode data retrieval
  • Simplify main workflow’s mode branching

Phase 4: Main Workflow Simplification (Week 4)

  • Rewrite main workflow using new helpers
  • Full regression testing
  • Deploy to staging

Phase 5: Adapter Extraction (Future - Part of Adapter RFC)

  • Copy refactored helpers into adapter
  • Register Amazon adapter
  • Deprecate standalone workflow

Success Metrics

| Metric | Current | After Refactoring | After Adapter Migration |
|---|---|---|---|
| Main workflow LOC | 1,836 | ~400 | ~0 (uses generic) |
| Helper total LOC | 854 | ~600 | ~400 (in adapter) |
| Duplicate code blocks | 15+ | 2-3 | 0 |
| Time to understand | 2+ hours | 30 min | 15 min |

Relationship to Adapter RFC

This RFC is a precursor to the Integration Adapter Pattern RFC:
┌─────────────────────────────────────────────────────────────────┐
│                    THIS RFC (Amazon Refactoring)                 │
│                                                                  │
│  1. Consolidate duplicate code                                   │
│  2. Create batch processing helper (like PayPal)                 │
│  3. Unify CSV/API modes                                          │
│  4. Simplify main workflow                                       │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│               ADAPTER RFC (Integration Adapter Pattern)          │
│                                                                  │
│  5. Extract Amazon adapter using refactored helpers              │
│  6. Register with generic workflow orchestrator                  │
│  7. Deprecate standalone workflow                                │
└─────────────────────────────────────────────────────────────────┘
The refactored code is designed to be directly extractable into the adapter interface without additional modification.

References