J'essaie de faire passer mon travail Flink (v1.8 exécuté sur EMR) de l'utilisation de BucketingSink au nouveau StreamingFileSink.

J'ai le nouveau code en cours d'exécution et presque tout semble bon. Les fichiers sont écrits sur S3 et transférés pour terminer. Le seul problème est que l'ACL de S3 n'est pas définie comme avec l'ancien code.

J'ai mon core-site.xml ensemble comme ça

<configuration>

    <property>
        <name>fs.s3a.acl.default</name>
        <value>BucketOwnerFullControl</value>
    </property>

</configuration>

J'utilise également s3a:// comme préfixe du chemin dans l'argument forRowFormat() du générateur StreamingFileSink.

De plus, lors du passage à StreamingFileSink, j'ai dû ajouter une nouvelle dépendance à mon build.gradle

flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"

Je ne sais pas très bien comment j'écrivais sur S3 en utilisant le préfixe s3a:// sans ce pot lorsque j'utilisais l'API BucketingSink. D'une manière ou d'une autre, j'écris maintenant sur S3 d'une manière qui ne respecte pas mes paramètres core-site.xml.

2
jlunavtgrad 25 févr. 2020 à 00:40

1 réponse

Meilleure réponse

J'ai découvert par beaucoup d'essais et d'erreurs que l'ajout de la ligne suivante à mon flink-conf.yml a résolu ce problème.

fs.s3a.acl.default: BucketOwnerFullControl
0
jlunavtgrad 26 févr. 2020 à 23:01