Group data with RXJava 2, add element to each grou

2019-08-22 05:57发布

I have an RXJava 2 observer that emits objects of type somethingWithDate. These objects have a property that contains a java date object as well as a boolean isHeader.

What I need to do with these objects is:

  1. Group them by month and year (all dates in January of 2017, all dates in January of 2016, all dates in February 2016, ...)
  2. Add a header (also a somethingWithDate) at the start of the group
  3. Sort groups in regard to each other (by date, sorting inside the groups would be nice too but not needed, the header needs to stay at first position in each group)
  4. Merged(?) all elements from all groups back into one list of elements single<List<somethingWithDate>>

Here is an example (all objects of type somethingWithDate, but I will only display the important properties here to make it more readable):

Input Data:

1.1.2017, 2.1.2017, 1.1.2016, 8.8.2017

Result:

header, 1.1.2016, header, 1.1.2017, 2.1.2017, header, 8.8.2017

I know 1. can be achieved by using groupBy(). I haven't found a way to do 2 though. I also know there is a toList() and toSortedList() to maybe use for 3. and 4. but I am unsure how that works with the groups from the groupBy() and how I can collect them back in one list.

Pseudocode:

inputListObserver
    .groupBy(obj.date.year + obj.date.month)
    .map(group -> group.addAtBeginning(headerElement))
    .sortBy(groupKey)
    .mergeToList()
    ...

2条回答
SAY GOODBYE
2楼-- · 2019-08-22 06:26

I think I finally found an answer. Using flatmap (thanks for the idea @masps)seems to do the trick. Here is the pseudo code:

.groupBy(makeKeyFromMonthAndYear()) // groups everything in months
.sorted(byGroupKey) // sorts the groups
.flatMap( // collect all groups back into one
   .startsWith(generateHeaderElement()) // add a header for each group
   .sorted() // optional, sort the values in a month
)
.toList() // output a Single<List<somethingWithDate>>
查看更多
倾城 Initia
3楼-- · 2019-08-22 06:36

I would use a ConnectableObservable to share the emissions of the initial stream. With this pseudocode you have 3 sorted streams by year with a header.

ConnectableSoureObservable = sourceObservable.publish()      
Observable.Concat(
                    ConnectableSoureObservable
                                        .filter(Year2015)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatMapIterable(),
                    ConnectableSoureObservable
                                        .filter(Year2016)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatMapIterable(),
                    ConnectableSoureObservable
                                        .filter(Year2017)
                                        .collectInto(newList<somethingWithDate>(initialValue))
                                        .toSortedList()
                                        .flatmapIterable())
                    ...
 ConnectableSoureObservable.connect()
查看更多
登录 后发表回答