7.4J.U.C值AQS-CyclicBarrier

在Java中,CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点。当所有线程都到达屏障点后,才能继续执行后续操作。

使用场景

CyclicBarrier适用于需要多个线程协同完成某个任务的场景。例如,一个大型的计算任务需要被分解成多个小任务并行执行,最后将所有小任务的结果合并成最终结果。这时,可以使用CyclicBarrier来同步所有线程的执行,等待所有小任务都完成后再进行结果合并。

示例代码

下面是一个简单的示例代码,演示了如何使用CyclicBarrier来同步多个线程的执行:

package com.moluo.concurrency.aqs;

import java.util.concurrent.*;

public class CyclicBarrierExample {

    private static final int THREAD_COUNT =20;
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService= Executors.newCachedThreadPool();

        for (int i=0;i<THREAD_COUNT;i++){
            final int index=i;
            executorService.execute(()->{
                try {
                    test(index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        System.out.println("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException, BrokenBarrierException {
        Thread.sleep(1000);
        System.out.println(threadNum+" is ready");
        cyclicBarrier.await();
        System.out.println(threadNum + " continue");
    }
}

在上面的代码中,我们创建了一个CyclicBarrier对象,并指定了需要等待的线程数为5。然后,我们创建了20个线程,并在每个线程中调用await()方法等待其他线程到达屏障点。当所有线程都到达屏障点后,它们会继续执行后续操作。

总结

CyclicBarrier是一个非常有用的同步工具类,它可以帮助我们实现多个线程之间的协同工作。在使用CyclicBarrier时,需要注意线程数和屏障点的设置,以确保所有线程都能够正确地同步执行。

扩展

await()方法允许设置超时时间

package com.moluo.concurrency.aqs;

import java.util.concurrent.*;

public class CyclicBarrierExample2 {

    private static final int THREAD_COUNT =60;
    private static CyclicBarrier cyclicBarrier=new CyclicBarrier(5);

    public static void main(String[] args) {
        ExecutorService executorService= Executors.newCachedThreadPool();

        for (int i=0;i<THREAD_COUNT;i++){
            final int index=i;
            executorService.execute(()->{
                try {
                    test(index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        System.out.println("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(threadNum+" is ready");
        try {
            cyclicBarrier.await(2000,TimeUnit.MILLISECONDS);
        } catch (BrokenBarrierException | TimeoutException e) {
            e.printStackTrace();
        }
        System.out.println(threadNum + " continue");

    }
}

同时等待到达屏障点之后,允许先执行cyclicBarrier的方法,在执行各线程各自的方法

package com.moluo.concurrency.aqs;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample3 {

    private static final int THREAD_COUNT = 20;
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
        System.out.println("callback is running");
    });

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    test(index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        System.out.println("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException, BrokenBarrierException {
        Thread.sleep(1000);
        System.out.println(threadNum + " is ready");
        cyclicBarrier.await();
        System.out.println(threadNum + " continue");
    }
}

Last updated

Was this helpful?