SpringBoot读写分离配置与事务

引入依赖

  • <dependency>
  • <groupId>com.baomidou</groupId>
  • <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  • <version>3.5.1</version>
  • </dependency>

跟mybatis-plus属于同一个开源组织 苞米豆

配置文件

  • spring:
  • datasource:
  • dynamic:
  • primary: master
  • # 严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
  • strict: true
  • datasource:
  • master:
  • url: jdbc:mysql://192.168.101.128:3307/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
  • username: root
  • password: 123456
  • slave:
  • url: jdbc:mysql://192.168.101.128:3308/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
  • username: root
  • password: 123456
  • # 按需开启日志
  • logging:
  • level:
  • com.baomidou.dynamic: debug

方法或类上加上@DS注解即可切换数据源


该框架获取数据库连接的核心逻辑是以下这段

在被DS注解标记的方法上, 会被此拦截器拦截, 获取到注解上定义的值, 并存入栈结构中

  • com.baomidou.dynamic.datasource.aop.DynamicDataSourceAnnotationInterceptor
  • public Object invoke(MethodInvocation invocation) throws Throwable {
  • String dsKey = determineDatasourceKey(invocation);
  • //获取注解值入栈
  • DynamicDataSourceContextHolder.push(dsKey);
  • try {
  • return invocation.proceed();
  • } finally {
  • //方法结束后出栈
  • DynamicDataSourceContextHolder.poll();
  • }
  • }

然后com.baomidou.dynamic.datasource.ds.AbstractRoutingDataSource#getConnection()

  • public Connection getConnection() throws SQLException {
  • String xid = TransactionContext.getXID();
  • if (StringUtils.isEmpty(xid)) {
  • return determineDataSource().getConnection();
  • } else {
  • //获取栈顶的一个值
  • String ds = DynamicDataSourceContextHolder.peek();
  • ds = StringUtils.isEmpty(ds) ? "default" : ds;
  • //根据值获取对应数据库的连接
  • ConnectionProxy connection = ConnectionFactory.getConnection(ds);
  • return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
  • }
  • }

可以发现框架是通过维护一个栈结构进行对应数据源的切换, 类似方法的栈, 因为方法间可能嵌套调用, 所以使用此结构便于管理


但Spring的@Transactional会影响@DS

例如

  • @Autowired
  • @Lazy
  • CurrentService currentService;
  • @DS("master")
  • @Transactional
  • public void updateUser() {
  • baseMapper.updateById(user);
  • System.out.println(currentService.get());
  • }
  • @DS("slave")
  • public User get() {
  • return baseMapper.selectById(1);
  • }

在这里, master和slave是使用binlog搭建的读写分离架构

但实际get方法却能读取到updateUser所做的修改, 通过Debug也能看到真正的数据库连接属性, get方法还是使用的master库

因为在Spring管理下, 获取到数据库连接后, 会和当前线程进行绑定, 如果后面的方法被判断为不需要新建连接, 则复用之前与线程绑定的连接, 那么即使有DS注解, 也切换不了库

如何判断需不需要新建连接? 看被调用方法是否定义了事务传播属性.

org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction方法中

  • if (isExistingTransaction(transaction)) {
  • //找到现有事务 -> 检查传播行为以了解行为方式
  • //当前已经存在一个事务
  • return handleExistingTransaction(def, transaction, debugEnabled);
  • }

继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction

  • //判断当前方法的隔离属性是否为PROPAGATION_REQUIRES_NEW
  • if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  • if (debugEnabled) {
  • logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
  • }
  • SuspendedResourcesHolder suspendedResources = suspend(transaction);
  • try {
  • return startTransaction(definition, transaction, debugEnabled, suspendedResources);
  • }
  • catch (RuntimeException | Error beginEx) {
  • resumeAfterBeginException(transaction, suspendedResources, beginEx);
  • throw beginEx;
  • }
  • }

继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction

  • private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
  • boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
  • boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  • DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  • doBegin(transaction, definition);
  • prepareSynchronization(status, definition);
  • return status;
  • }

继续调用org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

  • if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  • //获取当前的数据源, 此处才能让@DS注解生效
  • Connection newCon = obtainDataSource().getConnection();
  • if (logger.isDebugEnabled()) {
  • logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
  • }
  • //将数据库连接绑定到事务
  • txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  • }

所以如果想让get方法读取从库, 则需要定义传播属性以便让Spring建立新连接


第二种方法就是使用该框架的@DSTransactional

该方法也会进行事务管理, 但功能比较简陋

这个注解还提供了一个本地事务的功能: 解决多数据源的事务问题.

但这个功能也有问题, 不建议使用

看这个方法 com.baomidou.dynamic.datasource.tx.ConnectionFactory#notify

  • public static void notify(Boolean state) {
  • try {
  • Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
  • //获取当前线程所有的数据库连接, 通知其进行回滚/提交, 可能存在某一个事务提交成功, 某一事务提交失败.
  • //并不能保证最终一致性
  • for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
  • connectionProxy.notify(state);
  • }
  • } finally {
  • CONNECTION_HOLDER.remove();
  • }
  • }

对于这种多库事务, 建议使用Seata或消息队列

贴心的是, 框架还与Seata进行了整合

引入依赖

  • <dependency>
  • <groupId>io.seata</groupId>
  • <artifactId>seata-spring-boot-starter</artifactId>
  • <version>1.4.2</version>
  • </dependency>
  • spring:
  • datasource:
  • dynamic:
  • #seata1.0之后支持自动代理 这里直接配置true
  • seata: true
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
<<上一篇
下一篇>>