Pipeline implementation¶
Configuration¶
class swisstext.cmd.scraping.config.Config(config: Union[str, dict, io.IOBase] = None)[source]¶
Bases: swisstext.cmd.base_config.BaseConfig
The default configuration file for the scraping pipeline is defined in config.yaml. This is the best way to understand what options are available.
class Options(num_workers=1, min_proba=0.85, crawl_depth=2, **kwargs)[source]¶
Bases: object
Holds the general options for the scraping pipeline.
__init__(num_workers=1, min_proba=0.85, crawl_depth=2, **kwargs)[source]¶
Initialize self. See help(type(self)) for accurate signature.
crawl_depth = None¶
Maximum depth of the crawl (inclusive).
min_proba = None¶
Minimum Swiss German probability required to keep a sentence.
num_workers = None¶
Maximum number of threads to use.
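A short sketch of how these options are typically accessed once a configuration has been loaded; the printed values depend on the loaded config.yaml, and the defaults shown in the signature above only apply when nothing else is configured:
from swisstext.cmd.scraping.config import Config

config = Config()          # load the bundled defaults
options = config.options   # the general options, held by this Options class
print(options.num_workers, options.min_proba, options.crawl_depth)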
__init__(config: Union[str, dict, io.IOBase] = None)[source]¶
Create a configuration.
Subclasses should provide the path to a default configuration (YAML file) and optionally an option class to hold general options. The config is provided by the user and can be a file, a path to a file or even a dictionary (as long as it has the correct structure).
Parameters
default_config_path – the absolute path to a default configuration file.
option_class – the class to use for holding global options (a dictionary by default).
config – user configuration that overrides the default options. config can be either a path or a file object of a YAML configuration, or a dictionary.
Raises
ValueError – if config is not of a supported type.
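A minimal sketch of the supported ways to supply a configuration; my_config.yaml and the empty override are placeholders, not values shipped with the package:
import io
from swisstext.cmd.scraping.config import Config

config = Config()                    # no argument: fall back to the default config.yaml
config = Config('my_config.yaml')    # a path to your own YAML file (placeholder name)
config = Config(io.StringIO('{}'))   # a file object containing YAML also works
config = Config({})                  # a dict with the same structure as config.yaml
# anything else (e.g. an int) raises a ValueError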
create_pipeline() → swisstext.cmd.scraping.pipeline.Pipeline[source]¶
Instantiate a pipeline from the YAML configuration.
property interfaces_package¶
Should return the complete package where the interfaces are defined, if any. In case this is defined and a tool is missing from the list, we will try to instantiate the interface instead.
property tool_entry_name¶
Should return the name of the tool_entries option, i.e. the YAML path to the dictionary of [interface name, canonical class to instantiate].
property valid_tool_entries¶
Should return the list of valid tool entries under the tool_entry_name. Note that the order of tools in the list defines the order of tool instances returned by instantiate_tools().
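To illustrate what these properties are for, here is a hedged sketch of a hypothetical BaseConfig subclass; the package name, YAML path, tool names and default path are invented, and the real scraping Config already ships its own implementations:
from swisstext.cmd.base_config import BaseConfig

class MyScrapingConfig(BaseConfig):
    def __init__(self, config=None):
        # BaseConfig also needs the absolute path to a default YAML file
        # (see the __init__ parameters above); this path is a placeholder
        super().__init__('/path/to/default_config.yaml', config=config)

    @property
    def interfaces_package(self):
        # package searched when a tool entry is missing from the configuration
        return 'swisstext.cmd.scraping.interfaces'

    @property
    def tool_entry_name(self):
        # YAML path to the [interface name -> canonical class] dictionary
        return 'pipeline'

    @property
    def valid_tool_entries(self):
        # the order here defines the order of instances returned by instantiate_tools()
        return ['crawler', 'splitter', 'sentence_filter']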
Data structures¶
This module defines the generic data structures used across the module and between the different tools. They were designed to be decoupled from MongoDB for better flexibility and adaptability.
class swisstext.cmd.scraping.data.Page(url, score=None, parent_url=None)[source]¶
Bases: object
Information about a page. Some attributes should be defined upon creation (see swisstext.cmd.scraping.interfaces.ISaver.get_page()), while others will be added/used incrementally by the different tools of the pipeline.
__init__(url, score=None, parent_url=None)[source]¶
Initialize self. See help(type(self)) for accurate signature.
blacklisted = None¶
Is the URL blacklisted?
crawl_results = None¶
Results of the crawl (see ICrawler).
new_sg = None¶
New Swiss German sentences found.
parent_url = None¶
The parent URL, if the crawl depth is > 1.
score = None¶
Page score.
sentence_count = None¶
Total number of sentences on the page.
sg_count = None¶
Number of Swiss German sentences on the page, whether they are new or not.
text = None¶
Normalized text of the page.
url = None¶
The URL of the page.
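A short sketch of a Page being filled in by the pipeline; the URL and values are illustrative:
from swisstext.cmd.scraping.data import Page, PageScore

page = Page('http://example.com/mundart.html', score=PageScore(), parent_url=None)
# the pipeline tools then fill in the remaining attributes incrementally, e.g.:
page.text = 'Grüezi mitenand. ...'     # normalized text of the page
page.sentence_count = 12               # all sentences found on the page
page.sg_count = 4                      # Swiss German sentences, new or not
page.new_sg = ['Grüezi mitenand.']     # only the sentences not seen before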
class swisstext.cmd.scraping.data.PageScore(count=0, delta_count=0, delta_date=None)[source]¶
Bases: object
Scoring information for a page, used among other things to decide whether a URL should be crawled or not.
__init__(count=0, delta_count=0, delta_date=None)[source]¶
Initialize self. See help(type(self)) for accurate signature.
count = None¶
Total number of new sentences found on this page (across all visits).
delta_count = None¶
Number of new sentences found on this page during the last visit.
delta_date = None¶
Date of the last visit (in UTC), or None if never visited.
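A hedged sketch of how this information could be used, for instance by a custom IDecider; the rule shown is invented for illustration:
from datetime import datetime, timedelta
from swisstext.cmd.scraping.data import PageScore

score = PageScore(count=42, delta_count=3,
                  delta_date=datetime.utcnow() - timedelta(days=30))
# e.g. recrawl pages that were never visited or still yielded new sentences last time
should_recrawl = score.delta_date is None or score.delta_count > 0
print(should_recrawl)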
Queue¶
class swisstext.cmd.scraping.page_queue.PageQueue(maxsize=0)[source]¶
Bases: queue.Queue
A Queue that ensures enqueued pages are unique and not clearly pointing to images or PDFs (i.e. ending with .jpg, .jpeg, .png or .pdf). Elements in the queue should be tuples, with the first element a Page and the second the crawl depth (as an int).
lock = <unlocked _thread.lock object>¶
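A short sketch of the behaviour described above, assuming duplicates and image/PDF links are silently dropped on put; the URLs are placeholders:
from swisstext.cmd.scraping.data import Page
from swisstext.cmd.scraping.page_queue import PageQueue

queue = PageQueue()
queue.put((Page('http://example.com/a.html'), 1))
queue.put((Page('http://example.com/a.html'), 2))   # same URL: should not be enqueued twice
queue.put((Page('http://example.com/img.jpg'), 1))  # clearly an image: should be skipped
print(queue.qsize())  # expected: 1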
Pipeline¶
This module contains the core of the scraping system.
In order to be fully customizable, it uses the concept of interfaces heavily: most of the decisions and steps are delegated to instances of classes defined in interfaces.
Here is an example usage.
from swisstext.cmd.scraping.pipeline import Pipeline, PipelineWorker
from swisstext.cmd.scraping.data import Page
from swisstext.cmd.scraping.page_queue import PageQueue
from swisstext.cmd.scraping.config import Config
from typing import List

# print some information to the console
import sys
import logging
logging.basicConfig(stream=sys.stdout)
logging.getLogger('swisstext').setLevel(level=logging.INFO)

queue: PageQueue = PageQueue()
new_sentences: List[str] = []

# load the config and create the default pipeline
# WARNING: the default config will try to connect to MongoDB on localhost:27017
# if you don't want to use Mongo, create a config file and use the ConsoleSaver instead
# or use the hack: pipeline.saver = ConsoleSaver()
config = Config()  # pass a path, a file object or a dict here if you have a custom configuration
pipeline = config.create_pipeline()

# add one page to the queue
start_url = 'http://www.hunold.ch/zw/goeteborg.html'
queue.put((Page(start_url), 1))  # enqueue the tuple (page, depth) with a depth of 1

# launch one worker
worker = PipelineWorker()
worker.run(queue, pipeline, new_sentences, max_depth=config.options.crawl_depth)
class swisstext.cmd.scraping.pipeline.Pipeline(crawler: swisstext.cmd.scraping.interfaces.ICrawler, normalizer: swisstext.cmd.scraping.interfaces.INormalizer, splitter: swisstext.cmd.scraping.interfaces.ISplitter, filter: swisstext.cmd.scraping.interfaces.ISentenceFilter, detector: swisstext.cmd.scraping.interfaces.ISgDetector, seeder: swisstext.cmd.scraping.interfaces.ISeedCreator, url_filter: swisstext.cmd.scraping.interfaces.IUrlFilter, decider: swisstext.cmd.scraping.interfaces.IDecider, saver: swisstext.cmd.scraping.interfaces.ISaver, min_proba=0.85)[source]¶
Bases: object
Holds instances of all needed interfaces and variables.
Note that pipelines are meant to be instantiated by a Config object, not manually.
__init__(crawler: swisstext.cmd.scraping.interfaces.ICrawler, normalizer: swisstext.cmd.scraping.interfaces.INormalizer, splitter: swisstext.cmd.scraping.interfaces.ISplitter, filter: swisstext.cmd.scraping.interfaces.ISentenceFilter, detector: swisstext.cmd.scraping.interfaces.ISgDetector, seeder: swisstext.cmd.scraping.interfaces.ISeedCreator, url_filter: swisstext.cmd.scraping.interfaces.IUrlFilter, decider: swisstext.cmd.scraping.interfaces.IDecider, saver: swisstext.cmd.scraping.interfaces.ISaver, min_proba=0.85)[source]¶
Initialize self. See help(type(self)) for accurate signature.
class swisstext.cmd.scraping.pipeline.PipelineWorker(id=-1)[source]¶
Bases: object
Pipeline workers actually do the magic and can be run in parallel.
id = None¶
An identifier used in log messages, especially useful if multiple threads are used.
kill_received = None¶
If the worker is launched in a thread, you can use this flag to make it exit prematurely. Workers will always finish processing the current page before exiting, so that we can ensure coherence in the persistence layer.
run(queue: queue.Queue, p: swisstext.cmd.scraping.pipeline.Pipeline, new_sentences: List[str], max_depth=1)[source]¶
Do the work! All the magic is in here :)
For each page pulled from the queue, the worker will execute all the steps of the scraping pipeline:
get text and links,
split and filter sentences,
keep sentences with Swiss German,
persist the results (url + sentences), if any,
add new tasks to the queue (if the page contains links to interesting URLs).
New sentences are added to the new_sentences list, so that the caller can easily know how fruitful the scraping was and optionally use the new sentences to generate seeds.
This method will stop:
when the queue is empty, or
if the kill_received flag is set to true. In this case, it finishes processing the current task and exits.
Warning
It is your responsibility to ensure that the URLs in the queue are not blacklisted.
Todo
This piece of code (and the commandline) has many flaws and should clearly be enhanced… For example:
Multithreading: currently, the worker stops when the queue is empty. This also means that if you launch the system with only 3 base URLs but 5 workers, 2 workers will exit immediately instead of waiting for new tasks to be added to the queue. A better way would be to keep track of active workers and stop only when all workers are idle, or when all workers reached a task with a depth > max depth…
Behaviour on errors: in case fetching the URL triggers an error, we currently just log it. Should we also remove/blacklist the URL? Should we allow the URL to fail X times before removal?
Parameters
queue – the task queue
p – the pipeline to use
new_sentences – all new sentences discovered will be added to this list
max_depth – the maximum crawl depth at which we stop (inclusive)
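Because workers can be run in parallel, here is a hedged sketch of a multi-threaded setup that builds on the example above and uses the documented kill_received flag; the number of threads is arbitrary:
import threading
from swisstext.cmd.scraping.pipeline import PipelineWorker

# queue, pipeline and new_sentences are assumed to be created as in the example above
workers = [PipelineWorker(id=i) for i in range(2)]
threads = [threading.Thread(target=w.run,
                            args=(queue, pipeline, new_sentences),
                            kwargs=dict(max_depth=2))
           for w in workers]
for t in threads:
    t.start()
# to stop early, set the flag: each worker finishes its current page, then exits
# for w in workers:
#     w.kill_received = True
for t in threads:
    t.join()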