TOC
KINA

KINA-0

Start having fun with KINA right now!

并发编程高级:线程基础、并发安全、线程池…

本文详细讲解了各种并发编程知识。首先,文章描述了线程与进程的区别,强调线程是轻量级的并发单位。接着,阐述了并行与并发的不同,并列举了四种创建线程的方法:继承Thread类、实现Runnable接口、Callable接口与线程池。文章还解读了线程的生命周期、状态及其互斥与同步概念,重点讲解了synchronized关键字的底层原理,包括对象锁与monitor结构,同时介绍了线程安全机制中的CAS与volatile的使用。总体上,文章为理解Java并发编程提供了系统性的知识框架。

目录

1 线程基础

1.1 线程和进程

程序由指令和数据组成,但这些指令要运行、数据要读写,就必须将指令加载至 CPU、数据加载至内存。在指令运行过程中还需用到磁盘、网络等设备。进程(Process)就是用于加载指令、管理内存、管理IO。
当一个程序被运行,从磁盘加载这个程序的代码至内存,此时就开启了一个进程。

进程

一个进程中可以分为一到多个线程(Thread)。一个线程就是一个指令流,将指令流中的指令以一定的顺序交给CPU执行。
Java中,线程是最小调度单位,进程是资源分配的最小单位。在windows中进程是不活动的,只是作为线程的容器。

线程

对比

  • 进程是正在运行程序的实例,进程中包含了线程,每个线程执行不同的任务
  • 不同的进程使用不同的内存空间,在当前进程下的所有线程可以共享内存空间
  • 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低(上下文切换:从一个线程切换到另一个线程)

1.2 并行和并发

单核CPU中,线程实际是串行执行的。
操作系统中有一个组件叫做任务调度器,将cpu的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,由于cpu在线程间(时间片很短)的切换非常快,人类感觉是同时运行的——微观串行,宏观并行。
这种线程轮流使用CPU的做法称为并发(Concurrent)。

并发

多核CPU中,每个核都可以调度运行线程,这时候线程是并行(Parallel)的。

并行

并发(concurrent)是同一时间应对(dealing with)多件事情的能力
并行(parallel)是同一时间动手做(doing)多件事情的能力

1.3 线程初始化

1.3.1 Thread、Runnable、Callable、线程池

共有四种方式可以创建线程:继承Thread类、实现Runnable接口、实现Callable接口、线程池创建线程

  1. 继承Thread类
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread...run...");
    }

    public static void main(String[] args) {
        // 创建Thread子类对象(或直接Thread传入lambda表达式有参构造,即传入run()函数)
        MyThread t1 = new MyThread() ;
        MyThread t2 = new MyThread() ;

        // 调用start()方法启动线程
        t1.start();
        t2.start();
    }
}
  1. 实现Runnable接口
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("MyRunnable...run...");
    }

    public static void main(String[] args) {
        // 创建Runnable实现类对象
        MyRunnable mr = new MyRunnable() ;

        // 创建Thread对象
        Thread t1 = new Thread(mr) ;
        Thread t2 = new Thread(mr) ;

        // 调用start()方法启动线程
        t1.start();
        t2.start();
    }
}
  1. 实现Callable接口
public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("MyCallable...call...");
        return "OK";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建Callable实现类对象
        MyCallable mc = new MyCallable() ;

        // 创建FutureTask对象(泛型与实现Callable时一致,为返回值类型)
        FutureTask<String> ft = new FutureTask<String>(mc) ;

        // 创建Thread对象
        Thread t1 = new Thread(ft) ;
        Thread t2 = new Thread(ft) ;

        // 调用start()方法启动线程
        t1.start();

        // 调用FutureTask对象的get()方法获取执行结果
        String result = ft.get();
        System.out.println(result);
    }
}
  1. 线程池创建线程
public class MyExecutors implements Runnable {
    @Override
    public void run() {
        System.out.println("MyRunnable...run...");
    }

    public static void main(String[] args) {
        // 创建线程池对象
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        threadPool.submit(new MyExecutors()) ;

        // 关闭线程池
        threadPool.shutdown();
    }
}

1.3.2 Runnable和Callable的区别

  1. Runnable接口的run()方法没有返回值;Callable接口的call()方法有返回值,是个泛型,和Future、FutureTask配合可以用来获取异步执行的结果。
  2. Callalbe接口可通过调用FutureTask对象的get()得到返回的执行结果,此方法会阻塞主进程的继续往下执行,若不调用则不会阻塞。
  3. Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛

1.3.3 线程run()和start()的区别

  • start(): 用来启动线程,通过该线程调用run()方法执行run()中所定义的逻辑代码。start()方法只能在启动时被调用一次。
  • run(): 封装了要被线程执行的代码,可以被调用多次。

1.4 线程状态

JDK5之后,Ready和Running合称Runnable状态

1.4.1 Thread类六大状态

JDK中的Thread类中的枚举State封装了操作系统的线程状态

  • NEW(新建):尚未启动的线程的线程状态。
  • RUNNABLE(可运行/就绪):可运行线程的线程状态。
  • BLOCKED(阻塞):线程阻塞等待监视器锁的线程状态。
  • WAITING(等待):等待线程的线程状态。
  • TIMED_WAITING(计时等待):具有指定等待时间的等待线程的线程状态。
  • TERMINATED(终止):已终止线程的线程状态。

1.4.2 状态之间的变化

  • 新建线程对象为新建状态
  • 调用start()方法转变为可执行状态
  • 线程获取到了CPU的执行权,执行结束为终止状态
  • 在可执行状态的过程中,如果没有获取CPU的执行权,可能会切换其他状态
    • 如果没有获取锁(synchronized或lock)则进入阻塞状态,获得锁后切换为可执行状态
    • 如果线程调用了wait()方法则进入等待状态,其他线程调用notify()唤醒后切换为可执行状态
    • 如果线程调用了sleep()方法则进入计时等待状态,到时间后切换为可执行状态

状态之间的变化

1.5 join()按顺序执行线程

在多线程中有多种方法让线程按特定顺序执行,可用线程类的join()方法在一个线程中启动另一个线程,另外一个线程完成该线程继续执行。

【例】新建T1、T2、T3三个线程,为确保三个线程的顺序,应先启动最后一个(T3调用T2,T2调用T1),使得T1先完成而T3最后完成

public class JoinTest {
    public static void main(String[] args) {
        // 创建线程对象
        Thread t1 = new Thread(() -> {
            System.out.println("t1");
        }) ;

        Thread t2 = new Thread(() -> {
            try {
                t1.join();  // 加入线程t1,仅当t1线程执行完毕以后,再执行该线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t2");
        }) ;

        Thread t3 = new Thread(() -> {
            try {
                t2.join();  // 加入线程t2,仅当t2线程执行完毕以后,再执行该线程
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t3");
        }) ;

        // 启动线程
        t1.start();
        t2.start();
        t3.start();
    }
}

1.6 线程阻塞与唤醒

1.6.1 wait()和sleep()的异同

  • 共同点
    • wait()wait(long)sleep(long)的效果都是让当前线程暂时放弃CPU的使用权,进入阻塞状态
  • 不同点
    • 方法归属不同
      • Thread.sleep(long)是Thread的静态方法
      • wait()wait(long)是Object的成员方法,每个对象都有
    • 醒来时机不同
      • 执行sleep(long)wait(long)的线程都会在等待相应毫秒后醒来
      • wait(long)wait()还可以被notify()打断唤醒,其中wait()如果不唤醒就一直等下去
    • 锁特性不同 (※)
      • wait()的调用必须先获取该对象的锁,而sleep()则无此限制
      • wait()执行后会释放对象锁,允许其它线程获得该对象锁;而sleep()如果在synchronized代码块中执行,并不会释放对象锁

【例】测试wait()sleep()的异同

public class WaitSleepCase {
    static final Object LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        sleeping();
    }

    private static void illegalWait() throws InterruptedException {
        LOCK.wait();
    }

    private static void waiting() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            synchronized (LOCK) {
                try {
                    get("t").debug("waiting...");
                    LOCK.wait(5000L);
                } catch (InterruptedException e) {
                    get("t").debug("interrupted...");
                    e.printStackTrace();
                }
            }
        }, "t1");
        t1.start();

        Thread.sleep(100);
        synchronized (LOCK) {
            main.debug("other...");
        }

    }

    private static void sleeping() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            synchronized (LOCK) {
                try {
                    get("t").debug("sleeping...");
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    get("t").debug("interrupted...");
                    e.printStackTrace();
                }
            }
        }, "t1");
        t1.start();

        Thread.sleep(100);
        synchronized (LOCK) {
            main.debug("other...");
        }
    }
}

1.6.2 notify()和notifyAll()的区别

  • notify():只随机唤醒一个wait()线程
  • notifyAll():唤醒所有wait()的线程

【例】线程阻塞与唤醒

public class WaitNotify {
    static boolean flag = false;
    static Object lock = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock){
                while (!flag){
                    System.out.println(Thread.currentThread().getName() + "...wating...");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "...flag is true");
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock){
                while (!flag){
                    System.out.println(Thread.currentThread().getName() + "...wating...");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "...flag is true");
            }
        });

        Thread t3 = new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                lock.notifyAll();
                flag = true;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
        t2.start();
        t3.start();
    }
}

1.7 停止线程的方式

有三种方式可以停止线程:

  1. 使用退出标志,使线程正常退出,即当run()方法完成后线程终止
public class MyInterrupt1 extends Thread {
    volatile boolean flag = false ;     // 线程执行的退出标记

    @Override
    public void run() {
        while(!flag) {
            System.out.println("MyThread...run...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建线程对象
        MyInterrupt1 t1 = new MyInterrupt1() ;
        t1.start();

        // 主线程休眠6秒
        Thread.sleep(6000);

        // 更改标记为true
        t1.flag = true;
    }
}
  1. 使用stop()方法强行终止(不推荐,该方法已作废)
public class MyInterrupt2 extends Thread {
    volatile boolean flag = false ;     // 线程执行的退出标记

    @Override
    public void run() {
        while(!flag) {
            System.out.println("MyThread...run...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建线程对象
        MyInterrupt2 t1 = new MyInterrupt2() ;
        t1.start();

        // 主线程休眠2秒
        Thread.sleep(6000);

        // 调用stop方法
        t1.stop();
    }
}
  1. 使用interrupt()方法中断线程

public class MyInterrupt3 {
    public static void main(String[] args) throws InterruptedException {
        //1.打断阻塞的线程
        /*
        Thread t1 = new Thread(()->{
            System.out.println("t1 正在运行...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1");
        t1.start();
        Thread.sleep(500);
        t1.interrupt();
        System.out.println(t1.isInterrupted());
        */

        //2.打断正常的线程
        Thread t2 = new Thread(()->{
            while(true) {
                Thread current = Thread.currentThread();
                boolean interrupted = current.isInterrupted();
                if(interrupted) {
                    System.out.println("打断状态:" + interrupted);
                    break;
                }
            }
        }, "t2");
        t2.start();
        Thread.sleep(500);
//      t2.interrupt();
    }
}

1.8 线程互斥与同步

在引入多线程后,由于线程执行的异步性,会给系统造成混乱,特别是在急用临界资源时,如多个线程急用同一台打印机,会使打印结果交织在一起,难于区分。当多个线程急用共享变量,表格,链表时,可能会导致数据处理出错,因此线程同步的主要任务是使并发执行的各线程之间能够有效的共享资源和相互合作,从而使程序的执行具有可再现性。

当线程并发执行时,由于资源共享和线程协作,使用线程之间会存在以下两种制约关系。

  1. 互斥:间接相互制约。一个系统中的多个线程必然要共享某种系统资源,如共享 CPU,共享 I/O 设备,所谓间接相互制约即源于这种资源共享,打印机就是最好的例子,线程 A 在使用打印机时,其它线程都要等待。
  2. 同步:直接相互制约。这种制约主要是因为线程之间的合作,如有线程 A 将计算结果提供给线程 B 作进一步处理,那么线程 B 在线程 A 将数据送达之前都将处于阻塞状态。

对于互斥可以这样理解:线程A和线程B互斥访问某个资源则它们之间就会产个顺序问题——要么线程A等待线程B操作完毕,要么线程B等待线程操作完毕,这其实就是线程的同步。因此互斥其实是一种特殊的同步,同步包括互斥。


2 并发安全

2.1 synchronized关键字及其底层原理

  • Synchronized对象锁采用互斥的方式让同一时刻至多只有一个线程能持有对象锁
  • 其底层由monitor实现,monitor是JVM级别的对象( C++实现),线程获得锁需要使用对象(锁)关联monitor
  • 在monitor内部有三个属性——Owner、EntryList、WaitSet。其中owner是关联的获得锁的线程,并且只能关联一个线程;EntryList关联的是处于阻塞状态的线程;WaitSet关联的是处于Waiting状态的线程

2.1.1 对象锁

Synchronized对象锁)采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个对象锁时就会阻塞住

【例】如下的抢票代码,如果不加锁,就会出现超卖或者一张票卖给多个人

public class TicketDemo {
    static Object lock = new Object();
    int ticketNum = 10;

    public synchronized void getTicket() {
        synchronized (this) {
            if (ticketNum <= 0) {
                return;
            }
            System.out.println(Thread.currentThread().getName() + "抢到一张票,剩余:" + ticketNum);
            // 非原子性操作
            ticketNum--;
        }
    }

    public static void main(String[] args) {
        TicketDemo ticketDemo = new TicketDemo();
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                ticketDemo.getTicket();
            }).start();
        }
    }
}

2.1.2 Monitor

Monitor(监视器)由JVM提供,用C++语言实现。在代码中想要体现monitor需要借助javap命令查看clsss的字节码。

【例】找到以下类的class文件,在class文件目录下执行javap -v SyncTest.class,反编译效果如下:

public class SyncTest {
    static final Object lock = new Object();
    static int counter = 0;

    public static void main(String[] args) {
        synchronized (lock) {
            counter++;
        }
    }
}

Monitor

其中monitorenter为上锁开始的地方,monitorexit为解锁的地方。被monitorenter和monitorexit包围住的指令就是上锁的代码。
有两个monitorexit的原因:第二个monitorexit是为了防止锁住的代码抛异常后不能及时释放锁。

2.1.3 存储结构与执行流程

在使用了synchornized代码块时需要指定一个对象,所以synchornized也被称为对象锁。
monitor跟该对象产生关联,如下图所示

存储结构与执行流程

Monitor内部具体的存储结构:

  • Owner:存储当前获取锁的线程,只能有一个线程可以获取
  • EntryList:关联没有抢到锁的线程,即处于阻塞(Blocked)状态的线程
  • WaitSet:关联调用了wait()方法的线程,即处于等待(Waiting)状态的线程

具体执行流程:

  • 代码进入synchorized代码块——先让lock(对象锁)关联monitor,然后判断Owner是否有线程持有。
  • 若没有线程持有,则让当前线程持有,表示该线程获取锁成功。
  • 若有线程持有,则让当前线程进入EntryList进行阻塞。若Owner持有的线程已经释放了锁,则在EntryList中的线程去竞争锁的持有权(非公平)。
  • 若代码块中调用了wait()方法,则让当前线程进入WaitSet进行等待。

2.2 synchronized关键字及其底层原理【进阶】

Monitor实现的锁属于重量级锁。何为锁升级?

  • Monitor实现的锁属于重量级锁,里面涉及到了用户态和内核态的切换、进程的上下文切换,成本较高,性能比较低。
  • 在JDK 1.6引入了两种新型锁机制——轻量级锁和偏向锁。它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。

Java中的synchronized有偏向锁、轻量级锁、重量级锁三种形式,分别对应了锁只被一个线程持有、不同线程交替持有锁、多线程竞争锁三种情况。

描述
重量级锁 底层使用的Monitor实现,涉及到用户态和内核态的切换、进程的上下文切换,成本较高,性能比较低
轻量级锁 若线程加锁的时间是错开的(即没有竞争),可以使用轻量级锁来优化。轻量级修改了对象头的锁标志,相对重量级锁性能提升很多。每次修改都是CAS操作,保证原子性
偏向锁 若很长一段时间内都只有一个线程使用锁,可以使用偏向锁。在第一次获得锁时,会有一个CAS操作,之后该线程再获取锁,只需要判断MarkWord中是否是自己的线程id即可,而不是开销相对较大的CAS命令

2.2.1 对象的内存结构

在HotSpot虚拟机中,对象在内存中存储的布局可分为3块区域:对象头(Header)、实例数据(Instance Data)和对齐填充

对象的内存结构

2.2.2 MarkWord

MarkWord

  • hashcode:25位的对象标识哈希码
  • age:对象分代年龄,占4位
  • biased_lock:偏向锁标识,占1位。0表示没有开启偏向锁,1表示开启偏向锁
  • thread:持有偏向锁的线程ID,占23位
  • epoch:偏向时间戳,占2位
  • ptr_to_lock_record:轻量级锁状态下,指向栈中锁记录的指针,占30位
  • ptr_to_heavyweight_monitor:重量级锁状态下,指向对象监视器Monitor的指针,占30位

可以通过lock的标识来判断是哪一种锁的等级:

  1. 后三位为 001 —— 无锁
  2. 后三位为 101 —— 偏向锁
  3. 后两位为 00 —— 轻量级锁
  4. 后两位为 10 —— 重量级锁

2.2.3 重量级锁

Monitor实现的锁属于重量级锁。每个Java对象都可以关联一个Monitor对象,使用synchronized给对象上锁(重量级)之后,该对象头MarkWord中就被设置了指向Monitor对象的指针,这就使得对象与monitor产生关联。

重量级锁

2.2.4 轻量级锁

在很多的情况下,在Java程序运行时,同步块中的代码都是不存在竞争的,不同的线程交替的执行同步块中的代码。这种情况下,用重量级锁是没必要的。因此JVM引入了轻量级锁的概念。

static final Object obj = new Object();

public static void method1() {
    synchronized (obj) {
        // 同步块 A
        method2();
    }
}

public static void method2() {
    synchronized (obj) {
        // 同步块 B
    }
}

加锁过程:

  1. 在线程栈中创建一个Lock Record,将其obj字段指向锁对象。

加锁过程1

  1. 通过CAS指令将Lock Record的地址存储在对象头的mark word中(数据进行交换),如果对象处于无锁状态则修改成功,代表该线程获得了轻量级锁。

加锁过程2

  1. 若当前线程已持有该锁,代表这是一次锁重入。此时设置Lock Record第一部分为null,起到重入计数器的作用。

加锁过程3

  1. 若CAS修改失败,说明发生了竞争,需要膨胀为重量级锁。

解锁过程:

  1. 遍历线程栈,找到所有obj字段等于当前锁对象的Lock Record。
  2. 若Lock Record的MarkWord为null,代表这是一次重入,将obj设置为null后continue。

解锁过程1

  1. 若Lock Record的MarkWord不为null,则利用CAS指令将对象头的mark word恢复成为无锁状态。如果失败则膨胀为重量级锁。

解锁过程2

2.2.5 偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行CAS操作。
Java 6中引入了偏向锁来做进一步优化:只有第一次使用CAS将线程ID设置到对象的MarkWord头,之后发现
这个线程ID是自己的就表示没有竞争,不用重新CAS。以后只要不发生竞争,这个对象就归该线程所有。

static final Object obj = new Object();

public static void m1() {
    synchronized (obj) {
        // 同步块 A
        m2();
    }
}

public static void m2() {
    synchronized (obj) {
        // 同步块 B
        m3();
    }
}

public static void m3() {
    synchronized (obj) {

    }
}

加锁流程如下,解锁流程参考轻量级锁

  1. 在线程栈中创建一个Lock Record,将其obj字段指向锁对象。

偏向锁1

  1. 通过CAS指令将Lock Record的线程id存储在对象头的MarkWord中,同时也设置偏向锁的标识为101,如果对象处于无锁状态则修改成功,代表该线程获得了偏向锁。

偏向锁2

  1. 若当前线程已持有该锁,代表这是一次锁重入。此时设置Lock Record第一部分为null,起到重入计数器的作用。与轻量级锁不同,这里不会再次进行CAS操作,只是判断对象头中的线程id是否是自己,因为缺少了CAS操作,性能相对轻量级锁更好一些。

偏向锁3

2.3 JMM(Java内存模型)

JMM(Java Memory Model,Java内存模型)是java虚拟机规范中所定义的一种内存模型,描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节。

JMM

特点:

  1. 所有的共享变量都存储于主存(计算机的RAM),这里所说的变量指的是实例变量和类变量,不包含局部变量,因为局部变量是线程私有的,因此不存在竞争问题。
  2. 每一个线程还存在自己的工作内存,线程的工作内存保留了被线程使用的变量的工作副本。
  3. 线程对变量的所有的操作(读、写)都必须在工作内存中完成,而不能直接读写主内存中的变量,不同线程之间也不能直接访问对方工作内存中的变量,线程间变量的值的传递需要通过主内存完成。

2.4 CAS

CAS(Compare And Swap,比较再交换)体现了一种乐观锁的思想,在无锁情况下保证线程操作共享数据的原子性。在JUC(java.util.concurrent)包下实现的很多类都用到了CAS操作:AbstractQueuedSynchronizer(AQS框架)、AtomicXXX类等。

2.4.1 基本工作流程

基于上文的JMM内存模型进行说明

  1. 线程1与线程2都从主内存中获取变量int a = 100,同时放到各个线程的工作内存中
    设当前内存值V、旧的预期值A、即将更新的值B,当且仅当旧的预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做并返回false。如果CAS操作失败,通过自旋的方式等待并再次尝试,直到成功

CAS

  1. 线程1操作:V:int a = 100,A:int a = 100,B:修改后的值 int a = 101 (a++)
    • 线程1拿A的值与主内存V的值进行比较,判断是否相等
    • 如果相等,则把B的值101更新到主内存中

线程1操作

  1. 线程2操作:V:int a = 100,A:int a = 100,B:修改后的值:int a = 99a--
    • 线程2拿A的值与主内存V的值进行比较,判断是否相等(目前不相等,因为线程1已更新V的值99)
    • 不相等,则线程2更新失败

线程2操作

  1. 自旋锁操作
    • 因为没有加锁,所以线程不会陷入阻塞,效率较高
    • 如果竞争激烈,重试频繁发生,效率会受影响

自旋锁

需要不断尝试获取共享内存V中最新的值,然后再在新的值的基础上进行更新操作,如果失败就继续尝试获取新的值,直到更新成功

2.4.2 底层实现

CAS 底层依赖于一个Unsafe类来直接调用操作系统底层的 CAS 指令:

底层实现

如上所示,均为native修饰的方法,由系统提供的接口执行,并非java代码实现,一般的思路也都是自旋锁实现

底层实现2

在java中比较常用的如ReentrantLock和Atomic开头的线程安全类,都调用了Unsafe中的方法

【例】ReentrantLock中的一段CAS代码

ReentrantLock中的一段CAS代码

2.4.3 乐观锁和悲观锁

  • CAS 基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量。(就算改了也没关系,大不了吃亏点再重试)
  • synchronized基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量。(我上了锁你们都别想改,我改完了解开锁,你们才有机会)

2.5 volatile关键字

一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,其就具备了以下两层语义:

2.5.1 作用1:保证线程间的可见性

保证了不同线程对该变量进行操作时的可见性,即一个线程修改了某个变量的值,该新值对其他线程来说是立即可见的。volatile关键字会强制将修改的值立即写入主存。

【例】永不停止的循环:当执行以下代码时,会发现foo()方法中的循环无法结束,即读取不到共享变量的值结束循环

public class ForeverLoop {
    static boolean stop = false;

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            stop = true;
            System.out.println("modify stop to true...");
        }).start();
        foo();
    }

    static void foo() {
        int i = 0;
        while (!stop) {
            i++;
        }
        System.out.println("stopped... c:"+ i);
    }
}

原因:在JVM虚拟机中有一个JIT即时编辑器)对代码做了优化。

while (!stop) {
  i++;
} 

上述这段代码在短时间内执行次数太多,当达到阈值时JIT就会优化此代码,如下所示

while (true) {
  i++;
} 

代码优化后,即使stop变量变为false也依旧无法停止循环

解决方案:

  • 法1:在程序运行时加入vm参数-Xint,表示禁用即时编辑器。(不推荐,得不偿失,因为其他程序还要使用)
  • 法2:在修饰stop变量时加上volatile,表示当前代码禁用了即时编辑器。代码如下:
    static volatile boolean stop = false;

2.5.2 作用2:禁止进行指令重排序

用volatile修饰共享变量会在读、写共享变量时加入不同的屏障,阻止其他读写操作越过屏障,从而达到阻止重排序的效果。

【例】如下代码所示,在获取结果时,有可能会出现4种情况

禁止进行指令重排序

  • 情况一:执行actor2()获取结果 → 0, 0(正常)
  • 情况二:先执行actor1()中的第一行代码,然后执行actor2()获取结果 → 0, 1(正常)
  • 情况三:先执行actor1()中所有代码,然后执行actor2()获取结果 → 1, 1(正常)
  • 情况四:先执行actor1()中第二行代码,然后执行actor2()获取结果 → 1, 0(发生了指令重排序,影响结果)

解决方案:在变量上添加volatile,禁止指令重排序,即可以解决问题。代码与屏障添加的示意图如下所示

禁止进行指令重排序2

两个原则:

  1. 写操作屏障:阻止上方其它写操作越过屏障排到volatile变量写之下
  2. 读操作屏障:阻止下方其它读操作越过屏障排到volatile变量读之上

【补充】上面的解决方案将volatile加在变量y上。将volatile加在变量x上是不可行的,因为违反了上述两个原则。代码与屏障添加的示意图如下所示

禁止进行指令重排序 补充

由此可总结出volatile的使用技巧:

  1. 写变量让volatile修饰的变量的在代码的最后位置
  2. 读变量让volatile修饰的变量的在代码的最开始位置

2.6 AQS

AQS(AbstractQueuedSynchronizer)是阻塞式锁和相关的同步器工具的框架,它是构建锁或者其他同步组件的基础框架。
AQS常见的实现类:ReentrantLock阻塞式锁、Semaphore信号量、CountDownLatch倒计时锁

2.6.1 与synchronized的区别

synchronized AQS
是关键字,C++语言实现 Java语言实现
悲观锁,自动释放锁 悲观锁,手动开启和关闭
锁竞争激烈,都是重量级锁,性能差 锁竞争激烈时提供了多种解决方案

2.6.2 工作机制

  • 在AQS中维护了一个使用了volatile修饰的state属性来表示资源的状态,0表示无锁,1表示有锁
  • 提供了基于FIFO的等待队列,类似于Monitor的EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于Monitor的WaitSet

【例】如下图所示

  1. 线程0先尝试修改state属性,若发现state属性是0,就修改state状态为1,表示线程0抢锁成功
  2. 线程1和线程2也会先尝试修改state属性,发现state的值已为1,说明有其他线程持有锁,它们就会进入FIFO队列进行等待

AQS

常见问答

  1. 如果多个线程共同去抢这个资源,如何保证原子性的?
    【答】在去修改state状态的时候,使用的CAS自旋锁来保证原子性,确保只能有一个线程修改成功,修改失败的线程将会进入FIFO队列中等待。如下图所示

多个线程共同去抢这个资源,如何保证原子性

  1. AQS是公平锁还是非公平锁?
    【答】

    • 若新的线程与队列中的线程共同来抢资源,则为非公平锁
    • 若新的线程到队列中等待,只让队列中的head线程获取锁,则为公平锁

      比较典型的AQS实现类ReentrantLock默认就是非公平锁,新的线程与队列中的线程共同来抢资源

2.7 ReentrantLock

2.7.1 特点

ReentrantLock(可重入锁)相对于synchronized具备以下特点:

  1. 可中断
  2. 可以设置超时时间
  3. 可以设置公平锁
  4. 支持多个条件变量
  5. 与synchronized一样,都支持重入

ReentrantLock

2.7.2 实现原理

ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。

构造方法接受一个可选的公平参数(默认非公平锁),当设为true时,表示公平锁,否则为非公平锁。公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。

实现原理

其中NonfairSync和FairSync这两个类父类都是Sync

Sync

而Sync的父类是AQS,所以可以得出ReentrantLock底层主要实现就是基于AQS来实现的

ReentrantLock底层

2.7.3 工作流程

工作流程

  1. 线程来抢锁后使用CAS方式修改state状态,修改状态成功为1,则让exclusiveOwnerThread属性指向当前线程,获取锁成功
  2. 若修改状态失败,则会进入双向队列中等待(head指向双向队列头部,tail指向双向队列尾部)
  3. 当exclusiveOwnerThread为null时,则会唤醒在双向队列中等待的线程

公平锁:按照先后顺序获取锁;非公平锁:不在排队的线程也可以抢锁

2.8 synchronized和Lock的区别

  • 语法层面
    • synchronized是关键字,源码在JVM中,用C++语言实现
    • Lock是接口,源码由JDK提供,用Java语言实现
    • 使用synchronized时,退出同步代码块锁会自动释放,而使用Lock时,需要手动调用unlock方法释放锁
  • 功能层面
    • 二者均属于悲观锁、都具备基本的互斥、同步、锁重入功能
    • Lock提供了许多synchronized不具备的功能,例如获取等待状态、公平锁、可打断、可超时、多条件变量
    • Lock有适合不同场景的实现,如ReentrantLock, ReentrantReadWriteLock等
  • 性能层面
    • 在没有竞争时,synchronized做了很多优化,如偏向锁、轻量级锁,性能较好
    • 在竞争激烈时,Lock的实现通常会提供更好的性能

2.9 死锁

2.9.1 产生的条件

死锁:一个线程需要同时获取多把锁

【例】t1线程获得A对象锁,接下来想获取B对象的锁;t2线程获得B对象锁,接下来想获取A对象的锁

public class Deadlock {
    public static void main(String[] args) {
        Object A = new Object();
        Object B = new Object();
        Thread t1 = new Thread(() -> {
            synchronized (A) {
                System.out.println("lock A");
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                synchronized (B) {
                    System.out.println("lock B");
                    System.out.println("操作...");
                }
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            synchronized (B) {
                System.out.println("lock B");
                try {
                    sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                synchronized (A) {
                    System.out.println("lock A");
                    System.out.println("操作...");
                }
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}

死锁

此时程序并没有结束,这种现象就是死锁现象——线程t1持有A的锁等待获取B锁,线程t2持有B的锁等待获取A的锁。

2.9.2 死锁诊断

当程序出现了死锁现象,可以使用jdk自带的工具:jpsjstack

死锁诊断步骤如下:

  1. 查看运行的线程

查看运行的线程

  1. 使用jstack查看线程运行的情况。根据所查得的TID,运行命令:jstack -l 46032

jstack

其他解决工具、可视化工具:

  • jconsole:用于对jvm的内存、线程、类的监控,是一个基于jmx的GUI性能监控工具
  • VisualVM:故障处理工具。能够监控线程,内存情况,查看方法的CPU时间和内存中的对象,已被GC的对象,反向查看分配的堆栈

2.10 ConcurrentHashMap

ConcurrentHashMap是一种线程安全的高效Map集合

底层数据结构:

  • JDK1.7底层采用分段的数组+链表实现
  • JDK1.8 采用的数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。

2.10.1 JDK1.7中的底层原理

数据结构:

ConcurrentHashMap

  • 提供了一个segment数组,在初始化时可以指定数组的长度,默认为16,一旦初始化之后中间不可扩容
  • 在每个segment中都可以挂一个HashEntry数组,数组里面可以存储具体的元素,HashEntry数组是可以扩容的
  • 在HashEntry存储的数组中存储的元素,如果发生冲突,则可以挂单向链表

存储流程:

ConcurrentHashMap存储流程

  1. 先计算key的哈希值,确定segment数组下标
  2. 再通过哈希值确定HashEntry数组中的下标,存储数据
  3. 在进行操作数据前,会先判断当前segment对应下标位置是否有线程进行操作。为了线程安全使用ReentrantLock进行加锁,若要获取锁会使用cas自旋锁进行尝试

2.10.2 JDK1.8中的底层原理

在JDK1.8中放弃了Segment臃肿的设计,数据结构与HashMap一样:数组+红黑树+链表

采用CAS + Synchronized来保证并发安全进行实现

  • CAS控制数组节点的添加
  • synchronized只锁定当前链表或红黑二叉树的首节点,只要哈希不冲突,就不会产生并发的问题 , 效率得到提升

JDK1.8中的底层原理

2.11 导致并发程序出现问题的根本原因

根本原因:Java并发编程三大特性——原子性、内存可见性、有序性

2.11.1 原子性

一个线程在CPU中操作不可暂停,也不可中断,要不执行完成,要不不执行

【例】以下代码会出现超卖或者是一张票卖给同一个人,执行并不是原子性的

原子性

解决方案:

  1. synchronized:同步加锁
  2. JUC里的Lock:加锁

原子性 解决方案

2.11.2 内存可见性

让一个线程对共享变量的修改对另一个线程可见

【例】以下代码不能保证内存可见性

内存可见性

解决方案:

  1. synchronized
  2. volatile(推荐)
  3. Lock

2.11.3 有序性

指令重排:处理器为了提高程序运行效率,可能会对输入代码进行优化。它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的。

【例】使用2.5.2中的例子

有序性

解决方案:volatile


3 线程池

3.1 核心参数

线程池 核心参数

  • corePoolSize:核心线程数目
  • maximumPoolSize:最大线程数目(核心线程 + 救急线程的最大数目)
  • keepAliveTime:生存时间。救急线程的生存时间,生存时间内没有新任务,此线程资源会释放
  • unit:时间单位。救急线程的生存时间单位,如秒、毫秒等
  • workQueue:当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务
  • threadFactory:线程工厂。可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
  • handler:拒绝策略。当所有线程都在繁忙,workQueue也放满时,会触发拒绝策略

3.2 工作流程

线程池 工作流程

  1. 任务在提交的时候,首先判断核心线程数是否已满,如果没有满则直接添加到工作线程执行。
  2. 如果核心线程数满了,则判断阻塞队列是否已满,若没有满,当前任务存入阻塞队列。
  3. 如果阻塞队列也满了,则判断线程数是否小于最大线程数,若满足条件,则使用临时线程执行任务。
    核心或临时线程执行完成任务后,会检查阻塞队列中是否有需要执行的线程,如果有,则使用非核心线程执行任务。
  4. 如果所有线程都在忙(核心线程+临时线程),则走拒绝策略。

拒绝策略:

  • AbortPolicy:直接抛出异常,默认策略;
  • CallerRunsPolicy:用调用者所在的线程来执行任务;
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardPolicy:直接丢弃任务;

【例】测试线程池

public class TestThreadPoolExecutor {
    static class MyTask implements Runnable {
        private final String name;
        private final long duration;

        public MyTask(String name) {
            this(name, 0);
        }

        public MyTask(String name, long duration) {
            this.name = name;
            this.duration = duration;
        }

        @Override
        public void run() {
            try {
                LoggerUtils.get("myThread").debug("running..." + this);
                Thread.sleep(duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "MyTask(" + name + ")";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger c = new AtomicInteger(1);
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(2);
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                3,
                0,
                TimeUnit.MILLISECONDS,
                queue,
                r -> new Thread(r, "myThread" + c.getAndIncrement()),
                new ThreadPoolExecutor.AbortPolicy());
        showState(queue, threadPool);
        threadPool.submit(new MyTask("1", 3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("2", 3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("3"));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("4"));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("5",3600000));
        showState(queue, threadPool);
        threadPool.submit(new MyTask("6"));
        showState(queue, threadPool);
    }

    private static void showState(ArrayBlockingQueue<Runnable> queue, ThreadPoolExecutor threadPool) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<Object> tasks = new ArrayList<>();
        for (Runnable runnable : queue) {
            try {
                Field callable = FutureTask.class.getDeclaredField("callable");
                callable.setAccessible(true);
                Object adapter = callable.get(runnable);
                Class<?> clazz = Class.forName("java.util.concurrent.Executors$RunnableAdapter");
                Field task = clazz.getDeclaredField("task");
                task.setAccessible(true);
                Object o = task.get(adapter);
                tasks.add(o);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        LoggerUtils.main.debug("pool size: {}, queue: {}", threadPool.getPoolSize(), tasks);
    }
}

3.3 WorkQueue(阻塞队列)

WorkQueue(阻塞队列):当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建救急线程执行任务。常见的有如下4个

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
  2. LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
  3. DelayedWorkQueue:是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的
  4. SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。

用的最多为LinkedBlockingQueue和ArrayBlockingQueue,区别如下

LinkedBlockingQueue ArrayBlockingQueue
默认无界,支持有界 强制有界
底层是链表 底层是数组
是懒惰的,创建节点的时候添加数据 提前初始化Node 数组
入队会生成新Node Node需要提前创建好
两把锁(头尾) 一把锁

如下图所示,左、右分别为LinkedBlockingQueue、ArrayBlockingQueue的加锁方式

  • LinkedBlockingQueue读和写各有一把锁,性能相对较好
  • ArrayBlockingQueue只有一把锁,读和写公用,性能相对于LinkedBlockingQueue差一些

WorkQueue

3.4 如何确定核心线程数

  1. 高并发、任务执行时间短:CPU核数 + 1,减少线程上下文的切换
  2. 并发不高、任务执行时间长
    • IO密集型任务(文件读写、DB读写、网络请求等):2 * CPU核数 + 1
    • CPU密集型任务(计算型代码、Bitmap转换、Gson转换等):CPU核数 + 1
  3. 并发高、业务执行时间长:解决这种类型任务的关键不在于线程池而在于整体架构的设计——先看某些数据是否能做缓存,再增加服务器。线程池的设置参考 2.

查看CPU核数:

查看CPU核数

3.5 线程池的种类

在java.util.concurrent.Executors类中提供了大量创建连接池的静态方法,常见的有以下4种

3.5.1 FixedThreadPool(使用固定线程数的线程池)

FixedThreadPool(使用固定线程数的线程池)

使用固定线程数的线程池

  • 核心线程数与最大线程数一样,没有救急线程
  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
  • 适用场景:适用于任务量已知,相对耗时的任务

【例】

public class FixedThreadPoolCase {
    static class FixedThreadDemo implements Runnable{
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            for (int i = 0; i < 2; i++) {
                System.out.println(name + ":" + i);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //创建一个固定大小的线程池,核心线程数和最大线程数都是3
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; i++) {
            executorService.submit(new FixedThreadDemo());
            Thread.sleep(10);
        }

        executorService.shutdown();
    }
}

3.5.2 SingleThreadExecutor(单线程化的线程池)

SingleThreadExecutor(单线程化的线程池)只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行

单线程化的线程池

  • 核心线程数和最大线程数都是1
  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE
  • 适用场景:适用于按照顺序执行的任务

【例】

public class NewSingleThreadCase {
    static int count = 0;

    static class Demo implements Runnable {
        @Override
        public void run() {
            count++;
            System.out.println(Thread.currentThread().getName() + ":" + count);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //单个线程池,核心线程数和最大线程数都是1
        ExecutorService exec = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            exec.execute(new Demo());
            Thread.sleep(5);
        }
        exec.shutdown();
    }
}

3.5.3 CachedThreadPool(可缓存线程池)

CachedThreadPool(可缓存线程池)

可缓存线程池

  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列为SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。
  • 适用场景:适合任务数比较密集,但每个任务执行时间较短的情况

【例】

public class CachedThreadPoolCase {
    static class Demo implements Runnable {
        @Override
        public void run() {
            String name = Thread.currentThread().getName();
            try {
                //修改睡眠时间,模拟线程执行需要花费的时间
                Thread.sleep(100);

                System.out.println(name + "执行完了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //创建一个缓存的线程,没有核心线程数,最大线程数为Integer.MAX_VALUE
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            exec.execute(new Demo());
            Thread.sleep(1);
        }
        exec.shutdown();
    }
}

3.5.4 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor

  • 适用场景:有定时和延迟执行的任务

【例】

public class ScheduledThreadPoolCase {
    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                String name = Thread.currentThread().getName();

                System.out.println(name + ", 开始:" + new Date());
                Thread.sleep(1000);
                System.out.println(name + ", 结束:" + new Date());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //按照周期执行的线程池,核心线程数为2,最大线程数为Integer.MAX_VALUE
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
        System.out.println("程序开始:" + new Date());

        /**
         * schedule 提交任务到线程池中
         * 第1个参数:提交的任务
         * 第2个参数:任务执行的延迟时间
         * 第3个参数:时间单位
         */
        scheduledThreadPool.schedule(new Task(), 0, TimeUnit.SECONDS);
        scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS);
        scheduledThreadPool.schedule(new Task(), 5, TimeUnit.SECONDS);

        Thread.sleep(5000);

        // 关闭线程池
        scheduledThreadPool.shutdown();

    }
}

3.6 为何不建议用Executors创建线程池

参考阿里开发手册《Java开发手册-嵩山版》

Java开发手册-嵩山版 为何不建议用Executors创建线程池


4 其他

4.1 CountDownLatch

CountDownLatch(闭锁/倒计时锁)用来进行线程同步协作,等待所有线程完成倒计时(一个或者多个线程,等待其他多个线程完成某件事情之后才能执行)

  • 构造参数初始化等待计数值
  • await():等待计数归零
  • countDown():让计数减一

CountDownLatch

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        // 初始化一个倒计时锁,参数为3
        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"-begin...");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // count--
            latch.countDown();
            System.out.println(Thread.currentThread().getName()+"-end..." +latch.getCount());
        }).start();
        String name = Thread.currentThread().getName();
        System.out.println(name + "-waiting...");
        // 等待其他线程完成
        latch.await();
        System.out.println(name + "-wait end...");
    }
}

4.2 Semaphore

Semaphore(信号量)是JUC包下的一个工具类,可以通过其限制执行的线程数量,达到限流的效果。
当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。

两个重要的方法:

  • acquire():请求一个信号量,这时候的信号量个数-1(一旦没有可使用的信号量,即信号量个数变为负数时,再次请求的时候就会阻塞,直到其他线程释放了信号量)
  • release():释放一个信号量,此时信号量个数+1

【例】线程任务类

public class SemaphoreCase {
    public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(3);
        // 2. 10个线程同时运行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {

                try {
                    // 3. 获取许可
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println("running...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("end...");
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }
}

4.3 ThreadLocal

ThreadLocal(线程局部变量)是多线程中对于解决线程安全的一个操作类。用于实现线程内的数据共享,即对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。

案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的Connection上进行数据库的操作,避免A线程关闭了B线程的连接。

ThreadLocal

4.3.1 基本使用

三个主要方法:

  • set(value):设置值
  • get():获取值
  • remove():清除值
public class ThreadLocalTest {
    static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itcast");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t1").start();
        new Thread(() -> {
            String name = Thread.currentThread().getName();
            threadLocal.set("itheima");
            print(name);
            System.out.println(name + "-after remove : " + threadLocal.get());
        }, "t2").start();
    }

    static void print(String str) {
        //打印当前线程中本地内存中本地变量的值
        System.out.println(str + " :" + threadLocal.get());
        //清除本地内存中的本地变量
        threadLocal.remove();
    }
}

4.3.2 实现原理&源码解析

ThreadLocal本质是一个线程内部存储类,让多个线程只操作自己内部的值,从而实现线程数据隔离。

在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap。ThreadLocalMap中有一个属性table数组,这才是真正存储数据的位置。

ThreadLocal 实现原理

  • set()方法

set()方法

  • get() / remove()方法

get() / remove()方法

4.3.3 内存泄露问题

Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用

  • 强引用:最为普通的引用方式,表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC并不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收

强引用

  • 弱引用:表示一个对象处于可能有用且非必须的状态。在GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象。对于弱引用的回收,无关内存区域是否足够,一旦发现则会被回收

弱引用

每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中的Entry对象继承了WeakReference。其中key为使用弱引用的ThreadLocal实例,value为线程变量的副本

在ThreadLocalMap中的Entry对象继承了WeakReference

在使用ThreadLocal的时候,强烈建议:务必手动remove!

发表评论