# Price Monitoring Pipeline

Build an automated pipeline that tracks competitor prices daily, notifies you of changes via webhooks, and optionally archives results to S3 for historical analysis.
## Prerequisites
## Overview

A complete price monitoring pipeline has four moving parts:

1. **Extraction Schema**: A JSON schema that tells AlterLab which fields to pull from each product page: name, price, currency, availability.
2. **Scheduler**: A cron-based schedule that re-scrapes your target URLs on a daily or hourly cadence.
3. **Webhooks**: A callback URL that receives the extracted data as soon as each scheduled scrape completes.
4. **Change Detection**: Your application logic that compares incoming prices against stored values and triggers alerts.
## Step 1: Define an Extraction Schema

The extraction schema tells AlterLab's AI exactly which data points to pull from a product page. Define it once and reuse it across all your monitored URLs.
```python
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.alterlab.io/api/v1"

# Define the price extraction schema
price_schema = {
    "type": "object",
    "properties": {
        "product_name": {
            "type": "string",
            "description": "Full product name or title"
        },
        "current_price": {
            "type": "number",
            "description": "Current selling price as a number"
        },
        "original_price": {
            "type": "number",
            "description": "Original/list price before any discount"
        },
        "currency": {
            "type": "string",
            "description": "Currency code (USD, EUR, GBP, etc.)"
        },
        "in_stock": {
            "type": "boolean",
            "description": "Whether the product is currently in stock"
        },
        "on_sale": {
            "type": "boolean",
            "description": "Whether a sale or discount is active"
        },
        "seller": {
            "type": "string",
            "description": "Seller or retailer name"
        }
    }
}

# Test the schema on a single product page
response = requests.post(
    f"{BASE_URL}/scrape",
    headers={"X-API-Key": API_KEY},
    json={
        "url": "https://www.example-store.com/product/widget-pro",
        "extraction_schema": price_schema,
        "extraction_prompt": "Extract the product pricing information. Use the current displayed price, not the original if on sale."
    }
)

data = response.json()
if data.get("success"):
    product = data["extracted"]
    print(f"{product['product_name']}: {product['currency']} {product['current_price']}")
    print(f"In stock: {product['in_stock']}, On sale: {product['on_sale']}")
```

**Schema Tip:** Add `description` fields to your schema properties. AlterLab's extraction AI uses these hints to improve accuracy, especially for ambiguous fields like "price" vs "original price".

## Step 2: Schedule Daily Price Checks
Use the Scheduler API to create a recurring job. The schedule runs automatically, so you don't need to manage cron or hosting yourself.
```python
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.alterlab.io/api/v1"

# URLs to monitor — one schedule per competitor product
products_to_monitor = [
    {
        "name": "Competitor A - Widget Pro",
        "url": "https://competitor-a.com/products/widget-pro"
    },
    {
        "name": "Competitor B - Widget Pro",
        "url": "https://competitor-b.com/products/widget-pro"
    },
    {
        "name": "Competitor C - Widget Deluxe",
        "url": "https://competitor-c.com/products/widget-deluxe"
    }
]

price_schema = {
    "type": "object",
    "properties": {
        "product_name": {"type": "string"},
        "current_price": {"type": "number"},
        "original_price": {"type": "number"},
        "currency": {"type": "string"},
        "in_stock": {"type": "boolean"},
        "on_sale": {"type": "boolean"}
    }
}

for product in products_to_monitor:
    response = requests.post(
        f"{BASE_URL}/schedules",
        headers={"X-API-Key": API_KEY},
        json={
            "name": f"Price check: {product['name']}",
            "url": product["url"],
            "cron": "0 9 * * *",  # Every day at 9:00 AM UTC
            "scrape_options": {
                "extraction_schema": price_schema,
                "extraction_prompt": "Extract current product pricing.",
                "cache": False  # Always fetch fresh data
            },
            "webhook_url": "https://your-app.com/webhooks/price-update",
            "max_credits_per_execution": 5
        }
    )
    schedule = response.json()
    print(f"Created schedule {schedule['id']} for {product['name']}")
    print(f"  Cron: {schedule['cron']}")
    print(f"  Next run: {schedule.get('next_run_at')}")
```

**Cron Tip:** For fast-moving prices, use `0 */4 * * *` (every 4 hours) or `0 */1 * * *` (hourly). For stable categories, `0 9 * * *` (daily) is usually enough. See the Scheduler guide for full cron syntax.

## Step 3: Configure Webhook Notifications
Each time a scheduled scrape completes, AlterLab sends the extracted data to your webhook URL. Register a webhook to receive these payloads and verify their authenticity.
```python
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.alterlab.io/api/v1"

# Register a webhook endpoint
response = requests.post(
    f"{BASE_URL}/webhooks",
    headers={"X-API-Key": API_KEY},
    json={
        "url": "https://your-app.com/webhooks/price-update",
        "events": ["scrape.completed", "scrape.failed"],
        "description": "Price monitoring pipeline"
    }
)

webhook = response.json()
print(f"Webhook ID: {webhook['id']}")
print(f"Secret: {webhook['secret']}")  # Save this for signature verification

# Test the webhook
test_response = requests.post(
    f"{BASE_URL}/webhooks/{webhook['id']}/test",
    headers={"X-API-Key": API_KEY}
)
print(f"Test delivery: {test_response.json()['status']}")
```

Verify the HMAC signature on incoming webhooks to ensure they are authentic:
```python
import hmac
import hashlib

from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_webhook_secret"

def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify the HMAC-SHA256 signature from AlterLab."""
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

@app.route("/webhooks/price-update", methods=["POST"])
def handle_price_webhook():
    # Verify the signature before trusting the payload
    signature = request.headers.get("X-AlterLab-Signature", "")
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.json
    event = payload.get("event")

    if event == "scrape.completed":
        extracted = payload["data"]["extracted"]
        url = payload["data"]["url"]
        print(f"Price update for {url}: {extracted['current_price']}")
        # Process the price data (see Step 4)

    return jsonify({"received": True}), 200
```

## Step 4: Detect Price Changes
Compare incoming prices against your stored values. Trigger alerts for drops, increases, and stock changes.
```python
import json
from datetime import datetime
from pathlib import Path

class PriceTracker:
    def __init__(self, storage_path="prices.json"):
        self.storage_path = Path(storage_path)
        self.prices = self._load()

    def _load(self):
        if self.storage_path.exists():
            return json.loads(self.storage_path.read_text())
        return {}

    def _save(self):
        self.storage_path.write_text(json.dumps(self.prices, indent=2))

    def process_webhook(self, payload):
        """Process a webhook payload and detect changes."""
        url = payload["data"]["url"]
        extracted = payload["data"]["extracted"]
        now = datetime.utcnow().isoformat()

        current_price = extracted.get("current_price")
        product_name = extracted.get("product_name", url)
        in_stock = extracted.get("in_stock", True)

        # Get previous price data
        previous = self.prices.get(url, {})
        previous_price = previous.get("price")
        was_in_stock = previous.get("in_stock", True)

        alerts = []

        # Detect price changes
        if previous_price is not None and current_price is not None:
            change = current_price - previous_price
            pct = (change / previous_price) * 100 if previous_price else 0
            if change < 0:
                alerts.append({
                    "type": "price_drop",
                    "product": product_name,
                    "url": url,
                    "old_price": previous_price,
                    "new_price": current_price,
                    "change_pct": round(pct, 1),
                    "timestamp": now
                })
            elif change > 0:
                alerts.append({
                    "type": "price_increase",
                    "product": product_name,
                    "url": url,
                    "old_price": previous_price,
                    "new_price": current_price,
                    "change_pct": round(pct, 1),
                    "timestamp": now
                })

        # Detect stock changes
        if was_in_stock and not in_stock:
            alerts.append({
                "type": "out_of_stock",
                "product": product_name,
                "url": url,
                "timestamp": now
            })
        elif not was_in_stock and in_stock:
            alerts.append({
                "type": "back_in_stock",
                "product": product_name,
                "url": url,
                "price": current_price,
                "timestamp": now
            })

        # Update stored price
        self.prices[url] = {
            "product": product_name,
            "price": current_price,
            "in_stock": in_stock,
            "last_checked": now,
            "history": previous.get("history", []) + [{
                "price": current_price,
                "in_stock": in_stock,
                "timestamp": now
            }]
        }
        self._save()

        return alerts

# Usage in your webhook handler
tracker = PriceTracker()

def handle_webhook(payload):
    alerts = tracker.process_webhook(payload)
    for alert in alerts:
        if alert["type"] == "price_drop":
            print(f"PRICE DROP: {alert['product']}")
            print(f"  {alert['old_price']} -> {alert['new_price']} ({alert['change_pct']}%)")
            # Send notification (email, Slack, etc.)
        elif alert["type"] == "out_of_stock":
            print(f"OUT OF STOCK: {alert['product']}")
```

## Step 5: Export Results to S3 (Optional)
For long-term trend analysis, push each price snapshot to Amazon S3 (or any S3-compatible storage like MinIO or Cloudflare R2). This gives you a historical dataset you can query with tools like Athena or DuckDB.
```python
import json
from datetime import datetime

import boto3

s3 = boto3.client("s3")
BUCKET = "your-price-data-bucket"

def export_to_s3(url, extracted):
    """Save a price snapshot to S3, partitioned by date."""
    now = datetime.utcnow()
    date_key = now.strftime("%Y/%m/%d")

    # Create a URL-safe key from the product URL
    url_slug = url.replace("https://", "").replace("/", "_")[:100]
    key = f"prices/{date_key}/{url_slug}/{now.strftime('%H%M%S')}.json"

    record = {
        "url": url,
        "extracted": extracted,
        "scraped_at": now.isoformat(),
        "date": now.strftime("%Y-%m-%d")
    }

    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=json.dumps(record),
        ContentType="application/json"
    )
    print(f"Exported to s3://{BUCKET}/{key}")

# Call this from your webhook handler after processing
def handle_webhook_with_export(payload):
    url = payload["data"]["url"]
    extracted = payload["data"]["extracted"]

    # Process price changes (Step 4)
    tracker = PriceTracker()
    alerts = tracker.process_webhook(payload)

    # Export to S3 for historical analysis
    export_to_s3(url, extracted)

    return alerts
```

**Partitioning:** The date-partitioned key layout (`prices/2026/03/24/...`) makes it easy to query specific time ranges with AWS Athena or load into a data warehouse. Each file is small enough to process individually.

## Full Pipeline Example
Here is a complete setup script that creates schedules for a list of competitor products and a webhook to receive results:
```python
import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.alterlab.io/api/v1"
WEBHOOK_URL = "https://your-app.com/webhooks/price-update"

# --- 1. Price extraction schema ---
price_schema = {
    "type": "object",
    "properties": {
        "product_name": {"type": "string", "description": "Full product name"},
        "current_price": {"type": "number", "description": "Current selling price"},
        "original_price": {"type": "number", "description": "Original price before discount"},
        "currency": {"type": "string", "description": "Currency code"},
        "in_stock": {"type": "boolean", "description": "Currently in stock"},
        "on_sale": {"type": "boolean", "description": "Discount is active"},
        "seller": {"type": "string", "description": "Seller name"}
    }
}

# --- 2. Register webhook ---
webhook = requests.post(
    f"{BASE_URL}/webhooks",
    headers={"X-API-Key": API_KEY},
    json={
        "url": WEBHOOK_URL,
        "events": ["scrape.completed", "scrape.failed"],
        "description": "Price monitoring pipeline"
    }
).json()

print(f"Webhook registered: {webhook['id']}")
print(f"Save this secret for verification: {webhook['secret']}")

# --- 3. Create schedules for each product ---
products = [
    {"name": "Widget Pro - Store A", "url": "https://store-a.com/widget-pro"},
    {"name": "Widget Pro - Store B", "url": "https://store-b.com/widget-pro"},
    {"name": "Widget Pro - Store C", "url": "https://store-c.com/widget-pro"},
]

schedule_ids = []
for product in products:
    schedule = requests.post(
        f"{BASE_URL}/schedules",
        headers={"X-API-Key": API_KEY},
        json={
            "name": f"Price: {product['name']}",
            "url": product["url"],
            "cron": "0 9 * * *",
            "scrape_options": {
                "extraction_schema": price_schema,
                "extraction_prompt": "Extract current product pricing information.",
                "cache": False
            },
            "webhook_url": WEBHOOK_URL,
            "max_credits_per_execution": 5
        }
    ).json()
    schedule_ids.append(schedule["id"])
    print(f"Schedule created: {schedule['id']} -> {product['name']}")

print(f"\nPipeline ready! {len(schedule_ids)} products monitored daily at 9 AM UTC.")
print(f"Results delivered to: {WEBHOOK_URL}")
```

## Best Practices
1. **Start with a Small Product Set**: Test your schema and pipeline with 3-5 products before scaling. Verify the extraction schema returns accurate data for each competitor's page layout.
2. **Set Cost Limits**: Use `max_credits_per_execution` on schedules to cap costs. A product page typically costs 1-3 credits depending on anti-bot complexity.
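As a rough budgeting sketch using the figures from this guide (3 products, one daily run each, a cap of 5 credits per execution, and 1-3 credits for a typical page):

```python
# Rough monthly credit budget for the daily pipeline above.
products = 3
runs_per_day = 1
max_credits_per_execution = 5  # hard cap set on each schedule
typical_credits_per_page = 2   # midpoint of the 1-3 credit range

worst_case_monthly = products * runs_per_day * max_credits_per_execution * 30
typical_monthly = products * runs_per_day * typical_credits_per_page * 30

print(worst_case_monthly)  # 450
print(typical_monthly)     # 180
```

The cap bounds your worst case even if a page suddenly requires heavier anti-bot handling.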
3. **Handle Missing Data Gracefully**: Product pages change layout, go offline, or add interstitials. Check for null values and failed scrapes in your webhook handler. Don't trigger false alerts on missing data.
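A minimal defensive sketch, assuming the webhook payload shape shown in Step 3 (`event` plus `data.extracted`):

```python
def safe_extract_price(payload):
    """Return the current price from a webhook payload, or None if anything is missing.

    Treats failed scrapes and absent or malformed fields as "no data"
    rather than a price of 0, so change detection never fires a false alert.
    """
    if payload.get("event") != "scrape.completed":
        return None  # failed scrape: skip it, don't record a price change
    extracted = (payload.get("data") or {}).get("extracted") or {}
    price = extracted.get("current_price")
    # Guard against extraction glitches: non-numeric or non-positive values
    if not isinstance(price, (int, float)) or price <= 0:
        return None
    return price

print(safe_extract_price({"event": "scrape.failed"}))  # None
print(safe_extract_price({
    "event": "scrape.completed",
    "data": {"extracted": {"current_price": 19.99}}
}))  # 19.99
```

Returning `None` instead of a sentinel value lets the `is not None` checks in the `PriceTracker` from Step 4 skip the comparison cleanly.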
4. **Use Percentage Thresholds**: Don't alert on every 1-cent change. Set a threshold (e.g., 5% change) to reduce noise. Track the full history for trend analysis.
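For example, a small helper you could call before emitting an alert (the 5% default is illustrative; tune it per category):

```python
def is_significant_change(old_price, new_price, threshold_pct=5.0):
    """True if the price moved by at least threshold_pct percent."""
    if old_price is None or new_price is None or old_price <= 0:
        return False  # missing data is never a significant change
    pct = abs(new_price - old_price) / old_price * 100
    return pct >= threshold_pct

print(is_significant_change(100.0, 99.99))  # False: a one-cent move is noise
print(is_significant_change(100.0, 89.99))  # True: roughly a 10% drop
```

You would still append every observation to the stored history; the threshold only gates notifications.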
5. **Monitor Schedule Health**: Check the execution history periodically. Repeated failures for a URL may indicate the page moved or the site blocked scraping.
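One way to flag unhealthy schedules from their execution history. The record shape below is illustrative, not AlterLab's actual response format; adapt the field names to whatever your execution-history endpoint returns:

```python
def consecutive_failures(executions):
    """Count failures since the last success, given newest-first execution records."""
    count = 0
    for execution in executions:
        if execution.get("status") == "failed":
            count += 1
        else:
            break  # a success resets the streak
    return count

# Newest-first history: three failures since the last successful run
history = [
    {"status": "failed"}, {"status": "failed"}, {"status": "failed"},
    {"status": "completed"}, {"status": "failed"},
]
if consecutive_failures(history) >= 3:
    print("alert: page may have moved or the site is blocking scrapes")
```

Running this check daily alongside your webhook handler catches schedules that silently stop delivering usable data.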
6. **Disable Caching**: Set `"cache": false` for price monitoring schedules. Cached responses defeat the purpose of checking for fresh prices.