Nest 5.5 BulkAll Observable error

2019-09-16 14:57发布

问题:

This references a previous question: Nest 5.5 Error using BulkAll method. Here is the related code. The following is in a repository:

var cancellationTokenSource = new CancellationTokenSource();
        return _elasticClient.BulkAll(capacities.Value, ba => ba
            .Index(indexName)
            .Type(IndexType)
            .MaxDegreeOfParallelism(_settingsHelper.GetBulkAllDegreeOfParallelism())
            // in case of 429 response (too many requests), how long we should wait before retrying
            .BackOffTime(TimeSpan.FromSeconds(_settingsHelper.GetBulkAllBackOffTimeoutInSeconds()))
            // in case of 429 response, how many times to retry before failing
            .BackOffRetries(_settingsHelper.GetBulkAllBackOffRetries())
            // number of documents to send in each request
            .Size(_settingsHelper.GetBulkAllBatchSize())
            .RefreshOnCompleted(),
            cancellationTokenSource.Token
        );

It is then called from another file as such:

public async Task BuildIndex(string indexName)
    {
        _logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");

        Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;

        var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
        if (!indexExistsResponse)
        {
            await CreateIndexAsync(indexName).ConfigureAwait(false);
            _lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);

            var handle = new ManualResetEvent(false);
            var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);

            var observer = new BulkAllObserver(
                onNext: r =>
                {
                    if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
                    {
                        _logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
                    }
                },
                onError: async e =>
                {
                    _logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");

                    await ActivateIndexAsync(indexName).ConfigureAwait(false);
                    var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                    await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                    throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
                },
                onCompleted: async () =>
                {
                    await ActivateIndexAsync(indexName).ConfigureAwait(false);
                    var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
                    await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);

                    _logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");
                }
            );
            bulkAllObservable.Subscribe(observer);


            handle.WaitOne();
            handle.Dispose();
        }

Note that the code in the onError block, before the custom error, performs its operations on the newly created index as expected.