Apache Curator Path Cache Watcher

可以监控某一路径的直接子结点(一级子结点)变化,add,update,delete。
利用此特性可以很方便的监控集群中的所有结点,当然也就很方便的可以实现简单的key.hashCode()%serverCount式的分布式计算,还可以实现简单的定制规则的负载均衡。
1.run ChildrenListener

2.run CLTest

package com.collonn.javaUtilMvn.zookeeper.curator.PathCache;

public class CLTest {
    public static void main(String[] args) throws Exception {
        CLClient01.main(null);
        CLClient02.main(null);
        CLClient03.main(null);
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.PathCache;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

import java.util.List;

public class ChildrenListener {
    public static final String C_PATH = "/TestPath";
    public static final String CHARSET = "UTF-8";

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        String zookeeperConnectionString = "127.0.0.1:2181";
                        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                        client.start();

                        //ensure path of /test
                        new EnsurePath(C_PATH).ensure(client.getZookeeperClient());

                        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, C_PATH, true);
                        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                                System.out.println("================== catch children change ==================");
                                System.out.println("===" + event.getType() + "," + event.getData().getPath() + "," + event.getData().getData());
                                List<ChildData> childDataList = pathChildrenCache.getCurrentData();
                                if (childDataList != null && childDataList.size() > 0) {
                                    System.out.println("===all children as:");
                                    for (ChildData childData : childDataList) {
                                        System.out.println("==" + childData.getPath() + "," + new String(childData.getData(), "UTF-8"));
                                    }
                                }
                            }
                        });
                        pathChildrenCache.start();

                        Thread.sleep(Integer.MAX_VALUE);
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.PathCache;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Random;

public class CLClient01 {
    public static final String C_PATH_SUB = ChildrenListener.C_PATH + "/dog";

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String zookeeperConnectionString = "127.0.0.1:2181";
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                    client.start();

                    Random random = new Random();
                    Thread.sleep(1000 * random.nextInt(3));

                    Stat stat = client.checkExists().forPath(C_PATH_SUB);
                    if(stat == null){
                        client.create().withMode(CreateMode.EPHEMERAL).forPath(C_PATH_SUB, "dogData".getBytes(ChildrenListener.CHARSET));
                    }

                    Thread.sleep(1000 * random.nextInt(3));
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.PathCache;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.Charset;
import java.util.Random;

public class CLClient02 {
    public static final String C_PATH_SUB = ChildrenListener.C_PATH + "/cat";

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String zookeeperConnectionString = "127.0.0.1:2181";
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                    client.start();

                    Random random = new Random();
                    Thread.sleep(1000 * random.nextInt(3));

                    Stat stat = client.checkExists().forPath(C_PATH_SUB);
                    if(stat == null){
                        client.create().withMode(CreateMode.EPHEMERAL).forPath(C_PATH_SUB, "catData".getBytes(Charset.forName(ChildrenListener.CHARSET)));
                    }

                    Thread.sleep(1000 * random.nextInt(3));
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}


package com.collonn.javaUtilMvn.zookeeper.curator.PathCache;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.Charset;
import java.util.Random;

public class CLClient03 {
    public static final String C_PATH_SUB = ChildrenListener.C_PATH + "/rabbit";

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String zookeeperConnectionString = "127.0.0.1:2181";
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
                    CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
                    client.start();

                    Random random = new Random();
                    Thread.sleep(1000 * random.nextInt(3));

                    Stat stat = client.checkExists().forPath(C_PATH_SUB);
                    if(stat == null){
                        client.create().withMode(CreateMode.EPHEMERAL).forPath(C_PATH_SUB, "rabbitData".getBytes(Charset.forName(ChildrenListener.CHARSET)));
                    }

                    Thread.sleep(1000 * random.nextInt(3));
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}









郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。