Distributed Coordination With ZooKeeper Part 3: Group Membership Example
Posted on July 01, 2013 by Scott Leberknight
This is the third in a series of blogs that introduce Apache ZooKeeper. In the second blog, you took a test drive of ZooKeeper using its command-line shell. In this blog, we'll re-implement the group membership example using the ZooKeeper Java API.
Apache ZooKeeper is implemented in Java, and its native API is also Java. ZooKeeper also provides a C language API, and the distribution provides contrib modules for Perl, Python, and RESTful clients. The ZooKeeper APIs come in two flavors, synchronous or asynchronous. Which one you use depends on the situation. For example you might choose the asynchronous Java API if you are implementing a Java application to process a large number of child znodes independently of one another; in this case you could make good use of the asynchronous API to simultaneously launch all the independent tasks in parallel. On the other hand, if you are implementing simple tasks that perform sequential operations in ZooKeeper, the synchronous API is easier to use and might be a better fit in such cases.
For our group membership example, we'll use the synchronous Java API. The first thing we need to do is connect to ZooKeeper and get an instance of ZooKeeper
, which is the main client API through which you perform operations like creating znodes, setting data on znodes, listing znodes, and so on. The ZooKeeper
constructor launches a separate thread to connect, and returns immediately. As a result, you need to watch for the SyncConnected
event which indicates when the connection has been established. Listing 1 shows code to connect to ZooKeeper, in which we use a CountDownLatch
to block until we've received the connected event. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples).
Listing 1 - Connecting to ZooKeeper
public ZooKeeper connect(String hosts, int sessionTimeout)
throws IOException, InterruptedException {
final CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
return zk;
}
The next thing we need to do is create a znode for the group. As in the test drive, this znode should be persistent, so that it hangs around regardless of whether any clients are connected or not. Listing 2 shows creating a group znode.
Listing 2 - Creating the group znode
public void createGroup(String groupName)
throws KeeperException, InterruptedException {
String path = "/" + groupName;
zk.create(path,
null /* data */,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
Note in Listing 2 that we prepended a leading slash to the group name since ZooKeeper requires that all paths be absolute. The create
operation takes arguments for the path, a byte[]
for data which is optional, a list of ACLs (access control list) to control who can access the znode, and finally the type of znode, in this case persistent. Creating the group member znodes is almost identical to creating the group znode, except we need to create an ephemeral, sequential znode. Let's also say that we need to store some information about each member, so we'll set data on the member znodes. This is shown in Listing 3.
Listing 3 - Creating group member znodes with data
public String joinGroup(String groupName, String memberName, byte[] data)
throws KeeperException, InterruptedException {
String path = "/" + groupName + "/" + memberName + "-";
String createdPath = zk.create(path,
data,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return createdPath;
}
Now that we can create the group allow members to join the group, it would be nice to have some way to monitor the group membership. To do this we'll first need to list children for the group znode, then set a watch on the group znode, and whenever the watch triggers an event, we'll query ZooKeeper for the group's (updated) members, as shown in Listing 4. This process continues in an infinite loop, hence the class name ListGroupForever
.
Listing 4 - Listing a group's members indefinitely
public class ListGroupForever {
private ZooKeeper zooKeeper;
private Semaphore semaphore = new Semaphore(1);
public ListGroupForever(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ConnectionHelper().connect(args[0]);
new ListGroupForever(zk).listForever(args[1]);
}
public void listForever(String groupName)
throws KeeperException, InterruptedException {
semaphore.acquire();
while (true) {
list(groupName);
semaphore.acquire();
}
}
private void list(String groupName)
throws KeeperException, InterruptedException {
String path = "/" + groupName;
List<String> children = zooKeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
semaphore.release();
}
}
});
if (children.isEmpty()) {
System.out.printf("No members in group %s\n", groupName);
return;
}
Collections.sort(children);
System.out.println(children);
System.out.println("--------------------");
}
The ListGroupForever
class in Listing 4 has some interesting characteristics. The listForever
method loops infinitely and uses a semaphore to block until changes occur to the group node. The list
method calls getChildren
to actually retrieve the child nodes from ZooKeeper, and critically sets a Watcher
to watch for changes of type NodeChildrenChanged
. When the NodeChildrenChanged
event occurs, the watcher releases the semaphore, which permits listForever
to re-acquire the semaphore and then retrieve and display the updated group znodes. This process continues until ListGroupForever
is terminated.
To round out the example, we'll create a method to delete the group. As shown in the test drive, ZooKeeper doesn't permit znodes that have children to be deleted, so we first need to delete all the children, and then delete the group (parent) znode. This is shown in Listing 5.
Listing 5 - Deleting a group
public void delete(String groupName)
throws KeeperException, InterruptedException {
String path = "/" + groupName;
try {
List<String> children = zk.getChildren(path, false);
for (String child : children) {
zk.delete(path + "/" + child, -1);
}
zk.delete(path, -1);
}
catch (KeeperException.NoNodeException e) {
System.out.printf("Group %s does not exist\n", groupName);
}
}
When deleting a group, we passed -1
to the delete
method to unconditionally delete the znodes. We could also have passed in a version, so that if we have the correct version number, the znode is deleted but otherwise we receive an optimistic locking violation in the form of a BadVersionException
.
Conclusion to Part 3
In this third blog on ZooKeeper, we implemented a group membership example using the Java API. You saw how to connect to ZooKeeper; how to create persistent, ephemeral, and sequential znodes; how to list znodes and set watches to receive events; and finally how to delete znodes.
In the next blog, we'll back off from the code level and get an overview of ZooKeeper's architecture.
References
- Source code for these blogs, https://github.com/sleberknight/zookeeper-samples
- Presentation on ZooKeeper, http://www.slideshare.net/scottleber/apache-zookeeper
- ZooKeeper web site, http://zookeeper.apache.org/