热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

今日指数项目之ETL数据业务开发(功能实现)【八】

重点:ETL架构设计逻辑架构ETL整体架构设计ETL指的是Extraction,Transformation与Loading。Extraction指的是如何将数从
重点:ETL架构设计

逻辑架构

ETL整体架构设计
ETL指的是 Extraction, Transformation 与 Loading。
Extraction 指的是如何将数从来源端 (Source System) 中截取出来;
Transformation 指的是在截取出来的数据格式与数据仓储所需要的数据做转换;
Loading 指的是将数据加载至大数据平台中;
ETL服务器的作用有
(1)从源端抽取数据;
(2)加载数据到大数据集群;
(3)向集群下达数据处理的指令;
(4)ETL任务调度;
(5)数据预处理。 部分数据会在ETL服务器上先做预处理

在这里插入图片描述

平台整个体系架构分为以下几个层次:
1)源数据层:数据是整个大数据平台架构规划的基础,存储的批量数据包括存储网关委托日志、成交日志、行情数据和持仓数据等。

  1. 数据采集/接入:大数接平台通过FTP服务采集上游数据接口文件,利用大数据分布式数据库集群数据导入组件装载到大数据平台。

  2. 数据存储:数据存储主要采用HDFS存储,包括贴源数据、基础数据区、集市数据区、归档数据区,分别满足不同层面的数据应用要求。

  3. 数据计算:数据计算和查询引擎采用基于分布式架构的组件,该组件支持分布式批量计算、实时计算,以期满足数据预处理加工清洗和批处理计算要求。

  4. 数据服务:按照数据应用要求(业务需求驱动),构建不同的数据类查询、统计分析类应用。


数据流程

在这里插入图片描述

ETL数据开发

Hive表结构介绍

具体表结构请参见“第2章\4.资料\ETL\表结构设计.docx”

启动集群

1、启动HDFS
2、启动YARN集群
3、启动 metastore
nohup hive --service metastore &
4、启动 hiverserver2
nohup hive --service hiveserver2 &
5、启动Yarn history server
mr-jobhistory-daemon.sh start historyserver

初始化数据

5.3.1.上传数据文件
5.3.1.1.上传建表脚本
在服务器磁盘目录创建文件夹:
mkdir -p /export/servers/tmp/create_table

将建表脚本“4.资料\ETL\建表脚本”中的:
(ods层)create_table_ods.sql
(dm层)create_table_dw.sql
(dm层)create_table_dm.sql
上传至服务器磁盘目录:/export/servers/tmp/create_table
5.3.1.2.上传ODS层文本数据
1.源文件对应的数据表
名称 表名 文件名
上证指数日交易信息表 maynor_quot_ods.tra_index_day_info ssIndexTxDay.txt
深证指数日交易信息表 maynor_quot_ods.tra_index_day_info_sz szIndexTxDay.txt
板块基本信息表 maynor_quot_ods.pd_sector_base_info sectorInfo.txt
上证个股日交易信息表 maynor_quot_ods.tra_stock_day_info ssStockDayDtl.txt
深证个股日交易信息表 maynor_quot_ods.tra_stock_day_info_sz szStockDayDtl.txt
交易日历表 maynor_quot_ods.tra_tcc_date data-2020.txt
个股证券基本信息 maynor_quot_ods.pd_stock_sec_info stockSecInfo.txt
万德数据证券基础信息 maynor_quot_ods.pd_wind_base_info windSecInfo.txt
指数日流通基本信息 maynor_quot_ods.pd_index_sec_info indexSecInfo.txt

2.上传数据源文件到服务器目录
在服务器磁盘目录创建数据源文件夹:
mkdir -p /export/servers/tmp/ods_table_source

上传“4.资料\ETL\ODS层数据源”中的所有txt文件到服务器目录:/export/servers/tmp/ods_table_source

5.3.2.建库建表
5.3.2.1.创建数据库
CREATE DATABASE IF NOT EXISTS maynor_quot_ods;
CREATE DATABASE IF NOT EXISTS maynor_quot_dw;
CREATE DATABASE IF NOT EXISTS maynor_quot_dm;

5.3.2.2.创建ods层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_ods.sql

5.3.2.3.创建dw层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_dw.sql

5.3.2.4.创建dm层数据表
执行命令:
cd /export/servers/tmp/create_table
hive -f create_table_dm.sql

5.3.3.加载ODS层表数据
方式一:手动加载每一条命令
load data local inpath ‘/export/servers/tmp/ods_table_source/stockSecInfo.txt’ overwrite into table pd_stock_sec_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/data-2020.txt’ into table tra_tcc_date ;
load data local inpath ‘/export/servers/tmp/ods_table_source/indexSecInfo.txt’ overwrite into table pd_index_sec_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/windSecInfo.txt’ overwrite into table pd_wind_base_info;
load data local inpath ‘/export/servers/tmp/ods_table_source/ssStockDayDtl.txt’ overwrite into table tra_stock_day_info ;
load data local inpath ‘/export/servers/tmp/ods_table_source/szStockDayDtl.txt’ overwrite into table tra_stock_day_info_sz ;
load data local inpath ‘/export/servers/tmp/ods_table_source/ssIndexTxDay.txt’ overwrite into table tra_index_day_info ;
load data local inpath ‘/export/servers/tmp/ods_table_source/szIndexTxDay.txt’ overwrite into table tra_index_day_info_sz ;
load data local inpath ‘/export/servers/tmp/ods_table_source/sectorInfo.txt’ overwrite into table pd_sector_base_info;

方式二:脚本一键加载
1.创建服务器脚本目录:
mkdir -p /export/servers/tmp/shell

2.上传脚本到服务器目录
将“4.资料\ETL\ODS层数据加载脚本\load_ods_table.sh”上传到服务文件路径:/export/servers/tmp/shell

3.脚本赋权限
脚本赋执行权限:chmod 755 load_ods_table.sh

4.执行脚本
sh load_ods_table.sh

5.4.ods数据到dw层
5.4.1.沪市每日证券收盘明细表
1、关联ods层沪市个股交易信息、个股证券基本信息和万德数据证券基础信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
上证个股日交易信息表(tra_stock_day_info)与个股证券基本信息表(pd_stock_sec_info)和万德数据证券基础信息表(pd_wind_base_info)通过证券代码(sec_code)进行关联,组成沪市每日证券收盘明细表(sum_stock_close_dtl_day)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_stock_day_info
证券代码 sec_code char(6) tra_stock_day_info
证券简称 sec_abbr varchar(8) tra_stock_day_info
昨日收盘价 last_close_price decimal(8,2) tra_stock_day_info
当日开盘价 cur_open_price decimal(8,2) tra_stock_day_info
当日收盘价 cur_high_price decimal(8,2) tra_stock_day_info
当日最高价 cur_low_price decimal(8,2) tra_stock_day_info
当日最低价 cur_close_price decimal(8,2) tra_stock_day_info
当日成交量 cur_trade_vol decimal(18,0) tra_stock_day_info
当日成交金额 cur_trade_amt decimal(18,2) tra_stock_day_info
总股本 tot_cap decimal(18,0) pd_stock_sec_info
流通股本 nego_cap decimal(18,0) pd_stock_sec_info
信用评级 credit_level varchar(20) pd_stock_sec_info
是否已上市 if_list char(1) pd_wind_base_info
交易货币代码 trade_curr_code varchar(20) pd_wind_base_info
交易所代码 exch_code varchar(10) pd_wind_base_info
备注:ODS数据表中的trade_date字段为历史数据,如果想获取T-1日日期,可通过date_sub(current_date,1) 获取。
参考代码:
insert overwrite table maynor_quot_dw.sum_stock_close_dtl_day partition (dt=‘20200704’)
select
a.trade_date
,a.sec_code
,a.sec_abbr
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,a.cur_trade_vol
,a.cur_trade_amt
,b.tot_cap
,b.nego_cap
,b.credit_level
,c.if_list
,c.trade_curr_code
,c.exch_code
from maynor_quot_ods.tra_stock_day_info a
join maynor_quot_ods.pd_stock_sec_info b
on a.sec_code = b.sec_code
join maynor_quot_ods.pd_wind_base_info c
on a.sec_code = c.sec_code;

查询结果数据:
select * from maynor_quot_dw.sum_stock_close_dtl_day limit 5;
删除分区表数据:
alter table maynor_quot_dw.sum_stock_close_dtl_day drop partition(dt=‘20200704’);

5.4.2.深市每日证券收盘明细表
1、关联ods层深市个股交易信息和基础信息,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
深证个股日交易信息表(tra_stock_day_info_sz)与个股证券基本信息表(pd_stock_sec_info)和万德数据证券基础信息表(pd_wind_base_info)通过证券代码(sec_code)进行关联,组成深市每日证券收盘明细表(sum_stock_close_dtl_day_sz)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_stock_day_info_sz
证券代码 sec_code char(6) tra_stock_day_info_sz
证券简称 sec_abbr varchar(8) tra_stock_day_info_sz
昨日收盘价 last_close_price decimal(8,2) tra_stock_day_info_sz
当日开盘价 cur_open_price decimal(8,2) tra_stock_day_info_sz
当日收盘价 cur_high_price decimal(8,2) tra_stock_day_info_sz
当日最高价 cur_low_price decimal(8,2) tra_stock_day_info_sz
当日最低价 cur_close_price decimal(8,2) tra_stock_day_info_sz
当日成交量 cur_trade_vol decimal(18,0) tra_stock_day_info_sz
当日成交金额 cur_trade_amt decimal(18,2) tra_stock_day_info_sz
总股本 tot_cap decimal(18,0) pd_stock_sec_info
流通股本 nego_cap decimal(18,0) pd_stock_sec_info
信用评级 credit_level varchar(20) pd_stock_sec_info
是否已上市 if_list char(1) pd_wind_base_info
交易货币代码 trade_curr_code varchar(20) pd_wind_base_info
交易所代码 exch_code varchar(10) pd_wind_base_info
参考代码:
insert overwrite table maynor_quot_dw.sum_stock_close_dtl_day_sz partition (dt=‘20200704’)
select
a.trade_date
,a.sec_code
,a.sec_abbr
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,a.cur_trade_vol
,a.cur_trade_amt
,b.tot_cap
,b.nego_cap
,b.credit_level
,c.if_list
,c.trade_curr_code
,c.exch_code
from maynor_quot_ods.tra_stock_day_info_sz a
join maynor_quot_ods.pd_stock_sec_info b
on a.sec_code = b.sec_code
join maynor_quot_ods.pd_wind_base_info c
on a.sec_code = c.sec_code;

查询结果数据:
select * from maynor_quot_dw.sum_stock_close_dtl_day_sz limit 5;

5.4.3.板块个股对应基本信息表
1、关联ods层板块基本信息表、万德数据证券基础信息和个股证券基本信息,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
板块基本信息表(pd_sector_base_info)与万德数据证券基础信息表(pd_wind_base_info)通过证券id(sec_id)关联;再通过证券代码(sec_code)关联个股证券基本信息表(pd_stock_sec_info),按照算法描述过滤数据,最后组成板块个股对应基本信息表。(sum_sector_stock_base_info)。
过滤条件:1、行业板块代码以85开头
2、只筛选行业板块数据中板块类型(sec_type_code)为A股数据
新表字段如下:
列名 字段 类型 来源表
更新时间 etl_time timestamp
交易日期 trade_date date pd_sector_base_info
板块代码 sector_code varchar (6) pd_sector_base_info
板块名称 sector_name varchar (20) pd_sector_base_info
板块类型 sector_type char (1) pd_sector_base_info
个股代码 sec_code char (6) pd_wind_base_info
个股流通股本 nego_cap decimal(18,0) pd_stock_sec_info
证券上市地点简称 sec_abbr varchar (10) pd_sector_base_info
参考代码:
insert overwrite table maynor_quot_dw.sum_sector_stock_base_info partition(dt=‘20200719’)
select
current_timestamp,
a.trade_date ,
a.sector_code,
a.sector_name ,
a.sector_type,
b.sec_code ,
c.nego_cap ,
a.sec_abbr
from maynor_quot_ods.pd_sector_base_info a
join maynor_quot_ods.pd_wind_base_info b
on a.sec_id = b.sec_id
join maynor_quot_ods.pd_stock_sec_info c
on b.sec_code=c.sec_code
where a.sector_code like ‘85%’
and b.sec_type_code=‘A’
;

查询结果数据:
select * from maynor_quot_dw.sum_sector_stock_base_info limit 5;

5.4.4.沪市指数日收盘明细表
1、关联ods层上证指数日交易信息表和指数日流通基本信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
上证指数日交易信息表(tra_index_day_info)与指数日流通基本信息(pd_index_sec_info)通过指数代码(index_code)进行关联,组成沪市指数日收盘明细表(sum_index_close_dtl_day)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_index_day_info
指数代码 index_code char(6) tra_index_day_info
指数名称 index_name varchar(20) tra_index_day_info
当日成交量 cur_trade_vol decimal(18,0) tra_index_day_info
当日成交金额 cur_trade_amt decimal(18,2) tra_index_day_info
昨日收盘价 last_close_price decimal(8,2) tra_index_day_info
当日开盘价 cur_open_price decimal(8,2) tra_index_day_info
当日最高价 cur_high_price decimal(8,2) tra_index_day_info
当日最低价 cur_low_price decimal(8,2) tra_index_day_info
当日收盘价 cur_close_price decimal(8,2) tra_index_day_info
涨跌幅 rf_range varchar(10) pd_index_sec_info
涨跌额 rf_val decimal(8,2) pd_index_sec_info
振幅 swing_range varchar(10) pd_index_sec_info
量比 volumn_ratio decimal(8,2) pd_index_sec_info
参考代码:
INSERT overwrite TABLE maynor_quot_dw.sum_index_close_dtl_day PARTITION (dt=‘20200704’)
SELECT
a.trade_date
,a.index_code
,a.index_name
,a.cur_trade_vol
,a.cur_trade_amt
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,b.rf_range
,b.rf_val
,b.swing_range
,b.volumn_ratio
FROM
maynor_quot_ods.tra_index_day_info a
INNER JOIN
maynor_quot_ods.pd_index_sec_info b
ON
a.index_code = b.index_code;

查询结果数据:
select * from maynor_quot_dw.sum_index_close_dtl_day limit 5;

5.4.5.深市指数日收盘明细表
1、关联ods层深证指数日交易信息表和指数日流通基本信息表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
深证指数日交易信息表(tra_index_day_info_sz)与指数日流通基本信息(pd_index_sec_info)通过指数代码(index_code)进行关联,组成深市指数日收盘明细表(sum_index_close_dtl_day_sz)。
新表字段如下:
列名 字段 类型 来源表
交易日期 trade_date date tra_index_day_info_sz
指数代码 index_code char(6) tra_index_day_info_sz
指数名称 index_name varchar(20) tra_index_day_info_sz
当日成交量 cur_trade_vol decimal(18,0) tra_index_day_info_sz
当日成交金额 cur_trade_amt decimal(18,2) tra_index_day_info_sz
昨日收盘价 last_close_price decimal(8,2) tra_index_day_info_sz
当日开盘价 cur_open_price decimal(8,2) tra_index_day_info_sz
当日最高价 cur_high_price decimal(8,2) tra_index_day_info_sz
当日最低价 cur_low_price decimal(8,2) tra_index_day_info_sz
当日收盘价 cur_close_price decimal(8,2) tra_index_day_info_sz
涨跌幅 rf_range varchar(10) pd_index_sec_info
涨跌额 rf_val decimal(8,2) pd_index_sec_info
振幅 swing_range varchar(10) pd_index_sec_info
量比 volumn_ratio decimal(8,2) pd_index_sec_info
参考代码:
INSERT overwrite TABLE maynor_quot_dw.sum_index_close_dtl_day_sz PARTITION (dt=‘20200704’)
SELECT
a.trade_date
,a.index_code
,a.index_name
,a.cur_trade_vol
,a.cur_trade_amt
,a.last_close_price
,a.cur_open_price
,a.cur_high_price
,a.cur_low_price
,a.cur_close_price
,b.rf_range
,b.rf_val
,b.swing_range
,b.volumn_ratio
FROM
maynor_quot_ods.tra_index_day_info_sz a
INNER JOIN
maynor_quot_ods.pd_index_sec_info b
ON
a.index_code = b.index_code;

查询结果数据:
select * from maynor_quot_dw.sum_index_close_dtl_day_sz limit 5;

5.5.dw层数据到dm层
5.5.1.个股K线表
1、沪、深两市每日证券收盘明细表关联交易日历表,将数据插入到个股K线表日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
沪市每日证券收盘明细表(sum_stock_close_dtl_day)、深市每日证券收盘明细表(sum_stock_close_dtl_day_sz)和交易日历表(tra_tcc_date)通过交易时间(trade_date)进行关联,组成个股K线表(app_sec_quot_stock_kline)。
过滤条件:1、上证股票代码以600和900开头,深证股票代码以00、200、300开头
2、沪、深两市每日证券收盘明细表T日分区数据
注意事项:查询表字段与数据保存字段是否一致。
新表字段如下:
列名 字段 类型 来源表
更新时间 update_time timestamp
交易时间 trade_date date 沪、深两市每日证券收盘明细表
证券代码 sec_code char(6) 沪、深两市每日证券收盘明细表
证券名称 sec_name varchar(20) 沪、深两市每日证券收盘明细表
k线类型 k_line_type char(1) 日K(1)、周K(2)、月K(3)
前收盘价 pre_close_price decimal(8,2) 沪、深两市每日证券收盘明细表
开盘价 open_price decimal(8,2) 沪、深两市每日证券收盘明细表
最高价 high_price decimal(8,2) 沪、深两市每日证券收盘明细表
最低价 low_price decimal(8,2) 沪、深两市每日证券收盘明细表
收盘价 close_price decimal(8,2) 沪、深两市每日证券收盘明细表
均价 avgprice decimal(8,2) 成交额/成交量
成交量 trade_vol decimal(18,0) 沪、深两市每日证券收盘明细表
成交金额 trade_amt decimal(18,2) 沪、深两市每日证券收盘明细表
参考代码:

–沪市
INSERT INTO TABLE maynor_quot_dm.app_sec_quot_stock_kline PARTITION (dt=‘20200901’)
SELECT
CURRENT_TIMESTAMP,
a.trade_date ,
a.sec_code ,
a.sec_abbr ,
‘1’,
a.last_close_price,
a.cur_open_price ,
a.cur_high_price ,
a.cur_low_price ,
a.cur_close_price,
CASE WHEN a.cur_trade_vol =0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice,
a.cur_trade_vol,
a.cur_trade_amt
FROM
maynor_quot_dw.sum_stock_close_dtl_day a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date = b.trade_date
WHERE
a.sec_code LIKE ‘60%’ OR a.sec_code LIKE ‘90%’
AND a.dt = ‘20200901’ ;

–深市
insert into table maynor_quot_dm.app_sec_quot_stock_kline partition (dt=‘20200901’)
select
current_timestamp,
a.trade_date ,
a.sec_code ,
a.sec_abbr ,
‘1’,
a.last_close_price,
a.cur_open_price ,
a.cur_high_price ,
a.cur_low_price ,
a.cur_close_price,
case when a.cur_trade_vol =0 then 0 else round(a.cur_trade_amt/a.cur_trade_vol,2) end as avgprice,
a.cur_trade_vol,
a.cur_trade_amt
from
maynor_quot_dw.sum_stock_close_dtl_day_sz a
join maynor_quot_ods.tra_tcc_date b
on a.trade_date = b.trade_date
where
a.sec_code like ‘00%’ or a.sec_code like ‘20%’ or a.sec_code like ‘30%’
and a.dt = ‘20200901’ ;

查询结果数据:
select * from maynor_quot_dm.app_sec_quot_stock_kline limit 5;

5.5.2.板块成分股表
1、关联板块个股对应基本信息表、个股K线表,将数据插入到日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
板块个股对应基本信息表(sum_sector_stock_base_info)与个股K线表(app_sec_quot_stock_kline)通过证券代码(sec_code)进行关联,组成板块成分股表(app_sec_sector_stock)。
过滤条件:1、行业板块代码以85开头
2、个股K线表T日分区数据
3、K线类型是日K(1)
新表字段如下:
列名 字段 类型 来源表
更新时间 etl_time timestamp
交易日期 trade_date date sum_sector_stock_base_info
板块代码 sector_code varchar (6) sum_sector_stock_base_info
板块名称 sector_name varchar (20) sum_sector_stock_base_info
板块类型 sector_type char (1) sum_sector_stock_base_info
个股代码 sec_code char (6) sum_sector_stock_base_info
个股流通股本 nego_cap decimal(18,0) sum_sector_stock_base_info
前一交易日个股流通市值 pre_nego_cap decimal(18,2) 收盘价个股流通股本
前一交易日板块总流通市值 pre_sector_nego_cap decimal(18,2) 板块下个股数据汇总
SUM(收盘价
个股流通股本)
证券上市地点简称 sec_abbr varchar (10) sum_sector_stock_base_info
参考代码:
INSERT overwrite TABLE maynor_quot_dm.app_sec_sector_stock PARTITION (dt=‘20200902’)
SELECT
CURRENT_TIMESTAMP AS etl_time
,a.trade_date
,a.sector_code
,a.sector_name
,a.sector_type
,a.sec_code
,a.nego_cap
,a.nego_cape.close_price AS pre_nego_cap
,SUM(a.nego_cap
e.close_price) over(PARTITION BY a.sector_code) AS pre_sector_nego_cap
,a.sec_abbr
FROM maynor_quot_dw.sum_sector_stock_base_info a
JOIN maynor_quot_dm.app_sec_quot_stock_kline e
ON a.sec_code=e.sec_code
AND e.k_line_type=‘1’
WHERE a.sector_code LIKE ‘85%’
AND e.dt= ‘20200902’
;

查询结果数据:
select * from maynor_quot_dm.app_sec_sector_stock limit 5;

5.5.3.指数K线表
1、沪、深两市指数日收盘明细表关联交易日历表,将数据插入到指数K线表日分区: 20200704分区
2、插入数据后使用 hive/beeline确认数据是否正确映射
沪市指数日收盘明细表(sum_index_close_dtl_day)、深市指数日收盘明细表(sum_index_close_dtl_day_sz)和交易日历表(tra_tcc_date)进行关联,组成指数K线表(app_sec_quot_index_kline)。
过滤条件:1、过滤沪市指数代码以00开头,深市指数以39开头的指数行情数据;
2、沪、深两市指数日收盘明细表T日分区数据
注意事项:查询表字段与数据保存字段是否一致。
新表字段如下:
列名 字段 类型 来源表
更新时间 update_time timestamp
交易时间 trade_date date 沪、深两市指数日收盘明细表
指数代码 index_code char(6) 沪、深两市指数日收盘明细表
指数名称 index_name varchar(20) 沪、深两市指数日收盘明细表
k线类型 k_line_type char(1) 日K(1)、周K(2)、月K(3)
前收盘价 pre_close_price decimal(8,2) 沪、深两市指数日收盘明细表
开盘价 open_price decimal(8,2) 沪、深两市指数日收盘明细表
最高价 high_price decimal(8,2) 沪、深两市指数日收盘明细表
最低价 low_price decimal(8,2) 沪、深两市指数日收盘明细表
收盘价 close_price decimal(8,2) 沪、深两市指数日收盘明细表
均价 avgprice decimal(8,2) 成交额/成交量
成交量 trade_vol decimal(18,0) 沪、深两市指数日收盘明细表
成交金额 trade_amt decimal(18,2) 沪、深两市指数日收盘明细表
参考代码:
INSERT overwrite TABLE maynor_quot_dm.app_sec_quot_index_kline PARTITION (dt=‘20200704’)
SELECT
CURRENT_TIMESTAMP AS update_time
,a.trade_date AS trade_date
,a.index_code
,a.index_name
,‘1’ AS k_line_type
,a.last_close_price AS pre_close_price
,a.cur_open_price AS open_price
,a.cur_high_price AS high_price
,a.cur_low_price AS low_price
,a.cur_close_price AS close_price
,CASE WHEN a.cur_trade_vol=0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice
,a.cur_trade_vol AS trade_vol
,a.cur_trade_amt AS trade_amt
FROM maynor_quot_dw.sum_index_close_dtl_day a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date=b.trade_date
WHERE
a.index_code LIKE ‘00%’
AND a.dt = ‘20190905’
UNION ALL
SELECT
CURRENT_TIMESTAMP AS update_time
,a.trade_date AS trade_date
,a.index_code
,a.index_name
,‘1’ AS k_line_type
,a.last_close_price AS pre_close_price
,a.cur_open_price AS open_price
,a.cur_high_price AS high_price
,a.cur_low_price AS low_price
,a.cur_close_price AS close_price
,CASE WHEN a.cur_trade_vol=0 THEN 0 ELSE ROUND(a.cur_trade_amt/a.cur_trade_vol,2) END AS avgprice
,a.cur_trade_vol AS trade_vol
,a.cur_trade_amt AS trade_amt
FROM maynor_quot_dw.sum_index_close_dtl_day_sz a
JOIN maynor_quot_ods.tra_tcc_date b
ON a.trade_date=b.trade_date
WHERE
a.index_code LIKE ‘39%’
and a.dt = ‘20190905’ ;

查询结果数据:
select * from maynor_quot_dm.app_sec_quot_index_kline limit 5;

数据同步

数据同步是将hive表中的数据同步到mysql中,这里数据同步技术组件我们采用的是Sqoop。
Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
6.1.板块成分股
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin

sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_sector_stock
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_sector_stock/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’

6.2.指数K线
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin

sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_quot_index_kline_day
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_quot_index_kline/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’
–num-mappers 2

6.3.个股K线
进入sqoop服务器安装目录:cd /export/servers/sqoop-1.4.6-cdh5.14.0/bin

sqoop export
–connect jdbc:mysql://node01:3306/bdp_quot?characterEncoding=utf-8
–username root
–password 123456
–table bdp_quot_stock_kline_day
–export-dir /user/hive/warehouse/maynor_quot_dm.db/app_sec_quot_stock_kline/dt=20200903
–input-fields-terminated-by ‘,’
–input-null-string ‘\N’
–input-null-non-string ‘\N’
–num-mappers 2


推荐阅读
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 本文详细介绍了cisco路由器IOS损坏时的恢复方法,包括进入ROMMON模式、设置IP地址、子网掩码、默认网关以及使用TFTP服务器传输IOS文件的步骤。 ... [详细]
author-avatar
手机用户2502928203
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有