Seata 分布式事务 Seata 框架中有三个角色:
Transaction Coordinator(TC): 事务协调器(TC):维护全局事务和分支事务的状态,驱动全局提交或回滚。
Transaction Manager(TM): 事务管理器(TM):定义全局事务的范围:开始全局事务、提交或回滚全局事务。
**Resource Manager(RM):**资源管理器(RM):管理参与分支事务的资源,与 TC 通信以注册分支事务并报告分支事务的状态,并驱动分支事务的提交或回滚。
Seata 管理的分布式事务的典型生命周期:
TM 请求 TC 开始一个新的事务。TC 生成一个 XID 来表示全局事务。
XID 在微服务调用链中传播。
RM 将本地事务注册为 XID 对应的全局事务的分支到 TC。
TM 请求 TC 提交或回滚 XID 对应的全局事务。
TC 驱动所有分支事务在对应的 XID 全局事务下完成分支提交或回滚。
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
Seata官网
创建四个服务,分别是seata-account、seata-busines、seata-storage、seata-order四个服务,分别在service下
DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
注意:
如果使用seata必须要写入undo_log日志
创建数据库 CREATE DATABASE IF NOT EXISTS `storage_db`;USE `storage_db`; DROP TABLE IF EXISTS `storage_tbl`;CREATE TABLE `storage_tbl` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `commodity_code` varchar (255 ) DEFAULT NULL , `count` int (11 ) DEFAULT 0 , PRIMARY KEY (`id`), UNIQUE KEY (`commodity_code`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8; INSERT INTO storage_tbl (commodity_code, count) VALUES ('P0001' , 100 );INSERT INTO storage_tbl (commodity_code, count) VALUES ('B1234' , 10 );DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; CREATE DATABASE IF NOT EXISTS `order_db`;USE `order_db`; DROP TABLE IF EXISTS `order_tbl`;CREATE TABLE `order_tbl` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `user_id` varchar (255 ) DEFAULT NULL , `commodity_code` varchar (255 ) DEFAULT NULL , `count` int (11 ) DEFAULT 0 , `money` int (11 ) DEFAULT 0 , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8; DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; CREATE DATABASE IF NOT EXISTS `account_db`;USE `account_db`; DROP TABLE IF EXISTS `account_tbl`;CREATE TABLE `account_tbl` ( `id` int (11 ) NOT NULL AUTO_INCREMENT, `user_id` varchar (255 ) DEFAULT NULL , `money` int (11 ) DEFAULT 0 , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8; INSERT INTO account_tbl (user_id, money) VALUES ('1' , 10000 );DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , `ext` varchar (100 ) DEFAULT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
seata-account<dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > 3.0.3</version > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
bean
package com.lazy.bean;@Data public class AccountTbl implements Serializable { private Integer id; private String userId; private Integer money; }
mapper
package com.lazy.mapper;public interface AccountTblMapper { int deleteByPrimaryKey (Long id) ; int insert (AccountTbl record) ; int insertSelective (AccountTbl record) ; AccountTbl selectByPrimaryKey (Long id) ; int updateByPrimaryKeySelective (AccountTbl record) ; int updateByPrimaryKey (AccountTbl record) ; void debit (String userId, int money) ; }
service
package com.lazy.service;public interface AccountService { void debit (String userId, int money) ; }
serviceImpl
package com.lazy.service;@Service public class AccountServiceImpl implements AccountService { @Autowired private AccountTblMapper accountTblMapper; @Override public void debit (String userId, int money) { accountTblMapper.debit(userId, money); } }
controller
package com.lazy.controller;@RestController public class AccountTbRestController { @Autowired AccountService accountService; @GetMapping("/debit") public String debit (@RequestParam("userId") String userId, @RequestParam("money") int money) { accountService.debit(userId, money); return "account debit success" ; } }
主启动类
package com.lazy;@MapperScan("com.lazy.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataAccountApplication { public static void main (String[] args) { SpringApplication.run(SeataAccountApplication.class, args); } }
配置文件
spring: application: name: seata-account datasource: url: jdbc:mysql://localhost:3306/account_db?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: root driver-class-name: com.mysql.jdbc.Driver cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: import-check: enabled: false server: port: 10000 mybatis: mapper-locations: classpath:mapper/*.xml
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.lazy.mapper.AccountTblMapper" > <resultMap id ="BaseResultMap" type ="com.lazy.bean.AccountTbl" > <id property ="id" column ="id" jdbcType ="INTEGER" /> <result property ="userId" column ="user_id" jdbcType ="VARCHAR" /> <result property ="money" column ="money" jdbcType ="INTEGER" /> </resultMap > <sql id ="Base_Column_List" > id,user_id,money </sql > <select id ="selectByPrimaryKey" parameterType ="java.lang.Long" resultMap ="BaseResultMap" > select <include refid ="Base_Column_List" /> from account_tbl where id = #{id,jdbcType=INTEGER} </select > <delete id ="deleteByPrimaryKey" parameterType ="java.lang.Long" > delete from account_tbl where id = #{id,jdbcType=INTEGER} </delete > <insert id ="insert" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.bean.AccountTbl" useGeneratedKeys ="true" > insert into account_tbl ( id,user_id,money ) values (#{id,jdbcType=INTEGER},#{userId,jdbcType=VARCHAR},#{money,jdbcType=INTEGER} ) </insert > <insert id ="insertSelective" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.bean.AccountTbl" useGeneratedKeys ="true" > insert into account_tbl <trim prefix ="(" suffix =")" suffixOverrides ="," > <if test ="id != null" > id,</if > <if test ="userId != null" > user_id,</if > <if test ="money != null" > money,</if > </trim > <trim prefix ="values (" suffix =")" suffixOverrides ="," > <if test ="id != null" > #{id,jdbcType=INTEGER},</if > <if test ="userId != null" > #{userId,jdbcType=VARCHAR},</if > <if test ="money != null" > #{money,jdbcType=INTEGER},</if > </trim > </insert > <update id ="updateByPrimaryKeySelective" parameterType ="com.lazy.bean.AccountTbl" > update account_tbl <set > <if test ="userId != null" > user_id = #{userId,jdbcType=VARCHAR}, </if > <if test ="money != null" > money = #{money,jdbcType=INTEGER}, </if > </set > where id = #{id,jdbcType=INTEGER} </update > <update id ="updateByPrimaryKey" parameterType ="com.lazy.bean.AccountTbl" > update account_tbl set user_id = #{userId,jdbcType=VARCHAR}, money = #{money,jdbcType=INTEGER} where id = #{id,jdbcType=INTEGER} </update > <update id ="debit" > update account_tbl set money = money - #{money,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} </update > </mapper >
seata-busines依赖
<dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency >
service
package com.lazy.business.service;public interface BusinessService { void purchase (String userId, String commodityCode, int orderCount) ; }
serviceImpl
package com.lazy.business.service.impl;@Service public class BusinessServiceImpl implements BusinessService { @Override public void purchase (String userId, String commodityCode, int orderCount) { } }
controller
package com.lazy.business.controller;@RestController public class PurchaseRestController { @Autowired BusinessService businessService; @GetMapping("/purchase") public String purchase (@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") int orderCount) { businessService.purchase(userId, commodityCode, orderCount); return "business purchase success" ; } }
主启动类
package com.lazy.business;@EnableDiscoveryClient @SpringBootApplication public class SeataBusinessMainApplication { public static void main (String[] args) { SpringApplication.run(SeataBusinessMainApplication.class, args); } }
yaml
spring: application: name: seata-business cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: import-check: enabled: false server: port: 11000
seata-order依赖
<dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > 3.0.4</version > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
bean
package com.lazy.order.bean;@Data public class OrderTbl implements Serializable { private Integer id; private String userId; private String commodityCode; private Integer count; private Integer money; private static final long serialVersionUID = 1L ; }
mapper
package com.lazy.order.mapper;public interface OrderTblMapper { int deleteByPrimaryKey (Long id) ; int insert (OrderTbl record) ; int insertSelective (OrderTbl record) ; OrderTbl selectByPrimaryKey (Long id) ; int updateByPrimaryKeySelective (OrderTbl record) ; int updateByPrimaryKey (OrderTbl record) ; }
service
package com.lazy.order.service;public interface OrderService { OrderTbl create (String userId, String commodityCode, int orderCount) ; }
serviceImpl
package com.lazy.order.service.impl;@Service public class OrderServiceImpl implements OrderService { @Autowired OrderTblMapper orderTblMapper; @Override public OrderTbl create (String userId, String commodityCode, int orderCount) { int orderMoney = calculate(commodityCode, orderCount); OrderTbl orderTbl = new OrderTbl (); orderTbl.setUserId(userId); orderTbl.setCommodityCode(commodityCode); orderTbl.setCount(orderCount); orderTbl.setMoney(orderMoney); orderTblMapper.insert(orderTbl); return orderTbl; } private int calculate (String commodityCode, int orderCount) { return 9 *orderCount; } }
controller
package com.lazy.order.controller;@RestController public class OrderRestController { @Autowired OrderService orderService; @GetMapping("/create") public String create (@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") int orderCount) { OrderTbl tbl = orderService.create(userId, commodityCode, orderCount); return "order create success = 订单id:【" +tbl.getId()+"】" ; } }
主启动类
package com.lazy.order;@MapperScan("com.lazy.order.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataOrderMainApplication { public static void main (String[] args) { SpringApplication.run(SeataOrderMainApplication.class, args); } }
yaml
spring: application: name: seata-order datasource: url: jdbc:mysql://localhost:3306/order_db?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: import-check: enabled: false server: port: 12000 mybatis: mapper-locations: classpath:mapper/*.xml
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.lazy.order.mapper.OrderTblMapper" > <resultMap id ="BaseResultMap" type ="com.lazy.order.bean.OrderTbl" > <id property ="id" column ="id" jdbcType ="INTEGER" /> <result property ="userId" column ="user_id" jdbcType ="VARCHAR" /> <result property ="commodityCode" column ="commodity_code" jdbcType ="VARCHAR" /> <result property ="count" column ="count" jdbcType ="INTEGER" /> <result property ="money" column ="money" jdbcType ="INTEGER" /> </resultMap > <sql id ="Base_Column_List" > id,user_id,commodity_code, count,money </sql > <select id ="selectByPrimaryKey" parameterType ="java.lang.Long" resultMap ="BaseResultMap" > select <include refid ="Base_Column_List" /> from order_tbl where id = #{id,jdbcType=INTEGER} </select > <delete id ="deleteByPrimaryKey" parameterType ="java.lang.Long" > delete from order_tbl where id = #{id,jdbcType=INTEGER} </delete > <insert id ="insert" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.order.bean.OrderTbl" useGeneratedKeys ="true" > insert into order_tbl ( id,user_id,commodity_code ,count,money) values (#{id,jdbcType=INTEGER},#{userId,jdbcType=VARCHAR},#{commodityCode,jdbcType=VARCHAR} ,#{count,jdbcType=INTEGER},#{money,jdbcType=INTEGER}) </insert > <insert id ="insertSelective" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.order.bean.OrderTbl" useGeneratedKeys ="true" > insert into order_tbl <trim prefix ="(" suffix =")" suffixOverrides ="," > <if test ="id != null" > id,</if > <if test ="userId != null" > user_id,</if > <if test ="commodityCode != null" > commodity_code,</if > <if test ="count != null" > count,</if > <if test ="money != null" > money,</if > </trim > <trim prefix ="values (" suffix =")" suffixOverrides ="," > <if test ="id != null" > #{id,jdbcType=INTEGER},</if > <if test ="userId != null" > #{userId,jdbcType=VARCHAR},</if > <if test ="commodityCode != null" > #{commodityCode,jdbcType=VARCHAR},</if > <if test ="count != null" > #{count,jdbcType=INTEGER},</if > <if test ="money != null" > #{money,jdbcType=INTEGER},</if > </trim > </insert > <update id ="updateByPrimaryKeySelective" parameterType ="com.lazy.order.bean.OrderTbl" > update order_tbl <set > <if test ="userId != null" > user_id = #{userId,jdbcType=VARCHAR}, </if > <if test ="commodityCode != null" > commodity_code = #{commodityCode,jdbcType=VARCHAR}, </if > <if test ="count != null" > count = #{count,jdbcType=INTEGER}, </if > <if test ="money != null" > money = #{money,jdbcType=INTEGER}, </if > </set > where id = #{id,jdbcType=INTEGER} </update > <update id ="updateByPrimaryKey" parameterType ="com.lazy.order.bean.OrderTbl" > update order_tbl set user_id = #{userId,jdbcType=VARCHAR}, commodity_code = #{commodityCode,jdbcType=VARCHAR}, count = #{count,jdbcType=INTEGER}, money = #{money,jdbcType=INTEGER} where id = #{id,jdbcType=INTEGER} </update > </mapper >
seata-storage依赖
<dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.mybatis.spring.boot</groupId > <artifactId > mybatis-spring-boot-starter</artifactId > <version > 3.0.4</version > </dependency > <dependency > <groupId > com.mysql</groupId > <artifactId > mysql-connector-j</artifactId > <scope > runtime</scope > </dependency > </dependencies >
bean
package com.lazy.storage.bean;@Data public class StorageTbl implements Serializable { private Integer id; private String commodityCode; private Integer count; private static final long serialVersionUID = 1L ; }
mapper
package com.lazy.storage.mapper;public interface StorageTblMapper { int deleteByPrimaryKey (Long id) ; int insert (StorageTbl record) ; int insertSelective (StorageTbl record) ; StorageTbl selectByPrimaryKey (Long id) ; int updateByPrimaryKeySelective (StorageTbl record) ; int updateByPrimaryKey (StorageTbl record) ; void deduct (String commodityCode, int count) ; }
service
package com.lazy.storage.service;public interface StorageService { void deduct (String commodityCode, int count) ; }
serviceImpl
package com.lazy.storage.service.impl;@Service public class StorageServiceImpl implements StorageService { @Autowired StorageTblMapper storageTblMapper; @Override public void deduct (String commodityCode, int count) { storageTblMapper.deduct(commodityCode, count); } }
controller
package com.lazy.storage.controller;@RestController public class StorageRestController { @Autowired StorageService storageService; @GetMapping("/deduct") public String deduct (@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) { storageService.deduct(commodityCode, count); return "storage deduct success" ; } }
主启动类
package com.lazy.storage;@MapperScan("com.lazy.storage.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataStorageMainApplication { public static void main (String[] args) { SpringApplication.run(SeataStorageMainApplication.class, args); } }
spring: application: name: seata-storage datasource: url: jdbc:mysql://localhost:3306/storage_db?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver cloud: nacos: server-addr: 127.0 .0 .1 :8848 config: import-check: enabled: false server: port: 13000 mybatis: mapper-locations: classpath:mapper/*.xml
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace ="com.lazy.storage.mapper.StorageTblMapper" > <resultMap id ="BaseResultMap" type ="com.lazy.storage.bean.StorageTbl" > <id property ="id" column ="id" jdbcType ="INTEGER" /> <result property ="commodityCode" column ="commodity_code" jdbcType ="VARCHAR" /> <result property ="count" column ="count" jdbcType ="INTEGER" /> </resultMap > <sql id ="Base_Column_List" > id,commodity_code,count </sql > <select id ="selectByPrimaryKey" parameterType ="java.lang.Long" resultMap ="BaseResultMap" > select <include refid ="Base_Column_List" /> from storage_tbl where id = #{id,jdbcType=INTEGER} </select > <delete id ="deleteByPrimaryKey" parameterType ="java.lang.Long" > delete from storage_tbl where id = #{id,jdbcType=INTEGER} </delete > <insert id ="insert" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.storage.bean.StorageTbl" useGeneratedKeys ="true" > insert into storage_tbl ( id,commodity_code,count ) values (#{id,jdbcType=INTEGER},#{commodityCode,jdbcType=VARCHAR},#{count,jdbcType=INTEGER} ) </insert > <insert id ="insertSelective" keyColumn ="id" keyProperty ="id" parameterType ="com.lazy.storage.bean.StorageTbl" useGeneratedKeys ="true" > insert into storage_tbl <trim prefix ="(" suffix =")" suffixOverrides ="," > <if test ="id != null" > id,</if > <if test ="commodityCode != null" > commodity_code,</if > <if test ="count != null" > count,</if > </trim > <trim prefix ="values (" suffix =")" suffixOverrides ="," > <if test ="id != null" > #{id,jdbcType=INTEGER},</if > <if test ="commodityCode != null" > #{commodityCode,jdbcType=VARCHAR},</if > <if test ="count != null" > #{count,jdbcType=INTEGER},</if > </trim > </insert > <update id ="updateByPrimaryKeySelective" parameterType ="com.lazy.storage.bean.StorageTbl" > update storage_tbl <set > <if test ="commodityCode != null" > commodity_code = #{commodityCode,jdbcType=VARCHAR}, </if > <if test ="count != null" > count = #{count,jdbcType=INTEGER}, </if > </set > where id = #{id,jdbcType=INTEGER} </update > <update id ="updateByPrimaryKey" parameterType ="com.lazy.storage.bean.StorageTbl" > update storage_tbl set commodity_code = #{commodityCode,jdbcType=VARCHAR}, count = #{count,jdbcType=INTEGER} where id = #{id,jdbcType=INTEGER} </update > <update id ="deduct" > update storage_tbl set count = count - #{count} where commodity_code = #{commodityCode} </update > </mapper >
添加单个事务回滚 我们在seata-account、seata-order、seata-storage上的service添加事务,主启动类上开启事务
seata-account ,service
package com.lazy.service;@Service public class AccountServiceImpl implements AccountService { @Autowired private AccountTblMapper accountTblMapper; @Transactional @Override public void debit (String userId, int money) { accountTblMapper.debit(userId, money); } }
开启事务
package com.lazy;@EnableTransactionManagement @MapperScan("com.lazy.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataAccountApplication { public static void main (String[] args) { SpringApplication.run(SeataAccountApplication.class, args); } }
seata-order,service
package com.lazy.order.controller;@RestController public class OrderRestController { @Autowired OrderService orderService; @Transactional @GetMapping("/create") public String create (@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") int orderCount) { OrderTbl tbl = orderService.create(userId, commodityCode, orderCount); return "order create success = 订单id:【" +tbl.getId()+"】" ; } }
package com.lazy.order;@EnableTransactionManagement @MapperScan("com.lazy.order.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataOrderMainApplication { public static void main (String[] args) { SpringApplication.run(SeataOrderMainApplication.class, args); } }
seata-storage,service
package com.lazy.storage.service.impl;@Service public class StorageServiceImpl implements StorageService { @Autowired StorageTblMapper storageTblMapper; @Transactional @Override public void deduct (String commodityCode, int count) { storageTblMapper.deduct(commodityCode, count); if (count ==5 ){ throw new RuntimeException ("库存不足!" ); } } }
package com.lazy.storage;@EnableTransactionManagement @MapperScan("com.lazy.storage.mapper") @EnableDiscoveryClient @SpringBootApplication public class SeataStorageMainApplication { public static void main (String[] args) { SpringApplication.run(SeataStorageMainApplication.class, args); } }
seata-busines集成下订单和扣库存
在seata-busines创建两个接口
package com.lazy.business.feign;@FeignClient(value = "seata-order") public interface OrderFeignClient { @GetMapping("/create") public String create (@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") int orderCount) ;}
package com.lazy.business.feign;@FeignClient(value = "seata-storage") public interface StorageFeignClient { @GetMapping("/deduct") public String deduct (@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) ;}
serviceImpl
package com.lazy.business.service.impl;@Service public class BusinessServiceImpl implements BusinessService { @Autowired private StorageFeignClient storageFeignClient; @Autowired private OrderFeignClient orderFeignClient; @Override public void purchase (String userId, String commodityCode, int orderCount) { storageFeignClient.deduct(commodityCode, orderCount); orderFeignClient.create(userId, commodityCode, orderCount); } }
主启动类添加开启FeignClient
package com.lazy.business;@EnableFeignClients @EnableDiscoveryClient @SpringBootApplication public class SeataBusinessMainApplication { public static void main (String[] args) { SpringApplication.run(SeataBusinessMainApplication.class, args); } }
改造seata-order
package com.lazy.order.feign;@FeignClient(value = "seata-account") public interface AccountFeignClient { @GetMapping("/debit") public String debit (@RequestParam("userId") String userId, @RequestParam("money") int money) ;}
serviceImpl
package com.lazy.order.service.impl;@Service public class OrderServiceImpl implements OrderService { @Autowired OrderTblMapper orderTblMapper; @Autowired private AccountFeignClient accountFeignClient; @Override public OrderTbl create (String userId, String commodityCode, int orderCount) { int orderMoney = calculate(commodityCode, orderCount); accountFeignClient.debit(userId, orderMoney); OrderTbl orderTbl = new OrderTbl (); orderTbl.setUserId(userId); orderTbl.setCommodityCode(commodityCode); orderTbl.setCount(orderCount); orderTbl.setMoney(orderMoney); orderTblMapper.insert(orderTbl); return orderTbl; } private int calculate (String commodityCode, int orderCount) { return 9 *orderCount; } }
主启动类添加开启Feign
Seata原理
下载Seata
启动seata
seata的访问地址
启动完seata我们就应该配置引入seata了,只要用到了就需要引入
<dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency >
配置文件,需要跟application.yaml同级,创建一个file.conf
完整的file.conf
transport { type = "TCP" server = "NIO" heartbeat = true enableTmClientBatchSendRequest = false enableRmClientBatchSendRequest = true rpcRmRequestTimeout = 2000 rpcTmRequestTimeout = 30000 rpcRmRequestTimeout = 15000 threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" bossThreadSize = 1 workerThreadSize = "default" } shutdown { wait = 3 } serialization = "seata" compressor = "none" } service { vgroupMapping.default_tx_group = "default" default.grouplist = "127.0.0.1:8091" enableDegrade = false disableGlobalTransaction = false } client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false tableMetaCheckerInterval = 60000 reportSuccessEnable = false sagaBranchRegisterEnable = false sagaJsonParser = "fastjson" sagaRetryPersistModeUpdate = false sagaCompensatePersistModeUpdate = false tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000 sqlParserType = "druid" branchExecutionTimeoutXA = 60000 connectionTwoPhaseHoldTimeoutXA = 10000 } tm { commitRetryCount = 5 rollbackRetryCount = 5 defaultGlobalTransactionTimeout = 60000 degradeCheck = false degradeCheckPeriod = 2000 degradeCheckAllowTimes = 10 interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000 } undo { dataValidation = true onlyCareUpdateColumns = true logSerialization = "jackson" logTable = "undo_log" compress { enable = true type = zip threshold = 64k } } loadBalance { type = "XID" virtualNodes = 10 } } log { exceptionRate = 100 } tcc { fence { logTableName = tcc_fence_log cleanPeriod = 1h } }
我们只需要引入部分即可,只要那个项目引入依赖,就得在下面配置这个,不然服务启动不起来
service { vgroupMapping.default_tx_group = "default" default.grouplist = "127.0.0.1:8091" # seata服务端的地址 enableDegrade = false disableGlobalTransaction = false }
只引入这个是实现不了分布式事务的,我们还需要在那个地方引入的分布式的上面添加@GlobalTransactional注解
这个项目我们是在seata-busines中的serviceImpl下调用的,所以在改方法上添加注解
package com.lazy.business.service.impl;@Service public class BusinessServiceImpl implements BusinessService { @Autowired private StorageFeignClient storageFeignClient; @Autowired private OrderFeignClient orderFeignClient; @GlobalTransactional # 添加seata的全局事务 @Override public void purchase (String userId, String commodityCode, int orderCount) { storageFeignClient.deduct(commodityCode, orderCount); orderFeignClient.create(userId, commodityCode, orderCount); } }
我们在启动项目测试
发现我们的代码报错了,但是我们的数据库是进行回滚了
seata二阶提交协议 Seata 通过引入 全局事务管理 和 分支事务 来解决分布式系统中的事务一致性问题。它使用类似于传统数据库事务中的 二阶段提交(2PC,Two-Phase Commit) 协议来确保事务的原子性。
二阶段提交(2PC)过程 Seata 采用二阶段提交协议(2PC)来确保分布式事务的一致性。以下是二阶段提交的详细流程:
第一阶段:事务预备(Try阶段) 全局事务开始:客户端应用向 Seata Server 发起请求,Seata 会生成一个全局事务 ID(XID),并返回给客户端应用。全局事务标识用于追踪整个分布式事务。
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();tx.begin();
分支事务注册:每个参与者(微服务)启动后,会向 Seata Server 注册自己作为一个分支事务,Seata Server 会为每个分支事务分配一个唯一的事务分支 ID(Branch ID)。Try 阶段:每个微服务会执行 Try 操作,即准备执行本地事务操作,但不提交数据。例如,更新某个数据库中的记录,但不提交。执行数据库操作:每个参与的子事务都会执行数据库更新,但不会真正提交,而是进入 prepare 状态(对于 AT 模式,这意味着生成 undo_log)。
第二阶段:提交(Commit)或回滚(Rollback)
提交(Commit):
Seata Server 收到全局事务提交请求后,通知所有分支事务提交事务。
分支事务提交数据库操作(删除 undo_log)。
回滚(Rollback):
如果 Seata Server 发现某个分支事务执行失败,则通知所有已提交的分支事务回滚,恢复 undo_log 记录的数据。
seata配置到nacos 官网地址
seata的四种事务模式 Seata AT 模式 概述 AT 模式是 Seata 创新的一种非侵入式的分布式事务解决方案,Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log 日志,检查全局锁等。
本文中,我们将重点介绍 Seata AT 模式的使用,如果您对于 AT 模式原理感兴趣,还请阅读对应于本篇文章的开发者指南 。
整体机制 两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。
Seata TCC 模式 概述 TCC 模式是 Seata 支持的一种由业务方细粒度控制的侵入式分布式事务解决方案,是继 AT 模式后第二种支持的事务模式,最早由蚂蚁金服贡献。其分布式事务模型直接作用于服务层,不依赖底层数据库,可以灵活选择业务资源的锁定粒度,减少资源锁持有时间,可扩展性好,可以说是为独立部署的 SOA 服务而设计的。
本文中,我们将重点介绍 Seata TCC 模式的使用,如果您对于 TCC 模式原理感兴趣,想要了解 Seata TCC 对于幂等、空回滚、悬挂问题的解决,还请阅读对应于本篇文章的开发者指南 。
优势 TCC 完全不依赖底层数据库,能够实现跨数据库、跨应用资源管理,可以提供给业务方更细粒度的控制。
缺点 TCC 是一种侵入式的分布式事务解决方案,需要业务系统自行实现 Try,Confirm,Cancel 三个操作,对业务系统有着非常大的入侵性,设计相对复杂。
适用场景 TCC 模式是高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景。
整体机制 在两阶段提交协议中,资源管理器(RM, Resource Manager)需要提供“准备”、“提交”和“回滚” 3 个操作;而事务管理器(TM, Transaction Manager)分 2 阶段协调所有资源管理器,在第一阶段询问所有资源管理器“准备”是否成功,如果所有资源均“准备”成功则在第二阶段执行所有资源的“提交”操作,否则在第二阶段执行所有资源的“回滚”操作,保证所有资源的最终状态是一致的,要么全部提交要么全部回滚。
资源管理器有很多实现方式,其中 TCC(Try-Confirm-Cancel)是资源管理器的一种服务化的实现;TCC 是一种比较成熟的分布式事务解决方案,可用于解决跨数据库、跨服务业务操作的数据一致性问题;TCC 其 Try、Confirm、Cancel 3 个方法均由业务编码实现,故 TCC 可以被称为是服务化的资源管理器。
TCC 的 Try 操作作为一阶段,负责资源的检查和预留;Confirm 操作作为二阶段提交操作,执行真正的业务;Cancel 是二阶段回滚操作,执行预留资源的取消,使资源回到初始状态。
Seata Saga 模式 概述 Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)
适用场景:
业务流程长、业务流程多
参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
一阶段提交本地事务,无锁,高性能
事件驱动架构,参与者可异步执行,高吞吐
补偿服务易于实现
缺点:
Saga 的实现: 基于状态机引擎的 Saga 实现: 目前 SEATA 提供的 Saga 模式是基于状态机引擎来实现的,机制是:
通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
注意: 异常发生时是否进行补偿也可由用户自定义决定
可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
示例状态图:
Seata XA 模式 概述 XA 模式是从 1.2 版本支持的事务模式。XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准。Seata XA 模式是利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种事务模式。
本文中,我们将重点介绍 Seata XA 模式的使用,如果您对于 XA 模式原理感兴趣,还请阅读对应于本篇文章的开发者指南 。
优势 与 Seata 支持的其它事务模式不同,XA 协议要求事务资源本身提供对规范和协议的支持,所以事务资源(如数据库)可以保障从任意视角对数据的访问有效隔离,满足全局数据一致性。此外的一些优势还包括:
业务无侵入:和 AT 一样,XA 模式将是业务无侵入的,不给应用设计和开发带来额外负担。
数据库的支持广泛:XA 协议被主流关系型数据库广泛支持,不需要额外的适配即可使用。
缺点 XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待。事务资源长时间得不到释放,锁定周期长,而且在应用层上面无法干预,性能差。
适用场景 适用于想要迁移到 Seata 平台基于 XA 协议的老应用,使用 XA 模式将更平滑,还有 AT 模式未适配的数据库应用。
整体机制
执行阶段:
可回滚:业务 SQL 操作放在 XA 分支中进行,由资源对 XA 协议的支持来保证 可回滚
持久化:XA 分支完成后,执行 XA prepare,同样,由资源对 XA 协议的支持来保证 持久化 (即,之后任何意外都不会造成无法回滚的情况)
完成阶段:
分支提交:执行 XA 分支的 commit
分支回滚:执行 XA 分支的 rollback
切换seata数据源的代理模式 seata: data-source-proxy-mode: AT
总结 SpringCloud alibaba 总结