Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션
EDA 환경에서 일반적으로 kafka 이벤트 발행과 db 저장은 한 transaction 으로 묶을 수 없다. 물론 RDB 를 사용한다면 뒤에 서술할 Transactional Outbox Pattern 을 활용할 수 있다. 하지만 내가 속한 프로젝트는 redis in-memory nosql db 를 사용했다. 그래서 다른 방식으로 transaction 문제를 해결했다.
cf) kafka transaction 에 대해서 더 자세히 알고 싶다면 이전 글에서 자세히 설명했다.
계좌이체 준비 예제
핵심부분만 추려서 코드를 재구성해, 계좌이체 준비 예제로 설명하려 한다. 먼저 아래 prepareAccountTransfer 메서드는 전달받은 processId와 userNo 를 갖고 AccountTransfer 객체를 만든다.
@Service
@RequiredArgsConstructor
public class AccountTransferService {
private final AccountTransferRepository accountTransferRepository;
public void prepareAccountTransfer(String processId, long userNo) {
AccountTransfer accountTransfer = AccountTransfer.prepareAccountTransfer(processId, userNo);
accountTransferRepository.save(accountTransfer);
}
}
AccountTransfer 객체는 processId, userNo 필드 뿐만아니라 생성된 AccountTransferPrepared 이벤트 객체를 담고 있는 pendingEvents 리스트도 갖고있다. 아래 prepareAccountTransfer 메서드는 AccountTransferPrepared 이벤트 객체를 생성하고 pendingEvents 에 담는다.
AccountTransferPrepared 이벤트 뿐만 아니라 '계좌이체 실행' 이벤트 등 다양한 이벤트들을 같이 처리하기 위해 pendAndApplyEvent 메서드는 T 타입으로 받았다. AccountTransfer 객체가 pendingEvents 리스트를 갖고 있는 이유는 뒤에서 설명하겠다. cf) @JsonTypeInfo 나 @JsonSubTypes 내용은 이전 objectMapper 글에서 자세히 설명했다.
@Getter
public class AccountTransfer {
@NotBlank
private String processId;
private long userNo;
@JsonTypeInfo(use = Id.NAME, include = As.WRAPPER_OBJECT)
private List<DomainEvent> pendingEvents = new ArrayList<>();
public static AccountTransfer prepareAccountTransfer(String processId, long userNo) {
AccountTransfer accountTransfer = new AccountTransfer();
accountTransfer.pendAndApplyEvent(
AccountTransferPrepared.builder()
.processId(processId)
.userNo(userNo)
.build(),
accountTransfer::apply
);
return accountTransfer;
}
@JsonSubTypes({
@Type(value = AccountTransferPrepared.class, name = "AccountTransferPrepared")
})
protected void setPendingEvents(List<DomainEvent> pendingEvents) {
this.pendingEvents = pendingEvents;
}
private <T extends DomainEvent> void pendAndApplyEvent(T event, Consumer<T> apply) {
this.pendingEvents.add(event);
apply.accept(event);
}
private void apply(AccountTransferPrepared accountTransferPrepared) {
this.processId = accountTransferPrepared.getProcessId();
this.userNo = accountTransferPrepared.getUserNo();
}
}
@Getter
@Setter
@Builder
public class AccountTransferPrepared implements DomainEvent {
@NotBlank
private String processId;
private long userNo;
}
이렇게 해서 얻어진 accountTransfer 를 갖고 accountTransferRepository.save(accountTransfer) 를 수행한다. save 메서드에서는 먼저 accountTransfer 를 redis db 에 저장한다. 저 시점에서 accountTransfer 객체는 pending 된 이벤트를 갖고 있다. 그래서 이벤트 데이터가 db 에 같이 저장된다. 이렇게 먼저 db에 저장하는 이유는 메세지 유실을 막기위해서다.
@Repository
public class AccountTransferRepository {
private final RedisOperations<String, AccountTransfer> redisOperations;
private final EventPublisher eventPublisher;
private final TransactionTemplate kafkaTransactionTemplate;
public AccountTransferRepository(
RedisOperations<String, AccountTransfer> redisOperations,
EventPublisher eventPublisher,
KafkaAwareTransactionManager<byte[], byte[]> kafkaAwareTransactionManager
) {
this.redisOperations = redisOperations;
this.eventPublisher = eventPublisher;
this.kafkaTransactionTemplate = new TransactionTemplate(kafkaAwareTransactionManager);
}
public void save(AccountTransfer accountTransfer) {
// 1. accountTransfer 에 pending 된것을 가진 상태로 redis db 저장
this.redisOperations.opsForValue().set(
generateId(accountTransfer.getProcessId()),
accountTransfer,
Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
);
this.kafkaTransactionTemplate.executeWithoutResult(status -> {
if (!pendingEvents.isEmpty()) {
// 2. pending 되어있는거 꺼내서 이벤트 발행
this.eventMessagePublisher.publish(pendingEvents);
// // 3. pending 되어있는것들을 clear 시킴
accountTransfer.clearPendingEvents();
}
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 4. kafka transaction 안에서 commit 콜백 걸어서 redis db 저장
// pending 된게 남아있지 않은 accountTransfer 를 다시 저장
redisOperations.opsForValue().set(
generateId(accountTransfer.getProcessId()),
accountTransfer,
Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
);
}
}
);
});
}
}
그리고 이벤트를 발행한다(위 코드 주석 2번). 이벤트 발행이 성공하면 pendingEvents 리스트를 clear 시킨 후 accountTransfer 를 다시 db 에 저장한다. 이때는 kafka transaction 안에서 콜백을 걸어 db 에 저장하게 했다.
kafka transaction scope 에서 메세지를 발행하기 때문에 publish 한다고 바로 commit 된게 아니다. 그래서 발행이 확보된 후에 db 저장하기 위해 afterCommit 으로 훅을 걸었다. afterCommit 훅 없이 db 에 저장하게 만들면, 만약 commit 이 실패했는데 이미 clear 해버린 상황에서 발행도 안됐고 메세지도 유실된 채 db 에 저장되는 문제가 발생한다.
그래서 위와 같은 프로세스로 만들면 이벤트 발행이 실패해도 메세지 유실은 막을 수 있다. 즉 다시 보낼 기회가 있다. 물론 이벤트를 받는 컨슈머쪽에서는 같은 이벤트가 중복으로 올 수 있으니 중복 제거하는 프로세스를 갖춰야한다. 그래도 메세지 유실보다는 중복이 더 낫다고 판단했다.
Q) pendingEvents 에 있는것을 먼저 꺼내서 이벤트를 발행하고 clear 한 후에 db 저장하는 프로세스로 만들면?
A) 이벤트는 발행했는데 db 저장이 실패하게 되면 데이터 일관성이 깨지고 이벤트 데이터가 이미 clear 되서 메세지 유실 문제가 발생한다.
@Repository
public class AccountTransferRepository {
public void save(AccountTransfer accountTransfer) {
// 1. pending 에 있는것을 먼저 꺼냄
List<DomainEvent> pendingEvents = accountTransfer.getPendingEvents();
this.kafkaTransactionTemplate.executeWithoutResult(status -> {
if (!pendingEvents.isEmpty()) {
// 2. 이벤트 발행
this.eventMessagePublisher.publish(pendingEvents);
// 3. clear 작업
accountTransfer.clearPendingEvents();
}
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 4. 만약 db 저장이 실패한다면?? 이벤트는 이미 clear 해버림
redisOperations.opsForValue().set(
generateId(accountTransfer.getProcessId()),
accountTransfer,
Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
);
}
}
);
});
}
}
Q) pendingEvents 에 있는것을 꺼내고 clear 시킨 후 db 저장하는 프로세스로 만들면?
A) db 저장은 성공했는데 이벤트 발행이 실패하게 되면 마찬가지로 메세지가 유실 문제가 발생한다.
@Repository
public class AccountTransferRepository {
public void save(AccountTransfer accountTransfer) {
// 1. pending 에 있는것을 먼저 꺼냄
List<DomainEvent> pendingEvents = accountTransfer.getPendingEvents();
// 2. clear 작업
accountTransfer.clearPendingEvents();
// 3. db 저장
redisOperations.opsForValue().set(
generateId(accountTransfer.getProcessId()),
accountTransfer,
Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
);
// 4. 메세지 발행
if (!pendingEvents.isEmpty()) {
this.eventMessagePublisher.publish(pendingEvents);
}
}
}
Transactional Outbox Pattern
Microservices Patterns 책에서 소개한 패턴이다. RDB 일때는 이 패턴을 활용해 transaction 문제를 해결할 수 있다.
이 패턴은 db table 을 임시 메세지 큐로써 사용한다. 아래 OrderService 에서 order table CUD 작업을 할때 OUTBOX table 에도 같이 insert 해준다. 한 transaction 안에서 이뤄지기 때문에 원자성이 보장된다. 그리고 MessageRelay 컴포넌트에서 OUTBOX table 을 읽어 메세지를 발행시킨다.
MessageRelay 컴포넌트는 아직 발행되지 않은 OUTBOX row 를 주기적으로 select 해서 가져오고 Message broker(kafka) 에 발행한다. 그 뒤 발행된 OUTBOX row 들은 delete 시킨다. 간단한 방법이지만 db 를 얼마 주기로 polling 할건지 정해야 한다. db 를 자주 polling 하는것은 비용이 많이 들 수 있다. 그래서 이책에서는 아래 db transaction log 를 추적하는 방식이 더 나을 수 있다고 말한다.
OUTBOX table 에 insert 가 이뤄지면 transaction log(ex mysql binlog) 를 Transaction Log Miner 컴포넌트로 보낸다. 그리고 Transaction Log Miner 컴포넌트에서 Message broker(kafka) 에 발행한다. 이 방식은 위에서 설명한 방식보다 직접 개발하기 어렵다. db specific API 를 호출하는 low-level 코드 작업이 필요하다. 책에서는 Debezium, LinkedIn Databus, Eventuate Tram 등 이 방식을 지원하는 여러 오픈소스들을 소개한다.