热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mysql百万数据更新_如何更新数据库百万级,千万级数据?

先上两个基础的解决方案,后续还会通过消息队列,celery,kafka,akka来升级解决方案#!python3#-*-coding:utf-8-*-i

先上两个基础的解决方案,后续还会通过消息队列,celery, kafka, akka来升级解决方案

#! python3

# -*- coding: utf-8 -*-

import time

import pymysql

import html2text

from DBUtils.PooledDB import PooledDB

from sentiment_analysis import Sentiment_Analysis

from dialogue.dumblog import dlog

logger = dlog(__file__, console='debug')

class DB(object):

pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host='',

port=33312, user='', passwd='', db='table',

charset='utf8', cursorclass=pymysql.cursors.DictCursor)

def connect(self):

db = self.pool.connection()

cursor = db.cursor()

return db, cursor

def row_count(self):

db, cursor = self.connect()

cursor.execute("SELECT COUNT(*) FROM cluster WHERE binary_sentiment is NULL")

row_count = cursor.fetchone()['COUNT(*)']

return row_count

def calculate_sentiment(self, news_dict):

sa = Sentiment_Analysis()

result_tuple = sa.sentiment_analysis(news_dict)[0]

binary_sentiment_degree = int(result_tuple[0])

probability_sentiment_degree = result_tuple[1]

return binary_sentiment_degree, probability_sentiment_degree

def loop_data(self, rows):

res = []

for row in rows:

print(row['object_id'])

title = html2text.html2text(row['title'])

print(title)

content = html2text.html2text(row['content'])

news_dict = {'title': title, 'content': content}

binary_sentiment_degree, probability_sentiment_degree = self.calculate_sentiment(news_dict)

res.append({'object_id': row['object_id'], 'title': title, 'content': content, 'binary_sentiment_degree': binary_sentiment_degree, 'probability_sentiment_degree': probability_sentiment_degree})

return res

def update_data(self, res):

attempts = 0

basic_sleep_time = 1.2

base_time = 10

while attempts <100:

try:

db, cursor &#61; self.connect()

for item in res:

# res长度为1000条

cursor.execute("""

UPDATE cluster

SET title&#61;%s, content&#61;%s, binary_sentiment&#61;%s, probability_sentiment&#61;%s, update_time&#61;%s

WHERE object_id&#61;%s

""", (item[&#39;title&#39;], item[&#39;content&#39;], item[&#39;binary_sentiment_degree&#39;], item[&#39;probability_sentiment_degree&#39;], time.strftime(&#39;%Y-%m-%d %H:%M:%S&#39;), item[&#39;object_id&#39;]))

db.commit()

except Exception as err:

# 记录log

logger.error(err)

attempts &#43;&#61; 1

total_time &#61; base_time * (basic_sleep_time ** attempts)

logger.info(total_time)

time.sleep(total_time)

else:

break

finally:

cursor.close()

db.close()

def shift_data(self):

row_count &#61; self.row_count()

logger.info("total rows need to be updated {}".format(row_count))

offset_number &#61; 0

while offset_number <&#61; row_count:

db, cursor &#61; self.connect()

cursor.execute("SELECT * FROM cluster WHERE binary_sentiment is NULL order by object_id LIMIT 1000 OFFSET %s" % offset_number)

rows &#61; cursor.fetchall()

res &#61; self.loop_data(rows)

self.update_data(res)

logger.info("already update rows: {}".format(offset_number))

offset_number &#43;&#61; 1000

def main():

db &#61; DB()

db.shift_data()

if __name__ &#61;&#61; &#39;__main__&#39;:

main()

第一种解决方案通过limit, offset分页解决&#xff0c;同时针对数据库不停的断开的情况加入了重试和连接池机制(数据库不停断开我真的是第一次遇到&#xff0c;报错就是mysql server has gone away&#xff0c; 真的很诡异)&#xff0c;同时在面对这么多数据的时候&#xff0c;普通写法cursor.execute, peewee orm全部都会失效&#xff0c;因为它们会将数据一次性载入内存中&#xff0c;会报内存不足的错误&#xff0c;分页才能解决内存不足的错误。但这种方法的问题是什么呢&#xff1f;limit offset越到后面越慢&#xff0c;还有就是我这里主要是在用机器学习算法更新文章的情感系数&#xff0c;而我这里只用了一个进程去跑&#xff0c;自然很慢的&#xff0c;用代码性能检测工具cprofile去检测更新1000条数据的性能和时间&#xff0c;会发现除了建立连接的时间&#xff0c;就是情感分析的代码消耗时间最长了&#xff0c;

import cProfile

import pstats

cProfile.run("db.shift_data()", "thing.txt")

p &#61; pstats.Stats("thing.txt")

p.sort_stats("cumulative").print_stats(100)

所以自然引出多进程的版本

#! python3

# -*- coding: utf-8 -*-

import MySQLdb

from MySQLdb.cursors import DictCursor

import html2text

from multiprocessing import Process, cpu_count, Queue

from DBUtils.PooledDB import PooledDB

from sentiment_analysis import Sentiment_Analysis

POOL &#61; PooledDB(creator&#61;MySQLdb, mincached&#61;1, maxcached&#61;20, host&#61;&#39;&#39;,

port&#61;33312, user&#61;&#39;&#39;, passwd&#61;&#39;&#39;, db&#61;&#39;table&#39;,

charset&#61;&#39;utf8mb4&#39;, use_unicode&#61;True, cursorclass&#61;DictCursor)

def connect():

db &#61; POOL.connection()

cursor &#61; db.cursor()

return db, cursor

def add_job(job):

db, cursor &#61; connect()

total &#61; 8746218

print("total rows need to be updated {}".format(total))

offset_number &#61; 0

while offset_number <&#61; total:

sql &#61; """SELECT * FROM cluster order by object_id LIMIT %s OFFSET %s"""

cursor.execute(sql, (1000, offset_number))

rows &#61; cursor.fetchall()

job.put(rows)

offset_number &#43;&#61; 1000

#print("already update {} rows".format(offset_number))

def create_process(job, concurrency):

for _ in range(concurrency):

process &#61; Process(target&#61;worker, args&#61;(job, ))

process.start()

def worker(job):

while True:

try:

task &#61; job.get()

insert(task)

except Exception as err:

print(err)

def insert(task):

batch &#61; []

for item in task:

title &#61; html2text.html2text(item[&#39;title&#39;])

content &#61; html2text.html2text(item[&#39;content&#39;])

news_dict &#61; {&#39;title&#39;: title, &#39;content&#39;: content}

sa &#61; Sentiment_Analysis()

result_tuple &#61; sa.sentiment_analysis(news_dict)[0]

binary_sentiment_degree &#61; int(result_tuple[0])

probability_sentiment_degree &#61; result_tuple[1]

batch.append((item[&#39;object_id&#39;],

item[&#39;url&#39;],

item[&#39;title&#39;],

item[&#39;html&#39;],

item[&#39;content&#39;],

item[&#39;category_name&#39;],

item[&#39;occur_time&#39;],

item[&#39;category_link&#39;],

item[&#39;desc&#39;],

item[&#39;source&#39;],

item[&#39;created_time&#39;],

item[&#39;keyword&#39;],

binary_sentiment_degree,

probability_sentiment_degree,

item[&#39;update_time&#39;]))

db, cursor &#61; connect()

sql &#61; """INSERT INTO news (object_id, url, title, html, content, category_name, occur_time, category_link, news.desc, source, created_time, keyword, binary_sentiment, probability_sentiment, update_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""

try:

cursor.executemany(sql, batch)

db.commit()

print(&#39;1000 rows has inserted!&#39;)

except MySQLdb.IntegrityError:

pass

finally:

cursor.close()

db.close()

def start(concurrency&#61;cpu_count()):

job &#61; Queue()

create_process(job, concurrency)

add_job(job)

try:

job.close()

job.join_thread()

except Exception as err:

print(err)

if __name__ &#61;&#61; &#39;__main__&#39;:

start()

首先我要解释一下为什么这里切换到了MySQLdb, 是有原因的&#xff0c;因为代码在linux上跑的时候&#xff0c;用pymysql竟然报了编码错误&#xff0c;错误信息如下&#xff1a;

cursor.execute(sql, (1000, offset_number)), unicodedecodeerror: &#39;utf-8&#39; codec can&#39;t decode byte 0xb1 in position 5221: invalid start byte

反正也很诡异&#xff0c;然后我只能切换到MySQLdb,https://stackoverflow.com/questions/25865270/how-to-install-python-mysqldb-module-using-pip​stackoverflow.com

主要参照这篇文章来去安装&#xff0c;注意是安装mysqlclient&#xff0c;但有可能会报错&#xff0c;要先安装sudo apt-get install python-pip python-dev libmysqlclient-dev&#xff0c;然后这个问题就解决了&#xff0c;其实这里的多进程是采用了读写分离&#xff0c;一个生产者&#xff0c;多个消费者的写法(有多少个进程&#xff0c;就有多少个消费者)&#xff0c;将数据全部转移到了另外一张表。

大概情况就是这样&#xff0c;但还有没有优化的空间呢&#xff1f;可以考虑消息队列&#xff0c;比方说每次读取100万数据&#xff0c;平均分配给10个队列&#xff0c;作为10个生产者队列&#xff0c;然后每1个生产者队列又切分成10份&#xff0c;也就是100个消费者队列&#xff0c;思路就是这样。至于为什么选择celery, kafka, akka, 一个是主流的中间件框架&#xff0c;还有就是python, java, scala用的比较多。

未完待续&#xff0c;下面的文章就是队列版本了。

同时python打日志可以用logbook, 任务调用除了crontab可以用Azkabanhttps://logbook.readthedocs.io/en/stable/​logbook.readthedocs.io王彦鸿&#xff1a;Azkaban入门篇​zhuanlan.zhihu.com10c6258d4c98ee4d2fa5df364cbbf41f.png

后续更新&#xff1a;在与别人交流过后&#xff0c;有一些地方需要更新&#xff0c;首先可以用limit offset, 但一定要用order by&#xff0c;然后每次取1000条中最大的&#xff0c;不断更新最大的数值&#xff0c;还有就是mysql一张表如果优化的好的话&#xff0c;1亿的数据存在一张表里面是完全没有问题的。



推荐阅读
  • HTTP(HyperTextTransferProtocol)是超文本传输协议的缩写,它用于传送www方式的数据。HTTP协议采用了请求响应模型。客服端向服务器发送一 ... [详细]
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 在Delphi7下要制作系统托盘,只能制作一个比较简单的系统托盘,因为ShellAPI文件定义的TNotifyIconData结构体是比较早的版本。定义如下:1234 ... [详细]
  • 在尝试对 QQmlPropertyMap 类进行测试驱动开发时,发现其派生类中无法正常调用槽函数或 Q_INVOKABLE 方法。这可能是由于 QQmlPropertyMap 的内部实现机制导致的,需要进一步研究以找到解决方案。 ... [详细]
  • 小程序的授权和登陆
    小程序的授权和登陆 ... [详细]
  • Leetcode学习成长记:天池leetcode基础训练营Task01数组
    前言这是本人第一次参加由Datawhale举办的组队学习活动,这个活动每月一次,之前也一直关注,但未亲身参与过,这次看到活动 ... [详细]
  • 本文将深入探讨 iOS 中的 Grand Central Dispatch (GCD),并介绍如何利用 GCD 进行高效多线程编程。如果你对线程的基本概念还不熟悉,建议先阅读相关基础资料。 ... [详细]
  • 本文总结了Java初学者需要掌握的六大核心知识点,帮助你更好地理解和应用Java编程。无论你是刚刚入门还是希望巩固基础,这些知识点都是必不可少的。 ... [详细]
  • 本文介绍了如何在 Spring 3.0.5 中使用 JdbcTemplate 插入数据并获取 MySQL 表中的自增主键。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • WinMain 函数详解及示例
    本文详细介绍了 WinMain 函数的参数及其用途,并提供了一个具体的示例代码来解析 WinMain 函数的实现。 ... [详细]
  • 本文详细解析了使用C++实现的键盘输入记录程序的源代码,该程序在Windows应用程序开发中具有很高的实用价值。键盘记录功能不仅在远程控制软件中广泛应用,还为开发者提供了强大的调试和监控工具。通过具体实例,本文深入探讨了C++键盘记录程序的设计与实现,适合需要相关技术的开发者参考。 ... [详细]
  • 本文探讨了如何通过编程手段在Linux系统中禁用硬件预取功能。基于Intel® Core™微架构的应用性能优化需求,文章详细介绍了相关配置方法和代码实现,旨在帮助开发人员有效控制硬件预取行为,提升应用程序的运行效率。 ... [详细]
author-avatar
爱是种承诺ml
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有