CountDownLatch和CyclicBarrier
使用场景
场景一
工作中需要在web服务对外之前,spring的bean初始化的时候加载数据到缓存中。
但是由于数据量过大,需要多线程加载。要求所有的缓存加载成功之后,这个bean才初始化成功,程序继续往下走。
场景二
商务合作的需求,需要显示的数据来自合作方和我们自己的数据库中。
之前是只显示我们数据库点数据,现在合作方提供了一个接口让我实时调用对方的接口,并把两部分的数据合并后返回给前端。
但是由于合作方的接口不是特别稳定,而且也不能保证高可用,所以可以考虑同时从我们和合作方取数据,设置超时时间,如果都返回了数据就合并给前端,如果对方未能返回数据,还是有我们自己的数据能显示给用户的。
CountDownLatch和CyclicBarrier
综合上述两个场景,我看了CountDownLatch和CyclicBarrier,看说明觉得比较适合我的使用。
CountDownLatch
package countdownlatchtest;
import com.google.common.collect.Maps;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchService {
private CountDownLatch countDownLatch = new CountDownLatch(4);
/**
* 用来存储所有线程的运行结果
*/
private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public ConcurrentMap<String, String> getResultMap() {
return resultMap;
}
}
package countdownlatchtest;
import org.apache.commons.lang3.RandomUtils;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
public class Worker implements Runnable {
private ConcurrentMap<String, String> map;
private CountDownLatch countDownLatch;
public Worker(ConcurrentMap<String, String> map, CountDownLatch countDownLatch) {
this.map = map;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
// 这里写代码做某些事
System.out.println(Thread.currentThread().getName() + "\t开始了...");
final int sleep = RandomUtils.nextInt(5, 20);
try {
Thread.sleep(sleep * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 添加结果到map中
map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));
// 告诉 countDownLatch ,当前线程完成了
System.out.println(Thread.currentThread().getName() + "\t结束了...");
countDownLatch.countDown();
}
}
package countdownlatchtest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 3; i++) {
// 模拟多次执行任务
doTask(executorService);
}
}
private static void doTask(ExecutorService executorService) {
CountDownLatchService countDownLatchService = new CountDownLatchService();
for (int i = 0; i < 4; i++) {
Worker worker = new Worker(countDownLatchService.getResultMap(), countDownLatchService.getCountDownLatch());
executorService.submit(worker);
}
try {
// 阻塞在这里,等待其他线程执行完成
countDownLatchService.getCountDownLatch().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有线程执行成功");
System.out.println("打印结果如下:\n" + countDownLatchService.getResultMap());
}
}
CyclicBarrier
package cyclicbarriertest;
import com.google.common.collect.Maps;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierService implements Runnable {
/**
* 表示当有4个线程执行完的时候,会调用第二个参数 barrierAction 的start()方法
*
* 所以本类实现 Runnable 接口,传入this
*/
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, this);
/**
* 用来存储所有线程的运行结果
*/
private ConcurrentMap<String, String> resultMap = Maps.newConcurrentMap();
@Override
public void run() {
System.out.println("所有线程执行成功");
System.out.println("打印结果如下:\n" + resultMap);
}
public ConcurrentMap<String, String> getResultMap() {
return resultMap;
}
public CyclicBarrier getCyclicBarrier() {
return cyclicBarrier;
}
}
package cyclicbarriertest;
import org.apache.commons.lang3.RandomUtils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
public class Worker implements Runnable {
private ConcurrentMap<String, String> map;
private CyclicBarrier cyclicBarrier;
public Worker(ConcurrentMap<String, String> map, CyclicBarrier cyclicBarrier) {
this.map = map;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 这里写代码做某些事
System.out.println(Thread.currentThread().getName() + "\t开始了...");
final int sleep = RandomUtils.nextInt(5, 20);
try {
Thread.sleep(sleep * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 添加结果到map中
map.putIfAbsent(Thread.currentThread().getName(), String.valueOf(sleep));
try {
// 告诉 cyclicBarrier ,当前线程完成了
System.out.println(Thread.currentThread().getName() + "\t结束了...");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
package cyclicbarriertest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int j = 0; j < 3; j++) {
// 模拟多次任务执行
doTask(executorService);
}
}
private static void doTask(ExecutorService executorService) {
CyclicBarrierService cyclicBarrierService = new CyclicBarrierService();
for (int i = 0; i < 4; i++) {
Worker worker = new Worker(cyclicBarrierService.getResultMap(), cyclicBarrierService.getCyclicBarrier());
executorService.submit(worker);
}
}
}
总结
写了上面两个例子,后面发现场景一适合用CountDownLatch
,场景二适合用CyclicBarrier
。场景一我们基本上不存在有任务不能执行完的情况,基本上做到计数器不归0,即使服务启动了也没办法正常使用,场景二很多情况都是任务不能正常的执行完成。