This is the third in a series of blogs that introduce Apache ZooKeeper. In the second blog, you took a test drive of ZooKeeper using its command-line shell. In this blog, we'll re-implement the group membership example using the ZooKeeper Java API.

Apache ZooKeeper is implemented in Java, and its native API is also Java. ZooKeeper also provides a C language API, and the distribution provides contrib modules for Perl, Python, and RESTful clients. The ZooKeeper APIs come in two flavors: synchronous and asynchronous. Which one you use depends on the situation. For example, you might choose the asynchronous Java API if you are implementing a Java application that processes a large number of child znodes independently of one another; in this case you can use the asynchronous API to launch all the independent tasks in parallel. On the other hand, if you are implementing simple tasks that perform sequential operations in ZooKeeper, the synchronous API is easier to use and might be a better fit.
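
To make the difference between the two styles concrete, here is a minimal sketch that uses only the JDK and does not talk to ZooKeeper at all. The class SyncVsAsyncSketch and its fetch method are hypothetical stand-ins for reading a child znode's data; the thread pool and latch only imitate the callback style of the asynchronous API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncVsAsyncSketch {

    /** Hypothetical stand-in for reading one child znode's data. */
    static String fetch(String child) {
        return "data-for-" + child;
    }

    /** Synchronous style: each call returns before the next one starts. */
    static Map<String, String> readSequentially(List<String> children) {
        Map<String, String> results = new ConcurrentHashMap<>();
        for (String child : children) {
            results.put(child, fetch(child));
        }
        return results;
    }

    /** Asynchronous style: issue every request up front, collect results in callbacks. */
    static Map<String, String> readInParallel(List<String> children) {
        Map<String, String> results = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(children.size());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String child : children) {
                pool.submit(() -> {
                    results.put(child, fetch(child)); // plays the role of the callback
                    done.countDown();
                });
            }
            done.await(); // wait until every callback has fired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> children = List.of("member-1", "member-2", "member-3");
        System.out.println(readSequentially(children));
        System.out.println(readInParallel(children));
    }
}
```

Both methods produce the same map. The parallel version pays for its speed with extra bookkeeping, which is why the synchronous style is the easier fit for simple sequential tasks.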

For our group membership example, we'll use the synchronous Java API. The first thing we need to do is connect to ZooKeeper and get an instance of ZooKeeper, which is the main client API through which you perform operations like creating znodes, setting data on znodes, listing znodes, and so on. The ZooKeeper constructor launches a separate thread to connect, and returns immediately. As a result, you need to watch for the SyncConnected event, which indicates that the connection has been established. Listing 1 shows code to connect to ZooKeeper, in which we use a CountDownLatch to block until we've received the connected event. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples.)

Listing 1 - Connecting to ZooKeeper

public ZooKeeper connect(String hosts, int sessionTimeout)
        throws IOException, InterruptedException {
  final CountDownLatch connectedSignal = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        connectedSignal.countDown();
      }
    }
  });
  connectedSignal.await();
  return zk;
}
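
One thing to keep in mind about Listing 1 is that connectedSignal.await() blocks forever if the connected event never arrives. The sketch below shows one possible variation that waits with a timeout instead. It uses only the JDK; the class ConnectWithTimeoutSketch and the method awaitConnected are hypothetical names, and a plain thread stands in for the watcher that would count down the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ConnectWithTimeoutSketch {

    /**
     * Waits for the "connected" signal, but gives up after the timeout.
     * Returns true if the signal arrived in time, false otherwise.
     */
    static boolean awaitConnected(CountDownLatch connectedSignal, long timeoutMillis) {
        try {
            return connectedSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch connectedSignal = new CountDownLatch(1);
        // Simulated event thread: stands in for the watcher receiving SyncConnected.
        Thread eventThread = new Thread(connectedSignal::countDown);
        eventThread.start();
        System.out.println("connected: " + awaitConnected(connectedSignal, 2000));

        // A latch that nobody counts down models a server that never answers.
        System.out.println("connected: " + awaitConnected(new CountDownLatch(1), 100));
    }
}
```

In a real connect method, a false result would be the cue to close the ZooKeeper handle and report the failure rather than hang.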

The next thing we need to do is create a znode for the group. As in the test drive, this znode should be persistent, so that it hangs around regardless of whether any clients are connected or not. Listing 2 shows creating a group znode.

Listing 2 - Creating the group znode

public void createGroup(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  zk.create(path,
            null /* data */,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
}

Note in Listing 2 that we prepended a leading slash to the group name, since ZooKeeper requires that all paths be absolute. The create operation takes arguments for the path, an optional byte[] of data, a list of ACLs (access control lists) to control who can access the znode, and finally the type of znode, in this case persistent. Creating the group member znodes is almost identical to creating the group znode, except we need to create an ephemeral, sequential znode. Let's also say that we need to store some information about each member, so we'll set data on the member znodes. This is shown in Listing 3.

Listing 3 - Creating group member znodes with data

public String joinGroup(String groupName, String memberName, byte[] data)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName + "/" + memberName + "-";
  String createdPath = zk.create(path,
          data,
          ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL);
  return createdPath;
}
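
The path returned by joinGroup differs from the path we passed in, because ZooKeeper appends a zero-padded, ten-digit sequence number to sequential znodes (for example, /zoo/duck-0000000003). Here is a small sketch for splitting such a path back into its parts. The class SequentialPathSketch and its methods are hypothetical helpers, and they assume a well-formed path created with the "name-" prefix used in Listing 3.

```java
class SequentialPathSketch {

    /** ZooKeeper pads the sequence counter to ten digits. */
    private static final int SEQUENCE_DIGITS = 10;

    /** Returns the member name, e.g. "duck" for "/zoo/duck-0000000003". */
    static String memberName(String createdPath) {
        String node = createdPath.substring(createdPath.lastIndexOf('/') + 1);
        // Drop the ten sequence digits plus the "-" separator added in joinGroup.
        return node.substring(0, node.length() - SEQUENCE_DIGITS - 1);
    }

    /** Returns the sequence number, e.g. 3 for "/zoo/duck-0000000003". */
    static int sequenceNumber(String createdPath) {
        String digits = createdPath.substring(createdPath.length() - SEQUENCE_DIGITS);
        return Integer.parseInt(digits);
    }

    public static void main(String[] args) {
        String createdPath = "/zoo/duck-0000000003";
        System.out.println(memberName(createdPath));
        System.out.println(sequenceNumber(createdPath));
    }
}
```

Since the numbers only ever increase within a parent znode, sorting members by sequence number gives the order in which they joined.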

Now that we can create the group and allow members to join it, it would be nice to have some way to monitor the group membership. To do this we'll first need to list children for the group znode, then set a watch on the group znode, and whenever the watch triggers an event, we'll query ZooKeeper for the group's (updated) members, as shown in Listing 4. This process continues in an infinite loop, hence the class name ListGroupForever.

Listing 4 - Listing a group's members indefinitely

public class ListGroupForever {
  private ZooKeeper zooKeeper;
  private Semaphore semaphore = new Semaphore(1);

  public ListGroupForever(ZooKeeper zooKeeper) {
    this.zooKeeper = zooKeeper;
  }

  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ConnectionHelper().connect(args[0]);
    new ListGroupForever(zk).listForever(args[1]);
  }

  public void listForever(String groupName)
          throws KeeperException, InterruptedException {
    semaphore.acquire();
    while (true) {
      list(groupName);
      semaphore.acquire();
    }
  }

  private void list(String groupName)
          throws KeeperException, InterruptedException {
    String path = "/" + groupName;
    List<String> children = zooKeeper.getChildren(path, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
          semaphore.release();
        }
      }
    });
    if (children.isEmpty()) {
      System.out.printf("No members in group %s\n", groupName);
      return;
    }
    Collections.sort(children);
    System.out.println(children);
    System.out.println("--------------------");
  }
}

The ListGroupForever class in Listing 4 has some interesting characteristics. The listForever method loops infinitely and uses a semaphore to block until changes occur to the group node. The list method calls getChildren to actually retrieve the child nodes from ZooKeeper, and critically sets a Watcher to watch for changes of type NodeChildrenChanged. When the NodeChildrenChanged event occurs, the watcher releases the semaphore, which permits listForever to re-acquire the semaphore and then retrieve and display the updated group znodes. This process continues until ListGroupForever is terminated.
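
Two details are worth spelling out. First, a ZooKeeper watch fires only once, which is why list passes a fresh Watcher on every call to getChildren. Second, the semaphore is simply a signal from the watcher's thread to the listing loop. The sketch below isolates that signalling pattern using only the JDK. The class WatchLoopSketch is hypothetical, a plain thread stands in for the watcher, and the loop runs a fixed number of rounds instead of forever so that it terminates. It also starts the semaphore with zero permits, which has the same effect as the Semaphore(1) plus initial acquire() in Listing 4.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class WatchLoopSketch {

    /**
     * Runs listAction once immediately, then once more after each change
     * signal, for a total of `rounds` listings (Listing 4 loops forever).
     */
    static void listRepeatedly(Semaphore changeSignal, Runnable listAction, int rounds) {
        try {
            for (int i = 0; i < rounds; i++) {
                if (i > 0) {
                    changeSignal.acquire(); // block until a change is signalled
                }
                listAction.run(); // in Listing 4 this also registers a fresh watch
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Semaphore changeSignal = new Semaphore(0); // no permits: nothing has changed yet
        AtomicInteger listings = new AtomicInteger();

        // Simulated watcher thread: signals two membership changes.
        Thread watcher = new Thread(() -> {
            changeSignal.release();
            changeSignal.release();
        });
        watcher.start();

        listRepeatedly(changeSignal,
                () -> System.out.println("listing #" + listings.incrementAndGet()), 3);
    }
}
```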

To round out the example, we'll create a method to delete the group. As shown in the test drive, ZooKeeper doesn't permit znodes that have children to be deleted, so we first need to delete all the children, and then delete the group (parent) znode. This is shown in Listing 5.

Listing 5 - Deleting a group

public void delete(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  try {
    List<String> children = zk.getChildren(path, false);
    for (String child : children) {
      zk.delete(path + "/" + child, -1);
    }
    zk.delete(path, -1);
  }
  catch (KeeperException.NoNodeException e) {
    System.out.printf("Group %s does not exist\n", groupName);
  }
}

When deleting a group, we passed -1 to the delete method to unconditionally delete the znodes. We could also have passed in a version, so that if we have the correct version number, the znode is deleted but otherwise we receive an optimistic locking violation in the form of a BadVersionException.
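
To illustrate the version check, here is a toy in-memory model of the idea. It is not the ZooKeeper API: the class VersionedDeleteSketch and its methods are hypothetical, and where ZooKeeper would throw a BadVersionException this sketch simply returns false. It does mirror how a znode's data version starts at 0 and goes up by one on each update.

```java
import java.util.HashMap;
import java.util.Map;

class VersionedDeleteSketch {

    /** Toy model: maps each path to its current data version. */
    private final Map<String, Integer> versions = new HashMap<>();

    /** A newly created node starts at version 0. */
    void create(String path) {
        versions.put(path, 0);
    }

    /** Each update to an existing node bumps its version by one. */
    void update(String path) {
        versions.computeIfPresent(path, (p, version) -> version + 1);
    }

    /**
     * Deletes the node if expectedVersion matches the current version,
     * or unconditionally if expectedVersion is -1.
     * Returns false on a version mismatch (ZooKeeper would throw instead).
     */
    boolean delete(String path, int expectedVersion) {
        Integer current = versions.get(path);
        if (current == null) {
            throw new IllegalArgumentException("No such node: " + path);
        }
        if (expectedVersion != -1 && expectedVersion != current) {
            return false; // someone else changed the node since we read it
        }
        versions.remove(path);
        return true;
    }

    public static void main(String[] args) {
        VersionedDeleteSketch store = new VersionedDeleteSketch();
        store.create("/zoo");
        store.update("/zoo"); // version is now 1
        System.out.println(store.delete("/zoo", 0)); // stale version, rejected
        System.out.println(store.delete("/zoo", 1)); // current version, deleted
    }
}
```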

Conclusion to Part 3

In this third blog on ZooKeeper, we implemented a group membership example using the Java API. You saw how to connect to ZooKeeper; how to create persistent, ephemeral, and sequential znodes; how to list znodes and set watches to receive events; and finally how to delete znodes.

In the next blog, we'll back off from the code level and get an overview of ZooKeeper's architecture.
