python链家网二手房异步IO爬虫,使用asyncio、aiohttp和aiomysql
很多小伙伴初学python时都会学习到爬虫,刚入门时会使用requests、urllib这些同步的库进行单线程爬虫,速度是比较慢的,后来学会用scrapy框架进行爬虫,速度很快,原因是scrapy基于twisted这一事件驱动的异步IO网络框架(单线程事件循环,而不是多线程)。
本例使用的asyncio也是一个异步IO框架,在python3.5以后加入了协程的关键字async,能够将协程和生成器区分开来,更加方便使用协程。
经过测试,平均1秒可以爬取30个详情页信息
可以使用asyncio.Semaphore来控制并发数,达到限速的效果
# -*- coding: utf-8 -*-
"""
:author: KK
:url: http://github.com/PythonerKK
:copyright: © 2019 KK <705555262@qq.com.com>
"""
import asyncio
import re
import aiohttp
from pyquery import PyQuery
import aiomysql
from lxml import etree

# Shared aiomysql connection pool. The empty string is only a placeholder:
# main() rebinds this global once the real pool has been created.
pool = ''
# sem = asyncio.Semaphore(4)  # optional concurrency cap; when unset the crawler runs at full speed
stop = False  # consumer() keeps looping while this stays False
headers = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'
    ),
}
MAX_PAGE = 10  # page count used by main() when building the listing URLs
TABLE_NAME = 'data'  # name of the MySQL table
city = 'zh'  # city abbreviation used as the lianjia.com subdomain
url = 'https://{}.lianjia.com/ershoufang/pg{}/'  # listing URL template: (city, page number)
urls = []  # URLs of all listing pages
links_detail = set()  # detail-page links that are waiting to be crawled
crawled_links_detail = set()  # detail-page links already picked up for crawling (de-duplication)

# Absolute detail-page URL, e.g. https://zh.lianjia.com/ershoufang/105101.html
_DETAIL_LINK_RE = re.compile(r'https://.*?/\d+\.html')

# NOTE(review): the pattern in the published source had lost its HTML markup
# (it read '(.*?)(.*?)', which can only ever produce {'': ''}). This is a
# reconstruction of the "<li><span class="label">NAME</span>VALUE</li>" items
# on the detail page -- confirm against the live page markup.
_INFO_ITEM_RE = re.compile(r'<li><span class="label">(.*?)</span>(.*?)</li>')


async def fetch(url, session):
    """Download ``url`` through ``session`` and return the body as text.

    Returns None when the response status is not 200/201 or when the request
    raises. Errors are printed instead of propagated so that one bad page does
    not stop the crawl.
    """
    # To throttle the crawl, wrap the request in ``async with sem:``.
    try:
        # ssl=False disables certificate verification; it replaces the
        # deprecated verify_ssl=False keyword.
        async with session.get(url, headers=headers, ssl=False) as resp:
            if resp.status in (200, 201):
                return await resp.text()
    except Exception as e:  # best effort: any failure just skips this page
        print(e)
    return None


def _is_detail_link(href, city_code):
    """Return True if ``href`` is a detail-page URL on ``city_code``'s subdomain."""
    if not href:
        return False
    if not _DETAIL_LINK_RE.match(href):
        return False
    # Substring test; str.find() would return -1 (truthy) when the domain is absent.
    return '{}.lianjia.com'.format(city_code) in href


def extract_links(source):
    """Collect the detail-page links found in a listing page into ``links_detail``."""
    pq = PyQuery(source)
    for anchor in pq.items("a"):
        href = anchor.attr("href")
        if _is_detail_link(href, city):
            links_detail.add(href)
    print(links_detail)


def extract_elements(source):
    """Parse one detail page and schedule its data to be saved.

    Builds a dict from the page's label/value items plus ``title``, ``price``
    and ``url``, then schedules save_to_database() on the event loop. Pages
    that are empty or cannot be parsed are reported and skipped.
    """
    if not source:
        print('解析详情页出错!')
        return
    try:
        dom = etree.HTML(source)
        page_url = dom.xpath('//link[@rel="canonical"]/@href')[0]
        title = dom.xpath('//title/text()')[0]
        price = dom.xpath('//span[@class="unitPriceValue"]/text()')[0]
        information = dict(_INFO_ITEM_RE.findall(source))
        information.update(title=title, price=price, url=page_url)
    except Exception as e:  # a malformed page must not stop the crawl
        print('解析详情页出错!', e)
        return
    print(information)
    asyncio.ensure_future(save_to_database(information, pool=pool))


def _quote_identifier(name):
    """Backtick-quote a MySQL identifier, doubling any embedded backticks."""
    return '`{}`'.format(str(name).replace('`', '``'))


def _build_statements(table, information):
    """Return ``(create_sql, insert_sql, values)`` for storing ``information``."""
    quoted_table = _quote_identifier(table)
    columns = [_quote_identifier(key) for key in information]
    create_sql = 'CREATE TABLE IF NOT EXISTS {} ({})'.format(
        quoted_table,
        ', '.join(column + ' VARCHAR(255)' for column in columns),
    )
    # The INSERT is executed with bound arguments, and the driver is expected
    # to %-format the query in that case, so a literal '%' inside an
    # identifier has to be doubled.
    insert_sql = 'INSERT INTO {} ({}) VALUES ({})'.format(
        quoted_table.replace('%', '%%'),
        ', '.join(columns).replace('%', '%%'),
        ', '.join(['%s'] * len(columns)),
    )
    values = [str(value) for value in information.values()]
    return create_sql, insert_sql, values


async def save_to_database(information, pool, table_name=None):
    """Insert ``information`` as one row into MySQL, creating the table if needed.

    :param information: mapping of column name to value; every column is
        created as VARCHAR(255) and every value is stored as text.
    :param pool: connection pool providing ``acquire()``.
    :param table_name: target table; defaults to the module-level TABLE_NAME.

    Values are passed as bound parameters rather than formatted into the SQL
    text, so quotes in scraped content cannot break or alter the statement.
    Database errors are printed, not raised.
    """
    if not information:
        return
    table = TABLE_NAME if table_name is None else table_name
    create_sql, insert_sql, values = _build_statements(table, information)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(create_sql)
                await cur.execute(insert_sql, values)
            except aiomysql.Error as e:
                print('mysql error: %s' % (e,))
            else:
                print('插入数据成功')


async def handle_elements(link, session):
    """Fetch one detail page and hand its source to extract_elements()."""
    print('开始获取: {}'.format(link))
    # Mark the link before awaiting the download so that a listing page parsed
    # in the meantime cannot cause the same link to be scheduled again.
    crawled_links_detail.add(link)
    source = await fetch(link, session)
    extract_elements(source)


async def consumer():
    """Drain the listing URLs and the pending detail links until ``stop`` is set.

    Each iteration downloads at most one listing page (feeding
    ``links_detail``) and schedules at most one detail page. When no detail
    link is pending it sleeps for two seconds and tries again.
    """
    async with aiohttp.ClientSession() as session:
        while not stop:
            if urls:
                page_url = urls.pop()
                source = await fetch(page_url, session)
                print(page_url)
                if source:  # fetch() returns None on failure
                    extract_links(source)
            if not links_detail:
                print('目前没有待爬取的链接')
                await asyncio.sleep(2)
                continue
            link = links_detail.pop()
            if link not in crawled_links_detail:
                asyncio.ensure_future(handle_elements(link, session))


async def main(loop):
    """Create the MySQL pool, queue the listing URLs and start the consumer."""
    global pool
    pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306,
        user='root', password='xxxxxx',
        db='aiomysql_lianjia', loop=loop, charset='utf8',
        autocommit=True,
    )
    # Pages are numbered from 1, so include MAX_PAGE itself to match the
    # page count announced below.
    urls.extend(url.format(city, page) for page in range(1, MAX_PAGE + 1))
    print('爬取总页数:{} 任务开始...'.format(str(MAX_PAGE)))
    asyncio.ensure_future(consumer())


if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated on current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(main(loop))
    loop.run_forever()