前两天因为一个服务内存泄漏的问题,让我想看看python mysql是如何实现的. 以前没注意到MySQLdb的代码实现,通过这两天的学习,还是很有收获的。
以前总是以为MySQLdb的_mysql.c很是复杂,事实上虽然也复杂了点,但基本都看的懂,_mysql.c里不含有mysql的底层协议解析及一些底层针对mysql的互动。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=3234
python MySQLdb的源代码地址, https://github.com/farcepest/MySQLdb1
_mysql.c在头部引入了mysql c库模块,在python下是有必要使用mysql c api的,不然你又要实现更复杂的轮子. 另外python不能直接引用C库代码的,必修经过比如PyObject改造。 有个趣事,mysql官方也有一个python的mysql库,他其实就是咱们现在说的MySQLdb/_mysql.c以前的版本。 在stackoverflow看到一些蛛丝马迹,说是最开始的python _mysql.c是有bug的,后来这老外在MySQLdb里修复了后,这_mysql.c也被mysql官方集成进去了。 所以我们在mysql官方看到的样例是基于_mysql.c以前的版本。
#xiaorui.cc #include "my_config.h" #include "mysql.h" #include "mysqld_error.h" #include "errmsg.h"
下面是_mysql.c提供给MySQLdb调用的函数, 利用PyObject我们可以使python调用C实现的一些函数,这没的说. 下面是_mysql.c提供给MySQLdb的主要的几个函数.
#xiaorui.cc static PyMethodDef _mysql_methods[] = { { "connect", #用来创建连接 (PyCFunction)_mysql_connect, METH_VARARGS | METH_KEYWORDS, _mysql_connect__doc__ }, { "escape", #序列化sql (PyCFunction)_mysql_escape, METH_VARARGS, _mysql_escape__doc__ }, { "fetch_row", #在result集合里获取结果 (PyCFunction)_mysql_ResultObject_fetch_row, METH_VARARGS | METH_KEYWORDS, _mysql_ResultObject_fetch_row__doc__ }, { "commit", #针对autocommit为False的情况下,提交事务 (PyCFunction)_mysql_ConnectionObject_commit, METH_VARARGS, _mysql_ConnectionObject_commit__doc__ }, { "rollback", #回滚事务 (PyCFunction)_mysql_ConnectionObject_rollback, METH_VARARGS, _mysql_ConnectionObject_rollback__doc__ }, { "close", #关闭连接 (PyCFunction)_mysql_ConnectionObject_close, METH_VARARGS, _mysql_ConnectionObject_close__doc__ }, { "insert_id", #获取主键表的最后一个id (PyCFunction)_mysql_ConnectionObject_insert_id, METH_VARARGS, _mysql_ConnectionObject_insert_id__doc__ }, { "kill", #干掉一个sql任务 (PyCFunction)_mysql_ConnectionObject_kill, METH_VARARGS, _mysql_ConnectionObject_kill__doc__ }, { "ping", #探测连接 (PyCFunction)_mysql_ConnectionObject_ping, METH_VARARGS, _mysql_ConnectionObject_ping__doc__ }, { "query", #查询语句 (PyCFunction)_mysql_ConnectionObject_query, METH_VARARGS, _mysql_ConnectionObject_query__doc__ }, { "select_db", #选择数据库 (PyCFunction)_mysql_ConnectionObject_select_db, METH_VARARGS, _mysql_ConnectionObject_select_db__doc__ }, ... ... ....省略了很多 ! ... ... ....省略了很多 !
我们从创建连接开始说, _mysql_connect主要是调用了_mysql_ConnectionObject_Initialize方法,传递的参数是host,user,passwd,charset等连接选项.
#xiaorui.cc static PyObject * _mysql_connect( ... PyObject *args, PyObject *kwargs) { _mysql_ConnectionObject *c=NULL; c = MyAlloc(_mysql_ConnectionObject, _mysql_ConnectionObject_Type); if (c == NULL) return NULL; if (_mysql_ConnectionObject_Initialize(c, args, kwargs)) { Py_DECREF(c); c = NULL; } return (PyObject *) c; }
初始化连接的函数主要的代码就一行,mysql_real_connect是mysql c库的函数,不是_mysql.c构建的.
_mysql_ConnectionObject_Initialize() conn = mysql_real_connect(&(self->connection), host, user, passwd, db, port, unix_socket, client_flag);
我们再来说明下cursor的实现, cursor其实就是结果数据集合。 对于cursor存储结果是有两种存储方式. 下面是mysql c api关于存储的那两个函数.
mysql_store_result(MYSQL *mysql);
mysql_use_result(MYSQL *mysql);
这两个函数分别代表了获取查询结果的两种方式, 我们大多数的场景还是会选择store_result。
第一种,调用mysql_store_result函数将从Mysql服务器查询的所有数据都存储到客户端,然后读取;
第二种,调用mysql_use_result初始化检索,以便于后面一行一行的读取结果集,而它本身并没有从服务器读取任何数据,这种方式较之第一种速度更快且所需内存更少,但它会绑定服务器,阻止其他线程更新任何表,而且必须重复执行mysql_fetch_row读取数据,直至返回NULL,否则未读取的行会在下一次查询时作为结果的一部分返回,故经常我们使用mysql_store_result。
下面的逻辑对应MySQLdb的是cursor = db_conn.cursor(cursorclass=xxx)的动作,cursor 可以理解为存储的游标。 我们可以看到每次创建cursor的时候,他都会创建新的result对象并初始化,另外会为此申请新的内存区。
#xiaorui.cc static PyObject * _mysql_ConnectionObject_store_result( _mysql_ConnectionObject *self, PyObject *args) { PyObject *arglist=NULL, *kwarglist=NULL, *result=NULL; _mysql_ResultObject *r=NULL; if (!PyArg_ParseTuple(args, "")) return NULL; check_connection(self); #这里会判断clinet --> mysql连接正常是否? arglist = Py_BuildValue("(OiO)", self, 0, self->converter); if (!arglist) goto error; kwarglist = PyDict_New(); if (!kwarglist) goto error; r = MyAlloc(_mysql_ResultObject, _mysql_ResultObject_Type); if (!r) goto error; if (_mysql_ResultObject_Initialize(r, arglist, kwarglist)) goto error; result = (PyObject *) r; if (!(r->result)) { Py_DECREF(result); Py_INCREF(Py_None); result = Py_None; } error: Py_XDECREF(arglist); Py_XDECREF(kwarglist); return result; }
我们通过MySQLdb的execute执行sql语句,这里不限于select、insert、update、delete,执行的主要代码就是调用_mysql.c的_query()函数.
def execute(self, query, args=None): ... db = self._get_db() if isinstance(query, unicode): query = query.encode(db.unicode_literal.charset) #charset转换 if args is not None: if isinstance(args, dict): #格式化sql语句 query = query % dict((key, db.literal(item)) for key, item in args.iteritems()) else: query = query % tuple([db.literal(item) for item in args]) r = None r = self._query(query) #这是主要的调取函数.
还是调用了mysql原生c库的mysql_real_query()函数。 该函数有三个参数,mysql连接,query请求语句,盘问语句的长度.
#xiaorui.cc static PyObject * _mysql_ConnectionObject_query( _mysql_ConnectionObject *self, PyObject *args) { char *query; int len, r; if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL; check_connection(self); Py_BEGIN_ALLOW_THREADS r = mysql_real_query(&(self->connection), query, len); Py_END_ALLOW_THREADS if (r) return _mysql_Exception(self); Py_INCREF(Py_None); return Py_None; }
我们在MySQLdb模块里是如何取出查询结果的? 有三个获取方法, cursor.fetchone(), cursor.fetchall() ,cursor.fetchmany().
下面是MySQLdb/cursors.py文件的CursorUseResultMixIn类. 我们会发现不管是fetchone(1)单个,fetchall(0)所有,fetchmany(x)多个都调用了_fetch_row函数,他的参数是是size,这size代码你要获取多少数据。_fetch_row调用的是_mysql.c提供的fetch_row函数.
#xiaorui.cc class CursorUseResultMixIn(object): def fetchone(self): """我从cursor里就拿一个""" self._check_executed() r = self._fetch_row(1) if not r: self._warning_check() return None self.rownumber = self.rownumber + 1 return r[0] def fetchmany(self, size=None): """我可以自定义size大小,arraysize默认是1个""" self._check_executed() r = self._fetch_row(size or self.arraysize) self.rownumber = self.rownumber + len(r) if not r: self._warning_check() return r def fetchall(self): """获取所有可用的数据""" self._check_executed() r = self._fetch_row(0) self.rownumber = self.rownumber + len(r) self._warning_check() return r def _fetch_row(self, size=1): if not self._result: return () return self._result.fetch_row(size, self._fetch_type) #迭代器 def __iter__(self): self._check_executed() result = self.rownumber and self._rows[self.rownumber:] or self._rows return iter(result) #调用_mysql.c的fetch_row获取结果,这里的self._fetch_type是cursorclass类型,或者是tuple,或者是dict kv def _fetch_row(self, size=1): if not self._result: return () return self._result.fetch_row(size, self._fetch_type)
下面是_mysql.c fetch_row的实现代码. 在_mysql_ResultObject_fetch_row里也是可以看到两大参数,一个maxrows,一个格式 .
#xiaorui.cc static PyObject * _mysql_ResultObject_fetch_row( _mysql_ResultObject *self, ... { static char *kwlist[] = { "maxrows", "how", NULL }; static _PYFUNC *row_converters[] = { _mysql_row_to_tuple, _mysql_row_to_dict, _mysql_row_to_dict_old }; ... unsigned int maxrows=1, how=0, skiprows=0, rowsadded; check_result_connection(self); #type类型错误抛出异常 if (how < 0 || how >= sizeof(row_converters)) { PyErr_SetString(PyExc_ValueError, "how out of range"); return NULL; } convert_row = row_converters[how]; if (maxrows) { if (!(r = PyTuple_New(maxrows))) goto error; rowsadded = _mysql__fetch_row(self, &r, skiprows, maxrows, convert_row); if (rowsadded == -1) goto error; } else { if (self->use) { #如果你是fetchall()获取的,那么他会按照每次1000个从&resut获取结果. 这里有skip ,maxrows的概念. maxrows = 1000; if (!(r = PyTuple_New(maxrows))) goto error; while (1) { rowsadded = _mysql__fetch_row(self, &r, skiprows, maxrows, convert_row); skiprows += rowsadded; if (rowsadded < maxrows) break; if (MyTuple_Resize(&r, skiprows+maxrows, 0) == -1) goto error; } ... } return r; ... }
_mysql_ResultObject_fetch_row调用_mysql__fetch_row 获取结果. 主要两个参数 skiprows,maxrows,用途当然是分页的.
_mysql__fetch_row( _mysql_ResultObject *self, PyObject **r, int skiprows, int maxrows, _PYFUNC *convert_row) { unsigned int i; MYSQL_ROW row; for (i = skiprows; i<(skiprows+maxrows); i++) { PyObject *v; if (!self->use) row = mysql_fetch_row(self->result); else { Py_BEGIN_ALLOW_THREADS; row = mysql_fetch_row(self->result); Py_END_ALLOW_THREADS; } if (!row && mysql_errno(&(((_mysql_ConnectionObject *)(self->conn))->connection))) { _mysql_Exception((_mysql_ConnectionObject *)self->conn); goto error; } if (!row) { if (MyTuple_Resize(r, i, 0) == -1) goto error; break; } } return i-skiprows; error: return -1; }
一旦调用了mysql_store_result()并获得了不是Null指针的结果,可以调用mysql_fetch_row()来获取结果集中的行,或调用mysql_row_seek()和mysql_row_tell()来获取或设置结果集中的当前行位置。
Cursor.rowcount 可以查看影响行数
#xiaorui.cc static PyObject * _mysql_ConnectionObject_affected_rows( _mysql_ConnectionObject *self, PyObject *args) { if (!PyArg_ParseTuple(args, "")) return NULL; check_connection(self); return PyLong_FromUnsignedLongLong(mysql_affected_rows(&(self->connection))); #得到的结果是Long,我们需要转换下. }
insert_id是可以帮助我们拿到当前表的最新主键id, 这个方法只适用于有自增id的表里. 这个结果不是来自mysqldb本身的记录,而是从服务端获取的,当然这操作没有原子性保证.
#xiaorui.cc static PyObject * _mysql_ConnectionObject_insert_id( _mysql_ConnectionObject *self, PyObject *args) { my_ulonglong r; if (!PyArg_ParseTuple(args, "")) return NULL; check_connection(self); Py_BEGIN_ALLOW_THREADS r = mysql_insert_id(&(self->connection)); Py_END_ALLOW_THREADS return PyLong_FromUnsignedLongLong(r); }
mysql的close()函数比较简单,直接调用mysql c库的mysql_close()方法来关闭连接.
static PyObject * _mysql_ConnectionObject_close( _mysql_ConnectionObject *self, PyObject *args) { if (args) { if (!PyArg_ParseTuple(args, "")) return NULL; } if (self->open) { Py_BEGIN_ALLOW_THREADS mysql_close(&(self->connection)); Py_END_ALLOW_THREADS self->open = 0; } else { PyErr_SetString(_mysql_ProgrammingError, "closing a closed connection"); return NULL; } _mysql_ConnectionObject_clear(self); Py_INCREF(Py_None); return Py_None; }
当我们每次去创建新cursor结果集的时候,_mysql.c做了什么? 清理回收空间.
static void _mysql_ResultObject_dealloc( _mysql_ResultObject *self) { PyObject_GC_UnTrack((PyObject *)self); mysql_free_result(self->result); _mysql_ResultObject_clear(self); MyFree(self); }
这么一看其实python的mysqldb实现还是有点意思的,MySQLdb的python代码只是做了围绕_mysql.c的封装, 如果出于好玩的化,我们也可以直接 import _mysql来实现db的增删改查。 MYSQL_RES的接受及空间回收是在_mysql.c里实现的,话说在python下回收大数据是个很郁闷的事情。
对于_mysql.c实现就说这么多了,我们的讲述已经涵盖了 connect, execute ,fetchall, close主要四大功能函数。
END.