ZooKeeper

Introduction to ZooKeeper - 21 July 2018

ZooKeeper is a coordination and configuration service for distributed systems. It provides the facility for maintaining values that different processes use to coordinate their activities.

I'm writing this guide both as an exercise in getting to know ZooKeeper better and because I'm disappointed with the state of the examples out there on the web. The examples I've found all seem to clutter up ZooKeeper's operations with additional concerns.

So let's get started.

Hello World

The most basic thing I figure we can do with ZooKeeper is maintain a value. So here is the code:

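import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

CountDownLatch connectionLatch = new CountDownLatch(1);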
ZooKeeper zoo = new ZooKeeper("127.0.0.1:2181", 2000, we -> {
    if (we.getState() == Watcher.Event.KeeperState.SyncConnected) {
        connectionLatch.countDown();
    }
});
connectionLatch.await();

zoo.create("/foo", "bar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

String result = new String(zoo.getData("/foo", null, null));
System.out.println(result);

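So that already looks a bit busy, and we've only done the most basic thing imaginable. But that's really because of the connection setup. The ZooKeeper constructor doesn't block the calling thread while it establishes the session: it returns immediately, and the watcher we pass in is notified with a SyncConnected event once the connection is up. The latch simply makes us wait for that event. (The 2000 argument is the session timeout in milliseconds.) This non-blocking style is a common theme in ZooKeeper; it really looks after your threads. So from now on I'm going to wrap that connection up in a handy function.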

public static ZooKeeper connect(String connectionString) throws IOException, InterruptedException {
    CountDownLatch connectionLatch = new CountDownLatch(1);
    ZooKeeper zoo = new ZooKeeper(connectionString, 2000, we -> {
        if (we.getState() == Watcher.Event.KeeperState.SyncConnected) {
            connectionLatch.countDown();
        }
    });
    connectionLatch.await();
    return zoo;
}

With that in place, we're left with just the code below. It creates a ZNode at the path '/foo' and writes a few bytes to it. The additional parameters control access (OPEN_ACL_UNSAFE gives everyone full access) and persistence (a PERSISTENT ZNode outlives the session that created it).

ZooKeeper zoo = connect("127.0.0.1:2181");

zoo.create("/foo", "bar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

String result = new String(zoo.getData("/foo", null, null));
System.out.println(result);

So that's all good. How about an update? Let's test whether the ZNode exists, then create it if it doesn't or update it if it does.

Stat stat = zoo.exists("/foo", null);
if (stat == null) {
    zoo.create("/foo", "bar".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
    zoo.setData("/foo", "bar".getBytes(), stat.getVersion());
}

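Note the call to stat.getVersion() on the update. This is an example of optimistic locking: setData() fails with a KeeperException.BadVersionException if the version we supply doesn't match the ZNode's current version, that is, if someone else has written to it since we read it. In this way, different processes can be sure they don't overwrite each other's changes, which is the Lost Update problem. Likewise, create() fails with a KeeperException.NodeExistsException if another process creates the ZNode between our exists() check and our create(). At this point it goes without saying that reads and writes to ZNodes are atomic.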
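To make the version check concrete without a running server, here is a minimal sketch of the rule setData() enforces. VersionedNode and LostUpdateDemo are hypothetical names, and the class is an in-memory stand-in rather than the ZooKeeper API: it throws an IllegalStateException where ZooKeeper would throw KeeperException.BadVersionException. It also follows ZooKeeper's convention that a version of -1 matches any version.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory stand-in for a single ZNode; NOT the ZooKeeper API.
// It mimics the rule that a write only succeeds when the supplied version
// matches the node's current version (or is -1, meaning "any version").
class VersionedNode {
    private byte[] data;
    private int version = 0; // a freshly created ZNode starts at data version 0

    VersionedNode(byte[] initial) {
        this.data = initial.clone();
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    synchronized int getVersion() {
        return version;
    }

    // Returns the new version, or throws if another writer got there first
    // (ZooKeeper would throw KeeperException.BadVersionException here).
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + " but node is at " + version);
        }
        data = newData.clone();
        return ++version;
    }
}

class LostUpdateDemo {
    // Two clients read the same version; only the first write is accepted.
    static String run() {
        VersionedNode node = new VersionedNode("bar".getBytes(StandardCharsets.UTF_8));
        int seenByA = node.getVersion();
        int seenByB = node.getVersion();

        node.setData("from-A".getBytes(StandardCharsets.UTF_8), seenByA); // succeeds: 0 -> 1
        try {
            node.setData("from-B".getBytes(StandardCharsets.UTF_8), seenByB); // stale version 0
        } catch (IllegalStateException e) {
            // B must re-read and decide again instead of silently overwriting A
        }
        return new String(node.getData(), StandardCharsets.UTF_8);
    }
}
```

LostUpdateDemo.run() returns "from-A": client B's write is rejected instead of clobbering A's.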

Global Sequence

So we can implement our first use case. Let's say we have a distributed system of workers, and any worker can create a new record. They now face the problem of generating a unique identifier for the record, and they must do so safely across processes, not just across threads.

public static BigInteger incrementAndGet(ZooKeeper zoo, String path) throws KeeperException, InterruptedException {
    BigInteger result;
    Stat stat = zoo.exists(path, null);
    if (stat == null) {
        result = BigInteger.valueOf(1L);
        zoo.create(path, result.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } else {
        byte[] data = zoo.getData(path, null, stat);
        result = new BigInteger(data).add(BigInteger.ONE);
        zoo.setData(path, result.toByteArray(), stat.getVersion());
    }
    return result;
}

And let's call it:

ZooKeeper zoo = connect("127.0.0.1:2181");

String PATH = "/next-id";
System.out.println(incrementAndGet(zoo, PATH));

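What's missing from this is a retry mechanism for when the optimistic locking fails: setData() throws a BadVersionException if another worker incremented the counter after we read it, and create() throws a NodeExistsException if two workers race to create it. I'm not going to include that here as it would muddy the waters.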
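For completeness, here is one way the retry could look. It's a sketch under an assumption: the ZNode is replaced by a hypothetical in-memory SequenceNode holding a value and a version, so it runs without a server. Against a real ensemble the shape would be the same: re-read with getData(), and loop whenever setData() throws BadVersionException (or create() throws NodeExistsException).

```java
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicReference;

// Immutable snapshot of the node: its bytes plus the version they belong to.
final class Versioned {
    final byte[] data;
    final int version;

    Versioned(byte[] data, int version) {
        this.data = data;
        this.version = version;
    }
}

// Hypothetical in-memory analogue of the "/next-id" ZNode; NOT the ZooKeeper API.
class SequenceNode {
    private final AtomicReference<Versioned> node =
            new AtomicReference<>(new Versioned(BigInteger.ZERO.toByteArray(), 0));

    // Like getData(path, null, stat): returns the value and its version together.
    Versioned read() {
        return node.get();
    }

    // Conditional write: succeeds only if the node still has the version we read.
    boolean setData(byte[] data, int expectedVersion) {
        Versioned current = node.get();
        if (current.version != expectedVersion) {
            return false;
        }
        return node.compareAndSet(current, new Versioned(data, expectedVersion + 1));
    }

    // incrementAndGet with the retry loop the article leaves out.
    BigInteger incrementAndGet() {
        while (true) {
            Versioned seen = read();
            BigInteger next = new BigInteger(seen.data).add(BigInteger.ONE);
            if (setData(next.toByteArray(), seen.version)) {
                return next;
            }
            // the version moved on: someone else incremented first, so re-read and retry
        }
    }
}
```

Note that each retry re-reads the value; retrying the write with the old data and old version would just fail again.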

Leader Latch

Let's say we have to fire some business logic on a regular basis. We might have a collection of events sitting in a database table or MongoDB collection, and we can retrieve them in scheduled order. We have some code that will pull the first item off the queue and test whether it's time to execute it. The problem is that we don't want every process in our distributed system polling the database for the next job to execute; we only want one process to do so. And if that process were to go down, or fail for some reason, we want some other process to take over, but only one other process. This pattern is called the Leader Latch.

Here is a rather simple implementation.

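public static class SchedulerThread extends Thread {

    private String PATH = "/leaderlatch";

    public void run() {
        String address = this.getName();
        try {
            ZooKeeper zoo = connect("127.0.0.1:2181");
            createNodeIfNecessary(zoo, PATH, address);

            while (true) {
                Stat stat = new Stat();
                String currentLeader = new String(zoo.getData(PATH, null, stat));

                try {
                    if (!currentLeader.equals(address)
                            && System.currentTimeMillis() - stat.getMtime() > 20000) {
                        // the leader hasn't written for 20s: try to take over.
                        // Keep the returned Stat so the heartbeat below uses the new version.
                        stat = zoo.setData(PATH, address.getBytes(), stat.getVersion());
                        currentLeader = address;
                    }

                    if (currentLeader.equals(address)) {
                        zoo.setData(PATH, address.getBytes(), stat.getVersion()); //heartbeat
                        //do something significant
                    }
                } catch (KeeperException.BadVersionException e) {
                    //another process wrote first: failed to assume (or keep) leadership
                }
                waitASecond();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// Minimal implementations of the two helpers used above (illustrative).
static void createNodeIfNecessary(ZooKeeper zoo, String path, String data)
        throws KeeperException, InterruptedException {
    try {
        zoo.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException e) {
        // another process created it first; that's fine
    }
}

static void waitASecond() {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}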

As you can see from the implementation, we are using the modified time of the ZNode to determine if the current holder of the ZNode, the leader, has expired. The timeout I have here is 20 seconds. To see this in action, just start the threads.

new SchedulerThread().start();
new SchedulerThread().start();
new SchedulerThread().start();
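The decision each SchedulerThread makes on every pass can be pulled out into a pure function, which makes the lease rule easier to see and to test. LeaseRule and LatchAction are hypothetical names for illustration only. One caveat worth knowing: getMtime() is stamped by the ZooKeeper server's clock while System.currentTimeMillis() reads the local clock, so any skew between the two shifts the effective timeout. That's one reason to keep the timeout generous relative to the one-second heartbeat.

```java
// Hypothetical names; this only restates the rule in SchedulerThread's loop.
enum LatchAction { HEARTBEAT, TAKE_OVER, WAIT }

class LeaseRule {
    static LatchAction decide(String currentLeader, String me,
                              long mtimeMillis, long nowMillis, long timeoutMillis) {
        if (currentLeader.equals(me)) {
            return LatchAction.HEARTBEAT;  // we hold the lease: refresh the mtime
        }
        if (nowMillis - mtimeMillis > timeoutMillis) {
            return LatchAction.TAKE_OVER;  // lease is stale: attempt a versioned setData()
        }
        return LatchAction.WAIT;           // someone else is alive and leading
    }
}
```

Note the strict comparison: a lease that is exactly timeoutMillis old is still considered alive, just as in the loop above.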

There is another approach we could take. ZooKeeper allows us to create ZNodes that exist only for the lifetime of the session that created them. This can be used to notify peer processes that the leader is no longer active. This method avoids running an additional polling thread in each process.

We start with a worker thread which does the work. This will only be active within the leader process.

public static class Worker extends Thread {
    // volatile: cease() is called from a different thread than run()
    private volatile boolean running = true;

    public Worker(String address) {
        super(address);
    }

    public void run() {
        while (running) {
            System.out.println(this.getName() + " doing stuff");
            waitASecond();
        }
    }

    public void cease() {
        this.running = false;
    }
}

And here we have a Watcher. A watcher is registered against a ZNode and is notified the next time that ZNode is created, updated or deleted. Watches fire only once, which is why process() calls register() again, and register() re-arms the watch with a call to exists() right after trying to create the ZNode. Since the ZNode's value is never updated, the event we actually care about is the ZNode being deleted when the leader's session ends. The value of the ZNode is irrelevant.

    public static class SchedulerWatcher implements Watcher {

        private String PATH = "/leader-latch-watchers";
        private String address;
        private ZooKeeper zoo;
        private Worker running;

        public SchedulerWatcher(String address) throws IOException, InterruptedException, KeeperException {
            this.address = address;
            zoo = connect("127.0.0.1:2181");
        }

        public void register() throws Exception {
            try {
                zoo.create(PATH, address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                this.running = new Worker(address);
                this.running.start();
            } catch (KeeperException.NodeExistsException e) {
                //assumption of leadership failed.
            }

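            // re-arm the one-shot watch; if the leader vanished in the meantime,
            // try again to lead rather than wait for a creation that may never come
            if (zoo.exists(PATH, this) == null) {
                register();
            }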
        }

        @Override
        public synchronized void process(WatchedEvent event) {
            if (zoo.getState() == ZooKeeper.States.CLOSED) {
                return;
            }
            try {
                register();
            } catch (Exception e){
                e.printStackTrace();
            }
        }

        public synchronized void close() throws InterruptedException {
            if (running != null) {
                running.cease();
            }
            zoo.close();
        }
    }

Note that we create the ZNode using the EPHEMERAL flag. This indicates that the ZNode should only exist as long as the session that created it is active.

This is how you would use the above code to set up three watchers.

new SchedulerWatcher("Address-1").register();
new SchedulerWatcher("Address-2").register();
new SchedulerWatcher("Address-3").register();

For all its cleverness, the second solution is probably not the best. Its weakness is that the thread doing the work plays no part in keeping the ZNode alive: the session heartbeat is handled by the ZooKeeper client library, not by the worker. So it's conceivable that the worker thread might die while the ZooKeeper connection stays open, and no other process would take over. The first, thread-based solution consumes an additional thread per process, but a Java thread typically costs on the order of 1 MB of reserved stack space (the usual HotSpot default on 64-bit platforms) and a little computation on a periodic basis. You might also think that the thread-based solution is heavier on the network, and it is, but not materially so.

But it does illustrate the use of watchers and the Ephemeral flag.

Worker Pool

So what's the idea with the Worker Pool? Let's say that we have a number of workers that could be available to do work and we have a client that will randomly offer work to the pool of workers. The worker pool can grow or shrink on demand. Any clients that are offering work should dynamically become aware of the available pool of workers.

ZooKeeper handles this with a particular type of ZNode referred to as a Sequential node. When you create a Sequential node, ZooKeeper appends a monotonically increasing counter, kept by the parent ZNode and formatted as ten zero-padded digits, to the end of the path you supply. In this way, multiple clients can safely append entries to the end of a list, and each entry gets a unique name.
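Here's a small illustration of what sequential names look like and why they're convenient. Because the counter is fixed-width and zero-padded, children that share a prefix sort in creation order as plain strings. SequentialNames is a hypothetical helper, and the sketch ignores the fact that ZooKeeper's counter is a signed 32-bit int that will eventually overflow.

```java
import java.util.List;

// Hypothetical helper; it only reproduces ZooKeeper's naming format.
class SequentialNames {
    // The name ZooKeeper would give the n-th sequential child created with this prefix.
    static String childName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    // With a fixed-width suffix and a common prefix, string order equals creation order,
    // so the oldest child is simply the smallest name (null if there are no children).
    static String oldest(List<String> children) {
        return children.stream().min(String::compareTo).orElse(null);
    }
}
```

For example, childName("", 1) gives "0000000001", the form of the names you get back from getChildren() when the prefix is PATH + "/".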

Take this code as an example.

public static class Worker {

    private ZooKeeper zoo;

    public Worker(String address) throws IOException, InterruptedException, KeeperException {
        zoo = connect("127.0.0.1:2181");
        zoo.create(PATH + "/", address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public void close() throws InterruptedException {
        zoo.close();
    }
}

Note that we are actually creating an Ephemeral Sequential node here, under a persistent parent ZNode at PATH which must already exist (ZooKeeper won't create missing parents for you). In this way the list represented by the path grows or shrinks with the number of live sessions connected to ZooKeeper. You would use these workers like so:

    Worker worker1 = new Worker("Address-1");
    Worker worker2 = new Worker("Address-2");
    Worker worker3 = new Worker("Address-3");

    //Address-1, Address-2, Address-3 are in the pool

    worker1.close();

    //Address-2, Address-3 are in the pool
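The client side isn't shown above. A client would read the pool with zoo.getChildren(PATH, watcher), fetch each child's address with getData(), and refresh whenever its watcher receives a NodeChildrenChanged event, re-registering the watch each time since watches fire once. The sketch below covers only the client's local view of the pool: WorkerPoolView is a hypothetical class, and refresh() stands in for those ZooKeeper calls.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical client-side view of the worker pool.
class WorkerPoolView {
    // Immutable snapshot, swapped wholesale whenever the children change.
    private volatile List<String> addresses = List.of();

    // In a real client: called with the addresses read after a NodeChildrenChanged event.
    void refresh(List<String> latest) {
        addresses = List.copyOf(latest);
    }

    // Offer work to a random live worker, or return null if the pool is empty.
    String pickWorker() {
        List<String> snapshot = addresses; // read the field once
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }
}
```

Swapping in a whole immutable list means a thread picking a worker never sees a half-updated pool.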

Queue

Let's consider the problem of a queue. We have many producers that will add values to a list and we have many consumers that will consume values from the list. This can be done in ZooKeeper, but ZooKeeper really isn't appropriate for this kind of thing.

public static void produce(String value) throws IOException, InterruptedException, KeeperException {
    ZooKeeper zoo = connect("127.0.0.1:2181");
    zoo.create(PATH + "/", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    zoo.close();
}

public static String consume() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zoo = connect("127.0.0.1:2181");
    try {
        while (true) {
            List<String> children = zoo.getChildren(PATH, null);
            OptionalInt nextKey = children.stream()
                    .mapToInt(Integer::parseInt)
                    .min();

            if (!nextKey.isPresent()) {
                return null; // the queue is empty
            }

            String next = PATH + "/" + String.format("%010d", nextKey.getAsInt());
            try {
                Stat stat = new Stat();
                String result = new String(zoo.getData(next, null, stat));
                zoo.delete(next, stat.getVersion());
                return result; // only the consumer whose delete succeeds gets the item
            } catch (KeeperException.NoNodeException e) {
                // another consumer took this item after our getChildren(); look again
            }
        }
    } finally {
        zoo.close(); // close the session on every path, including retries
    }
}

So we have a produce method that creates a Persistent Sequential ZNode. The consume method finds all the children and identifies the one with the lowest sequence number. It then pulls the value for that child off ZooKeeper and deletes it, retrying if another consumer gets there first. The code is a little unwieldy, and every call has to fetch the full list of children just to find the head of the queue.

So why include this example at all if it's not such a great idea? Well, there are related ideas that could make use of the queue concept. It's not so bad if the queue is very small and each item in the queue has a process attached to it.

You could imagine, for instance, that we have a number of processes that all need access to some restricted resource, and we want them to obtain access in the same order in which they first requested it. How is this implemented? The create method returns the actual path of the ZNode it created, including the sequence number.

String path = zoo.create(PATH + "/", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

//path = "/path/0000000001"

In this way, each process that adds an item to the list knows the position of its item in the list. Each process then has to check whether the lowest-numbered child in the list is its own whenever the set of children changes. That can be set up with a watcher, for example by passing one to getChildren().
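The "is it my turn?" check can be written as a small helper. This is a sketch under assumptions: TurnCheck is a hypothetical name, and the children are the bare sequence names returned by getChildren(). Rather than re-checking on every change to the list, each process really only needs to watch the child immediately ahead of it (for example with exists()), which is the approach ZooKeeper's own recipes describe for locks. Those recipes also use EPHEMERAL_SEQUENTIAL rather than PERSISTENT_SEQUENTIAL, so that a process that crashes gives up its place automatically.

```java
import java.util.List;

// Hypothetical helper for ordered access to a shared resource.
class TurnCheck {
    // create() returns the full path, e.g. "/path/0000000001"; keep only the child name.
    static String nodeName(String createdPath) {
        return createdPath.substring(createdPath.lastIndexOf('/') + 1);
    }

    // Returns null when myNode is the lowest child (it's our turn); otherwise returns
    // the child immediately ahead of us, which is the only one we need to watch.
    static String predecessor(List<String> children, String myNode) {
        String best = null;
        for (String child : children) {
            boolean aheadOfMe = child.compareTo(myNode) < 0;
            if (aheadOfMe && (best == null || child.compareTo(best) > 0)) {
                best = child;
            }
        }
        return best;
    }
}
```

Watching only the predecessor means that when one process finishes, just the next process in line is woken, rather than every waiting process at once.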

Conclusion

So ZooKeeper is a distributed service that allows many processes to atomically alter simple values or lists of values. In this way, ZooKeeper plays a crucial role in allowing a distributed cluster of peer-to-peer services to manage themselves, elect leaders and share common resources. ZooKeeper also allows nodes in such clusters to become aware of new peer nodes dynamically.

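ZooKeeper is built for small values. By default the server rejects anything over about 1 MB per ZNode (the jute.maxbuffer setting), and values should be kept far smaller than that; I'd keep them under 100kb. Its APIs can be used to create a queueing mechanism, but it was not built for this purpose.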

What I haven't done in this guide is explain how to download and install it. I won't, as there are plenty of guides out there. What I will add on that subject is that it was extremely easy to set up. I would also add that there is a handy Java library out there called Apache Curator which can simplify the above code dramatically. I chose not to use it in this article as I intended this to be a pedagogical guide to what ZooKeeper actually does. I hope I've kept it really simple.

References