JSR 166规范

JSR 介绍

JSR,全称 Java Specification Requests, 即 Java 规范提案, 主要是用于向 JCP(Java Community Process) 提出新增标准化技术规范的正式请求。每次 JAVA 版本更新都会有对应的 JSR 更新,比如在 Java 8 版本中,其新特性 Lambda 表达式对应的是 JSR 335,新的日期和时间 API 对应的是 JSR 310

JSR 166 是 Doug Lea 提出的一个关于 Java 并发编程的规范提案。JDK1.5 之前,我们控制程序并发访问同步代码只能使用 synchronized,那个时候 synchronized 的性能还没优化好,性能并不好,控制线程也只能使用 Object 的 wait 和 notify 方法。这个时候 Doug Lea 给 JCP 提交了 JSR-166 的提案,在提交 JSR-166 之前,Doug Lea 已经使用了类似 J.U.C 包功能的代码已经三年多了,这些代码就是 J.U.C 的原型。

J.U.C,即 java.util.concurrent 的缩写,该包参考自 EDU.oswego.cs.dl.util.concurrent,是 JSR 166 标准规范的一个实现。

jsr-166-concurrency-utilities

JSR-166 包括多个规范,每个规范都引入了一些新的接口和类,以下是详细描述:

  1. JSR-166(Java SE 5):定义了 Java 并发包的核心接口和类,包括 Executors 框架、Queues、Timing、Synchronizers、Concurrent Collections、Memory Consistency Properties、Atomic、Locks 等。这些接口和类提供了一种方便、高效、可扩展的方式来处理异步任务和并发编程。
  2. JSR-166x(Java SE 7):定义了 Java 并发包中的一些新特性,包括 Phaser、TransferQueue、Exchanger、LinkedTransferQueue 等接口和类。其中 Phaser 支持分阶段执行任务,TransferQueue 和 LinkedTransferQueue 实现了高效的生产者-消费者模式,Exchanger 支持两个线程之间交换数据。
  3. JSR-166y(Java SE 8):定义了 Java 并发包中的一些新特性,包括 StampedLock、CompletableFuture、LongAdder 等接口和类。其中 StampedLock 是一种乐观锁,支持读写分离,CompletableFuture 支持异步任务执行和结果处理,LongAdder 是一种高效的计数器。
  4. JSR-166z(Java SE 9):定义了 Java 并发包中的一些新特性,包括 VarHandle、Fences 等接口和类。其中 VarHandle 提供了一种更加灵活的原子操作方式,Fences 提供了一些方法用于控制内存屏障。

JUC

java.util.concurrent 包下的类以及引入版本(没有标注版本号的为 1.5 ):

  • java.util.concurrent
    • java.util.concurrent.locks
      • AbstractOwnableSynchronizer 1.6
      • AbstractQueuedLongSynchronizer 1.6
      • AbstractQueuedSynchronizer
      • Condition
      • Lock
      • LockSupport
      • ReadWriteLock
      • ReentrantLock
      • ReentrantReadWriteLock
      • StampedLock 1.8
    • java.util.concurrent.atomic
      • AtomicBoolean
      • AtomicInteger
      • AtomicIntegerArray
      • AtomicIntegerFieldUpdater
      • AtomicLong
      • AtomicLongArray
      • AtomicLongFieldUpdater
      • AtomicMarkableReference
      • AtomicReference
      • AtomicReferenceArray
      • AtomicReferenceFieldUpdater
      • AtomicStampedReference
      • DoubleAccumulator 1.8
      • DoubleAdder 1.8
      • LongAccumulator 1.8
      • LongAdder 1.8
    • AbstractExecutorService
    • ArrayBlockingQueue
    • BlockingDeque 1.6
    • BlockingQueue
    • BrokenBarrierException
    • Callable
    • CancellationException
    • CompletableFuture 1.8
    • CompletionException 1.8
    • CompletionService
    • CompletionStage 1.8
    • ConcurrentHashMap
    • ConcurrentLinkedDeque 1.7
    • ConcurrentLinkedQueue
    • ConcurrentMap
    • ConcurrentNavigableMap 1.6
    • ConcurrentSkipListMap 1.6
    • ConcurrentSkipListSet 1.6
    • CopyOnWriteArrayList
    • CopyOnWriteArraySet
    • CountDownLatch
    • CountedCompleter 1.8
    • CyclicBarrier
    • Delayed
    • DelayQueue
    • Exchanger
    • ExecutionException
    • Executor
    • ExecutorCompletionService
    • Executors
    • ExecutorService
    • Flow 1.9
    • ForkJoinPool 1.7
    • ForkJoinTask 1.7
    • ForkJoinWorkerThread 1.7
    • Future
    • FutureTask
    • LinkedBlockingDeque 1.6
    • LinkedBlockingQueue
    • LinkedTransferQueue 1.7
    • Phaser 1.7
    • PriorityBlockingQueue
    • RecursiveAction 1.7
    • RecursiveTask 1.7
    • RejectedExecutionException
    • RejectedExecutionHandler
    • RunnableFuture
    • RunnableScheduledFuture
    • ScheduledExecutorService
    • ScheduledFuture
    • ScheduledThreadPoolExecutor
    • Semaphore
    • SubmissionPublisher 1.9
    • SynchronousQueue
    • ThreadFactory
    • ThreadLocalRandom 1.7
    • ThreadPoolExecutor
    • TimeoutException
    • TimeUnit
    • TransferQueue 1.7

大致可以分为以下几类:

  • 原子更新
  • 锁和条件
  • 线程池
  • 并发容器
  • 同步器

在学习 JUC 之前我们需要了解 CASAQSUnsafe

  • CAS:
  • AQS:
  • Unsafe:

CAS

CAS(Compare and Swap)是一种基于原子性操作的并发编程技术,常用于实现线程安全的数据结构和算法。CAS 操作由三个参数组成:内存位置 V、期望值 A、新值 B。当且仅当 V 的值等于 A 时,CAS 操作才会将 V 的值设置为 B,否则不做任何操作。它的实现原理可以简单概括为以下几个步骤:

  1. 读取内存位置 V 的值,同时记录下该值的版本号或标记位。
  2. 检查内存位置 V 的值是否等于期望值 A。如果相等,则执行第 3 步;否则,操作失败。
  3. 将新值 B 写入内存位置 V,并更新其版本号或标记位。
  4. 返回操作结果。

CAS 操作是一种乐观锁机制,它不需要锁定整个共享资源,而是只针对需要修改的值进行原子性操作,从而避免了锁的竞争和开销。在执行 CAS 操作时,线程会对内存位置进行读取和写入,但同时也会检查内存位置的版本号或标记位,以保证操作的原子性和一致性。

需要注意的是,如果多个线程同时执行 CAS 操作,可能会出现 ABA 问题。例如,线程 A 读取内存位置 V 的值为 A,然后线程 B 将 V 的值修改为 B,最后线程 B 又将 V 的值修改为 A。此时,线程 A 执行 CAS 操作时,会发现内存位置 V 的值还是 A,虽然这个 A 的版本号或标记位与之前不同,但线程 A 并不知道 V 的值曾经被修改过,因此会将新值写入内存位置 V,从而导致数据不一致。为了解决 ABA 问题,可以使用带有版本号或标记位的 CAS 操作,或者使用其他的并发编程技术,例如锁或读写锁。

Java 中的AtomicXXX类实现了 CAS 操作,例如 AtomicInteger、AtomicLong 等。这些类提供了一组原子性操作方法,例如 get()、set()、addAndGet()、compareAndSet()等,它们可以被多个线程安全地使用。

CAS 操作虽然免去了锁的开销,但也存在一些问题。首先,CAS 操作需要进行多次尝试,直到成功为止。如果并发程度较高,多个线程同时进行 CAS 操作,可能会导致大量的 CAS 操作失败,从而降低性能。其次,CAS 操作只能保证单个变量的原子性操作,无法保证多个变量之间的操作的原子性,因此需要额外的措施来保证多个变量之间的一致性。

下面是一个使用 AtomicInteger 实现简单计数器的例子:

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
    private AtomicInteger value = new AtomicInteger(0);

    public void increment() {
        int oldValue, newValue;
        do {
            oldValue = value.get();
            newValue = oldValue + 1;
        } while (!value.compareAndSet(oldValue, newValue));
    }

    public int getValue() {
        return value.get();
    }
}

在上面的示例中,increment() 方法使用 do-while 循环和compareAndSet()方法执行 CAS 操作来增加计数器的值。该方法重复使用get()方法读取计数器的当前值,计算新值,然后尝试使用compareAndSet()方法更新计数器。循环将继续,直到 CAS 操作成功并且计数器成功更新。

getValue() 方法使用get()方法简单地返回计数器的当前值。

需要注意的是,在使用 CAS 操作时,需要小心处理潜在的 ABA 问题,其中共享变量的值可能在初始读取和更新尝试之间多次更改。一种处理方法是在共享变量中使用版本号或时间戳,以确保更新仅在值未更改的情况下成功。

ABA 问题是在使用 CAS(Compare-and-Swap)操作进行并发编程时经常遇到的一个问题。它发生在一个线程从共享内存位置读取一个值,然后另一个线程将该值更改为另一个值,最后又将其更改回原始值,从而使第一个线程的操作意外成功。

为了处理 ABA 问题,常用的方法是在共享内存位置中添加一个版本号或时间戳。版本号或时间戳可以在每次修改内存位置时进行递增或更新。这可以确保 CAS 操作不仅检查值,还检查内存位置的版本号或时间戳。

以下是使用版本号处理 ABA 问题的示例:

import java.util.concurrent.atomic.AtomicStampedReference;

public class ConcurrentStack<T> {
    private AtomicStampedReference<Node<T>> top = new AtomicStampedReference<>(null, 0);

    public void push(T value) {
        Node<T> newHead = new Node<>(value);
        int[] stampHolder = new int[1];
        Node<T> oldHead;
        do {
            oldHead = top.get(stampHolder);
            newHead.next = oldHead;
            stampHolder[0]++;
        } while (!top.compareAndSet(oldHead, newHead, stampHolder[0] - 1, stampHolder[0]));
    }

    public T pop() {
        Node<T> oldHead;
        int[] stampHolder = new int[1];
        do {
            oldHead = top.get(stampHolder);
            if (oldHead == null) {
                return null;
            }
        } while (!top.compareAndSet(oldHead, oldHead.next, stampHolder[0], stampHolder[0] + 1));
        return oldHead.value;
    }

    private static class Node<T> {
        private final T value;
        private Node<T> next;

        private Node(T value) {
            this.value = value;
        }
    }
}

在上面的示例中,ConcurrentStack 类使用 AtomicStampedReference 存储栈顶节点。AtomicStampedReference 类存储值的引用和版本号,版本号在引用更改时进行更新。

push() 方法使用新值创建一个新的 Node,然后尝试使用 CAS 操作将其推入栈中。循环将继续,直到 CAS 操作成功,节点成功推入栈中。

pop() 方法尝试使用 CAS 操作从栈中弹出顶部节点。循环将继续,直到顶部节点成功弹出或栈为空为止。

通过使用具有版本号的 AtomicStampedReferenceConcurrentStack 类可以处理在并发操作中可能发生的 ABA 问题。

AQS

AQS(AbstractQueuedSynchronizer)是 Java 中用于实现同步器(如锁,信号量等)的框架,它提供了一些基本的同步操作,例如获取锁释放锁等待条件唤醒线程等。

AQS 的实现原理基于一个双向链表,用于维护等待线程的队列。当一个线程需要获取同步器时,它会首先尝试使用 CAS 操作来获取同步器,如果获取成功,则继续执行;如果获取失败,则将线程加入等待队列中,并将其挂起。当同步器释放时,它会唤醒等待队列中的一个或多个线程,并将它们从等待队列中移除,使它们可以继续执行。

AQS 的等待队列是通过一个双向链表来实现的,每个节点代表一个等待线程,节点中包含了线程的状态以及等待条件等信息。等待队列中的节点是按照等待时间的先后顺序排列的,先等待的线程排在前面,后等待的线程排在后面。当一个线程被唤醒时,它会重新尝试获取同步器,如果获取成功,则继续执行;如果获取失败,则它会再次加入等待队列中,并将自己挂起。

AQS 的具体实现是通过重写其内部的一些方法来实现的。例如,tryAcquire() 方法用于实现获取同步器的逻辑,它会首先尝试使用 CAS 操作来获取同步器,如果获取成功,则返回 true;否则返回 false。tryRelease() 方法用于实现释放同步器的逻辑,它会释放同步器,并唤醒等待队列中的一个或多个线程。tryAcquireShared()tryReleaseShared() 方法则用于实现共享式同步器的逻辑,它们类似于 tryAcquire()tryRelease() 方法,但是可以支持多个线程同时获取或释放同步器。

Unsafe

Unsafe 类是 Java 中一个非常特殊且强大的类,它提供了一些不安全的操作,例如直接操作内存、线程挂起和恢复等。Unsafe 类是 Java 中少数几个不被公开支持的类之一,它主要被用于 Java 核心库和其他一些高级框架中,如 Netty、Hadoop 和 Kafka 等。

由于 Unsafe 类提供了一些不安全的操作,因此它的使用需要非常小心。如果不正确地使用 Unsafe 类,可能会导致程序崩溃或安全漏洞。因此,Java 官方并不建议开发人员使用 Unsafe 类,而是建议开发人员使用更加安全和标准的 Java API。

Unsafe 类中一些常用的方法包括:

  1. allocateMemory(long size):分配一段指定大小的内存空间。
  2. freeMemory(long address):释放指定地址的内存空间。
  3. putXXX(Object target, long offset, XXX value):将指定类型的值写入目标对象的指定偏移量处。
  4. getXXX(Object target, long offset):从目标对象的指定偏移量处读取指定类型的值。
  5. park(boolean isAbsolute, long time):挂起当前线程,直到被其他线程唤醒或指定的时间到期。
  6. unpark(Thread thread):恢复指定线程的运行。

需要注意的是,Unsafe 类中的大部分方法都是 native 方法,实现方式依赖于底层操作系统和硬件平台。这意味着 Unsafe 类中的方法在不同的平台上可能会有不同的行为,因此需要针对不同的平台进行测试和验证。

Java 9 中官方提出了移除 Sun.misc.Unsafe 类,并在该版本中将该类标记为不推荐使用。然而,由于 Unsafe 类在 Java 语言生态中的应用非常广泛,许多框架和库都依赖于 Unsafe 类来实现高性能和低层次的操作。因此,在 Java 9 中,官方引入了 jdk.internal.misc.Unsafe 类来替代 Sun.misc.Unsafe 类的功能,以保持对 Java 生态中使用 Unsafe 类的支持。

Share this post:

Related content