# Source code for scrapy_item_ingest.extensions.logging

"""
Logging extension for capturing spider errors and logs and saving them to the database.
"""
from __future__ import annotations

import logging
import threading
from typing import List

from scrapy import signals
from scrapy.spiders import Spider
from scrapy.crawler import Crawler

from .base import BaseExtension

logger = logging.getLogger(__name__)


class ScrapyAndRootFilter(logging.Filter):
    """
    A logging filter that allows records from the 'root' logger, from any
    logger whose name starts with 'scrapy', and from the spider's own logger.

    A record is treated as coming from the spider's own logger when its
    logger name equals ``record.spider_name`` (if that attribute has been
    stamped on the record) or, failing that, the ``name`` of the object
    stored in ``record.spider``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """
        Decide whether *record* should be handled.

        Args:
            record: The log record under consideration.

        Returns:
            True if the record should be kept, False if it should be dropped.
        """
        logger_name = record.name
        # Plain prefix match: any logger name beginning with 'scrapy' passes.
        if logger_name == 'root' or logger_name.startswith('scrapy'):
            return True

        # 'spider_name' may not have been stamped on the record yet when this
        # filter runs (handler-level filters are evaluated before emit()).
        # Scrapy's Spider.logger adapter attaches the spider object as
        # ``record.spider``, so use its name as a fallback.
        spider_name = getattr(record, 'spider_name', None)
        if spider_name is None:
            spider_name = getattr(getattr(record, 'spider', None), 'name', None)

        return spider_name is not None and logger_name == spider_name


class DatabaseLogHandler(logging.Handler):
    """
    Custom logging handler to save log records to the database in real-time.

    Every record that reaches ``emit`` is formatted and handed to the owning
    extension's ``_log_to_database`` together with the spider and the
    record's level name.
    """
    # Class-level thread-local: the re-entrancy flag is shared by all handler
    # instances but kept separately for each thread.
    _local = threading.local()

    def __init__(self, extension: 'LoggingExtension', spider: Spider):
        """
        Args:
            extension: Object whose ``_log_to_database(spider, level, msg)``
                is called for each emitted record.
            spider: Spider whose ``name`` is stamped on records and which is
                passed through to ``_log_to_database``.
        """
        super().__init__()
        self.extension = extension
        self.spider = spider

    def filter(self, record: logging.LogRecord):
        """
        Stamp the spider name on *record*, then apply the attached filters.

        ``Handler.handle()`` runs the filters before ``emit()``, so the
        attribute has to be set here for a filter that inspects
        ``record.spider_name`` to see it.

        Args:
            record: The log record about to be filtered.

        Returns:
            Whatever ``logging.Handler.filter`` returns for the record.
        """
        record.spider_name = getattr(self.spider, 'name', None)
        return super().filter(record)

    def emit(self, record: logging.LogRecord):
        """
        Format *record* and write it to the database through the extension.

        Records are silently skipped when this thread is already inside
        ``emit`` or when the record's source path contains
        ``extensions/logging.py``. Any exception raised while formatting or
        writing is logged through the module logger instead of propagating.

        Args:
            record: The log record to persist.
        """
        if getattr(self._local, 'in_emit', False):
            return  # Prevent recursion

        # Avoid capturing logs generated by this extension's own exceptions.
        # Separators are normalised so the check also matches Windows paths.
        if 'extensions/logging.py' in record.pathname.replace('\\', '/'):
            return

        self._local.in_emit = True
        try:
            # Also stamp here for callers that invoke emit() directly.
            record.spider_name = self.spider.name
            msg = self.format(record)
            level = record.levelname
            # Log directly to the database in real-time
            self.extension._log_to_database(self.spider, level, msg)
        except Exception:
            # in_emit is still set at this point, so if this message is routed
            # back to this handler it is dropped by the guard above.
            logger.exception("Error in DatabaseLogHandler.emit")
        finally:
            self._local.in_emit = False


class LoggingExtension(BaseExtension):
    """
    Extension for logging spider events to the database.

    While a spider is open, a ``DatabaseLogHandler`` is attached to the root
    logger; it is removed and closed again when the spider closes or the
    engine stops.
    """

    def __init__(self, settings):
        """
        Read the log level from the crawler settings and set up handler state.

        Args:
            settings: Passed unchanged to ``BaseExtension.__init__``; its
                ``crawler_settings`` attribute is read for ``LOG_LEVEL``.
        """
        super().__init__(settings)
        crawler_settings = self.settings.crawler_settings
        raw_level = crawler_settings.get('LOG_LEVEL', 'INFO')
        # LOG_LEVEL may be given numerically (e.g. logging.DEBUG); map it to
        # its name so the attribute is always an upper-case string.
        if isinstance(raw_level, int):
            raw_level = logging.getLevelName(raw_level)
        self.log_level = str(raw_level).upper()
        self.log_format = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
        self.log_dateformat = '%Y-%m-%d %H:%M:%S'
        self._db_log_handler: DatabaseLogHandler | None = None
        self._root_logger_ref: logging.Logger | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        """
        Create an extension instance from crawler and connect its signals.

        Args:
            crawler: The crawler whose ``spider_opened``, ``spider_closed``
                and ``engine_stopped`` signals are connected.

        Returns:
            The extension instance created by the parent ``from_crawler``.
        """
        ext = super().from_crawler(crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.engine_stopped, signal=signals.engine_stopped)
        return ext

    def spider_opened(self, spider: Spider):
        """
        Install the database log handler and log a "started" message.

        Args:
            spider: The spider that was opened.
        """
        handler = DatabaseLogHandler(self, spider)
        level = getattr(logging, self.log_level, logging.INFO)
        if not isinstance(level, int):
            # The name matched a non-level attribute of the logging module.
            level = logging.INFO
        handler.setLevel(level)
        formatter = logging.Formatter(fmt=self.log_format, datefmt=self.log_dateformat)
        handler.setFormatter(formatter)
        handler.addFilter(ScrapyAndRootFilter())
        self._db_log_handler = handler

        root_logger = logging.getLogger()
        # Attach only if no DatabaseLogHandler is installed on the root logger yet.
        if not any(isinstance(h, DatabaseLogHandler) for h in root_logger.handlers):
            root_logger.addHandler(handler)
            self._root_logger_ref = root_logger

        identifier_column, identifier_value = self.get_identifier_info(spider)
        message = f"{identifier_column.title()} {identifier_value} started"
        spider.logger.info(message)

    def spider_closed(self, spider: Spider, reason: str):
        """
        Log a "closed" message and remove the log handler.

        Args:
            spider: The spider that was closed.
            reason: The close reason, included in the logged message.
        """
        identifier_column, identifier_value = self.get_identifier_info(spider)
        message = f"{identifier_column.title()} {identifier_value} closed with reason: {reason}"
        spider.logger.info(message)
        self._cleanup()

    def engine_stopped(self):
        """Called when the Scrapy engine stops; removes the log handler."""
        self._cleanup()

    def _cleanup(self):
        """
        Remove the log handler from the root logger and close it.

        Safe to call repeatedly. The handler is closed even if it was never
        attached because another DatabaseLogHandler was already installed.
        """
        handler = self._db_log_handler
        root_logger = self._root_logger_ref
        self._db_log_handler = None
        self._root_logger_ref = None

        if handler is None:
            return
        if root_logger is not None:
            root_logger.removeHandler(handler)
        handler.close()