Extensions API Reference

Minimal, auto-generated API docs for extensions.

LoggingExtension

class scrapy_item_ingest.LoggingExtension(settings)[source]

Bases: BaseExtension

Extension for logging spider events to the database.

__init__(settings)[source]
classmethod from_crawler(crawler)[source]

Create an extension instance from crawler.

spider_opened(spider)[source]

Called when a spider is opened.

spider_closed(spider, reason)[source]

Called when a spider is closed.

engine_stopped()[source]

Called when the Scrapy engine stops.

Notes

  • Persists spider and selected Scrapy logs to job_logs.

  • Configure inclusion/exclusion via settings: LOG_DB_LEVEL, LOG_DB_CAPTURE_LEVEL, LOG_DB_LOGGERS, LOG_DB_EXCLUDE_LOGGERS, LOG_DB_EXCLUDE_PATTERNS.

  • See configuration for settings and Quickstart/Examples for usage.

    Called when an item is dropped by a pipeline.

    Parameters:
      • item (dict or scrapy.Item) – The dropped item

      • response (scrapy.Response) – The response from which the item was scraped

      • exception (Exception) – The exception that caused the drop

      • spider (scrapy.Spider) – The spider instance
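The parameters listed above match the keyword arguments Scrapy sends with its item_dropped signal. As a minimal sketch of a handler with that signature, the hypothetical DropCounter class below tallies drops by exception type. It is not part of scrapy_item_ingest, and it uses only the standard library so it can run outside a crawl.

```python
from collections import Counter
from types import SimpleNamespace


class DropCounter:
    """Hypothetical helper: tally dropped items by exception type."""

    def __init__(self):
        self.drops = Counter()

    def item_dropped(self, item, response, exception, spider):
        # Argument names follow the parameter list above.
        self.drops[type(exception).__name__] += 1


# Stand-ins for a real spider and exceptions; Scrapy supplies these in a crawl.
spider = SimpleNamespace(name="demo")
counter = DropCounter()
counter.item_dropped({"id": 1}, None, ValueError("missing price"), spider)
counter.item_dropped({"id": 2}, None, ValueError("missing title"), spider)
counter.item_dropped({"id": 3}, None, KeyError("sku"), spider)
```

In a real extension, a method like this would be connected in from_crawler with crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped).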

Base Extension Class

BaseExtension

class scrapy_item_ingest.extensions.base.BaseExtension(settings)[source]

Bases: object

Base class for all extensions. Provides common functionality for database operations and logging.

Methods:

from_crawler(cls, crawler)[source]

Create extension instance from crawler. Class method used by Scrapy.

Parameters:

crawler (scrapy.crawler.Crawler) – The crawler instance

Returns:

Extension instance

Return type:

BaseExtension

get_database_connection()

Get database connection for logging operations.

Returns:

Database connection

Return type:

psycopg2.connection

log_message(spider, log_type, message)

Store log message in database.

Parameters:
  • spider (scrapy.Spider) – The spider instance

  • log_type (str) – Type of log message (INFO, ERROR, WARNING, etc.)

  • message (str) – Log message content

get_job_id(spider)

Get job ID from spider or settings.

Parameters:

spider (scrapy.Spider) – The spider instance

Returns:

Job identifier

Return type:

str or int

__init__(settings)[source]
classmethod from_crawler(crawler)[source]

Create extension instance from crawler

get_identifier_info(spider)[source]

Get identifier column and value for the spider

Custom Extension Development

Creating Custom Extensions

You can create custom extensions by inheriting from BaseExtension:

from scrapy_item_ingest.extensions.base import BaseExtension
from scrapy import signals
import time

class PerformanceMonitoringExtension(BaseExtension):
    """Custom extension for monitoring spider performance"""

    def __init__(self, settings):
        super().__init__(settings)
        self.start_time = None
        self.item_count = 0
        self.error_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.settings)

        # Connect to Scrapy signals
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)

        return ext

    def spider_opened(self, spider):
        self.start_time = time.time()
        self.log_message(spider, 'INFO', f'Performance monitoring started for {spider.name}')

    def spider_closed(self, spider, reason):
        # start_time stays None if spider_opened never fired
        duration = time.time() - self.start_time if self.start_time else 0.0
        rate = self.item_count / duration if duration > 0 else 0

        message = (f'Spider {spider.name} performance summary: '
                  f'Items: {self.item_count}, '
                  f'Errors: {self.error_count}, '
                  f'Duration: {duration:.2f}s, '
                  f'Rate: {rate:.2f} items/sec')

        self.log_message(spider, 'INFO', message)

    def item_scraped(self, item, response, spider):
        self.item_count += 1

        # Log milestone progress
        if self.item_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            # Guard against a zero interval, as in spider_closed
            rate = self.item_count / elapsed if elapsed > 0 else 0
            message = f'Progress: {self.item_count} items scraped, rate: {rate:.2f} items/sec'
            self.log_message(spider, 'INFO', message)

    def spider_error(self, failure, response, spider):
        self.error_count += 1
        self.log_message(spider, 'ERROR', f'Spider error #{self.error_count}: {failure.value}')

Signal Handling

Extensions can connect to various Scrapy signals:

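from scrapy import signals
from scrapy_item_ingest.extensions.base import BaseExtension

class SignalHandlingExtension(BaseExtension):
    """Every ext.<handler> connected below must be defined as a method on this class."""
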
    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.settings)

        # Spider signals
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)

        # Item signals
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)

        # Request/Response signals
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        crawler.signals.connect(ext.request_dropped, signal=signals.request_dropped)

        return ext
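The snippet above connects handlers without defining them. As a sketch, the hypothetical SignalCounters class below gives each handler the argument names that Scrapy documents for the corresponding signal. The counting logic is illustrative only, and the class avoids importing Scrapy so that it runs on its own.

```python
from collections import Counter
from types import SimpleNamespace


class SignalCounters:
    """Hypothetical handlers; argument names follow Scrapy's signal docs."""

    def __init__(self):
        self.seen = Counter()

    # Spider signals
    def spider_opened(self, spider):
        self.seen["spider_opened"] += 1

    def spider_closed(self, spider, reason):
        self.seen[f"spider_closed:{reason}"] += 1

    def spider_idle(self, spider):
        self.seen["spider_idle"] += 1

    def spider_error(self, failure, response, spider):
        self.seen["spider_error"] += 1

    # Item signals
    def item_scraped(self, item, response, spider):
        self.seen["item_scraped"] += 1

    def item_dropped(self, item, response, exception, spider):
        self.seen["item_dropped"] += 1

    # Request/response signals
    def request_scheduled(self, request, spider):
        self.seen["request_scheduled"] += 1

    def response_received(self, response, request, spider):
        self.seen["response_received"] += 1

    def request_dropped(self, request, spider):
        self.seen["request_dropped"] += 1


# Scrapy passes signal arguments by keyword, so the demo does the same.
spider = SimpleNamespace(name="demo")
handlers = SignalCounters()
handlers.spider_opened(spider=spider)
handlers.request_scheduled(request=object(), spider=spider)
handlers.response_received(response=object(), request=object(), spider=spider)
handlers.item_dropped(item={}, response=None, exception=ValueError("bad"), spider=spider)
handlers.spider_closed(spider=spider, reason="finished")
```

Because Scrapy passes these arguments by keyword, the parameter names matter more than their order.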

Extension Configuration

Settings and Parameters

Extensions can be configured through Scrapy settings:

from scrapy_item_ingest.extensions.base import BaseExtension

class ConfigurableExtension(BaseExtension):
    def __init__(self, settings):
        super().__init__(settings)

        # Read custom settings
        self.monitoring_interval = settings.getint('MONITORING_INTERVAL', 60)
        self.alert_threshold = settings.getfloat('ERROR_THRESHOLD', 0.05)
        self.notification_url = settings.get('NOTIFICATION_WEBHOOK_URL')

        # Validate configuration
        if self.monitoring_interval < 10:
            raise ValueError("MONITORING_INTERVAL must be at least 10 seconds")

# In settings.py
EXTENSIONS = {
    'myproject.extensions.ConfigurableExtension': 500,
}

# Extension settings
MONITORING_INTERVAL = 120  # 2 minutes
ERROR_THRESHOLD = 0.10     # 10% error rate
NOTIFICATION_WEBHOOK_URL = 'https://hooks.slack.com/...'
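The example above reads ERROR_THRESHOLD into alert_threshold but does not show how it is applied. One plausible use, sketched here with a hypothetical helper named error_rate_exceeded, is to compare the observed error rate against the threshold before sending an alert. The helper and its strict-inequality rule are assumptions, not library behavior.

```python
def error_rate_exceeded(error_count, request_count, threshold):
    """Return True when errors / requests is strictly above threshold."""
    if request_count <= 0:
        # No requests yet, so there is no meaningful rate to compare.
        return False
    return error_count / request_count > threshold


# With ERROR_THRESHOLD = 0.10 as in the settings above:
alert_at_5_percent = error_rate_exceeded(5, 100, 0.10)
alert_at_15_percent = error_rate_exceeded(15, 100, 0.10)
alert_before_any_request = error_rate_exceeded(0, 0, 0.10)
```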

Integration Examples

Webhook Notifications

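from datetime import datetime

import requests
from scrapy import signals
from scrapy_item_ingest.extensions.base import BaseExtension

class WebhookNotificationExtension(BaseExtension):
    def __init__(self, settings):
        super().__init__(settings)
        self.webhook_url = settings.get('WEBHOOK_URL')
        self.notification_interval = settings.getint('NOTIFICATION_INTERVAL', 1000)
        self.item_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler.settings)
        # Connect the handlers defined below, as in the earlier examples
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext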

    def item_scraped(self, item, response, spider):
        self.item_count += 1

        if self.item_count % self.notification_interval == 0:
            self.send_progress_notification(spider)

    def spider_closed(self, spider):
        self.send_completion_notification(spider)

    def send_progress_notification(self, spider):
        if not self.webhook_url:
            return

        payload = {
            'type': 'progress',
            'spider': spider.name,
            'items_scraped': self.item_count,
            'timestamp': datetime.now().isoformat()
        }

        try:
            response = requests.post(self.webhook_url, json=payload, timeout=5)
            response.raise_for_status()
            self.log_message(spider, 'INFO', f'Progress notification sent: {self.item_count} items')
        except Exception as e:
            self.log_message(spider, 'WARNING', f'Webhook notification failed: {e}')

    def send_completion_notification(self, spider):
        if not self.webhook_url:
            return

        stats = spider.crawler.stats.get_stats()
        payload = {
            'type': 'completed',
            'spider': spider.name,
            'final_stats': {
                'items_scraped': stats.get('item_scraped_count', 0),
                'requests_count': stats.get('downloader/request_count', 0),
                'duration': stats.get('elapsed_time_seconds', 0),
            },
            'timestamp': datetime.now().isoformat()
        }

        try:
            response = requests.post(self.webhook_url, json=payload, timeout=5)
            response.raise_for_status()
            self.log_message(spider, 'INFO', 'Completion notification sent')
        except Exception as e:
            self.log_message(spider, 'WARNING', f'Completion notification failed: {e}')
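The requests.post calls above are synchronous, so a slow webhook endpoint can hold up the signal handler for up to the 5-second timeout. One way to avoid that, sketched below, is to hand the send to a small thread pool. BackgroundNotifier and its send argument are hypothetical names. The sender is injected so the sketch runs without network access; in practice it could wrap requests.post.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone


class BackgroundNotifier:
    """Hypothetical helper: run webhook sends on a worker thread."""

    def __init__(self, send, max_workers=1):
        # `send` is any callable taking a payload dict, for example a
        # wrapper around requests.post(url, json=payload, timeout=5).
        self._send = send
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def notify(self, kind, spider_name, items_scraped):
        payload = {
            "type": kind,
            "spider": spider_name,
            "items_scraped": items_scraped,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # submit() returns at once; the send itself runs off-thread.
        return self._pool.submit(self._send, payload)

    def close(self):
        # Wait for queued sends to finish, e.g. when the spider closes.
        self._pool.shutdown(wait=True)


sent = []  # stand-in for the remote endpoint
notifier = BackgroundNotifier(sent.append)
future = notifier.notify("progress", "demo", 1000)
notifier.close()
```

Calling close() from spider_closed would let pending notifications finish before the process exits. Whether log_message is safe to call from a worker thread is not stated in this reference, so the sketch leaves logging to the caller.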

Metrics Collection

from prometheus_client import Counter, Gauge, Histogram, start_http_server
from scrapy_item_ingest.extensions.base import BaseExtension

class PrometheusMetricsExtension(BaseExtension):
    def __init__(self, settings):
        super().__init__(settings)

        # Prometheus metrics
        self.items_total = Counter('scrapy_items_scraped_total', 'Total items scraped', ['spider'])
        self.requests_total = Counter('scrapy_requests_total', 'Total requests made', ['spider', 'status'])
        self.response_time = Histogram('scrapy_response_time_seconds', 'Response time', ['spider'])
        self.active_spiders = Gauge('scrapy_active_spiders', 'Number of active spiders')

        # Start metrics server
        metrics_port = settings.getint('PROMETHEUS_PORT', 8000)
        start_http_server(metrics_port)

    def item_scraped(self, item, response, spider):
        self.items_total.labels(spider=spider.name).inc()

    def response_received(self, response, request, spider):
        self.requests_total.labels(
            spider=spider.name,
            status=response.status
        ).inc()

        # Scrapy records the fetch time in request.meta['download_latency']
        latency = request.meta.get('download_latency')
        if latency is not None:
            self.response_time.labels(spider=spider.name).observe(latency)

    def spider_opened(self, spider):
        self.active_spiders.inc()

    def spider_closed(self, spider):
        self.active_spiders.dec()

Testing Extensions

Unit Testing

import unittest
from unittest.mock import Mock, patch
from scrapy_item_ingest.extensions.logging import LoggingExtension

class TestLoggingExtension(unittest.TestCase):
    def setUp(self):
        self.settings = Mock()
        self.settings.get.return_value = 'postgresql://test:test@localhost/test'
        self.extension = LoggingExtension(self.settings)

    def test_spider_opened_logs_event(self):
        spider = Mock()
        spider.name = 'test_spider'

        with patch.object(self.extension, 'log_message') as mock_log:
            self.extension.spider_opened(spider)
            mock_log.assert_called_once()

            # Verify log message
            call_args = mock_log.call_args[0]
            self.assertEqual(call_args[1], 'INFO')
            self.assertIn('Spider opened', call_args[2])

    def test_error_logging(self):
        spider = Mock()
        failure = Mock()
        failure.value = Exception("Test error")

        with patch.object(self.extension, 'log_message') as mock_log:
            self.extension.spider_error(failure, None, spider)
            mock_log.assert_called_once_with(spider, 'ERROR', 'Spider error: Test error')

Integration Testing

from unittest.mock import Mock

from scrapy.utils.test import get_crawler
from scrapy_item_ingest.extensions.logging import LoggingExtension

def test_extension_integration():
    # Create test crawler
    crawler = get_crawler(spidercls=None, settings_dict={
        'EXTENSIONS': {
            'scrapy_item_ingest.LoggingExtension': 500,
        },
        'DB_URL': 'postgresql://test:test@localhost/test_db'
    })

    # Get extension instance
    extension = LoggingExtension.from_crawler(crawler)

    # Test extension functionality
    spider = Mock()
    spider.name = 'test_spider'

    extension.spider_opened(spider)
    # Verify database logging occurred

Error Handling

Exception Handling in Extensions

from scrapy_item_ingest.extensions.base import BaseExtension

class RobustExtension(BaseExtension):
    def spider_error(self, failure, response, spider):
        try:
            # Log the error
            error_msg = f"Spider error: {failure.value}"
            self.log_message(spider, 'ERROR', error_msg)

            # Additional error processing
            self.handle_error_notification(spider, failure)

        except Exception as e:
            # Fallback logging if database fails
            spider.logger.error(f"Extension error handling failed: {e}")

    def handle_error_notification(self, spider, failure):
        """Handle error notifications with proper exception handling"""
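
The try/except fallback in spider_error above can be factored into a small helper, so each handler keeps its own signature and still degrades to the spider's logger when the database write fails. The function run_safely below is a hypothetical sketch of that idea, not part of scrapy_item_ingest. It uses a plain logging.Logger as a stand-in for spider.logger.

```python
import logging
from types import SimpleNamespace


def run_safely(spider, action, *args, **kwargs):
    """Run action; on failure log through spider.logger and return None."""
    try:
        return action(*args, **kwargs)
    except Exception as exc:
        # Same fallback message as in spider_error above.
        spider.logger.error("Extension error handling failed: %s", exc)
        return None


class ListHandler(logging.Handler):
    """Collect formatted log messages so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
logger = logging.getLogger("demo_spider")
logger.addHandler(handler)
logger.propagate = False
spider = SimpleNamespace(name="demo", logger=logger)


def failing_db_write(message):
    # Stand-in for log_message when the database is unreachable.
    raise ConnectionError("database unavailable")


result = run_safely(spider, failing_db_write, "Spider error: boom")
```

Inside an extension, the call might look like run_safely(spider, self.log_message, spider, 'ERROR', error_msg).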