Support Classes
illuminate.observation.observation.Observation (IObservation)
Observation class, reads data from the source. Class must be inherited and method observe must be implemented in a child class.
Source code in illuminate/observation/observation.py
class Observation(IObservation):
"""
Observation class, reads data from the source. Class must be inherited and
method observe must be implemented in a child class.
"""
def __hash__(self):
"""
Observation object hash value.
:return: None
:raises BasicObservationException:
"""
raise BasicObservationException(
"Property hash must be implemented in child class"
)
def __init__(self, url: Any, xcom: Optional[Any] = None):
"""
Observation's __init__ method.
:param url: Data's URL
:param xcom: Cross communication object
"""
self.url = url
self.xcom = xcom
async def observe(self, *args, **kwargs):
"""
Reads data from the source. Must be implemented in a child class.
:return: None
:raises BasicObservationException:
"""
raise BasicObservationException(
"Method observe must be implemented in child class"
)
__hash__(self)
special
Observation object hash value.
:return: None :raises BasicObservationException:
__init__(self, url: Any, xcom: Optional[Any] = None)
special
Observation's init method.
:param url: Data's URL :param xcom: Cross communication object
observe(self, *args, **kwargs)
async
Reads data from the source. Must be implemented in a child class.
:return: None :raises BasicObservationException:
Source code in illuminate/observation/observation.py
illuminate.observation.file.FileObservation (Observation)
FileObservation class, reads file content asynchronously. Inherits Observation class and implements observe method.
Source code in illuminate/observation/file.py
class FileObservation(Observation):
"""
FileObservation class, reads file content asynchronously. Inherits
Observation class and implements observe method.
"""
def __hash__(self) -> int:
"""
FileObservation object hash value.
:return: int
"""
return hash(self.url)
def __init__(
self,
url: str,
/,
callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
FileObservation's __init__ method.
:param url: File path
:param callback: Async function/method that manipulates
FileIOWrapperBase object and returns Result
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._callback = callback
@asynccontextmanager
async def observe(
self, *args, **kwargs
) -> AsyncIterator[Union[None, Result]]:
"""
Opens IO file stream asynchronously, pass stream object to a callback
and returns None or Result as a context manager.
:return: AsyncIterator with None or Result
"""
_file = None
_items = None
async with AsyncExitStack() as stack:
try:
_file = await stack.enter_async_context(
async_open(self.url, "r")
)
logger.info(f"{self}.observe() -> {_file}")
_items = self._callback(_file, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
finally:
yield _items
if _file:
await _file.close()
def __repr__(self):
"""
FileObservation's __repr__ method.
:return: String representation of an instance
"""
return f'FileObservation("{self.url}",callback="{self._callback}")'
__hash__(self) -> int
special
__init__(/, self, url: str, callback: Callable[[FileIOWrapperBase, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)
special
FileObservation's init method.
:param url: File path :param callback: Async function/method that manipulates FileIOWrapperBase object and returns Result :param xcom: Cross communication object
Source code in illuminate/observation/file.py
def __init__(
self,
url: str,
/,
callback: Callable[[FileIOWrapperBase, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
FileObservation's __init__ method.
:param url: File path
:param callback: Async function/method that manipulates
FileIOWrapperBase object and returns Result
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._callback = callback
__repr__(self)
special
observe(self, *args, **kwargs) -> AsyncIterator[Union[None, Result]]
Opens IO file stream asynchronously, pass stream object to a callback and returns None or Result as a context manager.
:return: AsyncIterator with None or Result
Source code in illuminate/observation/file.py
@asynccontextmanager
async def observe(
self, *args, **kwargs
) -> AsyncIterator[Union[None, Result]]:
"""
Opens IO file stream asynchronously, pass stream object to a callback
and returns None or Result as a context manager.
:return: AsyncIterator with None or Result
"""
_file = None
_items = None
async with AsyncExitStack() as stack:
try:
_file = await stack.enter_async_context(
async_open(self.url, "r")
)
logger.info(f"{self}.observe() -> {_file}")
_items = self._callback(_file, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
finally:
yield _items
if _file:
await _file.close()
illuminate.observation.http.HTTPObservation (Observation)
HTTPObservation class, reads data from HTTP server asynchronously. Inherits Observation class and implements observe method.
Source code in illuminate/observation/http.py
class HTTPObservation(Observation):
"""
HTTPObservation class, reads data from HTTP server asynchronously. Inherits
Observation class and implements observe method.
"""
def __hash__(self) -> int:
"""
HTTPObservation object hash value.
:return: int
"""
body = self.configuration.get("body")
method = self.configuration.get("method")
return hash(f"{method}|{self.url}|:{body}")
def __init__(
self,
url: str,
/,
allowed: Union[list[str], tuple[str]],
callback: Callable[[HTTPResponse, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
HTTPObservation's __init__ method.
:param url: Data's HTTP URL
:param allowed: Collection of strings evaluated against self.url to
determent if URL is allowed
:param callback: Async function/method that manipulates HTTPResponse
object and returns Result.
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._allowed = allowed
self._callback = callback
self.configuration = kwargs
@property
def allowed(self) -> bool:
"""
Checks if HTTP URL is allowed to be requested.
:return: bool
"""
for allowed in self._allowed:
if self.url.startswith(allowed):
return True
return False
async def observe(self, *args, **kwargs) -> Union[None, Result]:
"""
Requests data from HTTP server, passes response object to a callback
and returns None or Result.
:return: None or Result
"""
try:
response = await httpclient.AsyncHTTPClient().fetch(
self.url, **self.configuration
)
logger.info(f"{self}.observe() -> {response}")
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
def __repr__(self):
"""
HTTPObservation's __repr__ method.
:return: String representation of an instance
"""
return f'HTTPObservation("{self.url}",callback="{self._callback}")'
allowed: bool
property
readonly
Checks if HTTP URL is allowed to be requested.
:return: bool
__hash__(self) -> int
special
HTTPObservation object hash value.
:return: int
__init__(/, self, url: str, allowed: Union[list[str], tuple[str]], callback: Callable[[HTTPResponse, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)
special
HTTPObservation's init method.
:param url: Data's HTTP URL :param allowed: Collection of strings evaluated against self.url to determent if URL is allowed :param callback: Async function/method that manipulates HTTPResponse object and returns Result. :param xcom: Cross communication object
Source code in illuminate/observation/http.py
def __init__(
self,
url: str,
/,
allowed: Union[list[str], tuple[str]],
callback: Callable[[HTTPResponse, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
HTTPObservation's __init__ method.
:param url: Data's HTTP URL
:param allowed: Collection of strings evaluated against self.url to
determent if URL is allowed
:param callback: Async function/method that manipulates HTTPResponse
object and returns Result.
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._allowed = allowed
self._callback = callback
self.configuration = kwargs
__repr__(self)
special
observe(self, *args, **kwargs) -> Union[None, Result]
async
Requests data from HTTP server, passes response object to a callback and returns None or Result.
:return: None or Result
Source code in illuminate/observation/http.py
async def observe(self, *args, **kwargs) -> Union[None, Result]:
"""
Requests data from HTTP server, passes response object to a callback
and returns None or Result.
:return: None or Result
"""
try:
response = await httpclient.AsyncHTTPClient().fetch(
self.url, **self.configuration
)
logger.info(f"{self}.observe() -> {response}")
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
illuminate.observation.sql.SQLObservation (Observation)
SQLObservation class, reads data from database asynchronously. Inherits Observation class and implements observe method.
Source code in illuminate/observation/sql.py
class SQLObservation(Observation):
"""
SQLObservation class, reads data from database asynchronously. Inherits
Observation class and implements observe method.
"""
def __hash__(self) -> int:
"""
SQLObservation object hash value.
:return: int
"""
query = str(self.query)
return hash(f"{self.url}|:{query}")
def __init__(
self,
query: Union[Select, TextClause],
url: str,
/,
callback: Callable[[AlchemyResult, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
SQLObservation's __init__ method.
:param query: SQLAlchemy query object.
:param url: Database name in project settings.
:param callback: Async function/method that manipulates AlchemyResult
object and returns Result.
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._callback = callback
self.query = query
async def observe(
self, session: Type[AsyncSession], *args, **kwargs
) -> Union[None, Result]:
"""
Reads data from database, passes response object to a callback
and returns None or Result.
:return: None or Result
"""
try:
async with session() as session: # type: ignore
async with session.begin(): # type: ignore
query = self.query
response = await session.execute(query) # type: ignore
logger.info(f'{self}.observe(session="{session}")')
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
def __repr__(self):
"""
SQLObservation's __repr__ method.
:return: String representation of an instance
"""
return (
f'SQLObservation("{self.query}","{self.url}",'
f'callback="{self._callback}")'
)
__hash__(self) -> int
special
__init__(/, self, query: Union[Select, TextClause], url: str, callback: Callable[[AlchemyResult, tuple, dict], Result], xcom: Optional[Any] = None, *args, **kwargs)
special
SQLObservation's init method.
:param query: SQLAlchemy query object. :param url: Database name in project settings. :param callback: Async function/method that manipulates AlchemyResult object and returns Result. :param xcom: Cross communication object
Source code in illuminate/observation/sql.py
def __init__(
self,
query: Union[Select, TextClause],
url: str,
/,
callback: Callable[[AlchemyResult, tuple, dict], Result],
xcom: Optional[Any] = None,
*args,
**kwargs,
):
"""
SQLObservation's __init__ method.
:param query: SQLAlchemy query object.
:param url: Database name in project settings.
:param callback: Async function/method that manipulates AlchemyResult
object and returns Result.
:param xcom: Cross communication object
"""
super().__init__(url, xcom=xcom)
self._callback = callback
self.query = query
__repr__(self)
special
SQLObservation's repr method.
:return: String representation of an instance
observe(self, session: Type[AsyncSession], *args, **kwargs) -> Union[None, Result]
async
Reads data from database, passes response object to a callback and returns None or Result.
:return: None or Result
Source code in illuminate/observation/sql.py
async def observe(
self, session: Type[AsyncSession], *args, **kwargs
) -> Union[None, Result]:
"""
Reads data from database, passes response object to a callback
and returns None or Result.
:return: None or Result
"""
try:
async with session() as session: # type: ignore
async with session.begin(): # type: ignore
query = self.query
response = await session.execute(query) # type: ignore
logger.info(f'{self}.observe(session="{session}")')
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
illuminate.observation.http.SplashObservation (HTTPObservation)
SplashObservation class, reads data from HTTP server asynchronously. Inherits HTTPObservation class and implements observe method.
Constructor's kwargs are used to create Splash service URL. For full list of parameters visit https://splash.readthedocs.io/en/stable/api.html.
Note: URL is passed as positional argument. It will be used as param in Splash service URL.
Source code in illuminate/observation/http.py
class SplashObservation(HTTPObservation):
"""
SplashObservation class, reads data from HTTP server asynchronously.
Inherits HTTPObservation class and implements observe method.
Constructor's kwargs are used to create Splash service URL. For full list
of parameters visit https://splash.readthedocs.io/en/stable/api.html.
Note: URL is passed as positional argument. It will be used as param
in Splash service URL.
"""
def __hash__(self) -> int:
"""
SplashObservation object hash value.
:return: int
"""
return hash(self.service)
@property
def service(self):
"""
Constructs URL of Splash service
:return: Splash URL string
"""
defaults: dict = {
"host": "localhost",
"port": 8050,
"protocol": "http",
"render": "html",
}
parameters = copy(self.configuration)
for i in defaults:
if i in parameters:
defaults[i] = parameters[i]
del parameters[i]
parameters["url"] = self.url
endpoint = "{protocol}://{host}:{port}/render.{render}?"
endpoint = endpoint.format(**defaults)
parameters = urllib.parse.urlencode(parameters)
return f"{endpoint.format(**defaults)}{parameters}"
async def observe(
self, configuration: dict, *args, **kwargs
) -> Union[None, Result]:
"""
Requests data from HTTP server and renders response with Splash, passes
response object to a callback and returns None or Result.
:param configuration: HTTP configuration dict from settings.py
:return: None or Result
"""
try:
response = await httpclient.AsyncHTTPClient().fetch(
self.service, **configuration
)
logger.info(f"{self}.observe() -> {response}")
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
def __repr__(self):
"""
SplashObservation's __repr__ method.
:return: String representation of an instance
"""
return f'SplashObservation("{self.url}",callback="{self._callback}")'
service
property
readonly
Constructs URL of Splash service
:return: Splash URL string
__hash__(self) -> int
special
__repr__(self)
special
SplashObservation's repr method.
:return: String representation of an instance
observe(self, configuration: dict, *args, **kwargs) -> Union[None, Result]
async
Requests data from HTTP server and renders response with Splash, passes response object to a callback and returns None or Result.
:param configuration: HTTP configuration dict from settings.py :return: None or Result
Source code in illuminate/observation/http.py
async def observe(
self, configuration: dict, *args, **kwargs
) -> Union[None, Result]:
"""
Requests data from HTTP server and renders response with Splash, passes
response object to a callback and returns None or Result.
:param configuration: HTTP configuration dict from settings.py
:return: None or Result
"""
try:
response = await httpclient.AsyncHTTPClient().fetch(
self.service, **configuration
)
logger.info(f"{self}.observe() -> {response}")
return self._callback(response, *args, **kwargs)
except Exception as exception:
logger.warning(f"{self}.observe() -> {exception}")
return None
illuminate.observer.finding.Finding (IFinding)
illuminate.adapter.adapter.Adapter (IAdapter)
Adapter class, generates Exporter and Observation objects using Finding instances with optional transformation. Class must be inherited and method adapt must be implemented in a child class.
Source code in illuminate/adapter/adapter.py
class Adapter(IAdapter):
"""
Adapter class, generates Exporter and Observation objects using Finding
instances with optional transformation. Class must be inherited and method
adapt must be implemented in a child class.
"""
priority: int
"""
Place in Adapter list. If two Adapters have the same Finding in subscriber
tuple, one with the higher priority will call method adapt first on
Finding object.
"""
subscribers: tuple[Type[Finding]]
"""
Tuple of Finding class children used to determent if Adapter object should
call method adapt on Finding instance.
"""
def __init__(self, manager: Optional[Manager] = None):
"""
Adapter's __init__ method.
:param manager: Manager object
"""
self.manager = manager
async def adapt(
self, finding: Finding, *args, **kwargs
) -> AsyncGenerator[Union[Exporter, Observation], None]:
"""
Generates Exporter and Observation objects. Must be implemented in a
child class.
It is meant to be a scope where additional transformation up on
finding objects should be performed (like data enrichment from
additional data source) before yielding Exporter or Observation
instances.
:param finding: Finding object
:return: Async Exporter and Observation object generator
:raises BasicAdapterException:
"""
raise BasicAdapterException(
"Method adapt must be implemented in child class"
)
__init__(self, manager: Optional[Manager] = None)
special
adapt(self, finding: Finding, *args, **kwargs) -> AsyncGenerator[Union[Exporter, Observation], None]
async
Generates Exporter and Observation objects. Must be implemented in a child class.
It is meant to be a scope where additional transformation up on finding objects should be performed (like data enrichment from additional data source) before yielding Exporter or Observation instances.
:param finding: Finding object :return: Async Exporter and Observation object generator :raises BasicAdapterException:
Source code in illuminate/adapter/adapter.py
async def adapt(
self, finding: Finding, *args, **kwargs
) -> AsyncGenerator[Union[Exporter, Observation], None]:
"""
Generates Exporter and Observation objects. Must be implemented in a
child class.
It is meant to be a scope where additional transformation up on
finding objects should be performed (like data enrichment from
additional data source) before yielding Exporter or Observation
instances.
:param finding: Finding object
:return: Async Exporter and Observation object generator
:raises BasicAdapterException:
"""
raise BasicAdapterException(
"Method adapt must be implemented in child class"
)
illuminate.exporter.exporter.Exporter (IExporter)
Exporter class, writes data to destination. Class must be inherited and method export must be implemented in a child class.
Source code in illuminate/exporter/exporter.py
class Exporter(IExporter):
"""
Exporter class, writes data to destination. Class must be inherited and
method export must be implemented in a child class.
"""
async def export(self, *args, **kwargs):
"""
Writes data to destination. Must be implemented in a child class.
:return: None
:raises BasicExporterException:
"""
raise BasicExporterException(
"Method export must be implemented in child class"
)
export(self, *args, **kwargs)
async
Writes data to destination. Must be implemented in a child class.
:return: None :raises BasicExporterException:
illuminate.exporter.influxdb.InfluxDBExporter (Exporter)
InfluxDBExporter class, writes data to InfluxDB database asynchronously. Inherits Exporter class and implements export method.
Each InfluxDBExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.
Constructor kwargs will be passed to client write method. For more information on write method, visit: https://aioinflux.readthedocs.io/en/stable/api.html
Supported write data objects: - DataFrame with DatetimeIndex - Dict containing the keys: measurement, time, tags, fields - String (str or bytes) properly formatted in InfluxDB’s line protocol - An iterable of one of the above
Source code in illuminate/exporter/influxdb.py
class InfluxDBExporter(Exporter):
"""
InfluxDBExporter class, writes data to InfluxDB database asynchronously.
Inherits Exporter class and implements export method.
Each InfluxDBExporter object is responsible for a single transaction with a
single database. Attribute name is used to acquire database session object
from Manager's sessions attribute.
Constructor kwargs will be passed to client write method. For more
information on write method, visit:
https://aioinflux.readthedocs.io/en/stable/api.html
Supported write data objects:
- DataFrame with DatetimeIndex
- Dict containing the keys: measurement, time, tags,
fields
- String (str or bytes) properly formatted in InfluxDB’s line protocol
- An iterable of one of the above
"""
name: str
"""
InfluxDB database name selector used to get database session object from
Manager.sessions attribute.
"""
def __init__(
self,
points: Union[
Union[DataFrame, dict, str, bytes],
Iterable[Union[DataFrame, dict, str, bytes]],
],
*args,
**kwargs,
):
"""
InfluxDBExporter's __init__ method.
:param points: Supported data objects
"""
self.params = kwargs
self.points = points
async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
"""
Writes data to Influxdb asynchronously.
:param session: InfluxDBClient object
:return: None
:raises BasicExporterException:
"""
try:
await session.write(self.points, **self.params)
except Exception as exception:
logger.warning(
f'{self}.export(session="{session}") -> {exception}'
)
raise BasicExporterException
logger.success(f'{self}.export(session="{session}")')
def __repr__(self):
"""
InfluxDBExporter's __repr__ method.
:return: String representation of an instance
"""
return f"InfluxDBExporter(points={self.points})"
__init__(self, points: Union[Union[DataFrame, dict, str, bytes], Iterable[Union[DataFrame, dict, str, bytes]]], *args, **kwargs)
special
InfluxDBExporter's init method.
:param points: Supported data objects
Source code in illuminate/exporter/influxdb.py
__repr__(self)
special
export(self, session: InfluxDBClient, *args, **kwargs) -> None
async
Writes data to Influxdb asynchronously.
:param session: InfluxDBClient object :return: None :raises BasicExporterException:
Source code in illuminate/exporter/influxdb.py
async def export(self, session: InfluxDBClient, *args, **kwargs) -> None:
"""
Writes data to Influxdb asynchronously.
:param session: InfluxDBClient object
:return: None
:raises BasicExporterException:
"""
try:
await session.write(self.points, **self.params)
except Exception as exception:
logger.warning(
f'{self}.export(session="{session}") -> {exception}'
)
raise BasicExporterException
logger.success(f'{self}.export(session="{session}")')
illuminate.exporter.sql.SQLExporter (Exporter)
SQLExporter class, writes data to SQL database asynchronously. Inherits Exporter class and implements export method.
Each SQLExporter object is responsible for a single transaction with a single database. Attribute name is used to acquire database session object from Manager's sessions attribute.
Supported dialects: - Mysql - Postgres
Source code in illuminate/exporter/sql.py
class SQLExporter(Exporter):
"""
SQLExporter class, writes data to SQL database asynchronously. Inherits
Exporter class and implements export method.
Each SQLExporter object is responsible for a single transaction with a
single database. Attribute name is used to acquire database session object
from Manager's sessions attribute.
Supported dialects:
- Mysql
- Postgres
"""
name: str
"""
SQL database name selector used to get database session object from
Manager.sessions attribute.
"""
def __init__(self, models: Union[list[M], tuple[M]]):
"""
SQLExporter's __init__ method.
:param models: SQLAlchemy model objects collection
"""
self.models = models
async def export(
self, session: Type[AsyncSession], *args, **kwargs
) -> None:
"""
Writes data to SQL database asynchronously.
:param session: AsyncSession object
:return: None
:raises BasicExporterException:
"""
async with session() as session: # type: ignore
async with session.begin(): # type: ignore
session.add_all(self.models) # type: ignore
try:
await session.commit() # type: ignore
except Exception as exception:
logger.warning(
f'{self}.export(session="{session}") -> {exception}'
)
raise BasicExporterException
logger.success(f'{self}.export(session="{session}")')
def __repr__(self):
"""
SQLExporter's __repr__ method.
:return: String representation of an instance
"""
return f"SQLExporter(models={self.models})"
__init__(self, models: Union[list[M], tuple[M]])
special
__repr__(self)
special
export(self, session: Type[AsyncSession], *args, **kwargs) -> None
async
Writes data to SQL database asynchronously.
:param session: AsyncSession object :return: None :raises BasicExporterException:
Source code in illuminate/exporter/sql.py
async def export(
self, session: Type[AsyncSession], *args, **kwargs
) -> None:
"""
Writes data to SQL database asynchronously.
:param session: AsyncSession object
:return: None
:raises BasicExporterException:
"""
async with session() as session: # type: ignore
async with session.begin(): # type: ignore
session.add_all(self.models) # type: ignore
try:
await session.commit() # type: ignore
except Exception as exception:
logger.warning(
f'{self}.export(session="{session}") -> {exception}'
)
raise BasicExporterException
logger.success(f'{self}.export(session="{session}")')