I have a Hive table that holds data of customer calls.
For simplicity consider it has 2 columns, first column holds the customer ID and the second column holds the timestamp of the call (unix timestamp).
I can query this table to find all the calls for each customer:
SELECT * FROM mytable SORT BY customer_id, call_time;
The result is:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
Is it possible to create a Hive query that returns, for each customer, starting from the second call, the time interval between two successive calls?
For the above example that query should return:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
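To pin down the expected result independently of Hive, here is a minimal plain-Java sketch of the computation I am after. The class `CallDeltas` and the nested `Call` type are hypothetical names used only for illustration, and the numeric timestamps are made-up sample values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class CallDeltas {
    /** One table row: customer id and unix timestamp of the call. */
    static final class Call {
        final String customerId;
        final long callTime;

        Call(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    /** Returns "customerId interval" for every call that has a previous call from the same customer. */
    static List<String> deltas(List<Call> calls) {
        // Sort a copy by customer id, then by call time (the input is left untouched).
        List<Call> sorted = new ArrayList<>(calls);
        sorted.sort(Comparator.comparing((Call c) -> c.customerId)
                .thenComparingLong(c -> c.callTime));

        List<String> out = new ArrayList<>();
        Call prev = null;
        for (Call c : sorted) {
            // The first call of each customer has no predecessor and is skipped.
            if (prev != null && prev.customerId.equals(c.customerId)) {
                out.add(c.customerId + " " + (c.callTime - prev.callTime));
            }
            prev = c;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Call> calls = new ArrayList<>();
        calls.add(new Call("Customer1", 100L));
        calls.add(new Call("Customer1", 120L));
        calls.add(new Call("Customer1", 130L));
        calls.add(new Call("Customer2", 50L));
        calls.add(new Call("Customer3", 40L));
        calls.add(new Call("Customer3", 55L));
        // Prints: Customer1 20, Customer1 10, Customer3 15 (one per line)
        deltas(calls).forEach(System.out::println);
    }
}
```

Customer2 has a single call, so it produces no output row, which matches the example above.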
I have tried to adapt the solutions proposed for the SQL version of this problem, but I am stuck on Hive's limitations: it accepts subqueries only in the FROM clause, and join conditions must contain only equalities.
Thank you.
EDIT1:
I have tried to use a Hive UDF:
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.LongWritable;

public class DeltaComputerUDF extends UDF {
    // State carried over from the previously processed row
    private String previousCustomerId;
    private long previousCallTime;

    public String evaluate(String customerId, LongWritable callTime) {
        long callTimeValue = callTime.get();
        String timeDifference = null;
        // Only produce a difference when the previous row belongs to the same customer
        if (customerId.equals(previousCustomerId)) {
            timeDifference = Long.toString(callTimeValue - previousCallTime);
        }
        previousCustomerId = customerId;
        previousCallTime = callTimeValue;
        return timeDifference;
    }
}
and registered it under the name "delta".
But it seems (from the logs and the result) that the function is applied at MAP time. Two problems arise from this:
First: The table data must be sorted by Customer ID and timestamp BEFORE using this function. The query:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
does not work because the sorting is performed at REDUCE time, long after my function has been applied.
I can sort the table data before using the function, but I'm not happy with this because it is an overhead I hope to avoid.
Second: In a distributed Hadoop configuration, the data is split among the available task trackers. So I believe there will be multiple instances of this function, one per mapper, and the data of a single customer may be split between 2 mappers. In that case I will lose the intervals between calls that ended up on different mappers, which is not acceptable.
I don't know how to solve this issue. I know that DISTRIBUTE BY ensures that all data with a specific value is sent to the same reducer (thus ensuring that SORT BY works as expected). Does anybody know if there is something similar for the mapper?
Next I plan to follow libjack's suggestion to use a reduce script. This "computation" is needed between some other hive queries, so I want to try everything Hive offers, before moving to another tool, as suggested by Balaswamy vaddeman.
EDIT2:
I started to investigate the custom scripts solution. But on the first page of chapter 14 of the Programming Hive book (the chapter that presents custom scripts), I found the following paragraph:
Streaming is usually less efficient than coding the comparable UDFs or
InputFormat objects. Serializing and deserializing data to pass it in and
out of the pipe is relatively inefficient. It is also harder to debug the whole
program in a unified manner. However, it is useful for fast prototyping
and for leveraging existing code that is not written in Java. For Hive
users who don’t want to write Java code, it can be a very effective
approach.
So it was clear that custom scripts are not the best solution in terms of efficiency.
How, then, can I keep my UDF and still make sure it works as expected in a distributed Hadoop configuration? I found the answer in the UDF Internals section of the Language Manual UDF wiki page. If I write my query as:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
the function is executed at REDUCE time, and the DISTRIBUTE BY and SORT BY clauses guarantee that all the records of a given customer are processed by the same reducer, in call order.
So the above UDF and this query construct solve my problem.
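To illustrate why the subquery form works, here is a small plain-Java simulation, not Hive code. Rows are routed to a "reducer" by a hash of the customer id (a simplification of what DISTRIBUTE BY guarantees), each reducer sorts only its own rows (SORT BY), and a stateful function like the UDF above then sees every customer's calls contiguously and in order. All class and method names are hypothetical and the timestamps are sample values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class DistributeSortDemo {
    static final class Row {
        final String customerId;
        final long callTime;

        Row(String customerId, long callTime) {
            this.customerId = customerId;
            this.callTime = callTime;
        }
    }

    /** Stateful delta, mirroring the evaluate() logic of the UDF. */
    static final class Delta {
        private String previousCustomerId;
        private long previousCallTime;

        Long evaluate(String customerId, long callTime) {
            Long diff = null;
            if (customerId.equals(previousCustomerId)) {
                diff = callTime - previousCallTime;
            }
            previousCustomerId = customerId;
            previousCallTime = callTime;
            return diff;
        }
    }

    static List<String> run(List<Row> rows, int numReducers) {
        // DISTRIBUTE BY customer_id: the same id always lands in the same bucket.
        List<List<Row>> buckets = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Row row : rows) {
            buckets.get(Math.floorMod(row.customerId.hashCode(), numReducers)).add(row);
        }

        List<String> out = new ArrayList<>();
        for (List<Row> bucket : buckets) {
            // SORT BY customer_id, call_time: ordering holds within one reducer only.
            bucket.sort(Comparator.comparing((Row x) -> x.customerId)
                    .thenComparingLong(x -> x.callTime));
            Delta delta = new Delta(); // one function instance per reducer
            for (Row row : bucket) {
                Long d = delta.evaluate(row.customerId, row.callTime);
                if (d != null) {
                    out.add(row.customerId + " " + d);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("Customer3", 55L));
        rows.add(new Row("Customer1", 130L));
        rows.add(new Row("Customer1", 100L));
        rows.add(new Row("Customer3", 40L));
        rows.add(new Row("Customer1", 120L));
        // The order of customers in the output depends on the bucket each id hashes to.
        run(rows, 2).forEach(System.out::println);
    }
}
```

Whatever the number of simulated reducers, the set of intervals stays the same, because a customer is never split across buckets.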
(Sorry for not adding the links, but I'm not allowed to do it because I don't have enough reputation points)
It's an old question, but for future reference, here is another suggestion:
Hive windowing functions let you use previous / next values in your query.
A query along these lines may work:
SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
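To show what the third argument of LAG does in that query, the following plain-Java sketch mimics `call_time - LAG(call_time, 1, default)` over one customer's already ordered call times. It is only an illustration; the helper name `lagDiffs` is hypothetical. With a default of 0, the first row of each partition comes out as call_time - 0, that is, the raw timestamp rather than an interval. If the default is omitted, LAG returns NULL for the first row and so does the subtraction, which makes first calls easy to filter out with IS NOT NULL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LagDemo {
    /**
     * Mimics call_time - LAG(call_time, 1, defaultValue) for one partition.
     * A null defaultValue models LAG without a default (SQL NULL).
     */
    static List<Long> lagDiffs(List<Long> orderedTimes, Long defaultValue) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < orderedTimes.size(); i++) {
            Long previous = (i == 0) ? defaultValue : orderedTimes.get(i - 1);
            // NULL propagates through the subtraction, as in SQL.
            out.add(previous == null ? null : orderedTimes.get(i) - previous);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> times = Arrays.asList(100L, 120L, 130L);
        System.out.println(lagDiffs(times, 0L));   // [100, 20, 10]
        System.out.println(lagDiffs(times, null)); // [null, 20, 10]
    }
}
```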
You can write an explicit MapReduce job in another programming language, such as Java or Python.
In the map phase, emit {customer_id, call_time}.
In the reducer you will then receive {customer_id, list{time_stamp}},
so you can sort these timestamps there and compute the intervals.
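The flow described above can be sketched in plain Java without any Hadoop classes. This is a simulation under stated assumptions: `shuffle` stands in for the map and shuffle phases that group values by key, and `reduce` stands in for the reducer body. Both names, and the `String[]` row layout, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class MapReduceSketch {
    /** Map + shuffle: emit (customer_id, call_time) and group the values by key. */
    static Map<String, List<Long>> shuffle(List<String[]> rows) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String[] row : rows) {
            grouped.computeIfAbsent(row[0], k -> new ArrayList<>())
                    .add(Long.parseLong(row[1]));
        }
        return grouped;
    }

    /** Reduce: sort one customer's timestamps and emit successive differences. */
    static List<Long> reduce(List<Long> callTimes) {
        List<Long> sorted = new ArrayList<>(callTimes);
        Collections.sort(sorted);
        List<Long> diffs = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            diffs.add(sorted.get(i) - sorted.get(i - 1));
        }
        return diffs;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"Customer1", "130"},
                new String[] {"Customer3", "40"},
                new String[] {"Customer1", "100"},
                new String[] {"Customer2", "50"},
                new String[] {"Customer1", "120"},
                new String[] {"Customer3", "55"});
        for (Map.Entry<String, List<Long>> e : shuffle(rows).entrySet()) {
            System.out.println(e.getKey() + " " + reduce(e.getValue()));
        }
    }
}
```

Sorting inside the reducer keeps the whole list of one customer in memory, which is acceptable for a sketch but may matter for customers with very many calls.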
In case someone encounters a similar requirement, the solution I found is the following:
1) Create a custom function:
package com.example;

// imports (they depend on the hive version)

@Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
    + "- computes the time passed between two successive records from the same customer. "
    + "It generates 3 columns: first contains the customer id, second contains call time "
    + "and third contains the time passed from the previous call. This function returns only "
    + "the records that have a previous call from the same customer (requirements are not applicable "
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS "
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
public class DeltaComputerUDTF extends GenericUDTF {
    private static final int NUM_COLS = 3;

    private Text[] retCols; // array of returned column values
    private ObjectInspector[] inputOIs; // input ObjectInspectors
    private String prevCustomerId;
    private Long prevCallTime;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
        if (ois.length != 2) {
            throw new UDFArgumentException(
                "There must be 2 arguments: customer Id column name and call time column name");
        }
        inputOIs = ois;

        // construct the output column data holders
        retCols = new Text[NUM_COLS];
        for (int i = 0; i < NUM_COLS; ++i) {
            retCols[i] = new Text();
        }

        // construct output object inspector
        List<String> fieldNames = new ArrayList<String>(NUM_COLS);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
        for (int i = 0; i < NUM_COLS; ++i) {
            // column name can be anything since it will be named by the UDTF AS clause
            fieldNames.add("c" + i);
            // all returned types will be Text
            fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
        Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);

        // Emit a row only when the previous row belongs to the same customer
        if (customerId.equals(prevCustomerId)) {
            retCols[0].set(customerId);
            retCols[1].set(callTime.toString());
            retCols[2].set(Long.toString(callTime - prevCallTime));
            forward(retCols);
        }

        // Store the current customer data, for the next line
        prevCustomerId = customerId;
        prevCallTime = callTime;
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }
}
2) Create a jar containing this function. Suppose the jar is named myjar.jar.
3) Copy the jar to the machine running Hive. Suppose it is placed in /tmp.
4) Define the custom function inside Hive:
ADD JAR /tmp/myjar.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';
5) Execute the query:
SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM
(SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
Remarks:
a. I assumed that the call_time column stores data as bigint. If it is a string, the process function should retrieve it as a string (as it does for customerId) and then parse it to Long.
b. I decided to use a UDTF instead of a UDF because this way it generates exactly the rows that are needed. Otherwise (with a UDF) the generated data must be filtered to skip NULL values. So, with the UDF (DeltaComputerUDF) described in the first edit of the original post, the query will be:
SELECT customer_id, call_time, time_difference
FROM
(
    SELECT customer_id, call_time, delta(customer_id, call_time) AS time_difference
    FROM
    (
        SELECT customer_id, call_time FROM mytable
        DISTRIBUTE BY customer_id
        SORT BY customer_id, call_time
    ) t
) u
WHERE time_difference IS NOT NULL;
c. Both functions (UDF and UDTF) work as desired regardless of the order of rows inside the table, so the table data does not need to be sorted by customer id and call time before using the delta functions.