有朋友问我连接池是怎么实现的,我曾经写过一个python mysql的连接池库,他的原理就是利用python queue或则是list实现的。具体点就是一开始构造实例化对象的时候,预先在一个队列里放入指定数目的链接对象。
那么这样的话,每次我们要调用链接对象的时候,他会从链接队列里pop一个,如果这个链接对象不能用,那就重新pop一个,另外创建一个新的链接对象塞入到队列里面。 有一个细节大家需要注意,有些服务不是很稳定,或者说server端有timeout值,我们可以在连接池里加入过期时间的控制,也就是说你如果长期没有使用这些链接的话,他会自动销毁主动销毁,或者是每次被动访问的时候,都会判断一次是否ttl过期。 对于主动探测时间过期,需要另起一个线程来扫描的。如果这些链接你都不想释放,或者说,每次高频调用的时候,不想把时间放在重建连接上,那么可以间隔性的把每个链接都pop出来,然后发送hello \ pong包。
这里就不再多说关于python 连接池的实现了,下面有两个使用python实现连接池的例子,一个是gevent tcp pool的实现,另一个是redis pool连接池的实现。对于mysql连接池也是这么实现的。
关于python 连接池的文章,总是会没完没了的修改,为毛? 瞎问 ! 点击原文链接查看更新后的文章:
#blog: xiaorui.cc class Pool(object): def __init__(self, factory, options={}, initial_connections=0, max_connections=200, reap_expired_connections=True, reap_interval=180): self._factory = factory self._options = options self._max_connections = max_connections self._pool = collections.deque() self._using = collections.deque() assert initial_connections <= max_connections, "initial_connections must be less than max_connections" for i in range(initial_connections): self._pool.append(self._create_connection()) if reap_expired_connections: self._reaper = ConnectionReaper(self, reap_interval) self._reaper.start() def __del__(self): for conn in self._pool: conn.close() self._pool = None for conn in self._using: conn.close() self._using = None @contextlib.contextmanager def connection(self): conn = self.acquire() try: yield conn finally: self.release(conn) @property def size(self): """Returns the pool size.""" return len(self._pool) + len(self._using) def acquire(self, retry=10, retried=0): if len(self._pool): conn = self._pool.popleft() self._using.append(conn) return conn else: if len(self._pool) + len(self._using) < self._max_connections: conn = self._create_connection() self._using.append(conn) return conn else: if retried >= retry: raise PoolExhaustedError() retried += 1 gevent.sleep(0.1) return self.acquire(retry=retry, retried=retried) def release(self, conn): if conn in self._using: self._using.remove(conn) self._pool.append(conn) else: raise ConnectionNotFoundError() def drop(self, conn): if conn in self._pool: self._pool.remove(conn) if conn.is_connected(): conn.close() else: raise ConnectionNotFoundError() def drop_expired(self): expired_conns = [conn for conn in self._pool if conn.is_expired()] for conn in expired_conns: self.drop(conn) def _create_connection(self): conn = self._factory(**self._options) conn.open() return conn
下面是redis-py 连接池实现的方式…
#blog: xiaorui.cc class ConnectionPool(object): "Generic connection pool" def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() def __repr__(self): return "%s<%s>" % ( type(self).__name__, self.connection_class.description_format % self.connection_kwargs, ) def reset(self): self.pid = os.getpid() self._created_connections = 0 self._available_connections = [] self._in_use_connections = set() self._check_lock = threading.Lock() def _checkpid(self): if self.pid != os.getpid(): with self._check_lock: if self.pid == os.getpid(): # another thread already did the work while we waited # on the lock. return self.disconnect() self.reset() def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) return connection def make_connection(self): "Create a new connection" if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") self._created_connections += 1 return self.connection_class(**self.connection_kwargs) def release(self, connection): "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) self._available_connections.append(connection) def disconnect(self): "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
关于python 实现连接池的文章就说这么多了…