作者:森南有鹿63N | 来源:互联网 | 2023-01-11 09:44
我有一个类,我liveSocketsByDatacenter
在updateLiveSockets()
方法中每30秒从一个后台线程填充一个映射,然后我有一个方法getNextSocket()
,将由多个读取器线程调用以获得一个可用的实时套接字,它使用相同的映射来获取此信息.
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference
正如你在我班上看到的那样:
从一个每30秒运行一次的后台线程,我liveSocketsByDatacenter
用方法中的所有实时套接字填充地图updateLiveSockets()
.
然后从多个线程,我调用该getNextSocket()
方法给我一个可用的实时套接字,它使用一个liveSocketsByDatacenter
map来获取所需的信息.
我的代码工作正常,没有任何问题,并希望看看是否有更好或更有效的方法来编写它.我还希望得到关于线程安全问题或任何竞争条件的意见,如果有的话,但到目前为止我还没有看到任何,但我可能是错的.
我主要担心的是updateLiveSockets()
方法和getLiveSocketX()
方法.我迭代liveSockets
这是一个List
的SocketHolder
在A线,然后产生一个新的SocketHolder
对象,并加入到另一个新的列表.这可以吗?
注意: SocketHolder
是一个不可变的类.你可以忽略ZeroMQ
我拥有的东西.
1> bowmore..:
您使用以下同步技术.
带有实时套接字数据的映射位于原子引用之后,这样可以安全地切换映射.
该updateLiveSockets()
方法是同步的(隐式地),这将防止同时通过两个线程切换映射.
如果在getNextSocket()
方法期间发生切换,则在使用时对地图进行本地引用以避免混淆.
它现在是线程安全吗?
线程安全始终取决于共享可变数据是否存在正确的同步.在这种情况下,共享可变数据是数据中心到其SocketHolders列表的映射.
地图位于a AtomicReference
中并制作本地副本以供使用的事实是地图上的足够同步.您的方法采用地图版本并使用它,由于其性质,切换版本是线程安全的AtomicReference
.这也可以通过为地图创建成员字段来实现volatile
,因为您所做的只是更新引用(您不对其执行任何check-then-act操作).
由于scheduleAtFixedRate()
保证传递Runnable
不会与自身同时运行,因此不需要synchronized
on updateLiveSockets()
,但是,它也没有任何真正的伤害.
所以,是的,这个类是线程安全的,因为它是.
但是,并不完全清楚是否SocketHolder
可以同时使用多个线程.实际上,这个类只是试图SocketHolder
通过选择一个随机的实时来最小化s的并发使用(不需要随机抽取整个数组来选择一个随机索引).它实际上没有阻止并发使用.
可以提高效率吗?
我相信它可以.在查看updateLiveSockets()
方法时,它似乎构建完全相同的映射,除了SocketHolder
s可能具有不同的isLive
标志值.这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图.并且为了以线程安全的方式更改地图中的条目,我可以使用ConcurrentHashMap
.
如果我使用了一个ConcurrentHashMap
,并且不切换地图,而是地图中的值,我就可以摆脱了AtomicReference
.
要更改映射,我可以构建新列表并将其直接放入映射中.这更有效率,因为我更快地发布数据,并且我创建了更少的对象,而我的同步只是建立在现成的组件上,这有利于可读性.
这是我的构建(为简洁起见,省略了一些不太相关的部分)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry> entry : socketsByDatacenter.entrySet()) {
List addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional getLiveSocket(final List listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List liveOnly= new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry> entry : socketsByDatacenter.entrySet()) {
List liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}