Building a Distributed Lock Revisited: Using Curator's InterProcessMutex

Posted on December 29, 2013 by Scott Leberknight

Last summer I wrote a series of blogs introducing Apache ZooKeeper, which is a distributed coordination service used in many open source projects like Hadoop, HBase, and Storm to manage clusters of machines. The fifth blog described how to use ZooKeeper to implement a distributed lock. In that blog I explained that the goals of a distributed lock are "to build a mutually exclusive lock between processes that could be running on different machines, possibly even on different networks or different data centers". I also mentioned that one significant benefit is that "clients know nothing about each other; they only know they need to use the lock to access some shared resource, and that they should not access it unless they own the lock." That blog described how to use the ZooKeeper WriteLock "recipe" that comes with ZooKeeper in the contrib modules to build a synchronous BlockingWriteLock with easier semantics in which you simply call a lock() method to acquire the lock, and call unlock() to release the lock. Earlier in the series, we learned how to connect to ZooKeeper in the Group Membership Example blog using a Watcher and a CountDownLatch to block until the SyncConnected event was received. All that code wasn't terribly complex but it also was fairly low-level, especially if you include the need to block until a connection event is received and the non-trivial implementation of the WriteLock recipe.

In the wrap-up blog I mentioned the Curator project, originally open sourced by Netflix and later donated by them to Apache. The Curator wiki describes Curator as "a set of Java libraries that make using Apache ZooKeeper much easier". In this blog we'll see how to use Curator to implement a distributed lock, without needing to write any of our own wrapper code for obtaining a connection or to implement the lock itself. In the distributed lock blog we saw how sequential ephemeral child nodes (e.g. child-lock-node-0000000000, child-lock-node-0000000001, child-lock-node-0000000002, etc.) are created under a persistent parent lock node. The client that created the child with the lowest sequence number owns the lock. We saw several potential gotchas: first, how does a client know whether it successfully created a child node in the case of a partial failure, i.e. a (temporary) connection loss, and how does it know which child node it created, i.e. the child with which sequence number? I noted that a solution was to embed the ZooKeeper session ID in the child node such that the client can easily identify the child node it created. Jordan Zimmerman (the creator of Curator) was kind enough to post a comment to that blog noting that using the session ID is "not ideal" because it "prevents the same ZK connection from being used in multiple threads for the same lock". He said "It's much better to use a GUID. This is what Curator uses."

Second, we noted that distributed lock clients should watch only the immediately preceding child node rather than the parent node in order to prevent a "herd effect" in which every client is notified for every single child node event, when in reality each client need only care about the child immediately preceding it. Curator handles both these cases plus adds other goodies such as a retry policy for connecting to ZooKeeper. So without further comment, let's see how to use a distributed lock in Curator.

First, we'll need to get an instance of CuratorFramework - this is an interface that represents a higher level abstraction API for working with ZooKeeper. It provides automatic connection management including retry operations, a fluent-style API, as well as a bunch of recipes you can use out-of-the-box for distributed data structures like locks, queues, leader election, etc. We can use the CuratorFrameworkFactory and a RetryPolicy of our choosing to get one.

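import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

String hosts = "host-1:2181,host-2:2181,host-3:2181";
int baseSleepTimeMillis = 1000;
int maxRetries = 3;

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMillis, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();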

In the above code we first create a retry policy - in this case an ExponentialBackoffRetry using a base sleep time of 1000 milliseconds and up to 3 retries. Then we can use the CuratorFrameworkFactory.newClient() to obtain an instance of CuratorFramework. Finally we need to call start() (note we'll need to call close() when we're done with the client). Now that we have a client instance, we can use an implementation of InterProcessLock to create our distributed lock. The simplest one is the InterProcessMutex which is a re-entrant mutual exclusion lock that works across JVMs, by using ZooKeeper to hold the lock.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
lock.acquire();
try {
  // do work while we hold the lock
} catch (Exception ex) {
  // handle exceptions as appropriate
} finally {
  lock.release();
}

The above code simply creates an InterProcessMutex for a specific lock path (lockPath), acquires the lock, does some work, and releases the lock. In this case acquire() will block until the lock becomes available. In many cases blocking indefinitely won't be a Good Thing, and Curator provides an overloaded version of acquire() which requires a maximum time to wait for the lock and returns true if the lock is obtained within the time limit and false otherwise.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
  try {
    // do work while we hold the lock
  } catch (Exception ex) {
    // handle exceptions as appropriate
  } finally {
    lock.release();
  }
} else {
  // we timed out waiting for lock, handle appropriately
}

The above code demonstrates using the timeout version of acquire. The code is slightly more complex since you need to check whether the lock is acquired or whether we timed out waiting for the lock. Regardless of which version of acquire() you use, you'll need to release() the lock in a finally block. The final piece is to remember to close the client when you're done with it:

client.close();

And that's pretty much it for using Curator's InterProcessMutex to implement a distributed lock. All the complexity in handling connection management, partial failures, the "herd effect", automatic retries, and so on is handled by the higher level Curator APIs. To paraphrase Stu Halloway, you should always understand at least one layer beneath the one you're working at - in this case you should have a decent understanding of how ZooKeeper works under the covers and some of the potential issues of distributed computing. But having said that, go ahead and use Curator to work at a higher level of abstraction and gain the benefits of all the distributed computing experience at Netflix as well as Yahoo (which created ZooKeeper). And last, Happy New Year 2014!

Distributed Coordination With ZooKeeper Part 6: Wrapping Up

Posted on July 15, 2013 by Scott Leberknight

This is the sixth (and last) in a series of blogs that introduce Apache ZooKeeper. In the fifth blog, we implemented a distributed lock, dealing with the issues of partial failure due to connection loss and the "herd effect" along the way.

In this final blog in the series you'll learn a few tips for administering and tuning ZooKeeper, and we'll introduce the Curator and Exhibitor frameworks.

Administration and Tuning

As with any complex distributed system, Apache ZooKeeper provides administrators plenty of knobs to control its behavior. Several important properties include the tickTime (the fundamental unit of time in ZooKeeper measured in milliseconds); the initLimit which is the time in ticks to allow followers to connect and sync to the leader; the syncLimit which is the time in ticks to allow a follower to synchronize with the leader; and the dataDir and dataLogDir which are the directories where ZooKeeper stores the in-memory database snapshots and transaction log, respectively.
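Because initLimit and syncLimit are expressed in ticks, their wall-clock values depend on tickTime. Here is a minimal sketch of that arithmetic. The class name and the sample values (2000, 10 and 5) are assumptions chosen for illustration, not recommendations for your ensemble.

```java
// Illustrative only: the class name and sample values are assumptions.
final class TickMath {

    // Converts a limit expressed in ticks into milliseconds.
    static long ticksToMillis(int tickTimeMillis, int ticks) {
        return (long) tickTimeMillis * ticks;
    }

    public static void main(String[] args) {
        int tickTime = 2000; // milliseconds per tick
        int initLimit = 10;  // ticks a follower may take to connect and sync
        int syncLimit = 5;   // ticks a follower may take to synchronize

        System.out.println("initLimit = " + ticksToMillis(tickTime, initLimit) + " ms");
        System.out.println("syncLimit = " + ticksToMillis(tickTime, syncLimit) + " ms");
    }
}
```

Keep in mind that changing tickTime silently rescales every limit that is measured in ticks.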

Next, we'll cover just a few things you will want to be aware of when running a ZooKeeper ensemble in production.

First, when creating a ZooKeeper ensemble you should run each node on a dedicated server, meaning the only thing the server does is run an instance of ZooKeeper. The main reason you want to do this is to avoid any contention with other processes for both network and disk I/O. If you run other I/O and/or CPU-intensive processes on the same machines you are running a ZooKeeper node, you will likely see connection timeouts and other issues due to contention. I've seen this happen in production systems, and as soon as the ZooKeeper nodes were moved to their own dedicated machines, the connection loss problems disappeared.

Second, start with a three node ensemble and monitor the usage of those machines, for example using Ganglia and Nagios, to determine if your ensemble needs additional machines. Remember also to maintain an odd number of machines in the ensemble, so that there can be a majority when nodes commit write operations and when they need to vote for a new leader. Another really useful tool is zktop, which is very similar to the top command on *nix systems. It is a simple, quick and dirty way to easily start monitoring your ensemble.

Third, watch out for session timeouts, and adjust the tickTime accordingly. For example, if you have heavy network traffic, you might increase tickTime to 5 seconds.
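One reason tickTime matters for session timeouts is that, by default, the server clamps the timeout a client asks for to between 2 and 20 ticks. The sketch below shows that clamping as plain arithmetic. The class and method names are hypothetical, and it assumes the default bounds have not been overridden with minSessionTimeout or maxSessionTimeout.

```java
// Illustrative only: names are hypothetical and the 2x / 20x bounds assume
// the server's default minSessionTimeout and maxSessionTimeout.
final class SessionTimeoutBounds {

    // Returns the session timeout the server would grant for a requested value.
    static int negotiate(int requestedMillis, int tickTimeMillis) {
        int min = 2 * tickTimeMillis;
        int max = 20 * tickTimeMillis;
        return Math.max(min, Math.min(max, requestedMillis));
    }

    public static void main(String[] args) {
        // With a 2 second tick, a 60 second request is cut down to 40 seconds.
        System.out.println(negotiate(60_000, 2_000));
        // With a 5 second tick, the same request fits within the bounds.
        System.out.println(negotiate(60_000, 5_000));
    }
}
```

So if clients need longer sessions than they are being granted, raising tickTime is one way to widen the allowed range.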

The above three tips are by no means the end of the story when it comes to administering and tuning ZooKeeper. For more in-depth information on setting up, running, administering and monitoring a ZooKeeper ensemble see the ZooKeeper Administrator's Guide on the ZooKeeper web site. Another resource is Kathleen Ting's Building an Impenetrable ZooKeeper presentation which I attended at Strange Loop 2012, and which provides a lot of very useful tips for running a ZooKeeper ensemble.

Getting a Curator

So far we've seen everything ZooKeeper provides out of the box. But when using ZooKeeper in production, you may quickly find that building recipes like distributed locks and other similar distributed data structures is harder than it looks, because you must be aware of many different kinds of problems that can arise - recall the connection loss and herd effect issues when constructing the distributed lock. You need to know when you can handle exceptions and retry an operation. For example if an idempotent operation fails during a client automatic failover event, you can simply retry the operation. The raw ZooKeeper library does not do much exception handling for you, and you need to implement retry logic yourself.
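To give a feel for the kind of retry logic you end up writing by hand, here is a minimal sketch in plain Java. Everything in it (the RetryingCaller class, the doubling delay, the 10 second cap) is an assumption for illustration and is not code from ZooKeeper or Curator. It is also only safe for idempotent operations, as noted above.

```java
import java.util.concurrent.Callable;

// Illustrative only: a hand-rolled retry helper, not part of any library.
final class RetryingCaller {

    // Delay before the given retry (0-based): base * 2^retry, capped at maxMillis.
    static long backoffMillis(long baseMillis, int retry, long maxMillis) {
        long delay = baseMillis << Math.min(retry, 20);
        return Math.min(delay, maxMillis);
    }

    // Calls the operation, retrying up to maxRetries times on any exception.
    static <T> T callWithRetry(Callable<T> operation, int maxRetries, long baseMillis)
            throws Exception {
        for (int retry = 0; ; retry++) {
            try {
                return operation.call();
            } catch (Exception ex) {
                if (retry >= maxRetries) {
                    throw ex; // out of retries, let the caller decide
                }
                Thread.sleep(backoffMillis(baseMillis, retry, 10_000L));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("simulated connection loss");
            }
            return "ok";
        }, 3, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

A real implementation would retry only on recoverable errors such as connection loss, which is exactly the sort of detail that is easy to get wrong.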

Helpfully, Netflix uses ZooKeeper and has developed a framework named Curator, which they open sourced and later donated to Apache. The Curator wiki page describes it as "a set of Java libraries that make using Apache ZooKeeper much easier". While ZooKeeper comes bundled with the ZooKeeper Java client, using it to develop correct distributed data structures can be difficult and makes the code much harder to understand, due to problems such as connection loss and the "herd effect" which we saw in the previous blog.

Once you have a good understanding of ZooKeeper basics, check out Curator. It provides a client that replaces (wraps) the ZooKeeper class; a framework that contains a high-level API and improved connection and exception handling, along with built-in retry logic in the form of retry policies. Last, it provides a bunch of recipes that implement distributed data structures including locks, barriers, queues, and more. Curator even provides useful testing servers to run a single embedded ZooKeeper server or a test ensemble in unit tests.

Even better, Netflix also created Exhibitor, which is a "supervisor" for your ZooKeeper ensemble. It provides features such as monitoring, backups, a web-based interface for znode exploration, and a RESTful API.

Conclusion

In this series of blogs you were introduced to ZooKeeper; took a test drive in the ZooKeeper shell; worked with ZooKeeper's Java API to build a group membership application as well as a distributed lock; and toured the architecture and implementation details of ZooKeeper. If nothing else, remember that ZooKeeper is like a filesystem, except distributed and replicated. It allows you to build distributed coordination and data structures, is highly available, reliable, and fast due to its leader/follower design with no single point of failure, in-memory reads, and writes via the leader to maintain sequential consistency. Last, it provides clients with (mostly) transparent and automatic session failover in case of server failure. After becoming comfortable with ZooKeeper, be sure to have a look at the Curator framework by Apache (donated by Netflix recently) and also the Exhibitor monitoring application.

References

This is the fifth in a series of blogs that introduce Apache ZooKeeper. In the fourth blog, you saw a high-level view of ZooKeeper's architecture and data consistency guarantees. In this blog, we'll use all the knowledge we've gained thus far to implement a distributed lock.

You've now seen how to interact with Apache ZooKeeper and learned about its architecture and consistency model. Let's now use that knowledge to build a distributed lock. The goals are to build a mutually exclusive lock between processes that could be running on different machines, possibly even on different networks or different data centers. This also has the benefit that clients know nothing about each other; they only know they need to use the lock to access some shared resource, and that they should not access it unless they own the lock.

To build the lock, we'll create a persistent znode that will serve as the parent. Clients wishing to obtain the lock will create sequential, ephemeral child znodes under the parent znode. The lock is owned by the client process whose child znode has the lowest sequence number. In Figure 2, there are three children of the lock-node and child-1 owns the lock at this point in time, since it has the lowest sequence number. After child-1 is removed, the lock is relinquished and then the client who owns child-2 owns the lock, and so on.

Figure 2 - Parent lock znode and child znodes

The algorithm for clients to determine if they own the lock is straightforward, on the surface anyway. A client creates a new sequential ephemeral znode under the parent lock znode. The client then gets the children of the lock node and sets a watch on the lock node. If the child znode that the client created has the lowest sequence number, then the lock is acquired, and it can perform whatever actions are necessary with the resource that the lock is protecting. If the child znode it created does not have the lowest sequence number, the client waits for the watch to trigger a watch event, then performs the same logic of getting the children, setting a watch, and checking for lock acquisition via the lowest sequence number. The client continues this process until the lock is acquired.
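The ownership check at the heart of this algorithm can be sketched in a few lines of plain Java. This is only an illustration of the "lowest sequence number wins" rule: the LockOwnership class and the child names are hypothetical, and the list of children stands in for what getChildren would return.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: no ZooKeeper calls, just the ownership rule.
final class LockOwnership {

    // Parses the 10-digit sequence suffix of a sequential znode name.
    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(childName.length() - 10));
    }

    // True when myChild has the lowest sequence number among all children.
    static boolean ownsLock(List<String> children, String myChild) {
        long mine = sequenceOf(myChild);
        for (String child : children) {
            if (sequenceOf(child) < mine) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "child-lock-node-0000000002",
                "child-lock-node-0000000000",
                "child-lock-node-0000000001");
        System.out.println(ownsLock(children, "child-lock-node-0000000000")); // true
        System.out.println(ownsLock(children, "child-lock-node-0000000002")); // false
    }
}
```

Note that getChildren makes no promise about ordering, which is why the sketch compares sequence numbers rather than relying on list position.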

While this doesn't sound too bad there are a few potential gotchas. First, how would the client know that it successfully created the child znode if there is a partial failure (e.g. due to connection loss) during znode creation? The solution is to embed the client's ZooKeeper session ID in the child znode names, for example child-<sessionId>-; a failed-over client that retains the same session (and thus session ID) can easily determine if the child znode was created by looking for its session ID amongst the child znodes. Second, in our earlier algorithm, every client sets a watch on the parent lock znode. But this has the potential to create a "herd effect" - if every client is watching the parent znode, then every client is notified when any changes are made to the children, regardless of whether a client would be able to own the lock. If there are a small number of clients this probably doesn't matter, but if there are a large number it has the potential for a spike in network traffic. The fix is for each client to watch only the child znode immediately preceding its own. For example, the client owning child-9 need only watch the child immediately preceding it, which is most likely child-8 but could be an earlier child if the 8th child znode somehow died. Then, notifications are sent only to the client that can actually take ownership of the lock.
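Picking the znode to watch is a small variation on the ownership check: instead of asking "is mine the lowest?", each client looks for the highest sequence number below its own. The sketch below shows that selection in plain Java. The PredecessorWatch class and the child names are hypothetical, and this is not the actual WriteLock or Curator code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative only: chooses which sibling a client should watch.
final class PredecessorWatch {

    // Parses the 10-digit sequence suffix of a sequential znode name.
    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(childName.length() - 10));
    }

    // Returns the child with the highest sequence number below myChild's.
    // An empty result means there is no predecessor, so myChild owns the lock.
    static Optional<String> nodeToWatch(List<String> children, String myChild) {
        long mine = sequenceOf(myChild);
        return children.stream()
                .filter(child -> sequenceOf(child) < mine)
                .max(Comparator.comparingLong(PredecessorWatch::sequenceOf));
    }

    public static void main(String[] args) {
        // The 8th child has already gone away, so child 9 watches child 7.
        List<String> children = Arrays.asList(
                "child-0000000007", "child-0000000009", "child-0000000010");
        System.out.println(nodeToWatch(children, "child-0000000009"));
        System.out.println(nodeToWatch(children, "child-0000000007"));
    }
}
```

With this rule, deleting one child znode wakes up at most one waiting client instead of all of them.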

Fortunately for us, ZooKeeper comes with a lock "recipe" in the contrib modules called WriteLock. WriteLock implements a distributed lock using the above algorithm and takes into account partial failure and the herd effect. It uses an asynchronous callback model via a LockListener instance, whose lockAcquired method is called when the lock is acquired and lockReleased method is called when the lock is released. We can build a synchronous lock class on top of WriteLock by blocking until the lock is acquired. Listing 6 shows how we use a CountDownLatch to block until the lockAcquired method is called. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples)

Listing 6 - Creating BlockingWriteLock on top of WriteLock

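import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.recipes.lock.LockListener;
import org.apache.zookeeper.recipes.lock.WriteLock;

// Synchronous wrapper around the asynchronous WriteLock recipe.
public class BlockingWriteLock {
  private final String path;
  private final WriteLock writeLock;
  // Released by the listener once the lock has been acquired
  private final CountDownLatch signal = new CountDownLatch(1);

  public BlockingWriteLock(ZooKeeper zookeeper,
          String path, List<ACL> acls) {
    this.path = path;
    this.writeLock =
        new WriteLock(zookeeper, path, acls, new SyncLockListener());
  }

  // Requests the lock, then blocks until lockAcquired() is called
  public void lock() throws InterruptedException, KeeperException {
    writeLock.lock();
    signal.await();
  }

  public void unlock() {
    writeLock.unlock();
  }

  class SyncLockListener implements LockListener {
    @Override public void lockAcquired() {
      signal.countDown();
    }

    @Override public void lockReleased() { /* ignored */ }
  }
}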

You can then use the BlockingWriteLock as shown in Listing 7.

Listing 7 - Using BlockingWriteLock

BlockingWriteLock lock =
  new BlockingWriteLock(zooKeeper, path, ZooDefs.Ids.OPEN_ACL_UNSAFE);
try {
  lock.lock();
  // do something while we own the lock
} catch (Exception ex) {
  // handle appropriately
} finally {
  lock.unlock();
}

You can take this a step further, wrapping the try/catch/finally logic and creating a class that takes commands which implement an interface. For example, you can create a DistributedLockOperationExecutor class that implements a withLock method that takes a DistributedLockOperation instance as an argument, as shown in Listing 8.

Listing 8 - Wrapping the BlockingWriteLock try/catch/finally logic

DistributedLockOperationExecutor executor =
  new DistributedLockOperationExecutor(zooKeeper);
executor.withLock(lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  new DistributedLockOperation() {
    @Override public Object execute() {
      // do something while we have the lock
      return null; // nothing to return in this example
    }
  });

The nice thing about wrapping try/catch/finally logic in DistributedLockOperationExecutor is that when you call withLock you eliminate boilerplate code and you cannot possibly forget to unlock the lock.
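The same "execute around" idea can be shown without ZooKeeper at all. In the sketch below a java.util.concurrent.locks.Lock stands in for the distributed lock. The WithLock class is hypothetical and is not the DistributedLockOperationExecutor from the sample code, but it shows why callers can no longer forget to unlock.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Illustrative only: a local Lock stands in for the distributed lock.
final class WithLock {

    // Runs the operation while holding the lock and always releases it.
    static <T> T withLock(Lock lock, Supplier<T> operation) {
        lock.lock();
        try {
            return operation.get();
        } finally {
            lock.unlock(); // runs even if the operation throws
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        String result = withLock(lock, () -> "done while holding the lock");
        System.out.println(result);
        System.out.println("still locked? " + lock.isLocked());
    }
}
```

The caller supplies only the work to be done; acquiring and releasing the lock lives in exactly one place.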

Conclusion to Part 5

In this fifth blog on ZooKeeper, you implemented a distributed lock and saw some of the potential problems that should be avoided such as partial failure on connection loss, and the "herd effect". We took our initial distributed lock and cleaned it up a bit, which resulted in a synchronous implementation using the DistributedLockOperationExecutor and DistributedLockOperation which ensures proper connection handling and lock release.

In the next (and final) blog, we'll briefly touch on administration and tuning ZooKeeper and introduce the Apache Curator framework, and finally summarize what we've learned.

References

This is the fourth in a series of blogs that introduce Apache ZooKeeper. In the third blog, you implemented a group membership example using the ZooKeeper Java API. In this blog, we'll get an overview of ZooKeeper's architecture.

Now that we've test driven Apache ZooKeeper in the shell and Java code, let's take a bird's eye view of the ZooKeeper architecture, and expand on the core concepts discussed earlier. As previously mentioned, ZooKeeper is essentially a distributed, hierarchical filesystem comprised of znodes, which can be either persistent or ephemeral. Persistent znodes can have children, and they persist after client sessions expire or disconnect. In contrast, ephemeral znodes cannot have children, and they are automatically destroyed as soon as the session in which they were created is closed. Both persistent and ephemeral znodes can have associated data, however the data must be less than 1MB (per znode). All znodes can optionally be sequential, for which ZooKeeper maintains a monotonically increasing number which is automatically appended to the znode name upon creation. Each sequence number is guaranteed to be unique. Finally, all znode operations (reads and writes) are atomic; they either succeed or fail and there is never a partial application of an operation. For example, if a client tries to set data on a znode, the operation will either set the data in its entirety, or no data will be changed at all.
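To make the sequential naming concrete, here is a small sketch of how a name with a zero-padded, ten-digit counter is formed. The SequentialNames class and the "member-" prefix are hypothetical, and the counter is passed in by hand because in ZooKeeper it is the server, not the client, that assigns it.

```java
// Illustrative only: mimics the shape of sequential znode names.
final class SequentialNames {

    // Appends the counter as ten digits with leading zeros.
    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        System.out.println(sequentialName("member-", 0));  // member-0000000000
        System.out.println(sequentialName("member-", 42)); // member-0000000042
    }
}
```

A nice side effect of the padding is that sorting the names as plain strings also sorts them by sequence number.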

A key element of ZooKeeper's architecture is the ability to set watches on read operations such as exists, getChildren, and getData. Write operations (i.e. create, delete, setData) on znodes trigger any watches previously set on those znodes, and watchers are notified via a WatchedEvent. How clients respond to events is entirely up to them, but setting watches and receiving notifications at some later point in time results in an event-driven, decoupled architecture. Suppose client A sets a watch on a znode. At some point in the future, when client B performs a write operation on the znode client A is watching, a WatchedEvent is generated and client A is called back via the Watcher's process method. Client A and B are completely independent and need not even know anything about each other, so long as they each know their own responsibilities in relation to specific znodes.

An important thing to remember about watches is that they are one-time notifications about changes to a znode. If a client receives a WatchedEvent notification, it must re-register a new Watcher if it wants to be notified about future updates. During the period between receipt of the notification and re-registration, there exists the possibility that other clients could perform write operations on the znode before the new Watcher is registered which the client would not know about. In other words, it is entirely possible in a high write volume environment that a client can miss updates during the time it takes to process an event and re-register a new watch. Clients should assume updates can be missed, and not rely on having a complete history of every single event that occurs to a given znode.
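The effect of one-time watches is easy to see in a toy, single-threaded simulation. The ToyZnode class below is entirely made up for this illustration and has nothing to do with the real ZooKeeper classes. It simply drops its watch after firing once, so a write that lands before the client re-registers goes unnoticed.

```java
// Illustrative only: a toy model of one-time watches, not ZooKeeper code.
final class OneShotWatchDemo {

    // A toy znode holding a version and at most one pending one-time watch.
    static final class ToyZnode {
        private int version = 0;
        private Runnable watch; // cleared as soon as it fires

        // Reads the current version and registers a one-time watch.
        int readAndWatch(Runnable watcher) {
            this.watch = watcher;
            return version;
        }

        // Applies a write and fires the pending watch, if any, exactly once.
        void write() {
            version++;
            Runnable pending = watch;
            watch = null; // one-time: the watch is consumed
            if (pending != null) {
                pending.run();
            }
        }
    }

    // Performs three writes but re-registers the watch only after the second.
    static int notificationsSeen() {
        ToyZnode znode = new ToyZnode();
        int[] notifications = {0};
        znode.readAndWatch(() -> notifications[0]++);
        znode.write(); // fires the watch
        znode.write(); // no watch registered, so this update goes unnoticed
        znode.readAndWatch(() -> notifications[0]++); // re-register
        znode.write(); // fires the new watch
        return notifications[0];
    }

    public static void main(String[] args) {
        System.out.println("3 writes, " + notificationsSeen() + " notifications");
    }
}
```

The client in this toy still ends up with the latest state, because re-registering the watch also re-reads the znode; what it cannot reconstruct is the full history of changes.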

ZooKeeper implements the hierarchical filesystem via an "ensemble" of servers. Figure 1 shows a three server ensemble with multiple clients reading and one client writing. The basic idea is that the filesystem state is replicated on each server in the ensemble, both on disk and in memory.

Figure 1 - ZooKeeper Ensemble

In Figure 1 you can see one of the servers in the ensemble acts as the leader, while the rest are followers. When an ensemble is first started, a leader election is held. During leader election, a leader is elected and the process is complete once a simple majority of followers have synchronized their state with the leader. After leader election is complete, all write requests are routed through the leader, and changes are broadcast to all followers - this is termed atomic broadcast. Once a majority of followers have persisted the change (to disk and memory), the leader commits the change and notifies the client of a successful update. Because only a majority of followers are required for a successful update, followers can lag the leader which means ZooKeeper is an eventually consistent system. Thus, different clients can read information about a given znode and receive a different answer. Every write is assigned a globally unique, sequentially ordered identifier called a zxid, or ZooKeeper transaction id. This guarantees a global order to all updates in a ZooKeeper ensemble. In addition, because all writes go through the leader, write throughput does not scale as more nodes are added.

This leader/follower architecture is not a master/slave setup, however, since the leader is not a single point of failure. If a leader dies, then a new leader election takes place and a new leader is elected (this is typically very fast and will not noticeably degrade performance). In addition, because leader election and writes both require a simple majority of servers, ZooKeeper ensembles should contain an odd number of machines; in a five node ensemble any two machines can go down and ZooKeeper can still remain available, whereas a six node ensemble can also only handle two machines going down because if three nodes fail, the remaining three are not a majority (of the original six).
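The five versus six node comparison is just integer arithmetic, which the following sketch spells out. The QuorumMath class is a hypothetical helper written for this illustration.

```java
// Illustrative only: simple-majority arithmetic for an ensemble.
final class QuorumMath {

    // Smallest number of servers that forms a simple majority.
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // How many servers can fail while a majority is still available.
    static int tolerableFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    public static void main(String[] args) {
        for (int size = 3; size <= 7; size++) {
            System.out.println(size + " servers: majority " + majority(size)
                    + ", tolerates " + tolerableFailures(size) + " failures");
        }
    }
}
```

Going from five to six servers raises the majority from three to four without raising the number of failures you can survive, so the extra machine buys no additional availability.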

All client read requests are served directly from the memory of the server they are connected to, which makes reads very fast. In addition, clients have no knowledge about the server they are connected to and do not know if they are connected to a leader or follower. Because reads are from the in-memory representation of the filesystem, read throughput increases as servers are added to an ensemble. But recall that write throughput is limited by the leader, so you cannot simply add more and more ZooKeepers forever and expect performance to increase.

Data Consistency

With ZooKeeper's leader/follower architecture in mind, let's consider what guarantees it makes regarding data consistency.

Sequential Updates

ZooKeeper guarantees that updates are made to the filesystem in the order they are received from clients. Since all writes route through the leader, the global order is simply the order in which the leader receives write requests.

Atomicity

All updates either succeed or fail, just like transactions in ACID-compliant relational databases. ZooKeeper, as of version 3.4.0, supports transactions as a thin wrapper around the multi operation, which performs a list of operations (instances of the Op class) and either all operations succeed or none succeed. So if you need to ensure that multiple znodes are updated at the same time, for example if two znodes are part of a graph, then you can use multi or the transaction wrapper around multi.

Consistent client view

Consistent client view means that a client will see the same view of the system, regardless of which server it is connected to. The official ZooKeeper documentation calls this "single system image". So, if a client fails over to a different server during a session, it will never see an older view of the system than it has previously seen. A server will not accept a connection from a client until it has caught up with the state of the server to which the client was previously connected.
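One way to picture this guarantee is as a comparison of transaction ids: the client remembers the last zxid it has seen, and a server that is behind that point must not serve it. The sketch below models only that comparison. The SingleSystemImage class, its method names and the sample zxid values are assumptions made for illustration, not ZooKeeper's actual connection code.

```java
// Illustrative only: models the "never go back in time" rule with zxids.
final class SingleSystemImage {

    // A server may serve a client only if it has seen at least as much.
    static boolean canAccept(long serverLastZxid, long clientLastSeenZxid) {
        return serverLastZxid >= clientLastSeenZxid;
    }

    // Returns the index of the first server able to accept the client, or -1.
    static int firstAcceptableServer(long[] serverZxids, long clientLastSeenZxid) {
        for (int i = 0; i < serverZxids.length; i++) {
            if (canAccept(serverZxids[i], clientLastSeenZxid)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] servers = {90L, 100L, 120L}; // last zxid applied by each server
        // A client that has seen zxid 100 cannot use the lagging first server.
        System.out.println(firstAcceptableServer(servers, 100L));
    }
}
```

A lagging server becomes acceptable again as soon as it catches up, which is why failover is usually invisible to the client.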

Durability

If an update succeeds, ZooKeeper guarantees it has been persisted and will survive server failures, even if all ZooKeeper ensemble nodes were forcefully killed at the same time! (Admittedly this would be an extreme situation, but the update would survive such an apocalypse.)

Eventual consistency

Because followers may lag the leader, ZooKeeper is an eventually consistent system. But ZooKeeper limits the amount of time a follower can lag the leader, and a follower will take itself offline if it falls too far behind. Clients can force a server to catch up with the leader by calling the asynchronous sync command. Despite the fact that sync is asynchronous, a ZooKeeper server will not process any operations until it has caught up to the leader.

Conclusion to Part 4

In this fourth blog on ZooKeeper you saw a bird's eye view of ZooKeeper's architecture, and learned about its data consistency guarantees. You also learned that ZooKeeper is an eventually consistent system.

In the next blog, we'll dive back into some code and use what we've learned so far to build a distributed lock.

References

This is the third in a series of blogs that introduce Apache ZooKeeper. In the second blog, you took a test drive of ZooKeeper using its command-line shell. In this blog, we'll re-implement the group membership example using the ZooKeeper Java API.

Apache ZooKeeper is implemented in Java, and its native API is also Java. ZooKeeper also provides a C language API, and the distribution provides contrib modules for Perl, Python, and RESTful clients. The ZooKeeper APIs come in two flavors, synchronous or asynchronous. Which one you use depends on the situation. For example you might choose the asynchronous Java API if you are implementing a Java application to process a large number of child znodes independently of one another; in this case you could make good use of the asynchronous API to simultaneously launch all the independent tasks in parallel. On the other hand, if you are implementing simple tasks that perform sequential operations in ZooKeeper, the synchronous API is easier to use and might be a better fit in such cases.

For our group membership example, we'll use the synchronous Java API. The first thing we need to do is connect to ZooKeeper and get an instance of ZooKeeper, which is the main client API through which you perform operations like creating znodes, setting data on znodes, listing znodes, and so on. The ZooKeeper constructor launches a separate thread to connect, and returns immediately. As a result, you need to watch for the SyncConnected event which indicates when the connection has been established. Listing 1 shows code to connect to ZooKeeper, in which we use a CountDownLatch to block until we've received the connected event. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples).

Listing 1 - Connecting to ZooKeeper

public ZooKeeper connect(String hosts, int sessionTimeout)
        throws IOException, InterruptedException {
  final CountDownLatch connectedSignal = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        connectedSignal.countDown();
      }
    }
  });
  connectedSignal.await();
  return zk;
}

The next thing we need to do is create a znode for the group. As in the test drive, this znode should be persistent, so that it hangs around regardless of whether any clients are connected or not. Listing 2 shows creating a group znode.

Listing 2 - Creating the group znode

public void createGroup(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  zk.create(path,
            null /* data */,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
}

Note in Listing 2 that we prepended a leading slash to the group name since ZooKeeper requires that all paths be absolute. The create operation takes arguments for the path, a byte[] for data which is optional, a list of ACLs (access control list) to control who can access the znode, and finally the type of znode, in this case persistent. Creating the group member znodes is almost identical to creating the group znode, except we need to create an ephemeral, sequential znode. Let's also say that we need to store some information about each member, so we'll set data on the member znodes. This is shown in Listing 3.

Listing 3 - Creating group member znodes with data

public String joinGroup(String groupName, String memberName, byte[] data)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName + "/" + memberName + "-";
  String createdPath = zk.create(path,
          data,
          ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL);
  return createdPath;
}

Now that we can create the group and allow members to join it, it would be nice to have some way to monitor the group membership. To do this we'll first need to list children for the group znode, then set a watch on the group znode, and whenever the watch triggers an event, we'll query ZooKeeper for the group's (updated) members, as shown in Listing 4. This process continues in an infinite loop, hence the class name ListGroupForever.

Listing 4 - Listing a group's members indefinitely

public class ListGroupForever {
  private ZooKeeper zooKeeper;
  private Semaphore semaphore = new Semaphore(1);

  public ListGroupForever(ZooKeeper zooKeeper) {
    this.zooKeeper = zooKeeper;
  }

  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ConnectionHelper().connect(args[0]);
    new ListGroupForever(zk).listForever(args[1]);
  }

  public void listForever(String groupName)
          throws KeeperException, InterruptedException {
    semaphore.acquire();
    while (true) {
      list(groupName);
      semaphore.acquire();
    }
  }

  private void list(String groupName)
          throws KeeperException, InterruptedException {
    String path = "/" + groupName;
    List<String> children = zooKeeper.getChildren(path, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
          semaphore.release();
        }
      }
    });
    if (children.isEmpty()) {
      System.out.printf("No members in group %s\n", groupName);
      return;
    }
    Collections.sort(children);
    System.out.println(children);
    System.out.println("--------------------");
  }
}

The ListGroupForever class in Listing 4 has some interesting characteristics. The listForever method loops infinitely and uses a semaphore to block until changes occur to the group node. The list method calls getChildren to actually retrieve the child nodes from ZooKeeper, and critically sets a Watcher to watch for changes of type NodeChildrenChanged. When the NodeChildrenChanged event occurs, the watcher releases the semaphore, which permits listForever to re-acquire the semaphore and then retrieve and display the updated group znodes. This process continues until ListGroupForever is terminated.

To round out the example, we'll create a method to delete the group. As shown in the test drive, ZooKeeper doesn't permit znodes that have children to be deleted, so we first need to delete all the children, and then delete the group (parent) znode. This is shown in Listing 5.

Listing 5 - Deleting a group

public void delete(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  try {
    List<String> children = zk.getChildren(path, false);
    for (String child : children) {
      zk.delete(path + "/" + child, -1);
    }
    zk.delete(path, -1);
  }
  catch (KeeperException.NoNodeException e) {
    System.out.printf("Group %s does not exist\n", groupName);
  }
}

When deleting a group, we passed -1 to the delete method to unconditionally delete the znodes. We could also have passed in a version, so that if we have the correct version number, the znode is deleted but otherwise we receive an optimistic locking violation in the form of a BadVersionException.
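
To make the version check concrete, here is a minimal in-memory sketch of the idea using only the JDK. It is not ZooKeeper code: the class VersionedStore and its methods are hypothetical, and it returns false on a version mismatch where ZooKeeper would throw a BadVersionException.

```java
import java.util.HashMap;
import java.util.Map;

class VersionedStore {
  // Maps each path to its current version; a stand-in for znode metadata.
  private final Map<String, Integer> versions = new HashMap<>();

  void create(String path) {
    versions.put(path, 0);
  }

  // Bumps the version if the check passes; returns false otherwise.
  boolean update(String path, int expectedVersion) {
    Integer current = versions.get(path);
    if (current == null || !matches(current, expectedVersion)) {
      return false;
    }
    versions.put(path, current + 1);
    return true;
  }

  // Removes the path if the check passes; returns false otherwise.
  boolean delete(String path, int expectedVersion) {
    Integer current = versions.get(path);
    if (current == null || !matches(current, expectedVersion)) {
      return false;
    }
    versions.remove(path);
    return true;
  }

  // Passing -1 skips the version check entirely.
  private static boolean matches(int current, int expectedVersion) {
    return expectedVersion == -1 || expectedVersion == current;
  }

  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    store.create("/sample-group");
    store.update("/sample-group", 0);                       // version becomes 1
    System.out.println(store.delete("/sample-group", 0));   // false: stale version
    System.out.println(store.delete("/sample-group", -1));  // true: unconditional
  }
}
```

The point of the check is that a client holding a stale version cannot clobber a change it has not seen, while -1 remains available when we really do not care.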

Conclusion to Part 3

In this third blog on ZooKeeper, we implemented a group membership example using the Java API. You saw how to connect to ZooKeeper; how to create persistent, ephemeral, and sequential znodes; how to list znodes and set watches to receive events; and finally how to delete znodes.

In the next blog, we'll back off from the code level and get an overview of ZooKeeper's architecture.

Distributed Coordination With ZooKeeper Part 2: Test Drive

Posted on June 27, 2013 by Scott Leberknight

This is the second in a series of blogs that introduce Apache ZooKeeper. In the first blog, you got an introduction to ZooKeeper and its core concepts. In this blog, you'll take a brief test drive of ZooKeeper using its command line shell. This is a really fast and convenient way to get up and running with ZooKeeper immediately.

To get an idea of some of the basic building blocks in Apache ZooKeeper, let's take a test drive. ZooKeeper comes with a command-line shell that you can use to connect to and interact with the service. The following listing shows connecting to the shell, listing the znodes at the root level, and creating a znode named /sample-group which will serve as a parent znode for some other znodes that we'll create in a moment. All paths in ZooKeeper must be absolute and begin with a /. The first argument to the create command is the path, while the second is the data that is associated with the znode. Note also that when a connection is established, the default watcher sends the SyncConnected event, which you see in the listing below.

$ ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /sample-group a-sample-group
Created /sample-group
[zk: localhost:2181(CONNECTED) 2] ls /
[sample-group, zookeeper]

At this point we want to create some child znodes under /sample-group. ZooKeeper znodes can be either persistent or ephemeral. Persistent znodes are permanent and, once created, stick around until they are explicitly deleted. On the other hand, ephemeral znodes exist only as long as the client that created them is alive; once the client goes away for any reason, all ephemeral znodes it created are automatically destroyed. As you might imagine, if we want to build a group membership service for a distributed system, the client (which is a group member) should indicate its status via ephemeral znodes, so that if it dies, the znode representing its membership is destroyed, thus indicating the client is no longer a member of the group. When we created the group, we created a persistent znode. To create an ephemeral znode we use the -e option. In addition, maybe we'd like to know the order in which clients joined our group. ZooKeeper znodes can be automatically and uniquely ordered by their parent. In the shell we use -s to indicate we want to create the child znode as a sequential znode. Note also that we named the child nodes /sample-group/child- in each case. When creating sequential znodes, it is typical to end the name with a dash, to which a unique, monotonically increasing integer is automatically appended.

[zk: localhost:2181(CONNECTED) 3] create -s -e /sample-group/child- data-1
Created /sample-group/child-0000000000
[zk: localhost:2181(CONNECTED) 4] create -s -e /sample-group/child- data-2
Created /sample-group/child-0000000001
[zk: localhost:2181(CONNECTED) 5] create -s -e /sample-group/child- data-3
Created /sample-group/child-0000000002
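
To see why the zero-padded suffix matters, and what "ephemeral" means for group membership, here is a small in-memory model in plain Java. It is only a sketch: TinyGroup and its methods are hypothetical names, a single object stands in for one parent znode, and a long stands in for a client session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TinyGroup {
  // Child name -> id of the session that owns it, kept sorted by name.
  private final Map<String, Long> owners = new TreeMap<>();
  private int nextSequence = 0;

  // Appends a zero-padded, 10-digit counter to the prefix,
  // producing names of the form child-0000000000.
  String createEphemeralSequential(String prefix, long sessionId) {
    String name = String.format("%s%010d", prefix, nextSequence++);
    owners.put(name, sessionId);
    return name;
  }

  // Removes every child owned by the given session,
  // as happens to ephemeral znodes when their client goes away.
  void closeSession(long sessionId) {
    owners.values().removeIf(owner -> owner == sessionId);
  }

  List<String> children() {
    return new ArrayList<>(owners.keySet());
  }

  public static void main(String[] args) {
    TinyGroup group = new TinyGroup();
    group.createEphemeralSequential("child-", 1L);
    group.createEphemeralSequential("child-", 2L);
    group.createEphemeralSequential("child-", 1L);
    System.out.println(group.children());
    group.closeSession(1L);
    System.out.println(group.children()); // only the child owned by session 2 remains
  }
}
```

Because the counter is zero-padded to a fixed width, sorting the names as plain strings also sorts them by creation order, which is what lets a client find the "oldest" member with a simple sort.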

Now let's set a watch on the /sample-group znode in order to receive change notifications whenever a child znode is added or removed. Setting the watch lets us monitor the group for changes and react accordingly. For example, if we are building a distributed search engine and a server in the search cluster dies, we need to know about that event and redistribute the data held by the (now dead) server across the remaining servers, assuming the data is stored redundantly such as in Hadoop. This is exactly what the Apache Blur distributed search engine does in order to ensure data is not lost and that the cluster continues operating when one or more servers are lost. In ZooKeeper you set watches on read operations, for example when listing a znode or getting its data. We'll list the children under /sample-group and set a watch, indicated by using true as the second argument.

[zk: localhost:2181(CONNECTED) 6] ls /sample-group true
[child-0000000001, child-0000000002, child-0000000000]

Now if we create another child znode, the watch event will fire and notify us that a NodeChildrenChanged event occurred.

[zk: localhost:2181(CONNECTED) 7] create -s -e /sample-group/child- data-4

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sample-group
Created /sample-group/child-0000000003

The event does not tell us what actually changed, however. To get the updated list of children we need to again list the contents of /sample-group. In addition, watchers are one-time events, and clients must re-register the watch to continue receiving change notifications. So if we now create another child znode, no watch will fire.

[zk: localhost:2181(CONNECTED) 8] create -s -e /sample-group/child- data-5
Created /sample-group/child-0000000004
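
The one-time nature of watches is easy to model. The following is a minimal sketch in plain Java, not ZooKeeper code: OneShotWatches is a hypothetical class in which a Runnable stands in for a Watcher, and reading the children is the only way to register one.

```java
import java.util.ArrayList;
import java.util.List;

class OneShotWatches {
  private final List<String> children = new ArrayList<>();
  private final List<Runnable> watchers = new ArrayList<>();

  // Returns a snapshot of the children and, if watcher is non-null,
  // registers it for the next change only.
  List<String> getChildren(Runnable watcher) {
    if (watcher != null) {
      watchers.add(watcher);
    }
    return new ArrayList<>(children);
  }

  void addChild(String name) {
    children.add(name);
    // Clear the registrations before firing, so each watcher fires at most once.
    List<Runnable> toFire = new ArrayList<>(watchers);
    watchers.clear();
    toFire.forEach(Runnable::run);
  }

  public static void main(String[] args) {
    OneShotWatches group = new OneShotWatches();
    int[] fired = {0};

    group.getChildren(() -> fired[0]++);
    group.addChild("child-0000000003"); // the watch fires
    group.addChild("child-0000000004"); // nothing fires: the watch was used up
    System.out.println("events seen: " + fired[0]);

    group.getChildren(() -> fired[0]++); // re-register by reading again
    group.addChild("child-0000000005");
    System.out.println("events seen: " + fired[0]);
  }
}
```

Note that the watcher is told only that something changed. To learn what changed, and to keep receiving notifications, the client has to read the children again and pass a watcher with that read.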

To finish off our test drive, let's delete our test group.

[zk: localhost:2181(CONNECTED) 9] delete /sample-group
Node not empty: /sample-group

Oops. ZooKeeper won't allow znodes to be deleted if they have children. In addition, updates, including deletes, are conditional upon a specific version, which is a form of optimistic locking that ensures a client update succeeds only if it passes the current version of the data. Otherwise the update fails with a BadVersionException. You can short-circuit the optimistic versioning behavior by passing -1 to updates, which tells ZooKeeper to perform the update unconditionally. So in order to delete our group, we first delete all the child znodes and then delete the group znode, all unconditionally.

[zk: localhost:2181(CONNECTED) 10] delete /sample-group/child-0000000000 -1
[zk: localhost:2181(CONNECTED) 11] delete /sample-group/child-0000000001 -1
[zk: localhost:2181(CONNECTED) 12] delete /sample-group/child-0000000002 -1
[zk: localhost:2181(CONNECTED) 13] delete /sample-group/child-0000000003 -1
[zk: localhost:2181(CONNECTED) 14] delete /sample-group/child-0000000004 -1
[zk: localhost:2181(CONNECTED) 15] delete /sample-group -1

In addition to the shell, ZooKeeper also provides commands referred to as the "four letter words". You issue the commands via telnet or nc (netcat). For example, let's ask ZooKeeper how it's feeling.

$ echo "ruok" | nc localhost 2181
imok

You can also use the stat command to get basic statistics on ZooKeeper.

$ echo "stat" | nc localhost 2181
Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT
Clients:
 /0:0:0:0:0:0:0:1%0:63888[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/157
Received: 338
Sent: 337
Connections: 1
Outstanding: 0
Zxid: 0xb
Mode: standalone
Node count: 17
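
If you would rather issue a four letter word from Java than from nc, a plain socket is enough. Here is a minimal sketch; the class FourLetterWord and its send method are hypothetical names, and it assumes the server closes the connection once it has replied, as the nc examples above suggest. Newer ZooKeeper releases may require a command to be enabled in the server configuration before it will answer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class FourLetterWord {

  // Sends the command and returns everything the server writes back.
  static String send(String host, int port, String command, int timeoutMillis)
      throws IOException {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      socket.setSoTimeout(timeoutMillis);

      OutputStream out = socket.getOutputStream();
      out.write(command.getBytes(StandardCharsets.US_ASCII));
      out.flush();

      // Read until end of stream, which arrives when the server closes the connection.
      InputStream in = socket.getInputStream();
      ByteArrayOutputStream reply = new ByteArrayOutputStream();
      byte[] buffer = new byte[1024];
      int n;
      while ((n = in.read(buffer)) != -1) {
        reply.write(buffer, 0, n);
      }
      return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
    }
  }

  public static void main(String[] args) throws IOException {
    // Assumes a ZooKeeper server is listening on localhost:2181, as in the examples above.
    System.out.println(send("localhost", 2181, "ruok", 3000));
  }
}
```

The timeouts are there so that a health check against a dead or wedged server fails quickly rather than hanging.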

In this test drive, we've seen some basic but important aspects of ZooKeeper. We created persistent and sequential ephemeral znodes, set a watch and received a change notification event when a znode's children changed, and deleted znodes. We also saw how znodes can have associated data. When building real systems you obviously won't be using the command line shell to implement behavior, however, so let's translate this simple group membership example into Java code.

Conclusion to Part 2

In this second part of the ZooKeeper series of blogs, you took a test drive using the command-line shell available in ZooKeeper. You created both persistent and ephemeral znodes. You created the ephemeral znodes as children of the persistent znode, and made them sequential as well so that ZooKeeper maintains a monotonically increasing, unique order. Finally you saw how to delete znodes and use a few of the "four letter words" to check ZooKeeper's status.

In the next blog, we'll recreate the group example you've just seen using the ZooKeeper Java API.

Distributed Coordination With ZooKeeper Part 1: Introduction

Posted on June 25, 2013 by Scott Leberknight

This is the first in a series of blogs that introduce Apache ZooKeeper. This blog provides an introduction to ZooKeeper and its core concepts and use cases. In later blogs you will test drive ZooKeeper, see some examples of the Java API, learn about its architecture, build a distributed data structure which can be used across independent processes and machines, and finally get a brief introduction to a higher-level API on top of ZooKeeper.

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. This could be a distributed search engine, a distributed build system, or even something like Hadoop which has both a distributed file system and a Map/Reduce data processing framework that operates on the data in the file system. How would you determine which servers are alive and operating at any given moment in time? Or, how would you determine which servers are available to process a build in a distributed build system? Or for a distributed search system how would you know which servers are available to hold data and handle search requests? Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers?

These and similar questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault tolerant, distributed coordination service. Using ZooKeeper you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches.

Many well-known and successful projects already rely on ZooKeeper. Just a few of them include HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.

Core Concepts

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example if a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests. Each server also holds a persistent copy on disk.

One of the servers is elected as the leader, and all other servers are followers. The leader is responsible for all writes and for broadcasting changes to followers. Assuming a majority of followers commit a change successfully, the write succeeds and the data is then durable even if the leader then fails. This means ZooKeeper is an eventually consistent system, because the followers may lag behind the leader by some small amount of time, hence clients might not always see the most up-to-date information. Importantly, the leader is not a master as in a master/slave architecture and thus is not a single point of failure; rather, if the leader dies, then the remaining followers hold an election for a new leader, and the new leader takes over where the old one left off.
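
The majority requirement has a practical consequence for how many servers you run. The arithmetic is simple enough to sketch in a few lines of Java; the class Quorum is a hypothetical helper, and counting the majority over all servers in the ensemble is an assumption of this sketch.

```java
class Quorum {

  // Smallest number of servers that forms a strict majority of the ensemble.
  static int majority(int ensembleSize) {
    return ensembleSize / 2 + 1;
  }

  // Number of servers that can fail while a majority is still available.
  static int toleratedFailures(int ensembleSize) {
    return ensembleSize - majority(ensembleSize);
  }

  public static void main(String[] args) {
    for (int n = 3; n <= 6; n++) {
      System.out.printf("ensemble=%d majority=%d tolerated failures=%d%n",
          n, majority(n), toleratedFailures(n));
    }
  }
}
```

Running this shows that going from 3 to 4 servers, or from 5 to 6, does not increase the number of failures that can be tolerated, which is why ensembles are usually sized with an odd number of servers.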

Each client connects to ZooKeeper, passing in the list of servers in the ensemble. The client tries servers in the ensemble at random until a connection is established. Once connected, ZooKeeper creates a session with the client-specified timeout period. The ZooKeeper client automatically sends periodic heartbeats to keep the session alive if no operations are performed for a while, and automatically handles failover. If the ZooKeeper server a client is connected to fails, the client automatically detects this and tries to reconnect to a different server in the ensemble. The nice thing is that the same client session is retained during this failover event; however, during failover it is possible that client operations could fail and, as with almost all ZooKeeper operations, client code must be vigilant and detect errors and deal with them as necessary.
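
The server selection described above can be pictured with a few lines of plain Java. This is only a sketch of the idea and not how the ZooKeeper client is implemented: HostPicker is a hypothetical class, the canConnect predicate stands in for an actual connection attempt, and the sketch makes a single pass over the list rather than retrying.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Predicate;

class HostPicker {

  // Shuffles a copy of the server list, then returns the first server
  // for which the connection attempt (canConnect) succeeds.
  static Optional<String> pick(List<String> servers,
                               Predicate<String> canConnect,
                               Random random) {
    List<String> shuffled = new ArrayList<>(servers);
    Collections.shuffle(shuffled, random);
    for (String server : shuffled) {
      if (canConnect.test(server)) {
        return Optional.of(server);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<String> ensemble = Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181");
    // Pretend zk2 is down; any other server accepts the connection.
    Optional<String> chosen =
        pick(ensemble, server -> !server.startsWith("zk2"), new Random());
    System.out.println(chosen.orElse("no server reachable"));
  }
}
```

Randomizing the order spreads clients across the ensemble, and the same procedure applies on failover: skip the server that just failed and move on to the next one.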

Partial Failure

One of the fallacies of distributed computing is that the network is reliable. Having worked on a project for the past few years with multiple Hadoop, Apache Blur, and ZooKeeper clusters including hundreds of servers, I can definitely say from experience that the network is not reliable. Simply put, things break and you cannot assume the network is 100% reliable all the time. When designing distributed systems, you must keep this in mind and handle things you ordinarily would not even consider when building software for a single server. For example, assume a client sends an update to a server, but before the response is received the network connection is lost for a brief period. You need to ask several questions in this case. Did the message get through to the server? If it did, then did the operation actually complete successfully? Is it safe to retry an operation for which you don't even know whether it reached the server or if it failed at the server, in other words is the operation idempotent? You need to consider questions like these when building distributed systems. ZooKeeper cannot help with network problems or partial failures, but once you are aware of the kinds of problems which can arise, you are much better prepared to deal with problems when (not if) they occur. ZooKeeper provides certain guarantees regarding data consistency and atomicity that can aid you when building systems, as you will see later.
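
The idempotency question is worth a tiny illustration. In the following plain Java sketch (RetryDemo is a hypothetical class and the map stands in for server-side state), the client never sees the response to its first request, so it sends the same operation again.

```java
import java.util.HashMap;
import java.util.Map;

class RetryDemo {
  private final Map<String, Integer> store = new HashMap<>();

  // Idempotent: applying it twice leaves the same state as applying it once.
  void set(String key, int value) {
    store.put(key, value);
  }

  // Not idempotent: every application changes the state again.
  void increment(String key) {
    store.merge(key, 1, Integer::sum);
  }

  int get(String key) {
    return store.getOrDefault(key, 0);
  }

  public static void main(String[] args) {
    RetryDemo server = new RetryDemo();

    // The first call reaches the server but the response is lost, so the client retries.
    server.set("config", 42);
    server.set("config", 42);

    server.increment("counter");
    server.increment("counter");

    // prints "config=42 counter=2"
    System.out.println("config=" + server.get("config") + " counter=" + server.get("counter"));
  }
}
```

Retrying the set is harmless, but retrying the increment leaves the counter at 2 when the client intended 1. When an operation is not idempotent, a blind retry after a connection loss is not safe, and the client first needs some way to find out whether the original request was applied.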

Conclusion to Part 1

In this blog we've learned that ZooKeeper is a distributed coordination service that facilitates loose coupling between distributed components. It is implemented as a distributed, hierarchical file system and you can use it to build distributed data structures such as locks, queues, and so on. In the next blog, we'll take a test drive of ZooKeeper using its command line shell.
