热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式事务柔性事务解决方案:可靠消息最终一致性(异步确保型)——三、生产者实战

建议简单看看上一篇文章再往下阅读我们的项目就基于这个模型:接下来就到了我们的实战时刻~项目基于springcloud编写,没有springcloud基础看起来可能有一

建议简单看看上一篇文章再往下阅读

我们的项目就基于这个模型:

这里写图片描述

接下来就到了我们的实战时刻~

项目基于spring cloud编写,没有spring cloud基础看起来可能有一点点费力。

准备阶段:定义可靠消息接口

package com.anur.messageapi.api;


import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.Map;

/** * Created by Anur IjuoKaruKas on 2018/5/8 */
public interface TransactionMsgApi {

    /** * 预发送消息,先将消息保存到消息中心 */
    @RequestMapping(value = "prepare", method = RequestMethod.GET)
    int prepareMsg(
            @RequestParam("id") String id,
            @RequestParam("msg") String msg,
            @RequestParam("routingKey") String routingKey,
            @RequestParam("exchange") String exchange,
            @RequestParam("paramMap") String paramMap,
            @RequestParam("artist") String artist);

    /** * 生产者确认消息可投递 */
    @RequestMapping(value = "confirm", method = RequestMethod.GET)
    int confirmMsgToSend(@RequestParam("id") String id, @RequestParam("caller") String caller);

    /** * 向队列投递消息 */
    @RequestMapping(value = "send", method = RequestMethod.GET)
    void sendMsg(@RequestParam("id") String id);

    /** * 消费者确认消费成功 */
    @RequestMapping(value = "ack", method = RequestMethod.GET)
    int acknowledgement(@RequestParam("id") String id,
                        @RequestParam("artist") String artist);
}

我们先忽略后面的两个接口,先看第一个,一共有六个参数

  • id:消息的id,这个设计其实很自由,可以在可靠消息服务中生成,也可以在生产者端生成,本项目选择在生产者端生成。
  • msg:消息的主体,可以是普通的字符串,也可以是对象
  • routingKey:路由键,发送消息时用(不懂的可以去看看MQ基础)
  • exchange:交换器,发送消息时用(不懂的可以去看看MQ基础)
  • paramMap:可靠消息服务回查时用,比如说我一个消息发送到可靠消息服务,结果没确认,可靠消息服务就根据这个paramMap进行消息的回查,向生产者查询这个业务到底执行成功了没。
  • artist:回调(回查)地址,在springCloud中,其实就是serverName

具体场景解析:订单服务

一、创建预发送消息,并将其保存到数据库

我们首先生成一条消息,我们往paramMap中指定了,我们这个订单的订单id是orderId,消息内容我瞎写的,这条消息要保存到数据库(它的作用是保证消息一定被可靠消息接收并持久化)

        String routingKey = "test.key.testing";
        Map map = new HashMap<>();

        String orderId = UUID.randomUUID().toString() + System.currentTimeMillis();
        map.put("id", orderId);
        String mapStr = JSON.toJSONString(map);

        TestMsg testMsg = new TestMsg();
        testMsg.setContent("这是一条测试消息");
        String testMsgStr = JSON.toJSONString(testMsg);
        // ===============================

        // 要保存到数据库(它的作用是保证消息一定被可靠消息接收并持久化)
        PrepareMsg prepareMsg = prepareMsgService.genMsg(orderId, testMsgStr, routingKey, Constant.TEST_EXCHANGE, mapStr);

二、异步发送这条消息,将其标记为预发送

异步发送了一条【预发送】消息给消息可靠消息服务

        Future future = prepareMsgService.prepareMsg(prepareMsg);


// 下面是prepareMsg的实现

    @Async
    @Override
    public Future prepareMsg(PrepareMsg prepareMsg) {
    // 调用我们刚才在【准备阶段】定义的接口
        int result = transactionMsgService.prepareMsg(prepareMsg.getId(), prepareMsg.getMsg(), prepareMsg.getRoutingKey(), prepareMsg.getExchange(), prepareMsg.getParamMap(), artistConfiguration.getArtist());
    // 如果调用成功,删除刚才本地保存的数据库
        if (result == 1) {
            prepareMsgMapper.deleteByPrimaryKey(prepareMsg.getId());
        }
        return new AsyncResult<>(result);
    }

三、执行业务

你可以把下面那些想象成处理订单状态,上面的这个步骤是有事务的,也就是说:

  • 如果执行失败,我们的可靠消息服务只会收到一条预发送的消息,保证了操作的原子性。
  • 或者执行成功,但没有及时向可靠消息服务发送,这种情况往下看,先忽略它。
        ///////////// 事务
        ProviderOrder providerOrder = new ProviderOrder();
        providerOrder.setId(orderId);
        providerOrderService.save(providerOrder);
        ///////////// 事务

四、异步告知可靠消息服务,业务处理成功,将刚才预发送的消息标记为待发送

       // 确认消息可以被发送
        if (future.get() == 1) {
            prepareMsgService.confirmMsgToSend(orderId, this.getClass().getSimpleName());
        }

Extra、异常情况

1、执行成功,但没有及时向可靠消息服务发送通知。

这时候我们的artist和paramMap就发挥作用了,我们的可靠消息服务,可以拿着这两个东西,定时向生产者查询那些没有被标记为【待发送】的消息。比如说这样:

        // 这里是可靠消息服务
        String url = String.format("http://%s/check?", transactionMsg.getCreater());
        Map paramMap = JSON.parseObject(transactionMsg.getParamMap(), new TypeReference>() {
        });

        StringBuilder sb = new StringBuilder();

        for (Map.Entry stringStringEntry : paramMap.entrySet()) {
            sb.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&");
        }

        sb.deleteCharAt(sb.length() - 1);

        // 结果为true,代表这条消息的业务执行成功了,可自助将消息状态标记为【待发送】
        // 反之执行失败
        resultBoolean = restTemplate.getForObject(url + sb, boolean.class);
2、执行失败,也没有及时向可靠消息服务发送通知。

这个情况并不影响,因为可靠消息服务会回查,发现消息没有执行成功,不会将消息投递出去。

这里要注意,每条消息最好设置一个查询次数的限制

3、预发送失败,业务执行成功

这时候我们在第一步事先存储的消息就发挥作用了,这里只要写一个定时任务,向可靠消息服务定时投递即可。这里要注意可靠消息服务的幂等性。

由于消息id是由生产者指定,所以即使可靠消息服务收到了重复的创建【预发送】的消息,插入数据库也是会失败的。

    @Scheduled(cron = "*/1 * * * * *")
    public void checkPrepareMsg() {
        List prepareMsgList = prepareMsgService.getUnConfirmList();
        if (prepareMsgList.size() > 0) {
            System.out.println("消息重发中");
        }
        for (PrepareMsg prepareMsg : prepareMsgList) {
            prepareMsgService.prepareMsg(prepareMsg);
        }
    }

Github – > 可靠消息服务 example


推荐阅读
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 在Spring与Ibatis集成的环境中,通过Spring AOP配置事务管理至服务层。当在一个服务方法中引入自定义多线程时,发现事务管理功能失效。若不使用多线程,事务管理则能正常工作。本文深入分析了这一现象背后的潜在风险,并探讨了可能的解决方案,以确保事务一致性和线程安全。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 本项目在Java Maven框架下,利用POI库实现了Excel数据的高效导入与导出功能。通过优化数据处理流程,提升了数据操作的性能和稳定性。项目已发布至GitHub,当前最新版本为0.0.5。该项目不仅适用于小型应用,也可扩展用于大型企业级系统,提供了灵活的数据管理解决方案。GitHub地址:https://github.com/83945105/holygrail,Maven坐标:`com.github.83945105:holygrail:0.0.5`。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 本文深入探讨了Python线程池的内部实现机制,作为对Apshceduler调度器研究的延伸。在先前关于Apshceduler源码分析的文章中,我们提到调度器通过`def_do_submit_`函数将任务提交到线程池。本文将进一步解析线程池的工作原理,包括任务分配、线程管理及性能优化等方面,为读者提供更全面的技术理解。 ... [详细]
  • 本文详细解析了 MySQL 5.7.20 版本中二进制日志(binlog)崩溃恢复机制的工作流程。假设使用 InnoDB 存储引擎,并且启用了 `sync_binlog=1` 配置,文章深入探讨了在系统崩溃后如何通过 binlog 进行数据恢复,确保数据的一致性和完整性。 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 本文深入探讨了 Python Watchdog 库的使用方法和应用场景。通过详细的代码示例,展示了如何利用 Watchdog 监控文件系统的变化,包括文件的创建、修改和删除等操作。文章不仅介绍了 Watchdog 的基本功能,还探讨了其在实际项目中的高级应用,如日志监控和自动化任务触发。读者将能够全面了解 Watchdog 的工作原理及其在不同场景下的应用技巧。 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
author-avatar
jimscloudy
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有