python scrapy怎么存入mysql_pipeline编写pymysql入库逻辑与异步存储优化

Scrapy Pipeline 用 pymysql 同步写入 MySQL 会卡住爬虫

Scrapy 是异步框架,但 pymysql 默认是同步阻塞的。直接在 process_item 里调用 cursor.execute() + conn.commit(),会让整个 Twisted 事件循环停等 IO,吞吐量断崖下跌,严重时触发 Twisted TimeoutError 或连接池耗尽。

实操建议:

别在 Pipeline 里新建 Connection:每次新建连接开销大,且易超限;改用连接池(pymysql.connections.ConnectionPool 或更推荐 DBUtils.PooledDB)显式关闭 cursor,但不要关 conn:连接由池管理,手动 close 会把连接还给池;漏关 cursor 可能导致 MySQL 报 Commands out of sync开启 autocommit=False 但手动 commit:避免每条 insert 都刷盘;批量提交前检查 len(items) >= 100 再 commit,比单条 commit 快 5–10 倍

示例关键片段:

from DBUtils.PooledDB import PooledDBimport pymysql<p>pool = PooledDB(creator=pymysql,host=’localhost’,user=’root’,password=’123′,database=’scrapy_db’,charset=’utf8mb4′,maxconnections=10,autocommit=False)</p><p>class MysqlPipeline:def process_item(self, item, spider):conn = pool.connection()cursor = conn.cursor()try:cursor.execute("INSERT INTO news (title, url) VALUES (%s, %s)",(item[‘title’], item[‘url’]))if self.batch_count >= 100:conn.commit()self.batch_count = 0else:self.batch_count += 1finally:cursor.close() # 必须关</p><h1>conn.close() 不要调用!还给池

想真正异步?别碰 pymysql,换 aiomysql + asyncio Pipeline

Scrapy 本身不原生支持 async Pipeline,但 2.6+ 版本允许返回 Deferred 或 Awaitable。硬上 aiomysql 能解阻塞,但要注意它和 Scrapy 的 event loop 兼容性——Scrapy 用 Twisted,aiomysql 用 asyncio,混用容易出 RuntimeError: There is no current event loop in thread。

立即学习“Python免费学习笔记(深入)”;

实操建议:

只在 Scrapy 2.7+ 且启用 ASYNCIO_PRIORITY_QUEUE=True 时才考虑 aiomysql必须用 asyncio.to_thread() 包裹 pymysql 操作:这是目前最稳的“伪异步”方案,不改 event loop,又规避了主线程阻塞别用 async def process_item 直接 await:Scrapy 会报 TypeError: object xxx can’t be used in ‘await’ expression

简写示意(非完整类):

import asynciofrom scrapy import signals<p>class AsyncMysqlPipeline:def <strong>init</strong>(self):self.pool = None</p><pre class=’brush:python;toolbar:false;’>@classmethoddef from_crawler(cls, crawler): pipeline = cls() crawler.signals.connect(pipeline.spider_opened, signal=signals.spider_opened) return pipelineasync def spider_opened(self, spider): self.pool = await aiomysql.create_pool( host=’localhost’, user=’root’, password=’123′, db=’scrapy_db’, loop=asyncio.get_event_loop() )def process_item(self, item, spider): # 丢进线程池执行,不阻塞 Twisted 主循环 return asyncio.to_thread(self._sync_save, item)def _sync_save(self, item): with self.pool.acquire() as conn: with conn.cursor() as cur: cur.execute("INSERT …", (item[‘title’],)) conn.commit()</pre>

MySQLdb 和 PyMySQL 在 Pipeline 里表现一样吗?

不一样。MySQLdb 是 C 扩展,快但已停止维护,Python 3.12+ 编译失败;PyMySQL 纯 Python,兼容性好,但默认没开 socket timeout,遇到 MySQL 暂时不可用会卡死 30 秒以上,表现为爬虫假死、log 无输出、Ctrl+C 也不响应。

实操建议:

强制加 connect_timeout=5 和 read_timeout=5:否则一次网络抖动就能拖垮整个爬虫周期捕获 pymysql.err.OperationalError 和 pymysql.err.IntegrityError:前者是连不上或超时,后者是唯一键冲突(比如重复 URL),该跳过就跳过,别让单条错导致 pipeline 中断别信 “charset=utf8”:MySQL 实际要 utf8mb4 才支持 emoji,否则存中文可能变 ??? 或报 Incorrect string value

为什么用 INSERT IGNORE 或 ON DUPLICATE KEY UPDATE 而不是先 SELECT 再判断?

因为并发写入时,两次查询 + 一次插入之间存在竞态窗口。哪怕你加了 UNIQUE KEY(url),不用原子写法,仍可能写入重复数据,或者抛出 IntegrityError 后没兜住,导致 item 丢失。

实操建议:

建表时务必加 UNIQUE KEY:比如 UNIQUE KEY `uk_url` (`url`),这是 INSERT IGNORE 生效前提用 INSERT IGNORE INTO … 最省事:冲突时静默跳过,适合去重场景需要更新时间戳或计数器?改用 INSERT … ON DUPLICATE KEY UPDATE updated_at=NOW(),但注意 VALUES(col) 引用的是本次 INSERT 的值,不是原记录值别在 Pipeline 里做 SELECT COUNT(*) 判断是否存在:慢、锁表、且无法解决并发冲突

入库逻辑真正的复杂点不在语法,而在连接生命周期管理、错误恢复策略、以及对 MySQL 事务边界的理解——比如 autocommit 关着时,一条 insert 失败,后续语句还在同一个事务里,不 rollback 就会污染下一批数据。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。