源码分析python MySQLdb的实现细节-上

前两天因为一个服务内存泄漏的问题,让我想看看python mysql是如何实现的. 以前没注意到MySQLdb的代码实现,通过这两天的学习,还是很有收获的。

以前总是以为MySQLdb的_mysql.c很是复杂,事实上虽然也复杂了点,但基本都看的懂,_mysql.c里不含有mysql的底层协议解析及一些底层针对mysql的互动。

该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=3234

python MySQLdb的源代码地址,  https://github.com/farcepest/MySQLdb1

_mysql.c在头部引入了mysql c库模块,在python下是有必要使用mysql c api的,不然你又要实现更复杂的轮子. 另外python不能直接引用C库代码的,必修经过比如PyObject改造。 有个趣事,mysql官方也有一个python的mysql库,他其实就是咱们现在说的MySQLdb/_mysql.c以前的版本。 在stackoverflow看到一些蛛丝马迹,说是最开始的python _mysql.c是有bug的,后来这老外在MySQLdb里修复了后,这_mysql.c也被mysql官方集成进去了。 所以我们在mysql官方看到的样例是基于_mysql.c以前的版本。

#xiaorui.cc
#include "my_config.h"
#include "mysql.h"
#include "mysqld_error.h"
#include "errmsg.h"

下面是_mysql.c提供给MySQLdb调用的函数, 利用PyObject我们可以使python调用C实现的一些函数,这没的说. 下面是_mysql.c提供给MySQLdb的主要的几个函数.

#xiaorui.cc

static PyMethodDef
_mysql_methods[] = {
    {
        "connect",   #用来创建连接
        (PyCFunction)_mysql_connect,
        METH_VARARGS | METH_KEYWORDS,
        _mysql_connect__doc__
    },
    {
        "escape",    #序列化sql
        (PyCFunction)_mysql_escape,
        METH_VARARGS,
        _mysql_escape__doc__
    },
    {
        "fetch_row",   #在result集合里获取结果
        (PyCFunction)_mysql_ResultObject_fetch_row,
        METH_VARARGS | METH_KEYWORDS,
        _mysql_ResultObject_fetch_row__doc__
    },
    {
        "commit",   #针对autocommit为False的情况下,提交事务
        (PyCFunction)_mysql_ConnectionObject_commit,
        METH_VARARGS,
        _mysql_ConnectionObject_commit__doc__
    },
    {
        "rollback",   #回滚事务
        (PyCFunction)_mysql_ConnectionObject_rollback,
        METH_VARARGS,
        _mysql_ConnectionObject_rollback__doc__
    },
    {
        "close",   #关闭连接
        (PyCFunction)_mysql_ConnectionObject_close,
        METH_VARARGS,
        _mysql_ConnectionObject_close__doc__
    },
    {
        "insert_id",  #获取主键表的最后一个id
        (PyCFunction)_mysql_ConnectionObject_insert_id,
        METH_VARARGS,
        _mysql_ConnectionObject_insert_id__doc__
    },
    {
        "kill",   #干掉一个sql任务
        (PyCFunction)_mysql_ConnectionObject_kill,
        METH_VARARGS,
        _mysql_ConnectionObject_kill__doc__
    },
    {
        "ping",  #探测连接
        (PyCFunction)_mysql_ConnectionObject_ping,
        METH_VARARGS,
        _mysql_ConnectionObject_ping__doc__
    },
    {
        "query",  #查询语句
        (PyCFunction)_mysql_ConnectionObject_query,
        METH_VARARGS,
        _mysql_ConnectionObject_query__doc__
    },
    {
        "select_db",   #选择数据库
        (PyCFunction)_mysql_ConnectionObject_select_db,
        METH_VARARGS,
        _mysql_ConnectionObject_select_db__doc__
    },
    ... ... ....省略了很多 !
    ... ... ....省略了很多 !


我们从创建连接开始说, _mysql_connect主要是调用了_mysql_ConnectionObject_Initialize方法,传递的参数是host,user,passwd,charset等连接选项.

#xiaorui.cc
static PyObject *
_mysql_connect(
    ...
    PyObject *args,
    PyObject *kwargs)
{
    _mysql_ConnectionObject *c=NULL;

    c = MyAlloc(_mysql_ConnectionObject, _mysql_ConnectionObject_Type);
    if (c == NULL) return NULL;
    if (_mysql_ConnectionObject_Initialize(c, args, kwargs)) {
        Py_DECREF(c);
        c = NULL;
    }
    return (PyObject *) c;
}

初始化连接的函数主要的代码就一行,mysql_real_connect是mysql c库的函数,不是_mysql.c构建的.

_mysql_ConnectionObject_Initialize()

    conn = mysql_real_connect(&(self->connection), host, user, passwd, db,
                  port, unix_socket, client_flag);

我们再来说明下cursor的实现, cursor其实就是结果数据集合。 对于cursor存储结果是有两种存储方式. 下面是mysql c api关于存储的那两个函数. 

mysql_store_result(MYSQL *mysql);

mysql_use_result(MYSQL *mysql);

这两个函数分别代表了获取查询结果的两种方式, 我们大多数的场景还是会选择store_result。
第一种,调用mysql_store_result函数将从Mysql服务器查询的所有数据都存储到客户端,然后读取;
第二种,调用mysql_use_result初始化检索,以便于后面一行一行的读取结果集,而它本身并没有从服务器读取任何数据,这种方式较之第一种速度更快且所需内存更少,但它会绑定服务器,阻止其他线程更新任何表,而且必须重复执行mysql_fetch_row读取数据,直至返回NULL,否则未读取的行会在下一次查询时作为结果的一部分返回,故经常我们使用mysql_store_result。

下面的逻辑对应MySQLdb的是cursor = db_conn.cursor(cursorclass=xxx)的动作,cursor 可以理解为存储的游标。 我们可以看到每次创建cursor的时候,他都会创建新的result对象并初始化,另外会为此申请新的内存区。

#xiaorui.cc

static PyObject *
_mysql_ConnectionObject_store_result(
    _mysql_ConnectionObject *self,
    PyObject *args)
{
    PyObject *arglist=NULL, *kwarglist=NULL, *result=NULL;
    _mysql_ResultObject *r=NULL;

    if (!PyArg_ParseTuple(args, "")) return NULL;
    check_connection(self);   #这里会判断clinet --> mysql连接正常是否? 
    arglist = Py_BuildValue("(OiO)", self, 0, self->converter);
    if (!arglist) goto error;
    kwarglist = PyDict_New();
    if (!kwarglist) goto error;
    r = MyAlloc(_mysql_ResultObject, _mysql_ResultObject_Type);
    if (!r) goto error;
    if (_mysql_ResultObject_Initialize(r, arglist, kwarglist))
        goto error;
    result = (PyObject *) r;
    if (!(r->result)) {

        Py_DECREF(result);
        Py_INCREF(Py_None);
        result = Py_None;
    }
  error:
    Py_XDECREF(arglist);
    Py_XDECREF(kwarglist);
    return result;
}

我们通过MySQLdb的execute执行sql语句,这里不限于select、insert、update、delete,执行的主要代码就是调用_mysql.c的_query()函数.

def execute(self, query, args=None):
    ...
    db = self._get_db()
    if isinstance(query, unicode):
        query = query.encode(db.unicode_literal.charset)  #charset转换
    if args is not None:
        if isinstance(args, dict): #格式化sql语句
            query = query % dict((key, db.literal(item))
                                 for key, item in args.iteritems())
        else:
            query = query % tuple([db.literal(item) for item in args])
        r = None
        r = self._query(query)   #这是主要的调取函数. 


还是调用了mysql原生c库的mysql_real_query()函数。 该函数有三个参数,mysql连接,query请求语句,盘问语句的长度. 

#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_query(
    _mysql_ConnectionObject *self,
    PyObject *args)
{
    char *query;
    int len, r;
    if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL;
    check_connection(self);
    Py_BEGIN_ALLOW_THREADS
    r = mysql_real_query(&(self->connection), query, len);
    Py_END_ALLOW_THREADS
    if (r) return _mysql_Exception(self);
    Py_INCREF(Py_None);
    return Py_None;
}


我们在MySQLdb模块里是如何取出查询结果的? 有三个获取方法, cursor.fetchone(), cursor.fetchall() ,cursor.fetchmany().  

下面是MySQLdb/cursors.py文件的CursorUseResultMixIn类.  我们会发现不管是fetchone(1)单个,fetchall(0)所有,fetchmany(x)多个都调用了_fetch_row函数,他的参数是是size,这size代码你要获取多少数据。_fetch_row调用的是_mysql.c提供的fetch_row函数.

#xiaorui.cc
class CursorUseResultMixIn(object):

    def fetchone(self):
        """我从cursor里就拿一个"""
        self._check_executed()
        r = self._fetch_row(1)
        if not r:
            self._warning_check()
            return None
        self.rownumber = self.rownumber + 1
        return r[0]

    def fetchmany(self, size=None):
        """我可以自定义size大小,arraysize默认是1个"""
        self._check_executed()
        r = self._fetch_row(size or self.arraysize)
        self.rownumber = self.rownumber + len(r)
        if not r:
            self._warning_check()
        return r

    def fetchall(self):
        """获取所有可用的数据"""
        self._check_executed()
        r = self._fetch_row(0)
        self.rownumber = self.rownumber + len(r)
        self._warning_check()
        return r

    def _fetch_row(self, size=1):
        if not self._result:
            return ()
        return self._result.fetch_row(size, self._fetch_type)

#迭代器
def __iter__(self):
    self._check_executed()
    result = self.rownumber and self._rows[self.rownumber:] or self._rows
    return iter(result)

#调用_mysql.c的fetch_row获取结果,这里的self._fetch_type是cursorclass类型,或者是tuple,或者是dict kv
def _fetch_row(self, size=1):
    if not self._result:
        return ()
    return self._result.fetch_row(size, self._fetch_type)

下面是_mysql.c fetch_row的实现代码. 在_mysql_ResultObject_fetch_row里也是可以看到两大参数,一个maxrows,一个格式 .

#xiaorui.cc
static PyObject *
_mysql_ResultObject_fetch_row(
    _mysql_ResultObject *self,
    ...
{
    static char *kwlist[] = { "maxrows", "how", NULL };
    static _PYFUNC *row_converters[] =
    {
        _mysql_row_to_tuple,
        _mysql_row_to_dict,
        _mysql_row_to_dict_old
    };
    ...
    unsigned int maxrows=1, how=0, skiprows=0, rowsadded;

    check_result_connection(self);
    #type类型错误抛出异常
    if (how < 0 || how >= sizeof(row_converters)) {
        PyErr_SetString(PyExc_ValueError, "how out of range");
        return NULL;
    }
    convert_row = row_converters[how];
    if (maxrows) {
        if (!(r = PyTuple_New(maxrows))) goto error;
        rowsadded = _mysql__fetch_row(self, &r, skiprows, maxrows,
                convert_row);
        if (rowsadded == -1) goto error;
    } else {
        if (self->use) {
            #如果你是fetchall()获取的,那么他会按照每次1000个从&resut获取结果. 这里有skip ,maxrows的概念.
            maxrows = 1000;
            if (!(r = PyTuple_New(maxrows))) goto error;
            while (1) {
                rowsadded = _mysql__fetch_row(self, &r, skiprows,
                        maxrows, convert_row);
                skiprows += rowsadded;
                if (rowsadded < maxrows) break;
                if (MyTuple_Resize(&r, skiprows+maxrows, 0) == -1)
                        goto error;
            }
            ...
    }
    return r;
    ...
}


_mysql_ResultObject_fetch_row调用_mysql__fetch_row 获取结果. 主要两个参数 skiprows,maxrows,用途当然是分页的.

_mysql__fetch_row(
    _mysql_ResultObject *self,
    PyObject **r,
    int skiprows,
    int maxrows,
    _PYFUNC *convert_row)
{
    unsigned int i;
    MYSQL_ROW row;

    for (i = skiprows; i<(skiprows+maxrows); i++) {
        PyObject *v;
        if (!self->use)
            row = mysql_fetch_row(self->result);
        else {
            Py_BEGIN_ALLOW_THREADS;
            row = mysql_fetch_row(self->result);
            Py_END_ALLOW_THREADS;
        }
        if (!row && mysql_errno(&(((_mysql_ConnectionObject *)(self->conn))->connection))) {
            _mysql_Exception((_mysql_ConnectionObject *)self->conn);
            goto error;
        }
        if (!row) {
            if (MyTuple_Resize(r, i, 0) == -1) goto error;
            break;
        }
    }
    return i-skiprows;
  error:
    return -1;
}

一旦调用了mysql_store_result()并获得了不是Null指针的结果,可以调用mysql_fetch_row()来获取结果集中的行,或调用mysql_row_seek()和mysql_row_tell()来获取或设置结果集中的当前行位置。


Cursor.rowcount 可以查看影响行数

#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_affected_rows(
    _mysql_ConnectionObject *self,
    PyObject *args)
{
    if (!PyArg_ParseTuple(args, "")) return NULL;
    check_connection(self);
    return PyLong_FromUnsignedLongLong(mysql_affected_rows(&(self->connection)));  #得到的结果是Long,我们需要转换下.
}


insert_id是可以帮助我们拿到当前表的最新主键id, 这个方法只适用于有自增id的表里. 这个结果不是来自mysqldb本身的记录,而是从服务端获取的,当然这操作没有原子性保证. 

#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_insert_id(
    _mysql_ConnectionObject *self,
    PyObject *args)
{
    my_ulonglong r;
    if (!PyArg_ParseTuple(args, "")) return NULL;
    check_connection(self);
    Py_BEGIN_ALLOW_THREADS
    r = mysql_insert_id(&(self->connection));
    Py_END_ALLOW_THREADS
    return PyLong_FromUnsignedLongLong(r);
}


mysql的close()函数比较简单,直接调用mysql c库的mysql_close()方法来关闭连接.

static PyObject *
_mysql_ConnectionObject_close(
    _mysql_ConnectionObject *self,
    PyObject *args)
{
    if (args) {
        if (!PyArg_ParseTuple(args, "")) return NULL;
    }
    if (self->open) {
        Py_BEGIN_ALLOW_THREADS
        mysql_close(&(self->connection));
        Py_END_ALLOW_THREADS
        self->open = 0;
    } else {
        PyErr_SetString(_mysql_ProgrammingError,
                "closing a closed connection");
        return NULL;
    }
    _mysql_ConnectionObject_clear(self);
    Py_INCREF(Py_None);
    return Py_None;
}

当我们每次去创建新cursor结果集的时候,_mysql.c做了什么? 清理回收空间.

static void
_mysql_ResultObject_dealloc(
    _mysql_ResultObject *self)
{
    PyObject_GC_UnTrack((PyObject *)self);
    mysql_free_result(self->result);
    _mysql_ResultObject_clear(self);
    MyFree(self);
}

这么一看其实python的mysqldb实现还是有点意思的,MySQLdb的python代码只是做了围绕_mysql.c的封装,    如果出于好玩的化,我们也可以直接 import _mysql来实现db的增删改查。 MYSQL_RES的接受及空间回收是在_mysql.c里实现的,话说在python下回收大数据是个很郁闷的事情。 

对于_mysql.c实现就说这么多了,我们的讲述已经涵盖了 connect, execute ,fetchall, close主要四大功能函数。 

END.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注