计算Hadoop中与蜂巢查询succesive记录之间的差异(Compute differences

2019-07-19 02:50发布

我持有的客户呼叫数据的蜂巢表。 为了简单起见考虑它有两列,第一列保存客户ID和第二列包含该呼叫(Unix时间戳)的时间戳。

我可以查询此表为每个客户找到所有来电:

SELECT * FROM mytable SORT BY customer_id, call_time;

其结果是:

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

是否有可能创建一个配置单元查询,返回,为每一个客户,从第二个呼叫,两名succesive调用之间的时间间隔开始? 对于上面的例子,查询应该返回:

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

我试图解决方案从适应SQL解决方案 ,但我坚持的蜂巢局限性: 它只能在接受子查询FROM和连接只能包含等式 。

谢谢。

EDIT1:

我曾尝试使用一个蜂巢UDF功能:

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

和名称为“三角洲”使用它。

但似乎(从日志和结果),这是在MAP时使用。 从这个出现2个问题:

第一:表中的数据必须由客户ID和时间戳使用此功能之前排序。 查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

因为分开部执行在缩短上市时间,正在使用我的功能不久后不起作用。

我可以使用该功能之前,表中的数据进行排序,但我不开心,是因为它是一个开销我希望避免的。

第二:在分布式Hadoop配置的情况下,数据是可用的任务跟踪器之间的分裂。 所以,我相信会有这个功能,每一个映射器的多个实例,所以它可能有2名映射器之间的相同的客户数据分离。 在这种情况下,我会失去客户的来电,这是不能接受的。

我不知道如何解决这个问题。 我知道,DISTRIBUTE BY确保具有特定值的所有数据被发送到相同的减速(从而确保排序按预期工作),没有任何人知道如果有一个映射器类似的东西?

接下来,我打算跟着libjack的建议使用减少脚本。 需要一些其他的蜂巢查询之间的“计算”,所以我想尝试一切蜂巢计划书,移动到另一个工具,通过Balaswamy vaddeman的建议之前。

EDIT2:

我开始调查了自定义脚本解决方案。 但是,在编程蜂巢书(本章介绍的自定义脚本)第14章的第一页,我发现下面的段落:

流通常比编码可比的UDF或InputFormat对象效率较低。 序列化和反序列化数据,将其进出管是相对低效的。 这也是很难以统一的方式调试整个程序。 然而,这是一个快速原型和利用未用Java编写的现有代码有用。 对于不想编写Java代码谁蜂巢的用户,它可以是一个很有效的办法。

所以很明显的是,自定义脚本是不是在效率方面的最佳解决方案。

但是我应该如何让我的UDF功能,但要确保它能正常工作在分布式Hadoop配置? 我找到了答案,在语言手册UDF wiki页面的UDF塔内部分这个问题。 如果我写我的查询:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

它在减少时间和BY和SORT DISTRIBUTE BY结构中调用的顺序保证所有来自同一客户的记录被以相同的减速处理,执行。

所以上面的UDF与此查询构建解决我的问题。

(对不起,不添加链接,但我不能这样做,因为我没有足够的信誉分)

Answer 1:

这是一个老问题,但是为了将来的参考,我在这里写另一个命题:

蜂巢窗功能允许在查询中使用一个/下一个值。

类似的代码查询可以是:

SELECT CUSTOMER_ID,LAG(call_time,1,0)OVER(PARTITION BY CUSTOMER_ID ORDER BY call_time) - call_time FROM MYTABLE;



Answer 2:

您可以使用显式的MAP-REDUCE与其他编程语言如Java或Python。 从哪里发出的地图{cutomer_id,call_time}和减速机,你会得到{customer_id,list{time_stamp}}和减速机可以将这些时间戳进行排序,并可以处理数据。



Answer 3:

也许有人遇到类似的要求,我找到了解决办法如下:

1)创建一个自定义函数:

package com.example;
// imports (they depend on the hive version)
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
    + "- computes the time passed between two succesive records from the same customer. "
    + "It generates 3 columns: first contains the customer id, second contains call time "
    + "and third contains the time passed from the previous call. This function returns only "
    + "the records that have a previous call from the same customer (requirements are not applicable "
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS"
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
public class DeltaComputerUDTF extends GenericUDTF {
private static final int NUM_COLS = 3;

private Text[] retCols; // array of returned column values
private ObjectInspector[] inputOIs; // input ObjectInspectors
private String prevCustomerId;
private Long prevCallTime;

@Override
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
    if (ois.length != 2) {
        throw new UDFArgumentException(
                "There must be 2 arguments: customer Id column name and call time column name");
    }

    inputOIs = ois;

    // construct the output column data holders
    retCols = new Text[NUM_COLS];
    for (int i = 0; i < NUM_COLS; ++i) {
        retCols[i] = new Text();
    }

    // construct output object inspector
    List<String> fieldNames = new ArrayList<String>(NUM_COLS);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
    for (int i = 0; i < NUM_COLS; ++i) {
        // column name can be anything since it will be named by UDTF as clause
        fieldNames.add("c" + i);
        // all returned type will be Text
        fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

@Override
public void process(Object[] args) throws HiveException {
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);

    if (customerId.equals(prevCustomerId)) {
        retCols[0].set(customerId);
        retCols[1].set(callTime.toString());
        retCols[2].set(new Long(callTime - prevCallTime).toString());
        forward(retCols);
    }

    // Store the current customer data, for the next line
    prevCustomerId = customerId;
    prevCallTime = callTime;
}

@Override
public void close() throws HiveException {
    // TODO Auto-generated method stub

}

}

2)创建包含此功能的广口瓶中。 假设的JARname是myjar.jar。

3)罐复制到与配置单元的机器。 假设它被放置在/ tmp中

4)定义内部配置单元的自定义函数:

ADD JAR /tmp/myjar.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';

5)执行该查询:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
  (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

备注:

一个。 我假定call_time列存储数据BIGINT。 如果它是字符串,在过程中的作用,我们检索的字符串(如我们做的客户ID),然后将其解析到龙

湾 我决定使用UDF的UDTF代替,因为这种方式产生它所需要的所有数据。 否则(与UDF)所产生的数据需要被过滤,以跳过NULL值。 因此,与在原岗位的第一个编辑描述的UDF函数(DeltaComputerUDF),查询将是:

SELECT customer_id, call_time, time_difference 
FROM 
  (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
      (
         SELECT customer_id, call_time FROM mytable
         DISTRIBUTE BY customer_id
         SORT BY customer_id, call_time
       ) t
   ) u 
WHERE time_difference IS NOT NULL;

C。 这两个函数(UDF和UDTF)工作根据需要,不管表内行的顺序(所以没有该表的数据由客户ID进行排序,并使用δ函数之前调用时间要求)



文章来源: Compute differences between succesive records in Hadoop with Hive Queries
标签: hadoop hive