侧边栏壁纸
博主头像
林雷博主等级

斜月沉沉藏海雾,碣石潇湘无限路

  • 累计撰写 132 篇文章
  • 累计创建 47 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

14、Seata AT+SpringCloud+MyBatisPlus+Nacos进行分布式事务操作

林雷
2020-03-28 / 0 评论 / 0 点赞 / 507 阅读 / 19,888 字

一 使用AT模式的分布式事务案例

1.1 环境介绍

我这里使用的环境如下:

  • JDK8
  • IDEA
  • Spring Cloud
  • Seata
  • Nacos
  • MySQL5.7
  • MyBatis
  • MyBatis-Plus

我这里演示的功能如下图所示:
image-84184e55

  • 我这里有四个服务加一个公共工程
  • 当用户发起购买操作时,business-server发起购买操作,通过Feign Client调用storage-service库存服务,如果扣减没有问题则调用order-service订单服务,在order-service订单服务里面,先扣除资金账户的资金,如果扣除资金没有问题,则创建订单
  • 在整个过程如果有任何一方发生异常,则整个过程的数据不受影响,依然是用户购买前的数据
  • 整个过程基于Feign Client调用

对于Nacos和Seata Server的搭建过程在这里就不多做介绍了。Nacos可以看:
Nacos + Spring Cloud

对于Seata Server的搭建过程可以看:
Seata原理及搭建过程

我这里创建的项目如下:

  • common-service:公共的工程,存放公共的东西,比如全局异常处理的APO、XID传递的Interceptor、雪花算法工具类等。其他微服务都引用该工程
  • storage-basic:库存服务的父项目
    • storage-client:库存服务客户端信息。主要存放实体类、枚举、Feign Client等
    • storage-service:具体的服务业务
  • account-basic:资金账户服务的父项目
    • account-client:同上
    • account-service:同上
  • order-basic:订单服务的父项目
    • order-client:同上
    • order-service:同上
  • business-basic:业务服务的父项目
    • business-client:同上
    • business-service:同上

1.2 创建数据库

1.2.1 创建数据库

创建四个数据库,分别对应order服务、storage服务、account服务和business服务。

CREATE DATABASE `seata_business` DEFAULT CHARACTER SET utf8;

CREATE DATABASE `seata_storage` DEFAULT CHARACTER SET utf8;

CREATE DATABASE `seata_account` DEFAULT CHARACTER SET utf8;

CREATE DATABASE `seata_order` DEFAULT CHARACTER SET utf8;

1.2.2 创建表

在四个数据库上(所有涉及到的客户端服务的数据库)创建undo_log表,该表可以去Seata的GitHub上获取(注意版本即可):https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql

1.1.0版本的undo_log如下:

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

然后就是创建业务表。首先创建business-service可以自行创建测试表,这里不做演示。在seata_order库上创建订单表:t_order:

CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL COMMENT '主键ID',
  `order_no` varchar(255) DEFAULT NULL COMMENT '订单编号',
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品Code',
  `count` int(11) DEFAULT '0' COMMENT '数量',
  `amount` decimal(14,2) DEFAULT '0.00' COMMENT '钱',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '最后一次修改时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_orderNo_uniq` (`order_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

在seata-store库上创建库存表:t_storage:

CREATE TABLE `t_storage` (
  `id` bigint(20) NOT NULL COMMENT '主键ID',
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品ID',
  `name` varchar(255) DEFAULT NULL COMMENT '名称',
  `count` int(11) DEFAULT '0' COMMENT '数量',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '最后一个更新时间',
  `delete_status` int(4) NOT NULL DEFAULT '0' COMMENT '删除状态。0: 未删除; 1: 已删除',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

在seata_account库里创建资金账户表:t_account:

CREATE TABLE `t_account` (
  `id` bigint(20) NOT NULL COMMENT '主键ID',
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户ID',
  `amount` decimal(14,2) DEFAULT '0.00' COMMENT '资金',
  `create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT NULL COMMENT '最后一次更新时间',
  `delete_status` int(4) NOT NULL DEFAULT '0' COMMENT '删除状态。0: 未删除; 1: 已删除',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_userId_uniq` (`user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1.3 创建公共工程

公共工程为common-service。在这里面我主要存放公共的工具类,以及很重要的全局事务ID的传递(Feign Client调用和Ribbon(RestTemplate调用))。我这里是通过Spring的拦截器将XID放到请求头中,然后进行绑定的操作。整个工程代码如下:
commonservice.zip

我这里只介绍核心的几个AOP,其他代码可查看具体的工程。

1.3.1 FeignClient拦截

使用FeignClient拦截到全局事务ID,然后绑定到请求头中去,代码如下:

public class SeataFeignClientInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            template.header(RootContext.KEY_XID, xid);
        }
    }
}

然后将其配置到IoC的容器中:

@Configuration
public class CommonBeanConfig {
    @Bean
    public RequestInterceptor requestInterceptor() {
        return new SeataFeignClientInterceptor();
    }
}

1.3.2 Ribbon拦截(RestTemplate)

使用Ribbon拦截到全局事务ID,然后绑定到请求头中。

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    /**
     * RestTemplate请求拦截器
     * 在头部设置全局事务ID
     * @param request request
     * @param body 书序
     * @param execution ClientHttpRequestExecution
     * @return ClientHttpResponse
     * @throws IOException 异常
     */
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(request);

        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }
        return execution.execute(requestWrapper, body);
    }
}

对RestTemplate加入到配置中:

@Configuration
public class SeataRestTemplateConfig {
    public SeataRestTemplateConfig(Collection<RestTemplate> restTemplates) {
        this.restTemplates = restTemplates;
    }
    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
        return new SeataRestTemplateInterceptor();
    }
    private final Collection<RestTemplate> restTemplates;

    /**
     * 在构造函数之后执行
     */
    @PostConstruct
    public void init() {
        if (restTemplates != null && restTemplates.size() > 0) {
            restTemplates.forEach(restTemplate -> {
                List<ClientHttpRequestInterceptor> interceptorList = Lists
                        .newArrayList(restTemplate.getInterceptors());
                interceptorList.add(this.seataRestTemplateInterceptor());
                restTemplate.setInterceptors(interceptorList);
            });
        }
    }
}

1.3.3 绑定操作

最后编写拦截器,拦截所有的头部的全局事务ID,然后绑定操作。

@Slf4j
public class SeataHandlerInterceptor extends HandlerInterceptorAdapter {
    /**
     * 拦截前处理
     * 将全局事务ID绑定到上下文中
     * @param request HttpServletRequest
     * @param response HttpServletResponse
     * @param handler handler
     * @return 是否继续下一步
     * @throws Exception 异常
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String currentXid = RootContext.getXID();
        String globalXid = request.getHeader(RootContext.KEY_XID);
        if (StringUtils.isBlank(currentXid) && StringUtils.isNotBlank(globalXid)) {
            RootContext.bind(globalXid);
        }
        return true;
    }

    /**
     * 拦截后处理
     * @param request HttpServletRequest
     * @param response HttpServletResponse
     * @param handler handler
     * @param ex 异常
     * @throws Exception 异常
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        String globalXid = request.getHeader(RootContext.KEY_XID);
        if (StringUtils.isBlank(globalXid)) {
            return;
        }

        String unBindXid = RootContext.unbind();
        if (!globalXid.equalsIgnoreCase(unBindXid)) {//在事务期间被更改过
            RootContext.bind(unBindXid);
        }
    }
}

最后配置该拦截器拦截所有的请求:

@Configuration
public class InterceptorConfig extends WebMvcConfigurationSupport {
    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");
        super.addInterceptors(registry);
    }
}

其他代码可查看具体的工程。在此不多做介绍。

1.4 创建库存服务

库存服务主要是用户的购买行为,扣减商品的库存数量。整个项目代码如下:
storagebasic.zip

引入的Seata包如下:

<!-- 引入分布式事务框架Seata Start -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata-starter.version}</version>
</dependency>
<!-- 引入分布式事务框架Seata End -->

Nacos的配置storage-service.yml如下:

spring:
  datasource:
    # 数据源连接池
    type: com.zaxxer.hikari.HikariDataSource
    # 驱动
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 地址. 注意这里使用的是jdbc-url
    url: jdbc:mysql://192.168.10.3:3306/seata_store?useUnicode=true&characterEncoding=UTF-8&useSSL=false&useTimezone=true&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    # 用户名
    username: root
    # 密码
    password: 123456

# 服务熔断
feign:
  hystrix:
    enabled: true

ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 6000
  OkToRetryOnAllOperations: true
  MaxAutoRetriesNextServer: 1
  MaxAutoRetries: 1
  NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

#===================== Seata Config Start ====================
seata:
  # 开启Seata
  enabled: true
  # 唯一标识
  application-id: ${spring.application.name}-seata
  # 事务群组(可以每个应用单独取名, 也可以使用相同的名字)
  tx-service-group: service_tx_group
  client:
    rm:
      # 是否上报一阶段成功
      report-success-enable: true
      # 自动刷新缓存中的表结构
      table-meta-check-enable: true
      # 一阶段结果上报TC重试次数
      report-retry-count: 5
      # 异步提交缓存队列长度
      async-commit-buffer-limit: 1000
      lock:
        # 校验或占用全局锁重试间隔, 单位毫秒
        retry-interval: 10
        # 校验或占用全局锁重试次数
        retry-times: 30
        # 分支事务与其他全局回滚事务冲突时锁策略(true表示优先释放本地锁)
        retry-policy-branch-rollback-on-conflict: true
    tm:
      # 一阶段全局提交结果上报TC重试次数
      commit-retry-count: 3
      # 一阶段全局回滚结果上报TC重试次数
      rollback-retry-count: 3
    undo:
      # 二阶段回滚镜像校验
      data-validation: true
      # undo序列化方式
      log-serialization: jackson
      # undo表名
      log-table: undo_log
    log:
      # 日志异常输出频率
      exception-rate: 100
  # 是否开启数据源自动代理
  enable-auto-data-source-proxy: false
  service:
    # 事务群组(必须与seata-server保持一致)
    vgroup-mapping:
      service_tx_group: default
    # 降级开关
    enable-degrade: false
    # 是否禁用全局事务开关
    disable-global-transaction: false
    # TC 服务列表。仅在注册中心为file时使用(我这里服务端使用的是Nacos配置, 所以此项无需配置)
    # grouplist: 192.168.10.16:8091, 192.168.10.17:8091
  # Netty 相关配置 Start
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    # 客户端事务消息请求是否批量合并发送(默认true)
    enable-client-batch-send-request: true
  # Netty 相关配置 Start
  # 注册中心和配置中心相关 Start
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace:
      cluster: default
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      namespace:
      group: SEATA_GROUP
  # 注册中心和配置中心相关 End
#===================== Seata Config End ====================

具体内容,请注意上述的注释部分。注意,这里的组要和服务端的组一致,及SEATA_GROUP

由于我这里使用的是MyBatis-Plus,所以需要手动配置数据源,配置如下:

@Configuration
public class SeataDataSourceConfig {
    private final DataSourceProperties dataSourceProperties;

    public SeataDataSourceConfig(DataSourceProperties dataSourceProperties) {
        this.dataSourceProperties = dataSourceProperties;
    }

    @Bean
    @Primary
    public DataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(dataSourceProperties.getUrl());
        dataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
        dataSource.setUsername(dataSourceProperties.getUsername());
        dataSource.setPassword(dataSourceProperties.getPassword());
        return dataSource;
    }
}

具体的业务代码请看工程里面的代码

1.5 创建资金账户服务

资金服务account-service,主要是用户购买后,扣减用户的资金账户的资金数据。整个项目代码如下:
accountbasic.zip

引入POM和数据源配置同上。不做介绍。

Nacos的配置account-service.yml配置如下:

spring:
  datasource:
    # 数据源连接池
    type: com.zaxxer.hikari.HikariDataSource
    # 驱动
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 地址. 注意这里使用的是jdbc-url
    url: jdbc:mysql://192.168.10.3:3306/seata_account?useUnicode=true&characterEncoding=UTF-8&useSSL=false&useTimezone=true&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    # 用户名
    username: root
    # 密码
    password: 123456

# 服务熔断
feign:
  hystrix:
    enabled: true

ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 6000
  OkToRetryOnAllOperations: true
  MaxAutoRetriesNextServer: 1
  MaxAutoRetries: 1
  NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

#===================== Seata Config Start ====================
seata:
  # 开启Seata
  enabled: true
  # 唯一标识
  application-id: ${spring.application.name}-seata
  # 事务群组(可以每个应用单独取名, 也可以使用相同的名字)
  tx-service-group: service_tx_group
  client:
    rm:
      # 是否上报一阶段成功
      report-success-enable: true
      # 自动刷新缓存中的表结构
      table-meta-check-enable: true
      # 一阶段结果上报TC重试次数
      report-retry-count: 5
      # 异步提交缓存队列长度
      async-commit-buffer-limit: 1000
      lock:
        # 校验或占用全局锁重试间隔, 单位毫秒
        retry-interval: 10
        # 校验或占用全局锁重试次数
        retry-times: 30
        # 分支事务与其他全局回滚事务冲突时锁策略(true表示优先释放本地锁)
        retry-policy-branch-rollback-on-conflict: true
    tm:
      # 一阶段全局提交结果上报TC重试次数
      commit-retry-count: 3
      # 一阶段全局回滚结果上报TC重试次数
      rollback-retry-count: 3
    undo:
      # 二阶段回滚镜像校验
      data-validation: true
      # undo序列化方式
      log-serialization: jackson
      # undo表名
      log-table: undo_log
    log:
      # 日志异常输出频率
      exception-rate: 100
  # 是否开启数据源自动代理
  enable-auto-data-source-proxy: false
  service:
    # 事务群组(必须与seata-server保持一致)
    vgroup-mapping:
      service_tx_group: default
    # 降级开关
    enable-degrade: false
    # 是否禁用全局事务开关
    disable-global-transaction: false
    # TC 服务列表。仅在注册中心为file时使用(我这里服务端使用的是Nacos配置, 所以此项无需配置)
    # grouplist: 192.168.10.16:8091, 192.168.10.17:8091
  # Netty 相关配置 Start
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    # 客户端事务消息请求是否批量合并发送(默认true)
    enable-client-batch-send-request: true
  # Netty 相关配置 Start
  # 注册中心和配置中心相关 Start
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace:
      cluster: default
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      namespace:
      group: SEATA_GROUP
  # 注册中心和配置中心相关 End
#===================== Seata Config End ====================

请仔细阅读上述的注释部分。

1.6 创建订单服务

订单服务order-service,主要用户购买后下订单,并扣除资金账户的资金。其他配置同上。具体代码如下:
orderbasic.zip

Nacos的order-service.yml内容如下:

spring:
  datasource:
    # 数据源连接池
    type: com.zaxxer.hikari.HikariDataSource
    # 驱动
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 地址. 注意这里使用的是jdbc-url
    url: jdbc:mysql://192.168.10.3:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&useTimezone=true&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    # 用户名
    username: root
    # 密码
    password: 123456

# 服务熔断
feign:
  hystrix:
    enabled: true

ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 6000
  OkToRetryOnAllOperations: true
  MaxAutoRetriesNextServer: 1
  MaxAutoRetries: 1
  NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

#===================== Seata Config Start ====================
seata:
  # 开启Seata
  enabled: true
  # 唯一标识
  application-id: ${spring.application.name}-seata
  # 事务群组(可以每个应用单独取名, 也可以使用相同的名字)
  tx-service-group: service_tx_group
  client:
    rm:
      # 是否上报一阶段成功
      report-success-enable: true
      # 自动刷新缓存中的表结构
      table-meta-check-enable: true
      # 一阶段结果上报TC重试次数
      report-retry-count: 5
      # 异步提交缓存队列长度
      async-commit-buffer-limit: 1000
      lock:
        # 校验或占用全局锁重试间隔, 单位毫秒
        retry-interval: 10
        # 校验或占用全局锁重试次数
        retry-times: 30
        # 分支事务与其他全局回滚事务冲突时锁策略(true表示优先释放本地锁)
        retry-policy-branch-rollback-on-conflict: true
    tm:
      # 一阶段全局提交结果上报TC重试次数
      commit-retry-count: 3
      # 一阶段全局回滚结果上报TC重试次数
      rollback-retry-count: 3
    undo:
      # 二阶段回滚镜像校验
      data-validation: true
      # undo序列化方式
      log-serialization: jackson
      # undo表名
      log-table: undo_log
    log:
      # 日志异常输出频率
      exception-rate: 100
  # 是否开启数据源自动代理
  enable-auto-data-source-proxy: false
  service:
    # 事务群组(必须与seata-server保持一致)
    vgroup-mapping:
      service_tx_group: default
    # 降级开关
    enable-degrade: false
    # 是否禁用全局事务开关
    disable-global-transaction: false
    # TC 服务列表。仅在注册中心为file时使用(我这里服务端使用的是Nacos配置, 所以此项无需配置)
    # grouplist: 192.168.10.16:8091, 192.168.10.17:8091
  # Netty 相关配置 Start
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    # 客户端事务消息请求是否批量合并发送(默认true)
    enable-client-batch-send-request: true
  # Netty 相关配置 Start
  # 注册中心和配置中心相关 Start
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace:
      cluster: default
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      namespace:
      group: SEATA_GROUP
  # 注册中心和配置中心相关 End
#===================== Seata Config End ====================

其他配置同上。

1.7 创建业务服务

业务服务business-service,主要提供用户购买行为的入口,和开启分布式事务。整个项目的代码如下:
businessbasic.zip

Nacos配置business-service.yml配置如下:

spring:
  datasource:
    # 数据源连接池
    type: com.zaxxer.hikari.HikariDataSource
    # 驱动
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 地址. 注意这里使用的是jdbc-url
    url: jdbc:mysql://192.168.10.3:3306/seata_business?useUnicode=true&characterEncoding=UTF-8&useSSL=false&useTimezone=true&serverTimezone=Asia/Shanghai&allowMultiQueries=true
    # 用户名
    username: root
    # 密码
    password: 123456

# 服务熔断
feign:
  hystrix:
    enabled: true

ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 6000
  OkToRetryOnAllOperations: true
  MaxAutoRetriesNextServer: 1
  MaxAutoRetries: 1
  NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

#===================== Seata Config Start ====================
seata:
  # 开启Seata
  enabled: true
  # 唯一标识
  application-id: ${spring.application.name}-seata
  # 事务群组(可以每个应用单独取名, 也可以使用相同的名字)
  tx-service-group: service_tx_group
  client:
    rm:
      # 是否上报一阶段成功
      report-success-enable: true
      # 自动刷新缓存中的表结构
      table-meta-check-enable: true
      # 一阶段结果上报TC重试次数
      report-retry-count: 5
      # 异步提交缓存队列长度
      async-commit-buffer-limit: 1000
      lock:
        # 校验或占用全局锁重试间隔, 单位毫秒
        retry-interval: 10
        # 校验或占用全局锁重试次数
        retry-times: 30
        # 分支事务与其他全局回滚事务冲突时锁策略(true表示优先释放本地锁)
        retry-policy-branch-rollback-on-conflict: true
    tm:
      # 一阶段全局提交结果上报TC重试次数
      commit-retry-count: 3
      # 一阶段全局回滚结果上报TC重试次数
      rollback-retry-count: 3
    undo:
      # 二阶段回滚镜像校验
      data-validation: true
      # undo序列化方式
      log-serialization: jackson
      # undo表名
      log-table: undo_log
    log:
      # 日志异常输出频率
      exception-rate: 100
  # 是否开启数据源自动代理
  enable-auto-data-source-proxy: false
  service:
    # 事务群组(必须与seata-server保持一致)
    vgroup-mapping:
      service_tx_group: default
    # 降级开关
    enable-degrade: false
    # 是否禁用全局事务开关
    disable-global-transaction: false
    # TC 服务列表。仅在注册中心为file时使用(我这里服务端使用的是Nacos配置, 所以此项无需配置)
    # grouplist: 192.168.10.16:8091, 192.168.10.17:8091
  # Netty 相关配置 Start
  transport:
    shutdown:
      wait: 3
    thread-factory:
      boss-thread-prefix: NettyBoss
      worker-thread-prefix: NettyServerNIOWorker
      server-executor-thread-prefix: NettyServerBizHandler
      share-boss-worker: false
      client-selector-thread-prefix: NettyClientSelector
      client-selector-thread-size: 1
      client-worker-thread-prefix: NettyClientWorkerThread
    type: TCP
    server: NIO
    heartbeat: true
    serialization: seata
    compressor: none
    # 客户端事务消息请求是否批量合并发送(默认true)
    enable-client-batch-send-request: true
  # Netty 相关配置 Start
  # 注册中心和配置中心相关 Start
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace:
      cluster: default
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      namespace:
      group: SEATA_GROUP
  # 注册中心和配置中心相关 End
#===================== Seata Config End ====================

请仔细阅读上述的注释。避免出错。

业务操作,开启分布式事务,部分代码如下:

/**
* 用户购买商品处理
* @param businessReq 购买商品信息
* @return 成功/失败
*/
@Override
@GlobalTransactional(timeoutMills = 30000, name = "business-gts-seata")
public boolean buyHandle(@RequestBody BusinessReq businessReq) {
    log.info("buyHandle: 开始全局事务, XID --> {}", RootContext.getXID());

    //首先扣减商品库存 Start
    StorageReq storageReq = StorageReq.builder().commodityCode(businessReq.getCommodityCode())
            .count(businessReq.getCount()).build();
    log.info("buyHandle: storageFeignClient.decreaseStorage -- storageReq --> {}", storageReq);
    StatusDTO storageStatus = this.storageFeignClient.decreaseStorage(storageReq);
    log.info("buyHandle: storageFeignClient.decreaseStorage result --> {}", JSONObject.toJSONString(storageStatus));
    if (Objects.isNull(storageStatus) || !storageStatus.isSuccess()) {
        log.warn("buyHandle: storageFeignClient.decreaseStorage fail -- commodityCode --> {}, count --> {}", businessReq.getCommodityCode(), businessReq.getCount());
        throw new DefaultException("扣减库存异常");
    }
    //首先扣减商品库存 End

    //创建订单 Start
    OrderReq orderReq = OrderReq.builder().userId(businessReq.getUserId()).amount(businessReq.getAmount())
            .commodityCode(businessReq.getCommodityCode()).count(businessReq.getCount())
            .build();
    log.info("buyHandle: orderFeignClient.createOrder -- orderReq --> {}", orderReq);
    StatusDTO<OrderResp> orderStatus = this.orderFeignClient.createOrder(orderReq);
    log.info("buyHandle: orderFeignClient.createOrder result --> {}", JSONObject.toJSONString(orderStatus));
    if (Objects.isNull(orderStatus) || !orderStatus.isSuccess()) {
        log.warn("buyHandle: orderFeignClient.createOrder fail -- data --> {}", orderReq);
        throw new DefaultException("创建订单失败");
    }
    //创建订单 End
    return true;
}

添加注解:@GlobalTransaction开启全局事务功能。最后即可进行测试了。

0

评论区