首页 » 并发和多线程 » 正文

优雅关机之IO阻塞状态退出

我在另外一篇文章一个框架的线程同步代码引起的思考中讲解了如何对一个后台服务进行优雅关机,并且给出一个示例程序来说明优雅关机的实现细节,后来,有读者提出了一个质疑,质疑在IO阻塞状态下,仅仅interrupt一个线程是不足够让一个线程100%退出的,IO阻塞下的线程只有在阻塞完成,Socket关闭的时候,或者流被关闭的时候才会退出。这位读者还说,NIO则可以在interrupt线程的时候将IO阻塞的线程退出。本篇文章将根据读者提供的线索深入研究可靠保证线程退出的方法。

首先,我们复习一下Java线程状态,从网上搜罗了一张线程状态图:

thread-status

从这张图中看到,一个线程有New, Runnable, Running, Bocked(join, sleep, wait, synchronized), Dead等状态,但是线程基本是在可执行状态/执行状态,线程阻塞状态之间来回转换,其中线程阻塞可能是由于join, sleep, wait, syncrhonized等线程同步操作引起的。

在上一篇文章中我们讨论过,线程在可执行状态下/执行状态下,我们通过虚拟机钩子响应退出事件,在退出的时候,设置退出标志,由于线程处于可执行状态/执行状态,当它获得CPU时间的时候,并且做完一次操作,会检查退出标志,发现退出标志被设置则主动退出,所以,一般的线程循环代码如下:

1
2
3
while(!isExit) {
    // do biz
}

那么处于线程阻塞状态下的线程,阻塞状态可能是由于使用了join, sleep, wait, synchronized等线程操作引起的,这时的线程在等待着某个条件发生后唤起线程继续执行,那么我们需要通过interrrupt线程的方法让其退出,退出后会产生一个InterruptedException,这从InterruptedException的注释中也可以看到:

 Thrown when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity.

线程类中interrupt方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }

那么要想实现优雅关机,我们需要这样写线程循环:

1
2
3
4
5
6
7
8
while(!isExit) {
    try {
        // do block(join, sleep, wait, synchronized)
    } catch (InterruptedException e) {
        log.error("Bocked worker is interrupted. This may generate some inconsistent data. Please check the log.");
        interrupted = true;
    }
}

到这里,我们基本实现了一个有保证的优雅关机,然而,这个实现并不是完美的,正如文章开头读者提出的质疑,线程还有一种特殊的状态,就是IO阻塞状态,线程处于可执行状态/执行状态,但是线程在调用IO操作,IO操作会阻塞,也就是等待外设的结果,这种情况下,我们设置退出标志以及使用Thread.interrupt都无法退出,写一个小的例子验证以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class InterruptReadTest {
    public static void main(String[] args) throws IOException {
        Thread t = new Thread(new Runnable() {
           
            @Override
            public void run() {
                System.out.println("in");
                try {
                    // IO阻塞
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("out");
            }
        });
       
        t.start();
        t.interrupt();
    }
}

程序输出:

in

我们看到通过interrupt是无法退出线程的,现在我们使用流关闭来退出线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class InterruptReadTest {
    public static void main(String[] args) throws IOException {
        Thread t = new Thread(new Runnable() {
           
            @Override
            public void run() {
                System.out.println("in");
                try {
                    // IO阻塞
                    System.in.read();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("out");
            }
        });
       
        t.start();
        //t.interrupt();
        System.in.close();
    }
}

程序输出:

in
out
java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:214)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at com.robert.issue.interrupt.InterruptReadTest$1.run(InterruptReadTest.java:14)
	at java.lang.Thread.run(Thread.java:745)

我们看到,一个处于IO阻塞的线程,如果流被关闭了,那么线程会退出IO阻塞状态,并且抛出IOException,因此,要想优雅关机,我们必须关闭当前读的流,让其线程退出,并且处理IOException。这时我们的线程循环应该这样写:

1
2
3
4
5
6
7
8
9
10
11
12
while(!isExit) {
    try {
        // do block(join, sleep, wait, synchronized
        // or do IO blocking
    } catch (InterruptedException e) {
        log.error("Bocked worker is interrupted. This may generate some inconsistent data. Please check the log.");
        interrupted = true;
    } catch (IOException e) {
        log.error("IO Blocking worker is interrupted. This may generate some inconsistent data. Please check the log.");
        streamClosed = true;
    }
}

请注意,我们需要在一个异步线程中关闭流,才能抛出IOException,仅仅调用Thread.interrupt()不能让IO阻塞线程退出。我们的退出程序应该这样写:

1
2
3
4
void shutdownNow() {
    thread.interrupt();
    stream.close();
}

然而,这个问题并不存在NIO中,因为NIO中的Channel在父类AbstractInterruptibleChannel中做了相应的处理,在做IO操作之前,先在线程中注册了一个Interruptible的实现,当这个线程的inerrupt被调用的时候,会调用这个Interruptible的实现,在这个Interruptible的实现中,关闭当前的Channel, 让线程退出,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final void begin() {
if (interruptor == null) {
    interruptor = new Interruptible() {
            public void interrupt(Thread target) {
                synchronized (closeLock) {
                    if (!open)
                        return;
                    open = false;
                    interrupted = target;
                    try {
                        AbstractInterruptibleChannel.this.implCloseChannel();
                    } catch (IOException x) { }
                }
            }};
}
blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
    interruptor.interrupt(me);
}

在上面的代码,先创建一个Interruptible的实现来关闭Channel,并且把这个实现传递给当前IO线程,blockedOn方法的具体实现如下:

1
2
3
4
5
void blockedOn(Interruptible b) {
    synchronized (blockerLock) {
        blocker = b;
    }
}

现在回头看上面Thead.interrupt()的实现,你会发现,这个Interruptible的实现会在线程interupt的时候被调用,进而会关闭Channel, 来让线程退出IO阻塞状态。

现在总结一下,线程可能处于执行状态,线程阻塞状态,IO阻塞状态,执行状态的线程会主动检查退出标志,线程阻塞状态需要进行interrupt操作即可退出,并抛出InterruptedException, 而IO阻塞状态需要关闭流或者Socket来让其退出,并且抛出IOException,然而,NIO的Channel在线程退出的时候关闭了Channel, 因此,如果使用NIO,则只需要进行线程interrupt操作即可退出一个线程。