Nest 5.5 Error using BulkAll method

Posted 2019-08-29 02:05

Question:

The BulkAll method indexes all of the expected records and the index health is green, but the BulkAllObservable terminates through the onError path with the following message:

Unsuccessful low level call on POST: /[Index Name]/[Index Type]/_bulk?pretty=true
# Audit trail of this API call:
 - [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.0028955

ServerError: ServerError: 400Type: parse_exception Reason: "request body is required"

OriginalException: System.Net.WebException: The remote server returned an error: (400) Bad Request.
   at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Elasticsearch.Net.HttpConnection.d__14`1.MoveNext()

Request:

# Response:
{
  "error" : {
    "root_cause" : [
      {
        "type" : "parse_exception",
        "reason" : "request body is required"
      }
    ],
    "type" : "parse_exception",
    "reason" : "request body is required"
  },

We've tried varying the backoff time and batch size to no avail.

Method:

var cancellationTokenSource = new CancellationTokenSource();
return _elasticClient.BulkAll(capacities.Value, ba => ba
    .Index(indexName)
    .Type(IndexType)
    .MaxDegreeOfParallelism(_settingsHelper.GetBulkAllDegreeOfParallelism())
    // in case of a 429 response (too many requests), how long to wait before retrying
    .BackOffTime(TimeSpan.FromSeconds(_settingsHelper.GetBulkAllBackOffTimeoutInSeconds()))
    // in case of a 429 response, how many times to retry before failing
    .BackOffRetries(_settingsHelper.GetBulkAllBackOffRetries())
    // number of documents to send in each request
    .Size(_settingsHelper.GetBulkAllBatchSize())
    .RefreshOnCompleted(),
    cancellationTokenSource.Token
);

Calling code:

public async Task BuildIndex(string indexName)
{
    _logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");

    Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;

    var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
    if (!indexExistsResponse)
    {
        await CreateIndexAsync(indexName).ConfigureAwait(false);
        _lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);

        var handle = new ManualResetEvent(false);
        var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);

        var observer = new BulkAllObserver(
            onNext: r =>
            {
                if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
                {
                    _logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
                }
            },
            onError: async e =>
            {
                _logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");

                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                // Signal the wait handle so the caller blocked in WaitOne() is released.
                handle.Set();
                throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
            },
            onCompleted: async () =>
            {
                await ActivateIndexAsync(indexName).ConfigureAwait(false);
                var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                _logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");

                // Signal the wait handle so the caller blocked in WaitOne() is released.
                handle.Set();
            }
        );
        bulkAllObservable.Subscribe(observer);

        // Block the calling thread until the handle is signaled.
        handle.WaitOne();
        handle.Dispose();
    }