使用场景
场景一
工作中需要在web服务对外之前,spring的bean初始化的时候加载数据到缓存中。
但是由于数据量过大,需要多线程加载。要求所有的缓存加载成功之后,这个bean才初始化成功,程序继续往下走。
场景二
商务合作的需求,需要显示的数据来自合作方和我们自己的数据库中。
之前是只显示我们数据库点数据,现在合作方提供了一个接口让我实时调用对方的接口,并把两部分的数据合并后返回给前端。
但是由于合作方的接口不是特别稳定,而且也不能保证高可用,所以可以考虑同时从我们和合作方取数据,设置超时时间,如果都返回了数据就合并给前端,如果对方未能返回数据,还是有我们自己的数据能显示给用户的。
CountDownLatch和CyclicBarrier
综合上述两个场景,我看了CountDownLatch和CyclicBarrier,看说明觉得比较适合我的使用。
CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package countdownlatchtest;
import com.google.common.collect.Maps;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch;
public class CountDownLatchService {
private CountDownLatch countDownLatch = new CountDownLatch(4);
/** * 用来存储所有线程的运行结果 */ private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();
public CountDownLatch getCountDownLatch() { return countDownLatch; }
public ConcurrentMap<String, String> getResultMap() { return resultMap; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package countdownlatchtest;
import org.apache.commons.lang3.RandomUtils;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch;
public class Worker implements Runnable {
private ConcurrentMap<String, String> map; private CountDownLatch countDownLatch;
public Worker(ConcurrentMap<String, String> map, CountDownLatch countDownLatch) { this.map = map; this.countDownLatch = countDownLatch; }
@Override public void run() {
// 这里写代码做某些事 System.out.println(Thread.currentThread().getName() + "\t开始了..."); final int sleep = RandomUtils.nextInt(5, 20); try { Thread.sleep(sleep * 1000); } catch (InterruptedException e) { e.printStackTrace(); }
// 添加结果到map中 map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));
// 告诉 countDownLatch ,当前线程完成了 System.out.println(Thread.currentThread().getName() + "\t结束了..."); countDownLatch.countDown(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package countdownlatchtest;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) { final ExecutorService executorService = Executors.newFixedThreadPool(6); for (int i = 0; i < 3; i++) { // 模拟多次执行任务 doTask(executorService); } }
private static void doTask(ExecutorService executorService) { CountDownLatchService countDownLatchService = new CountDownLatchService(); for (int i = 0; i < 4; i++) { Worker worker = new Worker(countDownLatchService.getResultMap(), countDownLatchService.getCountDownLatch()); executorService.submit(worker); } try { // 阻塞在这里,等待其他线程执行完成 countDownLatchService.getCountDownLatch().await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有线程执行成功"); System.out.println("打印结果如下:\n" + countDownLatchService.getResultMap()); }
}
|
CyclicBarrier
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package cyclicbarriertest;
import com.google.common.collect.Maps;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierService implements Runnable {
/** * 表示当有4个线程执行完的时候,会调用第二个参数 barrierAction 的start()方法 * * 所以本类实现 Runnable 接口,传入this */ private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);
/** * 用来存储所有线程的运行结果 */ private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();
@Override public void run() { System.out.println("所有线程执行成功"); System.out.println("打印结果如下:\n" + resultMap); }
public ConcurrentMap<String, String> getResultMap() { return resultMap; }
public CyclicBarrier getCyclicBarrier() { return cyclicBarrier; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package cyclicbarriertest;
import org.apache.commons.lang3.RandomUtils;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier;
public class Worker implements Runnable {
private ConcurrentMap<String, String> map; private CyclicBarrier cyclicBarrier;
public Worker(ConcurrentMap<String, String> map, CyclicBarrier cyclicBarrier) { this.map = map; this.cyclicBarrier = cyclicBarrier; }
@Override public void run() {
// 这里写代码做某些事 System.out.println(Thread.currentThread().getName() + "\t开始了..."); final int sleep = RandomUtils.nextInt(5, 20); try { Thread.sleep(sleep * 1000); } catch (InterruptedException e) { e.printStackTrace(); }
// 添加结果到map中 map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));
try { // 告诉 cyclicBarrier ,当前线程完成了 System.out.println(Thread.currentThread().getName() + "\t结束了..."); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package cyclicbarriertest;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) { final ExecutorService executorService = Executors.newFixedThreadPool(6); for (int j = 0; j < 3; j++) { // 模拟多次任务执行 doTask(executorService); } }
private static void doTask(ExecutorService executorService) { CyclicBarrierService cyclicBarrierService = new CyclicBarrierService(); for (int i = 0; i < 4; i++) { Worker worker = new Worker(cyclicBarrierService.getResultMap(), cyclicBarrierService.getCyclicBarrier()); executorService.submit(worker); } } }
|
总结
写了上面两个例子,后面发现场景一适合用CountDownLatch
,场景二适合用CyclicBarrier
。场景一我们基本上不存在有任务不能执行完的情况,基本上做到计数器不归0,即使服务启动了也没办法正常使用,场景二很多情况都是任务不能正常的执行完成。