Most data pipelines start as scripts. Somebody needed to pull data from the CRM API into the warehouse, wrote a Python script that did the job, scheduled it as a cron job, and moved on. The script worked. It also had no retry logic, so a transient API error meant a missing day of data. It had no deduplication, so a re-run produced duplicate rows. It had no error path, so a malformed record stopped the entire job. It had no monitoring, so the team only discovered failures when an analyst noticed the data was stale. Each of these is small on its own, the kind of thing that bites once a quarter, which is exactly often enough never to become the priority to fix.
Pipelines done well have a different shape. Extraction retries transient failures with backoff and respects the rate limits the source enforces. Transformation includes type coercion, dedupe, and the business rules that turn raw API output into useful warehouse rows. Loading uses idempotent writes so re-runs are safe. Errors route to a dead letter queue with enough context to debug. Scheduling runs at the right cadence with alerting on failures. The discipline is well known and rarely applied to ad-hoc pipelines, because the script worked. The /flux-pipeline skill produces the production version by default.
Why generalist AI ships fragile pipelines
Ask Cursor or ChatGPT for a pipeline that pulls from an API into Postgres. You get a Python script that fetches the API, parses the JSON, and inserts rows. The script works for the happy path. The first time the API returns a 429, the script crashes. The first time the script is re-run, it inserts duplicates. The first malformed record (a null where a field was expected) crashes the script halfway through, leaving half the data loaded and the team uncertain about what state the warehouse is in. None of these failures are dramatic; they are the small bugs that erode trust in the data.
The other failure mode is the missing scheduling and observability. The script runs by hand or by a cron the team set up months ago and forgot to monitor. When the script fails, nobody knows. The data goes stale, an analyst notices a week later, and the team has to investigate a week of missed runs. A pipeline orchestrator (Airflow, Dagster, Prefect) solves this by making schedules explicit and surfacing failures, but using the orchestrator correctly takes more discipline than running a cron, which is why teams default to cron until the cron has burned them enough times.
What a production pipeline requires
A useful pipeline has six parts:
- Extraction with retry: backoff on retryable errors, respect for rate limits, and checkpointing so a partial failure resumes from the last successful page rather than starting over.
- Transformation with validation: type coercion at the boundary, dedupe by the source's natural key, and business rules that handle edge cases (null fields, malformed timestamps, currency normalization).
- Loading with idempotency: upserts keyed on the source's natural key so re-runs do not duplicate.
- Error handling with dead letter: malformed records go to a quarantine table or a dead letter queue with enough context (source row, error reason, timestamp) to debug.
- Scheduling with the orchestrator: explicit cadence, dependencies between jobs, and alerting on failure.
- Monitoring: data freshness checks (the table should have received new rows within the last N hours) and volume checks (today's row count should be within the expected range).
The discipline is to wire all six together. A pipeline with extraction retry but no idempotent load produces duplicates on retry. A pipeline with idempotent load but no monitoring fails silently. A pipeline with monitoring but no dead letter requires the team to dig through logs to find the bad records. The combined artifact is what /flux-pipeline produces; piecemeal versions of the same pipeline are what generalist tools produce.
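The first of those combinations can be shown concretely. This self-contained sketch uses Python's built-in `sqlite3` as a stand-in for the warehouse; that choice is an assumption, and the table and function names are hypothetical. It loads the same batch twice, as a retry would. Plain inserts double the row count, while an upsert keyed on the natural key leaves it unchanged.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE contacts_naive (id TEXT, email TEXT)")
    # The natural key is the primary key, so the database enforces uniqueness.
    conn.execute("CREATE TABLE contacts (id TEXT PRIMARY KEY, email TEXT)")
    return conn


def load_naive(conn: sqlite3.Connection, rows) -> None:
    conn.executemany("INSERT INTO contacts_naive (id, email) VALUES (?, ?)", rows)


def load_idempotent(conn: sqlite3.Connection, rows) -> None:
    # Upsert: insert new keys, overwrite existing ones (SQLite 3.24+ syntax).
    conn.executemany(
        "INSERT INTO contacts (id, email) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET email = excluded.email",
        rows,
    )


batch = [("1", "a@example.com"), ("2", "b@example.com")]
conn = make_db()
for _ in range(2):  # simulate a retry that re-sends the same batch
    load_naive(conn, batch)
    load_idempotent(conn, batch)

naive_count = conn.execute("SELECT COUNT(*) FROM contacts_naive").fetchone()[0]
upsert_count = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
print(naive_count, upsert_count)  # 4 2
```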
How /flux-pipeline works
Step one: characterize source and destination
When invoked, /flux-pipeline asks for the source (REST API, GraphQL, database query, file drop, event stream) and the destination (data warehouse, application database, search index). The characterization drives the extraction strategy: a paginated REST API needs different code from a database query, which needs different code from a Kafka topic. The destination drives the load strategy: a warehouse table benefits from a MERGE-based upsert, an application database from row-level idempotency, and a search index from versioned document IDs.
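For the search-index case, here is a minimal sketch of a versioned write. It assumes each document carries a monotonically increasing version derived from the source, for example its `updated_at` in epoch milliseconds. An in-memory dict stands in for the index, and `Doc` and `apply_versioned` are hypothetical names. Elasticsearch exposes the same idea as external versioning.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    id: str
    version: int  # assumption: derived from the source, e.g. updated_at in epoch millis
    body: dict


def apply_versioned(index: dict[str, Doc], doc: Doc) -> bool:
    """Write `doc` only if it is newer than what the index holds; return True if written."""
    current = index.get(doc.id)
    if current is not None and current.version >= doc.version:
        return False  # replayed or out-of-order delivery: ignore it
    index[doc.id] = doc
    return True
```

With this rule, re-runs and out-of-order deliveries become no-ops instead of overwriting newer data with older data.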
Step two: extraction with retry and checkpointing
The extractor handles the source's specific characteristics: pagination (cursor or offset), rate limits (with the right backoff and 429 retry), incremental fetching (only new data since the last successful run, tracked in a state table), and partial failure recovery (checkpoint after each page so a mid-run failure resumes from the last good checkpoint rather than starting over). The state table is part of the pipeline; without it, every run is a full re-fetch.
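Here is a minimal sketch of a state table with per-page checkpointing. It uses `sqlite3` as the state store and a fake paginated source that fails once on its last page. The table layout, `run`, and the fake source are assumptions for illustration. The watermark advances only when a run completes. A resumed run continues from the saved cursor without re-fetching earlier pages.

```python
import sqlite3


def open_state(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE IF NOT EXISTS pipeline_state ("
                 " pipeline TEXT PRIMARY KEY, cursor TEXT, watermark INTEGER)")


def load_state(conn, pipeline):
    row = conn.execute("SELECT cursor, watermark FROM pipeline_state WHERE pipeline = ?",
                       (pipeline,)).fetchone()
    return row if row else (None, 0)


def save_state(conn, pipeline, cursor, watermark):
    conn.execute("INSERT INTO pipeline_state (pipeline, cursor, watermark) VALUES (?, ?, ?) "
                 "ON CONFLICT(pipeline) DO UPDATE SET cursor = excluded.cursor, "
                 "watermark = excluded.watermark", (pipeline, cursor, watermark))
    conn.commit()


def run(conn, pipeline, fetch_page, sink, run_started):
    cursor, watermark = load_state(conn, pipeline)  # resume mid-run if a cursor was saved
    while True:
        rows, next_cursor = fetch_page(cursor, watermark)
        sink.extend(rows)  # stand-in for transform + load of this page
        if next_cursor is None:
            break
        cursor = next_cursor
        save_state(conn, pipeline, cursor, watermark)  # checkpoint after each page
    save_state(conn, pipeline, None, run_started)  # run complete: clear cursor, advance watermark


# Hypothetical source: cursor -> (rows, next cursor); fails once when asked for page "2".
PAGES = {None: (["a", "b"], "1"), "1": (["c", "d"], "2"), "2": (["e"], None)}
failed = {"done": False}


def flaky_fetch(cursor, watermark):
    if cursor == "2" and not failed["done"]:
        failed["done"] = True
        raise ConnectionError("transient failure")
    return PAGES[cursor]


conn = sqlite3.connect(":memory:")
open_state(conn)
loaded = []
try:
    run(conn, "hubspot_contacts", flaky_fetch, loaded, run_started=1000)
except ConnectionError:
    print(loaded, load_state(conn, "hubspot_contacts"))  # ['a', 'b', 'c', 'd'] ('2', 0)
run(conn, "hubspot_contacts", flaky_fetch, loaded, run_started=1000)
print(loaded, load_state(conn, "hubspot_contacts"))  # ['a', 'b', 'c', 'd', 'e'] (None, 1000)
```

Each page is loaded before its checkpoint is written. A crash between the two therefore re-delivers that page on resume, and the idempotent load is what makes that at-least-once behavior safe.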
Step three: transformation with validation
Transformation includes the type coercion that the source's schema does not provide (strings that should be timestamps, numeric strings that should be integers, currency codes that should be ISO 4217). Dedupe is keyed on the source's natural key, which is usually documented but sometimes has to be inferred. Business rules are applied at this layer: currency normalization to a single currency for reporting, name parsing into first/last, address parsing if needed. Records that fail validation route to the dead letter rather than crashing the pipeline.
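Here is a plain-Python sketch of that layer for a hypothetical order-like record; the field names are assumptions, not any vendor's schema. It coerces types at the boundary and dedupes on the natural key, keeping the newest version. Rows that fail validation become dead-letter entries instead of exceptions. Converting amounts into a single reporting currency would need an exchange-rate source, so it is left out.

```python
from datetime import datetime, timezone


def coerce(row: dict) -> dict:
    """Type coercion at the boundary; raises on missing or malformed fields."""
    rec = {
        "id": str(row["id"]),
        "amount_cents": int(row["amount_cents"]),                 # numeric string -> int
        "currency": row["currency"].strip().upper(),              # normalize the code's case
        "updated_at": datetime.fromisoformat(row["updated_at"]),  # string -> datetime
    }
    if len(rec["currency"]) != 3 or not rec["currency"].isalpha():
        raise ValueError(f"not an ISO 4217 code: {rec['currency']!r}")
    if rec["updated_at"].tzinfo is None:
        raise ValueError("timestamp without a timezone")  # keeps comparisons well-defined
    return rec


def transform(rows):
    good: dict[str, dict] = {}
    dead: list[dict] = []
    for row in rows:
        try:
            rec = coerce(row)
        except (KeyError, ValueError, TypeError, AttributeError) as exc:
            # Dead letter keeps the source row, the reason, and when it failed.
            dead.append({"row": row, "error": repr(exc),
                         "failed_at": datetime.now(timezone.utc).isoformat()})
            continue
        # Dedupe on the natural key, keeping the most recently updated version.
        prev = good.get(rec["id"])
        if prev is None or rec["updated_at"] > prev["updated_at"]:
            good[rec["id"]] = rec
    return list(good.values()), dead
```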
Step four: loading with idempotency, scheduling, monitoring
Loading uses upsert keyed on the natural key, so re-runs are safe. The pipeline is registered with the project's orchestrator (Airflow, Dagster, Prefect, or the project's existing tool) with a schedule and dependency on upstream jobs if any. Monitoring covers freshness (data has arrived in the expected window), volume (today's count is within bounds), and error rate (dead letter is below a threshold). Alerts route to the team's standard alerting destination.
The dead letter queue is the most underused pattern in data pipelines. It separates 'something failed' from 'the pipeline failed', so the pipeline can complete the good records while the bad records get triaged separately. /flux-pipeline ships the dead letter pattern by default.
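Here is a sketch of a quarantine table the dead letter can land in. It uses `sqlite3` and the `hubspot_contacts__deadletter` name from the worked example below. The columns (raw row as JSON, error reason, timestamp) follow the context listed earlier. `quarantine` and `triage` are hypothetical helpers.

```python
import json
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hubspot_contacts__deadletter ("
    " source_row TEXT NOT NULL,"  # raw record as JSON, so it can be replayed after a fix
    " error TEXT NOT NULL,"       # why validation failed
    " failed_at TEXT NOT NULL)"   # when, for correlating with source-side changes
)


def quarantine(conn: sqlite3.Connection, row: dict, error: str) -> None:
    conn.execute(
        "INSERT INTO hubspot_contacts__deadletter VALUES (?, ?, ?)",
        (json.dumps(row), error, datetime.now(timezone.utc).isoformat()),
    )


def triage(conn: sqlite3.Connection) -> list[tuple[str, int]]:
    """Group quarantined rows by error so the most common failure gets fixed first."""
    return conn.execute(
        "SELECT error, COUNT(*) FROM hubspot_contacts__deadletter "
        "GROUP BY error ORDER BY COUNT(*) DESC, error"
    ).fetchall()
```

The good records finish loading while this table accumulates the bad ones. A grouped count like `triage` turns "the pipeline failed" into a short list of specific, fixable causes.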
Tonone's /flux-pipeline skill builds production data pipelines with extraction retry, validated transformation, idempotent loading, dead letter handling, scheduling, and monitoring.
When to use /flux-pipeline, and when not to
/flux-pipeline is the right call when moving data between systems on a schedule, when building a warehouse feed or analytics pipeline, when automating a manual data export and import process, or when syncing data from an external vendor API into the team's data store. The signal is when the data flow is recurring and matters to downstream consumers.
Skip the skill for one-off data migrations (use /flux-migrate for schema-level migrations or a one-off script for a single load). For database schema design, /flux-schema is the right call. For diagnosing slow queries that the pipeline produces, /flux-query is calibrated to that work.
| Capability | Tonone | Generalist chatbot | Cursor / Copilot |
|---|---|---|---|
| Extraction with retry and checkpointing | Yes, source-specific backoff and resume | Plain fetch loop | Not in scope |
| Transformation with validation | Yes, type coercion + dedupe + dead letter | Inline type guesses | Not in scope |
| Idempotent loading | Yes, upsert by natural key | Insert-and-hope | Not in scope |
| Orchestrator integration | Yes, Airflow/Dagster/Prefect | Cron | Not in scope |
| Freshness and volume monitoring | Yes, alerts on data quality | Not in scope | Not in scope |
A worked example: HubSpot contacts to warehouse
Suppose the brief is: build a pipeline that syncs HubSpot contacts to the data warehouse hourly. Run /flux-pipeline.
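```python
# pipelines/hubspot_contacts/extract.py
import os
import time

import httpx
from backoff import on_exception, expo

# Assumption: the private-app token is read from an environment variable.
TOKEN = os.environ['HUBSPOT_TOKEN']


def _is_permanent(e: Exception) -> bool:
    # Give up on 4xx other than 429; retry 429, 5xx, and network errors.
    return (isinstance(e, httpx.HTTPStatusError)
            and e.response.status_code < 500
            and e.response.status_code != 429)


@on_exception(expo, (httpx.HTTPStatusError, httpx.TransportError),
              max_tries=5, giveup=_is_permanent)
def fetch_page(after: str | None, since: int) -> dict:
    # Assumption: `updatedAfter` is an illustrative incremental filter. Check HubSpot's
    # docs; the CRM search endpoint (filtering on lastmodifieddate) is the usual route.
    params = {'limit': 100, 'updatedAfter': since}
    if after:
        params['after'] = after  # omit the paging cursor on the first page
    res = httpx.get(
        'https://api.hubapi.com/crm/v3/objects/contacts',
        params=params,
        headers={'Authorization': f'Bearer {TOKEN}'},
        timeout=30,
    )
    if res.status_code == 429:
        # Honor Retry-After, then raise below so the decorator counts the attempt.
        time.sleep(int(res.headers.get('Retry-After', 60)))
    res.raise_for_status()  # HTTPStatusError on 4xx/5xx triggers backoff or give-up
    return res.json()


def extract(state):
    # `state` is the pipeline's state-table handle (hypothetical: .get / .checkpoint).
    run_started = int(time.time() * 1000)
    last_sync = state.get('last_sync_ts', 0)
    after = state.get('last_after')  # resume a run that failed mid-pagination
    while True:
        page = fetch_page(after, last_sync)
        yield from page['results']
        after = page.get('paging', {}).get('next', {}).get('after')
        if not after:
            break
        # Keep the old watermark until the run completes so nothing is skipped on resume.
        state.checkpoint({'last_after': after, 'last_sync_ts': last_sync})
    # Full run succeeded: clear the cursor and advance the watermark.
    state.checkpoint({'last_after': None, 'last_sync_ts': run_started})
```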
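```python
# pipelines/hubspot_contacts/transform.py
from datetime import datetime, timezone
from pydantic import BaseModel, ValidationError

class Contact(BaseModel):
    id: str
    email: str  # business rule: contacts without an email are dead-lettered
    first_name: str | None = None
    last_name: str | None = None
    created_at: datetime
    updated_at: datetime

def transform(rows):
    for row in rows:
        try:
            props = row['properties']
            # HubSpot returns id and timestamps at the top level, other fields under `properties`.
            contact = Contact.model_validate({
                'id': row['id'],
                'email': props.get('email'),
                'first_name': props.get('firstname'),
                'last_name': props.get('lastname'),
                'created_at': row['createdAt'],
                'updated_at': row['updatedAt'],
            })
            yield contact.model_dump()
        except (KeyError, AttributeError, ValidationError) as e:
            # Route bad records to the dead letter with the context needed to debug them.
            yield {'__dead_letter': True, 'row': row, 'error': str(e),
                   'failed_at': datetime.now(timezone.utc)}
```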
```python
# pipelines/hubspot_contacts/load.py
# upsert_into / insert_into are the project's warehouse helpers (hypothetical here).
def load(rows):
    rows = list(rows)
    good = [r for r in rows if not r.get('__dead_letter')]
    dead = [r for r in rows if r.get('__dead_letter')]
    upsert_into('hubspot_contacts', good, key='id')    # re-runs overwrite, never duplicate
    insert_into('hubspot_contacts__deadletter', dead)  # row, error, and timestamp for triage

# Dagster job: schedules hourly, monitors freshness + dead letter rate
```

Extract retries transient failures with exponential backoff, honors Retry-After on 429s, and checkpoints after each page. Transform validates each row and routes failures to a dead letter rather than crashing the run. Load is idempotent. Dagster handles scheduling, monitoring, and alerting. The pipeline runs hourly without intervention, and failures surface through the orchestrator's alerting rather than being discovered by an analyst who notices the data is stale.
Related skills
/flux-pipeline covers data movement. For the schema the data lands in, /flux-schema is the right call. For zero-downtime migration of an existing schema, /flux-migrate produces the plan. For optimizing the queries the pipeline produces or that consume the loaded data, /flux-query is calibrated to that work.
Install
/flux-pipeline ships with the Flux agent in the Tonone for Claude Code package. Install Tonone, invoke /flux-pipeline from any Claude Code session, and the skill produces a production data pipeline calibrated to the source, destination, and orchestrator.
1. Add to marketplace
2. Install Flux
Pipelines that the team trusts are the ones that handle failure as a routine case rather than a crash. The skill is built so the production patterns are the default.
Frequently asked questions
- What does /flux-pipeline do?
- It builds a production data pipeline with extraction retry, transformation with validation, idempotent loading, dead letter handling for bad records, and orchestrator integration for scheduling and monitoring.
- What orchestrators does /flux-pipeline support?
- Airflow, Dagster, and Prefect are first-class. The skill detects which the project uses and produces the equivalent. For greenfield, Dagster is recommended for its modern dataframe-aware ergonomics.
- How is /flux-pipeline different from a sync script?
- A script handles the happy path. /flux-pipeline handles failure as a routine case: retries on transient errors, dead letter for bad records, idempotent loading on re-run, and monitoring on freshness and volume.
- When should I use /flux-pipeline?
- When moving data between systems on a schedule, building a warehouse feed, automating a manual export/import, or syncing from an external vendor API.
- Does /flux-pipeline handle real-time pipelines?
- Yes, when the source supports streaming (Kafka, Pub/Sub, Kinesis). The same patterns apply: retry, dead letter, idempotent loading, monitoring; the orchestrator is the streaming consumer rather than a scheduled job.
- How do I install /flux-pipeline?
- Install Tonone for Claude Code via the get-started guide at tonone.ai/get-started. /flux-pipeline ships with the Flux agent and is invoked as a slash command in any Claude Code session. Tonone is free and MIT-licensed.
- Is /flux-pipeline free?
- Yes. The skill is part of Tonone, which is MIT-licensed. The only cost is Claude Code token usage during the work.
- Does /flux-pipeline produce dbt-compatible output?
- Yes. For pipelines feeding a warehouse with dbt downstream, the skill produces raw landing tables that dbt models can build on, with the staging layer convention the project uses.