并发编程解惑之锁的应用和底层实现(并发编程解惑之锁的应用和底层实现)
Synchronized和 ReentrantLock都可用作同步,可称为同步锁。在使用前要明确一个基本概念,同步锁要加在哪里?当然是加在共享资源上,文件系统,数据库系统等自身都提供了完善的同步锁机制,不需要我们加锁。确切来说我们是把同步锁加在访问共享资源的代码上,而且访问共享对象的不同线程,加的同步锁必须是同一把锁,不然无法起到同步(排队操作共享对象)的作用,同步锁本身也一定是多个线程间的共享对象。
Java的synchronized关键字:
synchronized语法表现形式如下
synchronized(同步锁) { // 访问共享资源,需要同步的代码段 }
同步锁本身要是共享的对象
methodA() {
Object lock1 = new Object(); // 产生一个同步锁
synchronized(lock1){
// 代码段 A
// 访问共享资源 resourceA
} }
同步锁是在函数体内部产生,每一个线程调用这个方法都会产生一个新的同步锁,多个线程之间访问同一个共享资源resourceA使用的是不同的锁,无法产生互斥作用,没有意义
public static final Object lock = new Object();
methodA() {
synchronized(lock) { // lock 是公用同步锁
// 代码段 A
// 访问共享资源 resourceA
}}
同步锁是类的成员变量,类内部的各个方法可以使用,声明为public,本类外的方法也可以访问,lock不需要非得声明为public或static,只要同步相关的代码公用同一把锁,就可协调多线程并发运行。
在Java里面,任何一个Object Reference(对象引用)都可以作为同步锁,Object Reference是对象在内存的内存地址。线程同步需要用同一个内存地址。上面代码声明lock时,使用了final关键字,这就保证了lock的Object Reference在整个系统运行过程中都保持不变。(并不是Object这个对象不变,而是内存地址Object Reference不变)
我们反编译下面代码的字节码看一下synchronized关键字到底做了什么事
public class UpdateData {
private Object object = new Object();
public void insert(Thread thread){
synchronized (object) {
}
}
public synchronized void updateSyn(Thread thread){
}
public void update(Thread thread){
}
}
先打开CMD命令行,进入到类对应的目录,执行javac UpdateData.java编译java文件成字节码文件UpdateData.class,再反编译字节码文件javap -c UpdateData.class
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: new #2 // class java/lang/Object
8: dup
9: invokespecial #1 // Method java/lang/Object."<init>":()V
12: putfield #3 // Field object:Ljava/lang/Object;
15: return
public void insert(java.lang.Thread);
Code:
0: aload_0
1: getfield #3 // Field object:Ljava/lang/Object;
4: dup
5: astore_2
6: monitorenter
7: aload_2
8: monitorexit
9: goto 17
12: astore_3
13: aload_2
14: monitorexit
15: aload_3
16: athrow
17: return
Exception table:
from to target type
7 9 12 any
12 15 12 any
public synchronized void updateSyn(java.lang.Thread);
Code:
0: return
public void update(java.lang.Thread);
Code:
0: return
}
synchronized代码块实际上多了monitorenter和monitorexit两条指令(Monitors,也称为监视器)。monitorenter指令执行时会让对象的锁计数加1,而monitorexit指令执行时会让对象的锁计数减1,用来控制多个线程对临界资源(共享资源)的访问。
可以把执行monitorenter指令理解为加锁,执行monitorexit理解为释放锁。 每个对象维护着一个存储着被锁次数的计数器。未被锁定的对象的该计数器值为0,当一个线程获得锁(执行monitorenter)后,该计数器自增1变为 1 (当同一个线程再次获得该对象的锁的时候,计数器再次自增)。当同一个线程释放锁(执行monitorexit指令)的时候,计数器会自减1变为0。这时锁会被释放,其他线程就可以获取锁。
特别注意一点:对于synchronized方法或者synchronized代码块,当出现异常时,JVM会自动释放当前线程占用的锁,因此不会由于异常导致出现死锁现象。
我们还可以将同步锁改成public static final byte[] lock = new byte[0]; 编译后的字节码中,生成零长度的byte[]字节数组对象只需3条操作码,而Object lock = new Object()则需要7行操作码,这种锁占空间小,比较经济。
1.2 Synchronized的实现
Synchronized把任意一个非NULL的对象当作锁,它有多个队列,当多个线程一起访问某个对象监视器,也就是锁,对象监视器会将这些线程存储在不同的容器中。
Contention List:所有请求锁的线程首先被放在这个竞争队列中;
Entry List:Contention List中那些有资格成为候选资源的线程被移动到Entry List中;
Wait Set:那些调用了wait方法被阻塞的线程被放到这里;
OnDeck:任意时刻,最终最多只有一个线程正在竞争锁,该线程被称为OnDeck(候选者);
Owner:当前已经获取到锁的线程被称为Owner(拥有者);
!Owner:当前未取得锁的线程。
并发情况下,ContentionList会被大量的并发线程不断的访问,为了降低对尾部元素的竞争,JVM会将一部分线程移动到EntryList中作为候选竞争线程,JVM再从队列尾部取出一个用于锁竞争的候选者(OnDeck)。拥有锁的线程(Owner线程)会在释放锁时,将ContentionList中的部分线程迁移到EntryList中,并指定EntryList中的某个线程为OnDeck线程(通常是最先进去的那个线程)。Owner线程把锁竞争的权利交给OnDeck线程,让OnDeck线程重新竞争锁。 OnDeck线程获取到锁后会变为Owner线程,如果Owner线程被wait方法阻塞,则转移到WaitSet队列中,直到被其他线程调用notify或者notifyAll唤醒,才会重新进去EntryList中, 处于ContentionList、EntryList、WaitSet中的线程都处于阻塞状态。Synchronized是非公平锁,后申请锁的线程可能比先申请锁的线程更早的获取到锁。
加同步锁,要注意尽量缩小同步粒度,synchronized methodA ( ){ //code...}等同于synchronized ( this){//code ...},同步锁是对象实例。但static synchronized methodA ( ){ //code...} 给静态方法加锁,等同于synchronized ( Class.forName( )){//code ...},同步锁是类本身。synchronized修饰一个代码块:synchronized(obj) { //code.... }。表示一个线程要执行该代码块,必须获得obj锁。
加同步锁,要注意细分锁,减少对锁的竞争,方法1和2访问的是同一个资源A,方法3和4访问的是同一个资源B,那么这四个方法不可能只用一把同步锁,应该对资源A竞争的分配同一把锁,对资源B竞争的分配另外一把锁,这样就避免四个方法同时申请同一把锁,减少竞争概率,减少系统开销。
同步锁模型只是简单的同步方式,同一时刻,只有一个线程能够运行同步代码,适用于简单的场景, 在复杂的的场景下采用信号量模型进行同步更为合适。
信号量模型:
public static final Object signal = new Object();
f1() {
synchronized(singal) { // 获取了signal这个信号量(同步锁)之后,才可进入这段代码
// 放弃信号量,当前线程开始wait这个signal对象,本线程要进入signal信号量的等待队列(Waiting Queue)
signal.wait();
// 等到其他线程执行 signal.notify()释放通知之后,本线程从待召(Waiting)队列转到就绪(Ready)队列,向CPU申请同步锁singal,持有后才可继续运行
}
}
f2() {
synchronized(singal) { // 获取了signal这个信号量(同步锁)之后,才可进入这段代码
// 本线程通知正在等待signal信号量的其他线程,也就是通知signal的等待队列(Waiting Queue)中的某个线程。
signal.notify();
// 某个线程等到了这个通知,那个线程就会转到就绪队列(Ready Queue)中 ,开始向CPU竞争同步锁singal,持有后才可继续执行
// 本线程仍然继续持有signal这个同步锁,所以本行程进行执行,直到运行完了synchronized体里的代码,才释放同步锁singal
}
}
任何一个Object Reference也可以作为信号量,Java中的Object类是所有类的父类,所以自定义的对象都继承了wait()/notify()/notifyAll()方法。notify方法通知哪个线程无法确定,notifyAll方法会通知等待队列里的所有线程。
1.3 synchronized的缺陷使用synchronized做同步,一个线程获取了锁,其他线程只能等待锁的释放,获取了锁的线程如果有耗时的IO操作,被阻塞了,其他线程只能一直等待,效率很低。要么等到线程执行完代码,释放占用的锁;要么线程执行异常,JVM让线程自动释放锁。这时就需要让等待的线程可响应中断(或者只等待一段时间),不再等待,继续执行其他任务,通过Lock可以实现
使用synchronized,但多线程之间都是读操作,那一个线程读,其他线程就没办法读,而Lock是允许共享读,多线程都是读时,可以并发进行
当synchronized方法执行完之后,系统会自动让线程释放锁;而Lock则必须要用户去手动释放锁,一般在finally块里unlock释放锁,让释放的动作最后都执行。而且通过Lock可知道线程有没有成功获取到锁,而synchronized却不行。
需要注意的是,在JDK1.6后对synchronized做了很多的优化,性能得到了很大的提升。
public class MyLock {
private Lock lock = new ReentrantLock();
//Lock lock=new ReentrantLock(true);//公平锁
//Lock lock=new ReentrantLock(false);//非公平锁
private Condition condition=lock.newCondition();//创建 Condition条件对象
public void testMethod() {
try {
lock.lock(); //lock 加锁
condition.await(); //必须先执行 lock.lock 方法获得锁,才能使用lock对象创建的Condition对象来让线程wait,
condition.signal(); //condition对象的signal方法可以唤醒wait指定条件的线程
//业务代码
}
catch (interruptedException e) {
e.printStackTrace(); }
finally { lock.unlock(); } }}//确保出异常后也能释放锁
Condition 类和 Object 类中锁方法的区别:
Condition类的awiat方法等效Object类的wait方法
Condition类的signal方法等效Object类的notify方法
Condition类的signalAll方法等效Object类的notifyAll方法
ReentrantLock类可以唤醒指定条件的线程,而object是随机唤醒线程
ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法(公平锁、非公平锁等)。ReentrantLock是可重入锁,除了能完成synchronized所能完成的工作外,还提供了可响应中断的锁,定时锁等可避免线程死锁的方法。
public interface Lock {
//调用后一直阻塞到获得锁
void lock();
//线程获取锁时,在等待状态可被中断,防止死锁
void lockInterruptibly() throws InterruptedException;
//tryLock()有返回值,它尝试获取锁,如果成功,返回true,如果失败(锁已被其他线程占用),则返回false,防止死锁
boolean tryLock();
//类似tryLock(),不过不是获取不到就立刻失败,而是在特定时间内获取不到才返回false,防止死锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//生成条件对象,除了要获得锁,还要满足条件
Condition newCondition();
}
tryLock获取锁的正确用法:
Lock lock = new ReentrantLock();
if(lock.tryLock()) {
try{
//业务代码
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}
}else {
//如果获取失败,则处理其他事情
}
在Java中,synchronized就不是可中断锁,而Lock是可中断锁。对于阻塞方法(如 wait() 和 sleep() ),当另一个线程调用interrupt()中断该线程时,该线程会从阻塞状态退出并且抛出中断异常,这时可以捕获中断异常并做对应处理。线程A获得了锁进入了同步代码块中,但由于条件不足调用 wait() 方法阻塞了,这时线程B可执行 threadA.interrupt()请求中断线程A,线程A会抛出InterruptedException,我们可以在try catch 中捕获异常并做相应处理。(可进一步往上抛出,将异常进一步扩散,不要生吞异常,否则事务不生效,事务只有捕获到异常才会回滚,事务捕获到异常回滚后还会继续抛出异常,这时可以通过统一异常处理catch住异常并记录日志)
interrupt方法会将调用该方法的线程的中断状态设置为true,让被阻塞的线程抛出Interrupted Exception异常,最后将中断状态为false。
/**
* 类InterruptTest
*/
public class InterruptTest {
private Lock lock = new ReentrantLock();
public static void main(String[] args) {
InterruptTest interruptInstance = new InterruptTest();
TestThread threadA = new TestThread(interruptInstance);
TestThread threadB = new TestThread(interruptInstance);
threadA.start();
threadB.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadB.interrupt();
}
public void insert(Thread thread) throws InterruptedException{
lock.lockInterruptibly(); //获取锁,等待锁时,可被中断
try {
System.out.println(thread.getName() "得到了锁");
long startTime = System.currentTimeMillis();
for( ; ;) {
if (Thread.interrupted()) {//检测当前线程是否被中断,如果是返回true,
//然后中断状态会被重置false
System.out.println(thread.getName() "线程被中断...");
break; //跳出一层for循环
}
if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
break;
//插入数据
}
// 尽管线程被中断,但并没有结束运行,这行代码还是被执行。
// 如果想线程立刻结束可将break改成return
System.out.println(thread.getName() "继续执行...");
}
finally {
System.out.println(Thread.currentThread().getName() "执行finally");
lock.unlock();
System.out.println(thread.getName() "释放了锁");
}
}
}
/**
* 类TestThread
*/
class TestThread extends Thread {
private InterruptTest interruptInstance = null;
public TestThread(InterruptTest interruptInstance) {
this.interruptInstance = interruptInstance;
}
@Override
public void run() {
try {
interruptInstance.insert(Thread.currentThread());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() "被中断");
}
}
}
输出:
Thread-1得到了锁 Thread-1线程被中断... Thread-1继续执行... Thread-1执行finally Thread-1释放了锁 Thread-0得到了锁
先是B线程(Thread-1)得到了锁,然后主线程调用threadB.interrupt()中断线程B,最后线程A(Thread-0)获得锁。
Thread.isInterrupted()同样也可以检测线程是否被中断,线程一旦被中断,该方法返回true,但中断状态不会被清除,也就是不会被置为false。
每个线程都有一个与之相关联的 Boolean 属性:中断状态(interrupted status)。中断状态初始时为 false,当另一个线程通过调用ThreadA.interrupt()中断ThreadA线程时,如果A线程在执行一个可被中断的阻塞方法,如Thread.sleep(), Thread.join()或 Object.wait(),那么A线程将取消阻塞(等待)并抛出InterruptedException。否则,interrupt() 只是设置线程的中断状态为true,然后继续运行。在继续执行的代码中可以读取中断状态,如果为true,看是否需要跳出循环break或退出方法return(很多时候我们并不希望线程立刻停止,因为代码执行了一半,数据是不完整的,我们可以轮询中断状态,在工作处理好后,才退出)。中断状态可以通过 Thread.isInterrupted()来读取并返回,也可以通过,Thread.interrupted() 来返回中断状态并清除中断状态(置为false)。
不是所有的的阻塞方法都会抛出InterruptedException。输入和输出流类会阻塞等待 I/O 完成,在被中断的时不会提前返回,也不会抛出中断异常。但如果一个线程关闭套接字Socket,那么套接字上的阻塞 I/O 操作将提前结束,并抛出一个SocketException。获取一个内部锁的操作(synchronized)是不能被中断的,但是ReentrantLock支持可中断的获取锁模式。
/**
* 类SocketInterrupt
*/
public class SocketInterrupt extends Thread {
public static final int BUF_SIZE = 1024;
Socket socket;
InputStream in;
public SocketInterrupt(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
//继承Thread类并重写interrupt方法
@Override
public void interrupt() {
try {
socket.close();
} catch (IOException e) {
} finally {
super.interrupt();//最后中断父类.通知父类
}
}
@Override
public void run() {
try {
byte[] buf = new byte[BUF_SIZE];
while (true) {
int count = in.read(buf);
if (count < 0) {
break;
} else if (count > 0) {
//处读取到的流数据
}
}
} catch (IOException e) {
}
}
}
通过改写继承于Thread的interrupt方法,当调用interrupt方法时,会关闭socket,如果此时read方法阻塞,那么会抛出IOException异常,相当于结束线程。
2.3 读写锁ReentrantReadWriteLock如果一个线程已经占用了写锁,则其他线程申请写锁或者读锁,则申请的线程会一直等待写锁的释放。如果一个线程已经占有了读锁,其他线程申请写锁,则申请的线程会一直等待读锁的释放;如果一个线程已经占有了读锁,其他线程申请读锁,申请的线程不需要等待读锁的释放。在读多于写的情况下,读写锁能够提供比独占锁更好的并发和吞吐量。
如果当前进程拥有写锁,那么可以降级为读锁。而如果只持有读锁,是不能升级为写锁的。一个线程获取了写锁,但更改了数据后,需要感知数据的变更,这时可将写锁降级为读锁。而线程获取的是读锁,是不可能升级读锁为写锁去更改数据的。
class CachedData {
Object data;
volatile boolean cacheValid; //表示缓存是否有效,true表示有效
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//处理缓存数据
void processCachedData() {
//获取读锁
rw.readLock().lock();
//如果缓存无效,进入if块,更新缓存
if (!cacheValid) {
//获取写锁前须释放读锁
rw.readLock().unlock();
rw.writeLock().lock();
// 再次检查缓存有效性,因为另外一个线程有可能已经获取到了写锁并更新了缓存状态
if (!cacheValid) {
data = ...
//读取最新的数据并赋值给data,设置缓存状态为有效
cacheValid = true;
}
//锁降级,在释放写锁前获取读锁
rw.readLock().lock();
// 释放写锁,但仍然持有读锁
rw.writeLock().unlock();
}
//到这里,缓存都是有效状态,直接使用data
use(data);
//释放读锁
rw.readLock().unlock();
}
}
ReentrantLock可重入锁内部有3个类,Sync、FairSync和NonfairSync,ReentrantLock源码分析:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
//Sync继承队列同步器AbstractQueuedSynchronizer
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。
//Sync复写了tryRelease方法,它的2个子类FairSync(公平锁)和NonfairSync(非公平锁)没有再次复写这个方法
//公平锁和非公平锁释放锁的操作相同:唤醒等待队列里第一个被挂起的线程
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
final ConditionObject newCondition() {
return new ConditionObject();
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
//非公平锁NonfairSync继承Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 非公平锁直接对状态位state进行cas操作(无锁,旧值和预期值相同,则设置新值),成功则就获取锁,
//不成功就挂起线程到等待队列
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//tryAcquire方法由Sync的两个子类FairSync(公平锁)和NonfairSync(非公平锁)实现。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//公平锁FairSync继承Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// acquire方法内部调用下面的tryAcquire方法
acquire(1);
}
//tryAcquire方法由Sync的两个子类FairSync(公平锁)和NonfairSync(非公平锁)实现。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// !hasQueuedPredecessors()判断,表示是否有线程在队列里等待的时间比当前线程要长,如果没有,成功获取锁
//如果有等待时间更长的线程,那么当前线程放弃获取锁,没有获取到锁的线程,会被挂起到等待队列里
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() ((o == null) ?
"[Unlocked]" :
"[Locked by thread " o.getName() "]");
}
}
Sync类的内部使用state变量来维护多个读线程和一个写线程的状态。state有高16位和低16位,高位表示写锁,低位表示读锁。如state值为000000000000010 0000000000000011,那么高16位000000000000010表示读锁被获取的总数为2次,低16位0000000000000011表示当前线程获取了3次写锁,当前线程重入写锁2次。
公平锁和非公平锁释放锁的方式相同,但获取锁的方式不同。非公平锁获取锁通过抢占的方式获取,不考虑已有线程的等待时间,某个线程获取了锁,其他线程进入等待队列等待。公平锁获取锁时,某个线程获取了锁,其他线程会被挂起到等待队列里。新来的线程等待时间比旧的线程等待时间短,就会放弃获取锁,进入等待队列,所以有多个线程在等待一个锁时,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该锁。
ReentrantLock和ReentrantReadWriteLock在构造函数中提供了 是否公平锁的初始化方式,默认情况下是非公平锁。JVM 按随机、就近原则分配锁的机制则称为非公平锁,非公平锁实际执行的效率要远高于公平锁,除非特殊情况,不然都是采用非公平锁。
tryLock 、 lock 、lockInterruptibly 的区别:
- tryLock 获得锁就返回 true,否则立即返回 false,使用tryLock(long timeout,TimeUnit unit) 去获取锁时,超过该时间段还没获得锁,返回 false
- lock 能获得锁就返回 true,否则一直等待,直到获得锁才返回true
- lock 和 lockInterruptibly,两个线程分别执行这两个方法,此时这两个线程被中断了, lock 不会抛出异常,而 lockInterruptibly 会抛出异常
我们知道sychronized加锁时,会调用对象监视器Monitor的enter方法,解锁的时候会调用exit方法。事实上,synchronized一般不会直接调用enter和exit,实际中sychronized获取锁的过程:
java中使用synchronized关键字来实现同步功能,被synchronized修饰的方法,在多线程的情况下不能同时执行,只能串行执行
在JDK1.5之前,实现同步就像上面所说的一样,线程A和B争抢同步锁,如果线程A成功获取锁,那么线程B就会从运行状态(Running)变成阻塞(Blocked)状态,等到线程A退出了同步代码块,线程B获取锁后,才能恢复到运行状态,执行代码
到JDK1.6版本,这种简单的运行-阻塞-运行模式会导致操作系统频繁切换线程的上下文,效率不高,于是引入了偏向锁,轻量级锁,自旋锁和重量级锁等概念,来提高同步锁的效率。synchronized用法不变,但JVM拿不到锁时不会直接阻塞
当锁的竞争很少时,JVM使用偏向锁,这基本就是没加锁;当锁竞争很激烈时,JVM会按照偏向锁,轻量级锁,自旋锁,重量级锁的顺序不断升级(锁的膨胀)的方式去处理同步锁。越到后面锁占用的资源越高,效率越低。
Java对象保存到内存中时,有对象头、实例数据、对齐填充字节三部分组成。而对象头又由Mark Word 、指向类的指针、数组长度(只有数组对象才有)三部分组成。其中最重要的是Mark Word,记录了对象和锁的相关信息,当一个对象被synchronized关键字当成同步锁时,围绕这个锁的一系列操作都和Mark Word有关。
Mark Word在32位JVM中的长度是32bit,在64位JVM中长度是64bit。在不同的锁的状态下,Mark Word的存储内容也不一样,而在32位的JVM中,其存储如下:
无锁和偏向锁的锁标志位都是01,只是前面有1位来标记是否为偏向锁,0为无锁,1为偏向锁
JVM获取synchronized锁的过程和锁的对象头中Mark Word的变化:
当一个对象没有被当成锁对象时,只是个普通的对象,Mark Word记录对象的HashCode,锁标志位设为01,是否偏向锁设为0。
当对象成为同步锁并且被线程A抢到锁,锁标志位还是01,是否偏向锁变成1,开头的23bit记录持有锁的线程id,这时锁对象已经进入偏向锁(Baised
Lock)状态,可以执行同步锁的代码。所谓偏向,就是偏心,锁会偏向当前已经占有锁的线程。当有其他线程请求相同的锁时,偏向模式会结束。
当线程B试图获得这个锁时,JVM发现同步锁处于偏向状态,但是Mark
Word中记录的线程id并不是B线程的id,那么线程B会先用CAS操作(乐观锁)试图获得锁,如果抢锁成功,则把Mark
Word里的线程id改为线程B的id,同时B线程可执行同步代码。
如果线程获取偏向锁失败,代表有轻微竞争,那偏向锁就会升级为轻量级锁,JVM会在当前线程的线程栈中开辟一块单独的空间叫锁记录(Lock
Record),里面保存对象锁Mark Word的内存地址,同时在对象锁的Mark
Word中保存锁记录空间的地址。这两个保存操作都是CAS操作,如果保存成功,代表线程抢到了同步锁,接着就可以把Mark
Word中的锁标志位改成00,表示成功获取轻量级锁,接着可以执行同步锁代码
如果线程获取轻量级锁失败,代表竞争激烈,JVM会升级为获取自旋锁(SpinLock),自旋锁代表线程会不断的重试获取锁。从JDK1.7开始,自旋锁的参数被取消,虚拟机不再支持由用户配置自旋锁,自旋锁会自动执行,自旋的时间也由虚拟机自动调整,自旋时间一般由上一次在同一个锁上的自旋时间和锁的拥有者的状态来决定
如果线程获取自旋锁失败,同步锁会升级至重量级锁,锁标志位改为10。在此状态下,如果获取锁失败,线程会被挂起,进入阻塞状态。
互斥锁(如synchronized)被称为重量级锁,通过操作系统的互斥信号量模型来同步。这种同步方式,在需要阻塞或唤醒一个线程时都需要操作系统协助,这时程序运行需要从用户态切换到核心态,而状态转换很消耗CPU时间,有时甚至比用户代码执行时间还长,所以称互斥锁为重量级锁。
自旋锁只是将当前线程不停的循环,不会改变当前线程的状态,时刻检查共享资源是否可以被访问,响应很快。而线程被阻塞,相当于线程放弃了CPU执行时间,进入等待区,等待被唤醒,需要线程的上下文切换,响应较慢。但如果线程竞争很激烈,每个线程都需要执行空循环,并且占用CPU执行时间,那CPU资源开销很大,这时反而会造成性能严重下降,自旋锁适用于锁竞争不激烈的场合,如果多个线程要频繁争取同一把锁,则不适合自旋。
上面说的都是JVM内部的锁优化方案,由JVM自动处理,属于系统层优化,我们也可以在应用层做锁的优化
减少锁的粒度:将不同的资源分配不同的锁,不同线程间操作相互独立,ConturrentHashMap类把map分成多个段,每个段就有一把锁,用空间换时间,不同段的数据可以同时被修改。
锁分离:将不相关的操作分配不同的锁,如ReenTrantLockd的读锁和写锁分离,如LinkedBlockingQueue类,从队列头获取数据的take()方法和从队列末尾添加数据的put()方法分别使用不同的锁,取数据和加数据互不影响。
锁粗化(Lock Coarsening):锁粗化是合并使用相同锁对象的相邻同步块的过程。连续多个synchronized语句块,或循环中的synchronized语句块,用的是同一个对象作为锁,那么可以将多个synchronized语句块合并成一个,将synchronized语句块提到循环外部。
锁消除(Lock Elision):在编译代码时,如果同步块所使用的锁对象通过逃逸分析被证实只能够被一个线程访问,那么JIT编译器在编译这个同步块的时候就会取消对这部分代码的同步(锁)。这个取消同步的过程就叫同步省略,也叫锁消除。
这些优化仅在Java虚拟机server模式下起作用(即运行Java程序时需要在命令行中指定Java虚拟机参数“-server”以开启这些优化)
JIT即时编译:
一个java源代码文件变成计算机可执行指令过程中,先是通过javac编译器把.java文件转换成.class字节码文件,JVM再通过解释字节码将其翻译成机器指令,逐条读入,逐条翻译,这是解释执行,必然比可执行的二进制文件慢很多,所以JVM引入了JIT编译器(即时编译)。JIT编译器会把使用频繁的热点代码直接翻译成机器指令,方便重复执行,避免再次解析字节码。
JIT逃逸分析:
在动态编译同步块的时候,JIT编译器可以借助逃逸分析(Escape Analysis)来判断同步块所使用的锁对象是否只能够被一个线程访问而没有被发布到其他线程。
下面的代码:
public void f() {
Object instance = new Object();
synchronized(instance) {
System.out.println(instance);
}
}
instance对象的生命周期只在方法f()内,不会被其他线程访问到,不存在逃逸,在JIT编译阶段会被优化成
public void f() {
Object instance = new Object();
System.out.println(instance);
}
使用的synchronized锁被去除,经过JIT逃逸分析后认为线程安全,便做了锁消除的优化
下面的代码:
public static void main(String[] args) {
createPerson();
}
private static void createPerson() {
Person per = new Person(18,1);
}
class Person{
private int age;
private int id;
}
Person对象没有逃逸出createPerson方法,该对象不会被其他线程访问,而且Person对象可以拆解成标量的,经过标量替换后createPerson方法变成:
private static void createPerson() {
int age = 18;
int id = 1;
}
标量替换可以减少堆内存的占用,因为不需要创建对象,就不用分配堆内存给对象实例了。
但是如果createPerson存在逃逸,JIT编译器是不会做标量替换的
private static Person createPerson() {
Person per = new Person(18,1);
return per; //存在逃逸,无法优化
}
private static String createPerson() {
Person per = new Person(18,1);
return per.toString;//不存在逃逸,可以优化
}
从JDK 1.7开始已经默认开启逃逸分析,如果要关闭,需要指定-XX:-DoEscapeAnalysis启动参数。
我们来总结下Java中锁的类型:
并发编程—锁的底层实现篇4.1 锁的底层实现:AQS抽象队列同步器
基于AQS构建的同步器有ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些同步器核心是原子状态state的获取和释放,只是获取和释放的条件不同而已。
ReentrantLock源码里的state变量记录当前线程获取锁的次数,如果state值为0,表示当前线程已经释放锁,如果为1,表明已经获得锁,若大于1,说明当前线程已经多次获得锁,重入了代码,我们分析ReetranLock的公平锁FairSync的获取锁tryAcquire的实现
private volatile int state; //获取锁的次数
protected final int getState() {//获取重进入数(获得锁的次数)
return state;
}
protected final void setState(int newState) {//设置重进入数(获得锁的次数)
state = newState;
}
//公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//当前线程已经释放了锁,c为0,则有机会获得锁
if (c == 0) {
//如果线程是队列的第一个等待者,并且设置重进入数state成功,那么当前线程成功获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//当前线程已经持有锁,如果叠加重进入数成功state,那么当前线程继续获得锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//以上条件都不满足,就进入等待队列
return false;
}
}
Semaphore则是记录有多少次许可可以用,若为0则需要等待,通过许可来控制并发量。Semaphore默认的许可为1,相当于一把互斥锁。我们分析Semaphore的公平锁FairSync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果当前等待队列的第一个线程不是当前线程,则需要等待,并返回负值-1
if (hasQueuedPredecessors())
return -1;
//当前线程是队列第一个等待者,获取当前可用许可数
int available = getState();
//当前可用许可数减去线程需要的许可数
int remaining = available - acquires;
//如果剩余许可数remaining < 0,返回负值,表示当前线程需要等待。如果剩余许可数remaining > 0
//并且CAS设置state为remaining成功,则返回大于0的remaining,告知AQS当前线程成功拿到许可,可继续执行
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
CountDownLatch 闭锁可以设定初始值,在这个初始值递减为0之前,所有的线程都会被挂起。若初始值设为3,那每次调用countdown方法都会对初始值减去1,调用3次把初始值变为0,此时允许所有等待的线程继续执行
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
private final Sync sync;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() { sync.releaseShared(1);}
//await执行时会调用tryAcquireShared查看state数量,如果是0,所有线程继续执行,如果非0,所有线程继续等待
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
//countDown方法执行时会调用tryReleaseShared方法,如果state本身就是为0,返回false
//表明不需要唤醒被挂起的线程,因为没有在等待的线程
if (c == 0)
return false;
//如果不为0,则先减去1,CAS设置剩下的数量nextc,如果设置成功,返回nextc == 0,为0返回true,释放所有
//在等待的线程,不为0返回false,继续挂起等待的线程
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
Future Task 用state记录任务的状态
//当调用Future Task实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,
//而这个方法会反向调用Sync实现的tryAcquireShared()方法
//该方法只检查当前任务是否完成或者被取消(Cancel),如果未完成而且也没有被取消,则挂起当前线程到等待队列,
//例如线程若是RUNNING状态那么让当前线程挂起,进入阻塞状态
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
//判定任务是否完成或者被取消
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
//任务的执行线程执行完毕调用该方法
void innerSet(V v) {
for (;;) {
int s = getState();
//如果线程任务已经执行完毕,那么直接返回
if (s == RAN)
return;
//如果被取消了,那么释放等待线程,并抛出异常
if (s == CANCELLED) {
releaseShared(0);
return;
}
//如果成功设置任务状态为已完成,那就释放等待线程(调用get()方法而阻塞的线程)
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
### 4.2 AQS的源码分析 ###
AQS提供给了子类一个int state属性,并且暴露给子类getState()和setState()两个protected方法。RetrantLock用state记录当前线程的重入数,Semaphore则用state保存许可数,CountDownLatch用state保存需要被countDown的次数,而Future用state存储当前任务的执行状态(RUNING、RUN、CANCELL)。同理,其他的同步器(Synchronizer)可用来存储它们所需的状态。我们查看队列同步器AbstractQueuedSynchronizer的源码(AQS是采用模板方法模式实现的同步器基类):
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
//提供给子类的一个state属性
private volatile int state;
//暴露给子类getState()和setState()两个protected方法
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//抽象类AbstractQueuedSynchronizer(AQS)留给继承它的子类去实现的方法有5个
//其中tryAcquire,tryRelease和isHeldExclusively三个方法为需要独占形式获取同步锁的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
//而tryAcquireShared和tryReleasedShared两个方法为需要共享形式获取同步锁的方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
java.util.concurrent.locks.AbstractQueuedSynchronizer.Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
AQS维护了一个volatile int state(代表共享资源锁)和一个FIFO(先进先出)线程等待队列CLH队列(多线程争用资源被阻塞时会进入此队列)。CLH队列是一个非阻塞的 FIFO 队列,在并发条件下,往队列里面插入或移除一个节点的时候不会阻塞,插入和移除操作都是通过自旋锁和CAS保证原子性。
state变量的访问方式有三种:
getState()
setState()
compareAndSetState()
红色和蓝色的长方形代表Node结点,Node结点是对每一个访问同步代码的线程的封装,包含了线程的状态,如是否被阻塞,是否等待唤醒,蓝色的结点是等待状态,用waitStatus表示,主要有3种取值CANCELLED、SIGNAL、CONDITION。
static final class Node {
/** 代表线程已经被取消*/
static final int CANCELLED = 1;
/** 代表当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行 */
static final int SIGNAL = -1;
/** 代表结点的线程等待在Condition上/
static final int CONDITION = -2;
//AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。
/** 标记是共享模式*/
static final Node SHARED = new Node();
/** 标记是独占模式*/
static final Node EXCLUSIVE = null;
/**
* 状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、0 (值为0,代表初始化状态)
*/
volatile int waitStatus;
/**
* 前置节点
*/
volatile Node prev;
/**
* 后续节点
*/
volatile Node next;
/**
* 节点代表的线程
*/
volatile Thread thread;
/**
*连接到等待condition的下一个节点
*/
Node nextWaiter;
}
从AQS源码可以看到,独占模式EXCLUSIVE初始为null,而共享模式SHARED则指向一个Node对象,这时因为独占模式只有一个线程能拿到Lock,exclusive永远指向同步队列的第一个节点,只有释放时设置为null,所以可初始化EXCLUSIVE为null值。而share模式可以有多个线程同时获得锁,一开始可先创建一个共享结点(new Node())充当头结点(head)。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
自定义同步器在实现时只需要实现共享资源state的获取与释放方式,线程等待队列的维护(如获取锁失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时的主要方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它,ReentrantLock需要实现该方法。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
ReentrantLock内部类Sync实现的是tryAcquire、tryRelease、isHeldExclusively三个方法;CountDownLatch、Semaphore、FutureTask各自的内部类Sync实现的是tryAcquireShared和tryReleasedShared两个方法。其中Semaphore与CountDownLatch,因为公平性问题,tryAcquireShared由其各自的内部类FairSync(公平锁)和各自的内部类NonfairSync实现(非公平锁)。
我们回看上面贴出的AQS代码的acquire( )方法,独占模式锁的获取过程:
public final void acquire(int arg) {
//tryAcquire()尝试去获取锁,如果成功返回true,经过取反为false,&&是短路判断,第一个为false
//整个都为false,acquireQueued()不会执行,最后返回。如果失败返回false,取反为true,还得执行acquireQueued()方法
if (!tryAcquire(arg) &&
//acquireQueued()使线程在等待队列中休息,被唤醒时会去尝试获取锁
acquireQueued(
//获取失败,addWaiter()将该线程加入等待队列的尾部,并标记为独占模式
addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
//线程在等待过程不响应中断,只有获取了锁后才进行中断
selfInterrupt();
}
共享模式锁的获取过程:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
AQS里共享模式获取锁时调用acquireShared方法,进而调用tryAcquireShared方法,但该方法是直接抛异常的,因为AQS只是一个框架,具体锁的获取和释放都是交给自定义的实现类去处理,能否重入,能否阻塞都由子类实现。在AQS源码里,tryReleaseShared没被定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么独占模式或共享模式的实现里都需要实现所有的四个方法,会带来不必要的工作量。
共享模式中锁的获取可以分为入队和出队两个过程:
入队列:线程获取锁失败后,创建一个节点,并将节点添加到等待队列尾,线程会被挂起,等待唤醒;
出队列:当另外一个线程释放锁,取队列的第一个节点,将节点对应线程唤醒,被唤醒后的线程将尝试获取锁,成功后将当前节点设置为头节点,此时已经不在等待队列里,如果还存在空闲的锁,则线程会唤醒下一个节点,即使有多把锁,都只会唤醒下一个节点,其他的节点由它前一个节点负责唤醒,以减少锁的竞争
提供volatile变量state用于线程之间共享状态。通过 CAS 和 volatile 保证对state变量操作时的原子性和可见性
/**
* 同步状态
*/
private volatile int state;
/**
*CAS
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS虽然实现了acquire和release方法(可能阻塞),但是里面调用的tryAcquire和tryRelease方法(不阻塞)是由子类来定制实现的,同步状态state的获取,设置操作是由子类实现的。
CountDownLatch将任务划分为N个线程去执行,state需要被初始化为N,和线程数量一致,这N个线程并发执行,每个线程执行完毕后调用方法countDown()一次,通过CAS方式将state减1,等N个线程都减1后,state变为0,这时主调用线程被释放(unpark),从调用await()方法处返回,继续执行。
ReentrantLock会先将state初始化为0,表示释放了锁,线程A调用lock()方法,会调用tryAcquire()独占该锁并将state 1,此后其他线程调用tryAcquire()变会失败,直到线程A调用unlock()方法使得state变为0,其他线程才能获取到锁。而且线程A可以多次tryAcquire()重复获得锁(可重入),但获取锁和释放锁的次数要相同,使得state变回0
自定义同步器要么是独占方式,要么是共享方式,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared两种中的一种就行。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock,其中的写锁是独占方式,读锁是共享方式。 AQS应用
总结来说,AQS有三个过程:
原子操作同步状态(维护state);
阻塞或者唤醒一个线程;
内部维护一个队列;
我们用AQS自定义一个独占锁:
public class ExcluLock implements Lock, java.io.Serializable {
// 自定义同步器Sync
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取锁,成功则返回true,否则false,无论成功还是失败,都立即返回。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 限定只能有1个许可
//调用父类AQS的CAS方法
if (compareAndSetState(0, 1)) {//state与期望值0比较,相同才设置state为1
setExclusiveOwnerThread(Thread.currentThread());//设置当前线程独占锁
return true;
}
return false;
}
// 尝试释放锁,成功则为true,否则false,然后立即返回。
protected boolean tryRelease(int releases) {
assert releases == 1; // 释放的许可数量为1
if (getState() == 0)//如果state为0,表明没有锁,那必然是错误的,抛异常,因为走到释放锁这,是持有锁的,state不能为0
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);//没有线程占用锁
setState(0);//释放锁后,state从1变成0
return true;
}
}
// 继承于AQS的自定义同步器
private final Sync sync = new Sync();
//lock<调用自定义同步器sync的acquire。获取锁,获取失败会等待,直到成功才返回
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
//tryLock调用自定义同步器sync的tryAcquire。尝试获取锁,成功返回true,失败返回false,无论成功失败都立即返回。
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
//unlock调用自定义同步器sync的release,release里再调用tryRelease(arg)。直接释放锁。
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return null;
}
//isLocked调用自定义同步器sync的isHeldExclusively,判断锁是否被占用
public boolean isLocked() {
return sync.isHeldExclusively();
}
public static void main(String[] args) {
ExcluLock lock = new ExcluLock();
Boolean flag = lock.tryLock();
System.out.println(flag);
}
}
ExcluLock是一个独占模式的锁,AQS的state有两种状态,0表示未锁定,1表示锁定。自定义的同步器Sync会定义成内部类,供同步类ExcluLock使用。而同步类ExcluLock只需要实现Lock接口,让外界通过ExcluLock lock = new ExcluLock();使用。上面main函数输出为true,表示线程成功获取锁。而Sync只实现资源state的获取-释放方式tryAcquire-tryRelelase,至于线程的排队、等待、唤醒等,基类AQS已经统一实现。ReentrantLock、CountDownLatch、Semphore这些同步类的区别就在获取-释放资源的方式tryAcquire-tryRelelase的条件不一样,其他都差不多。
4.5 AQS实现共享锁上面实现了独占锁,永远只有一个线程能占有锁,下面我们来实现共享模式的锁,共享锁。假如库存只有10个商品,而同时会有100个线程请求扣减库存,每个线程只能扣减一个商品,那么我们可以做服务端的限流,让资源同时只能被10个线程访问。
public class ShareLock implements Lock {
private class ShareLockAQS extends AbstractQueuedSynchronizer {
protected ShareLockAQS(Integer count) {
super();
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {//现在有几把锁
Integer state = getState();
//已有的锁减去申请的锁,得到剩下的锁
Integer newCount = state - arg;
//剩下的锁小于0,都返回负值,表明获取锁失败
//剩下的锁大于等于0,但CAS成功,返回大于0的值,表明获取锁成功
//如果剩下的锁大于等于0,但CAS失败,一定要在for(;;)循环里重复设置state,直到成功为止
if (newCount < 0 || compareAndSetState(state, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
//可能多个线程同时释放release,不能直接setState。而是通过CAS设置state值
Integer state = getState();
//已有的锁加上释放的锁,就是还剩下的锁
Integer newCount = state arg;
//在释放时,释放失败了,一定要for(;;)循环,保证释放成功
if (compareAndSetState(state, newCount)) {
return true;
}
}
}
@Override
protected boolean isHeldExclusively() {
return getState() == 0;
}
}
private ShareLockAQS sync = new ShareLockAQS(5);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//返回值大于等于0,返回true,获取锁成功,若为小于0,返回false,获取锁失败,方法会不断获取,直到成功
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
//unlock调用自定义同步器sync父类AbstractQueuedSynchronizer的releaseShared,
//父类releaseShared里再调用其子类的(自定义同步器sync)tryRelease(arg),释放锁。方法会不断释放,直到成功
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return null;
}
public static void main(String[] args) {
ShareLock lock = new ShareLock();
Boolean flag = lock.tryLock();
System.out.println(flag);
}
}
4.6 AQS源码再读AbstractQueuedSynchronizer独占模式获取锁的方法acquire(int arg):
public final void acquire(int arg) {
//tryAcquire()尝试去获取锁,如果成功返回true,经过取反为false,&&是短路判断,第一个为false
//整个都为false,acquireQueued()不会执行,最后返回。如果失败返回false,取反为true,还得执行acquireQueued()方法
if (!tryAcquire(arg) &&
//acquireQueued()使线程在等待队列中休息,被唤醒时会去尝试获取锁
acquireQueued(
//获取失败,addWaiter()将该线程加入等待队列的尾部,并标记为独占模式
addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
//线程在等待过程不响应中断,只有获取了锁后才进行中断
selfInterrupt();
}
调用子类Sync覆写的tryAcquire(int acquires)方法:
//
尝试获取锁,成功则返回true,否则false
,无论成功还是失败,都立即返回。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 限定只能有1个许可
//调用父类AQS的CAS方法
if (compareAndSetState(0, 1)) {//state与期望值0比较,相同才设置state为1
setExclusiveOwnerThread(Thread.currentThread());//设置当前线程独占锁
return true;
}
return false;
}
调用父类AbstractQueuedSynchronizer的acquireQueued(final Node node, int arg)方法:
//走到acquireQueued方法,表明获取锁失败
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//for循环相当于自旋
for (;;) {
final Node p = node.predecessor();//拿到前驱节点
//如果前驱是head(头节点),即该结点已成第二个节点,那么便可尝试获取锁
//可能是头节点唤醒的,也可能被interrupt后醒来的
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到锁后 将自己设置为头节点
p.next = null; // 将head.next置为null,就是为了方便GC回收以前的head结点。也就是让之前释放锁后的节点出队列
failed = false;
return interrupted;
}
//如果发现自己可去休息了,就进入waiting状态,直到被unpark()唤醒
//shouldParkAfterFailedAcquire方法检查当前线程是否可以进入waiting状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前继节点的状态
if (ws == Node.SIGNAL)
//如果已经通知了前继节点,前继节点拿到锁后通知本节点,那就可以进入等待状态
return true;
if (ws > 0) {
//如果前继节点出队了,那就继续往前面的节点寻找,直到找到最近的一个正常等待状态的节点,并排在它后面
//出队的节点没有引用,会被GC回收
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前继节点正常,就把其状态设置成SIGNAL,让其释放锁时唤醒自己,
//也有可能失败,前继节点已经释放了,才收到通知
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()方法:
//该方法让线程进入等待状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//线程找到了安全休息点,调用park(),类似sleep(),使自己进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是因为被中断而唤醒的
}
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。Sleep()和Object.wait()遇到Interrupt会抛出异常,而调用
LockSupport.park()遇到中断时,线程则会唤醒并继续运行。
addWaiter(Node mode)方法将当前线程加入到等待队列的队尾,并返回当前线程所在节点。
private Node addWaiter(Node mode) {
//以给定模式构造结点,mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试将节点直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果上一步失败则通过enq入队。
enq(node);
return node;
}
此方法enq(final Node node)用于将node加入队尾
private Node enq(final Node node) {
//CAS自旋,不断重试,直到成功将节点放入队尾
for (;;) {
Node t = tail;
if (t == null) { // 如果队列为空,必须初始化,创建一个空的标志结点作为head头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {//队列非空,直接放到队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com