How can SQL Server 2012 (or SSIS) notify NServiceB

2019-06-27 16:15发布

问题:

We have some very long running ETL packages (some run for hours) that need to be kicked off by NServiceBus endpoints. We do not need to keep a single transaction alive for the entire process, and can break it up into smaller transactions. Since an NServiceBus handler will wrap itself in a transaction for the entirety, we do not want to handle this in a single transaction because it will time out--let alone create issues with locking in the DBMS.

My current thoughts are that we could spawn another process asynchronously, immediately return from the handler, and publish an event upon completion (success or failure). I have not found a lot of documentation on how to integrate the new NServiceBus 4.0 SQL Server Broker support with the traditional MSMQ transport. Is that even possible?

What is the preferred way to have a long running process in SQL Server 2012 (or an SSIS package) notify NServiceBus subscribers when it completes in an asynchronous manner?

回答1:

It looks like it is possible to do a http request from SSIS, see How to make an HTTP request from SSIS?

With that in mind you can use send a message to NServiceBus via the Gateway (the Gateway is just an HttpListener) to your Publisher to tell it to publish a message informing all the subscribers that the long running ETL package has completed.

To send a message to the gateway you need to do something like:

var webRequest = (HttpWebRequest)WebRequest.Create("http://localhost:25898/Headquarters/");
webRequest.Method = "POST";
webRequest.ContentType = "text/xml; charset=utf-8";
webRequest.UserAgent = "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)";

webRequest.Headers.Add("Content-Encoding", "utf-8");
webRequest.Headers.Add("NServiceBus.CallType", "Submit");
webRequest.Headers.Add("NServiceBus.AutoAck", "true");
webRequest.Headers.Add("NServiceBus.Id", Guid.NewGuid().ToString("N"));

const string message = "<?xml version=\"1.0\" ?><Messages xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns=\"http://tempuri.net/NServiceBus.AcceptanceTests.Gateway\"><MyRequest></MyRequest></Messages>";

using (var messagePayload = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(message)))
{
    webRequest.Headers.Add(HttpRequestHeader.ContentMd5, HttpUtility.UrlEncode(Hasher.Hash(messagePayload))); //Need to specify MD5 hash of the payload
    webRequest.ContentLength = messagePayload.Length;

    using (var requestStream = webRequest.GetRequestStream())
    {
        messagePayload.CopyTo(requestStream);
    }
}

using (var myWebResponse = (HttpWebResponse) webRequest.GetResponse())
{
    if (myWebResponse.StatusCode == HttpStatusCode.OK)
    {
        //success
    }
}

Hope this helps!



回答2:

There is actually a task in SSIS 2012 for placing messages in an MSMQ, the Message Queue Task. You just point it to your MSMQ connection and can use an Expression to customize your message with the package name, success/failure, row counts, etc.

Depending on how many packages we're talking about and how customized you want the messages to be, your best bet is to write a standalone utility to create messages in whatever format you desire, and then use an Execute Process Task to invoke that utility with whatever parameters from the package you want to pass in to be formatted into the message.

You could also use that same codebase and just create a custom SSIS task (a lot easier than it sounds.)



回答3:

One thought I had to help adhere to the DRY principle would be to use a Master SSIS package.

In my mind, it would look something like an Execute Package Task with an X connected to that. Configure the package to take as a parameter a Package Name. Configure the Execute Package Task to use the Parameter for determining what package to call.

The X would probably be a Script Task but perhaps as @Kyle Hale points out, it might be the Message Queue Task. I leave that decision to those more versed in NServiceBus.

The important thing in my mind, is to not add this logic into every package as that'd be a maintenance nightmare.