import contextlib
import datetime
import hashlib
import os
import pickle
import random
import time
from itertools import islice
from typing import Union
from urllib.parse import urlparse

import dateutil.parser
import dateutil.tz
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.exceptions import MaxRetryError
from bs4 import BeautifulSoup, Comment, NavigableString
from bs4.element import PageElement

from util import ANSICodes as AC, failhandler, link as lk


class ArticleBaseClass:
    _default_args = {
        'cache': f'{os.getcwd()}/.cache',
        'debug': False}

    def __init__(self, **kwargs):
        if getattr(self, '_isinit', False):
            return
        kwargs = dict(list(ArticleBaseClass._default_args.items()) + list(kwargs.items()))
        if diff := set(kwargs.keys()).difference(ArticleBaseClass._default_args.keys()):
            raise ValueError(f"keyword{'s' if len(diff) > 1 else ''} {', '.join(diff)} unknown. supported: {', '.join(self._default_args)}")
        self.cache = kwargs.get('cache')
        self._debug = kwargs.get('debug')
        if self.cache:
            if isinstance(self.cache, bool):
                self.cache = ArticleBaseClass._default_args['cache']
            os.makedirs(self.cache, exist_ok=True)
            # self.get_page = file_cache(cache_dir=self.cache, verbose=self._debug)(self.get_page)
        self._isinit = True

    def update_target_from_source(self, target: dict, source: dict):
        for k, v in target.items():
            if isinstance(v, dict):
                if isinstance(sk := source.get(k), dict):
                    self.update_target_from_source(v, sk)
            else:
                # keep the existing default if the source does not provide the key
                target[k] = source.get(k, v)

    def add_debug(self, target):
        if isinstance(target, dict):
            target['debug'] = self._debug
            for _, v in target.items():
                if isinstance(v, dict):
                    self.add_debug(v)

    # @file_cache(cache_dir=self.cache)
    def get_page(self, link):
        def _get_page(link):
            with self.get_session() as s:
                page = s.get(link)
            return page

        if self.cache:
            # hash the link itself so the cache file name is stable per URL
            fname = hashlib.sha256(link.encode()).hexdigest()
            cache_path = f"{self.cache.rstrip('/')}/{fname}"
            try:
                with open(cache_path, 'rb') as f:
                    # print(' -> cache hit!')
                    page = pickle.load(f)
            except FileNotFoundError:
                # print(' -> not yet in cache')
                page = _get_page(link)
                with open(cache_path, 'wb') as f:
                    pickle.dump(page, f)
        else:
            page = _get_page(link)
        return page

    def get_session(self):
        local_session = self.session or requests.Session()
        retry = Retry(connect=self._http_retries, backoff_factor=0.5)
        adapter = HTTPAdapter(max_retries=retry)
        local_session.mount('https://', adapter)
        local_session.mount('http://', adapter)
        return local_session

    def close_session(self, session=None):
        if session is None:
            if self.session is not None:
                self.session.close()
        else:
            session.close()

    ...


############
class ArticleTitle:
    _default_args = {
        'debug': False}

    def __init__(self, title: str = '', suptitle: str = '', **kwargs) -> None:
        self._debug = kwargs.get('debug', self._default_args['debug'])
        self.title = ' '.join(title.strip().splitlines())
        self.suptitle = ' '.join(suptitle.strip().splitlines())

    def __repr__(self) -> str:
        return f'{self.title}'

    def __str__(self) -> str:
        return f'({self.suptitle}) {self.title}'

    ...


############
class ArticleDepartment:
    _default_args = {
        'max_link_departments': 5,
        'debug': False}

    def __init__(self, department: str = '', link: str = '', **kwargs) -> None:
        self._debug = kwargs.get('debug', self._default_args['debug'])
        self._max_link_departments = kwargs.get('max_link_departments', self._default_args['max_link_departments'])
        self.department = ' '.join(department.strip().splitlines())
        # get departments from split url [example.com, ressort-1, ressort-2, ...]
        self.departments_link = urlparse(link).path.split('/')[1:-1]
        # generate link string
        self._link_str = ' > '.join(self.departments_link)
        # pad to max_link_departments
        self.departments_link = (self.departments_link + self._max_link_departments * [''])[:self._max_link_departments]

    def __repr__(self) -> str:
        return f'{self.department}'

    def __str__(self) -> str:
        return f'{self.department} ({self._link_str})'

    ...


############
class ArticleMetadata:
    _default_args = {
        'department': ArticleDepartment._default_args,
        'title': ArticleTitle._default_args,
        'datetime_fmt': '%Y-%m-%d %H:%M:%S',
        'debug': False}

    def __init__(self, html: Union[PageElement, None] = None, base_url: str = 'example.com',
                 date: Union[datetime.datetime, None] = None, **kwargs):
        self._debug = kwargs.get('debug', self._default_args['debug'])
        self._datetime_fmt = kwargs.get('datetime_fmt', self._default_args['datetime_fmt'])
        self._title_kwargs = self._default_args['title']
        if title_args := kwargs.get('title'):
            self.update_target_from_source(self._title_kwargs, title_args)
        self._add_debug(self._title_kwargs)
        self._department_kwargs = self._default_args['department']
        if department_args := kwargs.get('department'):
            self.update_target_from_source(self._department_kwargs, department_args)
        self._add_debug(self._department_kwargs)
        self.page = None
        self.base_url = base_url
        if html is None:
            self.create_empty()
        else:
            self.authors = None
            self.parse_html(html, date)

    def update_target_from_source(self, target: dict, source: dict):
        for k, v in target.items():
            if isinstance(v, dict):
                if isinstance(sk := source.get(k), dict):
                    self.update_target_from_source(v, sk)
            else:
                # keep the existing default if the source does not provide the key
                target[k] = source.get(k, v)

    def _add_debug(self, target):
        if isinstance(target, dict):
            target['debug'] = self._debug
            for _, v in target.items():
                if isinstance(v, dict):
                    self._add_debug(v)

    def create_empty(self):
        self.link = ''
        self.time = datetime.time()
        self.title = ArticleTitle()
        self.department = ArticleDepartment()
        self.authors = None

    def parse_html(self, html: PageElement, date: Union[datetime.datetime, None]):
        try:
            href = html.find('a', {'class': 'stage-feed-item__link'}).attrs['href']
            self.link = self.base_url + href
        except (AttributeError, KeyError):
            self.link = ''
        try:
            datestring = html.find('time').attrs['datetime']
            self.time = dateutil.parser.parse(datestring).astimezone(dateutil.tz.tzlocal())
        except (AttributeError, KeyError):
            # no timestamp in the markup: fall back to the date passed in, if any
            self.time = date.astimezone(dateutil.tz.tzlocal()) if date else datetime.datetime.min
        try:
            title = html.find('span', {'class': 'stage-feed-item__headline'}).contents[0]
        except AttributeError:
            title = ''
        try:
            suptitle = html.find('span', {'class': 'stage-feed-item__kicker'}).contents[0]
        except AttributeError:
            suptitle = ''
        self.title = ArticleTitle(title, suptitle, **self._title_kwargs)
        try:
            department = html.find('span', {'class': 'stage-feed-item__channel'}).contents[0]
        except AttributeError:
            department = ''
        self.department = ArticleDepartment(department, self.link, **self._department_kwargs)

    def csv_line(self, delimiter: str = ',', quote_char: str = '"', newline=True):
        def _quote(s: str):
            return f'{quote_char}{s}{quote_char}'

        elements = [
            self.time.strftime('%Y-%m-%d') if self.time else '0000-00-00',
            self.time.strftime('%H:%M:%S') if self.time else '00:00:00',
            # self.time.strftime('%Y') if self.time else '00',
            # self.time.strftime('%m') if self.time else '00',
            # self.time.strftime('%d') if self.time else '00',
            # self.time.strftime('%H') if self.time else '00',
            # self.time.strftime('%M') if self.time else '00',
            # self.time.strftime('%S') if self.time else '00',
            _quote(self.title.title if self.title else ''),
            _quote(self.title.suptitle if self.title else ''),
            _quote(self.department.department if self.department else ''),
            *[_quote(str(dep)) for dep in (self.department.departments_link if self.department
                                           else [''] * self._department_kwargs['max_link_departments'])],
            _quote(self.link) or '',
            str(self.page.status_code) if self.page else '']
        return delimiter.join(elements) + ('\n' if newline else '')

    def __repr__(self):
        return f'{self.title.title} ({self.time.strftime(self._datetime_fmt)})'

    def __str__(self):
        return (
            f'{self.title.suptitle}\n'
            f'{self.title.title}\n'
            f'{self.department.department}\n'
            f'{self.department._link_str}\n'
            f'{self.time.strftime(self._datetime_fmt)}\n'
            f'{self.link}'
        )

    ...


############
class Article(ArticleBaseClass):
    _default_args = {
        'http_retries': 3,
        'meta': ArticleMetadata._default_args,
        'debug': False,
        'full_text_exclude': [
            ('aside', {'class': 'related-topics'}),
            ('figure', {}),
            ('div', {'class': 'ad-info'}),
            ('div', {'class': 'float-container'}),
            ('a', {'class': ['text-link--external', 'text-link']}),
        ]}

    def __init__(self, *, link: str = None, metadata: Union[ArticleMetadata, None] = None, session=None, **kwargs):
        super().__init__()
        self._debug = kwargs.get('debug', self._default_args['debug'])
        self._http_retries = kwargs.get('http_retries', self._default_args['http_retries'])
        self._meta_kwargs = self._default_args['meta']
        if meta_args := kwargs.get('meta'):
            self.update_target_from_source(self._meta_kwargs, meta_args)
        self.add_debug(self._meta_kwargs)
        self.full_text_exclude = kwargs.get('full_text_exclude', self._default_args['full_text_exclude'])
        self.session = session
        self.meta = metadata or ArticleMetadata(**self._meta_kwargs)
        self.meta.link = link or self.meta.link
        self.full_text = None
        self.parse_page(self.meta.link)

    # parsers
    def parse_page(self, link):
        self.meta.page = self.get_page(link)
        soupy_page = BeautifulSoup(self.meta.page.content, 'html.parser')
        if article := soupy_page.find('article'):
            self.parse_article(article)
        if error_page := soupy_page.find('div', {'class': 'error-page'}):
            self.parse_error_page(error_page)

    def parse_error_page(self, error_page):
        with contextlib.suppress(AttributeError):
            wrapper = error_page.find('div', {'class': 'error-page__wrapper'})
            self.full_text = self.get_fulltext(wrapper, exclude=('a',))

    def parse_article(self, article):
        with contextlib.suppress(AttributeError):
            self.meta.title.title = self.get_fulltext(article.find('span', {'class': 'document-title__headline'}))
        with contextlib.suppress(AttributeError):
            self.meta.title.suptitle = self.get_fulltext(article.find('span', {'class': 'document-title__kicker'}))
        with contextlib.suppress(AttributeError):
            if article.find('div', {'class': 'author'}):
                self.meta.authors = [self.get_fulltext(article.find('span', {'class': 'author__name'}))]
            elif article.find('div', {'class': 'authors'}):
                authors = article.find_all('div', {'class': 'article_author__details'})
                self.meta.authors = [self.get_fulltext(details) for details in authors]
        with contextlib.suppress(AttributeError, KeyError):
            if date := article.find('time', {'class': ['datetime']}):
                datestring = date.attrs['datetime']
                self.meta.time = dateutil.parser.parse(datestring).astimezone(dateutil.tz.tzlocal())
        with contextlib.suppress(AttributeError):
            body = article.find_all('div', {'class': 'article-body'})
            self.full_text = self.get_fulltext(body)

    def _clean_exclude_list(self, excludes):
        if excludes is None:
            return excludes
        excl_names = []
        excl_attrs = []
        for excl in excludes:
            if isinstance(excl, (list, tuple)):
                excl_names.append(excl[0])
                try:
                    local_attr = {
                        k: v if isinstance(v, (list, tuple)) else [v]
                        for k, v in excl[1].items()
                    }
                    excl_attrs.append(local_attr)
                except (KeyError, IndexError):
                    excl_attrs.append({})
            else:
                excl_names.append(excl)
                excl_attrs.append({})
        return list(zip(excl_names, excl_attrs))
        # return excl_names, excl_attrs

    def skip_element(self, elm, excludes):
        if isinstance(elm, Comment):
            return True
        if excludes is None:
            return False
        for excl_name, excl_attr in excludes:
            if elm.name == excl_name:
                if not excl_attr:
                    return True
                for k, v in excl_attr.items():
                    with contextlib.suppress(KeyError):
                        if elm.attrs[k] == v:
                            return True
        return False

    def get_fulltext(self, html: Union[PageElement, list], exclude: Union[list, None] = None, sep: str = ' '):
        if html is None:
            return ''
        if exclude is not None:
            exclude = self._clean_exclude_list(tuple(exclude))
        else:
            # normalise the default exclude list too, so attribute values are
            # always lists and comparable to bs4's multi-valued attributes
            exclude = self._clean_exclude_list(self.full_text_exclude)
        local_elems = []
        for elm in html:
            if self.skip_element(elm, exclude):
                continue
            if isinstance(elm, NavigableString):
                local_elems.append(elm)
            elif isinstance(elm, PageElement):
                local_elem = self.get_fulltext(elm, exclude=exclude, sep=sep)
                local_elems.append(local_elem)
        return sep.join(local_elems).strip()

    # util
    def to_csv_line(self, delimiter: str = ',', quote_char: str = '"', newline=True):
        def _quote(s: str):
            return f'{quote_char}{s}{quote_char}'

        line = delimiter.join((
            self.meta.csv_line(delimiter=delimiter, quote_char=quote_char, newline=False),
            _quote(' '.join(self.full_text.splitlines())) if self.full_text else ''
        )) + ('\n' if newline else '')
        return line

    def __repr__(self):
        department = self.meta.department.department if self.meta.department else ''
        title = self.meta.title.title if self.meta.title else ''
        full_text = self.full_text or ''
        datestr = self.meta.time.strftime('%d.%m.%Y %H:%M:%S') if self.meta.time else ''
        return f'[{department}] {title} ({datestr}): {full_text[:100]}...'

    def __str__(self) -> str:
        status = [self.meta.page.status_code] if self.meta.page else []
        return (
            f"{self.meta.title.suptitle if self.meta.title else ''}\n"
            f"{self.meta.title.title if self.meta.title else ''}\n"
            f"{self.meta.department.department if self.meta.department else ''}\n"
            f"{self.meta.department._link_str if self.meta.department else ''}\n"
            f"{self.meta.time.strftime('%d.%m.%Y %H:%M:%S') if self.meta.time else ''}\n"
            f"{self.meta.link or ''} {status}\n"
            f"{self.full_text or ''}\n"
        )

    ...
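
# Illustrative sketch (an addition, not part of the original code): an Article can
# also be fetched on its own from a direct link; the URL below is a placeholder.
#
#   art = Article(link='https://www.bild.de/<some-article>.bild.html', debug=True)
#   print(art.to_csv_line())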


############
class ArticleCollection(ArticleBaseClass):
    _default_args = {
        'min_date': datetime.datetime(year=2006, month=1, day=6),
        'max_date': datetime.datetime.now(),
        'random': True,
        'out_file': 'out.csv',
        'out_file_mode': 'new',
        'out_file_header': 'date,time,title,suptitle,department,[link_departments],link,http status code,full text',
        'failed_file': 'failed.txt',
        'http_retries': 5,
        'retries': 2,
        'base_link': 'https://www.bild.de/themen/uebersicht/archiv/archiv-82532020.bild.html?archiveDate=',
        'link_time_format': '%Y-%m-%d',
        'article_args': Article._default_args,
        'debug': False,
    }
    _file_modes_overwrite = ('new', 'overwrite', 'write', 'w')
    _file_modes_append = ('append', 'a')
    _file_modes = (*_file_modes_overwrite, *_file_modes_append)

    def __init__(self, session: Union[requests.Session, None] = None, **kwargs):
        self._debug = kwargs.get('debug', self._default_args['debug'])
        super().__init__(debug=self._debug)
        self._min_date = kwargs.get('min_date', self._default_args['min_date'])
        self._max_date = kwargs.get('max_date', self._default_args['max_date'])
        self._max_date = self._max_date.date()
        self._min_date = self._min_date.date()
        self._random = kwargs.get('random', self._default_args['random'])
        self._article_args = self._default_args['article_args']
        if article_args := kwargs.get('article_args'):
            self.update_target_from_source(self._article_args, article_args)
        self.add_debug(self._article_args)
        self._out_file = kwargs.get('out_file', self._default_args['out_file'])
        self._out_file_mode = kwargs.get('out_file_mode', self._default_args['out_file_mode'])
        if self._out_file_mode not in self._file_modes:
            raise AttributeError(f"file mode {self._out_file_mode} unknown. supported: [{','.join(self._file_modes)}]")
        self._out_file_header = kwargs.get('out_file_header', self._default_args['out_file_header'])
        max_link_departments = self._article_args.get('meta', {}).get('department', {}).get(
            'max_link_departments',
            self._default_args['article_args']['meta']['department']['max_link_departments'])
        link_dep_strings = [f'department from link {i}' for i in range(max_link_departments)]
        self._out_file_header = self._out_file_header.replace('[link_departments]', ','.join(link_dep_strings))
        self._failed_file = kwargs.get('failed_file', self._default_args['failed_file'])
        self._http_retries = kwargs.get('http_retries', self._default_args['http_retries'])
        self._retries = kwargs.get('retries', self._default_args['retries'])
        self._base_link = kwargs.get('base_link', self._default_args['base_link'])
        self._link_time_format = kwargs.get('link_time_format', self._default_args['link_time_format'])
        self.prepare_dates()
        self.prepare_files()
        self.articles = []
        self.article_metas = []
        self.session = session
        # wrap get_page so failed requests are logged to the failed-links file
        self.get_page = failhandler(callback=self.write_failed_to_file)(lambda link: ArticleCollection.get_page(self, link))

    def prepare_dates(self):
        self.dates = [self._max_date - datetime.timedelta(days=x)
                      for x in range((self._max_date - self._min_date).days + 1)]
        if self._random:
            random.shuffle(self.dates)

    def collect(self):
        self.session = self.get_session()
        print(f'Collecting article metadata from archive pages for {len(self.dates)} days')
        for i, date in enumerate(self.dates):
            link = self.build_archive_link(date)
            self.print_date(date, link, prefix=f'Date {i+1:>{len(str(len(self.dates)))}}/{len(self.dates)} ')
            self.process_archive_page(link)
        print()
        print(f'Collecting fulltext for {len(self.article_metas)} articles')
        self.get_fulltexts()
        self.close_session()

    def build_archive_link(self, date):
        return f'{self._base_link}{date.strftime(self._link_time_format)}'

    def print_date(self, date: datetime.date, link: str = None, fmt: str = None, prefix: str = None, suffix: str = None):
        if fmt is None:
            fmt = self._link_time_format
        print(f"{prefix or ''}{AC.UNDERLINE}{lk(link, date.strftime(fmt)) if link else date.strftime(fmt)}{AC.DEFAULT}{suffix or ''}")

    def prepare_files(self):
        if self._out_file_mode in self._file_modes:
            if self._out_file_mode in self._file_modes_overwrite and self._out_file:
                with open(self._out_file, 'w') as f:
                    f.write(self._out_file_header.strip() + '\n')
            elif self._out_file_mode in self._file_modes_append and self._out_file:
                ...
        else:
            raise ValueError(f'file mode \'{self._out_file_mode}\' not supported. supported: {self._file_modes}')
        if self._failed_file:
            with open(self._failed_file, 'w') as f:
                f.write('')

    def process_archive_page(self, link):
        page = self.get_page(link)
        soupy_page = BeautifulSoup(page.content, 'html.parser')
        articles_html = soupy_page.find_all("article", {"class": "stage-feed-item"})
        # in debug mode only look at the first three teasers per archive page
        slice_args = (None, 3, None) if self._debug else (None, None, 1)
        for article_html in islice(articles_html, *slice_args):
            article_metadata = ArticleMetadata(article_html, 'https://www.bild.de', **self._article_args.get('meta', {}))
            self.print_article_metadata(article_metadata)
            # save metadata
            self.article_metas.append(article_metadata)

    def get_fulltexts(self):
        if self._random:
            random.shuffle(self.article_metas)
        for i, article_metadata in enumerate(self.article_metas):
            self.print_article_metadata(article_metadata,
                                        prefix=f'{i+1:>{len(str(len(self.article_metas)))}}/{len(self.article_metas)} ')
            self.process_article_from_meta(article_metadata)

    def process_article_from_meta(self, article_metadata):
        try:
            art = Article(metadata=article_metadata, session=self.session, **self._article_args)
            self.articles.append(art)
            if self._out_file:
                with open(self._out_file, 'a') as f:
                    f.write(art.to_csv_line())
        except (MaxRetryError, ConnectionError) as e:
            if self._debug:
                print(e)
            self.write_failed_to_file(e, article_metadata)

    def print_article_metadata(self, metadata, *, date_fmt=None, time_fmt=None, prefix: str = None, suffix: str = None):
        if date_fmt is None:
            date_fmt = self._link_time_format
        if time_fmt is None:
            time_fmt = '%H:%M:%S'
        datetime_fmt = f'{date_fmt} {time_fmt}'
        link = metadata.link or ''
        timestr = (
            AC.FG_BRIGHT_GREY + metadata.time.strftime(datetime_fmt) + AC.DEFAULT
        ) if metadata.time else ''
        suptitle = (
            AC.FG_BLUE + metadata.title.suptitle + AC.DEFAULT
        ) if metadata.title.suptitle else ''
        title = (
            AC.STYLE_BOLD + AC.FG_BRIGHT_PURPLE + metadata.title.title + AC.DEFAULT
        ) if metadata.title.title else ''
        error_string = (
            AC.STYLE_BOLD + AC.FG_BRIGHT_RED + f'[{metadata.page.status_code}]' + AC.DEFAULT
        ) if (metadata.page and metadata.page.status_code != 200) else ''
        print(f"{prefix or ''}{timestr} {error_string}({suptitle}) {lk(link, title) if link else title}{suffix or ''}")

    def write_failed_to_file(self, exception, elem):
        with open(self._failed_file, 'a') as f:
            if isinstance(elem, ArticleMetadata):
                f.write(f'{elem.link}, "{exception}"\n')
            elif isinstance(elem, str):
                f.write(f'{elem}, "{exception}"\n')
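

############
# Minimal usage sketch (an addition for illustration, not part of the original
# module). The date range, output file name and debug flag are placeholder
# values; in debug mode only a few articles per archive page are scraped.
if __name__ == '__main__':
    collection = ArticleCollection(
        min_date=datetime.datetime(2023, 1, 1),  # placeholder start of the archive range
        max_date=datetime.datetime(2023, 1, 7),  # placeholder end of the archive range
        out_file='bild_sample.csv',              # placeholder output path
        out_file_mode='new',
        debug=True,
    )
    collection.collect()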