Yahoo! Developer Network Blog
« Previous | Main | Next »
August 27, 2009
The Anatomy of Hadoop I/O Pipeline
Introduction
In a typical Hadoop MapReduce job, input files are read from HDFS. Data are usually compressed to reduce the file sizes. After decompression, serialized bytes are transformed into Java objects before being passed to a user-defined map() function. Conversely, output records are serialized, compressed, and eventually pushed back to HDFS. This seemingly simple, two-way process is in fact much more complicated due to a few reasons:
- Compression and decompression are typically done through native library code.
- End-to-end CRC32 checksum is always verified or calculated during reading or writing.
- Buffer management is complicated due to various interface restrictions.
In this blog post, I attempt to decompose and analyze the Hadoop I/O pipeline in detail, and explore possible optimizations. To keep the discussion concrete, I am going to use the ubiquitous example of reading and writing line records from/to gzip-compressed text files. I will not get into details on the DataNode side of the pipeline, and instead mainly focus on the client-side (the map/reduce task processes). Finally, all descriptions are based on Hadoop 0.21 trunk at the time of this writing, which means you may see things differently if you are using older or newer versions of Hadoop.
Reading Inputs
Figure 1 illustrates the I/O pipeline when reading line records from a gzipped text file using TextInputFormat. The figure is divided in two sides separated by a thin gap. The left side shows the DataNode process, and the right side the application process (namely, the Map task). From bottom to top, there are three zones where buffers are allocated or manipulated: kernel space, native code space, and JVM space. For the application process, from left to right, there are the software layers that a data block needs to traverse through. Boxes with different colors are buffers of various sorts. An arrow between two boxes represents a data transfer or buffer-copy. The weight of an arrow indicates the amount of data being transferred. The label in each box shows the rough location of the buffer (either the variable that references to the buffer, or the module where the buffer is allocated). If available, the size of a buffer is described in square brackets. If the buffer size is configurable, then both the configuration property and the default size are shown. I tag each data transfer with the numeric step where the transfer happens:
![]() |
- Data transferred from DataNode to MapTask process. DBlk is the file data block; CBlk is the file checksum block. File data are transferred to the client through Java nio transferTo (aka UNIX sendfile syscall). Checksum data are first fetched to DataNode JVM buffer, and then pushed to the client (details are not shown). Both file data and checksum data are bundled in an HDFS packet (typically 64KB) in the format of: {packet header | checksum bytes | data bytes}.
- Data received from the socket are buffered in a BufferedInputStream, presumably for the purpose of reducing the number of syscalls to the kernel. This actually involves two buffer-copies: first, data are copied from kernel buffers into a temporary direct buffer in JDK code; second, data are copied from the temporary direct buffer to the byte[] buffer owned by the BufferedInputStream. The size of the byte[] in BufferedInputStream is controlled by configuration property "io.file.buffer.size", and is default to 4K. In our production environment, this parameter is customized to 128K.
- Through the BufferedInputStream, the checksum bytes are saved into an internal ByteBuffer (whose size is roughly (PacketSize / 512 * 4) or 512B), and file bytes (compressed data) are deposited into the byte[] buffer supplied by the decompression layer. Since the checksum calculation requires a full 512 byte chunk while a user's request may not be aligned with a chunk boundary, a 512B byte[] buffer is used to align the input before copying partial chunks into user-supplied byte[] buffer. Also note that data are copied to the buffer in 512-byte pieces (as required by FSInputChecker API). Finally, all checksum bytes are copied to a 4-byte array for FSInputChecker to perform checksum verification. Overall, this step involves an extra buffer-copy.
- The decompression layer uses a byte[] buffer to receive data from the DFSClient layer. The DecompressorStream copies the data from the byte[] buffer to a 64K direct buffer, calls the native library code to decompress the data and stores the uncompressed bytes in another 64K direct buffer. This step involves two buffer-copies.
- LineReader maintains an internal buffer to absorb data from the downstream. From the buffer, line separators are discovered and line bytes are copied to form Text objects. This step requires two buffer-copies.
Optimizing Input Pipeline
Adding everything up, including a "copy" for decompressing bytes, the whole read pipeline involves seven buffer-copies to deliver a record to MapTask's map() function since data are received in the process's kernel buffer. There are a couple of things that could be improved in the above process:
- Many buffer-copies are needed simply to convert between direct buffer and byte[] buffer.
- Checksum calculation can be done in bulk instead of one chunk at a time.
![]() |
Figure 2 shows the post-optimization view where the total number of buffer copies is reduced from seven to three:
- An input packet is decomposed into the checksum part and the data part, which are scattered into two direct buffers: an internal one for checksum bytes, and the direct buffer owned by the decompression layer to hold compressed bytes. The FSInputChecker accesses both buffers directly.
- The decompression layer deflates the uncompressed bytes to a direct buffer owned by the LineReader.
- LineReader scans the bytes in the direct buffer, finds the line separators from the buffer, and constructs Text objects.
Writing Outputs
Now let's shift gears and look at the write-side of the story. Figure 3 illustrates the I/O pipeline when a ReduceTask writes line records into a gzipped text file using TextOutputFormat. Similar to Figure 1, each data transfer is tagged with the numeric step where the transfer occurs:
![]() |
- TextOutputFormat's RecordWriter is unbuffered. When a user emits a line record, the bytes of the Text object are copied straight into a 64KB direct buffer owned by the compression layer. For a very long line, it will be copied to this buffer 64KB at a time for multiple times.
- Every time the compression layer receives a line (or part of a very long line), the native compression code is called, and compressed bytes are stored into another 64KB direct buffer. Data are then copied from that direct buffer to an internal byte[] buffer owned by the compression layer before pushing down to the DFSClient layer because the DFSClient layer only accepts byte[] buffer as input. The size of this buffer is again controlled by configuration property "io.file.buffer.size". This step involves two buffer-copies.
- FSOutputSummer calculates the CRC32 checksum from the byte[] buffer from the compression layer, and deposits both data bytes and checksum bytes into a byte[] buffer in a Packet object. Again, checksum calculation must be done on whole 512B chunks, and an internal 512B byte[] buffer is used to hold partial chunks that may result from compressed data not aligned with chunk boundaries. Checksums are first calculated and stored in a 4B byte[] buffer before being copied to the packet. This step involves one buffer-copy.
- When a packet is full, the packet is pushed to a queue whose length is limited to 80. The size of the packet is controlled by configuration property "dfs.write.packet.size" and is default to 64KB. This step involves no buffer-copy.
- A DataStreamer thread waits on the queue and sends the packet to the socket whenever it receives one. The socket is wrapped with a BufferedOutputStream. But the byte[] buffer is very small (no more than 512B) and it is usually bypassed. The data, however, still needs to be copied to a temporary direct buffer owned by JDK code. This step requires two data copies.
- Data are sent from the ReduceTask's kernel buffer to the DataNode's kernel buffer. Before the data are stored in Block files and checksum files, there are a few buffer-copies in DataNode side. Unlike the case of DFS read, both file data and checksum data will traverse out of kernel, and into JVM land. The details of this process are beyond the discussion here and are not shown in the figure.
Optimizing Output Pipeline
Overall, including the "copy" for compressing bytes, the process described above requires six buffer-copies for an output line record to reach ReduceTask's kernel buffer. What could we do to optimize the write pipeline?
- We can probably reduce a few buffer-copies.
- The native compression code may be called less frequently if we call it only after the input buffer is full (block compression codecs like LZO already do this).
- Checksum calculations can be done in bulk instead of one chunk at a time.
![]() |
Figure 4 shows how it looks like after these optimizations, where a total of four buffer-copies are necessary:
- Bytes from a user's Text object are copied to a direct buffer owned by the TextOutputFormat layer.
- Once this buffer is full, native compression code is called and compressed data is deposited to a direct buffer owned by the compression layer.
- FSOutputSummer computes the checksum for bytes in the direct buffer from the compression layer and saves both data bytes and checksum bytes into a packet's direct buffer.
- A full packet will be pushed into a queue, and, in background, the DataStreamer thread sends the packet through the socket, which copies the bytes to be copied to kernel buffers.
Conclusion
This blog post came out of an afternoon spent asking ourselves specific questions about Hadoop's I/O and validating the answers in the code. It turns out, after combing through class after class, that the pipeline is more complex than we originally thought. While each of us is familiar with one or more components, we found the preceding, comprehensive picture of Hadoop I/O elucidating, and we hope other developers and users will, too. Effecting the optimizations outlined above will be a daunting task, and this is the first step toward a more performant Hadoop.
-- Hong Tang
Posted at August 27, 2009 3:45 PM
Comments
This is a great article. I haven't had a chance to get familiar with Hadoop/MapReduce professionally, and very much enjoyed this well-written article about the data processing pipeline.
Keep up the good work.
Tony
Posted by: Tony Chang at August 28, 2009 9:12 AM | Permalink
Hong - Thanks for a glimpse into the (gory) details of Hadoop's I/O. It's heartening -- at the very least -- that these represent targets for performance gains in future releases.
Posted by: Michael E Driscoll at September 14, 2009 7:09 PM | Permalink
Post a comment
Comment Policy: We encourage comments and look forward to hearing from you. Please note that Yahoo! may, in our sole discretion, remove comments if they are off topic, inappropriate, or otherwise violate our Terms of Service.
Hadoop is a trademark of the Apache Software Foundation.
Subscribe
Recent Blog Articles
view all
Hadoop Bay Area User Group - Feb 17th at Yahoo!, Sunnyvale
Wed, 03 Feb 2010
Comparing Pig Latin and SQL for Constructing Data Processing Pipelines
Fri, 29 Jan 2010
Video from Jan. 20, 2010 Hadoop Bay Area User Group now online
Thu, 28 Jan 2010
Stomping out Java "concurrency cockroaches" with SureLogic's Flashlight and JSure tools
Tue, 26 Jan 2010
Hadoop Bay Area January 2010 User Group - Recap
Thu, 21 Jan 2010
Recent Links
Appcelerator Titanium + Yahoo YQL on Vimeo
Mon, 08 Feb 2010
Tue, 02 Feb 2010
PhoneGap | Cross platform mobile framework
Sat, 30 Jan 2010
Web developers can rule the iPad - O'Reilly Radar
Sat, 30 Jan 2010
rc3.org - Is the iPad the harbinger of doom for personal computing?
Thu, 28 Jan 2010
Archives
Recent Readers





