Advanced Configurations

This section covers complex configuration scenarios and advanced usage patterns for Scrapy Item Ingest in enterprise and high-performance environments.

Multi-Environment Setup

Production-Grade Environment Configuration

A complete multi-environment setup with proper configuration management:

# config/base.py - Base configuration
import os
from pathlib import Path

class BaseConfig:
    # Project settings
    BOT_NAME = 'enterprise_scraper'
    SPIDER_MODULES = ['enterprise_scraper.spiders']
    NEWSPIDER_MODULE = 'enterprise_scraper.spiders'

    # Default Scrapy settings
    ROBOTSTXT_OBEY = True
    USER_AGENT = 'enterprise_scraper (+https://yourcompany.com)'

    # Database settings (will be overridden by environment)
    CREATE_TABLES = False

    # Pipeline configuration
    ITEM_PIPELINES = {
        'enterprise_scraper.pipelines.ValidationPipeline': 200,
        'scrapy_item_ingest.DbInsertPipeline': 300,
        'enterprise_scraper.pipelines.NotificationPipeline': 400,
    }

    # Extensions
    EXTENSIONS = {
        'scrapy_item_ingest.LoggingExtension': 500,
        'enterprise_scraper.extensions.MetricsExtension': 510,
    }
# config/development.py - Development environment
from .base import BaseConfig

class DevelopmentConfig(BaseConfig):
    # Database
    DB_URL = 'postgresql://dev_user:dev_pass@localhost:5432/scrapy_dev'
    CREATE_TABLES = True

    # Performance settings for development
    CONCURRENT_REQUESTS = 8
    DOWNLOAD_DELAY = 1
    RANDOMIZE_DOWNLOAD_DELAY = 0.5

    # Logging
    LOG_LEVEL = 'DEBUG'

    # Limit items for testing
    CLOSESPIDER_ITEMCOUNT = 100
# config/staging.py - Staging environment
from .base import BaseConfig
import os

class StagingConfig(BaseConfig):
    # Database
    DB_URL = os.getenv('STAGING_DATABASE_URL')
    CREATE_TABLES = True

    # Performance settings
    CONCURRENT_REQUESTS = 16
    CONCURRENT_REQUESTS_PER_DOMAIN = 8
    DOWNLOAD_DELAY = 0.5

    # Autothrottle for staging
    AUTOTHROTTLE_ENABLED = True
    AUTOTHROTTLE_START_DELAY = 1
    AUTOTHROTTLE_MAX_DELAY = 10
    AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0

    # Logging
    LOG_LEVEL = 'INFO'
# config/production.py - Production environment
from .base import BaseConfig
import os

class ProductionConfig(BaseConfig):
    # Database (from environment variables)
    DB_URL = os.getenv('DATABASE_URL')
    CREATE_TABLES = False  # Tables must exist in production

    # High-performance settings
    CONCURRENT_REQUESTS = 32
    CONCURRENT_REQUESTS_PER_DOMAIN = 16
    DOWNLOAD_DELAY = 0.1

    # Advanced autothrottle
    AUTOTHROTTLE_ENABLED = True
    AUTOTHROTTLE_START_DELAY = 0.1
    AUTOTHROTTLE_MAX_DELAY = 5
    AUTOTHROTTLE_TARGET_CONCURRENCY = 8.0
    AUTOTHROTTLE_DEBUG = False

    # Production logging
    LOG_LEVEL = 'WARNING'
    TELNETCONSOLE_ENABLED = False

    # Database connection pooling
    DB_SETTINGS = {
        'pool_size': 20,
        'max_overflow': 30,
        'pool_timeout': 30,
        'pool_recycle': 3600,
        'pool_pre_ping': True,
    }

Environment Loader

# settings.py - Dynamic environment loading
import os
from importlib import import_module

def get_config():
    env = os.getenv('SCRAPY_ENV', 'development')
    config_map = {
        'development': 'config.development.DevelopmentConfig',
        'staging': 'config.staging.StagingConfig',
        'production': 'config.production.ProductionConfig',
    }

    config_path = config_map.get(env)
    if not config_path:
        raise ValueError(f"Unknown environment: {env}")

    module_path, class_name = config_path.rsplit('.', 1)
    module = import_module(module_path)
    return getattr(module, class_name)

# Load configuration based on environment
config = get_config()

# Apply all settings from config class
for setting in dir(config):
    if setting.isupper():
        globals()[setting] = getattr(config, setting)

Running with Different Environments

# Development
SCRAPY_ENV=development scrapy crawl products

# Staging
SCRAPY_ENV=staging scrapy crawl products

# Production
SCRAPY_ENV=production scrapy crawl products

Distributed Crawling Architecture

Redis-Based Job Queue System

# distributors/job_manager.py
import redis
import json
import uuid
from datetime import datetime

class DistributedJobManager:
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis = redis.from_url(redis_url)
        self.jobs_queue = 'scrapy:jobs'
        self.results_queue = 'scrapy:results'

    def create_crawl_job(self, spider_name, urls, config=None):
        """Create a distributed crawl job"""
        job_id = str(uuid.uuid4())

        job_data = {
            'job_id': job_id,
            'spider_name': spider_name,
            'urls': urls,
            'config': config or {},
            'status': 'pending',
            'created_at': datetime.now().isoformat(),
            'worker_id': None,
        }

        # Store job metadata
        self.redis.hset(f'job:{job_id}', mapping=job_data)

        # Queue URLs for processing
        for url in urls:
            url_data = {
                'job_id': job_id,
                'url': url,
                'attempts': 0,
            }
            self.redis.lpush(self.jobs_queue, json.dumps(url_data))

        return job_id

    def get_next_job(self, worker_id):
        """Get next job for worker"""
        job_data = self.redis.brpop(self.jobs_queue, timeout=30)
        if job_data:
            job = json.loads(job_data[1])
            job['worker_id'] = worker_id
            job['started_at'] = datetime.now().isoformat()

            # Update job status
            self.redis.hset(f"job:{job['job_id']}", 'status', 'processing')
            self.redis.hset(f"job:{job['job_id']}", 'worker_id', worker_id)

            return job
        return None

    def mark_job_completed(self, job_id, stats):
        """Mark job as completed with statistics"""
        self.redis.hset(f'job:{job_id}', mapping={
            'status': 'completed',
            'completed_at': datetime.now().isoformat(),
            'stats': json.dumps(stats),
        })
# spiders/distributed_spider.py
import scrapy
import json
from distributors.job_manager import DistributedJobManager

class DistributedSpider(scrapy.Spider):
    name = 'distributed'

    def __init__(self, worker_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.worker_id = worker_id or f'worker_{os.getpid()}'
        self.job_manager = DistributedJobManager()
        self.current_job = None

    def start_requests(self):
        """Get jobs from distributed queue"""
        while True:
            job = self.job_manager.get_next_job(self.worker_id)
            if not job:
                break

            self.current_job = job
            self.logger.info(f"Processing job {job['job_id']} URL: {job['url']}")

            yield scrapy.Request(
                job['url'],
                self.parse,
                meta={'job_data': job}
            )

    def parse(self, response):
        job_data = response.meta['job_data']

        # Extract data based on spider logic
        for item in self.extract_items(response):
            # Add job metadata to items
            item['job_id'] = job_data['job_id']
            item['worker_id'] = self.worker_id
            yield item

        # Follow links and add to queue if needed
        for link in response.css('a::attr(href)').getall():
            if self.should_follow_link(link):
                new_job_data = {
                    'job_id': job_data['job_id'],
                    'url': response.urljoin(link),
                    'attempts': 0,
                }
                self.job_manager.redis.lpush(
                    self.job_manager.jobs_queue,
                    json.dumps(new_job_data)
                )

Master-Worker Coordination

# coordinator/master.py
import time
import subprocess
from multiprocessing import Process
from distributors.job_manager import DistributedJobManager

class CrawlMaster:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        self.job_manager = DistributedJobManager()
        self.workers = []

    def start_crawl(self, spider_name, urls, config=None):
        """Start distributed crawl with multiple workers"""
        # Create the main job
        job_id = self.job_manager.create_crawl_job(spider_name, urls, config)

        # Start worker processes
        for i in range(self.num_workers):
            worker_id = f'worker_{i}'
            worker_process = Process(
                target=self.start_worker,
                args=(spider_name, worker_id, job_id)
            )
            worker_process.start()
            self.workers.append(worker_process)

        return job_id

    def start_worker(self, spider_name, worker_id, job_id):
        """Start individual worker process"""
        cmd = [
            'scrapy', 'crawl', spider_name,
            '-a', f'worker_id={worker_id}',
            '-s', f'JOB_ID={job_id}',
        ]
        subprocess.run(cmd)

    def monitor_crawl(self, job_id):
        """Monitor crawl progress"""
        while True:
            job_status = self.job_manager.redis.hget(f'job:{job_id}', 'status')
            if job_status == b'completed':
                break

            # Get current statistics
            queue_size = self.job_manager.redis.llen(self.job_manager.jobs_queue)
            print(f"Job {job_id}: Queue size: {queue_size}")

            time.sleep(10)

Database Optimization Strategies

Connection Pooling and Performance

# database/optimized_connection.py
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
import json
import logging

class OptimizedDatabaseManager:
    def __init__(self, db_url, settings=None):
        self.settings = settings or {}

        # Configure connection pool
        pool_settings = {
            'pool_size': self.settings.get('pool_size', 20),
            'max_overflow': self.settings.get('max_overflow', 30),
            'pool_timeout': self.settings.get('pool_timeout', 30),
            'pool_recycle': self.settings.get('pool_recycle', 3600),
            'pool_pre_ping': self.settings.get('pool_pre_ping', True),
        }

        self.engine = create_engine(
            db_url,
            poolclass=QueuePool,
            **pool_settings,
            echo=False  # Set to True for SQL debugging
        )

        self.batch_size = self.settings.get('batch_size', 1000)
        self.current_batch = []

    def batch_insert_items(self, items, job_id):
        """Optimized batch insert for items"""
        if not items:
            return

        # Prepare batch data
        batch_data = [
            {
                'item': json.dumps(dict(item)),
                'job_id': job_id,
                'created_at': 'NOW()'
            }
            for item in items
        ]

        # Use COPY for maximum performance
        with self.engine.connect() as conn:
            # Use PostgreSQL COPY for bulk insert
            copy_sql = """
                COPY job_items (item, job_id, created_at)
                FROM STDIN WITH CSV
            """

            # For large batches, use COPY
            if len(batch_data) > 100:
                self._bulk_copy_insert(conn, batch_data)
            else:
                # For smaller batches, use regular insert
                self._regular_batch_insert(conn, batch_data)

    def _bulk_copy_insert(self, conn, batch_data):
        """Use PostgreSQL COPY for maximum performance"""
        import io
        import csv

        # Create CSV buffer
        buffer = io.StringIO()
        writer = csv.writer(buffer)

        for item in batch_data:
            writer.writerow([
                item['item'],
                item['job_id'],
                'NOW()'
            ])

        buffer.seek(0)

        # Use raw connection for COPY
        raw_conn = conn.connection
        cursor = raw_conn.cursor()

        try:
            cursor.copy_expert(
                "COPY job_items (item, job_id, created_at) FROM STDIN WITH CSV",
                buffer
            )
            raw_conn.commit()
        except Exception as e:
            raw_conn.rollback()
            raise e
        finally:
            cursor.close()

Advanced Pipeline Patterns

Multi-Stage Processing Pipeline

# pipelines/advanced_pipeline.py
import asyncio
import aioredis
from concurrent.futures import ThreadPoolExecutor
from scrapy.utils.defer import deferred_from_coro

class AsyncProcessingPipeline:
    def __init__(self, settings):
        self.redis_url = settings.get('REDIS_URL', 'redis://localhost:6379')
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        self.processing_queue = 'scrapy:processing'

    def open_spider(self, spider):
        """Initialize async components"""
        self.redis = None
        # Initialize Redis connection
        deferred_from_coro(self._init_redis())

    async def _init_redis(self):
        self.redis = await aioredis.from_url(self.redis_url)

    def process_item(self, item, spider):
        """Process item with multiple stages"""
        # Stage 1: Immediate validation
        self._validate_item(item)

        # Stage 2: Async enrichment
        return deferred_from_coro(self._async_process_item(item, spider))

    async def _async_process_item(self, item, spider):
        """Async processing stages"""
        # Stage 2: Data enrichment
        enriched_item = await self._enrich_item(item)

        # Stage 3: External API calls
        api_data = await self._fetch_external_data(enriched_item)
        enriched_item.update(api_data)

        # Stage 4: Cache results
        await self._cache_item(enriched_item)

        # Stage 5: Queue for further processing
        await self._queue_for_postprocessing(enriched_item)

        return enriched_item

    async def _enrich_item(self, item):
        """Add computed fields and metadata"""
        # Add processing timestamp
        item['processed_at'] = datetime.now().isoformat()

        # Add computed fields
        if 'price' in item and 'original_price' in item:
            item['discount_percentage'] = (
                (item['original_price'] - item['price']) / item['original_price'] * 100
            )

        return item

    async def _fetch_external_data(self, item):
        """Fetch additional data from external APIs"""
        # Example: Fetch additional product data
        if 'product_id' in item:
            # Simulate API call
            await asyncio.sleep(0.1)
            return {
                'external_rating': 4.5,
                'external_reviews': 123,
            }
        return {}

    async def _cache_item(self, item):
        """Cache processed item in Redis"""
        if self.redis:
            cache_key = f"item:{item.get('id', 'unknown')}"
            await self.redis.setex(
                cache_key,
                3600,  # 1 hour TTL
                json.dumps(dict(item))
            )

    async def _queue_for_postprocessing(self, item):
        """Queue item for additional processing"""
        if self.redis:
            await self.redis.lpush(
                self.processing_queue,
                json.dumps(dict(item))
            )

Real-time Analytics Pipeline

# pipelines/analytics_pipeline.py
import time
from collections import defaultdict, deque
from threading import Lock

class RealTimeAnalyticsPipeline:
    def __init__(self, settings):
        self.window_size = settings.getint('ANALYTICS_WINDOW_SIZE', 60)  # 60 seconds
        self.metrics = defaultdict(deque)
        self.metrics_lock = Lock()
        self.last_report = time.time()
        self.report_interval = settings.getint('ANALYTICS_REPORT_INTERVAL', 30)

    def process_item(self, item, spider):
        """Collect real-time metrics"""
        current_time = time.time()

        with self.metrics_lock:
            # Update metrics
            self.metrics['items_per_second'].append(current_time)
            self.metrics['categories'][item.get('category', 'unknown')] += 1

            if 'price' in item:
                self.metrics['price_distribution'].append(float(item['price']))

            # Clean old metrics (sliding window)
            self._clean_old_metrics(current_time)

            # Generate reports periodically
            if current_time - self.last_report > self.report_interval:
                self._generate_analytics_report(spider)
                self.last_report = current_time

        return item

    def _clean_old_metrics(self, current_time):
        """Remove metrics outside the time window"""
        cutoff_time = current_time - self.window_size

        # Clean time-based metrics
        while (self.metrics['items_per_second'] and
               self.metrics['items_per_second'][0] < cutoff_time):
            self.metrics['items_per_second'].popleft()

        # Clean price distribution (keep only recent prices)
        if len(self.metrics['price_distribution']) > 1000:
            self.metrics['price_distribution'] = deque(
                list(self.metrics['price_distribution'])[-500:],
                maxlen=1000
            )

    def _generate_analytics_report(self, spider):
        """Generate and log analytics report"""
        items_per_second = len(self.metrics['items_per_second']) / self.window_size

        price_stats = {}
        if self.metrics['price_distribution']:
            prices = list(self.metrics['price_distribution'])
            price_stats = {
                'avg_price': sum(prices) / len(prices),
                'min_price': min(prices),
                'max_price': max(prices),
            }

        report = {
            'timestamp': time.time(),
            'items_per_second': round(items_per_second, 2),
            'category_distribution': dict(self.metrics['categories']),
            'price_statistics': price_stats,
        }

        spider.logger.info(f"Analytics Report: {report}")

        # Send to external monitoring system
        self._send_to_monitoring(report)

    def _send_to_monitoring(self, report):
        """Send metrics to external monitoring system"""
        # Integration with Prometheus, DataDog, etc.
        pass

Integration with External Systems

Webhook Integration

# integrations/webhook_notifier.py
import requests
import json
from datetime import datetime

class WebhookNotificationPipeline:
    def __init__(self, settings):
        self.webhook_urls = settings.getlist('WEBHOOK_URLS', [])
        self.notification_types = settings.getlist('WEBHOOK_TYPES', ['item_scraped', 'spider_closed'])
        self.batch_size = settings.getint('WEBHOOK_BATCH_SIZE', 10)
        self.pending_notifications = []

    def process_item(self, item, spider):
        """Queue item notifications"""
        if 'item_scraped' in self.notification_types:
            notification = {
                'type': 'item_scraped',
                'spider': spider.name,
                'job_id': getattr(spider, 'job_id', 'unknown'),
                'item_count': getattr(spider, 'item_count', 0) + 1,
                'timestamp': datetime.now().isoformat(),
                'sample_item': dict(item)  # Include sample data
            }

            self.pending_notifications.append(notification)

            # Send batch when threshold reached
            if len(self.pending_notifications) >= self.batch_size:
                self._send_batch_notifications()

        return item

    def spider_closed(self, spider):
        """Send final notifications"""
        if 'spider_closed' in self.notification_types:
            stats = spider.crawler.stats.get_stats()
            notification = {
                'type': 'spider_closed',
                'spider': spider.name,
                'job_id': getattr(spider, 'job_id', 'unknown'),
                'final_stats': {
                    'items_scraped': stats.get('item_scraped_count', 0),
                    'requests_count': stats.get('downloader/request_count', 0),
                    'duration': stats.get('elapsed_time_seconds', 0),
                },
                'timestamp': datetime.now().isoformat(),
            }

            self.pending_notifications.append(notification)

        # Send any remaining notifications
        self._send_batch_notifications()

    def _send_batch_notifications(self):
        """Send batch of notifications to all webhooks"""
        if not self.pending_notifications:
            return

        payload = {
            'batch_id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat(),
            'notifications': self.pending_notifications.copy()
        }

        for webhook_url in self.webhook_urls:
            try:
                response = requests.post(
                    webhook_url,
                    json=payload,
                    timeout=5,
                    headers={'Content-Type': 'application/json'}
                )
                response.raise_for_status()
            except Exception as e:
                logger.warning(f"Webhook notification failed for {webhook_url}: {e}")

        self.pending_notifications.clear()

Next Steps