This is the fifth in a series of blogs that introduce Apache ZooKeeper. In the fourth blog, you saw a high-level view of ZooKeeper's architecture and data consistency guarantees. In this blog, we'll use all the knowledge we've gained thus far to implement a distributed lock.

You've now seen how to interact with Apache ZooKeeper and learned about its architecture and consistency model. Let's use that knowledge to build a distributed lock. The goal is a mutually exclusive lock between processes that could be running on different machines, possibly even on different networks or in different data centers. An added benefit is that clients need to know nothing about each other; they only know that they must use the lock to access some shared resource, and that they should not access it unless they own the lock.

To build the lock, we'll create a persistent znode that will serve as the parent. Clients wishing to obtain the lock will create sequential, ephemeral child znodes under the parent znode. The lock is owned by the client process whose child znode has the lowest sequence number. In Figure 2, there are three children of the lock-node and child-1 owns the lock at this point in time, since it has the lowest sequence number. After child-1 is removed, the lock is relinquished and then the client who owns child-2 owns the lock, and so on.

Figure 2 - Parent lock znode and child znodes

The algorithm for clients to determine if they own the lock is straightforward, on the surface anyway. A client creates a new sequential ephemeral znode under the parent lock znode. The client then gets the children of the lock node and sets a watch on the lock node. If the child znode that the client created has the lowest sequence number, then the lock is acquired, and the client can perform whatever actions are necessary with the resource that the lock is protecting. If the child znode it created does not have the lowest sequence number, the client waits for the watch to fire, then repeats the same steps: get the children, set a watch, and check for lock acquisition via the lowest sequence number. The client continues this process until the lock is acquired.
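The ownership check at the heart of this algorithm can be sketched without any ZooKeeper calls. The snippet below is only an illustration, assuming child names end in the ten-digit, zero-padded sequence number that ZooKeeper appends; the class and method names (LockOwnership, sequenceOf, ownsLock) are hypothetical and are not part of ZooKeeper or its lock recipe.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of ZooKeeper: decides lock ownership from child names.
final class LockOwnership {

  // Assumes the name ends with the ten-digit sequence number appended by ZooKeeper.
  static int sequenceOf(String childName) {
    return Integer.parseInt(childName.substring(childName.length() - 10));
  }

  // A client owns the lock when no sibling has a lower sequence number than its own child.
  static boolean ownsLock(String myChild, List<String> children) {
    int mine = sequenceOf(myChild);
    for (String child : children) {
      if (sequenceOf(child) < mine) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // getChildren makes no ordering promise, so the list is deliberately unsorted.
    List<String> children = Arrays.asList(
        "child-0000000002", "child-0000000000", "child-0000000001");
    System.out.println(ownsLock("child-0000000000", children));
    System.out.println(ownsLock("child-0000000002", children));
  }
}
```

In a real client the children list would come from getChildren on the parent lock znode, and the check would be repeated each time the watch fires.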

While this doesn't sound too bad, there are a few potential gotchas. First, how would the client know that it successfully created the child znode if there is a partial failure (e.g. due to connection loss) during znode creation? The solution is to embed the client's ZooKeeper session ID in the child znode name, for example child-<sessionId>-; a failed-over client that retains the same session (and thus session ID) can easily determine if the child znode was created by looking for its session ID amongst the child znodes. Second, in our earlier algorithm, every client sets a watch on the parent lock znode. This has the potential to create a "herd effect": if every client is watching the parent znode, then every client is notified when any change is made to the children, regardless of whether that client would be able to own the lock. If there are a small number of clients this probably doesn't matter, but with a large number it can cause a spike in network traffic. The fix is for each client to watch a single sibling znode instead of the parent. For example, the client owning child-9 need only watch the child immediately preceding it, which is most likely child-8 but could be an earlier child if the 8th child znode somehow died. Then, notifications are sent only to the client that can actually take ownership of the lock.
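Choosing which sibling to watch comes down to finding the highest sequence number below your own. Here is a minimal sketch of that selection in plain Java. The names PredecessorWatch and nodeToWatch are hypothetical, and the parsing assumes the ten-digit sequence suffix, which also works when a session ID is embedded earlier in the name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of ZooKeeper: picks the single sibling a client should watch.
final class PredecessorWatch {

  // Assumes the name ends with the ten-digit sequence number appended by ZooKeeper.
  static int sequenceOf(String childName) {
    return Integer.parseInt(childName.substring(childName.length() - 10));
  }

  // Returns the sibling with the highest sequence number below ours,
  // or null when our child already has the lowest number (we own the lock).
  static String nodeToWatch(String myChild, List<String> children) {
    int mine = sequenceOf(myChild);
    String best = null;
    for (String child : children) {
      int seq = sequenceOf(child);
      if (seq < mine && (best == null || seq > sequenceOf(best))) {
        best = child;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    // child-8 is missing, as if its owner had died.
    List<String> children = Arrays.asList(
        "child-0000000009", "child-0000000005", "child-0000000007");
    System.out.println(nodeToWatch("child-0000000009", children));
    System.out.println(nodeToWatch("child-0000000005", children));
  }
}
```

With this selection, removing any one child wakes at most one waiting client rather than all of them.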

Fortunately for us, ZooKeeper comes with a lock "recipe" in the contrib modules called WriteLock. WriteLock implements a distributed lock using the above algorithm and takes into account partial failure and the herd effect. It uses an asynchronous callback model via a LockListener instance, whose lockAcquired method is called when the lock is acquired and lockReleased method is called when the lock is released. We can build a synchronous lock class on top of WriteLock by blocking until the lock is acquired. Listing 6 shows how we use a CountDownLatch to block until the lockAcquired method is called. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples)

Listing 6 - Creating BlockingWriteLock on top of WriteLock

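import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.recipes.lock.LockListener;
import org.apache.zookeeper.recipes.lock.WriteLock;

// Synchronous wrapper around the callback-based WriteLock recipe.
public class BlockingWriteLock {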
  private String path;
  private WriteLock writeLock;
  private CountDownLatch signal = new CountDownLatch(1);

  public BlockingWriteLock(ZooKeeper zookeeper,
          String path, List<ACL> acls) {
    this.path = path;
    this.writeLock =
        new WriteLock(zookeeper, path, acls, new SyncLockListener());
  }

  public void lock() throws InterruptedException, KeeperException {
    writeLock.lock();
    signal.await();
  }

  public void unlock() {
    writeLock.unlock();
  }

  class SyncLockListener implements LockListener {
    @Override public void lockAcquired() {
      signal.countDown();
    }

    @Override public void lockReleased() { /* ignored */ }
  }
}

You can then use the BlockingWriteLock as shown in Listing 7.

Listing 7 - Using BlockingWriteLock

BlockingWriteLock lock =
  new BlockingWriteLock(zooKeeper, path, ZooDefs.Ids.OPEN_ACL_UNSAFE);
try {
  lock.lock();
  // do something while we own the lock
} catch (Exception ex) {
  // handle appropriately
} finally {
  lock.unlock();
}

You can take this a step further, wrapping the try/catch/finally logic and creating a class that takes commands which implement an interface. For example, you can create a DistributedLockOperationExecutor class that implements a withLock method that takes a DistributedLockOperation instance as an argument, as shown in Listing 8.

Listing 8 - Wrapping the BlockingWriteLock try/catch/finally logic

DistributedLockOperationExecutor executor =
  new DistributedLockOperationExecutor(zooKeeper);
executor.withLock(lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  new DistributedLockOperation() {
    @Override public Object execute() {
      // do something while we have the lock
      return null; // or return a result for the caller
    }
  });

The nice thing about wrapping try/catch/finally logic in DistributedLockOperationExecutor is that when you call withLock you eliminate boilerplate code and you cannot possibly forget to unlock the lock.
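The internals of the executor are not shown here, so the following is only a sketch of the "execute around" idea behind it. To keep it runnable without a ZooKeeper server, it uses java.util.concurrent.locks.Lock as a stand-in for BlockingWriteLock; the names LockOperation and LockExecutor are hypothetical and are not the classes from the sample code.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for DistributedLockOperation.
interface LockOperation<T> {
  T execute();
}

// Hypothetical stand-in for DistributedLockOperationExecutor.
// A local Lock replaces BlockingWriteLock so the sketch runs anywhere.
final class LockExecutor {

  static <T> T withLock(Lock lock, LockOperation<T> operation) {
    lock.lock();               // acquire first, so unlock only runs if the lock was taken
    try {
      return operation.execute();
    } finally {
      lock.unlock();           // always released, even if the operation throws
    }
  }

  public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    String result = withLock(lock, () -> "done while holding the lock");
    System.out.println(result);
    System.out.println(lock.isLocked());
  }
}
```

The caller supplies only the work to be done; acquiring and releasing the lock lives in one place.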

Conclusion to Part 5

In this fifth blog on ZooKeeper, you implemented a distributed lock and saw some of the potential problems that should be avoided, such as partial failure on connection loss and the "herd effect". We took our initial distributed lock and cleaned it up a bit, resulting in a synchronous implementation built on DistributedLockOperationExecutor and DistributedLockOperation that ensures proper connection handling and lock release.

In the next (and final) blog, we'll briefly touch on administration and tuning ZooKeeper and introduce the Apache Curator framework, and finally summarize what we've learned.

This is the fourth in a series of blogs that introduce Apache ZooKeeper. In the third blog, you implemented a group membership example using the ZooKeeper Java API. In this blog, we'll get an overview of ZooKeeper's architecture.

Now that we've test driven Apache ZooKeeper in the shell and Java code, let's take a bird's eye view of the ZooKeeper architecture, and expand on the core concepts discussed earlier. As previously mentioned, ZooKeeper is essentially a distributed, hierarchical filesystem composed of znodes, which can be either persistent or ephemeral. Persistent znodes can have children, and they persist after client sessions expire or disconnect. In contrast, ephemeral znodes cannot have children, and they are automatically destroyed as soon as the session in which they were created is closed. Both persistent and ephemeral znodes can have associated data; however, the data must be less than 1MB (per znode). All znodes can optionally be sequential, for which ZooKeeper maintains a monotonically increasing number which is automatically appended to the znode name upon creation. Each sequence number is guaranteed to be unique. Finally, all znode operations (reads and writes) are atomic; they either succeed or fail and there is never a partial application of an operation. For example, if a client tries to set data on a znode, the operation will either set the data in its entirety, or no data will be changed at all.

A key element of ZooKeeper's architecture is the ability to set watches on read operations such as exists, getChildren, and getData. Write operations (i.e. create, delete, setData) on znodes trigger any watches previously set on those znodes, and watchers are notified via a WatchedEvent. How clients respond to events is entirely up to them, but setting watches and receiving notifications at some later point in time results in an event-driven, decoupled architecture. Suppose client A sets a watch on a znode. At some point in the future, when client B performs a write operation on the znode client A is watching, a WatchedEvent is generated and client A is called back via its Watcher's process method. Client A and B are completely independent and need not even know anything about each other, so long as they each know their own responsibilities in relation to specific znodes.

An important thing to remember about watches is that they are one-time notifications about changes to a znode. If a client receives a WatchedEvent notification, it must register a new Watcher if it wants to be notified about future updates. Between receipt of the notification and re-registration, other clients could perform write operations on the znode that the client would not know about. In other words, it is entirely possible in a high write volume environment that a client can miss updates during the time it takes to process an event and re-register a new watch. Clients should assume updates can be missed, and not rely on having a complete history of every single event that occurs to a given znode.
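To make the one-shot behavior concrete, here is a toy in-memory model. It is not the ZooKeeper API: the class OneTimeWatchNode and its methods are made up for illustration, and a real watch is delivered asynchronously rather than inline as it is here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not the ZooKeeper API: a node whose watches fire once and are then discarded.
final class OneTimeWatchNode {
  private final List<Runnable> watchers = new ArrayList<>();
  private String data = "";

  // Reading with a non-null watcher registers it for the next change only.
  String getData(Runnable watcher) {
    if (watcher != null) {
      watchers.add(watcher);
    }
    return data;
  }

  void setData(String newData) {
    data = newData;
    List<Runnable> toNotify = new ArrayList<>(watchers);
    watchers.clear();            // one-shot: drop the watches before notifying
    for (Runnable watcher : toNotify) {
      watcher.run();
    }
  }

  public static void main(String[] args) {
    OneTimeWatchNode node = new OneTimeWatchNode();
    int[] notifications = {0};
    node.getData(() -> notifications[0]++);
    node.setData("v1");          // fires the watch
    node.setData("v2");          // no watch registered, so this change goes unnoticed
    System.out.println(notifications[0]);
    System.out.println(node.getData(null));
  }
}
```

The client is notified once even though two writes happened; reading the node again still returns the latest data, which is why clients should re-read state rather than count events.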

ZooKeeper implements the hierarchical filesystem via an "ensemble" of servers. Figure 1 shows a three server ensemble with multiple clients reading and one client writing. The basic idea is that the filesystem state is replicated on each server in the ensemble, both on disk and in memory.

Figure 1 - ZooKeeper Ensemble

In Figure 1 you can see one of the servers in the ensemble acts as the leader, while the rest are followers. When an ensemble is first started, a leader election is held. During leader election, a leader is elected and the process is complete once a simple majority of followers have synchronized their state with the leader. After leader election is complete, all write requests are routed through the leader, and changes are broadcast to all followers; this is termed atomic broadcast. Once a majority of followers have persisted the change (to disk and memory), the leader commits the change and notifies the client of a successful update. Because only a majority of followers are required for a successful update, followers can lag the leader, which means ZooKeeper is an eventually consistent system. Thus, different clients can read information about a given znode and receive a different answer. Every write is assigned a globally unique, sequentially ordered identifier called a zxid, or ZooKeeper transaction id. This guarantees a global order to all updates in a ZooKeeper ensemble. In addition, because all writes go through the leader, write throughput does not scale as more nodes are added.

This leader/follower architecture is not a master/slave setup, however, since the leader is not a single point of failure. If a leader dies, then a new leader election takes place and a new leader is elected (this is typically very fast and will not noticeably degrade performance). In addition, because leader election and writes both require a simple majority of servers, ZooKeeper ensembles should contain an odd number of machines; in a five node ensemble any two machines can go down and ZooKeeper remains available, whereas a six node ensemble can also handle only two machines going down, because if three nodes fail, the remaining three are not a majority (of the original six).
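The majority arithmetic behind this advice is easy to check. The following sketch computes, for a given ensemble size, how many servers form a simple majority and how many can fail before that majority is lost. The Quorum class and its method names are hypothetical, not part of ZooKeeper.

```java
// Hypothetical helper: simple-majority arithmetic for an ensemble of a given size.
final class Quorum {

  // Smallest number of servers that is more than half of the ensemble.
  static int majority(int ensembleSize) {
    return ensembleSize / 2 + 1;
  }

  // Number of servers that can fail while a majority is still running.
  static int tolerableFailures(int ensembleSize) {
    return ensembleSize - majority(ensembleSize);
  }

  public static void main(String[] args) {
    for (int size = 3; size <= 7; size++) {
      System.out.printf("ensemble=%d majority=%d tolerable failures=%d%n",
          size, majority(size), tolerableFailures(size));
    }
  }
}
```

Five and six servers both tolerate two failures, so the sixth machine adds cost without adding fault tolerance.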

All client read requests are served directly from the memory of the server they are connected to, which makes reads very fast. In addition, clients have no knowledge about the server they are connected to and do not know if they are connected to a leader or follower. Because reads are from the in-memory representation of the filesystem, read throughput increases as servers are added to an ensemble. But recall that write throughput is limited by the leader, so you cannot simply add more and more ZooKeepers forever and expect performance to increase.

Data Consistency

With ZooKeeper's leader/follower architecture in mind, let's consider what guarantees it makes regarding data consistency.

Sequential Updates

ZooKeeper guarantees that updates are made to the filesystem in the order they are received from clients. Since all writes route through the leader, the global order is simply the order in which the leader receives write requests.

Atomicity

All updates either succeed or fail, just like transactions in ACID-compliant relational databases. ZooKeeper, as of version 3.4.0, supports transactions as a thin wrapper around the multi operation, which performs a list of operations (instances of the Op class) and either all operations succeed or none succeed. So if you need to ensure that multiple znodes are updated at the same time, for example if two znodes are part of a graph, then you can use multi or the transaction wrapper around multi.

Consistent client view

Consistent client view means that a client will see the same view of the system, regardless of which server it is connected to. The official ZooKeeper documentation calls this "single system image". So, if a client fails over to a different server during a session, it will never see an older view of the system than it has previously seen. A server will not accept a connection from a client until it has caught up with the state of the server to which the client was previously connected.

Durability

If an update succeeds, ZooKeeper guarantees it has been persisted and will survive server failures, even if all ZooKeeper ensemble nodes were forcefully killed at the same time! (Admittedly this would be an extreme situation, but the update would survive such an apocalypse.)

Eventual consistency

Because followers may lag the leader, ZooKeeper is an eventually consistent system. But ZooKeeper limits the amount of time a follower can lag the leader, and a follower will take itself offline if it falls too far behind. Clients can force a server to catch up with the leader by calling the asynchronous sync command. Despite the fact that sync is asynchronous, a ZooKeeper server will not process any operations until it has caught up to the leader.

Conclusion to Part 4

In this fourth blog on ZooKeeper you saw a bird's eye view of ZooKeeper's architecture, and learned about its data consistency guarantees. You also learned that ZooKeeper is an eventually consistent system.

In the next blog, we'll dive back into some code and use what we've learned so far to build a distributed lock.

This is the third in a series of blogs that introduce Apache ZooKeeper. In the second blog, you took a test drive of ZooKeeper using its command-line shell. In this blog, we'll re-implement the group membership example using the ZooKeeper Java API.

Apache ZooKeeper is implemented in Java, and its native API is also Java. ZooKeeper also provides a C language API, and the distribution provides contrib modules for Perl, Python, and RESTful clients. The ZooKeeper APIs come in two flavors, synchronous or asynchronous. Which one you use depends on the situation. For example you might choose the asynchronous Java API if you are implementing a Java application to process a large number of child znodes independently of one another; in this case you could make good use of the asynchronous API to simultaneously launch all the independent tasks in parallel. On the other hand, if you are implementing simple tasks that perform sequential operations in ZooKeeper, the synchronous API is easier to use and might be a better fit in such cases.

For our group membership example, we'll use the synchronous Java API. The first thing we need to do is connect to ZooKeeper and get an instance of ZooKeeper, which is the main client API through which you perform operations like creating znodes, setting data on znodes, listing znodes, and so on. The ZooKeeper constructor launches a separate thread to connect, and returns immediately. As a result, you need to watch for the SyncConnected event which indicates when the connection has been established. Listing 1 shows code to connect to ZooKeeper, in which we use a CountDownLatch to block until we've received the connected event. (Sample code for this blog is available on GitHub at https://github.com/sleberknight/zookeeper-samples).

Listing 1 - Connecting to ZooKeeper

public ZooKeeper connect(String hosts, int sessionTimeout)
        throws IOException, InterruptedException {
  final CountDownLatch connectedSignal = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        connectedSignal.countDown();
      }
    }
  });
  connectedSignal.await();
  return zk;
}

The next thing we need to do is create a znode for the group. As in the test drive, this znode should be persistent, so that it hangs around regardless of whether any clients are connected or not. Listing 2 shows creating a group znode.

Listing 2 - Creating the group znode

public void createGroup(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  zk.create(path,
            null /* data */,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
}

Note in Listing 2 that we prepended a leading slash to the group name since ZooKeeper requires that all paths be absolute. The create operation takes arguments for the path, a byte[] for data which is optional, a list of ACLs (access control list) to control who can access the znode, and finally the type of znode, in this case persistent. Creating the group member znodes is almost identical to creating the group znode, except we need to create an ephemeral, sequential znode. Let's also say that we need to store some information about each member, so we'll set data on the member znodes. This is shown in Listing 3.

Listing 3 - Creating group member znodes with data

public String joinGroup(String groupName, String memberName, byte[] data)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName + "/" + memberName + "-";
  String createdPath = zk.create(path,
          data,
          ZooDefs.Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL);
  return createdPath;
}

Now that we can create the group and allow members to join it, it would be nice to have some way to monitor the group membership. To do this we'll first need to list children for the group znode, then set a watch on the group znode, and whenever the watch triggers an event, we'll query ZooKeeper for the group's (updated) members, as shown in Listing 4. This process continues in an infinite loop, hence the class name ListGroupForever.

Listing 4 - Listing a group's members indefinitely

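import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ListGroupForever {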
  private ZooKeeper zooKeeper;
  private Semaphore semaphore = new Semaphore(1);

  public ListGroupForever(ZooKeeper zooKeeper) {
    this.zooKeeper = zooKeeper;
  }

  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ConnectionHelper().connect(args[0]);
    new ListGroupForever(zk).listForever(args[1]);
  }

  public void listForever(String groupName)
          throws KeeperException, InterruptedException {
    semaphore.acquire();
    while (true) {
      list(groupName);
      semaphore.acquire();
    }
  }

  private void list(String groupName)
          throws KeeperException, InterruptedException {
    String path = "/" + groupName;
    List<String> children = zooKeeper.getChildren(path, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
          semaphore.release();
        }
      }
    });
    if (children.isEmpty()) {
      System.out.printf("No members in group %s\n", groupName);
      return;
    }
    Collections.sort(children);
    System.out.println(children);
    System.out.println("--------------------");
  }
}

The ListGroupForever class in Listing 4 has some interesting characteristics. The listForever method loops infinitely and uses a semaphore to block until changes occur to the group node. The list method calls getChildren to actually retrieve the child nodes from ZooKeeper, and critically sets a Watcher to watch for changes of type NodeChildrenChanged. When the NodeChildrenChanged event occurs, the watcher releases the semaphore, which permits listForever to re-acquire the semaphore and then retrieve and display the updated group znodes. This process continues until ListGroupForever is terminated.

To round out the example, we'll create a method to delete the group. As shown in the test drive, ZooKeeper doesn't permit znodes that have children to be deleted, so we first need to delete all the children, and then delete the group (parent) znode. This is shown in Listing 5.

Listing 5 - Deleting a group

public void delete(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  try {
    List<String> children = zk.getChildren(path, false);
    for (String child : children) {
      zk.delete(path + "/" + child, -1);
    }
    zk.delete(path, -1);
  }
  catch (KeeperException.NoNodeException e) {
    System.out.printf("Group %s does not exist\n", groupName);
  }
}

When deleting a group, we passed -1 to the delete method to unconditionally delete the znodes. We could also have passed in a version, so that if we have the correct version number, the znode is deleted but otherwise we receive an optimistic locking violation in the form of a BadVersionException.
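The version check can be pictured with a small in-memory model. This is only a sketch of the rule described above, not ZooKeeper code: VersionedNode is a hypothetical class, and where ZooKeeper would throw a BadVersionException the sketch simply returns false. It models the data version as starting at 0 and increasing by one on each successful update.

```java
// Toy model, not the ZooKeeper API: an update succeeds only with the current version, or with -1.
final class VersionedNode {
  private String data;
  private int version = 0;

  VersionedNode(String data) {
    this.data = data;
  }

  // -1 means "any version"; otherwise the caller must pass the current version.
  boolean setData(String newData, int expectedVersion) {
    if (expectedVersion != -1 && expectedVersion != version) {
      return false;              // ZooKeeper would raise BadVersionException here
    }
    data = newData;
    version++;
    return true;
  }

  int version() {
    return version;
  }

  String data() {
    return data;
  }

  public static void main(String[] args) {
    VersionedNode node = new VersionedNode("initial");
    System.out.println(node.setData("first", 0));    // current version matches
    System.out.println(node.setData("stale", 0));    // version is now 1, so this is rejected
    System.out.println(node.setData("forced", -1));  // unconditional update
    System.out.println(node.data() + " at version " + node.version());
  }
}
```

The same check applies to delete: a stale version is rejected, while -1 bypasses the comparison.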

Conclusion to Part 3

In this third blog on ZooKeeper, we implemented a group membership example using the Java API. You saw how to connect to ZooKeeper; how to create persistent, ephemeral, and sequential znodes; how to list znodes and set watches to receive events; and finally how to delete znodes.

In the next blog, we'll back off from the code level and get an overview of ZooKeeper's architecture.

Distributed Coordination With ZooKeeper Part 2: Test Drive

Posted on June 27, 2013 by Scott Leberknight

This is the second in a series of blogs that introduce Apache ZooKeeper. In the first blog, you got an introduction to ZooKeeper and its core concepts. In this blog, you'll take a brief test drive of ZooKeeper using its command line shell. This is a really fast and convenient way to get up and running with ZooKeeper immediately.

To get an idea of some of the basic building blocks in Apache ZooKeeper, let's take a test drive. ZooKeeper comes with a command-line shell that you can use to connect to and interact with the service. The following listing shows connecting to the shell, listing the znodes at the root level, and creating a znode named /sample-group which will serve as a parent znode for some other znodes that we'll create in a moment. All paths in ZooKeeper must be absolute and begin with a /. The first argument to the create command is the path, while the second is the data that is associated with the znode. Note also that when a connection is established, the default watcher sends the SyncConnected event, which you see in the listing below.

$ ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /sample-group a-sample-group
Created /sample-group
[zk: localhost:2181(CONNECTED) 2] ls /
[sample-group, zookeeper]

At this point we want to create some child znodes under /sample-group. ZooKeeper znodes can be either persistent or ephemeral. Persistent znodes are permanent and, once created, stick around until they are explicitly deleted. On the other hand, ephemeral znodes exist only as long as the client who created them is alive; once the client goes away for any reason, all ephemeral znodes it created are automatically destroyed. As you might imagine, if we want to build a group membership service for a distributed system, the client (which is a group member) should indicate its status via ephemeral znodes, so that if it dies, the znode representing its membership is destroyed, thus indicating the client is no longer a member of the group. When we created the group, we created a persistent znode. To create an ephemeral znode we use the -e option. In addition, maybe we'd like to know the order in which clients joined our group. ZooKeeper znodes can be automatically and uniquely ordered by their parent. In the shell we use -s to indicate we want to create the child znode as a sequential znode. Note also that we named the child nodes /sample-group/child- in each case. When creating sequential znodes, it is typical to end the name with a dash, to which a unique, monotonically increasing integer is automatically appended.

[zk: localhost:2181(CONNECTED) 3] create -s -e /sample-group/child- data-1
Created /sample-group/child-0000000000
[zk: localhost:2181(CONNECTED) 4] create -s -e /sample-group/child- data-2
Created /sample-group/child-0000000001
[zk: localhost:2181(CONNECTED) 5] create -s -e /sample-group/child- data-3
Created /sample-group/child-0000000002
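The names in the output above suggest how sequential znodes are named: the parent keeps a counter and appends it, zero-padded to ten digits. The sketch below mimics that with a hypothetical SequentialNames class (plain Java, no ZooKeeper involved) and shows a side benefit of the padding: for names that share a prefix, sorting them as strings also sorts them by sequence number.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for a parent znode handing out sequential child names.
final class SequentialNames {
  private int next = 0;

  // Appends the counter, zero-padded to ten digits as in the shell output.
  String create(String prefix) {
    return String.format(Locale.ROOT, "%s%010d", prefix, next++);
  }

  public static void main(String[] args) {
    SequentialNames parent = new SequentialNames();
    List<String> children = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      children.add(parent.create("child-"));
    }
    Collections.reverse(children);   // pretend the names came back out of order
    Collections.sort(children);      // a plain string sort restores creation order
    System.out.println(children);
  }
}
```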

Now let's set a watch on the /sample-group znode in order to receive change notifications whenever a child znode is added or removed. Setting the watch lets us monitor the group for changes and react accordingly. For example, if we are building a distributed search engine and a server in the search cluster dies, we need to know about that event and redistribute the data held by the (now dead) server across the remaining servers, assuming the data is stored redundantly such as in Hadoop. This is exactly what the Apache Blur distributed search engine does in order to ensure data is not lost and that the cluster continues operating when one or more servers are lost. In ZooKeeper you set watches on read operations, for example when listing a znode or getting its data. We'll list the children under /sample-group and set a watch, indicated by using true as the second argument.

[zk: localhost:2181(CONNECTED) 6] ls /sample-group true
[child-0000000001, child-0000000002, child-0000000000]

Now if we create another child znode, the watch event will fire and notify us that a NodeChildrenChanged event occurred.

[zk: localhost:2181(CONNECTED) 7] create -s -e /sample-group/child- data-4

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sample-group
Created /sample-group/child-0000000003

The event does not tell us what actually changed, however. To get the updated list of children we need to again list the contents of /sample-group. In addition, watchers are one-time events, and clients must re-register the watch to continue receiving change notifications. So if we now create another child znode, no watch will fire.

[zk: localhost:2181(CONNECTED) 8] create -s -e /sample-group/child- data-5
Created /sample-group/child-0000000004

To finish off our test drive, let's delete our test group.

[zk: localhost:2181(CONNECTED) 9] delete /sample-group
Node not empty: /sample-group

Oops. ZooKeeper won't allow znodes to be deleted if they have children. In addition, updates (including deletes) are conditional upon a specific version, which is a form of optimistic locking that ensures a client update succeeds only if it passes the current version of the data. Otherwise the update fails with a BadVersionException. You can short-circuit the optimistic versioning behavior by passing -1 to updates, which tells ZooKeeper to perform the update unconditionally. So in order to delete our group, we first delete all the child znodes and then delete the group znode, all unconditionally.

[zk: localhost:2181(CONNECTED) 10] delete /sample-group/child-0000000000 -1
[zk: localhost:2181(CONNECTED) 11] delete /sample-group/child-0000000001 -1
[zk: localhost:2181(CONNECTED) 12] delete /sample-group/child-0000000002 -1
[zk: localhost:2181(CONNECTED) 13] delete /sample-group/child-0000000003 -1
[zk: localhost:2181(CONNECTED) 14] delete /sample-group/child-0000000004 -1
[zk: localhost:2181(CONNECTED) 15] delete /sample-group -1

In addition to the shell, ZooKeeper also provides commands referred to as the "four letter words". You issue the commands via telnet or nc (netcat). For example, let's ask ZooKeeper how it's feeling.

$ echo "ruok" | nc localhost 2181
imok

You can also use the stat command to get basic statistics on ZooKeeper.

$ echo "stat" | nc localhost 2181
Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT
Clients:
 /0:0:0:0:0:0:0:1%0:63888[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/157
Received: 338
Sent: 337
Connections: 1
Outstanding: 0
Zxid: 0xb
Mode: standalone
Node count: 17

In this test drive, we've seen some basic but important aspects of ZooKeeper. We created persistent and sequential ephemeral znodes, set a watch and received a change notification event when a znode's children changed, and deleted znodes. We also saw how znodes can have associated data. When building real systems you obviously won't be using the command line shell to implement behavior, however, so let's translate this simple group membership example into Java code.

Conclusion to Part 2

In this second part of the ZooKeeper series of blogs, you took a test drive using the command-line shell available in ZooKeeper. You created both persistent and ephemeral znodes. You created the ephemeral znodes as children of the persistent znode, and made them sequential as well so that ZooKeeper maintains a monotonically increasing, unique order. Finally you saw how to delete znodes and use a few of the "four letter words" to check ZooKeeper's status.

In the next blog, we'll recreate the group example you've just seen using the ZooKeeper Java API.

Distributed Coordination With ZooKeeper Part 1: Introduction

Posted on June 25, 2013 by Scott Leberknight

This is the first in a series of blogs that introduce Apache ZooKeeper. This blog provides an introduction to ZooKeeper and its core concepts and use cases. In later blogs you will test drive ZooKeeper, see some examples of the Java API, learn about its architecture, build a distributed data structure which can be used across independent processes and machines, and finally get a brief introduction to a higher-level API on top of ZooKeeper.

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. This could be a distributed search engine, a distributed build system, or even something like Hadoop which has both a distributed file system and a Map/Reduce data processing framework that operates on the data in the file system. How would you determine which servers are alive and operating at any given moment in time? Or, how would you determine which servers are available to process a build in a distributed build system? Or for a distributed search system how would you know which servers are available to hold data and handle search requests? Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers?

These and similar questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault tolerant, distributed coordination service. Using ZooKeeper you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches.

Many well-known and successful projects already rely on ZooKeeper. Just a few of them include HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.

Core Concepts

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example if a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests. Each server also holds a persistent copy on disk.
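To make the operations and the watch model a little more concrete, here is a minimal in-memory sketch. It is not the ZooKeeper API: TinyTree and all of its methods are hypothetical names invented for illustration, everything lives in one process, and the sketch assumes that a watch fires only once and must be set again to receive further events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Consumer;

// Illustrative only: TinyTree is a hypothetical in-memory model, not the ZooKeeper API.
final class TinyTree {
    // Maps each node path to the sorted names of its children.
    private final Map<String, TreeSet<String>> children = new HashMap<>();
    // Child watches registered per path; assumed to be one-shot.
    private final Map<String, List<Consumer<String>>> childWatches = new HashMap<>();

    TinyTree() {
        children.put("/", new TreeSet<>());
    }

    boolean exists(String path) {
        return children.containsKey(path);
    }

    // Creates a node under an existing parent and fires child watches on the parent.
    void create(String path) {
        String parent = parentOf(path);
        if (!exists(parent)) throw new IllegalArgumentException("no such parent: " + parent);
        if (exists(path)) throw new IllegalStateException("already exists: " + path);
        children.put(path, new TreeSet<>());
        children.get(parent).add(nameOf(path));
        fireChildWatches(parent);
    }

    // Deletes a childless node and fires child watches on its parent.
    void delete(String path) {
        if (path.equals("/")) throw new IllegalArgumentException("cannot delete the root");
        TreeSet<String> kids = children.get(path);
        if (kids == null) throw new IllegalArgumentException("no such node: " + path);
        if (!kids.isEmpty()) throw new IllegalStateException("node has children: " + path);
        children.remove(path);
        String parent = parentOf(path);
        children.get(parent).remove(nameOf(path));
        fireChildWatches(parent);
    }

    // Returns a snapshot of the child names and optionally registers a watch.
    List<String> getChildren(String path, Consumer<String> watcher) {
        if (!exists(path)) throw new IllegalArgumentException("no such node: " + path);
        if (watcher != null) {
            childWatches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
        }
        return new ArrayList<>(children.get(path));
    }

    // Removes the watches before notifying them, so each watch fires at most once.
    private void fireChildWatches(String path) {
        List<Consumer<String>> watchers = childWatches.remove(path);
        if (watchers != null) {
            for (Consumer<String> w : watchers) w.accept(path);
        }
    }

    private static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx == 0 ? "/" : path.substring(0, idx);
    }

    private static String nameOf(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
```

In this sketch a client that calls getChildren("/group", watcher) is notified when the first child is added or removed, and has to call getChildren again with a watcher if it wants to hear about later changes.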

One of the servers is elected as the leader, and all other servers are followers. The leader is responsible for all writes and for broadcasting changes to followers. Assuming a majority of followers commit a change successfully, the write succeeds and the data is then durable even if the leader then fails. This means ZooKeeper is an eventually consistent system, because the followers may lag behind the leader by some small amount of time, hence clients might not always see the most up-to-date information. Importantly, the leader is not a master as in a master/slave architecture and thus is not a single point of failure; rather, if the leader dies, then the remaining followers hold an election for a new leader, and the new leader takes over where the old one left off.
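The majority rule comes down to a little arithmetic. The sketch below is not ZooKeeper code; QuorumMath and its methods are hypothetical names, and it assumes the majority is counted over every server in the ensemble, leader included.

```java
// Illustrative only: QuorumMath is a hypothetical helper, not part of ZooKeeper.
final class QuorumMath {

    // Smallest number of servers that forms a strict majority of the ensemble.
    static int majority(int ensembleSize) {
        if (ensembleSize < 1) {
            throw new IllegalArgumentException("ensemble must have at least one server");
        }
        return ensembleSize / 2 + 1;
    }

    // How many servers can fail while a majority can still be formed.
    static int tolerableFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    // Under the stated assumption, a write counts as committed once a majority has acknowledged it.
    static boolean isCommitted(int acks, int ensembleSize) {
        return acks >= majority(ensembleSize);
    }
}
```

Under these assumptions a five-server ensemble needs three acknowledgements and survives two failures, while a six-server ensemble needs four and still survives only two, which is one reason ensembles are usually sized with an odd number of servers.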

Each client connects to ZooKeeper, passing in the list of servers in the ensemble. The client tries servers in the ensemble in random order until a connection is established. Once connected, ZooKeeper creates a session with the client-specified timeout period. The ZooKeeper client automatically sends periodic heartbeats to keep the session alive if no operations are performed for a while, and automatically handles failover. If the ZooKeeper server a client is connected to fails, the client automatically detects this and tries to reconnect to a different server in the ensemble. The nice thing is that the same client session is retained during this failover event; however, during failover it is possible that client operations could fail and, as with almost all ZooKeeper operations, client code must be vigilant and detect errors and deal with them as necessary.
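The server-selection step can be sketched roughly as follows. This is not the real client implementation: ServerPicker is a hypothetical name, the host names are made up, and the canConnect predicate stands in for an actual network connection attempt so that the example needs no running ensemble.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Predicate;

// Illustrative only: ServerPicker is a hypothetical stand-in for the
// server-selection step of a client, not the real ZooKeeper client code.
final class ServerPicker {

    // Splits a comma-separated list such as "zk1:2181,zk2:2181" into host:port entries.
    static List<String> parse(String connectString) {
        List<String> servers = new ArrayList<>();
        for (String entry : connectString.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    // Tries the servers in random order and returns the first one that accepts a connection.
    // canConnect is supplied by the caller, so the sketch does no real networking.
    static Optional<String> connect(String connectString,
                                    Predicate<String> canConnect,
                                    Random random) {
        List<String> servers = parse(connectString);
        Collections.shuffle(servers, random);
        for (String server : servers) {
            if (canConnect.test(server)) {
                return Optional.of(server);
            }
        }
        // This sketch makes a single pass; it does not model retries or session timeouts.
        return Optional.empty();
    }
}
```

Calling ServerPicker.connect("zk1:2181,zk2:2181,zk3:2181", s -> s.equals("zk2:2181"), new Random()) returns zk2:2181 whatever order the shuffle produces, because it is the only server the predicate accepts. The same selection step would apply again on failover, with the failed server no longer accepting connections.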

Partial Failure

One of the fallacies of distributed computing is that the network is reliable. Having worked on a project for the past few years with multiple Hadoop, Apache Blur, and ZooKeeper clusters including hundreds of servers, I can definitely say from experience that the network is not reliable. Simply put, things break and you cannot assume the network is 100% reliable all the time. When designing distributed systems, you must keep this in mind and handle things you ordinarily would not even consider when building software for a single server. For example, assume a client sends an update to a server, but before the response is received the network connection is lost for a brief period. You need to ask several questions in this case. Did the message get through to the server? If it did, did the operation actually complete successfully? Is it safe to retry an operation when you don't know whether it reached the server or whether it failed there; in other words, is the operation idempotent? You need to consider questions like these when building distributed systems. ZooKeeper cannot help with network problems or partial failures, but once you are aware of the kinds of problems which can arise, you are much better prepared to deal with problems when (not if) they occur. ZooKeeper provides certain guarantees regarding data consistency and atomicity that can aid you when building systems, as you will see later.
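The retry question is easier to see with a small example. The sketch below uses a hypothetical in-memory class, VersionedCounter, as a stand-in for a value held by a server; it is not ZooKeeper code. It contrasts a blind increment, which is not safe to retry, with a write that carries the version the client expects to replace.

```java
// Illustrative only: VersionedCounter is a hypothetical in-memory stand-in
// for a value held on a server; it is not part of ZooKeeper.
final class VersionedCounter {
    private int value = 0;
    private int version = 0;

    // Not idempotent: every delivery of the request changes the value again.
    synchronized void increment() {
        value++;
        version++;
    }

    // Conditional write: applied only if the caller's expected version still matches.
    // A duplicate delivery sees a newer version and is rejected instead of applied twice.
    synchronized boolean setIfVersion(int newValue, int expectedVersion) {
        if (expectedVersion != version) {
            return false;
        }
        value = newValue;
        version++;
        return true;
    }

    synchronized int value() {
        return value;
    }

    synchronized int version() {
        return version;
    }
}
```

If the client retries increment() after a lost response, the counter may end up at 2 instead of 1. If it retries setIfVersion(1, 0), the second attempt is rejected and the value stays at 1, although the client still has to read the value back to learn whether its first attempt was the one that succeeded. ZooKeeper znodes carry a version number, and writes such as setData accept an expected version, which supports this style of conditional update.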

Conclusion to Part 1

In this blog we've learned that ZooKeeper is a distributed coordination service that facilitates loose coupling between distributed components. It is implemented as a distributed, hierarchical file system and you can use it to build distributed data structures such as locks, queues, and so on. In the next blog, we'll take a test drive of ZooKeeper using its command line shell.

Hadoop Presentation at NOVA/DC Java Users Group

Posted on May 09, 2011 by Scott Leberknight

Last Thursday (on Cinco de Mayo) I gave a presentation on Hadoop and Hive at the Nova/DC Java Users Group. As several people asked about getting the slides, I've shared them here on Slideshare. I also posted the presentation sample code on Github at basic-hadoop-examples.

What's in JDK 7 Lightning Talk Slides

Posted on April 16, 2011 by Scott Leberknight

Yesterday at the Near Infinity 2011 Spring Conference I gave a talk on CoffeeScript (see here) and a very short lightning talk on what exactly is in JDK 7. You can find the slides for the JDK 7 talk here if you're interested.

CoffeeScript Slides

Posted on April 15, 2011 by Scott Leberknight

Today is the Near Infinity Spring Conference. We have one conference in the fall and one in the spring for all our developers as well as invited guests. Today I gave a presentation on CoffeeScript and shared the slides here.

Introducing RJava

Posted on March 31, 2011 by Scott Leberknight

You’ve no doubt heard about JRuby, which lets you run Ruby code on the JVM. This is nice, but wouldn’t it be nicer if you could write Java code on a Ruby VM? This would let you take advantage of the power of Ruby 1.9’s new YARV (Yet Another Ruby VM) interpreter while letting you write code in a statically-typed language. Without further ado, I’d like to introduce RJava, which does just that!

RJava lets you write code in Java and run it on a Ruby VM! And you still get the full benefit of the Java compiler to ensure your code is 100% correct. Of course with Java you also get checked exceptions and proper interfaces and abstract classes to ensure compliance with your design. You no longer need to worry about whether an object responds to a random message, because the Java compiler will enforce that it does.

You get all this and more, all with the power and flexibility of a Ruby VM. And because Java does not support closures, you are assured that everything is properly designed since you’ll be able to define interfaces and then implement anonymous inner classes just like you’re used to doing! Even when JDK 8 arrives sometime in the future with lambdas, you can rest assured that they will be statically typed.

As a first example, let’s see how you could filter a collection in RJava to find only the even numbers from one to ten. In Ruby you’d probably write something like this:

evens = (1..10).find_all { |n| n % 2 == 0 }

With RJava, you’d write this:

List<Integer> evens = new ArrayList<Integer>();
for (int i = 1; i <= 10; i++) {
  if (i % 2 == 0) {
    evens.add(i);
  }
}

This example shows the benefits of declaring variables with specific types, how you can use interfaces (e.g. List in the example) when declaring variables, and shows how you also get the benefits of Java generics to ensure your collections are always type-safe. Without any doubt you know that “evens” is a List containing Integers and that “i” is an int, so you can sleep soundly knowing your code is correct. You can also see Java’s powerful “for” loop at work here, to easily traverse from 1 to 10, inclusive. Finally, you saw how to effectively use Java’s braces to organize code to clearly show blocks, and semi-colons ensure you always know where lines terminate.

I’ve just released RJava on GitHub, so go check it out. Please download RJava today and give it a try and let me know what you think!

Database-Backed Refreshable Beans with Groovy and Spring 3

Posted on October 30, 2010 by Scott Leberknight

In 2009 I published a two-part series of articles on IBM developerWorks entitled Groovier Spring. The articles showed how Spring supports implementing beans in Groovy whose behavior can be changed at runtime via the "refreshable beans" feature. This feature essentially detects when a Spring bean backed by a Groovy script has changed, recompiles it, and replaces the old bean with the new one. This feature is pretty powerful in certain scenarios, for example in PDF generation; mail or any kind of template generation; and as a way to implement runtime modifiable business rules. One specific use case I showed was how to implement PDF generation where the Groovy scripts reside in a database, allowing you to change how PDFs are generated by simply updating Groovy scripts in your database.

In order to load Groovy scripts from a database, I showed how to implement custom ScriptFactoryPostProcessor and ScriptSource classes. The CustomScriptFactoryPostProcessor extends the default Spring ScriptFactoryPostProcessor and overrides the convertToScriptSource method to recognize a database-based script, e.g. you could specify a script source of database:com/nearinfinity/demo/GroovyPdfGenerator.groovy. There is also DatabaseScriptSource that implements the ScriptSource interface and which knows how to load Groovy scripts from a database.
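The prefix recognition at the heart of that override can be sketched on its own, without Spring. The class below is hypothetical and is not the article's CustomScriptFactoryPostProcessor; it only illustrates, under the assumption that the prefix is exactly "database:", how a locator string might be split into a source type and a script path.

```java
// Illustrative only: ScriptLocator is a hypothetical helper; it is neither a
// Spring class nor the article's sample code.
final class ScriptLocator {
    static final String DATABASE_PREFIX = "database:";

    final boolean fromDatabase;
    final String location;

    private ScriptLocator(boolean fromDatabase, String location) {
        this.fromDatabase = fromDatabase;
        this.location = location;
    }

    // Recognizes the "database:" prefix and strips it; any other locator is
    // passed through unchanged so that default handling can take over.
    static ScriptLocator parse(String scriptSourceLocator) {
        if (scriptSourceLocator.startsWith(DATABASE_PREFIX)) {
            String path = scriptSourceLocator.substring(DATABASE_PREFIX.length());
            return new ScriptLocator(true, path);
        }
        return new ScriptLocator(false, scriptSourceLocator);
    }
}
```

In the real override, the database branch would hand the stripped path to a DatabaseScriptSource, and the other branch would delegate to the superclass.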

In order to put these pieces together, you need to do a bit of configuration. In the articles I used Spring 2.5.x which was current at the time in early 2009. The configuration looked like this:

<bean id="dataSource"
  class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <!-- set data source props, e.g. driverClassName, url, username, password... -->
</bean>

<bean id="scriptFactoryPostProcessor"
  class="com.nearinfinity.spring.scripting.support.CustomScriptFactoryPostProcessor">
    <property name="dataSource" ref="dataSource"/>
</bean>

<lang:groovy id="pdfGenerator"
  script-source="database:com/nearinfinity/demo/DemoGroovyPdfGenerator.groovy">
    <lang:property name="companyName" value="Database Groovy Bookstore"/>
</lang:groovy>

In Spring 2.5.x this works because the <lang:groovy> tag looks for a Spring bean with id "scriptFactoryPostProcessor"; if one exists it uses it, and if not it creates one. In the above configuration we created our own "scriptFactoryPostProcessor" bean for <lang:groovy> tags to utilize. So all's well...until you move to Spring 3.x, at which point the above configuration no longer works. This was pointed out to me by João from Brazil, who tried the sample code in the articles with Spring 3.x and found that it did not work. After trying a bunch of things, we eventually determined that in Spring 3.x the <lang:groovy> tag looks for a ScriptFactoryPostProcessor bean whose id is "org.springframework.scripting.config.scriptFactoryPostProcessor", not just "scriptFactoryPostProcessor". So once you figure this out, it is easy to change the above configuration to:

<bean id="org.springframework.scripting.config.scriptFactoryPostProcessor"
  class="com.nearinfinity.spring.scripting.support.CustomScriptFactoryPostProcessor">
    <property name="dataSource" ref="dataSource"/>
</bean>

<lang:groovy id="pdfGenerator"
  script-source="database:com/nearinfinity/demo/DemoGroovyPdfGenerator.groovy">
    <lang:property name="companyName" value="Database Groovy Bookstore"/>
</lang:groovy>

Then, everything works as expected and the Groovy scripts can reside in your database and be automatically reloaded when you change them. So if you download the article sample code as-is, it will work since the bundled Spring version is 2.5.4, but if you update to Spring 3.x then you'll need to modify the configuration in applicationContext.xml for example #7 (EX #7) as shown above, changing the id of the "scriptFactoryPostProcessor" bean to "org.springframework.scripting.config.scriptFactoryPostProcessor". Note there is a scheduled JIRA issue SPR-5106 that will make the ScriptFactoryPostProcessor mechanism pluggable, so that you won't need to extend the default ScriptFactoryPostProcessor and replace the default bean, etc. But until then, this hack continues to work pretty well.