I am processing logs from web user sessions with Google Dataflow/Apache Beam and need to combine a user's logs as they arrive (streaming) with that user's session history from the last month.
I have looked at the following approaches:
- Use a 30-day fixed window: most likely too large a window to fit into memory, and I do not need to update the user's history, just refer to it
- Use CoGroupByKey to join the two data sets, but the two PCollections must have the same window size (https://cloud.google.com/dataflow/model/group-by-key#join), which isn't true in my case (24h vs 30 days)
- Use a side input to retrieve the user's session history for a given element in processElement(ProcessContext processContext)
My understanding is that the data loaded via .withSideInputs(pCollectionView)
needs to fit into memory. I know I can fit all of a single user's session history into memory, but not all session histories.
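For reference, this is roughly how I would build that view (a sketch only; historyByUser is a hypothetical PCollection<KV<String, LogLine>> of the last 30 days of logs, keyed by user ID):

```java
// Sketch: View.asMultimap() materializes the keyed history as a
// Map<String, Iterable<LogLine>> side input -- the whole map,
// not just the current user's entry.
PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView =
    historyByUser.apply(View.<String, LogLine>asMultimap());
```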
My question is whether there is a way to load/stream data from a side input that is relevant only to the current user's session.
I am imagining a ParDo function that would load the user's session history from the side input by specifying the user's ID. Only the current user's session history would fit into memory; loading all session histories through the side input would be too large.
Some pseudo code to illustrate:
public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        // This materializes the entire user -> history map, which is
        // exactly what I want to avoid; I only need the current user's entry.
        Map<String, Iterable<LogLine>> historyLogData =
            processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        // May be null if the user has no sessions in the last 30 days.
        final Iterable<LogLine> userHistory =
            historyLogData.get(currentLogLine.getUserId());

        final String outputMetric =
            calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
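And the wiring I have in mind, for completeness (hypothetical names; streamingLogLines would be the incoming PCollection<LogLine>):

```java
// Sketch of the pipeline wiring, matching the DoFn above.
PCollection<String> metrics = streamingLogLines.apply(
    ParDo.withSideInputs(pHistoryView)
         .of(new MetricFn(pHistoryView)));
```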