최종 결과 데이터를 만들기 위해 여러 API 들을 호출하고, 얻어온 각 API 결과들을 조합하는 로직을 만든다고 해보자.
먼저 아래코드처럼 각 API 결과들을 순차적으로 가져와 조합하는 방법이 있다.
@Service
public class SimpleContext {
@Autowired
private FirstJobService firstJobService;
@Autowired
private SecondJobService secondJobService;
@Autowired
private ThirdJobService thirdJobService;
@Autowired
private FourthJobService fourthJobService;
public CombineContextResult combine() {
// 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
Object firstResult = this.firstJobService.doFirstJob();
Object secondResult = this.secondJobService.doSecondJob();
Object thirdResult = this.thirdJobService.doThirdJob();
Object fourthResult = this.fourthJobService.doFourthJob();
// 결과 조합
return combine(firstResult, secondResult, thirdResult, fourthResult);
}
}
각 API 들을 순차적으로 호출하면 이해하기 쉽고 심플하지만 효율적이지 못할 수 있다. 그래서 효율을 높이기 위해 비동기 방식으로 수정하려고 한다. 예제에서는 작업에 필요한 서비스가 4개 뿐이라 비동기가 꼭 필요할까 싶지만 실제 프로덕션 코드에서는 서비스가 20개 가까이 되고 하는일이 많아서 필요했다.
cf) 순차적인 동기방식이 무조건 안좋다고 보면 안된다. 만약 아래코드와 같이 첫번째 job 결과(firstResult)가 두번째 job에 사용되고 그 결과가 세번째에 사용되는 케이스라면 순차적으로 결과를 기다려야 한다.
@Service
public class SimpleContext {
@Autowired
private FirstJobService firstJobService;
@Autowired
private SecondJobService secondJobService;
@Autowired
private ThirdJobService thirdJobService;
@Autowired
private FourthJobService fourthJobService;
public CombineContextResult combine() {
// 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
Object firstResult = this.firstJobService.doFirstJob();
Object secondResult = this.secondJobService.doSecondJob(firstResult);
Object thirdResult = this.thirdJobService.doThirdJob(secondResult);
return this.fourthJobService.doFourthJob(thirdResult);
}
}
다시 이전 케이스로 돌아와서 순차적인 동기방식이 효율적이지 못할 상황을 가정해보자.
@Service
public class SimpleContext {
@Autowired
private FirstJobService firstJobService;
@Autowired
private SecondJobService secondJobService;
@Autowired
private ThirdJobService thirdJobService;
@Autowired
private FourthJobService fourthJobService;
public CombineContextResult combine() {
// 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
Object firstResult = this.firstJobService.doFirstJob(); // 3초 걸림.
Object secondResult = this.secondJobService.doSecondJob(); // 1초 걸림.
Object thirdResult = this.thirdJobService.doThirdJob(secondResult); // secondResult가 필요
Object fourthResult = this.fourthJobService.doFourthJob(secondResult); // secondResult가 필요
// 결과 조합
return combine(firstResult, secondResult, thirdResult, fourthResult);
}
}
firstResult 결과를 얻는데 3초가 걸린다. 제일 먼저 수행됨으로 인해 나머지 job들은 기본 3초를 기다려야 한다. 만약 비동기로 firstResult 결과를 기다리지 않고 second job을 실행시키면 시간을 아낄 수 있다. 그런데 third job 과 fourth job은 secondResult 가 필요하므로 단순히 모두 비동기로 호출할 순 없다. 이럴 경우 어떻게 효율적인 구조를 만들 수 있을까?
cf) 지금부터 설명할 비동기 구조를 내가 만든건 아니다. 팀에서 여러 개발자분들이 함께 고민해서 만들었다. 나는 이미 만들어진 구조를 정확하게 이해하기 위해 정리했다.
먼저 CombineActor 스프링 빈 객체가 있다. 필요한 서비스들과 executor를 주입받아 CombineContext 생성자로 넘겨준다. 추가적으로 필요한 데이터들은 CombineContextSource 에 담아서 같이 전달한다.
@Component
@RequiredArgsConstructor
public class CombineActor {
private final FirstJobService firstJobService;
private final SecondJobService secondJobService;
private final ThirdJobService thirdJobService;
private final FourthJobService fourthJobService;
@Qualifier("callbackExecutor")
private final Executor executor;
public CombineContextResult combine(CombineSource source) {
CombineContextSource contextSource = CombineContextSource.builder()
.id(source.getId())
.name(source.getName())
.build();
CombineContext combineContext = new CombineContext(
contextSource,
this.firstJobService,
this.secondJobService,
this.thirdJobService,
this.fourthJobService,
this.executor
);
return combineContext.combine();
}
}
CombineContext 객체가 combine 메서드를 수행하면서 본격적인 작업이 시작된다. combine 메서드에서 제일 먼저 LazyContext 객체를 생성하는데 아래 private class 로 정의되어 있다. LazyContext 에서 주의깊게 볼 부분이 Getter lazy 설정이다.
@RequiredArgsConstructor
public class CombineContext {
private final CombineContextSource contextSource;
private final FirstJobService firstJobService;
private final SecondJobService secondJobService;
private final ThirdJobService thirdJobService;
private final FourthJobService fourthJobService;
private final Executor executor;
public CombineContextResult combine() {
LazyContext context = new LazyContext();
CompletableFuture.allOf(
context.getLazyFirstJobFuture(),
context.getLazySecondJobFuture(),
context.getLazyThirdJob(),
context.getLazyFourthJob()
).join(); // join을 함으로써 모든 CompletableFuture 의 실행 완료 기다림
return CombineContextResult.builder()
.firstJobResult(context.getLazyFirstJobFuture().join())
.secondJobResult(context.getLazySecondJobFuture().join().orElse(null))
.thirdJobResult(context.getLazyThirdJob().join())
.fourthJobResult(context.getLazyFourthJob().join())
.build();
}
private class LazyContext {
@Getter(lazy = true)
private final CompletableFuture<Boolean> lazyFirstJobFuture = this.initFirstJob();
@Getter(lazy = true)
private final CompletableFuture<Optional<String>> lazySecondJobFuture = this.initSecondJob();
@Getter(lazy = true)
private final CompletableFuture<String> lazyThirdJob = this.initThirdJob();
@Getter(lazy = true)
private final CompletableFuture<Optional<Boolean>> lazyFourthJob = this.initFourthJob();
private CompletableFuture<Boolean> initFirstJob() {
return firstJobService.doFirstJob(contextSource.getId())
.toFuture(); // toFuture() 호출 시 바로 subscribe 수행
}
private CompletableFuture<Optional<String>> initSecondJob() {
return secondJobService.doSecondJob(contextSource.getId())
.toFuture()
.thenApply(Optional::ofNullable);
}
private CompletableFuture<String> initThirdJob() {
return this.getLazySecondJobFuture()
.thenApply(
optionalString -> {
// third job 작업
return thirdJobService.doThirdJob();
}
);
}
private CompletableFuture<Optional<Boolean>> initFourthJob() {
return this.getLazySecondJobFuture()
.thenApplyAsync(
optionalString -> {
// Mono<Boolean> 리턴하는 webClient 호출
return fourthJobService.doFourthJob(optionalString)
.toFuture()
.thenApply(Optional::of);
},
executor
);
}
}
lazyFirstJobFuture, lazySecondJobFuture, lazyThirdJob, lazyFourthJob 변수들은 선언과 동시에 초기화를 진행하도록 되어있다. 그래서 LazyContext 객체가 생성되기전, 클래스가 로딩되는 시점에 초기화작업이 시작된다. 하지만 Getter lazy 애노테이션을 추가함으로써 각 필드의 getter 가 실행되는 시점에 초기화 작업이 실행된다.
만약 Getter lazy 를 쓰지 않으면, 호출 순서를 예상하기 어렵고 아직 초기화 되지 않은 대상을 먼저 접근하면 null 이다. 즉 Getter lazy 쓴다는것은 '실행 순서를 알 수 없으니까 초기화 시점에 실행하지 않겠다'는 뜻이다.
다음으로 LazyContext future 들을 CompletableFuture.allOf() 로 묶어서 호출한 후 join 한다. 호출순서를 먼저 고려해 던져놓으면 아래에서는 순서 상관없이 결과를 꺼낼 수 있다. 만약 여기서 join 을 안하면 아래에서 꺼내올 때 실제 호출이 발생하게되고 그 시점에서 호출 순서를 고려해야하므로 더 복잡해진다.
다음으로 initThirdJob, initFourthJob 메서드를 살펴보자. 둘 다 lazySecondJobFuture 결과를 얻은 후 각각의 작업을 수행한다. 즉 second job 에 대한 의존관계가 있다. 그래서 thenApply 를 통해 callback 처리를 한다. 그런데 initFourthJob 은 thenApply 가 아니라 thenApplyAsync 를 사용했다(주입받은 executor 사용).
thenApply, thenCompose 의 callback 은 reactor-thread 가 처리한다. reactor-thread 는 Mono / Flux 의 block 을 허용하지 않는다(netty event loop). 그래서 thenApply, thenCompose callback 안에서 외부호출 + block 이 존재하면 에러가 발생한다. 이 문제를 해결하기 위해 thenApplyAsync, thenComposeAsync 를 사용한다.
그렇다면 thenApplyAsync 와 thenComposeAsync 의 차이는 뭘까? 메서드 시그니처를 보면 Function 리턴타입인 U 부분만 다르고 나머지는 동일하다. thenComposeAsync 는 CompletionStage 타입만 리턴가능하다.
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) { ... }
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { ... }
이렇게만 보면 이해가 잘 안간다. 아래 샘플 코드에서 getLazySecondJobFuture 결과를 기다려서 이후 작업을 시작하는것까진 동일하다. 하지만 thenApplyAsync 에서는 모든 타입이 리턴 가능하지만, thenComposeAsyncJob 에서는 WebClient 호출하고 리턴된 Mono 타입을 toFuture 로 변환시켜 리턴하는 것을 볼 수 있다.
private CompletableFuture<String> thenApplyAsyncJob() {
return this.getLazySecondJobFuture()
.thenApplyAsync(
secondJobResult -> {
// 모든 타입 리턴 가능
return thirdJobService.doThirdJob(secondJobResult);
}, executor
);
}
private CompletableFuture<String> thenComposeAsyncJob() {
return this.getLazySecondJobFuture()
.thenComposeAsync(
secondJobResult -> {
// Mono<String> 리턴하는 webClient 호출
return fourthJobService.doFourthJob(secondJobResult)
.toFuture(); // CompletableFuture 타입 리턴
}, executor
);
}
이제 위 구조에서 새로운 job 을 추가해보자. 먼저 그 작업에 필요한 의존관계를 살펴봐야 한다. 만약 모든 job 들의 결과가 필요하면 아래와 같이 코드를 작성한다.
private CompletableFuture<String> initAdditionalJob() {
return CompletableFuture.allOf(
this.getLazyFirstJobFuture(),
this.getLazySecondJobFuture(),
this.getLazyThirdJobFuture(),
this.getLazyFourthJobFuture()
)
.thenApply(
then -> {
// 조합 작업
return result;
}
);
}
그리고 LazyContext에 초기화 작업을 추가한다.
@Getter(lazy = true)
private final CompletableFuture<Boolean> lazyAdditionalJobFuture = this.initAdditionalJob();
그리고 combine 메서드 맨 위 allOf 에서 가장 마지막에 lazyAdditionalJobFuture를 넣어줘야 한다.
CompletableFuture.allOf(
context.getLazyFirstJobFuture(),
context.getLazySecondJobFuture(),
context.getLazyThirdJob(),
context.getLazyFourthJob(),
context.getLazyAddiotnalJob() // 새롭게 추가
).join();
만약 맨위에 넣어주게 되면 에러는 발생안하지만 비동기 효율은 안좋아진다.
CompletableFuture.allOf(
context.getLazyAddiotnalJob() // 새롭게 추가(문제!!)
context.getLazyFirstJobFuture(),
context.getLazySecondJobFuture(),
context.getLazyThirdJob(),
context.getLazyFourthJob(),
).join();
그래서 이 구조는 새로운 job 추가가 쉽지 않은게 단점이다. 지금같은 극단적인 케이스가 아니라면 실수했을 때 인지하기 쉽지 않다. 하지만 이구조를 통해 의존관계 표현을 직접적으로 핸들링하지 않으므로 코드는 깔끔해진다.
다음으로 위 코드에서 CombineContext 는 빈이 아니다. 하지만 빈 객체를 사용하기 위해 주입을 받는다. 스프링은 이런 용도로 prototype scope bean 을 제공한다.
@Component
public class CombineActor {
// 문제 있는 코드
@Autowired
private CombineContext combineContext;
public CombineContextResult combine(CombineSource source) {
CombineContextSource contextSource = CombineContextSource.builder()
.id(source.getId())
.name(source.getName())
.build();
return combineContext.combine(contextSource);
}
}
주의: 위와 같이 DI 받으면 안됌.
import javax.inject.Inject;
import javax.inject.Provider;
...
@Inject
Provider<CombineContext> contextProvider;
public CombineContextResult combine(CombineSource source) {
CombineContextSource contextSource = CombineContextSource.builder()
.id(source.getId())
.name(source.getName())
.build();
CombineContext combineContext = this.contextProvider.get();
return combineContext.combine(contextSource);
}
...
@Component
@Scope("prototype")
public class CombineContext {
private CombineContextSource contextSource;
@Autowired
private FirstJobService firstJobService;
@Autowired
private SecondJobService secondJobService;
@Autowired
private ThirdJobService thirdJobService;
@Autowired
private FourthJobService fourthJobService;
@Autowired
private ErrorService errorService;
@Qualifier("callbackExecutor")
@Autowired
private Executor executor;
public CombineContextResult combine(CombineContextSource contextSource) {
this.contextSource = contextSource;
... 이하 생략
}
}
마지막으로 ThreadPoolTaskExecutor 에 설정에 대해서 살펴보자.
@Bean("callbackExecutor")
Executor callbackExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("callback");
// 최초 10개 스레드에서 처리하다, 더필요하게 되면 50개까지 큐를 먼저 채운다. 그리고도 더 필요하면 스레드를 100개 까지 늘린다.
executor.setCorePoolSize(10);
executor.setQueueCapacity(50);
executor.setMaxPoolSize(100);
// application이 shutdown 될 때 task 가 완료되길 기다린다(interrupt 방지).
executor.setWaitForTasksToCompleteOnShutdown(true);
// application이 shutdown 될 때 task가 완료되길 기다리는 시간
executor.setAwaitTerminationSeconds(10);
// RejectedExecutionException 예외가 발생할 때 핸들러
// 디폴트 핸들러. RejectedExecutionException을 발생
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// ThreadPoolTaskExecutor 를 요청한 thread(여기서는 reactor-thread) 가 직접 처리
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// task를 버린다.
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 처리되지 않은 가장 오래된 요청을 버리고 다시 execute 시도
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
return executor;
}
'Spring' 카테고리의 다른 글
WebClient 사용할때 주의 (5편) (0) | 2021.11.11 |
---|---|
WebClient 사용할때 주의 (4편) (0) | 2021.11.05 |
jackson-dataformat-xml 이슈 정리 (0) | 2021.10.19 |
@RequestParam 사용 시 Null에 대한 고민 정리 (0) | 2021.09.20 |
@RequestParam 사용 시 주의사항 (2) | 2021.09.11 |