Je souhaite tester ma topologie Kafka Streams avec un test unitaire à l'aide de kafka-streams-test-utils. J'utilise cette bibliothèque depuis plus longtemps et j'ai déjà construit une couche abstraite autour de mes tests en utilisant TestNG. Mais depuis que j'ai ajouté un merge(...) à mon flux, j'ai eu l'exception suivante:

 org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=my-topic-2, partition=0, offset=0
 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:393)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: com.MyKey / value type: com.MyValue). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 3 more
Caused by: java.lang.ClassCastException: class com.MyKey cannot be cast to class [B (com.MyValue is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 15 more

Voici la partie comment je construis le Stream avec le StreamBuilder de TopologyTestDriver:

// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
    "my-topic-2",
    consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
    (key, value) -> {
        List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
        // Do stuff an fill out the list
        return list;
    })
 .through("tmp-topic");

// Block 2
KStream<MyKey, MyValue>[] branches = stream1
    .merge(stream2)
    ... business stuff

Pour produire des messages sur le sujet source, j'utilise TopologyTestDriver.pipeInput(...) initialisé avec JsonSerDes. L'exception se produit en castant le ByteArray, mais je ne sais pas pourquoi le paramètre attendu de ByteArraySerializer est la même classe mais d'un autre module que la classe consommée chargée. Ils peuvent également être chargés par un autre ClassLoaders. Mais il n'y a pas de pile Spring en arrière-plan et tout devrait fonctionner de manière synchrone.

Je suis vraiment confus à propos de ce comportement.

Apache Kafka Dependecies a la version: 2.0.1 et j'utilise openjdk-11 . Est-il possible d'aligner le chargement de classe des sérialiseurs? L'erreur se produit uniquement, si je produis quelque chose sur: my-topic-2 , l'autre sujet de la fusion fonctionne correctement.

1
Norbert Koch 21 avril 2020 à 14:01

2 réponses

Meilleure réponse

Comme mentionné par @bbejeck, vous devrez utiliser un version différente de .through(), celle qui vous permet de remplacer par défaut (ByteArraySerde ) serdes appliqués à K, V.

KStream<K,V> through​(java.lang.String topic,
                     Produced<K,V> produced) 

Matérialisez ce flux en un sujet et créez un nouveau KStream à partir du sujet à l'aide de l'instance Produced pour la configuration de key serde, value serde et StreamPartitioner. ... Cela équivaut à appeler to(someTopic, Produced.with(keySerde, valueSerde) et StreamsBuilder # stream (someTopicName, Consumed.with (keySerde, valueSerde)).

2
mazaneicha 21 avril 2020 à 19:36

Sans voir tout votre code, je ne peux pas le dire avec certitude, mais voici ce que je pense qu'il pourrait se passer.

Fournir aux Serdes Consumed fournit uniquement la dé / sérialisation lors de la consommation des enregistrements de la rubrique d'entrée; Kafka Streams ne les propage pas dans le reste de la topologie. À tout moment, si un Serde est à nouveau requis, Kafka Streams utilise ceux fournis dans le StreamsConfig. Le Serdes.ByteArraySerde est la valeur par défaut.

Je suggérerais deux choses à essayer:

  1. Utilisez Produced.with(keySerde, valueSerde) dans vos nœuds récepteurs
  2. Fournissez le Serde pour votre type via le StreamsConfig.

HTH, et laissez-moi savoir comment les choses fonctionnent.

-Facture

3
bbejeck 21 avril 2020 à 15:16