ZooKeeper Implementation

Here we will discuss how ZooKeeper functions internally.

Figure 1: Components of the ZooKeeper service

ZooKeeper provides high availability by replicating the ZooKeeper data on each server that composes the service. We assume that servers fail by crashing, and such faulty servers may later recover. Figure 1 shows the high-level components of the ZooKeeper service. Upon receiving a request, a server prepares it for execution (request processor). If such a request requires coordination among the servers (write requests), then they use an agreement protocol (an implementation of atomic broadcast), and finally, servers commit changes to the ZooKeeper database fully replicated across all servers of the ensemble. In the case of read requests, a server simply reads the state of the local database and generates a response to the request. The replicated database is an in-memory database containing the entire data tree.

Each znode in the tree stores a maximum of 1MB of data by default, but this maximum value is a configuration parameter that can be changed in specific cases. For recoverability, we efficiently log updates to disk, and we force writes to be on the disk media before they are applied to the in-memory database. In fact, like Chubby, we keep a replay log (a write-ahead log, in our case) of committed operations and generate periodic snapshots of the in-memory database. Every ZooKeeper server services clients. Clients connect to exactly one server to submit their requests. As we noted earlier, read requests are serviced from the local replica of each server's database.

Requests that change the state of the service, write requests, are processed by an agreement protocol. As part of the agreement protocol, write requests are forwarded to a single server, called the leader. The rest of the ZooKeeper servers, called followers, receive message proposals consisting of state changes from the leader and agree upon those state changes.

4.1 Request Processor

Since the messaging layer is atomic, we guarantee that the local replicas never diverge, although at any point in time some servers may have applied more transactions than others. Unlike the requests sent from clients, the transactions are idempotent. When the leader receives a write request, it calculates what the state of the system will be when the write is applied and transforms it into a transaction that captures this new state. The future state must be calculated because there may be outstanding transactions that have not yet been applied to the database.

For example, if a client does a conditional setData and the version number in the request matches the future version number of the znode being updated, the service generates a setDataTXN that contains the new data, the new version number, and updated time stamps. If an error occurs, such as a mismatched version number or a znode that does not exist, an errorTXN is generated instead.
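The version check and future-state computation above can be sketched in a few lines of Python. The Znode class, the to_transaction function, and the tuple-shaped SetDataTXN/ErrorTXN records are illustrative stand-ins, not ZooKeeper's actual internals:

```python
import time

# Illustrative sketch of the leader's version check: Znode, to_transaction,
# and the tuple-shaped transactions are stand-ins, not ZooKeeper internals.

class Znode:
    def __init__(self, data, version):
        self.data = data
        self.version = version

def to_transaction(znode, new_data, expected_version):
    """Turn a conditional setData request into an idempotent transaction."""
    if znode is None or znode.version != expected_version:
        # Mismatched version or missing znode: emit an error transaction.
        return ('ErrorTXN', 'bad version or unknown znode')
    # The transaction captures the *future* state (new data, new version,
    # new time stamp), so applying it more than once is harmless.
    return ('SetDataTXN', new_data, znode.version + 1, time.time())
```

Because the transaction records the resulting state rather than the operation, it stays idempotent even if Zab redelivers it during recovery.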

4.2 Atomic Broadcast

All requests that update ZooKeeper state are forwarded to the leader. The leader executes the request and broadcasts the change to the ZooKeeper state through Zab, an atomic broadcast protocol. The server that receives the client request responds to the client when it delivers the corresponding state change. Zab uses by default simple majority quorums to decide on a proposal, so Zab and thus ZooKeeper can only work if a majority of servers are correct (i.e., with 2f + 1 server we can tolerate f failures). To achieve high throughput, ZooKeeper tries to keep the request processing pipeline full. It may have thousands of requests in different parts of the processing pipeline. Because state changes depend on the application of previous state changes, Zab provides stronger order guarantees than regular atomic broadcast.
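The majority-quorum arithmetic above (2f + 1 servers tolerate f failures) is simple enough to sketch; these helper functions are illustrative, not part of any ZooKeeper API:

```python
# Sketch of the majority-quorum arithmetic: with n = 2f + 1 servers, a
# majority of f + 1 survives up to f crash failures.

def quorum_size(n_servers):
    """Smallest number of servers that forms a majority."""
    return n_servers // 2 + 1

def tolerated_failures(n_servers):
    """Maximum number of crashed servers a majority quorum can tolerate."""
    return (n_servers - 1) // 2
```

For a typical five-server ensemble, the quorum is three servers and two crashes can be tolerated.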

More specifically, Zab guarantees that changes broadcast by a leader are delivered in the order they were sent and all changes from previous leaders are delivered to an established leader before it broadcasts its own changes. There are a few implementation details that simplify our implementation and give us excellent performance. We use TCP for our transport so message order is maintained by the network, which allows us to simplify our implementation. We use the leader chosen by Zab as the ZooKeeper leader, so that the same process that creates transactions also proposes them. We use the log to keep track of proposals as the write-ahead log for the in-memory database, so that we do not have to write messages twice to disk. During normal operation, Zab does deliver all messages in order and exactly once, but since Zab does not persistently record the id of every message delivered, Zab may redeliver a message during recovery. Because we use idempotent transactions, multiple delivery is acceptable as long as they are delivered in order. In fact, ZooKeeper requires Zab to redeliver at least all messages that were delivered after the start of the last snapshot.

4.3 Replicated Database

Each replica has a copy in memory of the ZooKeeper state. When a ZooKeeper server recovers from a crash, it needs to recover this internal state. Replaying all delivered messages to recover state would take prohibitively long after running the server for a while, so ZooKeeper uses periodic snapshots and only requires redelivery of messages since the start of the snapshot. We call ZooKeeper snapshots fuzzy snapshots since we do not lock the ZooKeeper state to take the snapshot; instead, we do a depth-first scan of the tree atomically reading each znode’s data and meta-data and writing them to disk.

Since the resulting fuzzy snapshot may have applied some subset of the state changes delivered during the generation of the snapshot, the result may not correspond to the state of ZooKeeper at any point in time. However, since state changes are idempotent, we can apply them twice as long as we apply the state changes in order.

For example, assume that in a ZooKeeper data tree two nodes /foo and /goo have values f1 and g1 respectively and both are at version 1 when the fuzzy snapshot begins, and the following stream of state changes arrive having the form
(transactionType, path, value, new-version)

(SetDataTXN, /foo, f2, 2)
(SetDataTXN, /goo, g2, 2)
(SetDataTXN, /foo, f3, 3)

After processing these state changes, /foo and /goo have values f3 and g2 with versions 3 and 2 respectively. However, the fuzzy snapshot may have recorded that /foo and /goo have values f3 and g1 with versions 3 and 1 respectively, which was not a valid state of the ZooKeeper data tree. If the server crashes and recovers with this snapshot and Zab redelivers the state changes, the resulting state corresponds to the state of the service before the crash.
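The recovery scenario above can be sketched in Python; the dictionary-based data tree and the tuple-shaped transactions are simplifications for illustration:

```python
# Sketch of the scenario above: the fuzzy snapshot captured /foo after its
# updates but /goo before its update, a state that never existed. Because
# each SetDataTXN carries the resulting value and version, replaying the
# whole stream over the snapshot converges on the correct final state.

snapshot = {'/foo': ('f3', 3), '/goo': ('g1', 1)}  # fuzzy, "invalid" state

txns = [
    ('SetDataTXN', '/foo', 'f2', 2),
    ('SetDataTXN', '/goo', 'g2', 2),
    ('SetDataTXN', '/foo', 'f3', 3),
]

def replay(tree, txns):
    for _, path, value, version in txns:
        tree[path] = (value, version)  # overwrite: re-applying is harmless
    return tree

state = replay(dict(snapshot), txns)
# state is now {'/foo': ('f3', 3), '/goo': ('g2', 2)}
```

Replaying the stream a second time leaves the state unchanged, which is exactly the idempotence property the fuzzy snapshot relies on.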

4.4 Client-Server Interactions

When a server processes a write request, it also sends out and clears notifications relative to any watch that corresponds to that update. Servers process writes in order and do not process other writes or reads concurrently. This ensures strict succession of notifications. Note that servers handle notifications locally. Only the server that a client is connected to tracks and triggers notifications for that client. Read requests are handled locally at each server.

Each read request is processed and tagged with a zxid that corresponds to the last transaction seen by the server. This zxid defines the partial order of the read requests with respect to the write requests. By processing reads locally, we obtain excellent read performance because it is just an in-memory operation on the local server, and there is no disk activity or agreement protocol to run. This design choice is key to achieving our goal of excellent performance with read-dominant workloads.

One drawback of using fast reads is not guaranteeing precedence order for read operations. That is, a read operation may return a stale value, even though a more recent update to the same znode has been committed. Not all of our applications require precedence order, but for applications that do require it, we have implemented sync. This primitive executes asynchronously and is ordered by the leader after all pending writes to its local replica. To guarantee that a given read operation returns the latest updated value, a client calls sync followed by the read operation. The FIFO order guarantee of client operations together with the global guarantee of sync enables the result of the read operation to reflect any changes that happened before the sync was issued.

In our implementation, we do not need to atomically broadcast sync, as we use a leader-based algorithm: we simply place the sync operation at the end of the queue of requests between the leader and the server executing the call to sync. In order for this to work, the follower must be sure that the leader is still the leader. If there are pending transactions that commit, then the server does not suspect the leader. If the pending queue is empty, the leader needs to commit a null transaction and order the sync after that transaction. This has the nice property that when the leader is under load, no extra broadcast traffic is generated. In our implementation, timeouts are set such that leaders realize they are not leaders before followers abandon them, so we do not issue the null transaction.
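The queue placement described above can be sketched as follows; order_sync and the string-labeled entries are illustrative, not ZooKeeper's real request pipeline:

```python
from collections import deque

# Sketch of the sync ordering described above: the leader appends the sync
# marker after all pending writes queued for the follower; if the queue is
# empty, it first commits a null transaction so sync has something to
# order itself behind.

def order_sync(pending_writes):
    queue = deque(pending_writes)
    if not queue:
        queue.append('nullTXN')  # nothing pending: commit a null transaction
    queue.append('sync')         # the read that follows sees all prior writes
    return list(queue)
```

A read issued after the sync completes is therefore ordered after every write the leader had pending when the sync was queued.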

ZooKeeper servers process requests from clients in FIFO order. Responses include the zxid that the response is relative to. Even heartbeat messages during intervals of no activity include the last zxid seen by the server that the client is connected to. If the client connects to a new server, that new server ensures that its view of the ZooKeeper data is at least as recent as the view of the client by checking the last zxid of the client against its last zxid. If the client has a more recent view than the server, the server does not reestablish the session with the client until the server has caught up. The client is guaranteed to be able to find another server that has a recent view of the system since the client only sees changes that have been replicated to a majority of the ZooKeeper servers. This behavior is important to guarantee durability.

To detect client session failures, ZooKeeper uses timeouts. The leader determines that there has been a failure if no other server receives anything from a client session within the session timeout. If the client sends requests frequently enough, then there is no need to send any other message. Otherwise, the client sends heartbeat messages during periods of low activity. If the client cannot communicate with a server to send a request or heartbeat, it connects to a different ZooKeeper server to re-establish its session. To prevent the session from timing out, the ZooKeeper client library sends a heartbeat after the session has been idle for s/3 ms and switches to a new server if it has not heard from a server for 2s/3 ms, where s is the session timeout in milliseconds.
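As a sketch of that timing rule, using only the s/3 and 2s/3 fractions from the text (the helper names are illustrative):

```python
# Client-side timing rule: with session timeout s (in milliseconds), send
# a heartbeat after s/3 ms of idleness, and switch to a different server
# after 2s/3 ms of silence.

def heartbeat_after_ms(session_timeout_ms):
    """Idle time after which the client sends a heartbeat."""
    return session_timeout_ms / 3

def switch_server_after_ms(session_timeout_ms):
    """Silence after which the client moves to a different server."""
    return 2 * session_timeout_ms / 3
```

With a 6-second session timeout, for example, the client heartbeats after 2 seconds of idleness and abandons an unresponsive server after 4 seconds, leaving time to reconnect elsewhere before the session expires.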

 

How does Salt work?

SALT-MASTER job flow:

The Salt master works by always publishing commands to all connected minions and the minions decide if the command is meant for them by checking themselves against the command target.
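A minimal sketch of that self-targeting check, assuming simple glob targeting (Salt also supports grains, regular expressions, and compound matchers); is_targeted is a hypothetical helper, not Salt's actual matcher code:

```python
import fnmatch

# Sketch of the self-targeting check: the master publishes to every
# connected minion, and each minion matches its own id against the
# target expression in the command.

def is_targeted(minion_id, target_glob):
    """Each minion decides locally whether the published job is for it."""
    return fnmatch.fnmatch(minion_id, target_glob)
```

This is why publishing is cheap for the master: the matching work is pushed out to the minions themselves.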

The typical lifecycle of a salt job from the perspective of the master might be as follows:

  1. A command is issued on the CLI. For example, ‘salt my_minion test.version’.
  2. The ‘salt’ command uses LocalClient to generate a request to the salt master by connecting to the ReqServer on TCP:4506 and issuing the job.
  3. The salt-master ReqServer sees the request and passes it to an available MWorker over workers.ipc.
  4. A worker picks up the request and handles it. First, it checks to ensure that the requested user has permissions to issue the command. Then, it sends the publish command to all connected minions. For the curious, this happens in ClearFuncs.publish().
  5. The worker announces on the master event bus that it is about to publish a job to connected minions. This happens by placing the event on the master event bus (master_event_pull.ipc) where the EventPublisher picks it up and distributes it to all connected event listeners on master_event_pub.ipc.
  6. The message to the minions is encrypted and sent to the Publisher via IPC on publish_pull.ipc.
  7. Connected minions have a TCP session established with the Publisher on TCP port 4505 where they await commands. When the Publisher receives the job over publish_pull, it sends the jobs across the wire to the minions for processing.
  8. After the minions receive the request, they decrypt it and perform any requested work, if they determine that they are targeted to do so.
  9. When the minion is ready to respond, it publishes the result of its job back to the master by sending the encrypted result back to the master on TCP 4506 where it is again picked up by the ReqServer and forwarded to an available MWorker for processing. (Again, this happens by passing this message across workers.ipc to an available worker.)
  10. When the MWorker receives the job it decrypts it and fires an event onto the master event bus (master_event_pull.ipc). (Again for the curious, this happens in AESFuncs._return().)
  11. The EventPublisher sees this event and re-publishes it on the bus to all connected listeners of the master event bus (on master_event_pub.ipc). This is where the LocalClient has been waiting, listening to the event bus for minion replies. It gathers the job and stores the result.
  12. When all targeted minions have replied or the timeout has been exceeded, the salt client displays the results of the job to the user on the CLI.

SALT-MINION job flow:

When a salt minion starts up, it attempts to connect to the Publisher and the ReqServer on the salt master. It then attempts to authenticate and once the minion has successfully authenticated, it simply listens for jobs.

Jobs normally come either from the ‘salt-call’ script run by a local user on the salt minion, or directly from a master.

The job flow on a minion, coming from the master via a ‘salt’ command is as follows:

1) A master publishes a job that is received by a minion as outlined by the master’s job flow above.

2) The minion is polling its receive socket that’s connected to the master Publisher (TCP 4505 on master). When it detects an incoming message, it picks it up from the socket and decrypts it.

3) A new minion process or thread is created and provided with the contents of the decrypted message.

4) In the new thread, the _thread_return() function starts up and calls out to the requested function contained in the job.

5) The requested function runs and returns a result.

6) The result of the function that’s run is encrypted and returned to the master’s ReqServer (TCP 4506 on master).

7) Thread exits. Because the main thread was only blocked for the time that it took to initialize the worker thread, many other requests could have been received and processed during this time.
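The thread-per-job pattern in steps 3-7 can be sketched in plain Python; thread_return and handle_job are illustrative stand-ins for the minion's internals, with encryption and networking omitted:

```python
import threading

# Sketch of steps 3-7: the minion hands each decrypted job to a worker
# thread (a stand-in for _thread_return), so the main thread only blocks
# for the time it takes to start the worker.

results = []
lock = threading.Lock()

def thread_return(job):
    # Call the requested function and record the result, in place of
    # returning it to the master's ReqServer.
    result = job['func'](*job.get('args', ()))
    with lock:
        results.append(result)

def handle_job(job):
    worker = threading.Thread(target=thread_return, args=(job,))
    worker.start()  # main loop is free to accept the next job
    return worker

workers = [handle_job({'func': len, 'args': ('abc',)}),
           handle_job({'func': max, 'args': ([1, 5, 2],)})]
for w in workers:
    w.join()
```

Because each job runs in its own thread, a slow function never stops the minion from picking up the next published command.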

How arguments in Python decorated functions work

I have trouble understanding how the argument is passed to a wrapper function inside a decorator.
Take a simple example:

def my_decorator(func):
    def wrapper(func_arg):
        print('Before')
        func(func_arg)
        print('After')
    return wrapper

@my_decorator
def my_function(arg):
    print(arg + 1)

my_function(1)

I have a function that takes 1 argument and it is decorated. I have trouble understanding how func_arg works. When my_function(1) is called, how is the value 1 passed to the wrapper? From my limited understanding, my_function is ‘replaced’ by a new function, as in: my_function = my_decorator(my_function).

print(my_function)
<function my_decorator.<locals>.wrapper at 0x7f72fea9c620>

Solution:

Your understanding is entirely correct. Decorator syntax is just syntactic sugar; the lines:

@my_decorator
def my_function(arg):
    print(arg + 1)

are executed as

def my_function(arg):
    print(arg + 1)

my_function = my_decorator(my_function)

without my_function actually having been set before the decorator is called*.

So my_function is now bound to the wrapper() function created in your my_decorator() function. The original function object was passed into my_decorator() as the func argument, so is still available to the wrapper() function, as a closure. So calling func() calls the original function object.

So when you call the decorated my_function(1) object, you really call wrapper(1). This function receives the 1 via the name func_arg, and wrapper() then itself calls func(func_arg), which is the original function object. So in the end, the original function is passed 1 too.

You can see this result in the interpreter:

>>> def my_decorator(func):
...     def wrapper(func_arg):
...         print('Before')
...         func(func_arg)
...         print('After')
...     return wrapper
...
>>> @my_decorator
... def my_function(arg):
...     print(arg + 1)
...
>>> my_function
<function my_decorator.<locals>.wrapper at 0x10f278ea0>
>>> my_function.__closure__
(<cell at 0x10ecdf498: function object at 0x10ece9730>,)
>>> my_function.__closure__[0].cell_contents
<function my_function at 0x10ece9730>
>>> my_function.__closure__[0].cell_contents(1)
2

Closures are accessible via the __closure__ attribute, and you can access the current value for a closure via the cell_contents attribute. Here, that’s the original decorated function object.

It is important to note that each time you call my_decorator(), a new function object is created. They are all named wrapper() but they are separate objects, each with their own __closure__.
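A common generalization of the wrapper, in case the wrapped functions take different signatures: *args/**kwargs forward any arguments, and functools.wraps copies metadata such as __name__ and __doc__ from the wrapped function onto the wrapper:

```python
import functools

# Generalized form of the wrapper above: *args/**kwargs let it wrap a
# function of any signature, and functools.wraps makes the wrapper look
# like the original function to introspection.

def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('Before')
        result = func(*args, **kwargs)
        print('After')
        return result
    return wrapper

@my_decorator
def add(a, b):
    """Add two numbers."""
    return a + b
```

Now add(1, 2) prints Before and After around the call and returns 3, and add.__name__ is still 'add' rather than 'wrapper'.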


* Python produces bytecode that creates the function object without assigning it to a name; it lives on the stack instead. The next bytecode instruction then calls the decorator object:

>>> import dis
>>> dis.dis(compile('@my_decorator\ndef my_function(arg):\n    print(arg + 1)\n', '', 'exec'))
  1           0 LOAD_NAME                0 (my_decorator)
              2 LOAD_CONST               0 (<code object my_function at 0x10f25bb70, file "", line 1>)
              4 LOAD_CONST               1 ('my_function')
              6 MAKE_FUNCTION            0
              8 CALL_FUNCTION            1
             10 STORE_NAME               1 (my_function)
             12 LOAD_CONST               2 (None)
             14 RETURN_VALUE

So first LOAD_NAME looks up the my_decorator name. Next, the bytecode generated for the function object is loaded, as well as the name for the function. MAKE_FUNCTION creates the function object from those two pieces of information (removing them from the stack) and puts the resulting function object back on. CALL_FUNCTION then takes the one argument on the stack (its operand, 1, tells it how many positional arguments to take) and calls the next object on the stack (the decorator object loaded earlier). The result of that call is then stored under the name my_function.

Nested Function calling in Python

def f1(): 
    X = 88
    def f2(): 
        print(X)
    return f2
action = f1() 
action()

Since f1 returns f2, it seems fine when I call f2 as (f1())().

But when I call f2 directly as f2(), it gives an error.

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'f2' is not defined

Can someone explain what is the difference between the function calling of f2 using above 2 ways.

Solution:

The function f2 is local to the scope of function f1. Its name is only valid inside of that function because you defined it there. When you return f2, all you are doing is giving the rest of the program access to the function’s properties, not to its name. The function f1 returns something that prints 88 but does not expose the name f2 to the outer scope.

Calling f2 indirectly through f1()() or action() is perfectly valid because those names are defined in that outer scope. The name f2 is not defined in the outer scope so calling it is a NameError.

Access a function in an object, nested inside another function

I’m trying to manage a connection instance, using a function to handle idle-connection disconnect issues, with a MySQL database and Node.js.

At the moment, I’ve got the following code (CoffeeScript):

mysql = require 'mysql'

handleDisconnect = () ->
  connection = mysql.createConnection
    host: 'localhost'
    user: 'root'
    password: 'passroot'
    database: 'mydb'

  connection.connect (err) ->
    if err
      console.log 'Error connecting to db: ', err
      setTimeout handleDisconnect, 2000

  connection.on 'error', (err) ->
    console.log 'db error', err
    if err.code == 'PROTOCOL_CONNECTION_LOST'
      handleDisconnect()
    else
      throw err

  handleDisconnect.instance = connection

module.exports = handleDisconnect

and

express = require 'express'
router = express.Router()
connection = require('../database')().instance

bcrypt = require 'bcryptjs'

router.post '/', (req, res) ->
  credential = connection.escape req.body.credential
  password = connection.escape req.body.password
  res.send credential+password

module.exports = router

The problem is, when I try to access the route, I get the following error:

Cannot read property ‘escape’ of undefined

What am I doing wrong?

Solution:

I believe your issue is that the final line of handleDisconnect is returning the instance, so you’re trying to get the instance from instance, not from handleDisconnect. So you’ll need the function to return itself at the end if you want to access properties on it.

You also want the function to be using the equivalent of “this” (@ in coffeescript) rather than specifically referring to handleDisconnect.

Example code:

mysql = require 'mysql'

handleDisconnect = () ->
  connection = mysql.createConnection
    host: 'localhost'
    user: 'root'
    password: 'passroot'
    database: 'mydb'

  connection.connect (err) ->
    if err
      console.log 'Error connecting to db: ', err
      setTimeout handleDisconnect, 2000

  connection.on 'error', (err) ->
    console.log 'db error', err
    if err.code == 'PROTOCOL_CONNECTION_LOST'
      handleDisconnect()
    else
      throw err

  @instance = connection
  @

module.exports = handleDisconnect

Although I’d personally just do the following, don’t bother with “instance” at all:

  1. Use @connection in your function
  2. Scrap the @instance = connection
  3. Get the function to return itself
  4. Access it with require('../database')().connection.

Execute a Python function from the terminal

If you have a Python file myfunction.py:

def minutes_to_hours(minutes):
  hours = minutes / 60.0
  return hours

Then you can execute it with:

python -c 'from myfunction import *; print(minutes_to_hours(120))'
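Alternatively, you could give the file a __main__ guard so it can be run directly, e.g. python myfunction.py 120 (the function here converts minutes to hours; the argument handling is illustrative):

```python
import sys

# Sketch of a directly runnable script: the guard keeps the function
# importable from other modules while also allowing command-line use.

def minutes_to_hours(minutes):
    return minutes / 60.0

if __name__ == '__main__' and len(sys.argv) > 1:
    print(minutes_to_hours(float(sys.argv[1])))
```

The guard means importing the module (as the -c one-liner does) defines the function without printing anything.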