Multi-threading a merge sorting algorithm

Posted 2019-06-07 00:24

Question:

I have a class that performs a recursive merge sort on a generic List, as long as the element type implements Comparable. I'm trying to make the code multi-threaded to improve performance. To do this, I have a static variable maxThreads that caps the number of threads I create, and a static variable currentThreads that tracks the number of threads currently running. There seems to be a race condition on currentThreads, but I haven't been able to come up with a fix.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadedMergeSorter<E extends Comparable<? super E>> implements Runnable
{
  private List<E> list;
  private List<E> left, right;
  private Thread t1, t2;
  private static final int maxThreads = 4;
  private static AtomicInteger currentThreads = new AtomicInteger(0);

  private ThreadedMergeSorter(List<E> list)
  {
    this.list = list;
  }

  public ThreadedMergeSorter(){}


  /**
   * Sorts a List<E> in place using the merge sort algorithm
   * @param list the list to be merge sorted
   */
  public void sort(List<E> list) 
  {
    if(list.size() > 1)
    {                  
      left = new ArrayList<E>(list.subList(0, list.size()/2));
      right = new ArrayList<E>(list.subList(list.size()/2, list.size()));

      list.clear();

      if(currentThreads.get() < maxThreads)
      {
        t1 = new Thread(new ThreadedMergeSorter<E>(left));
        t1.start();
        currentThreads.incrementAndGet();
      }
      else sort(left);

      if(currentThreads.get() < maxThreads)
      {
        t2 = new Thread(new ThreadedMergeSorter<E>(right));
        t2.start();
        currentThreads.incrementAndGet();
      }
      else sort(right);

      try{
        if(t1 != null)
        {
          t1.join();
          currentThreads.decrementAndGet();
        }
        if(t2 != null)
        {
          t2.join();
          currentThreads.decrementAndGet();
        }
      }catch(InterruptedException e){
        Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
      }

      list.addAll(mergeSortedLists(left, right)); 
    } 
  }

  /**
   * Merges two previously sorted lists into a single List
   * @param leftList a List of previously sorted elements
   * @param rightList a List of previously sorted elements
   * @return a new sorted List
   */
  private List<E> mergeSortedLists(List<E> leftList, List<E> rightList)
  {
    ArrayList<E> list = new ArrayList<E>();

    while(!leftList.isEmpty() && !rightList.isEmpty())
    {
      if((leftList.get(0)).compareTo(rightList.get(0)) <= 0)
        list.add(leftList.remove(0));        
      else
        list.add(rightList.remove(0));
    }

    if(!leftList.isEmpty())
      list.addAll(leftList);
    if(!rightList.isEmpty())
      list.addAll(rightList);

    return list;
  }


  @Override
  public void run() 
  {
    sort(this.list);
  }
}

The problem is in the sort(List<E> list) method, around the if statements and the try/catch block.

Answer 1:

First off, you are not running anything in parallel. Threads are started with start(), not run(); calling run() directly simply executes the run method on the current thread.

Second, if shared variables are updated from several threads, declare them as AtomicInteger:

private static AtomicInteger currentThreads = new AtomicInteger(0);

Then use these methods to atomically increment/decrement:

currentThreads.incrementAndGet();
currentThreads.decrementAndGet();
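
Note that an atomic counter does not by itself remove a check-then-act race: a separate get() followed by incrementAndGet() still lets several threads pass the check before any of them increments. A minimal sketch of reserving a thread slot in one atomic step with compareAndSet follows. The class ThreadBudget and its method names are hypothetical and not part of the question's code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps how many worker threads may exist at once.
class ThreadBudget {
    private final int max;
    private final AtomicInteger inUse = new AtomicInteger(0);

    ThreadBudget(int max) {
        this.max = max;
    }

    // Check and increment in one atomic step, so two callers can never
    // both claim the last free slot.
    boolean tryAcquire() {
        while (true) {
            int current = inUse.get();
            if (current >= max) {
                return false; // no slot left: the caller sorts on its own thread
            }
            if (inUse.compareAndSet(current, current + 1)) {
                return true;  // slot reserved before the new thread is started
            }
            // Another thread changed the counter first; retry with the new value.
        }
    }

    // Give the slot back once the worker thread has been joined.
    void release() {
        inUse.decrementAndGet();
    }

    int active() {
        return inUse.get();
    }
}
```

With something like this, sort() would start a thread only when tryAcquire() returns true and would call release() after the matching join().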


Answer 2:

Don't continually create, terminate and destroy threads. Don't try to micro-manage threads - it's very difficult and error-prone, as you have found out.

If you want to parallelize a merge sort (and it's not a bad idea), look at ThreadPoolExecutor and CountDownLatch.
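
One way this could look, as a rough sketch rather than a tuned implementation: split the list into one chunk per worker, sort each chunk on a fixed pool (Executors.newFixedThreadPool is backed by a ThreadPoolExecutor), wait on a CountDownLatch, then merge the sorted chunks on the calling thread. The class PooledMergeSort and its methods are hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: sort chunks on a pool, then merge them sequentially.
class PooledMergeSort {

    static <E extends Comparable<? super E>> List<E> sort(List<E> input, int workers) {
        int parts = Math.max(1, Math.min(workers, input.size()));
        int chunkSize = (input.size() + parts - 1) / parts;
        final List<List<E>> chunks = new ArrayList<List<E>>();
        for (int start = 0; start < input.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, input.size());
            chunks.add(new ArrayList<E>(input.subList(start, end)));
        }

        ExecutorService pool = Executors.newFixedThreadPool(parts);
        final CountDownLatch done = new CountDownLatch(chunks.size());
        try {
            for (int i = 0; i < chunks.size(); i++) {
                final int index = i;
                pool.execute(() -> {
                    try {
                        // Each task touches only its own slot, so no locking is needed.
                        chunks.set(index, mergeSort(chunks.get(index)));
                    } finally {
                        done.countDown(); // always signal, even if sorting throws
                    }
                });
            }
            done.await(); // block until every chunk has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sorting", e);
        } finally {
            pool.shutdown();
        }

        List<E> result = new ArrayList<E>();
        for (List<E> chunk : chunks) {
            result = merge(result, chunk);
        }
        return result;
    }

    // Plain sequential merge sort used inside each pool task.
    static <E extends Comparable<? super E>> List<E> mergeSort(List<E> list) {
        if (list.size() <= 1) return list;
        int mid = list.size() / 2;
        List<E> left = mergeSort(new ArrayList<E>(list.subList(0, mid)));
        List<E> right = mergeSort(new ArrayList<E>(list.subList(mid, list.size())));
        return merge(left, right);
    }

    // Index-based merge; avoids the O(n) cost of remove(0) on an ArrayList.
    static <E extends Comparable<? super E>> List<E> merge(List<E> a, List<E> b) {
        List<E> out = new ArrayList<E>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i).compareTo(b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }
}
```

Because the tasks never wait on other tasks, the pool cannot starve itself, which is a risk when recursive subtasks block inside a fixed-size pool.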



Answer 3:

If you are using Java 7, I would recommend using the new Fork/Join framework, with an AtomicReferenceArray<E> instead of a List so that you can sort in place in a thread-safe way.
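
A minimal Fork/Join sketch, under some assumptions: for brevity it uses a RecursiveTask that returns new sorted lists instead of sorting in place in an AtomicReferenceArray, and the class name ForkJoinMergeSort and the THRESHOLD value are hypothetical choices, not measured ones:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical sketch: each task sorts one sublist and returns a new sorted list.
class ForkJoinMergeSort<E extends Comparable<? super E>> extends RecursiveTask<List<E>> {
    private static final int THRESHOLD = 1024; // assumed cutoff; tune by measurement
    private final List<E> list;

    ForkJoinMergeSort(List<E> list) {
        this.list = list;
    }

    @Override
    protected List<E> compute() {
        if (list.size() <= 1) {
            return new ArrayList<E>(list);
        }
        int mid = list.size() / 2;
        // subList views are only read here, so sharing them between tasks is safe
        // as long as nobody modifies the input during the sort.
        ForkJoinMergeSort<E> left = new ForkJoinMergeSort<E>(list.subList(0, mid));
        ForkJoinMergeSort<E> right = new ForkJoinMergeSort<E>(list.subList(mid, list.size()));
        if (list.size() <= THRESHOLD) {
            // Small input: recurse on the current thread to avoid scheduling overhead.
            return merge(left.compute(), right.compute());
        }
        left.fork();                           // schedule the left half asynchronously
        List<E> sortedRight = right.compute(); // sort the right half on this thread
        return merge(left.join(), sortedRight);
    }

    private static <T extends Comparable<? super T>> List<T> merge(List<T> a, List<T> b) {
        List<T> out = new ArrayList<T>(a.size() + b.size());
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i).compareTo(b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    // Convenience entry point: runs the task on a fresh pool and shuts it down.
    static <T extends Comparable<? super T>> List<T> sort(List<T> input) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new ForkJoinMergeSort<T>(input));
        } finally {
            pool.shutdown();
        }
    }
}
```

The pool decides how many threads run, so no hand-maintained thread counter is needed.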



Answer 4:

Another option (assuming you are running Java 5 or newer) is to declare currentThreads as a volatile class member, which guarantees that every thread sees the latest written value:

private static volatile int currentThreads = 0;

You can read more about the volatile keyword here.
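
One caveat worth illustrating: volatile does not make ++ atomic, because an increment is still a separate read, add and write. The sketch below (CounterDemo is a hypothetical name) increments a volatile int and an AtomicInteger from several threads. The atomic counter always ends at the exact total, while the volatile one may lose updates and end lower:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo comparing a volatile counter with an AtomicInteger.
class CounterDemo {
    static volatile int volatileCount = 0;
    static final AtomicInteger atomicCount = new AtomicInteger(0);

    // Starts `threads` workers that each increment both counters `perThread` times.
    static void run(int threads, final int perThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    volatileCount++;               // read, add, write: not atomic
                    atomicCount.incrementAndGet(); // one atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return;
            }
        }
    }
}
```

So volatile is enough when one thread writes and others only read, but a counter that several threads update needs AtomicInteger or a lock.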