I am consuming events from Kafka, enriching/filtering/transforming them in Spark, and then storing them in Elasticsearch (ES). Afterwards I commit the offsets back to Kafka.
I have two questions/problems:
(1) My current Spark job is VERY slow
I have 50 partitions for the topic and 20 executors. Each executor has 2 cores and 4g of memory; the driver has 8g of memory. I am consuming 1000 events/partition/second and my batch interval is 10 seconds, which means I am consuming 500,000 events per 10-second batch.
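To make these numbers concrete, here is a small back-of-the-envelope sketch of the per-batch budget. The class and method names (`BatchBudget`, `waves`, and so on) are hypothetical, and the calculation assumes one Spark task per core and tasks of roughly equal duration, which may not hold in practice.

```java
/** Back-of-the-envelope batch budget; all names are hypothetical, for illustration only. */
final class BatchBudget {

    /** Total events that arrive during one batch interval. */
    static long eventsPerBatch(int partitions, long eventsPerPartitionPerSec, long batchSeconds) {
        return partitions * eventsPerPartitionPerSec * batchSeconds;
    }

    /** Number of tasks that can run at the same time (assumes one task per core). */
    static int taskSlots(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    /** How many rounds of tasks are needed when there are more partitions than slots. */
    static int waves(int partitions, int slots) {
        return (partitions + slots - 1) / slots; // ceiling division
    }

    /** Time each task may take if the whole batch must finish within the batch interval. */
    static long perTaskBudgetMillis(long batchSeconds, int waves) {
        return batchSeconds * 1000 / waves;
    }

    public static void main(String[] args) {
        int slots = taskSlots(20, 2);
        int waves = waves(50, slots);
        System.out.println("events per batch:    " + eventsPerBatch(50, 1000, 10));
        System.out.println("task slots:          " + slots);
        System.out.println("waves:               " + waves);
        System.out.println("per-task budget, ms: " + perTaskBudgetMillis(10, waves));
    }
}
```

With the figures above this gives 500,000 events per batch, 40 task slots and 2 waves, so under these assumptions each partition task has about 5 seconds to enrich and bulk-index its 10,000 events if batches are not to queue up.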
My ES cluster is as follows:
20 shards / index
3 master instances c5.xlarge.elasticsearch
12 instances m4.xlarge.elasticsearch
disk / node = 1024 GB so 12 TB in total
With this setup I am getting huge scheduling and processing delays.
(2) How can I commit offsets on executors?
Currently, I enrich/transform/filter my events on the executors and then send everything to ES using a BulkRequest. This is a synchronous call. If the bulk request succeeds, I send the offset list back to the driver; if not, I send back an empty list. On the driver, I commit the offsets to Kafka. I believe there should be a way to commit offsets on the executors, but I don't know how to pass the Kafka stream to them:
((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, this::onComplete);
This is the code that commits offsets to Kafka; it requires the Kafka stream.
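The flow described above (each partition reports whether its bulk write succeeded, and the driver commits only what succeeded) can be sketched without Spark as follows. This is only an illustration under stated assumptions: `RangeSketch` is a hypothetical stand-in for Spark's `OffsetRange`, `CommitPlanner` is a hypothetical helper, and the `IntPredicate` stands for whatever per-partition success flags the driver collects. It also assumes each partition should contribute only its own range, and that RDD partition `i` corresponds to `ranges[i]`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Hypothetical stand-in for Spark's OffsetRange, for illustration only. */
final class RangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;   // inclusive
    final long untilOffset;  // exclusive

    RangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    /** Number of events covered by this range. */
    long count() {
        return untilOffset - fromOffset;
    }
}

/** Hypothetical driver-side helper that decides which ranges are safe to commit. */
final class CommitPlanner {

    /**
     * Returns the ranges whose partition was non-empty and whose write succeeded.
     * ranges[i] is assumed to belong to RDD partition i.
     */
    static List<RangeSketch> committable(RangeSketch[] ranges, IntPredicate writeSucceeded) {
        List<RangeSketch> result = new ArrayList<>();
        for (int i = 0; i < ranges.length; i++) {
            boolean hasEvents = ranges[i].count() > 0;
            if (hasEvents && writeSucceeded.test(i)) {
                result.add(ranges[i]);
            }
        }
        return result;
    }
}
```

In a real job, the array returned by `committable(...)` would be what gets passed to `commitAsync` on the driver; the executors only need to report a success flag per partition, not the stream itself.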
Here is my overall code:
kafkaStream.foreachRDD( // kafka topic
    rdd -> { // runs on driver
        String batchIdentifier = ...; // value not shown
        LOGGER.info("@@ [" + batchIdentifier + "] Starting batch ...");
        Instant batchStart = Instant.now();

        List<OffsetRange> offsetsToCommit =
            rdd.mapPartitionsWithIndex( // kafka partition
                (index, eventsIterator) -> { // runs on worker
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    LOGGER.info(
                        "@@ Consuming " + offsetRanges[index].count() + " events" + " partition: " + index
                    );

                    if (!eventsIterator.hasNext()) {
                        return Collections.emptyIterator();
                    }

                    // get single ES documents
                    List<SingleEventBaseDocument> eventList = getSingleEventBaseDocuments(eventsIterator);
                    // build request wrappers
                    List<InsertRequestWrapper> requestWrapperList = getRequestsToInsert(eventList, offsetRanges[index]);
                    LOGGER.info(
                        "@@ Processed " + offsetRanges[index].count() + " events" + " partition: " + index + " list size: " + eventList.size()
                    );

                    // synchronous bulk write to ES
                    BulkResponse bulkItemResponses = elasticSearchRepository.addElasticSearchDocumentsSync(requestWrapperList);
                    if (!bulkItemResponses.hasFailures()) {
                        return Arrays.asList(offsetRanges).iterator();
                    }
                    return Collections.emptyIterator();
                },
                false)
            .collect();

        LOGGER.info(
            "@@ [" + batchIdentifier + "] Collected all offsets in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
        );

        OffsetRange[] offsets = new OffsetRange[offsetsToCommit.size()];
        for (int i = 0; i < offsets.length ; i++) {
            offsets[i] = offsetsToCommit.get(i);
        }

        try {
            // commit on the driver, using the call shown earlier
            ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsets, this::onComplete);
        } catch (Exception e) {
            // ignore
        }

        LOGGER.info(
            "@@ [" + batchIdentifier + "] Finished batch of " + offsetsToCommit.size() + " messages " +
            "in " + (Instant.now().toEpochMilli() - batchStart.toEpochMilli()) + "ms"
        );
    });
You can move the offset logic above the RDD loop ... I am using the template below for better offset handling and performance