Adventures with Spring XD and ZooKeeper
Back in January I announced my departure from Oracle and my arrival at Pivotal. One of my first goals was to help build a robust distributed runtime platform for Spring XD. Most (if not all) Java developers are familiar with the Spring Framework, but Spring XD is not as well known.
Spring XD is a system for defining streams and batch jobs that process data. Although developers may introduce their own modules into the system, writing code is not a prerequisite for use. This by itself sets Spring XD apart from the Spring Framework: Spring XD is a standalone distributed application, and streams and jobs are created via an interactive shell.
What is a stream? The best way to demonstrate is via an example using the UNIX shell. Consider the following:
tail -f /tmp/log.txt | grep ERROR
The tail command will continuously output lines as they are appended to the file. The | pipes the output of tail to grep, which filters out all lines that don’t contain the string ERROR.
This command can be translated to the Spring XD shell:
stream create --name error-filter --definition "tail --name=/tmp/log.txt | filter --expression=payload.contains('ERROR') | log" --deploy
Why is this useful? For this example Spring XD is not very practical. However, imagine building a system to grep for errors across a farm of 1,000 servers. Perhaps these messages should be persisted in HDFS for further analysis in a Hadoop cluster. The creation and management of these types of distributed streams is where Spring XD shines. In addition to streams, Spring XD can handle the scheduling and execution of jobs across a cluster.
As in any distributed system, it is desirable to scale the system up or down to meet demand. Single points of failure should be eliminated to the extent possible. In order to meet these challenges, Spring XD has adopted ZooKeeper as the basis for distributed runtime configuration and coordination.
Apache ZooKeeper is a “centralized service for maintaining configuration information, naming, providing distributed synchronization…[for] distributed applications.” It is used by many projects, including Kafka, Storm and HBase. One of the bits of core functionality that ZooKeeper provides is the ability to emit notifications when data is updated. Spring XD relies on this functionality for stream and job deployments. Much of this functionality, in addition to ZooKeeper data caching, is made much easier via the use of Apache Curator.
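To make the notification mechanism a bit more concrete, here is a minimal Curator sketch. This is not Spring XD’s actual code; the connect string and the /example/deployments path are invented for illustration. It caches a ZooKeeper path and reacts when child nodes are added:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DeploymentWatcher {
    public static void main(String[] args) throws Exception {
        // Connect to ZooKeeper with a retry policy
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Cache the children of a path and react as nodes are added
        PathChildrenCache cache = new PathChildrenCache(client, "/example/deployments", true);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    System.out.println("New node: " + event.getData().getPath());
                }
            }
        });
        cache.start();

        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
    }
}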
A container process in Spring XD has the ability to host modules for streams or jobs. An admin process can accept HTTP requests to manage streams and jobs. It is also responsible for deployment coordination. Multiple admin processes may exist in a Spring XD cluster. One of these processes is designated with the “supervisor” role. Curator’s leader election recipe is used to select which admin process is the supervisor. This process is the one responsible for deploying stream and job modules to containers.
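As a rough illustration of the leader election recipe (again a hedged sketch, not Spring XD’s code; the /example/supervisor path is made up), each admin-like process would register a LeaderSelector on the same path, and Curator picks exactly one of them to call takeLeadership():

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SupervisorElection {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderSelector selector = new LeaderSelector(client, "/example/supervisor",
                new LeaderSelectorListenerAdapter() {
                    @Override
                    public void takeLeadership(CuratorFramework cf) throws Exception {
                        // Only the elected process gets here; do supervisor work
                        System.out.println("This process is now the supervisor");
                        Thread.sleep(Long.MAX_VALUE); // hold leadership until interrupted
                    }
                });
        selector.autoRequeue(); // rejoin the election if leadership is ever lost
        selector.start();

        Thread.sleep(Long.MAX_VALUE);
    }
}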
Commands entered into the shell are sent to an admin server via HTTP/REST. Most commands will consist of the creation and deployment of streams and jobs. Stream and job definitions are stored in ZooKeeper by the admin server. When streams and jobs are deployed, a node is written in ZooKeeper by the admin process that received the REST request for deployment. The node triggers the admin supervisor (which may or may not be the same admin process that received the initial REST request) to start the deployment process. This process consists of selecting which containers will host the various modules that comprise the stream/job and writing to ZooKeeper a request for module deployment. The respective containers will deploy the modules, thus establishing the stream/job.
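The trigger side of that flow boils down to writing a node under a watched path. Here is a hedged sketch of the pattern (the path and payload are invented; Spring XD’s actual znode layout and data format are described in its reference documentation) that would fire the CHILD_ADDED event in the watcher sketch above:

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DeploymentRequester {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Writing a child under the watched path is what notifies the supervisor
        byte[] request = "stream: error-filter".getBytes(StandardCharsets.UTF_8);
        client.create()
              .creatingParentsIfNeeded()
              .forPath("/example/deployments/error-filter", request);

        client.close();
    }
}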
When a container joins or leaves the cluster, the supervisor will determine if any action is required in terms of stream/job module deployment. In most cases, if a departing container was hosting a module, the supervisor will select another container to host that module. Should the system be without a supervisor for a period of time, the new supervisor will query the state of the system upon startup and take any actions required.
See the reference documentation for more details on how Spring XD uses ZooKeeper.
Coherence Near Cache Best Practices
New on the Coherence blog is an article on best practices for near caching written by yours truly. This article went through several reviews by PM and engineering (both internal and field engineers) and is a good summary of our experiences with near caching. The key takeaway is to understand the near cache invalidation strategies and to explicitly configure the one best suited to the application needs.
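Invalidation strategies are normally chosen in the cache configuration file, but a small programmatic sketch makes the point just as well. This is my own illustrative example (the cache name and front map size are arbitrary), assuming the standard NearCache and CachingMap APIs; it picks the “present” strategy explicitly rather than relying on the default:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.CachingMap;
import com.tangosol.net.cache.LocalCache;
import com.tangosol.net.cache.NearCache;

public class NearCacheExample {
    public static void main(String[] args) {
        // The back cache lives in the cluster; the front cache is a small local map
        NamedCache back = CacheFactory.getCache("example-data");
        LocalCache front = new LocalCache(1000); // keep at most 1000 entries locally

        // Choose the invalidation strategy explicitly; LISTEN_PRESENT only
        // registers listeners for keys actually held in the front map
        NearCache near = new NearCache(front, back, CachingMap.LISTEN_PRESENT);

        near.put("key", "value");
        System.out.println(near.get("key")); // repeat reads are served from the front map

        CacheFactory.shutdown();
    }
}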
See the full Near Cache Best Practices article on the official Oracle Coherence blog.
The Live Object Pattern – Coming to a SIG Near You
In spite of this blog being dormant for over a year, things have been busier than ever over in Coherence land! We just released version 3.7.1 which is packed with new features. Some of the highlights:
- REST support in the proxy. In addition to Java, C# and C++, the grid can now be accessed from any language that can speak REST over HTTP.
- POF enhancements, including annotations and the ability to eliminate Java key classes (for pure C# and C++ applications)
- Query Explain Plan. If you’ve ever had a problem with a production application missing indexes (because you won’t notice it in development and you may not notice it during testing) you’ll be interested in this feature. It analyzes queries (filter-based or CohQL) and indicates where indexes are used and where they are not; see the sketch after this list.
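For the explain plan feature, here is a short sketch of driving it from Java (the cache name and filter are made up), assuming the QueryRecorder aggregator introduced in 3.7.1:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.aggregator.QueryRecorder;
import com.tangosol.util.filter.EqualsFilter;

public class ExplainPlanExample {
    public static void main(String[] args) {
        NamedCache trades = CacheFactory.getCache("trades");

        // Run the filter in EXPLAIN mode instead of returning entries;
        // the resulting record shows where indexes were (and were not) used
        Object record = trades.aggregate(
                new EqualsFilter("getSymbol", "ORCL"),
                new QueryRecorder(QueryRecorder.RecordType.EXPLAIN));

        System.out.println(record);
    }
}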
We’ve been adding screencasts to the Coherence YouTube channel going over these features – in most cases the screencasts are delivered by the architect/developer responsible for the feature.
We also just wrapped up JavaOne and Oracle Open World a few weeks ago. I had the privilege of co-presenting the Live Object Pattern with Brian Oliver. The description and PDF can be downloaded from the JavaOne site.
If you didn’t get to see the talk, you’re in luck if you’re in New York or London. Noah Arliss, who worked with Brian on the Live Objects concept and the abstract of the JavaOne talk, will be presenting on this topic at the New York SIG on October 28th and the London SIG on November 4th.
All of the SIGs this fall (including the Bay Area SIG on November 3rd) will cover the new features in Coherence 3.7.1. Come out and join us!
Oracle Enterprise Pack for Eclipse now supports Coherence
The latest release of Oracle Enterprise Pack for Eclipse (OEPE) now includes a Coherence Facet. This makes it convenient to quickly start up a Coherence project and launch nodes right in the IDE. Recently I took it for a test drive and took some notes to help users of Eclipse and Coherence get started.
You have the option of downloading Eclipse with the OEPE plugins pre-installed, but I already had a copy of Eclipse Helios 3.6; therefore I went for the plugin install. The plugin install is straightforward, similar to any other Eclipse plugin. These instructions are lifted straight from the doc:
- Select Help > Install New Software.
- Click Add to add a new update site.
- In the Add Repository dialog, enter the location as http://download.oracle.com/otn_software/oepe/helios, and then click OK.
- Select Oracle Enterprise Pack for Eclipse, verify that all of the subcomponents are selected, and then click Next.
- Confirm information presented in the Install Details, and then click Finish.
Once the install is complete and you’ve restarted Eclipse, the next step is to install Coherence as a User Library. I’ve got the latest 3.6 bits from our source code repository, so I’ll install that as a library. Note that the plugin also supports Coherence 3.5.2.
Now, let’s create a new Java project. As part of the project creation, add Coherence as a project library.
After the project is created, we’ll have to enable Facets to use the Coherence plugin. Bring up the project properties window and search for “Facets” in the search field.
Once Facets are enabled, select Oracle Coherence.
Upon selection, a link indicating that further configuration is required will appear. Click on the link. Select the “Oracle Coherence 3.6” library. Note how it provides the option to generate configuration files. Let’s leave all of the checkboxes selected.
Now we are ready to start a cache server. Select Run | Run Configurations to bring up the Run Configurations dialog. First select “Oracle Coherence” under the list of run configurations. Next, select the “New” button on the upper left portion of the dialog to create a new run configuration.
Under the “Main” tab, enter com.tangosol.net.DefaultCacheServer as the main class. Of course you are free to create configurations with your own classes; however this example will focus on starting up a cache server.
Note the presence of a “Coherence” tab. This tab allows for operational configuration (items typically found in tangosol-coherence-override.xml) such as the cache configuration file name, multicast address configuration, management/JMX, and so on. Here I decided to leave all of the defaults as is.
After clicking on “Run”, here’s what I get:
We can see that the node started up and formed a cluster, but there are no services listed. This is because the OEPE plugin generated a cache configuration file that defaults to all caches being local. Next, let’s examine the cache configuration file (located under src) and add a distributed/partitioned cache to the configuration.
One of the nice features the plugin provides is pre-configured auto complete for Coherence configuration files via the DTD.
Here’s the cache configuration file I used:
<?xml version="1.0"?>
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>partitioned</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <distributed-scheme>
      <scheme-name>partitioned</scheme-name>
      <backing-map-scheme>
        <local-scheme/>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>
With the modified cache configuration, we now see the partitioned cache service start up:
I can see the Coherence plugin for OEPE being quite useful for Coherence developers on Eclipse not only for quickly starting up a Coherence project (since config files are generated) but also for enabling configuration validation out of the box.
Partitions, Backing Maps, and Services…Oh My!
Recently I was asked the following question:
What is the relationship between a partition, cache, and a backing map?
To answer this, first we should go through some definitions:
Cache: An object that maps keys to values. In Coherence, caches are referred to by name (thus the interface NamedCache.) Caches are also typically clustered, i.e. they can be accessed and modified from any member of a Coherence cluster.
Partition: A unit of transfer for a partitioned/distributed cache. (In Coherence terminology, partitioned and distributed are used interchangeably, with preference towards the former.)
Backing Map: Data structure used by a storage node to hold contents of a cache. For partitioned caches, this data is in binary (serialized) form.
I’ll add one more term for completeness:
Service: Set of threads dedicated to handling requests; a cache service handles requests such as put, get, etc. The DistributedCache service is present on all members of the cluster that read/write/store cache data, even if the node is storage disabled.
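To tie the Cache and Service definitions together, here is a minimal sketch (the cache name is arbitrary and assumes a cache configuration that maps it to a partitioned scheme):

import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheService;
import com.tangosol.net.NamedCache;

public class CacheBasics {
    public static void main(String[] args) {
        // Any cluster member, storage enabled or not, can obtain the named cache
        NamedCache cache = CacheFactory.getCache("example-partitioned");
        cache.put("hello", "world");
        System.out.println(cache.get("hello"));

        // Every named cache is managed by a cache service
        CacheService service = cache.getCacheService();
        System.out.println(service.getInfo().getServiceName());

        CacheFactory.shutdown();
    }
}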
Now let’s put these concepts together to see how a clustered partitioned cache works.
First we’ll start with a backing map. It is a straightforward data structure, usually an instance of LocalCache:
The contents of this map are managed by a partitioned cache service. In a typical thread dump, this would be the “DistributedCache” thread:
"DistributedCache" daemon prio=5 tid=101a91800 nid=0x118445000 in Object.wait() [118444000] |
A single cache service can manage multiple backing maps. This is the default behavior if multiple services are not specified in the cache configuration file. (Multiple partitioned cache services are beyond the scope of this post; this topic will be addressed in a future posting.)
So how do partitions come into play? Each backing map is logically split into partitions:
This splitting is used for transferring data between storage enabled members. Here are log file snippets of data transfer between two nodes:
Member 1:
(thread=DistributedCache, member=1): 3> Transferring 128 out of 257 primary partitions to member 2 requesting 128
Member 2:
(thread=DistributedCache, member=2): Asking member 1 for 128 primary partitions
As members are added to the cluster, the partitions are split up amongst the members. Note that this includes backup copies of each partition.
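For the curious, partition ownership can also be inspected from code. Here is a hedged sketch (the cache name and key are arbitrary), assuming the PartitionedService API:

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;

public class PartitionInfo {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("example-partitioned");

        // The service behind a distributed cache is a PartitionedService
        PartitionedService service = (PartitionedService) cache.getCacheService();
        System.out.println("Partition count: " + service.getPartitionCount());

        // Which partition does a key land in, and which member owns it right now?
        Object key = "some-key";
        int partition = service.getKeyPartitioningStrategy().getKeyPartition(key);
        Member owner = service.getKeyOwner(key);
        System.out.println(key + " -> partition " + partition + ", owned by " + owner);

        CacheFactory.shutdown();
    }
}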
As of Coherence 3.5, it is possible to configure backing maps to physically split partitions into their own respective backing maps. This configuration may be desirable for backing maps that are stored off heap (i.e. NIO direct memory or disk) as it will make data transfer more efficient.