If you want to use the official Parquet Java library, you’ll quickly see that it brings along Hadoop as a large, cumbersome transitive dependency. This makes it complicated to use Parquet in small systems and simple use cases.
In this post, I’ll show you how to eliminate almost all of the Hadoop dependency. I’m using this technique in production systems that need to export data with a clean, structured schema. It works for both reading and writing Parquet files with the official Java implementation. In one case, I’ve seen an over 85% reduction in shaded jar size from cutting out most of the Hadoop transitive dependencies.
This technique can be summarized as:
- Switch to using all non-Hadoop Parquet interfaces.
- Remove all Hadoop imports from your code.
- Explicitly bring in the transitive dependencies that are still part of the Parquet import graph, and exclude all other Hadoop dependencies.
For our use case, where we want to write Parquet files out to S3 object storage, we’re also going to utilize AWS’ Java NIO FileSystem SPI project, which makes S3 I/O easy without having to implement the more intricate parts of direct S3 reading and writing ourselves.
Here are these steps in more detail.
Switch to non-Hadoop Parquet interfaces
Many of the reader and writer interfaces in parquet-java (formerly parquet-mr) are explicitly coupled to Hadoop classes. If all you want to do is read and write Parquet files, you’re at a minimum going to interact with ParquetReader, ParquetWriter and their subclasses such as AvroParquetWriter or ProtoParquetWriter. All of these classes have constructors that take in instances of org.apache.hadoop.conf.Configuration to configure the reader or writer, and specify input and output locations with a Hadoop filesystem Path.
You need to ensure that none of your code depends on these interfaces. You need to:
- Replace Hadoop Configuration with an instance of ParquetConfiguration, most likely PlainParquetConfiguration. All Parquet readers and writers have builder interfaces that take in this configuration object instead of the Hadoop variant.
- Replace all instances of Hadoop Path with either a Parquet InputFile for reads, or an OutputFile for writes. If you want to work with your local filesystem, you can use the LocalInputFile or LocalOutputFile implementations.
Concretely, if you were writing out Parquet files with an AvroParquetWriter to a local filesystem, you would change your code from something like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
final Path hadoopPath = new Path("file:///tmp/file.parquet");
final Configuration hadoopConfiguration = new Configuration();
final ParquetWriter<T> writer = AvroParquetWriter.<T>builder(hadoopPath)
.withConf(hadoopConfiguration)
.withSchema(schema)
.build();
To something like this instead:
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.io.LocalOutputFile;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
final FileSystem fs = FileSystems.getFileSystem(new URI("file", null, "/", null, null));
final ParquetConfiguration parquetConf = new PlainParquetConfiguration();
final Path nioOutputPath = fs.getPath("/tmp/file.parquet");
final LocalOutputFile outputFile = new LocalOutputFile(nioOutputPath);
final ParquetWriter<T> writer = AvroParquetWriter.<T>builder(outputFile)
.withConf(parquetConf)
.withSchema(schema)
.build();
Notice how we’ve switched to using Java’s built-in NIO interfaces for file I/O. In a later section, we’ll see how we can use these same interfaces to do direct I/O to S3 instead of local filesystem operations.
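The read side follows the same pattern. Here’s a sketch of reading the records back with Avro GenericRecords; the builder overload taking an InputFile and a ParquetConfiguration is from parquet-java 1.14+, so adjust to your version and record type:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.LocalInputFile;
import java.nio.file.Path;

// Read records back without touching any Hadoop classes.
final LocalInputFile inputFile = new LocalInputFile(Path.of("/tmp/file.parquet"));
try (ParquetReader<GenericRecord> reader =
         AvroParquetReader.<GenericRecord>builder(inputFile, new PlainParquetConfiguration())
             .build()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    // process each record
  }
}
```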
After you’ve moved over to these interfaces, you should remove all unused org.apache.hadoop.* imports from your code before proceeding to the next step.
Dependency Changes
Even though none of your code references anything from Hadoop anymore, we still can’t exclude all transitive Hadoop dependencies. This is because the core reader and writer classes we’re using still bring in these Hadoop dependencies as imports. While our technique will never execute any of these Hadoop code paths, they are still referenced as imports from classes like ParquetWriter and friends.
I’m guessing that when Parquet moves to 2.0, the Parquet team will remove this explicit Hadoop coupling, but doing so any sooner would mean breaking interface changes. Until then, we’ll need to do some transitive dependency surgery ourselves.
We explicitly bring in hadoop-common, and intentionally pull in only the dependencies that are referenced through the import graph. Everything else is excluded. There might be a better way to do this with Maven, but this has been working so far.
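To see which Hadoop artifacts are actually being resolved before and after this surgery, Maven’s dependency plugin can filter the dependency tree by pattern (the wildcard here is meant to also match the org.apache.hadoop.thirdparty group):

```shell
# Show only Hadoop artifacts in the resolved dependency tree.
mvn dependency:tree -Dincludes='org.apache.hadoop*:*'
```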
For parquet-java version 1.14.1, here are the relevant Maven pom dependencies and exclusions:
<!-- BEGIN LINGERING HADOOP TRANSITIVE DEPENDENCIES
     There are still some lingering classes in parquet-mr that are coupled to Hadoop via their public interfaces.
     Remove this section entirely once Hadoop is correctly made optional in parquet-mr, for another
     round of jar size savings.
     NOTE: None of *our* code explicitly calls into any Hadoop code. However, some of the Parquet classes
     we depend on import Hadoop classes (but don't use them in our core code paths). Once parquet-mr
     makes Hadoop optional (probably sometime in 2.0.0, since doing so would break certain public interfaces),
     we can remove everything between these two comment blocks.
     On wildcard exclusions: these are the bare-minimum jars that must be brought onto the classpath to satisfy
     import dependencies (the import graph from the classes we *do* use, such as ParquetWriter, etc.). We explicitly
     avoid bringing in any further transitive dependencies, and pull in only the exact jars we need.
-->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>com.fasterxml.woodstox</groupId>
  <artifactId>woodstox-core</artifactId>
  <version>5.4.0</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.codehaus.woodstox</groupId>
  <artifactId>stax2-api</artifactId>
  <version>4.2.1</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>commons-collections</groupId>
  <artifactId>commons-collections</artifactId>
  <version>3.2.2</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop.thirdparty</groupId>
  <artifactId>hadoop-shaded-guava</artifactId>
  <version>1.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- END LINGERING HADOOP TRANSITIVE DEPENDENCIES
     Delete everything between these two blocks once parquet-mr makes Hadoop entirely optional.
-->
Like I mentioned, cutting out all of these transitives, along with hadoop-aws, took one production shaded jar from ~657MB down to around ~96MB.
If upstream parquet removes all Hadoop interface coupling, we’d be able to get rid of this ugly maven hack altogether.
S3 Reading and Writing
So now we can theoretically read and write Parquet files on the local filesystem using Java’s built-in NIO interfaces, but what about direct I/O to S3? One of the niceties of the Hadoop FileSystem abstraction was that we could read and write Parquet files directly from blob storage, and we’d like to recreate that.
Let’s fix that by leveraging AWS’ Java NIO SPI for S3. This isn’t strictly necessary, but its implementation already handles producing a seekable byte channel that does I/O buffering for us. If you’d rather not bring this library into scope, you’re at a minimum going to need to write your own InputFile and OutputFile implementations that buffer I/O to and from S3 to your liking.
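Whether you use the SPI library or roll your own, the essential primitive NIO gives you is a SeekableByteChannel, which supports positioned reads against any FileSystem provider. A minimal stdlib-only sketch of the pattern (shown here against a local temp file, but it’s identical against a remote provider):

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class SeekableReadDemo {
  /**
   * Reads exactly len bytes starting at offset: the core primitive a
   * Parquet InputFile needs from any NIO FileSystem provider.
   */
  public static byte[] readAt(Path file, long offset, int len) throws IOException {
    final ByteBuffer buf = ByteBuffer.allocate(len);
    try (SeekableByteChannel channel = Files.newByteChannel(file)) {
      channel.position(offset); // absolute seek, no java.io.File involved
      while (buf.hasRemaining()) {
        if (channel.read(buf) == -1) {
          throw new EOFException("hit EOF before reading " + len + " bytes");
        }
      }
    }
    return buf.array();
  }

  public static void main(String[] args) throws IOException {
    final Path tmp = Files.createTempFile("demo", ".bin");
    try {
      Files.write(tmp, new byte[] {10, 20, 30, 40, 50});
      final byte[] out = readAt(tmp, 3, 2);
      System.out.println(out[0] + "," + out[1]); // prints 40,50
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```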
If you follow the README directions to configure your credentials for S3, you’re 95% of the way to being able to plug directly into Parquet reading and writing. Instead of looking up java.nio.file.Path objects from our local FileSystem, we instead look them up from the provided s3 filesystem implementation:
- For reading and writing, look up Path objects from the S3FileSystem.
- For writing, the upstream LocalOutputFile implementation works just fine with S3 paths.
- For reading, you’ll need a different InputFile implementation that works correctly with NIO interfaces and doesn’t fall back to legacy File operations.
Here’s an implementation that should work with our S3 NIO FileSystem, since it only depends on NIO-compatible interfaces:
package me.blakesmith.parquet;

import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * An {@link org.apache.parquet.io.InputFile} implementation that only uses
 * java.nio.* interfaces for I/O operations. The LocalInputFile implementation in
 * upstream parquet-mr currently falls back to the old-school java file I/O APIs
 * (via Path#toFile) which won't work with nio remote FileSystems such as an S3
 * FileSystem implementation.
 */
public class NioInputFile implements InputFile {
  private final Path path;
  private long length = -1;

  public NioInputFile(Path file) {
    path = file;
  }

  @Override
  public long getLength() throws IOException {
    if (length == -1) {
      length = Files.size(path);
    }
    return length;
  }

  @Override
  public SeekableInputStream newStream() throws IOException {
    return new SeekableInputStream() {
      private final SeekableByteChannel byteChannel = Files.newByteChannel(path);
      private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);

      @Override
      public int read() throws IOException {
        // There has to be a better way to do this?
        singleByteBuffer.clear();
        final int numRead = read(singleByteBuffer);
        if (numRead >= 0) {
          return (int) singleByteBuffer.get(0) & 0xFF;
        } else {
          return -1;
        }
      }

      @Override
      public long getPos() throws IOException {
        return byteChannel.position();
      }

      @Override
      public void seek(long newPos) throws IOException {
        byteChannel.position(newPos);
      }

      @Override
      public void readFully(byte[] bytes) throws IOException {
        readFully(bytes, 0, bytes.length);
      }

      @Override
      public void readFully(byte[] bytes, int start, int len) throws IOException {
        // Wrapping with an explicit offset and length sets both the buffer's
        // position and limit, so we never read past the requested range.
        readFully(ByteBuffer.wrap(bytes, start, len));
      }

      @Override
      public int read(ByteBuffer buf) throws IOException {
        return byteChannel.read(buf);
      }

      @Override
      public void readFully(ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
          if (read(buf) == -1) {
            throw new EOFException("Reached the end of " + path + " before filling the buffer");
          }
        }
      }

      @Override
      public void close() throws IOException {
        byteChannel.close();
      }
    };
  }
}
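With NioInputFile in hand, reading directly from S3 is just a matter of resolving an s3 path through the SPI provider and handing it to a reader. A sketch, where the bucket and key are illustrative, the s3 scheme is served by the NIO SPI provider once it’s on your classpath, and the builder overload taking a ParquetConfiguration is from parquet-java 1.14+:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;

import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Paths.get(URI) dispatches to the installed s3 FileSystem provider.
final Path s3Path = Paths.get(URI.create("s3://my-bucket/exports/file.parquet"));
final NioInputFile inputFile = new NioInputFile(s3Path);
try (ParquetReader<GenericRecord> reader =
         AvroParquetReader.<GenericRecord>builder(inputFile, new PlainParquetConfiguration())
             .build()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    // process each record streamed from S3
  }
}
```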
I’d like to get this implementation pushed upstream, either to replace the LocalInputFile implementation, or to sit alongside it for use cases like this, where we want to plug in NIO FileSystem implementations that can’t fall back to legacy File interfaces.
Conclusion
We can now read and write Parquet files, using the official Java implementation, without any explicit dependency on Hadoop, while still reading and writing directly to S3 blob storage. Not only does this reduce our jar sizes, it also cuts down on classpath dependency sprawl. You can embed Parquet functionality inside smaller codebases where carrying around a prohibitively cumbersome Hadoop dependency would be a complete non-starter.
Parquet is an amazing file format that’s going to be here for a long time, especially in our current age of cheap blob storage. One of the biggest things holding parquet-java back from being ubiquitously usable is issues like this, where the implementation bloats your codebases and deployables. I’m eager to help however I can to reduce parquet-java’s dependency on Hadoop, and to bring the benefits of Parquet to more codebases.