WebClient 를 이용할 때, Retry 기능과 CircuitBreaker 기능을 추가할 수 있다.
구체적인 방법과 함께 테스트 코드는 어떻게 만드는지 알아보자.
먼저 전체 코드를 간단히 훑어보고 뒤에서 자세히 설명하겠다. 응답 status code 가 501 이상이면, CircuitBreakerException 을 만들어서 넘긴다. 하지만 timeout 에러면 0.5초 간격으로 1번 retry 를 수행한다. retry 를 수행해도 에러가 발생하면 RetryExhaustedException 으로 감싸져서 나오기 때문에 onErrorMap 으로 잡아서 cause 를 빼내 뒤로 전달한다.
.exchangeToMono(clientResponse -> {
if (clientResponse.rawStatusCode() > HttpStatus.INTERNAL_SERVER_ERROR.value()) {
return clientResponse
.createException()
.flatMap(CircuitBreakerException::createExceptionMono);
}
// 생략
})
.doOnSuccess(
// 서버에서 responseBody 로 내려준 data 중 responseType이 "SUCCESS" 인지 확인
)
.then() // insert api 라서 리턴타입을 Mono<Void> 로 만들기 위함
.retryWhen(
Retry // resilience4j.Retry 가 아닌 reactor.core Retry 사용
.backoff(1, Duration.ofMillis(500))
.filter(WebUtils::isTimeoutError)
)
.onErrorMap(
Exceptions::isRetryExhausted,
Throwable::getCause
)
.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
.onErrorResume(
throwable -> {
return ApiServerException.createExceptionMono(
// 생략
);
}
);
// WebUtils
public static boolean isTimeoutError(Throwable throwable) {
return throwable instanceof WebClientRequestException // org.springframework.web.reactive.function.client
&& throwable.getCause() instanceof TimeoutException; // io.netty.handler.timeout
}
CircuitBreaker
우리 프로젝트에서는 CircuitBreakerRegistry 만 빈으로 등록해서 사용했다.
@Configuration
@RequiredArgsConstructor
public class YBSApiClientConfig {
private final CircuitBreakerRegistry circuitBreakerRegistry;
@Bean
YBSApiClient ybsApiClient(@Qualifier("ybsWebClient") WebClient webClient) {
return new YBSApiClient(webClient, circuitBreakerRegistry);
}
그래서 CircuitBreaker 객체를 이용 하기 위해서는 registry 에서 직접 꺼내 줘야 한다.
private final CircuitBreaker circuitBreaker;
public YBSApiClient(WebClient webClient, CircuitBreakerRegistry circuitBreakerRegistry) {
this.webClient = webClient;
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME);
}
...
cf) CircuitBreakerRegistry 뿐만 아니라 CircuitBreaker 도 빈으로 등록해서 사용할 수 도 있다.
@Configuration
public class CircuitBreakerConfig {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
return CircuitBreakerRegistry.of(
CircuitBreakerConfig.custom()
// 옵션 추가
.build()
);
}
@Bean
public CircuitBreaker circuitBreaker(CircuitBreakerRegistry circuitBreakerRegistry) {
return circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME);
}
}
다음으로, yml 에 CircuitBreakerException 를 record-exceptions 에 등록해준다. 그리고 요청 단계에서도 실패 할 수 있으니 WebClientRequestException 을 추가한다.
resilience4j.circuitbreaker:
configs:
default:
record-exceptions:
- com.ybs.exception.CircuitBreakerException
- org.springframework.web.reactive.function.client.WebClientRequestException
CircuitBreakerException 는 우리가 새롭게 만든 클래스다. resilience4j 가 제공하는 클래스가 아니다.
public class CircuitBreakerException extends WebClientResponseException {
private CircuitBreakerException(WebClientResponseException exception) {
super(
exception.getMessage(),
exception.getRawStatusCode(),
exception.getStatusText(),
exception.getHeaders(),
exception.getResponseBodyAsByteArray(),
exception.getHeaders().getContentType().getCharset(),
exception.getRequest()
);
}
// exchangeToMono 전용
public static <T> Mono<T> createExceptionMono(WebClientResponseException exception) {
return Mono.error(new CircuitBreakerException(exception));
}
// retrieve 전용
public static CircuitBreakerException createException(WebClientResponseException exception) {
return new CircuitBreakerException(exception);
}
}
응답 status code 가 501 이상이면, CircuitBreakerException 을 만들어서 넘기는 코드를 다시 살펴보자. retrieve 를 사용해도 된다.
cf) flatMap, map 사용이 했갈린다면 이전에 정리해둔 글을 읽어보면 도움이 된다.
// 1. exchangeToMono 버전
.exchangeToMono(clientResponse -> {
if (clientResponse.rawStatusCode() > HttpStatus.INTERNAL_SERVER_ERROR.value()) {
return clientResponse
.createException()
.flatMap(CircuitBreakerException::createExceptionMono);
}
})
// 2. retrieve 버전
.retrieve()
.onStatus(
httpStatus -> httpStatus.value() > HttpStatus.INTERNAL_SERVER_ERROR.value(),
clientResponse -> clientResponse.createException()
.map(CircuitBreakerException::createException)
)
다음으로 transformDeferred 메서드 체인을 활용해서 CircuitBreakerOperator.of(this.circuitBreaker) 를 사용하면, CircuitBreakerException 예외 발생을 CircuitBreaker 가 캐치해서 발동시킨다. 서킷이 발동되면 CallNotPermittedException 이 발생하고 아래 onErrorResume 이 받아서 처리한다.
.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
transformDeferred
결론부터 말하면 transform 을 사용해도 된다. 하지만 공식 가이드 문서 에서 transformDeferred 를 사용했기 때문에 따라 했다. 리액티브 패러다임에서 defer 는 여러번 subscribe 할 때 의미가 있다. WebClient 에서 한번 API 를 호출하는 경우는 굳이 transformDeferred 를 안써도 된다. Mono.defer 에 대한 자세한 내용은 여기에 정리해 놨다.
Retry
첫번째 시도에서 timeout이 발생했고 두번째 시도에서 성공했다면 CircuitBreaker 에 기록되지 않게 하고싶어서 onErrorMap 으로 Exceptions::isRetryExhausted 인지 확인하는 작업을 수행한다. 두번의 시도 모두 timeout 으로 실패하면 cause 를 빼내 circuit 에 전달한다.
.retryWhen(
Retry // resilience4j.Retry 가 아닌 reactor.core Retry 사용
.backoff(1, Duration.ofMillis(500))
.filter(WebUtils::isTimeoutError)
)
.onErrorMap(
Exceptions::isRetryExhausted,
Throwable::getCause
)
.transformDeferred(CircuitBreakerOperator.of(this.circuitBreaker))
.onErrorResume(
throwable -> {
return ApiServerException.createExceptionMono(
// 생략
);
}
);
// WebUtils
public static boolean isTimeoutError(Throwable throwable) {
return throwable instanceof WebClientRequestException
&& throwable.getCause() instanceof TimeoutException;
}
Test
먼저 테스트에서는 쉽게 CircuitBreaker 가 발동되어야 하므로 minimumNumberOfCalls 는 1로 설정했다. 자세한 설정 정보는 여기에 있다.
resilience4j.circuitbreaker:
configs:
default:
minimumNumberOfCalls: 1
slidingWindowSize: 1
waitDurationInOpenState: 50ms
그리고 circuitBreakerRegistry 빈을 가져오고 WireMock 을 활용했다.
@Autowired
lateinit var sut: YBSApiClient
@Autowired
lateinit var circuitBreakerRegistry: CircuitBreakerRegistry
@BeforeEach
fun setUp() {
WireMock.reset()
this.circuitBreakerRegistry.allCircuitBreakers.forEach {
it.transitionToClosedState()
}
}
@AfterEach
fun tearDown() {
WireMock.reset()
this.circuitBreakerRegistry.allCircuitBreakers.forEach {
it.transitionToClosedState()
}
}
테스트 시나리오1 : 첫번째 시도 timeout 그리고 두번째 시도 성공 -> 서킷 안열렸고 failedCalls 0 개
@Test
fun retrySuccess() {
givenThat(
post(urlPathEqualTo("/api/1234"))
.inScenario("first-timeout-second-success")
.whenScenarioStateIs(STARTED)
.willReturn(
aResponse()
.withFixedDelay(1010),
)
.willSetStateTo("next"),
)
givenThat(
post(urlPathEqualTo("/api/1234"))
.inScenario("first-timeout-second-success")
.whenScenarioStateIs("next")
.willReturn(
okJson(
"""
{
"responseType": "Success"
}
""".trimIndent(),
),
),
)
StepVerifier.create(this.sut.apiCall()).verifyComplete()
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isFalse
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(0)
}
테스트 시나리오2 : 첫번째 시도 timeout 그리고 두번째 시도 503 -> 서킷 열렸고 failedCalls 1 개
@Test
fun retryFailWithTimeoutAndServiceUnavailable() {
givenThat(
post(urlPathEqualTo("/api/1234"))
.inScenario("first-timeout-second-serviceUnavailable")
.whenScenarioStateIs(STARTED)
.willReturn(
aResponse()
.withFixedDelay(1010),
)
.willSetStateTo("next"),
)
givenThat(
post(urlPathEqualTo("/api/1234"))
.inScenario("first-timeout-second-serviceUnavailable")
.whenScenarioStateIs("next")
.willReturn(serviceUnavailable()),
)
StepVerifier.create(this.sut.apiCall())
.verifyError(ApiServerException::class.java)
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isTrue
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(1)
}
테스트 시나리오3 : 첫번째 시도 500 에러. 그래서 Retry 없음 -> 서킷 안열림(500이니까)
@Test
fun internalServerError() {
givenThat(
post(urlPathEqualTo("/api/1234"))
.willReturn(serverError()),
)
StepVerifier.create(this.sut.apiCall())
.expectErrorMatches { exception ->
exception is ApiServerException && !exception.isTimeoutError
}
.verify()
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).state == CircuitBreaker.State.OPEN).isFalse
then(circuitBreakerRegistry.circuitBreaker(CIRCUIT_BREAKER_NAME).metrics.numberOfFailedCalls).isEqualTo(1)
}
cf) exception.isTiemoutError 코드는 아래와 같다.
public class ApiServerException {
public boolean isTimeoutError() {
return getCause() instanceof WebClientRequestException // org.springframework.web.reactive.function.client
&& getCause().getCause() instanceof TimeoutException; // io.netty.handler.timeout
}
}
'Spring' 카테고리의 다른 글
EDA 에서 NullPointerException 조심하기 (0) | 2023.04.15 |
---|---|
WebClient 인코딩 중복 이슈 (0) | 2023.03.13 |
WebClient 사용할때 주의 (8편) (0) | 2022.09.18 |
Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 (0) | 2022.03.09 |
WebClient 사용할때 주의 (7편) (0) | 2022.03.07 |