内容: 1. Exactly once 2. 输出不重复 一、Spark Streaming中确保数据处理Exactly once 首先,说一下何为事物处理,事务性处理即数据一定会被处理且只被处理一次。(例如银行转账)。Spark Streaming可能会需要处理事务性的作业,例如Task输出
序
本文主要讲述如何在java里头使用redis进行cas操作。其实呢,redis不像memcached那样显示地支持cas操作,不过它有事务的概念。
准备
redis的乐观锁支持
Redis通过使用WATCH, MULTI, and EXEC组成的事务来实现乐观锁(注意没有用DISCARD),Redis事务没有回滚操作。在SpringDataRedis当中通过RedisTemplate的SessionCallback中来支持(否则事务不生效)。discard的话不需要自己代码处理,callback返回null,成的话,返回非null,依据这个来判断事务是否成功(没有抛异常)。
实例
@Test
public void cas() throws InterruptedException, ExecutionException {
String key = "test-cas-1";
ValueOperations strOps = redisTemplate.opsForValue();
strOps.set(key, "hello");
ExecutorService pool = Executors.newCachedThreadPool();
List> tasks &#61; new ArrayList<>();
for(int i&#61;0;i<5;i&#43;&#43;){
final int idx &#61; i;
tasks.add(new Callable() {
&#64;Override
public Object call() throws Exception {
return redisTemplate.execute(new SessionCallback() {
&#64;Override
public Object execute(RedisOperations operations) throws DataAccessException {
operations.watch(key);
String origin &#61; (String) operations.opsForValue().get(key);
operations.multi();
operations.opsForValue().set(key, origin &#43; idx);
Object rs &#61; operations.exec();
System.out.println("set:"&#43;origin&#43;idx&#43;" rs:"&#43;rs);
return rs;
}
});
}
});
}
List> futures &#61; pool.invokeAll(tasks);
for(Future f:futures){
System.out.println(f.get());
}
pool.shutdown();
pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
输出事务处理在应用程序中起着至关重要的作用。本篇博客将进入Spring的事务处理学习。下面&#xff0c;首先我们来简单回想一下与事务有关的一些概念。【事务】 所谓事务&#xff0c;就是一系列必须成功的操作&#xff0c;只要有一步操作失败&#xff0c;所以其他步骤都将要撤销。当所有的步骤都
set:hello2 rs:null
set:hello3 rs:[]
set:hello1 rs:null
set:hello4 rs:null
set:hello0 rs:null
查看该值
127.0.0.1:6379> get test-cas-1
"\"hello3\""
坑
SessionCallback
没有在SessionCallback里头执行watch、multi、exec&#xff0c;而是自己单独写
与数据库事务的混淆
template.setEnableTransactionSupport(true);
这个应该是支持数据库的事务成功才执行的意思。
/**
* Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current
* thread, for example when using a transaction manager. Will create a new Connection otherwise, if
* {&#64;code allowCreate} is true.
*
* &#64;param factory connection factory for creating the connection
* &#64;param allowCreate whether a new (unbound) connection should be created when no connection can be found for the
* current thread
* &#64;param bind binds the connection to the thread, in case one was created
* &#64;param enableTransactionSupport
* &#64;return an active Redis connection
*/
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
RedisConnectionHolder connHolder &#61; (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder !&#61; null) {
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate &#61; false");
}
if (log.isDebugEnabled()) {
log.debug("Opening RedisConnection");
}
RedisConnection conn &#61; factory.getConnection();
if (bind) {
RedisConnection connectionToBind &#61; conn;
if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
connectionToBind &#61; createConnectionProxy(conn, factory);
}
connHolder &#61; new RedisConnectionHolder(connectionToBind);
TransactionSynchronizationManager.bindResource(factory, connHolder);
if (enableTransactionSupport) {
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
return conn;
}
不要跟本文的乐观锁说的事务混淆在一起。
参考