C++ 并发 vs Java 并发:晚来的标准
C++ 直到 C++11 才在标准库中提供了线程支持(<thread>、<mutex>、<future>、<condition_variable> 等),而 Java 从 1.0 开始就有线程。但 C++ 的设计思路和 Java 的 java.util.concurrent 非常相似,而且 C++ 的 RAII 机制让资源管理更安全。
核心差异:C++ 没有内置的 synchronized 关键字,所有同步都通过 RAII 锁对象管理。C++ 的线程对象析构时必须已 join 或 detach,否则调用
std::terminate()。
线程的创建和管理
基础线程创建
#include <thread>
#include <iostream>
void worker(int id) {
std::cout << "Worker " << id << " running\n";
}
int main() {
// 通过函数指针创建
std::thread t1(worker, 42);
// 通过 Lambda 创建(最常用)
std::thread t2([] {
std::cout << "Hello from Lambda thread\n";
});
// 通过可调用对象创建
struct Task {
void operator()() const {
std::cout << "Task executed\n";
}
};
std::thread t3(Task{});
// 必须 join 或 detach!
t1.join(); // 等待线程完成
t2.join();
t3.detach(); // 分离线程(让它独自运行)
// 如果 t1 在 join/detach 前析构 → std::terminate()!
return 0;
}
Java vs C++ 线程创建
| 操作 | Java | C++ |
|---|---|---|
| 创建线程 | new Thread(runnable).start() |
std::thread(func, args) |
| 等待完成 | thread.join() |
thread.join() |
| 分离线程 | 自动(JVM 管理) | thread.detach() |
重要:Java 线程不 join 也无妨(JVM 会清理守护线程),但 C++ 的 std::thread 是 RAII 对象——析构时如果线程仍可 join(joinable),程序直接终止。
{
std::thread t(worker, 1);
// 忘记 join 或 detach
} // t 析构 → std::terminate()!
互斥锁与 RAII
C++ 没有 synchronized 关键字,所有锁操作通过 RAII 包装类完成。
基础互斥锁
#include <mutex>
class Counter {
int count = 0;
std::mutex mtx; // 互斥锁成员变量
public:
void increment() {
std::lock_guard<std::mutex> lock(mtx); // 构造时加锁
count++; // 临界区
} // 析构时自动解锁
int get() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
Java 中的等效代码:
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int get() {
return count;
}
}
std::lock_guard vs std::unique_lock 对比
| 特性 | lock_guard |
unique_lock |
|---|---|---|
| 构造时加锁 | ✅ 立即 | ✅ 可延迟 |
| 手动 unlock/lock | ❌ 不能 | ✅ 可随时 unlock 再 lock |
| 移动语义 | ❌ | ✅(所有权转移) |
| 条件变量配合 | ❌ 不能 | ✅ 必须 |
| 开销 | 最小(零额外) | 略高(内部标志位) |
| 适用场景 | 简单临界区 | 条件变量、可打断锁 |
// lock_guard:最轻量,不能手动解锁
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区
} // 自动解锁
// unique_lock:灵活,可手动解锁
{
std::unique_lock<std::mutex> lock(mtx);
// 临界区操作...
lock.unlock(); // 暂时解锁
// 做点不需要锁的事...
lock.lock(); // 重新加锁
// 继续临界区
} // 自动解锁
// unique_lock 的延迟加锁
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// 构造时没有加锁
lock.lock(); // 手动加锁
std::scoped_lock(C++17)——防死锁
当需要同时锁定多个互斥锁时,std::scoped_lock 使用死锁避免算法(类似 std::lock)锁定所有互斥锁。
std::mutex mtx1, mtx2;
// ❌ 危险:手动锁多个可能死锁
{
std::lock_guard<std::mutex> lock1(mtx1);
// 如果另一个线程先锁 mtx2 再锁 mtx1 → 死锁!
std::lock_guard<std::mutex> lock2(mtx2);
}
// ✅ 安全:scoped_lock 使用死锁避免算法
{
std::scoped_lock lock(mtx1, mtx2); // C++17 类模板参数推导
// 同时锁住 mtx1 和 mtx2,不会死锁
}
Java 锁 vs C++ 锁
| 特性 | Java | C++ |
|---|---|---|
| 同步块 | synchronized(obj) |
std::lock_guard<std::mutex> |
| 显式锁 | ReentrantLock |
无标准对应(std::recursive_mutex 不推荐) |
| 多锁避免死锁 | 无内置支持(需手动顺序) | std::scoped_lock(C++17) |
| 可重入 | synchronized 是 |
std::mutex 不是(用 recursive_mutex) |
注意:std::mutex 是不可重入的——同一线程不能两次 lock 同一个 mutex,否则死锁。如果需要可重入行为,使用 std::recursive_mutex,但不推荐长期持有。
原子操作
对于简单的计数器或标志位,原子操作比互斥锁更高效。
#include <atomic>
std::atomic<int> counter{0};
// 原子递增——比 lock_guard 快得多
counter++;
// 更复杂的原子操作
int expected = 10;
counter.compare_exchange_strong(expected, 20);
// 如果 counter == expected,设为 20;否则 expected = counter
// 内存序控制(默认最严格:memory_order_seq_cst)
counter.store(1, std::memory_order_release);
int v = counter.load(std::memory_order_acquire);
| 特性 | Java | C++ |
|---|---|---|
| 基础原子 | AtomicInteger、AtomicLong 等 |
std::atomic<T> |
| volatile | 保证可见性(不保证原子性) | std::atomic<T> 使用 memory_order_seq_cst |
| 内存序 | 不可控 | 完全可控(6 种 memory_order) |
期物(Future)与异步任务
C++ 提供了三个层次的异步编程设施:std::async(最高层)、std::packaged_task(中层)、std::promise(底层)。
std::async——最简单的异步任务
std::async 相当于 Java 的 ExecutorService.submit(Callable),是最推荐的异步方式。
#include <future>
#include <iostream>
#include <chrono>
int slow_square(int x) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return x * x;
}
int main() {
// 异步执行(类似 submit Callable)
std::future<int> future = std::async(std::launch::async, slow_square, 5);
// 做其他事...
std::cout << "等待结果中...\n";
// 获取结果(阻塞直到完成)
int result = future.get(); // 只能调用一次!
std::cout << "结果为: " << result << std::endl;
}
std::async 启动策略
// std::launch::async:立即在新线程执行
auto f1 = std::async(std::launch::async, slow_square, 5);
// std::launch::deferred:延迟执行(get 时同步执行)
auto f2 = std::async(std::launch::deferred, slow_square, 5);
// 默认策略:由实现决定(可能异步也可能同步)
auto f3 = std::async(slow_square, 5);
std::packaged_task——将可调用对象包装为 future
packaged_task 允许你创建一个"可调用的 future"——把任务包装起来,稍后择机执行。
#include <future>
#include <iostream>
int main() {
// 创建一个 packaged_task,包装可调用对象
std::packaged_task<int(int, int)> task([](int a, int b) {
return a + b;
});
// 获取关联的 future
std::future<int> result = task.get_future();
// 在另一个线程执行
std::thread t(std::move(task), 3, 4); // task 只能移动,不可复制
t.join();
std::cout << "结果: " << result.get() << std::endl; // 7
}
std::promise——手动设置值的底层工具
promise 是最底层的设施,允许你在任意时刻手动设置值/异常给 future。
#include <future>
#include <iostream>
#include <thread>
void worker(std::promise<int> promise) {
try {
// 模拟耗时计算
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置值
} catch (...) {
promise.set_exception(std::current_exception()); // 设置异常
}
}
int main() {
std::promise<int> promise;
std::future<int> future = promise.get_future();
// 把 promise 移动给线程(promise 不可复制)
std::thread t(worker, std::move(promise));
// 主线程可以继续做其他事...
int result = future.get(); // 阻塞直到 worker 设置了值
std::cout << "结果: " << result << std::endl;
t.join();
}
超时等待
std::future<int> future = std::async(std::launch::async, []{
std::this_thread::sleep_for(std::chrono::seconds(3));
return 42;
});
// 等待 1 秒,如果还没完成就继续
if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready) {
std::cout << "已完成: " << future.get();
} else {
std::cout << "任务还在执行...";
}
Java Future vs C++ Future
| 操作 | Java | C++ |
|---|---|---|
| 提交异步任务 | ExecutorService.submit(Callable) |
std::async |
| 获取结果 | future.get() |
future.get() |
| 检查完成 | future.isDone() |
future.wait_for(0s) == ready |
| 取消任务 | future.cancel(true) |
不支持取消 |
| 链式组合 | CompletableFuture.thenApply |
无标准对应 |
| 超时查询 | future.get(1, TimeUnit.SECONDS) |
future.wait_for(1s) |
std::async vs std::thread 的选择
| 场景 | 推荐 | 原因 |
|---|---|---|
| 只需异步执行并获取结果 | std::async |
自动管理线程生命周期 |
| 需要精细控制线程 | std::thread |
可设置优先级、绑定 CPU 等 |
| 只需"发后不管" | std::thread + detach |
更轻量 |
| 需要返回值 | std::async |
直接通过 future 获取 |
| 需要异常传播 | std::async |
异常自动捕获,get 时重新抛出 |
| 线程池复用 | 无标准支持(需第三方库) | C++ 标准库无线程池 |
// ✅ 推荐:使用 async 获取计算结果
std::future<int> result = std::async(std::launch::async, [] {
return compute_something();
});
// 异常自动传播到 future.get()
// ❌ 不推荐:用 thread 仅为了获取一个返回值
int result;
std::thread t([&result] { result = compute_something(); });
t.join();
// 还要考虑异常处理...
// ✅ thread 适合"发后不管"
std::thread([] {
while (true) {
poll_sensor();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}).detach();
条件变量
条件变量(std::condition_variable)用于线程间的等待/通知,相当于 Java 的 wait()/notify()。
基础模式
#include <condition_variable>
#include <mutex>
#include <queue>
#include <iostream>
std::queue<int> queue;
std::mutex mtx;
std::condition_variable cv;
void producer() {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(mtx);
queue.push(i);
std::cout << "生产: " << i << std::endl;
} // 解锁后再通知,避免通知后立即重新阻塞
cv.notify_one(); // 通知一个等待线程
}
}
void consumer() {
for (int i = 0; i < 10; ++i) {
std::unique_lock<std::mutex> lock(mtx);
// ⚠️ 必须使用带谓词的 wait,防止虚假唤醒
cv.wait(lock, [] { return !queue.empty(); });
int val = queue.front();
queue.pop();
std::cout << " 消费: " << val << std::endl;
}
}
为什么必须带谓词(predicate)?
// ❌ 危险:无谓词的 wait——可能虚假唤醒
cv.wait(lock);
if (queue.empty()) {
// 即使 queue.empty(),也可能因为虚假唤醒而到达这里!
}
// ✅ 正确:带谓词的 wait——内部检查条件
cv.wait(lock, [] { return !queue.empty(); });
// 等价于:
while (queue.empty()) {
cv.wait(lock); // 唤醒后自动重新检查条件
}
| Java | C++ |
|---|---|
wait() |
cv.wait(lock, predicate) |
notify() |
cv.notify_one() |
notifyAll() |
cv.notify_all() |
| 需在 synchronized 块内 | 需持有 std::unique_lock |
完整案例:生产者-消费者模型
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
class MessageQueue {
std::queue<int> queue;
std::mutex mtx;
std::condition_variable cv;
static constexpr size_t MAX_SIZE = 5;
bool done = false;
public:
void produce(int from, int to) {
for (int i = from; i <= to; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(150));
std::unique_lock<std::mutex> lock(mtx);
// 如果队列满了,等待消费者消耗
cv.wait(lock, [this] { return queue.size() < MAX_SIZE; });
queue.push(i);
std::cout << "生产 [" << i << "] (队列大小: " << queue.size() << ")\n";
lock.unlock();
cv.notify_all(); // 通知消费者
}
// 标记生产完成
{
std::lock_guard<std::mutex> lock(mtx);
done = true;
}
cv.notify_all();
}
void consume() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待:队列非空,或生产结束且队列为空
cv.wait(lock, [this] {
return !queue.empty() || done;
});
// 消费所有可用元素
while (!queue.empty()) {
int val = queue.front();
queue.pop();
std::cout << " 消费 [" << val << "] (剩余: " << queue.size() << ")\n";
}
// 生产结束且队列为空 → 退出
if (done && queue.empty()) break;
lock.unlock();
cv.notify_all(); // 通知生产者(队列有空位了)
}
std::cout << " 消费者退出\n";
}
};
int main() {
MessageQueue mq;
std::thread producer(&MessageQueue::produce, &mq, 1, 10);
std::thread consumer(&MessageQueue::consume, &mq);
producer.join();
consumer.join();
return 0;
}
一次性初始化
std::call_once 和 std::once_flag 确保某个函数只被执行一次(即使多线程同时调用),类似 Java 的 double-checked locking。
#include <mutex>
#include <iostream>
std::once_flag flag;
void initialize() {
std::cout << "只执行一次的初始化\n";
// 打开数据库连接、加载配置等...
}
void worker() {
std::call_once(flag, initialize); // 多线程安全,只执行一次
std::cout << "工作线程开始\n";
}
int main() {
std::thread t1(worker);
std::thread t2(worker);
t1.join();
t2.join();
// 输出:
// 只执行一次的初始化
// 工作线程开始
// 工作线程开始
}
线程安全的单例模式(C++11 起)
C++11 保证了函数局部 static 变量的线程安全初始化——这是最简洁、最安全的单例实现。
class Singleton {
Singleton() = default;
Singleton(const Singleton&) = delete;
Singleton& operator=(const Singleton&) = delete;
public:
static Singleton& instance() {
static Singleton inst; // C++11 保证线程安全初始化
return inst;
}
void do_something() { /* ... */ }
};
// 使用
Singleton::instance().do_something();
对比 Java 的 synchronized 单例或 enum 单例,C++ 的 static local 方式更简洁。
编码好习惯与坏味道
✅ 好习惯
① RAII 锁优先于手动 lock/unlock
std::mutex mtx;
// ✅ 好习惯:lock_guard / unique_lock / scoped_lock
{
std::lock_guard<std::mutex> lock(mtx);
// 临界区...
} // 自动解锁
// ❌ 坏味道:手动 lock/unlock(忘记 unlock 或异常中断导致死锁)
mtx.lock();
// ... 如果这里抛出异常,unlock 永远不会执行!
mtx.unlock();
② 优先用 std::async 替代 std::thread 获取结果
// ✅ async:自动管理异常、返回值、线程生命周期
auto result = std::async(std::launch::async, compute);
// ❌ thread:需要手动传递引用、处理异常
std::exception_ptr eptr;
int value;
std::thread t([&] {
try { value = compute(); }
catch (...) { eptr = std::current_exception(); }
});
t.join();
if (eptr) std::rethrow_exception(eptr);
③ cv.wait 始终使用带谓词的版本
// ✅ 正确:带谓词,自动处理虚假唤醒
cv.wait(lock, [] { return !queue.empty(); });
// ❌ 错误:无谓词,可能因虚假唤醒而错误继续
cv.wait(lock);
if (queue.empty()) {
// 虚假唤醒时会到达这里!
}
④ 用 std::scoped_lock 同时锁多个 mutex 防死锁
std::mutex a, b;
// ✅ 安全:scoped_lock 使用避免死锁算法
std::scoped_lock lock(a, b);
// ❌ 危险:多线程间不同加锁顺序 → 死锁
{
std::lock_guard<std::mutex> lock1(a);
std::lock_guard<std::mutex> lock2(b); // 如果另一个线程先锁 b 再锁 a → 死锁
}
⑤ 通知时尽量在解锁后 notify
{
std::lock_guard<std::mutex> lock(mtx);
data_ready = true;
} // 先解锁
cv.notify_one(); // 再通知(避免唤醒后立即重新阻塞)
⑥ 用 std::atomic 替代 mutex 保护简单共享变量
// ✅ 简单计数器用 atomic 比 mutex 更高效
std::atomic<int> counter{0};
for (int i = 0; i < 100; ++i) {
std::thread([&counter] { counter++; }).detach();
}
// ❌ 杀鸡用牛刀:简单递增用 mutex 太重
int counter = 0;
std::mutex mtx;
for (int i = 0; i < 100; ++i) {
std::thread([&] {
std::lock_guard<std::mutex> lock(mtx);
counter++;
}).detach();
}
⑦ mutable mutex 用于 const 方法中的同步
class ThreadSafeCache {
mutable std::mutex mtx; // mutable:允许在 const 方法中修改
std::map<int, int> cache;
public:
int get(int key) const {
std::lock_guard<std::mutex> lock(mtx); // const 方法中加锁
auto it = cache.find(key);
return it != cache.end() ? it->second : -1;
}
};
❌ 坏味道
① 线程不 join 也不 detach
{
std::thread t(worker);
// 忘记 join 或 detach
} // t 析构 → std::terminate()!程序崩溃!
// ✅ 正确做法:RAII 包装或确保 join/detach
class ThreadGuard {
std::thread& t;
public:
explicit ThreadGuard(std::thread& t_) : t(t_) {}
~ThreadGuard() {
if (t.joinable()) t.join(); // 自动 join
}
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard& operator=(const ThreadGuard&) = delete;
};
void safe_run() {
std::thread t(worker);
ThreadGuard g(t);
// 即使抛出异常,ThreadGuard 析构时会 join
}
② 用裸 mutex 不加 RAII 锁
std::mutex mtx;
// ❌ 坏味道:裸 mutex 操作
mtx.lock();
// ... 如果这里 return、throw、或忘记 unlock → 死锁
mtx.unlock();
// ✅ RAII 锁
std::lock_guard<std::mutex> lock(mtx);
// ... 无论什么路径离开作用域都会解锁
③ 条件变量不用谓词(虚假唤醒问题)
// ❌ 坏味道:容易被虚假唤醒
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock);
if (queue.empty()) { // 假设有元素,但实际上没有!
// 错误地认为队列非空...
}
// ✅ 带谓词的 wait
cv.wait(lock, [] { return !queue.empty(); });
④ 不同加锁顺序导致死锁
std::mutex mtx1, mtx2;
// 线程 A
void thread_a() {
// 注意:先锁 mtx1 再锁 mtx2
std::lock_guard<std::mutex> lock1(mtx1);
std::lock_guard<std::mutex> lock2(mtx2);
}
// 线程 B
void thread_b() {
// 注意:先锁 mtx2 再锁 mtx1 → 死锁!
std::lock_guard<std::mutex> lock2(mtx2);
std::lock_guard<std::mutex> lock1(mtx1);
}
// ✅ 解决方案1:统一加锁顺序
void thread_b_fixed() {
// 和 thread_a 保持相同顺序
std::lock_guard<std::mutex> lock1(mtx1);
std::lock_guard<std::mutex> lock2(mtx2);
}
// ✅ 解决方案2:scoped_lock(推荐)
void thread_a_best() {
std::scoped_lock lock(mtx1, mtx2); // 安全,自动避免死锁
}
void thread_b_best() {
std::scoped_lock lock(mtx1, mtx2); // 参数顺序无所谓
}
⑤ 在持有锁时调用外部代码(可能导致死锁)
// ❌ 危险:在锁内调用用户回调
void notify_all(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mtx);
// ...
callback(); // 如果 callback 尝试获取同一 mutex → 死锁
}
// ✅ 安全:解锁后再调用
void notify_all(const std::function<void()>& callback) {
std::vector<Data> snapshot;
{
std::lock_guard<std::mutex> lock(mtx);
snapshot = data; // 复制数据到本地快照
}
callback(snapshot); // 解锁后调用,安全
}
⑥ std::thread 忘记处理异常导致 terminate
// ❌ 坏味道:thread 中的未捕获异常导致 std::terminate
std::thread t([] {
throw std::runtime_error("错误");
// 未捕获 → std::terminate()!
});
t.join();
// ✅ 正确:在 thread 内捕获异常
std::thread t([] {
try {
throw std::runtime_error("错误");
} catch (...) {
// 处理异常或保存到 promise
}
});
t.join();
// ✅ 最佳:用 std::async,异常自动传播到 future
auto future = std::async(std::launch::async, [] {
throw std::runtime_error("错误");
return 42;
});
try {
future.get(); // 异常在这里重新抛出
} catch (const std::exception& e) {
std::cerr << "异步任务异常: " << e.what();
}
并发编程要点总结
Java → C++ 核心转换
| Java 概念 | C++ 等效 |
|---|---|
synchronized(obj) { } |
std::lock_guard<std::mutex> lock(mtx) |
synchronized 锁多个对象 |
std::scoped_lock lock(m1, m2)(C++17) |
obj.wait() |
cv.wait(lock, predicate) |
obj.notify() |
cv.notify_one() |
obj.notifyAll() |
cv.notify_all() |
ExecutorService.submit(Callable) |
std::async(std::launch::async, func) |
Future.get() |
std::future::get() |
Future.isDone() |
future.wait_for(0s) == std::future_status::ready |
AtomicInteger |
std::atomic<int> |
volatile |
std::atomic<T>(不要用 volatile 做线程同步!) |
call_once / double-checked locking |
std::call_once / static local |
规则总结
- RAII 锁优先——
lock_guard/scoped_lock代替手动 lock/unlock std::async优先于std::thread——自动管理异常、返回值、生命周期- 条件变量始终用谓词——避免虚假唤醒导致逻辑错误
std::scoped_lock处理多锁——参数顺序无关,自动防死锁- 简单变量用
std::atomic——比std::mutex轻量 - 线程对象必须 join 或 detach——否则析构时程序终止
std::mutex不可重入——同一线程不能重复 lock,用recursive_mutex但需谨慎- 通知时先解锁再 notify——避免被通知线程立即重新阻塞