J'ai un code asynchrone qui appelle du code synchrone qui prend un certain temps à s'exécuter, j'ai donc suivi les suggestions décrites dans Quelle est la meilleure approche pour encapsuler le blocage E/S dans future-rs ?. Cependant, mon code asynchrone a un timeout, après quoi je ne suis plus intéressé par le résultat du calcul synchrone :

use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation() -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
    }
    sum
}

#[tokio::main]
async fn main() {
    let handle = task::spawn_blocking(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => eprintln!("Sum timed out (expected)"),
    }
}

L'exécution de ce code montre que le délai d'attente se déclenche, mais le code synchrone également continue de s'exécuter :

0
1
Sum timed out (expected)
2
3
4
5
6
7
8
9

Comment puis-je arrêter d'exécuter le code synchrone lorsque le futur emballage est supprimé ?

Je ne m'attends pas à ce que le compilateur puisse comme par magie arrêter mon code synchrone. J'ai annoté une ligne avec "point d'interruption" où je devrais mettre manuellement une sorte de contrôle pour quitter plus tôt ma fonction, mais je ne sais pas comment obtenir facilement une notification indiquant que le résultat de spawn_blocking (ou ThreadPool::spawn_with_handle, pour le code purement futuriste) a été supprimé.

4
Shepmaster 30 janv. 2020 à 03:53

1 réponse

Meilleure réponse

Vous pouvez passer un booléen atomique que vous utilisez ensuite pour signaler la tâche comme nécessitant une annulation. (Je ne suis pas sûr d'utiliser un Ordering approprié pour les appels load/store, cela nécessite probablement plus de considération)

Voici une version modifiée de votre code qui génère

0
1
Sum timed out (expected)
2
Interrupted...
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
use tokio::{task, time}; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let some_bool = Arc::new(AtomicBool::new(true));

    let some_bool_clone = some_bool.clone();
    let handle =
        task::spawn_blocking(move || long_running_complicated_calculation(&some_bool_clone));
    let guarded = time::timeout(Duration::from_millis(250), handle);

    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
            some_bool.store(false, Ordering::Relaxed);
        }
    }
}

terrain de jeux


Il n'est pas vraiment possible de faire en sorte que cela se produise automatiquement lors de la suppression des futures / handles avec Tokio actuel. Certains travaux dans ce sens sont en cours sur http://github.com/tokio-rs/ tokio/issues/1830 et http://github.com/tokio- rs/tokio/issues/1879.

Cependant, vous pouvez obtenir quelque chose de similaire en enveloppant les contrats à terme dans un type personnalisé.

Voici un exemple qui ressemble presque au code d'origine, mais avec l'ajout d'un simple type wrapper dans un module. Ce serait encore plus ergonomique si j'implémentais Future<T> sur le type de wrapper qui se contente de transmettre à la poignée enveloppée, mais cela s'avérait fastidieux.

mod blocking_cancelable_task {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use tokio::task;

    pub struct BlockingCancelableTask<T> {
        pub h: Option<tokio::task::JoinHandle<T>>,
        flag: Arc<AtomicBool>,
    }

    impl<T> Drop for BlockingCancelableTask<T> {
        fn drop(&mut self) {
            eprintln!("Dropping...");
            self.flag.store(false, Ordering::Relaxed);
        }
    }

    impl<T> BlockingCancelableTask<T>
    where
        T: Send + 'static,
    {
        pub fn new<F>(f: F) -> BlockingCancelableTask<T>
        where
            F: FnOnce(&AtomicBool) -> T + Send + 'static,
        {
            let flag = Arc::new(AtomicBool::new(true));
            let flag_clone = flag.clone();
            let h = task::spawn_blocking(move || f(&flag_clone));
            BlockingCancelableTask { h: Some(h), flag }
        }
    }

    pub fn spawn<F, T>(f: F) -> BlockingCancelableTask<T>
    where
        T: Send + 'static,
        F: FnOnce(&AtomicBool) -> T + Send + 'static,
    {
        BlockingCancelableTask::new(f)
    }
}

use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time::Duration};
use tokio::time; // 0.2.10

// This takes 1 second
fn long_running_complicated_calculation(flag: &AtomicBool) -> i32 {
    let mut sum = 0;
    for i in 0..10 {
        thread::sleep(Duration::from_millis(100));
        eprintln!("{}", i);
        sum += i;
        // Interruption point
        if !flag.load(Ordering::Relaxed) {
            eprintln!("Interrupted...");
            break;
        }
    }
    sum
}

#[tokio::main]
async fn main() {
    let mut h = blocking_cancelable_task::spawn(long_running_complicated_calculation);
    let guarded = time::timeout(Duration::from_millis(250), h.h.take().unwrap());
    match guarded.await {
        Ok(s) => panic!("Sum was calculated: {:?}", s),
        Err(_) => {
            eprintln!("Sum timed out (expected)");
        }
    }
}

terrain de jeux

4
Shepmaster 30 janv. 2020 à 19:21