热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

grpc(三)之grpc客户端使用连接池

本文使用commons-pool2来实现连接池应用1、定义一个产生连接池的工厂,需要继承BasePooledObjectFactory,其用处是生产和销毁连接池中保存的对象。根据需求,现在池子

  本文使用commons-pool2来实现连接池应用

1、定义一个产生连接池的工厂,需要继承BasePooledObjectFactory,其用处是生产和销毁连接池中保存的对象。根据需求,现在池子里保存的应该是grpc客户端对象。

  GrpcClientFactory类:

package com.oy.grpc;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

public class GrpcClientFactory extends BasePooledObjectFactory {

    @Override
    public GrpcClient create() throws Exception {
        return new GrpcClient("localhost", 23333);
    }

    @Override
    public PooledObject wrap(GrpcClient client) {
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void destroyObject(PooledObject p) throws Exception {
        UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
        p.getObject().shutdown();
        super.destroyObject(p);
    }

}

 

2、连接池GrpcClientPool类

package com.oy.grpc;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

public class GrpcClientPool {

    private static GenericObjectPool objectPool = null;

    static {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        // 池中的最大连接数
        poolConfig.setMaxTotal(8);
        // 最少的空闲连接数
        poolConfig.setMinIdle(0);
        // 最多的空闲连接数
        poolConfig.setMaxIdle(8);
        // 当连接池资源耗尽时,调用者最大阻塞的时间,超时时抛出异常 单位:毫秒数
        poolConfig.setMaxWaitMillis(-1);
        // 连接池存放池化对象方式,true放在空闲队列最前面,false放在空闲队列最后
        poolConfig.setLifo(true);
        // 连接空闲的最小时间,达到此值后空闲连接可能会被移除,默认即为30分钟
        poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);// 连接耗尽时是否阻塞,默认为true
        poolConfig.setBlockWhenExhausted(true);
        objectPool = new GenericObjectPool<>(new GrpcClientFactory(), poolConfig);
    }

    public static GrpcClient borrowObject() {
        try {
            GrpcClient client = objectPool.borrowObject();
            UtilFunctions.log.info("=======total threads created: " + objectPool.getCreatedCount());
            return client;
        } catch (Exception e) {
            UtilFunctions.log.error("objectPool.borrowObject error, msg:{}, exception:{}", e.toString(), e);
        }
        return createClient();
    }

    public static void returnObject(GrpcClient client) {
        try {
            objectPool.returnObject(client);
        } catch (Exception e) {
            UtilFunctions.log.error("objectPool.returnObject error, msg:{}, exception:{}", e.toString(), e);
        }
    }

    private static GrpcClient createClient() {
        return new GrpcClient("localhost", 23333);
    }

}

 

3、客户端程序

  这里仅仅简单列出了客户端GrpcClient类的代码,其他代码包括服务端代码见另一篇博客grpc(一)之helloworld。

package com.oy.grpc.client;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import com.oy.grpc.BookServiceGrpc;
import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.GrpcLib.GrpcReply;
import com.oy.grpc.GrpcLib.addBookRequest;
import com.oy.grpc.GrpcLib.getUserByIdRequest;
import com.oy.grpc.UserServiceGrpc;
import com.oy.utils.UtilFunctions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;

public class GrpcClient {
    public static String host = "localhost";
    private final ManagedChannel channel;
    private final UserServiceGrpc.UserServiceBlockingStub userBlockingStub;
    private final BookServiceGrpc.BookServiceBlockingStub bookBlockingStub;

    public GrpcClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        userBlockingStub = UserServiceGrpc.newBlockingStub(channel);
        bookBlockingStub = BookServiceGrpc.newBlockingStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
    }

    @SuppressWarnings({ "rawtypes" })
    public static Object call(String rpcMethoddName, Object... args) throws Exception {
        UtilFunctions.log.info("=========== GrpcClient#call begin ===========");
        GrpcClient client = null;
        try {
            client = GrpcClientPool.borrowObject();
            // client = new GrpcClient(host, 23333);

            Class[] argsTypes = new Class[args.length];
            for (int i = 0; i ) {
                UtilFunctions.log.info("args types: {}", args[i].getClass());
                argsTypes[i] = args[i].getClass();
            }
            Method method = client.getClass().getMethod(rpcMethoddName, argsTypes);
            Object result = method.invoke(client, args);
            UtilFunctions.log.info("=========== GrpcClient#call end ===========");
            return result;
        } catch (Exception e) {
            UtilFunctions.log.error("GrpcClient#call error, msg:{}, exception:{}", e.toString(), e);
            return null;
        } finally {
            if (client != null) {
                GrpcClientPool.returnObject(client);
                // client.shutdown();
            }
        }
    }

    // ============= User module =============
    public Object getUserById(Integer id) {
        UtilFunctions.log.info("=========== GrpcClient#getUserById begin ===========");
        getUserByIdRequest request = getUserByIdRequest.newBuilder().setId(id).build();
        GrpcReply response;
        try {
            response = userBlockingStub.getUserById(request);
            UtilFunctions.log.info("GrpcClient#getUserById response, code:{}, data:{}", response.getCode(),
                    response.getData());
        } catch (StatusRuntimeException e) {
            UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
            return null;
        }
        return response;
    }

    // ============= Book module =============
    public Object addBook(Integer id, String name, Double price) {
        UtilFunctions.log.info("=========== GrpcClient#addBook begin ===========");
        addBookRequest request = addBookRequest.newBuilder().setId(id).setName(name).setPrice(price).build();
        GrpcReply response;
        try {
            response = bookBlockingStub.addBook(request);
            UtilFunctions.log.info("GrpcClient#addBook response, code:{}, data:{}", response.getCode(),
                    response.getData());
            UtilFunctions.log.info("=========== GrpcClient#addBook end ===========");
        } catch (StatusRuntimeException e) {
            UtilFunctions.log.error("GrpcClient#addBook error, msg:{}, exception:{}", e.toString(), e);
            return null;
        }
        return response;
    }

}

 

4、客户端测试

package com.oy.grpc.client;

import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.GrpcLib.GrpcReply;
import com.oy.utils.UtilFunctions;

public class TestService {

    public static void main(String[] args) throws Exception {

        for (int i = 0; i <4; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    GrpcReply result = null;
                    try {
                        // result = (GrpcReply) GrpcClient.call("getUserById", Integer.valueOf("1"));
                        // result = (GrpcReply) GrpcClient.call("getUserById", 2);
                        result = (GrpcReply) GrpcClient.call("addBook", 1, "thinking in java", 50.0);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (result != null) {
                        UtilFunctions.log.info("client call interface, get code:{}, data:{}", result.getCode(),
                                result.getData());
                    }

            // 如果注释掉下面两句,则客户端程序结束后,服务端报java.io.IOException: 远程主机强迫关闭了一个现有的连接。
                    // UtilFunctions.log.info("TestService#main: objectPool is closing...");
                    // GrpcClientPool.getObjectPool().close();
                }
            }).start();
        }
    }
}

 

  运行testService类的main()方法,客户端能正常调用grpc server得到数据,但是grpc服务端报错:

2019-04-11 14:29:30.458  INFO 1192 --- [-worker-ELG-3-1] i.g.n.NettyServerTransport.connections   : Transport failed
java.io.IOException: 远程主机强迫关闭了一个现有的连接。

  出现这个问题的原因:客户端强制断开连接。参考https://stackoverflow.com/questions/46802521/io-grpc-netty-nettyservertransport-notifyterminated,

  

 

  我在GrpcClientFactory里面也实现了销毁方法:

@Override
public void destroyObject(PooledObject p) throws Exception {
    UtilFunctions.log.info("==== GrpcClientFactory#destroyObject ====");
    p.getObject().shutdown(); super.destroyObject(p);
}

  但是运行testService类的main()方法结束后服务端程序就结束了,程序没有主动调用destroyObject()方法销毁池子中的对象,所以grpcClient也没有shutdown,所以报错。

 

5、启动客户端springboot项目来测试

package com.oy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.oy.grpc.client.TestService;

@SpringBootApplication
public class Grpc007ClientMainApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Grpc007ClientMainApplication.class, args);
        TestService.main(args);
    }

}

  但是这样当关闭客户端程序,还是出现同样的问题。其实很好理解,因为关闭客户端程序时,池中的对象还处于空闲状态,没有销毁,destroyObject()方法没有调用,所以grpcClient也没有shutdown

  

6、解决方法

  客户端程序关闭时,池也要close。

package com.oy;

import javax.annotation.PreDestroy;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.stereotype.Controller;
import com.oy.grpc.GrpcClientPool;
import com.oy.grpc.client.GrpcClient;
import com.oy.utils.UtilFunctions;

@Controller
public class InitController {

    @PreDestroy
    public void destroy() {
        UtilFunctions.log.info("InitController#destroy running...");
        
        GenericObjectPool objectPool = GrpcClientPool.getObjectPool();
        UtilFunctions.log.info("InitController#destroy, total threads created: " + objectPool.getCreatedCount());
        
        UtilFunctions.log.info("InitController#destroy objectPool is closing..."); objectPool.close();
    }
}

    

 


推荐阅读
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 标题: ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
author-avatar
verde公寓_401
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有