Je suis nouveau sur Scala et je m'entraîne sur la librairie Futures en créant des schémas de relance. Ce faisant, j'ai obtenu le morceau de code suivant:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def fCalc(): Future[Int] = Future(calc())

  resetRetries()

  val ff = fCalc() // 0 - should fail
    .fallbackTo(fCalc()) // 1 - should fail
    .fallbackTo(fCalc()) // 2 - should fail
    .fallbackTo(fCalc()) // 3 - should fail
    .fallbackTo(fCalc()) // 4 - should be a success

  Await.ready(ff, 10.second)

  println(ff.isCompleted)
  println(ff.value)
}

Chaque fois que j'exécute ce code, j'obtiens des résultats différents. Des exemples des résultats que j'obtiens sont les suivants

Sortie 1

I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Sortie 2

I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))

Sortie 3

I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Il n'est pas toujours vrai que les résultats alternent entre le succès et l'échec. Il peut y avoir plusieurs exécutions échouées jusqu'à ce qu'une réussite apparaisse.

À ma connaissance, il ne devrait y avoir que 4 journaux du "Je suis le thread x Cela va échouer. Nombre de tentatives x" et ceux-ci devraient être les suivants:

I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4

Pas nécessairement dans cet ordre - puisque je ne sais pas comment le modèle de filetage Scala fonctionne exactement - mais vous comprenez mon point. Néanmoins, j'obtiens cette sortie non déterministe que je ne peux pas comprendre. Donc ... ma question est: d'où vient cette sortie non déterministe?

Je tiens à mentionner que le mécanisme de nouvelle tentative suivant donne systématiquement les mêmes résultats:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }

  resetRetries()
  val retriableFuture: Future[Future[Int]] = retry(calc())(5)
  Await.ready(retriableFuture, 10 second)

  println(retriableFuture.isCompleted)
  println(retriableFuture.value)
}

Production

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))

Alors que si je réduis le nombre de tentatives (retry(calc())(3)), le résultat est un futur échoué comme prévu

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))
1
Niko 27 janv. 2019 à 16:25

3 réponses

Meilleure réponse

Bien que techniquement @Tim soit correct, je ne pense pas qu'il réponde vraiment à cette question.

Je crois que la véritable source de votre confusion est votre incompréhension de ce que les constructions:

f.fallbackTo(Future(calc()))

Est-ce que. Et comment c'est différent de

f.recoverWith({ case _ => Future(calc())})

Il existe deux distinctions importantes:

  1. Dans le cas fallbackTo, le Future(calc()) est créé immédiatement et donc commence (presque) immédiatement l'exécution du calc(). Ainsi, le futur d'origine et le futur de repli sont exécutés simultanément. Dans le cas du recoverWith, le futur de secours n'est créé qu'après l'échec du futur d'origine. Cette différence affecte l'ordre de journalisation. Cela signifie également que l'accès à var retries est simultané et que vous pourriez donc voir le cas où tous les threads échouent parce que certaines mises à jour de retries sont perdues.

  2. Un autre point délicat est que fallbackTo est documenté as (la mise en évidence est à moi)

Crée un nouvel avenir qui contient le résultat de cet avenir s'il a été achevé avec succès, ou, sinon, le résultat de cet avenir s'il est terminé avec succès. Si les deux futurs échouent , le futur résultant contient l'objet jetable du premier futur .

Cette différence n'affecte pas vraiment votre exemple car l'exception que vous lancez dans toutes les tentatives infructueuses est la même, mais elle peut avoir affecté le résultat si elles étaient différentes. Par exemple, si vous modifiez votre code pour:

  def calc(attempt: Int) = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException(s"This failed $attempt")
  }

  def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))

  val ff = fCalc(1) // 0 - should fail
      .fallbackTo(fCalc(2)) // 1 - should fail
      .fallbackTo(fCalc(3)) // 2 - should fail
      .fallbackTo(fCalc(4)) // 3 - should fail
      .fallbackTo(fCalc(5)) // 4 - should be a success

Alors vous devriez obtenir l'un de ces deux résultats

Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))

Et jamais aucune autre valeur «ratée».

Notez qu'ici je passe explicitement le attempt pour ne pas atteindre la condition de concurrence sur retries.


Réponse à d'autres commentaires (28 janvier)

La raison pour laquelle je passe explicitement attempt dans mon exemple précédent est que c'est le moyen le plus simple de garantir que le IllegalArgumentException créé par le premier calc logiquement obtiendra 1 comme valeur sous tous les programmes de threads (même pas très réalistes).

Si vous souhaitez simplement que tous les journaux aient des valeurs différentes, il existe un moyen beaucoup plus simple: utilisez une variable locale!

  def calc() = {
    val retries = atomicRetries.getAndIncrement()
    if (retries > 3) 10 
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
      throw new IllegalArgumentException(s"This failed $retries")
    }
  }

De cette façon, vous évitez le problème classique de TOCTOU.

4
SergGr 28 janv. 2019 à 18:15

Ce n'est pas un problème Scala mais un problème de multi-threading plus général avec la valeur retries. Vous avez plusieurs threads lisant et écrivant cette valeur sans aucune synchronisation, vous ne pouvez donc pas prédire quand chaque thread sera exécuté ou quelle valeur il verra.

Il semble que le problème spécifique soit que vous testez retries et que vous le mettez à jour plus tard. Il est possible que les quatre threads testent la valeur avant que l'un d'eux ne la mette à jour. Dans ce cas, ils verraient tous 0 et lanceraient une erreur.

La solution est de transformer retries en AtomicInteger et d'utiliser getAndIncrement. Cela récupérera atomiquement la valeur et l'incrémentera, de sorte que chaque thread verra la valeur appropriée.


Mettre à jour les commentaires suivants : l'autre réponse a expliqué pourquoi plusieurs threads sont démarrés en même temps, je ne vais donc pas le répéter ici. Avec plusieurs threads exécutés en parallèle, l'ordre de la journalisation sera toujours non déterministe.

2
Tim 28 janv. 2019 à 07:42

C'est ce qui a finalement fonctionné pour moi:

(Le code suivant pour la méthode calc() aborde de manière adéquate les problèmes de journalisation de la duplication et des résultats non déterministes des futurs)

var time = 0
  var resetTries = time = 0

  def calc() = this.synchronized {
    if (time > 3) 10 else {
      time += 1
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
      throw new IllegalStateException(("not yet"))
    }
  }

Non AtomicInteger requis - rend les choses encore plus compliquées à mon avis. Un wrapper synchronised est ce qui est nécessaire.

Je dois souligner le fait que ce n'est qu'à des fins de démonstration et que l'utilisation d'une telle conception dans un code de production n'est peut-être pas la meilleure idée (bloquer les appels à la méthode calc). Il faut plutôt utiliser l'implémentation recoverWith.

Merci à @SergGr, @Tim et @MichalPolitowksi pour leur aide

0
Niko 28 janv. 2019 à 10:35