Source code for symaware.base.components.communication_sender

import asyncio
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Iterable

from symaware.base.data import (
    Identifier,
    Message,
    MultiAgentAwarenessVector,
    MultiAgentKnowledgeDatabase,
)
from symaware.base.utils import NullObject, Tasynclooplock

from .component import Component

# Everything in this block is only evaluated by static type checkers: at runtime
# ``TYPE_CHECKING`` is False, so the imports and aliases below are never executed.
# This is why the aliases are referenced as string annotations further down.
if TYPE_CHECKING:
    import sys
    from typing import Callable

    from symaware.base.agent import Agent

    # ``TypeAlias`` is taken from ``typing`` on Python 3.10+ and from
    # ``typing_extensions`` on older interpreters.
    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        from typing_extensions import TypeAlias

    # Signature of a callback receiving, in order: the agent, a multi-agent awareness
    # vector, a multi-agent knowledge database and an optional receiver specification
    # (a single identifier, an iterable of identifiers or None). Its return value is unconstrained.
    # NOTE(review): presumably invoked before the messages are sent - confirm against Component.
    SendingCommunicationCallback: TypeAlias = Callable[
        [
            Agent,
            MultiAgentAwarenessVector,
            MultiAgentKnowledgeDatabase,
            "Identifier | Iterable[Identifier] | None",
        ],
        Any,
    ]
    # Signature of a callback receiving the agent and an iterable of messages.
    # NOTE(review): presumably the messages returned by the compute step - confirm against Component.
    SentCommunicationCallback: TypeAlias = Callable[[Agent, Iterable[Message]], Any]


class CommunicationSender(Component[Tasynclooplock, "SendingCommunicationCallback", "SentCommunicationCallback"]):
    """
    Generic communication system of an :class:`.symaware.base.Agent`.
    It is used to communicate with other agents.
    The information collected is then used to update the knowledge database of the agent.

    Before sending a message, it will be enqueued in the message queue.
    On the next iteration, the messages will be sent to the receiver agent(s) through the communication channel.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    async_loop_lock:
        Async loop lock to use for the communication sender
    send_to_self:
        If True, the agent is able to send messages to itself
    """

    def __init__(
        self,
        agent_id: Identifier,
        async_loop_lock: "Tasynclooplock | None" = None,
        send_to_self: bool = False,
    ):
        super().__init__(agent_id, async_loop_lock)
        self._send_to_self = send_to_self
        # Messages waiting to be sent on the next (async) compute step
        self._message_queue: "asyncio.Queue[Message]" = asyncio.Queue()
        # Handle of the pending queue read, kept so that async_stop() can cancel it
        self._get_message_task: "asyncio.Task[Message] | None" = None

    def enqueue_messages(self, *messages: Message):
        """
        Enqueue one or more messages to be sent to other agents.
        The messages are not sent immediately: they are stored in the message queue
        and sent during the next compute step.

        Args
        ----
        messages:
            Messages to send to the receiver agent(s)
        """
        for message in messages:
            self._message_queue.put_nowait(message)

    @abstractmethod
    def _send_communication_through_channel(self, message: Message):
        """
        The information the agent wants to share has been encoded into a message
        and is sent to another agent using a communication channel.

        Example
        -------
        Create a new communication system by subclassing the :class:`.CommunicationSender`
        and implementing the :meth:`_send_communication_through_channel` method.

        >>> from symaware.base import CommunicationSender, Message
        >>> class MyCommunicationSender(CommunicationSender):
        ...     def _send_communication_through_channel(self, message: Message):
        ...         # Your implementation here
        ...         # Example:
        ...         # Print the message to the console
        ...         print(f"Sending message to {message.receiver_id}: {message}")

        Args
        ----
        message:
            Message to send to the receiver agent through the communication channel
        """
        pass

    async def _async_send_communication_through_channel(self, message: Message):
        """
        The information the agent wants to share has been encoded into a message
        and is sent to another agent using a communication channel asynchronously.
        The default implementation simply delegates to the synchronous
        :meth:`_send_communication_through_channel`.

        Note
        ----
        Check :meth:`_send_communication_through_channel` for more information about the method
        and :class:`.AsyncLoopLockable` for more information about the async loop.

        Args
        ----
        message:
            Message to send to the receiver agent through the communication channel

        Returns
        -------
            Whatever :meth:`_send_communication_through_channel` returns
        """
        return self._send_communication_through_channel(message)

    def _compute(self) -> Iterable[Message]:
        """
        All the messages in the message queue are sent to the receiver agent(s) through the communication channel.
        Messages addressed to this very agent are discarded unless ``send_to_self`` was enabled.

        Some implementations may put a limit on the number of messages sent per iteration,
        to avoid flooding the channel.

        Returns
        -------
            Message(s) sent to the receiver agent(s)
        """
        message_sent: list[Message] = []
        while not self._message_queue.empty():
            message = self._message_queue.get_nowait()
            if message.receiver_id == self._agent_id and not self._send_to_self:
                # Self-addressed message while sending to self is disabled: drop it
                continue
            self._send_communication_through_channel(message)
            message_sent.append(message)
        return message_sent

    async def _async_compute(self) -> Iterable[Message]:
        """
        All the messages in the message queue are sent to the receiver agent(s)
        through the communication channel asynchronously.
        The coroutine waits until at least one message has been sent, then keeps going
        until the queue is empty.
        If the pending queue read is cancelled (see :meth:`async_stop`),
        the messages sent so far are returned.

        Note
        ----
        Check :meth:`_compute` for more information about the method
        and :class:`.AsyncLoopLockable` for more information about the async loop.

        Returns
        -------
            Message(s) sent to the receiver agent(s)
        """
        message_sent: list[Message] = []
        while True:
            try:
                # The read is wrapped in a task so that async_stop() can cancel it while it is blocked
                self._get_message_task = self._async_loop_lock.loop.create_task(self._message_queue.get())
                message = await self._get_message_task
            except asyncio.CancelledError:
                return message_sent
            if self._send_to_self or message.receiver_id != self._agent_id:
                await self._async_send_communication_through_channel(message)
                message_sent.append(message)
            # The check also runs after a discarded self-addressed message:
            # otherwise the messages already sent would be held back
            # until a new message shows up in the queue.
            # While nothing has been sent yet, keep waiting for the next message.
            if message_sent and self._message_queue.empty():
                break
        return message_sent

    def _update(self, messages_sent: Iterable[Message]):
        """
        Nothing needs to be updated after the messages have been sent.
        This implementation is intentionally a no-op.

        Args
        ----
        messages_sent:
            Messages returned by the compute step. Ignored
        """
        pass

    async def async_stop(self):
        """
        Cancel the pending read on the message queue, if there is one,
        and then run the parent class' stop procedure.
        Cancelling the read makes :meth:`_async_compute` return the messages it has sent so far.
        """
        if self._get_message_task is not None:
            self._get_message_task.cancel()
        await super().async_stop()
class NullCommunicationSender(CommunicationSender, NullObject):
    """
    Placeholder communication sender, used when no communication sender has been set for an agent.
    It is created with the placeholder agent identifier ``-1``.

    An exception is meant to be raised if this object is used in any way.
    NOTE(review): that behaviour is not implemented in this class;
    presumably it comes from :class:`NullObject` - confirm.
    """

    def __init__(self):
        placeholder_agent_id = -1
        super().__init__(placeholder_agent_id)

    def _send_communication_through_channel(self, message: Message):
        """
        Do nothing: the placeholder has no communication channel.

        Args
        ----
        message:
            Message that would be sent. Ignored
        """
class DefaultCommunicationSender(CommunicationSender[Tasynclooplock]):
    """
    Default implementation of the communication sender, with a communication channel that does nothing.
    Enqueued messages are still taken out of the queue by the compute step,
    but they are never delivered anywhere.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    """

    def _send_communication_through_channel(self, message: Message):
        """
        Do nothing: no communication channel is available in the default implementation.

        Args
        ----
        message:
            Message that would be sent. Ignored
        """