C++ Concurrent - Promise / future / async[2]
이번 시간에는 C++의 비동기 처리 관련 기술을 살펴보겠습니다.
std:promise
promise란
- 스레드 사이에서 값(객체) 또는 예외를 공유 할 수 있는 템플릿
- promise를 통해 전달된 데이터는 future를 통해서 얻을 수 있음
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;
void add(std::promise<int>&& p, int a, int b)
{
int s = a + b;
std::this_thread::sleep_for(1s);
//p.set_value(s);
p.set_value_at_thread_exit(s);
std::cout << "add" << std::endl;
std::this_thread::sleep_for(1s);
}
int main()
{
std::promise<int> pm;
std::future<int> ft = pm.get_future();
std::thread t(add, std::move(pm), 10, 20);
//....
int ret = ft.get(); // blocking
std::cout << ret << std::endl; // 30
t.join();
}
get 수행시 예외를 발생시키는 법
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;
void divide(std::promise<int>&& p, int a, int b)
{
try
{
if ( b == 0 )
throw std::runtime_error("divide by zero");
p.set_value(a / b);
}
catch(...)
{
//p.set_exception( std::current_exception() );
p.set_exception_at_thread_exit( std::current_exception() );
}
}
int main()
{
std::promise<int> pm;
std::future<int> ft = pm.get_future();
std::thread t(divide, std::move(pm), 10, 0);
try
{
int ret = ft.get();
}
catch(std::exception& e)
{
std::cout << e.what() << std::endl;
}
t.join();
}
std::future
shared_future
- 일반 future 객체는 복사 불가, 여러 스레드에서 공유 시키고 싶을 때 사용
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;
void add(std::promise<int>&& p, int a, int b)
{
std::this_thread::sleep_for(1s);
p.set_value(a + b);
}
void consume(std::shared_future<int> sf)
{
sf.get();
std::cout << "finish foo" << std::endl;
}
int main()
{
std::promise<int> pm;
// std::future<int> ft = pm.get_future();
//std::future<int> ft2 = ft; // error
// std::shared_future<int> sft = ft.share();
std::shared_future<int> sft = pm.get_future();
std::thread t(add, std::move(pm), 10, 20);
std::thread t1(consume, sft);
std::thread t2(consume, sft);
t.join();
t1.join();
t2.join();
}
핵심 정리
- set_value는 1회만 가능
- get_future(), get() 또한 1회만 가능
std::packaged_task
멀티스레드를 고려하지 않고 작성한 함수를 비동기로 호출 할 수 있음
#include <iostream>
#include <thread>
#include <future>
int add(int a, int b)
{
std::cout << "add" << std::endl;
return a + b;
}
int main()
{
std::packaged_task<int(int, int)> task(add);
std::future<int> ft = task.get_future();
//task(10, 20); // add(10, 20)
std::thread t(std::move(task), 10, 20);
std::cout << "continue main" << std::endl;
int ret = ft.get();
std::cout << ret << std::endl;
t.join();
}
std::async
비동기 함수 개념
- 함수 호출시 호출지점에서 주 스레드의 흐름이 block 되지 않고, 호출 후 주 스레드의 흐름이 이어짐
C++에서 비동기 처리를 구현하는 방법 3가지
- std::thead 사용
- std::jthread 사용(C++20)
- std::async 사용
std::async의 개념
- 주어진 함수를 비동기로 수행하는 함수 템플릿
- 기존 동기 방식으로 작성된 함수를 간단하게 스레드로 수행 할 수 있음
- 일반적인 구현은 이미 구현되어 있는 스레드풀이 사용됨
- std::future 반환
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
using namespace std::literals;
int add(int a, int b)
{
std::this_thread::sleep_for(2s);
return a + b;
}
int main()
{
//add(10, 20); // 동기 호출
std::future<int> ft = std::async(add, 10, 20);
std::cout << "continue main" << std::endl;
int ret = ft.get();
std::cout << "result : " << ret << std::endl;
}
예제1
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
using namespace std::literals;
int add(int a, int b)
{
std::cout << "add : " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(2s);
return a + b;
}
int main()
{
std::future<int> ft = std::async( std::launch::async, add, 10, 20);
// std::future<int> ft = std::async( std::launch::deferred, add, 10, 20); // 지연된 실행
// std::future<int> ft = std::async( std::launch::async | std::launch::deferred , add, 10, 20);
// std::future<int> ft = std::async( add, 10, 20);
std::cout << "continue main : " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(2s);
int ret = ft.get();
std::cout << "result : " << ret << std::endl;
}
std::jthread
std::thread 사용시 반드시 join 또는 detach 해야 함(불편)
- C++20에서 소멸자에서 join을 처리하는 jthread 추가
- 작업중인 스레드에 중단을 요청하는 멤버 추가 : request_stop()
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::literals;
void foo()
{
for( int i = 0; i < 10; i++)
{
std::this_thread::sleep_for(500ms);
std::cout << "foo : " << i << std::endl;
}
}
void goo( std::stop_token token )
{
for( int i = 0; i < 10; i++)
{
if ( token.stop_requested() )
{
std::cout << "중지요청" << std::endl;
return ;
}
std::this_thread::sleep_for(500ms);
std::cout << "goo : " << i << std::endl;
}
}
int main()
{
std::jthread j1(foo);
std::jthread j2(goo);
std::this_thread::sleep_for(2s);
j1.request_stop();
j2.request_stop();
}