跳至主要內容

并发工具类

摸鱼散人大约 17 分钟

CountDownLatch(倒计数器)了解吗?

  • CountDownLatch,倒计数器,有两个常见的应用场景

    • 场景1:协调子线程结束动作:等待所有子线程运行结束

      public static void main(String[] args) throws InterruptedException {
          CountDownLatch countDownLatch = new
                  CountDownLatch(5);
          Thread 大乔 = new Thread(countDownLatch::countDown);
          Thread 兰陵王 = new Thread(countDownLatch::countDown);
          Thread 安其拉 = new Thread(countDownLatch::countDown);
          Thread 哪吒 = new Thread(countDownLatch::countDown);
          Thread= new Thread(() -> {
              try {
                  // 稍等,上个卫生间,马上到...
                  Thread.sleep(1500);
                  countDownLatch.countDown();
              } catch (InterruptedException ignored) {
              }
          });
          大乔.start();
          兰陵王.start();
          安其拉.start();
          哪吒.start();.start();
          countDownLatch.await();
          System.out.println("所有玩家已经就位!");
      }
      
      • 子线程使用countDownLatch.countDown()进行倒数计数
      • 主线程使用countDownLatch.await()进行阻塞,等待子线程完成
    • 场景2:协调子线程开始动作:统一各线程动作开始的时机

          public static void main(String[] args) throws InterruptedException {
              CountDownLatch countDownLatch = new
                      CountDownLatch(1);
              Thread 大乔 = new Thread(() ->
                      waitToFight(countDownLatch));
              Thread 兰陵王 = new Thread(() ->
                      waitToFight(countDownLatch));
              Thread 安其拉 = new Thread(() ->
                      waitToFight(countDownLatch));
              Thread 哪吒 = new Thread(() ->
                      waitToFight(countDownLatch));
              Thread= new Thread(() ->
                      waitToFight(countDownLatch));
              大乔.start();
              兰陵王.start();
              安其拉.start();
              哪吒.start();.start();
              Thread.sleep(1000);
              countDownLatch.countDown();
              System.out.println("敌方还有5秒达到战场,全军出击!");
          }
      
          private static void waitToFight(CountDownLatch
                                                  countDownLatch) {
              try {
                  countDownLatch.await(); // 在此等待信号再继续
                  System.out.println("收到,发起进攻!");
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      
      • 子线程使用countDownLatch.await()进行阻塞,等待主线程完成
      • 主线程使用countDownLatch.countDown()进行倒数计数
  • CountDownLatch的核心方法

    • await() :等待latch降为0
    • boolean await(long timeout, TimeUnit unit) :等待latch降为0,但是可以设置超时时间。比如有玩家超时未确认,那就重新匹配,总不能为了某个玩家等到天荒地老
    • countDown() :latch数量减1
    • getCount() :获取当前的latch数量

CyclicBarrier(同步屏障)了解吗?

  • CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)

  • 它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行

  • 它和CountDownLatch类似,都可以协调多线程的结束动作,在它们结束后都可以执行特定动作

  • 重要注意事项

    1. 线程中断:如果某个线程在等待时被中断,会抛出 InterruptedException,此时其他线程也会收到 BrokenBarrierException 并继续执行。
    2. 重用性CyclicBarrier 可以重复使用,当所有等待线程都被释放后,屏障会自动重置,可以再次使用。
    3. 异常处理:需要处理可能的 BrokenBarrierExceptionTimeoutException(如果使用带超时参数的 await 方法)。
  • 核心方法await()

    • 使线程在屏障点等待,直到所有线程都到达。如果当前线程是最后一个到达的线程,则会放行,往下执行
    • 如再次await()则阻塞,当前线程是最后一个到达的线程await时,再次放行,循环往复
public class CyclicBarrierExample {

    public static void main(String[] args) {
        // 创建CyclicBarrier实例,并设置屏障点操作(所有线程到达屏障点时执行)
        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有线程已到达屏障点,继续执行后续任务...");
            }
        });

        // 创建并启动三个线程
        for (int i = 1; i <= 3; i++) {
            new Thread(new Task(barrier), "线程 " + i).start();
        }
    }

    static class Task implements Runnable {
        private CyclicBarrier barrier;

        public Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务...");
                    // 模拟任务执行时间
                    Thread.sleep((long) (Math.random() * 1000));
                    System.out.println(Thread.currentThread().getName() + " 完成任务,等待其他线程...");
                    // 调用await方法等待其他线程
                    barrier.await();
                    // 所有线程到达屏障点后执行的代码
                    System.out.println(Thread.currentThread().getName() + " 继续执行后续任务...");
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

线程 1 正在执行任务...
线程 2 正在执行任务...
线程 3 正在执行任务...
线程 3 完成任务,等待其他线程...
线程 2 完成任务,等待其他线程...
线程 1 完成任务,等待其他线程...
所有线程已到达屏障点,继续执行后续任务...
线程 1 继续执行后续任务...
线程 1 正在执行任务...
线程 2 继续执行后续任务...
线程 3 继续执行后续任务...
线程 2 正在执行任务...
线程 3 正在执行任务...
线程 3 完成任务,等待其他线程...
线程 2 完成任务,等待其他线程...
线程 1 完成任务,等待其他线程...
所有线程已到达屏障点,继续执行后续任务...
线程 1 继续执行后续任务...
线程 1 正在执行任务...
线程 3 继续执行后续任务...
线程 3 正在执行任务...
线程 2 继续执行后续任务...
线程 2 正在执行任务...
线程 1 完成任务,等待其他线程...
线程 2 完成任务,等待其他线程...
线程 3 完成任务,等待其他线程...
所有线程已到达屏障点,继续执行后续任务...
线程 3 继续执行后续任务...
线程 3 正在执行任务...
线程 1 继续执行后续任务...
线程 1 正在执行任务...
线程 2 继续执行后续任务...
线程 2 正在执行任务...
线程 3 完成任务,等待其他线程...
线程 1 完成任务,等待其他线程...
线程 2 完成任务,等待其他线程...
所有线程已到达屏障点,继续执行后续任务...
...
  • 线程达到公共屏障点(barrier)前阻塞,知道最后一个线程达到屏障点,然后又开始执行
  • 循环往复的做达到屏障点->放行->达到屏障点->放行...

CyclicBarrier和CountDownLatch有什么区别?

  • CountDownLatch是一次性的,而CyclicBarrier则可以多次设置屏障,实现重复利用
  • CountDownLatch中的各个子线程不可以等待其他线程,只能完成自己的任务;而CyclicBarrier中的各个线程可以等待其他线程
CyclicBarrierCountDownLatch
CyclicBarrier是可重用的,其中的线程会等待 所有的线程完成任务。届时,屏障将被拆 除,并可以选择性地做一些特定的动作。CountDownLatch是一次性的, 不同的线程在同一个计数器上 工作,直到计数器为0.
CyclicBarrier面向的是线程数CountDownLatch面向的是任务 数
在使用CyclicBarrier时,你必须在构造中指定 参与协作的线程数,这些线程必须调用await() 方法使用CountDownLatch时,则必 须要指定任务数,至于这些任 务由哪些线程完成无关紧要
CyclicBarrier可以在所有的线程释放后重新使 用CountDownLatch在计数器为0 时不能再使用
在CyclicBarrier中,如果某个线程遇到了中 断、超时等问题时,则处于await的线程都会 出现问题在CountDownLatch中,如果某 个线程出现问题,其他线程不 受影响

Semaphore(信号量)了解吗?

  • 定义

    • Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
  • 应用场景

    • 资源池管理:如数据库连接池,限制同时访问数据库的连接数
    • 限流控制:如限制同时处理的请求数量,防止过载
    • 多线程限量操作:如控制多个线程对某个资源的访问,确保不会出现资源争用
  • 示例代码

    public class SemaphoreExample {
    
        public static void main(String[] args) {
            // 创建一个Semaphore实例,设定可用许可数量为3(表示有3个停车位)
            Semaphore parkingLot = new Semaphore(3);
    
            // 创建并启动六个线程模拟汽车
            for (int i = 1; i <= 6; i++) {
                new Thread(new Car(parkingLot), "汽车 " + i).start();
            }
        }
    
        static class Car implements Runnable {
            private Semaphore parkingLot;
    
            public Car(Semaphore parkingLot) {
                this.parkingLot = parkingLot;
            }
    
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " 尝试进入停车场...");
                    // 获取一个许可,阻塞直到有可用的许可
                    parkingLot.acquire();
                    System.out.println(Thread.currentThread().getName() + " 成功进入停车场。");
                    // 模拟停车时间
                    Thread.sleep((long) (Math.random() * 10000));
                    System.out.println(Thread.currentThread().getName() + " 离开停车场。");
                    // 释放许可
                    parkingLot.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    解释

    • Semaphore:构造函数中传入的参数表示可以同时访问的资源数量。在这个例子中,设定为3,表示有3个停车位。
    • acquire() 方法:线程调用这个方法尝试获取一个许可,如果没有可用许可,线程会被阻塞,直到有可用许可为止。
    • release() 方法:线程调用这个方法释放一个许可,使其他被阻塞的线程可以继续执行。
    • 模拟停车场Car 类中的 run 方法模拟了汽车进入停车场、停车和离开的过程。线程首先尝试获取一个许可,成功后进入停车场, 停车一段时间后离开,并释放许可。

    重要注意事项

    1. 公平性:Semaphore可以设置为公平模式(FIFO),通过在构造函数中传入 true,确保线程按请求顺序获取许可:

      Semaphore parkingLot = new Semaphore(3, true);
      
    2. 中断acquire 方法响应中断,如果线程在等待许可时被中断,会抛出 InterruptedException

    3. tryAcquire 方法:尝试获取许可但不阻塞,有多种重载形式:

      boolean acquired = parkingLot.tryAcquire();
      boolean acquiredWithTimeout = parkingLot.tryAcquire(1, TimeUnit.SECONDS);
      

    使用 Semaphore 可以有效控制对资源的并发访问,确保系统资源的有效利用和安全性

Exchanger 了解吗?

Exchanger 是 Java 中的一个同步辅助类,专门用于在两个线程之间交换数据。它的主要应用场景包括:

  1. 双线程数据交换:两个线程需要在某个点交换数据,如生产者和消费者模式中的数据交换。
  2. 任务分配与结果收集:一个线程生成任务,另一个线程处理任务并返回结果。
  3. 资源双向传递:两个线程互相传递数据或资源,如双缓冲区交换。

代码示例

以下是一个简单的 Exchanger 使用示例,模拟两个线程之间的数据交换:

import java.util.concurrent.Exchanger;

public class ExchangerExample {

    public static void main(String[] args) {
        // 创建一个Exchanger实例
        Exchanger<String> exchanger = new Exchanger<>();

        // 创建并启动两个线程
        new Thread(new Producer(exchanger), "生产者").start();
        new Thread(new Consumer(exchanger), "消费者").start();
    }

    static class Producer implements Runnable {
        private Exchanger<String> exchanger;

        public Producer(Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                String data = "生产的数据";
                System.out.println(Thread.currentThread().getName() + " 生产了数据:" + data);
                // 与消费者交换数据
                String response = exchanger.exchange(data);
                System.out.println(Thread.currentThread().getName() + " 收到消费者的数据:" + response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        private Exchanger<String> exchanger;

        public Consumer(Exchanger<String> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                String data = "消费者处理的数据";
                // 接收生产者的数据并返回处理后的数据
                String received = exchanger.exchange(data);
                System.out.println(Thread.currentThread().getName() + " 接收到生产者的数据:" + received);
                System.out.println(Thread.currentThread().getName() + " 处理后数据:" + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

解释

  • Exchanger:一个用于在两个线程之间交换数据的同步点。通过 exchange 方法,两个线程可以交换数据,并且这两个线程在此方法处阻塞,直到对方也到达此同步点。
  • Producer 类:模拟生产者线程,生成数据并与消费者交换。
  • Consumer 类:模拟消费者线程,准备处理数据并与生产者交换。
  • exchange() 方法:线程调用此方法进行数据交换,该方法会阻塞,直到另一个线程也调用 exchange 方法。

重要注意事项

  1. 成对使用Exchanger 是为成对线程设计的,如果有一个线程没有配对线程来交换数据,那么它会一直阻塞。
  2. 超时机制exchange 方法有带超时参数的重载形式,可以指定最大等待时间:
    String response = exchanger.exchange(data, 1, TimeUnit.SECONDS);
    
  3. 中断处理:如果线程在等待交换数据时被中断,会抛出 InterruptedException

Exchanger 可以简化两个线程之间的数据交换逻辑,确保交换操作是同步和线程安全的,非常适合需要双向数据传递的场景。

应用场景

  1. 遗传算法
    1. Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果
  2. 校对工作
    1. Exchanger也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致

说说你对CompletableFuture的理解?

1. 引言

CompletableFuture 是Java 8引入的一个异步编程工具类,用于处理异步任务的结果和执行流程。它提供了一种简洁而强大的方式来处理异步操作,包括任务的串行执行、并行执行、组合以及异常处理等

2. 基本概念

2.1 创建 CompletableFuture

你可以使用静态工厂方法来创建 CompletableFuture 对象:

  • supplyAsync:用于执行有返回值的异步任务。
  • runAsync:用于执行没有返回值的异步任务。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 任务逻辑
    return "结果";
});

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 任务逻辑
});

3. 任务完成后的回调

3.1 thenApply

用于在任务完成后,对结果进行处理并返回新的结果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(result -> result + " World");

3.2 thenAccept

用于在任务完成后,消费结果但不返回新的结果。

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(result -> System.out.println(result));

3.3 thenRun

用于在任务完成后执行一个没有返回值的操作。

CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("任务完成"));

4. 组合多个 CompletableFuture

4.1 thenCompose

用于在一个 CompletableFuture 完成后,启动另一个 CompletableFuture

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));

4.2 thenCombine

用于将两个独立的 CompletableFuture 的结果进行合并。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World");

CompletableFuture<String> resultFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

4.3 allOf

用于等待所有给定的 CompletableFuture 完成。

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "结果3");

CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

allOf.thenAccept(v -> {
    try {
        String result1 = future1.get();
        String result2 = future2.get();
        String result3 = future3.get();
        System.out.println(result1 + ", " + result2 + ", " + result3);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});

5. 处理异常

5.1 exceptionally

用于在异步任务发生异常时提供一个默认值。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("发生异常");
    }
    return "成功";
}).exceptionally(ex -> "默认值");

5.2 handle

用于在异步任务完成或发生异常时对结果进行处理。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("发生异常");
    }
    return "成功";
}).handle((result, ex) -> {
    if (ex != null) {
        return "异常处理后的默认值";
    }
    return result;
});

6. 超时控制

6.1 orTimeout

用于为异步操作设定超时。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "结果";
}).orTimeout(2, TimeUnit.SECONDS);

6.2 completeOnTimeout

用于在超时时返回默认值。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "结果";
}).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);

7. 并行执行多个任务

示例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.List;
import java.util.stream.Collectors;

public class CompletableFutureParallelExample {
    public static void main(String[] args) {
        // 定义三个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // 模拟耗时操作
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "结果1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000); // 模拟耗时操作
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "结果2";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000); // 模拟耗时操作
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "结果3";
        });

        // 创建一个包含所有异步任务的CompletableFuture
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

        // 在所有任务完成后处理结果
        allOf.thenAccept(v -> {
            List<String> results = List.of(future1, future2, future3).stream()
                    .map(future -> {
                        try {
                            return future.get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new IllegalStateException(e);
                        }
                    })
                    .collect(Collectors.toList());

            // 处理所有结果
            results.forEach(result -> System.out.println("任务完成,结果: " + result));
        }).join(); // 等待所有任务完成

        System.out.println("主线程继续执行其他操作...");
    }
}

8. 真实场景应用

8.1 并行获取数据

以下示例展示如何并行获取多个远程数据源的数据,并在所有数据获取完成后进行处理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.List;
import java.util.stream.Collectors;

public class CompletableFutureRealWorldExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = fetchDataFromService1();
        CompletableFuture<String> future2 = fetchDataFromService2();
        CompletableFuture<String> future3 = fetchDataFromService3();

        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);

        allOf.thenAccept(v -> {
            try {
                String result1 = future1.get();
                String result2 = future2.get();
                String result3 = future3.get();

                // 合并结果
                String finalResult = result1 + ", " + result2 + ", " + result3;
                System.out.println("最终结果: " + finalResult);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }).join(); // 等待所有任务完成
    }

    private static CompletableFuture<String> fetchDataFromService1() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟远程调用
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Service1数据";
        });
    }

    private static CompletableFuture<String> fetchDataFromService2() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟远程调用
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Service2数据";
        });
    }

    private static CompletableFuture<String> fetchDataFromService3() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟远程调用
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Service3数据";
        });
    }
}

9. 总结

CompletableFuture 提供了一种简洁而强大的方式来处理异

CompletableFuture的线程池设置?

CompletableFuture 使用 ForkJoinPool.commonPool() 作为默认线程池来执行异步任务,但你可以指定自定义的线程池来满足特殊需求,如提高性能或控制资源使用。以下是如何设置和使用自定义线程池的详细指南。

1. 使用默认线程池

默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool()。这对于大多数情况已经足够,但在需要更多控制或资源隔离时,可以使用自定义线程池。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步任务逻辑
    return "结果";
});

2. 指定自定义线程池

你可以使用 CompletableFuture.supplyAsyncCompletableFuture.runAsync 方法的重载版本来指定自定义的线程池。

2.1 创建自定义线程池

首先,创建一个自定义线程池。你可以使用 Executors 工具类来创建不同类型的线程池:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService customThreadPool = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池

2.2 使用自定义线程池

然后,将自定义线程池传递给 CompletableFuture

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步任务逻辑
    return "结果";
}, customThreadPool);

3. 完整示例

以下是一个完整的示例,展示如何使用自定义线程池执行异步任务:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // 创建自定义线程池
        ExecutorService customThreadPool = Executors.newFixedThreadPool(10);

        // 提交异步任务并使用自定义线程池
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // 模拟耗时操作
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "结果";
        }, customThreadPool);

        // 注册回调处理结果
        future.thenAccept(result -> System.out.println("异步任务完成,结果: " + result));

        // 主线程继续执行其他操作
        System.out.println("主线程继续执行其他操作...");

        // 关闭线程池
        customThreadPool.shutdown();
        try {
            if (!customThreadPool.awaitTermination(800, TimeUnit.MILLISECONDS)) {
                customThreadPool.shutdownNow();
            } 
        } catch (InterruptedException e) {
            customThreadPool.shutdownNow();
        }
    }
}

4. 线程池类型

4.1 固定大小线程池

适用于已知固定数量的并发任务:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

4.2 缓存线程池

适用于大量短生命周期的并发任务,线程池大小根据需要动态调整:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

4.3 单线程池

适用于需要顺序执行任务的场景:

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

4.4 调度线程池

适用于需要定期执行任务的场景:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

5. 异步任务链中的线程池

你还可以在异步任务链中的不同部分使用不同的线程池:

ExecutorService pool1 = Executors.newFixedThreadPool(5);
ExecutorService pool2 = Executors.newCachedThreadPool();

CompletableFuture.supplyAsync(() -> {
    // 使用 pool1 执行异步任务
    return "任务1结果";
}, pool1).thenApplyAsync(result -> {
    // 使用 pool2 处理结果
    return result + " 经过处理";
}, pool2).thenAcceptAsync(result -> {
    System.out.println("最终结果: " + result);
    // 可以不指定线程池,则使用默认的 ForkJoinPool.commonPool()
});

通过设置自定义线程池,你可以更好地控制 CompletableFuture 的并发行为,满足不同应用场景的需求。