Data Pipeline to Cloud
Build an automated pipeline that scrapes web data on a schedule, extracts structured fields with AI, and exports everything to Amazon S3 for downstream analysis.
Prerequisites
You will need an AlterLab API key, an S3 bucket you can write to (with an IAM access key pair), and Python 3 with the requests and boto3 packages installed.
Overview
A typical web data pipeline has five stages: collect raw pages, extract structured fields, store the output, schedule it to repeat, and monitor for failures. This tutorial wires all five together using AlterLab APIs so the pipeline runs hands-free.
By the end you will have a cron-scheduled batch scrape that extracts structured product data from multiple URLs, pushes JSONL files to S3 after every run, and pings a webhook if anything fails.
Step 1: Configure Cloud Storage
First, register your S3 bucket with AlterLab. The platform validates write access by uploading and deleting a small test file, then encrypts your credentials at rest.
import requests
API_KEY = "YOUR_API_KEY"
BASE = "https://api.alterlab.io/api/v1"
# Register an S3 storage integration
resp = requests.post(
f"{BASE}/integrations/storage",
headers={"X-API-Key": API_KEY},
json={
"provider": "s3",
"name": "pipeline-bucket",
"bucket": "my-data-pipeline",
"region": "us-east-1",
"prefix": "scraping/raw/",
"credentials": {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLE"
}
}
)
integration = resp.json()
integration_id = integration["id"]
print(f"Storage integration created: {integration_id}")

S3-Compatible Stores
To point the integration at an S3-compatible provider instead of AWS, add an endpoint_url field inside credentials. See the Cloud Storage guide for provider-specific examples.

After creating the integration, test the connection to make sure AlterLab can write to your bucket:
# Test the connection
test = requests.post(
f"{BASE}/integrations/storage/{integration_id}/test",
headers={"X-API-Key": API_KEY}
)
print(test.json())
# {"status": "ok", "message": "Successfully wrote and deleted test file"}

Step 2: Define an Extraction Schema
An extraction schema tells AlterLab which fields to pull from each page. This is the “transform” step of your pipeline. AlterLab uses AI to extract the fields you describe, so you do not need to write CSS selectors or XPath.
# Define the schema for product pages
product_schema = {
"type": "object",
"properties": {
"product_name": {
"type": "string",
"description": "Full product name or title"
},
"price": {
"type": "number",
"description": "Current selling price as a number"
},
"currency": {
"type": "string",
"description": "ISO 4217 currency code (USD, EUR, GBP)"
},
"in_stock": {
"type": "boolean",
"description": "Whether the product is currently in stock"
},
"rating": {
"type": "number",
"description": "Average customer rating out of 5"
},
"review_count": {
"type": "integer",
"description": "Total number of customer reviews"
}
},
"required": ["product_name", "price", "currency"]
}

Schema Tip
Add a description to every field. AlterLab's AI extraction uses these descriptions to locate the right data on the page, even when the HTML structure varies between sites. See the Structured Extraction tutorial for advanced schema patterns.

Step 3: Create a Scheduled Batch Scrape
Now combine the extraction schema with a batch of URLs and a cron schedule. AlterLab will scrape all the URLs on each run, extract structured data using your schema, and deliver the results.
# Target URLs for the pipeline
urls = [
"https://example.com/products/widget-a",
"https://example.com/products/widget-b",
"https://example.com/products/widget-c",
"https://store.example.com/gadget-pro",
"https://store.example.com/gadget-lite",
]
# Create a scheduled batch scrape
schedule = requests.post(
f"{BASE}/schedules",
headers={"X-API-Key": API_KEY},
json={
"name": "product-data-pipeline",
"cron": "0 6 * * *", # Every day at 06:00 UTC
"timezone": "UTC",
"batch": {
"urls": [{"url": u} for u in urls],
"formats": ["json"],
"extraction_schema": product_schema,
}
}
)
schedule_data = schedule.json()
schedule_id = schedule_data["id"]
print(f"Schedule created: {schedule_id}")
print(f"Next run: {schedule_data['next_run_at']}")

Cron Tip
The cron field accepts standard 5-field cron syntax. 0 6 * * * runs daily at 6 AM, 0 */4 * * * runs every 4 hours, and 0 9 * * 1-5 runs weekdays at 9 AM. See the Scheduler guide for all options.

Step 4: Auto-Export Results to S3
Link the storage integration to your schedule so every completed run automatically exports results to your S3 bucket. Each export produces a JSONL file with one JSON object per URL.
# Enable auto-export on the schedule
resp = requests.patch(
f"{BASE}/schedules/{schedule_id}",
headers={"X-API-Key": API_KEY},
json={
"export": {
"integration_id": integration_id,
"format": "jsonl",
"path_template": "products/{date}/{schedule_name}.jsonl"
}
}
)
print(resp.json()["export"])
# {
# "integration_id": "int_abc123",
# "format": "jsonl",
# "path_template": "products/{date}/{schedule_name}.jsonl"
# }
# After the next run, files appear at:
# s3://my-data-pipeline/scraping/raw/products/2026-03-24/product-data-pipeline.jsonl

The path_template supports these placeholders:
| Placeholder | Example | Description |
|---|---|---|
| {date} | 2026-03-24 | Run date in ISO format |
| {schedule_name} | product-data-pipeline | Slug of the schedule name |
| {run_id} | run_x7k9m2 | Unique run identifier |
| {timestamp} | 1711267200 | Unix epoch of the run |
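To see how a template resolves on a given run, here is a client-side sketch of the substitution. This is illustration only: AlterLab performs the substitution server-side, and the exact date and timestamp semantics shown here are assumptions.

```python
from datetime import datetime, timezone

def resolve_path_template(template: str, schedule_name: str,
                          run_id: str, run_time: datetime) -> str:
    """Substitute export-path placeholders the way a run would."""
    return (template
            .replace("{date}", run_time.date().isoformat())
            .replace("{schedule_name}", schedule_name)
            .replace("{run_id}", run_id)
            .replace("{timestamp}", str(int(run_time.timestamp()))))

# A daily run at 06:00 UTC on 2026-03-24
run_time = datetime(2026, 3, 24, 6, 0, tzinfo=timezone.utc)
print(resolve_path_template("products/{date}/{schedule_name}.jsonl",
                            "product-data-pipeline", "run_x7k9m2", run_time))
# products/2026-03-24/product-data-pipeline.jsonl
```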
Step 5: Set Up Failure Alerts
Register a webhook to receive notifications when a pipeline run fails or only partially succeeds. This is your monitoring layer.
# Register a webhook for schedule events
webhook = requests.post(
f"{BASE}/webhooks",
headers={"X-API-Key": API_KEY},
json={
"url": "https://your-server.com/hooks/pipeline-alerts",
"events": [
"schedule.run.failed",
"schedule.run.partial",
"schedule.run.completed"
],
"secret": "whsec_your_signing_secret"
}
)
print(webhook.json()["id"])
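On the receiving end, verify each delivery before trusting it. Here is a minimal sketch, assuming AlterLab signs the raw request body with HMAC-SHA256 using your secret and sends the hex digest in a header; the exact header name and signing scheme are assumptions, so check the Webhooks guide for the authoritative details.

```python
import hashlib
import hmac

WEBHOOK_SECRET = "whsec_your_signing_secret"

def verify_signature(raw_body: bytes, signature_header: str) -> bool:
    """Return True if the delivery was signed with our webhook secret."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), raw_body, hashlib.sha256
    ).hexdigest()
    # compare_digest runs in constant time, so response timing does not
    # leak how much of the signature matched
    return hmac.compare_digest(expected, signature_header)
```

Reject any request whose signature fails to verify before parsing its payload.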
# Example failure payload your server receives:
# {
# "event": "schedule.run.failed",
# "schedule_id": "sch_abc123",
# "schedule_name": "product-data-pipeline",
# "run_id": "run_x7k9m2",
# "failed_urls": 3,
# "total_urls": 5,
# "errors": [
# {"url": "https://...", "error": "timeout", "status": 408}
# ]
# }

Webhook Security
Verify each incoming payload's signature with your secret. See the Webhooks guide for verification examples.

Step 6: Query Stored Data
Once the pipeline runs, JSONL files accumulate in your S3 bucket. Use standard tools to read and analyze them. Each line is a standalone JSON object with the extracted fields plus metadata.
import boto3
import json
from datetime import date
s3 = boto3.client("s3")
bucket = "my-data-pipeline"
today = date.today().isoformat()
key = f"scraping/raw/products/{today}/product-data-pipeline.jsonl"
# Download today's export
obj = s3.get_object(Bucket=bucket, Key=key)
lines = obj["Body"].read().decode().strip().split("\n")
products = [json.loads(line) for line in lines]
# Analyze: find cheapest product
cheapest = min(products, key=lambda p: p["extracted"]["price"])
print(f"Cheapest: {cheapest['extracted']['product_name']}")
print(f"Price: {cheapest['extracted']['price']} {cheapest['extracted']['currency']}")
# Aggregate: average price across all products
avg_price = sum(p["extracted"]["price"] for p in products) / len(products)
print(f"Average price: {avg_price:.2f}")
# Filter: out-of-stock items
out_of_stock = [
p for p in products
if not p["extracted"].get("in_stock", True)
]
print(f"Out of stock: {len(out_of_stock)} of {len(products)}")

Large Datasets
For large exports, use S3 Select to query in place without downloading the full file.

Full Pipeline Example
Here is a single script that sets up the entire pipeline from scratch. Run it once, and AlterLab handles everything from that point on.
"""
AlterLab Data Pipeline — Full Setup Script
Creates:
1. S3 storage integration
2. Scheduled batch scrape with extraction schema
3. Auto-export to S3
4. Failure alert webhook
After running, the pipeline executes daily at 06:00 UTC.
"""
import requests
API_KEY = "YOUR_API_KEY"
BASE = "https://api.alterlab.io/api/v1"
headers = {"X-API-Key": API_KEY}
# --- 1. Storage Integration ---
storage = requests.post(
f"{BASE}/integrations/storage",
headers=headers,
json={
"provider": "s3",
"name": "pipeline-bucket",
"bucket": "my-data-pipeline",
"region": "us-east-1",
"prefix": "scraping/raw/",
"credentials": {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLE"
}
}
).json()
integration_id = storage["id"]
print(f"[1/4] Storage integration: {integration_id}")
# --- 2. Extraction Schema ---
schema = {
"type": "object",
"properties": {
"product_name": {"type": "string", "description": "Product name"},
"price": {"type": "number", "description": "Current price"},
"currency": {"type": "string", "description": "Currency code"},
"in_stock": {"type": "boolean", "description": "Availability"},
"rating": {"type": "number", "description": "Rating out of 5"},
"review_count": {"type": "integer", "description": "Number of reviews"}
},
"required": ["product_name", "price", "currency"]
}
# --- 3. Scheduled Batch with Export ---
urls = [
"https://example.com/products/widget-a",
"https://example.com/products/widget-b",
"https://example.com/products/widget-c",
]
schedule = requests.post(
f"{BASE}/schedules",
headers=headers,
json={
"name": "product-data-pipeline",
"cron": "0 6 * * *",
"timezone": "UTC",
"batch": {
"urls": [{"url": u} for u in urls],
"formats": ["json"],
"extraction_schema": schema,
},
"export": {
"integration_id": integration_id,
"format": "jsonl",
"path_template": "products/{date}/{schedule_name}.jsonl"
}
}
).json()
print(f"[2/4] Schedule: {schedule['id']}")
print(f" Next run: {schedule['next_run_at']}")
# --- 4. Failure Webhook ---
webhook = requests.post(
f"{BASE}/webhooks",
headers=headers,
json={
"url": "https://your-server.com/hooks/pipeline-alerts",
"events": [
"schedule.run.failed",
"schedule.run.partial",
"schedule.run.completed"
],
"secret": "whsec_your_signing_secret"
}
).json()
print(f"[3/4] Webhook: {webhook['id']}")
print("[4/4] Pipeline is live. Results export to S3 after each run.")

Best Practices
Partition by date
Use {date} in your path template so each day's data lands in its own folder. This makes it simple to query specific time ranges or set up S3 lifecycle rules for archival.
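For example, a lifecycle rule that archives dated export folders can be set with boto3; the bucket name, prefix, and retention windows below are examples only.

```python
# Archive pipeline exports to Glacier after 90 days, delete after a year
lifecycle_rule = {
    "ID": "archive-scraping-exports",
    "Filter": {"Prefix": "scraping/raw/products/"},
    "Status": "Enabled",
    "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
    "Expiration": {"Days": 365},
}

def apply_lifecycle(bucket: str) -> None:
    # boto3 is imported here so the rule dict above can be inspected
    # without boto3 installed
    import boto3
    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": [lifecycle_rule]},
    )
```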
Use JSONL format
JSONL (one JSON object per line) is natively supported by DuckDB, Athena, BigQuery, and pandas. It also streams efficiently for large exports.
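For instance, pandas reads JSON Lines directly; the two records below are invented sample data standing in for an export.

```python
import io

import pandas as pd

# A two-line JSONL export, as the pipeline would produce (values invented)
jsonl = (
    '{"url": "https://example.com/products/widget-a", "extracted": {"price": 19.99}}\n'
    '{"url": "https://example.com/products/widget-b", "extracted": {"price": 24.99}}\n'
)

# lines=True tells pandas to parse one JSON object per line
df = pd.read_json(io.StringIO(jsonl), lines=True)
prices = df["extracted"].apply(lambda e: e["price"])
print(round(prices.mean(), 2))  # 22.49
```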
Monitor partial failures
Subscribe to schedule.run.partial events, not just schedule.run.failed. A run where 1 of 50 URLs times out still exports the other 49, but you want to know about the gap.
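A minimal router for those events might look like this, assuming partial-run payloads carry the same failed_urls and total_urls fields as the failure example in Step 5 (the payload shape for partial runs is an assumption).

```python
def handle_pipeline_event(payload: dict) -> str:
    """Turn an AlterLab schedule event into an alert line, or '' for no alert."""
    event = payload["event"]
    if event == "schedule.run.failed":
        return f"ALERT: run {payload['run_id']} of {payload['schedule_name']} failed"
    if event == "schedule.run.partial":
        return (f"WARN: {payload['failed_urls']} of {payload['total_urls']} "
                f"URLs failed in run {payload['run_id']}")
    # schedule.run.completed and anything else: nothing to report
    return ""

print(handle_pipeline_event({
    "event": "schedule.run.partial",
    "schedule_name": "product-data-pipeline",
    "run_id": "run_x7k9m2",
    "failed_urls": 1,
    "total_urls": 50,
}))  # WARN: 1 of 50 URLs failed in run run_x7k9m2
```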
Separate staging and production
Use different S3 prefixes (or buckets) for development and production pipelines. This prevents test runs from polluting your analytics data.
Set spend limits
Configure spend limits on your account to cap how much the pipeline can consume per day. This protects against runaway costs if a schedule is misconfigured.
Rotate credentials
Use the credential rotation endpoint to update S3 keys without downtime. AlterLab validates the new credentials before applying them.