Here are two basic solutions first; later posts will upgrade them with a message queue, Celery, Kafka, and Akka.
#! python3
# -*- coding: utf-8 -*-
import time
import pymysql
import html2text
from DBUtils.PooledDB import PooledDB
from sentiment_analysis import Sentiment_Analysis
from dialogue.dumblog import dlog
logger = dlog(__file__, console='debug')
class DB(object):
    # Shared connection pool: connections are recycled instead of being
    # re-created for every query, which helps with the frequent
    # "MySQL server has gone away" disconnects.
    pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host='',
                    port=33312, user='', passwd='', db='table',
                    charset='utf8', cursorclass=pymysql.cursors.DictCursor)

    def connect(self):
        db = self.pool.connection()
        cursor = db.cursor()
        return db, cursor

    def row_count(self):
        # Number of rows that still have no sentiment score.
        db, cursor = self.connect()
        cursor.execute("SELECT COUNT(*) FROM cluster WHERE binary_sentiment is NULL")
        row_count = cursor.fetchone()['COUNT(*)']
        return row_count

    def calculate_sentiment(self, news_dict):
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        return binary_sentiment_degree, probability_sentiment_degree

    def loop_data(self, rows):
        res = []
        for row in rows:
            print(row['object_id'])
            title = html2text.html2text(row['title'])
            print(title)
            content = html2text.html2text(row['content'])
            news_dict = {'title': title, 'content': content}
            binary_sentiment_degree, probability_sentiment_degree = self.calculate_sentiment(news_dict)
            res.append({'object_id': row['object_id'],
                        'title': title,
                        'content': content,
                        'binary_sentiment_degree': binary_sentiment_degree,
                        'probability_sentiment_degree': probability_sentiment_degree})
        return res

    def update_data(self, res):
        # Retry with exponential backoff so a dropped connection does not
        # kill the whole job.
        attempts = 0
        basic_sleep_time = 1.2
        base_time = 10
        while attempts < 100:
            try:
                db, cursor = self.connect()
                for item in res:
                    # res holds up to 1000 rows per batch
                    cursor.execute("""
                        UPDATE cluster
                        SET title=%s, content=%s, binary_sentiment=%s, probability_sentiment=%s, update_time=%s
                        WHERE object_id=%s
                    """, (item['title'], item['content'], item['binary_sentiment_degree'],
                          item['probability_sentiment_degree'],
                          time.strftime('%Y-%m-%d %H:%M:%S'), item['object_id']))
                db.commit()
            except Exception as err:
                # log the failure, back off, then retry
                logger.error(err)
                attempts += 1
                total_time = base_time * (basic_sleep_time ** attempts)
                logger.info(total_time)
                time.sleep(total_time)
            else:
                break
            finally:
                cursor.close()
                db.close()

    def shift_data(self):
        row_count = self.row_count()
        logger.info("total rows need to be updated {}".format(row_count))
        offset_number = 0
        while offset_number <= row_count:
            db, cursor = self.connect()
            cursor.execute("SELECT * FROM cluster WHERE binary_sentiment is NULL order by object_id LIMIT 1000 OFFSET %s" % offset_number)
            rows = cursor.fetchall()
            res = self.loop_data(rows)
            self.update_data(res)
            logger.info("already update rows: {}".format(offset_number))
            offset_number += 1000


def main():
    db = DB()
    db.shift_data()


if __name__ == '__main__':
    main()
The first solution relies on LIMIT/OFFSET pagination, and adds a retry mechanism plus a connection pool to cope with the database dropping the connection over and over again (the first time I have ever run into this; the error is "MySQL server has gone away", which is quite bizarre). With this much data, a plain cursor.execute or the peewee ORM simply fails, because both try to load everything into memory at once and you run out of memory; pagination is what keeps memory usage bounded. So what is wrong with this approach? LIMIT/OFFSET gets slower and slower as the offset grows, and since the sentiment score of each article is computed by a machine-learning model running in a single process, it is inevitably slow. Profiling an update of 1000 rows with cProfile shows that, apart from establishing the connection, the sentiment-analysis code takes the most time:
import cProfile
import pstats
cProfile.run("db.shift_data()", "thing.txt")
p &#61; pstats.Stats("thing.txt")
p.sort_stats("cumulative").print_stats(100)
Which naturally leads to the multiprocessing version.
#! python3
# -*- coding: utf-8 -*-
import MySQLdb
from MySQLdb.cursors import DictCursor
import html2text
from multiprocessing import Process, cpu_count, Queue
from DBUtils.PooledDB import PooledDB
from sentiment_analysis import Sentiment_Analysis
POOL = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host='',
                port=33312, user='', passwd='', db='table',
                charset='utf8mb4', use_unicode=True, cursorclass=DictCursor)


def connect():
    db = POOL.connection()
    cursor = db.cursor()
    return db, cursor


def add_job(job):
    # Producer: page through the source table and push each 1000-row batch
    # onto the shared queue.
    db, cursor = connect()
    total = 8746218  # hard-coded total row count of the cluster table
    print("total rows need to be updated {}".format(total))
    offset_number = 0
    while offset_number <= total:
        sql = """SELECT * FROM cluster order by object_id LIMIT %s OFFSET %s"""
        cursor.execute(sql, (1000, offset_number))
        rows = cursor.fetchall()
        job.put(rows)
        offset_number += 1000
        # print("already update {} rows".format(offset_number))


def create_process(job, concurrency):
    # Consumers: one worker process per CPU core by default.
    for _ in range(concurrency):
        process = Process(target=worker, args=(job, ))
        process.start()


def worker(job):
    while True:
        try:
            task = job.get()
            insert(task)
        except Exception as err:
            print(err)


def insert(task):
    # Score every row in the batch, then bulk-insert it into the target table.
    batch = []
    for item in task:
        title = html2text.html2text(item['title'])
        content = html2text.html2text(item['content'])
        news_dict = {'title': title, 'content': content}
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        batch.append((item['object_id'],
                      item['url'],
                      item['title'],
                      item['html'],
                      item['content'],
                      item['category_name'],
                      item['occur_time'],
                      item['category_link'],
                      item['desc'],
                      item['source'],
                      item['created_time'],
                      item['keyword'],
                      binary_sentiment_degree,
                      probability_sentiment_degree,
                      item['update_time']))
    db, cursor = connect()
    sql = """INSERT INTO news (object_id, url, title, html, content, category_name, occur_time, category_link, news.desc, source, created_time, keyword, binary_sentiment, probability_sentiment, update_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    try:
        cursor.executemany(sql, batch)
        db.commit()
        print('1000 rows have been inserted!')
    except MySQLdb.IntegrityError:
        # duplicate object_id: skip this batch
        pass
    finally:
        cursor.close()
        db.close()


def start(concurrency=cpu_count()):
    job = Queue()
    create_process(job, concurrency)
    add_job(job)
    try:
        job.close()
        job.join_thread()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    start()
First, let me explain why I switched to MySQLdb here. There is a reason: when the code ran on Linux, pymysql of all things raised an encoding error. The message was:
cursor.execute(sql, (1000, offset_number)), UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 5221: invalid start byte
Odd as it was, my only option was to switch to MySQLdb. I mainly followed https://stackoverflow.com/questions/25865270/how-to-install-python-mysqldb-module-using-pip for the installation. Note that the package to install is mysqlclient, and the build may fail until you first run sudo apt-get install python-pip python-dev libmysqlclient-dev; after that the problem was solved. The multiprocessing version is essentially a read/write split with one producer and multiple consumers (one consumer per worker process), moving all the data into another table.
That is roughly the situation, but is there still room for optimization? A message queue is worth considering: for example, read 1,000,000 rows at a time, distribute them evenly across 10 queues acting as 10 producer queues, then split each producer queue into 10 again, giving 100 consumer queues; that is the idea, sketched below. As for why Celery, Kafka and Akka: they are mainstream middleware and frameworks, and they are what gets used most with Python, Java and Scala respectively.
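Independent of which broker ends up being used, the fan-out itself can be sketched in plain Python. This is only an illustration of the partitioning described above; the in-memory id list and the split() helper are stand-ins for the example, not part of the eventual queue version:

def split(ids, parts):
    # split a list of ids into `parts` chunks of (almost) equal size
    size = (len(ids) + parts - 1) // parts
    return [ids[i:i + size] for i in range(0, len(ids), size)]

ids = list(range(1000000))               # stand-in for one read of 1,000,000 object_ids
producer_partitions = split(ids, 10)     # 10 producer queues
consumer_partitions = [chunk             # each producer queue is split into 10 again,
                       for part in producer_partitions
                       for chunk in split(part, 10)]
print(len(producer_partitions), len(consumer_partitions))  # 10 100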
To be continued; the next article will be the queue-based version.
On a side note, for logging in Python you can use logbook (https://logbook.readthedocs.io/en/stable/), and for job scheduling there is Azkaban besides crontab (see 王彦鸿: Azkaban入门篇 on zhuanlan.zhihu.com).
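For reference, a minimal logbook sketch along the lines of its quickstart; the logger name and message are made up for the example:

import sys
from logbook import Logger, StreamHandler

StreamHandler(sys.stdout).push_application()   # send log records to stdout
log = Logger('sentiment-job')                  # hypothetical logger name
log.info('updated 1000 rows')                  # example message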
Follow-up: after discussing this with others, a few points need updating. You can still use LIMIT/OFFSET, but you must ORDER BY and then take the largest key of each batch of 1000 rows, continually advancing that maximum value (i.e. seek/keyset pagination instead of an ever-growing offset). Also, if a MySQL table is well tuned, keeping 100 million rows in a single table is perfectly fine.
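A minimal sketch of that keyset-pagination idea, reusing the connect() helper and cluster table from the code above; process() is a placeholder for the loop_data()/update_data() steps and is not part of the original script:

last_id = 0                                   # largest object_id handled so far
while True:
    db, cursor = connect()
    # seek past the previous batch instead of scanning and discarding OFFSET rows
    cursor.execute(
        "SELECT * FROM cluster WHERE object_id > %s ORDER BY object_id LIMIT 1000",
        (last_id,))
    rows = cursor.fetchall()
    cursor.close()
    db.close()
    if not rows:
        break                                 # nothing left to process
    process(rows)                             # placeholder for loop_data() + update_data()
    last_id = rows[-1]['object_id']           # advance to the maximum key of this batch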