Skip to content

Support Classes

illuminate.observation.observation.Observation

Bases: IObservation

Observation class, reads data from the source. Class must be inherited and method observe must be implemented in a child class.

Source code in illuminate/observation/observation.py
class Observation(IObservation):
    """
    Base reader of data from a source. It only stores the source location
    and the cross communication object; child classes are expected to
    override both __hash__ and observe.
    """

    def __init__(self, url: Any, xcom: Optional[Any] = None):
        """
        Store the source location and the cross communication object.

        :param url: Data's URL
        :param xcom: Cross communication object
        """
        self.url, self.xcom = url, xcom

    def __hash__(self):
        """
        Placeholder hash; always raises until a child class overrides it.

        :return: None
        :raises BasicObservationException: Always
        """
        message = "Property hash must be implemented in child class"
        raise BasicObservationException(message)

    async def observe(self, *args, **kwargs):
        """
        Placeholder reader; always raises until a child class overrides it.

        :return: None
        :raises BasicObservationException: Always
        """
        message = "Method observe must be implemented in child class"
        raise BasicObservationException(message)

__hash__()

Observation object hash value.

RETURNS DESCRIPTION

None

RAISES DESCRIPTION
BasicObservationException
Source code in illuminate/observation/observation.py
def __hash__(self):
    """
    Placeholder hash; always raises until a child class overrides it.

    :return: None
    :raises BasicObservationException: Always
    """
    message = "Property hash must be implemented in child class"
    raise BasicObservationException(message)

__init__(url: Any, xcom: Optional[Any] = None)

Observation's init method.

PARAMETER DESCRIPTION
url

Data's URL

TYPE: Any

xcom

Cross communication object

TYPE: Optional[Any] DEFAULT: None

Source code in illuminate/observation/observation.py
def __init__(self, url: Any, xcom: Optional[Any] = None):
    """
    Store the source location and the cross communication object.

    :param url: Data's URL
    :param xcom: Cross communication object
    """
    self.url, self.xcom = url, xcom

observe(*args, **kwargs) async

Reads data from the source. Must be implemented in a child class.

RETURNS DESCRIPTION

None

RAISES DESCRIPTION
BasicObservationException
Source code in illuminate/observation/observation.py
async def observe(self, *args, **kwargs):
    """
    Placeholder reader; always raises until a child class overrides it.

    :return: None
    :raises BasicObservationException: Always
    """
    message = "Method observe must be implemented in child class"
    raise BasicObservationException(message)

illuminate.observation.file.FileObservation

Bases: Observation

FileObservation class, reads file content asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/file.py
class FileObservation(Observation):
    """
    FileObservation class, reads file content asynchronously. Inherits
    Observation class and implements observe method.
    """

    def __hash__(self) -> int:
        """
        FileObservation object hash value, derived from the file path only.

        :return: int
        """
        return hash(self.url)

    def __init__(
        self,
        url: str,
        /,
        callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        FileObservation's __init__ method.

        :param url: File path
        :param callback: Function/method that receives the opened
        FileIOWrapperBase object and returns Result
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self._callback = callback

    @asynccontextmanager
    async def observe(
        self, *args, **kwargs
    ) -> AsyncIterator[Union[None, Result]]:
        """
        Opens IO file stream asynchronously, passes stream object to the
        callback and yields the callback's return value (not awaited here).

        Any Exception raised while opening the file or calling the callback
        is logged as a warning and None is yielded instead.

        :return: AsyncIterator with None or Result
        """
        _items = None
        async with AsyncExitStack() as stack:
            try:
                _file = await stack.enter_async_context(
                    async_open(self.url, "r")
                )
                logger.info(f"{self}.observe() -> {_file}")
                _items = self._callback(_file, *args, **kwargs)
            except Exception as exception:
                logger.warning(f"{self}.observe() -> {exception}")
            # Yield outside of try/finally: a BaseException raised above (for
            # example a cancellation) must propagate instead of handing
            # control to the caller's with-body first. The file is released
            # by the exit stack when this block is left.
            yield _items

    def __repr__(self):
        """
        FileObservation's __repr__ method.

        :return: String representation of an instance
        """
        return f'FileObservation("{self.url}",callback="{self._callback}")'

__hash__() -> int

FileObservation object hash value.

RETURNS DESCRIPTION
int

int

Source code in illuminate/observation/file.py
def __hash__(self) -> int:
    """
    Hash of the observation, derived from the file path only.

    :return: int
    """
    location = self.url
    return hash(location)

__init__(url: str, /, callback: Callable[[FileIOWrapperBase, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)

FileObservation's init method.

PARAMETER DESCRIPTION
url

File path

TYPE: str

callback

Async function/method that manipulates FileIOWrapperBase object and returns Result

TYPE: Callable[[FileIOWrapperBase, tuple, dict], Result]

xcom

Cross communication object

TYPE: Optional[Any] DEFAULT: None

Source code in illuminate/observation/file.py
def __init__(
    self,
    url: str,
    /,
    callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    Initialise the observation with a file path and a callback. Extra
    positional and keyword arguments are accepted but not used.

    :param url: File path
    :param callback: Function/method that receives the opened
    FileIOWrapperBase object and returns Result
    :param xcom: Cross communication object
    """
    self._callback = callback
    super().__init__(url, xcom=xcom)

__repr__()

FileObservation's repr method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/observation/file.py
def __repr__(self):
    """
    Build a readable representation showing the file path and the callback.

    :return: String representation of an instance
    """
    url, callback = self.url, self._callback
    return f'FileObservation("{url}",callback="{callback}")'

observe(*args, **kwargs) -> AsyncIterator[Union[None, Result]] async

Opens IO file stream asynchronously, pass stream object to a callback and returns None or Result as a context manager.

RETURNS DESCRIPTION
AsyncIterator[Union[None, Result]]

AsyncIterator with None or Result

Source code in illuminate/observation/file.py
@asynccontextmanager
async def observe(
    self, *args, **kwargs
) -> AsyncIterator[Union[None, Result]]:
    """
    Opens IO file stream asynchronously, passes stream object to the
    callback and yields the callback's return value (not awaited here).

    Any Exception raised while opening the file or calling the callback is
    logged as a warning and None is yielded instead.

    :return: AsyncIterator with None or Result
    """
    _items = None
    async with AsyncExitStack() as stack:
        try:
            _file = await stack.enter_async_context(
                async_open(self.url, "r")
            )
            logger.info(f"{self}.observe() -> {_file}")
            _items = self._callback(_file, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
        # Yield outside of try/finally: a BaseException raised above (for
        # example a cancellation) must propagate instead of handing control
        # to the caller's with-body first. The file is released by the exit
        # stack when this block is left.
        yield _items

illuminate.observation.http.HTTPObservation

Bases: Observation

HTTPObservation class, reads data from HTTP server asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/http.py
class HTTPObservation(Observation):
    """
    Observation that fetches its data from an HTTP server asynchronously.
    Extends Observation and provides a working observe method.
    """

    def __hash__(self) -> int:
        """
        Hash built from the request method, the URL and the request body.

        :return: int
        """
        settings = self.configuration
        key = f"{settings.get('method')}|{self.url}|:{settings.get('body')}"
        return hash(key)

    def __init__(
        self,
        url: str,
        /,
        allowed: Union[list[str], tuple[str]],
        callback: Callable[[HTTPResponse, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        Initialise the observation. Keyword arguments that are not named in
        the signature are kept as the HTTP request configuration.

        :param url: Data's HTTP URL
        :param allowed: Collection of strings evaluated against self.url to
        determine if URL is allowed
        :param callback: Function/method that receives the HTTPResponse
        object and returns Result.
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self.configuration = kwargs
        self._callback = callback
        self._allowed = allowed

    @property
    def allowed(self) -> bool:
        """
        Tells whether self.url starts with any of the allowed prefixes.

        :return: bool
        """
        return any(self.url.startswith(prefix) for prefix in self._allowed)

    async def observe(self, *args, **kwargs) -> Union[None, Result]:
        """
        Fetches self.url using the stored configuration, hands the response
        to the callback and returns whatever the callback returns. Any
        Exception is logged as a warning and None is returned.

        :return: None or Result
        """
        outcome = None
        try:
            client = httpclient.AsyncHTTPClient()
            response = await client.fetch(self.url, **self.configuration)
            logger.info(f"{self}.observe() -> {response}")
            outcome = self._callback(response, *args, **kwargs)
        except Exception as error:
            logger.warning(f"{self}.observe() -> {error}")
        return outcome

    def __repr__(self):
        """
        Build a readable representation showing the URL and the callback.

        :return: String representation of an instance
        """
        url, callback = self.url, self._callback
        return f'HTTPObservation("{url}",callback="{callback}")'

__hash__() -> int

HTTPObservation object hash value.

RETURNS DESCRIPTION
int

int

Source code in illuminate/observation/http.py
def __hash__(self) -> int:
    """
    Hash built from the request method, the URL and the request body.

    :return: int
    """
    settings = self.configuration
    key = f"{settings.get('method')}|{self.url}|:{settings.get('body')}"
    return hash(key)

__init__(url: str, /, allowed: Union[list[str], tuple[str]], callback: Callable[[HTTPResponse, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)

HTTPObservation's init method.

PARAMETER DESCRIPTION
url

Data's HTTP URL

TYPE: str

allowed

Collection of strings evaluated against self.url to determine if URL is allowed

TYPE: Union[list[str], tuple[str]]

callback

Async function/method that manipulates HTTPResponse object and returns Result.

TYPE: Callable[[HTTPResponse, tuple, dict], Result]

xcom

Cross communication object

TYPE: Optional[Any] DEFAULT: None

Source code in illuminate/observation/http.py
def __init__(
    self,
    url: str,
    /,
    allowed: Union[list[str], tuple[str]],
    callback: Callable[[HTTPResponse, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    Initialise the observation. Keyword arguments that are not named in the
    signature are kept as the HTTP request configuration.

    :param url: Data's HTTP URL
    :param allowed: Collection of strings evaluated against self.url to
    determine if URL is allowed
    :param callback: Function/method that receives the HTTPResponse object
    and returns Result.
    :param xcom: Cross communication object
    """
    super().__init__(url, xcom=xcom)
    self.configuration = kwargs
    self._callback = callback
    self._allowed = allowed

__repr__()

HTTPObservation's repr method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/observation/http.py
def __repr__(self):
    """
    Build a readable representation showing the URL and the callback.

    :return: String representation of an instance
    """
    url, callback = self.url, self._callback
    return f'HTTPObservation("{url}",callback="{callback}")'

allowed() -> bool property

Checks if HTTP URL is allowed to be requested.

RETURNS DESCRIPTION
bool

bool

Source code in illuminate/observation/http.py
@property
def allowed(self) -> bool:
    """
    Tells whether self.url starts with any of the allowed prefixes.

    :return: bool
    """
    return any(self.url.startswith(prefix) for prefix in self._allowed)

observe(*args, **kwargs) -> Union[None, Result] async

Requests data from HTTP server, passes response object to a callback and returns None or Result.

RETURNS DESCRIPTION
Union[None, Result]

None or Result

Source code in illuminate/observation/http.py
async def observe(self, *args, **kwargs) -> Union[None, Result]:
    """
    Fetches self.url using the stored configuration, hands the response to
    the callback and returns whatever the callback returns. Any Exception is
    logged as a warning and None is returned.

    :return: None or Result
    """
    outcome = None
    try:
        client = httpclient.AsyncHTTPClient()
        response = await client.fetch(self.url, **self.configuration)
        logger.info(f"{self}.observe() -> {response}")
        outcome = self._callback(response, *args, **kwargs)
    except Exception as error:
        logger.warning(f"{self}.observe() -> {error}")
    return outcome

illuminate.observation.sql.SQLObservation

Bases: Observation

SQLObservation class, reads data from database asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/sql.py
class SQLObservation(Observation):
    """
    Observation that reads its data from a database asynchronously. Extends
    Observation and provides a working observe method.
    """

    def __hash__(self) -> int:
        """
        Hash built from the database name and the textual form of the query.

        :return: int
        """
        return hash(f"{self.url}|:{self.query!s}")

    def __init__(
        self,
        query: Union[Select, TextClause],
        url: str,
        /,
        callback: Callable[[AlchemyResult, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        Initialise the observation with a query, a database name and a
        callback.

        :param query: SQLAlchemy query object.
        :param url: Database name in project settings.
        :param callback: Function/method that receives the AlchemyResult
        object and returns Result.
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self.query = query
        self._callback = callback

    async def observe(
        self, session: Type[AsyncSession], *args, **kwargs
    ) -> Union[None, Result]:
        """
        Executes the stored query inside a transaction of a newly created
        session, hands the response to the callback and returns whatever the
        callback returns. Any Exception is logged as a warning and None is
        returned.

        :param session: Callable that creates the database session
        :return: None or Result
        """
        try:
            async with session() as active:  # type: ignore
                async with active.begin():  # type: ignore
                    response = await active.execute(  # type: ignore
                        self.query
                    )
            logger.info(f'{self}.observe(session="{active}")')
            return self._callback(response, *args, **kwargs)
        except Exception as error:
            logger.warning(f"{self}.observe() -> {error}")
            return None

    def __repr__(self):
        """
        Build a readable representation showing the query, the database name
        and the callback.

        :return: String representation of an instance
        """
        query, url, callback = self.query, self.url, self._callback
        return f'SQLObservation("{query}","{url}",' f'callback="{callback}")'

__hash__() -> int

SQLObservation object hash value.

RETURNS DESCRIPTION
int

int

Source code in illuminate/observation/sql.py
def __hash__(self) -> int:
    """
    Hash built from the database name and the textual form of the query.

    :return: int
    """
    return hash(f"{self.url}|:{self.query!s}")

__init__(query: Union[Select, TextClause], url: str, /, callback: Callable[[AlchemyResult, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)

SQLObservation's init method.

PARAMETER DESCRIPTION
query

SQLAlchemy query object.

TYPE: Union[Select, TextClause]

url

Database name in project settings.

TYPE: str

callback

Async function/method that manipulates AlchemyResult object and returns Result.

TYPE: Callable[[AlchemyResult, tuple, dict], Result]

xcom

Cross communication object

TYPE: Optional[Any] DEFAULT: None

Source code in illuminate/observation/sql.py
def __init__(
    self,
    query: Union[Select, TextClause],
    url: str,
    /,
    callback: Callable[[AlchemyResult, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    Initialise the observation with a query, a database name and a callback.

    :param query: SQLAlchemy query object.
    :param url: Database name in project settings.
    :param callback: Function/method that receives the AlchemyResult object
    and returns Result.
    :param xcom: Cross communication object
    """
    super().__init__(url, xcom=xcom)
    self.query = query
    self._callback = callback

__repr__()

SQLObservation's repr method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/observation/sql.py
def __repr__(self):
    """
    Build a readable representation showing the query, the database name and
    the callback.

    :return: String representation of an instance
    """
    query, url, callback = self.query, self.url, self._callback
    return f'SQLObservation("{query}","{url}",' f'callback="{callback}")'

observe(session: Type[AsyncSession], *args, **kwargs) -> Union[None, Result] async

Reads data from database, passes response object to a callback and returns None or Result.

RETURNS DESCRIPTION
Union[None, Result]

None or Result

Source code in illuminate/observation/sql.py
async def observe(
    self, session: Type[AsyncSession], *args, **kwargs
) -> Union[None, Result]:
    """
    Executes the stored query inside a transaction of a newly created
    session, hands the response to the callback and returns whatever the
    callback returns. Any Exception is logged as a warning and None is
    returned.

    :param session: Callable that creates the database session
    :return: None or Result
    """
    try:
        async with session() as active:  # type: ignore
            async with active.begin():  # type: ignore
                response = await active.execute(self.query)  # type: ignore
        logger.info(f'{self}.observe(session="{active}")')
        return self._callback(response, *args, **kwargs)
    except Exception as error:
        logger.warning(f"{self}.observe() -> {error}")
        return None

illuminate.observation.http.SplashObservation

Bases: HTTPObservation

SplashObservation class, reads data from HTTP server asynchronously. Inherits HTTPObservation class and implements observe method.

Constructor's kwargs are used to create Splash service URL. For full list of parameters visit https://splash.readthedocs.io/en/stable/api.html.

Note: URL is passed as positional argument. It will be used as param in Splash service URL.

Source code in illuminate/observation/http.py
class SplashObservation(HTTPObservation):
    """
    SplashObservation class, reads data from HTTP server asynchronously.
    Inherits HTTPObservation class and implements observe method.

    Constructor's kwargs are used to create Splash service URL. For full list
    of parameters visit https://splash.readthedocs.io/en/stable/api.html.

    Note: URL is passed as positional argument. It will be used as param
    in Splash service URL.
    """

    def __hash__(self) -> int:
        """
        SplashObservation object hash value, derived from the full Splash
        service URL.

        :return: int
        """
        return hash(self.service)

    @property
    def service(self):
        """
        Constructs URL of Splash service.

        The keys host, port, protocol and render of self.configuration
        override the endpoint defaults; every other key, plus the observed
        URL, is URL-encoded into the query string. self.configuration itself
        is left untouched.

        :return: Splash URL string
        """
        defaults: dict = {
            "host": "localhost",
            "port": 8050,
            "protocol": "http",
            "render": "html",
        }
        # Work on a copy so endpoint keys can be removed safely.
        parameters = copy(self.configuration)
        location = {
            key: parameters.pop(key, default)
            for key, default in defaults.items()
        }
        parameters["url"] = self.url
        # Format exactly once: a second pass would re-interpret any braces
        # contained in the substituted values.
        endpoint = "{protocol}://{host}:{port}/render.{render}?".format(
            **location
        )
        return f"{endpoint}{urllib.parse.urlencode(parameters)}"

    async def observe(
        self, configuration: dict, *args, **kwargs
    ) -> Union[None, Result]:
        """
        Requests data from HTTP server and renders response with Splash, passes
        response object to a callback and returns None or Result. Any
        Exception is logged as a warning and None is returned.

        :param configuration: HTTP configuration dict from settings.py
        :return: None or Result
        """
        try:
            response = await httpclient.AsyncHTTPClient().fetch(
                self.service, **configuration
            )
            logger.info(f"{self}.observe() -> {response}")
            return self._callback(response, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
            return None

    def __repr__(self):
        """
        SplashObservation's __repr__ method.

        :return: String representation of an instance
        """
        return f'SplashObservation("{self.url}",callback="{self._callback}")'

__hash__() -> int

SplashObservation object hash value.

RETURNS DESCRIPTION
int

int

Source code in illuminate/observation/http.py
def __hash__(self) -> int:
    """
    Hash of the observation, derived from the full Splash service URL.

    :return: int
    """
    endpoint = self.service
    return hash(endpoint)

__repr__()

SplashObservation's repr method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/observation/http.py
def __repr__(self):
    """
    Build a readable representation showing the URL and the callback.

    :return: String representation of an instance
    """
    url, callback = self.url, self._callback
    return f'SplashObservation("{url}",callback="{callback}")'

observe(configuration: dict, *args, **kwargs) -> Union[None, Result] async

Requests data from HTTP server and renders response with Splash, passes response object to a callback and returns None or Result.

PARAMETER DESCRIPTION
configuration

HTTP configuration dict from settings.py

TYPE: dict

RETURNS DESCRIPTION
Union[None, Result]

None or Result

Source code in illuminate/observation/http.py
async def observe(
    self, configuration: dict, *args, **kwargs
) -> Union[None, Result]:
    """
    Fetches the Splash service URL using the supplied configuration, hands
    the response to the callback and returns whatever the callback returns.
    Any Exception is logged as a warning and None is returned.

    :param configuration: HTTP configuration dict from settings.py
    :return: None or Result
    """
    outcome = None
    try:
        client = httpclient.AsyncHTTPClient()
        response = await client.fetch(self.service, **configuration)
        logger.info(f"{self}.observe() -> {response}")
        outcome = self._callback(response, *args, **kwargs)
    except Exception as error:
        logger.warning(f"{self}.observe() -> {error}")
    return outcome

service() property

Constructs URL of Splash service

RETURNS DESCRIPTION

Splash URL string

Source code in illuminate/observation/http.py
@property
def service(self):
    """
    Constructs URL of Splash service.

    The keys host, port, protocol and render of self.configuration override
    the endpoint defaults; every other key, plus the observed URL, is
    URL-encoded into the query string. self.configuration itself is left
    untouched.

    :return: Splash URL string
    """
    defaults: dict = {
        "host": "localhost",
        "port": 8050,
        "protocol": "http",
        "render": "html",
    }
    # Work on a copy so endpoint keys can be removed safely.
    parameters = copy(self.configuration)
    location = {
        key: parameters.pop(key, default) for key, default in defaults.items()
    }
    parameters["url"] = self.url
    # Format exactly once: a second pass would re-interpret any braces
    # contained in the substituted values.
    endpoint = "{protocol}://{host}:{port}/render.{render}?".format(**location)
    return f"{endpoint}{urllib.parse.urlencode(parameters)}"

illuminate.observer.finding.Finding

Bases: IFinding

Finding class, contains data that will be passed to Adapter object's adapt method.

Source code in illuminate/observer/finding.py
4
5
6
7
8
class Finding(IFinding):
    """
    Finding class, contains data that will be passed to Adapter object's
    adapt method.

    The class body adds nothing of its own on top of IFinding.
    """

illuminate.adapter.adapter.Adapter

Bases: IAdapter

Adapter class, generates Exporter and Observation objects using Finding instances with optional transformation. Class must be inherited and method adapt must be implemented in a child class.

Source code in illuminate/adapter/adapter.py
class Adapter(IAdapter):
    """
    Base class that turns Finding instances into Exporter and Observation
    objects, optionally transforming them on the way. Child classes are
    expected to override the adapt method.
    """

    priority: int
    """
    Place in Adapter list. If two Adapters have the same Finding in subscriber
    tuple, one with the higher priority will call method adapt first on
    Finding object.
    """

    subscribers: tuple[Type[Finding]]
    """
    Tuple of Finding class children used to determine if Adapter object should
    call method adapt on Finding instance.
    """

    def __init__(self, manager: Optional[Manager] = None):
        """
        Keep a reference to the manager.

        :param manager: Manager object
        """
        self.manager = manager

    async def adapt(
        self, finding: Finding, *args, **kwargs
    ) -> AsyncGenerator[Union[Exporter, Observation], None]:
        """
        Placeholder; always raises until a child class overrides it.

        Overrides are meant to be the place where additional transformation
        of finding objects is performed (like data enrichment from an
        additional data source) before yielding Exporter or Observation
        instances.

        :param finding: Finding object
        :return: Async Exporter and Observation object generator
        :raises BasicAdapterException: Always
        """
        message = "Method adapt must be implemented in child class"
        raise BasicAdapterException(message)

priority: int class-attribute

Place in Adapter list. If two Adapters have the same Finding in subscriber tuple, one with the higher priority will call method adapt first on Finding object.

subscribers: tuple[Type[Finding]] class-attribute

Tuple of Finding class children used to determine if Adapter object should call method adapt on Finding instance.

__init__(manager: Optional[Manager] = None)

Adapter's init method.

PARAMETER DESCRIPTION
manager

Manager object

TYPE: Optional[Manager] DEFAULT: None

Source code in illuminate/adapter/adapter.py
def __init__(self, manager: Optional[Manager] = None):
    """
    Keep a reference to the manager.

    :param manager: Manager object
    """
    setattr(self, "manager", manager)

adapt(finding: Finding, *args, **kwargs) -> AsyncGenerator[Union[Exporter, Observation], None] async

Generates Exporter and Observation objects. Must be implemented in a child class.

It is meant to be a scope where additional transformation up on finding objects should be performed (like data enrichment from additional data source) before yielding Exporter or Observation instances.

PARAMETER DESCRIPTION
finding

Finding object

TYPE: Finding

RETURNS DESCRIPTION
AsyncGenerator[Union[Exporter, Observation], None]

Async Exporter and Observation object generator

RAISES DESCRIPTION
BasicAdapterException
Source code in illuminate/adapter/adapter.py
async def adapt(
    self, finding: Finding, *args, **kwargs
) -> AsyncGenerator[Union[Exporter, Observation], None]:
    """
    Placeholder; always raises until a child class overrides it.

    Overrides are meant to be the place where additional transformation of
    finding objects is performed (like data enrichment from an additional
    data source) before yielding Exporter or Observation instances.

    :param finding: Finding object
    :return: Async Exporter and Observation object generator
    :raises BasicAdapterException: Always
    """
    message = "Method adapt must be implemented in child class"
    raise BasicAdapterException(message)

illuminate.exporter.exporter.Exporter

Bases: IExporter

Exporter class, writes data to destination. Class must be inherited and method export must be implemented in a child class.

Source code in illuminate/exporter/exporter.py
class Exporter(IExporter):
    """
    Base writer of data to a destination. Child classes are expected to
    override the export method.
    """

    async def export(self, *args, **kwargs):
        """
        Placeholder writer; always raises until a child class overrides it.

        :return: None
        :raises BasicExporterException: Always
        """
        message = "Method export must be implemented in child class"
        raise BasicExporterException(message)

export(*args, **kwargs) async

Writes data to destination. Must be implemented in a child class.

RETURNS DESCRIPTION

None

RAISES DESCRIPTION
BasicExporterException
Source code in illuminate/exporter/exporter.py
async def export(self, *args, **kwargs):
    """
    Placeholder writer; always raises until a child class overrides it.

    :return: None
    :raises BasicExporterException: Always
    """
    message = "Method export must be implemented in child class"
    raise BasicExporterException(message)

illuminate.exporter.influxdb.InfluxDBExporter

Bases: Exporter

InfluxDBExporter class, writes data to InfluxDB database asynchronously. Inherits Exporter class and implements export method.

Each InfluxDBExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.

Constructor kwargs will be passed to client write method. For more information on write method, visit: https://aioinflux.readthedocs.io/en/stable/api.html

Supported write data objects: a DataFrame with DatetimeIndex; a dict containing the keys measurement, time, tags and fields; a string (str or bytes) properly formatted in InfluxDB’s line protocol; or an iterable of any of the above.

Source code in illuminate/exporter/influxdb.py
class InfluxDBExporter(Exporter):
    """
    InfluxDBExporter class, writes data to InfluxDB database asynchronously.
    Inherits Exporter class and implements export method.

    Each InfluxDBExporter object is responsible for a single transaction with a
    single database. Attribute name is used to acquire database session object
    from Manager's sessions attribute.

    Constructor kwargs will be passed to client write method. For more
    information on write method, visit:
    https://aioinflux.readthedocs.io/en/stable/api.html

    Supported write data objects:
        - DataFrame with DatetimeIndex
        - Dict containing the keys: measurement, time, tags,
        fields
        - String (str or bytes) properly formatted in InfluxDB’s line protocol
        - An iterable of one of the above

    """

    name: str
    """
    InfluxDB database name selector used to get database session object from
    Manager.sessions attribute.
    """

    def __init__(
        self,
        points: Union[
            Union[DataFrame, dict, str, bytes],
            Iterable[Union[DataFrame, dict, str, bytes]],
        ],
        *args,
        **kwargs,
    ):
        """
        InfluxDBExporter's __init__ method. Keyword arguments are stored and
        later forwarded to the client's write call.

        :param points: Supported data objects
        """
        self.params = kwargs
        self.points = points

    async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
        """
        Writes data to Influxdb asynchronously.

        A failed write is logged as a warning and re-raised as
        BasicExporterException carrying the original error's message, with
        the original error attached as its cause.

        :param session: InfluxDBClient object
        :return: None
        :raises BasicExporterException: When the write fails
        """
        try:
            await session.write(self.points, **self.params)
        except Exception as exception:
            logger.warning(
                f'{self}.export(session="{session}") -> {exception}'
            )
            # Chain explicitly so the traceback shows the write failure as
            # the direct cause instead of an unrelated secondary error.
            raise BasicExporterException(str(exception)) from exception
        logger.success(f'{self}.export(session="{session}")')

    def __repr__(self):
        """
        InfluxDBExporter's __repr__ method.

        :return: String representation of an instance
        """
        return f"InfluxDBExporter(points={self.points})"

name: str class-attribute

InfluxDB database name selector used to get database session object from Manager.sessions attribute.

__init__(points: Union[Union[DataFrame, dict, str, bytes], Iterable[Union[DataFrame, dict, str, bytes]]], *args, **kwargs)

InfluxDBExporter's init method.

PARAMETER DESCRIPTION
points

Supported data objects

TYPE: Union[Union[DataFrame, dict, str, bytes], Iterable[Union[DataFrame, dict, str, bytes]]]

Source code in illuminate/exporter/influxdb.py
def __init__(
    self,
    points: Union[
        Union[DataFrame, dict, str, bytes],
        Iterable[Union[DataFrame, dict, str, bytes]],
    ],
    *args,
    **kwargs,
):
    """
    Keeps the payload and the write options for a later export call.

    :param points: Supported data objects
    :param args: Extra positional arguments, accepted but not used
    :param kwargs: Keyword arguments passed to session.write by export
    """
    self.params, self.points = kwargs, points

__repr__()

InfluxDBExporter's `__repr__` method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/exporter/influxdb.py
def __repr__(self):
    """
    Builds the text form of the exporter from its points attribute.

    :return: String representation of an instance
    """
    points = self.points
    return f"InfluxDBExporter(points={points})"

export(session: InfluxDBClient, *args, **kwargs) -> None async

Writes data to InfluxDB asynchronously.

PARAMETER DESCRIPTION
session

InfluxDBClient object

TYPE: InfluxDBClient

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
BasicExporterException
Source code in illuminate/exporter/influxdb.py
async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
    """
    Writes data to InfluxDB asynchronously.

    The stored points are passed to session.write together with the
    keyword arguments captured by __init__. A failure is logged as a
    warning and re-raised as BasicExporterException.

    :param session: InfluxDBClient object
    :return: None
    :raises BasicExporterException: If writing the points fails
    """
    try:
        await session.write(self.points, **self.params)
    except Exception as exception:
        logger.warning(
            f'{self}.export(session="{session}") -> {exception}'
        )
        # Chain explicitly so the traceback names the original error as
        # the direct cause of the exporter failure.
        raise BasicExporterException from exception
    logger.success(f'{self}.export(session="{session}")')

illuminate.exporter.sql.SQLExporter

Bases: Exporter

SQLExporter class, writes data to SQL database asynchronously. Inherits Exporter class and implements export method.

Each SQLExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.

Supported dialects: MySQL and PostgreSQL.

Source code in illuminate/exporter/sql.py
class SQLExporter(Exporter):
    """
    SQLExporter class, writes data to SQL database asynchronously. Inherits
    Exporter class and implements export method.

    Each SQLExporter object is responsible for a single transaction with a
    single database. Attribute name is used to acquire database session object
    from Manager's sessions attribute.

    Supported dialects:
        - Mysql
        - Postgres
    """

    name: str
    """
    SQL database name selector used to get database session object from
    Manager.sessions attribute.
    """

    def __init__(self, models: Union[list[M], tuple[M]]):
        """
        SQLExporter's __init__ method.

        :param models: SQLAlchemy model objects collection
        """
        self.models = models

    async def export(
        self, session: Type[AsyncSession], *args, **kwargs
    ) -> None:
        """
        Writes data to SQL database asynchronously.

        Opens a session by calling the given session callable, adds every
        model to it and commits. A failed commit is logged as a warning and
        re-raised as BasicExporterException.

        :param session: Callable that returns an AsyncSession when called
        :return: None
        :raises BasicExporterException: If the commit fails
        """
        # The opened session gets its own name so the session parameter is
        # not rebound to an object of a different type.
        async with session() as opened:
            async with opened.begin():
                opened.add_all(self.models)
                try:
                    await opened.commit()
                except Exception as exception:
                    logger.warning(
                        f'{self}.export(session="{opened}") -> {exception}'
                    )
                    # Chain explicitly so the traceback names the database
                    # error as the direct cause.
                    raise BasicExporterException from exception
        logger.success(f'{self}.export(session="{opened}")')

    def __repr__(self):
        """
        SQLExporter's __repr__ method.

        :return: String representation of an instance
        """
        return f"SQLExporter(models={self.models})"

name: str class-attribute

SQL database name selector used to get database session object from Manager.sessions attribute.

__init__(models: Union[list[M], tuple[M]])

SQLExporter's `__init__` method.

PARAMETER DESCRIPTION
models

SQLAlchemy model objects collection

TYPE: Union[list[M], tuple[M]]

Source code in illuminate/exporter/sql.py
def __init__(self, models: Union[list[M], tuple[M]]):
    """
    Keeps the model collection on the instance for a later export call.

    :param models: SQLAlchemy model objects collection
    """
    setattr(self, "models", models)

__repr__()

SQLExporter's `__repr__` method.

RETURNS DESCRIPTION

String representation of an instance

Source code in illuminate/exporter/sql.py
def __repr__(self):
    """
    Builds the text form of the exporter from its models attribute.

    :return: String representation of an instance
    """
    models = self.models
    return f"SQLExporter(models={models})"

export(session: Type[AsyncSession], *args, **kwargs) -> None async

Writes data to SQL database asynchronously.

PARAMETER DESCRIPTION
session

AsyncSession object

TYPE: Type[AsyncSession]

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
BasicExporterException
Source code in illuminate/exporter/sql.py
async def export(
    self, session: Type[AsyncSession], *args, **kwargs
) -> None:
    """
    Writes data to SQL database asynchronously.

    Opens a session by calling the given session callable, adds every
    model to it and commits. A failed commit is logged as a warning and
    re-raised as BasicExporterException.

    :param session: Callable that returns an AsyncSession when called
    :return: None
    :raises BasicExporterException: If the commit fails
    """
    # The opened session gets its own name so the session parameter is
    # not rebound to an object of a different type.
    async with session() as opened:
        async with opened.begin():
            opened.add_all(self.models)
            try:
                await opened.commit()
            except Exception as exception:
                logger.warning(
                    f'{self}.export(session="{opened}") -> {exception}'
                )
                # Chain explicitly so the traceback names the database
                # error as the direct cause.
                raise BasicExporterException from exception
    logger.success(f'{self}.export(session="{opened}")')