import asyncio
import time


class MessageStream:
    """
    A message stream for consumers to subscribe to,
    and for producers to publish to.
    """
    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._waiter = self._loop.create_future()

    def publish(self, message):
        """
        Publish a message to this MessageStream

        :param message: The message to publish
        """
        waiter, self._waiter = self._waiter, self._loop.create_future()
        waiter.set_result((message, time.time(), self._waiter))

    async def __aiter__(self):
        """
        Iterate over the messages in the message stream
        """
        waiter = self._waiter
        while True:
            # Shield the future from being cancelled by a task waiting on it
            message, ts, waiter = await asyncio.shield(waiter)
            yield message, ts