作者:ik82jht | 来源:互联网 | 2023-01-14 12:02
使用spark读取elasticsearch中的数据,使用es提供的api来进行,sc.esRDD("logstash").values官方网站也是这种方式读取数据的,但是我测试的
使用spark读取elasticsearch中的数据,使用es提供的api来进行,
sc.esRDD("logstash").values
官方网站也是这种方式读取数据的,但是我测试的时候有时候会出现读取数据不完整的情况,比如本来读取的数据是这样的
Map(msg -> 2016-03-18 15:24:14 System_ID=ruijie sp_malware:Client_IP=172.40.1.100 Server_IP=61.4.184.50 URL=app.weather.com.cn/dataService/appManage file_name=web scanner2,virus_name=CIH3 Action=block_log Method=POST, srcip -> 172.40.1.100
但是我读取的数据却是这样的
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:03 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:04 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:04 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:04 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:04 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:05 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:05 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:05 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:05 CST 2017)
Map(@version -> 1, @timestamp -> Wed Jun 28 14:36:05 CST 2017)
看来这种读数据的方式是有问题的,后来就改成读取方式为下面这种方式,加上type
sc.esRDD("logstash/count").values
然后问题解决,