Author: 拐久了_618 | Source: Internet | 2023-09-18 21:35
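1. Generate real-time data (written in Python) as the foundation for later processing
Approach: write a Python script and run it on a schedule with crontab.
Each line the script writes contains the time, ip, url, status_code and referer fields.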
generate_log.py
#coding=UTF-8
# Import the random and time modules
import random
import time

# Candidate URL paths
url_paths = [
    "class/301.html",
    "class/215.html",
    "class/172.html",
    "class/153.html",
    "class/322.html",
    "class/272.html",
    "learn/1102",
    "course/list"
]

# Numbers used to build random IP addresses
ip_slices = [132, 134, 10, 29, 167, 198, 55, 63, 72, 98, 22, 25]

# Referring search engines
http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "https://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}"
]

# Search keywords substituted into the referer URLs
search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
]

# HTTP status codes
status_codes = ["200", "404", "500"]

# Pick a random URL path
def sample_url():
    return random.choice(url_paths)

# Build a random IP address from four distinct slices
def sample_ip():
    parts = random.sample(ip_slices, 4)
    return ".".join(str(item) for item in parts)

# Pick a random referer; about 80% of lines have none and get "-"
def sample_referer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.choice(http_referers)
    query_str = random.choice(search_keyword)
    return refer_str.format(query=query_str)

# Pick a random status code
def sample_status_code():
    return random.choice(status_codes)

# Combine the random fields into `count` tab-separated access-log lines
def generate_log(count=10):
    # One timestamp shared by the whole batch (requires the time module)
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # "w+" overwrites the previous batch; `with` guarantees the file is closed
    with open("/Users/hadoop/Desktop/data/logs/access.log", "w+", encoding="utf-8") as f:
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
                url=sample_url(),
                ip=sample_ip(),
                referer=sample_referer(),
                status_code=sample_status_code(),
                local_time=time_str)
            print(query_log)
            f.write(query_log + "\n")
            count -= 1

# Entry point: after saving, run `python generate_log.py` in a terminal
if __name__ == '__main__':
    generate_log(200)
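A quick way to check that each generated line has the layout the later stages expect is to parse it back into fields. The sketch below assumes the tab-separated order written by generate_log() (ip, time, request, status code, referer); the helper name parse_log_line and the sample line are hypothetical, not part of the original project.

```python
from typing import Dict

# Assumed field order, matching what generate_log() writes
FIELDS = ("ip", "time", "request", "status_code", "referer")


def parse_log_line(line: str) -> Dict[str, str]:
    """Split one tab-separated access-log line into a dict (hypothetical helper)."""
    parts = line.rstrip("\n").split("\t")
    if len(parts) != len(FIELDS):
        raise ValueError("expected %d fields, got %d" % (len(FIELDS), len(parts)))
    record = dict(zip(FIELDS, parts))
    # The request looks like "GET /class/301.html HTTP/1.1", possibly wrapped in quotes
    method, path, _protocol = record["request"].strip('"').split(" ")
    record["method"] = method
    record["url"] = path.lstrip("/")
    return record


sample = '132.10.29.167\t2023-09-18 21:35:00\t"GET /class/301.html HTTP/1.1"\t200\t-'
rec = parse_log_line(sample)
print(rec["url"], rec["status_code"])
```

Raising on a wrong field count makes malformed lines fail loudly instead of silently shifting columns.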
Run the script on a schedule to generate logs, once a minute:
crontab -e
*/1 * * * * /Users/hadoop/Desktop/data/log_generator.sh
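Notes: crontab usage
Linux crontab
Online tool: https://tool.lu/crontab/
crontab expression for running once every minute: */1 * * * *
Standard Linux crontab uses only the first five fields; the optional year field is supported only by some tools.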
Linux
* * * * * *
- - - - - -
| | | | | |
| | | | | + year [optional]
| | | | +----- day of week (0 - 7) (Sunday=0 or 7)
| | | +---------- month (1 - 12)
| | +--------------- day of month (1 - 31)
| +-------------------- hour (0 - 23)
+------------------------- min (0 - 59)
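To make the field semantics concrete, here is a simplified matcher for a five-field crontab expression. It is an illustrative sketch only: cron_matches and field_matches are hypothetical names, and it deliberately ignores month/weekday names, range steps such as 1-30/5, 7 as Sunday, and the rule where real cron ORs day-of-month and day-of-week when both are restricted.

```python
from datetime import datetime

# Lowest legal value of each standard field: minute, hour, day of month, month, day of week
FIELD_MINIMUMS = (0, 0, 1, 1, 0)


def field_matches(expr: str, value: int, minimum: int) -> bool:
    """Check one cron field; supports '*', '*/n', 'a-b', 'a,b,c' and plain numbers."""
    for part in expr.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # A step counts from the field's lowest value: */1 = every value, */5 = every 5th
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression: str, when: datetime) -> bool:
    """Simplified check of a 5-field crontab expression (all fields AND-ed)."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields")
    # cron counts Sunday as 0; Python's weekday() counts Monday as 0
    values = (when.minute, when.hour, when.day, when.month, (when.weekday() + 1) % 7)
    return all(field_matches(f, v, m) for f, v, m in zip(fields, values, FIELD_MINIMUMS))


print(cron_matches("*/1 * * * *", datetime(2023, 9, 18, 21, 35)))  # True
```

With */1 in the minute field and * elsewhere, every minute matches, which is why the job above fires once a minute.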
2. Feed the logs produced by the Python generator into Flume
streaming.conf
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel
exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c
exec-memory-logger.channels.memory-channel.type = memory
exec-memory-logger.sinks.logger-sink.type = logger
exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
Start Flume:
flume-ng agent --conf $FLUME_HOME/conf --conf-file /Users/hadoop/Desktop/data/streaming.conf --name exec-memory-logger -Dflume.root.logger=INFO,console
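The exec source simply runs tail -F and turns each output line into an event. As a rough illustration of that "follow the file" idea (not of Flume's internals), the hypothetical helper below returns only the complete lines appended since a saved byte offset, and starts over from offset 0 if the file has become shorter, for example because generate_log.py reopened it with "w+".

```python
import os
from typing import List, Tuple


def read_new_lines(path: str, offset: int) -> Tuple[List[str], int]:
    """Return (complete lines appended since `offset`, new offset). Hypothetical helper."""
    size = os.path.getsize(path)
    if size < offset:
        # File shrank (truncated or rewritten): start reading from the beginning again
        offset = 0
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # Only consume up to the last newline; a trailing partial line waits for the next call
    end = data.rfind(b"\n") + 1
    lines = data[:end].decode("utf-8").splitlines()
    return lines, offset + end
```

Calling it in a loop with the returned offset yields each new line once, which is roughly what the exec source receives from tail -F.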
3. Connect Flume to Kafka
log ==> Flume ==> Kafka
- Start ZooKeeper: zkServer.sh start
- Start the Kafka server: ./kafka-server-start.sh -daemon /Users/hadoop/app/kafka/config/server.properties
- Create the topic: ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streaming_log_py
- List topics: ./kafka-topics.sh --list --zookeeper localhost:2181
- Start a Kafka console consumer: ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic streaming_log_py (newer Kafka versions use --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181)
- Modify the Flume configuration so that the Flume sink writes the data to Kafka
streaming_kafka.conf
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /Users/hadoop/Desktop/data/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.channels.memory-channel.transactionCapacity = 10000
exec-memory-kafka.channels.memory-channel.byteCapacityBufferPercentage = 20
exec-memory-kafka.channels.memory-channel.byteCapacity = 800000
exec-memory-kafka.channels.memory-channel.keep-alive = 60
exec-memory-kafka.channels.memory-channel.capacity = 1000000
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.topic = streaming_log_py
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
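Data only flows when each source lists its channels with .channels and each sink names its channel with .channel, so a small script can catch wiring typos before the agent is started. This is a sketch under simple assumptions: parse_properties and check_wiring are hypothetical helpers, they only understand plain key = value lines, and they check wiring and the presence of a type, not whether each property name is valid (a misspelled key such as transactiOnCapacity would still pass).

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_wiring(text: str, agent: str) -> List[str]:
    """Return problems in how `agent`'s sources, sinks and channels are wired."""
    props = parse_properties(text)

    def names(kind: str) -> List[str]:
        return props.get("%s.%s" % (agent, kind), "").split()

    problems = []
    channels = set(names("channels"))
    # Every declared component needs a type
    for kind in ("sources", "sinks", "channels"):
        for name in names(kind):
            if "%s.%s.%s.type" % (agent, kind, name) not in props:
                problems.append("%s %s has no type" % (kind, name))
    # Sources use the plural key `.channels` (space-separated list)
    for src in names("sources"):
        wired = props.get("%s.sources.%s.channels" % (agent, src), "").split()
        if not wired:
            problems.append("source %s is not bound to any channel" % src)
        problems += ["source %s uses undeclared channel %s" % (src, ch)
                     for ch in wired if ch not in channels]
    # Sinks use the singular key `.channel`
    for sink in names("sinks"):
        ch = props.get("%s.sinks.%s.channel" % (agent, sink))
        if ch not in channels:
            problems.append("sink %s uses undeclared channel %s" % (sink, ch))
    return problems
```

Running check_wiring on the contents of streaming_kafka.conf with agent name exec-memory-kafka should return an empty list.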
Start Flume:
flume-ng agent --conf $FLUME_HOME/conf --conf-file /Users/hadoop/Desktop/data/streaming_kafka.conf --name exec-memory-kafka -Dflume.root.logger=INFO,console
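Once Flume has started normally, make sure the crontab job is running, then watch the Kafka console consumer to see whether the expected log lines are printed.
Notes: some data was lost.
Fix: enlarge the memory channel (these lines use a generic agent1/memoryChannel example; the same values already appear in streaming_kafka.conf above):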
agent1.channels.memoryChannel.byteCapacity = 800000
agent1.channels.memoryChannel.keep-alive = 60
agent1.channels.memoryChannel.capacity = 1000000
Increase the maximum Java heap size by editing bin/flume-ng in the $FLUME_HOME directory:
vi bin/flume-ng
JAVA_OPTS="-Xmx1024m"
For the fix, see this blog post: https://www.cnblogs.com/zlslch/p/7253943.html
For the meaning of each field, see the Memory Channel section of the official Flume documentation.
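The data-loss note can be pictured with a toy model of a bounded memory channel. The Flume documentation warns that with an asynchronous source such as the exec source, events that fail to be put into the channel can be lost. The sketch below is a hypothetical simulation, not Flume's implementation: capacity limits how many events the channel holds, transaction_capacity limits the size of one put or take, and a source that does not retry drops the whole batch when it does not fit.

```python
from collections import deque
from typing import List


class MemoryChannelSketch:
    """Toy bounded channel: `capacity` events held, `transaction_capacity` per batch."""

    def __init__(self, capacity: int, transaction_capacity: int):
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.events = deque()

    def put_batch(self, batch: List[str]) -> bool:
        """All-or-nothing put; returns False instead of raising when the batch does not fit."""
        if len(batch) > self.transaction_capacity:
            return False
        if len(self.events) + len(batch) > self.capacity:
            return False
        self.events.extend(batch)
        return True

    def take_batch(self, n: int) -> List[str]:
        """Remove up to n events, limited by transaction_capacity."""
        n = min(n, self.transaction_capacity, len(self.events))
        return [self.events.popleft() for _ in range(n)]


def simulate(capacity: int, tx_capacity: int, source_batch: int, sink_batch: int, rounds: int) -> int:
    """Source puts `source_batch` events per round without retrying; the sink drains
    `sink_batch` per round. Returns the number of events lost."""
    channel = MemoryChannelSketch(capacity, tx_capacity)
    lost = 0
    for r in range(rounds):
        batch = ["event-%d-%d" % (r, i) for i in range(source_batch)]
        if not channel.put_batch(batch):
            lost += len(batch)
        channel.take_batch(sink_batch)
    return lost


print(simulate(10, 10, 5, 2, 10))   # sink slower than source, small channel: 25 lost
print(simulate(100, 10, 5, 2, 10))  # same rates, larger capacity: 0 lost in this short run
```

In this toy setting a larger capacity only buys time while the sink is slower than the source; a batch larger than transaction_capacity is rejected no matter how much room is left.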
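Hands-on project
Connecting Flume, Kafka and Spark Streaming end to end
- Receive the data in a Spark application and count the records. The code below consumes Kafka in receiver mode (KafkaUtils.createStream), which is deprecated; direct mode (KafkaUtils.createDirectStream) should be used instead.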
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Test the Kafka to Spark Streaming connection (receiver mode)
 */
object ImoocStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: ImoocStatStreamingApp <zkQuorum> <groupId> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, groupId, topics, numThreads) = args
    // local[2]: at least two threads, one for the receiver and one for processing
    val sparkConf = new SparkConf()
      .setAppName("ImoocStatStreamingApp")
      .setMaster("local[2]")
    // One batch every 60 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Map each topic to the number of receiver threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val message = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
    // Test step 1: verify that data arrives by printing the record count of each batch
    message.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
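As a plain-Python illustration of what message.map(_._2).count().print() reports, the hypothetical helper below buckets (arrival_time_seconds, value) pairs into fixed batches and counts each one, including 0 for an empty interval between busy ones, since Spark's count() also prints 0 for an empty batch. Batch boundaries at multiples of batch_seconds are a simplifying assumption.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_per_batch(records: Iterable[Tuple[float, str]],
                    batch_seconds: int = 60) -> List[Tuple[int, int]]:
    """Return (batch_start_seconds, record_count) for every batch from first to last."""
    starts = [int(ts // batch_seconds) * batch_seconds for ts, _value in records]
    if not starts:
        return []
    counts = Counter(starts)
    # Fill empty batches between the first and last one with 0
    return [(s, counts.get(s, 0))
            for s in range(min(starts), max(starts) + 1, batch_seconds)]


print(count_per_batch([(0, "a"), (10, "b"), (59.9, "c"), (61, "d"), (185, "e")]))
# [(0, 3), (60, 1), (120, 0), (180, 1)]
```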