0%

hive存储过程HplSQL使用示例

在Hive开发中,要实现全面的 ETL、报告、分析和数据挖掘流程,不仅需要 MapReduce、Spark 或 Tez 等分布式处理引擎, 还需要一种表达全面业务规则的方法。比如可能存在满足某个条件再执行的脚本,虽然Azkaban调度中可以实现,但并不灵活,这时HplSQL就派上用场了!

官网:http://www.hplsql.org/download

配置:https://www.cnblogs.com/guotianqi/p/8041636.html

HPL/SQL尽量支持所有广泛使用的过程语言的语法。您无需从头开始学习新的程序语言。这有助于开发新代码以及将现有代码库迁移到Hadoop。 虽然官网是这么描述的,但实际使用发现坑也很多,比如sql文中嵌入变量。下面是我的使用示例,包含动态传参、流程控制、变量定义赋值等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
CREATE PROCEDURE controller(
--------------------------- 后推背包礼物->抽取 ----------------------------
-- 创建者:呼松涛
-- 创建时间:20211018
-- 上次修改:20211118
-- 目标表:dws_fin_goods_sna_d、dws_fin_user_goods_sna_d
-- 对应存储过程:usp_auto_userGoods
-- 备注:该作业存在IF判断,故使用存储过程。
-------------------------------------------------------------------------------
@engine string
)

BEGIN

DECLARE var_date1 INT,var_date2 INT,var_date3 INT;
SELECT var_date1 = NVL(MAX(dateKey),0) FROM mht.dwd_fin_user_package_sna_d;
SELECT var_date2 = NVL(default.fn_datekey_add(var_date1,1),0);
SELECT var_date3 = NVL(MAX(dateKey),0) FROM mht.dws_fin_user_goods_sna_d;

print '变量var_date1 =' || var_date1 || '';
print '变量var_date2 =' || var_date2 || '';
print '变量var_date3 =' || var_date3 || '';


IF var_date3 < var_date1 THEN
print '满足执行条件:var_date3 < var_date1 ,即将执行hql脚本!';

DECLARE sqlA STRING;
sqlA='set hive.execution.engine=' || @engine || ';' ;
print sqlA;
hive -e sqlA;

DECLARE sqlB STRING;
sqlB= '
drop table if exists tmp.tmp_mht_dws_snap_d_userGoods_tbl_a;
drop table if exists tmp.tmp_mht_dws_snap_d_userGoods_tbl_b;
drop table if exists tmp.tmp_mht_dws_snap_d_userGoods_tbl_c;
drop table if exists tmp.tmp_mht_dws_snap_d_userGoods_tbl_d;


create table tmp.tmp_mht_dws_snap_d_userGoods_tbl_a AS
SELECT *
FROM mht.dws_fin_user_goods_sna_d
WHERE endKey = ' || var_date1 || '
;
create table tmp.tmp_mht_dws_snap_d_userGoods_tbl_b AS
SELECT *
FROM mht.dwd_fin_user_package_sna_d
WHERE dt = ' || var_date1 || '
;
create table tmp.tmp_mht_dws_snap_d_userGoods_tbl_c AS
SELECT userId,
goodsId,
SUM(amount) AS goodsCount,
SUM(CAST(NVL(amount, 0) AS BIGINT)
* CAST(NVL(goodsPrice, 0) AS BIGINT)) AS totlePrice
FROM tmp.tmp_mht_dws_snap_d_userGoods_tbl_b
GROUP BY userId,
goodsId
;
create table tmp.tmp_mht_dws_snap_d_userGoods_tbl_d AS
SELECT a.*
FROM tmp.tmp_mht_dws_snap_d_userGoods_tbl_a a
INNER JOIN tmp.tmp_mht_dws_snap_d_userGoods_tbl_c c ON c.userId = a.userId
AND c.goodsId = a.goodsId
WHERE a.goodsCount = c.goodsCount
AND a.totlePrice = c.totlePrice
;'

print sqlB;
hive -e sqlB;


DECLARE sqlB1 STRING;
sqlB1= '
--1.存在且一样(只更endkey)
INSERT OVERWRITE TABLE mht.dws_fin_user_goods_sna_d
SELECT
b.dateKey,
b.userId,
b.goodsId,
b.goodsCount,
b.totlePrice,
IF(d.userId IS NOT NULL,' || var_date2 || ',b.endKey) AS endKey,
b.createTime,
b.updateTime
FROM mht.dws_fin_user_goods_sna_d b
LEFT JOIN tmp.tmp_mht_dws_snap_d_userGoods_tbl_d d
ON d.userId = b.userId AND d.goodsId = b.goodsId AND b.endKey = ' || var_date1 || '
;



--2.存在不一样(插入)
INSERT INTO TABLE mht.dws_fin_user_goods_sna_d
SELECT
' || var_date1 || ' AS dateKey,
b.userId,
b.goodsId,
b.goodsCount,
b.totlePrice,
' || var_date2 || ' AS endKey,
CURRENT_TIMESTAMP() AS createTime,
CURRENT_TIMESTAMP() AS updateTime
FROM tmp.tmp_mht_dws_snap_d_userGoods_tbl_a a
JOIN tmp.tmp_mht_dws_snap_d_userGoods_tbl_c b
ON b.userId = a.userId AND b.goodsId = a.goodsId
WHERE a.goodsCount <> b.goodsCount
AND a.totlePrice <> b.totlePrice
;

--3.不存在(插入)
INSERT INTO TABLE mht.dws_fin_user_goods_sna_d
SELECT
' || var_date1 || ' AS dateKey,
a.userId,
a.goodsId,
a.goodsCount,
a.totlePrice,
' || var_date2 || ' AS endKey,
CURRENT_TIMESTAMP() AS createTime,
CURRENT_TIMESTAMP() AS updateTime
FROM tmp.tmp_mht_dws_snap_d_userGoods_tbl_c a
WHERE NOT EXISTS
( SELECT 1
FROM tmp.tmp_mht_dws_snap_d_userGoods_tbl_a b
WHERE b.userId = a.userId
AND b.goodsId = a.goodsId )
;'

print sqlB1;
hive -e sqlB1;


DECLARE sqlC STRING;
sqlC='
--平台背包期初
INSERT OVERWRITE TABLE mht.dws_fin_goods_sna_d partition(dt)
SELECT
' || var_date1 || ' AS dateKey,
a.goodsId,
SUM(NVL(a.goodsCount, 0)), --总个数
SUM(NVL(a.totlePrice, 0)), --总金额
------------------------------------------------
SUM(IF(a.userId IN ( 10326832, 10326836 ), 0, NVL(a.goodsCount,
0))), --排除系统账号总个数
SUM(IF(a.userId IN ( 10326832, 10326836 ), 0, NVL(a.totlePrice,
0))), --排除系统账号总金额
------------------------------------------------
SUM(IF(a.userId IN ( 10326832, 10326836 ), NVL(a.goodsCount,
0), 0)), --系统账号总个数
SUM(IF(a.userId IN ( 10326832, 10326836 ), NVL(a.totlePrice,
0), 0)), --系统账号总金额
CURRENT_TIMESTAMP() AS createTime,
' || var_date1 || ' AS dt
FROM mht.dws_fin_user_goods_sna_d a
WHERE a.dateKey <= ' || var_date1 || '
AND a.endKey > ' || var_date1 || '
GROUP BY a.goodsId;'

print sqlC;
hive -e sqlC;

ELSE
print '不满足执行条件:var_date3 < var_date1 ,即将退出!';
END IF;
END;

调用时命令:

1
/usr/local/hplsql-0.3.31/hplsql -f /usr/local/hive-2.3.9/workspace/sqlText/hpl/proc_user_goods_sna.sql -main controller -d @engine=tez