How to increase Dataflow read parallelism from Cassandra

Posted 2019-04-09 11:27

Question:

I am trying to export a lot of data (2 TB, about 30 billion (30kkk) rows) from Cassandra to BigQuery. All my infrastructure is on GCP. My Cassandra cluster has 4 nodes (4 vCPUs, 26 GB memory, 2000 GB PD (HDD) each), one of which is a seed node. I need to transform my data before writing to BQ, so I am using Dataflow. The worker type is n1-highmem-2, and the workers and Cassandra instances are in the same zone, europe-west1-c. My limits for Cassandra:

The part of my pipeline code responsible for the read transform is shown below under "Pipeline read code".

Autoscaling

The problem is that when I don't set --numWorkers, autoscaling keeps the number of workers low (about 2 workers on average):

Load balancing

When I set --numWorkers=15, the read rate doesn't increase, and only 2 workers communicate with Cassandra (I can tell from iftop, and only those workers have a CPU load of ~60%).
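
For context, the worker settings go through standard Dataflow pipeline options. Below is a minimal sketch of that setup (the project id and staging bucket are placeholders, not my real values):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchOptionsSketch {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setProject("my-project");              // placeholder project id
        options.setTempLocation("gs://my-bucket/tmp"); // placeholder staging bucket
        options.setWorkerMachineType("n1-highmem-2");  // worker type from the setup above
        options.setZone("europe-west1-c");             // same zone as the Cassandra nodes
        options.setNumWorkers(15);                     // equivalent to --numWorkers=15

        Pipeline p = Pipeline.create(options);
        // ... CassandraIO read, transform and BigQuery write go here ...
        p.run();
    }
}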

At the same time, the Cassandra nodes are not under much load (CPU usage 20-30%). Network and disk usage on the seed node is about 2 times higher than on the others, but not too high, I think:

And here it is for a non-seed node:

Pipeline launch warnings

I get some warnings when the pipeline is launching:

WARNING: Size estimation of the source failed: 
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

My Cassandra cluster is on a GCE internal network, and it seems that some queries are made from my local machine and cannot reach the cluster (I am launching the pipeline with the Dataflow Eclipse plugin, as described here). These queries are for size estimation of the tables. Can I specify the size estimation by hand, or launch the pipeline from a GCE instance instead? Or can I just ignore these warnings? Do they affect the read rate?

I've tried launching the pipeline from a GCE VM, and the connectivity problem is gone. I don't have varchar columns in my tables, but I still get warnings like this (no codec in the DataStax driver for [varchar <-> java.lang.Long]):

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Pipeline read code

// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().rowName, c.element()));
            }
        }));
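
For completeness, the downstream write to BigQuery is roughly of this shape. This is only a sketch, not my actual code: the table spec, schema, and field mapping are placeholders.

// Sketch of the BigQuery write step (placeholders for table spec, schema and fields).
// Needs org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO and the
// com.google.api.services.bigquery.model.{TableSchema, TableFieldSchema, TableRow} classes.
TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("row_name").setType("INTEGER")));

pcollection_by_rowName.apply(BigQueryIO.<KV<Long, Model>>write()
        .to("my-project:my_dataset.my_table")   // placeholder table spec
        .withSchema(schema)
        .withFormatFunction(kv -> new TableRow().set("row_name", kv.getKey()))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));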

Number of splits (Stackdriver log)

W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 

What I've tried

No effect:

  1. set read consistency level to ONE
  2. nodetool setstreamthroughput 1000, nodetool setinterdcstreamthroughput 1000
  3. increase Cassandra read concurrency (in cassandra.yaml): concurrent_reads: 32
  4. set different numbers of workers (from 1 to 40).

Some effect: I've set numSplits = 10, as @jkff proposed. Now I can see in the logs:

I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
W  Can't estimate the size 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359 
I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091 
I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527 

But I've got another exception:

(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

Maybe there is a mistake: CassandraServiceImpl.java#L220

And this statement looks like a typo: CassandraServiceImpl.java#L207

Changes I've made to the CassandraIO code

As @jkff proposed, I've changed CassandraIO in the way I needed:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                              long desiredBundleSizeBytes,
                                              long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }

  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  String pk = "$pk";
  switch (spec.table()) {
  case "table1":
          pk = "table1_pk";
          break;
  case "table2":
  case "table3":
          pk = "table23_pk";
          break;
  }

  endToken = startRange;
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // we have a single split
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // we have more than one split
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
    }
  }
  return sourceList;
}

Answer 1:

I think this should be classified as a bug in CassandraIO. I filed BEAM-3424. You can try building your own version of Beam with that default of 1 changed to 100 or something like that, while this issue is being fixed.
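
For illustration, such a patch could look roughly like this (a sketch against the split() fallback quoted in the question; the exact code in CassandraServiceImpl may differ):

// Hypothetical patch: fall back to a larger number of splits when size estimation fails,
// instead of producing a single split.
if (numSplits <= 0) {
  LOG.warn("Number of splits is less than or equal to 0 ({}), falling back to 100", numSplits);
  numSplits = 100;
}

(As far as I know, later Beam releases also added a way to set a minimum number of splits directly on CassandraIO.read(), which avoids the need for a custom build.)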

I also filed BEAM-3425 for the bug during size estimation.