ElasticSearch .NET Core TaskCanceledException

2019-09-09 21:56发布

问题:

Running a very simple program (.NET Core 1.0.0-preview2-003121, NEST 2.4.2, Windows 7) that generates documents with random data and pushes them to an ElasticSearch 2.3.4 node running on localhost with ES_HEAP_SIZE=4g.

The program batches document inserts and sends them using the BulkPutAsync API. It runs fine for a while, inserting well over 500K documents. Then the program seems to stall, ElasticSearch seems to stop indexing (CPU usage goes down) and finally a TaskCanceledException is thrown. ElasticSearch logs are not showing anything suspicious, cluster state is still green.

Generator.cs:

public static void Generate(params IAdaptor[] adaptors)
{
    ...
    for (int s = 0; s < TotalSemesters; s++)
    {
        ...
        if (responses.Count >= 10000)
        {
            var toInsert = responses.ToArray();
            var insertions = adaptors.Select(a => a.Insert(indexName, toInsert)).ToArray();
            Task.WaitAll(insertions);
            responses.Clear();
        }
        ...
    }
    ...
}

ElasticSearchAdaptor.cs:

public async Task Insert(string indexName, ResponseRootObject[] items)
{
    var objects = new List<object>();
    foreach (var item in items)
    {
        objects.Add(new
        {
            index = new
            {
                _index = indexName,
                _type = "responses"
            }
        });
        objects.Add(item);
    }   

    var r = await _client.BulkPutAsync<ResponseRootObject>(indexName, "responses", new PostData<object>(objects));
}

Exception:

Unhandled Exception: System.AggregateException: One or more errors occurred. (A task was canceled.) ---> Elasticsearch.Net.UnexpectedElasticsearchClientException: A task was canceled. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at System.Net.Http.HttpClient.<FinishSendAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.HttpConnection.<RequestAsync>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.RequestPipeline.<CallElasticsearchAsync>d__66`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
   --- End of inner exception stack trace ---
   at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Xyz.BusinessIntelligence.DataGenerator.ElasticSearchAdaptor.<Insert>d__2.MoveNext()
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks)
   at Xyz.BusinessIntelligence.DataGenerator.Generator.Generate(IAdaptor[] adaptors)
   at Xyz.BusinessIntelligence.DataGenerator.Program.Main(String[] args)

Any ideas?