1. 题目的引出
线程安全归根结底可以说是内存安全,在jvm内存模子中,有一块特殊的公共内存空间,称为堆内存,进程内的所有线程都可以访问并修改其中的数据,就会造成埋伏的题目。因为堆内存空间在没有保护机制的情况下,你放进去的数据,可能被别的线程窜改。如下代码:
- public class ThreadSafe implements Runnable {
- private static int count = 0;
- @Override
- public void run() {
- for (int i = 0; i < 1000; i++) {
- count++;
- }
- }
- public static void main(String[] args) throws InterruptedException {
- ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 20; i++) {
- es.execute(new ThreadSafe());
- }
- es.shutdown(); //不答应添加线程到线程池,异步关闭毗连池
- es.awaitTermination(10L, TimeUnit.SECONDS); //等候毗连池的线程任务完成
- System.out.println(count);
- }
- }
复制代码
原来渴望的值是20000,可是终极输出的结果却一点在厘革,其值总是小于即是20000,显然这是由于线程不安全造成的,多个线程并发的去访问全局变量、静态变量、文件、装备、套接字等都可能出现这种题目。
2. 线程同步的步伐
为了和谐和配合线程之间对共享资源的访问,通常有四种方式:
1. 临界区:访问某一段临界资源的代码片断,与共享资源雷同,但有一点差异的是,某一时间只答应一个线程去访问(对应java中的关键字 synchronized包罗的代码)。
2. 互斥量:互斥量是一个对象,只有都拥有互斥量的对象才可以访问共享资源。而且互斥量中只有一个,通常互斥量的实现是通过锁来实现的,而且加锁操纵和开释操纵只能由同一个线程来完成。此处与临界区的区别是一段代码,通常存在与一个文件中,而互斥量是一个对象,加锁操纵息争锁操纵可以在差异的文件去编写,只要是同一个线程就好。
3. 信号量: 信号量可以答应指定命量的线程同一时间去访问共享资源,当线程数达到了阈值后,将制止其他线程的访问,最常见的比如生产者和消耗者题目。信号量和互斥量的区别则是信号量的发出和开释可以由差异线程来完成。
4. 变乱:通过发送关照的情势来实现线程同步,可以实现差异进程中的线程同步操纵。
3.饥饿与死锁
饥饿:某些线程或进程由于长期得不到资源,而总是处于停当或者阻塞状态。比方:
①. 该进程或线程所拥有的CPU时间片被其他线程抢占而得不到执行(通常是优先级比它高的线程或进程),不停处于停当状态。
②. 由于选用不适当的调理算法,导致该进程或线程长期无法得到CPU时间片,处于停当状态。
③. 由于唤醒的时间把握不对,唤醒线程时,所需的资源处于被锁定状态,导致线程回到阻塞状态。
死锁:两个或多个线程在执行过程中,由于相互占有对方所需的资源而又各执己见从而造成这些线程都被阻塞,若无外力的作用下,他们都将无法执行下去。比方
①. 进程推进顺序不合适。互相占有彼此必要的资源,同时哀求对方占有的资源,形成循环依靠的关系。
②. 资源不足。
③. 进程运行推进顺序与速率差异,也可能产生死锁。
一些制止死锁的步伐:
-
不要在锁地区内涵加把锁,即不要在开释锁之前竞争其他锁。
-
减小锁粒度,即减小线程加锁的范围,真正必要的时间再去加锁。
-
顺序访问共享资源。
-
设置超机遇制,凌驾指定时间则程序返回错误。
-
竞争锁期间,答应程序被制止。
4.代码层面解决线程安全
解决线程安全主要考虑三方面:
-
可见性:当多个线程并发的读写某个共享资源的时间,每个线程总是可以取到该共享资源的最新数据。
-
原子性:某线程对一个或者多个共享资源所做的连续串写操纵不会被制止,在此期间不会有其他线程同时对这些共享资源举行写操纵。
-
有序性:单个线程内的操纵必须是有序的。
通常原子性都可以得到保证,题目的病端就出在可见性和原子性。
4.1 可见性的题目
如下实例程序,按通常的明白来说,当主线程等候一秒后,把flag的值修改为true后,另外一个线程应该可以感知到,然后跳过while循环,直接打印出后面的数据,可是最结果却不停卡在了while循环里。
- public class Thread4 implements Runnable{
- private static boolean flag = false;
- @Override
- public void run() {
- System.out.println("waiting for data....");
- while (!flag);
- System.out.println("cpying with data");
- }
- public static void main(String[] args) throws InterruptedException {
- Thread4 thread4 = new Thread4();
- Thread t = new Thread(thread4);
- t.start();
- Thread.sleep(1000);
- flag = true;
- }
- }
- /* output
- * waiting for data....
- */
复制代码
主要的原因是java程序在jvm上运行的时间,该程序所占用的内存分为两类主内存和工作内存(逻辑上的内存,实际上是cpu的寄存器和高速缓存,因为,cpu在盘算的时间,并不总是从内存读取数据,它的数据读取顺序优先级是:寄存器-高速缓存-内存。CPU和内存之间通过总线举行)。也就是在主线程中启动另一个线程t会开辟出一个新的工作内存,与主线程的工作内存相互独立,且线程之间无法直接通讯,只能去主内存读取全局变量,而线程t中做while判断的时间并不会去读取主内存的flag值,致使线程t无法被感知到flag在其他线程被改变,可以做一个测试,如今把run函数改成如下情势:
- public class Thread4 implements Runnable{
- private volatile static boolean flag = false;
- @Override
- private volatile static boolean flag = false;
- public void run() {
- System.out.println("waiting for data....");
- while (!flag);
- System.out.println("cpying with data");
- }
- //....
-
- }
- /* output
- * waiting for data....
- * false
- * ...
- * false
- * cpying with data
- */
复制代码
为了感知其他线程中一些全局变量值的厘革,而且制止频仍去测试主内存中的数据厘革,保证线程之间的可见性,可以利用volatile关键字去修饰全局变量,如下:
- public void run() {
- System.out.println("waiting for data....");
- /* Notice
- 假如在while循环里加上System.out.println(flag);语句,则不会利用本工作内存的flag数据,
- 而是重新去主内存加载数据
- */
- while (!flag){
- System.out.println(flag); //测试,可以做到线程的可见性
- }
- System.out.println("cpying with data");
- }
- /* output
- * waiting for data....
- * false
- * ...
- * false
- * cpying with data
- */
复制代码
volatile关键字借助MESI一致性协议,会在工作内存(CPU的寄存器等)与主内存毗连的总线上创建一道总线嗅探机制,一旦发现其他线程修改了主内存中的某个全局变量(即图中橙灰色线条读取的数据以及写回的数据),就会让其他工作线程中从主内存拷贝出来的副本变量失效(即图中紫色的线条读取的数据),从而会使左边的线程重新去读取数据(即图中红色的线条读取的数据)。如下图:
固然解决了原子性题目,可是volatile关键字不支持原子性操纵,如下程序:
- public class Thread5 implements Runnable {
- private static volatile int count = 0;
- @Override
- public void run() {
- for (int i = 0; i < 10000; i++) {
- count++;
- }
- }
- public static void main(String[] args) throws InterruptedException {
- ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 20; i++) {
- es.execute(new Thread5());
- }
- es.shutdown(); //不答应添加线程,异步关闭毗连池
- es.awaitTermination(10L, TimeUnit.SECONDS); //等候毗连池的线程任务完成
- System.out.println(count);
- }
- }
- /* output
- * 175630
- */
复制代码
4.2 原子性题目
针对原子性题目,我们可以利用熟悉的synchronized关键字,synchronized关键字最主要有以下3种应用方式:
-
修饰实例方法,作用于当前实例加锁,进入同步代码前要得到当前实例的锁
-
修饰静态方法,作用于当前类对象加锁,进入同步代码前要得到当前类对象的锁
-
修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要得到给定对象的锁。
部门示例代码如下:
- public class Thread5 implements Runnable {
- private static int count = 0;
- public synchronized static void add() {
- count++;
- }
- @Override
- public void run() {
- for (int i = 0; i < 1000000; i++) {
- // add()
- synchronized (Thread5.class){
- count++;
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 20; i++) {
- es.execute(new Thread5());
- }
- es.shutdown(); //不答应添加线程,异步关闭毗连池
- es.awaitTermination(10L, TimeUnit.SECONDS); //等候毗连池的线程任务完成
- System.out.println(count);
- }
- }
- /* output
- * 20000000
- */
复制代码
然而synchronized是一种灰心锁,具有强烈的独占和排他特性,它频仍的加锁和开释锁操纵会使程序的效率低下。与灰心锁相对是一种乐观锁操纵CAS(CompareAndSwap),乐观锁就是每次去取数据的时间都乐观的以为数据不会被修改,因此这个过程不会上锁,但是在更新的时间会判断一下在此期间的数据有没有更新,假如没有更新则去修改,否则失败。可是上面这种 操纵会出现ABA(A-B-A,中途被改变,但最后又改回原值)的题目,
针对上面的题目,java中可以利用Atomic,它的包名为java.util.concurrent.atomic 。这个包内里提供了一组原子变量的操纵类(通过值加版本号的方式去解决ABA题目),这些类可以保证在多线程环境下,当某个线程在执行atomic的方法时,不会被其他线程打断,不停比及该方法执行完成(详细的API文档可以检察参考文献第5点)。
- public class ThreadSafe implements Runnable {
- private static AtomicInteger count = new AtomicInteger(0);
- @Override
- public void run() {
- for (int i = 0; i < 10000; i++) {
- count.getAndAdd(1);
- }
- }
- public static void main(String[] args) throws InterruptedException {
- ExecutorService es = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 20; i++) {
- es.execute(new ThreadSafe());
- }
- es.shutdown(); //不答应添加线程,异步关闭毗连池
- es.awaitTermination(10L, TimeUnit.SECONDS); //等候毗连池的线程任务完成
- System.out.println(count);
- }
- }
- /* output
- * 200000
- */
复制代码
5. 其他方法解决线程同步
a. 自旋锁
线程循环反复查抄锁变量是否可用,在这一过程中线程不停保持执行(RUNNABLE),因此是一种忙等候,不像关键字synchronized一样,一旦发现不能访问,则处于线程处于阻塞状态(BLOCKED)。
- public class Thread6 implements Runnable{
- private static final Lock lock = new ReentrantLock();
- private volatile static int count = 0;
- @Override
- public void run() {
- for (int i = 0; i < 1000000; i++){
- lock.lock();
- count++;
- lock.unlock();
- }
- }
- static void test(ExecutorService es) throws InterruptedException {
- for (int i = 0; i < 20; i++) {
- es.execute(new Thread6());
- }
- es.shutdown(); //不答应添加线程,异步关闭毗连池
- es.awaitTermination(10L, TimeUnit.SECONDS); //等候毗连池的线程任务完成
- System.out.println(count);
- }
- public static void main(String[] args) throws InterruptedException {
- ExecutorService es = Executors.newFixedThreadPool(20);
- test(es);
- }
- }
复制代码
假如在利用lock的时间包罗了try...catch...语句,要注意的是lock 必须在 finally 块中开释。否则,假如受保护的代码将抛出异常,锁就有可能永久得不到开释!
与Lock类同一个包java.util.concurrent.locks 下另有一种读写分离的锁ReentrantReadWriteLock类,读写锁维护了一对锁,一个读锁和一个写锁。一样平常情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁可以或许提供比排它锁更好的并发性和吞吐量。
- public class RWTest {
- private static final Map<String, Object> map = new HashMap<String, Object>();
- private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private static final Lock readLock = lock.readLock();
- private static final Lock writeLock = lock.writeLock();
- public static final Object get(String key) {
- readLock.lock();
- try {
- return map.get(key);
- } finally {
- readLock.unlock();
- }
- }
- public static final Object put(String key, Object value) {
- writeLock.lock();
- try {
- return map.put(key, value);
- } finally {
- writeLock.unlock();
- }
- }
- public static final void clear() {
- writeLock.lock();
- try {
- map.clear();
- } finally {
- writeLock.unlock();
- }
- }
- }
复制代码
利用自旋锁Lock也提供了Condition 来实现线程间的状态关照的,可以根据实际情况去唤醒某个线程(与后面的wait差异,是随机的)或者所有线程。可以通过lock.newCondition() 来获取得Condition实例,可以根据实际需求创建多个实例。
- public class Thread9 {
- public static ReentrantLock lock=new ReentrantLock();
- public static Condition condition =lock.newCondition();
- public static void main(String[] args) {
- new Thread(){
- @Override
- public void run() {
- lock.lock();//哀求锁
- try{
- System.out.println(Thread.currentThread().getName()+"==》进入等候");
- condition.await();//设置当火线程进入等候
- }catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();//开释锁
- }
- System.out.println(Thread.currentThread().getName()+"==》继续执行");
- }
- }.start();
- new Thread(){
- @Override
- public void run() {
- lock.lock();//哀求锁
- try{
- System.out.println(Thread.currentThread().getName()+"=》进入");
- Thread.sleep(2000);//苏息2秒
- condition.signal();//随机唤醒等候队列中的一个线程
- System.out.println(Thread.currentThread().getName()+"苏息竣事");
- }catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();//开释锁
- }
- }
- }.start();
- }
- }
- /*output
- *Thread-0==》进入等候
- *Thread-1=》进入
- *Thread-1苏息竣事
- *Thread-0==》继续执行
- */
复制代码
b. wait.notify.notifyAll
在关键字synchronized的线程同步机制,调用线程的sleep,yield方法时,线程并不会让出对象锁,但是调用wait却差异,线程自动开释其占有的对象锁,同时不会去申请对象锁,当线程被唤醒的时间,它才再次去申请竞争对象的锁(该关键字通常只与synchronized团结利用)。notify()唤醒在等候该对象同步锁的线程(只唤醒一个,假如有多个在等候),注意的是在调用此方法的时间,并不能确切的唤醒某一个等候状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。而notifyAll()则是唤醒所有等候的线程。
- public class Thread8 implements Runnable {
- private int num;
- private Object lock;
- public Thread8(int num, Object lock) {
- this.num = num;
- this.lock = lock;
- }
- public void run() {
- try {
- while (true) {
- synchronized (lock) {
- lock.notifyAll();
- lock.wait();
- System.out.println(num);
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- final Object lock = new Object();
- Thread thread1 = new Thread(new Thread8(1, lock));
- Thread thread2 = new Thread(new Thread8(2, lock));
- thread1.start();
- thread2.start();
- }
- }
- /* output
- * 交替输出1,2,1,2,1,2......
- */
复制代码
6.并发编程—CountDownLatch、CyclicBarrier、Semaphore和fork/join框架
1. CountDownLatch
CountDownLatch实现的是一个倒序计数器,可以通过调用它的countDown实现计数器减一和await方法来阻塞当火线程:
- public class Thread10 {
- public static void main(String[] args) throws InterruptedException {
- int count = 20;
- final CountDownLatch cdl = new CountDownLatch(count);
- ExecutorService es = Executors.newCachedThreadPool();
- for (int i = 0; i < count; i++) {
- es.execute(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println(cdl.getCount());
- }finally {
- cdl.countDown();
- }
- }
- });
- }
- cdl.await();
- es.shutdown();
- System.out.println("主线程如今才竣事: count = "+cdl.getCount());
- }
- }
复制代码
2.CyclicBarrier
即回环栅栏,是一种可重用的线程阻塞器,它将率先到达栅栏的这些线程阻塞(调用await()方法),直到指定命量的线程都到达该处,这些线程将会被全部开释。
- public class Thread11 implements Runnable{
- private int num;
- private static CyclicBarrier cb = new CyclicBarrier(6); //指定栅栏的等候线程数
- public Thread11(int num){
- this.num = num;
- }
- @Override
- public void run() {
- try {
- Thread.sleep(1000*num); //等候指定命量时间后到达栅栏处
- System.out.println(Thread.currentThread().getName() +" is coming..");
- cb.await(10L, TimeUnit.SECONDS);
- System.out.println("continue....");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- ExecutorService es = Executors.newCachedThreadPool();
- for (int i = 0; i < 8; i++) {
- es.execute(new Thread11(i));
- }
- es.shutdown();
- }
- }
- /*
- *pool-1-thread-1 is coming..
- *pool-1-thread-2 is coming..
- *pool-1-thread-3 is coming..
- *continue....
- *continue....
- *continue....
- *pool-1-thread-4 is coming..
- *超时异常错误(指定时间内线程数量仍然到达)
- */
复制代码
3.Semaphore信号量
信号量用于保护对一个或多个共享资源的访问,其内部维护一个计数器,用来只是当前可以访问共享资源的数量。可以通过tryAcquire去实行获取答应,还可以通过availablePermits()方法得到可用的答应数量,而acquire/release则是获取/开释答应。
- public class Thread12 implements Runnable {
- private static SecureRandom random= new SecureRandom();
- private static Semaphore semaphore = new Semaphore(3, true);
- @Override
- public void run() {
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName() + " got permission...");
- Thread.sleep(random.nextInt(10000));
- semaphore.release();
- System.out.println(Thread.currentThread().getName() + " released permission...");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- ExecutorService es = Executors.newCachedThreadPool();
- for (int i = 0; i < 6; i++) {
- es.execute(new Thread12());
- }
- es.shutdown();
- }
- }
复制代码
4.fork/join框架
Fork/Join框架提供了的一个用于并行执行任务的框架,充实利用了CPU资源,把大任务分割成多少个小任务,终极汇总每个小任务结果后得到大任务结果的框架。(只供Java7利用)
Fork/Join利用两个类:
ForkJoinPool与其他类型的ExecutorService的差异之处主要在于利用工作盗取,每个线程都有自己的双端任务队列,线程在一样平常情况下会从队列头去获取任务,当某个线程任务队列的为空的时间,它会实行从其他线程的任务队列的尾部去“盗取”任务来执行。
- public class Thread13 extends RecursiveTask<Integer> {
- private int start;
- private int end;
- public Thread13(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute() {
- int m = 1000; //每个线程盘算的范围大小
- int s = start, n = end; //每个线程盘算的起始地点
- int r = 0; //算和的变量
- List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>();
- while (s <= n) {
- if (n - s < m) {
- for (int i = s; i <= n; i++) {
- r += i;
- }
- } else {
- n = Math.min(s + m - 1, n); //得到一个新的start
- it.add(new Thread13(s, n).fork()); //得到每一个范围[如(0,999)]参加一个线程
- }
- s = n + 1;
- n = end;
- }
- for (ForkJoinTask<Integer> t : it) {
- r += t.join();
- }
- return r;
- }
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ForkJoinPool fjp = new ForkJoinPool();
- int s = 1, n = 10001;
- Future<Integer> rs = fjp.submit(new Thread13(s, n));
- System.out.println(rs.get());
- }
- }
- /* output
- * 50015001
- */
复制代码
参考文献
-
庞永华. Java多线程与Socket:实战微服务框架[M].电子工业出版社.2019.3
-
Executors类中创建线程池的几种方法的分析
-
知乎——假如你这样答复“什么是线程安全”,面试官都会对你刮目相看
-
知乎——Java线程内存模子,线程、工作内存、主内存
-
Java进阶——Java中的Atomic原子特性
-
深入明白Java并发之synchronized实现原理
-
Java的wait(), notify()和notifyAll()利用小结
-
java多线程-07-Lock和Condition
-
Java并发编程:CountDownLatch、CyclicBarrier和Semaphor
来源:https://www.cnblogs.com/helloworldcode/p/11728321.html |