J'ai un pipeline Dagster composé de deux solides (exemple reproductible ci-dessous). Le premier (return_some_list) produit une liste de certains objets. Le deuxième solide (print_num) accepte un élément de la première liste (pas la liste complète) et effectue un traitement sur cet élément.

Comment suis-je censé appeler le deuxième solide pour chaque élément de la liste renvoyée par le premier solide? Veuillez également expliquer les meilleures pratiques.

Je ne sais pas si c'est la meilleure approche (faites le moi savoir), mais j'aimerais produire une instance de solide différente de print_num pour chaque élément de la sortie du premier solide. Cela m'aidera à paralléliser le solide à l'avenir et à mieux gérer les solides longs / gourmands en calculs.

from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
0
cyau 12 mars 2021 à 16:50

1 réponse

Meilleure réponse

Il s'est avéré qu'il existe une fonctionnalité expérimentale (qui, espérons-le, deviendra officielle) qui permet de créer des tâches en fonction des éléments d'une sortie itérable. Le code de travail ci-dessous:

from dagster import execute_pipeline, pipeline, solid, Output, OutputDefinition
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List


@solid
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

Ici, return_some_list renvoie un itérable. Nous voulons exécuter un solide pour chaque élément de cet itérable. Nous faisons cela dans le solide generate_subtasks, qui donne un DynamicOutput avec l'élément et un nom pour la sous-tâche qui sera générée pour lui. Les informations de type de DynamicOutput sont données dans le DynamicOutputDefinition de la spécification solid.

Pour connecter ces solides, nous obtenons d'abord la liste via return_some_list. Appelez ensuite generate_subtasks, qui est un générateur, et map à chacune de ses sorties la fonction print_num.

L'exécution de l'ensemble du pipeline devrait afficher un grand nombre d'informations pour chacune des sous-tâches générées par generate_subtasks, ressemblant à ceci (seule une partie de la sortie affichée):

2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".

Oh, et ce qui est cool: Dagster effectue une vérification de type et échoue rapidement si vous lui donnez un argument mal tapé. Donc, si nous devions fournir print_str, disons, à la fonction map, elle refuserait même de s'exécuter.

0
cyau 13 mars 2021 à 19:38