I am working on a product data import system that downloads product data from external sources, translates it into the proper schema, and stores the results. In essence, it is an ETL system. The core message type the system handles is "ImportProductCommand", which specifies the product to import and its source.

Import commands, however, are rarely sent individually. A typical business requirement is to import a whole set of products from a given source. Currently, this is expressed as an "ImportProductsCommand" message, which can specify multiple products to import. A command handler consumes this message, converts it into individual "ImportProductCommand" messages, and sends them to a queue for processing. The consumer of the individual import requests publishes either a "ProductImportedEvent" or a "ProductImportFailedEvent".

When an "ImportProductsCommand" message is received, the service assigns it a GUID token, places the message on a queue, and returns the token. The token is then used as a correlation ID so that individual import requests can be associated with the batch import request. Given this infrastructure, it is possible to count the events associated with a given token, and therefore the number of imported products and failed imports.

What is missing is an explicit event indicating that a batch import has completed. The handler of individual import requests isn't explicitly aware that it is part of a batch import request. Completion can, of course, be inferred by knowing how many products were to be imported and counting the import events associated with a specific correlation ID. The current implementation leverages the message queuing system to handle process restarts and failures, but it is less explicit about the batch import request. Overall, the queries the system needs to answer are:
- Is a given batch import done?
- How many individual imports are remaining for a given batch import?
- How many individual imports have completed successfully?
- How many have failed?
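The count-based inference described above can be sketched as follows. This is a minimal illustration, not the existing implementation: it assumes each event carries the batch token as `CorrelationId` plus a `ProductId` (hypothetical property names), and that the total number of products in the batch is recorded somewhere when the "ImportProductsCommand" is accepted. No NServiceBus APIs are used; the event records are stand-ins for the real message contracts. Outcomes are keyed by product so that a redelivered message does not inflate the counts, and the last event seen for a product wins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the real event contracts; CorrelationId is the GUID
// token assigned when the ImportProductsCommand was received.
public sealed record ProductImportedEvent(Guid CorrelationId, string ProductId);
public sealed record ProductImportFailedEvent(Guid CorrelationId, string ProductId, string Reason);

// Snapshot answering the four queries for one batch.
public sealed record BatchStatus(int Total, int Succeeded, int Failed)
{
    public int Completed => Succeeded + Failed;        // processed either way
    public int Remaining => Total - Completed;         // still in flight
    public bool IsDone => Completed >= Total;
}

public static class BatchStatusCalculator
{
    // total: product count of the original batch (assumed to be stored when the batch is accepted).
    public static BatchStatus Compute(Guid token, int total, IEnumerable<object> events)
    {
        // Keyed by product so duplicate deliveries of the same event count once.
        var outcomes = new Dictionary<string, bool>();
        foreach (var e in events)
        {
            switch (e)
            {
                case ProductImportedEvent ok when ok.CorrelationId == token:
                    outcomes[ok.ProductId] = true;
                    break;
                case ProductImportFailedEvent bad when bad.CorrelationId == token:
                    outcomes[bad.ProductId] = false;
                    break;
                // Events belonging to other batches are ignored.
            }
        }
        int succeeded = outcomes.Values.Count(v => v);
        return new BatchStatus(total, succeeded, outcomes.Count - succeeded);
    }
}
```

The weakness this makes visible is that `total` has to come from somewhere outside the event stream, since no individual event knows the size of its batch.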
What are some best practices or suggested approaches for supporting these queries while still leveraging the message queuing system for resilience? Currently, what ties everything together is the token mentioned above, but there is no explicit record representing the batch import request entity; if there were, the individual import request processor would need to be aware of that entity in order to update its status.
All of this is implemented in C# with NServiceBus and hosted as an IIS WCF application.
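For comparison, here is a minimal in-memory sketch of what an explicit batch import record might hold. `BatchImport`, `Record` and the property names are hypothetical. The record is assumed to be created when the "ImportProductsCommand" is accepted, keyed by the same GUID token, and updated once per "ProductImportedEvent" or "ProductImportFailedEvent" carrying that token. It is neither thread-safe nor persisted. A real version would need durable storage and concurrency control, since several outcomes for the same batch may arrive at once.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical explicit batch entity, keyed by the token returned to the caller.
// In-memory only: not thread-safe and not persisted.
public sealed class BatchImport
{
    private readonly HashSet<string> _pending;
    private readonly HashSet<string> _succeeded = new();
    private readonly HashSet<string> _failed = new();

    public BatchImport(Guid token, IEnumerable<string> productIds)
    {
        Token = token;
        _pending = new HashSet<string>(productIds); // duplicate IDs collapse to one
        Total = _pending.Count;
    }

    public Guid Token { get; }
    public int Total { get; }
    public int Remaining => _pending.Count;
    public int Succeeded => _succeeded.Count;
    public int Failed => _failed.Count;
    public bool IsDone => _pending.Count == 0;

    // Applies one outcome. Returns true only on the call that completes the batch,
    // i.e. the point where an explicit "batch completed" event could be published.
    public bool Record(string productId, bool succeeded)
    {
        if (!_pending.Remove(productId))
            return false; // duplicate delivery or unknown product: ignored
        (succeeded ? _succeeded : _failed).Add(productId);
        return IsDone;
    }
}
```

Because `Record` ignores products that are no longer pending, redelivered outcome events leave the counts unchanged, and the completion signal fires exactly once per batch.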