본문 바로가기
Spring

여러 API 결과 조합(with 비동기)

by ybs 2021. 10. 31.
반응형

최종 결과 데이터를 만들기 위해 여러 API 들을 호출하고, 얻어온 각 API 결과들을 조합하는 로직을 만든다고 해보자.

 

먼저 아래코드처럼 각 API 결과들을 순차적으로 가져와 조합하는 방법이 있다.

@Service
public class SimpleContext {

	@Autowired
	private FirstJobService firstJobService;
	@Autowired
	private SecondJobService secondJobService;
	@Autowired
	private ThirdJobService thirdJobService;
	@Autowired
	private FourthJobService fourthJobService;

	public CombineContextResult combine() {
		// 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
		Object firstResult = this.firstJobService.doFirstJob();
		Object secondResult = this.secondJobService.doSecondJob();
		Object thirdResult = this.thirdJobService.doThirdJob();
		Object fourthResult = this.fourthJobService.doFourthJob();

		// 결과 조합
		return combine(firstResult, secondResult, thirdResult, fourthResult);
	}
}

 

각 API 들을 순차적으로 호출하면 이해하기 쉽고 심플하지만 효율적이지 못할 수 있다. 그래서 효율을 높이기 위해 비동기 방식으로 수정하려고 한다. 예제에서는 작업에 필요한 서비스가 4개 뿐이라 비동기가 꼭 필요할까 싶지만 실제 프로덕션 코드에서는 서비스가 20개 가까이 되고 하는일이 많아서 필요했다. 

 

cf) 순차적인 동기방식이 무조건 안좋다고 보면 안된다. 만약 아래코드와 같이 첫번째 job 결과(firstResult)가 두번째 job에 사용되고 그 결과가 세번째에 사용되는 케이스라면 순차적으로 결과를 기다려야 한다.

@Service
public class SimpleContext {

	@Autowired
	private FirstJobService firstJobService;
	@Autowired
	private SecondJobService secondJobService;
	@Autowired
	private ThirdJobService thirdJobService;
	@Autowired
	private FourthJobService fourthJobService;

	public CombineContextResult combine() {
		// 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
		Object firstResult = this.firstJobService.doFirstJob();
		Object secondResult = this.secondJobService.doSecondJob(firstResult);
		Object thirdResult = this.thirdJobService.doThirdJob(secondResult);
		return this.fourthJobService.doFourthJob(thirdResult);
	}
}

 

다시 이전 케이스로 돌아와서 순차적인 동기방식이 효율적이지 못할 상황을 가정해보자. 

@Service
public class SimpleContext {

  @Autowired
  private FirstJobService firstJobService;
  @Autowired
  private SecondJobService secondJobService;
  @Autowired
  private ThirdJobService thirdJobService;
  @Autowired
  private FourthJobService fourthJobService;

  public CombineContextResult combine() {
    // 각각의 API 호출 (단순 예제 코드이므로 Object type 사용)
    Object firstResult = this.firstJobService.doFirstJob(); // 3초 걸림.
    Object secondResult = this.secondJobService.doSecondJob(); // 1초 걸림.
    Object thirdResult = this.thirdJobService.doThirdJob(secondResult); // secondResult가 필요
    Object fourthResult = this.fourthJobService.doFourthJob(secondResult); // secondResult가 필요

    // 결과 조합
    return combine(firstResult, secondResult, thirdResult, fourthResult);
  }
}

 

firstResult 결과를 얻는데 3초가 걸린다. 제일 먼저 수행됨으로 인해 나머지 job들은 기본 3초를 기다려야 한다. 만약 비동기로 firstResult 결과를 기다리지 않고 second job을 실행시키면 시간을 아낄 수 있다. 그런데 third job 과 fourth job은 secondResult 가 필요하므로 단순히 모두 비동기로 호출할 순 없다. 이럴 경우 어떻게 효율적인 구조를 만들 수 있을까?

 

cf) 지금부터 설명할 비동기 구조를 내가 만든건 아니다. 팀에서 여러 개발자분들이 함께 고민해서 만들었다. 나는 이미 만들어진 구조를 정확하게 이해하기 위해 정리했다.

 

먼저 CombineActor 스프링 빈 객체가 있다. 필요한 서비스들과 executor를 주입받아 CombineContext 생성자로 넘겨준다. 추가적으로 필요한 데이터들은 CombineContextSource 에 담아서 같이 전달한다.

@Component
@RequiredArgsConstructor
public class CombineActor {

	private final FirstJobService firstJobService;
	private final SecondJobService secondJobService;
	private final ThirdJobService thirdJobService;
	private final FourthJobService fourthJobService;
	@Qualifier("callbackExecutor")
	private final Executor executor;

	public CombineContextResult combine(CombineSource source) {

		CombineContextSource contextSource = CombineContextSource.builder()
			.id(source.getId())
			.name(source.getName())
			.build();

		CombineContext combineContext = new CombineContext(
			contextSource,
			this.firstJobService,
			this.secondJobService,
			this.thirdJobService,
			this.fourthJobService,
			this.executor
		);

		return combineContext.combine();
	}
}

 

CombineContext 객체가 combine 메서드를 수행하면서 본격적인 작업이 시작된다. combine 메서드에서 제일 먼저 LazyContext 객체를 생성하는데 아래 private class 로 정의되어 있다. LazyContext 에서 주의깊게 볼 부분이 Getter lazy 설정이다.

@RequiredArgsConstructor
public class CombineContext {
  private final CombineContextSource contextSource;

  private final FirstJobService firstJobService;
  private final SecondJobService secondJobService;
  private final ThirdJobService thirdJobService;
  private final FourthJobService fourthJobService;

  private final Executor executor;

  public CombineContextResult combine() {
    LazyContext context = new LazyContext();

    CompletableFuture.allOf(
      context.getLazyFirstJobFuture(),
      context.getLazySecondJobFuture(),
      context.getLazyThirdJob(),
      context.getLazyFourthJob()
    ).join(); // join을 함으로써 모든 CompletableFuture 의 실행 완료 기다림

    return CombineContextResult.builder()
      .firstJobResult(context.getLazyFirstJobFuture().join())
      .secondJobResult(context.getLazySecondJobFuture().join().orElse(null))
      .thirdJobResult(context.getLazyThirdJob().join())
      .fourthJobResult(context.getLazyFourthJob().join())
      .build();
  }

  private class LazyContext {
    @Getter(lazy = true)
    private final CompletableFuture<Boolean> lazyFirstJobFuture = this.initFirstJob();

    @Getter(lazy = true)
    private final CompletableFuture<Optional<String>> lazySecondJobFuture = this.initSecondJob();

    @Getter(lazy = true)
    private final CompletableFuture<String> lazyThirdJob = this.initThirdJob();

    @Getter(lazy = true)
    private final CompletableFuture<Optional<Boolean>> lazyFourthJob = this.initFourthJob();

    private CompletableFuture<Boolean> initFirstJob() {
      return firstJobService.doFirstJob(contextSource.getId())
        .toFuture(); // toFuture() 호출 시 바로 subscribe 수행
    }

    private CompletableFuture<Optional<String>> initSecondJob() {
      return secondJobService.doSecondJob(contextSource.getId())
        .toFuture()
        .thenApply(Optional::ofNullable);
    }

    private CompletableFuture<String> initThirdJob() {
      return this.getLazySecondJobFuture()
        .thenApply(
          optionalString -> {
            // third job 작업
            return thirdJobService.doThirdJob();
          }
         );
    }

    private CompletableFuture<Optional<Boolean>> initFourthJob() {
      return this.getLazySecondJobFuture()
        .thenApplyAsync(
          optionalString -> {
            // Mono<Boolean> 리턴하는 webClient 호출
            return fourthJobService.doFourthJob(optionalString)
              .toFuture()
              .thenApply(Optional::of);
            },
            executor
          );
    }    
}

 

lazyFirstJobFuture, lazySecondJobFuture, lazyThirdJob, lazyFourthJob 변수들은 선언과 동시에 초기화를 진행하도록 되어있다. 그래서 LazyContext 객체가 생성되기전, 클래스가 로딩되는 시점에 초기화작업이 시작된다. 하지만 Getter lazy 애노테이션을 추가함으로써 각 필드의 getter 가 실행되는 시점에 초기화 작업이 실행된다.

 

만약 Getter lazy 를 쓰지 않으면, 호출 순서를 예상하기 어렵고 아직 초기화 되지 않은 대상을 먼저 접근하면 null 이다. 즉 Getter lazy 쓴다는것은 '실행 순서를 알 수 없으니까 초기화 시점에 실행하지 않겠다'는 뜻이다.

 

다음으로 LazyContext future 들을 CompletableFuture.allOf() 로 묶어서 호출한 후 join 한다. 호출순서를 먼저 고려해 던져놓으면 아래에서는 순서 상관없이 결과를 꺼낼 수 있다. 만약 여기서 join 을 안하면 아래에서 꺼내올 때 실제 호출이 발생하게되고 그 시점에서 호출 순서를 고려해야하므로 더 복잡해진다.

 

다음으로 initThirdJob, initFourthJob 메서드를 살펴보자. 둘 다 lazySecondJobFuture 결과를 얻은 후 각각의 작업을 수행한다. 즉 second job 에 대한 의존관계가 있다. 그래서 thenApply 를 통해 callback 처리를 한다. 그런데 initFourthJob 은 thenApply 가 아니라 thenApplyAsync 를 사용했다(주입받은 executor 사용).

 

thenApply, thenCompose 의 callback 은 reactor-thread 가 처리한다. reactor-thread 는 Mono / Flux 의 block 을 허용하지 않는다(netty event loop). 그래서 thenApply, thenCompose callback 안에서 외부호출 + block 이 존재하면 에러가 발생한다. 이 문제를 해결하기 위해 thenApplyAsync, thenComposeAsync 를 사용한다.

 

그렇다면 thenApplyAsync 와 thenComposeAsync 의 차이는 뭘까? 메서드 시그니처를 보면 Function 리턴타입인 U 부분만 다르고 나머지는 동일하다. thenComposeAsync 는 CompletionStage 타입만 리턴가능하다.

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) { ... }
    
public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { ... }

 

이렇게만 보면 이해가 잘 안간다. 아래 샘플 코드에서 getLazySecondJobFuture 결과를 기다려서 이후 작업을 시작하는것까진 동일하다. 하지만 thenApplyAsync 에서는 모든 타입이 리턴 가능하지만, thenComposeAsyncJob 에서는 WebClient 호출하고 리턴된 Mono 타입을 toFuture 로 변환시켜 리턴하는 것을 볼 수 있다. 

    private CompletableFuture<String> thenApplyAsyncJob() {
      return this.getLazySecondJobFuture()
        .thenApplyAsync(
          secondJobResult -> {
            // 모든 타입 리턴 가능
            return thirdJobService.doThirdJob(secondJobResult);
          }, executor
         );
    }

    private CompletableFuture<String> thenComposeAsyncJob() {
      return this.getLazySecondJobFuture()
        .thenComposeAsync(
          secondJobResult -> {
              // Mono<String> 리턴하는 webClient 호출
              return fourthJobService.doFourthJob(secondJobResult)
                .toFuture(); // CompletableFuture 타입 리턴
            }, executor
          );
    }

 

 

 

이제 위 구조에서 새로운 job 을 추가해보자. 먼저 그 작업에 필요한 의존관계를 살펴봐야 한다. 만약 모든 job 들의 결과가 필요하면 아래와 같이 코드를 작성한다.

private CompletableFuture<String> initAdditionalJob() {
  return CompletableFuture.allOf(
      this.getLazyFirstJobFuture(),
      this.getLazySecondJobFuture(),
      this.getLazyThirdJobFuture(),
      this.getLazyFourthJobFuture()
    )
    .thenApply(
      then -> {
        // 조합 작업
        return result;
      }
    );
}

 

그리고 LazyContext에 초기화 작업을 추가한다.

@Getter(lazy = true)
private final CompletableFuture<Boolean> lazyAdditionalJobFuture = this.initAdditionalJob();

 

그리고 combine 메서드 맨 위 allOf 에서 가장 마지막에 lazyAdditionalJobFuture를 넣어줘야 한다. 

CompletableFuture.allOf(
  context.getLazyFirstJobFuture(),
  context.getLazySecondJobFuture(),
  context.getLazyThirdJob(),
  context.getLazyFourthJob(),
  context.getLazyAddiotnalJob() // 새롭게 추가
).join();

 

만약 맨위에 넣어주게 되면 에러는 발생안하지만 비동기 효율은 안좋아진다.

CompletableFuture.allOf(
  context.getLazyAddiotnalJob() // 새롭게 추가(문제!!)
  context.getLazyFirstJobFuture(),
  context.getLazySecondJobFuture(),
  context.getLazyThirdJob(),
  context.getLazyFourthJob(),
).join();

 

그래서 이 구조는 새로운 job 추가가 쉽지 않은게 단점이다. 지금같은 극단적인 케이스가 아니라면 실수했을 때 인지하기 쉽지 않다. 하지만 이구조를 통해 의존관계 표현을 직접적으로 핸들링하지 않으므로 코드는 깔끔해진다.

 

 

다음으로 위 코드에서 CombineContext 는 빈이 아니다. 하지만 빈 객체를 사용하기 위해 주입을 받는다. 스프링은 이런 용도로 prototype scope bean 을 제공한다. 

@Component
public class CombineActor {
	
	// 문제 있는 코드
	@Autowired
	private CombineContext combineContext;

	public CombineContextResult combine(CombineSource source) {

		CombineContextSource contextSource = CombineContextSource.builder()
			.id(source.getId())
			.name(source.getName())
			.build();

		return combineContext.combine(contextSource);
	}
}

주의: 위와 같이 DI 받으면 안됌. 

 

import javax.inject.Inject;
import javax.inject.Provider;

... 

@Inject
Provider<CombineContext> contextProvider;

public CombineContextResult combine(CombineSource source) {

	CombineContextSource contextSource = CombineContextSource.builder()
		.id(source.getId())
		.name(source.getName())
		.build();

	CombineContext combineContext = this.contextProvider.get();
	return combineContext.combine(contextSource);
}

...
@Component
@Scope("prototype")
public class CombineContext {
	private CombineContextSource contextSource;

	@Autowired
	private FirstJobService firstJobService;
	@Autowired
	private SecondJobService secondJobService;
	@Autowired
	private ThirdJobService thirdJobService;
	@Autowired
	private FourthJobService fourthJobService;
	@Autowired
	private ErrorService errorService;
	@Qualifier("callbackExecutor")
	@Autowired
	private Executor executor;

	public CombineContextResult combine(CombineContextSource contextSource) {
		this.contextSource = contextSource;
		... 이하 생략
	}
}

 

마지막으로 ThreadPoolTaskExecutor 에 설정에 대해서 살펴보자.

@Bean("callbackExecutor")
Executor callbackExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setThreadNamePrefix("callback");

  // 최초 10개 스레드에서 처리하다, 더필요하게 되면 50개까지 큐를 먼저 채운다. 그리고도 더 필요하면 스레드를 100개 까지 늘린다.
  executor.setCorePoolSize(10);
  executor.setQueueCapacity(50);
  executor.setMaxPoolSize(100);

  // application이 shutdown 될 때 task 가 완료되길 기다린다(interrupt 방지).
  executor.setWaitForTasksToCompleteOnShutdown(true);
  // application이 shutdown 될 때 task가 완료되길 기다리는 시간
  executor.setAwaitTerminationSeconds(10);
    
	
  // RejectedExecutionException 예외가 발생할 때 핸들러
  // 디폴트 핸들러. RejectedExecutionException을 발생
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    
  // ThreadPoolTaskExecutor 를 요청한 thread(여기서는 reactor-thread) 가 직접 처리
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    
  // task를 버린다.
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
    
  // 처리되지 않은 가장 오래된 요청을 버리고 다시 execute 시도
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    
  return executor;
}

 

반응형