본문 바로가기
Spring

Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션

by ybs 2022. 3. 9.
반응형

EDA 환경에서 일반적으로 kafka 이벤트 발행과 db 저장은 한 transaction 으로 묶을 수 없다. 물론 RDB 를 사용한다면 뒤에 서술할 Transactional Outbox Pattern 을 활용할 수 있다. 하지만 내가 속한 프로젝트는 redis in-memory nosql db 를 사용했다. 그래서 다른 방식으로 transaction 문제를 해결했다.

 

cf) kafka transaction 에 대해서 더 자세히 알고 싶다면 이전 글에서 자세히 설명했다.

 

계좌이체 준비  예제

핵심부분만 추려서 코드를 재구성해, 계좌이체 준비 예제로 설명하려 한다. 먼저 아래 prepareAccountTransfer 메서드는 전달받은 processId와 userNo 를 갖고 AccountTransfer 객체를 만든다.

@Service
@RequiredArgsConstructor
public class AccountTransferService {

  private final AccountTransferRepository accountTransferRepository;

  public void prepareAccountTransfer(String processId, long userNo) {

    AccountTransfer accountTransfer = AccountTransfer.prepareAccountTransfer(processId, userNo);

    accountTransferRepository.save(accountTransfer);
  }
}

 

AccountTransfer 객체는 processId, userNo 필드 뿐만아니라 생성된 AccountTransferPrepared 이벤트 객체를 담고 있는 pendingEvents 리스트도 갖고있다. 아래 prepareAccountTransfer 메서드는 AccountTransferPrepared 이벤트 객체를 생성하고 pendingEvents 에 담는다.

 

AccountTransferPrepared 이벤트 뿐만 아니라 '계좌이체 실행' 이벤트 등 다양한 이벤트들을 같이 처리하기 위해 pendAndApplyEvent 메서드는 T 타입으로 받았다. AccountTransfer 객체가 pendingEvents 리스트를 갖고 있는 이유는 뒤에서 설명하겠다. cf) @JsonTypeInfo 나 @JsonSubTypes 내용은 이전 objectMapper 글에서 자세히 설명했다.

@Getter
public class AccountTransfer {
  @NotBlank
  private String processId;
  private long userNo;
    
  @JsonTypeInfo(use = Id.NAME, include = As.WRAPPER_OBJECT)
  private List<DomainEvent> pendingEvents = new ArrayList<>();

  public static AccountTransfer prepareAccountTransfer(String processId, long userNo) {
    AccountTransfer accountTransfer = new AccountTransfer();
    accountTransfer.pendAndApplyEvent(
      AccountTransferPrepared.builder()
        .processId(processId)
        .userNo(userNo)
        .build(),
      accountTransfer::apply
    );

    return accountTransfer;
  }

  @JsonSubTypes({
    @Type(value = AccountTransferPrepared.class, name = "AccountTransferPrepared")
  })
  protected void setPendingEvents(List<DomainEvent> pendingEvents) {
    this.pendingEvents = pendingEvents;
  }

  private <T extends DomainEvent> void pendAndApplyEvent(T event, Consumer<T> apply) {
    this.pendingEvents.add(event);
    apply.accept(event);
  }
  
  private void apply(AccountTransferPrepared accountTransferPrepared) {
    this.processId = accountTransferPrepared.getProcessId();
    this.userNo = accountTransferPrepared.getUserNo();
  }
}

@Getter
@Setter
@Builder
public class AccountTransferPrepared implements DomainEvent {
  @NotBlank
  private String processId;
  private long userNo;
}

 

이렇게 해서 얻어진 accountTransfer 를 갖고 accountTransferRepository.save(accountTransfer) 를 수행한다. save 메서드에서는 먼저 accountTransfer 를 redis db 에 저장한다. 저 시점에서 accountTransfer 객체는 pending 된 이벤트를 갖고 있다. 그래서 이벤트 데이터가 db 에 같이 저장된다. 이렇게 먼저 db에 저장하는 이유는 메세지 유실을 막기위해서다.

@Repository
public class AccountTransferRepository {

  private final RedisOperations<String, AccountTransfer> redisOperations;
  private final EventPublisher eventPublisher;
  private final TransactionTemplate kafkaTransactionTemplate;

  public AccountTransferRepository(
    RedisOperations<String, AccountTransfer> redisOperations,
    EventPublisher eventPublisher,
    KafkaAwareTransactionManager<byte[], byte[]> kafkaAwareTransactionManager
  ) {
      this.redisOperations = redisOperations;
      this.eventPublisher = eventPublisher;
      this.kafkaTransactionTemplate = new TransactionTemplate(kafkaAwareTransactionManager);
  }
    
  public void save(AccountTransfer accountTransfer) {

    // 1. accountTransfer 에 pending 된것을 가진 상태로 redis db 저장
    this.redisOperations.opsForValue().set(
      generateId(accountTransfer.getProcessId()),
      accountTransfer,
      Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
    );

    this.kafkaTransactionTemplate.executeWithoutResult(status -> {
      if (!pendingEvents.isEmpty()) {
        // 2. pending 되어있는거 꺼내서 이벤트 발행
        this.eventMessagePublisher.publish(pendingEvents);
        // // 3. pending 되어있는것들을 clear 시킴
        accountTransfer.clearPendingEvents();
      }

      TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronization() {
          @Override
          public void afterCommit() {
            // 4. kafka transaction 안에서 commit 콜백 걸어서 redis db 저장
            // pending 된게 남아있지 않은 accountTransfer 를 다시 저장
            redisOperations.opsForValue().set(
              generateId(accountTransfer.getProcessId()),
              accountTransfer,
              Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
            );
          }
        }
      );
    });
  }
}

 

그리고 이벤트를 발행한다(위 코드 주석 2번). 이벤트 발행이 성공하면 pendingEvents 리스트를 clear 시킨 후 accountTransfer 를 다시 db 에 저장한다. 이때는 kafka transaction 안에서 콜백을 걸어 db 에 저장하게 했다.

 

kafka transaction scope 에서 메세지를 발행하기 때문에 publish 한다고 바로 commit 된게 아니다. 그래서 발행이 확보된 후에 db 저장하기 위해 afterCommit 으로 훅을 걸었다. afterCommit 훅 없이 db 에 저장하게 만들면, 만약 commit 이 실패했는데 이미 clear 해버린 상황에서 발행도 안됐고 메세지도 유실된 채 db 에 저장되는 문제가 발생한다.

 

그래서 위와 같은 프로세스로 만들면 이벤트 발행이 실패해도 메세지 유실은 막을 수 있다. 즉 다시 보낼 기회가 있다. 물론 이벤트를 받는 컨슈머쪽에서는 같은 이벤트가 중복으로   있으니 중복 제거하는 프로세스를 갖춰야한다. 그래도 메세지 유실보다는 중복이 낫다고 판단했다.

 

Q) pendingEvents 에 있는것을 먼저 꺼내서 이벤트를 발행하고 clear 한 후에 db 저장하는 프로세스로 만들면?

A) 이벤트는 발행했는데 db 저장이 실패하게 되면 데이터 일관성이 깨지고 이벤트 데이터가 이미 clear 되서 메세지 유실 문제가 발생한다.

@Repository
public class AccountTransferRepository {
    
  public void save(AccountTransfer accountTransfer) {

    // 1. pending 에 있는것을 먼저 꺼냄
    List<DomainEvent> pendingEvents = accountTransfer.getPendingEvents();


    this.kafkaTransactionTemplate.executeWithoutResult(status -> {
      if (!pendingEvents.isEmpty()) {
        // 2. 이벤트 발행
        this.eventMessagePublisher.publish(pendingEvents);
        // 3. clear 작업
        accountTransfer.clearPendingEvents();
      }

      TransactionSynchronizationManager.registerSynchronization(
        new TransactionSynchronization() {
          @Override
          public void afterCommit() {
            // 4. 만약 db 저장이 실패한다면?? 이벤트는 이미 clear 해버림
            redisOperations.opsForValue().set(
              generateId(accountTransfer.getProcessId()),
              accountTransfer,
              Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
            );
          }
        }
      );
    });
  }
}

 

Q) pendingEvents 에 있는것을 꺼내고 clear 시킨 후 db 저장하는 프로세스로 만들면?

A) db 저장은 성공했는데 이벤트 발행이 실패하게 되면 마찬가지로 메세지가 유실 문제가 발생한다.

@Repository
public class AccountTransferRepository {
    
  public void save(AccountTransfer accountTransfer) {

    // 1. pending 에 있는것을 먼저 꺼냄
    List<DomainEvent> pendingEvents = accountTransfer.getPendingEvents();
    // 2. clear 작업
    accountTransfer.clearPendingEvents();

    // 3. db 저장
    redisOperations.opsForValue().set(
      generateId(accountTransfer.getProcessId()),
      accountTransfer,
      Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
    );
    
    // 4. 메세지 발행
    if (!pendingEvents.isEmpty()) {
      this.eventMessagePublisher.publish(pendingEvents);
    }    
  }
}

 

Transactional Outbox Pattern

Microservices Patterns 책에서 소개한 패턴이다. RDB 일때는 이 패턴을 활용해 transaction 문제를 해결할 수 있다.

이 패턴은 db table 을 임시 메세지 큐로써 사용한다. 아래 OrderService 에서 order table CUD 작업을 할때 OUTBOX table 에도 같이 insert 해준다. 한 transaction 안에서 이뤄지기 때문에 원자성이 보장된다. 그리고 MessageRelay 컴포넌트에서 OUTBOX table 을 읽어 메세지를 발행시킨다. 

출처 : microservices-pattern chapter3

 

MessageRelay 컴포넌트는 아직 발행되지 않은 OUTBOX row 를 주기적으로 select 해서 가져오고 Message broker(kafka) 에 발행한다. 그 뒤 발행된 OUTBOX row 들은 delete 시킨다. 간단한 방법이지만 db 를 얼마 주기로 polling 할건지 정해야 한다. db 를 자주 polling 하는것은 비용이 많이 들 수 있다. 그래서 이책에서는 아래 db transaction log 를 추적하는 방식이 더 나을 수 있다고 말한다.

 

OUTBOX table 에 insert 가 이뤄지면 transaction log(ex mysql binlog) 를 Transaction Log Miner 컴포넌트로 보낸다. 그리고 Transaction Log Miner 컴포넌트에서 Message broker(kafka) 에 발행한다. 이 방식은 위에서 설명한 방식보다 직접 개발하기 어렵다. db specific API 를 호출하는 low-level 코드 작업이 필요하다. 책에서는 Debezium, LinkedIn Databus, Eventuate Tram 등 이 방식을 지원하는 여러 오픈소스들을 소개한다. 

출처 : microservices-pattern chapter3

 

 

출처 : https://learning.oreilly.com/library/view/microservices-patterns/9781617294549/OEBPS/Text/03.html#ch03lev2sec15

반응형

'Spring' 카테고리의 다른 글

WebClient + Retry + CircuitBreaker  (1) 2022.09.24
WebClient 사용할때 주의 (8편)  (0) 2022.09.18
WebClient 사용할때 주의 (7편)  (0) 2022.03.07
ObjectMapper  (0) 2022.01.15
kafka transaction(exactly once semantic)  (0) 2022.01.09