Delimited Data Import System Architecture

Overview

The Delimited Data Import System is a scalable, extensible architecture for processing delimited data files (CSV, TSV, etc.) from various integration sources (PayPal, Shopify, Amazon, etc.) and converting them into CONA documents. The system uses a registry pattern to support multiple file formats and leverages Temporal workflows for reliable batch processing.

Supported Formats:
  • CSV (Comma-Separated Values): Used by PayPal Account Statements
  • TSV (Tab-Separated Values): Used by Amazon VAT Tax Reports and Settlement Reports

Core Components

1. Base Parser Interface

The DelimitedDataParser<T> interface defines the contract that all parsers must implement:
interface DelimitedDataParser<T> {
  integrationSlug: string; // e.g., "paypal", "shopify", "amazon"
  objectTypeSlug: string; // e.g., "payment", "order", "settlement"
  validateFormat(content: string): ValidationResult;
  parseRow(row: Record<string, string>, rowIndex: number): T;
  validateRow(row: T, rowIndex: number): ValidationResult;
  getRequiredHeaders(): string[];
  getOptionalHeaders(): string[];
}
Key Methods:
  • validateFormat(): Pre-validates the entire file to ensure it’s the correct format
  • parseRow(): Converts a parsed row into a typed object
  • validateRow(): Validates the parsed object for business rules
  • getRequiredHeaders() / getOptionalHeaders(): Define expected columns

2. Parser Registry

The DelimitedDataParserRegistry manages all available parsers using a key-based lookup system:
class DelimitedDataParserRegistry {
  private parsers = new Map<string, DelimitedDataParser<any>>();

  register<T>(parser: DelimitedDataParser<T>): void;
  getParser(integrationSlug: string, objectTypeSlug: string): DelimitedDataParser<any> | null;
}
Key Features:
  • Composite Key: Uses ${integrationSlug}:${objectTypeSlug} (e.g., “paypal:payment”)
  • Global Instance: Single registry accessible throughout the application
  • Type Safety: Maintains type information for each parser
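The registry's composite-key mechanics can be sketched as follows. This is a minimal illustration using the interface and class names from this document; the actual implementation may differ in details such as error handling on duplicate registration.

```typescript
interface ValidationResult { isValid: boolean; errors: string[]; }

interface DelimitedDataParser<T> {
  integrationSlug: string;
  objectTypeSlug: string;
  validateFormat(content: string): ValidationResult;
  parseRow(row: Record<string, string>, rowIndex: number): T;
  validateRow(row: T, rowIndex: number): ValidationResult;
  getRequiredHeaders(): string[];
  getOptionalHeaders(): string[];
}

class DelimitedDataParserRegistry {
  private parsers = new Map<string, DelimitedDataParser<any>>();

  register<T>(parser: DelimitedDataParser<T>): void {
    // Composite key: "<integrationSlug>:<objectTypeSlug>", e.g. "paypal:payment"
    this.parsers.set(`${parser.integrationSlug}:${parser.objectTypeSlug}`, parser);
  }

  getParser(integrationSlug: string, objectTypeSlug: string): DelimitedDataParser<any> | null {
    return this.parsers.get(`${integrationSlug}:${objectTypeSlug}`) ?? null;
  }
}
```

The composite key keeps lookups O(1) while allowing one integration (e.g. "amazon") to register several parsers for different object types.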

3. Document Processors

Document processors handle the conversion of parsed CSV records into CONA documents:
interface DocumentProcessor<T> {
  sourceType: string;
  importType: string;
  processBatch(records: CsvImportRecord[]): Promise<ProcessingResult>;
}
Note: Currently, document processing is handled directly in the Temporal workflow rather than through separate processor classes. This may be refactored in the future for better separation of concerns.

Data Flow

1. Upload & Parse Phase

  1. Upload: User uploads file (CSV or TSV) through UI
  2. Format Validation: Parser validates overall file structure
  3. Row Parsing: Each row is parsed into typed objects (PapaParse auto-detects delimiter)
  4. Row Validation: Business rules validation
  5. Database Storage: Valid records stored in csv_import_batches and csv_import_records
  6. Workflow Trigger: Temporal workflow started for processing
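Steps 2-4 above can be sketched as a single parse-and-validate pass. In production PapaParse handles parsing and auto-detects the delimiter; this self-contained sketch hardcodes a comma for brevity, and the helper name is illustrative, not part of the actual codebase.

```typescript
// Sketch of the parse phase: header validation, then row parsing.
// Assumes a simple comma-delimited file with no quoted fields.
function parseDelimitedContent(
  content: string,
  requiredHeaders: string[],
): { rows: Record<string, string>[]; errors: string[] } {
  const errors: string[] = [];
  const lines = content.trim().split("\n").filter((l) => l.length > 0);
  if (lines.length === 0) return { rows: [], errors: ["Empty file"] };

  // Header validation: all required columns must be present
  const headers = lines[0].split(",").map((h) => h.trim());
  for (const required of requiredHeaders) {
    if (!headers.includes(required)) {
      errors.push(`Missing required header: ${required}`);
    }
  }
  if (errors.length > 0) return { rows: [], errors };

  // Row parsing: each data line becomes a header-keyed record
  const rows = lines.slice(1).map((line) => {
    const values = line.split(",");
    const row: Record<string, string> = {};
    headers.forEach((h, i) => { row[h] = (values[i] ?? "").trim(); });
    return row;
  });
  return { rows, errors };
}
```

The header-keyed `Record<string, string>` shape matches what `parseRow()` receives in the parser interface.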

2. Processing Phase

After the batch is stored, the Temporal workflow reads its records from the database in pages, converts them into CONA documents, and updates each record's processing status (see "Temporal Workflow Processing" under Usage Patterns).

Database Schema

csv_import_batches

Tracks import batches with metadata (table name kept for backwards compatibility):
  • id: Unique batch identifier
  • organization_id: Organization context
  • integration_id: Source integration
  • object_type_id: Target object type (payment, order, etc.)
  • filename: Original filename
  • total_records: Total CSV records
  • processed_records: Successfully processed count
  • failed_records: Failed processing count
  • status: pending, processing, completed, failed
  • workflow_id: Temporal workflow ID

csv_import_records

Stores individual records as JSON (table name kept for backwards compatibility):
  • id: Unique record identifier
  • batch_id: Reference to batch
  • record_index: Position in CSV
  • record_data: Parsed CSV data as JSON
  • processed: Processing status
  • document_id: Created document ID (if successful)
  • error_message: Processing error details
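As a rough guide, application code might model rows from these two tables with shapes like the following. These are hypothetical TypeScript types derived from the column lists above; the actual ORM types may differ.

```typescript
type BatchStatus = "pending" | "processing" | "completed" | "failed";

// Mirrors csv_import_batches (hypothetical shape)
interface CsvImportBatch {
  id: string;
  organizationId: string;
  integrationId: string;
  objectTypeId: string;
  filename: string;
  totalRecords: number;
  processedRecords: number;
  failedRecords: number;
  status: BatchStatus;
  workflowId: string | null;
}

// Mirrors csv_import_records (hypothetical shape)
interface CsvImportRecord {
  id: string;
  batchId: string;
  recordIndex: number;
  recordData: Record<string, unknown>; // parsed CSV data stored as JSON
  processed: boolean;
  documentId: string | null; // set only when a document was created
  errorMessage: string | null;
}
```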

Example Implementation: PayPal Parser

Parser Registration

// Register PayPal parser if not already registered
if (!delimitedDataParserRegistry.getParser("paypal", "payment")) {
  delimitedDataParserRegistry.register(new PaypalPaymentsCsvParser());
}

Format Validation

validateFormat(content: string): ValidationResult {
  const lines = content.split("\n");

  // Look for PayPal Account Statement format headers
  const headerLine = lines.find((line) => {
    return line.includes("Datum") &&
           line.includes("Uhrzeit") &&
           line.includes("Beschreibung") &&
           line.includes("Transaktionscode");
  });

  if (!headerLine) {
    return {
      isValid: false,
      errors: ["Not a PayPal Account Statement"]
    };
  }

  return { isValid: true, errors: [] };
}

Row Parsing

parseRow(row: Record<string, string>, rowIndex: number): PaypalPaymentRecord {
  // Normalize German number format (comma to dot)
  const normalizeAmount = (amount: string) => {
    return amount.replace(/\./g, "").replace(/,/g, ".");
  };

  return {
    recordNumber: `${rowIndex + 1}`,
    transactionId: row.Transaktionscode,
    transactionCurrency: row.Währung,
    grossTransactionAmount: normalizeAmount(row.Brutto),
    balance: normalizeAmount(row.Guthaben), // Account balance (Guthaben)
    // ... map other fields
  };
}
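The order of operations in `normalizeAmount` matters: German-locale numbers use "." as the thousands separator and "," as the decimal separator, so the dots must be stripped before the comma is converted, as this standalone copy of the helper shows.

```typescript
// Normalize German number format: strip thousands dots, then convert
// the decimal comma, so "1.234,56" becomes "1234.56".
const normalizeAmount = (amount: string): string =>
  amount.replace(/\./g, "").replace(/,/g, ".");
```

Reversing the two `replace` calls would turn "1.234,56" into "1.234.56" and corrupt the value.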

Usage Patterns

1. Processing Delimited Data Import

// Generic delimited data processing (works for CSV and TSV)
const importResult = await processDelimitedDataImport({
  content,
  integrationId,
  organizationId,
  integrationSlug: "paypal",
  objectTypeSlug: "payment",
  filename,
});

// Start Temporal workflow in batch mode
const workflowId = `paypal-csv-import-${batchId}`;
await client.workflow.start("syncPaypalPaymentsWorkflow", {
  args: [{ integrationId, organizationId, csvBatchId: batchId }],
  taskQueue: PAYPAL_PAYMENTS_TASK_QUEUE,
  workflowId,
});

2. Temporal Workflow Processing

// Workflow determines processing mode from the presence of a batch ID
// (checking csvBatchId directly also narrows its type for use below)
if (csvBatchId) {
  // CSV Import Processing Mode
  const csvRecords = await getCsvBatchRecordsActivity({
    batchId: csvBatchId,
    offset,
    limit: BATCH_SIZE,
    unprocessedOnly: true,
  });

  // Convert CSV records to documents (flatMap lets one record yield
  // zero or more documents)
  const documentsToCreate = csvRecords.flatMap((csvRecord) => {
    const paypalData = csvRecord.recordData;
    // Transform to document format...
    return [/* documents built from paypalData */];
  });
}

Adding New Parsers

1. Create Parser Class

export class ShopifyOrdersParser implements DelimitedDataParser<ShopifyOrderRecord> {
  integrationSlug = "shopify";
  objectTypeSlug = "order";

  getRequiredHeaders(): string[] {
    return ["Name", "Email", "Financial Status", "Total"];
  }

  getOptionalHeaders(): string[] {
    return [];
  }

  validateFormat(content: string): ValidationResult {
    // Shopify-specific validation logic
  }

  parseRow(row: Record<string, string>, rowIndex: number): ShopifyOrderRecord {
    // Convert row to typed object
  }

  validateRow(row: ShopifyOrderRecord, rowIndex: number): ValidationResult {
    // Business rules validation
  }
}

2. Register Parser

if (!delimitedDataParserRegistry.getParser("shopify", "order")) {
  delimitedDataParserRegistry.register(new ShopifyOrdersParser());
}

3. Update Workflow

Modify the appropriate Temporal workflow to handle the new file format, similar to how PayPal CSV processing was added to syncPaypalPaymentsWorkflow.

Error Handling

Validation Levels

  1. Format Validation: Confirms the file has the expected overall structure and is the right file type
  2. Header Validation: Checks that all required columns are present
  3. Row Validation: Validates individual record data
  4. Business Validation: Applies domain-specific rules
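The four levels above compose naturally into a fail-fast pipeline. This is an illustrative sketch, not the actual implementation; the level names match the list, the helper name is hypothetical.

```typescript
interface ValidationResult { isValid: boolean; errors: string[]; }

// Run validation levels in order, stopping at the first level that fails
// and tagging its errors with the level name for easier diagnosis.
function runValidationLevels(
  checks: Array<{ level: string; run: () => ValidationResult }>,
): ValidationResult {
  for (const check of checks) {
    const result = check.run();
    if (!result.isValid) {
      return {
        isValid: false,
        errors: result.errors.map((e) => `[${check.level}] ${e}`),
      };
    }
  }
  return { isValid: true, errors: [] };
}
```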

Error Recovery

  • Partial Success: Valid records are processed even if some fail
  • Progress Tracking: Real-time updates on processing status
  • Retry Logic: Temporal handles workflow retries automatically
  • Error Logging: Comprehensive error tracking at record and batch level

Performance Considerations

Scalability Features

  • Batch Processing: Handles large files (1M+ records) through pagination
  • Memory Efficiency: Streams records from the database in pages instead of loading the entire file into memory
  • Configurable Batch Sizes: Adjustable for different data types
  • Temporal Reliability: Built-in retry and failure handling
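The batch-processing and memory-efficiency points above come down to a paginated loop. The following is a generic sketch under the assumption that pages are fetched from the database; `fetchPage` is a stand-in for getCsvBatchRecordsActivity, and the function name is illustrative.

```typescript
// Stream records in fixed-size pages so memory use is bounded by batchSize,
// not by the total file size. Returns the number of records handled.
async function processInBatches<T>(
  fetchPage: (offset: number, limit: number) => Promise<T[]>,
  handlePage: (records: T[]) => Promise<void>,
  batchSize = 500,
): Promise<number> {
  let offset = 0;
  let total = 0;
  while (true) {
    const page = await fetchPage(offset, batchSize);
    if (page.length === 0) break; // no records left
    await handlePage(page);
    total += page.length;
    offset += page.length;
  }
  return total;
}
```

With `unprocessedOnly: true` (as in the workflow above), a restarted run naturally resumes from the first unprocessed record, which is what makes progress checkpoints cheap.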

Optimization Strategies

  • Early Validation: Fail fast on format issues
  • Duplicate Detection: Prevent re-importing existing records
  • Progress Checkpoints: Resume processing from failure points
  • Parallel Processing: Multiple workflow instances for different batches

Monitoring & Observability

Metrics Tracked

  • Import success rates
  • Processing times per batch
  • Error rates by validation type
  • Record throughput

Logging Points

  • CSV upload and validation
  • Batch creation and status changes
  • Workflow start/completion
  • Document creation results
  • Error details and recovery attempts

Future Enhancements

Planned Features

  • Schema Evolution: Handle CSV format changes over time
  • Data Transformation Pipelines: More complex data mapping
  • Validation Rules Engine: Configurable validation rules
  • Import Templates: Pre-configured import settings
  • Incremental Imports: Delta processing for large datasets

Architecture Improvements

  • Processor Registry: Separate document processing from workflows
  • Plugin System: Dynamic parser loading
  • Configuration UI: Admin interface for parser management
  • Audit Trail: Complete import history and rollback capabilities

Conclusion

The Delimited Data Import System provides a robust, scalable foundation for handling data imports from multiple sources. It supports both CSV and TSV formats through automatic delimiter detection. The registry pattern enables easy extensibility, while Temporal workflows ensure reliable processing of large datasets. The modular design allows teams to independently develop parsers for new integration sources without affecting existing functionality.

Backwards Compatibility

The system maintains backwards compatibility through:
  • Database tables: csv_import_batches and csv_import_records table names are preserved
  • Type aliases: CsvParser, csvParserRegistry, and processCsvImport are available as deprecated aliases
  • Function wrappers: Old function signatures continue to work through wrapper functions