有朋友问我连接池是怎么实现的,我曾经写过一个python mysql的连接池库,他的原理就是利用python queue或则是list实现的。具体点就是一开始构造实例化对象的时候,预先在一个队列里放入指定数目的链接对象。
那么这样的话,每次我们要调用链接对象的时候,他会从链接队列里pop一个,如果这个链接对象不能用,那就重新pop一个,另外创建一个新的链接对象塞入到队列里面。 有一个细节大家需要注意,有些服务不是很稳定,或者说server端有timeout值,我们可以在连接池里加入过期时间的控制,也就是说你如果长期没有使用这些链接的话,他会自动销毁主动销毁,或者是每次被动访问的时候,都会判断一次是否ttl过期。 对于主动探测时间过期,需要另起一个线程来扫描的。如果这些链接你都不想释放,或者说,每次高频调用的时候,不想把时间放在重建连接上,那么可以间隔性的把每个链接都pop出来,然后发送hello \ pong包。
这里就不再多说关于python 连接池的实现了,下面有两个使用python实现连接池的例子,一个是gevent tcp pool的实现,另一个是redis pool连接池的实现。对于mysql连接池也是这么实现的。
关于python 连接池的文章,总是会没完没了的修改,为毛? 瞎问 ! 点击原文链接查看更新后的文章:
#blog: xiaorui.cc
class Pool(object):
def __init__(self, factory, options={}, initial_connections=0,
max_connections=200, reap_expired_connections=True,
reap_interval=180):
self._factory = factory
self._options = options
self._max_connections = max_connections
self._pool = collections.deque()
self._using = collections.deque()
assert initial_connections <= max_connections, "initial_connections must be less than max_connections"
for i in range(initial_connections):
self._pool.append(self._create_connection())
if reap_expired_connections:
self._reaper = ConnectionReaper(self, reap_interval)
self._reaper.start()
def __del__(self):
for conn in self._pool:
conn.close()
self._pool = None
for conn in self._using:
conn.close()
self._using = None
@contextlib.contextmanager
def connection(self):
conn = self.acquire()
try:
yield conn
finally:
self.release(conn)
@property
def size(self):
"""Returns the pool size."""
return len(self._pool) + len(self._using)
def acquire(self, retry=10, retried=0):
if len(self._pool):
conn = self._pool.popleft()
self._using.append(conn)
return conn
else:
if len(self._pool) + len(self._using) < self._max_connections:
conn = self._create_connection()
self._using.append(conn)
return conn
else:
if retried >= retry:
raise PoolExhaustedError()
retried += 1
gevent.sleep(0.1)
return self.acquire(retry=retry, retried=retried)
def release(self, conn):
if conn in self._using:
self._using.remove(conn)
self._pool.append(conn)
else:
raise ConnectionNotFoundError()
def drop(self, conn):
if conn in self._pool:
self._pool.remove(conn)
if conn.is_connected():
conn.close()
else:
raise ConnectionNotFoundError()
def drop_expired(self):
expired_conns = [conn for conn in self._pool if conn.is_expired()]
for conn in expired_conns:
self.drop(conn)
def _create_connection(self):
conn = self._factory(**self._options)
conn.open()
return conn
下面是redis-py 连接池实现的方式…
#blog: xiaorui.cc
class ConnectionPool(object):
"Generic connection pool"
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset()
def __repr__(self):
return "%s<%s>" % (
type(self).__name__,
self.connection_class.description_format % self.connection_kwargs,
)
def reset(self):
self.pid = os.getpid()
self._created_connections = 0
self._available_connections = []
self._in_use_connections = set()
self._check_lock = threading.Lock()
def _checkpid(self):
if self.pid != os.getpid():
with self._check_lock:
if self.pid == os.getpid():
# another thread already did the work while we waited
# on the lock.
return
self.disconnect()
self.reset()
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection
def make_connection(self):
"Create a new connection"
if self._created_connections >= self.max_connections:
raise ConnectionError("Too many connections")
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)
def disconnect(self):
"Disconnects all connections in the pool"
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()
关于python 实现连接池的文章就说这么多了…
