热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于OracleStreamingService的IoT数据实时分析(第三弹)

IoT实时数据分析最后一讲!\x0a数据实时加载至ADW,并基于指数平滑搭建预测模型,最后基于OAC做前

上周我们完成了MQTT Broker到 Streaming Service这部分数据的打通,今天就把数据加载到ADW(Autonomous Data Warehouse)进行预测模型的搭建,以及前端实时分析可视化界面的展示。


  1. kafka-connect-mqtt 部署(已完成)

  2. Streaming Service 部署(已完成)

  3. Oracle Kubernetes Engine 部署(已完成)

  4. OKE中部署Docker(已完成)

  5. Hive MQ 部署(本期进行)

  6. Stream数据加载ADW(本期进行)

  7. ADW数据加工及机器学习模型搭建(本期进行)

  8. OAC前端可视化界面开发(本期进行)


从架构图上来看,就是要实现下面红框这一部分。

好,咱们开始!


Hive MQ 部署

本步骤中我们搭建Hive MQ服务来模拟客户生产环境,代替客户的MQTTBroker进行数据的实时分发


1. 部署 Hive MQ

如下图,需要在Websockets Client中生成,Connection。然后单击Add New Topic Subscription生成“mqtt-iot-topic” Topic。


2. 配置 Kafka Connect

# Make a Directory
mkdir -p connect-cli
cd connect-cli


# Download the CLI
wget https://github.com/lensesio/kafka-connect-tools/releases/download/v1.0.6/connect-cli


# Prepare Environment
# Load-Balancer-Public-IP:
kubectl get svc


# 拷贝上述 EXTERNAL-IP
130.61.197.91
export KAFKA_CONNECT_REST="http://"


3. 创建Connect Config

创建mqtt-source.propeties 文件,并把如下代码复制进去

name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=3
topics=iot-topic-stream
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.kcql=INSERT INTO iot-topic-stream SELECT * FROM mqtt-iot-topic WITHCOnVERTER=`com.datamountaineer.streamreactor.connect
connect.mqtt.keep.alive=1000
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://broker.hivemq.com:1883
connect.mqtt.service.quality=1
connect.progress.enabled=true


设置环境

# connect-cli 权限
chmod 755 connect-cli


# 安装 Java
sudo yum install jre


创建Connector,并确认状态

./connect-cli create mqtt-source
./connect-cli ps


# mqtt-source 状态确认
./connect-cli status mqtt-source


connectorState: RUNNING
workerId: 10.244.0.12:8083
numberOfTasks: 1
tasks:
- taskId: 0
taskState: RUNNING
workerId: 10.244.0.12:8083


# 删除命令(必要时进行删除)
./connect-cli rm mqtt-source


4. MQTT Data Generator

安装PIP

sudo easy_install pip


安装 paho-mqtt

sudo pip install paho-mqtt


# MQTT-Generator 代码下载
# git 未安装时需要进行安装> sudo yum install -y git
git clone https://github.com/ufthelp/MQTT-Generator.git


# 确认 config
cat config.json


修改Config文件


生成数据

python mqttgen.py 10 1 1


在Hive MQ中确认数据


在Streaming Pool中确认实时数据。如下图,数据已成功写进Streaming中


Streaming数据加载到ADW

1. 上传ADW Wallet到 Connector Container

下载ADW Wallet

路径:OCI Menu → Autonomous Database → 选择Database  → DB Connection,选择下载DB Wallet


利用SCP本机的Wallet文件上传到VM中

scp Wallet_test.zip opc@140.238.14.154:~


把ADW Wallet Zip文件解压缩,放到/oracle_credentials_wallet目录中

sudo su
wallet_unzipped_folder=/oracle_credentials_wallet
mkdir -p $wallet_unzipped_folder
unzip -u /Wallet_test.zip -d $wallet_unzipped_folder


把Wallet从Oracle VM,上传到Kafka Connect nodes

docker cp oracle_credentials_wallet 0a7e6a8b8bcc:/oracle_credentials_wallet


执行Container,确认Wallet上传情况

docker start 0a7e6a8b8bcc
docker exec -it 0a7e6a8b8bcc /bin/bash


2. 安装Kafka jdbc-connercotor

执行Container,确认jdbc-connercotor安装情况

docker start 0a7e6a8b8bcc
docker exec -it 0a7e6a8b8bcc /bin/bash

确认 /usr/share/java/kafka-connect-jdbc 目录,发现已安装


如果未安装jdbc-connercoter时,需要安装

confluent-hub install confluentinc/kafka-connect-jdbc:5.4.0


3. 安装ODBC Driver

如下下载文件后,复制到 /usr/share/java/kafka-connect-jdbc 目录

wget https://objectstorage.us-phoenix-1.oraclecloud.com/n/intmahesht/b/oracledrivers/o/ojdbc8-full.tar.gz
tar xvzf ./ojdbc8-full.tar.gz
cp ojdbc8-full/ojdbc8.jar /usr/share/java/kafka-connect-jdbc
rm -rf ojdbc8-full
rm -rf ojdbc8-full.tar.g


4. Docker Image上传

执行如下代码,把刚刚修改的Docker文件,重新上传的到之前的Oracle Registry中,版本设定为0.2

docker login icn.ocir.io
username:frvvnnkmcble/oracleidentitycloudservice/alvin.jin@oracle.com
password(Authtoken):xZ#89)pl{Mt5#gw


docker commit 0a7e6a8b8bcc icn.ocir.io/frvvnnkmcble/kafka-connect-mqtt/kafka-connect:0.2
docker push icn.ocir.io/frvvnnkmcble/kafka-connect-mqtt/kafka-connect:0.2


connect-cli 目录中生成 jdbc-adw.properties文件,并把如下代码拷贝进去

name=jdbc-adw
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=iot-topic-stream
connection.url=jdbc:oracle:thin:@DBkafka_HIGH?TNS_ADMIN=/oracle_credentials_wallet
connection.user=ADMIN
connection.password=QWERasdf1234
auto.create=true


参照4. OKE中部署Docker步骤,删除Pod后,基于刚刚生成的0.2版本Image,重新部署Docker


ADW数据加工及机器学习模型搭建

在ADW中可以对IoT数据进行多维度的数据分析,本步骤中主要介绍如何利用指数平滑法进行时间序列预测


1. 登陆OML Notebook

路径:OCI Menu → Autonomous Database → 选择Database → Service Console


Development中选择 ML SQL Notebook


选择Notebook


创建Notebook


2. 预测模型


创建训练数据集

%script


-- Drop Previous Training Table
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE ML_AIRDATA';
EXCEPTION
WHEN OTHERS THEN NULL;
END;
/


create table ML_AIRDATA
as select a.sn, a.radon, a.co2, a.dust10, a.temperature, a.humidity, a.air_station_time, substr(a.air_station_time,1,15) time_id, b.hospital_no from admin.air_station a left join admin.air_station_master b on a.sn = b.sn;


select * from ml_airdata;


创建模型及配置模型参数

%script


-- Cleanup old settings table
BEGIN EXECUTE IMMEDIATE 'DROP TABLE esm_radon_settings';
EXCEPTION WHEN OTHERS THEN NULL; END;
/


-- Cleanup old model with the same name
BEGIN DBMS_DATA_MINING.DROP_MODEL('ESM_RADON_SAMPLE');
EXCEPTION WHEN OTHERS THEN NULL; END;
/


-- Create input time series
CREATE OR REPLACE VIEW ESM_RADON_DATA
AS SELECT AIR_STATION_TIME, RADON
FROM ML_AIRDATA where sn = 'FS301S000300';


CREATE TABLE ESM_RADON_SETTINGS(SETTING_NAME VARCHAR2(30),
SETTING_VALUE VARCHAR2(128));
BEGIN
-- Select ESM as the algorithm
INSERT INTO ESM_RADON_SETTINGS
VALUES(DBMS_DATA_MINING.ALGO_NAME,
DBMS_DATA_MINING.ALGO_EXPONENTIAL_SMOOTHING);
-- Set accumulation interval to be minute
INSERT INTO ESM_RADON_SETTINGS
VALUES(DBMS_DATA_MINING.EXSM_INTERVAL,
DBMS_DATA_MINING.EXSM_INTERVAL_MIN);
-- Set prediction step to be 30 minutes (half hour)
INSERT INTO ESM_RADON_SETTINGS
VALUES(DBMS_DATA_MINING.EXSM_PREDICTION_STEP,'30');
-- Set ESM model to be Holt-Winters
INSERT INTO ESM_RADON_SETTINGS
VALUES(DBMS_DATA_MINING.EXSM_MODEL,
DBMS_DATA_MINING.EXSM_HW);
-- Set seasonal cycle to be 4
INSERT INTO ESM_RADON_SETTINGS
VALUES(DBMS_DATA_MINING.EXSM_SEASONALITY,'4');
END;
/


-- Build the Exponential Smotthing (ESM) model
BEGIN
DBMS_DATA_MINING.CREATE_MODEL(MODEL_NAME => 'ESM_RADON_SAMPLE',
MINING_FUNCTION => 'TIME_SERIES',
DATA_TABLE_NAME => 'ESM_RADON_DATA',
CASE_ID_COLUMN_NAME => 'AIR_STATION_TIME',
TARGET_COLUMN_NAME => 'RADON',
SETTINGS_TABLE_NAME => 'ESM_RADON_SETTINGS');
END;
/


模型参数确认

%sql


-- output setting table
SELECT SETTING_NAME, SETTING_VALUE
FROM USER_MINING_MODEL_SETTINGS
WHERE MODEL_NAME = UPPER('ESM_RADON_SAMPLE')
ORDER BY SETTING_NAME;


模型质量确认

%sql


-- get global diagnostics
SELECT NAME,
NUMERIC_VALUE,
STRING_VALUE
FROM DM$VGESM_RADON_SAMPLE
ORDER BY NAME;


预测结果确认

%sql




SELECT * FROM JETT.DM$VPESM_RADON_SAMPLE;


OAC前端可视化界面开发

PC端


移动端


时间序列预测结果


好了,到这里IoT实时数据分析做完了!

基于云端的实时分析方案,从架构到部署,涉及的内容比较多,我们来总结下!

  1. 首先我们在k8s集群上搭建了kafka-connect-mqtt环境,将基于C环境的IoT数据实时接入Streaming Service(流处理)。

  2. 然后把数据实时写入ADW(Autonomouse Data Warehouse),并进行机器学习预测。

  3. 最后在OAC(Oracle Analytics Cloud)上搭建了实时可视化界面。



推荐阅读
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • Windows7 64位系统安装PLSQL Developer的步骤和注意事项
    本文介绍了在Windows7 64位系统上安装PLSQL Developer的步骤和注意事项。首先下载并安装PLSQL Developer,注意不要安装在默认目录下。然后下载Windows 32位的oracle instant client,并解压到指定路径。最后,按照自己的喜好对解压后的文件进行命名和压缩。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • GPT-3发布,动动手指就能自动生成代码的神器来了!
    近日,OpenAI发布了最新的NLP模型GPT-3,该模型在GitHub趋势榜上名列前茅。GPT-3使用的数据集容量达到45TB,参数个数高达1750亿,训练好的模型需要700G的硬盘空间来存储。一位开发者根据GPT-3模型上线了一个名为debuid的网站,用户只需用英语描述需求,前端代码就能自动生成。这个神奇的功能让许多程序员感到惊讶。去年,OpenAI在与世界冠军OG战队的表演赛中展示了他们的强化学习模型,在限定条件下以2:0完胜人类冠军。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • Jboss的EJB部署描述符standardjaws.xml配置步骤详解
    本文详细介绍了Jboss的EJB部署描述符standardjaws.xml的配置步骤,包括映射CMP实体EJB、数据源连接池的获取以及数据库配置等内容。 ... [详细]
  • Jmeter对RabbitMQ压力测试
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Jmeter对RabbitMQ压力测试相关的知识,希望对你有一定的参考价值。Jm ... [详细]
  • 【论文】ICLR 2020 九篇满分论文!!!
    点击上方,选择星标或置顶,每天给你送干货!阅读大概需要11分钟跟随小博主,每天进步一丢丢来自:深度学习技术前沿 ... [详细]
author-avatar
尚福惠珠綺裕
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有