[Spring] @Async 비동기 멀티스레드 사용법

by 계범 2022. 6. 25.

수정사항 2022-08-27

  • async 사용 시 비동기 스레드 exception 처리
  • CompletableFuture 사용법 추가

Async 사용계기

현재 마이다스 AI 역검 백엔드팀에 들어오게 되었는데,

과제 중 원활한 검증작업을 위한 응시 데이터를 만드는 것을 담당하게 되었다.


실제로는 응시자 한명 당 다양한 게임마다 응시를 본 데이터가 날라오게 되고, 이를 각각 저장하게 되는데

응시를 실제로 보기때문에 하나의 패킷이 날라오는데 시간도 오래걸리게되어서 부담이 작다고 생각했지만,


나는 한번의 요청으로 응시자를 생성해준 후 각 게임마다 문제를 불러오고 문제에 대한 응시데이터도 만들어주고 해야한다. 거기다 검증을 위해 응시자를 천명~만명을 만든다고하면, 만명에 대한 것을 만들어줘야하기때문에 굉장히 느려졌다.


스프링은 요청 하나 당 스레드를 만들어줘서 진행하게 되는데,

나는 하나의 요청에서 만명의 데이터를 만들어줘야하기때문에 멀티 스레드를 생각했다.


스프링엔 @Async 어노테이션이 존재한다.(JS에도 동일이름으로 있음!)

이 어노테이션은 쓰레드풀을 활용한 비동기 메소드를 지원해준다.


메소드에 @Async를 달아두면 비동기로 호출자는 즉시 리턴하고 spring TaskExcutor에 의해 새로운 스레드로 실행되게 된다.


해당 어노테이션을 통해 속도가 굉장히 빨라졌다 ㅎㅎ


@Async 사용 전 주의사항

  • public 메소드에만 사용 가능
  • 자가 호출(self-invocation) 불가능
    • 같은객체(클래스) 내의 메소드 호출시 불가능.(@Async가 붙은 메소드를 호출 시에 다른 클래스에서 호출해야한다!)
  • ThreadLocal 사용 시 내용 복사
  • 비동기 스레드에서 터진 Exception 처리
  • (프로젝트 내 Thread 개수 제한 걸려있는지 확인해볼것)


1,2  -> @Async의 동작은 별도로 설정하지 않으면 Proxy 모드가 적용되면서 스프링의 AOP를 가져가는데, 그로 인해 AOP와 관련된 제약사항을 다 가지게 된다.

AOP는 프록시 패턴이 사용되고, 프록시패턴은 실제 기능을 수행하는 객체 대신 가상의 객체를 사용하게 되는 것이므로, private으로 접근이 불가능하다던가 자가호출을 하게되면 proxy를 거치지 않기때문에 사용이 불가능해진다.


3 - > @Async를 사용하게되면, 새로운 스레드를 생성하여 작동하는것이므로 기존 스레드의 스택에 저장되는 ThreadLocal의 데이터는 사용하지 못하게되므로, 복사해서 전달해줘야한다.


4 -> 비동기 스레드에서 터진 Error는 메인까지 반환하지 못하므로, 별도의 처리 또는 @Async를 Return 값이 있는 형태로 줘서 별도 처리 필요(아래 참조)





@Async 사용법

Application 클래스에 적용

@EnableAsync 어노테이션을 Application 클래스에 붙이고, 비동기를 사용하려는 메소드위에 @Async를 붙이면 적용된다. 하지만 해당방법은 default값으로 적용되는데, SimpleAsyncTaskExecutor를 사용하게되고 스레드 풀에 의한게 아닌 스레드를 만들어내는 역할만 한다. 스레드를 제대로 관리해주지 못함.



public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);



public class Controller {

    private final TestService testService;

    public void main(){
        for(int i = 1; i <= 10; i++){
            testService.testAsync(i + "");


public class TestService {

    public void testAsync(String message){

        for(int i = 1; i <= 3; i++){
            System.out.println(message + "비동기 : " + i);


결과 잘 적용됨.


3비동기 : 1
4비동기 : 1
7비동기 : 1
5비동기 : 1
2비동기 : 1
1비동기 : 1
8비동기 : 1
6비동기 : 1
8비동기 : 2
8비동기 : 3
9비동기 : 1
9비동기 : 2
9비동기 : 3
10비동기 : 1
10비동기 : 2
10비동기 : 3
1비동기 : 2
2비동기 : 2
5비동기 : 2
7비동기 : 2
4비동기 : 2
3비동기 : 2
4비동기 : 3
7비동기 : 3
5비동기 : 3
2비동기 : 3
1비동기 : 3
6비동기 : 2
3비동기 : 3
6비동기 : 3


ThreadPool 사용 방법

위에서 적용했던 Applcation에 @EnableAsync 제거 해준 뒤, AsyncConfig 생성.

public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();


        return taskExecutor;


  • CorePoolSize : 최초 동작 시에 corePoolSize만큼 스레드가 생성하여 사용된다.(Default 1)
  • MaxPoolSize : Queue 사이즈 이상의 요청이 들어오게 될 경우, 스레드의 개수를 MaxPoolSize만큼 늘린다.(Default : Integer.MAX_VAULE)
  • QueueCapacity : CorePoolSize 이상의 요청이 들어올 경우, LinkedBlockingQueue에서 대기하게 되는데 그 Queue의 사이즈를 지정해주는 것이다.(Default : Integer.MAX_VAULE)
  • SetThreadNamePrefix : 스레드명 설정


위의 설정대로면 3개의 요청까진 CorePoolSize의 범위내이니 작업하고, 요청이 더 들어와서 10만까지는 QueueCapacity의 크기내에서 대기하게 되고, Queue사이즈도 넘는 요청이 들어올경우 MaxPoolSize만큼 스레드개수를 늘려서 작업하게 된다.


스레드 풀의 종류를 여러개 사용시엔, @Async 설정 시에 위의 Bean이름을 설정해주면 된다.


@Async("sampleExecutor") // ThreadPoolTaskExecutor Bean명과 동일하게 가져가기
public void testAsync(String message){

    for(int i = 1; i <= 3; i++){
        System.out.println(message + "비동기 : " + i);



public class AsyncConfig implements AsyncConfigurer { // 추가

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor() {

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize( CORE_POOL_SIZE );
        taskExecutor.setMaxPoolSize( MAX_POOL_SIZE );
        taskExecutor.setQueueCapacity( QUEUE_CAPACITY );
        taskExecutor.setTaskDecorator( new CustomDecorator() ); // 데코레이터 적용
        taskExecutor.setThreadNamePrefix( "Executor-" );
        taskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );

        return taskExecutor;

	// override method 
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler(); // 추가




public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error( ex.getMessage(), ex );

핸들러 생성해서 처리.

반환값이 있을땐, 아래 내용 참조.


Return 값이 필요할때

처리결과를 전달할 필요 없을땐 void로 설정하면 되고,

필요할 경우엔 3가지 방법이 있다.



Future의 경우 블로킹을 통해 결과가 올때까지 기다리므로 잘 쓰지 않는다.(비동기 블로킹)



콜백메소드를 통해 논블로킹 처리가 가능하다.

addCallback() 메소드로 첫번째 파라미터는 성공시 실행할 것을, 두번째 파라미터는 실패시에 실행할것을 지정해주면 된다.


public class TestService {

    public ListenableFuture<String> testAsync(String message){

        for(int i = 1; i <= 3; i++){
            System.out.println(message + "비동기 : " + i);

        return new AsyncResult<>( "성공" + message);

@Async 달은 메서드의 반환값 지정(ListenableFuture)


public class Controller {

    private final TestService testService;

    public void main(){
        for(int i = 1; i <= 10; i++){
            ListenableFuture<String> listenableFuture = testService.testAsync( i + "" );
            listenableFuture.addCallback(result -> System.out.println(result), error -> System.out.println(error.getMessage()));

addCallback(성공시 처리, 실패시 처리)





3비동기 : 1
1비동기 : 1
1비동기 : 2
1비동기 : 3
2비동기 : 1
2비동기 : 2
2비동기 : 3
3비동기 : 2
3비동기 : 3
5비동기 : 1
5비동기 : 2
5비동기 : 3
6비동기 : 1
6비동기 : 2
6비동기 : 3
7비동기 : 1
7비동기 : 2
7비동기 : 3
8비동기 : 1
8비동기 : 2
8비동기 : 3
9비동기 : 1
9비동기 : 2
9비동기 : 3
10비동기 : 1
10비동기 : 2
4비동기 : 1
10비동기 : 3
4비동기 : 2
4비동기 : 3



Java 8에 추가 된 것으로, 비동기 작업 이후의 다양한 메서드를 제공해준다.


public class TestService {

    public CompletableFuture<String> testAsync(String message) {

        for (int i = 1; i <= 3; i++) {
            System.out.println( message + "비동기 : " + i );

        if (message == "2") {
            throw new RuntimeException();

        return CompletableFuture.completedFuture( "성공" + message );


@Slf4j // log를 찍기 위한 어노테이션 추가
public class Controller {

    private final TestService testService;

    public void async() {
        for (int i = 1; i <= 10; i++) {
            CompletableFuture<String> stringCompletableFuture = testService.testAsync( i + "" );

            // Exception발생 시 처리
                    throwable -> {
                        log.error( "AsyncError: ", throwable );
                        return null;

            // 성공, 실패 값 둘다 처리 (반대 값들은 null형태로 들어옴) -> 처리후 반환값 지정 필요 x 이전 Completable 반환됨.
            // peek처럼 그냥 불러와서 별도 처리 가능.
                    (s, throwable) -> {
                        if (Objects.isNull( throwable )) {
                            log.info( s );
                        } else {
                            log.error( "AsyncError: " + throwable );

            // 성공, 실패 값 둘다 처리 (반대 값들은 null형태로 들어옴) -> 처리후 반환값 지정 필요
                    (s, throwable) -> {
                        if (Objects.isNull( throwable )) {
                            log.info( s );
                        } else {
                            log.error( "AsyncError: " + throwable );
                        return null;

            // 성공했을 시 작업 수행(return 값이 필요 없음)
            stringCompletableFuture.thenAccept( s -> {

            } );

            // 성공했을 시 작업 수행(return 값이 필요함)
            CompletableFuture<Integer> integerCompletableFuture = stringCompletableFuture.thenApply( s -> {
                return 2;
            } );



위의 메서드들 뒤에 async를 붙여서 이후 수행들도 비동기로 처리 가능.

(Executor(스레드풀)을 지정해주지 않으면, 자바의 ForkJoinPool사용.)

ex) handleAsync()

이 외의 메서드들은 아래 oracle 참조하자!

List<CompleatableFuture> 형태를 다 끝나길 기다렸다가 하는 allOf() , 하나라도 끝나면 받는 anyOf()도 존재함.




Queue사이즈 초과 방어 코드

try catch문을 통해 TaskRejectedException 처리를 해주자.


public class Controller {

    private final TestService testService;

    public void main(){
        for(int i = 1; i <= 10; i++){
                ListenableFuture<String> listenableFuture = testService.testAsync( i + "" );
                listenableFuture.addCallback(result -> System.out.println(result), error -> System.out.println(error.getMessage()));
            }catch (TaskRejectedException e){
                // 핸들링


ThreadLocal 사용 시 데이터 복사

TaskDecorator를 통해 TaskExecutor 생성시에 커스터마이징을 해줄 수가 있다.

기존 스레드로컬 데이터를 새로운 스레드 생성 시에 복사해주자.


커스텀 데코레이터 생성.

public class CustomDecorator implements TaskDecorator {

    public Runnable decorate(Runnable runnable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

        return() -> {
            RequestContextHolder.setRequestAttributes( requestAttributes );


TaskExecutor에 데코레이터 적용

public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setTaskDecorator(new CustomDecorator()); // 데코레이터 적용

        return taskExecutor;





Rejection Policy

거부된 작업 관리.


  • AbortPolicy : 작업이 거부되면 RejectedExecutionException을 던짐.
  • CallerRunsPolicy : Async 메소드를 불렀던 메인 스레드에서 거부된 작업을 실행함.
  • DiscardOldestPolicy : 큐에서 가장 오래된 task를 제거하고 실행시킨다.
  • DiscardPolicy : Reject된 Task에 대해 어떠한 작업도 진행안함.


내부 코드

 public static class CallerRunsPolicy implements RejectedExecutionHandler {
         * Creates a {@code CallerRunsPolicy}.
        public CallerRunsPolicy() { }

         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {

     * A handler for rejected tasks that throws a
     * {@link RejectedExecutionException}.
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
    public static class AbortPolicy implements RejectedExecutionHandler {
         * Creates an {@code AbortPolicy}.
        public AbortPolicy() { }

         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +

     * A handler for rejected tasks that silently discards the
     * rejected task.
    public static class DiscardPolicy implements RejectedExecutionHandler {
         * Creates a {@code DiscardPolicy}.
        public DiscardPolicy() { }

         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
         * Creates a {@code DiscardOldestPolicy} for the given executor.
        public DiscardOldestPolicy() { }

         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {


public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setTaskDecorator(new CustomDecorator()); // 데코레이터 적용
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //추가

        return taskExecutor;












