Skip to content

Platform API

Command

Bases: Enum

Command the platform should execute.

Source code in sramplatform/packet.py
class Command(Enum):
    """Command the platform should execute.

    The value of each member is the integer code a ``Packet`` stores in
    its ``command`` field.
    """

    # Packet has been received correctly.
    ACK = 1
    # Identify devices in a chain.
    PING = 2
    # Read a region of memory.
    READ = 3
    # Write to a region of memory.
    WRITE = 4
    # Read the sensors of a device.
    SENSORS = 5
    # Load custom code in a device.
    LOAD = 6
    # Execute custom code on a device.
    EXEC = 7
    # Receive results from executing code.
    RETR = 8
    # Error when receiving a packet.
    ERR = 255

Packet

Packet used for the communication protocol.

Attributes:

Name Type Description
command

Command the platform should execute.

pic

Position In Chain of the device.

options

Metadata for the packet.

uid

UID of the device.

data

Actual data of the packet.

bytes

Bytes representation of the packet.

checksum Optional[int]

CRC of the packet for integrity.

Source code in sramplatform/packet.py
class Packet:
    """Packet used for the communication protocol.

    A packet is serialized with the ``struct`` format
    ``<BBI25s{data_size}BH``: command, pic, options, a 25-byte uid,
    ``data_size`` data bytes and the checksum, in that order.

    Attributes:
        command: Command the platform should execute.
        pic: Position In Chain of the device.
        options: Metadata for the packet.
        uid: UID of the device.
        data: Actual data of the packet.
        bytes: Bytes representation of the packet.
        checksum: CRC of the packet for integrity.
    """

    def __init__(self, data_size: int):
        self.data_size = data_size
        # Little-endian, unpadded layout: command (B), pic (B), options (I),
        # uid (25s), ``data_size`` data bytes (B each) and checksum (H).
        self.bytes_fmt = f"<BBI25s{self.data_size}BH"
        self.size = struct.calcsize(self.bytes_fmt)

        self.__command: int = Command.PING.value
        self.__pic: int = 0
        self.__options: int = 0x0
        # Holds ``bytes`` instead of ``str`` when the packet was parsed
        # with ``from_bytes``.
        self.__uid: Union[str, bytes] = "0" * 25
        self.__data: List[int] = [0x7] * self.data_size
        self.checksum: Optional[int] = None
        self.__bytes: Optional[bytes] = None

    @classmethod
    def full_size(cls, data_size: int):
        """Return the serialized size of a packet with ``data_size`` data bytes."""
        packet = cls(data_size)
        return packet.size

    def _command_name(self) -> str:
        """Return the ``Command`` name, or a placeholder for unknown codes."""
        try:
            return Command(self.__command).name
        except ValueError:
            # ``with_command`` accepts raw integers, so the stored code is not
            # guaranteed to be a ``Command`` member; printing must not fail.
            return f"UNKNOWN({self.__command})"

    def _summary(self) -> str:
        """Build the text shared by ``__str__`` and ``__repr__``."""
        checksum = self.checksum or 0
        return (
            f"Packet {self._command_name()} "
            f"{self.__pic:03d}:{format_uid(self.__uid)} "
            f"[0x{self.__options:04X}] "
            f"CRC(0x{checksum:04X})"
        )

    def _pack(self, checksum: int) -> bytes:
        """Serialize the current fields using ``checksum`` as the last field."""
        if isinstance(self.__uid, bytes):
            uid = self.__uid
        else:
            uid = bytes(self.__uid, "utf-8")
        return struct.pack(
            self.bytes_fmt,
            self.__command,
            self.__pic,
            self.__options,
            uid,
            *self.__data,
            checksum,
        )

    def __str__(self):
        return f"<{self._summary()}>"

    def __repr__(self):
        return f"<{self._summary()} {bytes(self.__data)}>"

    @property
    def command(self):
        "Getter for command"
        return self.__command

    @property
    def pic(self):
        "Getter for pic"
        return self.__pic

    @property
    def options(self):
        "Getter for options"
        return self.__options

    @property
    def uid(self):
        "Getter for uid"
        return self.__uid

    @property
    def data(self):
        "Getter for data"
        return self.__data

    def with_command(self, command: Union[int, Command]):
        """Set the command of the packet.

        Args:
            command: A ``Command`` member (its value is stored) or the raw
                integer code, stored as given.
        """
        if isinstance(command, Command):
            self.__command = command.value
        else:
            self.__command = command

    def with_pic(self, pic: int):
        """Set the pic of the packet."""
        self.__pic = pic

    def with_uid(self, uid: Union[str, bytes]):
        """Set the uid of the packet.

        Args:
            uid: UID as text, or as raw bytes (as ``from_bytes`` provides).
        """
        self.__uid = uid

    def with_options(self, options: int):
        """Set the options of the packet."""
        self.__options = options

    def with_data(self, data: list[int]):
        """Set the data of the packet."""
        self.__data = data

    def with_checksum(self, checksum: int):
        """Set the checksum of the packet."""
        self.checksum = checksum

    def is_crafted(self) -> bool:
        """Check if a packet is ready to be sent.

        Returns:
            True if bytes is not None.
        """
        return self.__bytes is not None

    def craft(self):
        """Craft a packet to send.

        Calculate the checksum if it hasn't been calculated yet,
        and create the bytes representation of the packet.
        """
        if self.checksum is None:
            # The CRC is computed over the packet with a zeroed checksum field.
            self.__bytes = self._pack(0)
            self.checksum = crc16(self.__bytes)
        self.__bytes = self._pack(self.checksum)

    @classmethod
    def from_bytes(cls, data_size: int, raw_data: bytes):
        """
        Create a packet from bytes.

        The checksum found in ``raw_data`` is stored as is; use
        ``check_crc`` to validate it.

        Args:
          data_size: Number of data bytes the packet carries.
          raw_data: Bytes representing the packet.

        Raises:
          ValueError: If the length of ``raw_data`` does not match the size
            of a packet with ``data_size`` data bytes.

        Returns:
          Packet created from the bytes.
        """
        packet = cls(data_size)

        if len(raw_data) != packet.size:
            # Report the expected total size, not the payload size.
            error = f"Packet size {len(raw_data)} does not match {packet.size}"
            raise ValueError(error)
        (
            command,
            pic,
            options,
            uid,
            *data,
            checksum,
        ) = struct.unpack(packet.bytes_fmt, raw_data)

        packet.with_command(command)
        packet.with_pic(pic)
        packet.with_uid(uid)
        packet.with_options(options)
        packet.with_data(data)
        packet.with_checksum(checksum)
        packet.craft()
        return packet

    def extract_sensors(self) -> Dict:
        """Extract the values of the sensors from the data.

        The information of the sensors is stored in the following way:

        - `temp_110_cal`: Temperature calibration at 110 Celsius.
        - `temp_30_cal`: Temperature calibration at 30 Celsius.
        - `temp_raw`: Raw value of temperature.
        - `vdd_cal`: Calibration of VDD.
        - `vdd_raw`: Raw value of VDD.

        All the values are stored in 2 bytes (little-endian), so only the
        first 10 data bytes are read.

        Returns:
            Dictionary containing ``temperature`` and ``voltage``.

        Raises:
            ZeroDivisionError: If ``vdd_raw`` is 0 or both temperature
                calibration values are equal.
        """

        def calc_vdd(vdd: int, vdd_cal: int) -> float:
            """
            Calculate the working voltage.

            Args:
              vdd: Raw value from the voltage sensor.
              vdd_cal: Calibration value.

            Returns:
              The working vdd in volts.
            """
            return round((3300 * vdd_cal / vdd) * 0.001, 5)

        def calc_temp(temp: int, cal_30: int, cal_110: int) -> float:
            """
            Calculate the working temperature.

            Args:
              temp: Raw value from the temperature sensor.
              cal_30: Calibration value at 30 degrees celsius.
              cal_110: Calibration value at 110 degrees celsius.

            Returns:
              The working temperature in degrees celsius.
            """
            return round(((110 - 30) / (cal_110 - cal_30)) * (temp - cal_30) + 30.0, 5)

        temp_110_cal, temp_30_cal, temp_raw, vdd_cal, vdd_raw = struct.unpack(
            "<HHHHH", bytes(self.__data[:10])
        )
        return {
            "temperature": calc_temp(temp_raw, temp_30_cal, temp_110_cal),
            "voltage": calc_vdd(vdd_raw, vdd_cal),
        }

    def to_bytes(self) -> bytes:
        """Return the bytes representation of the packet.

        Returns:
            Bytes representation of the packet.

        Raises:
            ValueError if packet is not crafted.
        """
        if self.__bytes is None:
            raise ValueError("Packet is not crafted. Call craft() method first")
        return self.__bytes

    def check_crc(self) -> bool:
        """Check the stored checksum against the crafted bytes.

        The CRC is recomputed over the crafted bytes with the two trailing
        checksum bytes zeroed, mirroring how ``craft`` computes it.

        Returns:
            True if the recomputed CRC equals ``checksum``.

        Raises:
            ValueError: If the packet is not crafted.
        """
        if self.__bytes is None:
            raise ValueError("Packet is not crafted. Call craft() method first")
        buffer = bytearray(self.__bytes)
        buffer[-1], buffer[-2] = 0, 0
        return crc16(buffer) == self.checksum

command property

Getter for command

data property

Getter for data

options property

Getter for options

pic property

Getter for pic

uid property

Getter for uid

craft()

Craft a packet to send.

Calculate the checksum if it hasn't been calculated yet, and create the bytes representation of the packet.

Source code in sramplatform/packet.py
def craft(self):
    """Build the bytes representation of the packet.

    When no checksum has been set, the packet is first serialized with a
    zeroed checksum field and the CRC of that serialization becomes the
    checksum. The packet is then serialized with the checksum in place.
    """
    raw_uid = self.__uid
    uid = raw_uid if isinstance(raw_uid, bytes) else bytes(raw_uid, "utf-8")

    def serialize(checksum):
        # Every field except the checksum comes straight from the packet.
        return struct.pack(
            self.bytes_fmt,
            self.__command,
            self.__pic,
            self.__options,
            uid,
            *self.__data,
            checksum,
        )

    if self.checksum is None:
        self.__bytes = serialize(0)
        self.checksum = crc16(self.__bytes)
    self.__bytes = serialize(self.checksum)

extract_sensors()

Extract the values of the sensors from the data.

The information of the sensors is stored in the following way:

  • temp_110_cal: Temperature calibration at 110 Celsius.
  • temp_30_cal: Temperature calibration at 30 Celsius.
  • temp_raw: Raw value of temperature.
  • vdd_cal: Calibration of VDD.
  • vdd_raw: Raw value of VDD.

All the values are stored in 2 bytes.

Returns:

Type Description
Dict

Dictionary containing temperature and voltage.

Source code in sramplatform/packet.py
def extract_sensors(self) -> Dict:
    """Decode the sensor readings stored at the start of the data.

    The first 10 data bytes hold five little-endian 2-byte values, in
    this order:

    - `temp_110_cal`: Temperature calibration at 110 Celsius.
    - `temp_30_cal`: Temperature calibration at 30 Celsius.
    - `temp_raw`: Raw value of temperature.
    - `vdd_cal`: Calibration of VDD.
    - `vdd_raw`: Raw value of VDD.

    Returns:
        Dictionary with the keys ``temperature`` (degrees Celsius) and
        ``voltage`` (volts), both rounded to 5 decimals.
    """
    sensor_bytes = bytes(self.__data[:10])
    temp_110_cal, temp_30_cal, temp_raw, vdd_cal, vdd_raw = struct.unpack(
        "<HHHHH", sensor_bytes
    )

    # Linear interpolation between the 30 and 110 degree calibration points.
    degrees_per_count = (110 - 30) / (temp_110_cal - temp_30_cal)
    temperature = round(degrees_per_count * (temp_raw - temp_30_cal) + 30.0, 5)

    # 3300 * cal / raw is in millivolts; the 0.001 factor converts to volts.
    voltage = round((3300 * vdd_cal / vdd_raw) * 0.001, 5)

    return {"temperature": temperature, "voltage": voltage}

from_bytes(data_size, raw_data) classmethod

Create a packet from bytes.

Parameters:

Name Type Description Default
raw_data bytes

Bytes representing the packet.

required

Raises:

Type Description
ValueError

If the length of raw_data does not match the size of a packet with data_size data bytes.

Returns:

Type Description

Packet created from the bytes.

Source code in sramplatform/packet.py
@classmethod
def from_bytes(cls, data_size: int, raw_data: bytes):
    """
    Create a packet from bytes.

    The checksum found in ``raw_data`` is stored as is and is not
    validated here.

    Args:
      data_size: Number of data bytes the packet carries.
      raw_data: Bytes representing the packet.

    Raises:
      ValueError: If the length of ``raw_data`` does not match the size
        of a packet with ``data_size`` data bytes.

    Returns:
      Packet created from the bytes.
    """
    packet = cls(data_size)

    if len(raw_data) != packet.size:
        # Report the expected total size, not the payload size.
        error = f"Packet size {len(raw_data)} does not match {packet.size}"
        raise ValueError(error)
    (
        command,
        pic,
        options,
        uid,
        *data,
        checksum,
    ) = struct.unpack(packet.bytes_fmt, raw_data)

    packet.with_command(command)
    packet.with_pic(pic)
    packet.with_uid(uid)
    packet.with_options(options)
    packet.with_data(data)
    packet.with_checksum(checksum)
    packet.craft()
    return packet

is_crafted()

Check if a packet is ready to be sent.

Returns:

Type Description
bool

True if bytes is not None.

Source code in sramplatform/packet.py
def is_crafted(self) -> bool:
    """Tell whether the bytes representation has been built.

    Returns:
        False while the stored bytes are None, True otherwise.
    """
    if self.__bytes is None:
        return False
    return True

to_bytes()

Return the bytes representation of the packet.

Returns:

Type Description
bytes

Bytes representation of the packet.

Source code in sramplatform/packet.py
def to_bytes(self) -> bytes:
    """Return the bytes representation of the packet.

    Returns:
        The bytes stored by the last crafting of the packet.

    Raises:
        ValueError: If the packet has not been crafted.
    """
    crafted = self.__bytes
    if crafted is not None:
        return crafted
    raise ValueError("Packet is not crafted. Call craft() method first")

with_checksum(checksum)

Set the checksum of the packet.

Source code in sramplatform/packet.py
def with_checksum(self, checksum: int):
    """Set the checksum of the packet.

    Args:
        checksum: Value to store in the ``checksum`` attribute.
    """
    self.checksum = checksum

with_command(command)

Set the command of the packet.

Source code in sramplatform/packet.py
def with_command(self, command: Union[int, Command]):
    """Set the command of the packet.

    Args:
        command: A ``Command`` member, stored as its integer value, or
            an integer code, stored as given.
    """
    self.__command = command.value if isinstance(command, Command) else command

with_data(data)

Set the data of the packet.

Source code in sramplatform/packet.py
def with_data(self, data: list[int]):
    """Set the data of the packet.

    Args:
        data: Values to store as the packet data. The list is stored
            as is, without being copied.
    """
    self.__data = data

with_options(options)

Set the options of the packet.

Source code in sramplatform/packet.py
def with_options(self, options: int):
    """Set the options of the packet.

    Args:
        options: Value to store as the packet options.
    """
    self.__options = options

with_pic(pic)

Set the pic of the packet.

Source code in sramplatform/packet.py
def with_pic(self, pic: int):
    """Set the pic of the packet.

    Args:
        pic: Value to store as the packet pic.
    """
    self.__pic = pic

with_uid(uid)

Set the uid of the packet.

Source code in sramplatform/packet.py
def with_uid(self, uid: str):
    """Set the uid of the packet.

    Args:
        uid: UID to store. The value is stored as is, with no
            conversion or length check.
    """
    self.__uid = uid

crc16(buffer)

Calculate the CRC16 from a byte buffer.

Parameters:

Name Type Description Default
buffer bytes

Buffer of bytes.

required

Returns:

Type Description
int

The calculated CRC.

Source code in sramplatform/packet.py
def crc16(buffer: bytes) -> int:
    """Compute the table-driven CRC16 of a byte buffer.

    Args:
        buffer: Buffer of bytes.

    Returns:
        The calculated CRC; 0 for an empty buffer.
    """
    checksum = 0
    for value in buffer:
        # The low byte of (checksum XOR value) selects the CRC16_LUT entry
        # that is folded into the shifted checksum.
        lut_index = (checksum ^ value) & 0xFF
        checksum = (checksum >> 8) ^ CRC16_LUT[lut_index]
    return checksum

format_uid(uid)

Format a device UID.

If the uid is bytes, remove the null terminator.

Parameters:

Name Type Description Default
uid Union[str, bytes]

UID of the device.

required

Returns:

Type Description
str

The formatted UID as a string.

Source code in sramplatform/packet.py
def format_uid(uid: Union[str, bytes]) -> str:
    """Format a device UID as text.

    A string is returned unchanged. Bytes are decoded as ASCII and cut at
    the first null byte, which drops any null padding.

    Args:
        uid: UID of the device.

    Returns:
        The formatted UID as a string.
    """
    if not isinstance(uid, str):
        decoded = uid.decode("ascii")
        head, _, _ = decoded.partition("\x00")
        return head
    return uid

offset_to_address(offset, data_size, sram_start=536870912)

Convert an offset in memory to absolute memory address.

Parameters:

Name Type Description Default
offset int

Offset from the start of the SRAM.

required
sram_start int

Byte representing the start of the SRAM. 0x20000000 by default.

536870912

Raises:

Type Description
ValueError

If offset is negative.

Returns:

Type Description
str

Address formatted as 0xXXXXXXXX.

Source code in sramplatform/packet.py
def offset_to_address(offset: int, data_size: int, sram_start: int = 0x20000000) -> str:
    """Convert an offset in memory to an absolute memory address.

    The address is ``sram_start + offset * data_size``.

    Args:
        offset: Offset from the start of the SRAM, counted in units of
            ``data_size``.
        data_size: Multiplier applied to ``offset``.
        sram_start: Address of the start of the SRAM. 0x20000000 by default.

    Raises:
        ValueError: If offset is negative.

    Returns:
        Address formatted as 0xXXXXXXXX.
    """
    if offset < 0:
        raise ValueError("Offset cannot be negative")

    address = sram_start + (offset * data_size)
    return "0x" + format(address, "08X")

DBManager

Class to manage the communication with the database.

The default database is PostgreSQL. However, another database can be used in place by configuring the DBManager.

The manager is allowed to insert into the database any object as long as it has been registered by using TableBase. For more instructions on how to register objects to the database please see https://docs.sqlalchemy.org/en/14/orm/declarative_tables.html.

Example

from sramplatform.storage import DBManager, TableBase

class Users(TableBase):
    __tablename__ = "User"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

manager = DBManager(url)

Queries can be made by using the session attribute.

Example

manager.session.query(Users).all()

Attributes:

Name Type Description
session

Session to the DB

Parameters:

Name Type Description Default
url Union[str, DBParameters]

DBParameters instance or string containing the url to connect.

required
Source code in sramplatform/storage.py
class DBManager:
    """Class to manage the communication with the database.

    The default database is PostgreSQL. However, another database can be used in place by configuring the DBManager.

    The manager is allowed to insert into the database any object as long as it has been registered by using ``TableBase``.  For more instructions on how to register objects to the database please see `https://docs.sqlalchemy.org/en/14/orm/declarative_tables.html`.

    Example:
        >>> from sramplatform.storage import DBManager, TableBase
        >>>
        >>> class Users(TableBase):
        ...     __tablename__ = "User"
        ...     id = Column(Integer, primary_key=True)
        ...     name = Column(String, nullable=False)
        >>>
        >>> manager = DBManager(url)

    Queries can be made by using the session attribute.

    Example:
        >>> manager.session.query(Users).all()

    Attributes:
        session: Session to the DB

    Args:
        url: DBParameters instance or string containing the url to connect.
    """

    def __init__(self, url: Union[str, DBParameters]):
        # ``str(url)`` turns a DBParameters instance into its connection URL
        # and leaves a plain string untouched.
        db_engine = create_engine(str(url))

        session_factory = sessionmaker(db_engine)
        self.session = session_factory()
        # Create the tables of every model registered on TableBase.
        TableBase.metadata.create_all(db_engine)

DBParameters dataclass

Class to manage database connection parameters.

By default it is assumed that the database is PostgreSQL. For a list of supported engines, please see https://docs.sqlalchemy.org/en/14/core/engines.html.

Attributes:

Name Type Description
user str

Username to connect to the database.

password str

User password to connect to the database.

dbname Optional[str]

Name of the database to connect.

engine Optional[str]

Database to use. Defaults to 'postgresql'.

host Optional[str]

Hostname for the database. Defaults to 'localhost'.

port Optional[int]

Connection port for the database. Defaults to 5432.

Source code in sramplatform/storage.py
@dataclass
class DBParameters:
    """Class to manage database connection parameters.

    By default it is assumed that the database is PostgreSQL.
    For a list of supported engines, please see `https://docs.sqlalchemy.org/en/14/core/engines.html`.

    ``str()`` of an instance is the connection URL
    ``engine://user:password@host:port[/dbname]``.

    Attributes:
        user: Username to connect to the database.
        password: User password to connect to the database.
        dbname: Name of the database to connect. Left out of the URL when None.
        engine: Database to use. Defaults to 'postgresql'.
        host: Hostname for the database. Defaults to 'localhost'.
        port: Connection port for the database. Defaults to 5432.
    """

    user: str
    password: str
    dbname: Optional[str] = None
    engine: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None

    def __post_init__(self):
        # None means "use the default", so an explicit None behaves the same
        # as leaving the argument out.
        if self.engine is None:
            self.engine = "postgresql"
        if self.host is None:
            self.host = "localhost"
        if self.port is None:
            self.port = 5432

    def __str__(self):
        # NOTE(review): user and password are inserted verbatim; values with
        # reserved URL characters (e.g. '@' or '/') must already be escaped
        # by the caller.
        url = (
            f"{self.engine}://"
            f"{self.user}:{self.password}"
            f"@{self.host}:{self.port}"
        )
        if self.dbname is None:
            # Without this guard the URL would end in the literal text "/None".
            return url
        return f"{url}/{self.dbname}"

Reader

Interface of a device reader.

The reader acts as a proxy between the station and the devices.

Attributes:

Name Type Description
name

Descriptive name of the reader.

Source code in sramplatform/reader.py
class Reader:
    """Interface of a device reader.

    The reader acts as a proxy between the station and the devices.
    ``send`` and ``receive`` raise ``NotImplementedError`` in this class,
    so concrete readers are expected to override them.

    Attributes:
        name: Descriptive name of the reader.
    """

    def __init__(self, name: str):
        self.name = name

    def send(self, data: bytes):
        """Send ``data`` through the reader.

        Args:
            data: Bytes to send.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def receive(self):
        """Receive data through the reader.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

LogLevelFilter

Bases: logging.Filter

https://stackoverflow.com/a/7447596/190597 (robert)

Source code in sramplatform/logbook.py
class LogLevelFilter(logging.Filter):
    """Filter that only lets through records below a given level.

    Based on https://stackoverflow.com/a/7447596/190597 (robert)

    Args:
        level: Numeric level; records at this level or above are rejected.
    """

    def __init__(self, level):
        # Initialize the base class so the standard Filter attributes
        # (``name``, ``nlen``) exist on the instance.
        super().__init__()
        self.level = level

    def filter(self, record):
        """Return True when the record's level is strictly below ``level``."""
        return record.levelno < self.level

MailHandler

Bases: logging.StreamHandler

Log handler for Email.

Source code in sramplatform/logbook.py
class MailHandler(logging.StreamHandler):
    """Log handler for Email.

    Args:
        email: Account passed to ``yagmail.SMTP``.
        oauth_path: Path passed to ``yagmail.SMTP`` as ``oauth2_file``.
        recipients: A single address or an iterable of addresses.
        subject: Subject used for every mail sent.
    """

    def __init__(self, email, oauth_path, recipients, subject):
        # Do not pass ``self`` as the stream: the handler would become its own
        # stream and ``flush()`` would call itself recursively.
        super().__init__()
        self.mail = yagmail.SMTP(email, oauth2_file=oauth_path)
        if isinstance(recipients, str):
            # ``list("a@b.c")`` would split the address into characters.
            self.recipients = [recipients]
        elif isinstance(recipients, list):
            self.recipients = recipients
        else:
            self.recipients = list(recipients)
        self.subject = subject

    def emit(self, record):
        """Format ``record`` and mail it to every recipient."""
        msg = self.format(record)
        template = f"""
        {msg}
        """
        self.mail.send(to=self.recipients, subject=self.subject, contents=template)

RabbitMQHandler

Bases: logging.StreamHandler

Log handler for RabbitMQ.

Source code in sramplatform/logbook.py
class RabbitMQHandler(logging.StreamHandler):
    """Log handler for RabbitMQ.

    Args:
        url: First argument given to ``Sender``.
        name: Second argument given to ``Sender``.
        exchange: Third argument given to ``Sender``.
    """

    def __init__(self, url, name, exchange):
        # Do not pass ``self`` as the stream: the handler would become its own
        # stream and ``flush()`` would call itself recursively.
        super().__init__()
        self.agent = Sender(url, name, exchange)

    def emit(self, record):
        """Format ``record`` and send it through the agent."""
        msg = self.format(record)
        self.agent.send(msg)

TelegramHandler

Bases: logging.StreamHandler

Log handler for Telegram.

Source code in sramplatform/logbook.py
class TelegramHandler(logging.StreamHandler):
    """Log handler for Telegram.

    Args:
        token: Token given to ``TeleBot``.
        chat_ids: A single chat id or a list of chat ids.
    """

    def __init__(self, token, chat_ids):
        # Do not pass ``self`` as the stream: the handler would become its own
        # stream and ``flush()`` would call itself recursively.
        super().__init__()
        self.bot = TeleBot(token)
        if isinstance(chat_ids, list):
            self.chat_ids = chat_ids
        else:
            self.chat_ids = [chat_ids]

    def emit(self, record):
        """Format ``record`` and send it to every configured chat."""
        msg = self.format(record)
        for chat in self.chat_ids:
            self.bot.send_message(chat, msg)

create_handler(name, conf)

Create a handler from a name and a dict.

Parameters:

Name Type Description Default
name str

Name of the handler to create.

required
conf Dict[str, Any]

Dictionary containing the handler configuration.

required

Returns:

Type Description
HandlerType

The configured handler.

Source code in sramplatform/logbook.py
def create_handler(name: str, conf: Dict[str, Any]) -> HandlerType:
    """Create a handler from a name and a dict.

    Args:
        name: Name of the handler to create.
        conf: Dictionary containing the handler configuration. The keys
            read depend on ``name``; a missing key raises ``KeyError``.

    Returns:
        The configured handler.

    Raises:
        ValueError: If ``name`` is not one of the supported handlers.
    """
    if name == "TelegramHandler":
        return TelegramHandler(conf["token"], conf["chat_ids"])
    if name == "RabbitMQHandler":
        # "exachange" is a misspelling kept only as a fallback so that
        # configuration files written against it keep working.
        exchange = conf["exchange"] if "exchange" in conf else conf["exachange"]
        return RabbitMQHandler(conf["url"], conf["key"], exchange)
    if name == "MailHandler":
        return MailHandler(
            conf["mail"], conf["oauth"], conf["recipients"], conf["subject"]
        )
    if name == "FileHandler":
        return FileHandler(conf["path"])
    if name == "StreamHandler":
        return StreamHandler(sys.stdout)
    if name == "RotatingFileHandler":
        return RotatingFileHandler(
            conf["path"], maxBytes=conf["maxBytes"], backupCount=conf["backupCount"]
        )
    if name == "TimedRotatingFileHandler":
        return TimedRotatingFileHandler(
            conf["path"],
            when=conf["when"],
            backupCount=conf["backupCount"],
        )
    raise ValueError(f"Handler {name} is not available")

make_formatter(conf, fmt_default, datefmt_default)

Create a Log formatter from a dict.

Parameters:

Name Type Description Default
conf Dict[str, str]

Dictionary containing format and datefmt

required
fmt_default str

Default format to use if none specified.

required
datefmt_default str

Default datefmt to use if none specified.

required
Source code in sramplatform/logbook.py
def make_formatter(conf: Dict[str, str], fmt_default: str, datefmt_default: str):
    """Build a ``logging.Formatter`` from a configuration dict.

    Args:
        conf: Dictionary that may contain ``format`` and ``datefmt``.
        fmt_default: Format used when ``conf`` has no ``format`` key.
        datefmt_default: Date format used when ``conf`` has no ``datefmt`` key.

    Returns:
        The configured formatter.
    """
    fmt = conf.get("format", fmt_default)
    datefmt = conf.get("datefmt", datefmt_default)
    return logging.Formatter(fmt, datefmt=datefmt)

Dispatcher

Class used to dispatch reader methods based on the commands received.

Attributes:

Name Type Description
agent

Fenneq agent to listen for commands.

logger

Logger used to log information.

db_session

Session to a DB to store data.

Source code in sramplatform/platform.py
class Dispatcher:
    """Class used to dispatch reader methods based on the commands received.

    Attributes:
        agent: Fenneq agent to listen for commands.
        logger: Logger used to log information.
        dbmanager: Object whose ``session`` attribute is handed to the
            command functions.
        timeout: Value passed to ``signal.alarm`` before each command
            function runs.
    """

    def __init__(self, agent, logger, dbmanager, timeout):
        self.agent = agent
        self.dbmanager = dbmanager
        self.logger = logger
        self.timeout = timeout

    def add_command(self, handler, func, **options):
        """Register ``func`` on the agent under ``handler``.

        A wrapper around ``func`` is registered with
        ``self.agent.on(handler, **options)``. For each message the wrapper
        arms an alarm of ``self.timeout``, calls ``func`` and logs the
        outcome; exceptions raised by ``func`` are logged, not propagated.

        Args:
            handler: First argument given to ``self.agent.on``.
            func: Callable invoked as ``func(msg.body, logger, session)``.
            **options: Extra keyword arguments given to ``self.agent.on``.
        """

        @self.agent.on(handler, **options)
        def handler_fn(msg):
            command = msg.body["command"]
            self.logger.debug("Handler %s called", command)
            try:
                # Arm the alarm; timeout_handler presumably raises
                # TimeoutError when it fires (defined elsewhere, TODO confirm).
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(self.timeout)
                func(msg.body, self.logger, self.dbmanager.session)
            except TimeoutError:
                self.logger.critical("Handler %s timeout", command)
            except CommandError as err:
                self.logger.error("%s", err)
            except Exception as excep:
                self.logger.critical(f"Error while executing handler: {excep}")
            else:
                self.logger.debug("Handler %s executed correctly", command)
            finally:
                # Cancel any pending alarm, whatever the outcome.
                signal.alarm(0)


    def run(self):
        """Make the dispatcher listen for commands."""
        self.logger.debug(f"{self.agent.name} listening on {self.agent.exchange}")
        self.agent.run()

run()

Make the dispatcher listen for commands.

Source code in sramplatform/platform.py
def run(self):
    """Make the dispatcher listen for commands.

    Logs the agent name and exchange at debug level, then calls
    ``self.agent.run()``.
    """
    self.logger.debug(f"{self.agent.name} listening on {self.agent.exchange}")
    self.agent.run()

from_config(config_path, reader_cls)

Read a YAML config to generate the components for a Dispatcher.

Parameters:

Name Type Description Default
config_path str

Path to the YAML config.

required
reader_cls ReaderType

Class to use to instantiate a Reader.

required

Returns:

Type Description
Tuple[Agent, ReaderType, object]

A tuple containing the Agent, Reader and Logger

Source code in sramplatform/platform.py
def from_config(
    config_path: str, reader_cls: ReaderType
) -> Tuple[Agent, ReaderType, object]:
    """Read a YAML config to generate the components for a Dispatcher.

    The ``agent`` and ``reader`` sections are expanded as keyword
    arguments for ``Agent`` and ``reader_cls``. The ``logging`` section
    must provide ``format`` and ``datefmt`` and may list ``loggers``, each
    a mapping of handler name to handler configuration.

    Args:
        config_path: Path to the YAML config.
        reader_cls: Class to use to instantiate a Reader.

    Returns:
        A tuple containing the Agent, Reader and Logger
    """
    with open(config_path, "r") as config_file:
        # NOTE(review): yaml.Loader can build arbitrary Python objects; only
        # load configuration files from a trusted source.
        settings = yaml.load(config_file, Loader=yaml.Loader)

    agent = Agent(**settings["agent"])
    reader = reader_cls(**settings["reader"])

    # The logger itself lets everything through; each handler applies its
    # own level below.
    platform_logger = logging.getLogger(agent.name)
    platform_logger.setLevel(logging.DEBUG)
    logging_settings = settings["logging"]

    fmt_default = logging_settings["format"]
    datefmt_default = logging_settings["datefmt"]

    for entry in logging_settings.get("loggers", []):
        for handler_name, handler_conf in entry.items():
            handler = create_handler(handler_name, handler_conf)
            handler.setLevel(
                logging.getLevelName(handler_conf.get("level", logging.INFO))
            )

            upper_bound = handler_conf.get("filter_level", None)
            if upper_bound:
                level_filter = LogLevelFilter(logging.getLevelName(upper_bound))
                handler.addFilter(level_filter)

            handler.setFormatter(
                make_formatter(handler_conf, fmt_default, datefmt_default)
            )
            platform_logger.addHandler(handler)
    return agent, reader, platform_logger