Want to use Task Parallel Library with progress re

2019-01-26 22:18发布

问题:

I developed an application in which multiple connection strings are stored. I iterate over them in a for loop, connect to each database, and execute SQL against it. In this way I update multiple databases with a bulk SQL statement.

Now I need to use the Task Parallel Library to update multiple databases simultaneously instead of updating them one after another in a loop. I also want to handle exceptions and show a separate progress bar for each database operation. Pause and resume functionality should also be supported.

When I click the button, multiple database connections will be opened, and for each task a new progress bar will be added to my form. Each progress bar will show the progress of its own database operation. When a task finishes, its corresponding progress bar will be removed from the form.

Can anyone guide me with sample code showing how to do this using the TPL? Here is some code I found that updates one progress bar, but I need to update multiple progress bars.

int iterations = 100;

// Create one progress bar whose maximum equals the number of work items and show it on the form.
ProgressBar pb = new ProgressBar();
pb.Maximum = iterations;
pb.Dock = DockStyle.Fill;
Controls.Add(pb);

// Task.Create existed only in the pre-release Parallel Extensions CTP; the shipped
// .NET 4 API for starting a task is Task.Factory.StartNew.
Task.Factory.StartNew(() =>
{
    Parallel.For(0, iterations, i =>
    {
        Thread.SpinWait(50000000); // do work here

        // Queue the increment onto the UI thread instead of touching the control from a worker thread.
        BeginInvoke((Action)delegate { pb.Value++; });
    });
});

UPDATE Question

I did it this way. The code works, but the values of all the progress bars increase sequentially. I have one form and one user control in a WinForms app. Please have a look at my code and tell me what is wrong with it.

Main form code

/// <summary>
/// Main form: on button click it adds one <see cref="Progress"/> control per configured
/// country/connection pair to <c>fpPanel</c> and starts each one; it also paints a
/// vertical gradient as the form background.
/// </summary>
public partial class Main : Form
    {
        public Main()
        {
            InitializeComponent();
            this.DoubleBuffered = true;
        }

        /// <summary>
        /// Clears the panel, then creates, adds and starts a <see cref="Progress"/> control
        /// for every entry in the country/connection list.
        /// </summary>
        private void btnStart_Click(object sender, EventArgs e)
        {
            Dictionary<string, string> dicList = new Dictionary<string, string>
            {
                { "GB", "conn1" },
                { "US", "conn2" },
                { "DE", "conn3" }
            };
            fpPanel.Controls.Clear();

            // This handler already runs on the UI thread, so the controls are created and
            // added right here. The earlier version created them inside a background task
            // and shared a single ucProgress variable between loop iterations, so a queued
            // BeginInvoke delegate could observe a later iteration's control.
            foreach (KeyValuePair<string, string> entry in dicList)
            {
                // Declared inside the loop: each iteration owns its own control instance.
                Progress ucProgress = new Progress();
                ucProgress.Country = entry.Key;
                ucProgress.DBConnection = entry.Value;

                fpPanel.Controls.Add(ucProgress);

                // Process() starts its own background task and returns immediately,
                // so the loop does not block the UI and the controls run concurrently.
                ucProgress.Process();
            }
        }

        /// <summary>Forces a full repaint so the gradient is redrawn for the new size.</summary>
        private void Main_Resize(object sender, EventArgs e)
        {
            this.Invalidate();
        }

        /// <summary>Fills the client area with a WhiteSmoke-to-LightGray gradient at 90 degrees.</summary>
        private void Main_Paint(object sender, PaintEventArgs e)
        {
            Rectangle bounds = this.ClientRectangle;

            // LinearGradientBrush rejects a rectangle with zero width or height
            // (for example while the form is minimized), so there is nothing to paint.
            if (bounds.Width <= 0 || bounds.Height <= 0)
            {
                return;
            }

            using (LinearGradientBrush brush = new LinearGradientBrush(bounds,
                                                               Color.WhiteSmoke,
                                                               Color.LightGray,
                                                               90F))
            {
                e.Graphics.FillRectangle(brush, bounds);
            }
        }
    }

user control code

/// <summary>
/// User control that shows a status label and a 0-100 progress bar for one
/// country/database operation.
/// </summary>
public partial class Progress : UserControl
    {
        public Progress()
        {
            InitializeComponent();
            lblMsg.Text = "";
            pbStatus.Minimum = 0;
            pbStatus.Maximum = 100;
        }

        /// <summary>Country code shown in the status messages.</summary>
        public string Country { get; set; }

        /// <summary>Connection identifier for this control's database.</summary>
        public string DBConnection { get; set; }

        /// <summary>SQL text associated with this control.</summary>
        public string Sql { get; set; }

        /// <summary>
        /// Sets the status label text directly; it does no thread marshaling itself.
        /// </summary>
        /// <param name="strMsg">Text to display.</param>
        public void SetMessage(string strMsg)
        {
            lblMsg.Text = strMsg;
        }

        /// <summary>
        /// Starts a background task that walks through the three simulated stages
        /// (connect, execute, done) and reports each one to the label and progress bar.
        /// Returns immediately.
        /// </summary>
        public void Process()
        {
            // The former unused TaskScheduler.FromCurrentSynchronizationContext() local was
            // removed: it did nothing useful and threw when Process() was called from a
            // thread without a synchronization context.
            Task.Factory.StartNew(() =>
            {
                ReportStatus("Connecting country " + Country, 30);
                System.Threading.Thread.SpinWait(50000000); // simulated work

                ReportStatus("executing sql for country " + Country, 60);
                System.Threading.Thread.SpinWait(50000000); // simulated work

                ReportStatus("sql executed successfully for country " + Country, 100);
                System.Threading.Thread.SpinWait(50000000); // simulated work
            });
        }

        /// <summary>
        /// Queues a single UI-thread update that sets both the label text and the bar value.
        /// </summary>
        /// <param name="message">Text for the status label.</param>
        /// <param name="percent">New progress bar value (0-100).</param>
        private void ReportStatus(string message, int percent)
        {
            BeginInvoke((MethodInvoker)delegate
            {
                lblMsg.Text = message;
                pbStatus.Value = percent;
            });
        }
    }

回答1:

Perhaps this can be a starting point. How pause/resume is handled depends on your needs and can be tweaked.

var cancellationTokenSource = new CancellationTokenSource();
var cancellation = cancellationTokenSource.Token;

// Sketch: creates one ProgressBar and starts one background task per database.
// The same cancellation token is handed to every task and to every Parallel.For.
// NOTE(review): the element type of `databases` (written "...") and the value of
// `iterations` are placeholders this sketch leaves for the reader to supply.
void UpdateDatabases(IEnumerable<...> databases, CancellationToken cancellation)
{
    foreach (var db in databases)
    {
        //create as many ProgressBar instances as databases you want to update
        //check if ProgressBar exist, then return it and reuse, otherwise create new
        ProgressBar pb = new ProgressBar();
        pb.Maximum = iterations;
        pb.Dock = DockStyle.Fill;

        Controls.Add(pb);

        //start thread for every database/progress bar
        Task.Factory.StartNew(progressBar =>
        {
            var start = ((ProgressBar)progressBar).Value; //use last value in case of pause
            Parallel.For(start, iterations,
                new ParallelOptions() { CancellationToken = cancellation },
                (i, loopState) =>
                {
                    if (loopState.ShouldExitCurrentIteration)
                    {
                        return;
                    }

                    //perhaps check loopState.ShouldExitCurrentIteration inside worker method
                    Thread.SpinWait(50000000); // do work here

                    // Queue the increment onto the UI thread.
                    BeginInvoke((Action)delegate { ((ProgressBar)progressBar).Value++; });
                });
        },
        pb, cancellation)
        .ContinueWith(task =>
        {
            //to handle exceptions use task.Exception member

            // The state object passed to StartNew (the ProgressBar) is available as AsyncState.
            var progressBar = (ProgressBar)task.AsyncState;
            if (!task.IsCancelled)
            {
                //hide progress bar here and reset pb.Value = 0
            }
        },
        TaskScheduler.FromCurrentSynchronizationContext() //update UI from UI thread
        );
    }
}

//.........

//Call
UpdateDatabases(databases, cancellation);

//To suspend, call
cancellationTokenSource.Cancel();

//To resume - simply call UpdateDatabases again with a fresh token source
cancellationTokenSource = new CancellationTokenSource();
cancellation = cancellationTokenSource.Token;
UpdateDatabases(databases, cancellation);

Update

I've reviewed your code. Take a look at the revised code and adapt it to your needs. The main mistakes are the misuse of closures and creating the Progress control from a non-UI thread. To enable parallel processing you can use Parallel.ForEach (see MSDN for the available overloads). The design also looks a little strange to me (you're mixing data and logic in the Progress control). From a UI perspective it's also strange that the progress bars will appear in order of processing rather than in their original order in the list (this will be a problem if you decide to sort the list alphabetically).

I hope it helps.

Main form code

   /// <summary>
   /// Clears the panel, then processes every country/connection pair in parallel on
   /// background threads; each pair gets its own Progress control, which is created and
   /// added to the panel on the UI thread.
   /// </summary>
   private void btnStart_Click(object sender, EventArgs e)
    {
        // The former unused outer "Progress ucProgress = null;" local was removed: it
        // clashed with the local of the same name declared inside the lambda below.
        Dictionary<string, string> dicList = new Dictionary<string, string>();
        dicList.Add("GB", "conn1");
        dicList.Add("US", "conn2");
        dicList.Add("DE", "conn3");
        fpPanel.Controls.Clear();

        // Called through fpPanel.Invoke, so the control is constructed and added on the UI thread.
        Func<KeyValuePair<string, string>, Progress> createProgress = entry =>
        {
            var tmp = new Progress {Country = entry.Key, DBConnection = entry.Value};
            fpPanel.Controls.Add(tmp);
            return tmp;
        };

        // The outer task keeps the blocking Parallel.ForEach off the UI thread, which
        // must stay free to service the Invoke calls made from the workers.
        Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(dicList,
                entry =>
                {
                    //create and add the Progress in UI thread
                    var ucProgress = (Progress)fpPanel.Invoke(createProgress, entry);

                    //execute ucProgress.Process(); in non-UI thread in parallel.
                    //the .Process(); must update UI by using *Invoke
                    ucProgress.Process();
                });
        });

    }

user control code

/// <summary>
/// Runs the three simulated stages synchronously on the calling thread, posting a
/// label update and a progress bar update to the controls at the start of each stage.
/// </summary>
/// <remarks>
/// No task is started here: per the accompanying caller, this method is meant to be
/// invoked from Parallel.ForEach worker threads, which is why every control update
/// goes through BeginInvoke.
/// </remarks>
public void Process()
    {
        // Label prefix for each stage (the country is appended when the update runs)
        // and the progress bar value shown for that stage.
        string[] stagePrefixes =
        {
            "Connecting country ",
            "executing sql for country ",
            "sql executed successfully for country "
        };
        int[] stageValues = { 30, 60, 100 };

        for (int stage = 0; stage < stagePrefixes.Length; stage++)
        {
            // Per-iteration copies so each queued delegate captures its own stage data.
            string prefix = stagePrefixes[stage];
            int barValue = stageValues[stage];

            lblMsg.BeginInvoke((MethodInvoker)delegate
            {
                lblMsg.Text = prefix + Country;
            });

            pbStatus.BeginInvoke((MethodInvoker)delegate
            {
                pbStatus.Value = barValue;
            });

            // Simulated work for this stage.
            System.Threading.Thread.SpinWait(50000000);
        }
    }