1 Log statistics analysis
The ingested data is log records:
- Offline: Flume ==> HDFS
- Real-time: Kafka ==> stream processing engine ==> ES ==> Kibana
Project features
1) Count the traffic generated by each domain within one minute
Flink consumes the data from Kafka and processes it
2) Count the traffic generated by each user within one minute
There is a mapping between domains and users
Flink consumes the data from Kafka + Flink reads the domain-to-user configuration data, then processes the two together
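The second feature is essentially a join of per-domain traffic against the domain-to-user configuration. As a minimal sketch of that join in plain Java (outside Flink; the map contents in the usage below are made-up illustration values, not real configuration):

```java
import java.util.HashMap;
import java.util.Map;

public class UserTrafficJoin {

    // Roll per-domain traffic up to per-user traffic using a
    // domain -> user configuration map (a join on the domain key).
    public static Map<String, Long> byUser(Map<String, Long> byDomain,
                                           Map<String, String> domainToUser) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : byDomain.entrySet()) {
            String user = domainToUser.get(e.getKey());
            if (user == null) {
                continue; // domain has no configured owner; skip it
            }
            result.merge(user, e.getValue(), Long::sum);
        }
        return result;
    }
}
```

In the Flink job this lookup would typically run inside the stream operator (for example by loading the configuration into operator state), but the join logic itself is the same.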
Sample raw CDN access log line:
aliyun CN A E [17/Jul/2018:17:07:50 +0800] 2 223.104.18.110 - 112.29.213.35:80 0 v2.go2yd.com GET http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 17168 16384 -:0 0 0 - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
2 Mock data
package com.imooc.flink.java.project;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class MyKafkaProducer {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        String topic = "tzbtest";

        // Emit one tab-separated mock record every 2 seconds:
        // site \t country \t level \t time \t ip \t domain \t traffic
        while (true) {
            StringBuilder builder = new StringBuilder();
            builder.append("icu996.com").append("\t")
                    .append("CN").append("\t")
                    .append(getLevels()).append("\t")
                    .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())).append("\t")
                    .append(getIps()).append("\t")
                    .append(getDomains()).append("\t")
                    .append(getTraffic()).append("\t");
            System.out.println(builder.toString());
            producer.send(new ProducerRecord<String, String>(topic, builder.toString()));
            Thread.sleep(2000);
        }
    }

    // Random traffic value in [0, 10000)
    private static long getTraffic() {
        return new Random().nextInt(10000);
    }

    // Pick a random domain
    private static String getDomains() {
        String[] domains = new String[]{"v1.go2yd.com", "v2.go2yd.com", "v3.go2yd.com", "v4.go2yd.com", "vmi.go2yd.com"};
        return domains[new Random().nextInt(domains.length)];
    }

    // Pick a random client IP
    private static String getIps() {
        String[] ips = new String[]{"223.104.18.110", "113.101.75.194", "27.17.127.135", "183.225.139.16", "112.1.66.34", "175.148.211.190", "183.227.58.21", "59.83.198.84", "117.28.38.28", "117.59.39.169"};
        return ips[new Random().nextInt(ips.length)];
    }

    // Pick a random log level
    public static String getLevels() {
        String[] levels = new String[]{"M", "E"};
        return levels[new Random().nextInt(levels.length)];
    }
}
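On the consuming side (what the Flink job does per record), each mock record is a tab-separated line with the domain in field index 5 and the traffic in field index 6. A minimal sketch of the per-domain traffic aggregation over a batch of such records, in plain Java without the Flink one-minute windowing:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DomainTrafficAggregator {

    // Sum traffic per domain over records in the producer's format:
    // site \t country \t level \t time \t ip \t domain \t traffic \t
    public static Map<String, Long> aggregate(List<String> records) {
        Map<String, Long> result = new HashMap<>();
        for (String record : records) {
            String[] fields = record.split("\t");
            if (fields.length < 7) {
                continue; // malformed record, skip it
            }
            String domain = fields[5];
            long traffic = Long.parseLong(fields[6]);
            result.merge(domain, traffic, Long::sum);
        }
        return result;
    }
}
```

In the actual job, Flink would apply this same sum per key inside a one-minute window (e.g. keyBy on the domain followed by a window aggregation) rather than over an in-memory list.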
2.1 Testing the producer
- Start ZooKeeper on all nodes
bin/zkServer.sh start
- Start Kafka
bin/kafka-server-start.sh config/server.properties &
- List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
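If tzbtest does not appear in the list yet, it has to be created first. The partition and replication values below are single-node defaults chosen for this sketch, not taken from the original notes:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic tzbtest
```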
- Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic tzbtest
- Run the producer
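The notes do not show how the producer class is launched; assuming the project is a standard Maven project with the Kafka client dependency declared, one way to run it from the project root is:

```shell
mvn -q compile exec:java \
  -Dexec.mainClass="com.imooc.flink.java.project.MyKafkaProducer"
```

Once it is running, each generated record should appear both on the producer's stdout and in the console consumer started above.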