热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

实时流处理项目具体步骤

一、产生实时数据(使用python编写),为以后的处理打基础构建:编写python脚本,用crontab定时执行编写python脚本文件,行内容包括time,ip,url,stat

一、产生实时数据(使用python编写),为以后的处理打基础


构建:编写python脚本,用crontab定时执行



  1. 编写python脚本文件,行内容包括time,ip,url,statu_code,referer的数据

    generate_log.py

    #coding=UTF-8
    # 导入random函数
    import random
    import time
    # 定义url字段
    url_paths = [
    "class/301.html",
    "class/215.html",
    "class/172.html",
    "class/153.html",
    "class/322.html",
    "class/272.html",
    "learn/1102",
    "course/list"
    ]
    # 定义ip字段
    ip_slices = [132,134,10,29,167,198,55,63,72,98,22,25]
    # 引流网站
    http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "https://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}"
    ]
    # 查看的key-value
    search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
    ]
    # 状态码
    status_codes = ["200", "404", "500"]
    # 随机生产url
    def sample_url():
    return random.sample(url_paths, 1)[0]
    # 随机生成ip
    def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])
    # 随机生成引流网站,没有就是 -
    def sample_referer():
    if random.uniform(0, 1) > 0.2:
    return "-"
    refer_str = random.sample(http_referers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])
    # 随机生成状态码
    def sample_status_code():
    return random.sample(status_codes, 1)[0]
    # 将url与ip进行连接,生成查询日志
    def generate_log(count = 10):
    # 生成时间,需要导入time函数
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/Users/hadoop/Desktop/data/logs/access.log", "w+")
    while count >= 1:
    query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\t{status_code}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(),status_code=sample_status_code(),local_time=time_str)
    print query_log
    f.write(query_log + "\n")
    count = count - 1
    # 主函数
    # 保存后在Terminal中运行 python generate_log.py
    if __name__ == '__main__':
    generate_log(200)


  2. 定时执行脚本产生日志:一分钟一次

    crontab -e
    */1 * * * * /Users/hadoop/Desktop/data/log_generator.sh

    Notes: crontab工具使用说明
    linux crontab
    网站:https://tool.lu/crontab/
    每一分钟执行一次的crontab表达式: */1 * * * *

    Linux
    * * * * * *
    - - - - - -
    | | | | | |
    | | | | | + year [optional]
    | | | | +----- day of week (0 - 7) (Sunday=0 or 7)
    | | | +---------- month (1 - 12)
    | | +--------------- day of month (1 - 31)
    | +-------------------- hour (0 - 23)
    +------------------------- min (0 - 59)







二 、对接python日志产生器输出的日志到Flume

Streaming_project.conf

exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
exec-memory-logger.channels.memory-channel.type = memory
exec-memory-logger.sinks.logger-sink.type = logger
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel

启动flume

flume-ng agent --conf $FLUME_HOME/conf --conf-file /Users/hadoop/Desktop/data/streaming.conf --name exec-memory-logger -Dflume.root.logger=INFO,console





三 、Flume对接Kafka


日志 ==> flume ==> kafka


  1. 启动zk: zkServer.sh start

  2. 启动Kafka Server: ./kafka-server-start.sh -daemon /Users/hadoop/app/kafka/config/server.properties &

  3. 创建topic ./kafka-topics.sh --create --zookeeper localhost:2181 --repalication-factor 1 --partitions 1 --topic streaming_log_py

  4. 查看topic ./kafka-topics.sh --list --zookeeper localhost:2181

  5. 启动kafka消费者 ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic streaming_log_py

  6. 修改Flume配置文件使得flume sink数据到kafka


streaming_kafka.conf

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.channels.memory-channel.transactiOnCapacity= 10000
exec-memory-kafka.channels.memory-channel.byteCapacityBufferPercentage = 20
exec-memory-kafka.channels.memory-channel.byteCapacity = 800000
exec-memory-kafka.channels.memory-channel.keep-alive = 60
exec-memory-kafka.channels.memory-channel.capacity = 1000000
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.topic = streaming_log_py
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

启动flume:

flume-ng agent --conf $FLUME_HOME/conf --conf-file /Users/hadoop/Desktop/data/streaming_kafka.conf --name exec-memory-kafka -Dflume.root.logger=INFO,console

flume正常启动后,查看crontab(启动),去kafka消费者控制台,查看是否有预计信息输出.
Notes: 出现了数据的丢失
修改channel:

agent1.channels.memoryChannel.byteCapacity = 800000
agent1.channels.memoryChannel.keep-alive = 60
agent1.channels.memoryChannel.capacity = 1000000

修改java最大内存大小: ($FLUME_HOME/)目录下
vi bin/flume-ng

JAVA_OPTS="-Xmx1024m"

解决方法,参照博客: https://www.cnblogs.com/zlslch/p/7253943.html
具体字段信息,可以flume官网的文档中查询 Memory Channel

项目实战

打通Flume&Kafka&Spark Streaming线路


  • 在Spark应用程序接收到数据并完成记录数统计 kafka采用的是receive模式,这是一种已经废弃的模式,要用direct模式

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 测试Kafka对接Spark Streaming
*/
object ImoocStatStreamingApp {
def main(args: Array[String]): Unit = {
if (args.length != 4) {
System.err.println("Usage: ")
System.exit(1)
}
val Array(zkQuorum, groupId, topics,numThreads) = args
val sparkcOnf= new SparkConf()
.setAppName("ImoocStatStreamingApp")
.setMaster("local[2]")
val ssc = new StreamingContext(sparkconf, Seconds(60))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
// 测试步骤一: 测试数据接收
message.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()
}
}


推荐阅读
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • iOS超签签名服务器搭建及其优劣势
    本文介绍了搭建iOS超签签名服务器的原因和优势,包括不掉签、用户可以直接安装不需要信任、体验好等。同时也提到了超签的劣势,即一个证书只能安装100个,成本较高。文章还详细介绍了超签的实现原理,包括用户请求服务器安装mobileconfig文件、服务器调用苹果接口添加udid等步骤。最后,还提到了生成mobileconfig文件和导出AppleWorldwideDeveloperRelationsCertificationAuthority证书的方法。 ... [详细]
  • YOLOv7基于自己的数据集从零构建模型完整训练、推理计算超详细教程
    本文介绍了关于人工智能、神经网络和深度学习的知识点,并提供了YOLOv7基于自己的数据集从零构建模型完整训练、推理计算的详细教程。文章还提到了郑州最低生活保障的话题。对于从事目标检测任务的人来说,YOLO是一个熟悉的模型。文章还提到了yolov4和yolov6的相关内容,以及选择模型的优化思路。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 移动端常用单位——rem的使用方法和注意事项
    本文介绍了移动端常用的单位rem的使用方法和注意事项,包括px、%、em、vw、vh等其他常用单位的比较。同时还介绍了如何通过JS获取视口宽度并动态调整rem的值,以适应不同设备的屏幕大小。此外,还提到了rem目前在移动端的主流地位。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
author-avatar
拐久了_618
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有