Structure
If you have followed the steps from the home page and created a project inside a tutorial/ directory, that directory will contain the following project structure.
tutorial
├── adapters
│   ├── __init__.py
│   └── example.py
├── exporters
│   ├── __init__.py
│   └── example.py
├── findings
│   ├── __init__.py
│   └── example.py
├── fixtures
│   └── example.json
├── migrations
│   ├── versions
│   ├── env.py
│   └── script.py.mako
├── models
│   ├── __init__.py
│   └── example.py
├── observers
│   ├── __init__.py
│   └── example.py
├── docker-compose.yaml
└── settings.py
Observers
The observers/ package is where you write your ETL configuration classes.
Once the illuminate observe start command is issued, all classes found in the package whose names start with the word "Observer" will be added to the context and initialized.
from __future__ import annotations

from collections.abc import AsyncGenerator, Generator
from typing import Optional, Union
from urllib.parse import urldefrag
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from illuminate.manager import Manager
from illuminate.observation import HTTPObservation
from illuminate.observer import Observer
from tornado.httpclient import HTTPResponse

from exporters.example import ExporterInfluxDBExample
from exporters.example import ExporterSQLExample
from findings.example import FindingExample


def _create_soup(response: HTTPResponse) -> BeautifulSoup:
    html = response.body.decode(errors="ignore")
    soup = BeautifulSoup(html, "html.parser")
    return soup


def _extract_hrefs(
    soup: BeautifulSoup, url: str
) -> Generator[str, None]:
    links = soup.find_all("a")
    for link in links:
        _url = link.get("href")
        if _url:
            yield urljoin(url, urldefrag(_url)[0])
class ObserverExample(Observer):
    """
    Observer is an ETL configuration class. It represents the entry point,
    and it defines the flow with observe and any additional methods.

    Once initialized by the framework, it will fill an observation queue with
    objects from Observer's initial_observations collection, starting the
    whole process.

    Note: Multiple Observers can exist in the same project.
    """

    ALLOWED: Union[list[str], tuple[str]] = ("https://webscraper.io/",)
    """
    Collection of strings evaluated against a URL to determine if the URL is
    allowed to be observed. If empty, no Observation-wide restrictions are
    enforced.
    """

    LABELS: dict = {"tag": "example"}
    """Observer's labels."""

    NAME: str = "example"
    """Observer's name."""

    def __init__(self, manager: Optional[Manager] = None):
        """
        Collection initial_observations is the de facto entry point. It must
        contain Observation objects initialized with a URL, an allowed list
        of strings and a callback method. If Observation's URL starts with an
        element in the allowed collection, it will be performed. Otherwise,
        it is rejected.

        Manager's instance (the object that is running the whole process) is
        passed as an argument when initializing Observers. This allows the
        Observer object to interact with Manager object attributes, like
        database sessions or queues for advanced ETL flows, by simply asking
        for self.manager.sessions["postgresql"]["main"] to acquire the async
        database session defined in tutorial/settings.py.
        """
        super().__init__(manager)
        self.initial_observations = [
            HTTPObservation(
                "https://webscraper.io/",
                allowed=self.ALLOWED,
                callback=self.observe,
            ),
        ]

    async def observe(
        self, response: HTTPResponse, *args, **kwargs
    ) -> AsyncGenerator[
        Union[
            ExporterInfluxDBExample,
            ExporterSQLExample,
            FindingExample,
            HTTPObservation,
        ],
        None,
    ]:
        """
        ETL flow is regulated by the type of object yielded by Observation's
        callback method. Each object type corresponds to an ETL stage:

        * Observation -> Extract
        * Finding -> Transform
        * Exporter -> Load

        In the example below, the initial HTTPObservation returned Tornado's
        HTTP response object, which is used to extract all hrefs from the
        HTML. These hrefs are used as URLs for new HTTPObservations, using
        the same observe method as a callback and the same allowed
        collection. Finally, a Finding object is yielded, representing the
        desired data.

        This flow, with everything set, represents a simple web scraper that
        will visit every page found on a domain and take the page's title and
        URL as the desired data.

        Tip: If there is no need for further data enrichment or manipulation,
        yield an Exporter object instead of a Finding object. For reference,
        check tutorial/adapters/example.py.

        Note: Illuminate takes care of tracking which resources were already
        requested, avoiding duplication. The resource check pool is shared
        between Observers, avoiding overlapping.
        """
        soup = _create_soup(response)
        hrefs = _extract_hrefs(soup, response.effective_url)
        for href in hrefs:
            yield HTTPObservation(
                href,
                allowed=self.ALLOWED,
                callback=self.observe,
            )
        yield FindingExample(
            response.request_time,
            soup.title.text,
            response.effective_url,
        )
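The docstrings above mention that the Manager instance exposes the database sessions defined in tutorial/settings.py. As a rough sketch of how that hook might be used (this helper is not part of the generated project, and it assumes the "main" pool entry is a SQLAlchemy AsyncSession, which this tutorial does not confirm):

# A minimal sketch, illustration only: look up previously exported rows via
# the Manager's session pool. Assumes manager.sessions["postgresql"]["main"]
# resolves to a SQLAlchemy AsyncSession, as the docstrings above hint; verify
# against the framework before relying on it.
from sqlalchemy import select

from illuminate.manager import Manager

from models.example import ModelExample


async def already_exported(manager: Manager, url: str) -> bool:
    """Return True if a row with the given URL is already in the database."""
    session = manager.sessions["postgresql"]["main"]
    result = await session.execute(
        select(ModelExample).where(ModelExample.url == url)
    )
    return result.first() is not None

Inside a callback, this could be awaited as already_exported(self.manager, href) before deciding whether to yield a new HTTPObservation.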
Observer vs Observation
Observer is an ETL configuration class. It introduces Observation objects, in this case HTTPObservation objects, to the framework through initial_observations. It should also be used (not forced, but suggested) to contain all callback methods used by Observation objects.
Observation describes a type of source that will be read. Once the data is pulled, it provides a response object to a callback function, which asynchronously generates additional objects related to the ETL flow, using Illuminate ETL Mapping Classes.
NOTE: You can register any async generator as HTTPObservation.callback. For simplicity, we will use the ObserverExample.observe method.
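As a minimal sketch of that note (illustration only; the function name is an assumption, not part of the generated project), a free-standing async generator can be passed as the callback in the same way ObserverExample passes its own observe method:

# A minimal sketch: any async generator with this signature can serve as an
# HTTPObservation callback. The function name is illustrative.
from collections.abc import AsyncGenerator

from bs4 import BeautifulSoup
from illuminate.observation import HTTPObservation
from tornado.httpclient import HTTPResponse

from findings.example import FindingExample


async def collect_title(
    response: HTTPResponse, *args, **kwargs
) -> AsyncGenerator[FindingExample, None]:
    soup = BeautifulSoup(response.body.decode(errors="ignore"), "html.parser")
    yield FindingExample(
        response.request_time,
        soup.title.text,
        response.effective_url,
    )


# Registered the same way ObserverExample registers its own observe method:
observation = HTTPObservation(
    "https://webscraper.io/",
    allowed=("https://webscraper.io/",),
    callback=collect_title,
)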
ETL Mapping Classes
Each stage in the ETL flow is represented by a class from the Illuminate framework.
- Extract is represented by Observation
- Transform is represented by Adapter
- Load is represented by Exporter
If a response object from the initial Observation is just a step before the data you want to export, yield a new Observation with the same or a different callback, allowing you to construct a complex set of rules.
If a response object holds the data you want to adapt, yield a Finding object. It will be picked up by Adapter instances, which run their adapt method with the Finding object passed as an argument if the subscription condition is met.
If a response object requires only minimal transformation, or can be used raw, you can yield an Exporter object and skip complex adaptation, as in the sketch below.
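A minimal sketch of that last case, reusing only classes already shown in this tutorial (the function name is illustrative, not part of the generated project):

# A minimal sketch: the callback builds the model itself and yields the
# Exporter directly, skipping the Finding/Adapter stage.
from collections.abc import AsyncGenerator

from bs4 import BeautifulSoup
from tornado.httpclient import HTTPResponse

from exporters.example import ExporterSQLExample
from models.example import ModelExample


async def observe_and_export(
    response: HTTPResponse, *args, **kwargs
) -> AsyncGenerator[ExporterSQLExample, None]:
    soup = BeautifulSoup(response.body.decode(errors="ignore"), "html.parser")
    yield ExporterSQLExample(
        models=[
            ModelExample(
                load_time=response.request_time,
                title=soup.title.text,
                url=response.effective_url,
            )
        ]
    )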
Findings
The findings/ package is where you write your data classes.
A Finding class is meant to be picked up by the framework and passed to an Adapter instance, which performs the actual adaptation upon the data and yields Exporter or Observation objects.
It can only be yielded by an Observation object's callback; in our example project, it is yielded by the ObserverExample.observe method.
from dataclasses import dataclass
from dataclasses import field

from illuminate.observer import Finding


@dataclass(frozen=True, order=True)
class FindingExample(Finding):
    """
    Finding is a data class, meant to hold raw data extracted by Observation's
    callback. Finding will be passed to Adapter object's adapt method if
    it is subscribed to Adapter.

    Check tutorial/adapters/example.py to learn more about subscription.
    """

    load_time: float = field()
    title: str = field()
    url: str = field()
Model
The models/ package is where you write your database model classes.
Models are used by SQLExporter in our example and are the recommended way of writing data to and reading data from an SQL database.
NOTE: The SQLObservation class is not yet introduced to the framework.
from sqlalchemy import Column, Float, Integer, String

from models import Base


class ModelExample(Base):
    """
    SQLAlchemy model used by SQLExporter object. For more information about
    SQLExporter class check tutorial/exporters/example.py.
    """

    __tablename__ = "tutorial"

    id: int = Column(Integer, primary_key=True)
    load_time: float = Column(Float)
    title: str = Column(String)
    url: str = Column(String)

    def __repr__(self):
        """ModelExample's __repr__ method."""
        return (
            f'ModelExample(load_time={self.load_time},'
            f'title="{self.title}",url="{self.url}")'
        )
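The model imports Base from the models package. The generated models/__init__.py is not shown in this tutorial; assuming it declares a standard SQLAlchemy declarative base, it would look roughly like this:

# A rough sketch of models/__init__.py. The generated project defines Base
# there, but its exact contents are not shown in this tutorial, so treat this
# as an assumption.
from sqlalchemy.orm import declarative_base

Base = declarative_base()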
Adapter
The adapters/ package is where you write your transformation classes.
Once the illuminate observe start command is issued, all classes found in the package whose names start with the word "Adapter" will be added to the context and initialized.
Once the framework gets a Finding object from a queue, it checks whether the object is "subscribed" to an Adapter. If it is subscribed, the framework passes it to the adapt method, which will yield Exporter or Observation objects.
Method adapt is where you implement your adaptation rules. In the following example, adapt is just used to create a model instance that is passed to an SQLExporter instance and saved to the database later.
Each Adapter object must contain a tuple of Finding classes, called subscribers, whose instances will be passed to the adapt method. All other Findings will be rejected by the framework.
Since there can be multiple Adapter classes with different implementations of the adapt method, if a Finding is subscribed to multiple Adapters, the priority attribute determines which adapt method is called first.
NOTE: Method adapt can't yield Findings.
from __future__ import annotations

from collections.abc import AsyncGenerator
from typing import Type, Union

from illuminate.adapter import Adapter
from illuminate.observation import FileObservation
from illuminate.observation import HTTPObservation
from illuminate.observation import SQLObservation
from illuminate.observation import SplashObservation
from illuminate.observer import Finding

from exporters.example import ExporterInfluxDBExample
from exporters.example import ExporterSQLExample
from findings.example import FindingExample
from models.example import ModelExample


class AdapterExample(Adapter):
    """
    Adapter class is responsible for turning Finding objects into Exporter or
    Observation objects, and yielding them when adapt method is called.

    It is also designated to be the place where any additional enrichment of
    data should be performed, calling external services with some async
    library. If additional data can be used to construct a URL, additional
    Observations can be yielded. For more information on how to yield an
    Observation object, check tutorial/observers/example.py.

    Attribute subscribers is a collection of Finding classes that will be
    processed by Adapter. If a Finding is subscribed to two or more Adapters,
    priority in adaptation will be given to the Adapter with the higher
    priority score.

    Manager's instance (the object that is running the whole process) is
    passed as an argument when initializing Adapters. This allows the Adapter
    object to interact with Manager object attributes, like database sessions
    or queues for advanced ETL flows, by simply asking for
    self.manager.sessions["postgresql"]["main"] to acquire the async database
    session defined in tutorial/settings.py.

    Note: Method adapt can not yield Findings.
    """

    priority: int = 10
    subscribers: tuple[Type[Finding]] = (FindingExample,)

    async def adapt(
        self, finding: FindingExample, *args, **kwargs
    ) -> AsyncGenerator[
        Union[
            ExporterInfluxDBExample,
            ExporterSQLExample,
            FileObservation,
            HTTPObservation,
            SQLObservation,
            SplashObservation,
        ],
        None,
    ]:
        yield ExporterSQLExample(
            models=[
                ModelExample(
                    load_time=finding.load_time,
                    title=finding.title,
                    url=finding.url,
                )
            ]
        )
        yield ExporterInfluxDBExample(
            points={
                "measurement": "tutorial",
                "tags": {"url": finding.url, "title": finding.title},
                "fields": {"load_time": finding.load_time},
            }
        )
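Purely to illustrate the priority attribute (this class is not part of the generated project), a second adapter subscribed to the same Finding but with a lower priority would have its adapt method called after AdapterExample's:

# A minimal sketch, not part of the generated project: a second adapter
# subscribed to FindingExample. With priority 5, its adapt method runs after
# AdapterExample's (priority 10).
from collections.abc import AsyncGenerator
from typing import Type

from illuminate.adapter import Adapter
from illuminate.observer import Finding

from exporters.example import ExporterInfluxDBExample
from findings.example import FindingExample


class AdapterLowPriorityExample(Adapter):

    priority: int = 5
    subscribers: tuple[Type[Finding]] = (FindingExample,)

    async def adapt(
        self, finding: FindingExample, *args, **kwargs
    ) -> AsyncGenerator[ExporterInfluxDBExample, None]:
        yield ExporterInfluxDBExample(
            points={
                "measurement": "tutorial_low_priority",
                "tags": {"url": finding.url},
                "fields": {"load_time": finding.load_time},
            }
        )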
Exporter
The exporters/ package is where you write your dump classes.
from __future__ import annotations

from typing import Iterable, Union

from illuminate.exporter import InfluxDBExporter
from illuminate.exporter import SQLExporter
from pandas import DataFrame

from models.example import ModelExample


class ExporterInfluxDBExample(InfluxDBExporter):
    """
    InfluxDBExporter class will write points to the database using a session.
    Points are passed at initialization, while the database session is found
    by the name attribute in the pool of existing sessions. Name must
    correspond to a DB section in tutorial/settings.py. For more information
    on how to initialize the InfluxDBExporter class, check
    tutorial/adapters/example.py.
    """

    name: str = "measurements"

    def __init__(
        self,
        points: Union[
            Union[DataFrame, dict, str, bytes],
            Iterable[Union[DataFrame, dict, str, bytes]],
        ],
    ):
        super().__init__(points)


class ExporterSQLExample(SQLExporter):
    """
    SQLExporter class will commit models to the database using a session.
    Models are passed at initialization, while the database session is found
    by the name attribute in the pool of existing sessions. Name must
    correspond to a DB section in tutorial/settings.py. For more information
    on how to initialize the SQLExporter class, check
    tutorial/adapters/example.py.
    """

    name: str = "main"

    def __init__(self, models: Union[list[ModelExample], tuple[ModelExample]]):
        super().__init__(models)
In our example, everything is set for the Exporter object to fetch a database connection from the framework and use it to perform the dump to the destination. Since we are writing to an SQL database, we import SQLExporter and inherit from it in the ExporterSQLExample class.
Migrations
The migrations/ package contains files needed by Alembic to perform database operations. It contains the versions directory where migration scripts are held, providing Illuminate with the ability to administer its own databases out of the box.
Usually, the content of this directory is not a user's concern.
Fixtures
The fixtures/ directory holds JSON files that contain data needed in the ETL flow beforehand. This data is injected into the database with the illuminate manage database populate CLI command.
Each object in the list represents a table/model. The key name represents the table/model name, and data is a list of rows to be inserted by the CLI command.
[
    {
        "name": "tutorial",
        "data": [
            {
                "load_time": "1.0",
                "url": "https://webscraper.io/",
                "title": "Web Scraper - The #1 web scraping extension"
            },
            {
                "load_time": "1.0",
                "url": "https://webscraper.io/tutorials",
                "title": "Web Scraper Tutorials"
            }
        ]
    }
]
Settings
The settings.py module contains all the configuration needed to run the ETL process.
"""
This file represents project tutorial's settings. It will be imported by a
framework and used to configure run. The sections are as following:
* CONCURRENCY
Number of workers per queue type. I/O heavy queues can have more workers
assigned to them to exploit longer wait times.
* DB
Database related data used by SQLAlchemy to acquire sessions. Sessions are
obtained at the start of the ETL process and can be accessed by instantiating
Manager class and access sessions attribute.
* MODELS
List of SQLAlchemy models affected by illuminate cli when invoking
'db revision', 'db upgrade' and 'db populate' commands.
* NAME
Project name.
* OBSERVATION_CONFIGURATION
General and Observation type specific configuration. Type specific
configuration is used if Observation is not specifying its own.
"""
import os
from illuminate import __version__
CONCURRENCY = {
"adapters": 2,
"exporters": 8,
"observations": 8,
}
DB = {
"main": {
"host": "localhost",
"name": "tutorial",
"pass": os.environ.get("ILLUMINATE_MAIN_DB_PASSWORD"),
"port": "5432",
"user": "illuminate",
"type": "postgresql",
},
"measurements": {
"host": "localhost",
"name": "tutorial",
"pass": os.environ.get("ILLUMINATE_MEASUREMENTS_DB_PASSWORD"),
"port": "8086",
"user": "illuminate",
"type": "influxdb",
},
}
MODELS = [
"models.example.ModelExample",
]
NAME = "tutorial"
OBSERVATION_CONFIGURATION = {
"delay": 0.1,
"http": {
"auth_username": None,
"auth_password": None,
"connect_timeout": 10.0,
"body": None,
"headers": None,
"method": "GET",
"request_timeout": 10.0,
"user_agent": f"Illuminate-bot/{__version__}",
"validate_cert": False,
},
"splash": {
"body": "",
"host": "localhost",
"method": "GET",
"port": 8050,
"protocol": "http",
"render": "html",
"timeout": 30,
}
}
Environment
The environment for development purposes is provided by the docker-compose.yaml file. It will spin up PostgreSQL, InfluxDB, and Splash containers on your localhost so you can run and monitor your flow locally.
NOTE: As additional Observations and Exporters are added, this file will grow as well.
version: '3.8'

services:

  influxdb:
    image: influxdb:1.8
    container_name: influxdb
    restart: always
    environment:
      - INFLUXDB_ADMIN_USER=illuminate
      - INFLUXDB_ADMIN_PASSWORD=$ILLUMINATE_MEASUREMENTS_DB_PASSWORD
      - INFLUXDB_DB=tutorial
      - INFLUXDB_HTTP_AUTH_ENABLED=true
    ports:
      - '8086:8086'
    volumes:
      - influxdb:/var/lib/influxdb

  pg:
    container_name: pg
    image: postgres:latest
    restart: always
    environment:
      - POSTGRES_USER=illuminate
      - POSTGRES_PASSWORD=$ILLUMINATE_MAIN_DB_PASSWORD
      - POSTGRES_DB=tutorial
    ports:
      - '5432:5432'
    volumes:
      - postgres:/var/lib/postgresql/data

  splash:
    container_name: splash
    image: scrapinghub/splash
    restart: always
    ports:
      - "8050:8050"

volumes:
  grafana:
    driver: local
  influxdb:
    driver: local
  postgres:
    driver: local