
I'm trying to deploy a Kafka broker in a Docker container on a Mesos cluster.

Specifically, I have a Mesos cluster where I deploy various Docker containers, using Marathon as the init system. All the containers have service ports and are reachable through a proxy (HAProxy).

Problem

When I deploy a Kafka container with Marathon, I can create a topic and list all topics, but I cannot produce or consume messages. The produce command gives the following error:

[2016-01-18 11:10:09,926] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

The Docker image I am using is spotify/kafka, which has ZooKeeper and Kafka preinstalled. The image runs fine when I start it directly with the docker run command.

I'm using the following Marathon JSON file to deploy the container:

{
    "id": "spotify-kafka.marathon",
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; env; supervisord -n",
    "container": {
        "type": "DOCKER",
        "docker": {
            "image": "spotify/kafka",
            "network": "BRIDGE",
            "portMappings": [
                {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
            ]
        }
    },
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

The cmd exports environment variables that set the host IP and port the broker advertises to clients. The host ports are assigned randomly by Marathon and picked up by HAProxy, which routes the static service ports to them.
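For context, the spotify/kafka image's start script substitutes these variables into the broker's server.properties, roughly along these lines (illustrative values; the hostname is a placeholder):

```properties
# ADVERTISED_HOST -> advertised.host.name, ADVERTISED_PORT -> advertised.port
advertised.host.name=mesos-agent-1.example.com
advertised.port=9092
```

Clients fetch this advertised address from the broker's metadata and then connect to it directly, so if it does not resolve to a reachable host/port from the client's side, connections fail in the way shown above even though topic metadata operations via ZooKeeper still work.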

The commands I am using to talk to Kafka are the ones from the quickstart documentation:

https://kafka.apache.org/documentation.html#quickstart
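Concretely, these are the quickstart commands I'm running (0.8/0.9-era CLI; the host placeholder and the service ports from the JSON above are filled in by me):

```shell
# Create a topic (this works)
bin/kafka-topics.sh --create --zookeeper <proxy-host>:20000 \
    --replication-factor 1 --partitions 1 --topic test

# List topics (this works)
bin/kafka-topics.sh --list --zookeeper <proxy-host>:20000

# Produce (fails with the error above)
bin/kafka-console-producer.sh --broker-list <proxy-host>:20500 --topic test

# Consume (also fails)
bin/kafka-console-consumer.sh --zookeeper <proxy-host>:20000 --topic test --from-beginning
```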

I have also tried other images such as ches/kafka, wurstmeister/kafka, and one I built myself. I also found https://github.com/mesos/kafka which, once built, lets you send commands to port 7000 and deploy brokers to the cluster, but that failed for me as well. Ideally I would like an image that already bundles ZooKeeper and Kafka, like the spotify one.

Update 1
I changed the Marathon JSON file to export some additional variables that appear to be needed. The final JSON is shown below:

{
    "id": "spotify-kafka.marathon",
    "cmd": "export ADVERTISED_HOST=$HOST; export ADVERTISED_PORT=9092; export PORT_9092=9092; export PORT=2181; export PORT0=2181; export PORT1=9092; export PORT_2181=2181; env; supervisord -n",
    "container": {
        "type": "DOCKER",
        "docker": {
            "image": "192.168.1.235:5000/spotify-kafka",
            "network": "BRIDGE",
            "portMappings": [
                {"containerPort": 2181, "hostPort": 0, "servicePort": 20000},
                {"containerPort": 9092, "hostPort": 0, "servicePort": 20500}
            ]
        }
    },
    "cpus": 0.5,
    "mem": 1024.0,
    "instances": 1
}

This change gave me a different result when I tried to produce a message:

[2016-01-19 11:02:09,297] WARN Error while fetching metadata     [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,309] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,310] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,416] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,422] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,528] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,533] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,639] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,644] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,750] WARN Error while fetching metadata [{TopicMetadata for topic test -> 
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-01-19 11:02:09,750] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2016-01-19 11:02:09,751] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Update 2 - Solution
Digging around on the web, I found this repository: https://github.com/tobilg/docker-kafka-marathon/ Its author wrote a shell script that generates the Kafka properties file for you automatically. You can also scale this container and run multiple Kafka broker instances. The only downside for me is that it relies on an external ZooKeeper server, but I don't think that will be hard to solve by installing ZooKeeper in the image.
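The idea behind that script can be sketched roughly as follows (hypothetical; the names, defaults, and the output path are illustrative, and the real script in that repository differs in detail). Marathon injects $HOST (the agent hostname) and $PORT0 (the first allocated host port) into the task, and the script renders them into server.properties:

```shell
#!/bin/sh
# Render a minimal Kafka server.properties from the environment variables
# that Marathon injects into the task. All defaults below are illustrative.
BROKER_ID=${BROKER_ID:-0}
ADVERTISED_HOST=${HOST:-localhost}          # Mesos agent hostname
ADVERTISED_PORT=${PORT0:-9092}              # host port Marathon allocated
ZOOKEEPER_CONNECT=${ZOOKEEPER_CONNECT:-localhost:2181}

cat > /tmp/server.properties <<EOF
broker.id=${BROKER_ID}
port=9092
advertised.host.name=${ADVERTISED_HOST}
advertised.port=${ADVERTISED_PORT}
zookeeper.connect=${ZOOKEEPER_CONNECT}
EOF
```

The script would then exec kafka-server-start.sh with the generated file. Because advertised.host.name/advertised.port now reflect the externally reachable agent address and port rather than values fixed inside the container, clients can follow the broker metadata and connect successfully.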

So I am marking this as solved.
