前两天因为一个服务内存泄漏的问题,让我想看看python mysql是如何实现的. 以前没注意到MySQLdb的代码实现,通过这两天的学习,还是很有收获的。
以前总是以为MySQLdb的_mysql.c很是复杂,事实上虽然也复杂了点,但基本都看的懂,_mysql.c里不含有mysql的底层协议解析及一些底层针对mysql的互动。
该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=3234
python MySQLdb的源代码地址, https://github.com/farcepest/MySQLdb1
_mysql.c在头部引入了mysql c库模块,在python下是有必要使用mysql c api的,不然你又要实现更复杂的轮子. 另外python不能直接引用C库代码的,必修经过比如PyObject改造。 有个趣事,mysql官方也有一个python的mysql库,他其实就是咱们现在说的MySQLdb/_mysql.c以前的版本。 在stackoverflow看到一些蛛丝马迹,说是最开始的python _mysql.c是有bug的,后来这老外在MySQLdb里修复了后,这_mysql.c也被mysql官方集成进去了。 所以我们在mysql官方看到的样例是基于_mysql.c以前的版本。
#xiaorui.cc #include "my_config.h" #include "mysql.h" #include "mysqld_error.h" #include "errmsg.h"
下面是_mysql.c提供给MySQLdb调用的函数, 利用PyObject我们可以使python调用C实现的一些函数,这没的说. 下面是_mysql.c提供给MySQLdb的主要的几个函数.
#xiaorui.cc
static PyMethodDef
_mysql_methods[] = {
{
"connect", #用来创建连接
(PyCFunction)_mysql_connect,
METH_VARARGS | METH_KEYWORDS,
_mysql_connect__doc__
},
{
"escape", #序列化sql
(PyCFunction)_mysql_escape,
METH_VARARGS,
_mysql_escape__doc__
},
{
"fetch_row", #在result集合里获取结果
(PyCFunction)_mysql_ResultObject_fetch_row,
METH_VARARGS | METH_KEYWORDS,
_mysql_ResultObject_fetch_row__doc__
},
{
"commit", #针对autocommit为False的情况下,提交事务
(PyCFunction)_mysql_ConnectionObject_commit,
METH_VARARGS,
_mysql_ConnectionObject_commit__doc__
},
{
"rollback", #回滚事务
(PyCFunction)_mysql_ConnectionObject_rollback,
METH_VARARGS,
_mysql_ConnectionObject_rollback__doc__
},
{
"close", #关闭连接
(PyCFunction)_mysql_ConnectionObject_close,
METH_VARARGS,
_mysql_ConnectionObject_close__doc__
},
{
"insert_id", #获取主键表的最后一个id
(PyCFunction)_mysql_ConnectionObject_insert_id,
METH_VARARGS,
_mysql_ConnectionObject_insert_id__doc__
},
{
"kill", #干掉一个sql任务
(PyCFunction)_mysql_ConnectionObject_kill,
METH_VARARGS,
_mysql_ConnectionObject_kill__doc__
},
{
"ping", #探测连接
(PyCFunction)_mysql_ConnectionObject_ping,
METH_VARARGS,
_mysql_ConnectionObject_ping__doc__
},
{
"query", #查询语句
(PyCFunction)_mysql_ConnectionObject_query,
METH_VARARGS,
_mysql_ConnectionObject_query__doc__
},
{
"select_db", #选择数据库
(PyCFunction)_mysql_ConnectionObject_select_db,
METH_VARARGS,
_mysql_ConnectionObject_select_db__doc__
},
... ... ....省略了很多 !
... ... ....省略了很多 !
我们从创建连接开始说, _mysql_connect主要是调用了_mysql_ConnectionObject_Initialize方法,传递的参数是host,user,passwd,charset等连接选项.
#xiaorui.cc
static PyObject *
_mysql_connect(
...
PyObject *args,
PyObject *kwargs)
{
_mysql_ConnectionObject *c=NULL;
c = MyAlloc(_mysql_ConnectionObject, _mysql_ConnectionObject_Type);
if (c == NULL) return NULL;
if (_mysql_ConnectionObject_Initialize(c, args, kwargs)) {
Py_DECREF(c);
c = NULL;
}
return (PyObject *) c;
}
初始化连接的函数主要的代码就一行,mysql_real_connect是mysql c库的函数,不是_mysql.c构建的.
_mysql_ConnectionObject_Initialize()
conn = mysql_real_connect(&(self->connection), host, user, passwd, db,
port, unix_socket, client_flag);
我们再来说明下cursor的实现, cursor其实就是结果数据集合。 对于cursor存储结果是有两种存储方式. 下面是mysql c api关于存储的那两个函数.
mysql_store_result(MYSQL *mysql);
mysql_use_result(MYSQL *mysql);
这两个函数分别代表了获取查询结果的两种方式, 我们大多数的场景还是会选择store_result。
第一种,调用mysql_store_result函数将从Mysql服务器查询的所有数据都存储到客户端,然后读取;
第二种,调用mysql_use_result初始化检索,以便于后面一行一行的读取结果集,而它本身并没有从服务器读取任何数据,这种方式较之第一种速度更快且所需内存更少,但它会绑定服务器,阻止其他线程更新任何表,而且必须重复执行mysql_fetch_row读取数据,直至返回NULL,否则未读取的行会在下一次查询时作为结果的一部分返回,故经常我们使用mysql_store_result。
下面的逻辑对应MySQLdb的是cursor = db_conn.cursor(cursorclass=xxx)的动作,cursor 可以理解为存储的游标。 我们可以看到每次创建cursor的时候,他都会创建新的result对象并初始化,另外会为此申请新的内存区。
#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_store_result(
_mysql_ConnectionObject *self,
PyObject *args)
{
PyObject *arglist=NULL, *kwarglist=NULL, *result=NULL;
_mysql_ResultObject *r=NULL;
if (!PyArg_ParseTuple(args, "")) return NULL;
check_connection(self); #这里会判断clinet --> mysql连接正常是否?
arglist = Py_BuildValue("(OiO)", self, 0, self->converter);
if (!arglist) goto error;
kwarglist = PyDict_New();
if (!kwarglist) goto error;
r = MyAlloc(_mysql_ResultObject, _mysql_ResultObject_Type);
if (!r) goto error;
if (_mysql_ResultObject_Initialize(r, arglist, kwarglist))
goto error;
result = (PyObject *) r;
if (!(r->result)) {
Py_DECREF(result);
Py_INCREF(Py_None);
result = Py_None;
}
error:
Py_XDECREF(arglist);
Py_XDECREF(kwarglist);
return result;
}
我们通过MySQLdb的execute执行sql语句,这里不限于select、insert、update、delete,执行的主要代码就是调用_mysql.c的_query()函数.
def execute(self, query, args=None):
...
db = self._get_db()
if isinstance(query, unicode):
query = query.encode(db.unicode_literal.charset) #charset转换
if args is not None:
if isinstance(args, dict): #格式化sql语句
query = query % dict((key, db.literal(item))
for key, item in args.iteritems())
else:
query = query % tuple([db.literal(item) for item in args])
r = None
r = self._query(query) #这是主要的调取函数.
还是调用了mysql原生c库的mysql_real_query()函数。 该函数有三个参数,mysql连接,query请求语句,盘问语句的长度.
#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_query(
_mysql_ConnectionObject *self,
PyObject *args)
{
char *query;
int len, r;
if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL;
check_connection(self);
Py_BEGIN_ALLOW_THREADS
r = mysql_real_query(&(self->connection), query, len);
Py_END_ALLOW_THREADS
if (r) return _mysql_Exception(self);
Py_INCREF(Py_None);
return Py_None;
}
我们在MySQLdb模块里是如何取出查询结果的? 有三个获取方法, cursor.fetchone(), cursor.fetchall() ,cursor.fetchmany().
下面是MySQLdb/cursors.py文件的CursorUseResultMixIn类. 我们会发现不管是fetchone(1)单个,fetchall(0)所有,fetchmany(x)多个都调用了_fetch_row函数,他的参数是是size,这size代码你要获取多少数据。_fetch_row调用的是_mysql.c提供的fetch_row函数.
#xiaorui.cc
class CursorUseResultMixIn(object):
def fetchone(self):
"""我从cursor里就拿一个"""
self._check_executed()
r = self._fetch_row(1)
if not r:
self._warning_check()
return None
self.rownumber = self.rownumber + 1
return r[0]
def fetchmany(self, size=None):
"""我可以自定义size大小,arraysize默认是1个"""
self._check_executed()
r = self._fetch_row(size or self.arraysize)
self.rownumber = self.rownumber + len(r)
if not r:
self._warning_check()
return r
def fetchall(self):
"""获取所有可用的数据"""
self._check_executed()
r = self._fetch_row(0)
self.rownumber = self.rownumber + len(r)
self._warning_check()
return r
def _fetch_row(self, size=1):
if not self._result:
return ()
return self._result.fetch_row(size, self._fetch_type)
#迭代器
def __iter__(self):
self._check_executed()
result = self.rownumber and self._rows[self.rownumber:] or self._rows
return iter(result)
#调用_mysql.c的fetch_row获取结果,这里的self._fetch_type是cursorclass类型,或者是tuple,或者是dict kv
def _fetch_row(self, size=1):
if not self._result:
return ()
return self._result.fetch_row(size, self._fetch_type)
下面是_mysql.c fetch_row的实现代码. 在_mysql_ResultObject_fetch_row里也是可以看到两大参数,一个maxrows,一个格式 .
#xiaorui.cc
static PyObject *
_mysql_ResultObject_fetch_row(
_mysql_ResultObject *self,
...
{
static char *kwlist[] = { "maxrows", "how", NULL };
static _PYFUNC *row_converters[] =
{
_mysql_row_to_tuple,
_mysql_row_to_dict,
_mysql_row_to_dict_old
};
...
unsigned int maxrows=1, how=0, skiprows=0, rowsadded;
check_result_connection(self);
#type类型错误抛出异常
if (how < 0 || how >= sizeof(row_converters)) {
PyErr_SetString(PyExc_ValueError, "how out of range");
return NULL;
}
convert_row = row_converters[how];
if (maxrows) {
if (!(r = PyTuple_New(maxrows))) goto error;
rowsadded = _mysql__fetch_row(self, &r, skiprows, maxrows,
convert_row);
if (rowsadded == -1) goto error;
} else {
if (self->use) {
#如果你是fetchall()获取的,那么他会按照每次1000个从&resut获取结果. 这里有skip ,maxrows的概念.
maxrows = 1000;
if (!(r = PyTuple_New(maxrows))) goto error;
while (1) {
rowsadded = _mysql__fetch_row(self, &r, skiprows,
maxrows, convert_row);
skiprows += rowsadded;
if (rowsadded < maxrows) break;
if (MyTuple_Resize(&r, skiprows+maxrows, 0) == -1)
goto error;
}
...
}
return r;
...
}
_mysql_ResultObject_fetch_row调用_mysql__fetch_row 获取结果. 主要两个参数 skiprows,maxrows,用途当然是分页的.
_mysql__fetch_row(
_mysql_ResultObject *self,
PyObject **r,
int skiprows,
int maxrows,
_PYFUNC *convert_row)
{
unsigned int i;
MYSQL_ROW row;
for (i = skiprows; i<(skiprows+maxrows); i++) {
PyObject *v;
if (!self->use)
row = mysql_fetch_row(self->result);
else {
Py_BEGIN_ALLOW_THREADS;
row = mysql_fetch_row(self->result);
Py_END_ALLOW_THREADS;
}
if (!row && mysql_errno(&(((_mysql_ConnectionObject *)(self->conn))->connection))) {
_mysql_Exception((_mysql_ConnectionObject *)self->conn);
goto error;
}
if (!row) {
if (MyTuple_Resize(r, i, 0) == -1) goto error;
break;
}
}
return i-skiprows;
error:
return -1;
}
一旦调用了mysql_store_result()并获得了不是Null指针的结果,可以调用mysql_fetch_row()来获取结果集中的行,或调用mysql_row_seek()和mysql_row_tell()来获取或设置结果集中的当前行位置。
Cursor.rowcount 可以查看影响行数
#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_affected_rows(
_mysql_ConnectionObject *self,
PyObject *args)
{
if (!PyArg_ParseTuple(args, "")) return NULL;
check_connection(self);
return PyLong_FromUnsignedLongLong(mysql_affected_rows(&(self->connection))); #得到的结果是Long,我们需要转换下.
}
insert_id是可以帮助我们拿到当前表的最新主键id, 这个方法只适用于有自增id的表里. 这个结果不是来自mysqldb本身的记录,而是从服务端获取的,当然这操作没有原子性保证.
#xiaorui.cc
static PyObject *
_mysql_ConnectionObject_insert_id(
_mysql_ConnectionObject *self,
PyObject *args)
{
my_ulonglong r;
if (!PyArg_ParseTuple(args, "")) return NULL;
check_connection(self);
Py_BEGIN_ALLOW_THREADS
r = mysql_insert_id(&(self->connection));
Py_END_ALLOW_THREADS
return PyLong_FromUnsignedLongLong(r);
}
mysql的close()函数比较简单,直接调用mysql c库的mysql_close()方法来关闭连接.
static PyObject *
_mysql_ConnectionObject_close(
_mysql_ConnectionObject *self,
PyObject *args)
{
if (args) {
if (!PyArg_ParseTuple(args, "")) return NULL;
}
if (self->open) {
Py_BEGIN_ALLOW_THREADS
mysql_close(&(self->connection));
Py_END_ALLOW_THREADS
self->open = 0;
} else {
PyErr_SetString(_mysql_ProgrammingError,
"closing a closed connection");
return NULL;
}
_mysql_ConnectionObject_clear(self);
Py_INCREF(Py_None);
return Py_None;
}
当我们每次去创建新cursor结果集的时候,_mysql.c做了什么? 清理回收空间.
static void
_mysql_ResultObject_dealloc(
_mysql_ResultObject *self)
{
PyObject_GC_UnTrack((PyObject *)self);
mysql_free_result(self->result);
_mysql_ResultObject_clear(self);
MyFree(self);
}
这么一看其实python的mysqldb实现还是有点意思的,MySQLdb的python代码只是做了围绕_mysql.c的封装, 如果出于好玩的化,我们也可以直接 import _mysql来实现db的增删改查。 MYSQL_RES的接受及空间回收是在_mysql.c里实现的,话说在python下回收大数据是个很郁闷的事情。
对于_mysql.c实现就说这么多了,我们的讲述已经涵盖了 connect, execute ,fetchall, close主要四大功能函数。
END.
