多线程中的AQS

AQS简介

AQS取自AbstractQueuedSynchronizer中每个单词的首字母,是一个抽象类,该类是阻塞式锁和相关的同步器工具的框架,JUC包下的一些类继承了该抽象类。

AQS核心思想

AQS核心思想是:状态,队列,CAS。当一个线程要执行的时候,先查看状态是否被占用,如果可用,那么就将当前线程设置为有效的工作线程,将状态设置已占有;如果状态是已占用,就需要一些阻塞唤醒机制来保证多个线程运行的安全性。这个机制是通过内部维护一个状态volatile int state(共享资源),一个变种的CLH锁队列来实现同步功能,以及CAS保证操作原子性。

状态state

state是AQS中的一个成员变量,由volatile修饰。state可以看作是锁的状态,当state为0时,表示未占用,大于0表示已占用,考虑到锁的重入,这里写的是大于0而不是等于1。当state被占用的时候,可以通过exclusiveOwnerThread得知state是被哪个线程所占用,这个类似于synchronized锁对象中的markword。

队列

队列是CLH的变种锁队列,CLH是以Craig、Landin and Hagersten三个人的名字首字母命名的队列,AQS中的队列是CLH变体的双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个Node类型对象来实现锁的分配,该队列被称为sync queue。Node类中的重要属性如下:

  1. volatile Thread thread节点所代表的线程
  2. volatile Node prev当前节点前一个节点的引用
  3. volatile Node next当前节点后一个节点的引用
  4. volatile int waitStatus线程所处的等待锁的状态,初始化时,该值为0
  5. Node nextWaiter该属性用于共享锁,独占锁的时候该值为null

一个sync queue中的head节点是不代表任何线程,因此head节点中的Node的thread是null。当线程没有抢到锁创建其Node对象之后要被放入到sync queue中的时候,即使队列为空,也会被放入到第二个位置。

CAS

AQS中很多操作都是使用CAS来完成的,本质还是使用了Unsafe类中提供的一些方法。CAS操作主要AQS的3个属性state,headtail, 以及Node对象的2个属性waitStatusnext

倘若开发者想要自定义同步器的话,只需要实现共享资源state的获取和释放方式即可,其他如线程队列的维护(如获取资源失败入队/唤醒出队等)等操作,AQS在顶层已经实现了,

AQS代码内部提供了一系列操作锁和线程队列的方法,主要操作锁的方法包含以下几个,后面4个方法通常是由子类重写的:

  • compareAndSetState 利用CAS的操作来设置state的值
  • tryAcquire 独占获取锁
  • tryRelease 独占释放锁
  • tryAcquireShared 共享获取锁
  • tryReleaseShared 共享释放锁

ReentrantLock就是实现了自定义的tryAcquire-tryRelease,从而操作state的值来实现同步效果。

AQS工作过程

AQS中支持两种资源分享的模式:

  • 独占模式(Exclusive),只有一个线程能执行,如ReentrantLock。当一个线程要获取锁时,会先查看state的值,如果是0则获取锁成功,如果大于0,则获取锁失败,将当前线程放入Node对象中,并加入到sync queue队列中。里面很多操作是使用了CAS。
  • 共享模式(Share),多个线程可同时执行,如Semaphore/CountDownLatch。共享模式跟独占模式类似,只不过共享模式下的锁是可以被多个线程共享的,当一个线程获得锁之后,其后面的线程也可能会获取到锁。

下面以独占模式为例进行分析,流程简图如下

通过acquire方法尝试获取锁

该方法中编写了获取锁的逻辑。里面调用了4个方法。

  • tryAcquire(arg)
  • addWaiter(Node mode)
  • acquireQueued(final Node node, int arg)
  • selfInterrupt倘若在争抢锁的过程中出现了中断,在acquire执行结束之前,会调用该方法自我中断。

tryAcquire方法

该方法是由AQS的子类重写, 编写获取锁的逻辑。这里分析一下ReentrantLock中FairSync重写的方法,主要分为下面的步骤:

  1. state如果是0,则说明锁没有被其他线程占用,由于是公平锁,因此在抢占锁之前检查是否有排在自己前面的Node节点,如果没有的话,则使用CAS方式获取锁。
  2. state不为0,则说明锁已被占用,此时会检查是否可重入,倘若不行的话,最终返回false。

addWaiter方法

线程获取锁失败后调用该方法, 作用是将当前线程放入Node对象并加入到sync queue队列尾节点的后面。多线程下有可能失败,即别的线程Node对象成为了新的尾节点,此时会通过自旋的方式(调用的是enq方法),确保当前节点放入到队列中。

尾部逆向遍历

在aqs源码中遍历sync queue的时候是从尾部逆向遍历的,这么操作的原因是enq方法中else的操作并不是原子性的,分为三步执行

  1. 将node的前驱节点设置为目前队列中的尾节点:node.prev = t
  2. 修改tail属性,使它指向当前node节点,这一步是通过cas操作的
  3. 修改之前队列中的尾节点,使它的next指向当前节点

上面操作保证了多线程下,只有一个节点能够成为尾节点。当有多个线程执行上面操作的时候,第1步都能执行成功,第2步是cas操作,第3步是在第2步执行成功的基础上执行。当前2步执行成功之后,有可能第3步还没有来得及执行,此时若从头开始遍历节点的话,最后的尾节点是获取不到的,所以在AQS中对于sync queue的遍历是从后向前的,这样就可以保证尾节点可以获取到了。

acquireQueued方法

对加入sync queue队列中的Node对象进行下面操作,

  1. 如果其前面的节点是head的话,继续尝试获取锁,当获取到锁之后,会将head指向当前Node对象,然后将Node对象中的thread设置为null,这个操作相当于是将Node对象从队列中取出。
  2. 获取锁失败,会根据前面Node节点中的waitStatus值判断是否将当前线程阻塞。
  3. 前面节点中的waitStatus值是CANCELLED时,表示已经取消排队,当前节点会将前置节点再向前调整。
  4. 前面节点中的waitStatus值是SIGNAL时,会将当前线程阻塞。

通过release方法释放锁

该方法中编写了释放锁的逻辑,里面调用了2个方法

  • tryRelease(arg)该方法主要由AQS子类来重写,编写了释放锁的逻辑。
  • unparkSuccessor该方法的作用是锁释放之后要唤醒sync queue中后面的Node。

自定义同步器

了解了AQS的工作过程之后,我们可以自己尝试写一个同步器。

/*
    自定义锁
 */
public class MyLock implements Lock {

    /*
        同步器类
        这里实现一个独占锁
     */
    class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, arg)) {
                //将owner标识为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            //释放锁之后将owner设置为null
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /*
            是否获取独占锁
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    private MySync mySync = new MySync();

    @Override
    public void lock() {
        mySync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        mySync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return mySync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //锁超时,注意时间的操作
        return mySync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        mySync.release(1);
    }

    @Override
    public Condition newCondition() {
        return mySync.newCondition();
    }
}

测试类

public class TestMyLock {
    public static void main(String[] args) {
        //创建锁对象
        MyLock lock = new MyLock();
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        for (int i = 0; i < 2 ; i++) {
            executorService.submit(()->{
                try {
                    lock.lock();
                    //间隔2秒
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + ":" + LocalTime.now());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            });
        }

        executorService.shutdown();

    }
}