springboot集成zookeeper简单使用

新增zk依赖

<!--zookeeper-->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
    <exclusions>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
    </exclusions>
</dependency>
​
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>

新增zk配置

zookeeper:
  #zk连接地址
  address: 192.168.23.130:2181
  #用户名
  username:
  #密码
  password:
  # zk超时信息设置
  connection-timeout-ms: 5000
  # 会话超时时间,默认是20*tickTime
  session-timeout-ms: 5000
  #重试连接间隔:ms
  base-sleep-time-ms: 1000
  # 最大重试次数
  max-retries: 20

新增加载配置类

package cn.yyzmain.zk.config;
​
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
​
/**
 * ZK配置信息
 */
@Component
@ConfigurationProperties(prefix = "zookeeper")
@Getter
@Setter
public class ZkProperties {
​
    /**
     * zk连接地址
     */
    private String address;
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 本地缓存路径
     */
    private String localCachePath;
​
    /**
     * 会话超时时间:ms
     */
    private int sessionTimeoutMs = 40000;
    /**
     * 连接超时时间:ms
     */
    private int connectionTimeoutMs = 40000;
    /**
     * 重试间隔:ms
     */
    private int baseSleepTimeMs = 1000;
    /**
     * 最大重试次数
     */
    private int maxRetries = 20;
​
}

新增zk初始化类

package cn.yyzmain.zk.config;
​
import cn.yyzmain.zk.utils.ZkTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
​
​
/**
 * zk初始化
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ZkBootStrap implements ApplicationRunner {
​
    private final ZkProperties zkProperties;
​
    public static final ZkTool ZK = ZkTool.getInstance();
​
    @Override
    public void run(ApplicationArguments args) {
        try {
            log.info("========>>>zk client is starting...");
            //重连策略:间隔1秒重连,重连20次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(zkProperties.getBaseSleepTimeMs(), zkProperties.getMaxRetries());
            ZK.setUrl(zkProperties.getAddress());
            ZK.setUsername(zkProperties.getUsername());
            ZK.setPassword(zkProperties.getPassword());
            ZK.setSessionTimeoutMs(zkProperties.getSessionTimeoutMs());
            ZK.setConnectionTimeoutMs(zkProperties.getConnectionTimeoutMs());
            ZK.setRetryPolicy(retryPolicy);
            ZK.init();
            log.info("========>>>zk client is started..");
        } catch (Exception e) {
            log.error("========>>>zk client is start error..", e);
        }
    }
}

新增zk的Watcher类

package cn.yyzmain.zk.utils;
​
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
​
​
public class BaseZookeeper implements Watcher{
 
   private ZooKeeper zookeeper;
    /**
     * 超时时间
     */
   private static final int SESSION_TIME_OUT = 2;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   
   public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
         System.out.println("Watch received event");
         countDownLatch.countDown();
      }
   }
​
​
​
  
   /**连接zookeeper
    * @param host
    * @throws Exception
    */
   public void connectZookeeper(String host) throws Exception{
      zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
      countDownLatch.await();
      System.out.println("zookeeper connection success");
   }
  
   /**
    * 创建节点
    * @param path
    * @param data
    * @throws Exception
    */
   public String createNode(String path,String data) throws Exception{
      String s = this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      countDownLatch.countDown();
      return s;
​
   }
  
   /**
    * 获取路径下所有子节点
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public List<String> getChildren(String path) throws KeeperException, InterruptedException{
      List<String> children = zookeeper.getChildren(path, false);
      return children;
   }
  
   /**
    * 获取节点上面的数据
    * @param path  路径
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public String getData(String path) throws KeeperException, InterruptedException{
      byte[] data = zookeeper.getData(path, false, null);
      if (data == null) {
         return "";
      }
      return new String(data);
   }
  
   /**
    * 设置节点信息
    * @param path  路径
    * @param data  数据
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public Stat setData(String path,String data) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.setData(path, data.getBytes(), -1);
      return stat;
   }
  
   /**
    * 删除节点
    * @param path
    * @throws InterruptedException
    * @throws KeeperException
    */
   public void deleteNode(String path) throws InterruptedException, KeeperException{
      zookeeper.delete(path, -1);
   }
  
   /**
    * 获取创建时间
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public String getCTime(String path) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.exists(path, false);
      return String.valueOf(stat.getCtime());
   }
  
   /**
    * 获取某个路径下孩子的数量
    * @param path
    * @return
    * @throws KeeperException
    * @throws InterruptedException
    */
   public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
      int childenNum = zookeeper.getChildren(path, false).size();
      return childenNum;
   }
   /**
    * 关闭连接
    * @throws InterruptedException
    */
   public void closeConnection() throws InterruptedException{
      if (zookeeper != null) {
         zookeeper.close();
      }
   }
   public static void main(String[] args) throws Exception {
       BaseZookeeper zookeeper = new BaseZookeeper();
       zookeeper.connectZookeeper("192.168.126.128:2181");
       List<String> children = zookeeper.getChildren("/");
       System.out.println(children);
   }
}

新增zk工具类

以下工具类,包含zk的基本读写操作、锁等

package cn.yyzmain.zk.utils;
​
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
​
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
​
/**
 * <p>Title: zk工具类</p>
 *
 * @author yyzmain
 * @version 1.0
 */
@Slf4j
@Data
@NoArgsConstructor
public class ZkTool {
​
    /**
     * 客户端
     */
    private CuratorFramework client;
​
    /**
     * ZK连接地址
     */
    private String url;
​
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
​
    /**
     * 本地缓存路径
     */
    private String localCachePath;
​
    /**
     * 会话超时时间
     */
    private int sessionTimeoutMs;
    /**
     * 连接超时时间
     */
    private int connectionTimeoutMs;
​
    /**
     * 用于本地缓存
     */
    public TreeCache cache;
​
    private RetryPolicy retryPolicy;
​
​
    /**
     * volatile 关键字阻止了使用singleton代码行前后的指令重排,保证线程安全
     */
    private static ZkTool instance;
​
    public static ZkTool getInstance() {
        if (instance == null) {
            synchronized (ZkTool.class) {
                if (instance == null) {
                    instance = new ZkTool();
                }
            }
        }
        return instance;
    }
​
    public void init() {
        try {
            if (Strings.isNullOrEmpty(username) || Strings.isNullOrEmpty(password)) {
                client = getClient(url, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
            } else {
                client = getClient(url, username, password, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
            }
            if (!Strings.isNullOrEmpty(localCachePath)) {
                initLocalCache(localCachePath);
            }
        } catch (Exception e) {
            log.error("===>>> zk client init error!", e);
        }
    }
​
​
    /**
     * 获取zk连接 用完close()
     *
     * @param url zk地址
     */
    public static CuratorFramework getClient(String url, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(url, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
        return startAndListenClient(client);
    }
​
    /**
     * 获取包含namespace的客户端,该客户端创建的path最终地址为namespace+path
     *
     * @param namespace 命名空间
     * @param url       zk连接地址
     */
    public static CuratorFramework getClientWithNamespace(String namespace, String url, int sessionTimeoutMs,
                                                          int connectionTimeoutMs, RetryPolicy retryPolicy) {
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .namespace(namespace)
                .connectString(url)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
        return startAndListenClient(client);
    }
​
    /**
     * 获取zk连接(带用户名密码) 用完close()
     *
     * @param url zk地址
     */
    public static CuratorFramework getClient(String url, String username, String password, int sessionTimeoutMs,
                                             int connectionTimeoutMs, RetryPolicy retryPolicy) {
​
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString(url)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .authorization("digest", (username + ":" + password).getBytes())
                .retryPolicy(retryPolicy)
                .build();
        return startAndListenClient(client);
    }
​
    private static CuratorFramework startAndListenClient(CuratorFramework client) {
        if (client != null) {
            client.start();
            client.getConnectionStateListenable().addListener((cli, state) -> {
                if (state == ConnectionState.LOST) {
                    //连接丢失
                    log.info("===>>>lost session with zookeeper");
                } else if (state == ConnectionState.CONNECTED) {
                    //连接新建
                    log.info("===>>>connected with zookeeper");
                } else if (state == ConnectionState.RECONNECTED) {
                    log.info("===>>>reconnected with zookeeper");
                }
            });
        }
        return client;
    }
​
    /**
     * 初始化本地缓存
     */
    private void initLocalCache(String watchRootPath) throws Exception {
        cache = new TreeCache(client, watchRootPath);
        TreeCacheListener listener = (cli, event) -> {
            log.info("event:" + event.getType() + " |path:" + (null != event.getData() ? event.getData().getPath() : null));
​
            if (event.getData() != null && event.getData().getData() != null) {
                log.info("发生变化的节点内容为:" + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
    }
​
    /**
     * 获取不可重入锁
     *
     * @param lockNode
     * @return
     */
    public InterProcessSemaphoreMutex getLock(String lockNode) {
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockNode);
        try {
            lock.acquire(100, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.error("===>>>获取锁异常!", e);
        }
        return lock;
    }
​
    /**
     * 释放不可重入锁
     *
     * @param lock
     * @return
     */
    public void releaseLock(InterProcessSemaphoreMutex lock) {
        if (lock != null && lock.isAcquiredInThisProcess()) {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("===>>>解锁异常!", e);
            }
        } else {
            log.info("===>>>锁已释放!");
        }
    }
​
    /**
     * 释放锁(删除节点)
     *
     * @param lockName 锁节点
     */
    public void releaseLock(String lockName) {
        try {
            if (StringUtils.isNotEmpty(lockName)) {
                delete(lockName, true, true);
            }
        } catch (Exception e) {
            log.error("===>>>锁释放异常!", e);
        }
    }
​
    /**
     * 获取指定节点的数据
     */
    public byte[] getNodeData(String path) {
        try {
            if (cache != null) {
                ChildData data = cache.getCurrentData(path);
                if (data != null) {
                    return data.getData();
                }
            }
            client.getData().forPath(path);
            return client.getData().forPath(path);
        } catch (Exception ex) {
            log.error("===>>>getNodeData error !", ex);
        }
        return null;
    }
​
    /**
     * 获取路径下的子目录
     *
     * @param path 指定路径
     */
    public static List<String> getChildren(CuratorFramework client, String path) {
        try {
            if (!isExist(client, path)) {
                return Lists.newArrayList();
            }
            return client.getChildren().forPath(path);
        } catch (Exception e) {
            log.error("===>>>getChildren error!", e);
            return Lists.newArrayList();
        }
    }
​
    public List<String> getChildren(String path) {
        return getChildren(client, path);
    }
​
    /**
     * 关闭连接
     */
    public void stop() {
        client.close();
    }
​
    /**
     * 获取路径的数据
     */
    public static String getData(CuratorFramework client, String path) {
        try {
            if (isExist(client, path)) {
                byte[] bytes = client.getData().forPath(path);
                if (bytes != null) {
                    return new String(bytes);
                }
            } else {
                log.error("===>>>节点【{}】不存在!", path);
            }
        } catch (Exception e) {
            log.error("===>>>getData error!", e);
        }
        return null;
    }
​
    public String getData(String path) {
        return getData(client, path);
    }
​
    /**
     * 创建永久节点
     *
     * @param client curator 客户端
     * @param path   zk路径
     * @param data   节点数据
     * @return 节点路径
     */
    public static String createPersistentNode(CuratorFramework client, String path, String data) {
        return create(client, path, data, CreateMode.PERSISTENT);
    }
​
    public String createPersistentNode(String path, String data) {
        return createPersistentNode(client, path, data);
    }
​
    /**
     * 创建永久节点(权限控制)
     *
     * @return 节点路径
     */
    public static String createPersistentNode(CuratorFramework client, String path, String data, ArrayList<ACL> acls) {
        return create(client, path, data, CreateMode.PERSISTENT, acls);
    }
​
    public String createPersistentNode(String path, String data, ArrayList<ACL> acls) {
        return createPersistentNode(client, path, data, acls);
    }
​
    /**
     * 创建永久顺序节点
     *
     * @return 节点路径
     */
    public static String createPersistentSequentialNode(CuratorFramework client, String path, String data) {
        return create(client, path, data, CreateMode.PERSISTENT_SEQUENTIAL);
    }
​
    public String createPersistentSequentialNode(String path, String data) {
        return createPersistentSequentialNode(client, path, data);
    }
​
    /**
     * 创建永久顺序节点(权限控制)
     *
     * @return 节点路径
     */
    public static String createPersistentSequentialNode(CuratorFramework client, String path, String data, ArrayList<ACL> acls) {
        return create(client, path, data, CreateMode.PERSISTENT_SEQUENTIAL, acls);
    }
​
    public String createPersistentSequentialNode(String path, String data, ArrayList<ACL> acls) {
        return createPersistentSequentialNode(client, path, data, acls);
    }
​
    /**
     * 创建临时节点
     *
     * @return 节点路径
     */
    public static String createEphemeralNode(CuratorFramework client, String path, String data) {
        return create(client, path, data, CreateMode.EPHEMERAL);
    }
​
    public String createEphemeralNode(String path, String data) {
        return createEphemeralNode(client, path, data);
    }
​
    /**
     * 创建临时节点(权限控制)
     *
     * @return 节点路径
     */
    public static String createEphemeralNode(CuratorFramework client, String path, String data, ArrayList<ACL> acls) {
        return create(client, path, data, CreateMode.EPHEMERAL, acls);
    }
​
    public String createEphemeralNode(String path, String data, ArrayList<ACL> acls) {
        return createEphemeralNode(client, path, data, acls);
    }
​
    /**
     * 创建临时顺序节点
     *
     * @return 节点路径
     */
    public static String createEphemeralSequentialNode(CuratorFramework client, String path, String data) {
        return create(client, path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
​
    public String createEphemeralSequentialNode(String path, String data) {
        return createEphemeralSequentialNode(client, path, data);
    }
​
    /**
     * 创建临时顺序节点(权限控制)
     *
     * @return 节点路径
     */
    public static String createEphemeralSequentialNode(CuratorFramework client, String path, String data, ArrayList<ACL> acls) {
        return create(client, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
    }
​
    public String createEphemeralSequentialNode(String path, String data, ArrayList<ACL> acls) {
        return createEphemeralSequentialNode(client, path, data, acls);
    }
​
    /**
     * 创建节点
     *
     * @param mode 节点类型
     *             1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
     *             2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
     *             3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
     *             4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,
     *             并且根据当前已经存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
     */
    private static String create(CuratorFramework client, String path, String data, CreateMode mode) {
        try {
            return client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, Strings.isNullOrEmpty(data) ? null : data.getBytes());
        } catch (Exception e) {
            log.error("===>>>create error!", e);
            return null;
        }
    }
​
    /**
     * 创建节点(带权限控制)
     */
    private static String create(CuratorFramework client, String path, String data, CreateMode mode, ArrayList<ACL> acls) {
        try {
            return client.create().creatingParentsIfNeeded().withMode(mode).withACL(acls).forPath(path, Strings.isNullOrEmpty(data) ? null : data.getBytes());
        } catch (Exception e) {
            log.error("===>>>create with acls error!", e);
            return null;
        }
    }
​
    /**
     * 修改数据
     */
    public static void update(CuratorFramework client, String path, String data) throws Exception {
        client.setData().forPath(path, Strings.isNullOrEmpty(data) ? null : data.getBytes());
    }
​
    public void update(String path, String data) throws Exception {
        if (isExist(path)) {
            update(client, path, data);
        }
    }
​
​
    /**
     * 删除节点
     *
     * @param deleteChildren 是否删除子节点
     * @param alwaysOnFailed 后台记录删除失败节点,一直删除直到删除成功
     */
    public static void delete(CuratorFramework client, String path, boolean deleteChildren, boolean alwaysOnFailed) throws Exception {
        if (isExist(client, path)) {
            if (alwaysOnFailed) {
                if (deleteChildren) {
                    client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
                } else {
                    client.delete().guaranteed().withVersion(-1).forPath(path);
                }
            } else {
                if (deleteChildren) {
                    client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
                } else {
                    client.delete().withVersion(-1).forPath(path);
                }
            }
        }
    }
​
    public void delete(String path, boolean deleteChildren, boolean alwaysOnFailed) throws Exception {
        delete(client, path, deleteChildren, alwaysOnFailed);
    }
​
​
    /**
     * 删除节点(包括子节点)
     */
    public static void delete(CuratorFramework client, String path) throws Exception {
        //withVersion(-1)无视版本,直接删除
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
    }
​
    /**
     * 删除节点(包括子节点)
     *
     * @param alwaysOnFailed 删除失败了也会一直在后台尝试删除 ,true or false.
     */
    public static void delete(CuratorFramework client, String path, boolean alwaysOnFailed) throws Exception {
        if (alwaysOnFailed) {
            client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
        } else {
            client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
        }
    }
​
    /**
     * 是否存在路径
     */
    public static boolean isExist(CuratorFramework client, String path) {
        try {
            client.sync();
            return client.checkExists().forPath(path) != null;
        } catch (Exception e) {
            log.error("===>>> isExist error!", e);
            return false;
        }
    }
​
    public boolean isExist(String path) {
        return isExist(client, path);
    }
​
    /**
     * 新增节点监听
     */
    public static NodeCache addListener(CuratorFramework client, String path, NodeCacheListener listener) throws Exception {
        final NodeCache cache = new NodeCache(client, path);
        cache.start();
        cache.getListenable().addListener(listener);
        return cache;
    }
​
    public NodeCache addListener(String path, NodeCacheListener listener) throws Exception {
        return addListener(client, path, listener);
    }
​
    /**
     * 新增子节点监听
     */
    public static PathChildrenCache addChildrenListener(CuratorFramework client, String path, PathChildrenCacheListener listener) throws Exception {
        final PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start();
        cache.getListenable().addListener(listener);
        return cache;
    }
​
    public PathChildrenCache addChildrenListener(String path, PathChildrenCacheListener listener) throws Exception {
​
        return addChildrenListener(client, path, listener);
    }
​
    /**
     * 新增树监听
     */
    public static TreeCache addTreeListener(CuratorFramework client, String path, TreeCacheListener listener) throws Exception {
        TreeCache.Builder builder = TreeCache.newBuilder(client, path);
        TreeCache cache = builder.build();
        cache.start();
        cache.getListenable().addListener(listener);
        return cache;
    }
​
    public TreeCache addTreeListener(String path, TreeCacheListener listener) throws Exception {
​
        return addTreeListener(client, path, listener);
    }
​
    /**
     * 新增树监听
     *
     * @param maxDepth 该路径下的最大监听层级
     * @param executor 执行线程池
     */
    public static TreeCache addTreeListener(CuratorFramework client, String path, int maxDepth, TreeCacheListener listener, Executor executor) throws Exception {
        TreeCache.Builder builder = TreeCache.newBuilder(client, path);
        builder.setMaxDepth(maxDepth);
        TreeCache cache = builder.build();
        cache.start();
        cache.getListenable().addListener(listener, executor);
        return cache;
    }
​
    public TreeCache addTreeListener(String path, int maxDepth, TreeCacheListener listener, Executor executor) throws Exception {
        return addTreeListener(client, path, maxDepth, listener, executor);
    }
​
    /**
     * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
     */
    ExecutorService pool = Executors.newFixedThreadPool(2);
​
    /**
     * 监听数据节点的变化情况
     */
    public void watchPath(String watchPath, TreeCacheListener listener) {
        //   NodeCache nodeCache = new NodeCache(client, watchPath, false);
        TreeCache cache = new TreeCache(client, watchPath);
        cache.getListenable().addListener(listener, pool);
        try {
            cache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
​
    /**
     * 获取读写锁
     */
    public static InterProcessReadWriteLock getReadWriteLock(CuratorFramework client, String path) {
        return new InterProcessReadWriteLock(client, path);
    }
​
    public InterProcessReadWriteLock getReadWriteLock(String path) {
        return getReadWriteLock(client, path);
    }
}
​

实现zk节点的监听

package cn.yyzmain.zk.listener;
​
import cn.yyzmain.zk.config.ZkBootStrap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
​
import java.util.List;
​
/**
 * zk监听器
 */
@RequiredArgsConstructor
@Component
@Slf4j
public class TaskListener implements CommandLineRunner {
​
​
    @Value("${zk.listenerNodeName:/home}")
    private String listenerNodeName;
​
    @Override
    public void run(String... args) {
        log.info("开启节点监听---");
        addListener();
    }
​
    private void addListener() {
        try {
            //监听任务节点变化,若当前没有任务为空,那么调用competeTask进行任务执行
            ZkBootStrap.ZK.addChildrenListener(listenerNodeName, (client, event) -> {
                PathChildrenCacheEvent.Type type = event.getType();
                //新增和更新时检查线程执行
                log.info("zk节点发生改变,变化类型:{}", type);
                List<String> tasks = client.getChildren().forPath(listenerNodeName);
                log.info("获取到子节点信息:{}", tasks);
​
            });
        } catch (Exception e) {
            log.error("zk监听器异常!", e);
        }
    }
}
​

代码参考地址:https://github.com/yyzmain/main-study