How can SlipStream merge computed results and output them to Kafka?



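In a credit-approval risk-control project we have the following requirement, and I'm not sure how to implement it technically.
Requirement:
A JSON string arrives from Kafka carrying a user's credit application: user information, several months of bill data, mobile carrier data, and so on. It flows into the big-data platform, which computes a number of indicators, such as the user's count of overdue bills and the repayment rate over the last 3 months. The indicators are then combined with the user information into one JSON document and written back out to Kafka.
Example input JSON: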
{ "AO": { "frms_uuid": "adafsdfasdffaafasdffff", "frms_user_id": "alvin", "frms_user_name": "张三", "frms_id_no": "123123432452452345" }, "Bill": [ { "uuid": "345sdfasdfasdfads", "user_id": "alvin", "card_id": "6222341234111324", "month": "2018-05", "new_charges": "1000" }, { "uuid": "345sdfasdfasdfads", "user_id": "alvin", "card_id": "6222341234111324", "month": "2018-05", "new_charges": "1000" } ] }
Example output JSON:
{"frms_user_id": "alvin", "yuqi_count": 2, "yuqi_rate": 0.8}
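Because each Kafka message already carries both the AO block and the full Bill array, the whole requirement can in principle be computed per message, with no state shared across events. The Python sketch below shows that per-message transformation; it is not SlipStream code. The input carries no overdue data for `yuqi_count`/`yuqi_rate`, so the indicator names `bill_count` and `high_charge_count` (bills with `new_charges` above 1000, mirroring step 3 below) are hypothetical stand-ins.

```python
import json
from typing import Any, Dict


def build_output(message: str, threshold: float = 1000.0) -> str:
    """Turn one input message into one output message (hypothetical indicators)."""
    doc: Dict[str, Any] = json.loads(message)
    ao = doc.get("AO", {})
    bills = doc.get("Bill", [])

    # Both indicators come from the same parsed message, so nothing has to wait
    # for another stream before the output can be written.
    bill_count = len(bills)
    # new_charges arrives as a string; convert it before comparing numerically.
    high_charge_count = sum(
        1 for b in bills if float(b.get("new_charges", "0")) > threshold
    )

    result = {
        "frms_user_id": ao.get("frms_user_id"),
        "bill_count": bill_count,
        "high_charge_count": high_charge_count,
    }
    # ensure_ascii=False keeps non-ASCII text (e.g. frms_user_name) readable if added.
    return json.dumps(result, ensure_ascii=False)
```

For the sample input above, both bills have `new_charges` equal to "1000", which is not above the threshold, so this returns `{"frms_user_id": "alvin", "bill_count": 2, "high_charge_count": 0}`. The design point is that no indicator depends on another stream's timing, because all of them are derived from the same message.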

What I have tested so far in SlipStream event mode:
1. Kafka input stream
CREATE STREAM BillStream(jsonstr STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ''
TBLPROPERTIES("topic"="testStream", "kafka.zookeeper"="172.19.49.22:2181", "kafka.broker.list"="172.19.49.22:9092", "timeformat"="yyyy/MM/dd HH:mm:ss");
The input stream is then split into several derived streams, e.g. AO_STREAM for user information and BILL_STREAM for bills.

2. Split the input stream into several derived streams
-- AO_STREAM
CREATE STREAM AO_JSON_STREAM AS SELECT get_json_object(jsonstr, '$.AO') AS ao_json FROM BillStream;
CREATE STREAM AO_STREAM AS SELECT get_json_object(ao_json, '$.frms_uuid') AS frms_uuid, get_json_object(ao_json, '$.frms_user_id') AS frms_user_id,
get_json_object(ao_json, '$.frms_user_name') AS frms_user_name, get_json_object(ao_json, '$.frms_id_no') AS frms_id_no FROM AO_JSON_STREAM;
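To make the two-level extraction above concrete, here is a tiny Python stand-in for Hive-style `get_json_object`, assuming only simple dot paths such as `$.AO.frms_user_id` are needed. It is a hypothetical helper for illustration; it does not cover array indexes or wildcards, which the real function supports.

```python
import json
from typing import Optional


def get_json_object(json_str: Optional[str], path: str) -> Optional[str]:
    """Hypothetical stand-in for get_json_object: dot paths only, NULL -> None."""
    if json_str is None or not path.startswith("$"):
        return None
    try:
        node = json.loads(json_str)
    except json.JSONDecodeError:
        return None  # invalid JSON yields NULL rather than an error
    # "$.AO.frms_uuid" -> ["AO", "frms_uuid"]
    keys = [k for k in path.lstrip("$").split(".") if k]
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if node is None:
        return None
    if isinstance(node, str):
        return node  # scalar strings are returned without quotes
    # Objects, arrays and numbers come back as compact JSON text.
    return json.dumps(node, ensure_ascii=False, separators=(",", ":"))
```

As in the SQL, the first call pulls out the AO object as JSON text and the second call reads a field from that text; a direct path such as `$.AO.frms_id_no` would reach the same value in one step.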

-- Bill_STREAM
CREATE STREAM Bill_JSON_STREAM AS SELECT get_json_object(jsonstr, '$.Bill') AS bill_json FROM Bill Stream;
-- Parse the JSON array: strip the outer [ ], turn each "},{" boundary into "}||{", then split on "||"
CREATE STREAM SPLIT_JSON_STREAM AS SELECT explode(split(regexp_replace(regexp_extract(bill_json, '^\\[(.+)\\]$', 1), '\\}\\,\\{', '\\}\\|\\|\\{'), '\\|\\|')) AS splitbilljson FROM Bill_JSON_STREAM;
-- Create BILL_STREAM
CREATE STREAM BILL_STREAM AS SELECT get_json_object(splitbilljson, '$.uuid') AS uuid, get_json_object(splitbilljson, '$.user_id') AS user_id,
get_json_object(splitbilljson, '$.card_id') AS card_id, get_json_object(splitbilljson, '$.month') AS month,
get_json_object(splitbilljson, '$.new_charges') AS new_charges FROM SPLIT_JSON_STREAM;
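The `regexp_extract` / `regexp_replace` / `split` chain works only as long as no string value inside a bill contains the boundary text `},{`. The Python sketch below emulates the same regex steps (it is not SlipStream itself) alongside a real JSON parse, so the two can be compared; `split_with_regex` and `split_with_parser` are hypothetical names.

```python
import json
import re
from typing import List


def split_with_regex(bill_json: str) -> List[str]:
    """Mimic regexp_extract + regexp_replace + split from the SQL above."""
    match = re.match(r"^\[(.+)\]$", bill_json)  # strip the outer [ ]
    if match is None:
        return []
    marked = re.sub(r"\},\{", "}||{", match.group(1))  # mark object boundaries
    return marked.split("||")


def split_with_parser(bill_json: str) -> List[str]:
    """Parse the array properly and re-serialize each element compactly."""
    return [
        json.dumps(item, separators=(",", ":"), ensure_ascii=False)
        for item in json.loads(bill_json)
    ]
```

On ordinary compact input both functions agree, but a value such as `"memo":"x},{y"` makes the regex version produce three fragments from two bills. The broken fragments are not valid JSON, so the later `get_json_object` calls would typically return NULL for them: silent data loss rather than an error.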

3. Compute intermediate indicators
-- Bill count per user (simple case; more complex indicators may need PL/SQL?)
CREATE TABLE indicator1(user_id STRING, lost_count int);
INSERT INTO indicator1 SELECT user_id, count(1) FROM BILL_STREAM GROUP BY user_id;

CREATE TABLE indicator2(user_id STRING, cnt int);
INSERT INTO indicator2 SELECT user_id, count(1) FROM BILL_STREAM WHERE CAST(new_charges AS DOUBLE) > 1000 GROUP BY user_id;
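Since `get_json_object` returns strings, a filter like `new_charges > '1000'` compares text lexicographically, so a charge of `'900'` counts as greater than `'1000'`. Python compares `str` values the same way for ASCII digits, which makes the pitfall easy to check; the two helper functions below are hypothetical illustrations.

```python
from typing import List


def count_over_as_string(charges: List[str], limit: str = "1000") -> int:
    # Lexicographic comparison: character by character, not by numeric value.
    return sum(1 for c in charges if c > limit)


def count_over_as_number(charges: List[str], limit: float = 1000.0) -> int:
    # Numeric comparison, the equivalent of CAST(new_charges AS DOUBLE) > 1000.
    return sum(1 for c in charges if float(c) > limit)


charges = ["900", "1500", "99"]
print(count_over_as_string(charges))  # 3: "900" and "99" sort after "1000"
print(count_over_as_number(charges))  # 1: only 1500 exceeds 1000
```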

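4. How can these indicators be merged into a single JSON document and written to Kafka? Some indicators are slow to compute and others are fast, so how do I wait until all of them are finished and then write them to Kafka together? Or is there a different approach altogether?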
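If the indicators have to stay in separate streams, a common pattern is a keyed buffer: hold the partial results for each key until every expected indicator has arrived, then emit one merged record and drop that key's state. The Python sketch below shows only that logic. The key (`user_id`), the indicator names, the `IndicatorMerger` class and the TTL are hypothetical, and mapping this onto SlipStream's own join or window features is not covered here.

```python
import json
import time
from typing import Any, Dict, Iterable, List, Optional, Tuple


class IndicatorMerger:
    """Buffer partial indicator results per key; emit once all have arrived."""

    def __init__(self, expected: Iterable[str], ttl_seconds: float = 60.0) -> None:
        self.expected = frozenset(expected)
        self.ttl_seconds = ttl_seconds
        # key -> (time the first indicator arrived, indicators collected so far)
        self._pending: Dict[str, Tuple[float, Dict[str, Any]]] = {}

    def add(self, key: str, name: str, value: Any,
            now: Optional[float] = None) -> Optional[str]:
        """Record one indicator; return merged JSON once the set is complete."""
        now = time.monotonic() if now is None else now
        _, values = self._pending.setdefault(key, (now, {}))
        values[name] = value
        if not self.expected.issubset(values):
            return None  # still waiting for a slower indicator
        del self._pending[key]
        merged = {"frms_user_id": key}
        merged.update({k: values[k] for k in sorted(self.expected)})
        return json.dumps(merged, ensure_ascii=False)

    def expire(self, now: Optional[float] = None) -> List[str]:
        """Drop keys that have waited longer than ttl_seconds; return them."""
        now = time.monotonic() if now is None else now
        stale = [k for k, (t, _) in self._pending.items() if now - t > self.ttl_seconds]
        for k in stale:
            del self._pending[k]
        return stale
```

For example, `add("alvin", "bill_count", 2)` returns `None`, and a later `add("alvin", "high_charge_count", 0)` returns `{"frms_user_id": "alvin", "bill_count": 2, "high_charge_count": 0}`. The `expire` step matters because a key whose slow indicator never arrives would otherwise stay in memory forever. Keying on the application id (`frms_uuid`) instead of `user_id` may be safer if one user can submit several applications.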

Please help, or I may have to quit and run!

