热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mysqlpool_ErlangpoolmanagementEmysqlpool

从这篇开始,这一系列主要分析在开源社区中,Erlang相关pool的管理和使用.在开源社区,Emysql是Erlang较为受欢迎的一个MySQL驱动.Emysql对pool的管理和

从这篇开始,这一系列主要分析在开源社区中,Erlang 相关pool 的管理和使用.

在开源社区,Emysql 是Erlang 较为受欢迎的一个MySQL 驱动. Emysql 对pool 的管理和使用是非常典型的,pool 的管理角色中,主要有available(记录当前pool 中可供使用的成员),locked(记录当前pool 中正在被使用的成员),waiting(记录当前正在处理等待该pool 的用户).用户进程在使用pool 过程中, pool 中的成员在这三个角色中来回迁移.

pool 数据结构

Emysql pool 的数据结构如下:

1 -record(pool, {pool_id :: atom(),2 size :: number(),3 user :: string(),4 password :: string(),5 host :: string(),6 port :: number(),7 database :: string(),8 encoding :: utf8 | latin1 | {utf8, utf8_unicode_ci} |{utf8, utf8_general_ci},9 available=queue:new() :: queue(),10 locked=gb_trees:empty() :: gb_tree(),11 waiting=queue:new() :: queue(),12 start_cmds=[] :: string(),13 conn_test_period=0:: number(),14 connect_timeout=infinity :: number() |infinity,15 warnings=false :: boolean()}).

L1 处的pool_id 为pool 的标识

L2 定义了pool 中成员的数量

L3 L4 为用以连接MySQL 数据库的用户名和密码

L5 L6 L7 L8 为连接MySQL 数据库的IP, 端口, 默认数据库, 编码

L9 用以记录当前pool 中可用的成员

L10 用以记录当前pool 中正在被使用的成员

L11 用以记录当前等待pool 中成员的用户

L12 为在与数据库建立连接后的初始化命令

L14 是用于gen_tcp:connect 时的超时参数

pool 添加操作

在Emysql 项目中,emysql module 定义了所有外部操作的API, 其中添加操作的API有:

1, add_pool/2

2, add_pool/8

3, add_pool/9

以下代码片段为add_pool 的实质性处理逻辑:

1 add_pool(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port,2 database=Database,encoding=Encoding,start_cmds=StartCmds,3 connect_timeout=ConnectTimeout,warnings=Warnings}=PoolSettings)->

4 config_ok(PoolSettings),5 case emysql_conn_mgr:has_pool(PoolId) of

6 true ->

7 {error,pool_already_exists};8 false ->

9 Pool =#pool{10 pool_id =PoolId,11 size =Size,12 user =User,13 password =Password,14 host =Host,15 port =Port,16 database =Database,17 encoding =Encoding,18 start_cmds =StartCmds,19 connect_timeout =ConnectTimeout,20 warnings =Warnings21 },22 Pool2 = case emysql_conn:open_connections(Pool) of

23 {ok, Pool1} ->Pool1;24 {error, Reason} ->throw(Reason)25 end,26 emysql_conn_mgr:add_pool(Pool2)27 end.

处理逻辑主要有:

1, 确认参数的数据类型

2, 检查当前是否已经有相同ID的 pool

3, 与MySQL server 建立connection

4, 在emysql_conn_mgr 中添加 该pool

在当前的Emysql 项目中,emysql_conn_mgr 是用来管理所有pool 的gen_server 进程. 对于所有的pool 而言,其内部成员的状态管理, 都是由emysql_conn_mgr 调度的, 包括某个pool 中connection 成员的使用,归还等.

确认参数的数据类型

确认参数的数据类型主要使用erlang:is_{{type}} bif func, 在Erlang VM 内存,对于每种数据类型都是以后缀来识别的.

1 config_ok(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port,2 database=Database,encoding=Encoding,start_cmds=StartCmds,3 connect_timeout=ConnectTimeout,warnings=Warnings})4 whenis_atom(PoolId),5 is_integer(Size),6 is_list(User),7 is_list(Password),8 is_list(Host),9 is_integer(Port),10 is_list(Database) orelse Database ==undefined,11 is_list(StartCmds),12 is_integer(ConnectTimeout) orelse ConnectTimeout ==infinity,13 is_boolean(Warnings) ->

14 encoding_ok(Encoding);15 config_ok(_BadOptions) ->

16 erlang:error(badarg).17

18 encoding_ok(Enc) when is_atom(Enc) ->ok;19 encoding_ok({Enc, Coll}) when is_atom(Enc), is_atom(Coll) ->ok;20 encoding_ok(_) -> erlang:error(badarg).

因此, 这部分的操作足够高效.以 is_list/1为例:

1 #define TAG_PRIMARY_LIST 0x1

2 #define is_list(x) (((x) & _TAG_PRIMARY_MASK) == TAG_PRIMARY_LIST)

3 #define is_not_list(x) (!is_list((x)))

与MySQL server 建立链接

此处与MySQL server 建立链接是调用emysql_conn module 中的open_connections 函数.

1 %%@doc Opens connections for the necessary pool.

2 %%3 %%If connection opening fails, removes all connections from the pool

4 %%Does not remove pool from emysql_conn_mgr due to a possible deadlock.

5 %%Caller must do it by itself.

6 open_connections(Pool) ->

7 %-% io:format("open connections loop: .. "),8 case (queue:len(Pool#pool.available) + gb_trees:size(Pool#pool.locked))

9 true ->

10 case catch open_connection(Pool) of

11 #emysql_connection{} = Conn ->

12 open_connections(Pool#pool{available =queue:in(Conn, Pool#pool.available)});13 {'EXIT', Reason} ->

14 AllConns =lists:append(15 queue:to_list(Pool#pool.available),16 gb_trees:values(Pool#pool.locked)17 ),18 lists:foreach(fun emysql_conn:close_connection/1, AllConns),19 {error, Reason}20 end;21 false ->

22 {ok, Pool}23 end.

链接的总数为pool 结构中的size 字段(L8),成功建立链接后,将Conn 放入 available queue 中(L12).

d76031098bf0880a19ec82d3e9953aaf.png

在emysql_conn_mgr 中添加 pool

当pool 与MySQL server 建立链接完成后,需要将pool 添加到emysql_conn_mgr 中, 以便emysql_conn_mgr gen_server 进程对pool 进行管理.

添加pool add_pool/1 的操作:

1 add_pool(Pool) ->

2 do_gen_call({add_pool, Pool}).3

4 ...5

6 handle_call({add_pool, Pool}, _From, State) ->

7 case find_pool(Pool#pool.pool_id, State#state.pools) of

8 {_, _} ->

9 {reply, {error, pool_already_exists}, State};10 undefined ->

11 {reply, ok, State#state{pools = [Pool|State#state.pools]}}12 end;

如果当前emysql_conn_mgr gen_server 进程中,并未记录(L8)该pool 的话,就将该pool 添加到emysql_conn_mgr gen_server 进程的state 数据中(L11).

has_pool/1 的操作:

1 has_pool(Pool) ->

2 do_gen_call({has_pool, Pool}).3

4 ....5

6 handle_call({has_pool, PoolID}, _From, State) ->

7 case find_pool(PoolID, State#state.pools) of

8 {_, _} ->

9 {reply, true, State};10 undefined ->

11 {reply, false, State}12 end;

以上,即为add_pool 操作的整个流程. emysql_conn_mgr gen_server 进程是管理pool 的非常重要的进程.

pool 使用管理

取出connection

在execute 执行一条SQL语句时, 用户进程需要先请求emysql_conn_mgr gen_server 进程从给定pool_id 的pool 中取出一个成员.

1 execute(PoolId, Query, Args, Timeout) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso (is_integer(Timeout) orelse Timeout == infinity) ->

2 Connection =emysql_conn_mgr:wait_for_connection(PoolId),3 monitor_work(Connection, Timeout, [Connection, Query, Args]);

1 wait_for_connection(PoolId ,Timeout)->

2 %%try to lock a connection. if no connections are available then

3 %%wait to be notified of the next available connection

4 %-% io:format("~p waits for connection to pool ~p~n", [self(), PoolId]),5 case do_gen_call({lock_connection, PoolId, true, self()}) of

6 unavailable ->

7 %-% io:format("~p is queued~n", [self()]),8 receive

9 {connection, Connection} ->Connection10 after Timeout ->

11 do_gen_call({abort_wait, PoolId}),12 receive

13 {connection, Connection} ->Connection14 after

15 0 ->exit(connection_lock_timeout)16 end

17 end;18 Connection ->

19 %-% io:format("~p gets connection~n", [self()]),20 Connection21 end.

然后对于wait_for_connection/2 函数的操作, 首先会调用emysql_conn_mgr gen_server handle_call 操作 lock_connection

65ffc4e2cff14c551815c50e64e51f13.png

而lock_next_connection 函数的主要功能是从pool 的available queue 中out 一个元素, 并monitor 调用进程(以防调用进程异常退出而没有归还conn).

1 lock_next_connection(Available ,Locked, Who) ->

2 case queue:out(Available) of

3 {{value, Conn}, OtherAvailable} ->

4 MonitorRef =erlang:monitor(process, Who),5 NewConn =connection_locked_at(Conn, MonitorRef),6 MonitorTuple ={MonitorRef,7 {NewConn#emysql_connection.pool_id, NewConn#emysql_connection.id}},8 NewLocked =gb_trees:enter(NewConn#emysql_connection.id, NewConn, Locked),9 {ok, NewConn, OtherAvailable, NewLocked, MonitorTuple};10 {empty, _} ->

11 unavailable12 end.

L2 处会尝试从available queue 中取出元素conn, 如果queue 此时不为空, emysql_conn_mgr 进程就会monitor (L4)用户进程,然后将该conn gb_tress enter 到locked tree 中(L8).

在"无可用的conn"的情况下, emysql_conn_mgr gen_server 进程会将用户进程写入到pool 的waiting queue中, 并且返回'unavailable', 用户进程就会等待conn 的其他使用者归还conn .

归还connection

在用户使用完conn 之后,应该及时归还给pool, 以防链接资源泄露.

在 emysql_conn_mgr module 中定义了pass_connection/1 函数以及在 handl_call callback 中实现了 handle_call({{replace_connection, Kind}, OldConn, NewConn}, _From, State).

在handle_call callback 函数中, 首先会从locked tree 中delete 掉该conn.然后从waiting queue 中取出之前等待的用户进程ID, 将conn 发送给alive 的等待进程, 并更新locked tree waiting queue. 如果waiting queue 中无alive 等待进程, 就将conn 还回给available queue, 并更新相关的管理角色.

1 serve_waiting_pids(Waiting, Available, Locked, MonitorRefs) ->

2 case queue:is_empty(Waiting) of

3 false ->

4 Who =queue:get(Waiting),5 case lock_next_connection(Available, Locked, Who) of

6 {ok, Connection, OtherAvailable, NewLocked, NewRef} ->

7 {{value, Pid}, OtherWaiting} =queue:out(Waiting),8 case erlang:is_process_alive(Pid) of

9 true ->

10 erlang:send(Pid, {connection, Connection}),11 serve_waiting_pids(OtherWaiting, OtherAvailable, NewLocked, [NewRef |MonitorRefs]);12 _ ->

13 serve_waiting_pids(OtherWaiting, Available, Locked, MonitorRefs)14 end;15 unavailable ->

16 {Waiting, Available, Locked, MonitorRefs}17 end;18 true ->

19 {Waiting, Available, Locked, MonitorRefs}20 end.

conn 使用进程退出

如果某用户进程在从pool 中取出conn 使用, 但是在使用过程中, 用户进程异常退出, 无法调用pass_connection/1 函数归还conn , 就会出现资源泄露的问题.

在emysql_conn_mgr module 中, 是使用monitor 用户进程的方式处理的. 在用户进程获得一个conn 之后, emysql_conn_mgr 会使用BIF erlang:monitor/2 函数 monitor 用户进程.当用户进程异常退出后,emysql_conn_mgr 进程就会收到'DOWN' 的message.然后在emysql_conn_mgr module 中的handle_info callback 函数中处理:

1 handle_info({'DOWN', MonitorRef, _, _, _}, State) ->

2 case dict:find(MonitorRef, State#state.lockers) of

3 {ok, {PoolId, ConnId}} ->

4 case find_pool(PoolId, State#state.pools) of

5 {Pool, _} ->

6 case gb_trees:lookup(ConnId, Pool#pool.locked) of

7 {value, Conn} ->async_reset_conn(State#state.pools, Conn);8 _ ->ok9 end;10 _ ->

11 ok12 end;13 _ ->

14 ok15 end,16 {noreply, State};

可以看出,在emysql_conn_mgr 进程接收到'DOWN' 的message 之后, 会在进程dict 的locker 中 查找poolID和 connID, 继而重置conn .

等待进程 abort_wait

某进程在获取pool 中 conn 时, 在Timeout 之后, 用户进程会调用abort_wait, emysql_conn_mgr 进程就会从waiting queue 中, 将不再等待的用户进程remove

1 handle_call({abort_wait, PoolId}, {From, _Mref}, State) ->

2 case find_pool(PoolId, State#state.pools) of

3 {Pool, OtherPools} ->

4 %%Remove From from the wait queue

5 QueueNow =queue:filter(6 fun(Pid) -> Pid =/= From end,7 Pool#pool.waiting),8 PoolNow = Pool#pool{ waiting =QueueNow },9 %%See if the length changed to know if From was removed.

10 OldLen =queue:len(Pool#pool.waiting),11 NewLen =queue:len(QueueNow),12 if

13 OldLen =:= NewLen ->

14 Reply =not_waiting;15 true ->

16 Reply =ok17 end,18 {reply, Reply, State#state{pools=[PoolNow|OtherPools]}};19 undefined ->

20 {reply, {error, pool_not_found}, State}21 end;

这样的设计同样是为了尽可能的保证避免资源的泄露,试想如果emysql_conn_mgr 进程将conn 发送给已经不再等待(不再需要)的进程,那该conn 就不可能再被归还.

在大多数情况下,这种设计是可以保证conn 不会被发送给不再等待的用户进程,但是在达成"gen_server 进程的处理是顺序性"这样共识的前提下考虑以下情况:

9bfeb749660aa23e18cedcc63a46502f.png

也就是在处理pass_connection , emysql_conn_mgr 进程从waiting queue 取出了ProcessA进程的同时, ProcessA 进程因为Timeout 调用了abort_wait 且exit 退出. emysql_conn_mgr 的顺序性处理,使得只有在处理完pass_connection 之后, 才能处理abort_wait 操作. 最终导致的结果就是将conn 发送给已经exit (但是还没有从waiting queue remove) 的用户进程ProcessA, 是conn 不能再被归还.

当这种情况出现的时候,就应该在发送conn 给 ProcessA 进程之前, 判断ProcessA 是否alive(这个pr 已经被merged了).

总结

在Emysql 的pool 管理中,主要使用了:

1, available queue 记录所有可用的pool 成员

2, locked tree 记录所有正在被使用的pool 成员

3, waiting queue 记录所有正在等待的用户进程

4, monitor 所有正在使用pool 成员的用户进程, 处理异常退出的case

5, 处理等待进程的abort_wait 请求, 更新waiting queue

6, 在发送pool 成员之前, 应该判断用户进程是否alive, 防止资源泄露

遗漏

现在的emysql_conn_mgr gen_server 进程属于单点,也就是所有的pool 的管理调度都是由一个进程来完成.

------------------------------

觉得写的还可以,就扫个码,打个赏呗。

c06b8fb1dd9a85503c8ad3e0050102c9.png



推荐阅读
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • Python SQLAlchemy库的使用方法详解
    本文详细介绍了Python中使用SQLAlchemy库的方法。首先对SQLAlchemy进行了简介,包括其定义、适用的数据库类型等。然后讨论了SQLAlchemy提供的两种主要使用模式,即SQL表达式语言和ORM。针对不同的需求,给出了选择哪种模式的建议。最后,介绍了连接数据库的方法,包括创建SQLAlchemy引擎和执行SQL语句的接口。 ... [详细]
  • 本文主要复习了数据库的一些知识点,包括环境变量设置、表之间的引用关系等。同时介绍了一些常用的数据库命令及其使用方法,如创建数据库、查看已存在的数据库、切换数据库、创建表等操作。通过本文的学习,可以加深对数据库的理解和应用能力。 ... [详细]
  • MySQL语句大全:创建、授权、查询、修改等【MySQL】的使用方法详解
    本文详细介绍了MySQL语句的使用方法,包括创建用户、授权、查询、修改等操作。通过连接MySQL数据库,可以使用命令创建用户,并指定该用户在哪个主机上可以登录。同时,还可以设置用户的登录密码。通过本文,您可以全面了解MySQL语句的使用方法。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
author-avatar
mobiledu2502889253
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有