CSV Import System Architecture
Overview
The CSV Import System is a scalable, extensible architecture for processing CSV files from various integration sources (PayPal, Shopify, Amazon, etc.) and converting them into CONA documents. The system uses a registry pattern to support multiple CSV formats and leverages Temporal workflows for reliable batch processing.
Core Components
1. Base Parser Interface
The `CsvParser<T>` interface defines the contract that all CSV parsers must implement:
- `validateCsvFormat()`: Pre-validates the entire CSV to ensure it is the correct format
- `parseRow()`: Converts a CSV row into a typed object
- `validateRow()`: Validates the parsed object against business rules
- `getRequiredHeaders()` / `getOptionalHeaders()`: Define the expected CSV columns
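The contract above can be sketched as a TypeScript interface. The method names come from this document; the parameter and return shapes (`ValidationResult`, header/sample arguments) are assumptions for illustration, not the production signatures.

```typescript
// Hypothetical result shape shared by the validation methods.
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

// Sketch of the CsvParser<T> contract described above.
interface CsvParser<T> {
  // Pre-validates the whole file (headers, basic shape) before any row parsing.
  validateCsvFormat(headers: string[], sampleRows: string[][]): ValidationResult;
  // Converts one raw CSV row into a typed object.
  parseRow(row: Record<string, string>): T;
  // Applies business rules to the parsed object.
  validateRow(record: T): ValidationResult;
  // Expected CSV columns.
  getRequiredHeaders(): string[];
  getOptionalHeaders(): string[];
}
```

A concrete parser implements this once per integration/object-type pair, which keeps format knowledge out of the workflows.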
2. Parser Registry
The `CsvParserRegistry` manages all available parsers using a key-based lookup system:
- Composite Key: Uses `${integrationSlug}:${objectTypeSlug}` (e.g., "paypal:payment")
- Global Instance: Single registry accessible throughout the application
- Type Safety: Maintains type information for each parser
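A minimal sketch of the registry, assuming a `Map` keyed by the composite key above. The composite-key scheme comes from this document; the class shape and the `AnyCsvParser` stand-in type are assumptions.

```typescript
// Minimal structural stand-in for the full CsvParser<T> contract.
type AnyCsvParser = { parseRow(row: Record<string, string>): unknown };

// Sketch of a key-based parser registry.
class CsvParserRegistry {
  private parsers = new Map<string, AnyCsvParser>();

  // Register a parser under `${integrationSlug}:${objectTypeSlug}`.
  register(integrationSlug: string, objectTypeSlug: string, parser: AnyCsvParser): void {
    this.parsers.set(`${integrationSlug}:${objectTypeSlug}`, parser);
  }

  // Look up a parser; undefined when no parser is registered for the key.
  get(integrationSlug: string, objectTypeSlug: string): AnyCsvParser | undefined {
    return this.parsers.get(`${integrationSlug}:${objectTypeSlug}`);
  }
}

// Single global instance, as described above.
const csvParserRegistry = new CsvParserRegistry();
```

Centralizing lookup this way means new formats only require a registration call, not changes to the upload pipeline.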
3. Document Processors
Document processors handle the conversion of parsed CSV records into CONA documents.
Data Flow
1. Upload & Parse Phase
- Upload: User uploads CSV file through UI
- Format Validation: Parser validates overall CSV structure
- Row Parsing: Each row is parsed into typed objects
- Row Validation: Business rules validation
- Database Storage: Valid records stored in `csv_import_batches` and `csv_import_records`
- Workflow Trigger: Temporal workflow started for processing
2. Processing Phase
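The processing phase can be sketched as a pagination loop over stored records. In the real system this runs inside a Temporal workflow; here `loadPage` and `processRecord` are plain async parameters (hypothetical names standing in for activities) so the pattern is self-contained.

```typescript
// Stand-in for a row from csv_import_records.
interface ImportRecord {
  id: string;
  recordData: Record<string, string>;
}

// Sketch of the processing phase: page through stored records and
// process each one (in production, each step would be a Temporal activity).
async function processBatch(
  loadPage: (offset: number, limit: number) => Promise<ImportRecord[]>,
  processRecord: (record: ImportRecord) => Promise<void>,
  pageSize = 100,
): Promise<number> {
  let offset = 0;
  let processed = 0;
  for (;;) {
    // Stream from the database page by page instead of holding the CSV in memory.
    const page = await loadPage(offset, pageSize);
    if (page.length === 0) break;
    for (const record of page) {
      await processRecord(record); // e.g., create the CONA document
      processed++;
    }
    offset += page.length;
  }
  return processed;
}
```

Because progress is tracked by offset, a restarted workflow can resume from the last completed page rather than reprocessing the whole file.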
Database Schema
`csv_import_batches`
Tracks CSV import batches with metadata:
- `id`: Unique batch identifier
- `organization_id`: Organization context
- `integration_id`: Source integration
- `object_type_id`: Target object type (payment, order, etc.)
- `filename`: Original filename
- `total_records`: Total CSV records
- `processed_records`: Successfully processed count
- `failed_records`: Failed processing count
- `status`: pending, processing, completed, or failed
- `workflow_id`: Temporal workflow ID
`csv_import_records`
Stores individual CSV records as JSON:
- `id`: Unique record identifier
- `batch_id`: Reference to the parent batch
- `record_index`: Position in the CSV
- `record_data`: Parsed CSV data as JSON
- `processed`: Processing status
- `document_id`: Created document ID (if successful)
- `error_message`: Processing error details
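For application code, the two tables can be mirrored as TypeScript types. The column names come from the lists above; the specific TS types (string IDs, which columns are nullable) are assumptions about the underlying SQL schema.

```typescript
// Hypothetical mirror of the csv_import_batches status column.
type BatchStatus = "pending" | "processing" | "completed" | "failed";

// Mirror of csv_import_batches (types are assumptions).
interface CsvImportBatch {
  id: string;
  organizationId: string;
  integrationId: string;
  objectTypeId: string;
  filename: string;
  totalRecords: number;
  processedRecords: number;
  failedRecords: number;
  status: BatchStatus;
  workflowId: string | null; // set once the Temporal workflow starts
}

// Mirror of csv_import_records (types are assumptions).
interface CsvImportRecord {
  id: string;
  batchId: string;                      // reference to csv_import_batches.id
  recordIndex: number;                  // position in the CSV
  recordData: Record<string, unknown>;  // parsed row stored as JSON
  processed: boolean;
  documentId: string | null;            // created document (if successful)
  errorMessage: string | null;
}
```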
Example Implementation: PayPal Parser
Parser Registration
Format Validation
Row Parsing
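The three steps above can be sketched in one hypothetical PayPal parser. The header names ("Transaction ID", "Gross", etc.) and the `PaypalPaymentRecord` shape are illustrative assumptions, not the production column list.

```typescript
// Illustrative typed record for a PayPal payment row.
interface PaypalPaymentRecord {
  transactionId: string;
  date: string;
  gross: number;
  currency: string;
}

class PaypalPaymentCsvParser {
  getRequiredHeaders(): string[] {
    return ["Transaction ID", "Date", "Gross", "Currency"];
  }
  getOptionalHeaders(): string[] {
    return ["Fee", "Note"];
  }
  // Format validation: fail fast when required headers are missing.
  validateCsvFormat(headers: string[]): { valid: boolean; errors: string[] } {
    const missing = this.getRequiredHeaders().filter((h) => !headers.includes(h));
    return { valid: missing.length === 0, errors: missing.map((h) => `Missing header: ${h}`) };
  }
  // Row parsing: raw strings -> typed record.
  parseRow(row: Record<string, string>): PaypalPaymentRecord {
    return {
      transactionId: row["Transaction ID"],
      date: row["Date"],
      gross: Number(row["Gross"]),
      currency: row["Currency"],
    };
  }
  // Business-rule validation on the parsed record.
  validateRow(record: PaypalPaymentRecord): { valid: boolean; errors: string[] } {
    const errors: string[] = [];
    if (!record.transactionId) errors.push("transactionId is required");
    if (Number.isNaN(record.gross)) errors.push("gross must be numeric");
    return { valid: errors.length === 0, errors };
  }
}

// Registration under the composite key described earlier (hypothetical API):
// csvParserRegistry.register("paypal", "payment", new PaypalPaymentCsvParser());
```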
Usage Patterns
1. Processing CSV Import
2. Temporal Workflow Processing
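Both usage patterns can be compressed into one toy end-to-end flow: look a parser up by its composite key, then run each row through parse and validate. The plain `Map` and the toy parser here are stand-ins for the real `CsvParserRegistry` and `CsvParser<T>`.

```typescript
// Toy stand-in for a registered parser.
type Parser = {
  parseRow(row: Record<string, string>): { id: string; amount: number };
  validateRow(record: { id: string; amount: number }): boolean;
};

// Stand-in for the global registry, keyed by `${integrationSlug}:${objectTypeSlug}`.
const registry = new Map<string, Parser>();
registry.set("paypal:payment", {
  parseRow: (row) => ({ id: row["Transaction ID"], amount: Number(row["Gross"]) }),
  validateRow: (record) => record.id.length > 0 && !Number.isNaN(record.amount),
});

// Usage pattern: resolve the parser, then split rows into accepted/rejected.
function runImport(key: string, rows: Record<string, string>[]) {
  const parser = registry.get(key);
  if (!parser) throw new Error(`No parser registered for ${key}`);
  const accepted: { id: string; amount: number }[] = [];
  const rejectedRows: number[] = [];
  rows.forEach((row, index) => {
    const record = parser.parseRow(row);
    if (parser.validateRow(record)) accepted.push(record);
    else rejectedRows.push(index);
  });
  return { accepted, rejectedRows };
}
```

In the real system, the accepted records would be written to `csv_import_records` and handed to a Temporal workflow rather than returned in memory.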
Adding New Parsers
1. Create Parser Class
2. Register Parser
3. Update Workflow
Modify the appropriate Temporal workflow to handle the new CSV format, similar to how PayPal CSV processing was added to `syncPaypalPaymentsWorkflow`.
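Steps 1 and 2 might look like the following for a hypothetical Shopify order parser; the header names and record shape are illustrative only.

```typescript
// Step 1 - create the parser class (illustrative fields and headers).
interface ShopifyOrderRecord {
  orderName: string;
  total: number;
}

class ShopifyOrderCsvParser {
  getRequiredHeaders(): string[] { return ["Name", "Total"]; }
  getOptionalHeaders(): string[] { return ["Notes"]; }
  validateCsvFormat(headers: string[]): boolean {
    return this.getRequiredHeaders().every((h) => headers.includes(h));
  }
  parseRow(row: Record<string, string>): ShopifyOrderRecord {
    return { orderName: row["Name"], total: Number(row["Total"]) };
  }
  validateRow(record: ShopifyOrderRecord): boolean {
    return record.orderName !== "" && !Number.isNaN(record.total);
  }
}

// Step 2 - register under the composite key (registry API as described above):
// csvParserRegistry.register("shopify", "order", new ShopifyOrderCsvParser());
```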
Error Handling
Validation Levels
- Format Validation: Entire CSV structure and type
- Header Validation: Required columns presence
- Row Validation: Individual record data
- Business Validation: Domain-specific rules
Error Recovery
- Partial Success: Valid records are processed even if some fail
- Progress Tracking: Real-time updates on processing status
- Retry Logic: Temporal handles workflow retries automatically
- Error Logging: Comprehensive error tracking at record and batch level
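The partial-success and error-logging points above amount to processing each record in isolation so one bad row cannot fail the whole batch. A sketch, where `processOne` and the error shape are hypothetical:

```typescript
// Record-level error captured for the csv_import_records error_message column.
interface RecordError {
  index: number;
  message: string;
}

// Partial-success pattern: try each record independently, log failures, keep going.
function processAll<T>(
  records: T[],
  processOne: (record: T) => void,
): { succeeded: number; errors: RecordError[] } {
  const errors: RecordError[] = [];
  let succeeded = 0;
  records.forEach((record, index) => {
    try {
      processOne(record);
      succeeded++;
    } catch (err) {
      // Record-level error tracking; the batch continues.
      errors.push({ index, message: err instanceof Error ? err.message : String(err) });
    }
  });
  return { succeeded, errors };
}
```

Workflow-level retries (Temporal) then sit on top of this, re-running only the records that failed.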
Performance Considerations
Scalability Features
- Batch Processing: Handles large files (1M+ records) through pagination
- Memory Efficiency: Streams records from the database instead of loading the entire CSV into memory
- Configurable Batch Sizes: Adjustable for different data types
- Temporal Reliability: Built-in retry and failure handling
Optimization Strategies
- Early Validation: Fail fast on format issues
- Duplicate Detection: Prevent re-importing existing records
- Progress Checkpoints: Resume processing from failure points
- Parallel Processing: Multiple workflow instances for different batches
Monitoring & Observability
Metrics Tracked
- Import success rates
- Processing times per batch
- Error rates by validation type
- Record throughput
Logging Points
- CSV upload and validation
- Batch creation and status changes
- Workflow start/completion
- Document creation results
- Error details and recovery attempts
Future Enhancements
Planned Features
- Schema Evolution: Handle CSV format changes over time
- Data Transformation Pipelines: More complex data mapping
- Validation Rules Engine: Configurable validation rules
- Import Templates: Pre-configured import settings
- Incremental Imports: Delta processing for large datasets
Architecture Improvements
- Processor Registry: Separate document processing from workflows
- Plugin System: Dynamic parser loading
- Configuration UI: Admin interface for parser management
- Audit Trail: Complete import history and rollback capabilities