Skip to content

Support Classes

illuminate.observation.observation.Observation (IObservation)

Observation class, reads data from the source. Class must be inherited and method observe must be implemented in a child class.

Source code in illuminate/observation/observation.py
class Observation(IObservation):
    """
    Observation class, reads data from the source. Class must be inherited and
    method observe must be implemented in a child class.
    """

    def __hash__(self):
        """
        Observation object hash value.

        :return: None
        :raises BasicObservationException:
        """
        raise BasicObservationException(
            "Property hash must be implemented in child class"
        )

    def __init__(self, url: Any, xcom: Optional[Any] = None):
        """
        Observation's __init__ method.

        :param url: Data's URL
        :param xcom: Cross communication object
        """
        self.url = url
        self.xcom = xcom

    async def observe(self, *args, **kwargs):
        """
        Reads data from the source. Must be implemented in a child class.

        :return: None
        :raises BasicObservationException:
        """
        raise BasicObservationException(
            "Method observe must be implemented in child class"
        )

__hash__(self) special

Observation object hash value.

:return: None :raises BasicObservationException:

Source code in illuminate/observation/observation.py
def __hash__(self):
    """
    Observation object hash value.

    :return: None
    :raises BasicObservationException:
    """
    raise BasicObservationException(
        "Property hash must be implemented in child class"
    )

__init__(self, url: Any, xcom: Optional[Any] = None) special

Observation's init method.

:param url: Data's URL :param xcom: Cross communication object

Source code in illuminate/observation/observation.py
def __init__(self, url: Any, xcom: Optional[Any] = None):
    """
    Observation's __init__ method.

    :param url: Data's URL
    :param xcom: Cross communication object
    """
    self.url = url
    self.xcom = xcom

observe(self, *args, **kwargs) async

Reads data from the source. Must be implemented in a child class.

:return: None :raises BasicObservationException:

Source code in illuminate/observation/observation.py
async def observe(self, *args, **kwargs):
    """
    Reads data from the source. Must be implemented in a child class.

    :return: None
    :raises BasicObservationException:
    """
    raise BasicObservationException(
        "Method observe must be implemented in child class"
    )

illuminate.observation.file.FileObservation (Observation)

FileObservation class, reads file content asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/file.py
class FileObservation(Observation):
    """
    FileObservation class, reads file content asynchronously. Inherits
    Observation class and implements observe method.
    """

    def __hash__(self) -> int:
        """
        FileObservation object hash value.

        :return: int
        """
        return hash(self.url)

    def __init__(
        self,
        url: str,
        /,
        callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        FileObservation's __init__ method.

        :param url: File path
        :param callback: Async function/method that manipulates
        FileIOWrapperBase object and returns Result
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self._callback = callback

    @asynccontextmanager
    async def observe(
        self, *args, **kwargs
    ) -> AsyncIterator[Union[None, Result]]:
        """
        Opens IO file stream asynchronously, pass stream object to a callback
        and returns None or Result as a context manager.

        :return: AsyncIterator with None or Result
        """
        _file = None
        _items = None
        async with AsyncExitStack() as stack:
            try:
                _file = await stack.enter_async_context(
                    async_open(self.url, "r")
                )
                logger.info(f"{self}.observe() -> {_file}")
                _items = self._callback(_file, *args, **kwargs)
            except Exception as exception:
                logger.warning(f"{self}.observe() -> {exception}")
            finally:
                yield _items
                if _file:
                    await _file.close()

    def __repr__(self):
        """
        FileObservation's __repr__ method.

        :return: String representation of an instance
        """
        return f'FileObservation("{self.url}",callback="{self._callback}")'

__hash__(self) -> int special

FileObservation object hash value.

:return: int

Source code in illuminate/observation/file.py
def __hash__(self) -> int:
    """
    FileObservation object hash value.

    :return: int
    """
    return hash(self.url)

__init__(/, self, url: str, callback: Callable[[FileIOWrapperBase, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs) special

FileObservation's init method.

:param url: File path :param callback: Async function/method that manipulates FileIOWrapperBase object and returns Result :param xcom: Cross communication object

Source code in illuminate/observation/file.py
def __init__(
    self,
    url: str,
    /,
    callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    FileObservation's __init__ method.

    :param url: File path
    :param callback: Async function/method that manipulates
    FileIOWrapperBase object and returns Result
    :param xcom: Cross communication object
    """
    super().__init__(url, xcom=xcom)
    self._callback = callback

__repr__(self) special

FileObservation's repr method.

:return: String representation of an instance

Source code in illuminate/observation/file.py
def __repr__(self):
    """
    FileObservation's __repr__ method.

    :return: String representation of an instance
    """
    return f'FileObservation("{self.url}",callback="{self._callback}")'

observe(self, *args, **kwargs) -> AsyncIterator[Union[None, Result]]

Opens IO file stream asynchronously, pass stream object to a callback and returns None or Result as a context manager.

:return: AsyncIterator with None or Result

Source code in illuminate/observation/file.py
@asynccontextmanager
async def observe(
    self, *args, **kwargs
) -> AsyncIterator[Union[None, Result]]:
    """
    Opens IO file stream asynchronously, pass stream object to a callback
    and returns None or Result as a context manager.

    :return: AsyncIterator with None or Result
    """
    _file = None
    _items = None
    async with AsyncExitStack() as stack:
        try:
            _file = await stack.enter_async_context(
                async_open(self.url, "r")
            )
            logger.info(f"{self}.observe() -> {_file}")
            _items = self._callback(_file, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
        finally:
            yield _items
            if _file:
                await _file.close()

illuminate.observation.http.HTTPObservation (Observation)

HTTPObservation class, reads data from HTTP server asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/http.py
class HTTPObservation(Observation):
    """
    HTTPObservation class, reads data from HTTP server asynchronously. Inherits
    Observation class and implements observe method.
    """

    def __hash__(self) -> int:
        """
        HTTPObservation object hash value.

        :return: int
        """
        body = self.configuration.get("body")
        method = self.configuration.get("method")
        return hash(f"{method}|{self.url}|:{body}")

    def __init__(
        self,
        url: str,
        /,
        allowed: Union[list[str], tuple[str]],
        callback: Callable[[HTTPResponse, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        HTTPObservation's __init__ method.

        :param url: Data's HTTP URL
        :param allowed: Collection of strings evaluated against self.url to
        determent if URL is allowed
        :param callback: Async function/method that manipulates HTTPResponse
        object and returns Result.
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self._allowed = allowed
        self._callback = callback
        self.configuration = kwargs

    @property
    def allowed(self) -> bool:
        """
        Checks if HTTP URL is allowed to be requested.

        :return: bool
        """
        for allowed in self._allowed:
            if self.url.startswith(allowed):
                return True
        return False

    async def observe(self, *args, **kwargs) -> Union[None, Result]:
        """
        Requests data from HTTP server, passes response object to a callback
        and returns None or Result.

        :return: None or Result
        """
        try:
            response = await httpclient.AsyncHTTPClient().fetch(
                self.url, **self.configuration
            )
            logger.info(f"{self}.observe() -> {response}")
            return self._callback(response, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
            return None

    def __repr__(self):
        """
        HTTPObservation's __repr__ method.

        :return: String representation of an instance
        """
        return f'HTTPObservation("{self.url}",callback="{self._callback}")'

allowed: bool property readonly

Checks if HTTP URL is allowed to be requested.

:return: bool

__hash__(self) -> int special

HTTPObservation object hash value.

:return: int

Source code in illuminate/observation/http.py
def __hash__(self) -> int:
    """
    HTTPObservation object hash value.

    :return: int
    """
    body = self.configuration.get("body")
    method = self.configuration.get("method")
    return hash(f"{method}|{self.url}|:{body}")

__init__(/, self, url: str, allowed: Union[list[str], tuple[str]], callback: Callable[[HTTPResponse, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs) special

HTTPObservation's init method.

:param url: Data's HTTP URL :param allowed: Collection of strings evaluated against self.url to determent if URL is allowed :param callback: Async function/method that manipulates HTTPResponse object and returns Result. :param xcom: Cross communication object

Source code in illuminate/observation/http.py
def __init__(
    self,
    url: str,
    /,
    allowed: Union[list[str], tuple[str]],
    callback: Callable[[HTTPResponse, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    HTTPObservation's __init__ method.

    :param url: Data's HTTP URL
    :param allowed: Collection of strings evaluated against self.url to
    determent if URL is allowed
    :param callback: Async function/method that manipulates HTTPResponse
    object and returns Result.
    :param xcom: Cross communication object
    """
    super().__init__(url, xcom=xcom)
    self._allowed = allowed
    self._callback = callback
    self.configuration = kwargs

__repr__(self) special

HTTPObservation's repr method.

:return: String representation of an instance

Source code in illuminate/observation/http.py
def __repr__(self):
    """
    HTTPObservation's __repr__ method.

    :return: String representation of an instance
    """
    return f'HTTPObservation("{self.url}",callback="{self._callback}")'

observe(self, *args, **kwargs) -> Union[None, Result] async

Requests data from HTTP server, passes response object to a callback and returns None or Result.

:return: None or Result

Source code in illuminate/observation/http.py
async def observe(self, *args, **kwargs) -> Union[None, Result]:
    """
    Requests data from HTTP server, passes response object to a callback
    and returns None or Result.

    :return: None or Result
    """
    try:
        response = await httpclient.AsyncHTTPClient().fetch(
            self.url, **self.configuration
        )
        logger.info(f"{self}.observe() -> {response}")
        return self._callback(response, *args, **kwargs)
    except Exception as exception:
        logger.warning(f"{self}.observe() -> {exception}")
        return None

illuminate.observation.sql.SQLObservation (Observation)

SQLObservation class, reads data from database asynchronously. Inherits Observation class and implements observe method.

Source code in illuminate/observation/sql.py
class SQLObservation(Observation):
    """
    SQLObservation class, reads data from database asynchronously. Inherits
    Observation class and implements observe method.
    """

    def __hash__(self) -> int:
        """
        SQLObservation object hash value.

        :return: int
        """
        query = str(self.query)
        return hash(f"{self.url}|:{query}")

    def __init__(
        self,
        query: Union[Select, TextClause],
        url: str,
        /,
        callback: Callable[[AlchemyResult, tuple, dict], Result],
        xcom: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """
        SQLObservation's __init__ method.

        :param query: SQLAlchemy query object.
        :param url: Database name in project settings.
        :param callback: Async function/method that manipulates AlchemyResult
        object and returns Result.
        :param xcom: Cross communication object
        """
        super().__init__(url, xcom=xcom)
        self._callback = callback
        self.query = query

    async def observe(
        self, session: Type[AsyncSession], *args, **kwargs
    ) -> Union[None, Result]:
        """
        Reads data from database, passes response object to a callback
        and returns None or Result.

        :return: None or Result
        """
        try:
            async with session() as session:  # type: ignore
                async with session.begin():  # type: ignore
                    query = self.query
                    response = await session.execute(query)  # type: ignore
            logger.info(f'{self}.observe(session="{session}")')
            return self._callback(response, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
            return None

    def __repr__(self):
        """
        SQLObservation's __repr__ method.

        :return: String representation of an instance
        """
        return (
            f'SQLObservation("{self.query}","{self.url}",'
            f'callback="{self._callback}")'
        )

__hash__(self) -> int special

SQLObservation object hash value.

:return: int

Source code in illuminate/observation/sql.py
def __hash__(self) -> int:
    """
    SQLObservation object hash value.

    :return: int
    """
    query = str(self.query)
    return hash(f"{self.url}|:{query}")

__init__(/, self, query: Union[Select, TextClause], url: str, callback: Callable[[AlchemyResult, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs) special

SQLObservation's init method.

:param query: SQLAlchemy query object. :param url: Database name in project settings. :param callback: Async function/method that manipulates AlchemyResult object and returns Result. :param xcom: Cross communication object

Source code in illuminate/observation/sql.py
def __init__(
    self,
    query: Union[Select, TextClause],
    url: str,
    /,
    callback: Callable[[AlchemyResult, tuple, dict], Result],
    xcom: Optional[Any] = None,
    *args,
    **kwargs,
):
    """
    SQLObservation's __init__ method.

    :param query: SQLAlchemy query object.
    :param url: Database name in project settings.
    :param callback: Async function/method that manipulates AlchemyResult
    object and returns Result.
    :param xcom: Cross communication object
    """
    super().__init__(url, xcom=xcom)
    self._callback = callback
    self.query = query

__repr__(self) special

SQLObservation's repr method.

:return: String representation of an instance

Source code in illuminate/observation/sql.py
def __repr__(self):
    """
    SQLObservation's __repr__ method.

    :return: String representation of an instance
    """
    return (
        f'SQLObservation("{self.query}","{self.url}",'
        f'callback="{self._callback}")'
    )

observe(self, session: Type[AsyncSession], *args, **kwargs) -> Union[None, Result] async

Reads data from database, passes response object to a callback and returns None or Result.

:return: None or Result

Source code in illuminate/observation/sql.py
async def observe(
    self, session: Type[AsyncSession], *args, **kwargs
) -> Union[None, Result]:
    """
    Reads data from database, passes response object to a callback
    and returns None or Result.

    :return: None or Result
    """
    try:
        async with session() as session:  # type: ignore
            async with session.begin():  # type: ignore
                query = self.query
                response = await session.execute(query)  # type: ignore
        logger.info(f'{self}.observe(session="{session}")')
        return self._callback(response, *args, **kwargs)
    except Exception as exception:
        logger.warning(f"{self}.observe() -> {exception}")
        return None

illuminate.observation.http.SplashObservation (HTTPObservation)

SplashObservation class, reads data from HTTP server asynchronously. Inherits HTTPObservation class and implements observe method.

Constructor's kwargs are used to create Splash service URL. For full list of parameters visit https://splash.readthedocs.io/en/stable/api.html.

Note: URL is passed as positional argument. It will be used as param in Splash service URL.

Source code in illuminate/observation/http.py
class SplashObservation(HTTPObservation):
    """
    SplashObservation class, reads data from HTTP server asynchronously.
    Inherits HTTPObservation class and implements observe method.

    Constructor's kwargs are used to create Splash service URL. For full list
    of parameters visit https://splash.readthedocs.io/en/stable/api.html.

    Note: URL is passed as positional argument. It will be used as param
    in Splash service URL.
    """

    def __hash__(self) -> int:
        """
        SplashObservation object hash value.

        :return: int
        """
        return hash(self.service)

    @property
    def service(self):
        """
        Constructs URL of Splash service

        :return: Splash URL string
        """
        defaults: dict = {
            "host": "localhost",
            "port": 8050,
            "protocol": "http",
            "render": "html",
        }
        parameters = copy(self.configuration)
        for i in defaults:
            if i in parameters:
                defaults[i] = parameters[i]
                del parameters[i]
        parameters["url"] = self.url
        endpoint = "{protocol}://{host}:{port}/render.{render}?"
        endpoint = endpoint.format(**defaults)
        parameters = urllib.parse.urlencode(parameters)
        return f"{endpoint.format(**defaults)}{parameters}"

    async def observe(
        self, configuration: dict, *args, **kwargs
    ) -> Union[None, Result]:
        """
        Requests data from HTTP server and renders response with Splash, passes
        response object to a callback and returns None or Result.

        :param configuration: HTTP configuration dict from settings.py
        :return: None or Result
        """
        try:
            response = await httpclient.AsyncHTTPClient().fetch(
                self.service, **configuration
            )
            logger.info(f"{self}.observe() -> {response}")
            return self._callback(response, *args, **kwargs)
        except Exception as exception:
            logger.warning(f"{self}.observe() -> {exception}")
            return None

    def __repr__(self):
        """
        SplashObservation's __repr__ method.

        :return: String representation of an instance
        """
        return f'SplashObservation("{self.url}",callback="{self._callback}")'

service property readonly

Constructs URL of Splash service

:return: Splash URL string

__hash__(self) -> int special

SplashObservation object hash value.

:return: int

Source code in illuminate/observation/http.py
def __hash__(self) -> int:
    """
    SplashObservation object hash value.

    :return: int
    """
    return hash(self.service)

__repr__(self) special

SplashObservation's repr method.

:return: String representation of an instance

Source code in illuminate/observation/http.py
def __repr__(self):
    """
    SplashObservation's __repr__ method.

    :return: String representation of an instance
    """
    return f'SplashObservation("{self.url}",callback="{self._callback}")'

observe(self, configuration: dict, *args, **kwargs) -> Union[None, Result] async

Requests data from HTTP server and renders response with Splash, passes response object to a callback and returns None or Result.

:param configuration: HTTP configuration dict from settings.py :return: None or Result

Source code in illuminate/observation/http.py
async def observe(
    self, configuration: dict, *args, **kwargs
) -> Union[None, Result]:
    """
    Requests data from HTTP server and renders response with Splash, passes
    response object to a callback and returns None or Result.

    :param configuration: HTTP configuration dict from settings.py
    :return: None or Result
    """
    try:
        response = await httpclient.AsyncHTTPClient().fetch(
            self.service, **configuration
        )
        logger.info(f"{self}.observe() -> {response}")
        return self._callback(response, *args, **kwargs)
    except Exception as exception:
        logger.warning(f"{self}.observe() -> {exception}")
        return None

illuminate.observer.finding.Finding (IFinding)

Finding class, contains data that will be passed to Adapter object's adapt method.

Source code in illuminate/observer/finding.py
class Finding(IFinding):
    """
    Finding class, contains data that will be passed to Adapter object's
    adapt method.
    """

illuminate.adapter.adapter.Adapter (IAdapter)

Adapter class, generates Exporter and Observation objects using Finding instances with optional transformation. Class must be inherited and method adapt must be implemented in a child class.

Source code in illuminate/adapter/adapter.py
class Adapter(IAdapter):
    """
    Adapter class, generates Exporter and Observation objects using Finding
    instances with optional transformation. Class must be inherited and method
    adapt must be implemented in a child class.
    """

    priority: int
    """
    Place in Adapter list. If two Adapters have the same Finding in subscriber
    tuple, one with the higher priority will call method adapt first on
    Finding object.
    """

    subscribers: tuple[Type[Finding]]
    """
    Tuple of Finding class children used to determent if Adapter object should
    call method adapt on Finding instance.
    """

    def __init__(self, manager: Optional[Manager] = None):
        """
        Adapter's __init__ method.

        :param manager: Manager object
        """
        self.manager = manager

    async def adapt(
        self, finding: Finding, *args, **kwargs
    ) -> AsyncGenerator[Union[Exporter, Observation], None]:
        """
        Generates Exporter and Observation objects. Must be implemented in a
        child class.

        It is meant to be a scope where additional transformation up on
        finding objects should be performed (like data enrichment from
        additional data source) before yielding Exporter or Observation
        instances.

        :param finding: Finding object
        :return: Async Exporter and Observation object generator
        :raises BasicAdapterException:
        """
        raise BasicAdapterException(
            "Method adapt must be implemented in child class"
        )

__init__(self, manager: Optional[Manager] = None) special

Adapter's init method.

:param manager: Manager object

Source code in illuminate/adapter/adapter.py
def __init__(self, manager: Optional[Manager] = None):
    """
    Adapter's __init__ method.

    :param manager: Manager object
    """
    self.manager = manager

adapt(self, finding: Finding, *args, **kwargs) -> AsyncGenerator[Union[Exporter, Observation], None] async

Generates Exporter and Observation objects. Must be implemented in a child class.

It is meant to be a scope where additional transformation up on finding objects should be performed (like data enrichment from additional data source) before yielding Exporter or Observation instances.

:param finding: Finding object :return: Async Exporter and Observation object generator :raises BasicAdapterException:

Source code in illuminate/adapter/adapter.py
async def adapt(
    self, finding: Finding, *args, **kwargs
) -> AsyncGenerator[Union[Exporter, Observation], None]:
    """
    Generates Exporter and Observation objects. Must be implemented in a
    child class.

    It is meant to be a scope where additional transformation up on
    finding objects should be performed (like data enrichment from
    additional data source) before yielding Exporter or Observation
    instances.

    :param finding: Finding object
    :return: Async Exporter and Observation object generator
    :raises BasicAdapterException:
    """
    raise BasicAdapterException(
        "Method adapt must be implemented in child class"
    )

illuminate.exporter.exporter.Exporter (IExporter)

Exporter class, writes data to destination. Class must be inherited and method export must be implemented in a child class.

Source code in illuminate/exporter/exporter.py
class Exporter(IExporter):
    """
    Exporter class, writes data to destination. Class must be inherited and
    method export must be implemented in a child class.
    """

    async def export(self, *args, **kwargs):
        """
        Writes data to destination. Must be implemented in a child class.

        :return: None
        :raises BasicExporterException:
        """
        raise BasicExporterException(
            "Method export must be implemented in child class"
        )

export(self, *args, **kwargs) async

Writes data to destination. Must be implemented in a child class.

:return: None :raises BasicExporterException:

Source code in illuminate/exporter/exporter.py
async def export(self, *args, **kwargs):
    """
    Writes data to destination. Must be implemented in a child class.

    :return: None
    :raises BasicExporterException:
    """
    raise BasicExporterException(
        "Method export must be implemented in child class"
    )

illuminate.exporter.influxdb.InfluxDBExporter (Exporter)

InfluxDBExporter class, writes data to InfluxDB database asynchronously. Inherits Exporter class and implements export method.

Each InfluxDBExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.

Constructor kwargs will be passed to client write method. For more information on write method, visit: https://aioinflux.readthedocs.io/en/stable/api.html

Supported write data objects: - DataFrame with DatetimeIndex - Dict containing the keys: measurement, time, tags, fields - String (str or bytes) properly formatted in InfluxDB’s line protocol - An iterable of one of the above

Source code in illuminate/exporter/influxdb.py
class InfluxDBExporter(Exporter):
    """
    InfluxDBExporter class, writes data to InfluxDB database asynchronously.
    Inherits Exporter class and implements export method.

    Each InfluxDBExporter object is responsible for a single transaction with a
    single database. Attribute name is used to acquire database session object
    from Manager's sessions attribute.

    Constructor kwargs will be passed to client write method. For more
    information on write method, visit:
    https://aioinflux.readthedocs.io/en/stable/api.html

    Supported write data objects:
        - DataFrame with DatetimeIndex
        - Dict containing the keys: measurement, time, tags,
        fields
        - String (str or bytes) properly formatted in InfluxDB’s line protocol
        - An iterable of one of the above

    """

    name: str
    """
    InfluxDB database name selector used to get database session object from
    Manager.sessions attribute.
    """

    def __init__(
        self,
        points: Union[
            Union[DataFrame, dict, str, bytes],
            Iterable[Union[DataFrame, dict, str, bytes]],
        ],
        *args,
        **kwargs,
    ):
        """
        InfluxDBExporter's __init__ method.

        :param points: Supported data objects
        """
        self.params = kwargs
        self.points = points

    async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
        """
        Writes data to Influxdb asynchronously.

        :param session: InfluxDBClient object
        :return: None
        :raises BasicExporterException:
        """
        try:
            await session.write(self.points, **self.params)
        except Exception as exception:
            logger.warning(
                f'{self}.export(session="{session}") -> {exception}'
            )
            raise BasicExporterException
        logger.success(f'{self}.export(session="{session}")')

    def __repr__(self):
        """
        InfluxDBExporter's __repr__ method.

        :return: String representation of an instance
        """
        return f"InfluxDBExporter(points={self.points})"

__init__(self, points: Union[Union[DataFrame, dict, str, bytes], Iterable[Union[DataFrame, dict, str, bytes]]], *args, **kwargs) special

InfluxDBExporter's init method.

:param points: Supported data objects

Source code in illuminate/exporter/influxdb.py
def __init__(
    self,
    points: Union[
        Union[DataFrame, dict, str, bytes],
        Iterable[Union[DataFrame, dict, str, bytes]],
    ],
    *args,
    **kwargs,
):
    """
    InfluxDBExporter's __init__ method.

    :param points: Supported data objects
    """
    self.params = kwargs
    self.points = points

__repr__(self) special

InfluxDBExporter's repr method.

:return: String representation of an instance

Source code in illuminate/exporter/influxdb.py
def __repr__(self):
    """
    InfluxDBExporter's __repr__ method.

    :return: String representation of an instance
    """
    return f"InfluxDBExporter(points={self.points})"

export(self, session: InfluxDBClient, *args, **kwargs) -> None async

Writes data to Influxdb asynchronously.

:param session: InfluxDBClient object :return: None :raises BasicExporterException:

Source code in illuminate/exporter/influxdb.py
async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
    """
    Writes data to Influxdb asynchronously.

    :param session: InfluxDBClient object
    :return: None
    :raises BasicExporterException:
    """
    try:
        await session.write(self.points, **self.params)
    except Exception as exception:
        logger.warning(
            f'{self}.export(session="{session}") -> {exception}'
        )
        raise BasicExporterException
    logger.success(f'{self}.export(session="{session}")')

illuminate.exporter.sql.SQLExporter (Exporter)

SQLExporter class, writes data to SQL database asynchronously. Inherits Exporter class and implements export method.

Each SQLExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.

Supported dialects: - Mysql - Postgres

Source code in illuminate/exporter/sql.py
class SQLExporter(Exporter):
    """
    SQLExporter class, writes data to SQL database asynchronously. Inherits
    Exporter class and implements export method.

    Each SQLExporter object is responsible for a single transaction with a
    single database. Attribute name is used to acquire database session object
    from Manager's sessions attribute.

    Supported dialects:
        - Mysql
        - Postgres
    """

    name: str
    """
    SQL database name selector used to get database session object from
    Manager.sessions attribute.
    """

    def __init__(self, models: Union[list[M], tuple[M]]):
        """
        SQLExporter's __init__ method.

        :param models: SQLAlchemy model objects collection
        """
        self.models = models

    async def export(
        self, session: Type[AsyncSession], *args, **kwargs
    ) -> None:
        """
        Writes data to SQL database asynchronously.

        :param session: AsyncSession object
        :return: None
        :raises BasicExporterException:
        """
        async with session() as session:  # type: ignore
            async with session.begin():  # type: ignore
                session.add_all(self.models)  # type: ignore
                try:
                    await session.commit()  # type: ignore
                except Exception as exception:
                    logger.warning(
                        f'{self}.export(session="{session}") -> {exception}'
                    )
                    raise BasicExporterException
        logger.success(f'{self}.export(session="{session}")')

    def __repr__(self):
        """
        SQLExporter's __repr__ method.

        :return: String representation of an instance
        """
        return f"SQLExporter(models={self.models})"

__init__(self, models: Union[list[M], tuple[M]]) special

SQLExporter's init method.

:param models: SQLAlchemy model objects collection

Source code in illuminate/exporter/sql.py
def __init__(self, models: Union[list[M], tuple[M]]):
    """
    SQLExporter's __init__ method.

    :param models: SQLAlchemy model objects collection
    """
    self.models = models

__repr__(self) special

SQLExporter's repr method.

:return: String representation of an instance

Source code in illuminate/exporter/sql.py
def __repr__(self):
    """
    SQLExporter's __repr__ method.

    :return: String representation of an instance
    """
    return f"SQLExporter(models={self.models})"

export(self, session: Type[AsyncSession], *args, **kwargs) -> None async

Writes data to SQL database asynchronously.

:param session: AsyncSession object :return: None :raises BasicExporterException:

Source code in illuminate/exporter/sql.py
async def export(
    self, session: Type[AsyncSession], *args, **kwargs
) -> None:
    """
    Writes data to SQL database asynchronously.

    :param session: AsyncSession object
    :return: None
    :raises BasicExporterException:
    """
    async with session() as session:  # type: ignore
        async with session.begin():  # type: ignore
            session.add_all(self.models)  # type: ignore
            try:
                await session.commit()  # type: ignore
            except Exception as exception:
                logger.warning(
                    f'{self}.export(session="{session}") -> {exception}'
                )
                raise BasicExporterException
    logger.success(f'{self}.export(session="{session}")')