java, parallel,

Java ExecutorService执行多线程并行任务

DolorHunter DolorHunter Follow Aug 02, 2020 · 3 mins read

Java ExecutorService执行多线程并行任务
Share this

实习过程的后期, 我遇到了一个比较棘手的问题.

菜单的数据内有一个很大的描述报表信息方便前端作图的数据, 或称为大字段. 这个字段的内容从几十 KB 到几 MB 不等, 单条记录查表就大约要花上一两秒. 而菜单又有子菜单, 一进行循环就多了好几倍的时间. 仅仅在几十条数据里查找, 就要花上快十秒的时间, 这是不可接受的, 因此我尝试用并行来解决问题.

虽然我在体系结构的课上, 对流水 (pipeline) 和并行 (parallel) 的概念有所了解, 但是我并没有实际操作的经验, 也不清楚大致的执行过程, 于是只能靠搜索引擎凿壁借光.

在花了不少时间查资料后, 我找到了 飞龙 整理的 Java8 简明指南. 里面以文档形式详细的介绍了并发的各个部分, 内容十分详尽. 再配合上 Yong Mook Kim 写的例子, 很快的就实现了并行功能. Yong Mook Kim 写了很多关于 Spring Boot 的博客内容和示例代码, 值得一看.

ExecutorService 并行

Java 中有很多并行实现的方式, 其中 ExecutorService 作为一个在程序中直接使用Thread的 高层次的替换方案。Executors支持运行异步任务,通常管理一个线程池,这样一来我们就不需要手动去创建新的线程。在不断地处理任务的过程中,线程池内部线程将会得到复用. 当然还有 invokeAll 方式, 不过博文似乎都把他放在后面, 我也不是很有耐心的人, 就没有使用他.

1. 实例化 ExecutorService

我们首先要对 ExecutorService 实例化, 并且设置其执行的任务类型和任务.

ExecutorService 可以操作 RunnableCallable 两种任务.

  • 1.Runnable 用于返回值为空的场景

    // 5 thread
    ExecutorService executor = Executors.newFixedThreadPool(5);
    // Runnable, return void, nothing, submit and run the task async
    executor.submit(() -> {
      String threadName = Thread.currentThread().getName();
      System.out.println("Hello " + threadName);
    });
    
  • 2.Callable 用于返回值非空的场景 (返回内容用Future装载)

    // 5 thread
    ExecutorService executor = Executors.newFixedThreadPool(5);
    // Callable, return a future, submit and run the task async
    Future<Integer> futureTask = executor.submit(() -> {
      System.out.println("I'm Callable task.");
      return 1 + 1;
    });
    

2. 执行并行任务

举个例子:

假设我们有若干个方法准备执行, 可以在 try{} 段内按照方法之间的逻辑顺序排列出来. 我们可以认为有零个或多个的任务 otherTask() 在 futureTask() 之前, 同时也有零个或多个的任务 otherTask() 在 futureTask() 之后.

通常来说, 我们执行的任务需要返回参数. 因此我们需要取得 Future 容器内装载的数据(Future 在上方 Callable 任务有说明). 我们使用 get方法得到任务结果, 同时也可以对运行时间做一个限制, 如果超时则中断任务.

我们需要为并行考虑三种异常情况: 线程中断, 线程异常和线程超时. 通常 IDE 都会自动补齐或是提示你补齐这三种例外, 因此也没什么说的.

以下就是执行部分的代码模板.

try {
  // My other task
  otherTask1("Before Future Result");

  // My task (Get result from my task)
  // timeout if the future takes more than 5 seconds to return the result
  Integer result = futureTask.get(5, TimeUnit.SECONDS);
  System.out.println("Get future result : " + result);

  // My other task
  otherTask2("After Future Result");

  } catch (InterruptedException e) {// thread was interrupted
    e.printStackTrace();
  } catch (ExecutionException e) {// thread threw an exception
    e.printStackTrace();
  } catch (TimeoutException e) {// timeout before the future task is complete
    e.printStackTrace();
  } finally {
    // shut down the executor manually
    executor.shutdown();
  }

第一部分 (实例化) 和第二部分 (并行执行) 的代码是顺序并列排列的. 两部分的代码共同完成并行任务的执行.

看到这里如果还是看不明白的话, 可以去看看 Mkyong - Java ExecutorService examples.

3. 并行效率问题

并行之后就一定会快很多吗? 不是这样的. 只有在方法之间不相干, 且执行时间差距不大时, 才能获得最大的效率. 当然, 设置一个合理的线程 (thread) 数量也很重要.

我这次的任务直接的相关性很大, 因此即使是并行处理, 提升也不足 10%, 可以说微乎其微. 即使把线程数大调特调, 也没什么大变化. 我认为实验中的变化来源更可能是处理器性能波动, 而非线程数量(因为把执行时间最短的线程数再执行一次, 执行时间就变成最长的了).

附一份实验记录:

Thread Pool Service time
1 8.45s
2 8.47s
4 8.37s
8 8.47s
16 8.62s
32 8.52s
64 8.37s
128 8.39s
200 9.34s

线程数量的选择上, 06 年以前主流观点认为, 线程数量 = 处理器核心数量. 而现在普遍认为

Number of threads = CPU Cores * (1 + Wait time / Service time)

线程数也不是多多益善. 过多的线程除了对资源需求激增, 还会因为线程之间的数据传递造成时间上浪费, 因此选择一个最佳的线程数很重要.

参考资料:

Join Newsletter
Get the latest news right in your inbox. We never spam!
DolorHunter
Written by DolorHunter
Developer & Independenet Blogger