Skip to content

自己写一个简单的线程池

Published: at 17:30:54

需求分析

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

思考:

  1. 既然是线程池, 池, 就是需要一个放线程的地方
  2. 可以控制池的大小, 并不是无限的, 也就是coreSize
  3. coreSzie满了, 任务需要排队, 所以需要一个有限的队列, 且这个队列必须要是线程安全的, 比如阻塞队列BlockingQueue
  4. 如果coreSize的线程执行的很快, 那队列里面排队的线程就可以很快被执行完成, 如果队列满了, 可以再增加线程来执行队列里的任务, 也就是maxSize
  5. 最后当队列满了, maxSize也已经达到了, 这时候就需要一种拒接策略了, 常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的思考,我们定义一个线程池一共需要这么四个变量:

核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

定义线程池

定义核心参数以及构造方法

public class MyThreadPoolExecutor implements Executor{
		/**
     * 线程池的核心大小
     */
    private volatile int coreSize;
    /**
     * 线程池的最大值
     */
    private volatile int maxSize;
    /**
     * 排队的线程队列
     */
    private final BlockingQueue<Runnable> taskQueue;

    /**
     * 拒绝策略
     */
    private volatile MyRejectedHandler handler;

    /**
     * 线程池线程名称前缀
     */
    private String poolNamePrefix ;
    private static final String defaultPoolNamePrefix = "default-my-thread-pool-";

    /**
     * 运行中的线程数
     */
    private final AtomicInteger runningCount = new AtomicInteger(0);

		public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue){
        this(coreSize,maxSize,taskQueue,defaultPoolNamePrefix);
    }

    /**
     * 构造方法
     * @param coreSize
     * @param maxSize
     * @param taskQueue
     * @param poolNamePrefix
     */
    public MyThreadPoolExecutor(int coreSize,int maxSize,BlockingQueue<Runnable> taskQueue,String poolNamePrefix){
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.poolNamePrefix = poolNamePrefix;
        this.handler = new MyRejectedHandler();
    }
}

实现execute方法

@Override
public void execute(Runnable command){
    if(command == null){
        throw new NullPointerException();
    }
    // 1.判断线程数是否达到了核心大小
    if(runningCount.get() < coreSize){
        // 1.1 没有达到coreSize, 按照核心线程执行
        if(addWorker(command,true)){
            return;
        }
    }
    // 2.加入到队列, 加入失败会返回false, 如果用add, 队列满会抛异常
    if(taskQueue.offer(command)){
    }else{
        // 3.加入队列失败, 以非核心线程的方式执行线程任务
        if(!addWorker(command,false)){
            // 3.1 以非核心线程的方式执行失败, 执行拒绝策略
            reject(command);
        }
    }
}

执行基本流程:

  1. 判断线程数是否达到了核心大小
    1. 没有达到coreSize, 按照核心线程执行
  2. 任务数达到coreSize后进来的任务加入到队列
  3. 如果队列满了, 按照maxSize线程数进行执行任务
  4. 如果任务数达到maxSize,后进来的任务按照拒绝策略来执行

添加到工作线程

private boolean addWorker(Runnable command,boolean coreThread){
    // 自旋
    for(;;){
        // 运行中的线程数
        int running = runningCount.get();
        // 确实是取核心大小还是最大值
        int wc = coreThread ? coreSize : maxSize;
        if(running >= wc){
            return false;
        }
        // 工作线程+1, 失败则自旋
        if(runningCount.compareAndSet(running,running+1)){
            break;
        }
    }
    // 标识是否开始工作
    boolean workerStarted = false;
    String coreThreadStr = coreThread ? "core-":"max-";
    String threadName = defaultPoolNamePrefix + coreThreadStr + runningCount.get();
    final Thread t = new Thread(new Runnable(){
        @Override
        public void run(){
            // 执行任务
            runWorker(command);
            // 执行完任务, runningCount-1
            runningCount.decrementAndGet();
        }
    },threadName);
    if (t != null) {
        t.start();
        workerStarted = true;
    }
    return workerStarted;
}

执行任务过程

private void runWorker(Runnable task){
    Runnable t = task;
    while(t != null || (t = getTask()) != null){
        try{
            log.info(Thread.currentThread().getName()+" is running");
            t.run();
        }catch(Exception e){
            throw e;
        }finally{
            t = null;
        }
    }
}

自定义拒绝策略

class MyRejectedHandler {
    public void rejectedExecution(Runnable r,MyThreadPoolExecutor executor){
        log.info(r.toString()+": 执行了拒绝策略");
        // 可以自定拒绝策略, 执行, 忽略, 从队列取任务, 抛异常都可以
        //r.run();
    }
}

从队列取任务

private Runnable getTask(){
    try{
        return taskQueue.take();
    }catch(InterruptedException e){
        return null;
    }
}

测试线程池

public static void main(String[] args) throws InterruptedException{
    MyThreadPoolExecutor myPool = new MyThreadPoolExecutor(2,3,new ArrayBlockingQueue(6));
    AtomicInteger taskIndex = new AtomicInteger(1);
    while(true){
        myPool.execute(new Runnable(){
            @SneakyThrows
            @Override
            public void run(){
                System.out.println("执行任务:"+taskIndex.getAndIncrement());
                // coreSize 为2时, 2秒足以, 不会出现maxSize的线程
                TimeUnit.MILLISECONDS.sleep(800);
                // coreSize 线程处理不过来, 会启用max线程
//                    TimeUnit.MILLISECONDS.sleep(3000);
                // max线程处理不过来, 会启用队列, 队列满了, 会执行拒绝策略
//                    TimeUnit.MILLISECONDS.sleep(4000);
            }
        });
        TimeUnit.MILLISECONDS.sleep(300);
        // 每隔1s提交一个任务
//            TimeUnit.MILLISECONDS.sleep(1000);
    }
}

测试输出:

default-my-thread-pool-core-1 is running 执行任务:1
default-my-thread-pool-core-2 is running 执行任务:2
default-my-thread-pool-core-1 is running 执行任务:3
default-my-thread-pool-core-2 is running 执行任务:4
default-my-thread-pool-max-3 is running 执行任务:5
17:29:47.401 [main] INFO net.admol.jingling.demo.executor.MyThreadPoolExecutor - executor.TestMyThreadPool$1@46d56d67: 执行了拒绝策略