JAVA多线程并发、锁机制、线程池原理

JAVA多线程并发、锁机制、线程池原理

线程池原理

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量, 超出数量的线程排队等候 ,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为: 线程复用;控制最大并发数;管理线程 。

线程复用

每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java 虚拟机会调用该类的 run方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象 。 这就是线程池的实现原理。 循环方法中不断获取 Runnable 是用 Queue 实现的 ,在获取下一个 Runnable 之前可以是阻塞的。

线程池的组成

一般的线程池主要分为以下 4 个组成部分:

  1. 线程池管理器:用于创建并管理线程池
  2. 工作线程:线程池中的线程
  3. 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
  4. 任务队列:用于存放待处理的任务,提供一种缓冲机制Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。

ThreadPoolExecutor 的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
}
  1. corePoolSize:指定了线程池中的核心线程数量。
  2. maximumPoolSize:指定了线程池中的最大线程数量。
  3. keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多次时间内会被销毁。
  4. unit:keepAliveTime 的单位。
  5. workQueue:任务队列,被提交但尚未被执行的任务。
  6. threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  7. handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。

拒绝策略

线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。

JDK 内置的拒绝策略如下:

  1. AbortPolicy : 直接抛出异常,阻止系统正常运行。

  2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

  3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

  4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。

以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。

Java 线程池工作过程

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断: a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务; b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列; c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

锁机制

原文链接

锁机制是实现线程同步的基础

首先我们来将锁进行分类,这里都是锁相关的一些名词,这并不全是指锁的状态,有的指锁的特性,有的指锁的设计

分类如下 :

一、公平锁 / 非公平锁

1.公平锁   指多个线程按照申请锁的顺序来获取锁。 2.非公平锁   指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。

Java实例详解: (1)ReentrantLock   对于ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。 (2)Synchronized   对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。

二、可重入锁 / 不可重入锁

1.可重入锁   广义上指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。ReentrantLock和synchronized都是可重入锁。 2.不可重入锁   与可重入锁相反,不可递归调用,递归调用就发生死锁。

看到一个经典的讲解,使用自旋锁来模拟一个不可重入锁,代码如下:

import java.util.concurrent.atomic.AtomicReference;

public class UnreentrantLock {

    private AtomicReference<Thread> owner = new AtomicReference<Thread>();

    public void lock() {
        Thread current = Thread.currentThread();
        //这句是很经典的“自旋”语法,AtomicInteger中也有
        for (;;) {
            if (!owner.compareAndSet(null, current)) {
                return;
            }
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        owner.compareAndSet(current, null);
    }
}

代码也比较简单,使用原子引用来存放线程,同一线程两次调用lock()方法,如果不执行unlock()释放锁的话,第二次调用自旋的时候就会产生死锁,这个锁就不是可重入的,而实际上同一个线程不必每次都去释放锁再来获取锁,这样的调度切换是很耗资源的。 把它变成一个可重入锁:

import java.util.concurrent.atomic.AtomicReference;

public class UnreentrantLock {

    private AtomicReference<Thread> owner = new AtomicReference<Thread>();
    private int state = 0;

    public void lock() {
        Thread current = Thread.currentThread();
        if (current == owner.get()) {
            state++;
            return;
        }
        //这句是很经典的“自旋”式语法,AtomicInteger中也有
        for (;;) {
            if (!owner.compareAndSet(null, current)) {
                return;
            }
        }
    }

    public void unlock() {
        Thread current = Thread.currentThread();
        if (current == owner.get()) {
            if (state != 0) {
                state--;
            } else {
                owner.compareAndSet(current, null);
            }
        }
    }
}

在执行每次操作之前,判断当前锁持有者是否是当前对象,采用state计数,不用每次去释放锁。 ReentrantLock中可重入锁实现 这里看非公平锁的锁获取方法:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //就是这里
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在AQS中维护了一个private volatile int state来计数重入次数避免了频繁的持有释放操作,这样既提升了效率,又避免了死锁

三、独享锁 / 共享锁

1.独享锁

该锁每一次只能被一个线程所持有。SynchronizedReentrantLock就是独享锁。

2.共享锁   该锁可被多个线程共有,典型的就是ReentrantReadWriteLock里的读锁,它的读锁 ReentrantReadWriteLock.ReadLock是可以被共享的,但是它的写锁(ReentrantReadWriteLock.WriteLock却每次只能被独占。另外读锁的共享可保证并发读是非常高效的,但是读写和写写,写读都是互斥的。

独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

AQS:

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒 时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

四、互斥锁 / 读写锁

1.互斥锁

在访问共享资源之前进行加锁操作,在访问完成之后进行解锁操作。 加锁后,任何其他试图再次加锁的线程会被阻塞,直到当前进程解锁。如果解锁时有一个以上的线程阻塞,那么所有该锁上的线程都变成就绪状态, 第一个变为就绪状态的线程又执行加锁操作,那么其他的线程又会进入等待。 在这种方式下,只有一个线程能够访问被互斥锁保护的资源。

互斥锁某种意义上同于上面说到的独享锁。

2.读写锁-ReadWriteLock   读写锁既是互斥锁,又是共享锁,read模式是共享,write是互斥(排它锁)的。 读写锁有三种状态:

读加锁状态、写加锁状态和不加锁状态。   一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。只有一个线程可以占有写状态的锁,但可以有多个线程同时占有读状态锁,这也是它可以实现高并发的原因。当其处于写状态锁下,任何想要尝试获得锁的线程都会被阻塞,直到写状态锁被释放;如果是处于读状态锁下,允许其它线程获得它的读状态锁,但是不允许获得它的写状态锁,直到所有线程的读状态锁被释放;为了避免想要尝试写操作的线程一直得不到写状态锁,当读写锁感知到有线程想要获得写状态锁时,便会阻塞其后所有想要获得读状态锁的线程。所以读写锁非常适合资源的读操作远多于写操作的情况。

五、乐观锁 / 悲观锁

1.乐观锁

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

2.悲观锁   总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized和ReentrantLock等独占锁就是悲观锁思想的实现。

六、分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作。

七、偏向锁 / 轻量级锁 / 重量级锁

1.锁的状态: (1)无锁状态 (2)偏向锁状态 (3)轻量级锁状态 (4)重量级锁状态   锁的状态是通过对象监视器在对象头中的字段来表明的

这四种状态都不是Java语言中的锁,而是JVM为了提高锁的获取与释放效率而做的优化(使用synchronized时)

2.锁的级别从低到高: 无锁、偏向锁、轻量级锁、重量级锁。 四种状态会随着竞争的情况逐渐升级,而且是不可逆的过程,即不可降级。

3.JDK1.6对锁的优化 3.1 重量级锁

重量级锁基于Monitor实现,成本高。   重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

3.2 轻量级锁   轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能

(1)优化点:   在没有多线程竞争的情况下,通过CAS减少重量级锁使用操作系统互斥量产生的性能消耗。 (2)什么情况下使用:   关闭偏向锁或由于多线程竞争导致的偏向锁升级为轻量级锁。 (3)获取锁步骤:

​ 1)判断是否处于无锁状态,若是,则JVM在当前线程的栈帧中创建锁记录(Lock Record)空间,用于存放锁对象中的Mark Word的拷贝,官方称为Displaced Mark Word;否则执行步骤3)。

  2)当前线程尝试利用CAS将锁对象的Mark Word更新为指向锁记录的指针。如果更新成功意味着获取到锁,将锁标志位置为00,执行同步代码块;如果更新失败,执行步骤3)。

  3)判断锁对象的Mark Word是否指向当前线程的栈帧,若是说明当前线程已经获取了锁,执行同步代码,否则说明其他线程已经获取了该锁对象,执行步骤4)。

  4)当前线程尝试使用自旋来获取锁,自旋期间会不断的执行步骤1),直到获取到锁或自旋结束。因为自旋锁会消耗CPU,所以不能无限的自旋。如果自旋期间获取到锁(其他线程释放锁),执行同步块;否则锁膨胀为重量级锁,当前线程阻塞,等待持有锁的线程释放锁时的唤醒。

(4)释放锁步骤:   1)从当前线程的栈帧中取出Displaced Mark Word存储的锁记录的内容。

  2)当前线程尝试使用CAS将锁记录内容更新到锁对象中的Mark Word中。如果更新成功,则释放锁成功,将锁标志位置为01无锁状态;否则,执行3)。

  3)CAS更新失败,说明有其他线程尝试获取锁。需要释放锁并同时唤醒等待的线程。

3.3 偏向锁   偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。 (1)优化点:   在没有多线程竞争的情况下,减少轻量级锁的不必要的CAS操作。在无竞争情况下,完全消除同步。 (2)优化方法:   锁对象的Mark Word中记录获取锁的线程ID。 (3)获取锁步骤:   1)判断锁对象是否是偏向锁(即锁标志位为01,偏向锁位为1),若为偏向锁状态执行2)。

  2)判断锁对象的线程ID是否为当前线程的ID,如果是则说明已经获取到锁,执行代码块;否则执行3)。

  3)当前线程使用CAS更新锁对象的线程ID为当前线程ID。如果成功,获取到锁;否则执行4)

  4)当到达全局安全点,当前线程根据Mark Word中的线程ID通知持有锁的线程挂起,将锁对象Mark Word中的锁对象指针指向当前堆栈中最近的一个锁记录,偏向锁升级为轻量级锁,恢复被挂起的线程。 (4)释放锁步骤:   偏向锁采用一种等到竞争出现时才释放锁的机制。当其他线程尝试竞争偏向锁时,当前线程才会释放偏向锁,否则线程不会主动去释放偏向锁。偏向锁的撤销需要等待全局安全点。

  1)首先暂停持有偏向锁的线程。

  2)撤销偏向锁,恢复到无锁状态或轻量级锁状态。

4.几种锁的对比:

优点缺点适用情况
偏向锁加锁和解锁不需要额外的消耗,和执行非同步代码相差无几。如果线程存在锁竞争,需要额外的锁撤销的消耗。适用于只有一个线程访问同步块的情况
轻量级锁竞争的线程不会阻塞,提高了响应速度长时间得不到锁的线程使用自旋消耗CPU追求响应速度。同步代码执行非常快
重量级锁线程竞争不会使用自旋,不会消耗CPU线程出现竞争时会阻塞,响应速度慢追求吞吐量。同步代码执行时间长

八、自旋锁

参考文献:

《Java 中15种锁的介绍》 《synchronized的实现原理》

附录:

CAS(compareAndSwamp,比较并交换) CAS有3个操作数: 内存值V 预期值A 更新值B 只有当V=A时,才把V更新为B。

过程简介:   更新时,判断只有A的值等于V变量的当前旧值时,才会将B新值赋给V,更新为新值。   否则,则认为已经有其他线程更新过了,则当前线程什么都不操作,最后cas放回当前V变量的真实值。

AQS(AbstractQueuedSynchronizer, 队列同步器)

​ AQS是Java并发用来构建锁和其他同步组件的基础框架。AQS是一个抽象类,主是是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。一般是同步组件的静态内部类,即通过组合的方式使用。

​ 抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。 (1)原理简介:

维护了一个volatile int state(代表共享资源)和一个FIFO(双向队列)线程等待队列(多线程争用资源被阻塞时会进入此队列)。 (2)共享变量state state的访问方式有三种: getState() setState() compareAndSetState()

同步状态(可理解为锁)就是这个int型的变量state。head和tail分别是同步队列的头结点和尾结点。假设state=0表示同步状态可用(如果用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用)。

(3)FIFO线程阻塞队列

state=0表示锁是空闲状态,state=1表示琐是被占用状态。 head指向占用锁线程所在的结点,后续线程会依次存放在后面的结点tail指向最后的结点,从head出,tail进入。

CAS和AQS具体解析可参考另一篇文章:《CAS和ASQ》

end
  • 作者:AWhiteElephant(联系作者)
  • 发表时间:2022-05-10 15:36
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 评论