Pipeline implementation

Configuration

class swisstext.cmd.scraping.config.Config(config: Union[str, dict, io.IOBase] = None)[source]

Bases: swisstext.cmd.base_config.BaseConfig

The default configuration file for the scraping pipeline is defined in config.yaml. This is the best way to understand what options are available.

class Options(num_workers=1, min_proba=0.85, crawl_depth=2, **kwargs)[source]

Bases: object

Holds the general options for the scraping pipeline.

__init__(num_workers=1, min_proba=0.85, crawl_depth=2, **kwargs)[source]

Initialize self. See help(type(self)) for accurate signature.

crawl_depth = None

maximum depth of the crawl, inclusive.

min_proba = None

minimum Swiss German probability to keep a sentence

num_workers = None

maximum number of threads to use

__init__(config: Union[str, dict, io.IOBase] = None)[source]

Create a configuration.

Subclasses should provide the path to a default configuration (YAML file) and optionally an option class to hold general options. The config is provided by the user and can be a file, a path to a file or even a dictionary (as long as it has the correct structure).

Parameters
  • default_config_path – the absolute path to a default configuration file.

  • option_class – the class to use for holding global options (a dictionary by default).

  • config – user configuration that overrides the default options. config can be either a path or a file object of a YAML configuration, or a dictionary

Raises

ValueError – if config is not of a supported type
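
As an illustration, here is a minimal sketch of loading a configuration and reading its general options. It assumes the default config.yaml is usable as-is; the values mentioned in the comments come from the Options signature above and may be overridden by the YAML file.

from swisstext.cmd.scraping.config import Config

# load the default configuration shipped with the package (config.yaml);
# a path to a YAML file, an open file object or a dict with the same
# structure could be passed instead
config = Config()

# general options are exposed through the Options instance
print(config.options.num_workers)  # 1 per the Options defaults above
print(config.options.min_proba)    # 0.85
print(config.options.crawl_depth)  # 2

# instantiate the tools and the whole pipeline from the YAML configuration
# (note: with the default config, the saver expects a MongoDB instance,
#  see the warning in the pipeline example below)
pipeline = config.create_pipeline()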

create_pipeline() → swisstext.cmd.scraping.pipeline.Pipeline[source]

Instantiate a pipeline from the YAML configuration.

property interfaces_package

Should return the complete package where the interfaces are defined, if any. In case this is defined and a tool is missing from the list, we will try to instantiate the interface instead.

property tool_entry_name

Should return the name of the tool_entries option, i.e. the YAML path to the dictionary of [interface name, canonical class to instantiate].

property valid_tool_entries

Should return the list of valid tool entries under the tool_entry_name. Note that the order of tools in the list defines the order of the tool instances returned by instantiate_tools().
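
To illustrate how these hooks fit together, here is a hypothetical BaseConfig subclass. Everything in it (the default config path, the options class, the package and entry names, the tool list) is invented for the example, and the constructor arguments simply follow the parameter list documented above.

from swisstext.cmd.base_config import BaseConfig

class MyOptions:
    # hypothetical options holder (a plain dict is used by default)
    def __init__(self, num_workers=1, **kwargs):
        self.num_workers = num_workers

class MyConfig(BaseConfig):
    def __init__(self, config=None):
        # default_config_path, option_class, config (see the parameters above;
        # the argument order is inferred from that list)
        super().__init__('/abs/path/to/default_config.yaml', MyOptions, config)

    @property
    def interfaces_package(self):
        # package searched when a tool entry is missing from the configuration
        return 'mypackage.interfaces'

    @property
    def tool_entry_name(self):
        # YAML path to the {interface name: canonical class} dictionary
        return 'my_tool_entries'

    @property
    def valid_tool_entries(self):
        # this order is the order of instances returned by instantiate_tools()
        return ['crawler', 'splitter', 'saver']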

Data structures

This module defines the generic data structures used across the module / between the different tools. They are designed to be decoupled from MongoDB for better flexibility and adaptability.

class swisstext.cmd.scraping.data.Page(url, score=None, parent_url=None)[source]

Bases: object

Information about a page. Some attributes should be defined upon creation (see swisstext.cmd.scraping.interfaces.ISaver.get_page()), while others will be added/used incrementally by the different tools of the pipeline.

__init__(url, score=None, parent_url=None)[source]

Initialize self. See help(type(self)) for accurate signature.

blacklisted = None

is the URL blacklisted?

crawl_results = None

results of the crawl (see ICrawler)

is_new() → bool[source]

Test whether the page is new, based on the score's delta_date.

new_sg = None

new sentences found

parent_url = None

the parent URL, if the crawl depth is > 1

score = None

page score

sentence_count = None

total number of sentences on the page

sg_count = None

number of Swiss German sentences on the page, whether they are new or not

text = None

normalized text of the page

url = None

the URL of the page
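
For illustration, a short sketch of how a Page could be created by hand (the URLs are placeholders; inside the pipeline, pages are normally obtained through the ISaver, see get_page()):

from swisstext.cmd.scraping.data import Page

# a page discovered while crawling a parent page
page = Page('http://example.com/a.html', parent_url='http://example.com/')

print(page.url)         # 'http://example.com/a.html'
print(page.parent_url)  # 'http://example.com/'
# page.crawl_results, page.text, page.new_sg, ... are filled in later
# by the different tools of the pipeline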

class swisstext.cmd.scraping.data.PageScore(count=0, delta_count=0, delta_date=None)[source]

Bases: object

Scoring information for a page, used among other things to decide whether a URL should be crawled or not.

__init__(count=0, delta_count=0, delta_date=None)[source]

Initialize self. See help(type(self)) for accurate signature.

count = None

total number of new sentences found on this page (over all visits)

delta_count = None

number of new sentences found on this page in the last visit

delta_date = None

date of the last visit (in UTC) or None if never visited
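
A small sketch of how the score interacts with Page.is_new() (the numbers are invented; per the docstrings above, is_new() relies on delta_date being None for never-visited URLs):

from datetime import datetime
from swisstext.cmd.scraping.data import Page, PageScore

# a URL that was already visited and yielded 3 new sentences last time
visited = PageScore(count=12, delta_count=3, delta_date=datetime.utcnow())
print(Page('http://example.com/a.html', score=visited).is_new())  # expected: False

# a URL that has never been visited: delta_date stays None
print(Page('http://example.com/b.html', score=PageScore()).is_new())  # expected: True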

class swisstext.cmd.scraping.data.Sentence(text: str, proba: float)[source]

Bases: object

Information about a sentence.

__init__(text: str, proba: float)[source]

Initialize self. See help(type(self)) for accurate signature.

proba = None

the Swiss German probability

text = None

the exact text
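
As a sketch of how these objects are typically consumed, here is how sentences returned by a language detector could be filtered against the min_proba option (texts and probabilities are invented):

from swisstext.cmd.scraping.data import Sentence

sentences = [
    Sentence('Das isch es churzes Bispiel.', 0.97),
    Sentence('This sentence is clearly English.', 0.02),
]

# keep only sentences whose Swiss German probability reaches the threshold
min_proba = 0.85  # same default as Options.min_proba
swiss_german = [s.text for s in sentences if s.proba >= min_proba]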

Queue

class swisstext.cmd.scraping.page_queue.PageQueue(maxsize=0)[source]

Bases: queue.Queue

A Queue that ensures enqueued pages are unique and do not obviously point to images or PDFs (i.e. URLs ending in .jpg, .jpeg, .png or .pdf).

Elements in the queue should be tuples, with the first element a Page and the second the crawl depth (as an int).

lock = <unlocked _thread.lock object>
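
A short sketch of the expected queue behaviour (per the description above, duplicate pages and URLs with image/PDF extensions should be dropped):

from swisstext.cmd.scraping.data import Page
from swisstext.cmd.scraping.page_queue import PageQueue

queue = PageQueue()

# elements are (Page, crawl depth) tuples
queue.put((Page('http://example.com/index.html'), 1))
queue.put((Page('http://example.com/index.html'), 1))  # duplicate URL
queue.put((Page('http://example.com/logo.png'), 1))    # points to an image

print(queue.qsize())  # expected: 1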

Pipeline

This module contains the core of the scraping system.

In order to be fully customizable, it uses the concept of interfaces heavily: most of the decisions and steps are delegated to instances of classes defined in interfaces.

Here is an example usage.

from swisstext.cmd.scraping.pipeline import Pipeline, PipelineWorker
from swisstext.cmd.scraping.data import Page
from swisstext.cmd.scraping.page_queue import PageQueue
from swisstext.cmd.scraping.config import Config
from typing import List

# print some information to the console
import sys
import logging
logging.basicConfig(stream=sys.stdout)
logging.getLogger('swisstext').setLevel(level=logging.INFO)

queue: PageQueue = PageQueue()
new_sentences: List[str] = []

# load the config and create the default pipeline
# WARNING: the default config will try to connect to MongoDB on localhost:27017
# if you don't want to use Mongo, create a config file and use the ConsoleSaver instead
# or use the hack: pipeline.saver = ConsoleSaver()
config = Config() # pass a path to a YAML file (or a dict) if you have a custom config
pipeline = config.create_pipeline()

# add one page to the queue
start_url = 'http://www.hunold.ch/zw/goeteborg.html'
queue.put((Page(start_url), 1)) # add the tuple (page, depth) with a depth of 1

# launch one worker
worker = PipelineWorker()
worker.run(queue, pipeline, new_sentences, max_depth=config.options.crawl_depth)

class swisstext.cmd.scraping.pipeline.Pipeline(crawler: swisstext.cmd.scraping.interfaces.ICrawler, normalizer: swisstext.cmd.scraping.interfaces.INormalizer, splitter: swisstext.cmd.scraping.interfaces.ISplitter, filter: swisstext.cmd.scraping.interfaces.ISentenceFilter, detector: swisstext.cmd.scraping.interfaces.ISgDetector, seeder: swisstext.cmd.scraping.interfaces.ISeedCreator, url_filter: swisstext.cmd.scraping.interfaces.IUrlFilter, decider: swisstext.cmd.scraping.interfaces.IDecider, saver: swisstext.cmd.scraping.interfaces.ISaver, min_proba=0.85)[source]

Bases: object

Holds instances of all needed interfaces and variables.

Note that pipelines are meant to be instantiated by a Config object, not manually.

__init__(crawler: swisstext.cmd.scraping.interfaces.ICrawler, normalizer: swisstext.cmd.scraping.interfaces.INormalizer, splitter: swisstext.cmd.scraping.interfaces.ISplitter, filter: swisstext.cmd.scraping.interfaces.ISentenceFilter, detector: swisstext.cmd.scraping.interfaces.ISgDetector, seeder: swisstext.cmd.scraping.interfaces.ISeedCreator, url_filter: swisstext.cmd.scraping.interfaces.IUrlFilter, decider: swisstext.cmd.scraping.interfaces.IDecider, saver: swisstext.cmd.scraping.interfaces.ISaver, min_proba=0.85)[source]

Initialize self. See help(type(self)) for accurate signature.

class swisstext.cmd.scraping.pipeline.PipelineWorker(id=-1)[source]

Bases: object

Pipeline workers actually do the magic and can be run in parallel.

__init__(id=-1)[source]

Initialize self. See help(type(self)) for accurate signature.

id = None

an identifier used in log messages, especially useful if multiple threads are used.

kill_received = None

If the worker is launched in a thread, you can use this flag to make it exit prematurely. Workers will always finish processing the current page before exiting, so that we can ensure coherence in the persistence layer.

run(queue: queue.Queue, p: swisstext.cmd.scraping.pipeline.Pipeline, new_sentences: List[str], max_depth=1)[source]

Do the work! All the magic is in here :)

For each page pulled from the queue, the worker will execute all the steps of the scraping pipeline:

  • get text and links,

  • split and filter sentences,

  • keep sentences with Swiss German,

  • persist the results (url + sentences), if any

  • add new tasks to the queue (if the page contains links to interesting URLs)

New sentences are added to the new_sentences list, so that the caller can easily know how fruitful the scraping was and optionally use the new sentences to generate seeds.

This method will stop:

  • when the queue is empty or

  • if kill_received is set to true; in this case, the worker finishes processing the current task and then exits

Warning

It is your responsibility to ensure that the URLs in the queue are not blacklisted.

Todo

This piece of code (and the command line) has many flaws and should clearly be enhanced… For example:

Multithreading: currently, a worker stops when the queue is empty. This also means that if you launch the system with only 3 base URLs but 5 workers, 2 workers will exit immediately instead of waiting for new tasks to be added to the queue.

A better way would be to keep track of active workers and stop only when all workers are idle, or when all workers have reached a task with a depth > max depth…

Behavior on errors: if fetching the URL triggers an error, we currently just log it. Should we also remove/blacklist the URL? Should we allow the URL to fail X times before removal?

Parameters
  • queue – the task queue

  • p – the pipeline to use

  • new_sentences – all new sentences discovered will be added to this list

  • max_depth – the maximum crawl depth to process (inclusive)
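
To make the threading-related attributes above concrete, here is a sketch of running several workers in parallel threads and stopping them with kill_received. It reuses the setup of the module example (so the same MongoDB warning applies) and the seed URLs are placeholders:

import threading
from typing import List

from swisstext.cmd.scraping.config import Config
from swisstext.cmd.scraping.data import Page
from swisstext.cmd.scraping.page_queue import PageQueue
from swisstext.cmd.scraping.pipeline import PipelineWorker

config = Config()
pipeline = config.create_pipeline()
queue = PageQueue()
new_sentences: List[str] = []

# placeholder seed URLs, enqueued with a starting depth of 1
for url in ['http://example.com/a.html', 'http://example.com/b.html']:
    queue.put((Page(url), 1))

# one worker per thread, as many as configured in options.num_workers
workers = [PipelineWorker(i) for i in range(config.options.num_workers)]
threads = [
    threading.Thread(
        target=w.run,
        args=(queue, pipeline, new_sentences),
        kwargs=dict(max_depth=config.options.crawl_depth))
    for w in workers
]
for t in threads:
    t.start()

# ... later (e.g. on Ctrl+C), ask the workers to exit after their current page
for w in workers:
    w.kill_received = True
for t in threads:
    t.join()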