
    Data Pipeline to Cloud

    Build an automated pipeline that scrapes web data on a schedule, extracts structured fields with AI, and exports everything to Amazon S3 for downstream analysis.

    Prerequisites

    This tutorial combines several AlterLab features. Familiarity with the Batch Scraping, Scheduler, and Cloud Storage Export guides is helpful but not required. You will need an AlterLab API key and an Amazon S3 bucket (or any S3-compatible store like MinIO, Cloudflare R2, or Backblaze B2).

    Overview

    A typical web data pipeline has five stages: collect raw pages, extract structured fields, store the output, schedule it to repeat, and monitor for failures. This tutorial wires all five together using AlterLab APIs so the pipeline runs hands-free.

    🪣 Step 1: Configure S3
    📋 Step 2: Define Schema
    🔄 Step 3: Schedule Batch
    📤 Step 4: Auto-Export
    📊 Step 5: Alert & Query

    By the end you will have a cron-scheduled batch scrape that extracts structured product data from multiple URLs, pushes JSONL files to S3 after every run, and pings a webhook if anything fails.

    Step 1: Configure Cloud Storage

    First, register your S3 bucket with AlterLab. The platform validates write access by uploading and deleting a small test file, then encrypts your credentials at rest.

    import requests
    
    API_KEY = "YOUR_API_KEY"
    BASE = "https://api.alterlab.io/api/v1"
    
    # Register an S3 storage integration
    resp = requests.post(
        f"{BASE}/integrations/storage",
        headers={"X-API-Key": API_KEY},
        json={
            "provider": "s3",
            "name": "pipeline-bucket",
            "bucket": "my-data-pipeline",
            "region": "us-east-1",
            "prefix": "scraping/raw/",
            "credentials": {
                "access_key_id": "AKIAIOSFODNN7EXAMPLE",
                "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLE"
            }
        }
    )
    
    resp.raise_for_status()
    integration = resp.json()
    integration_id = integration["id"]
    print(f"Storage integration created: {integration_id}")
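    The snippets in this tutorial omit error handling for brevity. For a long-lived pipeline, it is worth wrapping calls in a `requests.Session` with automatic retries so transient 429/5xx responses do not break a setup script; a minimal sketch using the standard urllib3 `Retry` helper bundled with requests:

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Retry transient failures (429 / 5xx) with exponential backoff: 1s, 2s, 4s
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503],
    allowed_methods=["GET", "POST", "PATCH"],
)

session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=retry))
session.headers.update({"X-API-Key": "YOUR_API_KEY"})

# session.post(...) / session.get(...) are drop-in replacements for requests.post / requests.get
```

    The same session can be reused for every call in this tutorial, which also avoids repeating the `headers=` argument.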

    S3-Compatible Stores

    AlterLab works with any S3-compatible storage. For MinIO, Cloudflare R2, or Backblaze B2, add an endpoint_url field inside credentials. See the Cloud Storage guide for provider-specific examples.

    After creating the integration, test the connection to make sure AlterLab can write to your bucket:

    # Test the connection
    test = requests.post(
        f"{BASE}/integrations/storage/{integration_id}/test",
        headers={"X-API-Key": API_KEY}
    )
    print(test.json())
    # {"status": "ok", "message": "Successfully wrote and deleted test file"}

    Step 2: Define an Extraction Schema

    An extraction schema tells AlterLab which fields to pull from each page. This is the “transform” step of your pipeline. AlterLab uses AI to extract the fields you describe, so you do not need to write CSS selectors or XPath.

    # Define the schema for product pages
    product_schema = {
        "type": "object",
        "properties": {
            "product_name": {
                "type": "string",
                "description": "Full product name or title"
            },
            "price": {
                "type": "number",
                "description": "Current selling price as a number"
            },
            "currency": {
                "type": "string",
                "description": "ISO 4217 currency code (USD, EUR, GBP)"
            },
            "in_stock": {
                "type": "boolean",
                "description": "Whether the product is currently in stock"
            },
            "rating": {
                "type": "number",
                "description": "Average customer rating out of 5"
            },
            "review_count": {
                "type": "integer",
                "description": "Total number of customer reviews"
            }
        },
        "required": ["product_name", "price", "currency"]
    }

    Schema Tip

    Add a description to every field. AlterLab's AI extraction uses these descriptions to locate the right data on the page, even when the HTML structure varies between sites. See the Structured Extraction tutorial for advanced schema patterns.
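    A quick client-side check against the schema's `required` list can catch extraction gaps before results land in S3. The record below is illustrative; the field names match the schema above:

```python
# Illustrative extracted record shaped like the schema above (values are made up)
record = {
    "product_name": "Widget A",
    "price": 19.99,
    "currency": "USD",
    "in_stock": True,
    "rating": 4.6,
    "review_count": 128,
}

REQUIRED = ["product_name", "price", "currency"]

def missing_required(rec, required=REQUIRED):
    """Return the required fields that are absent or null in an extracted record."""
    return [f for f in required if rec.get(f) is None]

print(missing_required(record))           # []
print(missing_required({"price": 9.5}))   # ['product_name', 'currency']
```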

    Step 3: Create a Scheduled Batch Scrape

    Now combine the extraction schema with a batch of URLs and a cron schedule. AlterLab will scrape all the URLs on each run, extract structured data using your schema, and deliver the results.

    # Target URLs for the pipeline
    urls = [
        "https://example.com/products/widget-a",
        "https://example.com/products/widget-b",
        "https://example.com/products/widget-c",
        "https://store.example.com/gadget-pro",
        "https://store.example.com/gadget-lite",
    ]
    
    # Create a scheduled batch scrape
    schedule = requests.post(
        f"{BASE}/schedules",
        headers={"X-API-Key": API_KEY},
        json={
            "name": "product-data-pipeline",
            "cron": "0 6 * * *",          # Every day at 06:00 UTC
            "timezone": "UTC",
            "batch": {
                "urls": [{"url": u} for u in urls],
                "formats": ["json"],
                "extraction_schema": product_schema,
            }
        }
    )
    
    schedule_data = schedule.json()
    schedule_id = schedule_data["id"]
    print(f"Schedule created: {schedule_id}")
    print(f"Next run: {schedule_data['next_run_at']}")

    Cron Tip

    The cron field accepts standard 5-field cron syntax. 0 6 * * * runs daily at 6 AM, 0 */4 * * * runs every 4 hours, and 0 9 * * 1-5 runs weekdays at 9 AM. See the Scheduler guide for all options.

    Step 4: Auto-Export Results to S3

    Link the storage integration to your schedule so every completed run automatically exports results to your S3 bucket. Each export produces a JSONL file with one JSON object per URL.

    # Enable auto-export on the schedule
    resp = requests.patch(
        f"{BASE}/schedules/{schedule_id}",
        headers={"X-API-Key": API_KEY},
        json={
            "export": {
                "integration_id": integration_id,
                "format": "jsonl",
                "path_template": "products/{date}/{schedule_name}.jsonl"
            }
        }
    )
    
    print(resp.json()["export"])
    # {
    #   "integration_id": "int_abc123",
    #   "format": "jsonl",
    #   "path_template": "products/{date}/{schedule_name}.jsonl"
    # }
    
    # After the next run, files appear at:
    # s3://my-data-pipeline/scraping/raw/products/2026-03-24/product-data-pipeline.jsonl

    The path_template supports these placeholders:

    Placeholder        Example                   Description
    {date}             2026-03-24                Run date in ISO format
    {schedule_name}    product-data-pipeline     Slug of the schedule name
    {run_id}           run_x7k9m2                Unique run identifier
    {timestamp}        1711267200                Unix epoch of the run
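    AlterLab renders the template server-side, but because the placeholder syntax matches Python's `{name}` format fields, you can preview a path locally before committing to a layout (the values below are illustrative):

```python
# Preview how a path_template renders with sample placeholder values
path_template = "products/{date}/{schedule_name}.jsonl"

values = {
    "date": "2026-03-24",
    "schedule_name": "product-data-pipeline",
    "run_id": "run_x7k9m2",
    "timestamp": "1711267200",
}

rendered = path_template.format(**values)
print(rendered)  # products/2026-03-24/product-data-pipeline.jsonl
```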

    Step 5: Set Up Failure Alerts

    Register a webhook to receive notifications when a pipeline run fails or only partially succeeds. This is your monitoring layer.

    # Register a webhook for schedule events
    webhook = requests.post(
        f"{BASE}/webhooks",
        headers={"X-API-Key": API_KEY},
        json={
            "url": "https://your-server.com/hooks/pipeline-alerts",
            "events": [
                "schedule.run.failed",
                "schedule.run.partial",
                "schedule.run.completed"
            ],
            "secret": "whsec_your_signing_secret"
        }
    )
    
    print(webhook.json()["id"])
    
    # Example failure payload your server receives:
    # {
    #   "event": "schedule.run.failed",
    #   "schedule_id": "sch_abc123",
    #   "schedule_name": "product-data-pipeline",
    #   "run_id": "run_x7k9m2",
    #   "failed_urls": 3,
    #   "total_urls": 5,
    #   "errors": [
    #     {"url": "https://...", "error": "timeout", "status": 408}
    #   ]
    # }

    Webhook Security

    Always verify webhook signatures before processing payloads. AlterLab signs every payload with HMAC-SHA256 using your secret. See the Webhooks guide for verification examples.
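    A minimal verification sketch using Python's standard `hmac` module. The hex encoding and the exact header that carries the signature are assumptions here; confirm both against the Webhooks guide:

```python
import hashlib
import hmac

SECRET = "whsec_your_signing_secret"

def verify_signature(payload: bytes, signature_header: str, secret: str = SECRET) -> bool:
    """Compare the received signature to a locally computed HMAC-SHA256 digest.

    Uses hmac.compare_digest for a constant-time comparison.
    """
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)

# Simulated raw request body and the signature the sender would attach
body = b'{"event": "schedule.run.failed", "run_id": "run_x7k9m2"}'
sig = hmac.new(SECRET.encode(), body, hashlib.sha256).hexdigest()

print(verify_signature(body, sig))        # True
print(verify_signature(body, "bad-sig"))  # False
```

    Verify against the raw request bytes, not a re-serialized JSON object: re-serializing can reorder keys or change whitespace and invalidate the digest.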

    Step 6: Query Stored Data

    Once the pipeline runs, JSONL files accumulate in your S3 bucket. Use standard tools to read and analyze them. Each line is a standalone JSON object with the extracted fields plus metadata.

    import boto3
    import json
    from datetime import date
    
    s3 = boto3.client("s3")
    bucket = "my-data-pipeline"
    today = date.today().isoformat()
    key = f"scraping/raw/products/{today}/product-data-pipeline.jsonl"
    
    # Download today's export
    obj = s3.get_object(Bucket=bucket, Key=key)
    lines = obj["Body"].read().decode().strip().split("\n")
    
    products = [json.loads(line) for line in lines]
    
    # Analyze: find cheapest product
    cheapest = min(products, key=lambda p: p["extracted"]["price"])
    print(f"Cheapest: {cheapest['extracted']['product_name']}")
    print(f"Price: {cheapest['extracted']['price']} {cheapest['extracted']['currency']}")
    
    # Aggregate: average price across all products
    avg_price = sum(p["extracted"]["price"] for p in products) / len(products)
    print(f"Average price: {avg_price:.2f}")
    
    # Filter: out-of-stock items
    out_of_stock = [
        p for p in products
        if not p["extracted"].get("in_stock", True)
    ]
    print(f"Out of stock: {len(out_of_stock)} of {len(products)}")

    Large Datasets

    For pipelines that produce large exports, consider loading JSONL files into a data warehouse (Snowflake, BigQuery, DuckDB) or using S3 Select to query in place without downloading the full file.

    Full Pipeline Example

    Here is a single script that sets up the entire pipeline from scratch. Run it once, and AlterLab handles everything from that point on.

    """
    AlterLab Data Pipeline — Full Setup Script
    
    Creates:
    1. S3 storage integration
    2. Scheduled batch scrape with extraction schema
    3. Auto-export to S3
    4. Failure alert webhook
    
    After running, the pipeline executes daily at 06:00 UTC.
    """
    
    import requests
    
    API_KEY = "YOUR_API_KEY"
    BASE = "https://api.alterlab.io/api/v1"
    headers = {"X-API-Key": API_KEY}
    
    # --- 1. Storage Integration ---
    storage = requests.post(
        f"{BASE}/integrations/storage",
        headers=headers,
        json={
            "provider": "s3",
            "name": "pipeline-bucket",
            "bucket": "my-data-pipeline",
            "region": "us-east-1",
            "prefix": "scraping/raw/",
            "credentials": {
                "access_key_id": "AKIAIOSFODNN7EXAMPLE",
                "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLE"
            }
        }
    ).json()
    
    integration_id = storage["id"]
    print(f"[1/4] Storage integration: {integration_id}")
    
    # --- 2. Extraction Schema ---
    schema = {
        "type": "object",
        "properties": {
            "product_name": {"type": "string", "description": "Product name"},
            "price": {"type": "number", "description": "Current price"},
            "currency": {"type": "string", "description": "Currency code"},
            "in_stock": {"type": "boolean", "description": "Availability"},
            "rating": {"type": "number", "description": "Rating out of 5"},
            "review_count": {"type": "integer", "description": "Number of reviews"}
        },
        "required": ["product_name", "price", "currency"]
    }
    
    # --- 3. Scheduled Batch with Export ---
    urls = [
        "https://example.com/products/widget-a",
        "https://example.com/products/widget-b",
        "https://example.com/products/widget-c",
    ]
    
    schedule = requests.post(
        f"{BASE}/schedules",
        headers=headers,
        json={
            "name": "product-data-pipeline",
            "cron": "0 6 * * *",
            "timezone": "UTC",
            "batch": {
                "urls": [{"url": u} for u in urls],
                "formats": ["json"],
                "extraction_schema": schema,
            },
            "export": {
                "integration_id": integration_id,
                "format": "jsonl",
                "path_template": "products/{date}/{schedule_name}.jsonl"
            }
        }
    ).json()
    
    print(f"[2/4] Schedule: {schedule['id']}")
    print(f"       Next run: {schedule['next_run_at']}")
    
    # --- 4. Failure Webhook ---
    webhook = requests.post(
        f"{BASE}/webhooks",
        headers=headers,
        json={
            "url": "https://your-server.com/hooks/pipeline-alerts",
            "events": [
                "schedule.run.failed",
                "schedule.run.partial",
                "schedule.run.completed"
            ],
            "secret": "whsec_your_signing_secret"
        }
    ).json()
    
    print(f"[3/4] Webhook: {webhook['id']}")
    print("[4/4] Pipeline is live. Results export to S3 after each run.")

    Best Practices

    Partition by date

    Use {date} in your path template so each day's data lands in its own folder. This makes it simple to query specific time ranges or set up S3 lifecycle rules for archival.

    Use JSONL format

    JSONL (one JSON object per line) is natively supported by DuckDB, Athena, BigQuery, and pandas. It also streams efficiently for large exports.
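    Because JSONL is line-delimited, it can also be consumed as a stream. A small generator like this never holds the full export in memory, which matters once daily files grow large (the sample data is illustrative):

```python
import io
import json

def iter_jsonl(fileobj):
    """Yield one parsed record per non-empty line, without loading the whole file."""
    for line in fileobj:
        line = line.strip()
        if line:
            yield json.loads(line)

# Works the same on a local file handle or a streamed S3 Body wrapped in a text reader
sample = io.StringIO('{"extracted": {"price": 19.99}}\n{"extracted": {"price": 24.5}}\n')
prices = [rec["extracted"]["price"] for rec in iter_jsonl(sample)]
print(prices)  # [19.99, 24.5]
```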

    Monitor partial failures

    Subscribe to schedule.run.partial events, not just schedule.run.failed. A run where 1 of 50 URLs times out still exports the other 49, but you want to know about the gap.
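    A sketch of the branching a webhook handler might apply, using the event names and payload fields shown in Step 5 (the alerting policy itself is an example, not a prescription):

```python
def should_alert(payload: dict) -> bool:
    """Alert on outright failures, and on partial runs with any failed URLs."""
    event = payload.get("event", "")
    if event == "schedule.run.failed":
        return True
    if event == "schedule.run.partial":
        return payload.get("failed_urls", 0) > 0
    return False  # schedule.run.completed and anything unrecognized

print(should_alert({"event": "schedule.run.partial", "failed_urls": 1, "total_urls": 50}))  # True
print(should_alert({"event": "schedule.run.completed"}))  # False
```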

    Separate staging and production

    Use different S3 prefixes (or buckets) for development and production pipelines. This prevents test runs from polluting your analytics data.

    Set spend limits

    Configure spend limits on your account to cap how much the pipeline can consume per day. This protects against runaway costs if a schedule is misconfigured.

    Rotate credentials

    Use the credential rotation endpoint to update S3 keys without downtime. AlterLab validates the new credentials before applying them.

    Last updated: March 2026
