본문 바로가기
FrameWork/Spring

[Spring] @Async 비동기 멀티스레드 사용법

by 계범 2022. 6. 25.

수정사항 2022-08-27

  • async 사용 시 비동기 스레드 exception 처리
  • CompletableFuture 사용법 추가

Async 사용계기

현재 마이다스 AI 역검 백엔드팀에 들어오게 되었는데,

과제 중 원활한 검증작업을 위한 응시 데이터를 만드는 것을 담당하게 되었다.

 

실제로는 응시자 한명 당 다양한 게임마다 응시를 본 데이터가 날라오게 되고, 이를 각각 저장하게 되는데

응시를 실제로 보기때문에 하나의 패킷이 날라오는데 시간도 오래걸리게되어서 부담이 작다고 생각했지만,

 

나는 한번의 요청으로 응시자를 생성해준 후 각 게임마다 문제를 불러오고 문제에 대한 응시데이터도 만들어주고 해야한다. 거기다 검증을 위해 응시자를 천명~만명을 만든다고하면, 만명에 대한 것을 만들어줘야하기때문에 굉장히 느려졌다.

 

스프링은 요청 하나 당 스레드를 만들어줘서 진행하게 되는데,

나는 하나의 요청에서 만명의 데이터를 만들어줘야하기때문에 멀티 스레드를 생각했다.

 

스프링엔 @Async 어노테이션이 존재한다.(JS에도 동일이름으로 있음!)

이 어노테이션은 쓰레드풀을 활용한 비동기 메소드를 지원해준다.

 

메소드에 @Async를 달아두면 비동기로 호출자는 즉시 리턴하고 spring TaskExcutor에 의해 새로운 스레드로 실행되게 된다.

 

해당 어노테이션을 통해 속도가 굉장히 빨라졌다 ㅎㅎ

 

@Async 사용 전 주의사항

  • public 메소드에만 사용 가능
  • 자가 호출(self-invocation) 불가능
    • 같은객체(클래스) 내의 메소드 호출시 불가능.(@Async가 붙은 메소드를 호출 시에 다른 클래스에서 호출해야한다!)
  • ThreadLocal 사용 시 내용 복사
  • 비동기 스레드에서 터진 Exception 처리
  • (프로젝트 내 Thread 개수 제한 걸려있는지 확인해볼것)

 

1,2  -> @Async의 동작은 별도로 설정하지 않으면 Proxy 모드가 적용되면서 스프링의 AOP를 가져가는데, 그로 인해 AOP와 관련된 제약사항을 다 가지게 된다.

AOP는 프록시 패턴이 사용되고, 프록시패턴은 실제 기능을 수행하는 객체 대신 가상의 객체를 사용하게 되는 것이므로, private으로 접근이 불가능하다던가 자가호출을 하게되면 proxy를 거치지 않기때문에 사용이 불가능해진다.

 

3 - > @Async를 사용하게되면, 새로운 스레드를 생성하여 작동하는것이므로 기존 스레드의 스택에 저장되는 ThreadLocal의 데이터는 사용하지 못하게되므로, 복사해서 전달해줘야한다.

 

4 -> 비동기 스레드에서 터진 Error는 메인까지 반환하지 못하므로, 별도의 처리 또는 @Async를 Return 값이 있는 형태로 줘서 별도 처리 필요(아래 참조)

 

https://dzone.com/articles/effective-advice-on-spring-async-part-1

https://steady-coding.tistory.com/608

 

@Async 사용법

Application 클래스에 적용

@EnableAsync 어노테이션을 Application 클래스에 붙이고, 비동기를 사용하려는 메소드위에 @Async를 붙이면 적용된다. 하지만 해당방법은 default값으로 적용되는데, SimpleAsyncTaskExecutor를 사용하게되고 스레드 풀에 의한게 아닌 스레드를 만들어내는 역할만 한다. 스레드를 제대로 관리해주지 못함.

 

 

@EnableAsync
@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

}

 

@RestController
@RequiredArgsConstructor
public class Controller {

    private final TestService testService;

    @GetMapping("/test")
    public void main(){
        for(int i = 1; i <= 10; i++){
            testService.testAsync(i + "");
        }
    }
}

 

@Service
@RequiredArgsConstructor
public class TestService {

    @Async("sampleExecutor")
    public void testAsync(String message){

        for(int i = 1; i <= 3; i++){
            System.out.println(message + "비동기 : " + i);
        }
        
    }
}

 

결과 잘 적용됨.

더보기

3비동기 : 1
4비동기 : 1
7비동기 : 1
5비동기 : 1
2비동기 : 1
1비동기 : 1
8비동기 : 1
6비동기 : 1
8비동기 : 2
8비동기 : 3
9비동기 : 1
9비동기 : 2
9비동기 : 3
10비동기 : 1
10비동기 : 2
10비동기 : 3
1비동기 : 2
2비동기 : 2
5비동기 : 2
7비동기 : 2
4비동기 : 2
3비동기 : 2
4비동기 : 3
7비동기 : 3
5비동기 : 3
2비동기 : 3
1비동기 : 3
6비동기 : 2
3비동기 : 3
6비동기 : 3

 

ThreadPool 사용 방법

위에서 적용했던 Applcation에 @EnableAsync 제거 해준 뒤, AsyncConfig 생성.

@Configuration
@EnableAsync
public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.setThreadNamePrefix("Executor-");

        return taskExecutor;
    }
}

 

  • CorePoolSize : 최초 동작 시에 corePoolSize만큼 스레드가 생성하여 사용된다.(Default 1)
  • MaxPoolSize : Queue 사이즈 이상의 요청이 들어오게 될 경우, 스레드의 개수를 MaxPoolSize만큼 늘린다.(Default : Integer.MAX_VAULE)
  • QueueCapacity : CorePoolSize 이상의 요청이 들어올 경우, LinkedBlockingQueue에서 대기하게 되는데 그 Queue의 사이즈를 지정해주는 것이다.(Default : Integer.MAX_VAULE)
  • SetThreadNamePrefix : 스레드명 설정

 

위의 설정대로면 3개의 요청까진 CorePoolSize의 범위내이니 작업하고, 요청이 더 들어와서 10만까지는 QueueCapacity의 크기내에서 대기하게 되고, Queue사이즈도 넘는 요청이 들어올경우 MaxPoolSize만큼 스레드개수를 늘려서 작업하게 된다.

 

스레드 풀의 종류를 여러개 사용시엔, @Async 설정 시에 위의 Bean이름을 설정해주면 된다.

 

@Async("sampleExecutor") // ThreadPoolTaskExecutor Bean명과 동일하게 가져가기
public void testAsync(String message){

    for(int i = 1; i <= 3; i++){
        System.out.println(message + "비동기 : " + i);
    }
}

 

예외처리(void)

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer { // 추가

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor() {

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize( CORE_POOL_SIZE );
        taskExecutor.setMaxPoolSize( MAX_POOL_SIZE );
        taskExecutor.setQueueCapacity( QUEUE_CAPACITY );
        taskExecutor.setTaskDecorator( new CustomDecorator() ); // 데코레이터 적용
        taskExecutor.setThreadNamePrefix( "Executor-" );
        taskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );

        return taskExecutor;
    }


	// override method 
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler(); // 추가
    }

}

 

 

@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error( ex.getMessage(), ex );
    }
}

핸들러 생성해서 처리.

반환값이 있을땐, 아래 내용 참조.

 

Return 값이 필요할때

처리결과를 전달할 필요 없을땐 void로 설정하면 되고,

필요할 경우엔 3가지 방법이 있다.

 

Future

Future의 경우 블로킹을 통해 결과가 올때까지 기다리므로 잘 쓰지 않는다.(비동기 블로킹)

 

ListenableFuture

콜백메소드를 통해 논블로킹 처리가 가능하다.

addCallback() 메소드로 첫번째 파라미터는 성공시 실행할 것을, 두번째 파라미터는 실패시에 실행할것을 지정해주면 된다.

 

@Service
@RequiredArgsConstructor
public class TestService {

    @Async("sampleExecutor")
    public ListenableFuture<String> testAsync(String message){

        for(int i = 1; i <= 3; i++){
            System.out.println(message + "비동기 : " + i);
        }

        return new AsyncResult<>( "성공" + message);
    }
}

@Async 달은 메서드의 반환값 지정(ListenableFuture)

 

@RestController
@RequiredArgsConstructor
public class Controller {

    private final TestService testService;

    @GetMapping("/test")
    public void main(){
        for(int i = 1; i <= 10; i++){
            ListenableFuture<String> listenableFuture = testService.testAsync( i + "" );
            listenableFuture.addCallback(result -> System.out.println(result), error -> System.out.println(error.getMessage()));
        }
    }
}

addCallback(성공시 처리, 실패시 처리)

 

 

결과

더보기

3비동기 : 1
1비동기 : 1
1비동기 : 2
1비동기 : 3
2비동기 : 1
2비동기 : 2
2비동기 : 3
성공1
3비동기 : 2
3비동기 : 3
성공3
5비동기 : 1
5비동기 : 2
5비동기 : 3
성공5
6비동기 : 1
6비동기 : 2
6비동기 : 3
성공6
7비동기 : 1
7비동기 : 2
7비동기 : 3
성공7
8비동기 : 1
8비동기 : 2
8비동기 : 3
성공8
9비동기 : 1
9비동기 : 2
9비동기 : 3
성공9
10비동기 : 1
성공2
10비동기 : 2
4비동기 : 1
10비동기 : 3
4비동기 : 2
성공10
4비동기 : 3
성공4

 

CompletableFuture

Java 8에 추가 된 것으로, 비동기 작업 이후의 다양한 메서드를 제공해준다.

 

@Service
@RequiredArgsConstructor
public class TestService {

    @Async("sampleExecutor")
    public CompletableFuture<String> testAsync(String message) {

        for (int i = 1; i <= 3; i++) {
            System.out.println( message + "비동기 : " + i );
        }

        if (message == "2") {
            throw new RuntimeException();
        }

        return CompletableFuture.completedFuture( "성공" + message );
    }
}

 

@RestController
@RequiredArgsConstructor
@Slf4j // log를 찍기 위한 어노테이션 추가
public class Controller {

    private final TestService testService;

    @GetMapping("/test")
    public void async() {
        for (int i = 1; i <= 10; i++) {
            CompletableFuture<String> stringCompletableFuture = testService.testAsync( i + "" );

            // Exception발생 시 처리
            stringCompletableFuture.exceptionally(
                    throwable -> {
                        log.error( "AsyncError: ", throwable );
                        return null;
                    }
            );

            // 성공, 실패 값 둘다 처리 (반대 값들은 null형태로 들어옴) -> 처리후 반환값 지정 필요 x 이전 Completable 반환됨.
            // peek처럼 그냥 불러와서 별도 처리 가능.
            stringCompletableFuture.whenComplete(
                    (s, throwable) -> {
                        if (Objects.isNull( throwable )) {
                            log.info( s );
                        } else {
                            log.error( "AsyncError: " + throwable );
                        }
                    }
            );

            // 성공, 실패 값 둘다 처리 (반대 값들은 null형태로 들어옴) -> 처리후 반환값 지정 필요
            stringCompletableFuture.handle(
                    (s, throwable) -> {
                        if (Objects.isNull( throwable )) {
                            log.info( s );
                        } else {
                            log.error( "AsyncError: " + throwable );
                        }
                        return null;
                    }
            );

            // 성공했을 시 작업 수행(return 값이 필요 없음)
            stringCompletableFuture.thenAccept( s -> {

            } );

            // 성공했을 시 작업 수행(return 값이 필요함)
            CompletableFuture<Integer> integerCompletableFuture = stringCompletableFuture.thenApply( s -> {
                return 2;
            } );
        }
    }

}

 

위의 메서드들 뒤에 async를 붙여서 이후 수행들도 비동기로 처리 가능.

(Executor(스레드풀)을 지정해주지 않으면, 자바의 ForkJoinPool사용.)

ex) handleAsync()

이 외의 메서드들은 아래 oracle 참조하자!

List<CompleatableFuture> 형태를 다 끝나길 기다렸다가 하는 allOf() , 하나라도 끝나면 받는 anyOf()도 존재함.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

 

 

Queue사이즈 초과 방어 코드

try catch문을 통해 TaskRejectedException 처리를 해주자.

 

@RestController
@RequiredArgsConstructor
public class Controller {

    private final TestService testService;

    @GetMapping("/test")
    public void main(){
        for(int i = 1; i <= 10; i++){
            try{
                ListenableFuture<String> listenableFuture = testService.testAsync( i + "" );
                listenableFuture.addCallback(result -> System.out.println(result), error -> System.out.println(error.getMessage()));
            }catch (TaskRejectedException e){
                // 핸들링
            }
        }
    }
}

 

ThreadLocal 사용 시 데이터 복사

TaskDecorator를 통해 TaskExecutor 생성시에 커스터마이징을 해줄 수가 있다.

기존 스레드로컬 데이터를 새로운 스레드 생성 시에 복사해주자.

 

커스텀 데코레이터 생성.

public class CustomDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

        return() -> {
            RequestContextHolder.setRequestAttributes( requestAttributes );
            runnable.run();
        };
    }
}

 

TaskExecutor에 데코레이터 적용

@Configuration
@EnableAsync
public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.setTaskDecorator(new CustomDecorator()); // 데코레이터 적용
        taskExecutor.setThreadNamePrefix("Executor-");

        return taskExecutor;
    }
}

 

https://blog.gangnamunni.com/post/mdc-context-task-decorator/

 

 

Rejection Policy

거부된 작업 관리.

 

  • AbortPolicy : 작업이 거부되면 RejectedExecutionException을 던짐.
  • CallerRunsPolicy : Async 메소드를 불렀던 메인 스레드에서 거부된 작업을 실행함.
  • DiscardOldestPolicy : 큐에서 가장 오래된 task를 제거하고 실행시킨다.
  • DiscardPolicy : Reject된 Task에 대해 어떠한 작업도 진행안함.

 

내부 코드

더보기
 public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@link RejectedExecutionException}.
     *
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

 

@Configuration
@EnableAsync
public class AsyncConfig {

    private int CORE_POOL_SIZE = 3;
    private int MAX_POOL_SIZE = 10;
    private int QUEUE_CAPACITY = 100_000;

    @Bean(name = "sampleExecutor")
    public Executor threadPoolTaskExecutor(){

        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
        taskExecutor.setTaskDecorator(new CustomDecorator()); // 데코레이터 적용
        taskExecutor.setThreadNamePrefix("Executor-");
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //추가

        return taskExecutor;
    }
}

 

 

참조


https://steady-coding.tistory.com/611

 

https://velog.io/@gillog/Spring-Async-Annotation%EB%B9%84%EB%8F%99%EA%B8%B0-%EB%A9%94%EC%86%8C%EB%93%9C-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0

 

https://brunch.co.kr/@springboot/401

 

https://blog.gangnamunni.com/post/mdc-context-task-decorator/

 

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

댓글