Building Type-Safe ETL Pipelines in TypeScript

A practical guide to implementing Extract, Transform, Load workflows with full type safety

Introduction

ETL (Extract, Transform, Load) is a fundamental pattern in data engineering. Whether you’re syncing CRM data to a warehouse, migrating between databases, or integrating third-party APIs, ETL pipelines are everywhere. But building them in a type-safe, maintainable way has traditionally been challenging.

In this article, we’ll explore how to implement ETL pipelines in TypeScript using OpenETL, a modern framework that brings type safety and abstraction to data integration.

What is ETL?

ETL consists of three phases:

  1. Extract: Pull data from a source (database, API, file)
  2. Transform: Clean, reshape, or enrich the data
  3. Load: Write the data to a destination

┌─────────┐    ┌───────────┐    ┌─────────┐
│ Extract │ ─▶ │ Transform │ ─▶ │  Load   │
└─────────┘    └───────────┘    └─────────┘
    │                │               │
    ▼                ▼               ▼
Database          Rename      Data warehouse
   API            Filter            API
  File             Merge
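To make the three phases concrete before any framework is involved, here is a minimal, framework-free sketch in plain TypeScript. The RawUser and CleanUser shapes and the extract, transform, and load functions are hypothetical stand-ins for a real source and destination.

```typescript
// Framework-free ETL sketch. The record shapes and the in-memory
// "source" and "destination" are hypothetical stand-ins.
interface RawUser {
  first_name: string;
  last_name: string;
  email: string;
}

interface CleanUser {
  full_name: string;
  email: string;
}

// Extract: pull rows from a source (here, a hard-coded array)
function extract(): RawUser[] {
  return [
    { first_name: 'Ada', last_name: 'Lovelace', email: ' ADA@Example.com ' },
    { first_name: 'Alan', last_name: 'Turing', email: 'alan@example.com' },
  ];
}

// Transform: reshape and clean each row
function transform(rows: RawUser[]): CleanUser[] {
  return rows.map((row) => ({
    full_name: `${row.first_name} ${row.last_name}`,
    email: row.email.trim().toLowerCase(),
  }));
}

// Load: write rows to a destination (here, an in-memory array)
const warehouse: CleanUser[] = [];
function load(rows: CleanUser[]): void {
  warehouse.push(...rows);
}

load(transform(extract()));
console.log(warehouse);
```

Because each phase has an explicit input and output type, the compiler rejects a transform that forgets a field the load step expects.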

The Challenge with Traditional ETL

Traditional ETL implementations often suffer from:

  • Tight coupling: Source-specific code mixed with business logic
  • No type safety: Runtime errors from schema mismatches
  • Code duplication: Similar patterns repeated for each integration
  • Hard to test: Database connections embedded in business logic

The Adapter Pattern Solution

OpenETL solves these problems with the adapter pattern. Each data source implements a common interface, allowing you to swap sources without changing your pipeline logic.

// The adapter interface - all sources implement this
interface AdapterInstance {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  download(options: PageOptions): Promise<{ data: any[] }>;
  upload(data: any[]): Promise<void>;
}

This abstraction means your pipeline code doesn’t care whether data comes from PostgreSQL, MongoDB, or a REST API.
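A shared interface also means a test double is just another implementation, which addresses the "hard to test" problem above. The sketch below redeclares a simplified copy of the interface locally so it compiles on its own. The PageOptions shape (limit and offset) is an assumption based on the custom adapter shown later in this article, and inMemoryAdapter and countRows are hypothetical helpers, not OpenETL exports.

```typescript
// Local, simplified copies of the shapes above (assumed, not imported from openetl)
interface PageOptions {
  limit: number;
  offset: number;
}

interface AdapterInstance {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  download(options: PageOptions): Promise<{ data: any[] }>;
  upload(data: any[]): Promise<void>;
}

// Hypothetical in-memory adapter: a test double for pipeline logic
function inMemoryAdapter(rows: any[] = []): AdapterInstance & { rows: any[] } {
  return {
    rows,
    async connect() {},
    async disconnect() {},
    async download({ limit, offset }) {
      return { data: rows.slice(offset, offset + limit) };
    },
    async upload(data) {
      rows.push(...data);
    },
  };
}

// Pipeline logic written only against the interface: it cannot tell
// whether the adapter wraps PostgreSQL, MongoDB, or a plain array.
async function countRows(adapter: AdapterInstance, pageSize = 2): Promise<number> {
  await adapter.connect();
  try {
    let total = 0;
    for (let offset = 0; ; offset += pageSize) {
      const { data } = await adapter.download({ limit: pageSize, offset });
      total += data.length;
      if (data.length < pageSize) return total; // a short page means no more data
    }
  } finally {
    await adapter.disconnect();
  }
}

countRows(inMemoryAdapter([{ id: 1 }, { id: 2 }, { id: 3 }])).then((n) =>
  console.log(`counted ${n} rows`),
);
```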

Getting Started

Installation

npm install openetl
npm install @openetl/postgresql  # or any adapter you need

Basic Pipeline

Here’s a simple pipeline that downloads data from PostgreSQL:

import { Orchestrator, Vault, Pipeline } from 'openetl';
import { postgresql } from '@openetl/postgresql';

// 1. Define credentials in a vault
const vault: Vault = {
  'my-database': {
    id: 'my-database',
    type: 'basic',
    credentials: {
      host: 'localhost',
      port: '5432',
      database: 'myapp',
      username: 'user',
      password: 'secret',
    },
  },
};

// 2. Register adapters
const adapters = { postgresql };

// 3. Define the pipeline
const pipeline: Pipeline = {
  id: 'export-users',
  source: {
    id: 'source',
    adapter_id: 'postgresql',
    endpoint_id: 'table_query',
    credential_id: 'my-database',
    fields: ['id', 'email', 'name', 'created_at'],
    config: {
      schema: 'public',
      table: 'users',
    },
  },
};

// 4. Execute (top-level await requires an ES module context)
const etl = Orchestrator(vault, adapters);
const result = await etl.runPipeline(pipeline);

console.log(`Exported ${result.data.length} users`);

Working with Multiple Data Sources

The power of abstraction becomes clear when working with multiple sources. The same pipeline structure works regardless of the source:

import { postgresql } from '@openetl/postgresql';
import { mysql } from '@openetl/mysql';
import { mongodb } from '@openetl/mongodb';
import { hubspot } from '@openetl/hubspot';

const adapters = { postgresql, mysql, mongodb, hubspot };

// PostgreSQL source (id, credential_id, and fields omitted for brevity)
const postgresSource = {
  adapter_id: 'postgresql',
  endpoint_id: 'table_query',
  config: { schema: 'public', table: 'customers' },
};

// MySQL source - same structure, different adapter
const mysqlSource = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  config: { database: 'sales', table: 'customers' },
};

// MongoDB source - still the same pattern
const mongoSource = {
  adapter_id: 'mongodb',
  endpoint_id: 'collection_query',
  config: { table: 'customers' },
};

// HubSpot API - REST API, same interface
const hubspotSource = {
  adapter_id: 'hubspot',
  endpoint_id: 'contacts',
};
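A related benefit: when the adapter map is a plain object, TypeScript can derive the set of valid adapter_id values from its keys, so a typo such as 'postgress' fails to compile. This is a general TypeScript pattern, sketched here with hypothetical stand-in adapters; the article does not say whether OpenETL's own Pipeline type narrows adapter_id this way.

```typescript
// Hypothetical stand-ins for imported adapters; only the keys matter here
const adapters = {
  postgresql: { name: 'PostgreSQL' },
  mysql: { name: 'MySQL' },
  mongodb: { name: 'MongoDB' },
  hubspot: { name: 'HubSpot' },
};

// 'postgresql' | 'mysql' | 'mongodb' | 'hubspot', derived from the object
type AdapterId = keyof typeof adapters;

interface SourceConfig {
  adapter_id: AdapterId;
  endpoint_id: string;
  config?: Record<string, string>;
}

const mysqlSource: SourceConfig = {
  adapter_id: 'mysql',
  endpoint_id: 'table_query',
  config: { database: 'sales', table: 'customers' },
};

// const typo: SourceConfig = { adapter_id: 'postgress', endpoint_id: 'x' };
// ❌ compile error: 'postgress' is not a valid AdapterId

// The lookup is type-safe: the compiler knows the key exists
function resolveAdapter(source: SourceConfig) {
  return adapters[source.adapter_id];
}

console.log(resolveAdapter(mysqlSource).name); // MySQL
```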

Data Transformations

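OpenETL includes 12 built-in transformation types. Transformations run in the order they are listed, and each one operates on the output of the previous step; that is why the trim below can act on full_name, a field created by the concat before it: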

const pipeline: Pipeline = {
  id: 'transform-contacts',
  source: {
    // ... source config
    transform: [
      // Combine first and last name
      {
        type: 'concat',
        options: {
          properties: ['first_name', 'last_name'],
          glue: ' ',
          to: 'full_name',
        },
      },
      // Normalize email to lowercase
      {
        type: 'lowercase',
        options: { field: 'email' },
      },
      // Remove whitespace
      {
        type: 'trim',
        options: { field: 'full_name' },
      },
      // Extract domain from email
      {
        type: 'extract',
        options: {
          field: 'email',
          pattern: '@(.+)$',
          to: 'email_domain',
        },
      },
    ],
  },
};
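To see what "applied in sequence" means in practice, here is a small interpreter for the four transformation types used above, written as a discriminated union so that each type must carry the right options. It is a sketch of the idea, not OpenETL's implementation: the option names are copied from the example, while details such as how extract treats a non-matching value (here, the target field is simply left unset) are assumptions.

```typescript
// Assumed shapes, mirroring the options in the pipeline example above
type Transformation =
  | { type: 'concat'; options: { properties: string[]; glue: string; to: string } }
  | { type: 'lowercase'; options: { field: string } }
  | { type: 'trim'; options: { field: string } }
  | { type: 'extract'; options: { field: string; pattern: string; to: string } };

type Row = Record<string, unknown>;

function applyOne(row: Row, t: Transformation): Row {
  const out = { ...row }; // never mutate the caller's row
  switch (t.type) {
    case 'concat':
      out[t.options.to] = t.options.properties.map((p) => String(out[p] ?? '')).join(t.options.glue);
      break;
    case 'lowercase':
      out[t.options.field] = String(out[t.options.field] ?? '').toLowerCase();
      break;
    case 'trim':
      out[t.options.field] = String(out[t.options.field] ?? '').trim();
      break;
    case 'extract': {
      // Use the first capture group if the pattern has one, else the whole match
      const match = new RegExp(t.options.pattern).exec(String(out[t.options.field] ?? ''));
      if (match) out[t.options.to] = match[1] ?? match[0];
      break;
    }
  }
  return out;
}

// Each step receives the output of the previous one, so order matters
function applyAll(row: Row, steps: Transformation[]): Row {
  return steps.reduce<Row>((acc, t) => applyOne(acc, t), row);
}

const result = applyAll(
  { first_name: ' Ada', last_name: 'Lovelace ', email: 'Ada@Example.COM' },
  [
    { type: 'concat', options: { properties: ['first_name', 'last_name'], glue: ' ', to: 'full_name' } },
    { type: 'lowercase', options: { field: 'email' } },
    { type: 'trim', options: { field: 'full_name' } },
    { type: 'extract', options: { field: 'email', pattern: '@(.+)$', to: 'email_domain' } },
  ],
);
console.log(result.full_name, result.email_domain); // Ada Lovelace example.com
```

Swapping the concat and trim steps would leave the stray spaces in full_name, because trim would then run before the field exists.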

Available Transformations

Type          Description                  Example
concat        Combine multiple fields      ['first', 'last'] → 'full_name'
renameKey     Copy/rename a field          'old_field' → 'new_field'
uppercase     Convert to uppercase         'hello' → 'HELLO'
lowercase     Convert to lowercase         'HELLO' → 'hello'
trim          Remove whitespace            ' text ' → 'text'
split         Split into array             'a,b,c' → ['a','b','c']
replace       Regex replacement            'foo' → 'bar'
addPrefix     Add prefix                   '123' → 'ID-123'
addSuffix     Add suffix                   'file' → 'file.txt'
toNumber      Parse as number              '42' → 42
extract       Extract substring            'user@example.com' → 'example.com'
mergeObjects  Combine fields into object   {a, b} → {merged: {a, b}}

Filtering and Sorting

Apply filters and sorting at the source level for efficient data retrieval:

const pipeline: Pipeline = {
  id: 'active-premium-users',
  source: {
    // ...
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'plan', operator: '=', value: 'premium' },
      { field: 'created_at', operator: '>=', value: '2024-01-01' },
    ],
    sort: [
      { field: 'created_at', type: 'desc' },
      { field: 'name', type: 'asc' },
    ],
    limit: 1000,
  },
};
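Filtering at the source level means the conditions travel to the source system instead of being applied after every row has been downloaded. For a SQL source, that roughly amounts to building a parameterized query like the one below. This is an illustrative sketch, not the PostgreSQL adapter's actual code: the Filter and Sort shapes follow the example above, while buildQuery, the $1-style placeholders, and the identifier quoting are assumptions.

```typescript
interface Filter {
  field: string;
  operator: '=' | '!=' | '>' | '>=' | '<' | '<=';
  value: string;
}

interface Sort {
  field: string;
  type: 'asc' | 'desc';
}

// Quote identifiers so field names cannot inject SQL (PostgreSQL-style double quotes)
const ident = (name: string) => `"${name.replace(/"/g, '""')}"`;

function buildQuery(table: string, filters: Filter[], sort: Sort[], limit?: number) {
  const params: string[] = [];
  // Values become parameters ($1, $2, ...) and are never spliced into the SQL text
  const where = filters.map((f) => {
    params.push(f.value);
    return `${ident(f.field)} ${f.operator} $${params.length}`;
  });
  let sql = `SELECT * FROM ${ident(table)}`;
  if (where.length) sql += ` WHERE ${where.join(' AND ')}`;
  if (sort.length) {
    sql += ` ORDER BY ${sort.map((s) => `${ident(s.field)} ${s.type === 'desc' ? 'DESC' : 'ASC'}`).join(', ')}`;
  }
  if (limit !== undefined) sql += ` LIMIT ${Math.trunc(limit)}`;
  return { sql, params };
}

const q = buildQuery(
  'users',
  [
    { field: 'status', operator: '=', value: 'active' },
    { field: 'plan', operator: '=', value: 'premium' },
  ],
  [{ field: 'created_at', type: 'desc' }],
  1000,
);
console.log(q.sql);
// SELECT * FROM "users" WHERE "status" = $1 AND "plan" = $2 ORDER BY "created_at" DESC LIMIT 1000
```

Keeping values in a separate params array lets the database driver handle escaping, and the operator union restricts operators at compile time.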

Filter Groups

For complex conditions, use filter groups with AND/OR logic:

filters: [
  {
    op: 'OR',
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'status', operator: '=', value: 'pending' },
    ],
  },
  { field: 'email_verified', operator: '=', value: 'true' },
]
// Roughly equivalent SQL: WHERE (status = 'active' OR status = 'pending') AND email_verified = 'true'
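Because a group can contain further groups, the natural way to interpret this structure is recursion. The sketch below evaluates it against an in-memory record, the kind of fallback you might write for a source that cannot filter server-side. The op/filters shape comes from the example above; the FilterGroup type name, the matches function, treating the top-level array as an implicit AND, and comparing values as strings are all assumptions.

```typescript
interface Condition {
  field: string;
  operator: '=' | '!=';
  value: string;
}

// Hypothetical name for the { op, filters } shape shown above
interface FilterGroup {
  op: 'AND' | 'OR';
  filters: FilterNode[];
}

type FilterNode = Condition | FilterGroup;

function isGroup(node: FilterNode): node is FilterGroup {
  return 'op' in node;
}

// Values are compared as strings, since the example writes every value as a string
function matches(record: Record<string, unknown>, node: FilterNode): boolean {
  if (isGroup(node)) {
    const test = (child: FilterNode) => matches(record, child);
    return node.op === 'OR' ? node.filters.some(test) : node.filters.every(test);
  }
  const actual = String(record[node.field]);
  return node.operator === '=' ? actual === node.value : actual !== node.value;
}

// A top-level filters array behaves like an AND group
const matchesAll = (record: Record<string, unknown>, filters: FilterNode[]) =>
  matches(record, { op: 'AND', filters });

const filters: FilterNode[] = [
  {
    op: 'OR',
    filters: [
      { field: 'status', operator: '=', value: 'active' },
      { field: 'status', operator: '=', value: 'pending' },
    ],
  },
  { field: 'email_verified', operator: '=', value: 'true' },
];

console.log(matchesAll({ status: 'pending', email_verified: true }, filters)); // true
console.log(matchesAll({ status: 'banned', email_verified: true }, filters)); // false
```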

Source to Target Pipelines

Move data between systems by defining both source and target:

const pipeline: Pipeline = {
  id: 'sync-crm-to-warehouse',
  source: {
    id: 'hubspot-source',
    adapter_id: 'hubspot',
    endpoint_id: 'contacts',
    credential_id: 'hubspot-api',
    fields: ['email', 'firstname', 'lastname', 'company'],
    pagination: { type: 'cursor', itemsPerPage: 100 },
  },
  target: {
    id: 'postgres-target',
    adapter_id: 'postgresql',
    endpoint_id: 'table_insert',
    credential_id: 'warehouse-db',
    fields: ['email', 'first_name', 'last_name', 'company'],
    config: {
      schema: 'analytics',
      table: 'crm_contacts',
    },
  },
};

const result = await etl.runPipeline(pipeline);
console.log(`Synced ${result.data.length} contacts to warehouse`);
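The source and target above use different field names (firstname versus first_name). The article does not spell out how OpenETL pairs them, so if you want that correspondence to be explicit and compiler-checked in your own code, a typed mapping is one option. The fieldMap object and mapRecord function are hypothetical helpers, not part of OpenETL.

```typescript
type SourceField = 'email' | 'firstname' | 'lastname' | 'company';
type TargetField = 'email' | 'first_name' | 'last_name' | 'company';

// Every source field must be mapped: a missing key is a compile error
const fieldMap: Record<SourceField, TargetField> = {
  email: 'email',
  firstname: 'first_name',
  lastname: 'last_name',
  company: 'company',
};

// Rename one record's keys according to fieldMap
function mapRecord(row: Record<SourceField, unknown>): Record<TargetField, unknown> {
  const out = {} as Record<TargetField, unknown>;
  for (const from of Object.keys(fieldMap) as SourceField[]) {
    out[fieldMap[from]] = row[from];
  }
  return out;
}

console.log(
  mapRecord({ email: 'ada@example.com', firstname: 'Ada', lastname: 'Lovelace', company: 'Analytical Engines' }),
);
```

Because fieldMap is typed as Record<SourceField, TargetField>, adding a field to the source list without mapping it fails to compile, rather than silently dropping a column.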

Pagination Strategies

Different APIs use different pagination methods. OpenETL supports three strategies:

// Offset-based (SQL databases)
pagination: { type: 'offset', itemsPerPage: 100 }

// Cursor-based (modern APIs like HubSpot, Stripe)
pagination: { type: 'cursor', itemsPerPage: 100 }

// Page-based (traditional REST APIs)
pagination: { type: 'page', itemsPerPage: 50 }
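Conceptually, each strategy changes only two things: what the client sends to ask for the next page, and how it knows it has reached the end. The sketch below walks every page for all three strategies through one async generator. The paginate function, the fetchPage callback, the request fields (offset, page, cursor), and the stop conditions (a short page, or a missing nextCursor) are assumptions for illustration; real APIs name these differently.

```typescript
type Pagination = { type: 'offset' | 'cursor' | 'page'; itemsPerPage: number };

// What the client asks for next; field names are illustrative
interface PageRequest {
  limit: number;
  offset?: number;
  page?: number;
  cursor?: string;
}

interface PageResponse<T> {
  data: T[];
  nextCursor?: string; // only meaningful for cursor pagination
}

async function* paginate<T>(
  pagination: Pagination,
  fetchPage: (req: PageRequest) => Promise<PageResponse<T>>,
): AsyncGenerator<T[]> {
  const limit = pagination.itemsPerPage;
  let offset = 0;
  let page = 1;
  let cursor: string | undefined;

  while (true) {
    const req: PageRequest =
      pagination.type === 'offset' ? { limit, offset }
      : pagination.type === 'page' ? { limit, page }
      : { limit, cursor };
    const { data, nextCursor } = await fetchPage(req);
    if (data.length > 0) yield data;

    if (pagination.type === 'cursor') {
      if (!nextCursor) return; // the server signals the end by omitting the cursor
      cursor = nextCursor;
    } else {
      if (data.length < limit) return; // a short page means nothing is left
      offset += limit;
      page += 1;
    }
  }
}

// Usage: a fake offset-based API over 5 items, 2 per page
const items = [1, 2, 3, 4, 5];
(async () => {
  const fetchPage = async (req: PageRequest) => ({
    data: items.slice(req.offset ?? 0, (req.offset ?? 0) + req.limit),
  });
  for await (const batch of paginate<number>({ type: 'offset', itemsPerPage: 2 }, fetchPage)) {
    console.log(batch.join(',')); // 1,2 then 3,4 then 5
  }
})();
```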

Error Handling

Configure retry behavior for resilient pipelines:

const pipeline: Pipeline = {
  id: 'resilient-sync',
  source: { /* ... */ },
  error_handling: {
    max_retries: 3,
    retry_interval: 1000,  // base delay between retries, in ms
    fail_on_error: true,   // stop on first error
  },
};

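OpenETL retries with exponential backoff plus jitter: the wait grows exponentially after each failed attempt, and a random component spreads retries out so that clients that failed at the same moment do not all retry at once (the "thundering herd" problem).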
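The article does not show OpenETL's exact delay formula, so the following is a generic sketch of exponential backoff with "full jitter": the ceiling on each wait doubles per attempt, and the actual wait is a random value below that ceiling. The option names echo max_retries and retry_interval from the config above, but withRetry, backoffDelay, maxDelayMs, and the injectable sleep and random hooks are hypothetical.

```typescript
interface RetryOptions {
  maxRetries: number;    // like max_retries: retries after the first attempt
  baseDelayMs: number;   // like retry_interval: starting delay in ms
  maxDelayMs?: number;   // upper bound so delays cannot grow without limit
  sleep?: (ms: number) => Promise<void>;
  random?: () => number; // injectable for deterministic tests
}

// Delay before retry number `attempt` (0-based): random in [0, min(max, base * 2^attempt))
function backoffDelay(attempt: number, base: number, max: number, random: () => number): number {
  const cap = Math.min(max, base * 2 ** attempt);
  return Math.floor(random() * cap);
}

async function withRetry<T>(task: () => Promise<T>, opts: RetryOptions): Promise<T> {
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>((r) => setTimeout(r, ms)));
  const random = opts.random ?? Math.random;
  const maxDelay = opts.maxDelayMs ?? 30_000;

  for (let attempt = 0; ; attempt++) {
    try {
      return await task();
    } catch (err) {
      if (attempt >= opts.maxRetries) throw err; // out of retries: surface the last error
      await sleep(backoffDelay(attempt, opts.baseDelayMs, maxDelay, random));
    }
  }
}

// Usage: a flaky task that succeeds on the third call (sleep stubbed out)
let calls = 0;
withRetry(async () => {
  calls++;
  if (calls < 3) throw new Error('temporary failure');
  return 'ok';
}, { maxRetries: 3, baseDelayMs: 1000, sleep: async () => {} }).then((v) => console.log(v, calls)); // ok 3
```

Without the random factor, every client that failed together would retry together at 1 s, 2 s, and 4 s, recreating the very spike the backoff was meant to relieve.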

Pipeline Validation

Validate pipelines before execution to catch configuration errors early:

import { validatePipeline } from 'openetl';

const result = validatePipeline(pipeline, adapters, vault);

if (!result.valid) {
  console.error('Pipeline validation failed:');
  result.errors.forEach(err => console.error(`  - ${err}`));
} else {
  // Safe to run
  await etl.runPipeline(pipeline);
}
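The core of such a check is easy to picture: every reference in the pipeline must point at something that exists. The sketch below performs the two most obvious cross-checks, adapter and credential lookups. It is not validatePipeline's real implementation; the simplified SourceRef and PipelineSketch types and the validateSketch function are hypothetical, and only the { valid, errors } result shape is copied from the usage above.

```typescript
// Simplified shapes; the real Pipeline, Vault, and adapter types carry more fields
interface SourceRef {
  id: string;
  adapter_id: string;
  credential_id: string;
}

interface PipelineSketch {
  id: string;
  source?: SourceRef;
  target?: SourceRef;
}

// Own-property check, so inherited keys such as 'toString' never count as registered
const has = (obj: object, key: string) => Object.prototype.hasOwnProperty.call(obj, key);

function validateSketch(
  pipeline: PipelineSketch,
  adapters: Record<string, unknown>,
  vault: Record<string, unknown>,
): { valid: boolean; errors: string[] } {
  const errors: string[] = [];
  const refs = [['source', pipeline.source], ['target', pipeline.target]] as const;
  for (const [role, ref] of refs) {
    if (!ref) continue;
    if (!has(adapters, ref.adapter_id)) errors.push(`${role}: unknown adapter '${ref.adapter_id}'`);
    if (!has(vault, ref.credential_id)) errors.push(`${role}: unknown credential '${ref.credential_id}'`);
  }
  return { valid: errors.length === 0, errors };
}

// Usage: a misspelled adapter id is reported before anything connects
const check = validateSketch(
  { id: 'export-users', source: { id: 'source', adapter_id: 'postgres', credential_id: 'my-database' } },
  { postgresql: {} },
  { 'my-database': {} },
);
console.log(check.errors.join('\n')); // source: unknown adapter 'postgres'
```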

Building Custom Adapters

Create adapters for any data source by implementing the adapter interface:

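import { Adapter, AdapterInstance, Connector, AuthConfig } from 'openetl';

const BASE_URL = 'https://api.example.com';

// Adapter metadata: describes the endpoints and actions this adapter supports
const MyCustomAdapter: Adapter = {
  id: 'my-adapter',
  name: 'My Custom Adapter',
  type: 'http',
  action: ['download', 'upload'],
  credential_type: 'api_key',
  base_url: BASE_URL,
  endpoints: [
    {
      id: 'users',
      path: '/users',
      method: 'GET',
      description: 'Fetch users',
      supported_actions: ['download'],
    },
  ],
};

// Adapter factory: returns an instance bound to one connector and credential
function myAdapter(connector: Connector, auth: AuthConfig): AdapterInstance {
  // Send the API key on every request, uploads included
  const headers = {
    Authorization: `Bearer ${auth.credentials.api_key}`,
    'Content-Type': 'application/json',
  };

  return {
    async connect() {
      // Validate the connection, e.g. with a lightweight authenticated request
    },

    async disconnect() {
      // Release resources (a stateless HTTP adapter may have nothing to clean up)
    },

    async download({ limit, offset }) {
      const response = await fetch(`${BASE_URL}/users?limit=${limit}&offset=${offset}`, { headers });
      if (!response.ok) {
        throw new Error(`Download failed: HTTP ${response.status}`);
      }
      const data = await response.json();
      return { data };
    },

    async upload(data) {
      const response = await fetch(`${BASE_URL}/users`, {
        method: 'POST',
        headers,
        body: JSON.stringify(data),
      });
      if (!response.ok) {
        throw new Error(`Upload failed: HTTP ${response.status}`);
      }
    },
  };
}

export { myAdapter, MyCustomAdapter };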

Type Safety Benefits

TypeScript provides compile-time checking for your pipelines:

import { Pipeline, BasicAuth } from 'openetl';

// Type error: 'invalid' is not a valid pagination type
const pipeline: Pipeline = {
  source: {
    pagination: { type: 'invalid' }, // ❌ TypeScript error
  },
};

// Type error: missing required field
const auth: BasicAuth = {
  id: 'db',
  type: 'basic',
  credentials: {
    username: 'user',
    // password: missing  // ❌ TypeScript error
  },
};
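One limit worth keeping in mind is that these checks apply only to configuration written as TypeScript. A pipeline loaded from a JSON file or a database arrives as unknown, and the compiler cannot vouch for it. A small type guard closes that gap; the PaginationConfig type and isPaginationConfig function below are hypothetical, modeled on the three pagination types described earlier.

```typescript
const PAGINATION_TYPES = ['offset', 'cursor', 'page'] as const;
type PaginationType = (typeof PAGINATION_TYPES)[number]; // 'offset' | 'cursor' | 'page'

interface PaginationConfig {
  type: PaginationType;
  itemsPerPage: number;
}

// Runtime check that narrows unknown JSON to PaginationConfig
function isPaginationConfig(value: unknown): value is PaginationConfig {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as { type?: unknown; itemsPerPage?: unknown };
  return (
    typeof v.type === 'string' &&
    (PAGINATION_TYPES as readonly string[]).indexOf(v.type) !== -1 &&
    typeof v.itemsPerPage === 'number' &&
    Number.isInteger(v.itemsPerPage) &&
    v.itemsPerPage > 0
  );
}

const fromFile: unknown = JSON.parse('{ "type": "invalid", "itemsPerPage": 100 }');
console.log(isPaginationConfig(fromFile)); // false: the compiler could not have caught this
```

Deriving PaginationType from the PAGINATION_TYPES array keeps the compile-time type and the runtime check from drifting apart.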

Conclusion

OpenETL brings the benefits of TypeScript to ETL development:

  • Type safety catches configuration errors at compile time
  • Adapter abstraction decouples pipelines from data sources
  • Built-in transformations handle common data manipulation
  • Offset, cursor, and page-based pagination cover the most common API patterns
  • Error handling with exponential backoff for resilience

By using these patterns, you can build maintainable, testable ETL pipelines that scale with your data integration needs.

OpenETL is open source under the MIT license. Contributions welcome!

DevegygiebyOL