Adventures with Spring XD and ZooKeeper
Back in January I announced my departure from Oracle and my arrival at Pivotal. One of my first goals was to help build a robust distributed runtime platform for Spring XD. Most (if not all) Java developers are familiar with the Spring Framework, but Spring XD is not as well known.
Spring XD is a system for defining streams and batch jobs that process data. Although developers may introduce their own modules into the system, writing code is not a prerequisite for use. This by itself sets Spring XD apart from the Spring Framework: Spring XD is a standalone distributed application, and streams and jobs are created via an interactive shell.
What is a stream? The best way to demonstrate is via an example using the UNIX shell. Consider the following:
tail -f /tmp/log.txt | grep ERROR
The tail command will continuously display the file. The | will pipe the output of tail to grep, which will filter out all lines that don’t include the string ERROR.
This command can be translated to the Spring XD shell:
stream create --name error-filter --definition "tail --name=/tmp/log.txt | filter --expression=payload.contains('ERROR') | log" --deploy
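To make the filter step concrete, here is a minimal Java sketch of what the expression payload.contains('ERROR') does to each message. It is not Spring XD code; the class name ErrorFilterSketch and the sample log lines are made up for illustration, and only the JDK is assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; Spring XD evaluates the expression itself.
class ErrorFilterSketch {

    // Keeps only the payloads that contain "ERROR", like the filter module
    // configured with --expression=payload.contains('ERROR').
    static List<String> filterErrors(List<String> payloads) {
        return payloads.stream()
                .filter(payload -> payload.contains("ERROR"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample lines standing in for the contents of /tmp/log.txt.
        List<String> lines = Arrays.asList(
                "INFO service started",
                "ERROR disk full",
                "WARN slow response",
                "ERROR connection timed out");
        // The log sink at the end of the stream is approximated by printing.
        filterErrors(lines).forEach(System.out::println);
    }
}
```

Each payload is tested independently, which is why the same definition can be spread over several containers without changing its meaning.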
Why is this useful? For an example this small, Spring XD is not very practical. However, imagine building a system to grep for errors across a farm of 1,000 servers. Perhaps these messages should be persisted in HDFS for further analysis in a Hadoop cluster. The creation and management of these types of distributed streams is where Spring XD shines. In addition to streams, Spring XD can handle the scheduling and execution of jobs across a cluster.
As in any distributed system, it is desirable to scale the system up or down to meet demand. Single points of failure should be eliminated to the extent possible. In order to meet these challenges, Spring XD has adopted ZooKeeper as the basis for distributed runtime configuration and coordination.
Apache ZooKeeper is a “centralized service for maintaining configuration information, naming, providing distributed synchronization…[for] distributed applications.” It is used by many projects, including Kafka, Storm and HBase. One piece of core functionality that ZooKeeper provides is the ability to emit notifications when data is updated. Spring XD relies on these notifications for stream and job deployments. Working with them, and with caching of ZooKeeper data, is made far easier by Apache Curator.
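The notification mechanism can be pictured with a small in-memory stand-in. The sketch below is not the ZooKeeper or Curator API; WatchSketch and the path used in main are hypothetical. It imitates two properties of ZooKeeper's classic watches: a watch fires once and must be registered again to see later changes, and the event names the path that changed rather than carrying the new data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a data store with one-shot watches.
class WatchSketch {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    // Registers a watcher that fires on the next update to the path, then is discarded.
    void watch(String path, Consumer<String> watcher) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Stores the value and notifies (and removes) every watcher on that path.
    void setData(String path, String value) {
        data.put(path, value);
        List<Consumer<String>> fired = watchers.remove(path);
        if (fired != null) {
            for (Consumer<String> watcher : fired) {
                watcher.accept(path); // the event names the path; the watcher re-reads the data
            }
        }
    }

    String getData(String path) {
        return data.get(path);
    }

    public static void main(String[] args) {
        WatchSketch store = new WatchSketch();
        String path = "/deployments/error-filter"; // made-up path for illustration
        store.watch(path, changed -> System.out.println("changed: " + changed));
        store.setData(path, "deployed");   // fires the watcher
        store.setData(path, "undeployed"); // no watcher left, nothing printed
    }
}
```

Re-registering watches and keeping a local copy of the data in sync is the kind of bookkeeping that Curator's caching support takes off the application's hands.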
A container process in Spring XD has the ability to host modules for streams or jobs. An admin process can accept HTTP requests to manage streams and jobs. It is also responsible for deployment coordination. Multiple admin processes may exist in a Spring XD cluster. One of these processes is designated with the “supervisor” role. Curator’s leader election recipe is used to select which admin process is the supervisor. This process is the one responsible for deploying stream and job modules to containers.
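For a feel of how such an election behaves, here is a JDK-only simulation of the usual ZooKeeper approach, which to my understanding is what Curator's recipe builds on: every candidate gets an increasing sequence number, the lowest number leads, and when the leader goes away the next lowest takes over. ElectionSketch and the admin names are hypothetical, and no real ZooKeeper calls are involved.

```java
import java.util.TreeMap;

// Hypothetical simulation of sequence-based leader election; not Curator code.
class ElectionSketch {
    private long nextSequence = 0;
    // Sequence number -> admin id, kept sorted so the lowest entry is the leader.
    private final TreeMap<Long, String> candidates = new TreeMap<>();

    // An admin process joins the election and receives its sequence number.
    synchronized long join(String adminId) {
        long sequence = nextSequence++;
        candidates.put(sequence, adminId);
        return sequence;
    }

    // An admin process leaves, for example because it crashed or shut down.
    synchronized void leave(long sequence) {
        candidates.remove(sequence);
    }

    // The supervisor is the candidate with the lowest sequence, or null if none remain.
    synchronized String supervisor() {
        return candidates.isEmpty() ? null : candidates.firstEntry().getValue();
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch();
        long first = election.join("admin-1");
        election.join("admin-2");
        System.out.println(election.supervisor()); // admin-1
        election.leave(first);
        System.out.println(election.supervisor()); // admin-2
    }
}
```

The appeal of this scheme is that succession is decided by data already in the store, so the surviving admin processes never have to negotiate with each other.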
Commands entered into the shell are sent to an admin server via HTTP/REST. Most commands will consist of the creation and deployment of streams and jobs. Stream and job definitions are stored in ZooKeeper by the admin server. When streams and jobs are deployed, a node is written in ZooKeeper by the admin process that received the REST request for deployment. The node triggers the admin supervisor (which may or may not be the same admin process that received the initial REST request) to start the deployment process. This process consists of selecting which containers will host the various modules that comprise the stream/job and writing to ZooKeeper a request for module deployment. The respective containers will deploy the modules, thus establishing the stream/job.
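The container-selection step can be sketched as follows. The round-robin policy here is purely an assumption for illustration, since the text above does not say how the supervisor chooses; PlacementSketch and the container names are hypothetical too.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of assigning a stream's modules to containers.
class PlacementSketch {

    // Returns module -> container, cycling through the containers in order.
    // Round-robin is an assumed policy, not necessarily what Spring XD does.
    static Map<String, String> assign(List<String> modules, List<String> containers) {
        if (containers.isEmpty()) {
            throw new IllegalStateException("no containers available");
        }
        Map<String, String> placement = new LinkedHashMap<>();
        for (int i = 0; i < modules.size(); i++) {
            placement.put(modules.get(i), containers.get(i % containers.size()));
        }
        return placement;
    }

    public static void main(String[] args) {
        // Modules of the error-filter stream defined earlier in this post.
        List<String> modules = Arrays.asList("tail", "filter", "log");
        List<String> containers = Arrays.asList("container-1", "container-2");
        System.out.println(assign(modules, containers));
        // {tail=container-1, filter=container-2, log=container-1}
    }
}
```

In the real system each entry of such a map would correspond to a module deployment request written to ZooKeeper for the chosen container to pick up.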
When a container joins or leaves the cluster, the supervisor will determine if any action is required in terms of stream/job module deployment. In most cases, if a departing container was hosting a module, the supervisor will select another container to host that module. Should the system be without a supervisor for a period of time, the new supervisor will query the state of the system upon startup and take any actions required.
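The reaction to a departing container might look roughly like this. Again, this is a sketch under assumptions rather than Spring XD's implementation: RedeploySketch is a hypothetical name, replacement containers are picked round-robin, and a module is simply dropped from the map when no container is left to host it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of moving modules off a container that left the cluster.
class RedeploySketch {

    // Returns a new module -> container map in which every module that was hosted
    // by the departed container is reassigned to one of the remaining containers.
    static Map<String, String> onContainerDeparted(Map<String, String> placement,
                                                   String departed,
                                                   List<String> remaining) {
        Map<String, String> updated = new LinkedHashMap<>(placement);
        int next = 0;
        for (Map.Entry<String, String> entry : placement.entrySet()) {
            if (!entry.getValue().equals(departed)) {
                continue; // module is unaffected
            }
            if (remaining.isEmpty()) {
                updated.remove(entry.getKey()); // nowhere to host it for now
            } else {
                updated.put(entry.getKey(), remaining.get(next % remaining.size()));
                next++;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> placement = new LinkedHashMap<>();
        placement.put("tail", "container-1");
        placement.put("filter", "container-2");
        placement.put("log", "container-1");
        System.out.println(onContainerDeparted(
                placement, "container-1", Arrays.asList("container-2", "container-3")));
        // {tail=container-2, filter=container-2, log=container-3}
    }
}
```

A newly elected supervisor could run the same comparison between the recorded placements and the containers currently present, which is one way to picture the catch-up described above.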
See the reference documentation for more details on how Spring XD uses ZooKeeper.