An Approach to Processing Tens of Millions of Rows in a Single MySQL Table
16lz
2021-12-11
Project Background
During processing, field A had to be updated in the morning; in the afternoon, once the crawler team finished scraping the spec sheets and images, the image and spec-sheet fields had to be updated as well. With tens of millions of rows in a single table, deep pagination makes each pass slower and slower:
select a,b,c from db.tb limit 10000 offset 9000000
The Improved Approach
Is there a key that lets us update rows without deep pagination?
Yes: the auto-increment id column.
Examining the Data
The table has an auto-increment id column as its primary key; querying and updating rows through this indexed column is the ideal path:
select a, b, c from db.tb where id = 9999999;
update db.tb set a = x where id = 9999999;
def mission_handler(all_missions, worker_mission_size):
    """
    Split the total workload into a task list based on the total number of
    missions and the number of missions per worker. Each list element is a
    tuple of (task start id, task end id).
    Example: 100 missions total with 40 missions per worker gives the
    task list [(1, 40), (41, 80), (81, 100)].
    :param all_missions: total number of missions
    :param worker_mission_size: maximum number of missions per worker
    :return: [(start_id, end_id), (start_id, end_id), ...]
    """
    worker_mission_ids = []
    current_id = 0
    while current_id <= all_missions:
        start_id = all_missions if current_id + 1 >= all_missions else current_id + 1
        end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size
        if start_id == end_id:
            # Guard against an empty list before peeking at the last range
            if worker_mission_ids and worker_mission_ids[-1][1] == start_id:
                break
        worker_mission_ids.append((start_id, end_id))
        current_id += worker_mission_size
    return worker_mission_ids
>>> mission_handler(100, 40)
[(1, 40), (41, 80), (81, 100)]
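The boundary handling in mission_handler is easy to get wrong (for example, the guard that peeks at worker_mission_ids[-1] must first check that the list is non-empty). An equivalent, more compact formulation using range() sidesteps those edge cases entirely; this is a sketch, not the article's original code, and the name mission_ranges is made up here:

```python
def mission_ranges(all_missions, worker_mission_size):
    # Walk the ids in steps of worker_mission_size and clamp the
    # end of each range to the last id.
    return [(start, min(start + worker_mission_size - 1, all_missions))
            for start in range(1, all_missions + 1, worker_mission_size)]

print(mission_ranges(100, 40))  # [(1, 40), (41, 80), (81, 100)]
```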
from concurrent.futures import ProcessPoolExecutor


def main():
    # Maximum value of the auto-increment id
    max_id = 30000000
    # Number of rows handled by a single worker
    worker_mission_size = 1000000
    # Process the id ranges with a pool of worker processes
    missions = mission_handler(max_id, worker_mission_size)
    workers = []
    executor = ProcessPoolExecutor()
    for idx, mission in enumerate(missions):
        start_id, end_id = mission
        workers.append(executor.submit(data_handler, start_id, end_id, idx))
    # Wait for the workers and collect their reports
    for worker in workers:
        print(worker.result())


def data_handler(start_id, end_id, worker_id):
    pass
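The submit-and-collect pattern in main() can be exercised with a trivial stand-in for data_handler. This demo uses ThreadPoolExecutor so it runs anywhere without process-spawning concerns; the article's version uses ProcessPoolExecutor, and fake_data_handler is a made-up placeholder:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_data_handler(start_id, end_id, worker_id):
    # Stand-in for data_handler: report how many rows this worker would touch
    return (worker_id, end_id - start_id + 1)

missions = [(1, 40), (41, 80), (81, 100)]
with ThreadPoolExecutor() as executor:
    workers = [executor.submit(fake_data_handler, start, end, idx)
               for idx, (start, end) in enumerate(missions)]
    results = [w.result() for w in workers]

print(results)  # [(0, 40), (1, 40), (2, 20)]
```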
# Track per-row processing status in a separate table
insert into db.tb_handle_status(row_id, success) values (999, 0);
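The status table can be sketched end to end. The snippet below uses sqlite3 as a stand-in so it runs without a MySQL server; in production this would be a MySQL driver such as pymysql with %s placeholders instead of ?, and the table/column names simply mirror the insert above:

```python
import sqlite3

# In-memory stand-in for db.tb_handle_status
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute(
    "create table tb_handle_status (row_id integer primary key, success integer)"
)

# Mark row 999 as not yet successfully processed
cursor.execute(
    "insert into tb_handle_status (row_id, success) values (?, ?)", (999, 0)
)
# Once the row is processed, flip the flag
cursor.execute("update tb_handle_status set success = 1 where row_id = ?", (999,))
conn.commit()

cursor.execute("select success from tb_handle_status where row_id = ?", (999,))
print(cursor.fetchone())  # (1,)
```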
def data_handler(start_id, end_id, worker_id):
    # Database connection
    conn, cursor = mysql()
    current_id = start_id
    try:
        while current_id <= end_id:
            try:
                # TODO data-processing code
                pass
            except Exception as e:
                # TODO record the processing result
                # Move on to the next row
                current_id += 1
                continue
            else:
                # No exception; continue with the next row
                current_id += 1
    except Exception as e:
        return 'worker_id({}): result({})'.format(worker_id, False)
    finally:
        # Release database resources
        cursor.close()
        conn.close()
    return 'worker_id({}): result({})'.format(worker_id, True)
sql = """update db.tb set a=%s, b=%s where id=%s"""
values = [
    ('a_value', 'b_value', 9999),
    ('a_value', 'b_value', 9998),
    ...
]
# Submit in batches to cut network io and lock-acquisition frequency
cursor.executemany(sql, values)
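executemany still works best in bounded batches: a very large values list inflates the packet size and holds locks longer. A small helper to slice the values before each executemany call (the name chunked and the batch size of 1000 are illustrative assumptions, not from the article):

```python
def chunked(values, batch_size):
    # Yield successive slices of at most batch_size elements
    for i in range(0, len(values), batch_size):
        yield values[i:i + batch_size]

# Usage against a live connection (sketch):
# for batch in chunked(values, 1000):
#     cursor.executemany(sql, batch)
#     conn.commit()

print(list(chunked([1, 2, 3, 4, 5], 2)))  # [[1, 2], [3, 4], [5]]
```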