大规模异步新闻爬虫: 让MySQL 数据库操作更方便

Python爬虫 2018-12-03 0:28:54 阅读(12990) 评论(2)

小猿们还记得最开始我们实现的那个槽点多多的百度新闻爬虫吗?那里的逻辑最后是把下载的网页和网址存储到数据库,但是我们只是简单的实现为打印信息。

现如今,我们能用的数据库很多,老牌关系型数据库如 MySQL(MariaDB), PostgreSQL 等,新型的NoSQL数据库,还有NewSqL数据库。选择实在太多,但MySQL(Mariadb)从易获取性、易使用性、稳定性、社区活跃性方面都有较大优势,所以,我们在够用的情况下都选择MySQL。

封装mysql成一个python包

今天,我们就把MySQL的操作单独拿出来探讨一下,并实现一个更方便的封装。

Python对MySQL操作的模块最好的两个模块是:

1. MySQLdb
这是一个老牌的MySQL模块,它封装了MySQL client的C语言API,但是它主要支持Python 2.x的版本,后来有人fork了一个版本加入了Python 3的支持,并起名为mysqlclient-python 它的pypi包名为mysqlclient,所以通过pip安装就是 pip install mysqlclient

2. PyMySQL
这是一个纯Python实现的MySQL客户端。因为是纯Python实现,它和Python 3的异步模块aysncio可以很好的结合起来,形成了aiomysql 模块,后面我们写异步爬虫时就可以对数据库进行异步操作了。

通过以上简单的对比,我们选择了PyMySQL来作为我们的数据库客户端模块。

老猿我在Python中操作MySQL的时间已经有十多年了,总结下来,还是tornado里面的那个torndb的封装使用比较方便。torndb在Python 2.x时代早就出现了,那时候它是对MySQLdb的封装。后来接触Python 3 和 PyMySQL,就自己参考torndb和自己的经验,对PyMySQL进行了一个封装,并给它起了个很土的名字: ezpymysql

不过,这个很土的名字背后,还是有着让人省心的方便,希望小猿们能看在它好用的份儿上,别计较它很土的名字。

废话不多讲,代码接着上!

1. 使用示例

首先我们先通过一个使用例子看看它的方便性:

from ezpymysql import Connection

db = Connection(
    'localhost',
    'db_name',
    'user',
    'password'
)
# 获取一条记录
sql = 'select * from test_table where id=%s'
data = db.get(sql, 2)

# 获取多天记录
sql = 'select * from test_table where id>%s'
data = db.query(sql, 2)

# 插入一条数据
sql = 'insert into test_table(title, url) values(%s, %s)'
last_id = db.execute(sql, 'test', 'http://a.com/')
# 或者
last_id = db.insert(sql, 'test', 'http://a.com/')


# 使用更高级的方法插入一条数据
item = {
    'title': 'test',
    'url': 'http://a.com/',
}
last_id = db.table_insert('test_table', item)

它的使用分两步:
首先,建立一个MySQL 连接;
然后,通过sql语句查询或插入数据。

可能有小猿会提出疑问,为什么不用像SQLAlchemy之类的ORM呢?简单说,就是因为这个简单,我们的操作基本上都是查询和插入,用基本的select, insert这些sql语句是最方便和简单的。而ORM要先对表建立映射模型,查询方法也是因ORM而不同,过度的封装很不适合爬虫应用场景。其实,老猿我在写web应用时,仍然是自己写sql,感觉就是那么的清爽!

好吧,不再卖关子了,该上ezpymysql的实现了。

2. 具体实现

#File: ezpymysql.py
#Author: veelion

"""A lightweight wrapper around PyMySQL.
only for python3

"""

import time
import logging
import traceback
import pymysql.cursors

version = "0.7"
version_info = (0, 7, 0, 0)


class Connection(object):
    """A lightweight wrapper around PyMySQL.
    """
    def __init__(self, host, database, user=None, password=None,
                 port=0,
                 max_idle_time=7 * 3600, connect_timeout=10,
                 time_zone="+0:00", charset = "utf8mb4", sql_mode="TRADITIONAL"):
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)

        args = dict(use_unicode=True, charset=charset,
                    database=database,
                    init_command=('SET time_zone = "%s"' % time_zone),
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306
        if port:
            args['port'] = port

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def _ensure_connected(self):
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        if (self._db is None or
            (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        self._ensure_connected()
        return self._db.cursor()

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        self._db = pymysql.connect(**self._db_args)
        self._db.autocommit(True)

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            result = cursor.fetchall()
            return result
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchone()
        finally:
            cursor.close()

    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query."""
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.lastrowid
        except Exception as e:
            if e.args[0] == 1062:
                pass
            else:
                traceback.print_exc()
                raise e
        finally:
            cursor.close()

    insert = execute

    ## =============== high level method for table ===================

    def table_has(self, table_name, field, value):
        if isinstance(value, str):
            value = value.encode('utf8')
        sql = 'SELECT %s FROM %s WHERE %s="%s"' % (
            field,
            table_name,
            field,
            value)
        d = self.get(sql)
        return d

    def table_insert(self, table_name, item):
        '''item is a dict : key is mysql table field'''
        fields = list(item.keys())
        values = list(item.values())
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        for i in range(len(values)):
            if isinstance(values[i], str):
                values[i] = values[i].encode('utf8')
        sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
        try:
            last_id = self.execute(sql, *values)
            return last_id
        except Exception as e:
            if e.args[0] == 1062:
                # just skip duplicated item
                pass
            else:
                traceback.print_exc()
                print('sql:', sql)
                print('item:')
                for i in range(len(fields)):
                    vs = str(values[i])
                    if len(vs) > 300:
                        print(fields[i], ' : ', len(vs), type(values[i]))
                    else:
                        print(fields[i], ' : ', vs, type(values[i]))
                raise e

    def table_update(self, table_name, updates,
                     field_where, value_where):
        '''updates is a dict of {field_update:value_update}'''
        upsets = []
        values = []
        for k, v in updates.items():
            s = '%s=%%s' % k
            upsets.append(s)
            values.append(v)
        upsets = ','.join(upsets)
        sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
            table_name,
            upsets,
            field_where, value_where,
        )
        self.execute(sql, *(values))

3. 使用方法

这个实现是对pymysql的简单封装,但提供了一些方便的操作:

1. 建立MySQL连接

db = Connection(
    'localhost',
    'db_name',
    'user',
    'password'
)

一般只需要四个参数就可以建立连接了:

  • host:数据库地址,本节就是localhost
  • database: 数据库名
  • user: 数据库用户名
  • password:数据库用户的密码

后面还有几个参数可酌情使用:

  • max_idle_time: MySQL server默认8小时闲置就会断开客户端的连接;这个参数告诉客户端闲置多长时间要重新连接;
  • time_zone: 这里默认时区为0区,你可以设置为自己的时区,比如东8区 +8:00;
  • charset:默认为utf8mb4,即支持moji字符的utf8;

** 2. 操作数据库**
数据库操作分为两类:读和写。
读操作: 使用get()获取一个数据,返回的是一个dict,key就是数据库表的字段;使用query()来获取一组数据,返回的是一个list,其中每个item就是一个dict,跟get()返回的字典一样。
写操作: 使用insert()或execute(),看源码就知道,inseret就是execute的别名。

** 3. 高级操作**
以table_开头的方法:

  • table_has() 查询某个值是否存在于表中。查询的字段最好建立的在MySQL中建立了索引,不然数据量稍大就会很慢。
  • table_insert() 把一个字典类型的数据插入表中。字典的key必须是表的字段。
  • table_update() 更新表中的一条记录。其中, field_where最好是建立了索引,不然数据量稍大就会很慢。

好了,这就是我们封装的MySQL数据库模块,通过简洁的方法来使用,加快我们今后写爬虫的速度,是写爬虫存储数据的居家必备之良器哦,还不赶紧收藏起来。

爬虫知识点

1. logging 模块
Python提供的输出日志的模块,可以输出到屏幕(stdout、stderr),也可以输出到文件。爬虫在运行过程中,可能会碰到千奇百怪的异常,把这些异常都记录下来,可以很好的帮助改善爬虫。

2. pymysql
一个纯Python实现的MySQL客户端。在使用中,我们把它封装为ezpymysql。

准备工作都做完了,下一篇我们实现一个:
同步定向新闻爬虫

猿人学banner宣传图

我的公众号:猿人学 Python 上会分享更多心得体会,敬请关注。

***版权申明:若没有特殊说明,文章皆是猿人学 yuanrenxue.con 原创,没有猿人学授权,请勿以任何形式转载。***

说点什么吧...

  1. 1楼
    lwq 5年前 (2019-05-29)

    pip install leveldb报错
    Command “python setup.py egg_info” failed with error code 1 in C:\Users\OYHP\AppData\Local\Temp\pip-install-fvcr6i
    jz\leveldb\

    • 回复
      王平 5年前 (2019-05-30)
      回复 @lwq :不能直接在windows上安装,学习的话,推荐用ubuntu