I have a Hive table that holds data about customer calls. For simplicity, consider that it has 2 columns: the first column holds the customer ID and the second holds the timestamp of the call (a Unix timestamp).
I can query this table to find all the calls for each customer:
SELECT * FROM mytable SORT BY customer_id, call_time;
The result is:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
Is it possible to create a Hive query that returns, for each customer, starting from the second call, the time interval between two successive calls? For the above example, the query should return:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
I have tried to adapt the existing SQL solutions, but I'm stuck with Hive's limitations: it accepts subqueries only in the FROM clause, and joins must contain only equality conditions.
Thank you.
EDIT1:
I have tried to use a Hive UDF:
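```java
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.LongWritable;

// Stateful UDF: returns the time elapsed since the previous call of the same customer,
// or NULL for a customer's first call. Only correct if rows arrive sorted by customer and time.
public class DeltaComputerUDF extends UDF {
    private String previousCustomerId;
    private long previousCallTime;

    public String evaluate(String customerId, LongWritable callTime) {
        long callTimeValue = callTime.get();
        String timeDifference = null;
        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTimeValue - previousCallTime);
        }
        // Remember this row for the next invocation.
        previousCustomerId = customerId;
        previousCallTime = callTimeValue;
        return timeDifference;
    }
}
```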
and registered it under the name "delta".
But it seems (from the logs and the results) that it is evaluated at MAP time. Two problems arise from this:
First: The table data must be sorted by Customer ID and timestamp BEFORE using this function. The query:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
does not work, because the sorting is performed at REDUCE time, long after my function has been applied.
I could sort the table data before using the function, but I would rather avoid that overhead.
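To see why the order of rows matters, here is a plain-Java model of the UDF's state machine (no Hive classes; `DeltaState` and the sample timestamps are hypothetical). Fed the same rows in sorted and in unsorted order, it returns different deltas, because each call only compares against the single row seen immediately before it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of DeltaComputerUDF: it remembers only the previous row it saw.
class DeltaState {
    private String previousCustomerId;
    private long previousCallTime;

    // Same logic as evaluate(): a delta only if the previous row belongs to the same customer.
    String evaluate(String customerId, long callTime) {
        String timeDifference = null;
        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTime - previousCallTime);
        }
        previousCustomerId = customerId;
        previousCallTime = callTime;
        return timeDifference;
    }

    // Runs one instance over rows {customerId, callTime}, in exactly the order given.
    static List<String> run(String[][] rows) {
        DeltaState udf = new DeltaState();
        List<String> out = new ArrayList<>();
        for (String[] row : rows) {
            out.add(udf.evaluate(row[0], Long.parseLong(row[1])));
        }
        return out;
    }
}
```

With sorted input (c1 at 100, 160, 400, then c2 at 50) the output is `[null, 60, 240, null]`. With the same rows in the order c1 160, c2 50, c1 100, c1 400, only one delta survives and it is wrong: 300 instead of 60 and 240.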
Second: in a distributed Hadoop configuration, the data is split among the available mappers (running on the task trackers). So I believe there will be multiple instances of this function, one per mapper, and the same customer's data may be split between 2 mappers. In that case I will lose some of that customer's call intervals, which is not acceptable.
I don't know how to solve this issue. I know that DISTRIBUTE BY ensures that all rows with a given value are sent to the same reducer (which is what makes SORT BY work as expected). Does anybody know if there is something similar for the mapper?
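The second problem can be reproduced with a similar model: give each "mapper" its own instance of the delta logic and split one customer's calls between them. The split below is hypothetical (`MapperSplitDemo` is an illustrative name), and both halves are already sorted, yet the interval that spans the split boundary is lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each mapper owns a separate, stateful instance of the delta logic.
class MapperSplitDemo {
    static final class Delta {
        private String prevCustomer;
        private long prevTime;

        String evaluate(String customer, long time) {
            String result = customer.equals(prevCustomer) ? Long.toString(time - prevTime) : null;
            prevCustomer = customer;
            prevTime = time;
            return result;
        }
    }

    // Runs a fresh Delta instance per input split and collects the non-null deltas.
    static List<String> deltasPerSplit(String[][]... splits) {
        List<String> deltas = new ArrayList<>();
        for (String[][] split : splits) {
            Delta udf = new Delta(); // state is not shared between mappers
            for (String[] row : split) {
                String d = udf.evaluate(row[0], Long.parseLong(row[1]));
                if (d != null) {
                    deltas.add(row[0] + " " + d);
                }
            }
        }
        return deltas;
    }
}
```

Processing c1's calls at 100, 160 and 400 in one split yields `c1 60` and `c1 240`; splitting them as {100, 160} and {400} yields only `c1 60`, so the 240-second interval disappears.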
Next, I plan to follow libjack's suggestion to use a reduce script. This computation is needed between some other Hive queries, so I want to try everything Hive offers before moving to another tool, as suggested by Balaswamy vaddeman.
EDIT2:
I started to investigate the custom scripts solution. But on the first page of chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:
Streaming is usually less efficient than coding the comparable UDFs or InputFormat objects. Serializing and deserializing data to pass it in and out of the pipe is relatively inefficient. It is also harder to debug the whole program in a unified manner. However, it is useful for fast prototyping and for leveraging existing code that is not written in Java. For Hive users who don’t want to write Java code, it can be a very effective approach.
So it was clear that custom scripts are not the best solution in terms of efficiency.
But how could I keep my UDF and still make sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Language Manual UDF wiki page. If I write my query as:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
it is executed at REDUCE time, and the DISTRIBUTE BY and SORT BY constructs guarantee that all the records of the same customer are processed by the same reducer, in call order.
So the above UDF and this query construct solve my problem.
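That guarantee can be modeled in plain Java as a rough sketch: rows are routed to a reducer by a hash of customer_id (a simplified stand-in for DISTRIBUTE BY; Hive's actual partitioner is not reproduced), each reducer's rows are sorted by (customer_id, call_time) like SORT BY, and one stateful delta instance runs per reducer. `ReduceSideDelta` and the reducer counts are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ReduceSideDelta {
    // Rows are {customerId, callTime}; numReducers stands in for the number of reduce tasks.
    static List<String> run(List<String[]> rows, int numReducers) {
        // DISTRIBUTE BY customer_id: every row of a customer lands in the same partition.
        List<List<String[]>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String[] row : rows) {
            partitions.get(Math.floorMod(row[0].hashCode(), numReducers)).add(row);
        }

        List<String> deltas = new ArrayList<>();
        for (List<String[]> partition : partitions) {
            // SORT BY customer_id, call_time: ordering only within one reducer.
            partition.sort(Comparator.<String[], String>comparing(r -> r[0])
                    .thenComparingLong(r -> Long.parseLong(r[1])));
            // One stateful delta computation per reducer, like the UDF in the outer SELECT.
            String prevCustomer = null;
            long prevTime = 0;
            for (String[] row : partition) {
                long time = Long.parseLong(row[1]);
                if (row[0].equals(prevCustomer)) {
                    deltas.add(row[0] + " " + (time - prevTime));
                }
                prevCustomer = row[0];
                prevTime = time;
            }
        }
        return deltas;
    }
}
```

Whatever the input order and however many reducers are used, the same set of intervals comes out (only the order in which reducers emit them may differ), because no customer is ever split across two stateful instances.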
(Sorry for not adding the links, but I'm not allowed to do it because I don't have enough reputation points)
In case someone encounters a similar requirement, here is the solution I found:
1) Create a custom function:
2) Create a jar containing this function. Suppose the jarname is myjar.jar.
3) Copy the jar to the machine running Hive. Suppose it is placed in /tmp.
4) Define the custom function inside Hive:
5) Execute the query:
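The body of the function from step 1 is not reproduced here. As a rough, hypothetical illustration of the per-row logic a delta UDTF could use, here is a plain-Java model of the process/forward pattern: rows arrive one at a time, and an output row is forwarded only when a delta exists, so no NULL rows are produced. It deliberately avoids Hive's `GenericUDTF` class so that it runs without Hive on the classpath; `DeltaTableFunction` and the `Consumer`-based forward are assumptions, and call_time is assumed to be a bigint.

```java
import java.util.function.Consumer;

// Hypothetical plain-Java model of a delta UDTF: process() is called once per input row,
// and forward emits zero or one output rows {customerId, delta}.
class DeltaTableFunction {
    private final Consumer<Object[]> forward;
    private String previousCustomerId;
    private long previousCallTime;

    DeltaTableFunction(Consumer<Object[]> forward) {
        this.forward = forward;
    }

    // args[0] = customer id (String), args[1] = call time (Long, the bigint case)
    void process(Object[] args) {
        String customerId = (String) args[0];
        long callTime = (Long) args[1];
        if (customerId.equals(previousCustomerId)) {
            forward.accept(new Object[] {customerId, callTime - previousCallTime});
        }
        previousCustomerId = customerId;
        previousCallTime = callTime;
    }
}
```

Fed the sorted rows c1 100, c1 160, c1 400, c2 50, c3 10, c3 40, it forwards exactly three rows: (c1, 60), (c1, 240) and (c3, 30).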
Remarks:
a. I assumed that the call_time column stores data as bigint. In case it is a string, in the process function we retrieve it as a string (as we do with customerId) and then parse it to a Long.
b. I decided to use a UDTF instead of a UDF because this way it generates only the rows that are needed. Otherwise (with a UDF) the generated data needs to be filtered to skip NULL values. So, with the UDF (DeltaComputerUDF) described in the first edit of the original post, the query will be:
c. Both functions (UDF and UDTF) work as desired regardless of the order of rows in the table (so the table data does not need to be sorted by customer id and call time before using the delta functions).
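For remark a, here is a minimal sketch of normalizing call_time whether it arrives as a bigint (`Long`) or as a decimal string of Unix seconds, using `Long.parseLong`. `CallTimes`, `toLong` and `delta` are hypothetical helper names; how malformed values should be handled is left open, and this sketch simply lets `NumberFormatException` propagate.

```java
// Hypothetical helper: normalizes call_time whether it arrives as a bigint (Long)
// or as a string such as "1357000000", as described in remark a.
final class CallTimes {
    private CallTimes() {}

    static long toLong(Object callTime) {
        if (callTime instanceof Long) {
            return (Long) callTime;
        }
        if (callTime instanceof String) {
            // Throws NumberFormatException for non-numeric input; trim() tolerates padding.
            return Long.parseLong(((String) callTime).trim());
        }
        throw new IllegalArgumentException("Unsupported call_time value: " + callTime);
    }

    // Interval between two successive calls, in the same unit as call_time.
    static long delta(Object previous, Object current) {
        return toLong(current) - toLong(previous);
    }
}
```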
It's an old question, but for future reference, here is another suggestion:
Hive windowing functions let you access values from previous or next rows in your query.
A query along these lines could be:
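As an assumption, not necessarily the query this answer had in mind: with Hive 0.11 or later (when windowing functions were added), the key expression would be something like `call_time - LAG(call_time, 1) OVER (PARTITION BY customer_id ORDER BY call_time)`, which is NULL for each customer's first call. To illustrate those semantics only, here is a plain-Java emulation of LAG over PARTITION BY customer_id ORDER BY call_time; `LagDemo` and `deltas` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java emulation of:
//   call_time - LAG(call_time, 1) OVER (PARTITION BY customer_id ORDER BY call_time)
// Input rows are {customerId, callTime}; output maps customer -> deltas (null for the first call).
class LagDemo {
    static Map<String, List<Long>> deltas(List<String[]> rows) {
        // PARTITION BY customer_id (TreeMap only makes the output order predictable)
        Map<String, List<Long>> partitions = new TreeMap<>();
        for (String[] row : rows) {
            partitions.computeIfAbsent(row[0], k -> new ArrayList<>()).add(Long.parseLong(row[1]));
        }
        Map<String, List<Long>> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : partitions.entrySet()) {
            List<Long> times = e.getValue();
            Collections.sort(times); // ORDER BY call_time
            List<Long> out = new ArrayList<>();
            for (int i = 0; i < times.size(); i++) {
                if (i == 0) {
                    out.add(null); // LAG is NULL on the first row of each partition
                } else {
                    out.add(times.get(i) - times.get(i - 1));
                }
            }
            result.put(e.getKey(), out);
        }
        return result;
    }
}
```

For c1 calls at 160, 100 and 400 (in any order) this yields `[null, 60, 240]`; filtering out the NULLs leaves exactly the intervals the question asks for.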
You can use an explicit MapReduce job, written in Java or in another programming language such as Python (via Hadoop Streaming). The mapper emits {customer_id, call_time}, so each reducer receives {customer_id, list{call_time}}.
In the reducer you can then sort these timestamps and compute the intervals between successive calls.
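A plain-Java sketch of that reduce step, without the Hadoop API: for one customer it receives the call times in arbitrary order (as the shuffle may deliver them), sorts them, and emits the interval between each pair of successive calls, starting from the second call. `CallIntervalReducer` and the tab-separated output format are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the reduce step: input is one customer_id and all of its call times.
class CallIntervalReducer {
    static List<String> reduce(String customerId, List<Long> callTimes) {
        List<Long> sorted = new ArrayList<>(callTimes); // copy so the input is not mutated
        Collections.sort(sorted);
        List<String> out = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            // Emit "customer_id <tab> interval" for each pair of successive calls.
            out.add(customerId + "\t" + (sorted.get(i) - sorted.get(i - 1)));
        }
        return out;
    }
}
```

A customer with a single call produces no output, which matches the expected result in the question (Customer2 does not appear).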