Advanced Configurations
This section covers complex configuration scenarios and advanced usage patterns for Scrapy Item Ingest in enterprise and high-performance environments.
Multi-Environment Setup
Production-Grade Environment Configuration
A complete multi-environment setup: a shared base class holds the common settings, and each environment subclasses it to override only what differs:
# config/base.py - Base configuration
import os
from pathlib import Path
class BaseConfig:
    """Settings shared by every environment.

    Environment-specific subclasses inherit these values and override the
    ones that differ (database URL, concurrency, logging, ...).
    """

    # --- Project identity -------------------------------------------------
    BOT_NAME = 'enterprise_scraper'
    SPIDER_MODULES = ['enterprise_scraper.spiders']
    NEWSPIDER_MODULE = 'enterprise_scraper.spiders'

    # --- Crawl etiquette --------------------------------------------------
    ROBOTSTXT_OBEY = True
    USER_AGENT = 'enterprise_scraper (+https://yourcompany.com)'

    # --- Database ---------------------------------------------------------
    # Table creation is off by default; DB_URL is supplied by each
    # environment subclass rather than defined here.
    CREATE_TABLES = False

    # --- Item pipelines (class path -> order value) -----------------------
    ITEM_PIPELINES = {
        'enterprise_scraper.pipelines.ValidationPipeline': 200,
        'scrapy_item_ingest.DbInsertPipeline': 300,
        'enterprise_scraper.pipelines.NotificationPipeline': 400,
    }

    # --- Extensions (class path -> order value) ---------------------------
    EXTENSIONS = {
        'scrapy_item_ingest.LoggingExtension': 500,
        'enterprise_scraper.extensions.MetricsExtension': 510,
    }
# config/development.py - Development environment
from .base import BaseConfig
class DevelopmentConfig(BaseConfig):
    """Local development: small, slow, verbose crawls against a local database."""

    # Database: local PostgreSQL; tables are created automatically.
    DB_URL = 'postgresql://dev_user:dev_pass@localhost:5432/scrapy_dev'
    CREATE_TABLES = True

    # Performance settings for development (deliberately gentle).
    CONCURRENT_REQUESTS = 8
    DOWNLOAD_DELAY = 1
    # RANDOMIZE_DOWNLOAD_DELAY is a boolean switch, not a fraction. Scrapy
    # reads it with Settings.getbool(), which evaluates bool(int(value)), so
    # the former value 0.5 truncated to 0 and *disabled* randomization.
    # When enabled, Scrapy waits a random 0.5x-1.5x DOWNLOAD_DELAY.
    RANDOMIZE_DOWNLOAD_DELAY = True

    # Logging
    LOG_LEVEL = 'DEBUG'

    # Stop after 100 items so test runs stay short.
    CLOSESPIDER_ITEMCOUNT = 100
# config/staging.py - Staging environment
from .base import BaseConfig
import os
class StagingConfig(BaseConfig):
    """Pre-production: moderate concurrency with AutoThrottle enabled."""

    # The connection string is read from STAGING_DATABASE_URL when this
    # module is imported. NOTE(review): os.getenv returns None if the
    # variable is unset, so a missing value is not reported here.
    DB_URL = os.getenv('STAGING_DATABASE_URL')
    CREATE_TABLES = True

    # Request concurrency and base delay between requests.
    CONCURRENT_REQUESTS, CONCURRENT_REQUESTS_PER_DOMAIN = 16, 8
    DOWNLOAD_DELAY = 0.5

    # AutoThrottle tuning for staging.
    AUTOTHROTTLE_ENABLED = True
    AUTOTHROTTLE_START_DELAY = 1
    AUTOTHROTTLE_MAX_DELAY = 10
    AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0

    LOG_LEVEL = 'INFO'
# config/production.py - Production environment
from .base import BaseConfig
import os
class ProductionConfig(BaseConfig):
    """Production: high throughput, quiet logging, pooled database access."""

    # Connection string comes from DATABASE_URL at import time.
    # NOTE(review): os.getenv yields None when the variable is missing.
    DB_URL = os.getenv('DATABASE_URL')
    # Tables must already exist in production; never create them here.
    CREATE_TABLES = False

    # Throughput.
    CONCURRENT_REQUESTS, CONCURRENT_REQUESTS_PER_DOMAIN = 32, 16
    DOWNLOAD_DELAY = 0.1

    # AutoThrottle tuning.
    AUTOTHROTTLE_ENABLED = True
    AUTOTHROTTLE_START_DELAY = 0.1
    AUTOTHROTTLE_MAX_DELAY = 5
    AUTOTHROTTLE_TARGET_CONCURRENCY = 8.0
    AUTOTHROTTLE_DEBUG = False

    # Only warnings and above; no telnet console.
    LOG_LEVEL = 'WARNING'
    TELNETCONSOLE_ENABLED = False

    # Database connection-pool options.
    DB_SETTINGS = dict(
        pool_size=20,
        max_overflow=30,
        pool_timeout=30,
        pool_recycle=3600,
        pool_pre_ping=True,
    )
Environment Loader
# settings.py - Dynamic environment loading
import os
from importlib import import_module
def get_config():
    """Return the configuration class selected by the SCRAPY_ENV variable.

    SCRAPY_ENV defaults to 'development' when unset. The matching module is
    imported lazily, so only the chosen environment's module is loaded.

    Raises:
        ValueError: if SCRAPY_ENV names an environment with no mapping.
    """
    environment = os.getenv('SCRAPY_ENV', 'development')
    known_environments = {
        'development': ('config.development', 'DevelopmentConfig'),
        'staging': ('config.staging', 'StagingConfig'),
        'production': ('config.production', 'ProductionConfig'),
    }
    try:
        module_name, class_attr = known_environments[environment]
    except KeyError:
        raise ValueError(f"Unknown environment: {environment}") from None
    return getattr(import_module(module_name), class_attr)
# Resolve the environment's configuration class once, at import time.
config = get_config()

# Copy every UPPER_CASE attribute onto this module so Scrapy sees it as a
# setting. dir() includes attributes inherited from the base class.
globals().update(
    {name: getattr(config, name) for name in dir(config) if name.isupper()}
)
Running with Different Environments
# SCRAPY_ENV selects the config class that settings.py loads; when unset it
# defaults to development, and any unknown value makes get_config() raise.
# Development
SCRAPY_ENV=development scrapy crawl products
# Staging
SCRAPY_ENV=staging scrapy crawl products
# Production
SCRAPY_ENV=production scrapy crawl products
Distributed Crawling Architecture
Redis-Based Job Queue System
# distributors/job_manager.py
import redis
import json
import uuid
from datetime import datetime
class DistributedJobManager:
    """Redis-backed job queue shared by the crawl master and its workers.

    Job metadata lives in the hash ``job:<job_id>``. Each URL to crawl is a
    JSON message on the ``jobs_queue`` list; messages are added with LPUSH
    and taken with BRPOP, so URLs are served oldest-first.
    """

    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis = redis.from_url(redis_url)
        self.jobs_queue = 'scrapy:jobs'
        # NOTE(review): not read or written by any method shown here.
        self.results_queue = 'scrapy:results'

    @staticmethod
    def _to_redis_hash(data):
        """Return a copy of *data* whose values are valid Redis hash values.

        redis-py accepts only str, bytes, int and float field values (bool
        is rejected). ``None`` is stored as ``''``; bools, lists, dicts and
        any other objects are stored as JSON text.
        """
        encoded = {}
        for key, value in data.items():
            if value is None:
                encoded[key] = ''
            elif (isinstance(value, (str, bytes, int, float))
                    and not isinstance(value, bool)):
                encoded[key] = value
            else:
                encoded[key] = json.dumps(value)
        return encoded

    def create_crawl_job(self, spider_name, urls, config=None):
        """Register a distributed crawl job and enqueue one message per URL.

        Args:
            spider_name: name of the spider the job is meant for.
            urls: iterable of start URLs. It is materialised once so it can
                be both recorded in the job hash and enqueued.
            config: optional JSON-serialisable dict stored with the job.

        Returns:
            The new job id (a UUID4 string).
        """
        urls = list(urls)
        job_id = str(uuid.uuid4())
        job_data = {
            'job_id': job_id,
            'spider_name': spider_name,
            'urls': urls,
            'config': config or {},
            'status': 'pending',
            'created_at': datetime.now().isoformat(),
            'worker_id': None,
        }
        # urls (list), config (dict) and worker_id (None) cannot be stored in
        # a Redis hash as-is -- redis-py raises DataError -- so encode first.
        self.redis.hset(f'job:{job_id}', mapping=self._to_redis_hash(job_data))

        for url in urls:
            url_data = {
                'job_id': job_id,
                'url': url,
                'attempts': 0,
            }
            self.redis.lpush(self.jobs_queue, json.dumps(url_data))
        return job_id

    def get_next_job(self, worker_id):
        """Pop the next URL message, blocking for up to 30 seconds.

        Returns the decoded message dict annotated with ``worker_id`` and
        ``started_at``, or ``None`` if the queue stayed empty for 30 s. The
        job hash is marked ``processing`` and records the claiming worker.
        """
        popped = self.redis.brpop(self.jobs_queue, timeout=30)
        if not popped:
            return None
        # BRPOP returns a (queue_name, value) pair.
        job = json.loads(popped[1])
        job['worker_id'] = worker_id
        job['started_at'] = datetime.now().isoformat()
        self.redis.hset(
            f"job:{job['job_id']}",
            mapping={'status': 'processing', 'worker_id': worker_id},
        )
        return job

    def mark_job_completed(self, job_id, stats):
        """Mark *job_id* completed and store *stats* (JSON-encoded)."""
        self.redis.hset(f'job:{job_id}', mapping={
            'status': 'completed',
            'completed_at': datetime.now().isoformat(),
            'stats': json.dumps(stats),
        })
# spiders/distributed_spider.py
import scrapy
import json
from distributors.job_manager import DistributedJobManager
class DistributedSpider(scrapy.Spider):
    """Spider that takes its URLs from the shared Redis job queue.

    Subclasses must implement ``extract_items`` and ``should_follow_link``.
    Every yielded item is tagged with the originating ``job_id`` and this
    process's ``worker_id``.
    """

    name = 'distributed'

    def __init__(self, worker_id=None, *args, **kwargs):
        # ``os`` is not among this module's imports; without this local
        # import the default worker id raised NameError.
        import os

        super().__init__(*args, **kwargs)
        self.worker_id = worker_id or f'worker_{os.getpid()}'
        self.job_manager = DistributedJobManager()
        # Most recently dequeued message (overwritten on every dequeue).
        self.current_job = None

    def start_requests(self):
        """Yield one request per queued URL until the queue stays empty.

        NOTE(review): get_next_job() performs a blocking BRPOP (up to 30 s)
        inside this generator. Scrapy consumes start_requests on its event
        loop, so presumably no responses are processed while it waits, and
        the crawl ends after the first 30 s with an empty queue even if
        in-flight pages would have queued more links.
        """
        while True:
            job = self.job_manager.get_next_job(self.worker_id)
            if not job:
                break
            self.current_job = job
            self.logger.info(f"Processing job {job['job_id']} URL: {job['url']}")
            yield scrapy.Request(
                job['url'],
                self.parse,
                meta={'job_data': job},
            )

    def parse(self, response):
        """Yield tagged items and enqueue followable links for any worker."""
        job_data = response.meta['job_data']

        for item in self.extract_items(response):
            item['job_id'] = job_data['job_id']
            item['worker_id'] = self.worker_id
            yield item

        # Links go back onto the shared queue rather than being requested
        # directly, so any worker may pick them up.
        for link in response.css('a::attr(href)').getall():
            if self.should_follow_link(link):
                new_job_data = {
                    'job_id': job_data['job_id'],
                    'url': response.urljoin(link),
                    'attempts': 0,
                }
                self.job_manager.redis.lpush(
                    self.job_manager.jobs_queue,
                    json.dumps(new_job_data),
                )

    def extract_items(self, response):
        """Return an iterable of item objects parsed from *response*.

        Items must support ``item[key] = value`` assignment. Override in a
        subclass.
        """
        raise NotImplementedError(
            f'{type(self).__name__} must implement extract_items()'
        )

    def should_follow_link(self, link):
        """Return True if the raw href *link* should be queued. Override."""
        raise NotImplementedError(
            f'{type(self).__name__} must implement should_follow_link()'
        )
Master-Worker Coordination
# coordinator/master.py
import time
import subprocess
from multiprocessing import Process
from distributors.job_manager import DistributedJobManager
class CrawlMaster:
    """Start worker crawls in separate processes and monitor the shared queue."""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.job_manager = DistributedJobManager()
        # multiprocessing.Process handles, one per started worker.
        self.workers = []

    def start_crawl(self, spider_name, urls, config=None):
        """Create a job for *urls* and launch ``num_workers`` worker processes.

        Returns the job id.
        """
        job_id = self.job_manager.create_crawl_job(spider_name, urls, config)

        for i in range(self.num_workers):
            worker_id = f'worker_{i}'
            worker_process = Process(
                target=self.start_worker,
                args=(spider_name, worker_id, job_id),
            )
            worker_process.start()
            self.workers.append(worker_process)

        return job_id

    @staticmethod
    def start_worker(spider_name, worker_id, job_id):
        """Run one ``scrapy crawl`` subprocess and block until it exits.

        This is a staticmethod so that Process(target=...) does not have to
        pickle ``self`` (including its Redis client) under the 'spawn' start
        method. It can still be called as ``master.start_worker(...)``.
        """
        cmd = [
            'scrapy', 'crawl', spider_name,
            '-a', f'worker_id={worker_id}',
            '-s', f'JOB_ID={job_id}',
        ]
        subprocess.run(cmd)

    def monitor_crawl(self, job_id, poll_interval=10):
        """Print the queue size every *poll_interval* seconds until done.

        Stops when the job hash reports 'completed', or -- when this master
        started workers -- once every worker process has exited and the queue
        is empty. Without the second condition the loop never ended, because
        nothing in the crawl flow shown marks the job completed.
        """
        key = f'job:{job_id}'
        while True:
            status = self.job_manager.redis.hget(key, 'status')
            # bytes unless the client uses decode_responses=True.
            if status in (b'completed', 'completed'):
                break

            queue_size = self.job_manager.redis.llen(self.job_manager.jobs_queue)
            print(f"Job {job_id}: Queue size: {queue_size}")

            workers_running = any(worker.is_alive() for worker in self.workers)
            if self.workers and not workers_running and queue_size == 0:
                break

            time.sleep(poll_interval)
Database Optimization Strategies
Connection Pooling and Performance
# database/optimized_connection.py
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
import json
import logging
class OptimizedDatabaseManager:
    """Bulk-insert scraped items into the ``job_items`` table via a pooled engine.

    Batches of more than 100 rows use PostgreSQL ``COPY ... FROM STDIN``;
    smaller batches use a parameterised executemany INSERT.
    """

    def __init__(self, db_url, settings=None):
        self.settings = settings or {}

        # Connection-pool configuration (overridable via *settings*).
        pool_settings = {
            'pool_size': self.settings.get('pool_size', 20),
            'max_overflow': self.settings.get('max_overflow', 30),
            'pool_timeout': self.settings.get('pool_timeout', 30),
            'pool_recycle': self.settings.get('pool_recycle', 3600),
            'pool_pre_ping': self.settings.get('pool_pre_ping', True),
        }
        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            **pool_settings,
            echo=False,  # Set to True for SQL debugging
        )
        # NOTE(review): neither attribute is used by the methods shown.
        self.batch_size = self.settings.get('batch_size', 1000)
        self.current_batch = []

    def batch_insert_items(self, items, job_id):
        """Insert *items* (mapping-like) into ``job_items`` tagged with *job_id*.

        All rows from one call share a single client-side UTC ``created_at``
        timestamp, so the COPY and INSERT paths store the same kind of
        value. (Previously the literal string 'NOW()' was written through
        COPY, which PostgreSQL rejects as timestamp input.)
        """
        from datetime import datetime, timezone

        if not items:
            return

        created_at = datetime.now(timezone.utc).isoformat()
        batch_data = [
            {
                'item': json.dumps(dict(item)),
                'job_id': job_id,
                'created_at': created_at,
            }
            for item in items
        ]

        with self.engine.connect() as conn:
            if len(batch_data) > 100:
                self._bulk_copy_insert(conn, batch_data)
            else:
                self._regular_batch_insert(conn, batch_data)

    def _regular_batch_insert(self, conn, batch_data):
        """Insert *batch_data* rows with one executemany INSERT, in a transaction."""
        insert_sql = text(
            "INSERT INTO job_items (item, job_id, created_at) "
            "VALUES (:item, :job_id, :created_at)"
        )
        with conn.begin():
            conn.execute(insert_sql, batch_data)

    @staticmethod
    def _build_copy_buffer(batch_data):
        """Return a rewound CSV buffer with one (item, job_id, created_at) row per entry.

        ``None`` values become empty unquoted fields, which COPY CSV reads
        as NULL.
        """
        import csv
        import io

        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerows(
            (row['item'], row['job_id'], row['created_at']) for row in batch_data
        )
        buffer.seek(0)
        return buffer

    def _bulk_copy_insert(self, conn, batch_data):
        """Stream *batch_data* into ``job_items`` with PostgreSQL COPY.

        Assumes the engine uses the psycopg2 driver, whose cursors provide
        ``copy_expert``.
        """
        buffer = self._build_copy_buffer(batch_data)

        # COPY needs the raw DBAPI connection.
        raw_conn = conn.connection
        cursor = raw_conn.cursor()
        try:
            cursor.copy_expert(
                "COPY job_items (item, job_id, created_at) FROM STDIN WITH CSV",
                buffer,
            )
            raw_conn.commit()
        except Exception:
            raw_conn.rollback()
            raise
        finally:
            cursor.close()
Advanced Pipeline Patterns
Multi-Stage Processing Pipeline
# pipelines/advanced_pipeline.py
import asyncio
import aioredis
from concurrent.futures import ThreadPoolExecutor
from scrapy.utils.defer import deferred_from_coro
class AsyncProcessingPipeline:
    """Item pipeline that enriches items asynchronously and mirrors them to Redis.

    The coroutines await asyncio/aioredis objects, so Scrapy must run with
    the asyncio Twisted reactor (``TWISTED_REACTOR`` setting).
    """

    def __init__(self, settings):
        self.redis_url = settings.get('REDIS_URL', 'redis://localhost:6379')
        # NOTE(review): not used by the stages below; kept for extensions.
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        self.processing_queue = 'scrapy:processing'
        self.redis = None

    @classmethod
    def from_crawler(cls, crawler):
        """Factory hook Scrapy calls; without it ``__init__`` gets no settings."""
        return cls(crawler.settings)

    def open_spider(self, spider):
        """Connect to Redis.

        Returns the Deferred so Scrapy waits for the connection before any
        item is processed. Previously it was discarded, racing the first
        items against ``self.redis`` still being None.
        """
        self.redis = None
        return deferred_from_coro(self._init_redis())

    async def _init_redis(self):
        # aioredis 2.x ``from_url`` is a plain function that returns a client;
        # awaiting its result raised TypeError.
        self.redis = aioredis.from_url(self.redis_url)

    def close_spider(self, spider):
        """Release the thread pool and the Redis connection."""
        self.thread_pool.shutdown(wait=False)
        return deferred_from_coro(self._close_redis())

    async def _close_redis(self):
        if self.redis is not None:
            await self.redis.close()
            self.redis = None

    def process_item(self, item, spider):
        """Validate *item* synchronously, then run the async stages."""
        # Stage 1: immediate validation
        self._validate_item(item)
        # Stages 2-5 run asynchronously; Scrapy waits on the Deferred.
        return deferred_from_coro(self._async_process_item(item, spider))

    def _validate_item(self, item):
        """Synchronous validation hook; the base implementation accepts every item.

        Override and raise ``scrapy.exceptions.DropItem`` to reject items.
        """

    async def _async_process_item(self, item, spider):
        """Run enrichment, external lookup, caching and queueing in order."""
        enriched_item = await self._enrich_item(item)

        api_data = await self._fetch_external_data(enriched_item)
        enriched_item.update(api_data)

        await self._cache_item(enriched_item)
        await self._queue_for_postprocessing(enriched_item)
        return enriched_item

    async def _enrich_item(self, item):
        """Add ``processed_at`` and, when computable, ``discount_percentage``."""
        from datetime import datetime

        item['processed_at'] = datetime.now().isoformat()

        # Skip the discount when original_price is 0/None (division by zero).
        if 'price' in item and 'original_price' in item and item['original_price']:
            item['discount_percentage'] = (
                (item['original_price'] - item['price']) / item['original_price'] * 100
            )
        return item

    async def _fetch_external_data(self, item):
        """Return extra fields for *item*; this example simulates an API call."""
        if 'product_id' in item:
            await asyncio.sleep(0.1)
            return {
                'external_rating': 4.5,
                'external_reviews': 123,
            }
        return {}

    @staticmethod
    def _serialize_item(item):
        """Return *item* as a JSON string (via ``dict(item)``)."""
        import json

        return json.dumps(dict(item))

    async def _cache_item(self, item):
        """Cache *item* in Redis under ``item:<id>`` for one hour."""
        if self.redis:
            cache_key = f"item:{item.get('id', 'unknown')}"
            await self.redis.setex(
                cache_key,
                3600,  # 1 hour TTL
                self._serialize_item(item),
            )

    async def _queue_for_postprocessing(self, item):
        """Push *item* onto the post-processing Redis list."""
        if self.redis:
            await self.redis.lpush(
                self.processing_queue,
                self._serialize_item(item),
            )
Real-time Analytics Pipeline
# pipelines/analytics_pipeline.py
import time
from collections import defaultdict, deque
from threading import Lock
class RealTimeAnalyticsPipeline:
    """Collect sliding-window crawl metrics and log a report periodically.

    Settings:
        ANALYTICS_WINDOW_SIZE: seconds of item timestamps kept (default 60).
        ANALYTICS_REPORT_INTERVAL: seconds between reports (default 30).
        ANALYTICS_PRICE_SAMPLE_SIZE: most recent prices kept (default 1000).
    """

    def __init__(self, settings):
        self.window_size = settings.getint('ANALYTICS_WINDOW_SIZE', 60)
        self.report_interval = settings.getint('ANALYTICS_REPORT_INTERVAL', 30)
        price_sample_size = settings.getint('ANALYTICS_PRICE_SAMPLE_SIZE', 1000)

        self.metrics = defaultdict(deque)
        # Category counts need a mapping. With the default deque, the
        # lookup metrics['categories'][name] raised TypeError on every item.
        self.metrics['categories'] = defaultdict(int)
        # Bounded deque: the oldest price is dropped automatically.
        self.metrics['price_distribution'] = deque(maxlen=price_sample_size)

        self.metrics_lock = Lock()
        self.started_at = time.time()
        self.last_report = self.started_at

    @classmethod
    def from_crawler(cls, crawler):
        """Factory hook Scrapy calls; without it ``__init__`` gets no settings."""
        return cls(crawler.settings)

    def process_item(self, item, spider):
        """Record metrics for *item*, report if due, and return the item unchanged."""
        current_time = time.time()

        with self.metrics_lock:
            self.metrics['items_per_second'].append(current_time)
            self.metrics['categories'][item.get('category', 'unknown')] += 1
            if 'price' in item:
                self.metrics['price_distribution'].append(float(item['price']))

            self._clean_old_metrics(current_time)

            if current_time - self.last_report > self.report_interval:
                self._generate_analytics_report(spider)
                self.last_report = current_time

        return item

    def _clean_old_metrics(self, current_time):
        """Drop item timestamps older than ``window_size`` seconds.

        Prices need no trimming here; their deque is bounded by ``maxlen``.
        """
        cutoff_time = current_time - self.window_size
        timestamps = self.metrics['items_per_second']
        while timestamps and timestamps[0] < cutoff_time:
            timestamps.popleft()

    def _generate_analytics_report(self, spider):
        """Log throughput, category counts and price statistics via *spider*."""
        timestamps = self.metrics['items_per_second']
        # Until a full window has elapsed, divide by the time actually
        # observed so early reports are not understated.
        observed = min(self.window_size, time.time() - self.started_at)
        items_per_second = len(timestamps) / observed if observed > 0 else 0.0

        price_stats = {}
        prices = list(self.metrics['price_distribution'])
        if prices:
            price_stats = {
                'avg_price': sum(prices) / len(prices),
                'min_price': min(prices),
                'max_price': max(prices),
            }

        report = {
            'timestamp': time.time(),
            'items_per_second': round(items_per_second, 2),
            'category_distribution': dict(self.metrics['categories']),
            'price_statistics': price_stats,
        }

        spider.logger.info(f"Analytics Report: {report}")
        self._send_to_monitoring(report)

    def _send_to_monitoring(self, report):
        """Hook for pushing *report* to an external monitoring system (no-op)."""
        pass
Integration with External Systems
Webhook Integration
# integrations/webhook_notifier.py
import requests
import json
from datetime import datetime
class WebhookNotificationPipeline:
    """POST batches of crawl notifications to the configured webhook URLs.

    Settings:
        WEBHOOK_URLS: list of endpoints (default: none).
        WEBHOOK_TYPES: notification types to emit
            (default: ['item_scraped', 'spider_closed']).
        WEBHOOK_BATCH_SIZE: pending notifications that trigger a send (default 10).
    """

    def __init__(self, settings):
        self.webhook_urls = settings.getlist('WEBHOOK_URLS', [])
        self.notification_types = settings.getlist('WEBHOOK_TYPES', ['item_scraped', 'spider_closed'])
        self.batch_size = settings.getint('WEBHOOK_BATCH_SIZE', 10)
        self.pending_notifications = []
        # Items seen by this pipeline. Previously item_count was read from
        # the spider and was always 1 unless the spider kept its own counter.
        self.items_seen = 0

    @classmethod
    def from_crawler(cls, crawler):
        """Factory hook Scrapy calls; without it ``__init__`` gets no settings."""
        return cls(crawler.settings)

    def process_item(self, item, spider):
        """Queue an 'item_scraped' notification; send when the batch is full."""
        self.items_seen += 1
        if 'item_scraped' in self.notification_types:
            notification = {
                'type': 'item_scraped',
                'spider': spider.name,
                'job_id': getattr(spider, 'job_id', 'unknown'),
                'item_count': self.items_seen,
                'timestamp': datetime.now().isoformat(),
                'sample_item': dict(item),  # Include sample data
            }
            self.pending_notifications.append(notification)

            if len(self.pending_notifications) >= self.batch_size:
                self._send_batch_notifications()

        return item

    def close_spider(self, spider):
        """Scrapy pipeline hook: flush remaining notifications on close.

        Scrapy calls ``close_spider`` on pipelines, not ``spider_closed``,
        so the final flush previously never ran.
        """
        self.spider_closed(spider)

    def spider_closed(self, spider):
        """Queue a 'spider_closed' summary (if enabled) and send everything pending."""
        if 'spider_closed' in self.notification_types:
            stats = spider.crawler.stats.get_stats()
            notification = {
                'type': 'spider_closed',
                'spider': spider.name,
                'job_id': getattr(spider, 'job_id', 'unknown'),
                'final_stats': {
                    'items_scraped': stats.get('item_scraped_count', 0),
                    'requests_count': stats.get('downloader/request_count', 0),
                    # NOTE(review): this stat may not be recorded yet when
                    # pipelines close, in which case 0 is reported.
                    'duration': stats.get('elapsed_time_seconds', 0),
                },
                'timestamp': datetime.now().isoformat(),
            }
            self.pending_notifications.append(notification)

        self._send_batch_notifications()

    def _send_batch_notifications(self):
        """POST all pending notifications to every webhook, then clear them.

        Delivery is best-effort: failures are logged and the batch is still
        discarded. NOTE(review): requests.post blocks the calling thread for
        up to 5 s per webhook.
        """
        import logging
        import uuid

        if not self.pending_notifications:
            return

        logger = logging.getLogger(__name__)
        payload = {
            'batch_id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat(),
            'notifications': self.pending_notifications.copy(),
        }

        for webhook_url in self.webhook_urls:
            try:
                response = requests.post(
                    webhook_url,
                    json=payload,
                    timeout=5,
                    headers={'Content-Type': 'application/json'},
                )
                response.raise_for_status()
            except Exception as e:
                logger.warning("Webhook notification failed for %s: %s", webhook_url, e)

        self.pending_notifications.clear()
Next Steps
Production Deployment - Complete production deployment guide
Troubleshooting - Common issues and solutions
Pipelines API Reference - Detailed API reference