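The BulkAll method indexes all of the records correctly and the index reports health = green, but the BulkAllObservable goes down the onError path and returns the following message: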
Unsuccessful low level call on POST: /[index name]/[index type]/_bulk?pretty=true
# Audit trail of this API call:
 - [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.0028955
# ServerError: ServerError: 400Type: parse_exception Reason: "request body is required"
# OriginalException: System.Net.WebException: The remote server returned an error: (400) Bad Request.
   at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Elasticsearch.Net.HttpConnection.d__14`1.MoveNext()
# Request:
# Response:
{
  "error" : {
    "root_cause" : [
      {
        "type" : "parse_exception",
        "reason" : "request body is required"
      }
    ],
    "type" : "parse_exception",
    "reason" : "request body is required"
  },
We have tried changing the back-off time and the batch size, to no avail.
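The "request body is required" reason suggests that at least one _bulk request reached Elasticsearch with an empty body, but the request itself is not captured in the output above. A minimal diagnostic sketch follows, assuming the client is built from a NEST ConnectionSettings instance; the class name DiagnosticClientFactory and the choice to log to standard error are illustrative assumptions, not part of the project described here.

```csharp
using System;
using Nest;

// Hypothetical helper: builds a client that keeps request and response
// bodies in memory so that failed calls can be inspected.
public static class DiagnosticClientFactory
{
    public static ElasticClient Create(Uri node)
    {
        var settings = new ConnectionSettings(node)
            // Buffer request/response bytes so they show up in DebugInformation.
            .DisableDirectStreaming()
            // Called once for every completed low level call; log failures only.
            .OnRequestCompleted(call =>
            {
                if (!call.Success)
                {
                    Console.Error.WriteLine(call.DebugInformation);
                }
            });

        return new ElasticClient(settings);
    }
}
```

Buffering every body costs memory and throughput, so this is only meant for reproducing the failure; an empty request section in the logged output would confirm that a batch with no documents was sent.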
Method:
var cancellationTokenSource = new CancellationTokenSource();
return _elasticClient.BulkAll(capacities.Value, ba => ba
        .Index(indexName)
        .Type(IndexType)
        .MaxDegreeOfParallelism(_settingsHelper.GetBulkAllDegreeOfParallelism())
        // in case of 429 response (too many requests), how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(_settingsHelper.GetBulkAllBackOffTimeoutInSeconds()))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(_settingsHelper.GetBulkAllBackOffRetries())
        // number of documents to send in each request
        .Size(_settingsHelper.GetBulkAllBatchSize())
        .RefreshOnCompleted(),
    cancellationTokenSource.Token
);
Calling code:
public async Task BuildIndex(string indexName)
{
    _logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");
    Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;
    var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
    if (!indexExistsResponse)
    {
        await CreateIndexAsync(indexName).ConfigureAwait(false);
        _lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);
        var handle = new ManualResetEvent(false);
        var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);
        var observer = new BulkAllObserver(
            onNext: r =>
            {
                if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
                {
                    _logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
                }
            },
            onError: async e =>
            {
                _logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");
                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
                throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
            },
            onCompleted: async () =>
            {
                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
                _logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");
            }
        );
        bulkAllObservable.Subscribe(observer);
        handle.WaitOne();
        handle.Dispose();
    }
}
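Separately from the 400 response, the waiting logic in the calling code looks fragile as posted. Nothing calls handle.Set(), so handle.WaitOne() has no way to return, and the async lambdas passed as onError and onCompleted are compiled as async void, so the ElasticException thrown in onError cannot propagate to the caller of BuildIndex. One possible alternative is sketched below, using only base class library types. CompletionObserver<T> is a hypothetical helper and not part of NEST; it turns the observer callbacks into a Task that can be awaited.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not a NEST type): exposes the end of an observable
// sequence as a Task, so an async method can await it instead of blocking.
public sealed class CompletionObserver<T> : IObserver<T>
{
    private readonly TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly Action<T> _onNext;

    public CompletionObserver(Action<T> onNext = null)
    {
        _onNext = onNext;
    }

    // Completes when OnCompleted is called; faults when OnError is called.
    public Task Completion => _tcs.Task;

    public void OnNext(T value) => _onNext?.Invoke(value);

    public void OnError(Exception error) => _tcs.TrySetException(error);

    public void OnCompleted() => _tcs.TrySetResult(true);
}
```

With something like this, BuildIndex could subscribe a CompletionObserver (typed to whatever response type the BulkAll observable emits, IBulkAllResponse being an assumption for this NEST version), await observer.Completion inside a try/catch, and run the alias switch and index cleanup after the await rather than inside the callbacks. This does not address the empty request body itself; it only makes the outcome of the bulk operation visible to the caller.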