
Build a Production Web Scraping Pipeline for RAG in 2026
Learn how to build a reliable web scraping pipeline for RAG applications. Covers data extraction, cleaning, scheduling, and storage with practical code examples.
April 14, 2026
RAG applications live or die on data quality. Your embedding model can only be as good as the documents you feed it. This guide covers how to build a scraping pipeline that delivers clean, structured, and fresh data at scale.
The Architecture
A production RAG scraping pipeline has four stages:
- Fetch — retrieve pages reliably, bypassing anti-bot systems
- Extract — pull clean text from rendered HTML
- Transform — chunk, deduplicate, and format for embeddings
- Store — load into your vector database with metadata
Each stage needs to handle failure gracefully. Networks time out. Pages change structure. Anti-bot systems update. Your pipeline should retry, alert, and recover without manual intervention.
Stage 1: Fetch — Reliable Page Retrieval
The hardest part of web scraping at scale is not the parsing. It is getting the page content in the first place. Modern sites use JavaScript rendering, CAPTCHAs, and fingerprinting to block automated requests.
You need three things: rotating proxies, headless browser support, and automatic anti-bot bypass. Here is how a fetch call looks with the AlterLab Python SDK:
```python
from alterlab import AlterLab

client = AlterLab(api_key="YOUR_API_KEY")

response = client.scrape(
    url="https://docs.example.com/api-reference",
    formats=["markdown"],
    min_tier=3,
    timeout=30
)
print(response.markdown)
```

The min_tier=3 parameter tells the system to skip basic HTTP fetches and go straight to a rendered browser session. This matters for SPAs and sites that load content dynamically.
The same request via cURL:
```bash
curl -X POST https://api.alterlab.io/v1/scrape \
  -H "X-API-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://docs.example.com/api-reference",
    "formats": ["markdown"],
    "min_tier": 3,
    "timeout": 30
  }'
```
Handling Failures
Not every page will return on the first attempt. Build retry logic with exponential backoff:
```python
import time

from alterlab import AlterLab, APIError

client = AlterLab(api_key="YOUR_API_KEY")

def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.scrape(url=url, formats=["markdown"])
            return response.markdown
        except APIError:
            # Give up after the final attempt; otherwise back off exponentially
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)
```

Log every failure with the URL, status code, and error type. This data tells you which sites need higher tiers or longer timeouts.
Stage 2: Extract — Clean Text from HTML
Raw HTML is noisy. Script tags, navigation menus, footers, and ad containers dilute your embedding quality. You want the main content and nothing else.
Request Markdown output directly. The API strips boilerplate and returns readable text:
```python
response = client.scrape(
    url="https://blog.example.com/post/rag-best-practices",
    formats=["markdown"],
    exclude_selectors=["nav", "footer", ".sidebar", ".comments"]
)
content = response.markdown
```

The exclude_selectors parameter removes page regions you know are irrelevant. Common targets: navigation bars, sidebars, comment sections, cookie banners.
Extracting Structured Metadata
RAG benefits from context. A chunk about "rate limits" is more useful when you know it came from the "Billing" section of a pricing page. Extract metadata alongside content:
```python
response = client.scrape(
    url="https://docs.example.com/pricing",
    formats=["json", "markdown"],
    extract={
        "schema": {
            "product_name": "h1",
            "price": ".price-amount",
            "features": ".feature-list li",
            "last_updated": "time[datetime]"
        }
    }
)
metadata = response.json
content = response.markdown
```

Store the metadata with each chunk. It becomes filterable context at query time.
Stage 3: Transform — Chunking for Embeddings
Raw documents are too large for most embedding models. You need to split them into overlapping chunks that preserve context.
Chunking Strategy
For documentation and articles, split on headings. Each section becomes a chunk with its heading as a prefix:
```python
import re

def chunk_by_headings(markdown: str, max_tokens: int = 500) -> list[dict]:
    # Split on Markdown headings (#, ##, ###); the capture group keeps the marker
    # as a separator, so parts alternate: [pre-text, marker, heading+body, ...]
    sections = re.split(r'^(#{1,3})\s+', markdown, flags=re.MULTILINE)
    chunks = []
    current_heading = "Introduction"
    buffer = sections[0] if sections else ""
    for part in sections[1:]:
        if re.match(r'^#{1,3}$', part):
            # Heading marker: flush the section that just ended
            if buffer.strip():
                chunks.append({"heading": current_heading, "text": buffer.strip()})
            buffer = ""
            continue
        # The first line of this part is the heading text; the rest is the body
        heading, _, body = part.partition("\n")
        current_heading = heading.strip()
        buffer = body
    if buffer.strip():
        chunks.append({"heading": current_heading, "text": buffer.strip()})
    return chunks
```

For longer sections that exceed your token limit, split on paragraph boundaries and maintain a 20 percent overlap between adjacent chunks. Overlap prevents context loss at chunk boundaries.
Deduplication
Scraping pipelines collect duplicate content. Canonical URLs change. Pagination repeats content. Run a deduplication pass before embedding:
```python
from sentence_transformers import SentenceTransformer
import numpy as np

def deduplicate_chunks(chunks: list[dict], threshold: float = 0.92) -> list[dict]:
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode([c["text"] for c in chunks])
    keep = []  # indices of chunks to retain
    for i, emb in enumerate(embeddings):
        is_duplicate = False
        for j in keep:
            kept = embeddings[j]
            # Cosine similarity against every chunk we have decided to keep
            similarity = np.dot(emb, kept) / (np.linalg.norm(emb) * np.linalg.norm(kept))
            if similarity > threshold:
                is_duplicate = True
                break
        if not is_duplicate:
            keep.append(i)
    return [chunks[i] for i in keep]
```

This catches near-duplicates that exact string matching would miss.
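Semantic deduplication is quadratic in the worst case, so it pays to run a cheap exact-match pass first and only hand the survivors to the embedding comparison. A sketch using normalized text hashes (the helper name is illustrative):

```python
import hashlib

def drop_exact_duplicates(chunks: list[dict]) -> list[dict]:
    """Remove chunks whose text, after lowercasing and whitespace collapsing,
    has already been seen. Order of first occurrence is preserved."""
    seen, unique = set(), []
    for chunk in chunks:
        normalized = " ".join(chunk["text"].lower().split())
        digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(chunk)
    return unique
```

Hashing keeps memory bounded even for millions of chunks, and the whitespace normalization catches re-scrapes that differ only in formatting.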
Stage 4: Store — Vector Database Loading
Your chunks need to land in a vector store with metadata attached. The exact database depends on your stack — Pinecone, Weaviate, Qdrant, or pgvector all work.
```python
from datetime import datetime, timezone

from sentence_transformers import SentenceTransformer
import qdrant_client

model = SentenceTransformer("all-MiniLM-L6-v2")
client = qdrant_client.QdrantClient(url="http://localhost:6333")

def store_chunks(chunks: list[dict], collection: str, source_url: str):
    texts = [c["text"] for c in chunks]
    embeddings = model.encode(texts)
    client.upsert(
        collection_name=collection,
        points=[
            {
                # NOTE: sequential ids collide across sources — derive a globally
                # unique id (e.g. a hash of source_url + index) when loading
                # multiple sources into one collection
                "id": i,
                "vector": emb.tolist(),
                "payload": {
                    "text": chunk["text"],
                    "heading": chunk["heading"],
                    "source": source_url,
                    "scraped_at": datetime.now(timezone.utc).isoformat()
                }
            }
            for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
        ]
    )
```

Include source and scraped_at in every payload. You will need them for freshness checks and source attribution in RAG responses.
Automating the Pipeline
Manual scrapes do not scale. You need scheduled runs that keep your vector store fresh without manual triggers.
Scheduling with Cron
Set up recurring scrapes for each source. Documentation sites update weekly. News sites need daily or hourly runs. E-commerce pricing changes multiple times per day.
```python
schedule = client.schedules.create(
    url="https://docs.example.com/api-reference",
    formats=["markdown"],
    cron="0 2 * * 1",  # Every Monday at 2 AM UTC
    webhook="https://your-server.com/webhook/scrape-complete",
    output_format="json"
)
print(f"Schedule ID: {schedule.id}")
```

The webhook fires when each scrape completes. Your handler receives the data, runs the transform pipeline, and loads the vector store. No polling required.
Monitoring for Changes
Not every page needs re-scraping on a fixed schedule. Use change detection to scrape only when content actually changed:
```python
monitor = client.monitors.create(
    url="https://pricing.example.com/plans",
    check_interval="6h",
    threshold=0.05,  # Alert on 5%+ content change
    webhook="https://your-server.com/webhook/page-changed"
)
```

This saves compute on stable pages and catches updates on volatile ones. Pair monitoring with scheduling for a hybrid approach: scheduled scrapes for known update cycles, change-triggered scrapes for everything else.
Cost and Scale Considerations
Scraping pipelines have two cost drivers: number of pages and complexity per page. A static HTML page costs less than a JavaScript-rendered SPA behind a CAPTCHA.
Start with the lowest tier that works for each target. Set min_tier to avoid wasting balance on over-provisioned requests. Review your pricing plans to match tier selection with your budget.
Error Handling and Observability
Production pipelines fail. The difference between a hobby project and a production system is how you handle those failures.
Log these events at minimum:
- HTTP errors (4xx, 5xx) with URL and response body
- Timeout events with duration and tier used
- Content change alerts from monitors
- Webhook delivery failures
Set up alerts on scrape failure rates. If a source starts failing consistently, it likely updated its anti-bot measures. Bump the tier or adjust your selectors.
```python
import logging

from alterlab import AlterLab, APIError

client = AlterLab(api_key="YOUR_API_KEY")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("scrape_pipeline.log"),
        logging.StreamHandler()
    ]
)

def scrape_and_log(url: str):
    try:
        response = client.scrape(url=url, formats=["markdown"])
        logging.info(f"Scraped {url}: {len(response.markdown)} chars")
        return response.markdown
    except APIError as e:
        logging.error(f"Failed to scrape {url}: {e.status_code} - {e.message}")
        raise
```

Route logs to your existing observability stack. Structured JSON logs work best for querying in Datadog, Grafana, or CloudWatch.
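A structured formatter is easy to layer onto the standard logging module. A minimal sketch (the field names are illustrative; extend them to match your aggregator's conventions):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line, which log
    aggregators can index and query without regex parsing."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
        })
```

Attach it with `handler.setFormatter(JsonFormatter())` on any handler from the setup above.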
Putting It All Together
Here is the complete pipeline in a single script:
```python
import logging

from alterlab import AlterLab
from sentence_transformers import SentenceTransformer
import qdrant_client

# chunk_by_headings, deduplicate_chunks, and store_chunks are
# the functions defined in the sections above

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

client = AlterLab(api_key="YOUR_API_KEY")
model = SentenceTransformer("all-MiniLM-L6-v2")
qdrant = qdrant_client.QdrantClient(url="http://localhost:6333")

TARGETS = [
    {"url": "https://docs.example.com/api", "cron": "0 2 * * 1"},
    {"url": "https://blog.example.com/tag/ai", "cron": "0 6 * * *"},
]

def run_pipeline():
    for target in TARGETS:
        logging.info(f"Scraping {target['url']}")
        response = client.scrape(
            url=target["url"],
            formats=["markdown"],
            min_tier=3,
            exclude_selectors=["nav", "footer", ".sidebar"]
        )
        chunks = chunk_by_headings(response.markdown)
        chunks = deduplicate_chunks(chunks)
        store_chunks(chunks, collection="rag-docs", source_url=target["url"])
        logging.info(f"Stored {len(chunks)} chunks from {target['url']}")

if __name__ == "__main__":
    run_pipeline()
```

Schedule this script via cron on your server or through the scheduling API for managed recurring execution.
Takeaway
A production RAG scraping pipeline needs four things: reliable fetch with anti-bot bypass, clean extraction in Markdown format, smart chunking with deduplication, and automated scheduling with change monitoring. Build each stage to handle failure independently. Log everything. Start with the lowest tier that works for each target and scale up only when needed.
The quickstart guide covers account setup and your first API call. For common questions about tiers, formats, and scheduling, check the FAQ.