Introduction

Apache Commons Compress is a subproject of Apache Commons: a Java library dedicated to compression and archive formats. To use it in a Maven project, add the following dependency:

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.20</version>
</dependency>

Concurrent compression

Sample code

public static void compressFileList(String zipOutName, List<String> fileNameList)
        throws IOException, ExecutionException, InterruptedException {
    // ThreadFactoryBuilder appears to be Guava's; %d gives each worker thread a distinct name
    ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("compressFileList-pool-%d").build();
    // With a bounded queue of 20 and at most 10 threads, further submissions are rejected
    // (RejectedExecutionException) while the pool is saturated, so size the queue for the file count
    ExecutorService executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), factory);
    ParallelScatterZipCreator parallelScatterZipCreator = new ParallelScatterZipCreator(executor);
    // try-with-resources closes both streams even if compression fails
    try (OutputStream outputStream = new FileOutputStream(zipOutName);
         ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(outputStream)) {
        zipArchiveOutputStream.setEncoding("UTF-8");
        // Wrap each file's input stream in a ZipArchiveEntry and submit it to parallelScatterZipCreator
        for (String fileName : fileNameList) {
            File inFile = new File(fileName);
            final InputStreamSupplier inputStreamSupplier = () -> {
                try {
                    return new FileInputStream(inFile);
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                    // Fall back to an empty stream so that a missing file yields an empty entry
                    return new NullInputStream(0);
                }
            };
            ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
            zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
            zipArchiveEntry.setSize(inFile.length());
            // 436 decimal == 0664 octal (rw-rw-r--)
            zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
            parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
        }
        parallelScatterZipCreator.writeTo(zipArchiveOutputStream);
    }
    log.info("ParallelCompressUtil->ParallelCompressUtil-> info:{}", JSONObject.toJSONString(parallelScatterZipCreator.getStatisticsMessage()));
}

ParallelScatterZipCreator

This class is the core of parallel compression. Its source code shows that it compresses multiple files in parallel: tasks are submitted to a thread pool, the compressed results are queued, and the queued output is finally merged into the output file.

The constructor of this class takes three arguments:

The first is the thread pool. If none is supplied, the class creates one whose core and maximum thread counts both equal the number of processors available on the system. Relying on this self-built pool is not recommended, because it can occupy all of the machine's CPU during compression and starve other programs. The thread pool is shut down once the final compression completes.
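As a rough illustration of the advice above, here is a minimal sketch of a deliberately bounded pool built with the JDK only. The class `CompressionPools`, its methods, and the "half of the available processors" cap are hypothetical choices made for this example, not part of Commons Compress.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CompressionPools {

    private CompressionPools() {
    }

    /** Assumed policy: use half of the available processors, but never fewer than one. */
    static int defaultThreadCount() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
    }

    /** Creates a fixed-size pool whose threads are named prefix + sequence number. */
    static ThreadPoolExecutor newCompressionPool(int threads, String namePrefix) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be >= 1");
        }
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, namePrefix + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        // core == max with an unbounded queue: extra tasks wait instead of being rejected
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newCompressionPool(defaultThreadCount(), "compress-pool-");
        try {
            pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        } finally {
            pool.shutdown(); // already-submitted tasks still run to completion
        }
    }
}
```

A pool built this way could be passed to ParallelScatterZipCreator in place of the ThreadPoolExecutor from the sample code. Because the creator shuts the pool down when compression completes, the pool should not be shared with other work.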

The second is the intermediate storage: temporary files that hold compressed data during the run and are deleted once compression completes. This example uses the default.

The third is the compression level, which is passed to the Deflater algorithm in the java.util.zip package. It defaults to Deflater.DEFAULT_COMPRESSION and can be changed as required.
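To get a feel for what the compression level changes, the following sketch compresses the same data with java.util.zip.Deflater at several levels and prints the resulting sizes. It uses only the JDK. The class `DeflateLevels` and its sample data are made up for illustration, and actual sizes depend on the input.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

final class DeflateLevels {

    private DeflateLevels() {
    }

    /** Compresses input as a raw DEFLATE stream (no zlib header), the form stored in ZIP entries. */
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level, true);
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            while (!deflater.finished()) {
                int written = deflater.deflate(buffer);
                out.write(buffer, 0, written);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native zlib memory
        }
    }

    /** Builds repetitive, text-like sample data so that compression has something to find. */
    static byte[] sampleData(int lines) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            sb.append("line ").append(i).append(": the quick brown fox jumps over the lazy dog\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] input = sampleData(5_000);
        int[] levels = {
            Deflater.NO_COMPRESSION, Deflater.BEST_SPEED,
            Deflater.DEFAULT_COMPRESSION, Deflater.BEST_COMPRESSION
        };
        System.out.println("input: " + input.length + " bytes");
        for (int level : levels) {
            System.out.println("level " + level + " -> " + deflate(input, level).length + " bytes");
        }
    }
}
```

Higher levels typically trade CPU time for smaller output, which matters more when many entries are compressed at once. Deflater.NO_COMPRESSION stores the data with a small framing overhead, so its output is slightly larger than the input.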

Finally, the ZipArchiveOutputStream class writes the compressed output. The target can be a file, an OutputStream, or a channel.


The for loop in the sample above wraps each file to be compressed in a ZipArchiveEntry and passes it to parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier), which submits it to the thread pool so that multiple threads process the entries concurrently.

for (String fileName : fileNameList) {
    File inFile = new File(fileName);
    final InputStreamSupplier inputStreamSupplier = () -> {
        try {
            return new FileInputStream(inFile);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return new NullInputStream(0);
        }
    };
    ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(inFile.getName());
    zipArchiveEntry.setMethod(ZipArchiveEntry.DEFLATED);
    zipArchiveEntry.setSize(inFile.length());
    zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 436);
    parallelScatterZipCreator.addArchiveEntry(zipArchiveEntry, inputStreamSupplier);
}

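ZipArchiveEntry extends the ZipEntry class from the JDK's java.util.zip package and adds support for additional fields. Here, method is set to ZipArchiveEntry.DEFLATED; it has to be set explicitly, because createCallable rejects an entry whose method is unknown. The size is the uncompressed size of the input file, and unixMode sets the Unix file type and permission bits that the file receives when the archive is extracted (436 in decimal is 0664 in octal, i.e. rw-rw-r--). Each file or input stream to be compressed is wrapped in a ZipArchiveEntry and submitted to the thread pool by addArchiveEntry.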
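The decimal literal 436 in the sample is easy to misread, so here is a small JDK-only sketch of how such a mode value is composed. The class `UnixModes` and its helpers are hypothetical; `FILE_FLAG` is redefined locally with the value that UnixStat.FILE_FLAG has in Commons Compress (octal 0100000, the "regular file" type bits).

```java
final class UnixModes {

    /** Same value as UnixStat.FILE_FLAG: marks the entry as a regular file. */
    static final int FILE_FLAG = 0100000;

    /** Permission bits, including setuid, setgid, and sticky. */
    static final int PERM_MASK = 07777;

    private UnixModes() {
    }

    /** Combines the regular-file flag with the given permission bits. */
    static int fileMode(int permissions) {
        return FILE_FLAG | (permissions & PERM_MASK);
    }

    /** Renders the lower nine permission bits in ls style, e.g. rw-rw-r--. */
    static String describe(int mode) {
        String symbols = "rwxrwxrwx";
        StringBuilder sb = new StringBuilder(9);
        for (int i = 0; i < 9; i++) {
            int bit = 1 << (8 - i);
            sb.append((mode & bit) != 0 ? symbols.charAt(i) : '-');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int mode = fileMode(436); // 436 decimal is the same number as 0664 octal
        System.out.println(Integer.toOctalString(mode) + " " + describe(mode));
    }
}
```

Writing the permissions as the octal literal 0664 instead of 436 expresses the same value and is arguably easier to read.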

/**
 * Adds an archive entry to this archive.
 * <p>
 * This method is expected to be called from a single client thread
 * </p>
 *
 * @param zipArchiveEntry The entry to add.
 * @param source The source input stream supplier
 */
public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
    submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
}

public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
    final int method = zipArchiveEntry.getMethod();
    if (method == ZipMethod.UNKNOWN_CODE) {
        throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
    }
    final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
    return new Callable<ScatterZipOutputStream>() {
        @Override
        public ScatterZipOutputStream call() throws Exception {
            ScatterZipOutputStream scatterStream = tlScatterStreams.get();
            // Compress the entry and add the result to this thread's ScatterZipOutputStream queue
            scatterStream.addArchiveEntry(zipArchiveEntryRequest);
            return scatterStream;
        }
    };
}

/**
 * Submit a callable for compression.
 *
 * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
 *
 * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
 * @since 1.19
 */
public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
    futures.add(es.submit(callable));
}

addArchiveEntry converts each entry into a Callable task and submits it to the thread pool for execution. The returned Future is added to the futures collection held by ParallelScatterZipCreator. When the task runs, it compresses the entry and appends a CompressedEntry to the Queue<CompressedEntry> inside ScatterZipOutputStream. Finally, when the writeTo method of ParallelScatterZipCreator is called, every CompressedEntry in the queue is written to the final output.

/**
 * Compress the entry and add the result to the queue
 */
public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
    try (final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
        streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
    }
    items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(),
                                  streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead()));
}

/**
 * Finally, write the compressed content to the output stream
 */
public void writeTo(final ZipArchiveOutputStream target) throws IOException {
    backingStore.closeForWriting();
    try (final InputStream data = backingStore.getInputStream()) {
        for (final CompressedEntry compressedEntry : items) {
            try (final BoundedInputStream rawStream = new BoundedInputStream(data,
                    compressedEntry.compressedSize)) {
                target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
            }
        }
    }
}
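The scatter and gather steps shown above can be imitated with the JDK alone. The sketch below is a simplified analogue, not the library's implementation: every name in it (`ScatterGatherSketch`, `ScatterStore`, `Item`, `roundTrip`) is hypothetical, the backing store is an in-memory buffer rather than a temporary file, the CRC bookkeeping is left out, and the gather step inflates each slice back into its payload instead of copying the raw compressed bytes into a ZIP archive.

```java
import java.io.ByteArrayOutputStream;
import java.util.*;
import java.util.concurrent.*;
import java.util.zip.*;

final class ScatterGatherSketch {
    /** Per-entry bookkeeping, loosely modeled on CompressedEntry (the CRC is omitted here). */
    static final class Item {
        final String name;
        final int compressedSize;
        final int size;
        Item(String name, int compressedSize, int size) {
            this.name = name;
            this.compressedSize = compressedSize;
            this.size = size;
        }
    }

    /** One per worker thread: a backing store plus the items written into it. */
    static final class ScatterStore {
        private final ByteArrayOutputStream backingStore = new ByteArrayOutputStream();
        private final List<Item> items = new ArrayList<>();

        /** Scatter step: deflate the payload into the backing store and record its sizes. */
        void addEntry(String name, byte[] payload) {
            int start = backingStore.size();
            Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
            deflater.setInput(payload);
            deflater.finish();
            byte[] buffer = new byte[4096];
            while (!deflater.finished()) {
                backingStore.write(buffer, 0, deflater.deflate(buffer));
            }
            deflater.end();
            items.add(new Item(name, backingStore.size() - start, payload.length));
        }

        /** Gather step: walk the items in order, slicing the store by compressed size. */
        void writeTo(Map<String, byte[]> target) {
            byte[] data = backingStore.toByteArray();
            int offset = 0;
            for (Item item : items) {
                target.put(item.name, inflate(data, offset, item));
                offset += item.compressedSize;
            }
        }
    }

    /** Inflates one slice of the backing store back into the original payload. */
    static byte[] inflate(byte[] data, int offset, Item item) {
        Inflater inflater = new Inflater(true);
        byte[] restored = new byte[item.size];
        try {
            inflater.setInput(data, offset, item.compressedSize);
            int done = 0;
            while (done < restored.length) {
                int n = inflater.inflate(restored, done, restored.length - done);
                if (n == 0) {
                    throw new IllegalStateException("Truncated data for " + item.name);
                }
                done += n;
            }
            return restored;
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    /** Compresses all inputs on a pool, then gathers every store after the futures finish. */
    static Map<String, byte[]> roundTrip(Map<String, byte[]> inputs, int threads) {
        Queue<ScatterStore> stores = new ConcurrentLinkedQueue<>();
        ThreadLocal<ScatterStore> perThread = ThreadLocal.withInitial(() -> {
            ScatterStore store = new ScatterStore();
            stores.add(store);
            return store;
        });
        ExecutorService es = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Map.Entry<String, byte[]> in : inputs.entrySet()) {
                futures.add(es.submit(() -> perThread.get().addEntry(in.getKey(), in.getValue())));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion and surface task failures
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
        Map<String, byte[]> output = new TreeMap<>();
        for (ScatterStore store : stores) {
            store.writeTo(output);
        }
        return output;
    }
}
```

The point of the design is that the expensive work, deflating, runs in parallel with no shared state, while the only sequential part is a cheap copy of already-compressed bytes driven by the recorded sizes.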

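Putting it all together, the flow is: each file is wrapped in a ZipArchiveEntry, addArchiveEntry turns the entry into a Callable and submits it to the thread pool, each task compresses its entry into a ScatterZipOutputStream, and writeTo finally copies every compressed entry into the ZipArchiveOutputStream.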