精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

源码分析:ReentrantReadWriteLock之读写锁

发表于 2020-11-13 | 分类于 JDK源码系列 | 0

简介

ReentrantReadWriteLock 从字面意思可以看出,是和重入、读写有关系的锁,实际上 ReentrantReadWriteLock 确实也是支持可重入的读写锁,并且支持公平和非公平获取锁两种模式。

为什么会出现读写锁?

普通锁可以保证共享数据在同一时刻只被一个线程访问,就算有多个线程都只是读取的操作,也还是要排队等待获取锁,我们知道数据如果只涉及到读操作,是不会出现线程安全方面的问题的,那这部分加锁是不是可以去掉?或者是加锁不互斥?如果在读多写少的情况下,使用普通的锁,在所有读的情况加锁互斥等待会是一个及其影响系统并发量的问题,如果所有的读操作不互斥,只有涉及到写的时候才互斥,这样会不会大大的提高并发量呢?答案是肯定的,ReentrantReadWriteLock 就是这样干的,读读不互斥,读写、写读、写写都是互斥的,可以大大提高系统并发量。

源码分析

类结构

ReentrantReadWriteLock 仅实现了ReadWriteLock接口

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {...}

ReadWriteLock 接口仅有两个方法,分别是 readLock() 和 writeLock();

主要属性

ReentrantReadWriteLock 有3个重要的属性,分别是读锁readerLock,写锁writerLock和同步器sync,源码如下:

private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;

主要内部类

  1. Sync:同步器,继承至AbstractQueuedSynchronizer,定义了两个抽象方法,用于两种模式下自定义实现判断是否要阻塞

    abstract static class Sync extends AbstractQueuedSynchronizer{
    	...
            abstract boolean readerShouldBlock();
            abstract boolean writerShouldBlock();
    	...
    }
    
  2. NonfairSync:非公平同步器,用于实现非公平锁,继承Sync

    static final class NonfairSync extends Sync {...}
    
  3. FairSync:公平同步器,用于实现公平锁,继承Sync

    static final class FairSync extends Sync {...}
    
  4. ReadLock:读锁,实现了Lock接口,持有同步器Sync的具体实例

    public static class ReadLock implements Lock, java.io.Serializable {
     ...
          private final Sync sync;
     ...
    }
    
  5. WriteLock:写锁,实现了Lock接口,持有同步器Sync的具体实例

    public static class WriteLock implements Lock, java.io.Serializable {
     ...
          private final Sync sync;
     ...
    }
    

构造方法

有两个默认的构造方法,无参默认采用非公平锁,有参传入true使用公平锁

public ReentrantReadWriteLock() {
    this(false);
}
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

获取读写锁

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

获取读锁:readLock.lock()

读锁主要是按照共享模式来获取锁的,在前面讲AQS的例子中——基于AQS实现自己的共享锁,也是差不多的流程,只不过不同的锁的实现方法tryAcquireShared有一定的区别。ReentrantReadWriteLock 读锁获取过程源码如下:

public void lock() {
    // 共享模式获取锁
    sync.acquireShared(1);
}
// acquireShared 是AQS框架里面的代码
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
// tryAcquireShared 是RRWLock.Sync 里面的自己实现,所以这里没有公平和非公平所谓之称
protected final int tryAcquireShared(int unused) {
    // 当前想要获得锁的线程
    Thread current = Thread.currentThread();
    // 获取state值
    int c = getState();
    // 独占锁被占用了,并且不是当前线程占有的,返回-1,出去要排队
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    // 读锁共享锁的次数
    int r = sharedCount(c);
    // 判断读是否要阻塞,读共享锁的次数是否超过最大值,CAS 更新锁state值
    // readerShouldBlock 的返回要根据同步器是否公平的具体实现来决定
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            // r==0, 设置第一次获得读锁的读者
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 持有第一个读者读锁的线程重入计数
            firstReaderHoldCount++;
        } else {
            // 除第一个线程之后的其他线程获得读锁
            // 每个线程每次获得读锁重入计数+1
            // readHolds 就是一个ThreadLocal,里面放的HoldCounter,用来统计每个线程的重入次数 
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        // 获得读锁,返回1
        return 1;
    }
		// 上面if分支没进去时,走这里尝试获取读锁
    return fullTryAcquireShared(current);
}

上面代码中的readerShouldBlock()方法有两种情况下会返回true:

  1. 公平模式下,调用的AQS.hasQueuedPredecessors()方法

    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // head 头结点是当前持有锁的节点,它的下一个节点不是当前线程,返回true,表示应该要阻塞当前线程
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    👏上面代码的主要思想就是:看一下队头排队等待获取锁的线程是不是当前线程,不是的话就应该要阻塞当前线程;

  2. 非公平模式下,最终调用的AQS.apparentlyFirstQueuedIsExclusive()方法

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }
    // apparentlyFirstQueuedIsExclusive 方法是AQS里面的方法
    final boolean apparentlyFirstQueuedIsExclusive() {
        // h 是同步队列的头结点,当前持有锁的节点
        // s 是下一个应该获得锁的节点
        Node h, s;
        // s 节点如果不是共享模式(在RRWLock 里面就是读锁的意思),s节点是排他模式(想要写锁)返回true,
        return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null;
    }
    

    👏上面代码的主要思想就是:看一下队头排队等待获取锁的第一个线程是不是要获取写锁,如果是就返回true,表示要阻塞当前线程,当前线程前面还有个要获得写锁的线程在排队呢!如果存在这种情况,其他获取读锁的线程都要给这种情况让路(写锁优先级更高)。那如果队列中第一个线程不是要获取写锁,那既然都是获取读锁,那就无所谓了,允许你插队。

上面的if分支进入失败时,会进入到fullTryAcquireShared()方法再次尝试获得读锁有3种情况会进入到这个方法:

  1. readerShouldBlock() 方法返回true,上面已经分析了,这个方法什么时候会返回true
  2. 共享计数达到了最大值 MAX_COUNT(65535),可能性较小
  3. CAS 修改state 值失败,也就是获取锁失败

下面是 fullTryAcquireShared() 方法的分析:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    // 自旋
    for (;;) {
        int c = getState();
        // != 0 已经有其他线程获得了写锁
        if (exclusiveCount(c) != 0) {
            // 如果不是当前线程获得的写锁,返回-1,出去阻塞排队
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // 要进入到这个分支,说明exclusiveCount(c) == 0 , 也就是写锁没被占用
            //  readerShouldBlock() == true , 公平模式下,同步队列中有其他线程在排队,非公平模式下,有即将要获得写锁的线程
            //  readerShouldBlock() 返回true ,也就是要阻塞当前线程的意思
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 进入到这里,说明第一个读锁不是当前线程获得的
                // rh 可以理解为当前线程的重入计数
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 返回-1,阻塞当前线程,出去排队
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            // 超读锁上限,抛出错误
            throw new Error("Maximum lock count exceeded");
        // 进入到这儿,说明线程没有其他线程获得了写锁,并且不需要阻塞当前线程
        // 再次尝试CAS 获得锁,CAS 修改失败会继续自旋进行
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 成功获得锁
            if (sharedCount(c) == 0) {
                // 第一个获得读锁的线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 第一个获得读锁的线程重入计数+1
                firstReaderHoldCount++;
            } else {
                // 非第一个获得读锁的线程 
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                // 线程重入计数
                rh.count++;
                // 缓存成功获取readLock的最后一个线程的计数
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

如果上面fullTryAcquireShared()方法还是没有获得锁,返回-1,就会进入下面的doAcquireShared(int arg)方法:

// doAcquireShared 方法是AQS里面的代码,非RRWLock 实现
private void doAcquireShared(int arg) {
    // 添加一个共享模式的节点到同步队列,并返回当前节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 中断标识
        boolean interrupted = false;
        // for循环自旋操作
        for (;;) {
            // 在同步队列中,当前节点的前驱结点
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前驱结点是头结点,说明排队轮到当前节点获得锁
                // tryAcquireShared 再次尝试获取锁,上面的逻辑一模一样
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // >=0 说明成功获得了锁 
                    // 设置新的头结点,并检查后面是否是在获得读锁,如果是就唤醒它
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        // 阻塞期间线程被中断了
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 阻塞中断线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    // 旧的头结点
    Node h = head; // Record old head for check below
    // 获得锁的线程节点设置为新的头结点
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 检查获得锁的下一个节点s是否是共享模式的节点(读)
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    // 自旋
    for (;;) {
        Node h = head;
        // 同步队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // -1 :表示当前节点的后继节点包含的线程需要运行,也就是unpark
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒被阻塞的下一个节点
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 只会唤醒一个节点,在调用上面代码过程中,如果head节点变了,就会一直自旋,直到成功
        if (h == head)                   // loop if head changed
            break;
    }
}

获取读锁过程总结

  1. 尝试去获取锁tryAcquireShared()
  2. 在tryAcquireShared() 中成功获得锁,就直接退出,执行lock() 之后的代码逻辑
    1. 如果有其他线程已经占用了写锁,退出方法,返回-1,获取锁失败
    2. 检查是否要阻塞当前的线程readerShouldBlock(),有两种情况下(也就是公平锁和非公平锁获取读锁的区别)会阻塞当前现在:
      1. 如果是公平锁,会看一下队头排队等待获取锁的线程是不是当前线程,不是的话就应该要阻塞当前线程;公平模式下是不允许插队的!
      2. 如果是非公平锁,看一下队头排队等待获取锁的第一个线程是不是要获取写锁,如果是表示要阻塞当前线程,写锁优先级更高!
    3. 检查读锁计数是否已经到了最大值(65535)
    4. 上面检查通过,才尝试CAS 修改同步状态,修改成功,代表成功获取读锁,退出方法返回1
      1. 成功获取读锁,如果是第一个获得读锁的线程,会缓存该线程firstReader,如果是重入,会进行重入计数,如果是新的线程获得读锁,会用一个ThreadLocal来保存重入计数
    5. 如果到上面还没获取到锁(可能是CAS修改同步状态失败),会进行自旋继续尝试获取锁,对应方法fullTryAcquireShared() ,该方法要么获取锁成功,要么获取锁失败,直到退出整个tryAcquireShared() 方法
  3. 如果tryAcquireShared() 中没有获得锁,进入到AQS的doAcquireShared方法,排队、阻塞线程
    1. doAcquireShared 方法也是一个自旋的操作,没有获取到锁,就会阻塞线程,等待被唤醒后继续获取锁,知道获取锁成功为止

释放读锁:readLock.lock()

读锁释放锁的逻辑如下:

public void unlock() {
    // 开始释放读锁
    sync.releaseShared(1);
}
//AQS框架中 的方法
public final boolean releaseShared(int arg) {
    // tryReleaseShared 在RRWLock 中的Sync里面
    if (tryReleaseShared(arg)) {
        // 唤醒后面的读锁节点
        doReleaseShared();
        return true;
    }
    return false;
}
// RRWLock.Sync 的实现方法
protected final boolean tryReleaseShared(int unused) {
    // 当前线程
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 第一个读锁线程 
        if (firstReaderHoldCount == 1)
            // 如果它只获得了一次锁,直接置为null
            firstReader = null;
        else
            // 第一个线程获得读锁,并且重入获取锁很多次,慢慢减,直到为1,置为null
            firstReaderHoldCount--;
    } else {
        // 不是第一个线程
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                // 线程没有锁,还来释放锁,会抛出异常
                throw unmatchedUnlockException();
        }
        // 减计数
        --rh.count;
    }
    // 上面只是减重入的计数
    // 下面是自旋,重置同步状态state值
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc)) 
            // CAS 修改成功,并且要state为0才是真正释放了读锁
            // 如果有重入,只有释放最后一次才会返回true, 之后才会去尝试唤醒之后的节点
            return nextc == 0;
    }
}

private void doReleaseShared() {
    // 自旋
    for (;;) {
        Node h = head;
        // 同步等待的队列不为空
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 检查状态是否要唤醒下一个节点的线程
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 加入h节点是持有锁的节点,会唤醒它的下一个节点线程
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 理论上唤醒一个就会退出
        if (h == head)                   // loop if head changed
            break;
    }
}

释放读锁过程总结:

  1. 减计数,包含线程重入获取锁的计数
    • 从这里可以看出一个线程存在多次释放锁,会抛出异常
  2. 自旋,CAS 修改同步状态,重入获取锁的线程只有在state等于0时才是真正的释放锁成功
  3. 释放锁成功后,会唤醒队列中的下一个节点,下一个节点会继续获取锁

获取写锁:writeLock.lock()

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    // 当前线程
    Thread current = Thread.currentThread();
    int c = getState();
    // 写锁计数,>0的话说明写锁已经被占用了
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // c != 0 and w == 0 可能共享锁已经被占用了,这时候写锁获取失败
        // 同一个线程先获取读锁,再获取写锁,也会在这里返回false,获取写锁出去之后会阻塞自己,
        // 然后自己的读锁也不会释放,其他线程也获取不了读锁,就出现了死锁
        if (w == 0 || current != getExclusiveOwnerThread())
            // c != 0 and w == 0  锁的持有者不是当前线程,返回false
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            // 超限了 65535
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 重入获取锁,计数+1
        setState(c + acquires);
        return true;
    }
    //  writerShouldBlock的实现代码,以看上面读锁获取readerShouldBlock的分析
    // 公平锁时,writerShouldBlock 调用的hasQueuedPredecessors()
    // 非公平锁时,只返回false
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        // CAS  修改失败,返回false
        return false;
    // 成功获取写锁,设置锁的拥有者线程
    setExclusiveOwnerThread(current);
    return true;
}

如果上面方法没有获取到写锁,会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ,这块的代码分析,可以查看之前的文章,关于AQS的分析或者ReentrantLock的分析。

释放写锁:writeLock.unlock()

释放写锁的逻辑比较简单,一般加锁和解锁都是成对出现的,所以这里解锁并不需要同步互斥的手段来进行,源代码如下:

public void unlock() {
    sync.release(1);
}
// AQS 框架的代码
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // 校验是否是当前线程持有写锁
    if (!isHeldExclusively())
        // 释放别人的写锁,抛出异常
        throw new IllegalMonitorStateException();
    // 计算下一个同步状态值
    int nextc = getState() - releases;
    // 重入的情况,是否已经完全释放了
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        // 完全释放了,设置锁的持有者线程
        setExclusiveOwnerThread(null);
    // 
    setState(nextc);
    return free;
}

完全释放锁成功后,唤醒下一个节点的逻辑在AQS的unparkSuccessor代码中,不需要RRWLock来实现。

死锁问题

在上面获取写锁的过程中,分析了同一个线程先获取读锁,再获取写锁,写锁的逻辑会阻塞自己的线程,但是写锁和读锁又是同一个线程,相当于前面的写锁也被阻塞了,这时候写锁没地方释放,读锁也没有地方释放,其他线程读锁和写锁也都获取不了了,因为前面有个写锁在排队获取。

public static void main(String[] args) throws InterruptedException{
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    Lock writeLock = lock.writeLock();
    Lock readLock = lock.readLock();
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            TimeUnit.SECONDS.sleep(1);
            // 模拟1秒后其他线程来获得读锁
            System.out.println(Thread.currentThread().getName()+":准备获得读锁");
            readLock.lock();
            System.out.println(Thread.currentThread().getName()+":线程获得读锁");
            readLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放了读锁");
        }
    },"T0").start();
    readLock.lock();
    System.out.println(Thread.currentThread().getName()+":获得了读锁");
    writeLock.lock();
    System.out.println(Thread.currentThread().getName()+":获得了写锁");
    readLock.unlock();
    System.out.println(Thread.currentThread().getName()+":解读锁");
    writeLock.unlock();
    System.out.println(Thread.currentThread().getName()+":解写锁");
}

输出结果:

main:获得了读锁
T0:准备获得读锁

从上面输出结果可以看出,只有main线程获得了读锁,自己获取写锁被阻塞,其他线程也获取不了读锁,最后产生了死锁。

写线程饥饿问题

ReentrantReadWriteLock 的读写是互斥的,意思就是读锁在获取锁后,在还没有释放锁的期间,获取写锁的进程来了也要阻塞自己排队,如果有大量的线程获取了读锁,之后有一个线程获取写锁,写锁就可能一直获取不到写锁,引起写锁线程“饥饿”,这就是RRWLock的写线程饥饿问题。

我们用代码来验证一下上面的结论:

private static void testWriteLockHunger() throws InterruptedException{
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    Lock writeLock = lock.writeLock();
    Lock readLock = lock.readLock();
    // T0 线程先获得读锁,并持有一段时间
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            readLock.lock();
            System.out.println(Thread.currentThread().getName()+":最开始线程获得读锁");
            // 睡眠15秒,一直持有读锁
            TimeUnit.SECONDS.sleep(15);
            readLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放了读锁");
        }
    },"T0").start();
    // 1秒后其他线程再来获取锁,保证前面那个T0线程最先获得读锁
    TimeUnit.SECONDS.sleep(1);
    // TW-1 来排队获取写锁,是为了让后面的读锁,写锁都入队排队
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName()+":准备获得写锁");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName()+":获得写锁");
            TimeUnit.SECONDS.sleep(5);
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放了写锁");
        }
    },"TW-1").start();
    TimeUnit.SECONDS.sleep(1);
    // 这里睡眠1秒是为了写锁排队在读锁获取的前面
    IntStream.range(1,5).forEach(i->{
        new Thread(new Runnable(){
            @SneakyThrows
            @Override
            public void run(){
                System.out.println(Thread.currentThread().getName()+":准备获取读锁");
                readLock.lock();
                System.out.println(Thread.currentThread().getName()+":获取了读锁");
                // 持有部分时间的读锁
                TimeUnit.SECONDS.sleep(i*2);
                readLock.unlock();
                System.out.println(Thread.currentThread().getName()+":释放了读锁");
            }
        },"T-"+i).start();
    });
    // 最后再来个获取写锁的线程,肯定会在所有读锁的后面获取到写锁
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName()+":准备获取写锁");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName()+":获取了写锁");
            // 持有部分时间的读锁
            TimeUnit.SECONDS.sleep(2);
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName()+":释放了写锁");
        }
    },"TW").start();
}

上面代码输出示例:

T0:最开始线程获得读锁
TW-1:准备获得写锁
T-1:准备获取读锁
T-2:准备获取读锁
T-4:准备获取读锁
T-3:准备获取读锁
TW:准备获取写锁
T0:释放了读锁
TW-1:获得写锁
TW-1:释放了写锁
T-1:获取了读锁
T-2:获取了读锁
T-4:获取了读锁
T-3:获取了读锁
T-1:释放了读锁
T-2:释放了读锁
T-3:释放了读锁
T-4:释放了读锁
TW:获取了写锁
TW:释放了写锁

从上面输出结果可以看出,TW写锁是最后才获取到写锁的,如果前面有大量的读锁在排队的话,写锁肯定就会造成饥饿的。

如果不想让获取写锁的线程“饥饿”怎么办呢?

可以把最后获取写锁的线程TW获取锁方式改造下,代码如下:

new Thread(new Runnable(){
    @SneakyThrows
    @Override
    public void run(){
        System.out.println(Thread.currentThread().getName()+":准备获取写锁");
        while(!writeLock.tryLock()){
            // 一直尝试获得写锁,直到成功
        }
        System.out.println(Thread.currentThread().getName()+":获取了写锁");
        // 持有部分时间的读锁
        TimeUnit.SECONDS.sleep(2);
        writeLock.unlock();
        System.out.println(Thread.currentThread().getName()+":释放了写锁");
    }
},"TW").start();

测试输出结果:

T0:最开始线程获得读锁
TW-1:准备获得写锁
T-1:准备获取读锁
T-2:准备获取读锁
T-3:准备获取读锁
T-4:准备获取读锁
TW:准备获取写锁
T0:释放了读锁
TW-1:获得写锁
TW-1:释放了写锁
TW:获取了写锁
TW:释放了写锁
T-4:获取了读锁
T-2:获取了读锁
T-3:获取了读锁
T-1:获取了读锁
T-1:释放了读锁
T-2:释放了读锁
T-3:释放了读锁
T-4:释放了读锁

从上面输出结果可以看出,TW线程成功的在读锁前面获取到了写锁;那为什么会这样呢?因为采用lock()来获取锁,如果第一次tryAcquire没有获取到锁,就会被加入到队列等待,只要进入了队列,就只能按照队列中的顺序来获得锁了,而tryLock在获取锁失败后是不会加入到同步等待队列中去的,从而实现“插队”的功能。

总结

  1. 读写锁除了读读不互斥,读写、写读、写写都是互斥的。
  2. 读写互斥的意思是A线程先获取读锁不释放,B来获取写锁,这时候B线程一样的要阻塞自己
  3. 同一个线程先获取读锁,再获取写锁,会导致死锁
  4. 允许同一个线程先获取写锁,再获取读锁;但是不允许同一个线程先获取读锁,再获取写锁;可以理解为允许锁降级,不允许锁升级。
  5. 公平锁模式下,获取写锁会去检查队列中是否有排队更久的线程。
  6. 非公平锁模式下,获取写锁不会去检查同步队列中是否有排队更久的线程。
  7. 公平锁模式下,获取读锁会去检查队列中是否有排队更久的线程。
  8. 非公平锁模式下,获取读锁会去检查队列中第一个等待获取的是不是写锁,如果存在就要阻塞当前获取读锁的线程(写锁优先级更高)。
精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/reentrantreadwritelock
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
源码分析:②ReentrantLock之条件锁Condition
源码分析:StampedLock之升级版的读写锁
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%