项目背景

在处理过程中,今天上午需要更新A字段,下午爬虫组完成了规格书或图片的爬取又需要更新图片和规格书字段,由于单表千万级深度翻页会导致处理速度越来越慢。

select a,b,c from db.tb limit 10000 offset 9000000

改进思路

是否有可以不需要深度翻页也可以进行数据更新的凭据?
是的,利用自增id列

观察数据特征

此单表有自增id列且为主键,根据索引列查询数据和更新数据是最理想的途径。

select a,b, c from db.tb where id=9999999;update db.tb set a=x where id=9999999;
def mission_handler(all_missions, worker_mission_size):    """    根据总任务数和每个worker的任务数计算出任务列表, 任务列表元素为(任务开始id, 任务结束id)。    例: 总任务数100个,每个worker的任务数40, 那么任务列表为:[(1, 40), (41, 80), (81, 100)]    :param all_missions: 总任务数    :param worker_mission_size: 每个worker的最大任务数    :return: [(start_id, end_id), (start_id, end_id), ...]    """    worker_mission_ids = []    current_id = 0    while current_id <= all_missions:        start_id = all_missions if current_id + 1 >= all_missions else current_id + 1        end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size        if start_id == end_id:            if worker_mission_ids[-1][1] == start_id:                break        worker_mission_ids.append((start_id, end_id))        current_id += worker_mission_size    return worker_mission_ids
>>> mission_handler(100, 40)[(1, 40), (41, 80), (81, 100)]
from concurrent.futures import ProcessPoolExecutordef main():    # 自增id最大值    max_id = 30000000    # 单worker处理数据量    worker_mission_size = 1000000    # 使用多进程进行处理    missions = mission_handler(max_id, worker_mission_size)    workers = []    executor = ProcessPoolExecutor()    for idx, mission in enumerate(missions):        start_id, end_id = mission        workers.append(executor.submit(data_handler, start_id, end_id, idx))def data_handler(start_id, end_id, worker_id):    pass
# 用另外一张表记录处理状态insert into db.tb_handle_status(row_id, success) values (999, 0);
def data_handler(start_id, end_id, worker_id):    # 数据连接    conn, cursor = mysql()    current_id = start_id        try:            while current_id <= end_id:                try:                    # TODO 数据处理代码                    pass                except Exception as e:                    # TODO 记录处理结果                    # 数据移动到下一条                    current_id += 1                    continue                else:                    # 无异常,继续处理下一条数据                    current_id += 1        except Exception as e:            return 'worker_id({}): result({})'.format(worker_id, False)        finally:            # 数据库资源释放            cursor.close()            conn.close()        return 'worker_id({}): result({})'.format(worker_id, True)
sql = """update db.tb set a=%s, b=%s where id=%s"""values = [            ('a_value', 'b_value', 9999),            ('a_value', 'b_value', 9998),            ...         ]# 批量提交,减少网络io以及锁获取频率cursor.executemany(sql, values)

更多相关文章

  1. Android,一个思路实现APP版本更新
  2. Android描画简单圆形
  3. 详解Android(安卓)TextView属性ellipsize多行失效的解决思路
  4. android 类似QQ 换皮肤 实现思路 apk资源共享
  5. Android之SharedPreference轻量级数据存储
  6. 开始学习Android
  7. Android流媒体
  8. 读懂Android(安卓)(1):使用Android内部的DownloadProvider下载文
  9. Android(安卓)so注入( inject)和Hook(挂钩)的实现思路讨论

随机推荐

  1. 小女子想转学java,各位朋友能否给点建议
  2. java中的事件监听问题,如何将菜单项与组合
  3. java 调用 shell 得到返回值(二)
  4. android游戏开发学习 博客分类: android
  5. Java开发微信公众号(二)---开启开发者模式,
  6. linux环境下成功编译GDAL为JAVA库
  7. java项目中Classpath路径到底指的是哪里?
  8. java-信息安全(三)-PBE加密算法
  9. JAVA EXAM2 复习提纲
  10. dom4j-java-如何获取root中具有特定元素