generic-pool v2 数据库连接池源码分析

Nodejs cyanprobe 8年前 (2017-02-02) 7946次浏览 已收录 7个评论

前言:

年前在北京往回跑的时候,笔兄又在问连接池的问题,尼玛我怎么会。连接池要不要释放,不释放又会如何。年后闲着没事看了generic-pool的源码,原本准备打印出来看,结果打印店房间了就用pad看,然后竟然看到了js写的链表,为了降低理解难度(毕竟是菜逼)看了version 2一个单文件完成的降级版本。generic-pool: https://github.com/coopernurse/node-pool

实现流程:

为了帮助理解先把代码注释放一下,天生英语残,看英文解释看不懂,看代码结合草根英语翻译…

  1.   // defaults
  2. //一个未使用链接的最大存活时间
  3. factory.idleTimeoutMillis = factory.idleTimeoutMillis || 30000
  4. //如果为true最近释放的链接将允许被再次分配
  5. factory.returnToHead = factory.returnToHead || false
  6. //空闲资源在或低于最小阈值时应被销毁/重新创建。默认 true
  7. factory.refreshIdle = ('refreshIdle' in factory) ? factory.refreshIdle : true
  8. //检查连接是否超时时间
  9. factory.reapInterval = factory.reapIntervalMillis || 1000
  10. //优先级1-x数值越小优先值越高,这里是队列限制值,用于初始化
  11. factory.priorityRange = factory.priorityRange || 1
  12. /* 一个函数,如果连接池资源是正常的返回函数return true 如果资源不正常将会被销毁。这里应该写逻辑 arguments 传入连接对象,使用者判断是否正常,如果
  13. 不正常返回false,此连接将会被销毁。
  14. 这货在acquire()中被调用,在从连接池获得资源之前。*/
  15. factory.validate = factory.validate || function () { return true }
  16. //最大最小连接数
  17. factory.max = parseInt(factory.max, 10)
  18. factory.min = parseInt(factory.min, 10)
  19. this._factory = factory
  20. //使用的对象数组
  21. this._inUseObjects = []
  22. this._draining = false
  23. //初始化队列
  24. this._waitingClients = new PriorityQueue(factory.priorityRange)
  25. //可获取的链接
  26. this._availableObjects = []
  27. this._asyncTestObjects = []
  28. //活跃连接数
  29. this._count = 0
  30. this._removeIdleTimer = null
  31. this._removeIdleScheduled = false

实现优先级队列:

用户通过priorityRange设置优先级区段,version3应该是实现了一个链表结构(等我弄清楚的..),这里通过PriorityQueue类创建一个数组,数组长度为priorityRange设定值。初始化完成这个样子Array[[],[],[]] 此时我们拥有了3个优先级,当我们调用pool.acquire时可以设定优先,默认为0。也就是说在没有设置优先值的前提下我们只会用到第一个数组。

连接池补充(初始化):

_ensureMinimum 来实现连接池初始化,如果小于min值,通过调用_createResource函数来补充连接至最小值。creatResource :将connection压入inUseObjects[],同时调用addResourceToAvailableObjects给连接对象封装成objWithTimeout对象,增加timeout属性 例- objWithTimeout = {obj: obj,timeout:timeout}  并压入availableObjects[] 。紧接着调用dispense函数,因为初始化过程中队列不存在对象,直接return。到此初始化完成。

使用连接池调用过程:

  1. Pool.prototype.acquire = function acquire (callback, priority) {
  2. if (this._draining) {
  3. throw new Error('pool is draining and cannot accept work')
  4. }
  5. if (process.domain) {
  6. callback = process.domain.bind(callback)
  7. }
  8. //1
  9. this._waitingClients.enqueue(callback, priority)
  10. //2
  11. this._dispense()
  12. return (this._count < this._factory.max)
  13. }

这里的callback就是acquire里的query,重要的就1,2。先看1.

  1. PriorityQueue.prototype.enqueue = function enqueue (obj, priority) {
  2. var priorityOrig
  3. // Convert to integer with a default value of 0.
  4. priority = priority && +priority | 0 || 0
  5. // Clear cache for total.
  6. this._total = null
  7. if (priority) {
  8. priorityOrig = priority
  9. if (priority < 0 || priority >= this._size) {
  10. priority = (this._size - 1)
  11. // put obj at the end of the line
  12. console.error('invalid priority: ' + priorityOrig + ' must be between 0 and ' + priority)
  13. }
  14. }
  15. this._slots[priority].push(obj)
  16. }

 
从入队源码可以看出实现优先级应该怎么办,没错就是分配。来看2。

  1. while (this._availableObjects.length > 0) {
  2. this._log('dispense() - reusing obj', 'verbose')
  3. objWithTimeout = this._availableObjects[0]
  4. //不合法处理,弱返回true执行销毁
  5. if (!this._factory.validate(objWithTimeout.obj)) {
  6. this.destroy(objWithTimeout.obj)
  7. continue
  8. }
  9. //available出栈
  10. this._availableObjects.shift()
  11. //inUse入栈
  12. this._inUseObjects.push(objWithTimeout.obj)
  13. //链接对象出队
  14. clientCb = this._waitingClients.dequeue()
  15. return clientCb(null, objWithTimeout.obj)
  16. }
  17. //继续补充连接
  18. if (this._count < this._factory.max) {
  19. this._createResource()
  20. }

注意出队,也是优先级实现的一部分,slots中越往后的越慢出栈。代码如下:

  1. PriorityQueue.prototype.dequeue = function dequeue (callback) {
  2. var obj = null
  3. // Clear cache for total.
  4. this._total = null
  5. for (var i = 0, sl = this._slots.length; i < sl; i += 1) {
  6. //如果队列存在则出栈
  7. if (this._slots[i].length) {
  8. obj = this._slots[i].shift()
  9. break
  10. }
  11. }
  12. return obj
  13. }

去除过期失效的链接

_scheduleRemoveIdle函数在addResourceToAvailableObjects中被调用,去除(执行destroy)完成后,若可用连接对象不为0则继续按时检查可用连接对象是否过期。

连接复用问题:

从availableObj到inUseObj的队列中,在执行pool.release(client)之后,连接被释放,这里的代码是这样的。

  1. Pool.prototype.release = function release (obj) {
  2. // check to see if this object has already been released (i.e., is back in the pool of this._availableObjects)
  3. if (this._availableObjects.some(function (objWithTimeout) { return (objWithTimeout.obj === obj) })) {
  4. this._log('release called twice for the same resource: ' + (new Error().stack), 'error')
  5. return
  6. }
  7. // check to see if this object exists in the `in use` list and remove it
  8. var index = this._inUseObjects.indexOf(obj)
  9. if (index < 0) {
  10. this._log('attempt to release an invalid resource: ' + (new Error().stack), 'error')
  11. return
  12. }
  13. // this._log("return to pool")
  14. this._inUseObjects.splice(index, 1)
  15. this._addResourceToAvailableObjects(obj)
  16. }

 
那么问题来了,在第一个判断不断的抛出错误,看释放的链接是否在可用连接队列里。这里一直返回true,问题就是这样,暂没有找到作者为毛这样写的依据(已解决,我是智障。。)。连接的复用其实就是从inUseObj搬家到availableObj的过程。看了源码之后感觉这里有个问题。复用过程中连接是否超过生存期的问题,如果再次复用,会重新设定时间戳,其实这个连接已经活了很久了。这是一个通用连接池,所以关于连接是否有效还需要拿出处理,这不在讨论范围内。这是一个比较简单的demo,有空version3看一看。

后记:

有很多很多基础问题木有解决。!! 先各种做一些小实验压压惊。。通过分析这个数据库连接池源码,搞懂了通用连接池内部是怎么实现功能的。写可能还不行,今后半年咸鱼时间都用来看模块熟悉大神们的笔迹吧哈哈。再过几天去北京了,献给我一个寒假最后一天可以自己支配的安静的下午,晚安~
 
 
 
 


CyanProbe , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:generic-pool v2 数据库连接池源码分析
喜欢 (1)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(7)个小伙伴在吐槽
  1. 不是代码牛表示看不懂,不过“打印店房间了”应该是放假了吧?
    姜辰2017-02-02 20:43 回复
  2. 看不懂啊。。
    2017-02-04 11:47 回复
  3. 以后会常来逛逛,博客很棒。
    Feeey个人博客2017-02-11 07:32 回复
  4. 阿西吧,我看不懂!老司机带带我吧 :lol:
    重庆游戏人2017-02-16 11:35 回复
  5. 博主最近没怎么更新啊,我来转转。
    Feeey个人博客2017-02-28 01:37 回复
  6. 大王叫我来巡山,我把博客转一转。(*^__^*) 嘻嘻……
    Feeey个人博客2017-03-12 00:36 回复