Let me preface this by saying I'm new to Logic Apps and Data Factory. I'm currently working on an integration, and one part of it requires triggering a Data Factory pipeline from Logic Apps. I've got that working; the part I can't figure out is how to pass parameters to the pipeline. I've tried altering the JSON under both the "parameters" and "triggers" sections, but nothing has worked so far. The pipeline executes, but only with its default parameters.
Has anybody had any success doing this? Any help is appreciated.
You can use the body property of the logic app's "Create a pipeline run" action to pass parameters to the pipeline. As always, be careful because not only is this action in preview but I also couldn't find this solution in any MS documentation. I just made an educated guess based on how other similar actions are formatted.
Example:
    "Run_my_pipeline": {
        "inputs": {
            "host": {
                "connection": {
                    "name": "@parameters('$connections')['azuredatafactory']['connectionId']"
                }
            },
            "method": "post",
            "body": {
                "param1": "myParamValue",
                "param2": "myParamValue"
            },
            "path": "...",
            "queries": {
                "x-ms-api-version": "2017-09-01-preview"
            },
            "authentication": "@parameters('$authentication')"
        }
    }
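The values in body don't have to be literals. As a rough sketch, assuming the body approach above works for you and that the Logic App's trigger payload carries a field called param1 (a hypothetical name), the body could take one value from the trigger and compute another with a workflow expression:

```json
"body": {
    "param1": "@triggerBody()?['param1']",
    "param2": "@{utcNow()}"
}
```

Here `triggerBody()` and `utcNow()` are standard Logic Apps expression functions; the parameter names are the placeholders from the example above and need to match the parameters defined on your pipeline.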
As I said in the comment, I created a workaround with Azure Functions. Azure Functions and Logic Apps work well together.
This link shows how to create and manage pipelines with .NET:
https://docs.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-dot-net
If you already have a data factory and a pipeline and just want to run it (with parameters), you can do this:
    // This is how you initialize the pipeline parameters
    Dictionary<string, object> parameters = new Dictionary<string, object>
    {
        {"BoxSerialNumbers", req.BoxSerialNumbers},
        {"StartDate", req.StartDate},
        {"EndDate", req.EndDate},
        {"Recipient", req.Recipient}
    };
    var client = Authenticate(); // authenticate against Azure
    log.Info("Creating.");
    // Run the pipeline; you can do this async (it's better)
    CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);
    log.Info("Created.");
    var response = new HttpResponseMessage();
    // Just some validation for the function: return the run ID if the run has started.
    // Note: a run that was only just created may still report "Queued" instead of "InProgress".
    if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId).Status.Equals("InProgress"))
    {
        response = new HttpResponseMessage(HttpStatusCode.OK)
        {
            Content = new StringContent(runResponse.RunId, Encoding.UTF8)
        };
    }
    else
    {
        response = new HttpResponseMessage(HttpStatusCode.BadRequest)
        {
            Content = new StringContent("Pipeline didn't start", Encoding.UTF8)
        };
    }
    return response;
    public static DataFactoryManagementClient Authenticate()
    {
        var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
        ClientCredential cc = new ClientCredential(applicationID, authenticationKey);
        AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
        ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
        return new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionID };
    }
So you can pass your parameters from the Logic App in the request, and use the returned run ID to check the status. Then, in the Logic App, a simple HTTP request is all you need to call this function. Hope this helps someone.
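The HTTP call from the Logic App to the function could look roughly like the action below. This is only a sketch: the action name, the URL parts in angle brackets, and the placeholder values are made up for illustration, and it assumes the function deserializes a JSON body whose property names match the fields read from the request in the function above.

```json
"Call_pipeline_function": {
    "type": "Http",
    "inputs": {
        "method": "POST",
        "uri": "https://<your-function-app>.azurewebsites.net/api/<your-function-name>?code=<your-function-key>",
        "headers": {
            "Content-Type": "application/json"
        },
        "body": {
            "BoxSerialNumbers": "<serial numbers>",
            "StartDate": "<start date>",
            "EndDate": "<end date>",
            "Recipient": "<recipient>"
        }
    },
    "runAfter": {}
}
```

When the function answers with 200 OK, the response body of this action is the run ID, which later actions in the Logic App can use to look up the run's status.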
I used DraganB's solution, but the call signature of
    CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);
has changed. Passing the dictionary as a named argument makes this work perfectly:
    CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters: parameters);
Here's the function for anyone that needs it.
    [FunctionName("DatafactoryShim")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
        ExecutionContext context,
        TraceWriter log)
    {
        // Deserialize the POST body into the values for the pipeline parameters
        string messageBody = await req.Content.ReadAsStringAsync();
        BlobToDatalakeFactoryParameters postValues = JsonHelper.ToClass<BlobToDatalakeFactoryParameters>(messageBody);
        // This is how you initialize the pipeline parameters
        Dictionary<string, object> parameters = new Dictionary<string, object>
        {
            {"blobContainer", postValues.BlobContainer},
            {"blobFolder", postValues.BlobFolder},
            {"relativeDatalakeFolder", postValues.RelativeDatalakeFolder},
            {"modelType", postValues.ModelType}
        };
        var client = Authenticate(); // authenticate against Azure
        string resourceGroup = ConfigurationManager.AppSettings["resourceGroup"];
        string dataFactoryName = ConfigurationManager.AppSettings["dataFactoryName"];
        string pipelineName = ConfigurationManager.AppSettings["pipelineName"];
        log.Info("Creating pipeline run...");
        // Await the call instead of blocking on .Result inside an async method
        CreateRunResponse runResponse = (await client.Pipelines.CreateRunWithHttpMessagesAsync(
            resourceGroup,
            dataFactoryName,
            pipelineName,
            parameters: parameters)).Body;
        log.Info("Pipeline run ID: " + runResponse.RunId);
        var response = new HttpResponseMessage();
        // Just some validation for the function: return the run ID if the run has started
        if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId).Status.Equals("InProgress"))
        {
            response = new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent(runResponse.RunId, Encoding.UTF8)
            };
        }
        else
        {
            response = new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content = new StringContent("Pipeline didn't start", Encoding.UTF8)
            };
        }
        return response;
    }