多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
[TOC] # 基本使用 org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话 它提供以下几类主要方法 : ![](https://box.kancloud.cn/0c4b193d1e3fd67300c1af9eb4ec9631_527x206.png) # 增删改查znode数据 ~~~ package hello; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; public class SimpleZk { //逗号后面别加空格 private static final String connectString = "192.168.33.12:2181,192.168.33.22:2181,192.168.3.33:2181"; private static final int sessionTimeOut = 2000; //latch就相当于一个对象,当latch.await()方法执行时,线程就会等待 //当latch的count减为0的时候,将会唤醒等待的线程 //让主线程阻塞掉 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null; @Before public void init() throws InterruptedException, IOException { //一new完就往下走,但是这时候客户端还没完成连接,所以我们要等他创建好 //一旦成功握手这边的process就会回调一次 zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步连接 //回调了并且事件等于连接成功 if (latch.getCount() > 0 && watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("countdown"); //把计数减少,然后主线程就可以往下走了 latch.countDown(); } //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath() + "---" + watchedEvent.getState()); // try { // byte[] zkClientData = zkClient.getData("/idea", true, null); // System.out.println(new String(zkClientData, "UTF-8")); // } catch (Exception e) { // e.printStackTrace(); // } } }); latch.await(); } /** * 数据增删改查 */ //创建数据节点到zk中 @Test public void testCreate() throws KeeperException, InterruptedException { //参数1:要创建的节点的路径 //参数2:节点的数据参数 //参数3:节点的权限,这边用开放的权限 //参数4:节点的类型,有2种,短暂(ephemeral)(断开连接自己删除),持久(persistent)(断开连接不删除) //PERSISTENT (持久) //PERSISTENT_SEQUENTIAL(持久序列/test0000000019 ) //EPHEMERAL (临时的) //EPHEMERAL_SEQUENTIAL //返回一个节点的路径给你 String nodeCreated = zkClient.create("/idea/name", "jdxia".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上传可以任意类型,但是都要转成bytes[] //这边已经创建完了 System.out.println(nodeCreated); zkClient.close(); } //查询数据 @Test public void getData() throws KeeperException, InterruptedException, UnsupportedEncodingException { //第一个参数是节点 //第二个参数是watcher,但是可以用true,表示重载之前的 //第三个参数,表示取数据的元信息,表示取最新版的还老版本的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); // Thread.sleep(Long.MAX_VALUE); } //修改数据 @Test public void setData() throws KeeperException, InterruptedException { //第三个参数表示版本,-1表示任何版本 //setACL是设置权限 Stat setData = zkClient.setData("/idea", "java".getBytes(), -1); System.out.println("修改数据成功"); } //删除数据 @Test public void delData() throws KeeperException, InterruptedException { //如果这个节点下面有子节点,这个是不能删除的 //第二个参数表示删除的是那个版本的,-1表示任何版本 zkClient.delete("/idea",-1); //查看节点状态,如果这个节点没有会返回null Stat exists = zkClient.exists("/idea", false); System.out.println(exists); } //查看节点的详细信息 @Test public void getInfo() throws KeeperException, InterruptedException { //查看节点状态,如果这个节点没有会返回null Stat stat = zkClient.exists("/idea", false); //获取数据的长度 int dataLength = stat.getDataLength(); //获取子节点的个数 int numChildren = stat.getNumChildren(); //获取数据的版本 int version = stat.getVersion(); System.out.println("数据的长度---"+dataLength); System.out.println("子节点的个数---"+numChildren); System.out.println("数据的版本号---"+version); } //关闭客户端 @Test public void zkClose() throws InterruptedException { zkClient.close(); } } ~~~ # 监听znode **zk中监听回调函数是一次性的,一旦被触发就被移除监听列表.如果需要永久监听,就需要持续进行回调函数注册** Zookeeper的监听器工作机制 ![](https://box.kancloud.cn/91ea981258ba044196e4167f559dee13_932x256.png) 监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑 ![](https://box.kancloud.cn/9fbce0815f7cc18cad9eb298a387ec4a_847x422.png) 监听器的注册是在获取数据的操作中实现: ~~~ getData(path,watch?)监听的事件是:节点数据变化事件 getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件 ~~~ ![](https://box.kancloud.cn/3ea7260fed43ccabfb850d06582262a7_232x406.png) 我们主线程sleep了,但是数据交换线程没用sleep,getData调用的是数据交换线程 可以让他响应后又监听,不然的话监听一次就没了 ## 监听器原理 ![](https://box.kancloud.cn/2428cb871ac9afe7a209d058109c54dc_1157x260.png) 1. 首先要有一个main()线程 2. 在main线程中创建zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener) 3. 通过connect线程将注册的监听事件发送给zookeeper 4. 在zookeeper的注册监听器列表中将注册的监听事件添加到列表中 5. zookeeper监听到有数据或路径变化,就会把这个消息发送给listener线程 6. listener线程内部调用了process()方法 ## 代码 ~~~ package hello; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; public class SimpleZk { private static final String connectString = "192.168.33.12:2181,192.168.33.22:2181,192.168.3.33:2181"; private static final int sessionTimeOut = 2000; //latch就相当于一个对象,当latch.await()方法执行时,线程就会等待 //当latch的count减为0的时候,将会唤醒等待的线程 //让主线程阻塞掉 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null; @Before public void init() throws InterruptedException, IOException { //一new完就往下走,但是这时候客户端还没完成连接,所以我们要等他创建好 //一旦成功握手这边的process就会回调一次 zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() { public void process(WatchedEvent watchedEvent) { //SyncConnected同步连接 //回调了并且事件等于连接成功 if (latch.getCount() > 0 && watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("countdown"); //把计数减少,然后主线程就可以往下走了 latch.countDown(); } //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(watchedEvent.getType() + "---" + watchedEvent.getPath() + "---" + watchedEvent.getState()); try { //这边的第二个参数用true,表示又用了监听,这样会一直监听下去的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); } catch (Exception e) { e.printStackTrace(); } } }); latch.await(); } //查询数据 @Test public void getData() throws KeeperException, InterruptedException, UnsupportedEncodingException { //第一个参数是节点 //第二个参数是watcher,但是可以用true,表示重载之前的 //第三个参数,表示取数据的元信息,表示取最新版的还老版本的 byte[] zkClientData = zkClient.getData("/idea", true, null); System.out.println(new String(zkClientData, "UTF-8")); Thread.sleep(Long.MAX_VALUE); } } ~~~ **查询子节点** ~~~ //查询子节点 @Test public void getChilrenNode() throws KeeperException, InterruptedException { //用之前的监听器 List<String> children = zkClient.getChildren("/idea", true); for (String child:children) { System.out.println(child); } } ~~~ # 守护线程 他的listen/connect是守护线程 我们的主线程没必要再sleep了 ~~~ package hello; public class Test { public static void main(String[] args) { System.out.println("主线程开始了"); Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("线程开始了"); while (true) { } } }); thread.setDaemon(true); thread.start(); } } ~~~