I have SQL Server CDC set up on a few tables. Once CDC is initiated, the CDC tables are populated. I want to process these changes and generate an MQ message for every change that occurs, to send to an external message queue.

What is the best way to process this data? I have looked at a few products like SQData, but I was wondering if there is a better way of doing it. I have also looked at CDC with Service Broker, but that only generates messages to be sent to an external application.

My other problem: once a change has generated its message, I want the change data to be deleted, so that if I scale this processing service out, it will not reprocess data that has already been handled.
Recently I created a solution using virtual CDC instances (whereby an unlimited number of separate and distinct instances can act upon a single target table, using a single real capture instance), and this was one of the issues that I needed to overcome.
My solution was to use a persisted data table to receive the data from the change table (CT) and then process the data from there. With a job that runs every 30 seconds, and a table that stores the instance name and the last LSN processed, I am able to hive off the data into persisted storage, either on the same client database or on a separate common database (on or off the same SQL instance).
This allows me to specify the cleanup and reduce the amount of data stored locally at any time.
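A minimal sketch of that hive-off step, assuming a capture instance named `dbo_MyTable` and tracking/persisted table names of my own invention (the real solution's names and schema will differ):

```sql
-- Hypothetical tracking table: one row per virtual instance,
-- recording the last LSN that has been copied out.
CREATE TABLE dbo.CDCInstanceTracking (
    InstanceName sysname    NOT NULL PRIMARY KEY,
    LastLSN      binary(10) NOT NULL
);

-- Job step (run every 30 seconds): copy new rows from the change
-- table into persisted storage and advance the stored LSN.
DECLARE @from_lsn binary(10), @to_lsn binary(10);

SELECT @from_lsn = sys.fn_cdc_increment_lsn(LastLSN)
FROM dbo.CDCInstanceTracking
WHERE InstanceName = N'dbo_MyTable';

SET @to_lsn = sys.fn_cdc_max_lsn();

INSERT INTO dbo.MyTable_Persisted   -- same shape as the change table
SELECT *
FROM cdc.fn_cdc_get_all_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all');

UPDATE dbo.CDCInstanceTracking
SET LastLSN = @to_lsn
WHERE InstanceName = N'dbo_MyTable';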
For the processing aspect, you will need the CDC functions that are created along with each capture instance. Using the `__$update_mask` column, you can determine which columns actually changed and pull those values out (taking the `__$operation` value into account: 1 = delete, 2 = insert, 3/4 = update before/after images), together with `__$start_lsn`/`__$seqval` and a timestamp (derived from the LSN).
To decipher the `__$update_mask` column and map an LSN to a timestamp, you can use the built-in helper functions `sys.fn_cdc_is_bit_set`, `sys.fn_cdc_get_column_ordinal`, and `sys.fn_cdc_map_lsn_to_time`.
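A sketch of what that looks like, assuming a capture instance named `dbo_MyTable` with a column `SomeColumn` (both hypothetical):

```sql
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_MyTable'),
        @to_lsn   binary(10) = sys.fn_cdc_max_lsn();

SELECT
    ct.__$start_lsn,
    ct.__$seqval,
    ct.__$operation,                                -- 1=delete, 2=insert, 3/4=update before/after
    sys.fn_cdc_map_lsn_to_time(ct.__$start_lsn) AS ChangeTime,
    sys.fn_cdc_is_bit_set(
        sys.fn_cdc_get_column_ordinal(N'dbo_MyTable', N'SomeColumn'),
        ct.__$update_mask) AS SomeColumnChanged     -- 1 if this column changed
FROM cdc.fn_cdc_get_all_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all') AS ct;
```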
CDC use is predicated on LSNs to keep track of where you are in the change stream. You'll need to record the intervals that you've processed somehow (I like to stick them in a table in the database). The table looks something like:
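A sketch of that tracking table; the `CaptureInstance`, `FarEndpoint`, and `IsProcessed` columns are referenced in the steps below, while the `NearEndpoint` name and the exact types are my assumption:

```sql
-- One row per LSN interval (in-flight or completed), per capture instance.
CREATE TABLE dbo.CDCProcessing (
    CaptureInstance sysname    NOT NULL,
    NearEndpoint    binary(10) NOT NULL,        -- from_lsn of the interval
    FarEndpoint     binary(10) NOT NULL,        -- to_lsn of the interval
    IsProcessed     bit        NOT NULL DEFAULT (0)
);
```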
Your processing loop would look something like the following (for each capture instance):

1. Check the tracking table for an unprocessed interval (`where IsProcessed = 0`). If you found one, just use it as though you had inserted it yourself.
2. If there isn't one, insert a new row. You'll need a value for from_lsn: if the dbo.CDCProcessing table already has rows for this CaptureInstance, grab the one with the largest FarEndpoint value and call `sys.fn_cdc_increment_lsn` on it; if there are no rows, call `sys.fn_cdc_get_min_lsn` for this capture instance. For the far endpoint (to_lsn), use `sys.fn_cdc_max_lsn()`.
3. Fetch the changes in that interval with `cdc.fn_cdc_get_all_changes_<capture_instance>` or `cdc.fn_cdc_get_net_changes_<capture_instance>`, as appropriate for your situation.
4. Process the changes and mark the interval as processed.

The above is pretty dumb as far as what happens if you're in the middle of an interval and it aborts. That is, you could end up processing some CDC records more than once. If this is important to you, you could amend the process to take into account what the last message processed by your downstream system was and update the CDCProcessing table accordingly. But that's left as an exercise for the reader.
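The loop can be sketched as below, again assuming a hypothetical capture instance `dbo_MyTable` and the `dbo.CDCProcessing` column names used above:

```sql
DECLARE @capture_instance sysname = N'dbo_MyTable',
        @from_lsn binary(10), @to_lsn binary(10);

-- Step 1: look for an interval left over from an aborted run.
SELECT @from_lsn = NearEndpoint, @to_lsn = FarEndpoint
FROM dbo.CDCProcessing
WHERE CaptureInstance = @capture_instance AND IsProcessed = 0;

IF @from_lsn IS NULL
BEGIN
    -- Step 2: start a new interval just past the last processed one.
    SELECT @from_lsn = sys.fn_cdc_increment_lsn(MAX(FarEndpoint))
    FROM dbo.CDCProcessing
    WHERE CaptureInstance = @capture_instance;

    IF @from_lsn IS NULL  -- no history at all for this instance
        SET @from_lsn = sys.fn_cdc_get_min_lsn(@capture_instance);

    SET @to_lsn = sys.fn_cdc_max_lsn();

    INSERT INTO dbo.CDCProcessing (CaptureInstance, NearEndpoint, FarEndpoint, IsProcessed)
    VALUES (@capture_instance, @from_lsn, @to_lsn, 0);
END;

-- Step 3: fetch the changes (generate your MQ messages from these rows).
SELECT *
FROM cdc.fn_cdc_get_all_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all');

-- Step 4: mark the interval as done.
UPDATE dbo.CDCProcessing
SET IsProcessed = 1
WHERE CaptureInstance = @capture_instance AND FarEndpoint = @to_lsn;
```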
As for your other problem regarding purging, that's not quite how it works. When you set up CDC, a cleanup job is created that keeps a rolling retention window (I think it defaults to three days' worth). The job runs periodically and trims the CDC data to the retention interval. So, as long as that job runs, you shouldn't have to worry about it.
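You can inspect and adjust that retention window with the standard CDC job procedures (retention is expressed in minutes; the 7-day figure here is just an example):

```sql
-- Inspect the cleanup job's current retention (4320 minutes = 3 days).
SELECT job_type, retention
FROM msdb.dbo.cdc_jobs
WHERE job_type = N'cleanup';

-- Adjust it, e.g. to keep 7 days of change data.
EXEC sys.sp_cdc_change_job @job_type = N'cleanup', @retention = 10080;
```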