Loading JSON data from Kafka into an Inceptor table via a stream


#1

=========== Non-JSON data =========================================================

CREATE STREAM demo(id INT, letter STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' TBLPROPERTIES("topic"="demo", "kafka.zookeeper"="192.168.16.75:2181", "kafka.broker.list"="192.168.16.75:9092");

CREATE TABLE demotable(id INT,letter STRING);

kafka/bin/kafka-topics.sh --create --zookeeper 192.168.16.75:2181 --replication-factor 3 --partitions 3 --topic demo

INSERT INTO demotable SELECT * FROM demo;

kafka/bin/kafka-console-producer.sh --broker-list 192.168.16.75:9092 --topic demo
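Lines typed into the console producer are split on the comma declared in the stream DDL. Illustrative input (these sample values are mine, not from the thread):

1,a
2,b

Each line should arrive in demotable as one row, e.g. id=1, letter='a'.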

=========== Batch import of JSON data =========================================================

add jar hdfs://192.168.16.75:8020/hive-hcatalog-core-0.14.0.jar;

create table jsontest(id string, age string, name string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE;

load data local inpath '/json.txt' into table jsontest;

{"id":"1","age":"18","name":"zhangsan"}
{"id":"2","age":"18","name":"lisi"}
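A quick sanity check after the load (this query is mine, not from the thread):

select * from jsontest;

This should return the two rows above.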

========== Stream reading JSON from Kafka =================================================

kafka/bin/kafka-topics.sh --create --zookeeper 192.168.16.75:2181 --replication-factor 3 --partitions 3 --topic jsonkafka

CREATE STREAM jsonkafkastream(
id string,
age string,
name string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
TBLPROPERTIES("topic"="jsonkafka","kafka.zookeeper"="192.168.16.75:2181","kafka.broker.list"="192.168.16.75:9092");

CREATE table jsonkafkatable(
id string,
age string,
name string
);

INSERT INTO jsonkafkatable SELECT * FROM jsonkafkastream;

kafka/bin/kafka-console-producer.sh --broker-list 192.168.16.75:9092 --topic jsonkafka

{"id":"1","age":"18","name":"zhangsan"}
{"id":"2","age":"18","name":"lisi"}

【ERROR】
java.lang.ClassCastException:
org.apache.hive.hcatalog.data.HCatRecordObjectInspector
cannot be cast to
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector
【ERROR】
(The stream reader apparently expects the default LazySimpleSerDe object inspector, so the HCatalog JsonSerDe cannot be attached to a stream directly; see #2 below for workarounds.)

========== Stream reading JSON from Kafka =================================================

Data sample:

{"appVersion":"3.01.002","clientid":"77035555","ip":"117.136.56.78","mobile":"13816495948","interfaceVersion":"1.1","pid":"201081017","systemVersion":"5.1","uid":"ac0ae910-523e-435a-b6dc-2524851bd601","event_id":"3009000","@timestamp":"2018-03-23T00:30:11+0800","systemName":"Android","machine":"vivo|vivo","eventName":"进入hybrid页面","time":"2018-03-23 00:30:09.520","currentPage":"yjb:///yjb/information/build/News.html?stock_code=300618&code_type=285774339&skin_type=1&app_id=yjb3.0&stock_name=%E5%AF%92%E9%94%90%E9%92%B4%E4%B8%9A"}

CREATE STREAM pocstream(
appVersion string,
clientid string,
ip string,
mobile string,
interfaceVersion string,
pid string,
systemVersion string,
uid string,
event_id string,
@timestamp string,
systemName string,
machine string,
eventName string,
time string,
currentPage string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
TBLPROPERTIES("topic"="poctest","kafka.zookeeper"="192.168.16.75:2181","kafka.broker.list"="192.168.16.75:9092");

CREATE table poctable(
appVersion string,
clientid string,
ip string,
mobile string,
interfaceVersion string,
pid string,
systemVersion string,
uid string,
event_id string,
@timestamp string,
systemName string,
machine string,
eventName string,
time string,
currentPage string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic poctest

INSERT INTO poctable SELECT * FROM pocstream;

kafka/bin/kafka-console-producer.sh --broker-list 192.168.16.75:9092 --topic poctest

【ERROR】
@timestamp string,
【ERROR】
(The DDL fails on this column: HiveQL does not allow '@' in an unquoted identifier, so the JSON key @timestamp cannot be declared as a column name as-is.)

================================================================================================================================================


#2

A stream can handle JSON either by parsing it with a UDF, or by specifying a decoder when reading from Kafka. For reference:


One of our technical experts has shared an example of this (see the JsonDecoder in #5 below). Discussion welcome.


#3

Got it working, thanks!

A few more questions:
Question 1:
beeline -u "jdbc:hive2://ip:10000/"
beeline -u "jdbc:hive2://ip:10010/"
Tables created in Slipstream cannot be seen from Inceptor. Why is that? The two ports don't seem to share data, which I don't quite understand.

Question 2:


#4

Question 2:
During development, after an add jar I found a bug in the jar. I rebuilt it and ran add jar again,

but the query still produced the old jar's results; only after renaming the class did I get the result I wanted.

Is there an operation like "delete jar" to undo an add jar?

For now I work around it by logging into each pod node and deleting the jar by hand.


#5

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import kafka.utils.VerifiableProperties;
import net.sf.json.JSONObject;

// ArrayDecoder is supplied by the Slipstream client jar; its import was not
// shown in the original post.
public class JsonDecoder extends ArrayDecoder<byte[]> {

    public JsonDecoder(VerifiableProperties props) {
        super(props);
    }

    @SuppressWarnings("unchecked")
    @Override
    public byte[][] arrayFromBytes(byte[] bytes) {
        // Parse the raw Kafka message as JSON.
        String jsonString = new String(bytes);
        JSONObject fromObject = JSONObject.fromObject(jsonString);

        // net.sf.json.JSONObject implements Map and preserves key order,
        // so values are emitted in the order the fields appear in the JSON.
        Map<String, String> map = fromObject;
        Iterator<Entry<String, String>> iterator = map.entrySet().iterator();

        // Join the values into a single comma-delimited line, matching a
        // stream declared with FIELDS TERMINATED BY ','.
        StringBuilder sb = new StringBuilder();
        while (iterator.hasNext()) {
            Entry<String, String> next = iterator.next();
            sb.append(next.getValue()).append(',');
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1); // drop the trailing comma
        }
        return new byte[][]{ sb.toString().getBytes() };
    }
}
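A minimal way to sanity-check the decoder outside Slipstream is to feed it one record and print the result. This harness is my own sketch (the class name JsonDecoderTest is hypothetical); it assumes the Slipstream ArrayDecoder jar, json-lib, and the old Scala Kafka client (kafka.utils.VerifiableProperties) are on the classpath:

import kafka.utils.VerifiableProperties;

public class JsonDecoderTest {
    public static void main(String[] args) {
        JsonDecoder decoder = new JsonDecoder(new VerifiableProperties());
        byte[] msg = "{\"id\":\"1\",\"age\":\"18\",\"name\":\"zhangsan\"}".getBytes();
        // Expect "1,18,zhangsan": json-lib preserves key order when parsing.
        System.out.println(new String(decoder.arrayFromBytes(msg)[0]));
    }
}

Note that the decoder emits values in JSON key order, so the stream's columns must be declared in that same order, and field values must not themselves contain commas.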


#6

Question 1: Slipstream has to share a metastore with Inceptor for its tables to be visible there.
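For illustration only: sharing a metastore usually means pointing both services at the same hive.metastore.uris in hive-site.xml. The thrift address below is an assumption, not from this thread:

<property>
  <name>hive.metastore.uris</name>
  <!-- assumed address of the shared metastore -->
  <value>thrift://192.168.16.75:9083</value>
</property>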


#7

Question 2: yes, deleting a jar is a bit of a pain. When testing, I rename the UDF class and create a function under a new, non-conflicting name each time. Once it tests clean, I restart Inceptor and then do the final add jar and create temporary function.
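A sketch of that rename workaround; the jar path, class, and function names here are placeholders, not from the thread:

add jar hdfs://192.168.16.75:8020/myudf_v2.jar;
create temporary function my_udf_v2 as 'com.example.MyUdfV2';
select my_udf_v2(letter) from demotable;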


#8

Thanks for sharing.


#9

show jars
Lists the jars that have been added.

deleteres jar </path/to/xxxx.jar>
Deletes the specified jar.

deleteres jars
Deletes all jars.

Restarting Slipstream also clears added jars.
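For example, to drop the HCatalog SerDe jar added in #1 (assuming the same path that was used with add jar):

show jars
deleteres jar hdfs://192.168.16.75:8020/hive-hcatalog-core-0.14.0.jar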

