前言:
年前在北京往回跑的时候,笔兄又在问连接池的问题,尼玛我怎么会。连接池要不要释放,不释放又会如何。年后闲着没事看了generic-pool的源码,原本准备打印出来看,结果打印店房间了就用pad看,然后竟然看到了js写的链表,为了降低理解难度(毕竟是菜逼)看了version 2一个单文件完成的降级版本。generic-pool: https://github.com/coopernurse/node-pool
实现流程:
为了帮助理解先把代码注释放一下,天生英语残,看英文解释看不懂,看代码结合草根英语翻译…
// defaults //一个未使用链接的最大存活时间 factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000 //如果为true最近释放的链接将允许被再次分配 factory.returnToHead = factory.returnToHead || false //空闲资源在或低于最小阈值时应被销毁/重新创建。默认 true factory.refreshIdle = ('refreshIdle' in factory) ? factory.refreshIdle : true //检查连接是否超时时间 factory.reapInterval = factory.reapIntervalMillis || 1000 //优先级1-x数值越小优先值越高,这里是队列限制值,用于初始化 factory.priorityRange = factory.priorityRange || 1 /* 一个函数,如果连接池资源是正常的返回函数return true 如果资源不正常将会被销毁。这里应该写逻辑 arguments 传入连接对象,使用者判断是否正常,如果 不正常返回false,此连接将会被销毁。 这货在acquire()中被调用,在从连接池获得资源之前。*/ factory.validate = factory.validate || function () { return true } //最大最小连接数 factory.max = parseInt(factory.max, 10) factory.min = parseInt(factory.min, 10) this._factory = factory //使用的对象数组 this._inUseObjects = [] this._draining = false //初始化队列 this._waitingClients = new PriorityQueue(factory.priorityRange) //可获取的链接 this._availableObjects = [] this._asyncTestObjects = [] //活跃连接数 this._count = 0 this._removeIdleTimer = null this._removeIdleScheduled = false
实现优先级队列:
用户通过priorityRange设置优先级区段,version3应该是实现了一个链表结构(等我弄清楚的..),这里通过PriorityQueue类创建一个数组,数组长度为priorityRange设定值。初始化完成这个样子Array[[],[],[]] 此时我们拥有了3个优先级,当我们调用pool.acquire时可以设定优先,默认为0。也就是说在没有设置优先值的前提下我们只会用到第一个数组。
连接池补充(初始化):
_ensureMinimum 来实现连接池初始化,如果小于min值,通过调用_createResource函数来补充连接至最小值。creatResource :将connection压入inUseObjects[],同时调用addResourceToAvailableObjects给连接对象封装成objWithTimeout对象,增加timeout属性 例- objWithTimeout = {obj: obj,timeout:timeout} 并压入availableObjects[] 。紧接着调用dispense函数,因为初始化过程中队列不存在对象,直接return。到此初始化完成。
使用连接池调用过程:
Pool.prototype.acquire = function acquire (callback, priority) { if (this._draining) { throw new Error('pool is draining and cannot accept work') } if (process.domain) { callback = process.domain.bind(callback) } //1 this._waitingClients.enqueue(callback, priority) //2 this._dispense() return (this._count < this._factory.max) }
这里的callback就是acquire里的query,重要的就1,2。先看1.
PriorityQueue.prototype.enqueue = function enqueue (obj, priority) { var priorityOrig // Convert to integer with a default value of 0. priority = priority && +priority | 0 || 0 // Clear cache for total. this._total = null if (priority) { priorityOrig = priority if (priority < 0 || priority >= this._size) { priority = (this._size - 1) // put obj at the end of the line console.error('invalid priority: ' + priorityOrig + ' must be between 0 and ' + priority) } } this._slots[priority].push(obj) }
从入队源码可以看出实现优先级应该怎么办,没错就是分配。来看2。
while (this._availableObjects.length > 0) { this._log('dispense() - reusing obj', 'verbose') objWithTimeout = this._availableObjects[0] //不合法处理,弱返回true执行销毁 if (!this._factory.validate(objWithTimeout.obj)) { this.destroy(objWithTimeout.obj) continue } //available出栈 this._availableObjects.shift() //inUse入栈 this._inUseObjects.push(objWithTimeout.obj) //链接对象出队 clientCb = this._waitingClients.dequeue() return clientCb(null, objWithTimeout.obj) } //继续补充连接 if (this._count < this._factory.max) { this._createResource() }
注意出队,也是优先级实现的一部分,slots中越往后的越慢出栈。代码如下:
PriorityQueue.prototype.dequeue = function dequeue (callback) { var obj = null // Clear cache for total. this._total = null for (var i = 0, sl = this._slots.length; i < sl; i += 1) { //如果队列存在则出栈 if (this._slots[i].length) { obj = this._slots[i].shift() break } } return obj }
去除过期失效的链接
_scheduleRemoveIdle函数在addResourceToAvailableObjects中被调用,去除(执行destroy)完成后,若可用连接对象不为0则继续按时检查可用连接对象是否过期。
连接复用问题:
从availableObj到inUseObj的队列中,在执行pool.release(client)之后,连接被释放,这里的代码是这样的。
Pool.prototype.release = function release (obj) { // check to see if this object has already been released (i.e., is back in the pool of this._availableObjects) if (this._availableObjects.some(function (objWithTimeout) { return (objWithTimeout.obj === obj) })) { this._log('release called twice for the same resource: ' + (new Error().stack), 'error') return } // check to see if this object exists in the `in use` list and remove it var index = this._inUseObjects.indexOf(obj) if (index < 0) { this._log('attempt to release an invalid resource: ' + (new Error().stack), 'error') return } // this._log("return to pool") this._inUseObjects.splice(index, 1) this._addResourceToAvailableObjects(obj) }
那么问题来了,在第一个判断不断的抛出错误,看释放的链接是否在可用连接队列里。这里一直返回true,问题就是这样,暂没有找到作者为毛这样写的依据(已解决,我是智障。。)。连接的复用其实就是从inUseObj搬家到availableObj的过程。看了源码之后感觉这里有个问题。复用过程中连接是否超过生存期的问题,如果再次复用,会重新设定时间戳,其实这个连接已经活了很久了。这是一个通用连接池,所以关于连接是否有效还需要拿出处理,这不在讨论范围内。这是一个比较简单的demo,有空version3看一看。
后记:
有很多很多基础问题木有解决。!! 先各种做一些小实验压压惊。。通过分析这个数据库连接池源码,搞懂了通用连接池内部是怎么实现功能的。写可能还不行,今后半年咸鱼时间都用来看模块熟悉大神们的笔迹吧哈哈。再过几天去北京了,献给我一个寒假最后一天可以自己支配的安静的下午,晚安~