热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

redismysql事务处理_SpringDataRedis事务处理

内容:1.Exactlyonce2.输出不重复一、SparkStreaming中确保数据处理Exactlyonce首先,说一下何为事物处理,

内容: 1.      Exactly once 2.      输出不重复     一、Spark Streaming中确保数据处理Exactly once 首先,说一下何为事物处理,事务性处理即数据一定会被处理且只被处理一次。(例如银行转账)。Spark Streaming可能会需要处理事务性的作业,例如Task输出

本文主要讲述如何在java里头使用redis进行cas操作。其实呢,redis不像memcached那样显示地支持cas操作,不过它有事务的概念。

准备

redis的乐观锁支持

Redis通过使用WATCH, MULTI, and EXEC组成的事务来实现乐观锁(注意没有用DISCARD),Redis事务没有回滚操作。在SpringDataRedis当中通过RedisTemplate的SessionCallback中来支持(否则事务不生效)。discard的话不需要自己代码处理,callback返回null,成的话,返回非null,依据这个来判断事务是否成功(没有抛异常)。

实例

@Test

public void cas() throws InterruptedException, ExecutionException {

String key = "test-cas-1";

ValueOperations strOps = redisTemplate.opsForValue();

strOps.set(key, "hello");

ExecutorService pool = Executors.newCachedThreadPool();

List> tasks &#61; new ArrayList<>();

for(int i&#61;0;i<5;i&#43;&#43;){

final int idx &#61; i;

tasks.add(new Callable() {

&#64;Override

public Object call() throws Exception {

return redisTemplate.execute(new SessionCallback() {

&#64;Override

public Object execute(RedisOperations operations) throws DataAccessException {

operations.watch(key);

String origin &#61; (String) operations.opsForValue().get(key);

operations.multi();

operations.opsForValue().set(key, origin &#43; idx);

Object rs &#61; operations.exec();

System.out.println("set:"&#43;origin&#43;idx&#43;" rs:"&#43;rs);

return rs;

}

});

}

});

}

List> futures &#61; pool.invokeAll(tasks);

for(Future f:futures){

System.out.println(f.get());

}

pool.shutdown();

pool.awaitTermination(1000, TimeUnit.MILLISECONDS);

}

输出事务处理在应用程序中起着至关重要的作用。本篇博客将进入Spring的事务处理学习。下面&#xff0c;首先我们来简单回想一下与事务有关的一些概念。【事务】    所谓事务&#xff0c;就是一系列必须成功的操作&#xff0c;只要有一步操作失败&#xff0c;所以其他步骤都将要撤销。当所有的步骤都

set:hello2 rs:null

set:hello3 rs:[]

set:hello1 rs:null

set:hello4 rs:null

set:hello0 rs:null

查看该值

127.0.0.1:6379> get test-cas-1

"\"hello3\""

SessionCallback

没有在SessionCallback里头执行watch、multi、exec&#xff0c;而是自己单独写

与数据库事务的混淆

template.setEnableTransactionSupport(true);

这个应该是支持数据库的事务成功才执行的意思。

/**

* Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current

* thread, for example when using a transaction manager. Will create a new Connection otherwise, if

* {&#64;code allowCreate} is true.

*

* &#64;param factory connection factory for creating the connection

* &#64;param allowCreate whether a new (unbound) connection should be created when no connection can be found for the

* current thread

* &#64;param bind binds the connection to the thread, in case one was created

* &#64;param enableTransactionSupport

* &#64;return an active Redis connection

*/

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,

boolean enableTransactionSupport) {

Assert.notNull(factory, "No RedisConnectionFactory specified");

RedisConnectionHolder connHolder &#61; (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

if (connHolder !&#61; null) {

if (enableTransactionSupport) {

potentiallyRegisterTransactionSynchronisation(connHolder, factory);

}

return connHolder.getConnection();

}

if (!allowCreate) {

throw new IllegalArgumentException("No connection found and allowCreate &#61; false");

}

if (log.isDebugEnabled()) {

log.debug("Opening RedisConnection");

}

RedisConnection conn &#61; factory.getConnection();

if (bind) {

RedisConnection connectionToBind &#61; conn;

if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {

connectionToBind &#61; createConnectionProxy(conn, factory);

}

connHolder &#61; new RedisConnectionHolder(connectionToBind);

TransactionSynchronizationManager.bindResource(factory, connHolder);

if (enableTransactionSupport) {

potentiallyRegisterTransactionSynchronisation(connHolder, factory);

}

return connHolder.getConnection();

}

return conn;

}

不要跟本文的乐观锁说的事务混淆在一起。

参考



推荐阅读
author-avatar
simona2006_827
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有