在编写爬虫的过程中我们经常会遇到需要代理的情况,代理可以到网上找免费的也可以用付费的。付费的使用网站提供的API就可以轻松获取代理,免费的就只能到处找然后采集而且代理质量还不高(付费的也不一定好)。但是喜欢动手的我还是更偏向后者,即自己找代理,搭建起一个代理池然后提供API给爬虫使用。
在我准备动手搭建的时候我发现Github上已经有一个优秀的代理池项目了proxy_pool。试用了一下感觉不错然后就看了下源码,读源码的过程中发现这个项目有很多值得学习的地方,于是决定仿照它写一个,这样既能把代理池搭建起来也能学到知识,于是便有了ProxyPool。
在这里感谢原作者——jhao104 提供这样的优秀项目
注意:因为是仿照所以本项目内容与原项目基本一致
本章节内容来源:proxy_pool/introduce.md
项目通过爬虫方式持续抓取代理网站公布的免费代理IP,实时校验,维护部分可以使用的代理,并通过api的形式提供外部使用
注意:以下原作者的框架图与本项目有点小出入,因为本项目在仿照过程中有所修改,但总体是一致的
包含ProxyManager和ProxyGetter, ProxyManager用于在DB和Api间进行数据传输。ProxyGetter用于获取代理,目前有4个免费代理源,每调用一次就会抓取这个4个网站的最新代理放入DB,支持自定义扩展额外的代理获取函数
用于存放代理IP
计划任务,定时去检测DB中的代理可用性,删除不可用的代理。同时也会主动通过ProxyGetter去获取最新代理放入DB;
代理池的外部接口,由Flask实现,功能是给爬虫提供与代理池交互的接口。
api接口相关代码,目前api是由Flask实现,代码也非常简单。客户端请求传给Flask,Flask调用ProxyManager
中的实现,包括get/get_all/get_status
数据库相关代码,目前数据库支持Redis。代码用工厂模式实现,方便日后扩展其他类型数据库
ProxyManager:get/refresh/get_all/get_status
等接口的具体实现类,负责管理proxy
ProxyGetter:获取代理的类,支持自己扩展获取代理接口
定时任务相关代码,现在只是实现定时去刷新代理,并验证可用代理,采用多进程方式
存放一些公共的类或函数,包含Config
:读取配置文件config.ini的类, Singleton
:实现单例,LazyProperty
:实现类属性惰性计算。等等
配置文件: Config.ini
数据库配置和代理获取接口配置,在ProxyGetter中添加新的代理获取方法,需在Config.ini中注册方可使用
项目中会涉及大量的配置信息,为了便于管理我们应该将配置信息统一存放并且设计一个类
进行统一管理。
配置文件采用ini文件,Python内置的ConfigParser类可以解析ini文件,我们只需继承内置的
ConfigParser类即可定制配置解析类
这里因为配置解析类在很多地方都要用到,所以把它放到Util中
配置文件:
# ProxyPool/Config.ini
[Database]
Host = 127.0.0.1
Port = 6379
配置解析类:
# ProxyPool/Util/ConfigGetter.py
from configparser import ConfigParser
import os
class Config():
def __init__(self):
self.current_path = os.path.split(os.path.abspath(__file__))[0]
self.file_path = os.path.join(os.path.split(self.current_path)[0], 'Config.ini')
self.parser = ConfigParser()
self.parser.read(self.file_path)
@property
def db_host(self):
return self.parser['Database']['Host']
@property
def db_port(self):
return self.parser['Database']['Port']
# 测试代码
if __name__ == '__main__':
c = Config()
print(c.db_host)
print(c.db_port)
涉及知识点:
外部接口的实现使用Flask
# ProxyPool/Api/ProxyApi.py
from flask import Flask
app = Flask('ProxyPool')
@app.route("/")
def hello():
return 'Hello Coder'
if __name__ == '__main__':
app.run()
到这里我们完成了简单的通用的获取配置信息的类以及外部接口,当然了现在类是不完善
的,往下的内容涉及的类如无特殊说明都是未完善的类,但是到本篇慢慢结束时都会是完
善的。毕竟我当初开始仿照时就是一步步由不完善到完善的。
数据库主要有两个表useful和raw,raw存放刚采集的proxy,useful存放验证有效的proxy
数据库相对来说也比较简单只要放出进行数据存取的接口就行。在这里主要考虑一个问题:
数据库扩展
数据库应该是可扩展的,毕竟不同的人可能希望使用不同的数据库进行存储。
所以我们应该设计一个工厂类,和特定数据库类。特定数据库类完成数据存储的真正工作并
放出接口。工厂类根据配置文件使用特定的数据库类,工厂类只放出接口并不实现,全部调
用指定的数据库类的接口
层次结构:
工厂类:
# ProxyPool/Db/DbClient.py
import sys
import os
from Util.getConfig import Config
sys.path.append(os.path.dirname(__file__))
class DbClient():
"""
数据库工厂类
"""
def __init__(self):
self.cOnfig= Config()
self.init_db_client()
def init_db_client(self):
# 所有支持的数据库类型
types = ['Redis']
db_type = self.config.db_type
assert db_type in types, 'DbTypeError: not support {}'.format(db_type)
self.client = getattr(__import__(db_type + 'Client'), db_type + 'Client')(self.config.db_host,
self.config.db_port)
def get(self):
return self.client.get()
def put(self, *args, **kwargs):
self.client.put(*args, **kwargs)
def get_all(self):
return self.client.get_all()
def get_status(self):
return self.client.get_status()
if __name__ == '__main__':
db = DbClient()
db.put('se2')
db.put('aaa2', sname='raw')
print(db.get())
print(db.get_all())
print(db.get_status())
涉及知识点:
特定数据库类(Redis):
# ProxyPool/Db/RedisClient.py
import sys
sys.path.append('../')
import redis
class RedisClient():
def __init__(self, host, port):
self._cOnn= redis.Redis(host, port)
def get(self):
proxy = self._conn.srandmember('useful')
if proxy:
return proxy.decode('ascii')
return None
# 存放方式待改进
def put(self, proxy, sname='useful'):
return self._conn.sadd(sname, proxy)
def get_all(self):
proxies = self._conn.smembers('useful')
if proxies:
proxy_list = []
for i in proxies:
proxy_list.append(i.decode('ascii'))
return proxy_list
return None
def get_status(self):
status = dict()
status['useful'] = self._conn.scard('useful')
status['raw'] = self._conn.scard('raw')
return status
if __name__ == '__main__':
r = RedisClient('xxxxxxxx', '6379')
print(r.get())
print(r.get_all())
因为工厂类中还需要数据库类型信息,所以给Config增加个函数:
# ProxyPool/Util/ConfigGetter.py
# ADD
@property
def db_type(self):
return self.parser['Database']['Type']
配置文件里面把数据库类型添加进去:
# ProxyPool/Config.ini
# ADD
Type = Redis
往数据库预存数据,然后在外部接口中增加相应接口测试数据库相关类是否正常工作:
# ProxyPool/Api/ProxyApi.py
from flask import Flask, jsonify
from Db.DbClient import DbClient
db = DbClient()
app = Flask('ProxyPool')
@app.route("/")
def hello():
return 'Hello Coder'
+@app.route("/get")
def get():
return jsonify(db.get())
@app.route("/get_all")
def get_all():
return jsonify(db.get_all())
@app.route("/get_status")
def get_status():
return jsonify(db.get_status())
if __name__ == '__main__':
app.run()
运行 ProxyApi.py,浏览器输入127.0.0.1:5010/get 可以获取到预先存入的数据,说明
外部接口,数据库,配置获取等都正常工作。
我们要对多个不同的网站进行采集,而每个网站的数据处理方式又不一样,所以针对
每个网站设计一个函数进行采集。为了方便管理,这些采集函数应该由一个类进行统
一管理,即采集函数作为类的静态函数。
考虑到采集函数会有一些共同点,例如获取页面,所以设计一些公共函数来处理通用
的事物,这里暂时只考虑获取页面的公共函数
对于获取页面的公共函数它涉及到一些需要定制的情况,例如某些网站可能对于请求
头有特殊的要求或者需要某些特殊的参数等等,所以公共函数要考虑可定制性。
采集函数管理:
# ProxyPool/Proxy/ProxyGetter.py
import sys
sys.path.append('../')
from Util.UtilFunction import get_html_tree
class ProxyGetter():
def __init__(self):
pass
@staticmethod
def get_proxy_one():
url = 'http://www.data5u.com/free/index.shtml'
html_tree = get_html_tree(url)
proxies = html_tree.xpath('//ul[@class="l2"]')
for proxy in proxies:
ip = proxy.xpath('.//span[1]/li/text()')[0]
port = proxy.xpath('.//span[2]/li/text()')[0]
# 所有采集函数都以 ip:port的形式抛出代理
yield ip + ':' + port
if __name__ == '__main__':
for i in ProxyGetter.get_proxy_one():
print(i)
公共函数:
# ProxyPool/Util/UtilFunction
from Util.WebRequest import WebRequest
from lxml import etree
def get_html_tree(url):
wr = WebRequest()
html = wr.get(url).content
公共函数可定制性:
# ProxyPool/Util/WebRequest.py
import requests
class WebRequest():
def __init__(self):
pass
@property
def header(self):
headers = {
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) '\
'Chrome/62.0.3202.94 Safari/537.36'
}
return headers
# 很多处理没做
def get(self, url, header=None):
headers = self.header
if header and isinstance(header, dict):
headers.update(header)
html = requests.get(url, headers=headers)
return html
涉及知识点:
到这里函数采集可以工作了,不过还没集成到项目里面
前面我们已经完成了代理采集,代理存储和外部接口,现在它们都是独立工作的,为
了把它们联系起来创建一个代理管理类:
先在配置文件中注册采集函数:
# ProxyPool/Config.ini
# ADD
[GetProxyFunction]
get_proxy_One= 1
配置获取类中增加相应函数:
# ProxyPool/Util/ConfigGetter.py
# ADD
@property
def get_proxy_function(self):
for func in self.parser['GetProxyFunction']:
if self.parser['GetProxyFunction'][func] == '1':
yield func
代理管理类:
# ProxyPool/Proxy/ProxyManager.py
import sys
sys.path.append('../')
from Proxy.ProxyGetter import ProxyGetter
from Db.DbClient import DbClient
from Util.ConfigGetter import Config
class ProxyManager():
"""
代理管理类
"""
def __init__(self):
self.db_client = DbClient()
self.cOnfig= Config()
def refresh(self):
for func in self.config.get_proxy_function:
fc = getattr(ProxyGetter, func)
for proxy in fc():
self.db_client.put(proxy, 'raw')
def get(self):
return self.db_client.get()
def get_all(self):
return self.db_client.get_all()
def get_status(self):
return self.db_client.get_status()
if __name__ == '__main__':
m = ProxyManager()
m.refresh()
print(m.get_all())
print(m.get_status())
运行ProxyPool/Proxy/ProxyManager.py,根据输出可以知道代理部分已经整合并且正常工作
涉及知识点:
将代理部分和外部接口整合起来:
# ProxyPool/Api/ProxyApi.py
import sys
sys.path.append('../')
from flask import Flask, jsonify
from Proxy.ProxyManager import ProxyManager
app = Flask('ProxyPool')
@app.route("/")
def hello():
return 'Hello Coder'
# 返回单个proxy带有双引号,待解决
@app.route("/get")
def get():
return jsonify(ProxyManager().get())
@app.route("/get_all")
def get_all():
return jsonify(ProxyManager().get_all())
@app.route("/get_status")
def get_status():
return jsonify(ProxyManager().get_status())
if __name__ == '__main__':
app.run()
运行ProxyPool/Api/ProxyApi.py,在浏览器中访问可以得到数据,说明代理部分
和外部接口整合成功。
到这里已经完成项目的基本框架
现在我们需要项目能够定时获取代理到raw中,并定时将raw中的可用代理放入
useful中。
首先我们要确保获取到的新代理格式正确再放入raw中,然后要确保raw中取出
来的代理可用才放入到useful中,为此增加两个公共函数。
代理验证:
# ProxyPool/Util/UtilFunction
# ADD
import re
import requests
# ADD
def proxy_format_valid(proxy):
ls = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1, 5}', proxy)
return True if len(ls) == 1 and ls[0] == proxy else False
# ADD
def proxy_useful_valid(proxy):
proxies = {"http": "http://{0}".format(proxy)}
try:
r = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=10, verify=False)
if r.status_code == 200:
return True
except Exception as e:
return False
对代理进行处理的过程中涉及代理的弹出和判断是否存在等操作,所以数据库类
增加相应函数
工厂类:
# ProxyPool/Db/DbClient.py
# ADD
def exists(self, *args, **kwargs):
return self.client.exists(*args, **kwargs)
# ADD
def pop(self, *args, **kwargs):
return self.client.pop(*args, **kwargs)
特定数据库类:
# ProxyPool/Db/RedisClient.py
# ADD
def exists(self, proxy):
"""
判断proxy是否存在数据库的useful中
:param proxy:
:return:
"""
return self._conn.sismember('useful', proxy)
# ADD
def pop(self, sname):
"""
从数据库的sname表中弹出一个代理
:param sname:表名
:return:
"""
proxy = self._conn.spop(sname)
if proxy:
return proxy.decode('ascii')
else:
return None
改进ProxyManager的refresh函数,使它能把获取到的代理插入数据库
# ProxyPool/Proxy/ProxyManager.py
# ADD
from Util.UtilFunction import proxy_format_valid
# Modify
def refresh(self):
"""
获取新代理放入raw中
:return:
"""
proxies = set()
for func in self.config.get_proxy_function:
fc = getattr(ProxyGetter, func)
for proxy in fc():
proxies.add(proxy)
for proxy in proxies:
if proxy_format_valid(proxy)
# useful中
if self.db_client.exists(proxy):
continue
# raw中
self.db_client.put(proxy)
上面已经完成代理格式和可用性的检查,也完成了代理的获取和放入raw中,现在
可以设计调度类让以上工作定期执行同时将raw中可用代理放入useful
检查raw的调度类:
# ProxyPool/Schedule/ProxyRefreshSchudule.py
import sys
sys.path.append('../')
from Proxy.ProxyManager import ProxyManager
from Util.UtilFunction import proxy_useful_valid
from threading import Thread
from apscheduler.schedulers.blocking import BlockingScheduler
class ProxyRefreshSchedule(ProxyManager):
"""
定时刷新raw中代理,将可用代理放入useful
"""
def __init__(self):
ProxyManager.__init__(self)
self.raw_set = 'raw'
self.useful_set = 'useful'
def start(self):
proxy = self.db_client.pop(self.raw_set)
while proxy:
if proxy_useful_valid(proxy):
self.db_client.put(proxy, self.useful_set)
print('proxy valid {}'.format(proxy))
else:
print('proxy not valid {}'.format(proxy))
proxy = self.db_client.pop(self.raw_set)
print('end')
def refresh_pool():
prs = ProxyRefreshSchedule()
prs.start()
def mul_thread_refresh(threads=10):
"""
多线程刷新代理
:param threads:线程数量
:return:
"""
p = ProxyRefreshSchedule()
# 获取新代理
p.refresh()
pl = []
# 多线程检查
for i in range(threads):
p = Thread(target=refresh_pool, args=())
pl.append(p)
for p in pl:
p.deamon = True
p.start()
for p in pl:
p.join()
def run():
"""
定时刷新
:return:
"""
mul_thread_refresh()
schedule = BlockingScheduler()
schedule.add_job(mul_thread_refresh, 'interval', minutes=1)
schedule.start()
if __name__ == '__main__':
run()
涉及知识点:
现在项目每隔一分钟就会获取新代理存入raw,然后检查raw中的代理将可用的代理
存入useful。
代理都是有时效性的,也许放进useful的时候还能用但过了几秒它就失效了,所以
要定期的检查useful中的代理,将不可用的剔除。
首先数据库相关类增加删除代理的接口
工厂类:
# ProxyPool/Db/DbClient.py
# ADD
def delete(self, *args, **kwargs):
self.client.delete(*args, **kwargs)
特定数据库类:
# ProxyPool/Db/RedisClient.py
# ADD
def delete(self, proxy, sname):
"""
从sname表中删除指定代理
:param proxy:
:param sname: 表名
:return:
"""
self._conn.srem(sname, proxy)
接下来设计刷新useful的类
检查useful:
# ProxyPool/Schedule/ProxyCheck.py
import sys
sys.path.append('../')
from Proxy.ProxyManager import ProxyManager
from Util.UtilFunction import proxy_useful_valid
from threading import Thread
from time import sleep
class ProxyCheck(ProxyManager, Thread):
"""
检查useful中的代理,不可用的删除
"""
def __init__(self):
ProxyManager.__init__(self)
Thread.__init__(self)
def run(self):
while True:
proxy = self.db_client.pop('useful')
while proxy:
if proxy_useful_valid(proxy):
print('useful valid pass {0}'.format(proxy))
self.db_client.put(proxy, 'useful')
else:
print('useful valid faild {0}'.format(proxy))
self.db_client.delete(proxy, 'useful')
proxy = self.db_client.pop('useful')
sleep(5 * 60)
if __name__ == '__main__':
p = ProxyCheck()
p.run()
调度检查:
# ProxyPool/Schedule/ProxyValidSchedule.py
import sys
sys.path.append('../')
from Schedule.ProxyCheck import ProxyCheck
class ProxyValidSchedule():
def __init__(self):
pass
def start(self, threads=5):
pl = []
for i in range(threads):
pl.append(ProxyCheck())
for i in pl:
i.deamon = True
i.start()
for i in pl:
i.join()
def run():
pp = ProxyValidSchedule()
pp.start()
if __name__ == '__main__':
p = ProxyValidSchedule()
p.start()
对于useful的检查和调度检查类你可能会觉得奇怪,ProxyCheck的run方法不是死
循环吗?还有调度类为什么不像 ProxyRefreshSchedule 那样使用 BlockingSchedule ?
首先解释为什么不用BlockingSchedule,ProxyRefreshSchedule是获取代理后才
立即执行的,代理只需要隔一段时间获取一次就好所以代理获取的功能不能和代
理检查整合到一块,不然每个检查线程执行的时候都会执行一次代理获取,这显
然不是我们需要的。这时我们就需要一个调度器隔一段时间进行一次代理获取然
后创建多个线程对获取的代理进行检查。回到useful的检查,因为检查只取决于
useful中存在的代理,所以没必要用BlockingSchedule。
然后是死循环问题,没错那就是一个死循环。你也许会想既然是死循环那么在
useful的代理耗尽之前检查不是不会退出么?那其他工作就没法继续了啊?其实
只要raw代理的刷新调度在useful的代理刷新调度之前启动就没问题,因为raw
调度是定期执行的,useful调度执行过程中如果raw调度的时间到了会切换到raw
调度执行,raw调度执行完才切换回useful调度。
为什么要设计成死循环?因为useful中的代理随时可能失效,所以检查应该时刻
进行着,这样才能及时把失效代理剔除。
涉及知识点:
到这里项目已经可以运行了,只需要将Api,raw和useful调度的主文件分别执行即可。
上一节的内容完成后其实项目已经可以运行,但是文件分散运行起来不方便,所
以将启动环节整合一下。
外部接口:
# ProxyPool/Api/ProxyApi.py
# ADD
def run():
app.run()
项目运行:
# ProxyPool/Run/main.py
import sys
sys.path.append('../')
from multiprocessing import Process
from Api.ProxyApi import run as ApiRun
from Schedule.ProxyValidSchedule import run as ValidRun
from Schedule.ProxyRefreshSchudule import run as RefreshRun
def run():
pl = []
p1 = Process(target=ApiRun, name='ApiRun')
pl.append(p1)
p2 = Process(target=ValidRun, name='ValidRun')
pl.append(p2)
p3 = Process(target=RefreshRun, name='RefreshRun')
pl.append(p3)
for p in pl:
p.deamon = True
p.start()
for p in pl:
p.join()
if __name__ == '__main__':
run()
运行ProxyPool/Run/main.py即可启动项目。但是启动后好像有哪里不对???
单例模式 是设计模式的一种,简单将就是类在整个运行过程中只有一个实例,
这对某些资源的统一管理以及节省内存有很大作用
项目采用 元类 的方式实现单例模式
单例元类:
# ProxyPool/Util/UtilClass
class Singleton(type):
"""
控制单例的元类
资料:https://segmentfault.com/a/1190000008141049
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
考虑一下需要单例的类,配置管理类 Util/ConfigGetter,数据库工厂类 Db/DbClient,
代理管理类 Proxy/ProxyManager 好像也行?
这里其实可以排除ProxyManager,因为Schedule/ProxyRefreshSchedule是继承自
它的,如果ProxyManager是单例的,那么ProxyRefreshSchedule就无法多线程进
行刷新。
类变成单例模式很简单,继承单例类即可
数据库工厂类:
# ProxyPool/Db/DbClient.py
# ADD
from Util.UtilClass import Singleton
# Modify
class DbClient(metaclass=Singleton):
配置管理类:
# ProxyPool/Util/ConfigGetter.py
# ADD
from Util.UtilClass import Singleton
# Modify
class Config(metaclass=Singleton):
简历单例元类,需要单例模式的类继承该元类就完成了特定类的单例化
涉及知识点:
延迟绑定 主要用于提高性能,减少重复运算以及程序对内存的需求
项目采用类装饰器的方式实现延迟绑定
延迟装饰类:
# ProxyPool/Util/UtilClass.py
class LazyProperty(object):
"""
延迟绑定类
资料:https://segmentfault.com/a/1190000005818249
"""
def __init__(self, func):
self.func = func
def __get__(self, instance, cls):
val = self.func(instance)
setattr(instance, self.func.__name__, val)
return val
项目中延迟绑定主要用于配置管理类Util/ConfigGetter,将ConfigGetter中的@property
都换成@LazyProperty即可
配置管理类:
# ProxyPool/Util/ConfigGetter.py
# Modify
from Util.UtilClass import LazyProperty, Singleton
# Modify
all @property change to @ LazyProperty
到这里延迟绑定就完成了
涉及知识点:
以上就完成了单例模式和延迟绑定
回想之前的数据库操作,尤其是插入操作我们需要依赖插入函数的默认函数或者直
接传入参数来将代理插入到指定的表中,这样其实不太规范也不方便使用,所以给
数据库相关类增加个改变表的函数
还有就是连接数据库的时候也没有指定数据库的功能,这也要加上。当然具体是否
指定,怎样指定这些都要根据不同的数据库的实际来做,项目中的数据库是Redis,
也许会和其他数据库操作不同
指定数据库,在Config.ini中增加相应的项即可
# ProxyPool/Config.ini
[DataBase]
# ADD
Name = proxy
配置管理类:
# ProxyPool/Util/ConfigGetter.py
# ADD
@LazyProperty
def db_name(self):
return self.parser['Database']['Name']
工厂类:
# ProxyPool/Db/DbClient.py
# Modify
self.client = getattr(__import__(db_type + 'Client'), db_type + 'Client')(self.config.db_name,
self.config.db_host,
self.config.db_port)
特定数据库类,项目中使用Redis且指定数据库为0,所以Config.ini中的Name实际
不起作用,但也可先占个位
特定数据库类:
# ProxyPool/Db/RedisClient.py
# Modify
def __init__(self, table, host, port):
"""
para:table Config.ini中的Name
"""
self._table = table
self._cOnn= redis.Redis(host=host, port=port, db=0)
接下来给数据库相关类增加改变表的接口
工厂类:
# ProxyPool/Db/DbClient.py
# ADD
def change_table(self, *args, **kwargs):
self.client.change_table(*args, **kwargs)
特定数据库类中增加change_table后改动较多,主要就是原来依赖于默认
参数或者手动指定参数确定表的地方全部改为使用self.table,而self.table
的改变由change_table完成
特定数据库类:
# ProxyPool/Db/RedisClient.py
def get(self):
...
# Modify
proxy = self._conn.srandmember(self._table)
...
# Modify
def put(self, proxy):
"""
将proxy放入数据库
:param proxy:
:return:
"""
return self._conn.sadd(self._table, proxy)
def get_all(self):
...
# Modify
proxies = self._conn.smembers(self._table)
...
# Modify
def get_status(self):
return self._conn.scard(self._table)
def exists(self, proxy)
...
# Modify
return self._conn.sismember(self._table, proxy)
...
# Modify
def pop(self):
"""
从数据库中弹出一个proxy
:return:
"""
proxy = self._conn.spop(self._table)
if proxy:
return proxy.decode('ascii')
else:
return None
# Modify
def delete(self, proxy):
"""
从数据中删除一个代理
:param proxy:
:return:
"""
self._conn.srem(self._table, proxy)
# ADD
def change_table(self, table):
"""
改变当前数据库表
:param table:
:return:
"""
self._table = table
然后就是需要对数据库进行插入操作的地方修改过来
raw调度:
# ProxyPool/Schedule/ProxyRefreshSchedule.py
class ProxyRefreshSchedule(ProxyManager):
"""
定时刷新raw中代理,将可用代理放入useful
"""
def __init__(self):
ProxyManager.__init__(self)
# Modify
def start(self):
self.db_client.change_table(self.raw_proxy)
proxy = self.db_client.pop()
while proxy:
if proxy_useful_valid(proxy):
self.db_client.change_table(self.useful_proxy)
self.db_client.put(proxy)
self.db_client.change_table(self.raw_proxy)
print('proxy valid {}'.format(proxy))
else:
print('proxy not valid {}'.format(proxy))
proxy = self.db_client.pop()
useful检查:
# ProxyPool/Schedule/ProxyCheck.py
class ProxyCheck(ProxyManager, Thread):
"""
检查useful中的代理,不可用的删除
"""
def __init__(self):
ProxyManager.__init__(self)
Thread.__init__(self)
# Modify
def run(self):
self.db_client.change_table(self.useful_proxy)
while True:
proxy = self.db_client.pop()
while proxy:
if proxy_useful_valid(proxy):
print('useful valid pass {0}'.format(proxy))
self.db_client.put(proxy)
else:
print('useful valid faild {0}'.format(proxy))
self.db_client.delete(proxy)
proxy = self.db_client.pop()
sleep(1 * 60)
代理管理:
# ProxyPool/Proxy/ProxyManager.py
import sys
sys.path.append('../')
from Proxy.ProxyGetter import ProxyGetter
from Db.DbClient import DbClient
from Util.ConfigGetter import Config
from Util.UtilFunction import proxy_format_valid
class ProxyManager():
"""
代理管理类
"""
def __init__(self):
self.db_client = DbClient()
self.cOnfig= Config()
self.raw_proxy = 'raw'
self.useful_proxy = 'useful'
def refresh(self):
"""
获取新代理放入raw中
:return:
"""
proxies = set()
for func in self.config.get_proxy_function:
fc = getattr(ProxyGetter, func)
for proxy in fc():
print('fetch proxy {0}'.format(proxy))
proxies.add(proxy)
for proxy in proxies:
if proxy_format_valid(proxy):
self.db_client.change_table(self.useful_proxy)
if self.db_client.exists(proxy):
continue
self.db_client.change_table(self.raw_proxy)
self.db_client.put(proxy)
def get(self):
"""
返回useful中一个代理
:return:
"""
self.db_client.change_table(self.useful_proxy)
return self.db_client.get()
def get_all(self):
"""
返回useful中所有代理
:return:
"""
self.db_client.change_table(self.useful_proxy)
return self.db_client.get_all()
def get_status(self):
"""
获取代理存储状态
:return:
"""
status = dict()
self.db_client.change_table(self.raw_proxy)
status[self.raw_proxy] = self.db_client.get_status()
self.db_client.change_table(self.useful_proxy)
status[self.useful_proxy] = self.db_client.get_status()
return status
到这里项目可以日常使用,各个类也基本完善
日志对于一个需要长期稳定运行的项目是必不可少的,不然项目出问题你
都不知道去哪里找原因。
日志一般来说都会保存在 .log文件中,但是如果所有日志都保存在一个文
件那么时间长了文件就会很大,难以阅读,所以日志文件应该每隔一段时
间就新建一个,然后到了一定的时间还要删除太旧的日志
项目中日志默认会输出在终端以及保存在文件中
为了满足以上对日志类的要求,我们需要继承内置的 logging.Logger 并定
制自己的日志类
日志类:
import os
import logging
from logging.handlers import TimedRotatingFileHandler
CRITICAL = 50
ERROR = 40
WARNING = 30
INFO = 20
DEBUG = 10
NOTSET = 0
CURRENT_PATH = os.path.dirname(os.path.abspath(__file__))
# 更保险的做法 https://stackoverflow.com/questions/2860153/how-do-i-get-the-parent-directory-in-python
ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir))
LOG_PATH = os.path.join(ROOT_PATH, 'Log')
class LogHandler(logging.Logger):
"""
日志类
"""
def __init__(self, name, level=DEBUG, stream=True, file=True):
self.name = name
self.level = level
logging.Logger.__init__(self, self.name, self.level)
if stream:
self._set_stream_handler_()
if file:
self._set_file_handler_()
def _set_stream_handler_(self):
stream_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
stream_handler.setFormatter(formatter)
stream_handler.setLevel(self.level)
self.addHandler(stream_handler)
def _set_file_handler_(self):
file_name = os.path.join(LOG_PATH, '{}.log'.format(self.name))
# 定期更换日志文件
file_handler = TimedRotatingFileHandler(filename=file_name, when='D', interval=1, backupCount=10)
file_handler.suffix = '%Y%m%d.log'
formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
file_handler.setFormatter(formatter)
file_handler.setLevel(self.level)
self.file_handler = file_handler
self.addHandler(file_handler)
然后在需要使用日志的地方使用即可,例如项目中使用print的地方
涉及知识点:
在 “项目初运行” 那一章的末尾提到 “好像有哪里不对”,其实项目确实有个
bug,那就是useful检查的时候ProxyCheck不能按照调度定时执行检查。
异常代码:
def run(self):
self.db_client.change_table(self.useful_proxy)
while True:
proxy = self.db_client.pop()
while proxy:
if proxy_useful_valid(proxy):
print('useful valid pass {0}'.format(proxy))
self.db_client.put(proxy)
else:
print('useful valid faild {0}'.format(proxy))
self.db_client.delete(proxy)
proxy = self.db_client.pop()
sleep(1 * 60)
注意到run函数只在进入while循环之前做了一个改变表的动作,这时表
切换到useful中,代码正确执行。但是当ProxyRefreshSchedule开始
执行的时候表会切换到raw并且raw中的代理将消耗完,当ProxyRefreshSchedule
执行完后表还是raw并且raw中无数据,这时切换到ProxyCheck,它检
查的就是raw中的代理,由于raw中无代理所以pop不出数据,然后ProxyCheck
就会睡眠。然后又到ProxyRefreshSchedule执行,执行完又到ProxyCheck,
过程中表一直是raw,所以ProxyCheck将陷入无尽的睡眠
正确代码:
def run(self):
while True:
self.db_client.change_table(self.useful_proxy)
proxy = self.db_client.pop()
while proxy:
if proxy_useful_valid(proxy):
print('useful valid pass {0}'.format(proxy))
self.db_client.put(proxy)
else:
print('useful valid faild {0}'.format(proxy))
self.db_client.delete(proxy)
proxy = self.db_client.pop()
sleep(1 * 60)
到这里项目就完成了,各个类也已经完善,可以部署上线长期运行了