This references a previous question: Nest 5.5 Error using BulkAll method. Here is the related code. The following is in a repository:
var cancellationTokenSource = new CancellationTokenSource();
return _elasticClient.BulkAll(capacities.Value, ba => ba
.Index(indexName)
.Type(IndexType)
.MaxDegreeOfParallelism(_settingsHelper.GetBulkAllDegreeOfParallelism())
// in case of 429 response (too many requests), how long we should wait before retrying
.BackOffTime(TimeSpan.FromSeconds(_settingsHelper.GetBulkAllBackOffTimeoutInSeconds()))
// in case of 429 response, how many times to retry before failing
.BackOffRetries(_settingsHelper.GetBulkAllBackOffRetries())
// number of documents to send in each request
.Size(_settingsHelper.GetBulkAllBatchSize())
.RefreshOnCompleted(),
cancellationTokenSource.Token
);
It is then called from another file as such:
public async Task BuildIndex(string indexName)
{
_logger.Debug($"MatchingIndexEngine - BuildIndex(indexName) - Index: {indexName}");
Lazy<IEnumerable<Elastic.CarrierCapacity>> _lazyElasticCapacities;
var indexExistsResponse = await IndexExistsAsync(indexName).ConfigureAwait(false);
if (!indexExistsResponse)
{
await CreateIndexAsync(indexName).ConfigureAwait(false);
_lazyElasticCapacities = new Lazy<IEnumerable<Elastic.CarrierCapacity>>(GetPagedElasticCapacities);
var handle = new ManualResetEvent(false);
var bulkAllObservable = _matchingIndexRepository.BulkAll(_lazyElasticCapacities, indexName);
var observer = new BulkAllObserver(
onNext: r =>
{
if (r.Page % (_settingsHelper.PagesPerLog()) == 0)
{
_logger.Debug($"BulkIndexCapacitiesHandler - added batch {r.Page}");
}
},
onError: async e =>
{
_logger.Error($"MatchingIndexEngine - error building index: {indexName} | Message: {e.Message} | Inner Exception: {e.InnerException} | Data: {e.Data}");
await ActivateIndexAsync(indexName).ConfigureAwait(false);
var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
throw new ElasticException($"MatchingIndexEngine - error building index: {indexName}");
},
onCompleted: async () =>
{
await ActivateIndexAsync(indexName).ConfigureAwait(false);
var deprecatedIndexes = await GetDeprecatedMatchingIndexesAsync(indexName).ConfigureAwait(false);
await DeleteIndexesAsync(deprecatedIndexes).ConfigureAwait(false);
_logger.Info($"Rebuild of new index was successful - switched alias to new index: {indexName}.");
}
);
bulkAllObservable.Subscribe(observer);
handle.WaitOne();
handle.Dispose();
}
Note that the code in the onError block, before the custom error, performs its operations on the newly created index as expected.