ReentrantLock

# 一、概述

# 利用AQS自定义锁

/**
 * 自定义锁
 */
class MyAQSLock implements Lock{

    class MySync extends AbstractQueuedSynchronizer{


        @Override
        protected boolean tryAcquire(int arg) {
            
            if (compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // state变量是volatile 有写屏障 可以保证前面的对其他线程可见
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private MySync mySync = new MySync();


    @Override
    public void lock() {
        mySync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        mySync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return mySync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return mySync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        mySync.release(1);
    }

    @Override
    public Condition newCondition() {
        return mySync.newCondition();
    }
}

# 二、源码

# 1.简介

ReentrantLock相关的继承关系图如图:

img

如下为ReentrantLock类的结构。与上面我们自定义的锁类似。实现了Lock接口,对外提供Lock接口的方法。有一个同步器属性,上锁、释放锁都是通过调用同步器的相关方法实现的。构造时,同步器可以选择公平锁/非公平锁,它们都继承了抽象父类Sync,而Sync又继承了AQS。

抽象类AQS维护了等待队列,而ReentrantLock只需要定义共享资源的获取与释放的方式。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

  
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

  
    public void lock() {
        sync.lock();
    }


    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }


    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }


    public int getHoldCount() {
        return sync.getHoldCount();
    }


    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }


    public boolean isLocked() {
        return sync.isLocked();
    }
}

# 2.构造方法

  • 无参构造器:创建的是非公平锁,同步器初始化为NonfairSync。
  • 有参构造器:true创建的是公平锁,false创建的是非公平锁。

公平锁,会降低并发性能,而非公平锁性能更好,因为可以减少唤醒线程的开销,整体吞吐效率更高,但可能造成饥饿

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

# 3.非公平锁实现

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

# 3.1加锁

如果竞争成功,直接占有锁。如果失败,调用父类AQS中的acquire()方法

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            // addWaiter构造队列 把头结点
            acquireQueued(
                addWaiter(Node.EXCLUSIVE), arg)
           )
            // 如果在acquireQueued()被中断过 这里自己产生一个中断 
            selfInterrupt();
    }

前面的tryAcquire()在NonfairSync中有重写,即nonfairTryAcquire()(这个方法就是ReentrantLock中的tryLock()的实现)

对于非公平锁:

  • 如果没有加锁,会直接CAS尝试加锁

  • 如果锁已经被持有,会看是不是自己持有了锁,如果是,则累加state,即发生了锁重入

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

如果tryAcquire()加锁失败,会addWaiter(Node.EXCLUSIVE), arg),创建一个独占的节点放到队列尾部

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
       // 自旋 空队列则CAS地初始化队列,队列有节点就把节点挂到队列尾部
        enq(node);
        return node;
    }

把当前节点入队列之后,执行acquireQueued():

    // 以独占、不可中断的模式 获取队列中的线程
	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取自己的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头结点 自己是第二个节点,那么可能有机会获得锁 
                // 尝试获取锁(非公平锁直接获取,公平锁会...)
                if (p == head && tryAcquire(arg)) {
                    // 获取到锁 把当前节点设为头结点,去掉之前的头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    
                    // 如果在shouldParkAfterFailedAcquire()保存过中断信号 这里返回有中断 否则直接返回没有被中断过
                    return interrupted;
                }
                // 如果自己不是第二个节点 或者自己虽然是第二个节点,但是由于非公平锁,新来的线程可以不入队列,直接获取锁,所以这里可能被其他线程抢先了
                // 将前驱节点waitStatus改为-1,即会唤醒后继节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // park 阻塞、检查中断标志 如果被中断了,保存下来,后面获取到锁之后会返回中断标志,后续自行产生一个中断标志
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

# 3.2释放锁

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 判断头节点不为空,状态不为0
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点 找到队列里离head最近的一个没取消的node,unpark恢复其运行
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
		protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
    		// 没拿到锁 释放锁会抛异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
    		// 如果完全释放了 直接把锁的持有者置空
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
    		// 部分释放 则设置新的重入锁次数
            setState(c);
            return free;
        }

# 4.公平锁实现

相比于非公平锁,公平锁在获取锁之前,会检查队列中有没有线程在等待,如果有的话不会去获取锁 ,而是会入队列

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先判断是否需要排队
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

hasQueuedPredecessors()

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • true:需要排队

    • h != t 且 (s = h.next) == null:至少两个节点,且头结点后继节点为空。可能发生在第一个线程正在尝试获取锁失败,正在初始化同步队列时,tail = head还没有赋值成功,已经有另一个线程准备进来,所以需要排队。
    • h != t 且 (s = h.next) != null 且 s.thread != Thread.currentThread()至少两个节点,且头结点的后继节点不是自己。此时轮到头结点的后继节点获取锁,当然需要排队
  • false:不需要排队

    • h == t 直接短路,没必要判断是不是两个都假队列为空或只有一个节点,说明前面没有其他线程排队
    • 前面为真 ,后面为假:(h != t)且 (s = h.next) != null 且 s.thread == Thread.currentThread(),至少两个节点,且头结点有后继节点,且后继节点就是当前线程。说明已经轮到自己去获取锁了,无需排队
  • 注意,这里 h != t 都不是绝对代表至少两个节点,当初始化队列时,设置head与tail指向head节点不是原子操作,可能出现刚设置head,但tail = head还没有赋值成功,此时tail = null,即h != t;

    			if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                }
    
上次更新: 2022/04/25, 21:51:40
最近更新
01
BeanFactoryPostProcessor
06-05
02
深入理解Java虚拟机读书笔记(四)
05-03
03
深入理解Java虚拟机(三)
04-30
更多文章>