EDA 에서 Event 중복 발행(동시성 이슈)
먼저 spring application 서버는 kafka 메세지를 consuming 하고 producing 하는것을 반복하는 구조다.
왜 Event 가 중복 발행됐는지를 이해하려면, Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 을 이해해야 한다. 메세지 유실을 막기위해 db update 를 두번하는데 그 과정이 이해 안가면 아래 흐름을 받아들이기 어렵다.
핵심은 9번(redis 에 update) 이 끝나기전, 10번에서 order 가 조회된다는 부분이다.
consume 타이밍을 제어할 수 없으므로 결국 reids db lock 이 필요하다. 9번(redis 에 update) 이 끝날 때까지 lock 을 잡고, 10번(order 조회) 을 기다리도록 해서 문제를 해결했다.
lock 기능은 spring 이 제공하는 RedisLockRegistry 를 이용했다. lock 을 잡고 푸는 메커니즘은 OBTAIN_LOCK_SCRIPT 에 있다. key 로 lock 을 잡는 명령이 있고 lock 을 잡기 위해 redis 에 data 를 write 한다. 그래서 다른 누군가가 같은 key 에 대해 lock 을 잡으려고 하는데 data 가 있는것을 보고 기다린다.
즉 redis 에 data 가 있냐없냐로 lock 을 판단한다. redis.call('GET', KEYS[1]) 로 data 를 확인하고 만약 데이터가 있으면(if lockClientId == ARGV[1] then) 기다리는데 무한정 기다릴 수 없으니 PEXPIRE 만큼만 기다린다. 만약 데이터가 없으면(elseif not lockClientId then) 데이터를 set 해준다(redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])).
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
private abstract class RedisLock implements Lock {
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1]) " +
"if lockClientId == ARGV[1] then " +
" redis.call('PEXPIRE', KEYS[1], ARGV[2]) " +
" return true " +
"elseif not lockClientId then " +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) " +
" return true " +
"end " +
"return false";
private final RedisScript<Boolean> obtainLockRedisScript =
new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
진짜 Event 가 중복 발행된 이유
사실 위 그림은 kafka 를 뭉뚱그려 설명했다. 조금만 더 자세히 설명해보겠다. orderId 를 key 로 해서, 같은 order 는 같은 partition 으로 들어간다. 비지니스 프로세스는 이걸 컨슈밍해가지고 db 에서 조회하고 결과를 다시 카프카에 쏘는 구조다. 그러니까 같은 orderId 는 같은 partition 에서 줄 서있기 때문에 아까와 같은 접근이 사실성 거의 없다.
문제는 Event 를 consume 하는 consumer 들이 서로 다를때다.
그래서 RedisLockRegistry 의 lock 기능을 수정해야 한다. 먼저 Kafka exactly once semantic 글과 Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 글에서 자세히 설명한 kafka transaction afterCommit 에서 db update 를 수행하는데
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
redisOperations.opsForValue().set(
generateId(accountTransfer.getProcessId()),
accountTransfer,
Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
);
}
}
);
redis db unlock 은 afterCommit 이 끝나고 수행되어야 하므로 afterCompletion 에 추가해준다.
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
lock.unlock();
}
});
마지막 해결부분 설명이 디테일하지 못하다. 하지만 이부분을 차근차근 설명하려면 회사 코드가 많이 노출될 위험이 있다. 우리 도메인 특성인 요소가 많기 때문이다. 아쉽지만 지켜야할건 지켜야해서 어쩔수 없다.