停止基于服务的线程

停止基于服务的线程

  应用程序通常会创建拥有服务的线程, 比如线程池. 这些服务的存在时间通常要比创建他们的方法存在的时间更长, 如果应用程序优雅的退出了,这些服务的线程也需要结束.因为没有退出线程惯用的优先方法, 他们需要自行结束.

  明智的封装实践指出,你不应该操控某个线程一一中断它,改变他的优先级,等等... 除非是这个.线程的拥有者, 线程API没有关于线程所属权正规的概念. 线程通过一个Thread对象表示,和其他对象一样可以被自由的共享. 但是, 认为线程有一个拥有者是有道理的. 这个拥有者就是创建这个线程的类. 所有线程池拥有它的工作线程, 如果需要中断这些线程. 那么应该由线程池来负责

  例如: ExecutorService 提供的 shutdown 和 shutdownNow 方法.

  对于线程持有的服务, 只要服务的存在时间大于创建线程的方法存在时间,那么就应该提供生命周期方法(lifecycle method)

 

实例: 日志服务

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    private boolean isShutdown;
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown)
                throw new IllegalStateException(/* ... */);
            ++reservations;
        }
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0)
                                break;
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

或者使用 ExecutorService 

public class LogService2 {

    private final ExecutorService executorService = Executors
            .newSingleThreadExecutor();
    private final PrintWriter writer;

    public LogService2(PrintWriter writer) {
        super();
        this.writer = writer;
    }

    public void start() {
    }

    public void close() {
        try {

            this.executorService.shutdown();
            this.executorService.awaitTermination(TIME_OUT, UNIT);
        } finally {
            if (writer != null)
                writer.close();
        }
    }

    public void log(String str) {
        this.executorService.execute(new Writer(str));
    }

}

 

实例: 致命药丸

public class IndexingService {
    private static final int CAPACITY = 1000;
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;

    public IndexingService(File root, final FileFilter fileFilter) {
        this.root = root;
        this.queue = new LinkedBlockingQueue<File>(CAPACITY);
        this.fileFilter = new FileFilter() {
            public boolean accept(File f) {
                return f.isDirectory() || fileFilter.accept(f);
            }
        };
    }

    private boolean alreadyIndexed(File f) {
        return false;
    }

    class CrawlerThread extends Thread {
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) { /* fall through */
            } finally {
                while (true) {
                    try {
                        queue.put(POISON);
                        break;
                    } catch (InterruptedException e1) { /* retry */
                    }
                }
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries) {
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        queue.put(entry);
                }
            }
        }
    }

    class IndexerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    File file = queue.take();
                    if (file == POISON)
                        break;
                    else
                        indexFile(file);
                }
            } catch (InterruptedException consumed) {
            }
        }

        public void indexFile(File file) {
            /* ... */
        };
    }

    public void start() {
        producer.start();
        consumer.start();
    }

    public void stop() {
        producer.interrupt();
    }

    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }

    public static void main(String[] args) {
        IndexingService is =new IndexingService(new File("e://"), new FileFilter() {

            public boolean accept(File pathname) {
                System.out.println(pathname);
                return true;
            }
        });
        is.start();
        is.stop();
        try {
            is.awaitTermination();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("the end ");
    }
}

  致命药丸只有在生产线程与消费线程已知的情况下才能使用. IndexingService中的解决方案也可以被扩展到多生产者, 只要让每一个生产线程都往队列里放入一个药丸, 并且消费线程收到第 N(生产者的数量)个药丸时停止. 

 

实例: 只执行一次的服务

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。