Je veux implémenter quelque chose comme un gestionnaire de tâches qui sera poussé vers de nouvelles tâches. Chacune de ces tâches pourrait être une opération asynchrone comme attendre l'utilisateur ou faire des appels API ou autre chose. Le gestionnaire de tâches s'assure qu'à un moment donné, seul le nombre de tâches autorisé peut s'exécuter, tandis que d'autres tâches continueront d'attendre jusqu'à ce que leur tour arrive.

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; //this should have "concurrent" number of tasks running at any given time

  }

  push(task) {
    /* pushes to the queue and then runs the whole queue */
  }
}

Le modèle d'appel serait

let runner = new Runner(3);
runner.push(task1);
runner.push(task2);
runner.push(task3);
runner.push(task4);

Où tâche est une référence de fonction qui exécutera un rappel à la fin par lequel nous pouvons savoir qu'elle est terminée. Donc ça devrait être comme

let task = function(callback) {
  /* does something which is waiting on IO or network or something else*/
  callback(); 
}

Je pousse donc une fermeture au coureur comme

runner.push(function(){return task(callback);});

Je pense que je pourrais également avoir besoin d'ajouter une file d'attente WaitList. Mais les tâches ne se promettent pas, donc je ne sais pas comment vérifier si elles sont terminées.

Quoi qu'il en soit, j'ai besoin de la bonne approche.

4
Roy 14 avril 2018 à 02:24

6 réponses

Meilleure réponse

Je pousse donc une fermeture au coureur comme

runner.push(function(){return task(callback);});

On dirait que des morceaux manquants du coureur sont ajoutés à la syntaxe d'appel. Un coureur plus complet pourrait ressembler à:

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; // run at most "concurrent" number of tasks at once
    this.runCount = 0;
    this.maxCount = concurrent;
    this.notifyEnd = this.notifyEnd.bind(this);
  }

  notifyEnd() {
    --this.runCount;
    this.run();
  }

  run() {
    while( (this.runCount < this.maxCount) && taskQueue.length) {
      ++this.runCount;
      // call task with callback bound to this instance (in the constructor)
      taskQueue.shift()(this.notifyEnd);
    } 
  }

  push(task) {
    this.taskQueue.push(task);
    this.run();
  }
}

Maintenant, la méthode push du coureur est appelée avec une fonction prenant un paramètre de rappel. L'état de l'exécution est contenu dans la valeur de runCount, 0 pour un entier inactif ou positif pour les tâches en cours d'exécution.

Il reste quelques problèmes:

  1. La tâche peut être appelée de manière synchrone pour coder en l'ajoutant au runner. Il manque l'approche stricte de Promises qui appelle toujours un rappel then de manière asynchrone à partir de la file d'attente des événements.

  2. Le code de tâche doit retourner normalement sans erreur. Ce n'est pas inconnu en JavaScript, où le suivi d'hôte pour les erreurs de rejet de promesse non capturées doit faire la même chose, mais c'est assez inhabituel dans le script d'application. L'appel du coureur à la tâche peut être placé dans un bloc try/catch pour détecter les erreurs synchrones, mais il doit également ajouter du code pour ignorer l'erreur si un rappel a été reçu avant que la tâche ne génère une erreur synchrone - sinon le nombre de tâches en cours pourrait se tromper.

  3. Si la tâche appelle le rappel plusieurs fois, le nombre de tâches en cours d'exécution sera perturbé dans le coureur ci-dessus.

Des considérations similaires à celles-ci étaient à l'origine du développement et de la normalisation de l'interface Promise. Je suggère qu'après avoir pris en considération les inconvénients potentiels, si un simple exécuteur de tâches répond à toutes les exigences, utilisez-en un. Si une robustesse supplémentaire est requise, promettre des tâches et écrire un coureur plus axé sur les promesses pourrait s'avérer une meilleure alternative.

2
traktor53 11 mai 2019 à 00:34

Je pousse donc une fermeture au coureur comme

runner.push(function(){return task(callback);});

Seriez-vous en mesure de spécifier les fonctions tâche et rappel en tant que paramètres distincts dans la fonction push ? Si oui, vous pouvez probablement faire quelque chose comme ça.

class Runner {
  constructor(maxCount = 1) {
    this.taskQueue = [];
    this.maxCount = maxCount;
    this.currentCount = 0;
  }

  run() {
    if (this.taskQueue.length && this.currentCount < this.maxCount) {
      const task = this.taskQueue.shift();
      task();
    }
  }

  push(task, callback) {
    this.taskQueue.push(() => {
      this.currentCount++;
      task((...args) => {
        this.currentCount--;
        callback(...args);
        this.run();
      })
    })
    this.run();
  }
}

// Example usage
const myCallback = (caller) => {
  console.log(`myCallback called by ${caller} ${new Date()}`);
};

const task1 = (callback) => {
  console.log(`task1 started ${new Date()}`);
  setTimeout(() => {
    callback('task1');
  }, 3000);
};

const task2 = (callback) => {
  console.log(`task2 started ${new Date()}`);
  setTimeout(() => {
    callback('task2');
  }, 3000);
};

const task3 = (callback) => {
  console.log(`task3 started ${new Date()}`);
  setTimeout(() => {
    callback('task3');
  }, 3000);
};

const task4 = (callback) => {
  console.log(`task4 started ${new Date()}`);
  setTimeout(() => {
    callback('task4');
  }, 3000);
};

const runner = new Runner(2);
runner.push(task1, myCallback);
runner.push(task2, myCallback);
runner.push(task3, myCallback);
runner.push(task4, myCallback);
1
JoshA 15 avril 2018 à 05:57

Une simple démonstration du concept. Modification des noms de variables pour une meilleure compréhension.

class Runner {
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.waitList = [];
    this.count = 0;
    this.currentQ = [];
  }

  push(task) {
    this.waitList.push(task);
    this.run();
  }

  run() {
    let me = this;
    if (this.count < this.concurrency) {
      this.count++;
      if (this.waitList.length > 0) {
        let task = this.waitList.shift();
        let id = task.id;
        this.currentQ.push(id);
        this.showQ();
        task.task(function() {
          this.currentQ.splice(this.currentQ.indexOf(id), 1);
          this.showQ();
          this.count--;
          this.run();
        }.bind(me))
      }
    }
  }

  showQ() {
    let q = "";
    q = this.currentQ.join(', ');
    document.getElementById("running").innerHTML = q;
  }
}

let output = document.getElementById("output");

let task1 = {
  id: 1,
  task: function(done) {
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task1");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task1");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 3000)
  }
}
let task2 = {
  id: 2,
  task: function(done) {
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task2");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task2");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 6000)
  }
}
let task3 = {
  id: 3,
  task: function(done) {
    this.id = "3";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task3");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task3");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 10000)
  }
}
let task4 = {
  id: 4,
  task: function(done) {
    this.id = "4";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task4");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task4");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 5000)
  }
}
let task5 = {
  id: 5,
  task: function(done) {
    this.id = "5";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task5");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task5");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 6000)
  }
}
let task6 = {
  id: 6,
  task: function(done) {
    this.id = "6";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task6");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task6");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 4000)
  }
}
let task7 = {
  id: 7,
  task: function(done) {
    this.id = "7";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task7");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task7");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 5000)
  }
}

let r = new Runner(3);
r.push(task1);
r.push(task2);
r.push(task3);
r.push(task4);
r.push(task5);
r.push(task6);
r.push(task7);
Currently running
<div id="running">

</div>
<hr>
<div id="output">

</div>
2
Roy 16 avril 2018 à 00:57

Il est raisonnable de définir les tâches comme des promesses (plus précisément, des fonctions de retour de promesse) car c'est un bon cas d'utilisation pour elles; actuellement, les erreurs ne peuvent pas être gérées (sans promesses, elles pourraient être gérées de manière conventionnelle avec des rappels de style Node). Même si elles ne sont pas des promesses, les promesses peuvent être utilisées en interne:

  constructor(concurrent = 1) {
    this.concurrent = concurrent;
    this.taskQueue = [];
  }

  push(task) {
    this.taskQueue.push(task);
  }

  run() {
    let tasksPromise = Promise.resolve();
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent));
      const taskChunkPromises = taskChunk.map(task => new Promise(resolve => task(resolve)));
      tasksPromise = tasksPromise.then(() => Promise.all(taskChunkPromises));
    }

    return tasksPromise;
  }

async..await peut offrir des avantages dans ce cas:

  async run() {
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent));
      const taskChunkPromises = taskChunk.map(task => new Promise(resolve => task(resolve)));
      await Promise.all(taskChunkPromises);
    }
  }
2
Estus Flask 13 avril 2018 à 23:58

Style de rappel:

run () {
    var task = this.taskQueue.unshift();
    task(() => this.run());
}
0
Ben West 14 avril 2018 à 00:46

Question interessante. J'ai donc essayé d'implémenter un runner de tâche asynchrone très simple pour JS. Je pense que cela est essentiel lors de l'envoi de courriels en vrac et autres. Pourtant, celui-ci montre particulièrement le travail sur Fetch API et je suis sûr qu'il peut facilement être implémenté pour n'importe quel travail asynchrone.

Ici, j'ai un constructeur qui nous donne une instance d'un exécuteur de tâches asynchrones dans lequel nous exécuterons un certain nombre de tâches asynchrones en tant que bloc simultanément et attendrons une période de temps donnée pour continuer avec le bloc suivant jusqu'à ce que nous soyons à court de tout. les tâches dans le taskQueue. Cependant, en attendant, nous pouvons toujours insérer de nouvelles tâches et il continuera à invoquer les tâches en tant que morceaux, y compris ceux nouvellement ajoutés. Tout au long de ce processus, nous sommes également libres de modifier l'intervalle de chaque bloc en cours de traitement.

Ce que je n'ai pas implémenté ici, c'est une gestion des erreurs appropriée, autre que .catch(console.log) et un mécanisme essayer n fois puis échouer . Qui peut simplement être implémenté à partir de une de mes réponses précédentes.

Lorsque nous alimentons la tâche asynchrone, nous avons bien sûr besoin des étapes conséquentes .then() pour terminer le travail. Dans mon abstraction, elles sont fournies dans un tableau sous forme de fonctions todo. Disons donc au total que vous devez faire 20 fetch es comme;

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`));

Alors vous pouvez fournir un tableau todo comme;

var todos   = [resp => resp.json(), json => console.log(json)];

Où chaque élément est un rappel pour une étape .then() conséquente comme mentionné précédemment. Le code suivant exécute les tâches par blocs de 3 initialement à des intervalles de 1000 ms, mais après 2 secondes, il passe à des intervalles de 500 ms.

function TaskRunner(tasks = [],
                    todos = [],
                    group = 1,
                    interval = 1000){

  this.interval   = interval;
  this.concurrent = group;
  this.taskQueue  = tasks;
  this.todos      = todos;
}

TaskRunner.prototype.enqueue = function(ts = []){
                                 var cps; // current promises
                                 this.taskQueue = this.taskQueue.concat(ts);
                                 cps = this.taskQueue.splice(0,this.concurrent)
                                                     .map(t => this.todos.reduce((p,td) => p.then(td), t())
                                                                         .catch(console.log));
                                 this.taskQueue.length && setTimeout(this.enqueue.bind(this), this.interval);
                               };

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`)),
    todos   = [resp => resp.json(), json => console.log(json)],
    goFetch = new TaskRunner();

goFetch.todos.push(...todos);
goFetch.concurrent = 2;
goFetch.enqueue(fetches);
setTimeout(() => goFetch.interval = 500, 2000);
.as-console-wrapper {
max-height: 100% !important
}
0
Redu 6 mai 2018 à 18:19