商城首页欢迎来到中国正版软件门户

您的位置:首页 >从零实现一个轻量级C++线程池

从零实现一个轻量级C++线程池

  发布于2026-05-02 阅读(0)

扫一扫,手机访问

一、引言

今天,我们来动手实现一个目标:一个轻量、可用且具备良好扩展性的C++线程池。这不仅是理解并发编程核心思想的绝佳实践,也能为你的项目带来实实在在的性能提升。

实现过程将围绕几个关键技术点展开:

  1. std::thread:线程的创建与管理。
  2. std::mutex / std::unique_lock:保护共享资源,防止数据竞争。
  3. std::condition_variable:线程间的同步与高效等待。
  4. std::function / future / packaged_task / bind:实现灵活的任务封装与结果获取。

二、什么是线程池

简单来说,线程池是一种用于管理和复用线程的并发编程模型。它的核心思想非常直观:与其在每次需要处理任务时都临时创建新线程,不如预先创建好一组工作线程,把它们放在一个“池子”里统一管理。当有新任务到来时,直接从池子里分配一个空闲线程去执行即可。

从零实现一个轻量级C++线程池

三、为什么需要线程池

频繁地创建和销毁线程可不是没有代价的。每一次操作都伴随着内存分配、内核态切换等系统开销。线程池通过复用线程,巧妙地绕过了这个问题,并带来了三重好处:

  • 降低资源开销:线程复用避免了反复创建和销毁的开销,系统性能自然得到提升。
  • 提高响应速度:任务提交后,无需等待漫长的线程创建过程,可以立即被分配给池中“待命”的线程执行。
  • 提高线程的可管理性:线程是系统级的稀缺资源。无限制地创建线程会迅速消耗大量内存,甚至导致程序崩溃。线程池则提供了统一的入口,让你能对线程数量进行精细的分配、调优和监控。

四、线程池的核心组成

一个典型的线程池,通常由以下几个核心部件构成:

  1. 工作线程集合 (Worker Threads):池中预先创建好的一组线程。它们会持续运行,不断从任务队列中“领取”并执行任务。
  2. 任务队列 (Task Queue):一个线程安全的队列,充当任务提交者和工作线程之间的缓冲区。所有待执行的任务都在这里排队等候。
  3. 同步机制 (Synchronization)
    • 互斥锁 (Mutex):用于保护任务队列,确保在多线程环境下对队列的访问是安全的,防止出现竞态条件。
    • 条件变量 (Condition Variable):这是实现高效等待的关键。当任务队列为空时,工作线程会在此处休眠等待;一旦有新任务加入,它就会被唤醒,从而避免了无意义的循环检测,节省了CPU资源。
  4. 任务接口 (Task Interface):一个对外的提交任务的方法。它需要足够灵活,能够接纳各种类型的可调用对象,比如普通函数、Lambda表达式、函数对象等。

五、C++线程池的实现

在深入代码之前,有必要先快速回顾一下实现中将用到的几个关键C++组件。理解了它们,后面的实现逻辑就会清晰很多。

std::condition_variable——条件变量,它是线程间同步的“信号灯”。当任务队列为空时,工作线程会在此阻塞休眠。而当生产者(任务提交者)将新任务加入队列后,会通过同一个条件变量发出通知,唤醒正在等待的线程。这种机制的好处显而易见:它彻底避免了工作线程通过忙等待(busy-waiting)来轮询队列状态所带来的CPU空转开销。代码中主要用到它的三个接口:

void wait (unique_lock& lck, Predicate pred);

第一个参数是互斥锁。这个函数很聪明,它在内部会先释放锁,然后让线程休眠。这样做是为了防止线程在休眠期间还持有锁,导致其他活跃线程无法获取锁而阻塞。

第二个参数是一个可调用对象(谓词),它必须返回true或false。这个谓词会被循环检查,直到其返回true为止。

当谓词返回true后,线程被唤醒,并重新获取锁,然后继续执行后续代码。

void notify_one() noexcept;

唤醒一个在该条件变量下等待的线程。

void notify_all() noexcept;

唤醒所有在该条件变量下等待的线程。

std::future,用来获取异步任务执行的结果。试想一下,如果没有它,在C++中想获取另一个线程的返回值,通常需要将结果写入一个全局变量。而全局变量作为共享资源,在多线程环境下必须加锁保护。std::future 封装了这些底层细节,提供了一种更安全、更优雅的同步获取结果的方式。

get()方法

调用此方法会阻塞当前线程,直到关联的异步任务完成并返回结果。所以说,它是一种“同步等待”结果的方式。

std::function,是一个通用的函数包装器。它的重要作用在于“统一类型”。函数指针、Lambda表达式、仿函数虽然都是可调用对象,但它们的类型各不相同,无法直接放入同一个容器(比如我们的任务队列)进行管理。而经过 std::function 包装后,它们就拥有了统一的类型,管理起来就方便多了。

std::packaged_task,可以看作是一个“任务包装器”,专门用来包装一个可调用对象,并方便地获取其异步执行的结果。它内部关联了一个 std::future 对象,任务的返回值会被自动存入这个future中。同时,它提供了 get_future() 方法来获取这个关联的future。这样一来,其他线程拿到这个future后,调用 get() 就能拿到结果了。值得一提的是,packaged_task 本身也重载了 () 运算符,可以直接像函数一样调用它来执行包装的任务。

std::bind,用于绑定函数参数,并返回一个新的可调用对象。举个例子,函数 Add(int a, int b) 原本需要两个参数,调用形式是 Add(10, 20)。但经过 std::bind 绑定后,例如 auto func = std::bind(Add, 10, 20);,调用时就不再需要参数了,直接 func() 即可。为什么要做参数绑定?看完下面的线程池实现,你就会豁然开朗。

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

class ThreadPool {
public:
    ThreadPool(size_t thread_num = 4):
        _thread_num(thread_num),
        _start(false),
        _stop(false)
    {}
    ~ThreadPool(){
        if (_start && !_stop) stop();
    }
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    void start() {
        std::unique_lock lock(_mutex);
        if (_start) return;
        _workers.reserve(_thread_num);
        for (size_t i = 0; i < _thread_num; i++) {
            _workers.emplace_back(std::thread([this](){
                work_loop();
            }));
        }
        _start = true;
    }

    void stop() {
        {
            std::unique_lock lock(_mutex);
            if (!_start || _stop) return;
            // 在join回收线程之前,必须先将_stop置为true,
            // 否则工作线程可能会一直阻塞在条件变量上,导致无法正常退出,甚至会导致程序崩溃
            _stop = true;
        }
        _cond.notify_all();
        for (auto& worker : _workers) {
            if (worker.joinable()) worker.join();
        }
    }

    template
    auto submit(F&& f, Args&&... args)->std::future> {
        using return_type = std::invoke_result_t;
        // 绑定函数参数,并交给任务包装器
        auto task = std::make_shared>(
            std::bind(std::forward(f), std::forward(args)...)
        );
        // 获取关联的future
        std::future res = task->get_future();
        // 加锁+入队列
        {
            std::unique_lock lock(_mutex);
            if (_stop || !_start) throw std::runtime_error("线程池未启动!");
            _tasks.emplace([task](){(*task)();});
        }
        _cond.notify_one();
        return res;
    }

private:
    void work_loop() {
        while (true) {
            std::function task;
            {
                std::unique_lock lock(_mutex);
                _cond.wait(lock, [this](){
                    return _stop || !_tasks.empty();
                });
                if (_stop && _tasks.empty()) return;
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }

private:
    std::vector _workers;
    std::queue> _tasks;
    std::mutex _mutex;
    std::condition_variable _cond;
    size_t _thread_num;
    bool _start;
    bool _stop;
};

int add(int a, int b) {
    return a + b;
}

void print() {
    std::cout << "-------------------print-------------------" << std::endl;
    std::cout << "Hello World!" << std::endl;
}

int main() {
    ThreadPool pool(4);
    pool.start();
    std::cout << "==================ThreadPoolTest==================" << std::endl;
    pool.submit([](){
        std::cout << "-------------------lambda-------------------" << std::endl;
        std::cout << "this is a lambda!" << std::endl;
    });
    std::this_thread::sleep_for(std::chrono::seconds(3));  
    auto ret1 = pool.submit(add, 10, 20);
    std::cout << "-------------------add-------------------" << std::endl;
    std::cout << "10 + 20 = " << ret1.get() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));  
    pool.submit(print);
    pool.stop();
    return 0;
}

运行结果:

从零实现一个轻量级C++线程池

上面的实现代码用到了C++17的语法特性(如 std::invoke_result_t)。

这里有一个关键点需要解释:我们的任务队列中存储的任务类型是 std::function,这意味着任务既没有参数,也没有返回值。那么,像 add 这样带参数和返回值的函数,是如何被成功提交并执行的呢?秘诀就在于那三步组合操作:

  1. 通过std::bind绑定函数的所有参数:这样一来,原本需要参数的函数就变成了一个“无参”的可调用对象。
  2. 通过std::packaged_task获取函数的返回值:返回值被自动存入其内部的future中,外部可以通过future来获取,因此任务本身无需返回值。
  3. 通过一层Lambda进行最终封装:不管原始函数是什么形态,最终放入队列的任务,都是一个符合 void() 签名的Lambda表达式。

这三板斧组合在一起,就实现了强大的适配能力,无论原函数有无参数、有无返回值,都能完美适配到我们统一的任务队列中。

另外,细心的你可能注意到了代码中的 std::forward(完美转发)。这不仅对函数参数进行了完美转发,以保持其原有的左值或右值属性;也对函数对象本身进行了完美转发。这是因为我们在调用 submit 时,可能会直接传入一个临时Lambda(右值),完美转发可以触发移动语义,避免在 std::bind 内部发生不必要的拷贝操作,提升了效率。

至此,一个轻量级、功能完整的C++线程池就从零开始构建完成了。希望这个详细的实现过程能帮助你更深入地理解线程池的工作原理和现代C++并发工具的使用。

本文转载于:https://www.jb51.net/program/36227121a.htm 如有侵犯,请联系zhengruancom@outlook.com删除。
免责声明:正软商城发布此文仅为传递信息,不代表正软商城认同其观点或证实其描述。

热门关注