Redis客户端工具JedisCluster

释放双眼,带上耳机,听听看~!

JedisCluster介绍

Jedis为Redis Cluster提供了Smart客户端,也就是JedisCluster类。下面我们看一下JedisCluster的初始化方法。

  public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, GenericObjectPoolConfig poolConfig) {
    super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, poolConfig);
  }


JedisCluster在初始化时主要有5个参数,下面我们详细介绍一下这5个参数的作用。


- Set<HostAndPort> jedisClusterNode:所有Redis Cluster节点信息。
- int connectionTimeout:连接超时。
- int soTimeout:读写超时。
- int maxAttempts:重试次数。
- GenericObjectPoolConfig poolConfig:连接池参数,JedisCluster会为RedisCluster的每个节点都创建连接池。


下面我们用代码来演示一下,怎么用JedisCluster向Redis集群中添加数据。

import java.util.HashSet;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class Test {

  public static void main(String[] args) {
    Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6379));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6380));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6381));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6382));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6383));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6384));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6385));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6386));

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    JedisCluster jedisCluster = new JedisCluster(jedisClusterNode, 1000, 1000, 5, poolConfig);

    System.out.println(jedisCluster.get("jilinwula"));
    jedisCluster.set("jilinwula", "jilinwula.com");
    System.out.println(jedisCluster.get("jilinwula"));

  }

}

日志:

null
jilinwula.com


备注:

  • JedisCluster包含了所有节点的连接池,所以建议JedisCluster使用单例。
  • JedisCluster每次操作完成后,不需要管理连接池的借还,它在内部完成。
  • JedisCluster一般不要执行close()操作,它会将所有JedisPool执行destroy操作。

多节点命令和操作

JedisCluster虽然提供了分布式的特性,但有点的操作还是需要遍历所有节点才可以完成。具体步骤如下:

  1. 通过JedisCluster.getClusterNodes()方法获取所有节点的连接池。
  2. 使用info replication筛选主节点。
  3. 遍历主节点,使用scan命令找到指定模式的key,然后使用Pipeline机制删除,已保证原子性。
public class Test {

  public static void main(String[] args) {
    Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6379));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6380));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6381));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6382));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6383));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6384));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6385));
    jedisClusterNode.add(new HostAndPort("127.0.0.1", 6386));

    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    JedisCluster jedisCluster = new JedisCluster(jedisClusterNode, 1000, 1000, 5, poolConfig);
    Map<String, JedisPool> jedisPoolMap = jedisCluster.getClusterNodes();
    for (Map.Entry<String, JedisPool> entry : jedisPoolMap.entrySet()) {
      Jedis jedis = entry.getValue().getResource();
      String[] datas = jedis.info("Replication").split("
");
      for (String line : datas) {
        if ("role:master".equals(line.trim())) {
          Pipeline pipeline = jedis.pipelined();
          String cursor = "0";
          ScanParams params = new ScanParams().count(1).match("jilinwula");
          while (true) {
            ScanResult<String> scanResult = jedis.scan(cursor, params);
            List<String> keyList = scanResult.getResult();
            if (keyList != null && keyList.size() > 0) {
              for (String key : keyList) {
                pipeline.del(key);
              }
              pipeline.syncAndReturnAll();
            }
            cursor = scanResult.getStringCursor();
            if ("0".equals(cursor)) {
              break;
            }
          }
        }
      }
    }
  }

}
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧