J'implémente un client TCP qui lit et envoie des fichiers et des chaînes et j'utilise Boost comme bibliothèque principale. J'aimerais continuer à lire ou à envoyer des fichiers pendant que je continue d'envoyer des chaînes, qui dans ce cas sont les commandes à envoyer au serveur. Pour cela j'ai pensé à utiliser un Thread Pool afin de ne pas surcharger le client. Ma question est la suivante : puis-je utiliser des contrats à terme pour utiliser des rappels lorsque l'un des threads du pool se termine ? Si je n'y arrive pas, existe-t-il une autre solution ? Je faisais quelque chose comme ça, où pool_ est un boost:asio:thread_pool

void send_file(std::string const& file_path){
    boost::asio::post(pool_, [this, &file_path] {
        handle_send_file(file_path);
    });
    // DO SOMETHING WHEN handle_send_file ENDS
}

void handle_send_file(std::string const& file_path) {
    boost::array<char, 1024> buf{};
    boost::system::error_code error;
    std::ifstream source_file(file_path, std::ios_base::binary | std::ios_base::ate);

    if(!source_file) {
        std::cout << "[ERROR] Failed to open " << file_path << std::endl;
        //TODO gestire errore
    }
    size_t file_size = source_file.tellg();
    source_file.seekg(0);

    std::string file_size_readable = file_size_to_readable(file_size);

    // First send file name and file size in bytes to server
    boost::asio::streambuf request;
    std::ostream request_stream(&request);
    request_stream << file_path << "\n"
                   << file_size << "\n\n"; // Consider sending readable version, does it change anything?

    // Send the request
    boost::asio::write(*socket_, request, error);
    if(error){
        std::cout << "[ERROR] Send request error:" << error << std::endl;
        //TODO lanciare un'eccezione? Qua dovrò controllare se il server funziona o no
    }
    if(DEBUG) {
        std::cout << "[DEBUG] " << file_path << " size is: " << file_size_readable << std::endl;
        std::cout << "[DEBUG] Start sending file content" << std::endl;
    }

    long bytes_sent = 0;
    float percent = 0;
    print_percentage(percent);

    while(!source_file.eof()) {
        source_file.read(buf.c_array(), (std::streamsize)buf.size());

        int bytes_read_from_file = source_file.gcount(); //int is fine because i read at most buf's size, 1024 in this case

        if(bytes_read_from_file<=0) {
            std::cout << "[ERROR] Read file error" << std::endl;
            break;
            //TODO gestire questo errore
        }

        percent = std::ceil((100.0 * bytes_sent) / file_size);
        print_percentage(percent);

        boost::asio::write(*socket_, boost::asio::buffer(buf.c_array(), source_file.gcount()),
                           boost::asio::transfer_all(), error);
        if(error) {
            std::cout << "[ERROR] Send file error:" << error << std::endl;
            //TODO lanciare un'eccezione?
        }

        bytes_sent += bytes_read_from_file;
    }

    std::cout << "\n" << "[INFO] File " << file_path << " sent successfully!" << std::endl;
}
1
Andreascopp 18 nov. 2020 à 00:23

1 réponse

Meilleure réponse

Les opérations publiées dans le pool se terminent sans que les threads se terminent. C'est tout le but de la mise en commun des threads.

void send_file(std::string const& file_path){
    post(pool_, [this, &file_path] {
        handle_send_file(file_path);
    });
    // DO SOMETHING WHEN handle_send_file ENDS
}

Cela a plusieurs problèmes. Le plus important est que vous ne devez pas capturer file_path par référence, car l'argument est bientôt hors de portée et l'appel handle_send_file s'exécutera à un moment non spécifié dans un autre thread. C'est une condition de course et une référence pendante. Résultats de Comportement non défini.

Puis le

    // DO SOMETHING WHEN handle_send_file ENDS

Est sur une ligne qui n'a pas de relation de séquence avec handle_send_file. En fait, il sera probablement exécuté avant que cette opération ait une chance de démarrer.

Simplifier

Voici une version simplifiée:

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;
    
static asio::thread_pool pool_;

struct X {
    std::unique_ptr<tcp::socket> socket_;
    
    explicit X(unsigned short port) : socket_(new tcp::socket{ pool_ }) {
        socket_->connect({ {}, port });
    }

    asio::thread_pool pool_;
    std::unique_ptr<tcp::socket> socket_{ new tcp::socket{ pool_ } };

    void send_file(std::string file_path) {
        post(pool_, [=, this] {
            send_file_implementation(file_path);
            // DO SOMETHING WHEN send_file_implementation ENDS
        });
    }

    // throws system_error exception
    void send_file_implementation(std::string file_path) {
        std::ifstream source_file(file_path,
                                  std::ios_base::binary | std::ios_base::ate);
        size_t file_size = source_file.tellg();
        source_file.seekg(0);

        write(*socket_,
                asio::buffer(file_path + "\n" + std::to_string(file_size) + "\n\n"));

        boost::array<char, 1024> buf{};
        while (source_file.read(buf.c_array(), buf.size()) ||
               source_file.gcount() > 0)
        {
            int n = source_file.gcount();

            if (n <= 0) {
                using namespace boost::system;
                throw system_error(errc::io_error, system_category());
            }

            write(*socket_, asio::buffer(buf), asio::transfer_exactly(n));
        }
    }
};

Maintenant, vous pouvez en effet exécuter plusieurs de ces opérations en parallèle (en supposant plusieurs instances de X, vous avez donc des connexions socket_ distinctes).

Pour faire quelque chose à la fin, il suffit de mettre le code là où j'ai déplacé le commentaire :

// DO SOMETHING WHEN send_file_implementation ENDS

Si vous ne savez pas quoi faire là-bas et que vous souhaitez préparer un avenir à ce stade, vous pouvez :

std::future<void> send_file(std::string file_path) {
    std::packaged_task<void()> task([=, this] {
        send_file_implementation(file_path);
    });

    return post(pool_, std::move(task));
}

Cette surcharge de post renvoie comme par magie¹ le futur de la tâche empaquetée. Cette tâche empaquetée définira la promesse interne avec la valeur de retour (void) ou l'exception levée.

Voyez-le en action : Live On Coliru

int main() {
    // send two files simultaneously to different connections
    X clientA(6868);
    X clientB(6969);

    std::future<void> futures[] = {
        clientA.send_file("main.cpp"),
        clientB.send_file("main.cpp"),
    };

    for (auto& fut : futures) try {
        fut.get();
        std::cout << "Everything completed without error\n";
    } catch(std::exception const& e) {
        std::cout << "Error occurred: " << e.what() << "\n";
    };

    pool_.join();
}

J'ai testé ceci en exécutant deux netcats pour écouter sur 6868/6969 :

nc -l -p 6868 | head& nc -l -p 6969 | md5sum&
./a.out
wait

Le serveur imprime:

Everything completed without error
Everything completed without error

Les netcats impriment leur sortie filtrée :

main.cpp
1907

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <fstream>
#include <iostream>
#include <future>
namespace asio = boost::asio;
using asio::ip::tcp;
7ecb71992bcbc22bda44d78ad3e2a5ef  -

¹ pas de magie : voir https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/async_result.html

1
sehe 17 nov. 2020 à 23:37