← ClaudeAtlas

pipeline-orchestratorlisted

Chain a scrape -> AI -> alert pipeline where each stage is a plain callable, with exponential-backoff retry per stage and a JSON state checkpoint between stages so a crashed or rate-limited run resumes instead of restarting. Use for chain pipeline steps, scrape then summarize then notify, retry between stages, resumable pipeline, or orchestrate a job.
barobaonguyen/ai-automation-skills · ★ 0 · Data & Documents · score 75
Install: claude install-skill barobaonguyen/ai-automation-skills
# Pipeline Orchestrator Use this skill when an automation job is really a chain of steps - scrape, then run it through an AI model, then send an alert - and you want each step to retry on transient failure and the whole run to resume from the last good step. It is a generic, dependency-light runner: stages are ordinary functions, and state is checkpointed to a JSON file so a 429 in the AI step never forces a full re-scrape. ## When to invoke - User says: "chain these steps" / "scrape then summarize then notify" / "make the pipeline resumable" / "retry between stages" - Code in the conversation uses: a script that does fetch -> transform -> deliver in sequence and fails partway through. ## When NOT to invoke - The work is a single call with no real stages. - The user needs a distributed DAG engine (Airflow, Temporal) with workers and a scheduler, not a single-process chain. ## Concrete example User input: ```text My nightly job scrapes a board, asks Gemini to summarize, then pushes Telegram. If Gemini rate-limits, don't re-scrape. ``` Output: ```python # Copy assets/pipeline.py into your project, then: from pipeline import run_pipeline def scrape(_): return fetch_board() # your scraper def summarize(rows): return gemini_summary(rows) # your AI step def alert(text): return send_telegram(text) # see telegram-alerter # checkpoints after each stage; a crash in summarize resumes there, not at scrape result = run_pipeline([scrape, summarize, alert], sta