Q: Kafka test failing/succeeding intermittently

I am trying to write a simple test for an abstraction over the Kafka Scala client in Kafka 0.8.2. It basically just writes a message to Kafka, which I then try to read back. However, the test was failing intermittently, so I boiled it down to the code below. It sometimes (rarely) passes and sometimes fails. What am I doing wrong?

package mykafkatest

import java.net.ServerSocket
import java.nio.file.Files
import java.util.{UUID, Properties}

import kafka.consumer.{Whitelist, ConsumerConfig, Consumer}
import kafka.producer.{ProducerConfig, Producer, KeyedMessage}
import kafka.serializer.StringDecoder
import kafka.server.KafkaConfig
import kafka.server.KafkaServerStartable
import org.apache.curator.test.TestingServer

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

class KafkaSenderTest extends org.scalatest.FunSpecLike with org.scalatest.ShouldMatchers with org.scalatest.BeforeAndAfterAll {

  import scala.concurrent.ExecutionContext.Implicits.global
  // Embedded ZooKeeper instance provided by curator-test
  val zkServer = new TestingServer()

  // Grab a free ephemeral port for the embedded broker
  val socket = new ServerSocket(0)
  val port = socket.getLocalPort.toString
  socket.close()
  // Fresh log directory so each run starts with no existing data
  val tmpDir = Files.createTempDirectory("kafka-test-logs")

  val serverProps = new Properties
  serverProps.put("broker.id", port)
  serverProps.put("log.dirs", tmpDir.toAbsolutePath.toString)
  serverProps.put("host.name", "localhost")
  serverProps.put("zookeeper.connect", zkServer.getConnectString)
  serverProps.put("port", port)

  val config = new KafkaConfig(serverProps)
  val kafkaServer = new KafkaServerStartable(config)

  override def beforeAll = {
    kafkaServer.startup()
  }

  override def afterAll = {
    kafkaServer.shutdown()
  }

  it("should put messages on a kafka queue") {
    println("zkServer: " + zkServer.getConnectString)
    println("broker port: " + port)

    val consumerProps = new Properties()
    consumerProps.put("group.id", UUID.randomUUID().toString)
    consumerProps.put("zookeeper.connect", zkServer.getConnectString)

    val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
    val topic = "some-topic"
    val filterSpec = new Whitelist(topic)
    val stream = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new StringDecoder, new StringDecoder).head

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list","localhost:"+port)

    val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
    val keyedMessage = new KeyedMessage[Array[Byte], Array[Byte]](topic, "awesome message".getBytes("UTF-8"))
    sender.send(keyedMessage)

    // Block until one message arrives, failing the test after 5 seconds
    val msg = Await.result(Future { stream.take(1) }, 5 seconds)
    msg.headOption should not be(empty)

  }
}

EDIT: I have created a new project with the following build.sbt and the above code as a test class.

name := "mykafkatest"

version := "1.0"

scalaVersion := "2.11.5"


libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.8.2.0",

  "org.scalatest" %% "scalatest" % "2.2.2" % "test",
  "org.apache.curator" % "curator-test" % "2.7.0" % "test"
)

And the test seems to pass more often, but it still fails intermittently...

Answer 1:

You may have a race condition: the consumer only finishes its initialization after the message has been sent, and it then ignores the message because, by default, it starts reading from the largest offset.

Try adding

consumerProps.put("auto.offset.reset", "smallest")

to your consumer properties.
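
For reference, here is a minimal sketch of the question's consumer setup with that property added. It reuses the names from the question's code; auto.offset.reset with the value "smallest" is a standard 0.8.x high-level consumer setting, though this alone is not guaranteed to remove all flakiness:

// Consumer properties as in the question, plus the suggested fix:
// "smallest" makes a consumer group with no committed offsets start
// from the earliest available message instead of only new ones.
val consumerProps = new Properties()
consumerProps.put("group.id", UUID.randomUUID().toString)
consumerProps.put("zookeeper.connect", zkServer.getConnectString)
consumerProps.put("auto.offset.reset", "smallest")

val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))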

Answer 2:

I think this is some sort of message buffering issue. If you send 200 messages, this works (for me):

(1 to 200).foreach(i => sender.send(keyedMessage))

Sending 199 messages fails. I tried changing configs around but couldn't find any magic to make a single message work, though I'm sure there's some set of configs that could make this work.
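
If the issue really is on the producer side, making each send block until the broker acknowledges it is also worth a try. Below is a sketch assuming the old Scala producer bundled with 0.8.2, where request.required.acks defaults to 0 (fire-and-forget); whether this makes the single-message case pass is untested:

// Sketch: synchronous sends that wait for the partition leader's ack.
// With acks left at the 0.8.2 default of 0, a send can be silently
// dropped while the freshly started broker is still settling.
val producerProps = new Properties()
producerProps.put("metadata.broker.list", "localhost:" + port)
producerProps.put("producer.type", "sync")
producerProps.put("request.required.acks", "1")

val sender = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))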

Tags: scala, scalatest, apache-kafka