J'ai déjà commencé à apprendre Kafka. Essayer des opérations de base dessus. J'ai collé sur un point qui concerne les «courtiers».

Mon kafka fonctionne mais quand je veux créer une partition.

 from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
 consumer.assign([TopicPartition('foobar', 2)])
 msg = next(consumer)

traceback (dernier appel le plus récent): Fichier "", ligne 1, dans Fichier "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", ligne 284, dans init self._client = KafkaClient (métriques = self._metrics, ** self.config) Fichier "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", ligne 202, dans init self.config ['api_version'] = self.check_version (timeout = check_timeout) Fichier "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", ligne 791, dans check_version générer des erreurs.NoBrokersAvailable () kafka.errors.NoBrokersAvailable: NoBrokersAvailable

7
Beyhan Gül 9 août 2016 à 18:34

5 réponses

Meilleure réponse

Vous ne pouvez pas créer de partitions au sein d'un consommateur. Les partitions sont créées lorsque vous créez un sujet. Par exemple, en utilisant l'outil de ligne de commande:

bin/kafka-topics.sh \
  --zookeeper localhost:2181 \
  --create --topic myNewTopic \
  --partitions 10 \
  --replication-factor 3

Cela crée une nouvelle rubrique "myNewTopic" avec 10 partitions (numérotées de 0 à 9) et le facteur de réplication 3. (voir http://docs.confluent.io/3.0.0/kafka/post-deployment.html#admin-operations et https://kafka.apache.org/documentation.html#quickstart_createtopic)

Au sein de votre consommateur, si vous appelez assign(), cela signifie que vous souhaitez consommer la partition correspondante et cette partition doit déjà exister.

4
Matthias J. Sax 10 août 2016 à 08:06

Le problème pour moi était la règle de pare-feu lorsque j'exécute Kafka sur Google Cloud .

Cela fonctionnait pour moi hier et je me grattais la tête aujourd'hui pendant 1 heure en pensant pourquoi cela ne fonctionne pas maintenant.

Étant donné que l'adresse IP publique de mon système local change à chaque fois que je me connecte à un autre LAN ou WiFi, j'ai dû autoriser l'IP publique de mon système local dans les règles de pare-feu. Je suggère d'utiliser une machine avec une adresse IP publique fixe.

Ces petits changements dans les configurations prennent trop de temps pour déboguer et les corriger. Je me sentais comme perdu une heure pour ça.

1
Amit Yadav 13 févr. 2020 à 06:49

Il semble que vous souhaitiez commencer à consommer des messages au lieu de créer des partitions. Néanmoins - pouvez-vous atteindre Kafka au port 1234? 9092 est le port par défaut de kafkas, vous pouvez peut-être essayer celui-ci. Si vous avez trouvé le bon port mais que votre application génère toujours des erreurs, vous pouvez essayer d'utiliser un consommateur de console pour tester votre configuration:

bin/kafka-console-producer.sh --broker-list localhost:<yourportnumber> --topic foobar

Le consommateur de console fait partie de la distribution standard de kafka. Peut-être que cela vous rapproche un peu plus de la source du problème.

2
TobiSH 9 août 2016 à 20:01

Je ne sais pas si cette réponse est toujours pertinente, mais a récemment résolu ce même problème dans un courtier VM VBox non accessible à partir du système d'exploitation Windows hôte. Puisque vous avez mentionné bootsrap_servers dans KafkaConsumer, je suppose que vous utilisez au moins kafka 0.10.0.0

Veuillez rechercher la propriété advertised.listeners dans le fichier server.properties et la définir sur PLAINTEXT://localhost:9092 ou PLAINTEXT://<broker_ip>:9092

Mais avant de définir cela, assurez-vous que votre courtier est accessible à partir de l'environnement dans lequel votre consommateur s'exécute (en faisant ping localhost).

En outre, vous devez redémarrer le serveur kafka et le consommateur / producteur (quel que soit le fonctionnement) et essayer d'envoyer / recevoir.

Par exemple, si vous exécutez une machine virtuelle, vous souhaiterez peut-être utiliser l'adaptateur hôte uniquement pour rendre le courtier accessible à partir de la machine hôte.

REMARQUE: Cette configuration fonctionne pour Kafka Server> = 0.10.X.X mais pas pour 0.8.2.X. N'ont pas vérifié 0.9.0.X

2
somnathchakrabarti 27 sept. 2017 à 09:57

J'ai eu la même erreur lors de la diffusion de kafka. Le code ci-dessous a résolu mon erreur: nous devons définir la version de l'API dans KafkaProducer.

KafkaProducer(bootstrap_servers=['localhost:9092'],
api_version=(0,11,5),
value_serializer=lambda x: dumps(x).encode('utf-8'))
1
user1934212 22 oct. 2019 à 09:49