Zookeeper Queues – Priority & Producer-Consumer Queue

In our last ZooKeeper tutorial, we discussed the ZooKeeper CLI. Today, in this ZooKeeper article, we will learn how to implement queues (distributed and priority) in ZooKeeper. We will also walk through a simple implementation of a producer-consumer queue in detail.

So, let’s begin with ZooKeeper Queues.

What are ZooKeeper Queues?

Distributed queues are a common data structure. To implement a distributed queue in ZooKeeper, first designate a znode to hold the queue: the queue node.

Distributed clients put something into the queue by calling create() with a pathname ending in “queue-”, with the sequence and ephemeral flags in the create() call set to true.

Because the sequence flag is set, the new pathname will have the form _path-to-queue-node_/queue-X, where X is a monotonically increasing number.

A client that wants to remove an item from the queue calls ZooKeeper’s getChildren( ) function with watch set to true on the queue node, and starts processing the nodes with the lowest number first.

The client does not need to issue another getChildren( ) until it exhausts the list obtained from the first getChildren( ) call. If there are no children in the queue node, the reader waits for a watch notification before checking the queue again.
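
The consumer side of this recipe depends on working through the children in order of their sequence number. As a minimal sketch, assuming child names of the form queue-X as described above, the helper below sorts a list like the one getChildren( ) returns so that the lowest number comes first. The class and method names are hypothetical and are not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration; not part of the ZooKeeper API.
final class QueueChildOrder {
    // Prefix used by the recipe; the sequence number follows it in the node name.
    static final String PREFIX = "queue-";

    // Extracts the numeric sequence suffix from a child name such as "queue-0000000012".
    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(PREFIX.length()));
    }

    // Returns a new list ordered by ascending sequence number; the input is not modified.
    static List<String> inProcessingOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(QueueChildOrder::sequenceOf));
        return sorted;
    }
}
```

A client could call inProcessingOrder() once on the list from getChildren( ) and work through the result from the front, which matches the "exhaust the list before asking again" behaviour of the recipe.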

Priority Queues: Zookeeper Queues

To implement a priority queue, we only need to make two simple changes to the generic ZooKeeper queue recipe.

  • First, to add to a queue, the pathname must end with “queue-YY”, where YY is the priority of the element, with lower numbers representing higher priority (just like UNIX).
  • Second, when removing from the queue, a client must use an up-to-date children list. That means the client invalidates previously obtained children lists if a watch notification triggers for the queue node.
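
The naming rule in the first change can be sketched as follows. This is only an illustration under stated assumptions: priorities are limited to 0..99 and zero-padded to two digits (the recipe itself does not fix a width), and the sequence suffix is the fixed-width, zero-padded counter that ZooKeeper appends to sequential nodes. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration; not part of the ZooKeeper API.
final class PriorityQueueNames {
    // Builds the name prefix to pass to create().
    // Assumption: priorities are 0..99 and are zero-padded to two digits.
    static String prefixFor(int priority) {
        if (priority < 0 || priority > 99) {
            throw new IllegalArgumentException("priority must be in 0..99");
        }
        return String.format(Locale.ROOT, "queue-%02d", priority);
    }

    // Orders child names by priority first, then by sequence number.
    // Because both fields are fixed-width here, plain string order is enough.
    static List<String> inProcessingOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }
}
```

With this layout, an element created under “queue-05” is processed before one created under “queue-10”, and elements of equal priority keep their creation order.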

Producer-Consumer Queues

A producer-consumer queue is a distributed data structure that groups of processes use to generate and consume items. Producer processes create new elements and add them to the queue.

Consumer processes remove elements from the list and process them. In this implementation, the elements are simple integers.

The queue is represented by a root node. To add an element to the queue, a producer process creates a new node as a child of the root node.

Below is an excerpt of code that corresponds to the constructor of the object. First, it calls the constructor of the parent class, SyncPrimitive, which creates a ZooKeeper object if one doesn’t exist. Then it checks whether the root node of the queue exists, and creates it if it doesn’t.

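       // Uses org.apache.zookeeper.{CreateMode, KeeperException},
       // org.apache.zookeeper.ZooDefs.Ids and org.apache.zookeeper.data.Stat
       /**
        * Constructor of producer-consumer queue
        *
        * @param address address of the ZooKeeper server
        * @param name    path of the root node that represents the queue
        */
       Queue(String address, String name)
       throws KeeperException, InterruptedException {
           super(address);
           this.root = name;
           // Create the root node of the queue if it does not exist yet
           if (zk != null) {
               Stat s = zk.exists(root, false);
               if (s == null) {
                   zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                             CreateMode.PERSISTENT);
               }
           }
       }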

To add an element to the queue, a producer process calls “produce()” and passes an integer as an argument.

The method creates a new node using “create()”. It creates the node as a sequential node (the SEQUENCE flag), which instructs ZooKeeper to append the value of the sequence counter associated with the root node to the node name.

In this way, we impose a total order on the elements of the queue, which guarantees that the oldest element of the queue is the next one consumed.

       /**
        * Add element to the queue.
        *
        * @param i the integer to enqueue
        * @return true once the element node has been created
        */
       boolean produce(int i) throws KeeperException, InterruptedException {
           // Encode the integer as a 4-byte payload
           ByteBuffer b = ByteBuffer.allocate(4);
           b.putInt(i);
           byte[] value = b.array();
           // Add a sequential child whose data is the value i
           zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                      CreateMode.PERSISTENT_SEQUENTIAL);
           return true;
       }
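
The payload handling in produce() can be tried out without a ZooKeeper server. The sketch below isolates the ByteBuffer encoding that produce() uses and the matching decoding that a consumer needs to get the integer back. The class and method names are hypothetical and only serve the illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; it mirrors the encoding used in produce().
final class IntPayload {
    // Encodes an int as 4 bytes in big-endian order (ByteBuffer's default byte order).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decodes the first 4 bytes of a payload back into an int.
    static int decode(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }
}
```

Since both sides rely on the same default byte order, a value written by one process is read back unchanged by another.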

To consume an element, a consumer process first obtains the children of the root node, then reads the node with the smallest counter value, and returns the element.

Note that getChildren() makes no guarantee about the order of the list it returns, so the first entry is not necessarily the one with the smallest counter value.

Hence we have to decide which element is the smallest ourselves. To do so, we traverse the list, remove the prefix “element” from each name, and compare the remaining counter values.

       /**
        * Remove first element from the queue.
        *
        * @return the integer stored in the oldest element
        * @throws KeeperException
        * @throws InterruptedException
        */
       int consume() throws KeeperException, InterruptedException {
           Stat stat = null;
           // Get the first element available
           while (true) {
               synchronized (mutex) {
                   List<String> list = zk.getChildren(root, true);
                   if (list.isEmpty()) {
                       // Queue is empty: wait to be notified of a change
                       System.out.println("Going to wait");
                       mutex.wait();
                   } else {
                       // Find the child with the smallest counter value and keep
                       // its full name, because the counter in it is zero-padded
                       String minNode = list.get(0);
                       int min = Integer.parseInt(minNode.substring(7));
                       for (String s : list) {
                           int tempValue = Integer.parseInt(s.substring(7));
                           if (tempValue < min) {
                               min = tempValue;
                               minNode = s;
                           }
                       }
                       System.out.println("Temporary value: " + root + "/" + minNode);
                       byte[] b = zk.getData(root + "/" + minNode, false, stat);
                       zk.delete(root + "/" + minNode, 0);
                       return ByteBuffer.wrap(b).getInt();
                   }
               }
           }
       }

So, this was all about ZooKeeper Queues. We hope you liked our explanation.

Conclusion

Hence, we have learned about ZooKeeper Queues in detail, including distributed queues, priority queues, and a producer-consumer queue implementation. If you have any doubts regarding queues in ZooKeeper, feel free to ask in the comment section. We are happy to help!
