前言:
年前在北京往回跑的时候,笔兄又在问连接池的问题,尼玛我怎么会。连接池要不要释放,不释放又会如何。年后闲着没事看了generic-pool的源码,原本准备打印出来看,结果打印店房间了就用pad看,然后竟然看到了js写的链表,为了降低理解难度(毕竟是菜逼)看了version 2一个单文件完成的降级版本。generic-pool: https://github.com/coopernurse/node-pool
实现流程:
为了帮助理解先把代码注释放一下,天生英语残,看英文解释看不懂,看代码结合草根英语翻译…
- // defaults
- //一个未使用链接的最大存活时间
- factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000
- //如果为true最近释放的链接将允许被再次分配
- factory.returnToHead = factory.returnToHead || false
- //空闲资源在或低于最小阈值时应被销毁/重新创建。默认 true
- factory.refreshIdle = ('refreshIdle' in factory) ? factory.refreshIdle : true
- //检查连接是否超时时间
- factory.reapInterval = factory.reapIntervalMillis || 1000
- //优先级1-x数值越小优先值越高,这里是队列限制值,用于初始化
- factory.priorityRange = factory.priorityRange || 1
- /* 一个函数,如果连接池资源是正常的返回函数return true 如果资源不正常将会被销毁。这里应该写逻辑 arguments 传入连接对象,使用者判断是否正常,如果
- 不正常返回false,此连接将会被销毁。
- 这货在acquire()中被调用,在从连接池获得资源之前。*/
- factory.validate = factory.validate || function () { return true }
- //最大最小连接数
- factory.max = parseInt(factory.max, 10)
- factory.min = parseInt(factory.min, 10)
- this._factory = factory
- //使用的对象数组
- this._inUseObjects = []
- this._draining = false
- //初始化队列
- this._waitingClients = new PriorityQueue(factory.priorityRange)
- //可获取的链接
- this._availableObjects = []
- this._asyncTestObjects = []
- //活跃连接数
- this._count = 0
- this._removeIdleTimer = null
- this._removeIdleScheduled = false
实现优先级队列:
用户通过priorityRange设置优先级区段,version3应该是实现了一个链表结构(等我弄清楚的..),这里通过PriorityQueue类创建一个数组,数组长度为priorityRange设定值。初始化完成这个样子Array[[],[],[]] 此时我们拥有了3个优先级,当我们调用pool.acquire时可以设定优先,默认为0。也就是说在没有设置优先值的前提下我们只会用到第一个数组。
连接池补充(初始化):
_ensureMinimum 来实现连接池初始化,如果小于min值,通过调用_createResource函数来补充连接至最小值。creatResource :将connection压入inUseObjects[],同时调用addResourceToAvailableObjects给连接对象封装成objWithTimeout对象,增加timeout属性 例- objWithTimeout = {obj: obj,timeout:timeout} 并压入availableObjects[] 。紧接着调用dispense函数,因为初始化过程中队列不存在对象,直接return。到此初始化完成。
使用连接池调用过程:
- Pool.prototype.acquire = function acquire (callback, priority) {
- if (this._draining) {
- throw new Error('pool is draining and cannot accept work')
- }
- if (process.domain) {
- callback = process.domain.bind(callback)
- }
- //1
- this._waitingClients.enqueue(callback, priority)
- //2
- this._dispense()
- return (this._count < this._factory.max)
- }
这里的callback就是acquire里的query,重要的就1,2。先看1.
- PriorityQueue.prototype.enqueue = function enqueue (obj, priority) {
- var priorityOrig
- // Convert to integer with a default value of 0.
- priority = priority && +priority | 0 || 0
- // Clear cache for total.
- this._total = null
- if (priority) {
- priorityOrig = priority
- if (priority < 0 || priority >= this._size) {
- priority = (this._size - 1)
- // put obj at the end of the line
- console.error('invalid priority: ' + priorityOrig + ' must be between 0 and ' + priority)
- }
- }
- this._slots[priority].push(obj)
- }
从入队源码可以看出实现优先级应该怎么办,没错就是分配。来看2。
- while (this._availableObjects.length > 0) {
- this._log('dispense() - reusing obj', 'verbose')
- objWithTimeout = this._availableObjects[0]
- //不合法处理,弱返回true执行销毁
- if (!this._factory.validate(objWithTimeout.obj)) {
- this.destroy(objWithTimeout.obj)
- continue
- }
- //available出栈
- this._availableObjects.shift()
- //inUse入栈
- this._inUseObjects.push(objWithTimeout.obj)
- //链接对象出队
- clientCb = this._waitingClients.dequeue()
- return clientCb(null, objWithTimeout.obj)
- }
- //继续补充连接
- if (this._count < this._factory.max) {
- this._createResource()
- }
注意出队,也是优先级实现的一部分,slots中越往后的越慢出栈。代码如下:
- PriorityQueue.prototype.dequeue = function dequeue (callback) {
- var obj = null
- // Clear cache for total.
- this._total = null
- for (var i = 0, sl = this._slots.length; i < sl; i += 1) {
- //如果队列存在则出栈
- if (this._slots[i].length) {
- obj = this._slots[i].shift()
- break
- }
- }
- return obj
- }
去除过期失效的链接
_scheduleRemoveIdle函数在addResourceToAvailableObjects中被调用,去除(执行destroy)完成后,若可用连接对象不为0则继续按时检查可用连接对象是否过期。
连接复用问题:
从availableObj到inUseObj的队列中,在执行pool.release(client)之后,连接被释放,这里的代码是这样的。
- Pool.prototype.release = function release (obj) {
- // check to see if this object has already been released (i.e., is back in the pool of this._availableObjects)
- if (this._availableObjects.some(function (objWithTimeout) { return (objWithTimeout.obj === obj) })) {
- this._log('release called twice for the same resource: ' + (new Error().stack), 'error')
- return
- }
- // check to see if this object exists in the `in use` list and remove it
- var index = this._inUseObjects.indexOf(obj)
- if (index < 0) {
- this._log('attempt to release an invalid resource: ' + (new Error().stack), 'error')
- return
- }
- // this._log("return to pool")
- this._inUseObjects.splice(index, 1)
- this._addResourceToAvailableObjects(obj)
- }
那么问题来了,在第一个判断不断的抛出错误,看释放的链接是否在可用连接队列里。这里一直返回true,问题就是这样,暂没有找到作者为毛这样写的依据(已解决,我是智障。。)。连接的复用其实就是从inUseObj搬家到availableObj的过程。看了源码之后感觉这里有个问题。复用过程中连接是否超过生存期的问题,如果再次复用,会重新设定时间戳,其实这个连接已经活了很久了。这是一个通用连接池,所以关于连接是否有效还需要拿出处理,这不在讨论范围内。这是一个比较简单的demo,有空version3看一看。
后记:
有很多很多基础问题木有解决。!! 先各种做一些小实验压压惊。。通过分析这个数据库连接池源码,搞懂了通用连接池内部是怎么实现功能的。写可能还不行,今后半年咸鱼时间都用来看模块熟悉大神们的笔迹吧哈哈。再过几天去北京了,献给我一个寒假最后一天可以自己支配的安静的下午,晚安~