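Java does not provide any mechanism for safely forcing a thread to stop what it is doing. (Thread.stop and suspend do offer such mechanisms, but they have serious flaws and should be avoided.) Instead, Java provides interruption, a cooperative mechanism that lets one thread ask another to stop what it is doing.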

Task cancellation

Cancellable:

An operation is cancellable if external code can move it to the “done” state before it completes normally.

Reasons for cancelling an operation

  • User-requested cancellation: A user clicks a cancel button in a graphical application or requests cancellation through a management interface.
  • Time-limited activities: For example, an application searches a problem space for a limited amount of time and chooses the best solution found within that time. When the timer expires, any tasks still searching are cancelled.
  • Application events: For example, an application decomposes a problem space so that different tasks search different regions of it. When one task finds a solution, all the other tasks still searching are cancelled.
  • Errors: For example, a web crawler searches for relevant pages and saves pages or summary data to disk. When a crawler task encounters an error (for example, the disk is full), the other crawling tasks are cancelled, possibly recording their current state so they can be restarted later.
  • Shutdown: When a program or service shuts down, something must be done about work that is currently being processed or waiting to be processed. In a graceful shutdown, tasks already in progress may be allowed to complete; in an immediate shutdown, they may be cancelled.

Program 1 & 2: Using a volatile field to hold cancellation state, and running a prime generator for one second

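import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// @ThreadSafe and @GuardedBy come from the jcip-annotations library
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

import static java.util.concurrent.TimeUnit.SECONDS;

@ThreadSafe
public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();

    @GuardedBy("this")
    private final List<BigInteger> primes = new ArrayList<BigInteger>();
    // volatile so that the write in cancel() is visible to the thread running run()
    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        // The flag is checked only once per iteration
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }

    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            // Cancel in finally so the generator stops even if sleep is interrupted
            generator.cancel();
        }
        return generator.get();
    }
}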

The prime generator usually will not stop after exactly one second, because there may be some delay between the time cancellation is requested and the time the run loop next checks the flag. The cancel method is called from a finally block so that the generator is cancelled even if the call to sleep is interrupted; otherwise it would run forever, consuming CPU and preventing the JVM from exiting.

A cancellable task must have a cancellation policy that specifies the “how”, “when”, and “what” of cancellation: how other code can request cancellation, when the task checks whether cancellation has been requested, and what actions the task takes in response to a cancellation request.


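Interruption

The cancellation mechanism in PrimeGenerator eventually causes the task to exit, but it might take a while. A more serious problem arises when a task that uses this approach calls a blocking method such as BlockingQueue.put: it may never check the cancellation flag and therefore never terminate. In BrokenPrimeProducer below, if the producer fills the queue and the consumer stops taking from it, the producer stays blocked in put, and setting cancelled does nothing to unblock it.

Program 3: Unreliable cancellation that can leave a producer stuck in a blocking operation (don’t do this)
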
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;

    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                // If the queue is full, put() blocks and the flag is never rechecked
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
        }
    }

    public void cancel() {
        cancelled = true;
    }
}

void consumePrimes() throws InterruptedException {
    BlockingQueue<BigInteger> primes = ...;
    BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
    producer.start();
    try {
        while (needMorePrimes())
            consume(primes.take());
    } finally {
        producer.cancel();
    }
}

Nothing in the Java API or language specification ties interruption to any specific cancellation semantics, but in practice, using interruption for anything other than cancellation is fragile and difficult to sustain in larger applications.

Each thread has a boolean interrupted status; interrupting a thread sets this status to true. Thread contains methods for interrupting a thread and querying its interrupted status. The interrupt method interrupts the target thread, and isInterrupted returns the interrupted status of the target thread. The static interrupted method clears the interrupted status of the current thread and returns its previous value; this is the only way to clear the interrupted status.
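The difference between isInterrupted and the static interrupted method is easy to check directly. The following minimal sketch (the class name InterruptStatusDemo is illustrative only) interrupts the current thread itself and then queries its status three times:

```java
public class InterruptStatusDemo {
    // Returns the three observations as a single string
    static String observe() {
        Thread self = Thread.currentThread();
        self.interrupt();                              // set the interrupted status
        boolean afterInterrupt = self.isInterrupted(); // query only: status stays set
        boolean cleared = Thread.interrupted();        // returns true AND clears the status
        boolean afterClear = self.isInterrupted();     // now false
        return afterInterrupt + " " + cleared + " " + afterClear;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "true true false"
    }
}
```

Because interrupted both reads and clears, a second call to it right afterwards would return false.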

Program 4: Interruption methods in Thread

public class Thread {
    // Interrupts the target thread
    public void interrupt() { ... }

    // Returns the interrupted status of the target thread
    public boolean isInterrupted() { ... }

    // Clears the interrupted status of the current thread and returns its previous value
    public static boolean interrupted() { ... }
}

Blocking library methods such as Thread.sleep and Object.wait try to detect when a thread has been interrupted and return early. They respond to interruption by clearing the interrupted status and throwing InterruptedException, indicating that the blocking operation ended early because of the interruption. The JVM makes no guarantee about how quickly a blocking method will detect interruption, but in practice this happens reasonably quickly.
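A minimal sketch of that behavior (class and method names are illustrative): the current thread is interrupted before it calls sleep, so sleep finds the status already set on entry, throws immediately instead of sleeping for ten seconds, and clears the status as it does so.

```java
public class SleepInterruptDemo {
    // Returns "<exception thrown> <status still set afterwards>"
    static String sleepWhileInterrupted() {
        Thread.currentThread().interrupt();   // status is set before sleep is called
        boolean threw = false;
        try {
            Thread.sleep(10_000);             // would take 10 s if the status were ignored
        } catch (InterruptedException e) {
            threw = true;                     // sleep cleared the status before throwing
        }
        return threw + " " + Thread.currentThread().isInterrupted();
    }

    public static void main(String[] args) {
        System.out.println(sleepWhileInterrupted()); // prints "true false"
    }
}
```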

If a thread is interrupted when it is not blocked, its interrupted status is set, and it is up to the activity being cancelled to poll the interrupted status to detect the interruption. In this way interruption is “sticky”: if it does not trigger an InterruptedException, evidence of the interruption persists until someone deliberately clears the interrupted status.

Calling interrupt does not necessarily stop the target thread from doing what it is doing; it merely delivers the message that interruption has been requested.

The right way to think about interruption is that it does not actually interrupt a running thread; it just requests that the thread interrupt itself at the next convenient opportunity. (These opportunities are called cancellation points.) Some methods, such as wait, sleep, and join, take such requests seriously, throwing an exception when they receive an interrupt request or when they find the interrupted status already set on entry. Well-behaved methods may ignore such requests entirely, as long as they leave the interruption request in place so that calling code can do something with it. Poorly behaved methods swallow the interrupt request, denying code further up the call stack the opportunity to act on it.

Be careful with the static interrupted method, because it clears the current thread's interrupted status. If you call interrupted and it returns true, then unless you plan to swallow the interruption, you should do something with it: either throw InterruptedException or restore the interrupted status by calling interrupt again.
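A common way to follow that rule in code that polls rather than blocks is to turn a detected interruption straight into an InterruptedException, so that the status cleared by interrupted is not lost. The helper name throwIfInterrupted below is hypothetical; this is a sketch of the first option, not a library method.

```java
public class InterruptCheck {
    // Converts a pending interruption into an InterruptedException.
    // Thread.interrupted() clears the status, so the exception now carries it.
    static void throwIfInterrupted() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        try {
            throwIfInterrupted();
            System.out.println("not interrupted");
        } catch (InterruptedException e) {
            // main owns this thread, so it may decide how to handle the request
            System.out.println("cancelled; status now " + Thread.currentThread().isInterrupted());
        }
    }
}
```

Callers of throwIfInterrupted then handle the exception with either of the two strategies described in this section.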

Interruption is usually the most sensible way to implement cancellation.

Program 5: Using interruption for cancellation

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            // Poll the interrupted status explicitly, in addition to the blocking put()
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}

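The problem in BrokenPrimeProducer is easy to fix (and simplify) by using interruption instead of a boolean flag to request cancellation. There are two points in each loop iteration where interruption may be detected: in the blocking put call, and in the explicit poll of the interrupted status in the loop header. The explicit test is not strictly necessary because of the blocking put call, but it makes PrimeProducer more responsive to interruption, because it checks for interruption before starting the lengthy search for the next prime rather than after.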
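To see the difference from BrokenPrimeProducer, the sketch below runs a producer with the same structure as PrimeProducer against a queue of capacity 1 with no consumer, so it ends up blocked in put. Cancelling it by interruption still stops it. The class names, the queue capacity, and the 100 ms delay are arbitrary choices for the demo.

```java
import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InterruptCancelDemo {
    // Same shape as PrimeProducer: interruption is the cancellation mechanism
    static class Producer extends Thread {
        private final BlockingQueue<BigInteger> queue;

        Producer(BlockingQueue<BigInteger> queue) { this.queue = queue; }

        @Override
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
                while (!Thread.currentThread().isInterrupted())
                    queue.put(p = p.nextProbablePrime());
            } catch (InterruptedException consumed) {
                // allow thread to exit
            }
        }

        void cancel() { interrupt(); }
    }

    // Returns true if the producer terminated after cancel(), even though
    // nobody was consuming and it was therefore blocked in put()
    static boolean cancelBlockedProducer() throws InterruptedException {
        BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<BigInteger>(1);
        Producer producer = new Producer(queue);
        producer.start();
        Thread.sleep(100);          // let it fill the queue and block in put()
        producer.cancel();
        producer.join(1000);
        return !producer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(cancelBlockedProducer()); // prints true
    }
}
```

The same experiment with a flag-based cancel would leave the producer alive, because nothing wakes it up from put.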

Interruption policies

An interruption policy determines how a thread interprets an interruption request: what it does (if anything) when one is detected, what units of work are considered atomic with respect to interruption, and how quickly it reacts to interruption.

The most sensible interruption policy is some form of thread-level or service-level cancellation: exit as quickly as practical, clean up if necessary, and possibly notify some owning entity that the thread is exiting.

Tasks do not run in threads they own; they borrow threads owned by a service such as a thread pool. Code that does not own the thread should be careful to preserve the interrupted status so that the code that does own it can eventually act on it.

Because each thread has its own interruption policy, you should not interrupt a thread unless you know what interruption means to that thread.

Responding to interruption

There are two practical strategies for handling InterruptedException:

  • Propagate the exception (possibly after some task-specific cleanup), making your method an interruptible blocking method, too.
  • Restore the interrupted status so that code higher up the call stack can deal with it.

Program 6: Propagating InterruptedException to callers

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException {
    return queue.take();
}

If you don't want to or cannot propagate InterruptedException (perhaps because your task is defined by a Runnable), you need to find another way to preserve the interruption request. The standard way to do this is to restore the interrupted status by calling interrupt again.
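A minimal sketch of the restore-the-status strategy in a task defined as a Runnable, whose run method cannot throw InterruptedException. The class RestoreInterruptTask and its lastTaken field are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RestoreInterruptTask implements Runnable {
    private final BlockingQueue<String> queue;
    volatile String lastTaken;    // last item processed, for illustration only

    RestoreInterruptTask(BlockingQueue<String> queue) { this.queue = queue; }

    @Override
    public void run() {           // Runnable.run cannot throw InterruptedException
        try {
            while (true)
                lastTaken = queue.take();
        } catch (InterruptedException e) {
            // Restore the status so the thread's owner (e.g. a pool) can still see it
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        RestoreInterruptTask task = new RestoreInterruptTask(new LinkedBlockingQueue<String>());
        Thread.currentThread().interrupt();  // simulate a cancellation request
        task.run();                          // take() throws at once; run() restores the status
        System.out.println(Thread.currentThread().isInterrupted()); // prints true
    }
}
```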

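Only code that implements a thread's interruption policy may swallow an interruption request. General-purpose task and library code should never swallow interruption requests.

Activities that do not support cancellation but still call interruptible blocking methods have to call them in a loop, retrying when interruption is detected. In this case they should save the interruption status locally and restore it just before returning, rather than immediately upon catching InterruptedException: most interruptible blocking methods check the status on entry, so restoring it inside the loop would make the next call throw again at once and turn the retry into a busy loop.

Program 7: Noncancellable task that restores interruption before exit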

import java.util.concurrent.BlockingQueue;

public class NoncancelableTask {
    public Task getNextTask(BlockingQueue<Task> queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    // Remember the interruption, but keep retrying
                    interrupted = true;
                    // fall through and retry
                }
            }
        } finally {
            // Restore the interrupted status only on the way out
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    interface Task {
    }
}
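The same retry-then-restore pattern can be written as a small generic helper. The sketch below is an assumption-labeled variant (the names NoncancelableDemo and takeUninterruptibly are hypothetical) that checks the two properties Program 7 aims for: the item is still delivered, and the caller still sees the interruption afterwards.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NoncancelableDemo {
    // Same structure as getNextTask in Program 7, generalized to any element type
    static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true;   // remember it, then retry
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();  // restore just before returning
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("task-1");
        Thread.currentThread().interrupt();   // pretend someone asked this thread to stop
        String t = takeUninterruptibly(queue);
        System.out.println(t + " " + Thread.currentThread().isInterrupted()); // task-1 true
    }
}
```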

Example: Timed run

Many problems can take forever to solve (enumerating all the prime numbers, for example); for others, the answer might be found quickly but might also take forever. Being able to say “spend up to ten minutes looking for the answer” or “enumerate all the answers you can find in ten minutes” is useful in these situations.

The aSecondOfPrimes method in Program 2 starts a PrimeGenerator and cancels it after one second. Even though the PrimeGenerator might take somewhat longer than a second to stop, it will eventually notice the cancellation and stop, allowing the thread to terminate. But another aspect of executing a task is finding out whether it threw an exception. If PrimeGenerator throws an unchecked exception before the time limit expires, the exception probably goes unnoticed, because the prime generator runs in a separate thread that does not explicitly handle exceptions.

Program 8 shows an attempt at running an arbitrary Runnable for a given amount of time. It runs the task in the calling thread and schedules a cancellation task to interrupt it after the given time interval. This addresses the problem of unchecked exceptions thrown from the task, since they can then be caught by the caller of timedRun.

Program 8: Scheduling an interrupt on a borrowed thread (don’t do this)

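import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimedRun1 {
    private static final ScheduledExecutorService cancelExec =
            Executors.newScheduledThreadPool(1);

    public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
        // The task runs in the caller's thread, which timedRun does not own
        final Thread taskThread = Thread.currentThread();
        cancelExec.schedule(new Runnable() {
            public void run() { taskThread.interrupt(); }
        }, timeout, unit);
        r.run();
    }
}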

This is an appealingly simple approach, but it violates the rule that you should know a thread's interruption policy before interrupting it. Since timedRun can be called from an arbitrary thread, it cannot know the calling thread's interruption policy. If the task completes before the timeout, the cancellation task that interrupts the thread in which timedRun was called could fire after timedRun has already returned to its caller, interrupting whatever unrelated code that thread happens to be running at that moment.

Further, if the task is not responsive to interruption, timedRun will not return until the task finishes, which may be long after the desired timeout (or never). A timed run service that does not return after the specified time is likely to cause problems for its callers.

Program 9: Interrupting a task in a dedicated thread

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newScheduledThreadPool;

public class TimedRun2 {
    private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

    public static void timedRun(final Runnable r, long timeout, TimeUnit unit)
            throws InterruptedException {
        class RethrowableTask implements Runnable {
            // volatile: safely publishes the Throwable from the task thread to the caller
            private volatile Throwable t;

            public void run() {
                try {
                    r.run();
                } catch (Throwable t) {
                    this.t = t;
                }
            }

            void rethrow() {
                // launderThrowable (not shown) converts a Throwable into an unchecked exception
                if (t != null)
                    throw launderThrowable(t);
            }
        }

        RethrowableTask task = new RethrowableTask();
        final Thread taskThread = new Thread(task);
        taskThread.start();
        cancelExec.schedule(new Runnable() {
            public void run() { taskThread.interrupt(); }
        }, timeout, unit);
        // Wait at most until the timeout, whether or not the task responds to interruption
        taskThread.join(unit.toMillis(timeout));
        // If the task threw an exception, rethrow it in the calling thread
        task.rethrow();
    }
}

Program 9 addresses both the exception-handling problem of aSecondOfPrimes and the problems with the previous attempt. The thread created to run the task can have its own execution policy, and the timed run method can return to its caller even if the task does not respond to interruption. After starting the task thread, timedRun executes a timed join with it. After join returns, timedRun checks whether an exception was thrown from the task and, if so, rethrows it in the thread that called timedRun. The saved Throwable is shared between the two threads, so it is declared volatile to publish it safely from the task thread to the timedRun thread.

This solves the problems of the previous example, but because it relies on a timed join, it shares a deficiency with join: we don't know whether control returned because the thread exited normally or because the join timed out.
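Thread.join(long) itself returns void, so it cannot report why it returned. One way to recover that information (not used in Program 9) is to check isAlive immediately after the timed join. The helper name joinedInTime is hypothetical; this is a sketch under that assumption.

```java
public class JoinTimeoutCheck {
    // join(millis) returns void, so it cannot say why it returned.
    // Checking isAlive() afterwards distinguishes "finished" from "timed out".
    static boolean joinedInTime(Thread t, long millis) throws InterruptedException {
        t.join(millis);
        return !t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread quick = new Thread(() -> { });
        Thread slow = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // cancelled: just exit
            }
        });
        quick.start();
        slow.start();
        System.out.println(joinedInTime(quick, 1000)); // true
        System.out.println(joinedInTime(slow, 100));   // false
        slow.interrupt();                              // clean up the sleeper
    }
}
```

The answer is only a snapshot: a thread reported as still alive might finish a moment later.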

Cancellation via Future

Program 10: Cancelling a task using Future

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedRun {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool();

    public static void timedRun(Runnable r, long timeout, TimeUnit unit)
            throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            // The timed get handles both the time limit and exception propagation
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // exception thrown in task; rethrow
            // (launderThrowable, not shown, converts it into an unchecked exception)
            throw launderThrowable(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }
}

We have already used an abstraction for managing the lifecycle of a task, dealing with exceptions, and facilitating cancellation: Future. Following the general principle that it is better to use existing library classes than to roll your own, we build timedRun using Future and the task execution framework.

ExecutorService.submit returns a Future describing the task. Future has a cancel method that takes a boolean argument, mayInterruptIfRunning, and returns a value indicating whether the cancellation attempt was successful. A true result means only that the cancellation request was accepted; it does not mean that the task detected and acted on any interruption. If mayInterruptIfRunning is true and the task is currently running in some thread, that thread is interrupted. Setting it to false means “don't run this task if it hasn't started yet”, and should be used for tasks that are not designed to handle interruption.

Program 10 shows a version of timedRun that submits the task to an ExecutorService and retrieves the result with a timed Future.get. If get terminates with a TimeoutException, the task is cancelled through its Future. (To simplify the code, this version calls Future.cancel unconditionally in a finally block, taking advantage of the fact that cancelling a completed task has no effect.) If the underlying computation throws an exception before cancellation, it is rethrown from timedRun so the caller can handle it.

When Future.get throws InterruptedException or TimeoutException and you know that the result is no longer needed, cancel the task with Future.cancel.
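A runnable sketch of Program 10 that exercises both paths: a task that ignores the deadline is interrupted by cancel(true), and an unchecked exception thrown by a task is rethrown to the caller. Two assumptions differ from the listing: launderThrowable (not shown in this article) is replaced by a simplified inline conversion, and the pool uses daemon threads so the demo JVM can exit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimedRunDemo {
    private static final ExecutorService taskExec = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);   // daemon threads so the demo JVM can exit
        return t;
    });

    static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // Simplified stand-in for launderThrowable
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw new IllegalStateException(cause);
        } finally {
            task.cancel(true); // harmless if the task already completed
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch interrupted = new CountDownLatch(1);
        timedRun(() -> {
            try {
                Thread.sleep(60_000);      // far longer than the timeout
            } catch (InterruptedException e) {
                interrupted.countDown();   // cancel(true) interrupted the task
            }
        }, 200, TimeUnit.MILLISECONDS);
        System.out.println("task interrupted: " + interrupted.await(1, TimeUnit.SECONDS)); // true
    }
}
```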

Dealing with non-interruptible blocking

Many blocking library methods respond to interruption by returning early and throwing InterruptedException, which makes it easier to build tasks that are responsive to cancellation. However, not all blocking methods or blocking mechanisms are responsive to interruption. If a thread is blocked performing synchronous socket I/O or waiting to acquire an intrinsic lock, interruption has no effect other than setting the thread's interrupted status. Threads blocked in non-interruptible activities can sometimes be stopped by means similar to interruption, but this requires knowing why the thread is blocked.

  • Synchronous socket I/O in java.io: The most common form of blocking I/O in server applications is reading from or writing to a socket. The read and write methods of InputStream and OutputStream do not respond to interruption, but closing the underlying socket makes any thread blocked in read or write throw a SocketException.
  • Synchronous I/O in java.nio: Interrupting a thread that is waiting on an InterruptibleChannel closes the channel and makes that thread throw ClosedByInterruptException; because the channel is now closed, other threads blocked on it are released with an exception as well. Closing an InterruptibleChannel makes threads blocked in operations on it throw AsynchronousCloseException.
  • Asynchronous I/O with Selector: If a thread is blocked in Selector.select (in java.nio.channels), calling wakeup or close on the selector makes select return early. Once the selector is closed, further selection operations throw ClosedSelectorException.
  • Lock acquisition: If a thread is blocked waiting for an intrinsic lock, there is nothing you can do to stop it short of ensuring that it eventually acquires the lock; it does not respond to interruption while it waits.
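The java.nio case can be observed without a network connection by using java.nio.channels.Pipe, whose source channel is an InterruptibleChannel. Pipe is just a convenient local channel chosen for this sketch; the class name and the 200 ms delay are illustrative assumptions. A reader thread blocks in read because nothing is ever written to the sink; interrupting it closes the channel.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class InterruptibleChannelDemo {
    // Blocks a thread in Pipe.SourceChannel.read, interrupts it, and reports
    // the exception type it saw plus whether the channel is still open
    static String interruptBlockedRead() throws Exception {
        Pipe pipe = Pipe.open();
        Throwable[] seen = new Throwable[1];
        Thread reader = new Thread(() -> {
            try {
                // Nothing is ever written to the sink, so this read blocks
                pipe.source().read(ByteBuffer.allocate(16));
            } catch (Throwable t) {
                seen[0] = t;
            }
        });
        reader.start();
        Thread.sleep(200);   // give the reader time to block
        reader.interrupt();
        reader.join(2000);   // join also makes seen[0] visible to this thread
        String outcome = seen[0] == null ? "still blocked" : seen[0].getClass().getSimpleName();
        boolean open = pipe.source().isOpen();
        pipe.sink().close();
        return outcome + " " + open;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(interruptBlockedRead()); // ClosedByInterruptException false
    }
}
```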

Program 11: Encapsulating nonstandard cancellation in a Thread by overriding interrupt

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

public class ReaderThread extends Thread {
    private static final int BUFSZ = 512;
    private final Socket socket;
    private final InputStream in;

    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }

    @Override
    public void interrupt() {
        try {
            // Closing the socket makes a read() blocked in run() throw an exception
            socket.close();
        } catch (IOException ignored) {
        } finally {
            // Also deliver a standard interrupt for interruptible blocking methods
            super.interrupt();
        }
    }

    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { /* Allow thread to exit */
        }
    }

    public void processBuffer(byte[] buf, int count) {
    }
}

ReaderThread in Program 11 shows a technique for encapsulating nonstandard cancellation. ReaderThread manages a single socket connection, reading synchronously from the socket and passing any data received to processBuffer. To make it possible to terminate a user connection or shut down the server, ReaderThread overrides interrupt both to deliver a standard interrupt and to close the underlying socket. Thus, interrupting a ReaderThread makes it stop what it is doing whether it is blocked in read or in an interruptible blocking method.

Using newTaskFor to encapsulate nonstandard cancellation

The same technique can be refined using the newTaskFor hook, added in Java 6 and available to ThreadPoolExecutor subclasses.

When a Callable is submitted to an ExecutorService, submit returns a Future that can be used to cancel the task. newTaskFor is a factory method that creates the Future representing the task. It returns a RunnableFuture, an interface that extends both Future and Runnable (and is implemented by FutureTask). Customizing the task's Future lets you override Future.cancel.

Program 12: Encapsulating nonstandard cancellation in a task with newTaskFor

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
    @GuardedBy("this") private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }

    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }

    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    // Nonstandard cancellation first: close the socket
                    SocketUsingTask.this.cancel();
                } finally {
                    // Then standard cancellation (interrupts if requested)
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

interface CancellableTask<T> extends Callable<T> {
    void cancel();

    RunnableFuture<T> newTask();
}

@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public CancellingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              threadFactory, handler);
    }

    @Override
    @SuppressWarnings("unchecked")
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        // Let a CancellableTask supply its own Future; otherwise use the default FutureTask
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

Program 12 defines a CancellableTask interface that extends Callable and adds a cancel method and a newTask factory method for constructing a RunnableFuture. CancellingExecutor extends ThreadPoolExecutor and overrides newTaskFor so that a CancellableTask can create its own Future.

SocketUsingTask implements CancellableTask and defines Future.cancel to close the socket as well as call super.cancel. If a SocketUsingTask is cancelled through its Future, the socket is closed and, when cancel is called with mayInterruptIfRunning set to true, the executing thread is interrupted. This makes the task more responsive to cancellation: not only can it safely call interruptible blocking methods while remaining responsive to cancellation, but it can also call blocking socket I/O methods.
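The newTaskFor hook is not specific to sockets. As a minimal sketch (the class CountingCancelExecutor and its counter are hypothetical), an executor can return a FutureTask subclass whose cancel runs extra code, here simply counting cancellations. Note that submit(Runnable) goes through the other overload, newTaskFor(Runnable, T), which this sketch does not override, so the example submits a Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor whose task Futures count how often they are cancelled
public class CountingCancelExecutor extends ThreadPoolExecutor {
    final AtomicInteger cancellations = new AtomicInteger();

    public CountingCancelExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                cancellations.incrementAndGet();   // custom cancellation hook
                return super.cancel(mayInterruptIfRunning);
            }
        };
    }

    public static void main(String[] args) {
        CountingCancelExecutor exec = new CountingCancelExecutor(1);
        Future<String> f = exec.submit(() -> {
            Thread.sleep(60_000);
            return "done";
        });
        f.cancel(true);
        exec.shutdown();
        System.out.println(exec.cancellations.get() + " " + f.isCancelled()); // 1 true
    }
}
```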

Stopping a thread-based service

Applications commonly create services that own threads, such as thread pools, and these services usually live longer than the method that creates them. If the application is to shut down gracefully, the threads owned by these services need to be terminated. Since there is no preemptive way to stop a thread, they must instead be persuaded to shut down on their own.

Sensible encapsulation practice dictates that you should not manipulate a thread (interrupt it, modify its priority, and so on) unless you own it.

As with any other encapsulated object, thread ownership is not transitive: the application may own the service and the service may own the worker threads, but the application does not own the worker threads and therefore should not attempt to stop them directly.

Provide lifecycle methods whenever a thread-owning service has a lifetime longer than that of the method that created it.

Example: Log service

Program 13: Producer-consumer logging service with no shutdown support

import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    // Producer side: blocks when the queue is full
    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        // Consumer side: LogWriter offers no method to stop this loop
        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

The LogWriter in program 13 shows an example of a simple logging service in which logging operations are performed in separate logging threads. The thread that produces the log message does not write the message directly to the output stream. Instead, the LogWriter submits the message to the log thread via BlockingQueue, and the log thread writes it. This is a multi-producer, single-consumer design approach: each log call is a producer, and the logging thread in the background is a consumer. If the consumer’s processing speed is slower than the producer’s generation speed, BlockingQueue blocks the producer until the logging thread is able to process the new log message.

For a service like LogWriter to be useful in production, we need a way to terminate the logger thread so that it does not prevent the JVM from shutting down normally. Stopping the logger thread is easy enough, since it repeatedly calls take, which responds to interruption: if the logger thread is modified to exit on catching InterruptedException, interrupting the logger thread stops the service.

However, simply making the logger thread exit is not a very satisfying shutdown mechanism. Such an abrupt shutdown discards log messages that might be waiting to be written. Worse, threads blocked in log because the queue is full will never become unblocked. Cancelling a producer-consumer activity requires cancelling both the producers and the consumers. Interrupting the logger thread deals with the consumer, but because the producers in this case are not dedicated threads, cancelling them is harder.

Another approach to shutting down LogWriter is to set a “shutdown requested” flag to prevent further messages from being submitted, as shown in Program 14. The consumer can then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log. However, this approach has race conditions that make it unreliable. The implementation of log is a check-then-act sequence: a producer could observe that the service has not yet been shut down but still queue messages after the shutdown, again with the risk that the producer might get blocked in log and never become unblocked.

Program 14: Unreliable way to add shutdown support to the logging service

public void log(String msg) throws InterruptedException {
    // Check-then-act race: shutdown may be requested between the test and put()
    if (!shutdownRequested)
        queue.put(msg);
    else
        throw new IllegalStateException("logger is shut down");
}

The way to provide reliable shutdown for LogWriter is to fix the race condition, which means making the submission of a new log message atomic. But we don't want to hold a lock while trying to enqueue the message, since put could block. Instead, we can atomically check for shutdown and conditionally increment a counter to “reserve” the right to submit a message, as shown by LogService in Program 15.

Program 15: Adding reliable cancellation to LogWriter

import java.io.PrintWriter;
import java.io.Writer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.jcip.annotations.GuardedBy;

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    @GuardedBy("this") private boolean isShutdown;
    @GuardedBy("this") private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/* ... */);
            // Reserve the right to enqueue one message
            ++reservations;
        }
        queue.put(msg);   // may block, so done outside the lock
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            // Exit only once shut down and every reserved message is written
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */ }
                }
            } finally {
                writer.close();
            }
        }
    }
}

Shutting down an ExecutorService

As discussed earlier, ExecutorService offers two ways to shut down: graceful shutdown with shutdown, and abrupt shutdown with shutdownNow. In an abrupt shutdown, shutdownNow attempts to cancel the tasks currently executing and returns a list of tasks that were submitted but never started.

The two options offer a trade-off between safety and responsiveness: abrupt termination is faster but riskier, because tasks may be interrupted in the middle of execution; graceful termination is slower but safer, because the ExecutorService does not shut down until all queued tasks have been processed.
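A small sketch of the abrupt option (class and method names are illustrative): a single-threaded executor is running one long task with two more queued behind it. shutdownNow interrupts the running task and hands back the two that never started. With shutdown instead, all three would run, including the full 60-second sleep.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownNowDemo {
    // Returns "<tasks never started> <running task was interrupted>"
    static String forcedShutdown() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        exec.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.set(true);     // shutdownNow interrupted the worker
            }
        });
        exec.execute(() -> { });           // queued behind the sleeper
        exec.execute(() -> { });
        started.await();                   // make sure the first task is running
        List<Runnable> neverStarted = exec.shutdownNow();
        exec.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size() + " " + interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(forcedShutdown()); // prints "2 true"
    }
}
```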

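Program 16: Logging service that uses an ExecutorService

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class LogService {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    ...

    public void start() {
    }

    public void stop() throws InterruptedException {
        try {
            exec.shutdown();                       // no new tasks; queued ones still run
            exec.awaitTermination(TIMEOUT, UNIT);
        } finally {
            writer.close();
        }
    }

    public void log(String msg) {
        try {
            exec.execute(new WriterTask(msg));
        } catch (RejectedExecutionException ignored) {
            // submitted after shutdown: the message is dropped
        }
    }
}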

“Poison pill” object

Another way to convince a producer-consumer service to shut down is with a poison pill: a recognizable object placed on the queue that means “when you get this, stop.” With a FIFO queue, poison pills ensure that consumers finish the work on their queue before shutting down, since any work submitted before the poison pill is retrieved before the pill. Producers should not submit any work after putting a poison pill on the queue.

Program 17: Shutting down a service with a poison pill

public class IndexingService {
    private static final int CAPACITY = 1000;
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;

    public IndexingService(File root, final FileFilter fileFilter) {
        this.root = root;
        this.queue = new LinkedBlockingQueue<File>(CAPACITY);
        this.fileFilter = new FileFilter() {
            public boolean accept(File f) {
                return f.isDirectory() || fileFilter.accept(f);
            }
        };
    }

    private boolean alreadyIndexed(File f) {
        return false;
    }

    class CrawlerThread extends Thread {
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) { /* fall through */
            } finally {
                while (true) {
                    try {
                        queue.put(POISON);
                        break;
                    } catch (InterruptedException e1) { /* retry */
                    }
                }
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries) {
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        queue.put(entry);
                }
            }
        }
    }

    class IndexerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    File file = queue.take();
                    if (file == POISON)
                        break;
                    else
                        indexFile(file);
                }
            } catch (InterruptedException consumed) { }
        }

        public void indexFile(File file) { /* ... */ }
    }

    public void start() {
        producer.start();
        consumer.start();
    }

    public void stop() {
        producer.interrupt();   // the crawler then puts the poison pill in its finally block
    }

    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }
}


Poison pill objects can only be used if the number of producers and consumers is known.

The approach used in IndexingService can be extended to multiple producers by having each producer put one poison pill on the queue and having the consumer stop only after it has received one pill from each of the N producers. It can likewise be extended to M consumers by having each producer put M pills on the queue.
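A minimal sketch of the multi-producer variant, with hypothetical names: each of N producers puts exactly one pill on the queue when it finishes, and the single consumer counts pills and stops only after it has taken N of them. Strings stand in for real work items, and an unbounded LinkedBlockingQueue is used, so adding the pill can never block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MultiProducerPoisonPill {
    // Compared by identity (==), so no real work item can be mistaken for it.
    private static final String POISON = new String("POISON");

    /** Runs nProducers producers that each put itemsEach items, then one pill. */
    static int consumeAll(int nProducers, int itemsEach) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();  // unbounded
        List<Thread> producers = new ArrayList<>();
        for (int p = 0; p < nProducers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++)
                        queue.put("producer-" + id + "-item-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    queue.add(POISON);   // every producer always sends exactly one pill
                }
            });
            producers.add(t);
            t.start();
        }

        // Consumer (the calling thread): stop only after one pill per producer.
        int pillsSeen = 0, consumed = 0;
        while (pillsSeen < nProducers) {
            String item = queue.take();
            if (item == POISON)
                pillsSeen++;
            else
                consumed++;              // "process" the item
        }
        for (Thread t : producers) t.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumeAll(3, 100));  // prints 300
    }
}
```

Because each producer enqueues its pill after all of its own items, every item is taken before the last pill arrives, so no work is lost.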

However, this becomes unwieldy when there are many producers and consumers, and poison pills work reliably only with unbounded queues.

Example: a service that executes only once

If a method needs to process a batch of tasks and does not return until all of them have been processed, you can simplify service lifecycle management by using a private Executor whose lifetime is bounded by that method.

The checkMail method in Procedure 20 checks for new mail on a number of hosts in parallel. It creates a private Executor and submits a task for each host. It then shuts down the Executor and waits for termination, which occurs when all the mail-checking tasks have completed.

Procedure 20: Use a private Executor whose lifetime is limited to method calls

public class CheckForMail {
    public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final AtomicBoolean hasNewMail = new AtomicBoolean(false);
        try {
            // Submit one mail-checking task per host
            for (final String host : hosts)
                exec.execute(new Runnable() {
                    public void run() {
                        if (checkMail(host))
                            hasNewMail.set(true);
                    }
                });
        } finally {
            exec.shutdown();                       // accept no more tasks
            exec.awaitTermination(timeout, unit);  // wait for the submitted tasks to complete
        }
        return hasNewMail.get();
    }

    private boolean checkMail(String host) {
        // Check for mail
        return false;
    }
}
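A self-contained sketch of the same one-shot pattern, assuming (for illustration only) that the per-host check is passed in as a Predicate<String> instead of being performed by a real mail client; OneShotCheck and anyHostMatches are hypothetical names. The executor is created, used, shut down, and awaited entirely within one method call, so no lifecycle management leaks out to the caller.

```java
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

class OneShotCheck {
    /** Returns true if check succeeds for any host (hypothetical helper). */
    static boolean anyHostMatches(Set<String> hosts, Predicate<String> check,
                                  long timeout, TimeUnit unit) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();  // private to this call
        AtomicBoolean found = new AtomicBoolean(false);
        try {
            for (String host : hosts)
                exec.execute(() -> {
                    if (check.test(host))
                        found.set(true);
                });
        } finally {
            exec.shutdown();                       // accept no more work
            exec.awaitTermination(timeout, unit);  // wait (bounded) for the submitted checks
        }
        return found.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> hosts = Set.of("mail.a.example", "mail.b.example");
        System.out.println(anyHostMatches(hosts, h -> h.startsWith("mail.b"), 5, TimeUnit.SECONDS));
    }
}
```

If awaitTermination times out, the result reflects only the checks that finished in time.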

Limitations of shutdownNow

When an ExecutorService is shut down abruptly with shutdownNow, it attempts to cancel the tasks currently in progress and returns a list of tasks that were submitted but never started, so that they can be logged or saved for later processing.

However, there is no general way to find out which tasks started but did not complete. This means there is no way of knowing the state of the tasks in progress at shutdown time unless the tasks themselves perform some sort of checkpointing. To know which tasks have not completed, you need to know not only which tasks never started, but also which tasks were in progress when the Executor shut down.

The TrackingExecutor in Procedure 21 shows a technique for determining which tasks were in progress at shutdown. TrackingExecutor wraps an ExecutorService and instruments execute to record which tasks were cancelled after shutdown, so it can identify the tasks that started but did not complete normally. After the executor has terminated, getCancelledTasks returns the list of cancelled tasks. For this technique to work, tasks must preserve the thread's interrupted status when they return, which well-behaved tasks do anyway.

Procedure 21: ExecutorService that keeps track of tasks cancelled after shutdown

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        // Not meaningful until shutdownNow has been called and the executor has terminated
        if (!exec.isTerminated())
            throw new IllegalStateException(/* ... */);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    // If the executor was shut down and this task's thread was interrupted,
                    // record the task as cancelled
                    if (isShutdown() && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}
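To see the tracking in action, here is a stripped-down, self-contained sketch of the same idea. SimpleTracker and TrackingDemo are hypothetical names; SimpleTracker wraps a single-threaded executor instead of extending AbstractExecutorService, to keep the example short. One task is running and one is queued when shutdownNow is called: the queued task is returned as never started, and the running task, because it restores its interrupted status, is recorded as cancelled.

```java
import java.util.*;
import java.util.concurrent.*;

class TrackingDemo {
    /** Stripped-down version of the TrackingExecutor idea (hypothetical class). */
    static class SimpleTracker {
        private final ExecutorService exec = Executors.newSingleThreadExecutor();
        private final Set<Runnable> cancelled = Collections.synchronizedSet(new HashSet<>());

        void execute(Runnable task) {
            exec.execute(() -> {
                try {
                    task.run();
                } finally {
                    // Same test as TrackingExecutor: shut down and interrupted => cancelled
                    if (exec.isShutdown() && Thread.currentThread().isInterrupted())
                        cancelled.add(task);
                }
            });
        }

        List<Runnable> shutdownNow() { return exec.shutdownNow(); }

        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return exec.awaitTermination(timeout, unit);
        }

        List<Runnable> getCancelledTasks() {
            if (!exec.isTerminated())
                throw new IllegalStateException("executor has not terminated");
            return new ArrayList<>(cancelled);
        }
    }

    /** Returns {number of never-started tasks, number of tasks cancelled mid-run}. */
    static int[] demo() throws InterruptedException {
        SimpleTracker tracker = new SimpleTracker();
        CountDownLatch started = new CountDownLatch(1);
        Runnable longTask = () -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // A well-behaved task restores the interrupted status before returning;
                // without this line the tracker could not tell that it was cancelled.
                Thread.currentThread().interrupt();
            }
        };
        tracker.execute(longTask);
        tracker.execute(() -> { });                // queued behind longTask, never starts
        started.await();
        int neverStarted = tracker.shutdownNow().size();
        tracker.awaitTermination(5, TimeUnit.SECONDS);
        int cancelledMidRun = tracker.getCancelledTasks().size();
        return new int[] { neverStarted, cancelledMidRun };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = demo();
        System.out.println("never started: " + r[0] + ", cancelled mid-run: " + r[1]);
    }
}
```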

The WebCrawler in Procedure 22 shows TrackingExecutor in use. The work of a web crawler is often unbounded, so if a crawler must be shut down, we may want to save its state so that it can be restarted later. CrawlTask provides a getPage method that identifies the page being processed. When the crawler is shut down, the URLs of both the tasks that never started and the tasks that were cancelled are recorded, so that page-crawling tasks for those URLs can be added back to the queue when the crawler restarts.

Procedure 22: Using TrackingExecutor to save unfinished tasks for later execution

public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        private int count = 1;

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

There is an unavoidable race condition in TrackingExecutor that can produce false positives: some tasks identified as cancelled may actually have completed. This happens because the thread pool can be shut down between the moment a task executes its last instruction and the moment the pool records the task as complete. This is not a problem if tasks are idempotent (executing a task twice has the same effect as executing it once), as they are in the web crawler. Otherwise, the application must take this risk into account and be prepared to deal with false positives.

Handle abnormal thread terminations

It is obvious when a single-threaded console application terminates because of an uncaught exception: the program stops running and prints a stack trace that looks very different from its normal output. When a thread in a concurrent application fails, however, it is often much less obvious. The stack trace may be printed to the console, but nobody may be watching the console. Moreover, the application may appear to keep working after the thread fails, so the failure can go unnoticed.

The leading cause of premature thread death is RuntimeException. Because these exceptions indicate a programming error or some other unrecoverable problem, they are generally not caught. Instead, they propagate all the way up the stack, at which point the default behavior is to print a stack trace to the console and let the thread terminate.

The consequences of an abnormal thread exit can be benign or malignant, depending on the role of the thread in your application. While losing a thread in a thread pool can have a performance impact, if a program can run well on a pool of 50 threads, it will usually run well on a pool of 49 threads.

Procedure 23 shows the typical structure of a worker thread in a thread pool. If a task throws an unchecked exception, the thread is allowed to die, but only after it notifies the framework that it has terminated. The framework may then replace the worker with a new thread, or it may decide not to, either because the pool is being shut down or because there are already enough worker threads to meet current demand.

Procedure 23: Typical thread pool worker thread structure

public void run() {
    Throwable thrown = null;
    try {
        while (!isInterrupted())
            runTask(getTaskFromWorkQueue());
    } catch (Throwable e) {
        thrown = e;                   // remember why this worker is dying
    } finally {
        threadExited(this, thrown);   // always tell the pool that this worker exited
    }
}
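The JDK's own pools follow this structure: the Executors.newFixedThreadPool documentation states that if a thread terminates because of a failure during execution before shutdown, a new one takes its place if needed to execute subsequent tasks. A small sketch to observe this, using a hypothetical counting ThreadFactory: the first task throws and kills its worker, and the second task then runs on a freshly created thread. The tasks are passed to execute; with submit, the exception would be captured in the returned Future and the worker would survive.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerReplacementDemo {
    /** Returns true if the task after the failing one ran on a different thread. */
    static boolean failedWorkerWasReplaced(AtomicInteger threadsCreated) throws Exception {
        ThreadFactory counting = r -> {
            threadsCreated.incrementAndGet();
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((th, e) -> { });  // silence the default stack trace
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, counting);
        CompletableFuture<Thread> first = new CompletableFuture<>();
        CompletableFuture<Thread> second = new CompletableFuture<>();

        pool.execute(() -> {
            first.complete(Thread.currentThread());
            throw new RuntimeException("boom");    // kills this worker thread
        });
        pool.execute(() -> second.complete(Thread.currentThread()));

        boolean replaced = first.get(5, TimeUnit.SECONDS) != second.get(5, TimeUnit.SECONDS);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return replaced;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger created = new AtomicInteger();
        System.out.println("replaced: " + failedWorkerWasReplaced(created)
                + ", threads created: " + created.get());
    }
}
```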

Handling uncaught exceptions

The previous section presented a proactive approach to dealing with unchecked exceptions. The Thread API also provides UncaughtExceptionHandler, which lets you detect when a thread dies because of an uncaught exception. The two approaches are complementary: used together, they provide effective protection against thread leakage.

When a thread exits because of an uncaught exception, the JVM reports this event to the UncaughtExceptionHandler provided by the application. If no handler has been provided, the default behavior is to print the stack trace to System.err.

Procedure 24: UncaughtExceptionHandler interface

public interface UncaughtExceptionHandler {
    void uncaughtException(Thread t, Throwable e);
}

How an uncaught exception handler responds depends on your quality-of-service requirements. The most common response is to write an error message and the stack trace to the application log. Handlers can also take more direct action, such as trying to restart the thread, shutting down the application, or performing other repair or diagnostic actions.

Procedure 25: UncaughtExceptionHandler that writes the exception to the log

public class UEHLogger implements Thread.UncaughtExceptionHandler {
    public void uncaughtException(Thread t, Throwable e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
    }
}

In long-running applications, specify the same uncaught exception handler for all threads, and have that handler at least log the exception.
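One way to give every thread in a pool the same handler is to install it from a ThreadFactory. This sketch uses the hypothetical class HandlerInstallingFactory and a handler that records failure messages in a list instead of logging them, so the effect can be checked; Thread.setDefaultUncaughtExceptionHandler would instead install a handler JVM-wide. Only tasks passed to execute reach the handler; an exception from a task passed to submit is captured in the returned Future.

```java
import java.util.List;
import java.util.concurrent.*;

class HandlerInstallingFactory implements ThreadFactory {
    private final Thread.UncaughtExceptionHandler handler;

    HandlerInstallingFactory(Thread.UncaughtExceptionHandler handler) {
        this.handler = handler;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler(handler);   // every pool thread gets the same handler
        return t;
    }

    /** Runs one failing task and returns the messages the handler saw. */
    static List<String> demo() throws InterruptedException {
        List<String> failures = new CopyOnWriteArrayList<>();
        CountDownLatch reported = new CountDownLatch(1);
        Thread.UncaughtExceptionHandler recordingHandler = (thread, e) -> {
            failures.add(e.getMessage());          // a real handler would log, like UEHLogger
            reported.countDown();
        };
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new HandlerInstallingFactory(recordingHandler));
        pool.execute(() -> { throw new IllegalStateException("task failed"); });
        reported.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```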

JVM shutdown

The JVM can be shut down normally or forcibly. A normal (orderly) shutdown is initiated when the last normal (non-daemon) thread terminates, when someone calls System.exit, or by other platform-specific means (such as sending SIGINT or pressing Ctrl-C). Although this is the standard and preferred way for the JVM to shut down, it can also be shut down forcibly by calling Runtime.halt or by killing the JVM process through the operating system (for example, with SIGKILL).

Shutdown hooks

In a normal shutdown, the JVM first starts all registered shutdown hooks. Shutdown hooks are unstarted threads registered with Runtime.addShutdownHook. The JVM makes no guarantees about the order in which shutdown hooks are started. If any application threads (daemon or non-daemon) are still running at shutdown time, they continue to run concurrently with the shutdown process. When all shutdown hooks have completed, the JVM may run finalizers if runFinalizersOnExit is true, and then halts (runFinalizersOnExit has been deprecated since JDK 1.2 and was removed in Java 11). The JVM makes no attempt to stop or interrupt application threads that are still running at shutdown; they are abruptly terminated when the JVM finally halts. If the shutdown hooks or finalizers do not complete, the normal shutdown process hangs and the JVM must be shut down forcibly. In a forced shutdown, the JVM simply halts; shutdown hooks are not run.

Shutdown hooks should be thread-safe: like any other concurrent code, they must use synchronization when accessing shared data and must be careful to avoid deadlock. They also should not make assumptions about the state of the application (such as whether other services have already shut down or all normal threads have completed) or about why the JVM is shutting down, so they must be coded very defensively. Finally, shutdown hooks should exit as quickly as possible, because they delay JVM termination at a time when the user may expect the JVM to terminate quickly.

Procedure 26: Registering a shutdown hook to stop the log service

public void start() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try { LogService.this.stop(); } catch (InterruptedException ignored) { }
        }
    });
}

Procedure 26 shows how the LogService from Procedure 16 could register a shutdown hook in its start method to ensure that the log file is closed on exit.
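A self-contained sketch of the same idea with a hypothetical ResourceService: start() registers a hook that closes the resource, and an explicit stop() closes it and deregisters the hook with Runtime.removeShutdownHook, so the hook does not run a second time at exit. close() is idempotent and quick, in keeping with the advice that hooks be defensive and fast, and stop() tolerates the IllegalStateException that removeShutdownHook throws once the JVM is already shutting down.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ResourceService {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Thread hook = new Thread(this::close, "resource-service-shutdown");

    void start() {
        Runtime.getRuntime().addShutdownHook(hook);   // registered but not started
    }

    /** Explicit shutdown; returns true if the hook was deregistered. */
    boolean stop() {
        close();
        try {
            return Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException jvmAlreadyShuttingDown) {
            return false;                             // the hook is running or will run
        }
    }

    // Idempotent and quick: safe to reach from both stop() and the hook.
    void close() {
        if (closed.compareAndSet(false, true))
            System.out.println("resource closed");
    }

    boolean isClosed() { return closed.get(); }

    public static void main(String[] args) {
        ResourceService svc = new ResourceService();
        svc.start();
        System.out.println("hook removed: " + svc.stop());
    }
}
```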

Daemon thread

There are two kinds of threads: normal threads and daemon threads. When the JVM starts up, all the threads it creates (such as the garbage collector and other housekeeping threads) are daemon threads, except for the main thread. A new thread inherits the daemon status of the thread that created it, so by default any thread created by the main thread is also a normal thread.

Normal threads and daemon threads differ only in what happens when they exit. When a thread exits, the JVM checks which threads are still running, and if the only ones left are daemon threads, it initiates a normal shutdown. When the JVM halts, any remaining daemon threads are abandoned: finally blocks are not executed and their stacks are not unwound; the JVM simply exits.
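A small sketch of daemon-status inheritance (DaemonInheritanceDemo is a hypothetical name): a parent thread with a chosen daemon status creates a child thread, and the child's status is read before it is ever started.

```java
class DaemonInheritanceDemo {
    /** Creates a parent thread with the given daemon status and reports its child's status. */
    static boolean childIsDaemon(boolean parentIsDaemon) throws InterruptedException {
        boolean[] childDaemon = new boolean[1];
        Thread parent = new Thread(() -> {
            Thread child = new Thread(() -> { });   // created but never started
            childDaemon[0] = child.isDaemon();      // inherited from the creating thread
        });
        parent.setDaemon(parentIsDaemon);           // must be called before start()
        parent.start();
        parent.join();                              // join() makes the child's status visible here
        return childDaemon[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("main is daemon: " + Thread.currentThread().isDaemon());  // false
        System.out.println("child of normal thread: " + childIsDaemon(false));       // false
        System.out.println("child of daemon thread: " + childIsDaemon(true));        // true
    }
}
```

Calling setDaemon on a thread that has already been started throws IllegalThreadStateException, which is why the status is set before start().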

Daemon threads are not a good substitute for properly managing the lifecycle of the services within an application.

Finalizers

Memory is reclaimed by the garbage collector when it is no longer needed, but other resources, such as file handles or socket handles, must be explicitly returned to the operating system when they are no longer needed. To help with this, the garbage collector treats objects that have a nontrivial finalize method specially: once such an object becomes unreachable, its finalize method is called before its memory is reclaimed, so that persistent resources can be released.

Because finalizers can run in a thread managed by the JVM, any state accessed by a finalizer will be accessed by more than one thread and therefore must be accessed with synchronization. Finalizers offer no guarantee about when, or even whether, they will run, and they impose a significant performance cost on objects that have nontrivial finalizers.

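Avoid finalizers. In most cases, explicit close methods called from finally blocks manage resources better than finalizers do.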
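Since Java 7, try-with-resources generates that finally block for any AutoCloseable. A minimal sketch with a hypothetical TrackedResource that counts how often it is closed shows that close() runs on both the normal and the exceptional path, with no dependence on the garbage collector.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TrackedResource implements AutoCloseable {
    static final AtomicInteger closeCount = new AtomicInteger();

    void use(boolean fail) {
        if (fail)
            throw new IllegalStateException("failure while using the resource");
    }

    @Override
    public void close() {
        closeCount.incrementAndGet();   // release the handle deterministically
    }

    /** Uses one resource; returns true if the body threw. */
    static boolean useOnce(boolean fail) {
        try (TrackedResource r = new TrackedResource()) {
            r.use(fail);
            return false;
        } catch (IllegalStateException e) {
            return true;                // close() has already run by the time we get here
        }
    }

    public static void main(String[] args) {
        useOnce(false);
        useOnce(true);
        System.out.println("closed " + closeCount.get() + " times");  // closed 2 times
    }
}
```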

Summary

End-of-life issues for tasks, threads, services, and applications can add complexity to their design and implementation. Java does not provide a preemptive mechanism for cancelling operations or terminating threads. Instead, it provides a cooperative interruption mechanism that can be used to support cancellation, but it is up to you to design cancellation protocols and to follow them consistently. FutureTask and the Executor framework make it easier to build cancellable tasks and services.