I am investigating processing logs from web user sessions with Google Dataflow/Apache Beam, and I need to combine a user's logs as they arrive (streaming) with that user's session history from the last month.
I have looked at the following approaches:
- Use a 30-day fixed window: most likely too large a window to fit into memory, and I do not need to update the user's history, just refer to it.
- Use CoGroupByKey to join the two data sets, but both data sets must use the same window size (https://cloud.google.com/dataflow/model/group-by-key#join), which isn't the case here (24h vs. 30 days).
- Use a side input to retrieve the user's session history for a given element in processElement(ProcessContext processContext)
My understanding is that the data loaded via .withSideInputs(pCollectionView) needs to fit into memory. I know a single user's session history fits into memory, but not all users' session histories together.
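To see why the CoGroupByKey option does not fit, it helps to look at where a single event lands under each window size. This is a plain-Java sketch with no Beam dependency; it assumes epoch-aligned fixed windows with zero offset, which I believe is the default alignment for Beam's FixedWindows. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowMismatch {
    // Start of the fixed window containing ts, assuming windows are aligned to the
    // Unix epoch with zero offset (start = ts - (ts mod size)).
    public static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2016-03-15T10:30:00Z");
        Instant daily = windowStart(event, Duration.ofHours(24));
        Instant monthly = windowStart(event, Duration.ofDays(30));
        System.out.println("24h window starts at:    " + daily);
        System.out.println("30-day window starts at: " + monthly);
        // The same event belongs to differently bounded windows in the two inputs,
        // which is why they cannot be joined window-by-window with CoGroupByKey.
        System.out.println("same window? " + daily.equals(monthly));
    }
}
```

Note also that an epoch-aligned 30-day window is a fixed bucket (here starting 2016-02-29), not "the 30 days before this event", so history from mid-February would sit in the previous window.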
My question: is there a way to load or stream only the part of a side input that is relevant to the current user's session?
I am imagining a ParDo function that loads the user's session history from the side input by specifying the user's ID. Only the current user's session history would fit into memory; loading all session histories through the side input would be too large.
Some pseudocode to illustrate:
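```java
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.util.Collections;
import java.util.Map;

public static class MetricFn extends DoFn<LogLine, String> {
  // Map view of the 30-day history keyed by user ID (e.g. built with View.asMultimap())
  private final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

  public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
    this.pHistoryView = historyView;
  }

  @Override
  public void processElement(ProcessContext processContext) throws Exception {
    Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
    final LogLine currentLogLine = processContext.element();
    Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
    if (userHistory == null) {
      userHistory = Collections.emptyList(); // user has no stored history yet
    }
    // calculateMetricWithUserHistory is my own helper, defined elsewhere
    final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
    processContext.output(outputMetric);
  }
}
```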
There is not currently a way of accessing per-key side inputs in streaming, but it would definitely be useful exactly as you describe, and it is something we are considering implementing.
One possible workaround is to use the side input to distribute pointers to the actual session histories. The code generating the 24h session histories could upload them to GCS, BigQuery, etc., and then send their locations as a side input to the joining code.
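The pointer workaround can be modeled in plain Java to show the shape of the data flow. This is a sketch under several assumptions: an in-memory map stands in for GCS/BigQuery, the `gs://example-bucket/...` location format is made up, there is one pointer per user (the answer does not say how granular the locations are), and `uploadHistory`, `MetricProcessor`, and the count-based metric are all hypothetical. In a real pipeline the side input would be a PCollectionView and `historyFor` would perform an external read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PointerSideInput {
    // Stand-in for external storage (GCS/BigQuery): location -> stored log lines.
    static final Map<String, List<String>> STORE = new HashMap<>();

    // Producer side: store one user's history and return a pointer to it.
    static String uploadHistory(String userId, List<String> lines) {
        String location = "gs://example-bucket/history/" + userId; // hypothetical layout
        STORE.put(location, new ArrayList<>(lines));
        return location;
    }

    // Consumer side: the "side input" holds only userId -> location, which is much
    // smaller than the histories; each history is fetched only when its user shows up.
    static class MetricProcessor {
        private final Map<String, String> locations;
        private final Map<String, List<String>> cache = new HashMap<>();

        MetricProcessor(Map<String, String> locations) {
            this.locations = locations;
        }

        List<String> historyFor(String userId) {
            String location = locations.get(userId);
            if (location == null) {
                return Collections.emptyList(); // no stored history for this user
            }
            // Fetch each location once; a real worker should bound this cache (e.g. LRU).
            return cache.computeIfAbsent(location,
                    loc -> STORE.getOrDefault(loc, Collections.emptyList()));
        }

        // Hypothetical metric: historical lines plus the current one. A real metric
        // would also inspect currentLine.
        String process(String userId, String currentLine) {
            return userId + ":" + (historyFor(userId).size() + 1);
        }
    }

    public static void main(String[] args) {
        Map<String, String> locations = new HashMap<>();
        locations.put("alice", uploadHistory("alice", Arrays.asList("a1", "a2", "a3")));
        locations.put("bob", uploadHistory("bob", Arrays.asList("b1")));

        MetricProcessor fn = new MetricProcessor(locations);
        System.out.println(fn.process("alice", "a4")); // alice:4
        System.out.println(fn.process("bob", "b2"));   // bob:2
        System.out.println(fn.process("carol", "c1")); // carol:1
    }
}
```

The trade-off is that each worker now performs its own reads against storage, so caching and read latency become the concern instead of side-input memory.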