Four Paths to Java Parallelism

Parallel programming in Java is becoming easier with tools such as the fork/join framework, Pervasive DataRush, Terracotta, and Hadoop. This article gives a high-level description of each approach, pointing you in the right direction to begin writing parallel applications of your own.
Boiling the Ocean of Data
Companies today are swimming in data. The increasing ease with which data can be collected, combined with the decreasing cost of storing and managing it, means huge amounts of information are now accessible to anyone with the inclination. And many businesses do have the desire. Internet search engines index billions of pages of content on the web. Banks analyze credit card transactions looking for fraud. While these may be industry-specific examples, there are others that apply broadly: business intelligence, predictive analytics, and regulatory compliance to name a few.

How can programmers handle the volumes of data now available? Even efficient algorithms take a long time to run on such enormous data sets. Huge amounts of processing power need to be harnessed. The good news is that we already know the answer. Parallel processing isn't new, but with multicore processors it has become increasingly accessible to everyone. The need and ability to perform complex data processing are no longer confined to the realm of scientific computing, but have increasingly become a concern of the corporate world.

There's bad news too. While parallel processing is much more feasible, it isn't necessarily any easier. Writing complex and correct multithreaded programs is hard. There are many details of concurrency - thread management, data consistency, and synchronization to name a few - that must be addressed. Programming mistakes can lead to subtle and difficult-to-find errors. Because there is no "one-size-fits-all" solution to parallelism, you might find yourself focusing more on developing a concurrency framework and less on solving your original problem.

Making parallelism easier has been a topic of interest in the Java world lately. Java starts out with basic concurrency constructs, such as threading and synchronization, built into the language. It also provides a consistent memory model regardless of the underlying hardware. Most programmers are now aware of (and perhaps vaguely intimidated by) the utilities of java.util.concurrent added in Java 5. These tools are at least a step up from working with bare threads, implementing many abstractions you would otherwise need to create yourself. However, the package remains low-level, and its power comes at the cost of requiring significant knowledge (and preferably prior experience with concurrent programming) (Sun, 2006).
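
To give a flavor of the level these utilities operate at, here is a minimal sketch (our own illustration, not from any listing in this article) that farms independent work out to a fixed thread pool and collects the results:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // A fixed pool sized to the machine; the pool manages thread lifecycle for us.
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<Long>> results = new ArrayList<Future<Long>>();
        for (int i = 0; i < 16; i++) {
            final long n = i;
            // Each Callable is an independent unit of work returning a result.
            results.add(pool.submit(new Callable<Long>() {
                public Long call() {
                    return n * n;  // stand-in for real work
                }
            }));
        }
        long sum = 0;
        for (Future<Long> f : results) {
            sum += f.get();        // blocks until that task completes
        }
        pool.shutdown();
        System.out.println("sum = " + sum);
    }
}
```

Even here, you still decide how to partition the work and how to handle failures, which is exactly the burden the tools below try to lift.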

Fortunately, a number of tools and frameworks that attempt to simplify parallelism in Java are available, or soon will be. We'll look at four of these approaches with the intent of finding the "sweet spot" of each - where it provides the most benefit at the least cost, whether financial, conceptual, or otherwise.

Fork/Join: Divide-and-Conquer
If your problem fits into the heap of a single JVM on an SMP machine and can be decomposed using divide-and-conquer, then help is coming in Java 7's fork/join framework. How do you know if your problem fits the divide-and-conquer mold? Such an algorithm solves a problem by splitting it into smaller subproblems, solving them, then merging the results to form the solution to the original problem. The beauty of the paradigm is that the solutions to subproblems can be computed independently of one another, allowing the computer to work on them simultaneously. In Figure 1, you can see one way to divide-and-conquer grepping for a pattern in a file of size lines.

If the computation is large enough, the original problem may be recursively split into smaller and smaller pieces until solving a given subproblem sequentially is less expensive than further subdividing it and solving the components in parallel. You can imagine for expensive computations on large data, a divide-and-conquer implementation might generate a large number of subproblems, far outstripping the parallelism of the underlying hardware. Luckily, the fork/join execution model is designed to efficiently handle exactly this situation (Goetz, 2007).

The heart of fork/join is the ForkJoinTask, a concurrent construct much lighter than a thread. Listing 1 shows the code to implement the grep computation as a RecursiveAction, which is a ForkJoinTask designed explicitly for divide-and-conquer problems. Because tasks cost very little, you're free to create a large number of them, assigning each a fine-grained unit of work (you're responsible for determining the exact amount). Multiple tasks will be executed by each ForkJoinWorkerThread in the ForkJoinPool, the size of which you often tune commensurate with the structure of the underlying hardware.
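
Since Listing 1 is not reproduced here, the following rough sketch, using the java.util.concurrent fork/join classes slated for Java 7, suggests how such a RecursiveAction might be structured (class names and the threshold value are our own illustrative choices):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.regex.Pattern;

// Illustrative sketch only: count lines matching a pattern by recursively
// splitting an in-memory array of lines until chunks are small enough to
// scan sequentially.
public class GrepTask extends RecursiveAction {
    private static final int THRESHOLD = 1000;   // tuning knob: when to stop splitting
    private final String[] lines;
    private final int lo, hi;
    private final Pattern pattern;
    long count;                                   // result for this subrange

    GrepTask(String[] lines, int lo, int hi, Pattern pattern) {
        this.lines = lines; this.lo = lo; this.hi = hi; this.pattern = pattern;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            // Base case: the subproblem is cheap enough to solve sequentially.
            for (int i = lo; i < hi; i++) {
                if (pattern.matcher(lines[i]).find()) count++;
            }
        } else {
            // Split in half, run both halves as subtasks, then merge their results.
            int mid = (lo + hi) >>> 1;
            GrepTask left  = new GrepTask(lines, lo, mid, pattern);
            GrepTask right = new GrepTask(lines, mid, hi, pattern);
            invokeAll(left, right);
            count = left.count + right.count;
        }
    }

    public static long countMatches(String[] lines, String regex) {
        GrepTask root = new GrepTask(lines, 0, lines.length, Pattern.compile(regex));
        new ForkJoinPool().invoke(root);          // pool defaults to one worker per core
        return root.count;
    }
}
```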

The fork/join execution model hinges on largely implicit restrictions on tasks. First, tasks cannot perform blocking synchronization. If every task currently being executed by the thread pool were to block simultaneously, your application would deadlock: a very real danger on a machine with only a handful of cores. Second, tasks cannot depend on one another, but should only be coordinated using fork and join. Inter-task communication can lead to performance degradation or deadlock, as in the first case. Finally, it is not advisable for fork/join tasks to perform I/O and there are certainly no tools to support this use case. These restrictions would be impractical to enforce rigidly, but the API intimates them by refusing to allow you to throw checked exceptions (Lea, JSR 166y API).

That's not all the fork/join framework has to offer. Should your problem involve sorting data by one or more key fields, searching through the data to find all or some records meeting your criteria, or summarizing the data by computing aggregations like an average, you may be able to code at a significantly higher level using ParallelArray. Compare the brevity of the ParallelArray implementation of grep in Listing 2 with our previous implementation. ParallelArray is built on top of the task model discussed earlier and designed for a narrower class of problem. For sort, search, and summarization on in-memory arrays, it provides a declarative, SQL-like interface to manipulating your data, somewhat analogous to a poor man's LINQ (Goetz, 2008).

The fork/join package is replete with operation interfaces designed to be passed to the various methods of ParallelArray (Lea, JSR 166y API). By defining a predicate operation, you can filter your data to find the subset relevant to your computation (see withFilter). With a procedure operation, you can perform arbitrary actions for each record (see apply). The various typed mapping operations allow you to transform your data (see withMapping). Replacements can be performed using the built-in sort and cumulate methods, or the more general replaceWith* family of methods. Likewise, the general purpose reduce method can be used directly with a reducer operation, or summary functions like min and max. You can even produce a combination of these aggregations using the summary method (Goetz, 2008), (Lea, JSR 166y API).
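
As a taste of that declarative style, a grep-like filter over an in-memory array might look like the sketch below. Package and factory names here follow the extra166y snapshot of the JSR 166y work and may differ in other versions, so treat this as an approximation rather than the exact Listing 2:

```java
import extra166y.Ops;
import extra166y.ParallelArray;
import jsr166y.ForkJoinPool;
import java.util.regex.Pattern;

// Illustrative sketch: filter an in-memory array of lines down to those
// matching a pattern; the framework parallelizes the scan for us.
public class ParallelArrayGrep {
    public static ParallelArray<String> matches(String[] lines, String regex) {
        final Pattern pattern = Pattern.compile(regex);
        ForkJoinPool pool = new ForkJoinPool();
        ParallelArray<String> all = ParallelArray.createUsingHandoff(lines, pool);
        // Declaratively select the matching lines.
        return all.withFilter(new Ops.Predicate<String>() {
            public boolean op(String line) {
                return pattern.matcher(line).find();
            }
        }).all();
    }
}
```

The same pipeline could be extended with withMapping to transform each match or with summary to aggregate over the results.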

The utilities of the upcoming Java 7 fork/join package provide powerful and succinct mechanisms for fine-grained parallelism. They are tuned to in-memory problems, explicitly in the case of ParallelArray and implicitly in that of ForkJoinTask, as discussed earlier. What they don't provide is facilities to help you when your problem starts to scale out of the heap. What can you do when you find yourself in this situation?

Pervasive DataRush: Dataflow Programming
When processing large sets of data, you'll notice the work often follows a series of steps: read the data, perform operation A on it, then perform both B and C on that result, and so on. You can express this processing as a graph of operations, connected by the flow of data. This is the essence of dataflow programming - execution as the streaming of data through a graph. Because the data is streaming, only the data required by the currently active operations need be in memory at any given time, allowing very large data sets to be processed.

Besides offering the potential for scaling to problems larger than the heap would otherwise permit, dataflow graphs are a useful model for parallelism because they can express and therefore exploit multiple forms of parallelism. By its very nature, a dataflow graph exhibits pipeline parallelism. If each operator generates output incrementally, dependent operators can execute simultaneously, just a few steps behind. Also, if the results of an operator are independent for each piece of data, the operator can be replaced with multiple copies, each receiving a portion of the original input. This is called horizontal partitioning. Finally, the output of an operator might undergo multiple sets of processing and later be merged (this is most prevalent with record data) as input to another operator. The different branches can execute in parallel; this is vertical parallelism.
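
To make the pipeline idea concrete without reaching for the DataRush API itself, here is a toy two-operator pipeline in plain Java (all names are our own): a reader streams records to a downstream filter through a bounded queue, so each "operator" runs on its own thread, working a few records behind its upstream neighbor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy illustration of pipeline parallelism (not the DataRush API):
// a producer operator streams records to a consumer operator through a
// bounded queue, so both run concurrently and only a small window of
// data is in memory at once.
public class ToyPipeline {
    private static final String EOF = "<<end-of-stream>>";   // sentinel marking end of stream

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> channel = new ArrayBlockingQueue<String>(128);

        Thread reader = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 1000000; i++) {       // stand-in for reading a file
                        channel.put("record " + i);
                    }
                    channel.put(EOF);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread filter = new Thread(new Runnable() {
            public void run() {
                try {
                    long kept = 0;
                    for (String rec = channel.take(); !rec.equals(EOF); rec = channel.take()) {
                        if (rec.endsWith("7")) kept++;        // stand-in for real filtering
                    }
                    System.out.println("kept " + kept);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        reader.start();
        filter.start();
        reader.join();
        filter.join();
    }
}
```

A real engine such as DataRush hides this plumbing entirely; you write only the operator bodies.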

Pervasive DataRush is a library and dataflow engine that allows you to construct and execute dataflow graphs in Java. You develop new operators simply by extending a base class and implementing a few methods; refer to Listing 3 to see how an operator might look. All threading and synchronization are handled by the framework, since data is shared only through inputs and outputs, so you can focus on just the processing logic of the operator. In addition to the dataflow engine, Pervasive DataRush ships with a library of common operators already implemented.

Assembling a dataflow graph in Pervasive DataRush is just as straightforward. You compose a graph by adding operators to it. Operators require their input sources in order to be constructed, so the wiring of outputs to inputs happens as you build the graph. Once you are finished composing, just invoke the run method and the graph begins execution. Because this is all done in Java, composition can be conditional, based on pre-execution processing; the most common example is adjusting the graph to the number of available processors. You can extend this idea even further - since operators are also written in Java, they can compose graphs of their own, extending a currently executing graph. Figure 2 shows a sample dataflow graph, and Listing 4 shows the Pervasive DataRush code used to construct it. It is also possible to construct a graph that is used like an operator (in fact, the majority of the operator library is built this way); such a graph is constructed just as described above.

While some problems are clearly and naturally expressible as dataflow, others can be more difficult to adapt. Particularly difficult to deal with are operators that need all of the input before producing any output, with sorting and aggregating being prime examples of this. Not only does this disrupt any pipeline parallelism occurring in dependent operators, it can impact scalability if all the data ends up held in memory. These problems can usually be solved by clever thinking, taking advantage of additional knowledge of the graph (aggregating sorted data is easier) or dynamically extending the graph (to help manage memory usage). Fortunately, the pre-packaged library in Pervasive DataRush provides sorting and aggregating operators so you don't have to worry about this. Despite any required change in mindset, dataflow has proven to be a successful approach for quickly developing efficient, parallelized solutions for the analysis of large data sets (Falgout, 2008).

Terracotta: JVM Clustering
So far, we've constrained ourselves to running on a single multicore machine. However, we live in a world of inexpensive hardware and networking. If we're willing to accept more administrative complexity, we can attempt to gain horizontal scaling through a distributed solution. As your problems get larger, you simply throw more machines at them.

Terracotta is an open source solution for doing just that. It allows multiple JVMs, potentially on different machines, to cluster and behave as a single JVM. Not only does this provide more processing power, it also provides more memory. The best part - this is all transparent to the programmer. You can make a multi-threaded program into a clustered program without rewriting any code. Just specify which objects need to be shared across the cluster and things like change propagation and distributed locking are handled for you.

Unlike the other solutions we discussed, Terracotta does not provide (by itself) any abstractions that hide concurrency. You still have to worry about threads and locking when writing code. Plus, since it's easy to share objects, it's also easy to naively introduce cluster-wide hotspots that kill performance. But if you follow a simple pattern - the Master/Worker pattern - most of these issues can be avoided. Making it even easier, the Terracotta Framework Library already provides an implementation of this pattern.

The Master/Worker pattern uses relatively familiar concepts from multithreading. A master thread breaks a task into smaller units of work, which are then placed on a work queue. The worker threads consume the contents of the work queue, performing these subtasks, and return the results to the master. All you need to do is place workers on each JVM of the cluster and use Terracotta to share the work queue, and you now have a horizontally scalable solution, as illustrated in Figure 3.
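
The pattern itself is easy to picture in plain Java. The sketch below (our own, confined to a single JVM and a plain BlockingQueue rather than Terracotta's shared roots) shows the shape that Terracotta then distributes: declared as shared roots, the same queues would be consumed by workers running in other JVMs in the cluster.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Shape of the Master/Worker pattern in a single JVM. Under Terracotta,
// the work and result queues would be shared across the cluster so that
// workers on other machines drain the same logical queue.
public class MasterWorker {
    private static final Integer POISON = Integer.MIN_VALUE;   // shutdown signal

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> work = new LinkedBlockingQueue<Integer>();
        final BlockingQueue<Integer> results = new LinkedBlockingQueue<Integer>();

        int workers = Runtime.getRuntime().availableProcessors();
        for (int w = 0; w < workers; w++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        for (Integer unit = work.take(); !unit.equals(POISON); unit = work.take()) {
                            results.put(unit * unit);           // stand-in for real work
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }

        // Master: break the job into units, queue them, then gather results.
        int units = 100;
        for (int i = 0; i < units; i++) work.put(i);
        long total = 0;
        for (int i = 0; i < units; i++) total += results.take();
        for (int w = 0; w < workers; w++) work.put(POISON);     // tell workers to exit
        System.out.println("total = " + total);
    }
}
```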

Hadoop: Distributed MapReduce
The Terracotta approach to distributing a parallel application comes at the cost of writing your application using threads. Just as our programmer on a single SMP machine wasn't content with threads and the utilities of java.util.concurrent, and so traded up for the higher-level tools in the fork/join framework and Pervasive DataRush, a Terracotta programmer might look to MapReduce to harness a network of cheap, dedicated machines by programming at a higher level of abstraction.

The MapReduce interface is simple: provide an implementation of the map and reduce functions. Map takes a key and a value and produces a list of (key, value) pairs, potentially of a different type (Map :: (K,V) -> [(K', V')]). Reduce then takes a list of values all corresponding to the same key and produces a final list of values (Reduce :: (K', [V']) -> [V']). Behind the scenes, the framework spreads your data over multiple machines and orchestrates the distributed computation using the map and reduce you provided (Dean & Ghemawat, 2004).

Despite its simplicity, MapReduce is actually applicable to a wide range of problems. It was designed to index a copy of the Internet, building up both a model of its link structure and the word content of its pages. Large scale distributed sorts and searches obviously lend themselves to MapReduce, but recent research shows a broad class of machine-learning algorithms can also be recast to fit the model (Chu, et al., 2007).

One challenge is that Google keeps MapReduce behind closed doors. Fortunately, Apache provides Hadoop, an open source implementation of MapReduce built on top of the Hadoop Distributed File System (HDFS). Hadoop relies upon HDFS for much of its fault tolerance, particularly through replication. As we delve into the details of how Hadoop works, refer to Listing 5 for yet another implementation of grep.

Before you begin a Hadoop job, you must store its input in HDFS. At the start of the map phase, Hadoop logically partitions the data and allocates one map task, called a Mapper, per partition (there may be hundreds of these on a single machine). The map task invokes the user-defined map function once per (key, value) pair in its local portion of the data set. The output of the map task is sorted by key, then partitioned per reduce task and saved to disk.
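
Listing 5 itself is not reproduced here, but a grep-style map function written against Hadoop's classic org.apache.hadoop.mapred API might look roughly like this sketch; the class name, configuration key, and counting scheme are our own choices:

```java
import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Sketch of a grep-style Mapper: for each input line that matches the
// pattern, emit (line, 1) so the reduce phase can total the occurrences.
public class GrepMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    private static final LongWritable ONE = new LongWritable(1);
    private Pattern pattern;

    @Override
    public void configure(JobConf job) {
        // The pattern reaches each map task through the job configuration.
        pattern = Pattern.compile(job.get("grep.pattern"));
    }

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        if (pattern.matcher(line.toString()).find()) {
            output.collect(line, ONE);
        }
    }
}
```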

In the reduce phase, each reduce task, called a Reducer, starts by fetching the sorted result files assigned to its partition during the map phase and merging them. The reduce task invokes the user-defined reduce function on each incoming key group, writing the results unsorted to disk. The number of reduce tasks is exposed as a tunable to the programmer; increasing the number increases overhead, but also better balances the load across the cluster (Hadoop, 2008).
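
A matching reducer and driver, again sketched against the classic API, would sum the counts per line and expose the number of reduce tasks as the tunable just described (the value of 4 and the argument order are arbitrary illustrations):

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class GrepJob {

    // Sum the counts emitted for each matching line.
    public static class GrepReducer extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text line, Iterator<LongWritable> counts,
                           OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            long sum = 0;
            while (counts.hasNext()) sum += counts.next().get();
            output.collect(line, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(GrepJob.class);
        conf.setJobName("grep");
        conf.set("grep.pattern", args[2]);            // regex read by the mapper
        conf.setMapperClass(GrepMapper.class);
        conf.setReducerClass(GrepReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setNumReduceTasks(4);                    // the tunable discussed above
        FileInputFormat.setInputPaths(conf, new Path(args[0]));   // input already in HDFS
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
```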

What It All Means
In the end, it all boils down to standard good engineering practice: choose the right tool for the job, subject to the constraints of your problem. The tools we've discussed provide you with higher-level abstractions than threads, allowing suitable problems to be solved in far more intuitive, robust ways. They are effectively non-competing, each occupying its own niche of the problem space. And it's quite possible that a problem will require a combination approach. Both fork/join and Pervasive DataRush can be used to implement Hadoop tasks. It's also possible, in theory and with slight modification, to use both with Terracotta.

As you begin to parallelize your applications, bear in mind the requirements of each of the tools we've discussed. Fork/join requires you to specify your implementation using divide-and-conquer. The problem must fit into memory on a single, multicore machine. Pervasive DataRush requires the same hardware, but handles scaling your problem out of memory and onto disk more gracefully. This adds the requirement of an ample local disk. Rather than the divide-and-conquer of fork/join, Pervasive DataRush requires you to recast your implementation in dataflow.

If your application is already threaded, Terracotta doesn't require any rewriting. However, programmers must take on the burden of designing, implementing, and troubleshooting concurrent code. Terracotta really starts to shine when you give it multiple machines and the behavior of your code partitions readily across them. Hadoop requires the same or more machines and the infrastructure of a distributed file system, but lets you program to a much higher-level MapReduce interface. It also gives you fault tolerance, which is vital as you scale up the number of machines.

Your job as a software engineer is to distill the fundamental nature of your application and choose the tool whose "sweet spot" most closely aligns with it. The high-level overview we've provided here will give you a start in your research. You can find further help in the references below.

References

  • Chu, C.-T., Kim, S. K., Lin, Y.-A., Yu, Y., Bradski, G., Ng, A. Y., et al. (2007). Map-Reduce for Machine Learning on Multicore. NIPS (pp. 281 - 288). Cambridge: MIT Press.
  • Dean, J., & Ghemawat, S. (2004). MapReduce: Simplified Data Processing on Large Clusters. OSDI'04: Sixth Symposium on Operating System Design and Implementation. San Francisco.
  • Falgout, J. (2008, March 30). Crunching Big Data with Java. Retrieved from JDJ.
  • Goetz, B. (2007, November 13). Java theory and practice: Stick a fork in it, Part 1. Retrieved August 7, 2008, from IBM developerWorks.
  • Goetz, B. (2008, March 4). Java theory and practice: Stick a fork in it, Part 2. Retrieved August 7, 2008, from IBM developerWorks.
  • Hadoop. (2008, July 7). Hadoop Map-Reduce Tutorial. Retrieved August 8, 2008, from Hadoop.
  • Hadoop. (2007, February 13). Project Description. Retrieved August 8, 2008, from Hadoop Wiki.
  • Lea, D. (2000). A Java Fork/Join Framework. Java Grande, (pp. 36-43).
  • Lea, D. (n.d.). JSR 166y API. Retrieved August 7, 2008, from JSR 166y API.
  • Pervasive Software. (n.d.). Retrieved August 7, 2008, from Pervasive DataRush.
  • Sun. (2006). java.util.concurrent Package API. Retrieved August 7, 2008, from Java 6 API.
  • Terracotta. (n.d.). Retrieved August 7, 2008, from Terracotta.
  • Terracotta. (2008, June 30). Master Worker. Retrieved August 7, 2008, from Terracotta Forge Labs.
-----------------------------
By Matt Walker and Kevin Irwin
Source: SYS-CON

About Matt Walker
Matt Walker is an engineer at Pervasive Software, seeking a deeper understanding of concurrent programming techniques to improve the Pervasive DataRush framework for dataflow programming. He holds an MS in computer science from UT and received his BS in electrical and computer engineering from Rice University.

About Kevin Irwin
Kevin Irwin is a senior engineer at Pervasive Software, working on performance and concurrency within the Pervasive DataRush dataflow engine. With 15 years of industry experience, he previously worked at companies including Sun and IBM, developing high-performance, scalable enterprise software. Kevin holds an MCS and a BA in mathematics and computer science from Rice University.
