본문 바로가기
Spring

WebClient + Retry + CircuitBreaker

by ybs 2022. 9. 24.
반응형

WebClient 를 이용할 때, Retry 기능과 CircuitBreaker 기능을 추가할 수 있다.

구체적인 방법과 함께 테스트 코드는 어떻게 만드는지 알아보자.

 

먼저 전체 코드를 간단히 훑어보고 뒤에서 자세히 설명하겠다. 응답 status code 가 501 이상이면, CircuitBreakerException 을 만들어서 넘긴다. 하지만 timeout 에러면 0.5초 간격으로 1번 retry 를 수행한다. retry 를 수행해도 에러가 발생하면 RetryExhaustedException 으로 감싸져서 나오기 때문에 onErrorMap 으로 잡아서 cause 를 빼내 뒤로 전달한다.

.exchangeToMono(clientResponse -> {
  if (clientResponse.rawStatusCode() > HttpStatus.INTERNAL_SERVER_ERROR.value()) {
    return clientResponse
      .createException()
      .flatMap(CircuitBreakerException::createExceptionMono);
  }

  // 생략
})
.doOnSuccess( 
  // 서버에서 responseBody 로 내려준 data 중 responseType이 "SUCCESS" 인지 확인
)
.then() // insert api 라서 리턴타입을 Mono<Void> 로 만들기 위함
.retryWhen(
  Retry // resilience4j.Retry 가 아닌 reactor.core Retry 사용
    .backoff(1, Duration.ofMillis(500))
    .filter(WebUtils::isTimeoutError)
)
.onErrorMap(
  Exceptions::isRetryExhausted,
  Throwable::getCause
)
.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
.onErrorResume(
  throwable -> {
    return ApiServerException.createExceptionMono(
      // 생략
    );
  }
);

// WebUtils
public static boolean isTimeoutError(Throwable throwable) {
  return throwable instanceof WebClientRequestException // org.springframework.web.reactive.function.client
    && throwable.getCause() instanceof TimeoutException; // io.netty.handler.timeout
}

 

CircuitBreaker

우리 프로젝트에서는 CircuitBreakerRegistry 만 빈으로 등록해서 사용했다.

@Configuration
@RequiredArgsConstructor
public class YBSApiClientConfig {

	private final CircuitBreakerRegistry circuitBreakerRegistry;

	@Bean
	YBSApiClient ybsApiClient(@Qualifier("ybsWebClient") WebClient webClient) {
		return new YBSApiClient(webClient, circuitBreakerRegistry);
	}

 

그래서 CircuitBreaker 객체를 이용 하기 위해서는 registry 에서 직접 꺼내 줘야 한다.

private final CircuitBreaker circuitBreaker;

public YBSApiClient(WebClient webClient, CircuitBreakerRegistry circuitBreakerRegistry) {
	this.webClient = webClient;
	this.circuitBreaker = circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME);
}

...

 

cf) CircuitBreakerRegistry 뿐만 아니라 CircuitBreaker 도 빈으로 등록해서 사용할 수 도 있다.

@Configuration
public class CircuitBreakerConfig {

    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        return CircuitBreakerRegistry.of(
                CircuitBreakerConfig.custom()
                    // 옵션 추가
                    .build()
        );
    }

    @Bean
    public CircuitBreaker circuitBreaker(CircuitBreakerRegistry circuitBreakerRegistry) {
        return circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME);
    }
}

 

다음으로, yml 에 CircuitBreakerException 를 record-exceptions 에 등록해준다. 그리고 요청 단계에서도 실패 할 수 있으니 WebClientRequestException 을 추가한다.

resilience4j.circuitbreaker:
  configs:
    default:
      record-exceptions:
        - com.ybs.exception.CircuitBreakerException
        - org.springframework.web.reactive.function.client.WebClientRequestException

 

CircuitBreakerException 는 우리가 새롭게 만든 클래스다. resilience4j 가 제공하는 클래스가 아니다.

public class CircuitBreakerException extends WebClientResponseException {
  private CircuitBreakerException(WebClientResponseException exception) {
    super(
      exception.getMessage(),
      exception.getRawStatusCode(),
      exception.getStatusText(),
      exception.getHeaders(),
      exception.getResponseBodyAsByteArray(),
      exception.getHeaders().getContentType().getCharset(),
      exception.getRequest()
    );
  }


  // exchangeToMono 전용
  public static <T> Mono<T> createExceptionMono(WebClientResponseException exception) {
    return Mono.error(new CircuitBreakerException(exception));
  }

  // retrieve 전용
  public static CircuitBreakerException createException(WebClientResponseException exception) {
    return new CircuitBreakerException(exception);
  }
}

 

응답 status code 가 501 이상이면, CircuitBreakerException 을 만들어서 넘기는 코드를 다시 살펴보자. retrieve 를 사용해도 된다.

cf) flatMap, map 사용이 했갈린다면 이전에 정리해둔 글을 읽어보면 도움이 된다.

// 1. exchangeToMono 버전
.exchangeToMono(clientResponse -> {
	if (clientResponse.rawStatusCode() > HttpStatus.INTERNAL_SERVER_ERROR.value()) {
		return clientResponse
			.createException()
			.flatMap(CircuitBreakerException::createExceptionMono);
	}
})

// 2. retrieve 버전 
.retrieve()
.onStatus(
	httpStatus -> httpStatus.value() > HttpStatus.INTERNAL_SERVER_ERROR.value(),
	clientResponse -> clientResponse.createException()
		.map(CircuitBreakerException::createException)
)

 

다음으로 transformDeferred 메서드 체인을 활용해서 CircuitBreakerOperator.of(this.circuitBreaker) 를 사용하면, CircuitBreakerException 예외 발생을 CircuitBreaker 가 캐치해서 발동시킨다. 서킷이 발동되면 CallNotPermittedException 이 발생하고 아래 onErrorResume 이 받아서 처리한다.

.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))

 

transformDeferred

결론부터 말하면 transform 을 사용해도 된다. 하지만 공식 가이드 문서 에서 transformDeferred 를 사용했기 때문에 따라 했다. 리액티브 패러다임에서 defer 는 여러번 subscribe 할 때 의미가 있다. WebClient 에서 한번 API 를 호출하는 경우는 굳이 transformDeferred 를 안써도 된다. Mono.defer 에 대한 자세한 내용은 여기에 정리해 놨다.

 

Retry

첫번째 시도에서 timeout이 발생했고 두번째 시도에서 성공했다면 CircuitBreaker 에 기록되지 않게 하고싶어서 onErrorMap 으로 Exceptions::isRetryExhausted 인지 확인하는 작업을 수행한다. 두번의 시도 모두 timeout 으로 실패하면 cause 를 빼내 circuit 에 전달한다.

.retryWhen(
	Retry // resilience4j.Retry 가 아닌 reactor.core Retry 사용
		.backoff(1, Duration.ofMillis(500))
		.filter(WebUtils::isTimeoutError)
)
.onErrorMap(
	Exceptions::isRetryExhausted,
	Throwable::getCause
)
.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
.onErrorResume(
	throwable -> {
		return ApiServerException.createExceptionMono(
			// 생략
		);
	}
);

// WebUtils
public static boolean isTimeoutError(Throwable throwable) {
	return throwable instanceof WebClientRequestException
		&& throwable.getCause() instanceof TimeoutException;
}

 

Test

먼저 테스트에서는 쉽게 CircuitBreaker 가 발동되어야 하므로 minimumNumberOfCalls 는 1로 설정했다. 자세한 설정 정보는 여기에 있다.

resilience4j.circuitbreaker:
  configs:
    default:
      minimumNumberOfCalls: 1
      slidingWindowSize: 1
      waitDurationInOpenState: 50ms

 

그리고 circuitBreakerRegistry 빈을 가져오고 WireMock 을 활용했다.

    @Autowired
    lateinit var sut: YBSApiClient

    @Autowired
    lateinit var circuitBreakerRegistry: CircuitBreakerRegistry

    @BeforeEach
    fun setUp() {
        WireMock.reset()
        this.circuitBreakerRegistry.allCircuitBreakers.forEach {
            it.transitionToClosedState()
        }
    }

    @AfterEach
    fun tearDown() {
        WireMock.reset()
        this.circuitBreakerRegistry.allCircuitBreakers.forEach {
            it.transitionToClosedState()
        }
    }

 

테스트 시나리오1 : 첫번째 시도 timeout 그리고 두번째 시도 성공 -> 서킷 안열렸고 failedCalls 0 개

@Test
fun retrySuccess() {
  givenThat(
      post(urlPathEqualTo("/api/1234"))
          .inScenario("first-timeout-second-success")
          .whenScenarioStateIs(STARTED)
          .willReturn(
              aResponse()
                  .withFixedDelay(1010),
          )
          .willSetStateTo("next"),
  )

  givenThat(
      post(urlPathEqualTo("/api/1234"))
          .inScenario("first-timeout-second-success")
          .whenScenarioStateIs("next")
          .willReturn(
              okJson(
                  """
                  {
                      "responseType": "Success"
                  }
                  """.trimIndent(),
              ),
          ),
  )

  StepVerifier.create(this.sut.apiCall()).verifyComplete()

  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isFalse
  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(0)
}

 

테스트 시나리오2 :  첫번째 시도 timeout 그리고 두번째 시도 503 -> 서킷 열렸고 failedCalls 1 개

@Test
fun retryFailWithTimeoutAndServiceUnavailable() {
  givenThat(
      post(urlPathEqualTo("/api/1234"))
          .inScenario("first-timeout-second-serviceUnavailable")
          .whenScenarioStateIs(STARTED)
          .willReturn(
              aResponse()
                  .withFixedDelay(1010),
          )
          .willSetStateTo("next"),
  )

  givenThat(
      post(urlPathEqualTo("/api/1234"))
        .inScenario("first-timeout-second-serviceUnavailable")
        .whenScenarioStateIs("next")
        .willReturn(serviceUnavailable()),
  )

  StepVerifier.create(this.sut.apiCall())
    .verifyError(ApiServerException::class.java)

  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isTrue
  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(1)
}

 

테스트 시나리오3 : 첫번째 시도 500 에러. 그래서 Retry 없음 -> 서킷 안열림(500이니까)

@Test
fun internalServerError() {
  givenThat(
      post(urlPathEqualTo("/api/1234"))
        .willReturn(serverError()),
  )

  StepVerifier.create(this.sut.apiCall())
    .expectErrorMatches { exception ->
      exception is ApiServerException && !exception.isTimeoutError
    }
    .verify()

  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isFalse
  then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(1)
}

 

cf) exception.isTiemoutError 코드는 아래와 같다.

public class ApiServerException {
  public boolean isTimeoutError() {
    return getCause() instanceof WebClientRequestException // org.springframework.web.reactive.function.client
      && getCause().getCause() instanceof TimeoutException; // io.netty.handler.timeout
  }
}
반응형