Linear funnel from a collection of events with MongoDB

Published 2019-04-11 03:08

Question:

I have a number of event documents. Each event has a number of fields, but the ones relevant to my query are the following (an example document is sketched after this list):

  • person_id - a reference to the person that triggered the event
  • event - a string key to identify the event
  • occurred_at - the utc of the time the event occurred
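
For illustration, a minimal event document might look like this (the values and the _id are hypothetical; the real documents carry additional fields such as account_id, which appears in the queries below):

{
  _id: ObjectId('...'),
  account_id: ObjectId('52011239b1b9229f92000003'),
  person_id: ObjectId('52011239b1b9229f92000001'),  // hypothetical person reference
  event: 'event_a',                                  // string key identifying the event
  occurred_at: ISODate('2013-08-01T10:00:00Z')       // UTC time the event occurred
}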

What I want to achieve is:

  • for a list of event keys, e.g. `['event_1', 'event_2', 'event_3']`
  • get counts of the number of people that performed each event and all of the events before it, in order, i.e.:
    • the number of people who performed event_1
    • the number of people who performed event_1, and then event_2
    • the number of people who performed event_1, and then event_2, and then event_3
    • etc
  • a secondary goal is to be able to get the average occurred_at date for each event so that I can calculate the average time between each event

The best I have got is the following two map-reduce jobs:

db.events.mapReduce(function () {
  // map: emit one single-element event list per document, keyed by person
  emit(this.person_id, {
    e: [{
      e: this.event,       // event key
      o: this.occurred_at  // when it occurred
    }]
  })
}, function (key, values) {
  // reduce: concatenate each person's event lists into a single array
  return {
    e: [].concat.apply([], values.map(function (x) {
      return x.e
    }))
  }
}, {
  query: {
    account_id: ObjectId('52011239b1b9229f92000003'),
    event: {
      $in: ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f']
    }
  },
  out: 'people_funnel_chains',
  sort: { person_id: 1, occurred_at: 1 }
})
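
The resulting people_funnel_chains documents are keyed by person and look roughly like this (illustrative; the dates are hypothetical):

{
  _id: ObjectId('52011239b1b9229f92000001'),  // person_id
  value: {
    e: [
      { e: 'event_a', o: ISODate('2013-08-01T10:00:00Z') },
      { e: 'event_b', o: ISODate('2013-08-01T10:05:00Z') }
    ]
  }
}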

And then:

db.people_funnel_chains.mapReduce(function () {
  // walk this person's ordered events, advancing one funnel step at a time
  var funnel = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'];
  var events = this.value.e;
  for (var f = 0; f < funnel.length; f++) {
    var e = funnel[f];
    var i = events.map(function (x) { return x.e }).indexOf(e);
    if (i > -1) {
      // this person reached this step: count it and record when
      emit(e, { c: 1, o: events[i].o });
      // only consider later events for the next funnel step
      events = events.slice(i + 1, events.length);
    } else {
      break;
    }
  }
}, function (key, values) {
  // reduce: total people reaching the step, plus the average occurred_at
  return {
    c: Array.sum(values.map(function (x) { return x.c })),
    o: new Date(Array.sum(values.map(function (x) { return x.o.getTime() })) / values.length)
  };
}, { out: { inline: 1 } })
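
The inline output is one document per funnel step, each with the number of people who reached that step and the averaged occurred_at, roughly:

{ _id: 'event_a', value: { c: <number of people>, o: <average occurred_at> } }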

I would like to achieve this in real time using the aggregation framework, but I can see no way to do it. For tens of thousands of records this takes tens of seconds. I can run it incrementally, which makes it fast enough for new data coming in, but if I want to modify the original query (e.g. change the event chain) it can't be done in a single request, which I would love to be able to do.

Update using Cursor.forEach()

Using Cursor.forEach() I've managed to get a huge improvement on this (essentially removing the need for the first map-reduce).

var time = new Date().getTime(),
    funnel_event_keys = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'],
    looking_for_i = 0,
    looking_for = funnel_event_keys[0],
    funnel = {},
    last_person_id = null;
// funnel maps each event key to [count, running average of occurred_at in ms]
for (var i in funnel_event_keys) { funnel[funnel_event_keys[i]] = [0, null] };
db.events.find({
  account_id: ObjectId('52011239b1b9229f92000003'),
  event: {
    $in: funnel_event_keys
  }
}, { person_id: 1, event: 1, occurred_at: 1 }).sort({ person_id: 1, occurred_at: 1 }).forEach(function(e) {

  var current_person_id = e['person_id'].str;  // compare person ObjectIds by their string form

  // new person: start looking for the first funnel event again
  if (last_person_id != current_person_id) {
    looking_for_i = 0;
    looking_for = funnel_event_keys[0];
  }

  // this person performed the event we were waiting for: count it and advance
  if (e['event'] == looking_for) {
    var funnel_event = funnel[looking_for];
    funnel_event[0] = funnel_event[0] + 1;
    // running approximation of the average occurred_at for this step
    funnel_event[1] = ((funnel_event[1] || e['occurred_at'].getTime()) + e['occurred_at'].getTime()) / 2;
    looking_for_i = looking_for_i + 1;
    looking_for = funnel_event_keys[looking_for_i];
  }

  last_person_id = current_person_id;
})
funnel;
new Date().getTime() - time;

I wonder if something custom, with the data held in memory, could improve on this? Getting hundreds of thousands of records out of MongoDB and into memory (on a different machine) is going to be a bottleneck; is there a technology I'm not aware of that could do this?

Answer 1:

I wrote up a complete answer on my MongoDB blog, but as a summary: project the actions you care about, mapping the values of the action field into appropriate key names; group by person, aggregating the three actions and when they were done (and optionally how many times); then project new fields which check whether action2 was done after action1, and action3 after action2. The last phase sums up the number of people who did just 1, or 1 and then 2, or 1 and then 2 and then 3.

Using a function to generate the aggregation pipeline, it's possible to generate results based on an array of actions passed in.
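
To make that concrete, here is a minimal sketch of such a pipeline for a three-event funnel. It is my reconstruction from the summary above rather than the exact pipeline from the blog post, and it uses the field names from the question (person_id, event, occurred_at):

var funnel = ['event_a', 'event_b', 'event_c'];  // the ordered funnel steps

db.events.aggregate([
  { $match: { event: { $in: funnel } } },
  // map the event value into one timestamp field per funnel step
  { $project: {
      person_id: 1,
      a1: { $cond: [{ $eq: ['$event', funnel[0]] }, '$occurred_at', null] },
      a2: { $cond: [{ $eq: ['$event', funnel[1]] }, '$occurred_at', null] },
      a3: { $cond: [{ $eq: ['$event', funnel[2]] }, '$occurred_at', null] }
  } },
  // one document per person with the first time they did each action
  // ($min skips the nulls produced above)
  { $group: {
      _id: '$person_id',
      a1: { $min: '$a1' },
      a2: { $min: '$a2' },
      a3: { $min: '$a3' }
  } },
  // flag each person: did they do step 1, steps 1 then 2, steps 1 then 2 then 3?
  // (a date compares greater than null, so $gt also acts as an existence check)
  { $project: {
      s1: { $cond: [{ $gt: ['$a1', null] }, 1, 0] },
      s2: { $cond: [{ $and: [{ $gt: ['$a1', null] }, { $gt: ['$a2', '$a1'] }] }, 1, 0] },
      s3: { $cond: [{ $and: [{ $gt: ['$a1', null] }, { $gt: ['$a2', '$a1'] }, { $gt: ['$a3', '$a2'] }] }, 1, 0] }
  } },
  // add up the people who reached each step of the funnel
  { $group: {
      _id: null,
      step1: { $sum: '$s1' },
      step1_2: { $sum: '$s2' },
      step1_2_3: { $sum: '$s3' }
  } }
])

Wrapping this in a function that builds the $project and $group expressions from the funnel array gives the "generate the pipeline from a list of actions" behaviour described above.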

In my test case, the entire pipeline ran in under 200ms for a collection of 40,000 documents (this was on my small laptop).

As was correctly pointed out, the general solution I describe assumes that while an actor can take any action multiple times, they can only advance from action1 to action2 and cannot skip directly from action1 to action3 (interpreting the action order as prerequisites: you cannot do action3 until you've done action2).

As it turns out, the aggregation framework can be used even for sequences of events whose order is completely arbitrary, when you still want to know how many people at some point did the sequence action1, action2, action3.

The main adjustment to make to the original answer is to add an extra two-stage step in the middle. This step unwinds the per-person document collected so far and re-groups it, finding the first occurrence of the second action that comes after the first occurrence of the first action.

Once we have that, the final comparison becomes: action1, followed by the earliest occurrence of action2, which is then compared against the latest occurrence of action3.
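
As a rough sketch of those two extra stages (again my reconstruction, not the blog's exact code): suppose the per-person $group keeps the first occurrence of action1 in a1, pushes every occurrence of action2 into an array a2s, and keeps the latest occurrence of action3 in a3. The added step is then something like:

  { $unwind: '$a2s' },
  { $group: {
      _id: '$_id',
      a1: { $first: '$a1' },
      a3: { $first: '$a3' },
      // earliest action2 that happened after the first action1
      a2: { $min: { $cond: [{ $gt: ['$a2s', '$a1'] }, '$a2s', null] } }
  } }

after which the final projection can compare a1 < a2 and a2 < a3 exactly as before.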

It can probably be generalized to handle an arbitrary number of events, but every additional event past two would add two more stages to the aggregation.

Here is my write-up of the modification of the pipeline to achieve the answer you are looking for.