菜单

Administrator
发布于 2026-05-22 / 4 阅读
0
0

Java 开发者学 C++(九):并发编程

C++ 并发 vs Java 并发:晚来的标准

C++ 直到 C++11 才在标准库中提供了线程支持(<thread><mutex><future><condition_variable> 等),而 Java 从 1.0 开始就有线程。但 C++ 的设计思路和 Java 的 java.util.concurrent 非常相似,而且 C++ 的 RAII 机制让资源管理更安全。

核心差异:C++ 没有内置的 synchronized 关键字,所有同步都通过 RAII 锁对象管理。C++ 的线程对象析构时必须已 join 或 detach,否则调用 std::terminate()


线程的创建和管理

基础线程创建

#include <thread>
#include <iostream>

void worker(int id) {
    std::cout << "Worker " << id << " running\n";
}

int main() {
    // 通过函数指针创建
    std::thread t1(worker, 42);

    // 通过 Lambda 创建(最常用)
    std::thread t2([] {
        std::cout << "Hello from Lambda thread\n";
    });

    // 通过可调用对象创建
    struct Task {
        void operator()() const {
            std::cout << "Task executed\n";
        }
    };
    std::thread t3(Task{});

    // 必须 join 或 detach!
    t1.join();    // 等待线程完成
    t2.join();
    t3.detach();  // 分离线程(让它独自运行)

    // 如果 t1 在 join/detach 前析构 → std::terminate()!
    return 0;
}

Java vs C++ 线程创建

操作 Java C++
创建线程 new Thread(runnable).start() std::thread(func, args)
等待完成 thread.join() thread.join()
分离线程 自动(JVM 管理) thread.detach()

重要:Java 线程不 join 也无妨(JVM 会清理守护线程),但 C++ 的 std::thread 是 RAII 对象——析构时如果线程仍可 join(joinable),程序直接终止。

{
    std::thread t(worker, 1);
    // 忘记 join 或 detach
}  // t 析构 → std::terminate()!

互斥锁与 RAII

C++ 没有 synchronized 关键字,所有锁操作通过 RAII 包装类完成。

基础互斥锁

#include <mutex>

class Counter {
    int count = 0;
    std::mutex mtx;  // 互斥锁成员变量

public:
    void increment() {
        std::lock_guard<std::mutex> lock(mtx);  // 构造时加锁
        count++;                                 // 临界区
    }                                            // 析构时自动解锁

    int get() const {
        std::lock_guard<std::mutex> lock(mtx);
        return count;
    }
};

Java 中的等效代码:

public class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }
}

std::lock_guard vs std::unique_lock 对比

特性 lock_guard unique_lock
构造时加锁 ✅ 立即 ✅ 可延迟
手动 unlock/lock ❌ 不能 ✅ 可随时 unlock 再 lock
移动语义 ✅(所有权转移)
条件变量配合 ❌ 不能 ✅ 必须
开销 最小(零额外) 略高(内部标志位)
适用场景 简单临界区 条件变量、可打断锁
// lock_guard:最轻量,不能手动解锁
{
    std::lock_guard<std::mutex> lock(mtx);
    // 临界区
} // 自动解锁

// unique_lock:灵活,可手动解锁
{
    std::unique_lock<std::mutex> lock(mtx);
    // 临界区操作...
    lock.unlock();   // 暂时解锁
    // 做点不需要锁的事...
    lock.lock();     // 重新加锁
    // 继续临界区
} // 自动解锁

// unique_lock 的延迟加锁
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// 构造时没有加锁
lock.lock();  // 手动加锁

std::scoped_lock(C++17)——防死锁

当需要同时锁定多个互斥锁时,std::scoped_lock 使用死锁避免算法(类似 std::lock)锁定所有互斥锁。

std::mutex mtx1, mtx2;

// ❌ 危险:手动锁多个可能死锁
{
    std::lock_guard<std::mutex> lock1(mtx1);
    // 如果另一个线程先锁 mtx2 再锁 mtx1 → 死锁!
    std::lock_guard<std::mutex> lock2(mtx2);
}

// ✅ 安全:scoped_lock 使用死锁避免算法
{
    std::scoped_lock lock(mtx1, mtx2);  // C++17 类模板参数推导
    // 同时锁住 mtx1 和 mtx2,不会死锁
}

Java 锁 vs C++ 锁

特性 Java C++
同步块 synchronized(obj) std::lock_guard<std::mutex>
显式锁 ReentrantLock 无标准对应(std::recursive_mutex 不推荐)
多锁避免死锁 无内置支持(需手动顺序) std::scoped_lock(C++17)
可重入 synchronized std::mutex 不是(用 recursive_mutex

注意std::mutex不可重入的——同一线程不能两次 lock 同一个 mutex,否则死锁。如果需要可重入行为,使用 std::recursive_mutex,但不推荐长期持有。


原子操作

对于简单的计数器或标志位,原子操作比互斥锁更高效。

#include <atomic>

std::atomic<int> counter{0};

// 原子递增——比 lock_guard 快得多
counter++;

// 更复杂的原子操作
int expected = 10;
counter.compare_exchange_strong(expected, 20);
// 如果 counter == expected,设为 20;否则 expected = counter

// 内存序控制(默认最严格:memory_order_seq_cst)
counter.store(1, std::memory_order_release);
int v = counter.load(std::memory_order_acquire);
特性 Java C++
基础原子 AtomicIntegerAtomicLong std::atomic<T>
volatile 保证可见性(不保证原子性) std::atomic<T> 使用 memory_order_seq_cst
内存序 不可控 完全可控(6 种 memory_order)

期物(Future)与异步任务

C++ 提供了三个层次的异步编程设施:std::async(最高层)、std::packaged_task(中层)、std::promise(底层)。

std::async——最简单的异步任务

std::async 相当于 Java 的 ExecutorService.submit(Callable),是最推荐的异步方式。

#include <future>
#include <iostream>
#include <chrono>

int slow_square(int x) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return x * x;
}

int main() {
    // 异步执行(类似 submit Callable)
    std::future<int> future = std::async(std::launch::async, slow_square, 5);

    // 做其他事...
    std::cout << "等待结果中...\n";

    // 获取结果(阻塞直到完成)
    int result = future.get();  // 只能调用一次!
    std::cout << "结果为: " << result << std::endl;
}

std::async 启动策略

// std::launch::async:立即在新线程执行
auto f1 = std::async(std::launch::async, slow_square, 5);

// std::launch::deferred:延迟执行(get 时同步执行)
auto f2 = std::async(std::launch::deferred, slow_square, 5);

// 默认策略:由实现决定(可能异步也可能同步)
auto f3 = std::async(slow_square, 5);

std::packaged_task——将可调用对象包装为 future

packaged_task 允许你创建一个"可调用的 future"——把任务包装起来,稍后择机执行。

#include <future>
#include <iostream>

int main() {
    // 创建一个 packaged_task,包装可调用对象
    std::packaged_task<int(int, int)> task([](int a, int b) {
        return a + b;
    });

    // 获取关联的 future
    std::future<int> result = task.get_future();

    // 在另一个线程执行
    std::thread t(std::move(task), 3, 4);  // task 只能移动,不可复制
    t.join();

    std::cout << "结果: " << result.get() << std::endl;  // 7
}

std::promise——手动设置值的底层工具

promise 是最底层的设施,允许你在任意时刻手动设置值/异常给 future。

#include <future>
#include <iostream>
#include <thread>

void worker(std::promise<int> promise) {
    try {
        // 模拟耗时计算
        std::this_thread::sleep_for(std::chrono::seconds(1));
        promise.set_value(42);           // 设置值
    } catch (...) {
        promise.set_exception(std::current_exception());  // 设置异常
    }
}

int main() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    // 把 promise 移动给线程(promise 不可复制)
    std::thread t(worker, std::move(promise));

    // 主线程可以继续做其他事...
    int result = future.get();  // 阻塞直到 worker 设置了值
    std::cout << "结果: " << result << std::endl;

    t.join();
}

超时等待

std::future<int> future = std::async(std::launch::async, []{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 42;
});

// 等待 1 秒,如果还没完成就继续
if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready) {
    std::cout << "已完成: " << future.get();
} else {
    std::cout << "任务还在执行...";
}

Java Future vs C++ Future

操作 Java C++
提交异步任务 ExecutorService.submit(Callable) std::async
获取结果 future.get() future.get()
检查完成 future.isDone() future.wait_for(0s) == ready
取消任务 future.cancel(true) 不支持取消
链式组合 CompletableFuture.thenApply 无标准对应
超时查询 future.get(1, TimeUnit.SECONDS) future.wait_for(1s)

std::async vs std::thread 的选择

场景 推荐 原因
只需异步执行并获取结果 std::async 自动管理线程生命周期
需要精细控制线程 std::thread 可设置优先级、绑定 CPU 等
只需"发后不管" std::thread + detach 更轻量
需要返回值 std::async 直接通过 future 获取
需要异常传播 std::async 异常自动捕获,get 时重新抛出
线程池复用 无标准支持(需第三方库) C++ 标准库无线程池
// ✅ 推荐:使用 async 获取计算结果
std::future<int> result = std::async(std::launch::async, [] {
    return compute_something();
});
// 异常自动传播到 future.get()

// ❌ 不推荐:用 thread 仅为了获取一个返回值
int result;
std::thread t([&result] { result = compute_something(); });
t.join();
// 还要考虑异常处理...

// ✅ thread 适合"发后不管"
std::thread([] {
    while (true) {
        poll_sensor();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}).detach();

条件变量

条件变量(std::condition_variable)用于线程间的等待/通知,相当于 Java 的 wait()/notify()

基础模式

#include <condition_variable>
#include <mutex>
#include <queue>
#include <iostream>

std::queue<int> queue;
std::mutex mtx;
std::condition_variable cv;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(mtx);
            queue.push(i);
            std::cout << "生产: " << i << std::endl;
        }  // 解锁后再通知,避免通知后立即重新阻塞
        cv.notify_one();  // 通知一个等待线程
    }
}

void consumer() {
    for (int i = 0; i < 10; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        // ⚠️ 必须使用带谓词的 wait,防止虚假唤醒
        cv.wait(lock, [] { return !queue.empty(); });

        int val = queue.front();
        queue.pop();
        std::cout << "    消费: " << val << std::endl;
    }
}

为什么必须带谓词(predicate)?

// ❌ 危险:无谓词的 wait——可能虚假唤醒
cv.wait(lock);
if (queue.empty()) {
    // 即使 queue.empty(),也可能因为虚假唤醒而到达这里!
}

// ✅ 正确:带谓词的 wait——内部检查条件
cv.wait(lock, [] { return !queue.empty(); });
// 等价于:
while (queue.empty()) {
    cv.wait(lock);  // 唤醒后自动重新检查条件
}
Java C++
wait() cv.wait(lock, predicate)
notify() cv.notify_one()
notifyAll() cv.notify_all()
需在 synchronized 块内 需持有 std::unique_lock

完整案例:生产者-消费者模型

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

class MessageQueue {
    std::queue<int> queue;
    std::mutex mtx;
    std::condition_variable cv;
    static constexpr size_t MAX_SIZE = 5;
    bool done = false;

public:
    void produce(int from, int to) {
        for (int i = from; i <= to; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(150));

            std::unique_lock<std::mutex> lock(mtx);
            // 如果队列满了,等待消费者消耗
            cv.wait(lock, [this] { return queue.size() < MAX_SIZE; });

            queue.push(i);
            std::cout << "生产 [" << i << "] (队列大小: " << queue.size() << ")\n";

            lock.unlock();
            cv.notify_all();  // 通知消费者
        }

        // 标记生产完成
        {
            std::lock_guard<std::mutex> lock(mtx);
            done = true;
        }
        cv.notify_all();
    }

    void consume() {
        while (true) {
            std::unique_lock<std::mutex> lock(mtx);

            // 等待:队列非空,或生产结束且队列为空
            cv.wait(lock, [this] {
                return !queue.empty() || done;
            });

            // 消费所有可用元素
            while (!queue.empty()) {
                int val = queue.front();
                queue.pop();
                std::cout << "    消费 [" << val << "] (剩余: " << queue.size() << ")\n";
            }

            // 生产结束且队列为空 → 退出
            if (done && queue.empty()) break;

            lock.unlock();
            cv.notify_all();  // 通知生产者(队列有空位了)
        }
        std::cout << "    消费者退出\n";
    }
};

int main() {
    MessageQueue mq;

    std::thread producer(&MessageQueue::produce, &mq, 1, 10);
    std::thread consumer(&MessageQueue::consume, &mq);

    producer.join();
    consumer.join();

    return 0;
}

一次性初始化

std::call_oncestd::once_flag 确保某个函数只被执行一次(即使多线程同时调用),类似 Java 的 double-checked locking

#include <mutex>
#include <iostream>

std::once_flag flag;

void initialize() {
    std::cout << "只执行一次的初始化\n";
    // 打开数据库连接、加载配置等...
}

void worker() {
    std::call_once(flag, initialize);  // 多线程安全,只执行一次
    std::cout << "工作线程开始\n";
}

int main() {
    std::thread t1(worker);
    std::thread t2(worker);
    t1.join();
    t2.join();
    // 输出:
    // 只执行一次的初始化
    // 工作线程开始
    // 工作线程开始
}

线程安全的单例模式(C++11 起)

C++11 保证了函数局部 static 变量的线程安全初始化——这是最简洁、最安全的单例实现。

class Singleton {
    Singleton() = default;
    Singleton(const Singleton&) = delete;
    Singleton& operator=(const Singleton&) = delete;

public:
    static Singleton& instance() {
        static Singleton inst;  // C++11 保证线程安全初始化
        return inst;
    }

    void do_something() { /* ... */ }
};

// 使用
Singleton::instance().do_something();

对比 Java 的 synchronized 单例或 enum 单例,C++ 的 static local 方式更简洁。


编码好习惯与坏味道

✅ 好习惯

① RAII 锁优先于手动 lock/unlock

std::mutex mtx;

// ✅ 好习惯:lock_guard / unique_lock / scoped_lock
{
    std::lock_guard<std::mutex> lock(mtx);
    // 临界区...
}  // 自动解锁

// ❌ 坏味道:手动 lock/unlock(忘记 unlock 或异常中断导致死锁)
mtx.lock();
// ... 如果这里抛出异常,unlock 永远不会执行!
mtx.unlock();

② 优先用 std::async 替代 std::thread 获取结果

// ✅ async:自动管理异常、返回值、线程生命周期
auto result = std::async(std::launch::async, compute);

// ❌ thread:需要手动传递引用、处理异常
std::exception_ptr eptr;
int value;
std::thread t([&] {
    try { value = compute(); }
    catch (...) { eptr = std::current_exception(); }
});
t.join();
if (eptr) std::rethrow_exception(eptr);

cv.wait 始终使用带谓词的版本

// ✅ 正确:带谓词,自动处理虚假唤醒
cv.wait(lock, [] { return !queue.empty(); });

// ❌ 错误:无谓词,可能因虚假唤醒而错误继续
cv.wait(lock);
if (queue.empty()) {
    // 虚假唤醒时会到达这里!
}

④ 用 std::scoped_lock 同时锁多个 mutex 防死锁

std::mutex a, b;

// ✅ 安全:scoped_lock 使用避免死锁算法
std::scoped_lock lock(a, b);

// ❌ 危险:多线程间不同加锁顺序 → 死锁
{
    std::lock_guard<std::mutex> lock1(a);
    std::lock_guard<std::mutex> lock2(b);  // 如果另一个线程先锁 b 再锁 a → 死锁
}

⑤ 通知时尽量在解锁后 notify

{
    std::lock_guard<std::mutex> lock(mtx);
    data_ready = true;
}  // 先解锁
cv.notify_one();  // 再通知(避免唤醒后立即重新阻塞)

⑥ 用 std::atomic 替代 mutex 保护简单共享变量

// ✅ 简单计数器用 atomic 比 mutex 更高效
std::atomic<int> counter{0};
for (int i = 0; i < 100; ++i) {
    std::thread([&counter] { counter++; }).detach();
}

// ❌ 杀鸡用牛刀:简单递增用 mutex 太重
int counter = 0;
std::mutex mtx;
for (int i = 0; i < 100; ++i) {
    std::thread([&] {
        std::lock_guard<std::mutex> lock(mtx);
        counter++;
    }).detach();
}

⑦ mutable mutex 用于 const 方法中的同步

class ThreadSafeCache {
    mutable std::mutex mtx;  // mutable:允许在 const 方法中修改
    std::map<int, int> cache;

public:
    int get(int key) const {
        std::lock_guard<std::mutex> lock(mtx);  // const 方法中加锁
        auto it = cache.find(key);
        return it != cache.end() ? it->second : -1;
    }
};

❌ 坏味道

① 线程不 join 也不 detach

{
    std::thread t(worker);
    // 忘记 join 或 detach
} // t 析构 → std::terminate()!程序崩溃!

// ✅ 正确做法:RAII 包装或确保 join/detach
class ThreadGuard {
    std::thread& t;
public:
    explicit ThreadGuard(std::thread& t_) : t(t_) {}
    ~ThreadGuard() {
        if (t.joinable()) t.join();  // 自动 join
    }
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void safe_run() {
    std::thread t(worker);
    ThreadGuard g(t);
    // 即使抛出异常,ThreadGuard 析构时会 join
}

② 用裸 mutex 不加 RAII 锁

std::mutex mtx;

// ❌ 坏味道:裸 mutex 操作
mtx.lock();
// ... 如果这里 return、throw、或忘记 unlock → 死锁
mtx.unlock();

// ✅ RAII 锁
std::lock_guard<std::mutex> lock(mtx);
// ... 无论什么路径离开作用域都会解锁

③ 条件变量不用谓词(虚假唤醒问题)

// ❌ 坏味道:容易被虚假唤醒
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock);
if (queue.empty()) {  // 假设有元素,但实际上没有!
    // 错误地认为队列非空...
}

// ✅ 带谓词的 wait
cv.wait(lock, [] { return !queue.empty(); });

④ 不同加锁顺序导致死锁

std::mutex mtx1, mtx2;

// 线程 A
void thread_a() {
    // 注意:先锁 mtx1 再锁 mtx2
    std::lock_guard<std::mutex> lock1(mtx1);
    std::lock_guard<std::mutex> lock2(mtx2);
}

// 线程 B
void thread_b() {
    // 注意:先锁 mtx2 再锁 mtx1 → 死锁!
    std::lock_guard<std::mutex> lock2(mtx2);
    std::lock_guard<std::mutex> lock1(mtx1);
}

// ✅ 解决方案1:统一加锁顺序
void thread_b_fixed() {
    // 和 thread_a 保持相同顺序
    std::lock_guard<std::mutex> lock1(mtx1);
    std::lock_guard<std::mutex> lock2(mtx2);
}

// ✅ 解决方案2:scoped_lock(推荐)
void thread_a_best() {
    std::scoped_lock lock(mtx1, mtx2);  // 安全,自动避免死锁
}
void thread_b_best() {
    std::scoped_lock lock(mtx1, mtx2);  // 参数顺序无所谓
}

⑤ 在持有锁时调用外部代码(可能导致死锁)

// ❌ 危险:在锁内调用用户回调
void notify_all(const std::function<void()>& callback) {
    std::lock_guard<std::mutex> lock(mtx);
    // ...
    callback();  // 如果 callback 尝试获取同一 mutex → 死锁
}

// ✅ 安全:解锁后再调用
void notify_all(const std::function<void()>& callback) {
    std::vector<Data> snapshot;
    {
        std::lock_guard<std::mutex> lock(mtx);
        snapshot = data;  // 复制数据到本地快照
    }
    callback(snapshot);  // 解锁后调用,安全
}

std::thread 忘记处理异常导致 terminate

// ❌ 坏味道:thread 中的未捕获异常导致 std::terminate
std::thread t([] {
    throw std::runtime_error("错误");
    // 未捕获 → std::terminate()!
});
t.join();

// ✅ 正确:在 thread 内捕获异常
std::thread t([] {
    try {
        throw std::runtime_error("错误");
    } catch (...) {
        // 处理异常或保存到 promise
    }
});
t.join();

// ✅ 最佳:用 std::async,异常自动传播到 future
auto future = std::async(std::launch::async, [] {
    throw std::runtime_error("错误");
    return 42;
});
try {
    future.get();  // 异常在这里重新抛出
} catch (const std::exception& e) {
    std::cerr << "异步任务异常: " << e.what();
}

并发编程要点总结

Java → C++ 核心转换

Java 概念 C++ 等效
synchronized(obj) { } std::lock_guard<std::mutex> lock(mtx)
synchronized 锁多个对象 std::scoped_lock lock(m1, m2)(C++17)
obj.wait() cv.wait(lock, predicate)
obj.notify() cv.notify_one()
obj.notifyAll() cv.notify_all()
ExecutorService.submit(Callable) std::async(std::launch::async, func)
Future.get() std::future::get()
Future.isDone() future.wait_for(0s) == std::future_status::ready
AtomicInteger std::atomic<int>
volatile std::atomic<T>(不要用 volatile 做线程同步!)
call_once / double-checked locking std::call_once / static local

规则总结

  1. RAII 锁优先——lock_guard/scoped_lock 代替手动 lock/unlock
  2. std::async 优先于 std::thread——自动管理异常、返回值、生命周期
  3. 条件变量始终用谓词——避免虚假唤醒导致逻辑错误
  4. std::scoped_lock 处理多锁——参数顺序无关,自动防死锁
  5. 简单变量用 std::atomic——比 std::mutex 轻量
  6. 线程对象必须 join 或 detach——否则析构时程序终止
  7. std::mutex 不可重入——同一线程不能重复 lock,用 recursive_mutex 但需谨慎
  8. 通知时先解锁再 notify——避免被通知线程立即重新阻塞

评论