I am trying to export a lot of data (2 TB, 30kkk rows) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster have 4 nodes (4 vCPUs, 26 GB memory, 2000 GB PD (HDD) each). There is one seed node in the cluster. I need to transform my data before writing to BQ, so I am using Dataflow. Worker type is n1-highmem-2
. Workers and Cassandra instances are at the same zone europe-west1-c
. My limits for Cassandra:
Part of my pipeline code responsible for reading transform is located here.
Autoscaling
The problem is that when I don't set --numWorkers
, the autoscaling set number of workers in such manner (2 workers average):
Load balancing
When I set --numWorkers=15
the rate of reading doesn't increase and only 2 workers communicate with Cassandra (I can tell it from iftop
and only these workers have CPU load ~60%).
At the same time Cassandra nodes don't have a lot of load (CPU usage 20-30%). Network and disk usage of the seed node is about 2 times higher than others, but not too high, I think:
And for the not seed node here:
Pipeline launch warnings
I have some warnings when pipeline is launching:
WARNING: Size estimation of the source failed:
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])
My Cassandra cluster is in GCE local network and it seams that some queries are made from my local machine and cannot reach the cluster (I am launching pipeline with Dataflow Eclipse plugin as described here). These queries are about size estimation of tables. Can I specify size estimation by hand or launch pipline from GCE instance? Or can I ignore these warnings? Does it have effect on rate of read?
I'v tried to launch pipeline from GCE VM. There is no more problem with connectivity. I don't have varchar columns in my tables but I get such warnings (no codec in datastax driver [varchar <-> java.lang.Long]). :
WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Pipeline read code
// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
.withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
.withKeyspace(keyspaceName).withTable(tableName)
.withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
.withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));
// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
.apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().rowName, c.element()));
}
}));
Number of splits (Stackdriver log)
W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1
W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1
W Number of splits is less than 0 (0), fallback to 1
I Number of splits is 1
What I'v tried
No effect:
- set read consistency level to ONE
nodetool setstreamthroughput 1000
,nodetool setinterdcstreamthroughput 1000
- increase Cassandra read concurrency (in
cassandra.yaml
):concurrent_reads: 32
- setting different number of workers 1-40.
Some effect: 1. I'v set numSplits = 10 as @jkff proposed. Now I can see in logs:
I Murmur3Partitioner detected, splitting
W Can't estimate the size
W Can't estimate the size
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359
I Splitting source [0, 1) produced 1 bundles with total serialized response size 1091
I Murmur3Partitioner detected, splitting
W Can't estimate the size
I Splitting source [0, 0) produced 0 bundles with total serialized response size 76
W Number of splits is less than 0 (0), fallback to 10
I Number of splits is 10
I Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527
But I'v got another exception:
java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
Maybe there is a mistake: CassandraServiceImpl.java#L220
And this statement looks like mistype: CassandraServiceImpl.java#L207
Changes I'v done to CassandraIO code
As @jkff proposed, I've change CassandraIO in the way I needed:
@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
long desiredBundleSizeBytes,
long estimatedSizeBytes) {
long numSplits = 1;
List<BoundedSource<T>> sourceList = new ArrayList<>();
if (desiredBundleSizeBytes > 0) {
numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
}
if (numSplits <= 0) {
LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
numSplits = 10;
}
LOG.info("Number of splits is {}", numSplits);
Long startRange = MIN_TOKEN;
Long endRange = MAX_TOKEN;
Long startToken, endToken;
String pk = "$pk";
switch (spec.table()) {
case "table1":
pk = "table1_pk";
break;
case "table2":
case "table3":
pk = "table23_pk";
break;
}
endToken = startRange;
Long incrementValue = endRange / numSplits - startRange / numSplits;
String splitQuery;
if (numSplits == 1) {
// we have an unique split
splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
} else {
// we have more than one split
for (int i = 0; i < numSplits; i++) {
startToken = endToken;
endToken = startToken + incrementValue;
Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
if (i > 0) {
builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
}
if (i < (numSplits - 1)) {
builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
}
sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
}
}
return sourceList;
}