How to merge SlipStream computation results and output them to Kafka


#1

In a credit risk-control project we have the following requirement, and I'm not sure what the technical solution should look like.
Requirement:
Kafka feeds in a JSON string carrying a user's credit application info: user details, several months of bill records, carrier data, etc. It flows into the big-data platform, which computes a number of indicators, e.g. the user's bill overdue count or the repayment rate over the last 3 months. The indicators are then joined with the user info into a JSON that flows back out to Kafka.
Example input JSON:
{ "AO": { "frms_uuid": "adafsdfasdffaafasdffff", "frms_user_id": "alvin", "frms_user_name": "张三", "frms_id_no": "123123432452452345" }, "Bill": [ { "uuid": "345sdfasdfasdfads", "user_id": "alvin", "card_id": "6222341234111324", "month": "2018-05", "new_charges": "1000" }, { "uuid": "345sdfasdfasdfads", "user_id": "alvin", "card_id": "6222341234111324", "month": "2018-05", "new_charges": "1000" } ] }
Example output JSON:
{"frms_user_id": "alvin", "yuqi_count": 2, "yuqi_rate": 0.8}

Current test status in SlipStream event mode:
1. Kafka input stream
CREATE STREAM BillStream(jsonstr STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ''
TBLPROPERTIES("topic"="testStream", "kafka.zookeeper"="172.19.49.22:2181", "kafka.broker.list"="172.19.49.22:9092", "timeformat"="yyyy/MM/dd HH:mm:ss");
The input stream is then split into several dstreams, e.g. a user-info stream (ao_stream) and a bill stream (bill_stream).

2. Split the input stream into multiple DSTREAMs
-- AO_STREAM
CREATE STREAM AO_JSON_STREAM AS SELECT get_json_object(jsonstr, '$.AO') AS ao_json FROM BillStream;
CREATE STREAM AO_STREAM AS SELECT get_json_object(ao_json, '$.frms_uuid') AS frms_uuid, get_json_object(ao_json, '$.frms_user_id') AS frms_user_id,
get_json_object(ao_json, '$.frms_user_name') AS frms_user_name, get_json_object(ao_json, '$.frms_id_no') AS frms_id_no FROM AO_JSON_STREAM;

-- Bill_STREAM
CREATE STREAM Bill_JSON_STREAM AS SELECT get_json_object(jsonstr, '$.Bill') AS bill_json FROM BillStream;
-- parse the JSON array
CREATE STREAM SPLIT_JSON_STREAM AS SELECT explode(split(regexp_replace(regexp_extract(bill_json, '^\\[(.+)\\]$', 1), '\\}\\,\\{', '\\}\\|\\|\\{'), '\\|\\|')) AS splitbilljson FROM Bill_JSON_STREAM;
-- create BILL_STREAM
CREATE STREAM BILL_STREAM AS SELECT get_json_object(splitbilljson, '$.uuid') AS uuid, get_json_object(splitbilljson, '$.user_id') AS user_id,
get_json_object(splitbilljson, '$.card_id') AS card_id, get_json_object(splitbilljson, '$.month') AS month,
get_json_object(splitbilljson, '$.new_charges') AS new_charges FROM SPLIT_JSON_STREAM;

3. Compute intermediate indicators
-- count the bills (simple; more complex indicators might need PL/SQL?)
CREATE TABLE indicator1(user_id STRING, lost_count int);
INSERT INTO indicator1 SELECT user_id, count(1) FROM BILL_STREAM GROUP BY user_id;

CREATE TABLE indicator2(user_id STRING, cnt int);
INSERT INTO indicator2 SELECT user_id, count(1) FROM BILL_STREAM WHERE CAST(new_charges AS DOUBLE) > 1000 GROUP BY user_id;
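For rate-style indicators, conditional aggregation in plain SQL usually suffices, so PL/SQL shouldn't be necessary. A minimal sketch (the overdue condition on new_charges is invented here purely for illustration):

CREATE TABLE indicator3(user_id STRING, yuqi_rate DOUBLE);
INSERT INTO indicator3
SELECT user_id,
       -- share of bills matching the (made-up) overdue condition
       sum(CASE WHEN CAST(new_charges AS DOUBLE) > 1000 THEN 1 ELSE 0 END) / count(1)
FROM BILL_STREAM
GROUP BY user_id;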

4. How do I merge these indicators into a single JSON and write it out to Kafka? Some indicators compute slowly and some fast; how do I wait for all of them to finish and emit them together? Or is there a different approach altogether?

Please help me out here, or I'll have to make a run for it.


#2

This scenario can be handled with a UDF / UDTF:

insert into kafka_output select udf(jsonstr) from BillStream

The UDF can parse the JSON, run whatever custom statistics you need, and finally assemble the results into a JSON string as its output.
This should be the most convenient and efficient option, a lot cheaper than parsing the JSON several times over.
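A minimal sketch of wiring that up, assuming a Java UDF packaged as credit-udf.jar with class com.example.CreditIndicatorUDF (jar path, class, function and topic names are all hypothetical), and an output stream defined against the result topic the same way as the input stream:

-- register the (hypothetical) UDF jar and function
ADD JAR /path/to/credit-udf.jar;
CREATE TEMPORARY FUNCTION credit_indicators AS 'com.example.CreditIndicatorUDF';

-- output stream bound to the result topic, mirroring the input stream definition
CREATE STREAM kafka_output(result STRING)
TBLPROPERTIES("topic"="resultStream", "kafka.zookeeper"="172.19.49.22:2181", "kafka.broker.list"="172.19.49.22:9092");

-- one input json in, one result json out
INSERT INTO kafka_output SELECT credit_indicators(jsonstr) FROM BillStream;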

Another idea is to parse the JSON into the following layout in a single pass (via a UDTF or a Kafka decoder):

frms_uuid,
frms_user_id,
frms_user_name,
frms_id_no,
bill_uuid,
bill_user_id,
bill_card_id,
bill_month,
bill_new_charges

That is, each bill in the array becomes its own row, and every bill row also carries the AO fields.
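A sketch of that flattening in plain stream SQL, reusing the regexp/explode trick from #1 (FLAT_BILL_STREAM is a name I made up, and this assumes LATERAL VIEW works on streams; a real UDTF or decoder would replace the regexp gymnastics):

CREATE STREAM FLAT_BILL_STREAM AS
SELECT get_json_object(jsonstr, '$.AO.frms_uuid')      AS frms_uuid,
       get_json_object(jsonstr, '$.AO.frms_user_id')   AS frms_user_id,
       get_json_object(jsonstr, '$.AO.frms_user_name') AS frms_user_name,
       get_json_object(jsonstr, '$.AO.frms_id_no')     AS frms_id_no,
       get_json_object(b, '$.uuid')        AS bill_uuid,
       get_json_object(b, '$.user_id')     AS bill_user_id,
       get_json_object(b, '$.card_id')     AS bill_card_id,
       get_json_object(b, '$.month')       AS bill_month,
       get_json_object(b, '$.new_charges') AS bill_new_charges
FROM BillStream
LATERAL VIEW explode(
  split(regexp_replace(regexp_extract(get_json_object(jsonstr, '$.Bill'), '^\\[(.+)\\]$', 1),
                       '\\}\\,\\{', '\\}\\|\\|\\{'),
        '\\|\\|')) t AS b;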

That way you can aggregate directly with a time-window GROUP BY to compute the indicators, e.g.:

select bill_user_id, count(1), max(bill_month), udf(xxx) from FLAT_BILL_STREAM group by bill_user_id;

Of course, at the end you still need a UDF to turn the result into a JSON string before it flows back to Kafka.
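For a small payload like the example output, you can also hand-roll the JSON with concat instead of writing another UDF (INDICATOR_STREAM is hypothetical, standing for whatever stream/table ends up holding the final indicators):

SELECT concat('{"frms_user_id":"', frms_user_id,
              '","yuqi_count":', cast(yuqi_count AS STRING),
              ',"yuqi_rate":', cast(yuqi_rate AS STRING), '}')
FROM INDICATOR_STREAM;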

The problems with your current approach:

  1. You split the AO info off and later have to join it back, which doesn't pay off; better to keep the AO fields on every bill row from the start.
  2. You run multiple stream jobs to compute multiple indicators. A multi-stream join can only happen within the same time window, and your indicators finish at different speeds, so that doesn't fit; you'd be forced to write the indicators out to some persistence layer (or a Redis cache) and have yet another scheduled job pull them back out and stitch the JSON together, which is inefficient.

#3

Thank you very much!

