运行一个非常简单的程序(.NET核心1.0.0-preview2-003121,NEST 2.4.2,视窗7),其与随机数据生成文档,并将它们推到与本地主机上运行的ElasticSearch 2.3.4节点ES_HEAP_SIZE=4g
。
该程序记录批次插入和使用BulkPutAsync API发送。 它运行正常了一段时间,超过500K的文件中插入好。 然后程序似乎停滞,ElasticSearch似乎停止索引(CPU使用率下降),最后一个TaskCanceledException
被抛出。 ElasticSearch日志不显示任何可疑的东西,群集状态仍然是绿色的。
Generator.cs:
public static void Generate(params IAdaptor[] adaptors)
{
...
for (int s = 0; s < TotalSemesters; s++)
{
...
if (responses.Count >= 10000)
{
var toInsert = responses.ToArray();
var insertions = adaptors.Select(a => a.Insert(indexName, toInsert)).ToArray();
Task.WaitAll(insertions);
responses.Clear();
}
...
}
...
}
ElasticSearchAdaptor.cs:
public async Task Insert(string indexName, ResponseRootObject[] items)
{
var objects = new List<object>();
foreach (var item in items)
{
objects.Add(new
{
index = new
{
_index = indexName,
_type = "responses"
}
});
objects.Add(item);
}
var r = await _client.BulkPutAsync<ResponseRootObject>(indexName, "responses", new PostData<object>(objects));
}
例外:
Unhandled Exception: System.AggregateException: One or more errors occurred. (A task was canceled.) ---> Elasticsearch.Net.UnexpectedElasticsearchClientException: A task was canceled. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at System.Net.Http.HttpClient.<FinishSendAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.HttpConnection.<RequestAsync>d__7`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.RequestPipeline.<CallElasticsearchAsync>d__66`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of inner exception stack trace ---
at Elasticsearch.Net.Transport`1.<RequestAsync>d__15`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at Xyz.BusinessIntelligence.DataGenerator.ElasticSearchAdaptor.<Insert>d__2.MoveNext()
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks, Int32 millisecondsTimeout)
at System.Threading.Tasks.Task.WaitAll(Task[] tasks)
at Xyz.BusinessIntelligence.DataGenerator.Generator.Generate(IAdaptor[] adaptors)
at Xyz.BusinessIntelligence.DataGenerator.Program.Main(String[] args)
有任何想法吗?