4.2原子性-Atomic

提到原子性,我们就不得不讲Atomic包,通过Atomic包可以保证原子性,实现线程安全

下面我们介绍atomic包中的常用类

AtomicXXX

AtomicInteger使用示例

package com.moluo.concurrency.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {
    public static int clientTotal = 5000;
    public static int threadTotal = 200;
    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count:" + count);
    }

    private static void add() {
        count.incrementAndGet();
    }
}

为何使用AtomicInter就能保证线程安全呢?通过查看底层源码我们发现Atomic包的核心为CAS(Unsafe.compareAndSwapInt),Atomic包就是通过CAS保证线程安全的,相关源码如下:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

CAS原理:循环比较对象当前值和底层值,只有当对象当前值和底层值相同时才允许更新,否则就再循环

其中我们需要关注的是compareAndSwapInt(Object var1, long var2, int var4, int var5)方法,参数解释:

  • var1:对象

  • var2:对象当前值

  • var4:对象底层值

  • var5:更新值

LongAdder

JDK 8 中新增加LongAdder类,该类和AtomicLong十分相似。两者的区别如下:

  • AtomicLong采用死循环的方式,当高并发的时候,有一定的性能问题

  • LongAdder在atomic的基础上,将单点的更新压力分散到各个节点上。在低并发的时候,通过对base的直接更新,可以很好的保证和atomic的性能基本一致,而在高并发的时候通过分散,提高性能

package com.moluo.concurrency.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderExample {
    // 请求总数
    public static int clientTotal = 5000;
    // 同时并发执行的线程数
    public static int threadTotal = 200;
    public static LongAdder count = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count:" + count);
    }

    private static void add() {
        count.increment();
    }
}

LongAdder设计思想

我们知道,对于普通类型的Long和Double变量,JVM允许将64位的读操作和写操作拆成2个32位的操作。

而LongAdder基于热点数据分离的思想设计的,它可以将AtomicLong内部核心数据value分离为一个数组,每个线程访问时,通过Hash等算法映射到其中一个数字进行计数,再通过对这个数组的求和累加得到最终的计数结果。

LongAdder缺点

在统计的时候,如果有并发更新,可能导致统计数据有误差。

AtomicLong和LongAdder的使用场景

  • 在实际使用中,当有高并发的时候,优先使用LongAdder

  • 在线程竞争很低的情况下,使用Atomic更简单直观,却性能略优于LongAdder

  • 若需要准确的数值,如序列号生成,全局唯一的AtomicLong才是正确的选择,而不适合使用LongAdder

AtomicReference、AtomicReferenceFieldUpdater

AtomicReference使用示例

package com.moluo.concurrency.atomic;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2); // 2
        count.compareAndSet(0, 1); // no
        count.compareAndSet(1, 3); // no
        count.compareAndSet(2, 4); // 4
        count.compareAndSet(3, 5); // no
        System.out.println("count: " + count);
    }
}

AtomicReferenceFieldUpdater使用示例

package com.moluo.concurrency.atomic;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;


public class AtomicReferenceFieldUpdaterExample {

    private static AtomicIntegerFieldUpdater<AtomicReferenceFieldUpdaterExample> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicReferenceFieldUpdaterExample.class, "count");

    public volatile int count = 100;

    public static void main(String[] args) {

        AtomicReferenceFieldUpdaterExample example = new AtomicReferenceFieldUpdaterExample();

        if (updater.compareAndSet(example, 100, 120)) {
            System.out.println("update success 1, : " + example.getCount());
        }
        if (updater.compareAndSet(example, 100, 120)) {
            System.out.println("update success 2, : " + example.getCount());
        } else {
            System.out.println("update failed , : " + example.getCount());
        }
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

AtomicStampReference

CAS的ABA问题

在CAS操作的时候,其他线程将变量的值A改为B,又由B改为A,本线程使用期望值A与当前变量比较的时候,发现A变量没有变,于是CAS就将A值进行了交互操作,但其实A值已经被其他线程改变过

如何解决ABA问题

AtomicStampReference通过引入Stamp,每次变量操作的时候,将当前变量版本号加一,从而解决CAS的ABA问题

AtomicBoolean

当希望某些代码只执行一次的时候可使用

package com.moluo.concurrency.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class AtomicBooleanExample {
    public static int clientTotal = 5000;
    public static int threadTotal = 200;
    public static AtomicBoolean isHappened = new AtomicBoolean(false);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("isHappended:" + isHappened);
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            System.out.println("execute");
        }
    }
}

Last updated

Was this helpful?