ElasticSearch .NET Core TaskCanceledException

Posted 2019-10-29 18:17

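I am running a very simple program (.NET Core 1.0.0-preview2-003121, NEST 2.4.2, Windows 7) that generates documents from random data and pushes them to an ElasticSearch 2.3.4 node running on localhost with ES_HEAP_SIZE=4g.

The program inserts records in batches and sends them with the BulkPutAsync API. It runs fine for a while, and more than 500K documents are inserted successfully. Then the program appears to stall, ElasticSearch appears to stop indexing (CPU usage drops), and finally a TaskCanceledException is thrown. The ElasticSearch logs show nothing suspicious, and the cluster status remains green.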

Generator.cs:

public static void Generate(params IAdaptor[] adaptors)
{
    ...
    for (int s = 0; s < TotalSemesters; s++)
    {
        ...
        if (responses.Count >= 10000)
        {
            var toInsert = responses.ToArray();
            var insertions = adaptors.Select(a => a.Insert(indexName, toInsert)).ToArray();
            Task.WaitAll(insertions);
            responses.Clear();
        }
        ...
    }
    ...
}

ElasticSearchAdaptor.cs:

public async Task Insert(string indexName, ResponseRootObject[] items)
{
    var objects = new List<object>();
    foreach (var item in items)
    {
        objects.Add(new
        {
            index = new
            {
                _index = indexName,
                _type = "responses"
            }
        });
        objects.Add(item);
    }   

    var r = await _client.BulkPutAsync<ResponseRootObject>(indexName, "responses", new PostData<object>(objects));
}
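
For reference, the list built in Insert alternates an action object with the document itself, which matches the newline-delimited layout of an ElasticSearch bulk request: one action line, then one source line, each terminated by a newline. The following is only a minimal sketch of that layout, not the client's own serialization code. BulkBodyDemo, BuildBulkBody, and the sample documents are hypothetical names, and the documents are assumed to be already serialized to JSON.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class BulkBodyDemo
{
    // Builds a bulk body from documents that are already JSON strings.
    // Assumption: indexName and typeName need no JSON escaping.
    public static string BuildBulkBody(string indexName, string typeName, IEnumerable<string> jsonDocs)
    {
        var sb = new StringBuilder();
        foreach (var doc in jsonDocs)
        {
            // Action line: tells ElasticSearch to index the document that follows.
            sb.Append("{\"index\":{\"_index\":\"").Append(indexName)
              .Append("\",\"_type\":\"").Append(typeName).Append("\"}}\n");
            // Source line: the document itself, on a single line.
            sb.Append(doc).Append('\n');
        }
        return sb.ToString();
    }

    public static void Main()
    {
        var docs = new[] { "{\"score\":1}", "{\"score\":2}" };
        Console.Write(BuildBulkBody("my-index", "responses", docs));
    }
}
```

With 10000 items per batch, the request body therefore carries 20000 lines, so the size of a single request grows with both the batch size and the size of each document.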

Exception:

Unhandled Exception: System.AggregateException: One or more errors occurred. (A task was canceled.) ---> Elasticsearch.Net.UnexpectedElasticsearchClientException: A task was canceled. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
   at System.Net.Http.HttpClient.<FinishSendAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.HttpConnection.<RequestAsync>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.RequestPipeline.<CallElasticsearchAsync>d__66`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
   --- End of inner exception stack trace ---
   at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at Xyz.BusinessIntelligence.DataGenerator.ElasticSearchAdaptor.<Insert>d__2.MoveNext()
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
   at System.Threading.Tasks.Task.WaitAll(Task[] tasks)
   at Xyz.BusinessIntelligence.DataGenerator.Generator.Generate(IAdaptor[] adaptors)
   at Xyz.BusinessIntelligence.DataGenerator.Program.Main(String[] args)
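
The nesting in the trace above (AggregateException, then UnexpectedElasticsearchClientException, then TaskCanceledException) comes from Task.WaitAll, which wraps task failures in an AggregateException. The sketch below shows one way to walk such a chain and list what is inside it. It is an illustration under stated assumptions, not the program from the question: WaitAllDemo, SlowInsert, and DescribeFailures are hypothetical names, and SlowInsert merely simulates a cancelled operation with Task.Delay instead of calling ElasticSearch. Unlike the trace above, the simulated task ends up cancelled directly rather than faulted with a wrapper exception.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAllDemo
{
    // Hypothetical stand-in for Insert: the awaited delay is cancelled after
    // 100 ms, loosely mimicking a request abandoned by a client-side timeout.
    public static async Task SlowInsert()
    {
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
        {
            await Task.Delay(TimeSpan.FromSeconds(30), cts.Token);
        }
    }

    // Waits for all tasks and returns the exception type names found in each
    // chain, outermost first. Returns an empty list if nothing failed.
    public static List<string> DescribeFailures(params Task[] tasks)
    {
        var names = new List<string>();
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            foreach (var inner in ex.Flatten().InnerExceptions)
            {
                // Follow InnerException links, as the "--->" markers do in a trace.
                for (Exception e = inner; e != null; e = e.InnerException)
                {
                    names.Add(e.GetType().Name);
                }
            }
        }
        return names;
    }

    public static void Main()
    {
        foreach (var name in DescribeFailures(SlowInsert()))
        {
            Console.WriteLine(name);
        }
    }
}
```

Catching the AggregateException around Task.WaitAll in Generate in the same way would let the program log which adaptor failed and on which batch before exiting.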

Any ideas?

Source: ElasticSearch .NET Core TaskCanceledException