Nest 5.5 Error using BulkAll method

2019-08-29 02:05发布


The BulkAll method is indexing all the correct records and returns health = green; but the BulkAllObservable is returning via the onError track with the following message:

Unsuccessful low level call on POST: /[Index Name]/[Index Type]/_bulk?pretty=true **# Audit trail of this API call: - [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.0028955

ServerError: ServerError: 400Type: parse_exception Reason: "request body is required"

OriginalException: System.Net.WebException: The remote server returned an error: (400) Bad Request.**

at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult) at System.Threading.Tasks.TaskFactory1.FromAsyncCoreLogic(IAsyncResult iar, Func2 endFunction, Action1 endAction, Task1 promise, Boolean requiresSynchronization) --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task) at Elasticsearch.Net.HttpConnection.d__14`1.MoveNext()


# Response:
  "error" : {
    "root_cause" : [
        "type" : "parse_exception",
        "reason" : "request body is required"
    "type" : "parse_exception",
    "reason" : "request body is required"

We've tried varying the backoff time and batch size to no avail.


var cancellationTokenSource = new CancellationTokenSource();
    return _elasticClient.BulkAll(capacities.Value, ba => ba
        // in case of 429 response (too many requests), how long we should wait before retrying
        // in case of 429 response, how many times to retry before failing
        // number of documents to send in each request

Calling code:

public async Task BuildIndex(string indexName)
    _logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");

    Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;

    var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
    if (!indexExistsResponse)
        await CreateIndexAsync(indexName).ConfigureAwait(false);
        _lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);

        var handle = new ManualResetEvent(false);
        var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);

        var observer = new BulkAllObserver(
            onNext: r =>
                if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
                    _logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
            onError: async e =>
                _logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");

                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
            onCompleted: async () =>
                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                _logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");
