业务背景

最近有个需求:需要先启动一个web服务,然后这个服务去部署mysql,redis等相关服务,并且该服务要动态的把mysql在不重启项目的情况加载进去。

项目思路

项目先使用内存数据库或者内存进行数据暂存(这里我选择了内存数据库hsql),启动先加载hsqldb数据源,然后根据mysql部署情况动态的添加到相应的数据源

框架选型

springboot-2.11+mysql+hsql

1.pom文件(因为是直接贴出来的,有些不需要的请自行忽略)

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.penggle</groupId>
            <artifactId>kaptcha</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis-spring-boot-starter.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.tomcat</groupId>
                    <artifactId>tomcat-jdbc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- spring data jpa -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${io.springfox.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${io.springfox.version}</version>
        </dependency>

        <!-- MySQL Connector-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql-connetcor.version}</version>
            <!--<scope>runtime</scope>-->
        </dependency>

        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>${commons-httpclient.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>${commons-lang.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <!-- 内存数据库hsqldb -->
        <dependency>
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>

首先需要定义一个数据源切换注解,调用时表明使用哪一个数据源


import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
    DataSourceTypeEnum value() default DataSourceTypeEnum.MYSQL;
}

既然申明了注解,那么我们就需要去拦截并切换到对应的数据源,这时候就申明一个aop进行拦截处理


import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;

@Aspect
@Component
public class DataSourceAop {
    private static final Logger logger = LoggerFactory.getLogger(DataSourceAop.class);

    @Autowired
    private HttpServletRequest request;

    /**
     * 拦截类作用域的注解
     * @param dataSource
     */
    @Before("@within(dataSource) ")
    public void beforeDataSource(DataSource dataSource) {
        String value = dataSource.value().getDb();
        DataSourceContextHolder.setDataSource(value);

        try {
            logger.info("当前请求:{},当前使用的数据源为:{}",request.getRequestURI(), value);
        }catch (Exception e){
        }
    }

    /**
     * 拦截method作用域的注解
     * @param dataSource
     */
    @Before("@annotation(dataSource) ")
    public void beforeDataSourceMethod(JoinPoint joinPoint,DataSource dataSource) {
        String value = dataSource.value().getDb();
        DataSourceContextHolder.setDataSource(value);
        try {
            logger.info("当前请求:{},当前使用的数据源为:{}",request.getRequestURI(), value);
        }catch (Exception e){
        }
    }

    /**
     * 按需在结束之后处理一些业务
     * @param dataSource
     */
    @After("@within(dataSource) ")
    public void afterDataSource(DataSource dataSource) {
        // todo something with yourself;
    }

    @After("@annotation(dataSource) ")
    public void afterDataSourceMethod(DataSource dataSource) {
        // todo something with yourself;
    }
}

细心的朋友应该已经发现这里用到了DataSourceContextHolder,并且设置了对应的数据源,那这个是做什么的呢?我们继续往下看

import java.util.Map;

public class DataSourceContextHolder {
    // 存放当前线程使用的数据源类型
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    //把当前事物下的连接塞入,用于事物处理
    private static ThreadLocal<Map<String, ConnectWarp>> connectionThreadLocal = new ThreadLocal<>();

    // 设置数据源
    public static void setDataSource(String type){
        contextHolder.set(type);
    }

    // 获取数据源
    public static String getDataSource(){
        return contextHolder.get();
    }

    // 清除数据源
    public static void clearDataSource(){
        contextHolder.remove();
    }


    // 设置连接
    public static void setConnection(Map<String, ConnectWarp> connections){
        connectionThreadLocal.set(connections);
    }

    // 获取连接
    public static Map<String, ConnectWarp> getConnection(){
        return connectionThreadLocal.get();
    }

    // 清除连接
    public static void clearConnection(){
        connectionThreadLocal.remove();
    }
    

通过上面的代码我们发现它好像没做什么业务处理,只是单纯的申明了线程变量,以及基本的赋值和查询,那么数据源真正的切换是如何实现的呢?别急,我们马上接近真相了

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class DynamicDataSource extends AbstractRoutingDataSource {

    private static Map<Object, Object> dataSourceMap = new HashMap<Object, Object>();

    private static DynamicDataSource instance;

    private static byte[] lock = new byte[0];

    public static DynamicDataSource getInstance() {
        if (instance == null) {
            synchronized (lock) {
                if (instance == null) {
                    instance = new DynamicDataSource();
                }
            }
        }
        return instance;
    }
    
    /**
     * 重写setTargetDataSources,通过入参targetDataSources进行数据源的添加
     * @param targetDataSources
     */
    @Override
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        super.setTargetDataSources(targetDataSources);
        dataSourceMap.putAll(targetDataSources);
        super.afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        String dataSource = DataSourceContextHolder.getDataSource();
        return dataSource;
    }

    /**
     * 获取到原有的多数据源,并从该数据源基础上添加一个或多个数据源后,通过上面的setTargetDataSources进行加载
     *
     * @return
     */
    public Map<Object, Object> getDataSourceMap() {
        return dataSourceMap;
    }


    /**
     * 开启事物的时候,把连接放入 线程中,后续crud 都会拿对应的连接操作
     *
     * @param key
     * @param connection
     */
    public void bindConnection(String key, Connection connection) {
        Map<String, ConnectWarp> connectionMap = DataSourceContextHolder.getConnection();
        if (connectionMap == null) {
            connectionMap = new HashMap<>();
        }
        ConnectWarp connectWarp = new ConnectWarp(connection);

        connectionMap.put(key, connectWarp);
        DataSourceContextHolder.setConnection(connectionMap);

    }


    /**
     * 提交事物
     *
     * @throws SQLException
     */
    protected void doCommit() throws SQLException {
        Map<String, ConnectWarp> stringConnectionMap = DataSourceContextHolder.getConnection();
        if (stringConnectionMap == null) {
            return;
        }
        for (String dataSourceName : stringConnectionMap.keySet()) {
            ConnectWarp connection = stringConnectionMap.get(dataSourceName);
            connection.commit(true);
            connection.close(true);
        }
        DataSourceContextHolder.clearConnection();
    }

    /**
     * 撤销事物
     *
     * @throws SQLException
     */
    protected void rollback() throws SQLException {
        Map<String, ConnectWarp> stringConnectionMap = DataSourceContextHolder.getConnection();
        if (stringConnectionMap == null) {
            return;
        }
        for (String dataSourceName : stringConnectionMap.keySet()) {
            ConnectWarp connection = stringConnectionMap.get(dataSourceName);
            connection.rollback();
            connection.close(true);
        }
        DataSourceContextHolder.clearConnection();
    }


    /**
     * 如果 在connectionThreadLocal 中有 说明开启了事物,就从这里面拿
     *
     * @return
     * @throws SQLException
     */
    @Override
    public Connection getConnection() throws SQLException {
        Map<String, ConnectWarp> stringConnectionMap = DataSourceContextHolder.getConnection();
        if (stringConnectionMap == null) {
            //没开事物 直接走
            return determineTargetDataSource().getConnection();
        } else {
            //开了事物,从当前线程中拿,而且拿到的是 包装过的connect 只有我能关闭O__O "…
            String currentName = (String) determineCurrentLookupKey();
            return stringConnectionMap.get(currentName);
        }

    }
}

这个类里面最核心的就是setTargetDataSourcesdetermineCurrentLookupKey两个方法。首先setTargetDataSources重写,保证我们能够动态的添加数据源;然后determineCurrentLookupKey重写,保证我们能够从我们动态添加的数据源里面取出相应的数据源。
到这里呢就差不多快成功了,只差最后一步了,我们来看看这关键的最后一步:添加到spring管理


import com.github.pagehelper.PageInterceptor;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


@Configuration
public class DataSourceConfig {


    @Autowired
    private HsqlConfig hsqlConfig;

    @Bean("hsqlDataSource")
    @Primary
    public DataSource hsqlDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        BeanUtils.copyProperties(hsqlConfig, dataSource);
        dataSource.setJdbcUrl(hsqlConfig.getJdbcUrl());
        return dataSource;
    }

    @Bean
    public DynamicDataSource dataSource(@Qualifier("hsqlDataSource") DataSource hsqlDataSource) {
        Map<Object, Object> map = new HashMap<>();
        map.put(DataSourceTypeEnum.HSQL.getDb(), hsqlDataSource);

        DynamicDataSource dynamicDataSource =  DynamicDataSource.getInstance();
        dynamicDataSource.setTargetDataSources(map);

        return dynamicDataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DynamicDataSource dynamicDataSource) throws Exception {

        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dynamicDataSource);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        //分页插件
        Interceptor interceptor = new PageInterceptor();
        Properties properties = new Properties();
        properties.setProperty("helperDialect", "mysql");
        properties.setProperty("offsetAsPageNum", "true");
        properties.setProperty("rowBoundsWithCount", "true");
        properties.setProperty("reasonable", "true");
        properties.setProperty("supportMethodsArguments", "true");
        interceptor.setProperties(properties);

        Interceptor[] plugins = {interceptor};
        factoryBean.setPlugins(plugins);
        return factoryBean.getObject();

    }

    @Bean
    @Qualifier("transactionManager")
    public PlatformTransactionManager transactionManager(DynamicDataSource dynamicDataSource) {
        return new DataSourceTransactionManager(dynamicDataSource);
    }

}

到这里这个多数据动态添加已经切换就已经OK了,然后有些很细心很细心的朋友发现了,我里面对connect也做了写处理,这个是重写了事务机制,这个我们下期继续分析。

Logo

更多推荐