热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

在单个后台线程定期修改它的同时读取Map

如何解决《在单个后台线程定期修改它的同时读取Map》经验,为你挑选了1个好方法。

我有一个类,我liveSocketsByDatacenterupdateLiveSockets()方法中每30秒从一个后台线程填充一个映射,然后我有一个方法getNextSocket(),将由多个读取器线程调用以获得一个可用的实时套接字,它使用相同的映射来获取此信息.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry> entry : socketsByDatacenter.entrySet()) {
      List addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List connect(Datacenters colo, List addresses, int socketType) {
    List socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional liveSocket = Optional.absent();
    List dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional getLiveSocketX(final List endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List liveOnly= new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty so we shuffle it an return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the modifier synchronized to prevent concurrent modification
  // it is needed because to build the new map we first need to get the
  // old one so both must be done atomically to prevent concistency issues
  private synchronized void updateLiveSockets() {
    Map> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry> entry : socketsByDatacenter.entrySet()) {
      List liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

正如你在我班上看到的那样:

从一个每30秒运行一次的后台线程,我liveSocketsByDatacenter用方法中的所有实时套接字填充地图updateLiveSockets().

然后从多个线程,我调用该getNextSocket()方法给我一个可用的实时套接字,它使用一个liveSocketsByDatacentermap来获取所需的信息.

我的代码工作正常,没有任何问题,并希望看看是否有更好或更有效的方法来编写它.我还希望得到关于线程安全问题或任何竞争条件的意见,如果有的话,但到目前为止我还没有看到任何,但我可能是错的.

我主要担心的是updateLiveSockets()方法和getLiveSocketX()方法.我迭代liveSockets这是一个ListSocketHolder在A线,然后产生一个新的SocketHolder对象,并加入到另一个新的列表.这可以吗?

注意: SocketHolder是一个不可变的类.你可以忽略ZeroMQ我拥有的东西.



1> bowmore..:

您使用以下同步技术.

    带有实时套接字数据的映射位于原子引用之后,这样可以安全地切换映射.

    updateLiveSockets()方法是同步的(隐式地),这将防止同时通过两个线程切换映射.

    如果在getNextSocket()方法期间发生切换,则在使用时对地图进行本地引用以避免混淆.

它现在是线程安全吗?

线程安全始终取决于共享可变数据是否存在正确的同步.在这种情况下,共享可变数据是数据中心到其SocketHolders列表的映射.

地图位于a AtomicReference中并制作本地副本以供使用的事实是地图上的足够同步.您的方法采用地图版本并使用它,由于其性质,切换版本是线程安全的AtomicReference.这也可以通过为地图创建成员字段来实现volatile,因为您所做的只是更新引用(您不对其执行任何check-then-act操作).

由于scheduleAtFixedRate()保证传递Runnable不会与自身同时运行,因此不需要synchronizedon updateLiveSockets(),但是,它也没有任何真正的伤害.

所以,是的,这个类是线程安全的,因为它是.

但是,并不完全清楚是否SocketHolder可以同时使用多个线程.实际上,这个类只是试图SocketHolder通过选择一个随机的实时来最小化s的并发使用(不需要随机抽取整个数组来选择一个随机索引).它实际上没有阻止并发使用.

可以提高效率吗?

我相信它可以.在查看updateLiveSockets()方法时,它似乎构建完全相同的映射,除了SocketHolders可能具有不同的isLive标志值.这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图.并且为了以线程安全的方式更改地图中的条目,我可以使用ConcurrentHashMap.

如果我使用了一个ConcurrentHashMap,并且不切换地图,而是地图中的值,我就可以摆脱了AtomicReference.

要更改映射,我可以构建新列表并将其直接放入映射中.这更有效率,因为我更快地发布数据,并且我创建了更少的对象,而我的同步只是建立在现成的组件上,这有利于可读性.

这是我的构建(为简洁起见,省略了一些不太相关的部分)

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry> entry : socketsByDatacenter.entrySet()) {
        List addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
      }
    }

    // ...      

    // this method will be called by multiple threads to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    // is there any concurrency or thread safety issue or race condition here?
    private Optional getLiveSocket(final List listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List liveOnly= new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // no need to make this synchronized
    private void updateLiveSockets() {
      Map> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry> entry : socketsByDatacenter.entrySet()) {
        List liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
      }
    }

}


推荐阅读
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • 本文讨论了编写可保护的代码的重要性,包括提高代码的可读性、可调试性和直观性。同时介绍了优化代码的方法,如代码格式化、解释函数和提炼函数等。还提到了一些常见的坏代码味道,如不规范的命名、重复代码、过长的函数和参数列表等。最后,介绍了如何处理数据泥团和进行函数重构,以提高代码质量和可维护性。 ... [详细]
  • 如何查询zone下的表的信息
    本文介绍了如何通过TcaplusDB知识库查询zone下的表的信息。包括请求地址、GET请求参数说明、返回参数说明等内容。通过curl方法发起请求,并提供了请求示例。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • 本文介绍了安全性要求高的真正密码随机数生成器的概念和原理。首先解释了统计学意义上的伪随机数和真随机数的区别,以及伪随机数在密码学安全中的应用。然后讨论了真随机数的定义和产生方法,并指出了实际情况下真随机数的不可预测性和复杂性。最后介绍了随机数生成器的概念和方法。 ... [详细]
  • 关于如何快速定义自己的数据集,可以参考我的前一篇文章PyTorch中快速加载自定义数据(入门)_晨曦473的博客-CSDN博客刚开始学习P ... [详细]
  • java 单链表和双_Java链表,单链表和双链表
    Java-链表1、什么是链表?2、链表的特点是什么?3、链表的实现原理?4、如何自己写出一个链表?1、什么是链表࿱ ... [详细]
  • quartus管脚分配后需要保存吗_嵌入式必须会的一些硬件面试题,要试一试吗?你过来呀!...
    1、下面是一些基本的数字电路知识问题,请简要回答之。(1)什么是Setup和Hold时间?答:SetupHoldTime用于测试芯片对输入 ... [详细]
  • python网络编程 day27 网络编程初识 ——socket
    一、内容回顾1、两个内置函数(装饰器)及类中的魔术方法 ... [详细]
  • Docker 快速入门指引
    本文最早发表于本人博客:Docker快速入门指引Docker是什么?Docker是Docker.Inc公司开源的一个基于LXC技术之上构建的Container容器引擎,基于Go语言 ... [详细]
  • java local socket_网络通信之Socket与LocalSocket的比较
    Socket与LocalSocket都可以实现网络通信,两个有什么区别呢?LocalSocket其通信方式与Socket差不多,只是Loc ... [详细]
  • redis使用日志(4):如何让外部服务器访问
    开启redis允许外网IP访问在Linux中安装了redis服务,当在客户端通过远程连接的方式连接时,报couldnotconnect错误。错误的原因很 ... [详细]
author-avatar
森南有鹿63N
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有