mybatis实现数据库读写分离

news/2024/7/5 1:52:20 标签: mybatis, mybatis读写分离, 事务, 数据源, 数据库

ps:本文解决mybatis实现数据库读写分离,项目基础是ruoyi-vue
方案一:
自定义一个注解@DataSource, 利用aop切该注解,切了后设置注解@DataSource的值到ThreadLocal里面,再利用AbstractRoutingDataSource的determineCurrentLookupKey方法去选择数据源

优点:通过注解方式,修改简便
缺点:dao上配置注解,有点麻烦

public enum DataSourceType
{
    /**
     * 主库, write
     */
    MASTER,

    /**
     * 从库,read
     */
    SLAVE
}
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DataSource
{
    /**
     * 切换数据源名称
     */
    public DataSourceType value() default DataSourceType.MASTER;
}
@Aspect
@Order(1)
@Component
public class DataSourceAspect
{
    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Pointcut("@annotation(com.ruoyi.common.annotation.DataSource)"
            + "|| @within(com.ruoyi.common.annotation.DataSource)")
    public void dsPointCut() {}


    @Around("dsPointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable
    {
        DataSource dataSource = getDataSource(point);

        if (StringUtils.isNotNull(dataSource))
        {
            DynamicDataSourceContextHolder.setDataSourceType(dataSource.value().name());
        }

        try
        {
            return point.proceed();
        }
        finally
        {
            // 销毁数据源 在执行方法之后
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }


    /**
     * 获取需要切换的数据源
     */
    public DataSource getDataSource(ProceedingJoinPoint point)
    {
        MethodSignature signature = (MethodSignature) point.getSignature();
        DataSource dataSource = AnnotationUtils.findAnnotation(signature.getMethod(), DataSource.class);
        if (Objects.nonNull(dataSource))
        {
            return dataSource;
        }

        return AnnotationUtils.findAnnotation(signature.getDeclaringType(), DataSource.class);
    }


}
public class DynamicDataSourceContextHolder
{
    public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);

    /**
     * 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
     *  所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
     */
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();

    /**
     * 设置数据源的变量
     */
    public static void setDataSourceType(String dsType)
    {
        log.info("切换到{}数据源", dsType);
        CONTEXT_HOLDER.set(dsType);
    }

    /**
     * 获得数据源的变量
     */
    public static String getDataSourceType()
    {
        return CONTEXT_HOLDER.get();
    }

    /**
     * 清空数据源变量
     */
    public static void clearDataSourceType()
    {
        CONTEXT_HOLDER.remove();
    }
}

@Slf4j
@Component
public class DynamicDataSource extends AbstractRoutingDataSource
{
    //写数据源
    @Autowired
    private DataSource writeDataSource;

    //读数据源
    private List<DataSource> readDataSources;

    @Value("${spring.datasource.druid.slaveNum}")
    private int slaveNum;

    @Value("${spring.datasource.druid.slaveChangeStrategy}")
    //获取读数据源的方式,轮询=0,随机=1
    private int slaveChangeStrategy = 0;

    //决策当前应该使用什么数据源
    //根本不会被调用
    @Override
    protected Object determineCurrentLookupKey() {
        //获取当前线程的数据源类型master slave
        String dataSourceType = DynamicDataSourceContextHolder.getDataSourceType();
        if(StringUtils.isEmpty(dataSourceType)||
                DataSourceType.MASTER.name().equals(dataSourceType)||
                slaveNum<=0){
            logger.info("使用写数据源master");
            //使用写数据源
            return DataSourceType.MASTER.name();
        }

        int index = SlaveChangeStrategyFactory.getSlaveChangeStrategy(slaveChangeStrategy).select(slaveNum);
        logger.info("使用读数据源"+(dataSourceType + index));

        return dataSourceType + index;
    }


//    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources)
//    {
//        super.setDefaultTargetDataSource(defaultTargetDataSource);
//        super.setTargetDataSources(targetDataSources);
//        super.afterPropertiesSet();
//    }

    @Override
    public void afterPropertiesSet() {
        if (this.writeDataSource == null) {
            throw new IllegalArgumentException("Property 'writeDataSource' is required");
        }
        //设置默认的数据源
        super.setDefaultTargetDataSource(writeDataSource);

        Map<Object, Object> targetDataSources = new HashMap<>();
        //放入写数据源
        targetDataSources.put(DataSourceType.MASTER.name(), writeDataSource);

        //放入读数据源
        readDataSources = new ArrayList<>();
        for(int i=1;i<=slaveNum;i++){
            readDataSources.add(SpringUtils.getBean("readDataSource"+i));
            targetDataSources.put(DataSourceType.SLAVE.name() + i, readDataSources.get(i-1));
        }
        //设置所有的数据源
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }
}
public class SlaveChangeStrategyFactory {

    private static Map<Integer,SlaveChangeStrategy> strategyMap = new HashMap();
    static{
        strategyMap.put(0,new RoundRobinSlaveChangeStrategy());
        strategyMap.put(1,new RandomSlaveChangeStrategy());
    }

    public static SlaveChangeStrategy getSlaveChangeStrategy(int strategy){
        SlaveChangeStrategy scStrategy = strategyMap.get(strategy);
        if(null==scStrategy){
            throw new RuntimeException("从数据库没有策略"+strategy);
        }
        return scStrategy;
    }

}

public interface SlaveChangeStrategy {

    int select(int slaveNum);

}

public class RoundRobinSlaveChangeStrategy implements SlaveChangeStrategy {

    private AtomicLong counter = new AtomicLong(0);

    private static final Long MAX_POOL = Long.MAX_VALUE;

    private final Lock lock = new ReentrantLock();

    @Override
    public int select(int slaveNum) {
        //轮询方式
        long currValue = counter.incrementAndGet();
        if((currValue + 1) >= MAX_POOL) {
            try {
                lock.lock();
                if((currValue + 1) >= MAX_POOL) {
                    counter.set(0);
                }
            } finally {
                lock.unlock();
            }
        }
        return (int) (currValue % slaveNum)+1;
    }

}
public class RandomSlaveChangeStrategy implements SlaveChangeStrategy {

    @Override
    public int select(int slaveNum) {
        return ThreadLocalRandom.current().nextInt(1, slaveNum+1);
    }

}
@Configuration
public class DruidConfig
{
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource writeDataSource(DruidProperties druidProperties)
    {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(dataSource);
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave1")
    public DataSource readDataSource1(DruidProperties druidProperties)
    {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(dataSource);
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave2")
    public DataSource readDataSource2(DruidProperties druidProperties)
    {
        DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
        return druidProperties.dataSource(dataSource);
    }
}
@Configuration
public class DruidProperties
{
    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;

    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;

    @Value("${spring.datasource.druid.maxActive}")
    private int maxActive;

    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;

    @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
    private int maxEvictableIdleTimeMillis;

    @Value("${spring.datasource.druid.validationQuery}")
    private String validationQuery;

    @Value("${spring.datasource.druid.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.datasource.druid.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.datasource.druid.testOnReturn}")
    private boolean testOnReturn;

    public DruidDataSource dataSource(DruidDataSource datasource)
    {
        /** 配置初始化大小、最小、最大 */
        datasource.setInitialSize(initialSize);
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);

        /** 配置获取连接等待超时的时间 */
        datasource.setMaxWait(maxWait);

        /** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

        /** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);

        /**
         * 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
         */
        datasource.setValidationQuery(validationQuery);
        /** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */
        datasource.setTestWhileIdle(testWhileIdle);
        /** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
        datasource.setTestOnBorrow(testOnBorrow);
        /** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}

@Configuration
public class MyBatisConfig
{
  
    //注意这里要设置动态数据源
    @Bean
    public SqlSessionFactory sqlSessionFactory(DynamicDataSource dataSource) throws Exception
    {
        String typeAliasesPackage = env.getProperty("mybatis.typeAliasesPackage");
        String mapperLocations = env.getProperty("mybatis.mapperLocations");
        String configLocation = env.getProperty("mybatis.configLocation");
        typeAliasesPackage = setTypeAliasesPackage(typeAliasesPackage);
        VFS.addImplClass(SpringBootVFS.class);

        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        //设置动态数据源
        sessionFactory.setDataSource(dataSource);
        //设置mybatis plugin,主要是拦截数据库读写操作,给threadLocal设置master或者slave
        sessionFactory.setPlugins(new DynamicPlugin());
        //设置事务工厂,解决一个事务使用多个数据源的问题
        sessionFactory.setTransactionFactory(new MultiDataSourceTransactionFactory());


        //mybatis mapper和xml包的位置
        sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
        sessionFactory.setMapperLocations(resolveMapperLocations(StringUtils.split(mapperLocations, ",")));
        sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation));
        return sessionFactory.getObject();
    }
}
# 数据源配置
spring:
    datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.cj.jdbc.Driver
        druid:
            # 主库数据源
            master:
                url: jdbc:mysql://192.168.238.100:3306/ry-vue?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
                username: root
                password: root
            #从节点个数
            slaveNum: 2
            #从节点选择策略,默认轮询0,随机1
            slaveChangeStrategy: 0
            # 从库数据源
            slave1:
                url: jdbc:mysql://192.168.238.100:3306/ry-vue1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
                username: root
                password: root
            slave2:
                url: jdbc:mysql://192.168.238.100:3306/ry-vue1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
                username: root
                password: root
            # 初始连接数
            initialSize: 5
            # 最小连接池数量
            minIdle: 10
            # 最大连接池数量
            maxActive: 20
            # 配置获取连接等待超时的时间
            maxWait: 60000
            # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
            timeBetweenEvictionRunsMillis: 60000
            # 配置一个连接在池中最小生存的时间,单位是毫秒
            minEvictableIdleTimeMillis: 300000
            # 配置一个连接在池中最大生存的时间,单位是毫秒
            maxEvictableIdleTimeMillis: 900000
            # 配置检测连接是否有效
            validationQuery: SELECT 1 FROM DUAL
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            webStatFilter: 
                enabled: true
            statViewServlet:
                enabled: true
                # 设置白名单,不填则允许所有访问
                allow:
                url-pattern: /druid/*
                # 控制台管理用户名和密码
                login-username: ruoyi
                login-password: 123456
            filter:
                stat:
                    enabled: true
                    # 慢SQL记录
                    log-slow-sql: true
                    slow-sql-millis: 1000
                    merge-sql: true
                wall:
                    config:
                        multi-statement-allow: true

如果要解决一个事务使用多个数据源的问题:可以添加如下这两类并且MybatisConfig加上如下代码
//设置事务工厂,解决一个事务使用多个数据源的问题
sessionFactory.setTransactionFactory(new MultiDataSourceTransactionFactory());

public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        if (dataSource instanceof DynamicDataSource) {
            return new MultiDataSourceTransaction((DynamicDataSource) dataSource);
        } else {
            return super.newTransaction(dataSource, level, autoCommit);
        }
    }
}
public class MultiDataSourceTransaction implements Transaction{
    private static final Log LOGGER = LogFactory.getLog(MultiDataSourceTransaction.class);

    //我的动态数据源
    private final DynamicDataSource dataSource;
 
    private Connection mainConnection;
 
    private String mainDatabaseIdentification;
 
    private Set<Connection> readConnections;

    private boolean isConnectionTransactional;
 
    private boolean autoCommit;
 
 
    public MultiDataSourceTransaction(DynamicDataSource dataSource) {
        notNull(dataSource, "No DataSource specified");
        this.dataSource = dataSource;
        this.readConnections = new HashSet<>();
        mainDatabaseIdentification= DynamicDataSourceContextHolder.getDataSourceType();
    }
 
 
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() throws SQLException {
        String databaseIdentification = DynamicDataSourceContextHolder.getDataSourceType();

        if (StringUtils.isEmpty(databaseIdentification)||databaseIdentification.equals(mainDatabaseIdentification)) {
            if (mainConnection != null) return mainConnection;
            else {
                openMainConnection();
                mainDatabaseIdentification =databaseIdentification;
                return mainConnection;
            }
        } else {
                try {
                    Connection conn = dataSource.getConnection();
                    readConnections.add(conn);
                    return conn;
                } catch (SQLException ex) {
                    throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
                }
        }
 
    }
 
 
    private void openMainConnection() throws SQLException {
        this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.mainConnection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);
 
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(
                    "JDBC Connection ["
                            + this.mainConnection
                            + "] will"
                            + (this.isConnectionTransactional ? " " : " not ")
                            + "be managed by Spring");
        }
    }
 
    /**
     * {@inheritDoc}
     */
    @Override
    public void commit() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Committing JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.commit();
            for (Connection connection : readConnections) {
                connection.commit();
            }
        }
    }
 
    /**
     * {@inheritDoc}
     */
    @Override
    public void rollback() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.rollback();
            for (Connection connection : readConnections) {
                connection.rollback();
            }
        }
    }
 
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() throws SQLException {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : readConnections) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
    }
 
    @Override
    public Integer getTimeout() throws SQLException {
        return null;
    }
}

方案二:
利用mybatis plugin提供的拦截器,可以拦截到当前数据库操作是读还是写,设置到ThreadLocal里面,再利用AbstractRoutingDataSource的determineCurrentLookupKey方法去选择数据源
优点:无需修改代码

添加如下类DynamicPlugin ,此后你的@DataSource可以不再使用了,DataSourceAspect也可以注释掉了
MybatisConfig加上如下代码
//设置mybatis plugin,主要是拦截数据库读写操作,给threadLocal设置master或者slave
sessionFactory.setPlugins(new DynamicPlugin());

@Intercepts({
@Signature(type = Executor.class, method = "update", args = {
        MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = {
        MappedStatement.class, Object.class, RowBounds.class,
        ResultHandler.class }) })
public class DynamicPlugin implements Interceptor {

    protected static final Logger logger = LoggerFactory.getLogger(DynamicPlugin.class);

    private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";

    private static final Map<String, String> cacheMap = new ConcurrentHashMap<>();

    @Override
    public Object intercept(Invocation invocation) throws Throwable {

        boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
        if(!synchronizationActive) {
            Object[] objects = invocation.getArgs();
            MappedStatement ms = (MappedStatement) objects[0];

            String dataSourceType = null;

            if((dataSourceType = cacheMap.get(ms.getId())) == null) {
                //读方法
                if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
                    //!selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库
                    if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
                        dataSourceType = DataSourceType.MASTER.name();
                    } else {
                        BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                        String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
                        if(sql.matches(REGEX)) {
                            dataSourceType = DataSourceType.MASTER.name();
                        } else {
                            dataSourceType = DataSourceType.SLAVE.name();

                        }
                    }
                }else{
                    dataSourceType = DataSourceType.MASTER.name();
                }
                logger.warn("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), dataSourceType, ms.getSqlCommandType().name());
                cacheMap.put(ms.getId(), dataSourceType);
            }

            DynamicDataSourceContextHolder.setDataSourceType(dataSourceType);
        }

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }

    @Override
    public void setProperties(Properties properties) {
        //
    }
}

总结: 利用AbstractRoutingDataSource的determineCurrentLookupKey方法实现动态数据源切换,方案一利用aop修改ThreadLocal的值,方案二利用mybatis plugin 的拦截器修改ThreadLocal的值,如果要实现一个事务使用多个数据源,可以继承SpringManagedTransactionFactory,实现自己的事务工厂,让他开启一个事务后,不缓存数据源,每次都会重新选择一个数据源
参考: Spring+MyBatis实现数据库读写分离方案


http://www.niftyadmin.cn/n/1037398.html

相关文章

BUG这个词的由来

提到BUG 相信身为程序员的我们都不会模式 但你知道BUG为什么叫BUG吗&#xff1f; 这是因为早起 有一台跑程序的机器突然出现了故障&#xff0c;工作人员检查是发现 一只小甲虫在机器的一块电路板上被电死了 工作人员随口而出小甲虫 而他说的是英文 BUG 所以 后期就将程序故障称…

priority_queue优先队列的使用

优先队列&#xff0c;本质上就是一个最大堆或者最小堆&#xff0c;最大堆就是优先级最大的那个数在堆顶&#xff0c;最小堆就是优先级最小的在堆顶&#xff0c;这里的优先级指的是数值的大小&#xff0c;也可以通过自己重载<号来重新定义比较方式 头文件 : #include < qu…

java练习notepad工具下载安装步骤

我推荐用这个地址 Notepad 8.1.9 官方中文版 然后点击打开下载好的安装包 然后点OK 下一步 我接受 然后修改一下目录 然后点击下一步 下一步 安装 完成 打开后 点击设置 首选项 左侧点击 新建 默认语言选java 编码选第一个 然后点击关闭即可 然后用notepad打开java文…

冒泡排序及其优化

冒泡核心思想&#xff1a; 冒泡排序向右边冒泡的话&#xff0c;结果就是最后一个数就是最大的 冒泡排序向左边冒泡的话&#xff0c;结果就是第一个数就是最小的 这个好像不是冒泡吧。。。 private static void maoPaoSort(int [] arr){for(int i0;i<arr.length;i){for(int…

set的使用

set&#xff0c;顾名思义&#xff0c;它是一个集合&#xff0c;它当中没有重复的元素&#xff0c;并按照顺序排列好 使用场景 : set一般用于查找问题中的查找有无&#xff0c;即给定一个元素判断这个元素是否存在于这个数组中。还可以用来删除数组中特定的数&#xff08;因为复…

LongAdder是啥?

本文源码研究基于jdk1.8 阅读ConcurrentHashMap源码的时候发现了很多CountCell&#xff0c;看不太懂&#xff0c;所以先来研究一下LongAdder。 LongAdder是啥&#xff1f; LongAdder是用来做线程安全的i自增操作的&#xff0c;我们知道AtomicLong也可以现实这个功能&#xff0…

ConcurrentHashMap-属性解释

ConcurrentHashMap-属性解释 代表hashmap最大能存这么多个键值对 高两位目的是为了控制&#xff1f;知道的评论区说下 private static final int MAXIMUM_CAPACITY 1 << 30;代表hashmap默认容量 private static final int DEFAULT_CAPACITY 16;数组的最大长度 stat…

map的使用

map&#xff0c;我们一般称为字典或是映射&#xff0c;它提供了一对一的映射关系 使用场景 : map用于查找问题中某个元素出现的次数&#xff0c;比如问‘a’这个元素在数组中出现了多少次。这个只是最基本的应用场景&#xff0c;由于可以有任意的键值对的对应&#xff0c;map的…