import asyncio
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Iterable
from symaware.base.data import (
Identifier,
Message,
MultiAgentAwarenessVector,
MultiAgentKnowledgeDatabase,
)
from symaware.base.utils import NullObject, Tasynclooplock
from .component import Component
if TYPE_CHECKING:
    # This branch is only evaluated by static type checkers (TYPE_CHECKING is False at runtime),
    # so the names defined here must be referenced through string annotations elsewhere.
    import sys
    from typing import Callable

    from symaware.base.agent import Agent

    # TypeAlias lives in ``typing`` from Python 3.10 on; older interpreters use the backport
    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        from typing_extensions import TypeAlias

    # Callback signature: (agent, awareness vector, knowledge database,
    # one identifier / several identifiers / None) -> Any
    SendingCommunicationCallback: TypeAlias = Callable[
        [
            Agent,
            MultiAgentAwarenessVector,
            MultiAgentKnowledgeDatabase,
            "Identifier | Iterable[Identifier] | None",
        ],
        Any,
    ]
    # Callback signature: (agent, messages) -> Any
    SentCommunicationCallback: TypeAlias = Callable[[Agent, Iterable[Message]], Any]
[docs]
class CommunicationSender(Component[Tasynclooplock, "SendingCommunicationCallback", "SentCommunicationCallback"]):
    """
    Generic communication system of an :class:`.symaware.base.Agent`.
    It is used to communicate with other agents.
    The information collected is then used to update the knowledge database of the agent.

    Before sending a message, it will be enqueued in the message queue.
    On the next iteration, the messages will be sent to the receiver agent(s) through the communication channel.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    async_loop_lock:
        Async loop lock to use for the communication sender
    send_to_self:
        If True, the agent is able to send messages to itself
    """

    def __init__(
        self,
        agent_id: Identifier,
        async_loop_lock: "Tasynclooplock | None" = None,
        send_to_self: bool = False,
    ):
        super().__init__(agent_id, async_loop_lock)
        self._send_to_self = send_to_self
        # Messages waiting to be sent by the next call to _compute / _async_compute
        self._message_queue: "asyncio.Queue[Message]" = asyncio.Queue()
        # Pending ``Queue.get()`` task created by _async_compute; kept so that async_stop can cancel it
        self._get_message_task: "asyncio.Task[Message] | None" = None

    def enqueue_messages(self, *messages: Message):
        """
        Enqueue one or more messages to be sent to other agents.

        Args
        ----
        messages:
            Messages to send to the receiver agent(s)
        """
        for message in messages:
            self._message_queue.put_nowait(message)

    @abstractmethod
    def _send_communication_through_channel(self, message: Message):
        """
        The information the agent wants to share has been encoded into a message and is sent to another agent
        using a communication channel.

        Example
        -------
        Create a new communication system by subclassing the :class:`.CommunicationSender` and implementing the
        :meth:`_send_communication_through_channel` method.

        >>> from symaware.base import CommunicationSender, Message
        >>> class MyCommunicationSender(CommunicationSender):
        ...     def _send_communication_through_channel(self, message: Message):
        ...         # Your implementation here
        ...         # Example:
        ...         # Print the message to the console
        ...         print(f"Sending message to {message.receiver_id}: {message}")

        Args
        ----
        message:
            Message to send to the receiver agent through the communication channel
        """
        pass

    async def _async_send_communication_through_channel(self, message: Message):
        """
        The information the agent wants to share has been encoded into a message and is sent to another agent
        using a communication channel asynchronously.
        By default it simply delegates to the synchronous :meth:`_send_communication_through_channel`.

        Note
        ----
        Check :meth:`_send_communication_through_channel` for more information about the method
        and :class:`.AsyncLoopLockable` for more information about the async loop.

        Args
        ----
        message:
            Message to send to the receiver agent through the communication channel
        """
        return self._send_communication_through_channel(message)

    def _compute(self) -> Iterable[Message]:
        """
        All the messages in the message queue are sent to the receiver agent(s) through the communication channel.
        Messages addressed to this agent itself are discarded unless ``send_to_self`` was enabled.
        Some implementations may put a limit on the number of messages sent per iteration,
        to avoid flooding the channel.

        Returns
        -------
            Message(s) sent to the receiver agent(s)
        """
        message_sent: list[Message] = []
        while not self._message_queue.empty():
            message = self._message_queue.get_nowait()
            # Drop messages addressed to this very agent unless explicitly allowed
            if message.receiver_id == self._agent_id and not self._send_to_self:
                continue
            self._send_communication_through_channel(message)
            message_sent.append(message)
        return message_sent

    async def _async_compute(self) -> Iterable[Message]:
        """
        All the messages in the message queue are sent to the receiver agent(s)
        through the communication channel asynchronously.

        The coroutine waits until a message is available, then keeps sending until the queue is drained.
        It returns as soon as the queue is empty and at least one message has been sent,
        or when the pending read of the queue is cancelled (see :meth:`async_stop`).
        Messages addressed to this agent itself are discarded unless ``send_to_self`` was enabled.

        Note
        ----
        Check :meth:`_compute` for more information about the method
        and :class:`.AsyncLoopLockable` for more information about the async loop.

        Returns
        -------
            Message(s) sent to the receiver agent(s)
        """
        message_sent: list[Message] = []
        while True:
            try:
                # Keep a handle on the pending read so that async_stop can interrupt the wait
                self._get_message_task = self._async_loop_lock.loop.create_task(self._message_queue.get())
                message = await self._get_message_task
            except asyncio.CancelledError:
                # Interrupted while waiting: report whatever has been sent so far
                return message_sent
            if message.receiver_id != self._agent_id or self._send_to_self:
                await self._async_send_communication_through_channel(message)
                message_sent.append(message)
            # Evaluate the exit condition after skipped messages too: otherwise a discarded
            # self-addressed message that drains the queue would leave the already sent
            # messages unreported until a new message shows up.
            # With nothing sent yet, keep waiting for the next message.
            if message_sent and self._message_queue.empty():
                break
        return message_sent

    def _update(self, messages_sent: Iterable[Message]):
        """
        Nothing is updated after the messages have been sent: this implementation is a no-op.

        Args
        ----
        messages_sent:
            Message(s) sent to the receiver agent(s)
        """
        pass

    async def async_stop(self):
        """
        Cancel the pending read of the message queue, if any, so that :meth:`_async_compute`
        stops waiting for new messages, then run the parent's ``async_stop``.
        """
        if self._get_message_task is not None:
            self._get_message_task.cancel()
        await super().async_stop()
[docs]
class NullCommunicationSender(CommunicationSender, NullObject):
    """
    Placeholder communication sender.
    It stands in for a real sender when no communication sender is set for an agent.
    An exception is raised if this object is used in any way.
    """

    def __init__(self):
        # -1 is handed over as the agent identifier; presumably a sentinel for "no agent" - TODO confirm
        placeholder_id = -1
        super().__init__(placeholder_id)

    def _send_communication_through_channel(self, message: Message):
        """
        Intentionally empty implementation of the abstract method: the message is ignored.

        Args
        ----
        message:
            Message that would be sent; unused
        """
[docs]
class DefaultCommunicationSender(CommunicationSender[Tasynclooplock]):
    """
    Default implementation of the communication sender.
    It does not send or receive any message.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    async_loop_lock:
        Async loop lock to use for the communication sender
    send_to_self:
        If True, the agent is able to send messages to itself
    """

    def _send_communication_through_channel(self, message: Message):
        """
        Intentionally empty implementation of the abstract method: the message is ignored.

        Args
        ----
        message:
            Message that would be sent; unused
        """