I'm attempting to load a large amount of data into a 10-node Cassandra ring.
The script doing the inserts gets ~4000 inserts / s, blocked presumably on
network I/O. I launch 8 of these on a single machine, and the throughput scales
almost linearly. (The individual throughput goes down slightly, but is more
than compensated for by the additional processes.)
This works decently; however, I'm still not getting enough throughput, so I
launched the same setup on 3 more VMs (thus, 8 processes * 4 VMs). After the
first additional VM, and with increasing frequency and severity as further VMs
are added, the following occurs:
- The clients start receiving Timeout errors. They can re-try their writes, but
because they do so in batches, their forward progress is almost entirely
eliminated.
- The ring becomes unstable, and nodes start labelling themselves as "down".
Further, different nodes tend to have different ideas of who is down. The
ring doesn't recover when the scripts are aborted. (I've not even been able
to fix this by just restarting individual nodes: I've had to restart the
entire ring.)
"Down" varies. In my last run:
- 4 nodes died completely. (Cassandra wasn't running at all.) Checking the logs, there didn't appear to be anything recorded as to why they died.
- On the fifth, Cassandra was running, but nodetool status on that node hangs. Two threads appear to be in infinite loops of some sort (they're using 100% CPU solidly), and there is a java.lang.OutOfMemoryError: Java heap space in the logs.
The code is essentially:
def prepped_batch_insert(session, items, insert_query, silent=False):
    # A mapping of number of inserts -> a prepared query for that number of
    # inserts.
    prepped_statements = {}

    def get_prepped_statement(inserts):
        if inserts in prepped_statements:
            # We already created a prepared query for this many inserts, use
            # it:
            return prepped_statements[inserts]
        else:
            # We haven't yet created a prepared query for this many inserts, so
            # do so now:
            query = ['BEGIN UNLOGGED BATCH']
            for idx in xrange(inserts):
                query.append(insert_query.query)
            query.append('APPLY BATCH;')
            query = '\n'.join(query)
            ps = session.prepare(query)
            prepped_statements[inserts] = ps
            return ps

    def do_prepped_batch_insert(batch):
        ps = get_prepped_statement(len(batch))
        # Generate the list of params to the prepared query:
        params = []
        for idx, item in enumerate(batch):
            for k in insert_query.keyorder:
                params.append(item[k])
        # Do it.
        session.execute(ps, params)

    return inserter.insert_and_time(
        items,  # data generator
        do_prepped_batch_insert,  # The above function
        _WHAT_APPEARS_TO_BE_THE_OPTIMAL_CASSANDRA_BATCH_SIZE,  # = 200
        silent=silent,
    )
The function insert_and_time splits items up into batches of size 200, calls the above function, and times the whole kit and kaboodle. (This code is toxic to the ring.)
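For reference, that description amounts to roughly the following sketch. The real insert_and_time isn't shown here, so treat the body as an approximation; only the name and call signature are taken from the snippet above.

import itertools
import time

def insert_and_time(items, do_batch_insert, batch_size, silent=False):
    # Approximate behaviour: slice the `items` generator into lists of
    # `batch_size`, pass each list to `do_batch_insert`, and time the run.
    start = time.time()
    count = 0
    items = iter(items)
    while True:
        batch = list(itertools.islice(items, batch_size))
        if not batch:
            break
        do_batch_insert(batch)
        count += len(batch)
    elapsed = time.time() - start
    if not silent and elapsed > 0:
        print 'Inserted %d items in %.1fs (%.0f inserts/s)' % (
            count, elapsed, count / elapsed)
    return elapsed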
We attempted more load because (I was told) 20k inserts / second was slow (it will take a while to insert the data I'd like to insert at that rate…) and that Cassandra was capable of much higher throughput.
My questions:
- Is there anything unusual about what I'm doing? Anything wrong?
- Am I simply DDoS-ing my ring?
- How can I go about debugging what's wrong?
- An errant client, IMHO, should never be able to kill the server. (And the above isn't terribly errant.) Anything I can do to prevent this?
¹ The client appears to also slowly leak file descriptors. I don't think this is related. (I'm calling .shutdown on both the cluster and the connection.) Looking at the driver source, there appear to be plenty of pathways where an exception would cause a leak.
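The shutdown calls look roughly like this (simplified; run_inserts is a hypothetical stand-in for the batching code above), with try/finally so the driver is torn down even when an insert raises:

from cassandra.cluster import Cluster

def run_inserts(session):
    # Hypothetical stand-in for prepped_batch_insert(...) above.
    pass

cluster = Cluster(['127.0.0.1'])    # placeholder contact point
session = cluster.connect('my_ks')  # placeholder keyspace
try:
    run_inserts(session)
finally:
    # Tear down the driver's connections even if an insert raised, so file
    # descriptors aren't left to the exception pathways in the driver.
    session.shutdown()
    cluster.shutdown()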
It looks very much like you are DDoS-ing your setup.
The script doing the inserts gets ~4000 inserts / s, blocked presumably on network I/O. I launch 8 of these on a single machine, and the throughput scales almost linearly
You were not blocked on network IO if launching additional script instances that access Cassandra through the same NIC gives a near linear increase in throughput.
An errant client, IMHO, should never be able to kill the server.
Throw enough load at any server and it will begin to fail. Some servers attempt to minimize this e.g. web servers will typically accept a maximum number of concurrent HTTP requests after which they will reply that they are busy. However, even processing the connection and telling the client "go away for now" takes some cycles. Proper DoS protection involves dedicated network hardware.
(They're using 100% CPU solidly.) There is a java.lang.OutOfMemoryError: Java heap space
More evidence that you are simply loading your ring beyond its capacity.
To improve throughput, have a look at the hardware and Cassandra configuration.
- Is IO overloaded? That is a typical choke point for many persistence systems. I don't have specific experience with Cassandra, but with MongoDB, IO performance is hugely important as soon as the working set cannot fit in memory. If it is overloaded, consider faster IO subsystems (if this is e.g. AWS, there are various means to improve IO performance).
- Do you have enough RAM? You mentioned Java is running out of heap. Have you set the heap size to the maximum you can on each node, allowing that the OS and perhaps other processes on the system need some RAM?
- Is the CPU overloaded? Seems like yes. However I would look at CPU last, after considering appropriate disk and network IO performance, and reviewing the Cassandra configuration. High CPU can sometimes be a symptom of other things.
- Verify the Cassandra configuration. I'm no expert on that, but have a look at the configuration that Netflix used when benchmarking Cassandra performance http://techblog.netflix.com/2011/11/benchmarking-cassandra-scalability-on.html
Your situation sounds unusual, but without any details about the hardware you are running on, here is some speculation. The issue is most likely heap size, followed by IO bottlenecking. Unless you are running on SSDs, CPU usage should not be an issue.
1) If you are looking for a one-time load of data followed by a smaller, consistent stream of data, consider using the bulk loading tool.
2) Possibly, in the sense that you are attempting to load data at a rate faster than the hardware you are using can handle.
3) You should take a look into the Cassandra system logs for messages like "trying to flush memtable to recover space"; that is symptomatic of running out of heap. The logs will also have information about memory, GC, and other ongoing tasks. For real-time monitoring you can also connect to your Cassandra instances via JMX using jconsole or visualvm. When looking at these it should be obvious if the heap begins to fill up and the system begins to back up. Most production Cassandra instances have a heap size of 8GB, with amounts larger than that giving diminishing returns as stop-the-world GC events become more prevalent.
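If you'd rather not start with JMX, a quick first pass is simply to scan the system log for those messages. A rough sketch follows; the log path and the exact wording of the messages vary by version and install, so treat both as assumptions:

import re

# Assumed location; package installs commonly log to /var/log/cassandra/.
LOG_PATH = '/var/log/cassandra/system.log'

# Messages that typically indicate heap pressure: emergency memtable flushes,
# long GC pauses, and outright OOMs.
PATTERNS = [
    re.compile(r'flush.*to recover space', re.IGNORECASE),
    re.compile(r'GC for', re.IGNORECASE),
    re.compile(r'OutOfMemoryError'),
]

with open(LOG_PATH) as log:
    for line in log:
        if any(p.search(line) for p in PATTERNS):
            print line.rstrip()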
The other thing you should be watching is pending compactions; this is one of the key IO bottlenecks of Cassandra. If this number grows without bound, it means your system is limited by hard drive speed, and you can ease the stress with more machines or by upgrading to SSDs.
Check this with
nodetool compactionstats
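To watch this over time rather than eyeballing it once, here is a small sketch that polls nodetool and pulls out the pending count. It assumes nodetool is on the PATH and that the output contains a "pending tasks: N" line; adjust the parsing to whatever your version prints.

import re
import subprocess
import time

def pending_compactions():
    # Run nodetool and extract the "pending tasks" count from its output.
    out = subprocess.check_output(['nodetool', 'compactionstats'])
    match = re.search(r'pending tasks:\s*(\d+)', out)
    return int(match.group(1)) if match else None

# Poll every 10 seconds during a load run; a count that only ever grows
# suggests the disks cannot keep up with the write rate.
for _ in range(30):
    print pending_compactions()
    time.sleep(10)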
A tool you can use to monitor all of this is Datastax Opscenter. This tool will allow you to easily monitor your entire cluster from one place and the community edition is completely free.
I do wonder if something else is amiss though, because I regularly benchmark Amazon m1.large instances and find that about 10 of these can support traffic in the 40-50k writes/s range without any sort of system instability.
4) As noted by Eric, it is very difficult for a distributed, performance-oriented system like Cassandra to remain available and performant while also upholding guardrails against client behavior. The tradeoff was increased speed in exchange for minimal checking of the system state when writes occur. This allows for extremely fast writes but puts the onus on the maintainer to properly provision and monitor their system.