Python: how to mock a Kafka topic for unit tests?

Published 2019-06-19 02:10

Question:

We have a message scheduler that generates a hash key from a message's attributes before placing it on a Kafka topic queue, with the hash as the message key.

This is done for de-duplication purposes. However, I am not sure how I could test this de-duplication without actually setting up a local cluster and checking that it performs as expected.
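
For concreteness, the kind of setup being described looks roughly like this (the attribute names, the topic name, and the kafka-python client are illustrative assumptions, not my actual code):

import hashlib
import json

from kafka import KafkaProducer

def hash_key(attributes):
    """Derive a deterministic key from the message attributes."""
    canonical = json.dumps(attributes, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).digest()

producer = KafkaProducer(bootstrap_servers="localhost:9092")
message = {"job": "send-reminder", "user_id": 42}
producer.send("scheduled-messages",  # hypothetical topic name
              key=hash_key(message),
              value=json.dumps(message).encode("utf-8"))
producer.flush()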

Searching online for tools for mocking a Kafka topic queue has not helped, and I am concerned that I am perhaps thinking about this the wrong way.

Ultimately, whatever is used to mock the Kafka queue should behave the same way as a local cluster - i.e. provide de-duplication when keyed messages are inserted into a topic queue.

Are there any such tools?

Answer 1:

If you need to verify a Kafka-specific feature, or an implementation that relies on a Kafka-specific feature, then the only way to do it is by using Kafka!

Does Kafka have any tests around its deduplication logic? If so, the combination of the following may be enough to mitigate your organization's perceived risks of failure:

  • unit tests of your hash logic (make sure that the same object does indeed generate the same hash - a sketch follows this list)
  • Kafka topic deduplication tests (internal to the Kafka project)
  • preflight smoke tests verifying your app's integration with Kafka
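
For the first point, here is a sketch of what such a unit test could look like (hash_key below is a stand-in for your actual hash function, not the asker's code):

import hashlib
import json
import unittest

def hash_key(attributes):
    # Stand-in for the scheduler's real hash logic.
    canonical = json.dumps(attributes, sort_keys=True).encode("utf-8")
    return hashlib.sha256(canonical).digest()

class HashKeyTests(unittest.TestCase):
    def test_equal_attributes_produce_equal_keys(self):
        attrs = {"job": "send-reminder", "user_id": 42}
        self.assertEqual(hash_key(attrs), hash_key(dict(attrs)))

    def test_attribute_order_does_not_matter(self):
        a = {"user_id": 42, "job": "send-reminder"}
        b = {"job": "send-reminder", "user_id": 42}
        self.assertEqual(hash_key(a), hash_key(b))

    def test_different_attributes_produce_different_keys(self):
        a = {"job": "send-reminder", "user_id": 42}
        b = {"job": "send-reminder", "user_id": 43}
        self.assertNotEqual(hash_key(a), hash_key(b))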

If Kafka does NOT have any sort of tests around its topic deduplication, or you are concerned about breaking changes, then it is important to have automated checks around Kafka-specific functionality. This can be done through integration tests. I have had much success recently with Docker-based integration test pipelines. Once the initial legwork of creating a Kafka Docker image is done (one is probably already available from the community), it becomes trivial to set up integration test pipelines. A pipeline could look like:

  • application-based unit tests are executed (hash logic)
  • once those pass, your CI server starts up Kafka
  • integration tests are executed, verifying that duplicate writes only emit a single message to a topic (see the sketch after this list).
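
For the last step, a rough sketch of such an integration test (it assumes a broker reachable at localhost:9092 started by the CI pipeline, the kafka-python client, and a topic configured with cleanup.policy=compact; since log compaction is asynchronous, the test models the compacted last-write-wins view on the consumer side):

import unittest

from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "localhost:9092"  # assumed address of the CI-started broker

class DuplicateWriteTests(unittest.TestCase):
    def test_duplicate_writes_collapse_to_one_message_per_key(self):
        topic = "dedup-it"  # hypothetical compacted topic

        producer = KafkaProducer(bootstrap_servers=BOOTSTRAP)
        # Two writes with the same key, as the scheduler would emit for a duplicate.
        producer.send(topic, key=b"same-key", value=b"first")
        producer.send(topic, key=b"same-key", value=b"second")
        producer.flush()

        consumer = KafkaConsumer(topic,
                                 bootstrap_servers=BOOTSTRAP,
                                 auto_offset_reset="earliest",
                                 consumer_timeout_ms=5000)
        # Compaction runs asynchronously, so build the last-write-wins view
        # per key that a compacted topic eventually converges to.
        latest_by_key = {}
        for record in consumer:
            latest_by_key[record.key] = record.value
        consumer.close()

        self.assertEqual(latest_by_key, {b"same-key": b"second"})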

I think the important thing is to make sure Kafka integration tests are minimized to ONLY include tests that absolutely rely on Kafka-specific functionality; even with Docker Compose they may be orders of magnitude slower than unit tests (~1 ms vs ~1 second). Another thing to consider is whether the overhead of maintaining an integration pipeline is worth it, compared with simply trusting that Kafka will provide the topic deduplication it claims to. One way to keep the Kafka-dependent tests separate is sketched below.
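
A sketch of that separation, gating the Kafka-dependent tests on the environment the CI server sets up (KAFKA_BOOTSTRAP is an assumed variable name, exported only after the broker container is started):

import os
import unittest

# Assumed convention: the CI pipeline exports the broker address only after
# starting Kafka, so ordinary unit-test runs skip this class automatically.
BROKER = os.environ.get("KAFKA_BOOTSTRAP")

@unittest.skipUnless(BROKER, "KAFKA_BOOTSTRAP not set; skipping Kafka integration tests")
class KafkaIntegrationTests(unittest.TestCase):
    def test_duplicate_writes(self):
        # Kafka-specific assertions from the pipeline sketch above go here.
        self.assertIsNotNone(BROKER)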



Answer 2:

To mock Kafka under Python unit tests with sbt test tasks, I did the following. PySpark must be installed (note that this relies on the pyspark.streaming.kafka module, which exists only in Spark 2.x and was removed in Spark 3.0).

In build.sbt, define a task that should be run together with the tests:

val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")

val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")
  Process(command,
    workingDirectory,
    // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
    "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
      // collect all jar paths from project
      .format((fullClasspath in Runtime).value
      .map(_.data.getCanonicalPath)
        .filter(_.contains(".jar"))
        .mkString(",")),
    "PYSPARK_PYTHON" -> "python3") ! s.log
}

//attach custom test task to default test tasks
test in Test := {
  testPythonTask.value
  (test in Test).value
}

testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).evaluated
}

In the Python test case (app_test.py):

import random
import unittest
from itertools import chain

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase

class KafkaStreamTests(PySparkStreamingTestCase):
    timeout = 20  # seconds
    duration = 1  # batch interval, in seconds, for the test StreamingContext

    def setUp(self):
        super(KafkaStreamTests, self).setUp()

        # Load Spark's embedded-Kafka helper from the JVM; its jars are on the
        # classpath thanks to the PYSPARK_SUBMIT_ARGS set by the sbt task above.
        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()  # starts an embedded Zookeeper and Kafka broker

    def tearDown(self):
        if self._kafkaTestUtils is not None:
            self._kafkaTestUtils.teardown()
            self._kafkaTestUtils = None

        super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        # Count every value that arrives on the stream and compare the
        # counts with what was sent.
        result = {}
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})  # read from the beginning
        self._validateStreamResult(sendData, stream)

More examples for Flume, Kinesis and others can be found in the pyspark.streaming.tests module.