精灵王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式-行为型 设计模式-创建型 设计模式-结构型 设计 系统设计 设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

自己写一个简单的线程池

发表于 2020-09-18 | 分类于 并发编程 | 0

需求分析

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

思考:

  1. 既然是线程池, 池, 就是需要一个放线程的地方
  2. 可以控制池的大小, 并不是无限的, 也就是coreSize
  3. coreSzie满了, 任务需要排队, 所以需要一个有限的队列, 且这个队列必须要是线程安全的, 比如阻塞队列BlockingQueue
  4. 如果coreSize的线程执行的很快, 那队列里面排队的线程就可以很快被执行完成, 如果队列满了, 可以再增加线程来执行队列里的任务, 也就是maxSize
  5. 最后当队列满了, maxSize也已经达到了, 这时候就需要一种拒接策略了, 常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的思考,我们定义一个线程池一共需要这么四个变量:

核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

定义线程池

定义核心参数以及构造方法

public class MyThreadPoolExecutor implements Executor{
		/**
     * 线程池的核心大小
     */
    private volatile int coreSize;
    /**
     * 线程池的最大值
     */
    private volatile int maxSize;
    /**
     * 排队的线程队列
     */
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * 拒绝策略
     */
    private volatile MyRejectedHandler handler;

    /**
     * 线程池线程名称前缀
     */
    private String poolNamePrefix ;
    private static final String defaultPoolNamePrefix = "default-my-thread-pool-";

    /**
     * 运行中的线程数
     */
    private final AtomicInteger runningCount = new AtomicInteger(0);

		public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue){
        this(coreSize,maxSize,taskQueue,defaultPoolNamePrefix);
    }

    /**
     * 构造方法
     * @param coreSize
     * @param maxSize
     * @param taskQueue
     * @param poolNamePrefix
     */
    public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue,String poolNamePrefix){
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.poolNamePrefix = poolNamePrefix;
        this.handler = new MyRejectedHandler();
    }
}

实现execute方法

@Override
public void execute(Runnable command){
    if(command == null){
        throw new NullPointerException();
    }
    // 1.判断线程数是否达到了核心大小
    if(runningCount.get() < coreSize){
        // 1.1 没有达到coreSize, 按照核心线程执行
        if(addWorker(command,true)){
            return;
        }
    }
    // 2.加入到队列, 加入失败会返回false, 如果用add, 队列满会抛异常
    if(taskQueue.offer(command)){
    }else{
        // 3.加入队列失败, 以非核心线程的方式执行线程任务
        if(!addWorker(command,false)){
            // 3.1 以非核心线程的方式执行失败, 执行拒绝策略
            reject(command);
        }
    }
}

执行基本流程:

  1. 判断线程数是否达到了核心大小
    1. 没有达到coreSize, 按照核心线程执行
  2. 任务数达到coreSize后进来的任务加入到队列
  3. 如果队列满了, 按照maxSize线程数进行执行任务
  4. 如果任务数达到maxSize,后进来的任务按照拒绝策略来执行

添加到工作线程

private boolean addWorker(Runnable command,boolean coreThread){
    // 自旋
    for(;;){
        // 运行中的线程数
        int running = runningCount.get();
        // 确实是取核心大小还是最大值
        int wc = coreThread ? coreSize : maxSize;
        if(running >= wc){
            return false;
        }
        // 工作线程+1, 失败则自旋
        if(runningCount.compareAndSet(running,running+1)){
            break;
        }
    }
    // 标识是否开始工作
    boolean workerStarted = false;
    String coreThreadStr = coreThread ? "core-":"max-";
    String threadName = defaultPoolNamePrefix + coreThreadStr + runningCount.get();
    final Thread t = new Thread(new Runnable(){
        @Override
        public void run(){
            // 执行任务
            runWorker(command);
            // 执行完任务, runningCount-1
            runningCount.decrementAndGet();
        }
    },threadName);
    if (t != null) {
        t.start();
        workerStarted = true;
    }
    return workerStarted;
}

执行任务过程

private void runWorker(Runnable task){
    Runnable t = task;
    while(t != null || (t = getTask()) != null){
        try{
            log.info(Thread.currentThread().getName()+" is running");
            t.run();
        }catch(Exception e){
            throw e;
        }finally{
            t = null;
        }
    }
}

自定义拒绝策略

class MyRejectedHandler {
    public void rejectedExecution(Runnable r,MyThreadPoolExecutor executor){
        log.info(r.toString()+": 执行了拒绝策略");
        // 可以自定拒绝策略, 执行, 忽略, 从队列取任务, 抛异常都可以
        //r.run();
    }
}

从队列取任务

private Runnable getTask(){
    try{
        return taskQueue.take();
    }catch(InterruptedException e){
        return null;
    }
}

测试线程池

public static void main(String[] args) throws InterruptedException{
    MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(2,3,new ArrayBlockingQueue(6));
    AtomicInteger taskIndex = new AtomicInteger(1);
    while(true){
        myPool.execute(new Runnable(){
            @SneakyThrows
            @Override
            public void run(){
                System.out.println("执行任务:"+taskIndex.getAndIncrement());
                // coreSize 为2时, 2秒足以, 不会出现maxSize的线程
                TimeUnit.MILLISECONDS.sleep(800);
                // coreSize 线程处理不过来, 会启用max线程
//                    TimeUnit.MILLISECONDS.sleep(3000);
                // max线程处理不过来, 会启用队列, 队列满了, 会执行拒绝策略
//                    TimeUnit.MILLISECONDS.sleep(4000);
            }
        });
        TimeUnit.MILLISECONDS.sleep(300);
        // 每隔1s提交一个任务
//            TimeUnit.MILLISECONDS.sleep(1000);
    }
}

测试输出:

default-my-thread-pool-core-1 is running 执行任务:1
default-my-thread-pool-core-2 is running 执行任务:2
default-my-thread-pool-core-1 is running 执行任务:3
default-my-thread-pool-core-2 is running 执行任务:4
default-my-thread-pool-max-3 is running 执行任务:5
17:29:47.401 [main] INFO net.admol.jingling.demo.executor.MyThreadPoolExecutor - executor.TestMyThreadPool$1@46d56d67: 执行了拒绝策略
精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/thread-pool-demo
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式-行为型 # 设计模式-创建型 # 设计模式-结构型 # 设计 # 系统设计 # 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
Java 虚拟机垃圾回收算法总结
Java 内存中几种常见的OOM 异常
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

106 日志
14 分类
48 标签
RSS
Github E-mail
Creative Commons
Links
  • 添加友链说明
© 2023 精 灵 王
渝ICP备2020013371号
0%