I designed a RecursiveTask
Here is the code for task I designed.
public class SearchTask extends RecursiveTask<Map<Short, Long>> {
private static final long serialVersionUID = 1L;
private int majorDataThreshold = 16001;
private ConcurrentNavigableMap<Short, Long> dataMap;
private long fromRange;
private long toRange;
private boolean fromInclusive;
private boolean toInclusive;
public SearchTask(final Map<Short, Long> dataSource, final long fromRange, final long toRange,
final boolean fromInclusive, final boolean toInclusive) {
this.dataMap = new ConcurrentSkipListMap<>(dataSource);
this.fromRange = fromRange;
this.toRange = toRange;
this.fromInclusive = fromInclusive;
this.toInclusive = toInclusive;
}
@Override
protected Map<Short, Long> compute() {
final int size = dataMap.size();
// This is not a perfect RecursiveTask, because the if condition is designed to overcome a stackoverflow error when map filled with 32k data
if (size > majorDataThreshold+1000) {
// List<SearchTask> tasks = createSubtasks();
// tasks.get(0).fork();
// tasks.get(1).fork();
// Map<Short, Long> map = new ConcurrentHashMap<>(tasks.get(0).join());
// map.putAll(tasks.get(1).join());
// return map;
return ForkJoinTask.invokeAll(createSubtasks()).stream().map(ForkJoinTask::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toConcurrentMap(Entry::getKey, Entry::getValue));
}
return search();
}
private List<SearchTask> createSubtasks() {
final short lastKey = dataMap.lastKey();
final short midkey = (short) (lastKey / 2);
final short firstKey = dataMap.firstKey();
final List<SearchTask> dividedTasks = new ArrayList<>();
dividedTasks.add(
new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(firstKey, true, midkey, false)),
fromRange, toRange, fromInclusive, toInclusive));
dividedTasks
.add(new SearchTask(new ConcurrentSkipListMap<Short, Long>(dataMap.subMap(midkey, true, lastKey, true)),
fromRange, toRange, fromInclusive, toInclusive));
return dividedTasks;
}
private Map<Short, Long> search() {
final Map<Short, Long> result = dataMap.entrySet().stream()
.filter(serchPredicate(fromRange, toRange, fromInclusive, toInclusive))
.collect(Collectors.toConcurrentMap(p -> p.getKey(), p -> p.getValue()));
return result;
}
private static Predicate<? super Entry<Short, Long>> serchPredicate(final long fromValue, final long toValue,
final boolean fromInclusive, final boolean toInclusive) {
if (fromInclusive && !toInclusive)
return p -> (p.getValue() >= fromValue && p.getValue() < toValue);
else if (!fromInclusive && toInclusive)
return p -> (p.getValue() > fromValue && p.getValue() <= toValue);
else if (fromInclusive && toInclusive)
return p -> (p.getValue() >= fromValue && p.getValue() <= toValue);
else
return p -> (p.getValue() > fromValue && p.getValue() < toValue);
}
Maximum data handled this task is 32000 (32k)
In the code I'm splitting up tasks if it pass a Threshold
if (size > majorDataThreshold)
When I try to reduce the majorDataThreshold less than 16001 value I'm getting an error
Stack Trace
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool.helpStealer(Unknown Source)
at java.util.concurrent.ForkJoinPool.awaitJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.doJoin(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
...........................Same trace
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
Caused by: java.lang.StackOverflowError
... 1024 more
Caused by: java.lang.StackOverflowError
... 1024 more
.................Same trace
Caused by: java.lang.StackOverflowError
at java.util.Collection.stream(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.search(SearchTask.java:74)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:56)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:52)
at com.ed.search.framework.forkjoin.SearchTask.compute(SearchTask.java:1)
To Solve this I tried to use
Collectors.toMap()
ConcurrentHashMap
Join Manually
Still issue didn't got resolved
Could some one help me to find what is wrong in my RecursiveTask
task.
Unit Test Code
public class Container32kUniqueDataTest {
private ForkJoinRangeContainer forkJoinContianer;
@Before
public void setUp(){
long[] data = genrateTestData();
forkJoinContianer = new ForkJoinRangeContainer(data)
}
private long[] genrateTestData(){
long[] data= new long[32000];
for (int i = 0; i < 32000; i++) {
data[i]=i+1;
}
return data;
}
@Test
public void runARangeQuery_forkJoin(){
Set<Short> ids = forkJoinContianer.findIdsInRange(14, 17, true, true);
assertEquals(true, ids.size()>0);
}
}
A skimmed version of Container Code
public class ForkJoinRangeContainer {
private Map<Short, Long> dataSource = new HashMap<Short, Long>();
public ForkJoinRangeContainer(long[] data) {
populateData(data);
}
private void populateData(final long[] data) {
for (short i = 0; i < data.length; i++) {
dataSource.put(i, data[i]);
}
}
public Set<Short> findIdsInRange(final long fromValue, long toValue, boolean fromInclusive, boolean toInclusive) {
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
SearchTask task = new SearchTask(dataSource, fromValue, toValue, fromInclusive, toInclusive);
Map<Short, Long> map = forkJoinPool.invoke(task);
forkJoinPool.shutdown();
return map.keySet();
}
public static void main(String[] args) {
long[] data = new long[32000];
for (int i = 0; i < 32000; i++) {
data[i] = i + 1;
}
ForkJoinRangeContainer rf2 = new ForkJoinRangeContainer(data);
Set<Short> ids = rf2.findIdsInRange(14, 17, true, true);
if (ids.size() > 0) {
System.out.println("Found Ids");
}
}