I have a number of event documents. Each event has a number of fields, but the ones relevant to my query are:
- person_id - a reference to the person who triggered the event
- event - a string key identifying the event
- occurred_at - the UTC time at which the event occurred
What I want to achieve is:
- for a list of event keys, e.g. `['event_1', 'event_2', 'event_3']`
- get counts of the number of people who performed each event and all of the events before it, in order, i.e.:
- the number of people who performed event_1
- the number of people who performed event_1, and then event_2
- the number of people who performed event_1, and then event_2, and then event_3
- etc
- a secondary goal is to be able to get the average occurred_at date for each event, so that I can calculate the average time between events (see the sketch just after this list for the document and result shapes I have in mind)
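For concreteness, here is a minimal sketch of the document shape and the kind of result I'm after (all values here are made up):

// Example event document (field values are hypothetical)
{
    person_id: ObjectId('...'),
    event: 'event_1',
    occurred_at: ISODate('2013-08-01T10:00:00Z')
}
// Desired result: one entry per funnel step, counting only people who
// performed all of the previous steps first (counts and dates are hypothetical)
[
    { event: 'event_1', count: 1000, avg_occurred_at: ISODate('...') },
    { event: 'event_2', count: 700, avg_occurred_at: ISODate('...') },
    { event: 'event_3', count: 350, avg_occurred_at: ISODate('...') }
]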
The best I have got is the following two map reduces:
db.events.mapReduce(function () {
    // Map: emit each event under its person, as a one-element chain
    emit(this.person_id, {
        e: [{
            e: this.event,
            o: this.occurred_at
        }]
    })
}, function (key, values) {
    // Reduce: concatenate the chains into a single ordered array per person
    return {
        e: [].concat.apply([], values.map(function (x) {
            return x.e
        }))
    }
}, {
    query: {
        account_id: ObjectId('52011239b1b9229f92000003'),
        event: {
            $in: ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f']
        }
    },
    out: 'people_funnel_chains',
    sort: { person_id: 1, occurred_at: 1 }
})
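For reference, the documents this leaves in people_funnel_chains look roughly like this (ids and values are illustrative; where a person only has a single emitted value the reduce step is skipped, but the shape is the same):

{
    _id: ObjectId('...'), // the person_id
    value: {
        e: [
            { e: 'event_a', o: ISODate('2013-08-01T10:00:00Z') },
            { e: 'event_b', o: ISODate('2013-08-01T10:05:00Z') }
        ]
    }
}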
And then:
db.people_funnel_chains.mapReduce(function() {
    var funnel = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'];
    var events = this.value.e;
    // Walk the funnel in order: for each step, look for that event in the
    // remainder of this person's chain, and stop at the first missing step
    for (var f = 0; f < funnel.length; f++) {
        var e = funnel[f];
        var i = events.map(function (x) {
            return x.e
        }).indexOf(e);
        if (i > -1) {
            emit(e, { c: 1, o: events[i].o });
            events = events.slice(i + 1, events.length);
        } else {
            break;
        }
    }
}, function(key, values) {
    // Sum the counts and take the mean of the timestamps for each funnel step
    return {
        c: Array.sum(values.map(function(x) { return x.c })),
        o: new Date(Array.sum(values.map(function(x) { return x.o.getTime() })) / values.length)
    };
}, { out: { inline: 1 } })
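With { out: { inline: 1 } } the second map/reduce returns its results directly, roughly in this shape (counts and dates are illustrative):

{
    results: [
        { _id: 'event_a', value: { c: 1000, o: ISODate('2013-08-01T10:00:00Z') } },
        { _id: 'event_b', value: { c: 700, o: ISODate('2013-08-01T11:30:00Z') } }
    ],
    // ... plus timing and counts metadata
    ok: 1
}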
I would like to achieve this in real time using the aggregation framework, but I can see no way to do it. For tens of thousands of records this takes tens of seconds. I can run it incrementally, which makes it fast enough for new data coming in, but if I want to modify the original query (e.g. change the event chain) it can't be done in a single request, which I would love it to be able to do.
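For what it's worth, the grouping part of the first map/reduce can be expressed as an aggregation pipeline along these lines (a rough sketch); it's the per-person sequential funnel matching that I can't see how to express:

db.events.aggregate([
    { $match: {
        account_id: ObjectId('52011239b1b9229f92000003'),
        event: { $in: ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'] }
    } },
    // Collect each person's matching events in time order
    { $sort: { person_id: 1, occurred_at: 1 } },
    { $group: {
        _id: '$person_id',
        events: { $push: { e: '$event', o: '$occurred_at' } }
    } }
])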
Update using Cursor.forEach()
Using Cursor.forEach() I've managed to get a huge improvement on this (essentially removing the need for the first map/reduce).
var time = new Date().getTime(),
    funnel_event_keys = ['event_a', 'event_b', 'event_c', 'event_d', 'event_e', 'event_f'],
    looking_for_i = 0,
    looking_for = funnel_event_keys[0],
    funnel = {},
    last_person_id = null;
// Initialise each funnel step with [count, running approximation of the average occurred_at in ms]
for (var i in funnel_event_keys) { funnel[funnel_event_keys[i]] = [0, null] };
db.events.find({
    account_id: ObjectId('52011239b1b9229f92000003'),
    event: {
        $in: funnel_event_keys
    }
}, { person_id: 1, event: 1, occurred_at: 1 }).sort({ person_id: 1, occurred_at: 1 }).forEach(function(e) {
    var current_person_id = e['person_id'].str;
    // New person: start looking for the first funnel step again
    if (last_person_id != current_person_id) {
        looking_for_i = 0;
        looking_for = funnel_event_keys[0];
    }
    if (e['event'] == looking_for) {
        var funnel_event = funnel[looking_for];
        funnel_event[0] = funnel_event[0] + 1;
        funnel_event[1] = ((funnel_event[1] || e['occurred_at'].getTime()) + e['occurred_at'].getTime()) / 2;
        // Advance to the next step in the funnel for this person
        looking_for_i = looking_for_i + 1;
        looking_for = funnel_event_keys[looking_for_i];
    }
    last_person_id = current_person_id;
})
funnel;
new Date().getTime() - time;
I wonder whether something custom with the data in memory would be able to improve on this. Getting hundreds of thousands of records out of MongoDB and into memory (on a different machine) is going to be a bottleneck; is there a technology I'm not aware of that could do this?