作者:阿川那小子 | 来源:互联网 | 2023-08-18 18:35
注:将整个项目的数据处理过程,从数据采集到数据分析,再到结果数据的导出,一系列的任务分割成若干个oozie的工作流,并用coordinator进行协调。
工作流定义示例
Oozie配置片段示例,详见项目工程
1.日志预处理mr程序工作流定义
<!-- Workflow 1: weblog pre-processing MapReduce job.
     Runs a map-only job (WeblogPreProcessMapper) over ${inpath},
     writing Text/NullWritable records to ${outpath}. -->
<workflow-app name="weblogpreprocess" xmlns="uri:oozie:workflow:0.4">
    <start to="firstjob"/>

    <action name="firstjob">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <!-- Delete any previous output so re-runs are idempotent -->
                <delete path="${nameNode}/${outpath}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>cn.itcast.bigdata.hive.mr.WeblogPreProcess$WeblogPreProcessMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inpath}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outpath}</value>
                </property>
                <!-- Use the new MapReduce API for both mapper and reducer -->
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>

    <!-- NOTE(review): the original fragment was truncated after <error to="kill"/>.
         The kill/end nodes below are reconstructed so the definition is
         well-formed; confirm against the full project source. -->
    <kill name="kill">
        <message>MapReduce action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
2.数据加载etl工作流定义
<!-- Workflow 2: data-loading ETL step.
     Executes script.q via HiveServer2 (Beeline), passing the
     pre-processed weblog output directory as the "input" parameter. -->
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>

    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>jdbc:hive2://hdp-node-01:10000</jdbc-url>
            <script>script.q</script>
            <param>input=/weblog/outpre2</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
3.数据加载工作流所用hive脚本
-- ============================================================
-- Hive ETL load script for the weblog data warehouse.
-- Steps:
--   1. Load raw pre-processed weblog files into t_orgin_weblog.
--   2. Parse http_referer into host/path/query/query_id columns.
--   3. Derive time-dimension columns from time_local.
--   4. Rebuild the partitioned detail table and load two daily partitions.
-- NOTE: "orgin" is a historical typo for "origin"; the table name is
-- kept unchanged because downstream statements/jobs reference it.
-- ============================================================
create database if not exists dw_weblog;
use dw_weblog;

-- Step 1: raw log table. All fields are kept as strings at this stage;
-- the pre-processing MR job emits \001-delimited records.
drop table if exists t_orgin_weblog;
create table t_orgin_weblog(
    valid           string,
    remote_addr     string,
    remote_user     string,
    time_local      string,
    request         string,
    status          string,
    body_bytes_sent string,
    http_referer    string,
    http_user_agent string)
row format delimited
fields terminated by '\001';

load data inpath '/weblog/preout' overwrite into table t_orgin_weblog;

-- Step 2: split the referer URL (quotes stripped first) into
-- host, path, query and the "id" query parameter.
drop table if exists t_ods_detail_tmp_referurl;
create table t_ods_detail_tmp_referurl as
select a.*, b.*
from t_orgin_weblog a
lateral view parse_url_tuple(regexp_replace(http_referer, "\"", ""),
    'HOST', 'PATH', 'QUERY', 'QUERY:id') b as host, path, query, query_id;

-- Step 3: derive time-dimension columns.
-- time_local is assumed to look like 18/Sep/2013:06:49:18
-- (Hive substring is 1-based; an index of 0 behaves like 1).
drop table if exists t_ods_detail;
create table t_ods_detail as
select b.*,
       substring(time_local, 0, 11) as daystr,  -- 18/Sep/2013
       substring(time_local, 13)    as tmstr,   -- 06:49:18
       substring(time_local, 4, 3)  as month,   -- Sep
       substring(time_local, 0, 2)  as day,     -- 18
       substring(time_local, 13, 2) as hour     -- 06
from t_ods_detail_tmp_referurl b;

-- Step 4: partitioned detail table.
-- "if exists" added: a plain DROP fails the script on its first run.
drop table if exists t_ods_detail_prt;
create table t_ods_detail_prt(
    valid           string,
    remote_addr     string,
    remote_user     string,
    time_local      string,
    request         string,
    status          string,
    body_bytes_sent string,
    http_referer    string,
    http_user_agent string,
    host            string,
    path            string,
    query           string,
    query_id        string,
    daystr          string,
    tmstr           string,
    month           string,
    day             string,
    hour            string)
partitioned by (mm string, dd string);

-- Load the two sample day partitions. Column order of t_ods_detail
-- matches t_ods_detail_prt (13 base + 5 derived columns).
insert into table t_ods_detail_prt partition(mm='Sep', dd='18')
select * from t_ods_detail where daystr='18/Sep/2013';

insert into table t_ods_detail_prt partition(mm='Sep', dd='19')
select * from t_ods_detail where daystr='19/Sep/2013';
工作流单元测试
1、工作流定义配置上传
[hadoop@hdp-node-01 wf-oozie]$ hadoop fs -put hive2-etl /user/hadoop/oozie/myapps/
[hadoop@hdp-node-01 wf-oozie]$ hadoop fs -put hive2-dw /user/hadoop/oozie/myapps/
[hadoop@hdp-node-01 wf-oozie]$ ll
total 12
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 23 16:32 hive2-dw
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 23 16:32 hive2-etl
drwxrwxr-x. 3 hadoop hadoop 4096 Nov 23 11:24 weblog
[hadoop@hdp-node-01 wf-oozie]$ export OOZIE_URL=http://localhost:11000/oozie
2、工作流单元提交启动
启动日志预处理mr工作流
oozie job -D inpath=/weblog/input -D outpath=/weblog/outpre -config weblog/job.properties -run
启动etl的hive工作流
oozie job -config hive2-etl/job.properties -run
启动pvs统计的hive工作流
oozie job -config hive2-dw/job.properties -run
3、工作流coordinator配置（片段）
多个工作流job用coordinator组织协调：
[hadoop@hdp-node-01 hive2-etl]$ ll
total 28
-rw-rw-r--. 1 hadoop hadoop 265 Nov 13 16:39 config-default.xml
-rw-rw-r--. 1 hadoop hadoop 512 Nov 26 16:43 coordinator.xml
-rw-rw-r--. 1 hadoop hadoop 382 Nov 26 16:49 job.properties
drwxrwxr-x. 2 hadoop hadoop 4096 Nov 27 11:26 lib
-rw-rw-r--. 1 hadoop hadoop 1910 Nov 23 17:49 script.q
-rw-rw-r--. 1 hadoop hadoop 687 Nov 23 16:32 workflow.xml
config-default.xml:
<!-- config-default.xml: default parameter values shared by the
     workflow; individual values can be overridden in job.properties. -->
<configuration>
    <property>
        <name>jobTracker</name>
        <value>hdp-node-01:8032</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://hdp-node-01:9000</value>
    </property>
    <property>
        <name>queueName</name>
        <value>default</value>
    </property>
</configuration>
job.properties:
user.name=hadoop
oozie.use.system.libpath=true
oozie.libpath=hdfs://hdp-node-01:9000/user/hadoop/share/lib
oozie.wf.application.path=hdfs://hdp-node-01:9000/user/hadoop/oozie/myapps/hive2-etl/
workflow.xml:
<!-- workflow.xml: the hive2-etl workflow run by the coordinator.
     Executes script.q via HiveServer2 (Beeline). -->
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>

    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>jdbc:hive2://hdp-node-01:10000</jdbc-url>
            <script>script.q</script>
            <param>input=/weblog/outpre2</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
coordinator.xml:
<!-- coordinator.xml: triggers the workflow at ${workflowAppUri}
     every 5 minutes between ${start} and ${end} (Asia/Shanghai),
     forwarding cluster parameters from job.properties. -->
<coordinator-app name="cron-coord" frequency="${coord:minutes(5)}"
                 start="${start}" end="${end}" timezone="Asia/Shanghai"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>