JUC详解

简介

JUC(java.util.concurrent),Java并发包,可以让Java更好地处理多线程并发问题。

传统线程创建方式

Thread

继承Thread类,重写其中的run方法,在run方法中编写任务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main {
public static void main(String[] args) {
ThreadA threadA = new ThreadA();
threadA.start();
}
}

class ThreadA extends Thread{
@Override
public void run() {
System.out.println("通过继承Thread的方式创建一个线程:ThreadA");
}
}

start方法才是启动线程的方法,run方法是执行任务的方法,所以需要通过start方法来启动线程,线程会自动执行任务。

因为需要在Thread的run方法中编写任务代码,也就造成了线程与任务的耦合,一个线程与一个任务捆绑,不够灵活。

Runnable

我们是通过重写在run方法来提供任务的,如果我们看看Thread类就会发现,Thread类中的run方法也是重写的:

1
2
@Override
public void run() {

Thread类实现了Runnable接口,重写了Runnable接口中的run方法

1
2
3
4
5
6
7
@FunctionalInterface
public interface Runnable {
/**
* Runs this operation.
*/
void run();
}

也就是说,其实run方法和Thread类是可以分开的,不捆绑在一起。

如果我们不重写Thread类的run方法,执行的就是Thread类自己的run方法逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void run() {
Runnable task = holder.task; // 获取到task
if (task != null) {
Object bindings = scopedValueBindings(); // 获取线程上下文
runWith(bindings, task); // 执行
}
}

@Hidden
@ForceInline
private void runWith(Object bindings, Runnable op) {
ensureMaterializedForStackWalk(bindings);
op.run();
Reference.reachabilityFence(bindings);
}

可以看到,此时,run方法需要先获取到一个Runnable对象,然后才能在runWith方法中调用Runnable对象的run方法来执行任务。

上面我们是通过无参构造函数来创建Thread对象的:

1
ThreadA threadA = new ThreadA();

而无参构造函数旁边刚好有一个可以传入Runnable对象的构造函数:

1
2
3
4
5
6
7
public Thread() {
this(null, null, 0, null, 0, null);
}

public Thread(Runnable task) {
this(null, null, 0, task, 0, null);
}

那么,我们只需要自己实现Runnable接口,重写run方法,就可以将任务与线程拆开啦:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main {
public static void main(String[] args) {
ThreadB threadB = new ThreadB();
Thread threadB1 = new Thread(threadB);
Thread threadB2 = new Thread(threadB);
threadB1.start();
threadB2.start();
}
}

class ThreadB implements Runnable{
@Override
public void run() {
System.out.println("通过实现Runnable接口的方式创建一个线程:ThreadB");
}
}

但是,这种创建线程的方式还有两个问题:

  1. 拿不到返回值,start方法的返回值是void。

  2. 线程内部异常不能被外层 try-catch 直接捕获:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class Main {
    public static void main(String[] args) {
    ThreadB threadB = new ThreadB();

    Thread threadB1 = new Thread(threadB);
    try {
    threadB1.start();
    } catch (Exception e) {
    System.out.println("能捕捉到异常吗?");
    }
    }
    }

    class ThreadB implements Runnable {
    @Override
    public void run() {
    // 构建异常
    int i = 1 / 0;
    System.out.println("通过实现Runnable接口的方式创建一个线程:ThreadB");
    }
    }

    执行时,线程threadB1内部发生的异常,如果线程threadB1内部不处理的话,外部线程无法捕捉到该异常并处理。

FutureTask

JUC提供了一个FutureTask类,可以用来获取线程执行结果。

FutureTask类实现了RunnableFuture接口

1
public class FutureTask<V> implements RunnableFuture<V> {

RunnableFuture接口也继承了Runnable接口

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

所以,FutureTask对象也可以用来构建一个Thread对象。那么,FutureTask类的run方法是如何实现的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 调用了Callable对象的call方法,并且获取到了结果result
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

除了对线程状态的判断外(保证任务只执行一次、避免并发重复执行),关键就是调用了Callable对象call方法,并且获取到了结果result。

而FutureTask类里也刚好有一个构造函数可以传递Callable对象:

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

也就是说,我们需要在Callable对象中编写具体的任务。看看Callable接口

1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

有一个需要实现的call方法,并且是有返回值的!

那么,清晰了,我们需要先实现Callable接口,并重写call方法,然后传递给FutureTask对象,再将FutureTask对象传递给Thread对象,就可以啦!既可以拿到返回值,又可以捕捉到线程内部的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Main {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new TaskC());
new Thread(futureTask).start();
try {
System.out.println(futureTask.get());
} catch (Exception e) {
System.out.println("捕捉到异常");
}
}
}

class TaskC implements Callable<Integer> {
@Override
public Integer call() {
return 1;
}
}

CompletableFuture

如果有多个线程需要灵活合作的话,FutureTask用起来就不方便了。

JUC提供了CompletableFuture类,可以让我们灵活编排多个线程:

顺序执行

thenApplyAsync方法可以拿到前一个阶段的结果,然后开始执行当前任务:

1
2
3
4
5
6
7
8
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> " future1 ok! ");
CompletableFuture<String> future2 = future1.thenApplyAsync((result) -> result+" future2 ok! ");
CompletableFuture<String> future3 = future2.thenApplyAsync((result) -> result+" future3 ok!");
System.out.println(future3.join());
}
}

阻塞等待

allOf方法可以组合多个任务,配合join/get方法阻塞等待所有的任务都执行完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> " future1 ok! ");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " future2 ok! ");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> " future3 ok! ");
// 阻塞等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
// 统一处理
all.thenRun(() -> {
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
}).join(); // 阻塞等待处理完成
}
}

anyOf方法可以组合多个任务,配合join/get方法阻塞等待任意一个任务先执行完成:

1
2
3
4
5
6
7
8
9
10
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> " future1 ok! ");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " future2 ok! ");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> " future3 ok! ");
// 阻塞等待一个任务完成
CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2, future3);
any.thenAccept(result -> System.out.println("First one:" + result)).join();
}
}

线程池

CompletableFuture.supplyAsync方法还可以传入一个线程池,否则使用的就是公共线程池。使用公共线程池的话,可能会因为线程资源不足,影响公共线程池中的其他任务执行,所以推荐自定义一个线程池。

Executors

JUC提供了一个Executors类,可以帮助我们快捷创建线程池。

常用的有三个方法,分别创建一种常用的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建只有一个线程的线程池  
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

// 创建有nThreads个线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// 创建可以动态扩容到Integer.MAX_VALUE个线程的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

众所周知,阿里巴巴Java开发手册禁止通过Executors类来创建线程池,因为:

  1. SingleThreadExecutor和FixedThreadPool配置的等待队列为LinkedBlockingQueue(),默认的长度是Integer.MAX_VALUE。允许大量等待任务,容易导致内存溢出(OOM)。
  2. CachedThreadPool配置的最大线程数量为Integer.MAX_VALUE,允许创建大量的线程,也容易导致内存溢出(OOM)。

ThreadPoolExecutor

可以看到,Executors类里创建线程池的方法,其实是通过ThreadPoolExecutor类创建的,不过是帮我们配置了一些参数而已。如果要避开Executors类创建线程池的弊端,我们就需要通过ThreadPoolExecutor类来创建,并且自己配置参数。

ThreadPoolExecutor类的构造函数,有下面这几个参数:

  • int corePoolSize:核心线程数,即使处于空闲状态也不会被销毁的线程
  • int maximumPoolSize:线程池的最大线程数
  • long keepAliveTime:当核心线程用完、任务等待队列用完、最大线程数未用完时,会创建新线程来处理任务,这些线程在空闲时的存活时间
  • TimeUnit unit:keepAliveTime的单位
  • BlockingQueue workQueue:任务等待队列,核心线程用完时,任务会在等待队列进行等待
  • ThreadFactory threadFactory:创建线程时使用的工厂
  • RejectedExecutionHandler handler:当核心线程用完、任务等待队列用完、最大线程数用完时,执行的任务拒绝策略

辅助工具

关于控制多个线程间的行为,JUC还提供了三个工具来辅助我们:

Semaphore

可以将Semaphore理解为一个信号量,可以设置持有一定数量的许可证,许可证被分发给某个线程后,只有该线程释放许可证后,其他线程才可获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Main {
public static void main(String[] args) {
// 三个许可证
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
int temp = i;
new Thread(() -> {
boolean tried = false;
try {
// 等待获取一个许可证
tried = semaphore.tryAcquire();
if (tried) {
// 模拟任务处理耗时
Thread.sleep(1000);
System.out.println(temp + "号:占用一个许可证");
}
} catch (InterruptedException e) {
System.out.println("线程被中断");
} finally {
// 释放一个许可证
if (tried) {
semaphore.release();
System.out.println(temp + "号:释放一个许可证");
}
}
}).start();
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
0号:占用一个许可证
2号:占用一个许可证
1号:占用一个许可证
0号:释放一个许可证
2号:释放一个许可证
1号:释放一个许可证
3号:占用一个许可证
3号:释放一个许可证
5号:占用一个许可证
4号:占用一个许可证
4号:释放一个许可证
5号:释放一个许可证

可以看到,线程会阻塞等待获取许可证。

也就是说,可以用Semaphore来控制大量线程获取有限的资源。

CyclicBarrier

可以将CyclicBarrier理解为一个屏障,可以在任务中设置一个屏障,线程必须要等所有线程均抵达屏障处,才可继续向下执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> System.out.println("均抵达屏障处"));
for (int i = 0; i < 4; i++) {
int temp = i;
new Thread(() -> {
try {
// 模拟任务耗时
Thread.sleep(1000);
System.out.println(temp + "号已抵达屏障处");
// 等待其它线程抵达屏障处
cyclicBarrier.await();
// 继续任务
System.out.println(temp + "号继续任务");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
}
}

结果:

1
2
3
4
5
6
7
8
9
2号已抵达屏障处
3号已抵达屏障处
1号已抵达屏障处
0号已抵达屏障处
均抵达屏障处
0号继续任务
2号继续任务
3号继续任务
1号继续任务

可以看到,每个线程都会等待其它线程抵达屏障处,然后才会继续执行任务。

也就是说,可以用CyclicBarrier来控制多个线程间互相等待。

CountDownLatch

可以将CountDownLatch理解为一个计数器,可以控制一个线程必须阻塞等待计数器归零,才能继续执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
int temp = i;
new Thread(() -> {
try {
// 模拟任务耗时
Thread.sleep(1000);
System.out.println(temp + "号执行完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 减少计数
countDownLatch.countDown();
}
}).start();
}
try {
// 阻塞等待计数器归零
countDownLatch.await();
System.out.println("全部执行完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

结果:

1
2
3
4
5
2号执行完成
1号执行完成
3号执行完成
0号执行完成
全部执行完成

可以看到,我们控制了main线程等待其他线程减少计数器,然后main线程才能继续执行任务。

也就是说,CountDownLatch可以用来控制一个线程等待其它线程。

原子类

前面我们介绍了JUC提供的很多工具,可以帮助我们很方便地构建多线程。但是,还无法解决多线程访问共享数据的并发安全问题。JUC在atomic包下提供了一些原子类,原子类封装了一些并发安全的方法。

常用的原子类有:AtomicBooleanAtomicIntegerAtomicLong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int get = atomicInteger.incrementAndGet();
System.out.println(get);
}).start();
}
// 等待线程执行完成
Thread.sleep(1000);
System.out.println("最终:" + atomicInteger.get());
}
}

原子类内部对值添加了volatile关键字:

1
private volatile int value;

这样保证了值的可见性,一个线程修改后,其它线程立刻能看见。

此外,对值的修改使用了Unsafe类下的CAS方法:

1
2
3
4
5
private static final Unsafe U = Unsafe.getUnsafe();  

public final boolean compareAndSet(int expectedValue, int newValue) {
return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
}

CAS方法是利用JVM调用CPU级原子指令来保证并发安全性的。

所以,原子类只能保证单次方法调用是并发安全的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 一个原子类的两次操作就不是并发安全的了
if (atomicInteger.get() == 0) {
try {
Thread.sleep(1000);
atomicInteger.incrementAndGet();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
}
// 等待线程执行完成
Thread.sleep(2000);
System.out.println("最终:" + atomicInteger.get());
}
}

集合类

JUC还提供了一些并发安全的集合类,常用的有ConcurrentHashMapCopyOnWriteArrayListCopyOnWriteArraySetArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("a",0);
for (int i = 0; i < 10; i++) {
new Thread(()-> map.compute("a",(k, v)->v+1)).start();
}
Thread.sleep(1000);
System.out.println(map.get("a"));
}
}

JUC提供的集合类,并不都采用同一种机制来实现并发安全性。

ConcurrentHashMap是通过CAS结合synchronized来实现,详情参考本人另一篇文章:https://blog.csdn.net/Uncommen/article/details/144174135。

CopyOnWriteArrayList在写操作时会先加synchronized锁,然后将原数组复制一份,在新数组上修改,再将新数组赋给元素组。这样,读操作即使不加锁,结合volatile就可以保证要么读到旧数组,要么读到新数组,绝不会读到中间脏数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private transient volatile Object[] array;

public E get(int index) {
return elementAt(getArray(), index);
}

public E set(int index, E element) {
synchronized (lock) {
Object[] es = getArray();
E oldValue = elementAt(es, index);

if (oldValue != element) {
es = es.clone();
es[index] = element;
}
// Ensure volatile write semantics even when oldvalue == element
setArray(es);
return oldValue;
}
}

CopyOnWriteArraySetCopyOnWriteArrayList差不多,只是需要判断重复值。会先不加锁判断重复情况,提高效率,如果没有重复值,再加锁去进一步操作,加了锁后还需要再判断一次重复值,避免被其它线程趁方法调用间隙插入了重复值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOfRange(e, snapshot, 0, snapshot.length) < 0
&& addIfAbsent(e, snapshot);
}

/**
* A version of addIfAbsent using the strong hint that given
* recent snapshot does not contain e.
*/
private boolean addIfAbsent(E e, Object[] snapshot) {
synchronized (lock) {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i]
&& Objects.equals(e, current[i]))
return false;
if (indexOfRange(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}

ArrayBlockingQueue是对写操作加ReentrantLock锁实现并发安全性,并且通Condition来实现等待队列空余:

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

Synchronized

既然是通过加synchronized锁来保证并发安全性的,那我们完全可以自己操作synchronized锁来保证一段操作的并发安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Main {
private static int count = 0;
private static final Object LOCK = new Object();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
synchronized (LOCK) {
if (count == 0) {
try {
Thread.sleep(500);
count++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}).start();
}
Thread.sleep(1000);
System.out.println(count);
}
}

关于synchronized锁的详细讲解,可以阅读本人的另一篇文章:https://blog.csdn.net/Uncommen/article/details/152095400

Lock

synchronized锁的用法非常简单,可读性强,不需要我们手动解锁,但同时也缺乏了灵活性。如果我们想更灵活的控制加锁解锁,JUC为我们提供了Lock锁。

Lock是一个接口,代表一类锁,常用的是ReentrantLock可重入锁:

1
public class ReentrantLock implements Lock, java.io.Serializable {

简单的使用,手动加锁解锁,需要在finally中保证锁的释放,避免死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Main {
private static int count = 0;
private static final Lock LOCK = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
LOCK.lock();
if (count == 0) {
try {
Thread.sleep(500);
count++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} finally {
LOCK.unlock();
}
}).start();
}
Thread.sleep(1000);
System.out.println(count);
}
}

可以在规定时间内尝试获取锁,而不是一直等着:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Main {
private static int count = 0;
private static final Lock LOCK = new ReentrantLock();

public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
boolean tried = false;
try {
// 尝试获取锁,超过3秒没获取到锁时返回false
tried = LOCK.tryLock(3, TimeUnit.SECONDS);
if (tried && count == 0) {
Thread.sleep(500);
count++;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (tried) {
LOCK.unlock();
}
}
}).start();
}
Thread.sleep(5000);
System.out.println(count);
}
}

ReentrantLock默认是非公平锁,可以传参设置为公平锁(讲究先来后到):

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

如果我们想要控制两个线程交替执行,形成类似生产者消费者模型的话,对于synchronized锁,由于是利用Object类提供的wait方法notifyAll方法,所以生产者和消费者相当于共用同一个等待队列,notifyAll方法会将所有等待线程唤醒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Main {
private static int count = 0;
private static final Object LOCK = new Object();

public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 5; i++) {
synchronized (LOCK) {
try {
while (count > 0) {
LOCK.wait();
}
count++;
System.out.println(Thread.currentThread().getName() + ":" + count);
LOCK.notifyAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "生产者").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
synchronized (LOCK) {
try {
while (count < 1) {
LOCK.wait();
}
count--;
System.out.println(Thread.currentThread().getName() + ":" + count);
LOCK.notifyAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "消费者").start();
}
}

但是,JUC为我们提供了Condition,可以让我们为生产者和消费者提供不同的等待队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Main {
public static void main(String[] args) {
Pc pc = new Pc();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
pc.publish();
}
}, "生产者").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
pc.consume();
}
}, "消费者").start();
}
}

class Pc {
Lock lock = new ReentrantLock();
Condition publisher = lock.newCondition();
Condition consumer = lock.newCondition();
int count = 0;

public void publish() {
try {
lock.lock();
while (count > 0) {
publisher.await();
}
System.out.println(Thread.currentThread().getName() + ":" + ++count);
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void consume() {
try {
lock.lock();
while (count < 1) {
consumer.await();
}
System.out.println(Thread.currentThread().getName() + ":" + --count);
publisher.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

但是,上面的所有情况,都是无论读数据还是写数据,都用同一把锁,多个读操作间仍然需要竞争锁,这是不合理的,读操作完全可以单独使用一把共享锁,而不是与写操作共用一把互斥锁

没关系,JUC为我们提供了一个ReentrantReadWriteLock可重入读写锁:

可以将共享的读锁与互斥的写锁分开:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Main {
public static void main(String[] args) {
Pc pc = new Pc();
for (int i = 0; i < 5; i++) {
new Thread(pc::read).start();
}
for (int i = 0; i < 5; i++) {
new Thread(pc::write).start();
}
}
}

class Pc {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Lock readLock = lock.readLock();
Lock writeLock = lock.writeLock();
int count = 0;

public void read() {
try {
readLock.lock();
System.out.println("获取到读锁");
Thread.sleep(1000);
System.out.println("读到:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("释放读锁");
readLock.unlock();
}
}

public void write() {
try {
writeLock.lock();
System.out.println("获取到写锁");
System.out.println("写入:" + ++count);
} finally {
System.out.println("释放写锁");
writeLock.unlock();
}
}
}

读读间共享,读写、写写间互斥。


JUC详解
https://www.wananhome.site/2026/03/10/JUC详解/
作者
WanAn
发布于
2026年3月10日
许可协议