이번 시간에는 C++의 비동기 처리 관련 기술을 살펴보겠습니다.

std:promise

promise란

  • 스레드 사이에서 값(객체) 또는 예외를 공유 할 수 있는 템플릿
  • promise를 통해 전달된 데이터는 future를 통해서 얻을 수 있음
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void add(std::promise<int>&& p, int a, int b)
{
	int s = a + b;
	std::this_thread::sleep_for(1s);
	//p.set_value(s);

	p.set_value_at_thread_exit(s);
	
	std::cout << "add" << std::endl;
	std::this_thread::sleep_for(1s);
}
int main()
{
	std::promise<int> pm;
	std::future<int> ft = pm.get_future();

	std::thread t(add, std::move(pm), 10, 20);
	//....
	int ret = ft.get(); // blocking			
	std::cout << ret << std::endl;  // 30
	t.join();
}

get 수행시 예외를 발생시키는 법

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void divide(std::promise<int>&& p, int a, int b)
{
    try
    {
        if ( b == 0 ) 
            throw std::runtime_error("divide by zero");

        p.set_value(a / b);
    }
    catch(...)
    {
        //p.set_exception( std::current_exception() );
        p.set_exception_at_thread_exit( std::current_exception() );
    }
}

int main()
{
	std::promise<int> pm;
	std::future<int>  ft = pm.get_future();

	std::thread t(divide, std::move(pm), 10, 0);

    try
    {
   	    int ret = ft.get(); 
    }
    catch(std::exception& e)
    {
        std::cout << e.what() << std::endl;
    }

	t.join();
}

std::future

shared_future

  • 일반 future 객체는 복사 불가, 여러 스레드에서 공유 시키고 싶을 때 사용
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std::literals;

void add(std::promise<int>&& p, int a, int b)
{
    std::this_thread::sleep_for(1s);
    p.set_value(a + b);
}

void consume(std::shared_future<int> sf)
{
    sf.get();
    std::cout << "finish foo" << std::endl;
}
int main()
{
    std::promise<int> pm;
//    std::future<int> ft = pm.get_future();
    //std::future<int> ft2 = ft; // error
//    std::shared_future<int> sft = ft.share();

    std::shared_future<int> sft = pm.get_future();
 
    std::thread t(add, std::move(pm), 10, 20);

    std::thread t1(consume, sft);
    std::thread t2(consume, sft);

    t.join();
    t1.join();
    t2.join();
}

핵심 정리

  • set_value는 1회만 가능
  • get_future(), get() 또한 1회만 가능

std::packaged_task

멀티스레드를 고려하지 않고 작성한 함수를 비동기로 호출 할 수 있음

#include <iostream>
#include <thread>
#include <future>

int add(int a, int b)
{
    std::cout << "add" << std::endl;
    return a + b;
}
int main()
{
    std::packaged_task<int(int, int)> task(add);
    std::future<int> ft = task.get_future();

    //task(10, 20); // add(10, 20)
    std::thread t(std::move(task), 10, 20);
    std::cout << "continue main" << std::endl;
    
    int ret = ft.get();
    std::cout << ret << std::endl;
    t.join();
}

std::async

비동기 함수 개념

  • 함수 호출시 호출지점에서 주 스레드의 흐름이 block 되지 않고, 호출 후 주 스레드의 흐름이 이어짐

C++에서 비동기 처리를 구현하는 방법 3가지

  1. std::thead 사용
  2. std::jthread 사용(C++20)
  3. std::async 사용

std::async의 개념

  • 주어진 함수를 비동기로 수행하는 함수 템플릿
  • 기존 동기 방식으로 작성된 함수를 간단하게 스레드로 수행 할 수 있음
  • 일반적인 구현은 이미 구현되어 있는 스레드풀이 사용됨
  • std::future 반환
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
using namespace std::literals;

int add(int a, int b)
{
    std::this_thread::sleep_for(2s);
    return a + b;
}

int main()
{
     //add(10, 20); // 동기 호출
     std::future<int> ft = std::async(add, 10, 20);
     std::cout << "continue main" << std::endl;
     int ret = ft.get();
     std::cout << "result : " << ret << std::endl;
}

예제1

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
using namespace std::literals;

int add(int a, int b)
{
    std::cout << "add : " << std::this_thread::get_id() << std::endl;    
    std::this_thread::sleep_for(2s);
    return a + b;
}
int main()
{
    std::future<int> ft = std::async( std::launch::async,    add, 10, 20);
//    std::future<int> ft = std::async( std::launch::deferred, add, 10, 20); // 지연된 실행
//    std::future<int> ft = std::async( std::launch::async | std::launch::deferred , add, 10, 20);
//    std::future<int> ft = std::async( add, 10, 20);

    std::cout << "continue main : " << std::this_thread::get_id() << std::endl;    
    std::this_thread::sleep_for(2s);
    int ret = ft.get();

    std::cout << "result : " << ret << std::endl;
}

std::jthread

std::thread 사용시 반드시 join 또는 detach 해야 함(불편)

  • C++20에서 소멸자에서 join을 처리하는 jthread 추가
  • 작업중인 스레드에 중단을 요청하는 멤버 추가 : request_stop()
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::literals;

void foo()
{
    for( int i = 0; i < 10; i++)
    {
        std::this_thread::sleep_for(500ms);
        std::cout << "foo : " << i << std::endl;
    }
}
void goo( std::stop_token token ) 
{     
    for( int i = 0; i < 10; i++)
    {
        if ( token.stop_requested() )
        {
            std::cout << "중지요청" << std::endl;
            return ;
        }

        std::this_thread::sleep_for(500ms);
        std::cout << "goo : " << i << std::endl;
    }
}
int main() 
{
    std::jthread j1(foo);
    std::jthread j2(goo);
    std::this_thread::sleep_for(2s);

    j1.request_stop();
    j2.request_stop();
}