7.3J.U.C值AQS-Semaphore

用于控制访问某资源的并发数

使用场景,假设数据库的连接数只有20,而上层应用的请求并发数可能达到几千几万。若无数请求同时对数据库进行操作,则可能造成获取不到数据库连接数而导致异常,这个时候就可以通过信号量semaphore来做并发访问控制。

package com.moluo.concurrency.aqs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    private static final int THREAD_COUNT = 30;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(index);
                    semaphore.release(); // 释放一个许可
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        System.out.println(threadNum);
        Thread.sleep(1000);
    }
}

注意使用semaphore时,可以获取多个许可后,再执行方法,如下

semaphore.acquire(3); // 获取多个许可
test(index);
semaphore.release(3); // 释放多个许可

尝试获取许可,获取到许可的线程则执行任务,获取不到许可的线程直接丢弃任务

package com.moluo.concurrency.aqs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample3 {

    private static final int THREAD_COUNT = 30;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    if (semaphore.tryAcquire()){ //尝试获取许可,获取到许可的线程可执行任务,否则丢弃任务
                        test(index);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown(); 
    }

    private static void test(int threadNum) throws InterruptedException {
        System.out.println(threadNum);
        Thread.sleep(1000);
    }
}

尝试获取许可,获取到许可的线程则执行任务,获取不到许可的线程,等待一段时间,该时间段内获取到许可则执行任务,获取不到则直接丢弃任务

package com.moluo.concurrency.aqs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreExample3 {

    private static final int THREAD_COUNT = 30;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)){ //尝试获取许可
                        test(index);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        System.out.println(threadNum);
        Thread.sleep(1000);
    }
}

Last updated

Was this helpful?