본문 바로가기
Computer Engineering

DB 장애로 인한 EDA 기반 이벤트 보정처리

by ybs 2023. 1. 15.
반응형

지난주 갑자기 모든 애플리케이션 서버들이 DB 서버와의 네트워크 연결을 실패했다. 전면장애인 상황이다. DB 에 문제가 생기면 내가 담당하는 주문서 서버는 어떤 문제가 발생하고 왜 보정처리가 필요한지 알아보자.

// api 응답 에러 메세지
Could not open JDBC Connection for transaction;
nested exception is java.sql.SQLException: Cannot get a connection, 
pool error Timeout waiting for idle object

 

사용자가 주문서 화면에서 결제 버튼을 누르면 유효성 검사를 거쳐 결제가 진행되고 최종적으로 주문완료 작업을 수행한다. 만약 결제가 수행되기 전에 DB 문제가 생긴다면 사용자는 불편함을 겪겠지만 우리가 데이터를 보정 해줄건 없다. 하지만 결제까지 끝난 다음에 DB 문제가발생했다면 결제 취소 등 보정처리를 해줘야 한다.

 

주문서 서버는 EDA 로 설계되었지만 주문/결제 서버와는 api 로 직접 통신했다. DB 서버가 죽으면 주문 완료 요청(2번)이 실패한 후, 주문 실패 이벤트 처리 작업(4번)을 수행하지만 마찬가지로 실패하게 된다(6번).

cf) 2024년 2월 기준, 이 구간 통신도 kafka pub/sub 구조로 바꿨다.

 

조금더 구체적으로 알아보자. 사용자가 주문서 화면에서 결제 버튼을 클릭하면, 주문서 서버는 유효성 검사와 결제를 진행 한 후 주문 완료 이벤트를 받아 마지막 작업을 시작한다.

@Component
@RequiredArgsConstructor
public static class CommandMessageHandler {
  // number 1
  public void onCompleteOrder(CompleteOrder completeOrder) {
    onCompleteOrder(completeOrder);
  }
}

 

api 응답이 실패하면 onCompleteOrderFailed 이벤트를 발행시켜 4번으로 보낸다.

public void completeOrder(CompleteOrder completeOrder) { 
  CompleteOrderRequestDto dto = toCompleteOrderRequestDto(completeOrder);

  try {    
    apiClient.completeOrder(dto).block(); // number 2
  } catch (Exception ex) {
    // publish onCompleteOrderFailed event(go to number 4)
  }
}

 

DB 가 죽었으니 마찬가지로 주문실패처리 요청(5번) 도 실패한다. 그런데 catch 에서 로그만 남기고 끝내버린다. 다시말해 KafkaListener 에 예외를 던져버린다.

@Component
@RequiredArgsConstructor
public static class MessageMessageHandler {
  // number 4
  public void onCompleteOrderFailed(CompleteOrderFailed completeOrderFailed) {
    onCompleteOrderFailed(completeOrderFailed);
  }
}


public void onCompleteOrderFailed(completeOrderFailed) {
  try {
    apiClient.rollbackForConsistence(completeOrderFailed); // number 5
  } catch(Exception ex) {
    log.error("error!!", ex)
  }
}

 

springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder 에서는 afterRollbackProcessor 로 DefaultAfterRollbackProcessor 를 사용한다. DefaultAfterRollbackProcessor 객체를 생성할 때 BackOff 도 넣어주는데 createBackOff 가보면 ConsumerProperties 의 maxAttempts 가 3으로 되어있다. maxAttempts 는 처리 실패 시 메시지 처리 시도 횟수(첫 번째 시도 포함)를 의미한다. 프레임워크에서 제공하는 RetryTemplate 구성이다. 우리는 디폴트를 사용하고 있었기 때문에 총 3번 consume 을 시도한다.

messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
  (record, exception) -> {
    MessagingException payload =
        new MessagingException(((RecordMessageConverter) messageConverter)
              .toMessage(record, null, null, null),
              "Transaction rollback limit exceeded", exception);
    try {
      errorInfrastructure.getErrorChannel()
          .send(new ErrorMessage(
              payload,
              Collections.singletonMap(
                IntegrationMessageHeaderAccessor.SOURCE_DATA,
                record
              )
            )
          );
    }
    catch (Exception e) {
      /*
      * When there is no DLQ, the FinalRethrowingErrorMessageHandler will re-throw
      * the payload; that will subvert the recovery and cause a re-seek of the failed
      * record, so we ignore that here.
      */
      if (!e.equals(payload)) {
        throw e;
      }
    }
  }, createBackOff(extendedConsumerProperties),
  new KafkaTemplate<>(transMan.getProducerFactory()),
  extendedConsumerProperties.getExtension().isTxCommitRecovered()));

 

해당 이벤트 consume 을 3번 재시도 했지만 모두 실패했기 때문에 마지막으로 아래와 같은 에러 메세지를 받게 된다.

Transaction rollback limit exceeded; nested exception is 
org.springframework.kafka.listener.ListenerExecutionFailedException

 

DB 서버가 정상화 된 후, 데이터를 보정하는 방법은 주문 실패 처리(4번)를 다시 할 수 있게 onCompleteOrderFailed 이벤트를 직접 재발행하는것이다. 그러면 주문 실패처리가 진행 되고, 주문서도 뒤이은 롤백 프로세스가 동작해서 꼬인 데이터들을 처리할 수 있다.

 

onCompleteOrderFailed 이벤트를 직접 재발행하기 위해선 먼저 topic, partition, offset 정보 를 알아야 한다. topic 은 애플리케이션 프로퍼티 설정에서 찾을 수 있고 partition, offset 은 에러 로그에서 찾으면 된다(3번에서 api error 가 발생한 후  producer 가 주문 실패 이벤트를 발행했던 offset).

아파치 카프카 애플리케이션 프로그래밍 p72

 

offset 을 찾았으면 다음은 offset 의 메세지를 찾아야 한다. 사내 운영툴을 이용해 검색으로 찾을 수 있었고 그 메세지 값을 복사해 key 만 새롭게 generate 해서 다시 이벤트를 발생시켰다. 개인 블로그라서 이 부분을 자세히 설명 못하는게 아쉽다.

 

마지막으로, API 통신이 아닌 kafka pub/sub 구조일 때 DB 장애가 발생하면 어떻게 될까? API 에러 응답은 받지 않겠지만 command/event 객체가 특정 상태에서 계속 멈춰있게 된다. 우리는 일정 시간이 지나도 계속 멈춰있는 객체들을 따로 수집하고 보정처리를 진행한다.

반응형