The BulkAll method is indexing all the correct records and returns health = green; but the BulkAllObservable is returning via the onError track with the following message:
Unsuccessful low level call on POST: /[Index Name]/[Index Type]/_bulk?pretty=true **# Audit trail of this API call: - [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.0028955
ServerError: ServerError: 400Type: parse_exception Reason: "request body is required"
OriginalException: System.Net.WebException: The remote server returned an error: (400) Bad Request.**
at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory1.FromAsyncCoreLogic(IAsyncResult iar, Func
2 endFunction, Action1 endAction, Task
1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Elasticsearch.Net.HttpConnection.d__14`1.MoveNext()
Request:
# Response:
{
"error" : {
"root_cause" : [
{
"type" : "parse_exception",
"reason" : "request body is required"
}
],
"type" : "parse_exception",
"reason" : "request body is required"
},
We've tried varying the backoff time and batch size to no avail.
Method:
var cancellationTokenSource = new CancellationTokenSource();
return _elasticClient.BulkAll(capacities.Value, ba => ba
.Index(indexName)
.Type(IndexType)
.MaxDegreeOfParallelism(_settingsHelper.GetBulkAllDegreeOfParallelism())
// in case of 429 response (too many requests), how long we should wait before retrying
.BackOffTime(TimeSpan.FromSeconds(_settingsHelper.GetBulkAllBackOffTimeoutInSeconds()))
// in case of 429 response, how many times to retry before failing
.BackOffRetries(_settingsHelper.GetBulkAllBackOffRetries())
// number of documents to send in each request
.Size(_settingsHelper.GetBulkAllBatchSize())
.RefreshOnCompleted(),
cancellationTokenSource.Token
);
Calling code:
public async Task BuildIndex(string indexName)
{
_logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");
Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;
var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
if (!indexExistsResponse)
{
await CreateIndexAsync(indexName).ConfigureAwait(false);
_lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);
var handle = new ManualResetEvent(false);
var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);
var observer = new BulkAllObserver(
onNext: r =>
{
if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
{
_logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
}
},
onError: async e =>
{
_logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");
await ActivateIndexAsync(indexName).ConfigureAwait(false);
var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
},
onCompleted: async () =>
{
await ActivateIndexAsync(indexName).ConfigureAwait(false);
var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
_logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");
}
);
bulkAllObservable.Subscribe(observer);
handle.WaitOne();
handle.Dispose();
}