A GenericUDF Function to Extract a Field From an Array of Structs

Posted 2019-10-17 22:15

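I am trying to write a GenericUDF function that, for each record, collects a specific struct field from every struct inside an array and returns the values as an array.

I wrote the GenericUDF (below) and it seems to work, but:

1) It does not work when I run it on an external table, although it works fine on a managed table. Any ideas?

2) I am having a tough time writing a test for this. I have attached the test I have so far. It does not work: I always get "java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector" or "cannot cast String to LazyString". My question is: how do I supply a list of structs to the evaluate method?

Any help will be greatly appreciated.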

The table:

CREATE EXTERNAL TABLE FOO (  
  TS string,  
  customerId string,  
  products array< struct<productCategory:string> >  
)  
PARTITIONED BY (ds string)  
ROW FORMAT SERDE 'some.serde'  
WITH SERDEPROPERTIES ('error.ignore'='true')  
LOCATION 'some_locations'  
;

A row of data looks like this:
1340321132000, 'some_company', [{"productCategory":"footwear"},{"productCategory":"eyewear"}]

This is my code:

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;

@Description(name = "extract_product_category",
    value = "_FUNC_( array< struct<productcategory:string> > ) - Collect all product category field values inside an array of struct(s), and return the results in an array<string>",
    extended = "Example:\n SELECT _FUNC_(array_of_structs_with_product_category_field)")
public class GenericUDFExtractProductCategory
        extends GenericUDF
{
    private ArrayList ret;

    private ListObjectInspector listOI;
    private StructObjectInspector structOI;
    private ObjectInspector prodCatOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException
    {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("The function extract_product_category() requires exactly one argument.");
        }

        if (args[0].getCategory() != Category.LIST) {
            throw new UDFArgumentTypeException(0, "Type array<struct> is expected to be the argument for extract_product_category but " + args[0].getTypeName() + " is found instead");
        }

        listOI = ((ListObjectInspector) args[0]);
        structOI = ((StructObjectInspector) listOI.getListElementObjectInspector());

        if (structOI.getAllStructFieldRefs().size() != 1) {
            throw new UDFArgumentTypeException(0, "Incorrect number of fields in the struct, should be one");
        }

        StructField productCategoryField = structOI.getStructFieldRef("productCategory");
        //If not, throw exception
        if (productCategoryField == null) {
            throw new UDFArgumentTypeException(0, "NO \"productCategory\" field in input structure");
        }

        //Are they of the correct types?
        //We store these object inspectors for use in the evaluate() method
        prodCatOI = productCategoryField.getFieldObjectInspector();

        //First are they primitives
        if (prodCatOI.getCategory() != Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "productCategory field must be of string type");
        }

        //Are they of the correct primitives?
        if (((PrimitiveObjectInspector)prodCatOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0, "productCategory field must be of string type");
        }

        ret = new ArrayList();

        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    @Override
    public ArrayList evaluate(DeferredObject[] arguments)
            throws HiveException
    {
        ret.clear();

        if (arguments.length != 1) {
            return null;
        }

        if (arguments[0].get() == null) {
            return null;
        }

        int numElements = listOI.getListLength(arguments[0].get());

        for (int i = 0; i < numElements; i++) {
            LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory")));
            Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject);
            ret.add(productCategoryValue);
        }
        return ret;
    }

    @Override
    public String getDisplayString(String[] strings)
    {
        assert (strings.length > 0);
        StringBuilder sb = new StringBuilder();
        sb.append("extract_product_category(");
        sb.append(strings[0]);
        sb.append(")");
        return sb.toString();
    }
}

My test:

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

public class TestGenericUDFExtractShas
{
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>();

    @Test
    public void simpleTest()
        throws Exception
    {
        ListObjectInspector firstInspector = new MyListObjectInspector();

        ArrayList test = new ArrayList();
        test.add("test");

        ArrayList test2 = new ArrayList();
        test2.add(test);

        StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector(test, test2);

        fieldNames.add("productCategory");
        fieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

        GenericUDF.DeferredObject firstDeferredObject = new MyDeferredObject(test2);

        GenericUDF extract_product_category = new GenericUDFExtractProductCategory();

        extract_product_category.initialize(new ObjectInspector[]{firstInspector});

        extract_product_category.evaluate(new DeferredObject[]{firstDeferredObject});
    }

    public class MyDeferredObject implements DeferredObject
    {
        private Object value;

        public MyDeferredObject(Object value) {
            this.value = value;
        }

        @Override
        public Object get() throws HiveException
        {
            return value;
        }
    }

    private class MyListObjectInspector implements ListObjectInspector
    {
        @Override
        public ObjectInspector getListElementObjectInspector()
        {
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors);
        }

        @Override
        public Object getListElement(Object data, int index)
        {
            List myList = (List) data;
            if (myList == null || index < 0 || index >= myList.size()) {
                return null;
            }
            return myList.get(index);
        }

        @Override
        public int getListLength(Object data)
        {
            if (data == null) {
                return -1;
            }
            return ((List) data).size();
        }

        @Override
        public List<?> getList(Object data)
        {
            return (List) data;
        }

        @Override
        public String getTypeName()
        {
            return null;  //To change body of implemented methods use File | Settings | File Templates.
        }

        @Override
        public Category getCategory()
        {
            return Category.LIST;
        }
    }
}

Answer 1:

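I can't speak to the testing, but with a caveat discussed below, I think I have a solution to the issue with external tables.

In adapting your code to my needs, I changed string to long in the evaluate method:

Your code: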

LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory")));
Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject);

My old code:

LazyLong indDataObject = (LazyLong) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName)));
LongWritable indValue = ((LazyLongObjectInspector) indOI).getPrimitiveWritableObject(indDataObject);

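As you can see, they are the same logic with different data types.

This worked for me with a non-external table. It did not work with an external table.

I was able to solve this by replacing my old code with this: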

long indValue = (Long) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName)));
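
A possible explanation, offered as an assumption rather than something confirmed in this thread: the class of the object returned by getStructFieldData depends on the table's SerDe, so a cast to LazyLong, or to Long, only holds for one kind of table. The sketch below reads the value through the field's own ObjectInspector using PrimitiveObjectInspectorUtils.getLong, so the same code handles a java.lang.Long and a LongWritable. The class name LongFieldSketch and the field name "idx" are hypothetical, and hive-exec plus hadoop-common are assumed to be on the classpath.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.LongWritable;

public class LongFieldSketch {
    // Reads the hypothetical "idx" field through its inspector, so the caller
    // never needs to know the concrete class of the stored value.
    public static long readIdx(Object struct, StructObjectInspector structOI) {
        StructField field = structOI.getStructFieldRef("idx");
        Object raw = structOI.getStructFieldData(struct, field);
        PrimitiveObjectInspector fieldOI =
                (PrimitiveObjectInspector) field.getFieldObjectInspector();
        return PrimitiveObjectInspectorUtils.getLong(raw, fieldOI);
    }

    // Builds a struct<idx:bigint> inspector around the given field inspector.
    public static StructObjectInspector structOf(ObjectInspector fieldOI) {
        return ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("idx"), Arrays.<ObjectInspector>asList(fieldOI));
    }

    public static void main(String[] args) {
        // The same logical row in two physical representations.
        List<Object> javaRow = Arrays.<Object>asList(Long.valueOf(42L));
        List<Object> writableRow = Arrays.<Object>asList(new LongWritable(42L));

        System.out.println(readIdx(javaRow,
                structOf(PrimitiveObjectInspectorFactory.javaLongObjectInspector)));
        System.out.println(readIdx(writableRow,
                structOf(PrimitiveObjectInspectorFactory.writableLongObjectInspector)));
    }
}
```

Inside a UDF, the field inspector would come from the StructObjectInspector received in initialize() rather than being built by hand as it is here.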

In another version, I was returning Text.

You can probably do something similar, i.e. by converting to Text/String in the first step.
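
For the string case, here is a minimal sketch of what that could look like, assuming the goal is still an array of Text values. It drops the LazyString cast and asks the field's StringObjectInspector for the value, which should work whether the underlying object is a String, a Text, or a lazy type. The class name StringFieldSketch and the helper listOf are hypothetical; the inspector calls are standard Hive serde2 APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

public class StringFieldSketch {
    // Collects "productCategory" from every struct in the list using only inspectors.
    public static List<Text> extract(Object list, ListObjectInspector listOI) {
        StructObjectInspector structOI =
                (StructObjectInspector) listOI.getListElementObjectInspector();
        StructField field = structOI.getStructFieldRef("productCategory");
        StringObjectInspector stringOI =
                (StringObjectInspector) field.getFieldObjectInspector();

        List<Text> out = new ArrayList<Text>();
        int n = listOI.getListLength(list); // -1 for null data, so the loop is skipped
        for (int i = 0; i < n; i++) {
            Object raw = structOI.getStructFieldData(listOI.getListElement(list, i), field);
            Text value = stringOI.getPrimitiveWritableObject(raw);
            // Copy the Text, since some inspectors may hand back a reused instance.
            out.add(value == null ? null : new Text(value));
        }
        return out;
    }

    // Builds an array<struct<productCategory:string>> inspector for the demo.
    public static ListObjectInspector listOf(ObjectInspector fieldOI) {
        return ObjectInspectorFactory.getStandardListObjectInspector(
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        Arrays.asList("productCategory"),
                        Arrays.<ObjectInspector>asList(fieldOI)));
    }

    public static void main(String[] args) {
        // The same rows, once backed by String and once backed by Text.
        Object javaRows = Arrays.asList(
                Arrays.<Object>asList("footwear"), Arrays.<Object>asList("eyewear"));
        Object textRows = Arrays.asList(
                Arrays.<Object>asList(new Text("footwear")),
                Arrays.<Object>asList(new Text("eyewear")));

        System.out.println(extract(javaRows,
                listOf(PrimitiveObjectInspectorFactory.javaStringObjectInspector)));
        System.out.println(extract(textRows,
                listOf(PrimitiveObjectInspectorFactory.writableStringObjectInspector)));
    }
}
```

In the UDF from the question, the body of the loop in evaluate() would be the part to replace; the inspectors are already stored by initialize().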

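You may also need to change public Text evaluate(DeferredObject[] arguments) to public Object evaluate(DeferredObject[] arguments).

The source code for some working UDFs that handle arrays is available here.

Now for the caveat: this does not appear to work with tables stored as ORC. (Neither does the original code, mind you.) I will probably create a question about this. I am not sure what the issue is.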
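
On the testing question, which this answer does not cover, one possible approach is sketched below; it has not been verified in this thread. Instead of a hand-written ListObjectInspector, it builds the array<struct<productCategory:string>> inspector from ObjectInspectorFactory and represents each struct as a List of its field values. The row is wrapped in GenericUDF.DeferredJavaObject, which is assumed to be available (it ships with more recent Hive releases). The class name UdfInputSketch and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UdfInputSketch {
    // Inspector for array<struct<productCategory:string>> backed by plain Java objects.
    public static ListObjectInspector inputInspector() {
        StructObjectInspector structOI =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        Arrays.asList("productCategory"),
                        Arrays.<ObjectInspector>asList(
                                PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        return ObjectInspectorFactory.getStandardListObjectInspector(structOI);
    }

    // One array value: each struct is a List holding its field values in order.
    public static List<Object> structs(String... categories) {
        List<Object> out = new ArrayList<Object>();
        for (String category : categories) {
            out.add(Arrays.<Object>asList(category));
        }
        return out;
    }

    // The argument array that evaluate() expects for a single row.
    public static DeferredObject[] inputRow(String... categories) {
        return new DeferredObject[] { new DeferredJavaObject(structs(categories)) };
    }

    public static void main(String[] args) throws HiveException {
        ObjectInspector[] initArgs = { inputInspector() };
        DeferredObject[] evalArgs = inputRow("footwear", "eyewear");

        // In a real test these would be passed to the UDF:
        //   udf.initialize(initArgs);
        //   Object result = udf.evaluate(evalArgs);
        // Here we only confirm that the data and the inspector agree.
        ListObjectInspector listOI = (ListObjectInspector) initArgs[0];
        System.out.println(listOI.getTypeName());
        System.out.println(listOI.getListLength(evalArgs[0].get()));
    }
}
```

With input like this the fields are plain String objects, so a UDF that casts field data to LazyString would still fail with the "cannot cast String to LazyString" error; the field would need to be read through its ObjectInspector first.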



Source: A GenericUDF Function to Extract a Field From an Array of Structs