热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mysqlpool_ErlangpoolmanagementEmysqlpool

从这篇开始,这一系列主要分析在开源社区中,Erlang相关pool的管理和使用.在开源社区,Emysql是Erlang较为受欢迎的一个MySQL驱动.Emysql对pool的管理和

从这篇开始,这一系列主要分析在开源社区中,Erlang 相关pool 的管理和使用.

在开源社区,Emysql 是Erlang 较为受欢迎的一个MySQL 驱动. Emysql 对pool 的管理和使用是非常典型的,pool 的管理角色中,主要有available(记录当前pool 中可供使用的成员),locked(记录当前pool 中正在被使用的成员),waiting(记录当前正在处理等待该pool 的用户).用户进程在使用pool 过程中, pool 中的成员在这三个角色中来回迁移.

pool 数据结构

Emysql pool 的数据结构如下:

1 -record(pool, {pool_id :: atom(),2 size :: number(),3 user :: string(),4 password :: string(),5 host :: string(),6 port :: number(),7 database :: string(),8 encoding :: utf8 | latin1 | {utf8, utf8_unicode_ci} |{utf8, utf8_general_ci},9 available=queue:new() :: queue(),10 locked=gb_trees:empty() :: gb_tree(),11 waiting=queue:new() :: queue(),12 start_cmds=[] :: string(),13 conn_test_period=0:: number(),14 connect_timeout=infinity :: number() |infinity,15 warnings=false :: boolean()}).

L1 处的pool_id 为pool 的标识

L2 定义了pool 中成员的数量

L3 L4 为用以连接MySQL 数据库的用户名和密码

L5 L6 L7 L8 为连接MySQL 数据库的IP, 端口, 默认数据库, 编码

L9 用以记录当前pool 中可用的成员

L10 用以记录当前pool 中正在被使用的成员

L11 用以记录当前等待pool 中成员的用户

L12 为在与数据库建立连接后的初始化命令

L14 是用于gen_tcp:connect 时的超时参数

pool 添加操作

在Emysql 项目中,emysql module 定义了所有外部操作的API, 其中添加操作的API有:

1, add_pool/2

2, add_pool/8

3, add_pool/9

以下代码片段为add_pool 的实质性处理逻辑:

1 add_pool(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port,2 database=Database,encoding=Encoding,start_cmds=StartCmds,3 connect_timeout=ConnectTimeout,warnings=Warnings}=PoolSettings)->

4 config_ok(PoolSettings),5 case emysql_conn_mgr:has_pool(PoolId) of

6 true ->

7 {error,pool_already_exists};8 false ->

9 Pool =#pool{10 pool_id =PoolId,11 size =Size,12 user =User,13 password =Password,14 host =Host,15 port =Port,16 database =Database,17 encoding =Encoding,18 start_cmds =StartCmds,19 connect_timeout =ConnectTimeout,20 warnings =Warnings21 },22 Pool2 = case emysql_conn:open_connections(Pool) of

23 {ok, Pool1} ->Pool1;24 {error, Reason} ->throw(Reason)25 end,26 emysql_conn_mgr:add_pool(Pool2)27 end.

处理逻辑主要有:

1, 确认参数的数据类型

2, 检查当前是否已经有相同ID的 pool

3, 与MySQL server 建立connection

4, 在emysql_conn_mgr 中添加 该pool

在当前的Emysql 项目中,emysql_conn_mgr 是用来管理所有pool 的gen_server 进程. 对于所有的pool 而言,其内部成员的状态管理, 都是由emysql_conn_mgr 调度的, 包括某个pool 中connection 成员的使用,归还等.

确认参数的数据类型

确认参数的数据类型主要使用erlang:is_{{type}} bif func, 在Erlang VM 内存,对于每种数据类型都是以后缀来识别的.

1 config_ok(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port,2 database=Database,encoding=Encoding,start_cmds=StartCmds,3 connect_timeout=ConnectTimeout,warnings=Warnings})4 whenis_atom(PoolId),5 is_integer(Size),6 is_list(User),7 is_list(Password),8 is_list(Host),9 is_integer(Port),10 is_list(Database) orelse Database ==undefined,11 is_list(StartCmds),12 is_integer(ConnectTimeout) orelse ConnectTimeout ==infinity,13 is_boolean(Warnings) ->

14 encoding_ok(Encoding);15 config_ok(_BadOptions) ->

16 erlang:error(badarg).17

18 encoding_ok(Enc) when is_atom(Enc) ->ok;19 encoding_ok({Enc, Coll}) when is_atom(Enc), is_atom(Coll) ->ok;20 encoding_ok(_) -> erlang:error(badarg).

因此, 这部分的操作足够高效.以 is_list/1为例:

1 #define TAG_PRIMARY_LIST 0x1

2 #define is_list(x) (((x) & _TAG_PRIMARY_MASK) == TAG_PRIMARY_LIST)

3 #define is_not_list(x) (!is_list((x)))

与MySQL server 建立链接

此处与MySQL server 建立链接是调用emysql_conn module 中的open_connections 函数.

1 %%@doc Opens connections for the necessary pool.

2 %%3 %%If connection opening fails, removes all connections from the pool

4 %%Does not remove pool from emysql_conn_mgr due to a possible deadlock.

5 %%Caller must do it by itself.

6 open_connections(Pool) ->

7 %-% io:format("open connections loop: .. "),8 case (queue:len(Pool#pool.available) + gb_trees:size(Pool#pool.locked))

9 true ->

10 case catch open_connection(Pool) of

11 #emysql_connection{} = Conn ->

12 open_connections(Pool#pool{available =queue:in(Conn, Pool#pool.available)});13 {'EXIT', Reason} ->

14 AllConns =lists:append(15 queue:to_list(Pool#pool.available),16 gb_trees:values(Pool#pool.locked)17 ),18 lists:foreach(fun emysql_conn:close_connection/1, AllConns),19 {error, Reason}20 end;21 false ->

22 {ok, Pool}23 end.

链接的总数为pool 结构中的size 字段(L8),成功建立链接后,将Conn 放入 available queue 中(L12).

d76031098bf0880a19ec82d3e9953aaf.png

在emysql_conn_mgr 中添加 pool

当pool 与MySQL server 建立链接完成后,需要将pool 添加到emysql_conn_mgr 中, 以便emysql_conn_mgr gen_server 进程对pool 进行管理.

添加pool add_pool/1 的操作:

1 add_pool(Pool) ->

2 do_gen_call({add_pool, Pool}).3

4 ...5

6 handle_call({add_pool, Pool}, _From, State) ->

7 case find_pool(Pool#pool.pool_id, State#state.pools) of

8 {_, _} ->

9 {reply, {error, pool_already_exists}, State};10 undefined ->

11 {reply, ok, State#state{pools = [Pool|State#state.pools]}}12 end;

如果当前emysql_conn_mgr gen_server 进程中,并未记录(L8)该pool 的话,就将该pool 添加到emysql_conn_mgr gen_server 进程的state 数据中(L11).

has_pool/1 的操作:

1 has_pool(Pool) ->

2 do_gen_call({has_pool, Pool}).3

4 ....5

6 handle_call({has_pool, PoolID}, _From, State) ->

7 case find_pool(PoolID, State#state.pools) of

8 {_, _} ->

9 {reply, true, State};10 undefined ->

11 {reply, false, State}12 end;

以上,即为add_pool 操作的整个流程. emysql_conn_mgr gen_server 进程是管理pool 的非常重要的进程.

pool 使用管理

取出connection

在execute 执行一条SQL语句时, 用户进程需要先请求emysql_conn_mgr gen_server 进程从给定pool_id 的pool 中取出一个成员.

1 execute(PoolId, Query, Args, Timeout) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso (is_integer(Timeout) orelse Timeout == infinity) ->

2 Connection =emysql_conn_mgr:wait_for_connection(PoolId),3 monitor_work(Connection, Timeout, [Connection, Query, Args]);

1 wait_for_connection(PoolId ,Timeout)->

2 %%try to lock a connection. if no connections are available then

3 %%wait to be notified of the next available connection

4 %-% io:format("~p waits for connection to pool ~p~n", [self(), PoolId]),5 case do_gen_call({lock_connection, PoolId, true, self()}) of

6 unavailable ->

7 %-% io:format("~p is queued~n", [self()]),8 receive

9 {connection, Connection} ->Connection10 after Timeout ->

11 do_gen_call({abort_wait, PoolId}),12 receive

13 {connection, Connection} ->Connection14 after

15 0 ->exit(connection_lock_timeout)16 end

17 end;18 Connection ->

19 %-% io:format("~p gets connection~n", [self()]),20 Connection21 end.

然后对于wait_for_connection/2 函数的操作, 首先会调用emysql_conn_mgr gen_server handle_call 操作 lock_connection

65ffc4e2cff14c551815c50e64e51f13.png

而lock_next_connection 函数的主要功能是从pool 的available queue 中out 一个元素, 并monitor 调用进程(以防调用进程异常退出而没有归还conn).

1 lock_next_connection(Available ,Locked, Who) ->

2 case queue:out(Available) of

3 {{value, Conn}, OtherAvailable} ->

4 MonitorRef =erlang:monitor(process, Who),5 NewConn =connection_locked_at(Conn, MonitorRef),6 MonitorTuple ={MonitorRef,7 {NewConn#emysql_connection.pool_id, NewConn#emysql_connection.id}},8 NewLocked =gb_trees:enter(NewConn#emysql_connection.id, NewConn, Locked),9 {ok, NewConn, OtherAvailable, NewLocked, MonitorTuple};10 {empty, _} ->

11 unavailable12 end.

L2 处会尝试从available queue 中取出元素conn, 如果queue 此时不为空, emysql_conn_mgr 进程就会monitor (L4)用户进程,然后将该conn gb_tress enter 到locked tree 中(L8).

在"无可用的conn"的情况下, emysql_conn_mgr gen_server 进程会将用户进程写入到pool 的waiting queue中, 并且返回'unavailable', 用户进程就会等待conn 的其他使用者归还conn .

归还connection

在用户使用完conn 之后,应该及时归还给pool, 以防链接资源泄露.

在 emysql_conn_mgr module 中定义了pass_connection/1 函数以及在 handl_call callback 中实现了 handle_call({{replace_connection, Kind}, OldConn, NewConn}, _From, State).

在handle_call callback 函数中, 首先会从locked tree 中delete 掉该conn.然后从waiting queue 中取出之前等待的用户进程ID, 将conn 发送给alive 的等待进程, 并更新locked tree waiting queue. 如果waiting queue 中无alive 等待进程, 就将conn 还回给available queue, 并更新相关的管理角色.

1 serve_waiting_pids(Waiting, Available, Locked, MonitorRefs) ->

2 case queue:is_empty(Waiting) of

3 false ->

4 Who =queue:get(Waiting),5 case lock_next_connection(Available, Locked, Who) of

6 {ok, Connection, OtherAvailable, NewLocked, NewRef} ->

7 {{value, Pid}, OtherWaiting} =queue:out(Waiting),8 case erlang:is_process_alive(Pid) of

9 true ->

10 erlang:send(Pid, {connection, Connection}),11 serve_waiting_pids(OtherWaiting, OtherAvailable, NewLocked, [NewRef |MonitorRefs]);12 _ ->

13 serve_waiting_pids(OtherWaiting, Available, Locked, MonitorRefs)14 end;15 unavailable ->

16 {Waiting, Available, Locked, MonitorRefs}17 end;18 true ->

19 {Waiting, Available, Locked, MonitorRefs}20 end.

conn 使用进程退出

如果某用户进程在从pool 中取出conn 使用, 但是在使用过程中, 用户进程异常退出, 无法调用pass_connection/1 函数归还conn , 就会出现资源泄露的问题.

在emysql_conn_mgr module 中, 是使用monitor 用户进程的方式处理的. 在用户进程获得一个conn 之后, emysql_conn_mgr 会使用BIF erlang:monitor/2 函数 monitor 用户进程.当用户进程异常退出后,emysql_conn_mgr 进程就会收到'DOWN' 的message.然后在emysql_conn_mgr module 中的handle_info callback 函数中处理:

1 handle_info({'DOWN', MonitorRef, _, _, _}, State) ->

2 case dict:find(MonitorRef, State#state.lockers) of

3 {ok, {PoolId, ConnId}} ->

4 case find_pool(PoolId, State#state.pools) of

5 {Pool, _} ->

6 case gb_trees:lookup(ConnId, Pool#pool.locked) of

7 {value, Conn} ->async_reset_conn(State#state.pools, Conn);8 _ ->ok9 end;10 _ ->

11 ok12 end;13 _ ->

14 ok15 end,16 {noreply, State};

可以看出,在emysql_conn_mgr 进程接收到'DOWN' 的message 之后, 会在进程dict 的locker 中 查找poolID和 connID, 继而重置conn .

等待进程 abort_wait

某进程在获取pool 中 conn 时, 在Timeout 之后, 用户进程会调用abort_wait, emysql_conn_mgr 进程就会从waiting queue 中, 将不再等待的用户进程remove

1 handle_call({abort_wait, PoolId}, {From, _Mref}, State) ->

2 case find_pool(PoolId, State#state.pools) of

3 {Pool, OtherPools} ->

4 %%Remove From from the wait queue

5 QueueNow =queue:filter(6 fun(Pid) -> Pid =/= From end,7 Pool#pool.waiting),8 PoolNow = Pool#pool{ waiting =QueueNow },9 %%See if the length changed to know if From was removed.

10 OldLen =queue:len(Pool#pool.waiting),11 NewLen =queue:len(QueueNow),12 if

13 OldLen =:= NewLen ->

14 Reply =not_waiting;15 true ->

16 Reply =ok17 end,18 {reply, Reply, State#state{pools=[PoolNow|OtherPools]}};19 undefined ->

20 {reply, {error, pool_not_found}, State}21 end;

这样的设计同样是为了尽可能的保证避免资源的泄露,试想如果emysql_conn_mgr 进程将conn 发送给已经不再等待(不再需要)的进程,那该conn 就不可能再被归还.

在大多数情况下,这种设计是可以保证conn 不会被发送给不再等待的用户进程,但是在达成"gen_server 进程的处理是顺序性"这样共识的前提下考虑以下情况:

9bfeb749660aa23e18cedcc63a46502f.png

也就是在处理pass_connection , emysql_conn_mgr 进程从waiting queue 取出了ProcessA进程的同时, ProcessA 进程因为Timeout 调用了abort_wait 且exit 退出. emysql_conn_mgr 的顺序性处理,使得只有在处理完pass_connection 之后, 才能处理abort_wait 操作. 最终导致的结果就是将conn 发送给已经exit (但是还没有从waiting queue remove) 的用户进程ProcessA, 是conn 不能再被归还.

当这种情况出现的时候,就应该在发送conn 给 ProcessA 进程之前, 判断ProcessA 是否alive(这个pr 已经被merged了).

总结

在Emysql 的pool 管理中,主要使用了:

1, available queue 记录所有可用的pool 成员

2, locked tree 记录所有正在被使用的pool 成员

3, waiting queue 记录所有正在等待的用户进程

4, monitor 所有正在使用pool 成员的用户进程, 处理异常退出的case

5, 处理等待进程的abort_wait 请求, 更新waiting queue

6, 在发送pool 成员之前, 应该判断用户进程是否alive, 防止资源泄露

遗漏

现在的emysql_conn_mgr gen_server 进程属于单点,也就是所有的pool 的管理调度都是由一个进程来完成.

------------------------------

觉得写的还可以,就扫个码,打个赏呗。

c06b8fb1dd9a85503c8ad3e0050102c9.png



推荐阅读
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文介绍了如何在 Vue 3 组合 API 中正确设置 setup() 函数的 TypeScript 类型,以避免隐式 any 类型的问题。 ... [详细]
  • WinMain 函数详解及示例
    本文详细介绍了 WinMain 函数的参数及其用途,并提供了一个具体的示例代码来解析 WinMain 函数的实现。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 深入解析 Lifecycle 的实现原理
    本文将详细介绍 Android Jetpack 中 Lifecycle 组件的实现原理,帮助开发者更好地理解和使用 Lifecycle,避免常见的内存泄漏问题。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 深入解析Android GPS机制:第五部分 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
author-avatar
mobiledu2502889253
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有