J'ai un serveur qui communique avec 50 appareils ou plus via TCP LAN. Il existe un Task.Run pour chaque boucle de message de lecture de socket.

Je tamponne chaque portée de message dans une file d'attente de blocage, où chaque file d'attente de blocage a un Task.Run à l'aide d'un BlockingCollection.Take ().

Donc quelque chose comme (semi-pseudocode):

Tâche de lecture de socket

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        }
    }
});

Tâche de tampon de messages

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

Cela ferait donc plus de 50 tâches de lecture et plus de 50 tâches de blocage sur leurs propres tampons.

Je l'ai fait de cette façon pour éviter de bloquer la boucle de lecture et permettre au programme de répartir plus équitablement le temps de traitement des messages, du moins je crois.

Est-ce une manière inefficace de le gérer? quel serait le meilleur moyen?

3
FinalFortune 20 nov. 2018 à 11:43

3 réponses

Meilleure réponse

Vous pourriez être intéressé par le travail sur les "canaux", en particulier: System.Threading.Channels. L'objectif est de fournir des files d'attente de producteurs / consommateurs asynchrones , couvrant à la fois des scénarios de producteur et de consommateur uniques et multiples, des limites supérieures, etc. En utilisant une API asynchrone, vous n'attachez pas beaucoup de threads attendant juste quelque chose à faire.

Votre boucle de lecture deviendrait:

while (notCancelled) {
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);
}

Et le producteur:

switch (element)
{
    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...
}

Donc: changements minimes


Alternativement - ou en combinaison - vous pouvez examiner des choses comme les "pipelines" (https: // www.nuget.org/packages/System.IO.Pipelines/) - puisque vous avez affaire à des données TCP, ce serait un ajustement idéal, et c'est quelque chose que j'ai regardé pour le web-socket personnalisé serveur ici sur Stack Overflow (qui traite un énorme nombre de connexions). Étant donné que l'API est asynchrone partout, elle fait un bon travail d'équilibrage - et l'API pipelines est conçue avec des scénarios TCP typiques à l'esprit, par exemple en consommant partiellement les flux de données entrants lorsque vous détectez les limites de trame. J'ai beaucoup écrit sur cette utilisation, avec des exemples de code principalement ici. Notez que "pipelines" n'inclut pas de couche TCP directe, mais que le serveur "kestrel" en inclut une, ou la bibliothèque tierce https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ fait (divulgation: je l'ai écrit).

5
Marc Gravell 20 nov. 2018 à 11:03

Oui, c'est un peu inefficace, car vous bloquez les threads ThreadPool. J'ai déjà discuté de ce problème Utilisation de Task.Yield pour surmonter la famine ThreadPool lors de la mise en œuvre du modèle producteur / consommateur

Vous pouvez également consulter des exemples de test d'un modèle producteur-consommateur ici: https://github.com/BBGONE/TestThreadAffinity

Vous pouvez utiliser await Task.Yield dans la boucle pour donner à d'autres tâches l'accès à ce thread.

Vous pouvez également le résoudre en utilisant des threads dédiés ou mieux un ThreadScheduler personnalisé qui utilise son propre pool de threads. Mais il est inefficace de créer plus de 50 fils simples. Mieux vaut ajuster la tâche, donc ce serait plus coopératif.

Si vous utilisez un BlockingCollection (car il peut bloquer le thread pendant longtemps en attendant d'écrire ( s'il est limité ) ou de lire ou aucun élément à lire), il est préférable d'utiliser System.Threading .Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

Ils ne bloquent pas le thread en attendant que la collection soit disponible en écriture ou en lecture. Voici un exemple de son utilisation https://github.com/BBGONE/TestThreadAffinity / tree / master / ThreadingChannelsCoreFX / ChannelsTest

1
Maxim T 20 nov. 2018 à 11:00

Je fais en fait quelque chose de similaire dans un autre projet. Ce que j'ai appris ou ferais différemment sont les suivants:

  1. Tout d'abord, il vaut mieux utiliser des threads dédiés pour la boucle de lecture / écriture (avec new Thread(ParameterizedThreadStart)) car Task.Run utilise un thread de pool et comme vous l'utilisez dans une boucle (presque) sans fin, le thread n'est pratiquement jamais retourné à la piscine.

    var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
    thread.Start(cancellationToken);
    
  2. Votre Process peut être un événement, que vous pouvez appeler de manière asynchrone afin que votre boucle de lecture puisse être renvoyée immédiatement pour traiter les nouveaux paquets entrants aussi vite que possible:

    private void ReaderLoop(object state)
    {
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            }
            catch (OperationCanceledException)
            {
                if (!disposed && IsRunning)
                    Stop();
                break;
            }
        }
    }
    

Veuillez noter que si un délégué a plusieurs cibles, son appel asynchrone n'est pas trivial. J'ai créé cette méthode d'extension pour appeler un délégué sur les threads de pool:

public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
    void Callback(IAsyncResult ar)
    {
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        {
            method.EndInvoke(ar);
        }
        catch (Exception e)
        {
            HandleError(e, method);
        }
    }

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);
}

Ainsi, la mise en œuvre OnMessageReceived peut être:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);
  1. Enfin, ce fut une grande leçon que BlockingCollection<T> a quelques problèmes de performances. Il utilise SpinWait en interne, dont la méthode SpinOnce attend de plus en plus longtemps s'il n'y a pas de données entrantes pendant longtemps. C'est un problème délicat car même si vous enregistrez chaque étape du traitement, vous ne remarquerez pas que tout est démarré en retard, à moins que vous ne puissiez vous moquer également du côté serveur. Ici, vous pouvez trouver une mise en œuvre rapide BlockingCollection utilisant un AutoResetEvent pour déclencher les données entrantes. Je lui ai ajouté une surcharge Take(CancellationToken) comme suit:

    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollection{T}"/>
    /// </summary>
    public T Take(CancellationToken token)
    {
        T item;
        while (!queue.TryDequeue(out item))
        {
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        }
    
        return item;
    }
    

En gros, c'est tout. Peut-être que tout n'est pas applicable dans votre cas, par exemple. si la réponse presque immédiate n'est pas cruciale, le BlockingCollection régulier le fera également.

1
György Kőszeg 20 nov. 2018 à 09:59