ETL整体架构设计
ETL指的是 Extraction, Transformation 与 Loading。
Extraction 指的是如何将数从来源端 (Source System) 中截取出来;
Transformation 指的是在截取出来的数据格式与数据仓储所需要的数据做转换;
Loading 指的是将数据加载至大数据平台中;
ETL服务器的作用有
(1)从源端抽取数据;
(2)加载数据到大数据集群;
(3)向集群下达数据处理的指令;
(4)ETL任务调度;
(5)数据预处理。 部分数据会在ETL服务器上先做预处理
平台整个体系架构分为以下几个层次:
1)源数据层:数据是整个大数据平台架构规划的基础,存储的批量数据包括存储网关委托日志、成交日志、行情数据和持仓数据等。
数据采集/接入:大数接平台通过FTP服务采集上游数据接口文件,利用大数据分布式数据库集群数据导入组件装载到大数据平台。
数据存储:数据存储主要采用HDFS存储,包括贴源数据、基础数据区、集市数据区、归档数据区,分别满足不同层面的数据应用要求。
数据计算:数据计算和查询引擎采用基于分布式架构的组件,该组件支持分布式批量计算、实时计算,以期满足数据预处理加工清洗和批处理计算要求。
数据服务:按照数据应用要求(业务需求驱动),构建不同的数据类查询、统计分析类应用。
具体表结构请参见“第2章\4.资料\ETL\表结构设计.docx”
1、启动HDFS
2、启动YARN集群
3、启动 metastore
nohup hive --service metastore &
4、启动 hiverserver2
nohup hive --service hiveserver2 &
5、启动Yarn history server
mr-jobhistory-daemon.sh start historyserver
5.3.1.上传数据文件
5.3.1.1.上传建表脚本
在服务器磁盘目录创建文件夹:
mkdir -p /export/servers/tmp/create_table
将建表脚本“4.资料\ETL\建表脚本”中的:
(ods层)create_table_ods.sql
(dm层)create_table_dw.sql
(dm层)create_table_dm.sql
上传至服务器磁盘目录:/export/servers/tmp/create_table
5.3.1.2.上传ODS层文本数据
1.源文件对应的数据表
名称 表名 文件名
上证指数日交易信息表 maynor_quot_ods.tra_index_day_info ssIndexTxDay.txt
深证指数日交易信息表 maynor_quot_ods.tra_index_day_info_sz szIndexTxDay.txt
板块基本信息表 maynor_quot_ods.pd_sector_base_info sectorInfo.txt
上证个股日交易信息表 maynor_quot_ods.tra_stock_day_info ssStockDayDtl.txt
深证个股日交易信息表 maynor_quot_ods.tra_stock_day_info_sz szStockDayDtl.txt
交易日历表 maynor_quot_ods.tra_tcc_date data-2020.txt
个股证券基本信息 maynor_quot_ods.pd_stock_sec_info stockSecInfo.txt
万德数据证券基础信息 maynor_quot_ods.pd_wind_base_info windSecInfo.txt
指数日流通基本信息 maynor_quot_ods.pd_index_sec_info indexSecInfo.txt
2.上传数据源文件到服务器目录
在服务器磁盘目录创建数据源文件夹:
mkdir -p /export/servers/tmp/ods_table_source
上传“4.资料\ETL\ODS层数据源”中的所有txt文件到服务器目录:/export/servers/tmp/ods_table_source
5.3.2.建库建表
5.3.2.1.创建数据库
CREATE DATABASE IF NOT EXISTS maynor_quot_ods
;
CREATE DATABASE IF NOT EXISTS maynor_quot_dw
;
CREATE DATABASE IF NOT EXISTS maynor_quot_dm
;
5.3.2.2.创建ods层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_ods.sql
5.3.2.3.创建dw层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_dw.sql
5.3.2.4.创建dm层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_dm.sql
5.3.3.加载ODS层表数据
方式一:手动加载每一条命令
load data local inpath ‘/export/servers/tmp/ods_table_source/stockSecInfo.txt’ overwrite into table pd_stock_sec_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/data-2020.txt’ into table tra_tcc_date ;
load data local inpath ‘/export/servers/tmp/ods_table_source/indexSecInfo.txt’ overwrite into table pd_index_sec_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/windSecInfo.txt’ overwrite into table pd_wind_base_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/ssStockDayDtl.txt’ overwrite into table tra_stock_day_info ;
load data local inpath ‘/export/servers/tmp/ods_table_source/szStockDayDtl.txt’ overwrite into table tra_stock_day_info_sz ;
load data local inpath ‘/export/servers/tmp/ods_table_source/ssIndexTxDay.txt’ overwrite into table tra_index_day_info ;
load data local inpath ‘/export/servers/tmp/ods_table_source/szIndexTxDay.txt’ overwrite into table tra_index_day_info_sz ;
load data local inpath ‘/export/servers/tmp/ods_table_source/sectorInfo.txt’ overwrite into table pd_sector_base_info;
方式二:脚本一键加载
1.创建服务器脚本目录:
mkdir -p /export/servers/tmp/shell
2.上传脚本到服务器目录
将“4.资料\ETL\ODS层数据加载脚本\load_ods_table.sh”上传到服务文件路径:/export/servers/tmp/shell
3.脚本赋权限
脚本赋执行权限:chmod 755 load_ods_table.sh
4.执行脚本
sh load_ods_table.sh
5.4.ods数据到dw层
5.4.1.沪市每日证券收盘明细表
1、关联ods层沪市个股交易信息、个股证券基本信息和万德数据证券基础信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
上证个股日交易信息表(tra_stock_day_info)与个股证券基本信息表(pd_stock_sec_info)和万德数据证券基础信息表(pd_wind_base_info)通过证券代码(sec_code)进行关联,组成沪市每日证券收盘明细表(sum_stock_close_dtl_day)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_stock_day_info
证券代码 sec_code char(6) tra_stock_day_info
证券简称 sec_abbr varchar(8) tra_stock_day_info
昨日收盘价 last_close_price decimal(8,2) tra_stock_day_info
当日开盘价 cur_open_price decimal(8,2) tra_stock_day_info
当日收盘价 cur_high_price decimal(8,2) tra_stock_day_info
当日最高价 cur_low_price decimal(8,2) tra_stock_day_info
当日最低价 cur_close_price decimal(8,2) tra_stock_day_info
当日成交量 cur_trade_vol decimal(18,0) tra_stock_day_info
当日成交金额 cur_trade_amt decimal(18,2) tra_stock_day_info
总股本 tot_cap decimal(18,0) pd_stock_sec_info
流通股本 nego_cap decimal(18,0) pd_stock_sec_info
信用评级 credit_level varchar(20) pd_stock_sec_info
是否已上市 if_list char(1) pd_wind_base_info
交易货币代码 trade_curr_code varchar(20) pd_wind_base_info
交易所代码 exch_code varchar(10) pd_wind_base_info
备注:ODS数据表中的trade_date字段为历史数据,如果想获取T-1日日期,可通过date_sub(current_date,1) 获取。
参考代码:
insert overwrite table maynor_quot_dw.sum_stock_close_dtl_day partition (dt=‘20200704’)
select
a.trade_date
,a.sec_code
,a.sec_abbr
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,a.cur_trade_vol
,a.cur_trade_amt
,b.tot_cap
,b.nego_cap
,b.credit_level
,c.if_list
,c.trade_curr_code
,c.exch_code
from maynor_quot_ods.tra_stock_day_info a
join maynor_quot_ods.pd_stock_sec_info b
on a.sec_code = b.sec_code
join maynor_quot_ods.pd_wind_base_info c
on a.sec_code = c.sec_code;
查询结果数据:
select * from maynor_quot_dw.sum_stock_close_dtl_day limit 5;
删除分区表数据:
alter table maynor_quot_dw.sum_stock_close_dtl_day drop partition(dt=‘20200704’);
5.4.2.深市每日证券收盘明细表
1、关联ods层深市个股交易信息和基础信息,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
深证个股日交易信息表(tra_stock_day_info_sz)与个股证券基本信息表(pd_stock_sec_info)和万德数据证券基础信息表(pd_wind_base_info)通过证券代码(sec_code)进行关联,组成深市每日证券收盘明细表(sum_stock_close_dtl_day_sz)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_stock_day_info_sz
证券代码 sec_code char(6) tra_stock_day_info_sz
证券简称 sec_abbr varchar(8) tra_stock_day_info_sz
昨日收盘价 last_close_price decimal(8,2) tra_stock_day_info_sz
当日开盘价 cur_open_price decimal(8,2) tra_stock_day_info_sz
当日收盘价 cur_high_price decimal(8,2) tra_stock_day_info_sz
当日最高价 cur_low_price decimal(8,2) tra_stock_day_info_sz
当日最低价 cur_close_price decimal(8,2) tra_stock_day_info_sz
当日成交量 cur_trade_vol decimal(18,0) tra_stock_day_info_sz
当日成交金额 cur_trade_amt decimal(18,2) tra_stock_day_info_sz
总股本 tot_cap decimal(18,0) pd_stock_sec_info
流通股本 nego_cap decimal(18,0) pd_stock_sec_info
信用评级 credit_level varchar(20) pd_stock_sec_info
是否已上市 if_list char(1) pd_wind_base_info
交易货币代码 trade_curr_code varchar(20) pd_wind_base_info
交易所代码 exch_code varchar(10) pd_wind_base_info
参考代码:
insert overwrite table maynor_quot_dw.sum_stock_close_dtl_day_sz partition (dt=‘20200704’)
select
a.trade_date
,a.sec_code
,a.sec_abbr
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,a.cur_trade_vol
,a.cur_trade_amt
,b.tot_cap
,b.nego_cap
,b.credit_level
,c.if_list
,c.trade_curr_code
,c.exch_code
from maynor_quot_ods.tra_stock_day_info_sz a
join maynor_quot_ods.pd_stock_sec_info b
on a.sec_code = b.sec_code
join maynor_quot_ods.pd_wind_base_info c
on a.sec_code = c.sec_code;
查询结果数据:
select * from maynor_quot_dw.sum_stock_close_dtl_day_sz limit 5;
5.4.3.板块个股对应基本信息表
1、关联ods层板块基本信息表、万德数据证券基础信息和个股证券基本信息,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
板块基本信息表(pd_sector_base_info)与万德数据证券基础信息表(pd_wind_base_info)通过证券id(sec_id)关联;再通过证券代码(sec_code)关联个股证券基本信息表(pd_stock_sec_info),按照算法描述过滤数据,最后组成板块个股对应基本信息表。(sum_sector_stock_base_info)。
过滤条件:1、行业板块代码以85开头
2、只筛选行业板块数据中板块类型(sec_type_code)为A股数据
新表字段如下:
列名 字段 类型 来源表
更新时间 etl_time timestamp
交易日期 trade_date date pd_sector_base_info
板块代码 sector_code varchar (6) pd_sector_base_info
板块名称 sector_name varchar (20) pd_sector_base_info
板块类型 sector_type char (1) pd_sector_base_info
个股代码 sec_code char (6) pd_wind_base_info
个股流通股本 nego_cap decimal(18,0) pd_stock_sec_info
证券上市地点简称 sec_abbr varchar (10) pd_sector_base_info
参考代码:
insert overwrite table maynor_quot_dw.sum_sector_stock_base_info partition(dt=‘20200719’)
select
current_timestamp,
a.trade_date ,
a.sector_code,
a.sector_name ,
a.sector_type,
b.sec_code ,
c.nego_cap ,
a.sec_abbr
from maynor_quot_ods.pd_sector_base_info a
join maynor_quot_ods.pd_wind_base_info b
on a.sec_id = b.sec_id
join maynor_quot_ods.pd_stock_sec_info c
on b.sec_code=c.sec_code
where a.sector_code like ‘85%’
and b.sec_type_code=‘A’
;
查询结果数据:
select * from maynor_quot_dw.sum_sector_stock_base_info limit 5;
5.4.4.沪市指数日收盘明细表
1、关联ods层上证指数日交易信息表和指数日流通基本信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
上证指数日交易信息表(tra_index_day_info)与指数日流通基本信息(pd_index_sec_info)通过指数代码(index_code)进行关联,组成沪市指数日收盘明细表(sum_index_close_dtl_day)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_index_day_info
指数代码 index_code char(6) tra_index_day_info
指数名称 index_name varchar(20) tra_index_day_info
当日成交量 cur_trade_vol decimal(18,0) tra_index_day_info
当日成交金额 cur_trade_amt decimal(18,2) tra_index_day_info
昨日收盘价 last_close_price decimal(8,2) tra_index_day_info
当日开盘价 cur_open_price decimal(8,2) tra_index_day_info
当日最高价 cur_high_price decimal(8,2) tra_index_day_info
当日最低价 cur_low_price decimal(8,2) tra_index_day_info
当日收盘价 cur_close_price decimal(8,2) tra_index_day_info
涨跌幅 rf_range varchar(10) pd_index_sec_info
涨跌额 rf_val decimal(8,2) pd_index_sec_info
振幅 swing_range varchar(10) pd_index_sec_info
量比 volumn_ratio decimal(8,2) pd_index_sec_info
参考代码:
INSERT overwrite TABLE maynor_quot_dw.sum_index_close_dtl_day PARTITION (dt=‘20200704’)
SELECT
a.trade_date
,a.index_code
,a.index_name
,a.cur_trade_vol
,a.cur_trade_amt
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,b.rf_range
,b.rf_val
,b.swing_range
,b.volumn_ratio
FROM
maynor_quot_ods.tra_index_day_info a
INNER JOIN
maynor_quot_ods.pd_index_sec_info b
ON
a.index_code = b.index_code;
查询结果数据:
select * from maynor_quot_dw.sum_index_close_dtl_day limit 5;
5.4.5.深市指数日收盘明细表
1、关联ods层深证指数日交易信息表和指数日流通基本信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
深证指数日交易信息表(tra_index_day_info_sz)与指数日流通基本信息(pd_index_sec_info)通过指数代码(index_code)进行关联,组成深市指数日收盘明细表(sum_index_close_dtl_day_sz)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_index_day_info_sz
指数代码 index_code char(6) tra_index_day_info_sz
指数名称 index_name varchar(20) tra_index_day_info_sz
当日成交量 cur_trade_vol decimal(18,0) tra_index_day_info_sz
当日成交金额 cur_trade_amt decimal(18,2) tra_index_day_info_sz
昨日收盘价 last_close_price decimal(8,2) tra_index_day_info_sz
当日开盘价 cur_open_price decimal(8,2) tra_index_day_info_sz
当日最高价 cur_high_price decimal(8,2) tra_index_day_info_sz
当日最低价 cur_low_price decimal(8,2) tra_index_day_info_sz
当日收盘价 cur_close_price decimal(8,2) tra_index_day_info_sz
涨跌幅 rf_range varchar(10) pd_index_sec_info
涨跌额 rf_val decimal(8,2) pd_index_sec_info
振幅 swing_range varchar(10) pd_index_sec_info
量比 volumn_ratio decimal(8,2) pd_index_sec_info
参考代码:
INSERT overwrite TABLE maynor_quot_dw.sum_index_close_dtl_day_sz PARTITION (dt=‘20200704’)
SELECT
a.trade_date
,a.index_code
,a.index_name
,a.cur_trade_vol
,a.cur_trade_amt
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,b.rf_range
,b.rf_val
,b.swing_range
,b.volumn_ratio
FROM
maynor_quot_ods.tra_index_day_info_sz a
INNER JOIN
maynor_quot_ods.pd_index_sec_info b
ON
a.index_code = b.index_code;
查询结果数据:
select * from maynor_quot_dw.sum_index_close_dtl_day_sz limit 5;
5.5.dw层数据到dm层
5.5.1.个股K线表
1、沪、深两市每日证券收盘明细表关联交易日历表,将数据插入到个股K线表日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
沪市每日证券收盘明细表(sum_stock_close_dtl_day)、深市每日证券收盘明细表(sum_stock_close_dtl_day_sz)和交易日历表(tra_tcc_date)通过交易时间(trade_date)进行关联,组成个股K线表(app_sec_quot_stock_kline)。
过滤条件:1、上证股票代码以600和900开头,深证股票代码以00、200、300开头
2、沪、深两市每日证券收盘明细表T日分区数据
注意事项:查询表字段与数据保存字段是否一致。
新表字段如下:
列名 字段 类型 来源表
更新时间 update_time timestamp
交易时间 trade_date date 沪、深两市每日证券收盘明细表
证券代码 sec_code char(6) 沪、深两市每日证券收盘明细表
证券名称 sec_name varchar(20) 沪、深两市每日证券收盘明细表
k线类型 k_line_type char(1) 日K(1)、周K(2)、月K(3)
前收盘价 pre_close_price decimal(8,2) 沪、深两市每日证券收盘明细表
开盘价 open_price decimal(8,2) 沪、深两市每日证券收盘明细表
最高价 high_price decimal(8,2) 沪、深两市每日证券收盘明细表
最低价 low_price decimal(8,2) 沪、深两市每日证券收盘明细表
收盘价 close_price decimal(8,2) 沪、深两市每日证券收盘明细表
均价 avgprice decimal(8,2) 成交额/成交量
成交量 trade_vol decimal(18,0) 沪、深两市每日证券收盘明细表
成交金额 trade_amt decimal(18,2) 沪、深两市每日证券收盘明细表
参考代码:
–沪市
INSERT INTO TABLE maynor_quot_dm.app_sec_quot_stock_kline PARTITION (dt=‘20200901’)
SELECT
CURRENT_TIMESTAMP,
a.trade_date ,
a.sec_code ,
a.sec_abbr ,
‘1’,
a.last_close_price,
a.cur_open_price ,
a.cur_high_price ,
a.cur_low_price ,
a.cur_close_price,
CASE WHEN a.cur_trade_vol =0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice,
a.cur_trade_vol,
a.cur_trade_amt
FROM
maynor_quot_dw.sum_stock_close_dtl_day a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date = b.trade_date
WHERE
a.sec_code LIKE ‘60%’ OR a.sec_code LIKE ‘90%’
AND a.dt = ‘20200901’ ;
–深市
insert into table maynor_quot_dm.app_sec_quot_stock_kline partition (dt=‘20200901’)
select
current_timestamp,
a.trade_date ,
a.sec_code ,
a.sec_abbr ,
‘1’,
a.last_close_price,
a.cur_open_price ,
a.cur_high_price ,
a.cur_low_price ,
a.cur_close_price,
case when a.cur_trade_vol =0 then 0 else round(a.cur_trade_amt/a.cur_trade_vol,2) end as avgprice,
a.cur_trade_vol,
a.cur_trade_amt
from
maynor_quot_dw.sum_stock_close_dtl_day_sz a
join maynor_quot_ods.tra_tcc_date b
on a.trade_date = b.trade_date
where
a.sec_code like ‘00%’ or a.sec_code like ‘20%’ or a.sec_code like ‘30%’
and a.dt = ‘20200901’ ;
查询结果数据:
select * from maynor_quot_dm.app_sec_quot_stock_kline limit 5;
5.5.2.板块成分股表
1、关联板块个股对应基本信息表、个股K线表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
板块个股对应基本信息表(sum_sector_stock_base_info)与个股K线表(app_sec_quot_stock_kline)通过证券代码(sec_code)进行关联,组成板块成分股表(app_sec_sector_stock)。
过滤条件:1、行业板块代码以85开头
2、个股K线表T日分区数据
3、K线类型是日K(1)
新表字段如下:
列名 字段 类型 来源表
更新时间 etl_time timestamp
交易日期 trade_date date sum_sector_stock_base_info
板块代码 sector_code varchar (6) sum_sector_stock_base_info
板块名称 sector_name varchar (20) sum_sector_stock_base_info
板块类型 sector_type char (1) sum_sector_stock_base_info
个股代码 sec_code char (6) sum_sector_stock_base_info
个股流通股本 nego_cap decimal(18,0) sum_sector_stock_base_info
前一交易日个股流通市值 pre_nego_cap decimal(18,2) 收盘价个股流通股本
前一交易日板块总流通市值 pre_sector_nego_cap decimal(18,2) 板块下个股数据汇总
SUM(收盘价个股流通股本)
证券上市地点简称 sec_abbr varchar (10) sum_sector_stock_base_info
参考代码:
INSERT overwrite TABLE maynor_quot_dm.app_sec_sector_stock PARTITION (dt=‘20200902’)
SELECT
CURRENT_TIMESTAMP AS etl_time
,a.trade_date
,a.sector_code
,a.sector_name
,a.sector_type
,a.sec_code
,a.nego_cap
,a.nego_cape.close_price AS pre_nego_cap
,SUM(a.nego_cape.close_price) over(PARTITION BY a.sector_code) AS pre_sector_nego_cap
,a.sec_abbr
FROM maynor_quot_dw.sum_sector_stock_base_info a
JOIN maynor_quot_dm.app_sec_quot_stock_kline e
ON a.sec_code=e.sec_code
AND e.k_line_type=‘1’
WHERE a.sector_code LIKE ‘85%’
AND e.dt= ‘20200902’
;
查询结果数据:
select * from maynor_quot_dm.app_sec_sector_stock limit 5;
5.5.3.指数K线表
1、沪、深两市指数日收盘明细表关联交易日历表,将数据插入到指数K线表日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
沪市指数日收盘明细表(sum_index_close_dtl_day)、深市指数日收盘明细表(sum_index_close_dtl_day_sz)和交易日历表(tra_tcc_date)进行关联,组成指数K线表(app_sec_quot_index_kline)。
过滤条件:1、过滤沪市指数代码以00开头,深市指数以39开头的指数行情数据;
2、沪、深两市指数日收盘明细表T日分区数据
注意事项:查询表字段与数据保存字段是否一致。
新表字段如下:
列名 字段 类型 来源表
更新时间 update_time timestamp
交易时间 trade_date date 沪、深两市指数日收盘明细表
指数代码 index_code char(6) 沪、深两市指数日收盘明细表
指数名称 index_name varchar(20) 沪、深两市指数日收盘明细表
k线类型 k_line_type char(1) 日K(1)、周K(2)、月K(3)
前收盘价 pre_close_price decimal(8,2) 沪、深两市指数日收盘明细表
开盘价 open_price decimal(8,2) 沪、深两市指数日收盘明细表
最高价 high_price decimal(8,2) 沪、深两市指数日收盘明细表
最低价 low_price decimal(8,2) 沪、深两市指数日收盘明细表
收盘价 close_price decimal(8,2) 沪、深两市指数日收盘明细表
均价 avgprice decimal(8,2) 成交额/成交量
成交量 trade_vol decimal(18,0) 沪、深两市指数日收盘明细表
成交金额 trade_amt decimal(18,2) 沪、深两市指数日收盘明细表
参考代码:
INSERT overwrite TABLE maynor_quot_dm.app_sec_quot_index_kline PARTITION (dt=‘20200704’)
SELECT
CURRENT_TIMESTAMP AS update_time
,a.trade_date AS trade_date
,a.index_code
,a.index_name
,‘1’ AS k_line_type
,a.last_close_price AS pre_close_price
,a.cur_open_price AS open_price
,a.cur_high_price AS high_price
,a.cur_low_price AS low_price
,a.cur_close_price AS close_price
,CASE WHEN a.cur_trade_vol=0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice
,a.cur_trade_vol AS trade_vol
,a.cur_trade_amt AS trade_amt
FROM maynor_quot_dw.sum_index_close_dtl_day a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date=b.trade_date
WHERE
a.index_code LIKE ‘00%’
AND a.dt = ‘20190905’
UNION ALL
SELECT
CURRENT_TIMESTAMP AS update_time
,a.trade_date AS trade_date
,a.index_code
,a.index_name
,‘1’ AS k_line_type
,a.last_close_price AS pre_close_price
,a.cur_open_price AS open_price
,a.cur_high_price AS high_price
,a.cur_low_price AS low_price
,a.cur_close_price AS close_price
,CASE WHEN a.cur_trade_vol=0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice
,a.cur_trade_vol AS trade_vol
,a.cur_trade_amt AS trade_amt
FROM maynor_quot_dw.sum_index_close_dtl_day_sz a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date=b.trade_date
WHERE
a.index_code LIKE ‘39%’
and a.dt = ‘20190905’ ;
查询结果数据:
select * from maynor_quot_dm.app_sec_quot_index_kline limit 5;
数据同步是将hive表中的数据同步到mysql中,这里数据同步技术组件我们采用的是Sqoop。
Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
6.1.板块成分股
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin
sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_sector_stock
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_sector_stock/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’
6.2.指数K线
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin
sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_quot_index_kline_day
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_quot_index_kline/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’
–num-mappers 2
6.3.个股K线
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin
sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_quot_stock_kline_day
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_quot_stock_kline/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’
–num-mappers 2