Data Flow

This page describes how data moves through a Flux pipeline: reading from a source, transforming records, fanning out over elements, handling back-pressure, and ordering guarantees.

Flux models every pipeline as a DAG of steps. Each step is a pure function from Input → Promise<Output> with an optional side-effect contract.
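The step model can be sketched in plain TypeScript. The Step type and compose helper below are illustrative only, not part of the @flux/core API; they show why pure Input → Promise<Output> functions compose into a pipeline:

```typescript
// Illustrative sketch, not the @flux/core API: a step is a pure async function.
type Step<I, O> = (input: I) => Promise<O>;

// Composing two steps yields another step, which is what lets Flux
// treat a whole pipeline as one function over a DAG of steps.
function compose<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return async (input) => second(await first(input));
}

// Two toy steps and their composition.
const double: Step<number, number> = async (n) => n * 2;
const stringify: Step<number, string> = async (n) => `value=${n}`;

const pipeline = compose(double, stringify);
// pipeline(21) resolves to "value=42"
```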

Reading from a source

import { step, source } from '@flux/core';

const fetchOrders = step('fetch-orders')
  .from(source.http({
    method: 'GET',
    url: 'https://api.example.com/orders',
    headers: { authorization: 'Bearer secret://example/token' },
  }))
  .as<Order[]>();

Transforming

import { z } from 'zod';

const OrderV2 = z.object({
  id: z.string(),
  total: z.number().nonnegative(),
  currency: z.string().length(3),
  placedAt: z.string().datetime(),
});

const normalize = step('normalize')
  .input<Order[]>()
  .map((orders) => orders.map((o) => OrderV2.parse({
    id: o.id,
    total: o.amount_cents / 100,
    currency: o.currency.toUpperCase(),
    placedAt: o.placed_at,
  })));
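The mapping inside normalize, minus the zod validation, amounts to the plain function below. The RawOrder field names (amount_cents, placed_at) are assumptions taken from the snippet above:

```typescript
// Assumed raw shape as returned by the source, based on the normalize step above.
interface RawOrder { id: string; amount_cents: number; currency: string; placed_at: string; }
interface NormalizedOrder { id: string; total: number; currency: string; placedAt: string; }

// Plain-TypeScript equivalent of the normalization, without schema validation.
function toOrderV2(o: RawOrder): NormalizedOrder {
  return {
    id: o.id,
    total: o.amount_cents / 100,          // integer cents -> decimal major units
    currency: o.currency.toUpperCase(),   // e.g. 'usd' -> 'USD'
    placedAt: o.placed_at,                // snake_case -> camelCase rename
  };
}
```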

Fan-out

Return an array from .map() and Flux will fan out the next step over each element, in parallel up to limits.max_concurrent_runs:

normalize
  .map((orders) => orders)
  .step(step('enrich').run(enrichOrder))
  .step(step('publish').to('kafka://orders.v1'));
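The bounded-parallelism behavior described above can be sketched in plain TypeScript. The fanOut helper below is illustrative, not the @flux/core API, and maxConcurrent stands in for limits.max_concurrent_runs:

```typescript
// Sketch of fan-out with a concurrency cap (illustrative, not @flux/core).
async function fanOut<I, O>(
  items: I[],
  run: (item: I) => Promise<O>,
  maxConcurrent: number,
): Promise<O[]> {
  const results: O[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  // Claiming (next++) is synchronous, so workers never collide.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      results[i] = await run(items[i]);
    }
  }

  // Start at most maxConcurrent workers; results keep input order.
  const workers = Array.from(
    { length: Math.min(maxConcurrent, items.length) },
    () => worker(),
  );
  await Promise.all(workers);
  return results;
}
```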

Back-pressure

When a downstream sink signals back-pressure (Kafka quota exceeded, Postgres deadlock, …), Flux pauses the producing step and retries with exponential back-off. No step is ever silently dropped.
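The retry behavior can be sketched as follows. The attempt count and delay schedule here are illustrative defaults, not Flux's actual configuration:

```typescript
// Sketch of retry with exponential back-off (illustrative values, not Flux's).
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 100,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // After the final attempt, surface the error instead of dropping it.
      if (attempt + 1 >= maxAttempts) throw err;
      // Delay doubles each attempt: 100ms, 200ms, 400ms, ...
      const delay = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

Because the last failure is re-thrown rather than swallowed, nothing is silently dropped: the caller either gets a result or an error.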

Ordering

Flux guarantees per-key ordering, not global ordering. If you need strict total order, partition your stream by a single key and set limits.max_concurrent_runs = 1 on that pipeline.
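Per-key ordering can be modeled as one serial queue per key: same-key work is chained, different keys interleave freely. The KeySerializer class below is an illustrative sketch, not part of @flux/core:

```typescript
// Sketch of per-key serialization (illustrative, not @flux/core):
// tasks for the same key run one after another; distinct keys run independently.
class KeySerializer {
  private tails = new Map<string, Promise<void>>();

  // Chain the task onto the previous task for this key and return its completion.
  enqueue(key: string, task: () => Promise<void>): Promise<void> {
    const tail = this.tails.get(key) ?? Promise.resolve();
    const next = tail.then(task);
    // Swallow rejections in the stored tail so one failure
    // does not block every later task for the key.
    this.tails.set(key, next.catch(() => {}));
    return next;
  }
}
```

Setting limits.max_concurrent_runs = 1 is the degenerate case of this model: every record effectively shares one key, so the whole stream is totally ordered.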