
CSV Import System Architecture

Overview

The CSV Import System is a scalable, extensible architecture for processing CSV files from various integration sources (PayPal, Shopify, Amazon, etc.) and converting them into CONA documents. The system uses a registry pattern to support multiple CSV formats and leverages Temporal workflows for reliable batch processing.

Core Components

1. Base Parser Interface

The CsvParser<T> interface defines the contract that all CSV parsers must implement:
interface CsvParser<T> {
  integrationSlug: string; // e.g., "paypal", "shopify"
  objectTypeSlug: string; // e.g., "payment", "order"
  validateCsvFormat(csvContent: string): ValidationResult;
  parseRow(row: Record<string, string>, rowIndex: number): T;
  validateRow(row: T, rowIndex: number): ValidationResult;
  getRequiredHeaders(): string[];
  getOptionalHeaders(): string[];
}
Key Methods:
  • validateCsvFormat(): Pre-validates the entire CSV to ensure it’s the correct format
  • parseRow(): Converts a CSV row into a typed object
  • validateRow(): Validates the parsed object for business rules
  • getRequiredHeaders() / getOptionalHeaders(): Define expected CSV columns
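
These methods are driven in sequence by a generic import routine. The sketch below shows one possible driver loop; the ValidationResult shape is inferred from the PayPal example later in this document, and parseCsvIntoRows is a hypothetical helper standing in for whatever CSV parsing library is used:
// Inferred from usage below: the result of format/row validation
interface ValidationResult {
  isValid: boolean;
  errors: string[];
}

// Hypothetical helper that turns raw CSV text into header-keyed row objects
declare function parseCsvIntoRows(csvContent: string): Record<string, string>[];

function importCsv<T>(parser: CsvParser<T>, csvContent: string) {
  // 1. Format validation: reject files that are not the expected report type
  const formatResult = parser.validateCsvFormat(csvContent);
  if (!formatResult.isValid) {
    throw new Error(`Invalid CSV format: ${formatResult.errors.join(", ")}`);
  }

  const rows = parseCsvIntoRows(csvContent);
  const valid: T[] = [];
  const failures: { rowIndex: number; errors: string[] }[] = [];

  rows.forEach((row, rowIndex) => {
    // 2. Row parsing: convert raw strings into a typed record
    const parsed = parser.parseRow(row, rowIndex);

    // 3. Row validation: apply business rules to the parsed record
    const result = parser.validateRow(parsed, rowIndex);
    if (result.isValid) {
      valid.push(parsed);
    } else {
      failures.push({ rowIndex, errors: result.errors });
    }
  });

  return { valid, failures };
}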

2. Parser Registry

The CsvParserRegistry manages all available parsers using a key-based lookup system:
class CsvParserRegistry {
  private parsers = new Map<string, CsvParser<any>>();

  register<T>(parser: CsvParser<T>): void;
  getParser(integrationSlug: string, objectTypeSlug: string): CsvParser<any> | null;
}
Key Features:
  • Composite Key: Uses ${integrationSlug}:${objectTypeSlug} (e.g., “paypal:payment”)
  • Global Instance: Single registry accessible throughout the application
  • Type Safety: Maintains type information for each parser
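
A minimal implementation of the registry might look like the following; the method bodies are illustrative, but the composite key and global instance match the behavior described above:
class CsvParserRegistry {
  private parsers = new Map<string, CsvParser<any>>();

  // Composite key: "<integrationSlug>:<objectTypeSlug>", e.g. "paypal:payment"
  private key(integrationSlug: string, objectTypeSlug: string): string {
    return `${integrationSlug}:${objectTypeSlug}`;
  }

  register<T>(parser: CsvParser<T>): void {
    this.parsers.set(this.key(parser.integrationSlug, parser.objectTypeSlug), parser);
  }

  getParser(integrationSlug: string, objectTypeSlug: string): CsvParser<any> | null {
    return this.parsers.get(this.key(integrationSlug, objectTypeSlug)) ?? null;
  }
}

// Single global instance used throughout the application
export const csvParserRegistry = new CsvParserRegistry();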

3. Document Processors

Document processors handle the conversion of parsed CSV records into CONA documents:
interface DocumentProcessor<T> {
  sourceType: string;
  importType: string;
  processBatch(records: CsvImportRecord[]): Promise<ProcessingResult>;
}
Note: Currently, document processing is handled directly in the Temporal workflow rather than through separate processor classes. This may be refactored in the future for better separation of concerns.
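
If processing is later extracted from the workflow, a processor could implement this interface along the following lines. PaypalPaymentProcessor, the ProcessingResult fields, and createDocumentFromPaypalRecord are hypothetical and shown only to illustrate the intended shape:
// Hypothetical: assumes ProcessingResult counts processed and failed records
interface ProcessingResult {
  processed: number;
  failed: number;
}

// Hypothetical helper that turns stored record data into a CONA document
declare function createDocumentFromPaypalRecord(
  data: Record<string, unknown>,
): Promise<string>;

class PaypalPaymentProcessor implements DocumentProcessor<PaypalPaymentRecord> {
  sourceType = "paypal";
  importType = "payment";

  async processBatch(records: CsvImportRecord[]): Promise<ProcessingResult> {
    let processed = 0;
    let failed = 0;

    for (const record of records) {
      try {
        await createDocumentFromPaypalRecord(record.recordData);
        processed++;
      } catch {
        failed++;
      }
    }

    return { processed, failed };
  }
}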

Data Flow

1. Upload & Parse Phase

  1. Upload: User uploads CSV file through UI
  2. Format Validation: Parser validates overall CSV structure
  3. Row Parsing: Each row is parsed into typed objects
  4. Row Validation: Business rules validation
  5. Database Storage: Valid records stored in csv_import_batches and csv_import_records
  6. Workflow Trigger: Temporal workflow started for processing

2. Processing Phase
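
Once the workflow starts, records are processed in pages directly from the database:

  1. Batch Fetch: Workflow reads unprocessed records from csv_import_records in configurable page sizes
  2. Document Conversion: Each record's stored JSON is transformed into a CONA document
  3. Document Creation: Created document IDs are written back to the corresponding records
  4. Progress Update: processed_records, failed_records, and status are updated on csv_import_batches
  5. Completion: The batch is marked completed (or failed) once all records have been handled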

Database Schema

csv_import_batches

Tracks CSV import batches with metadata:
  • id: Unique batch identifier
  • organization_id: Organization context
  • integration_id: Source integration
  • object_type_id: Target object type (payment, order, etc.)
  • filename: Original filename
  • total_records: Total CSV records
  • processed_records: Successfully processed count
  • failed_records: Failed processing count
  • status: pending, processing, completed, failed
  • workflow_id: Temporal workflow ID
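
Represented as an application-level type (an illustrative shape, not the actual ORM model):
// Illustrative shape of a csv_import_batches row
interface CsvImportBatch {
  id: string;
  organizationId: string;
  integrationId: string;
  objectTypeId: string;
  filename: string;
  totalRecords: number;
  processedRecords: number;
  failedRecords: number;
  status: "pending" | "processing" | "completed" | "failed";
  workflowId: string | null;
}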

csv_import_records

Stores individual CSV records as JSON:
  • id: Unique record identifier
  • batch_id: Reference to batch
  • record_index: Position in CSV
  • record_data: Parsed CSV data as JSON
  • processed: Processing status
  • document_id: Created document ID (if successful)
  • error_message: Processing error details
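
And the corresponding record type (again an illustrative shape; recordData matches its use as csvRecord.recordData in the workflow code below):
// Illustrative shape of a csv_import_records row
interface CsvImportRecord {
  id: string;
  batchId: string;
  recordIndex: number;
  recordData: Record<string, unknown>; // parsed CSV row stored as JSON
  processed: boolean;
  documentId: string | null; // set when a document was created
  errorMessage: string | null; // set when processing failed
}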

Example Implementation: PayPal Parser

Parser Registration

// Register PayPal parser if not already registered
if (!csvParserRegistry.getParser("paypal", "payment")) {
  csvParserRegistry.register(new PaypalPaymentsCsvParser());
}

Format Validation

validateCsvFormat(csvContent: string): ValidationResult {
  const lines = csvContent.split("\n");

  // Look for PayPal-specific format markers
  const hasDataHeader = lines.some((line) => line.startsWith("RD,"));

  if (!hasDataHeader) {
    return {
      isValid: false,
      errors: ["Not a PayPal Balance Reconciliation Report"]
    };
  }

  return { isValid: true, errors: [] };
}

Row Parsing

parseRow(row: Record<string, string>, rowIndex: number): PaypalPaymentRecord {
  return {
    recordNumber: row.Datensatznummer,
    recordType: row["Art des Datensatzes"],
    transactionCurrency: row.Transaktionswährung,
    netTransactionAmount: row.Nettotransaktionsbetrag,
    // ... map other fields
  };
}

Usage Patterns

1. Processing CSV Import

// Generic CSV processing
const csvResult = await processCsvImport({
  csvContent,
  integrationId,
  organizationId,
  integrationSlug: "paypal",
  objectTypeId,
  objectTypeSlug: "payment",
  filename,
});

// Start Temporal workflow in CSV mode
const workflowId = `paypal-csv-import-${batchId}`;
await client.workflow.start("syncPaypalPaymentsWorkflow", {
  args: [{ integrationId, organizationId, csvBatchId: batchId }],
  taskQueue: PAYPAL_PAYMENTS_TASK_QUEUE,
  workflowId,
});

2. Temporal Workflow Processing

// Workflow determines processing mode
const isCsvMode = !!csvBatchId;

if (isCsvMode && csvBatchId) {
  // CSV Import Processing Mode
  const csvRecords = await getCsvBatchRecordsActivity({
    batchId: csvBatchId,
    offset,
    limit: BATCH_SIZE,
    unprocessedOnly: true,
  });

  // Convert CSV records to documents
  const documentsToCreate = csvRecords.flatMap((csvRecord) => {
    const paypalData = csvRecord.recordData;
    // Transform to document format...
  });
}

Adding New Parsers

1. Create Parser Class

export class ShopifyOrdersCsvParser implements CsvParser<ShopifyOrderRecord> {
  integrationSlug = "shopify";
  objectTypeSlug = "order";

  getRequiredHeaders(): string[] {
    return ["Name", "Email", "Financial Status", "Total"];
  }

  getOptionalHeaders(): string[] {
    // No optional columns in this example
    return [];
  }

  validateCsvFormat(csvContent: string): ValidationResult {
    // Shopify-specific validation logic
  }

  parseRow(row: Record<string, string>, rowIndex: number): ShopifyOrderRecord {
    // Convert CSV row to typed object
  }

  validateRow(row: ShopifyOrderRecord, rowIndex: number): ValidationResult {
    // Business rules validation
  }
}

2. Register Parser

if (!csvParserRegistry.getParser("shopify", "order")) {
  csvParserRegistry.register(new ShopifyOrdersCsvParser());
}

3. Update Workflow

Modify the appropriate Temporal workflow to handle the new CSV format, similar to how PayPal CSV processing was added to syncPaypalPaymentsWorkflow.
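
The CSV-mode trigger would mirror the PayPal example; the workflow name and task queue below are placeholders for whatever the Shopify orders workflow is actually called:
// Start the Shopify workflow in CSV mode (names are placeholders)
const workflowId = `shopify-csv-import-${batchId}`;
await client.workflow.start("syncShopifyOrdersWorkflow", {
  args: [{ integrationId, organizationId, csvBatchId: batchId }],
  taskQueue: SHOPIFY_ORDERS_TASK_QUEUE,
  workflowId,
});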

Error Handling

Validation Levels

  1. Format Validation: Entire CSV structure and type
  2. Header Validation: Required columns presence
  3. Row Validation: Individual record data
  4. Business Validation: Domain-specific rules
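
Header validation (level 2) can be implemented generically on top of getRequiredHeaders(); the helper below is a sketch rather than existing code:
// Sketch: check that every required column is present in the header row
function validateHeaders<T>(parser: CsvParser<T>, headerRow: string[]): ValidationResult {
  const missing = parser
    .getRequiredHeaders()
    .filter((header) => !headerRow.includes(header));

  return {
    isValid: missing.length === 0,
    errors: missing.map((header) => `Missing required column: ${header}`),
  };
}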

Error Recovery

  • Partial Success: Valid records are processed even if some fail
  • Progress Tracking: Real-time updates on processing status
  • Retry Logic: Temporal handles workflow retries automatically
  • Error Logging: Comprehensive error tracking at record and batch level
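
Partial success comes from isolating failures per record and persisting each outcome instead of aborting the batch; a simplified sketch with assumed helper functions:
// Assumed persistence helpers (hypothetical, for illustration only)
declare function createDocumentFromRecord(data: Record<string, unknown>): Promise<string>;
declare function markRecordProcessed(recordId: string, documentId: string): Promise<void>;
declare function markRecordFailed(recordId: string, errorMessage: string): Promise<void>;

async function processRecords(csvRecords: CsvImportRecord[]): Promise<void> {
  for (const csvRecord of csvRecords) {
    try {
      const documentId = await createDocumentFromRecord(csvRecord.recordData);
      await markRecordProcessed(csvRecord.id, documentId);
    } catch (error) {
      // A single bad row does not fail the batch; the error is stored on the record
      await markRecordFailed(csvRecord.id, (error as Error).message);
    }
  }
}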

Performance Considerations

Scalability Features

  • Batch Processing: Handles large files (1M+ records) through pagination
  • Memory Efficiency: Streams records from the database instead of loading the entire CSV into memory
  • Configurable Batch Sizes: Adjustable for different data types
  • Temporal Reliability: Built-in retry and failure handling
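
Pagination keeps memory usage bounded regardless of file size; the loop below sketches the pattern, reusing the activity call and BATCH_SIZE constant from the workflow snippet above:
// Page through stored records instead of loading the entire CSV into memory
async function processCsvBatch(csvBatchId: string): Promise<void> {
  let offset = 0;
  let page: CsvImportRecord[];

  do {
    page = await getCsvBatchRecordsActivity({
      batchId: csvBatchId,
      offset,
      limit: BATCH_SIZE,
      unprocessedOnly: true,
    });

    // Convert this page into documents and update record/batch status...

    offset += BATCH_SIZE;
  } while (page.length === BATCH_SIZE);
}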

Optimization Strategies

  • Early Validation: Fail fast on format issues
  • Duplicate Detection: Prevent re-importing existing records
  • Progress Checkpoints: Resume processing from failure points
  • Parallel Processing: Multiple workflow instances for different batches

Monitoring & Observability

Metrics Tracked

  • Import success rates
  • Processing times per batch
  • Error rates by validation type
  • Record throughput

Logging Points

  • CSV upload and validation
  • Batch creation and status changes
  • Workflow start/completion
  • Document creation results
  • Error details and recovery attempts

Future Enhancements

Planned Features

  • Schema Evolution: Handle CSV format changes over time
  • Data Transformation Pipelines: More complex data mapping
  • Validation Rules Engine: Configurable validation rules
  • Import Templates: Pre-configured import settings
  • Incremental Imports: Delta processing for large datasets

Architecture Improvements

  • Processor Registry: Separate document processing from workflows
  • Plugin System: Dynamic parser loading
  • Configuration UI: Admin interface for parser management
  • Audit Trail: Complete import history and rollback capabilities

Conclusion

The CSV Import System provides a robust, scalable foundation for handling data imports from multiple sources. The registry pattern enables easy extensibility, while Temporal workflows ensure reliable processing of large datasets. The modular design allows teams to independently develop parsers for new integration sources without affecting existing functionality.