aiohttp a une prise en charge intégrée pour websockets. C'est très simple et fonctionne bien.

Une version simplifiée de l'exemple dans la documentation est:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Async iterate the messages the client sends
    async for message in ws:
        ws.send_str('You sent: %s' % (message.data,))

    print('websocket connection closed')

Dans l'exemple, ws est une référence à une connexion Websocket avec un client. Je pourrais facilement mettre ces références dans request.app, comme @ Crandel le fait ici (c'est-à-dire l'état global), mais pas dans une application de production, car chaque serveur d'application (et même chaque travailleur) aura sa propre instance app.

Existe-t-il un modèle accepté pour cela? Y a-t-il un autre moyen?

Remarque: je ne parle pas des sessions. Je fais référence aux connexions. Je souhaite envoyer un message aux clients qui se sont connectés au serveur A lorsque des événements se produisent dans le code d'application du serveur B, etc.

2
orokusaki 6 mars 2016 à 01:38

3 réponses

Meilleure réponse

Mise à jour (février 2017)

Les chaînes n'ont (heureusement) pas été fusionnées avec Django. Cela restera probablement un excellent projet, mais il n'appartenait pas vraiment à Django proprement dit.

En outre, je recommanderais fortement de jeter un coup d'œil au support intégré relativement nouveau de Postgres pour pub / sub. Il surpassera probablement tout le reste, et la création d'un une solution personnalisée sur aiohttp, utilisant Postgres comme service de support, pourrait être votre meilleur pari.

Originale

Bien que non aiohttp, les Django Channels, qui sont susceptibles d'être fusionnés dans Django 1.10, résout ce problème de manière très intuitive, et il est écrit par Andrew Godwin, l'auteur de Django migrations.

Django Channels résume la notion de "nombreux processus sur plusieurs serveurs" en créant une couche de routage devant une application Django. Cette couche de routage parle avec un backend (par exemple, Redis) pour maintenir un état partageable parmi les processus, et utilise un nouveau ASGI protocole pour faciliter la gestion des requêtes HTTP et des WebSockets, tout en déléguant chacun à leur" consommateurs" (par exemple, livré avec un gestionnaire intégré pour les requêtes HTTP, et vous pouvez écrire le vôtre pour WebSockets).

Django Channels a un concept appelé Groups, qui gère les " diffuser "la nature du problème; c'est-à-dire qu'il permet à un événement qui se produit sur un serveur de déclencher des messages aux clients qui sont dans ce groupe, qu'ils soient connectés au même processus ou au serveur ou au serveur différent.

À mon humble avis, Django Channels est très susceptible d'être résumé dans une bibliothèque Python plus générale. Il existe un couple autre Bibliothèques Python qui atteignent Go-like Canaux mais, à ce jour, rien de remarquable n'offre la transparence du réseau; la capacité des canaux à communiquer entre les processus et les serveurs.

1
orokusaki 14 févr. 2017 à 13:43

Je ne connais donc que Socket.IO dans Node, mais il est assez facile de faire évoluer les websockets horizontalement avec Socket.IO.

Les sockets peuvent venir avec des sessions, donc chaque session est gérée par un serveur spécifique. Cela permet d'enregistrer facilement l'état de chaque socket ouvert et d'équilibrer la charge sur tous vos serveurs.

Voici SocketIO pour Python:

https://pypi.python.org/pypi/socketIO-client

Voici une très bonne lecture sur la façon d'attacher des sessions à un redis-store pour le rendre encore plus rapide et l'équilibrage de charge entre les serveurs plus facile à gérer.

Comment partager des sessions avec Socket.IO 1.x et Express 4.x?

Je sais que cela ne répond pas à votre question sur aiohttp, mais j'espère que cela vous donnera une meilleure idée de la façon dont les sockets peuvent fonctionner.

Edit: écrit dans Node-

Dans Socket.IO, c'est vraiment facile, il a une tonne de fonctions pour diffuser des messages de différentes manières.

Pour votre exemple, si vous souhaitez envoyer un message à tout le monde dans chaque chat. Par exemple, tout le monde qui a un socket ouvert, vous pouvez facilement l'écrire.

socket.broadcast.emit('WARNING', "this is a test");

Supposons que vous ayez des salles ouvertes, vous ne pouvez diffuser des messages qu'aux personnes dans cette salle avec une fonction simple appelée .to(). Exemple J'ai une pièce nommée 'BBQ':

socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');

Cela enverra un message à tout le monde dans le canal BBQ - Venez chercher de la nourriture!

Modifier: Modifier:

Ceci est une écriture fantastique sur le fonctionnement de Socket.IO, assurez-vous de lire la deuxième réponse pour les versions mises à jour des fonctions. Il est beaucoup plus facile à lire que leur documentation.

Envoyer une réponse à tous les clients sauf l'expéditeur (Socket.io)

Pour autant que je sache, c'est ainsi que tout fonctionne dans la mise en œuvre de python. Pour la facilité d'utilisation, je l'utiliserais certainement pour les websockets. L'aiohttp semble vraiment puissant mais n'a pas cette fonctionnalité, enterré dans la documentation, ou écrit uniquement dans le code sans aucune documentation pour le moment.

2
Community 23 mai 2017 à 12:01

Si je vous comprends bien, vous voulez avoir plusieurs serveurs WebSocket, chacun avec plusieurs clients connectés, mais vous voulez pouvoir communiquer potentiellement avec tous les clients connectés.

Voici un exemple qui crée trois serveurs triviaux - un écho de capitalisation, un devis aléatoire et l'heure de la journée - puis envoie un message de diffusion à tous les clients connectés. Peut-être que cela contient des idées utiles.

Pastebin: https://pastebin.com/xDSACmdV

#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.

In response to stackoverflow question:
https://stackoverflow.com/questions/35820782/how-to-manage-websockets-across-multiple-servers-workers

Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser

import aiohttp
from aiohttp import web

__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"


def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)

    # Queue their start operation
    loop = asyncio.get_event_loop()
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())

    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")

    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)

    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")

    # Run event loop
    loop.run_forever()


class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server

    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))

    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()

        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')

        await self.app.shutdown()
        await self.app.cleanup()

    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)

        await self.do_websocket(ws)

        self.app["websockets"].remove(ws)
        return ws

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass

    def broadcast_message(self, msg: dict):
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            ws.send_json(msg)


class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            cap = ws_msg.data.upper()
            ws.send_str(cap)


class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]

    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval

    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            while self.srv.sockets is not None:
                quote = random.choice(RandomQuoteServer.QUOTES)
                ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)

        self.loop.create_task(_regular_interval())

        await super().do_websocket(ws)  # leave client connected here indefinitely


class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """

    async def start(self):
        await super().start()

        async def _regular_interval():
            while self.srv.sockets is not None:
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)

        self.loop.create_task(_regular_interval())


if __name__ == "__main__":
    main()
2
Cœur 21 mars 2018 à 12:29