Source code for ehforwarderbot.coordinator

# coding=utf-8

"""
Coordinator among channels.

Attributes:
    profile (str): Name of current profile..
    mutex (threading.Lock): Global interaction thread lock.
    master (Channel): The running master channel object.
    slaves (Dict[str, EFBChannel]): Dictionary of running slave channel object.
        Keys are the unique identifier of the channel.
    middlewares (List[Middleware]): List of middlewares
"""

import threading
from contextlib import suppress
from gettext import NullTranslations
from typing import List, Dict, Optional, cast, TYPE_CHECKING, Union

from .channel import Channel, MasterChannel, SlaveChannel
from .exceptions import EFBChannelNotFound
from .middleware import Middleware
from .types import ModuleID

if TYPE_CHECKING:
    from . import Message
    from .status import Status

profile: str = "default"
"""Current running profile name"""

mutex: threading.Lock = threading.Lock()
"""Mutual exclusive lock for user interaction through CLI interface"""

master: MasterChannel  # late init
"""The instance of the master channel."""

slaves: Dict[ModuleID, SlaveChannel] = dict()
"""Instances of slave channels. Keys are the channel IDs."""

middlewares: List[Middleware] = list()
"""Instances of middlewares. Sorted in the order of execution."""

master_thread: Optional[threading.Thread] = None
"""The thread running poll() of the master channel."""

slave_threads: Dict[ModuleID, threading.Thread] = dict()
"""Threads running poll() from slave channels. Keys are the channel IDs."""

translator: NullTranslations = NullTranslations()
"""Internal GNU gettext translator."""


[docs]def add_channel(channel: Channel): """ Register the channel with the coordinator. Args: channel (Channel): Channel to register """ global master, slaves if isinstance(channel, MasterChannel): master = channel elif isinstance(channel, SlaveChannel): slaves[channel.channel_id] = channel else: raise TypeError("Channel instance is expected")
[docs]def add_middleware(middleware: Middleware): """ Register a middleware with the coordinator. Args: middleware (Middleware): Middleware to register """ global middlewares if isinstance(middleware, Middleware): middlewares.append(middleware) else: raise TypeError("Middleware instance is expected")
[docs]def send_message(msg: 'Message') -> Optional['Message']: """ Deliver a new message or edited message to the destination channel. Args: msg (Message): The message Returns: The message processed and delivered by the destination channel, includes the updated message ID if sent to a slave channel. Returns ``None`` if the message is not sent. """ global middlewares, master, slaves if msg is None: return # Go through middlewares for i in middlewares: m = i.process_message(msg) if m is None: return None msg = m msg.verify() if msg.deliver_to.channel_id == master.channel_id: return master.send_message(msg) elif msg.deliver_to.channel_id in slaves: return slaves[msg.deliver_to.channel_id].send_message(msg) else: raise EFBChannelNotFound()
[docs]def send_status(status: 'Status'): """ Deliver a status to the destination channel. Args: status (Status): The status """ global middlewares, master if status is None: return s: 'Optional[Status]' = status # Go through middlewares for i in middlewares: s = i.process_status(cast('Status', s)) if s is None: return status = cast('Status', s) status.verify() status.destination_channel.send_status(status)
[docs]def get_module_by_id(module_id: ModuleID) -> Union[Channel, Middleware]: """ Return the module instance of a provided module ID Args: module_id: Module ID, with instance ID if available. Returns: Module instance requested. Raises: NameError: When the module is not found. """ with suppress(NameError): if master.channel_id == module_id: return master if module_id in slaves: return slaves[module_id] for i in middlewares: if i.middleware_id == module_id: return i raise NameError("Module ID {} is not found".format(module_id))