关于使用python来实现redis和mysql的连接池

有朋友问我连接池是怎么实现的,我曾经写过一个python mysql的连接池库,他的原理就是利用python queue或则是list实现的。具体点就是一开始构造实例化对象的时候,预先在一个队列里放入指定数目的链接对象。 


那么这样的话,每次我们要调用链接对象的时候,他会从链接队列里pop一个,如果这个链接对象不能用,那就重新pop一个,另外创建一个新的链接对象塞入到队列里面。  有一个细节大家需要注意,有些服务不是很稳定,或者说server端有timeout值,我们可以在连接池里加入过期时间的控制,也就是说你如果长期没有使用这些链接的话,他会自动销毁主动销毁,或者是每次被动访问的时候,都会判断一次是否ttl过期。  对于主动探测时间过期,需要另起一个线程来扫描的。如果这些链接你都不想释放,或者说,每次高频调用的时候,不想把时间放在重建连接上,那么可以间隔性的把每个链接都pop出来,然后发送hello \ pong包。

这里就不再多说关于python 连接池的实现了,下面有两个使用python实现连接池的例子,一个是gevent tcp pool的实现,另一个是redis pool连接池的实现。对于mysql连接池也是这么实现的。 


关于python 连接池的文章,总是会没完没了的修改,为毛?  瞎问 ! 点击原文链接查看更新后的文章:

http://xiaorui.cc/2015/11/18/%E5%85%B3%E4%BA%8E%E4%BD%BF%E7%94%A8python%E6%9D%A5%E5%AE%9E%E7%8E%B0redis%E5%92%8Cmysql%E7%9A%84%E8%BF%9E%E6%8E%A5%E6%B1%A0/

http://xiaorui.cc/?p=2362

#blog:  xiaorui.cc
class Pool(object):
    def __init__(self, factory, options={}, initial_connections=0,
                 max_connections=200, reap_expired_connections=True,
                 reap_interval=180):

        self._factory = factory
        self._options = options
        self._max_connections = max_connections

        self._pool = collections.deque()
        self._using = collections.deque()

        assert initial_connections <= max_connections, "initial_connections must be less than max_connections"

        for i in range(initial_connections):
            self._pool.append(self._create_connection())

        if reap_expired_connections:
            self._reaper = ConnectionReaper(self, reap_interval)
            self._reaper.start()

    def __del__(self):
        for conn in self._pool:
            conn.close()
        self._pool = None

        for conn in self._using:
            conn.close()
        self._using = None

    @contextlib.contextmanager
    def connection(self):
        conn = self.acquire()
        try:
            yield conn

        finally:
            self.release(conn)

    @property
    def size(self):
        """Returns the pool size."""

        return len(self._pool) + len(self._using)

    def acquire(self, retry=10, retried=0):
        if len(self._pool):
            conn = self._pool.popleft()
            self._using.append(conn)

            return conn

        else:
            if len(self._pool) + len(self._using) < self._max_connections:
                conn = self._create_connection()
                self._using.append(conn)
                return conn

            else:
                if retried >= retry:
                    raise PoolExhaustedError()
                retried += 1

                gevent.sleep(0.1)

                return self.acquire(retry=retry, retried=retried)

    def release(self, conn):

        if conn in self._using:
            self._using.remove(conn)
            self._pool.append(conn)

        else:
            raise ConnectionNotFoundError()

    def drop(self, conn):
        if conn in self._pool:
            self._pool.remove(conn)
            if conn.is_connected():
                conn.close()

        else:
            raise ConnectionNotFoundError()

    def drop_expired(self):
        expired_conns = [conn for conn in self._pool if conn.is_expired()]

        for conn in expired_conns:
            self.drop(conn)

    def _create_connection(self):
        conn = self._factory(**self._options)
        conn.open()

        return conn

下面是redis-py 连接池实现的方式…

#blog:  xiaorui.cc
class ConnectionPool(object):
    "Generic connection pool"
    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        self.reset()

    def __repr__(self):
        return "%s<%s>" % (
            type(self).__name__,
            self.connection_class.description_format % self.connection_kwargs,
        )

    def reset(self):
        self.pid = os.getpid()
        self._created_connections = 0
        self._available_connections = []
        self._in_use_connections = set()
        self._check_lock = threading.Lock()

    def _checkpid(self):
        if self.pid != os.getpid():
            with self._check_lock:
                if self.pid == os.getpid():
                    # another thread already did the work while we waited
                    # on the lock.
                    return
                self.disconnect()
                self.reset()

    def get_connection(self, command_name, *keys, **options):
        "Get a connection from the pool"
        self._checkpid()
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

    def make_connection(self):
        "Create a new connection"
        if self._created_connections >= self.max_connections:
            raise ConnectionError("Too many connections")
        self._created_connections += 1
        return self.connection_class(**self.connection_kwargs)

    def release(self, connection):
        "Releases the connection back to the pool"
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

    def disconnect(self):
        "Disconnects all connections in the pool"
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()

关于python 实现连接池的文章就说这么多了…   


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

发表评论

邮箱地址不会被公开。 必填项已用*标注