Task processing status with a message queue

Posted 2019-07-07 02:59

I am working on a product data importing system which downloads product data from external sources, translates it into the proper schema, and stores the results: essentially an ETL system. The core type of message the system handles is "ImportProductCommand", which specifies the product to import and the source. Import commands, however, are rarely sent individually. A typical business requirement is to import a whole set of products from a given source. Currently, this is expressed as an "ImportProductsCommand" message which can specify multiple products to import. A command handler consumes this message, converts it into individual "ImportProductCommand" messages and sends them to a queue for processing. The consumer of individual import requests publishes a "ProductImportedEvent" or a "ProductImportFailedEvent".

When the "ImportProductsCommand" message is received, the service assigns a GUID token to the message, places the message on a queue, and returns the token. The token is then used as a correlation ID so that individual import requests can be associated with the batch import request. Given this infrastructure, it is possible to determine the number of events associated with a given token, and thus the number of imported products or failed imports.

What is missing is an explicit event to indicate that a batch import has completed. The handler of individual import requests isn't explicitly aware that it is part of a batch import request. Completion can of course be inferred by knowing how many products were to be imported and counting the number of import events associated with a specific correlation ID. The current implementation leverages the message queue system to handle process restarts and failures, but is less explicit about the batch import request. Overall, the queries the system needs to answer are:

  • Is a given batch import done?
  • How many individual imports are remaining for a given batch import?
  • How many individual imports completed?
  • How many were erroneous?

What are some best practices or suggested approaches to support these queries and still leverage the message queuing system for resilience? Currently, what ties it all together is the token mentioned above, but there is no explicit record to represent the batch import request entity. If there were one, the individual import request processor would need to be aware of that entity in order to update its status accordingly.

All of this is implemented in C# with NServiceBus and hosted as an IIS WCF application.

1 answer
三岁会撩人
#2 · 2019-07-07 03:24

This can be implemented as an NServiceBus saga. ImportProductsCommand should be handled by a saga (ImportProductsSaga), and the saga data can record the number of products to be imported as the saga sends the individual ImportProductCommand messages. ImportProductsSaga should also handle ProductImportedEvent and ProductImportFailedEvent. Each time it handles one of these events, it increments ProductsImported or ProductsFailedToImport, then checks whether ProductsImported + ProductsFailedToImport equals ProductsToBeImported; if so, it completes the saga.
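
The counting and completion logic described above can be sketched in plain C#, without any NServiceBus types. This is only an illustration under assumptions: `BatchImportTracker`, `OnProductImported`, `OnProductImportFailed` and `IsCompleted` are hypothetical names, and the counter names are written out in full. In an actual saga the two `On...` methods would be the handlers for ProductImportedEvent and ProductImportFailedEvent, and setting `IsCompleted` would be the point where the saga completes itself.

```csharp
using System;

// Hypothetical stand-in for the saga's state and handlers (no NServiceBus types).
public class BatchImportTracker
{
    public Guid Token { get; }                      // correlation ID of the batch
    public int ProductsToBeImported { get; }        // ImportProductCommand messages sent
    public int ProductsImported { get; private set; }
    public int ProductsFailedToImport { get; private set; }
    public bool IsCompleted { get; private set; }

    // Corresponds to handling ImportProductsCommand: remember how many
    // individual import commands were sent for this token.
    public BatchImportTracker(Guid token, int productCount)
    {
        if (productCount < 0)
            throw new ArgumentOutOfRangeException(nameof(productCount));
        Token = token;
        ProductsToBeImported = productCount;
        IsCompleted = productCount == 0;            // an empty batch is done immediately
    }

    // Corresponds to handling ProductImportedEvent.
    public void OnProductImported()
    {
        if (IsCompleted) return;                    // late events are ignored in this sketch
        ProductsImported++;
        CompleteIfAllRepliesReceived();
    }

    // Corresponds to handling ProductImportFailedEvent.
    public void OnProductImportFailed()
    {
        if (IsCompleted) return;
        ProductsFailedToImport++;
        CompleteIfAllRepliesReceived();
    }

    private void CompleteIfAllRepliesReceived()
    {
        if (ProductsImported + ProductsFailedToImport == ProductsToBeImported)
            IsCompleted = true;                     // a real saga would complete itself here
    }
}
```

Because the individual import handler only publishes its usual events, it does not need to know that it is part of a batch; only this tracker (or the saga it stands in for) does.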

The ImportProductsSaga data needs to keep track of the number of ImportProductCommand messages sent and the number of replies received; from these you can calculate the number of pending replies, and so on. The saga data would look something like the following:

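   using System;

   // Saga state for one batch import (one instance per ImportProductsCommand).
   // With NServiceBus this would normally derive from ContainSagaData, which already supplies Id.
   public class ImportProductsSagaData
   {
       public Guid Id { get; set; }
       public int ProductsToBeImported { get; set; }    // ImportProductCommand messages sent
       public int ProductsImported { get; set; }        // ProductImportedEvent replies received
       public int ProductsFailedToImport { get; set; }  // ProductImportFailedEvent replies received
   }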
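
The four queries from the question follow directly from these three counters. Here is a minimal sketch, assuming a hypothetical `ImportCounts` class that carries the same counters as the saga data (with names written out in full) and a hypothetical `BatchImportStatus` helper; neither is part of NServiceBus.

```csharp
using System;

// Hypothetical holder with the same three counters as the saga data.
public class ImportCounts
{
    public int ProductsToBeImported { get; set; }
    public int ProductsImported { get; set; }
    public int ProductsFailedToImport { get; set; }
}

// Hypothetical read-side helper answering the four status queries.
public static class BatchImportStatus
{
    // How many individual imports completed?
    public static int Completed(ImportCounts c) => c.ProductsImported;

    // How many were erroneous?
    public static int Failed(ImportCounts c) => c.ProductsFailedToImport;

    // How many individual imports are remaining?
    public static int Remaining(ImportCounts c) =>
        c.ProductsToBeImported - c.ProductsImported - c.ProductsFailedToImport;

    // Is the batch import done?
    public static bool IsDone(ImportCounts c) => Remaining(c) == 0;
}
```

One thing to keep in mind: NServiceBus removes the saga data once the saga is marked complete, so if these queries must still be answerable afterwards, the final counts would need to be recorded elsewhere, for example by a handler of a completion event published by the saga.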