热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据反爬日记01

篇首语:本文由编程笔记#小编为大家整理,主要介绍了大数据反爬日记01相关的知识,希望对你有一定的参考价值。 大数

篇首语:本文由编程笔记#小编为大家整理,主要介绍了大数据反爬日记01相关的知识,希望对你有一定的参考价值。






大数据反爬日记01

记录自己的反爬日记



既然要做反爬,就肯定得有有爬虫来爬取页面,这里前面已经写好了一个简单的爬虫,将爬取的数据通过python+flask+gunicorn+nginx部署到linux上面了,接下来通过采集爬虫对页面的请求日志进行分析



1.环境准备

hadoop (因为是采用最近比较火的大数据技术,所以需要提前准备好大数据的相关环境)

hive(用于分析离线指标)

spark(用spark引擎来分析实时请求的日志)

Hbase(大数据的数据库)

filebeat+logstash(请求日志的采集工具)

kafka(高吞吐的实时的消息队列)

zookeeper

azkaban(用于任务的调度)

以上作为整个反爬的架构准备,本文主要讲逻辑开发,环境搭建可以参考网上的博文,后续可能会更新大数据框架的搭建

本次仅记录采集数据到kafka,通过spark进行数据etl+结构化输出

下个博客在进行爬虫分析


2.UML图

简单设计建模以下,后续可能会有改动,但是大致的方向就如上图所示,比较简单

补充:

关于数据采集方面,有很多选择,比如flume,或值filebeat可以直接发送到kafka,这里之所以这么选型主要是模拟的是一种大数据环境.

filebeat是logstash的一种轻量化实现,用于部署在承载着web程序的服务器上,减少服务器压力,将采集的日志转发到部署logstash的服务器上,由于lostash是重量级的java程序,所以应该部署在一个资源相对多的服务上专门作为处理数据转发的服务器(不参与业务服务)


3.开干


数据采集

​ 数据源

python爬虫日记01

​ Python爬虫日记02-数据可视化_

​ 采集工具

​ filebeat的搭建与配置

在程序根目录创建conf文件夹,并新建配置文件filebeat_logstash.yml

进行配置filebeat_logstash.yml

filebeat:
prospectors:
- input_type: log
paths:
- /usr/local/nginx/logs/user_access.log
output.logstash:
hosts: ["192.168.100.230:5044"]


/usr/local/nginx/logs/user_access.log 是我们采集的日志路径


hosts: [“192.168.100.230:5044”] 是我们转发到对应logstash所监听的端口 这里为了方便调试就在同一台机器进行实现,如果要在不同的服务器部署,需要开通对应的防火墙和通信


在程序根目录启动filebeat测试

./filebeat -e -c conf/filebeat_logstash.yml

这里报错是因为我们还没有启动logstash监听5044端口

部署logstash

在根目录创建myconfs目录,并新建配置文件

filebeat-logstash-kafka.conf

input
beats
port => "5044"

output
file
path => ['/logall/data/log/filebeat/filebeat-output.log']

启动logstash进行测试

bin/logstash -f myconfs/filebeat-logstash-kafka.conf

成功启动了监听

然后启动filebeat

也启动成功了

查看我们logstash的落地文件/logall/data/log/filebeat/filebeat-output.log

cat /logall/data/log/filebeat/filebeat-output.log

可以看到我们的数据已经同步过来了,我们的请求数据是以#cs#作为分割符进行拼接的,这个在之前一篇博客有nginx的配置

这里数据仅仅是落地到另一个文件而已,我们期望的是转发到kafka中,所以我们需要准备好kafka


数据发送到消息队列

对接kafka

kafka依赖于zookeeper,所以我们需要先启动zookeeper

这里准备了三台虚拟机,分别启动zk,进入zookeeper的根目录

bin/zkServer.sh start

分别启动了三台之后,查看zk状态

bin/zkServer.sh status

这样就是启动成功了

然后分别启动kafka

进入kafka的目录,采用后台启动的方式

nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

这般则为启动成功

新建一个topic 在kafka的根目录下执行

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 1 --partitions 3 --topic spider_nginx_log

简单介绍以下这几个参数



kafka-topics.sh 这个是调用脚本,针对kafkatopic主题的


create 是告诉脚本创建topic


zookeeper 是指定我们kafka已经配置好的zookeeper,里面存放着一些元数据信息,包括历史的topic,一些offset等等…


replication-factor 指定我们数据的副本数量,这里测试,所以不需要太多副本


partitions 指定我们的分区,分区决定了我们数据的分散成都,也是kafka最大的特点之一,因为分区的存在,让kafka有很大的并行度支持大吞吐,当然支持大吞吐还有零拷贝这一大杀器


topic 指定我们创建的topic


修改logstash的配置,之前落地的是文件,现在修改成输出到kafka

input
beats
port => "5044"

output
kafka
bootstrap_servers => "192.168.100.210:9092"
topic_id => "spider_nginx_log"
batch_size => 5

重新启动

bin/logstash -f myconfs/filebeat-logstash-kafka.conf

启动之前部署好的web程序Python爬虫日记02

监听kafka,消费一下topic,然后访问页面观察是否有数据

bin/kafka-console-consumer.sh --from-beginning --topic spider_nginx_log --zookeeper node01:2181,node02:2181,node03:2181

这边看到数据已经传入了,接下来配置spark对接kafka


Spark接入kafka

1.在本地创建maven项目

使用的工具是IDEA

pom文件


<project xmlns&#61;"http://maven.apache.org/POM/4.0.0"
xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0modelVersion>
<groupId>it.lukegroupId>
<artifactId>Spider_sparkartifactId>
<version>1.0-SNAPSHOTversion>
<properties>
<scala.version>2.11.8scala.version>
<spark.version>2.2.0spark.version>
properties>
<dependencies>

<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
<version> 0.10.0.0version>
dependency>

<dependency>
<groupId>org.scala-langgroupId>
<artifactId>scala-libraryartifactId>
<version>2.11.8version>
dependency>

<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.11artifactId>
<version>$spark.versionversion>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql-kafka-0-10_2.11artifactId>
<version>2.2.0version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql_2.11artifactId>
<version>$spark.versionversion>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming-kafka-0-10_2.11artifactId>
<version>2.2.0version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming_2.11artifactId>
<version>$spark.versionversion>
dependency>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>2.6.0version>
dependency>
dependencies>
<build>
<sourceDirectory>src/main/scalasourceDirectory>
<testSourceDirectory>src/test/scalatestSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>3.0version>
<configuration>
<source>1.8source>
<target>1.8target>
<encoding>UTF-8encoding>
configuration>
plugin>
<plugin>
<groupId>net.alchim31.mavengroupId>
<artifactId>scala-maven-pluginartifactId>
<version>3.2.0version>
<executions>
<execution>
<goals>
<goal>compilegoal>
<goal>testCompilegoal>
goals>
<configuration>
<args>
<arg>-dependencyfilearg>
<arg>$project.build.directory/.scala_dependenciesarg>
args>
configuration>
execution>
executions>
plugin>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-shade-pluginartifactId>
<version>3.1.1version>
<executions>
<execution>
<phase>packagephase>
<goals>
<goal>shadegoal>
goals>
<configuration>
<filters>
<filter>
<artifact>*:*artifact>
<excludes>
<exclude>META-INF/*.SFexclude>
<exclude>META-INF/*.DSAexclude>
<exclude>META-INF/*.RSAexclude>
excludes>
filter>
filters>
<transformers>
<transformer implementation&#61;"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>mainClass>
transformer>
transformers>
configuration>
execution>
executions>
plugin>
plugins>
build>
project>

新建scala object

package it.luke.spark.spider_kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.Seconds, StreamingContext
/**
* &#64;author luke
* &#64;date 2021/5/1521:17
*/

object KafkaStream
def main(args: Array[String]): Unit &#61;
/*
* 1.创建配置对象,生成sparkStream对象
* 2.配置kafka参数,连接kafka
* 3.读取kafka,解析数据,结构化,输出
*
* */

//sparkconf
var conf &#61; new SparkConf().setMaster("local").setAppName("my_spiderapp")
//本地测试
//创建stream对象
val scc &#61; new StreamingContext(conf, Seconds(2))
scc.sparkContext.setLogLevel("WARN") //提高日志输出级别,方便调试
//kafka 配置
//kafka topic
val topics &#61; Array("spider_nginx_log")
//kafka 其它参数
val kafkaParams &#61; Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "mygroup_spider_nginx_log",
"auto.offset.reset" -> "latest", //可以指定读取kafka的进度
"enable.auto.commit" -> (false: java.lang.Boolean)
)
/*createDirectStream(
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V])
* */

val source &#61; KafkaUtils.createDirectStream(scc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
/*
* LocationStrategies.PreferConsistent,是一种分配策略,让每个executor参与分区
* ConsumerStrategies.Subscribe 是一种消费策略
* Subscribe为consumer自动分配partition&#xff0c;有内部算法保证topic-partitions以最优的方式均匀分配给同group下的不同consumer
* */


source.foreachRDD(rdd &#61;>
val asd: RDD[ConsumerRecord[String, String]] &#61; rdd
asd.map(_.value()).foreach(x&#61;>

var strings &#61; x.split("#csc#")
if(strings.length >2)

var obj&#61; req_obj(strings(0),
strings(2),
strings(3),
strings(4),
strings(5),
strings(6),
strings(7),
strings(8)
)
println(obj)
)
// println(req_obj.toString())
//提交offset
val offsets &#61; rdd.asInstanceOf[HasOffsetRanges].offsetRanges
source.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
)
//启动streaming
scc.start()
//阻塞
scc.awaitTermination()


/*
* &#39;$remote_addr"#csc#"-"#csc#"$remote_user"#csc#"[$time_local]"#csc#""$request""#csc#"&#39;
&#39;$status"#csc#"$body_bytes_sent"#csc#""$http_referer""#csc#"&#39;
&#39;"$http_user_agent""#csc#""$http_x_forwarded_for"&#39;;
* */

case class req_obj(remote_addr: String,
remote_user: String,
time_local: String,
request: String,
body_bytes_sent: String,
http_referer: String,
http_user_agent: String,
http_x_forwarded_for: String
)

这里看到,数据已经从kafka对接过来了,spark是一种伪流式的引擎,因为我们需要指定每一批的接受间隔,

val scc &#61; new StreamingContext(conf, Seconds(2))

这里是每两秒处理一批,将数据封装到rdd里面进行处理


4.总结

接下来要根据采集到的数据,分为两个方向,一个是离线分析,一个是实时分析

离线分析需要主要是展示一种未来或者过去的一种趋势作为分析,所以需要有一定的分析数据.

实时的话,是根据统计短时间内抓到的几批数据中,最有可能是爬虫的数据,进行打标,加入黑名单,并入库

后续就可以根据对接的数据,进行实时分析,根据自定义的策略抓爬虫了^

欢迎分享交流~







推荐阅读
  • 字节跳动深圳研发中心安全业务团队正在火热招募人才! ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • PHP自学必备:从零开始的准备工作与工具选择 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 周排行与月排行榜开发总结
    本文详细介绍了如何在PHP中实现周排行和月排行榜的开发,包括数据库设计、数据记录和查询方法。涉及的知识点包括MySQL的GROUP BY、WEEK和MONTH函数。 ... [详细]
  • 在将Web服务器和MySQL服务器分离的情况下,是否需要在Web服务器上安装MySQL?如果安装了MySQL,如何解决PHP连接MySQL服务器时出现的连接失败问题? ... [详细]
  • 为了确保iOS应用能够安全地访问网站数据,本文介绍了如何在Nginx服务器上轻松配置CertBot以实现SSL证书的自动化管理。通过这一过程,可以确保应用始终使用HTTPS协议,从而提升数据传输的安全性和可靠性。文章详细阐述了配置步骤和常见问题的解决方法,帮助读者快速上手并成功部署SSL证书。 ... [详细]
  • 小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限
    小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限 ... [详细]
  • 求助:在CentOS 5.8系统上安装PECL扩展遇到问题
    在 CentOS 5.8 系统上尝试安装 APC 扩展时遇到了问题,具体表现为 PECL 工具无法正常工作。为了确保顺利安装,需要解决 PECL 的相关依赖和配置问题。建议检查 PHP 和 PECL 的版本兼容性,并确保所有必要的库和开发工具已正确安装。此外,可以尝试手动下载 APC 扩展的源代码并进行编译安装,以绕过 PECL 工具的限制。 ... [详细]
  • 在 Kubernetes 中,Pod 的调度通常由集群的自动调度策略决定,这些策略主要关注资源充足性和负载均衡。然而,在某些场景下,用户可能需要更精细地控制 Pod 的调度行为,例如将特定的服务(如 GitLab)部署到特定节点上,以提高性能或满足特定需求。本文深入解析了 Kubernetes 的亲和性调度机制,并探讨了多种优化策略,帮助用户实现更高效、更灵活的资源管理。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 本文深入探讨了使用Puppet进行软件包分发与管理的方法。首先介绍了fpm这一跨平台的软件包制作工具,其简便的操作流程使得软件包的创建变得轻松快捷。fpm的项目地址为:https://github.com/jordansissel/fpm。通过结合Puppet和fpm,可以实现高效、可靠的软件包管理和部署。 ... [详细]
  • 在项目开发过程中,掌握一些关键的Linux命令至关重要。例如,使用 `Ctrl+C` 可以立即终止当前正在执行的命令;通过 `ps -ef | grep ias` 可以查看特定服务的进程信息,包括进程ID(PID)和JVM参数(如内存分配和远程连接端口);而 `netstat -apn | more` 则用于显示网络连接状态,帮助开发者监控和调试网络服务。这些命令不仅提高了开发效率,还能有效解决运行时的各种问题。 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
author-avatar
牛粪不插花88
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有