感谢作者贡献源码,传送门

        在使用tcc的过程,默认是使用mysql,在项目中使用时,因为存储的数据量较大,导致总是出现数据值过大的错误(通过更新字段类型或更新字段长度可以解决), 于是为了不更新源码,采用了redis作为了持久层,但是线上的环境redis是不会单实例的,保证高可靠,势必会采用主从,提高性能,势必采用集群,那么问题来了,tcc源码中未对redis的集群进行支持。这是扩展源码原因一。

        redis实现集群有多种方式,原生集群,通过twemproxy(twitter)或是其他的开源插件如codis都能实现集群,但是他们都不支持key的模糊查询,及时redis原生的集群也是使用jedisCluster而不用redis(通过jedisCluster能得到jedis,后面代码中有使用示例)。此为扩展源码的原因之二。

        直接上关键代码并加以说明

1、定义jedisCluster的模板方法接口

public interface JedisClusterCallback<T> {
    /**
    *  支持redis cluster
    */
    public T doInJedisCluster(JedisCluster jedisCluster);
}

2、扩展RedisHelper添加JedisCluster模板方法的调用

/**
     *  扩展redis cluster
     *  @Method_Name             :executeCluster
     *  @param jedisCluster
     *  @param callback
     *  @return T
     *  @Creation Date           :2018/6/12
     */
    public static <T> T execute(JedisCluster jedisCluster, JedisClusterCallback<T> callback) {
        try {
            return callback.doInJedisCluster(jedisCluster);
        } finally {
            if (jedisCluster != null) {
//                次数不要执行jedisCluster.close()的方法,jedisCluster在使用结束后会自动释放jedis资源
            }
        }
    }

3、添加RedisClusterTransactionReository类并实现CachableTransactionRepository接口,RedisClusterTransactionReository与RedisTransactionReository接口方法一样,只是调用的JedisCluster的模板方法进行值得操作,后面只对模糊查询的实现进行说明,其他的增删,JedisCluster与Jedis的用法基本一致。

package org.mengyun.tcctransaction.repository;

import org.apache.log4j.Logger;
import org.mengyun.tcctransaction.Transaction;
import org.mengyun.tcctransaction.repository.helper.ExpandTransactionSerializer;
import org.mengyun.tcctransaction.repository.helper.JedisClusterCallback;
import org.mengyun.tcctransaction.repository.helper.JedisClusterExtend;
import org.mengyun.tcctransaction.repository.helper.RedisHelper;
import org.mengyun.tcctransaction.serializer.JdkSerializationSerializer;
import org.mengyun.tcctransaction.serializer.ObjectSerializer;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

import javax.transaction.xa.Xid;
import java.util.*;

/**
 * Created by zc.ding on 2018/06/12
 */
public class RedisClusterTransactionRepository extends CachableTransactionRepository {

    static final Logger logger = Logger.getLogger(RedisClusterTransactionRepository.class.getSimpleName());

    private JedisCluster jedisCluster;
    
    private JedisClusterExtend jedisClusterExtend;

    private String keyPrefix = "TCC:";

    public void setKeyPrefix(String keyPrefix) {
        this.keyPrefix = keyPrefix;
    }

    private ObjectSerializer serializer = new JdkSerializationSerializer();

    public void setSerializer(ObjectSerializer serializer) {
        this.serializer = serializer;
    }

    public void setJedisClusterExtend(JedisClusterExtend jedisClusterExtend) {
        this.jedisClusterExtend = jedisClusterExtend;
        this.jedisCluster = jedisClusterExtend.getJedisCluster();
    }

    public void setJedisCluster(JedisCluster jedisCluster) {
        this.jedisCluster = jedisCluster;
    }

    @Override
    protected int doCreate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval("if redis.call('exists', KEYS[1]) == 0 then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;".getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);
                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doUpdate(final Transaction transaction) {
        try {
            Long statusCode = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    transaction.updateTime();
                    transaction.updateVersion();
                    List<byte[]> params = new ArrayList<byte[]>();
                    for (Map.Entry<byte[], byte[]> entry : ExpandTransactionSerializer.serialize(serializer, transaction).entrySet()) {
                        params.add(entry.getKey());
                        params.add(entry.getValue());
                    }
                    Object result = jedisCluster.eval(String.format("if redis.call('hget',KEYS[1],'VERSION') == '%s' then redis.call('hmset', KEYS[1], unpack(ARGV)); return 1; end; return 0;",
                            transaction.getVersion() - 1).getBytes(),
                            Arrays.asList(RedisHelper.getRedisKey(keyPrefix, transaction.getXid())), params);

                    return (Long) result;
                }
            });
            return statusCode.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected int doDelete(final Transaction transaction) {
        try {
            Long result = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Long>() {
                @Override
                public Long doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.del(RedisHelper.getRedisKey(keyPrefix, transaction.getXid()));
                }
            });
            return result.intValue();
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected Transaction doFindOne(final Xid xid) {
        try {
            Long startTime = System.currentTimeMillis();
            Map<byte[], byte[]> content = RedisHelper.execute(jedisCluster, new JedisClusterCallback<Map<byte[], byte[]>>() {
                @Override
                public Map<byte[], byte[]> doInJedisCluster(JedisCluster jedisCluster) {
                    return jedisCluster.hgetAll(RedisHelper.getRedisKey(keyPrefix, xid));
                }
            });
            logger.info("redis find cost time :" + (System.currentTimeMillis() - startTime));
            if (content != null && content.size() > 0) {
                return ExpandTransactionSerializer.deserialize(serializer, content);
            }
            return null;
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
    }

    @Override
    protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
        List<Transaction> allTransactions = doFindAll();
        List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
        for (Transaction transaction : allTransactions) {
            if (transaction.getLastUpdateTime().compareTo(date) < 0) {
                allUnmodifiedSince.add(transaction);
            }
        }
        return allUnmodifiedSince;
    }

    protected List<Transaction> doFindAll() {
        List<Transaction> list = new ArrayList<Transaction>();
        try {
            Set<byte[]> allKeys = new HashSet<byte[]>();
            Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
            String pattern = keyPrefix + "*";
            for(String k : clusterNodes.keySet()){
                logger.debug("Getting keys from: " + pattern);
                JedisPool jp = clusterNodes.get(k);
                Jedis jedis = jp.getResource();
                try {
                    allKeys.addAll(jedis.keys(pattern.getBytes()));
                } catch(Exception e){
                    logger.error("Getting keys error: {}", e);
                } finally{
                    logger.debug("Connection closed.");
                    jedis.close();
                }
            }
            for (final byte[] key : allKeys) {
                Map<byte[], byte[]> map = jedisCluster.hgetAll(key);
                list.add(ExpandTransactionSerializer.deserialize(serializer, map));
            }
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
        return list;
    }
}

doFindAll()是定时依赖的方法,此处用到了模糊查询(TCC监控的模块也用到了模糊查询,原理基本一致),

protected List<Transaction> doFindAll() {
        List<Transaction> list = new ArrayList<Transaction>();
        try {
            Set<byte[]> allKeys = new HashSet<byte[]>();//一定不要用List<byte[]>,只能用Set,防止下面从redis从节点取出重复数据
            Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();//获得集群的所有节点,主从节点都有,及时我们定义是只定义了主节点列表
            String pattern = keyPrefix + "*";
            for(String k : clusterNodes.keySet()){
                logger.debug("Getting keys from: " + pattern);
                JedisPool jp = clusterNodes.get(k);
                Jedis jedis = jp.getResource();
                try {
                    allKeys.addAll(jedis.keys(pattern.getBytes()));//遍历主从节点,拿出所有满足的key
                } catch(Exception e){
                    logger.error("Getting keys error: {}", e);
                } finally{
                    logger.debug("Connection closed.");
                    jedis.close();//此处一点要close(), 因为此时用的redis而不是redisCluster
                }
            }
            for (final byte[] key : allKeys) {
                Map<byte[], byte[]> map = jedisCluster.hgetAll(key);
                list.add(ExpandTransactionSerializer.deserialize(serializer, map));
            }
        } catch (Exception e) {
            throw new TransactionIOException(e);
        }
        return list;
    }

其模糊查询的原理,就是遍历所有的主从节点,遍历每一个节点上的满足条件的key,由于是集群+主从,所有相同的key一定会有两份,通过Set自身去重,剩下就是我们这次模糊查询结果。

JedisCluster的配置是基于HostAndPort配置,查看JedisCluster发现配置集群依赖Set<HostAndPort>,为了便于配置在RedisClusterTransactionReository中添加了jedisClusterExtend的扩展方便配置。jedisClusterExtend实现方式相对清晰,自定义属性redisClusterIp支持ip:port,ip:port方式解析,这个配置就简单很多了。

例如配置JedisCluster集群如下

<!-- redisl cluster -->
    <bean id="transactionRepository" class="org.mengyun.tcctransaction.repository.RedisClusterTransactionRepository">
        <property name="keyPrefix" value="TCC:CAP:"/>
        <!--<property name="jedisCluster" ref="jedisCluster"/>-->
        <property name="jedisClusterExtend" ref="jedisClusterExtend"/>
    </bean> 
    <bean id="jedisClusterExtend" class="org.mengyun.tcctransaction.repository.helper.JedisClusterExtend">
        <constructor-arg index="0" value="#{tccDb['redis.cluster.ip']}" type="java.lang.String"/>
        <constructor-arg index="1" ref="jedisPoolConfig"/>
    </bean>    
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="#{tccDb['redis.total']}"/>
        <property name="maxWaitMillis" value="#{tccDb['redis.max.wait.millis']}"/>
    </bean>

properties中属性redis.cluster.ip=127.0.0.1:7000,127.0.0.1:7001即可,可以对比JedisCluster默认配置,会发现简洁很多。

      注意,使用redis_cluster时,redis的客户端需使用2.9.0以上版本,否则初始化JedisCluster会出现ip端口解析失败的异常。    

      至此tcc-transaction支持redis远程集群的核心内容已说明,TCC监控的服务也已支持redis原生集群。更多详细内容请看源码中dubbo的demo和tcc-transcation-server项目,分支request已提交给原作者。这里再次为tcc原作者开源精神点赞!

       欢迎各位指点交流......

Logo

更多推荐