Skip to main content

Basic Usage

Lorem ipsum dolor sit amet. A handful of short, self-contained examples.

Hello world

import { pipeline, step } from '@flux/core';

export default pipeline('hello').step(
step('greet').run(() => ({ message: 'hello, world' })),
);

HTTP → Postgres

import { pipeline, step, source, sink } from '@flux/core';

export default pipeline('rates.daily')
.schedule('0 6 * * *') // every day at 06:00 UTC
.step(
step('fetch').from(source.http({
url: 'https://api.exchangerate.host/latest?base=EUR',
})),
)
.step(
step('store').to(sink.postgres({
table: 'fx_rates',
conflict: 'upsert:date',
})),
);

SQL → CSV

-- query.sql
SELECT order_id, total_cents, currency, placed_at
FROM orders
WHERE placed_at::date = current_date - 1
ORDER BY placed_at;
import { pipeline, step, source, sink } from '@flux/core';
import { readFileSync } from 'node:fs';

export default pipeline('orders.daily-export')
.step(step('query').from(source.postgres({ sql: readFileSync('./query.sql', 'utf8') })))
.step(step('upload').to(sink.s3({
bucket: 'acme-exports',
key: (ctx) => `orders/${ctx.runDate}.csv`,
format: 'csv',
})));