Running a very simple program (.NET Core 1.0.0-preview2-003121, NEST 2.4.2, Windows 7) that generates documents with random data and pushes them to an ElasticSearch 2.3.4 node running on localhost with ES_HEAP_SIZE=4g
.
The program batches document inserts and sends them using the BulkPutAsync API. It runs fine for a while, inserting well over 500K documents. Then the program seems to stall, ElasticSearch seems to stop indexing (CPU usage goes down) and finally a TaskCanceledException
is thrown. ElasticSearch logs are not showing anything suspicious, cluster state is still green.
Generator.cs:
public static void Generate(params IAdaptor[] adaptors)
{
...
for (int s = 0; s < TotalSemesters; s++)
{
...
if (responses.Count >= 10000)
{
var toInsert = responses.ToArray();
var insertions = adaptors.Select(a => a.Insert(indexName, toInsert)).ToArray();
Task.WaitAll(insertions);
responses.Clear();
}
...
}
...
}
ElasticSearchAdaptor.cs:
public async Task Insert(string indexName, ResponseRootObject[] items)
{
var objects = new List<object>();
foreach (var item in items)
{
objects.Add(new
{
index = new
{
_index = indexName,
_type = "responses"
}
});
objects.Add(item);
}
var r = await _client.BulkPutAsync<ResponseRootObject>(indexName, "responses", new PostData<object>(objects));
}
Exception:
Unhandled Exception: System.AggregateException: One or more errors occurred. (A task was canceled.) ---> Elasticsearch.Net.UnexpectedElasticsearchClientException: A task was canceled. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at System.Net.Http.HttpClient.<FinishSendAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.HttpConnection.<RequestAsync>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.RequestPipeline.<CallElasticsearchAsync>d__66`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of inner exception stack trace ---
at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at Xyz.BusinessIntelligence.DataGenerator.ElasticSearchAdaptor.<Insert>d__2.MoveNext()
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks)
at Xyz.BusinessIntelligence.DataGenerator.Generator.Generate(IAdaptor[] adaptors)
at Xyz.BusinessIntelligence.DataGenerator.Program.Main(String[] args)
Any ideas?