友情链接:UDF部署教程
背景:
UDF来源与HIVE,Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Inceptor中UDF的开发规范与Hive相同,有3种UDF:
UDF: 以单个数据行为参数,输出单个数据行;
UDAF: 以多个数据行为参数,输出一个数据行。
UDTF: 以一个数据行为参数,输出多个数据行为一个表作为输出。
A. UDF(User Defined Function),即用户自定义函数,能结合SQL语句一起使用,更好地表达复杂的业务逻辑,一般以单个数据行为参数,输出单个数据行;比如数学函数、字符串函数、时间函数、拼接函数;
B. UDTF(User Defined Table Function),即用户自定义表函数,它与UDF类似。区别在于UDF只能实现一对一,而它用来实现多(行/列)对多(行/列)数据的处理逻辑。一般以一个数据行为参数,输出多个数据行为一个表作为输出,如lateral、view、explore。
C. UDAF(User Defined Aggregate Function),用户自定义聚合函数,是由用户自主定义的,用法同如MAX、MIN和SUM已定义的聚合函数一样的处理函数。由于UDAF也属于聚合函数中的一种,同样也需要与GROUPBY结合使用。一般UDAF以多个数据行为参数,接收多个数据行,并输出一个数据行,比如COUNT、MAX。
UDF、UDTF、UDAF的开发要点及如何使用?
UDF开发要点:package io.transwarp.udf; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.LongWritable; public final class ToTimestamp extends UDF { public LongWritable evaluate(String time){ String str1 = time.substring(0, time.indexOf(".")); String str2 = time.substring(time.indexOf(".")+1); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); long millisec; Date date= new Date(); try { date = simpleDateFormat.parse(str1); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } millisec = date.getTime()+Long.parseLong(str2); LongWritable result = new LongWritable(millisec); System.out.println(result); return result; } }UDTF开发要点:
package io.transwarp.udtf; import java.util.ArrayList; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; public class SplitUDF extends GenericUDTF{ @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException { // TODO Auto-generated method stub if(arg0.length != 1){ throw new UDFArgumentLengthException("SplitString only takes one argument"); } if(arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE){ throw new UDFArgumentException("SplitString only takes string as a parameter"); } ArrayList<String> fieldNames = new ArrayList<>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] arg0) throws HiveException { // TODO Auto-generated method stub String input = arg0[0].toString(); String[] inputSplits = input.split("#"); for (int i = 0; i < inputSplits.length; i++) { try { String[] result = inputSplits[i].split(":"); forward(result); } catch (Exception e) { continue; } } } }执行效果如下:
package udaf.transwarp.io; import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; import org.apache.hadoop.io.IntWritable; //UDAF是输入多个数据行,产生一个数据行 //用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类 public class MaxiNumber extends UDAF{ public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{ //最终结果 private IntWritable result; //负责初始化计算函数并设置它的内部状态,result是存放最终结果的 @Override public void init() { result=null; } //每次对一个新值进行聚集计算都会调用iterate方法 public boolean iterate(IntWritable value) { if(value==null) return false; if(result==null) result=new IntWritable(value.get()); else result.set(Math.max(result.get(), value.get())); return true; } //Hive需要部分聚集结果的时候会调用该方法 //会返回一个封装了聚集计算当前状态的对象 public IntWritable terminatePartial() { return result; } //合并两个部分聚集值会调用这个方法 public boolean merge(IntWritable other) { return iterate(other); } //Hive需要最终聚集结果时候会调用该方法 public IntWritable terminate() { return result; } } }