본문 바로가기
Spring

EDA 에서 Event 중복 발행(동시성 이슈)

by ybs 2023. 7. 22.
반응형

먼저 spring application 서버는 kafka 메세지를 consuming 하고 producing 하는것을 반복하는 구조다.

 

왜 Event 가 중복 발행됐는지를 이해하려면, Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 을 이해해야 한다. 메세지 유실을 막기위해 db update 를 두번하는데 그 과정이 이해 안가면 아래 흐름을 받아들이기 어렵다.

 

핵심은 9번(redis 에 update) 이 끝나기전, 10번에서 order 가 조회된다는 부분이다.

 

consume 타이밍을 제어할 수 없으므로 결국 reids db lock 이 필요하다. 9번(redis 에 update) 이 끝날 때까지 lock 을 잡고, 10번(order 조회) 을 기다리도록 해서 문제를 해결했다.

 

 

lock 기능은 spring 이 제공하는 RedisLockRegistry 를 이용했다. lock 을 잡고 푸는 메커니즘은 OBTAIN_LOCK_SCRIPT 에 있다. key 로 lock 을 잡는 명령이 있고 lock 을 잡기 위해 redis 에 data 를 write 한다. 그래서 다른 누군가가 같은 key 에 대해 lock 을 잡으려고 하는데 data 가 있는것을 보고 기다린다.

 

즉 redis 에 data 가 있냐없냐로 lock 을 판단한다. redis.call('GET', KEYS[1]) 로 data 를 확인하고 만약 데이터가 있으면(if lockClientId == ARGV[1] then) 기다리는데 무한정 기다릴 수 없으니 PEXPIRE 만큼만 기다린다. 만약 데이터가 없으면(elseif not lockClientId then) 데이터를 set 해준다(redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])).

public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {

private abstract class RedisLock implements Lock {

	private static final String OBTAIN_LOCK_SCRIPT =
			"local lockClientId = redis.call('GET', KEYS[1]) " +
					"if lockClientId == ARGV[1] then " +
					"  redis.call('PEXPIRE', KEYS[1], ARGV[2]) " +
					"  return true " +
					"elseif not lockClientId then " +
					"  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) " +
					"  return true " +
					"end " +
					"return false";

	private final RedisScript<Boolean> obtainLockRedisScript =
			new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);

 

 

진짜 Event 가 중복 발행된 이유

사실 위 그림은 kafka 를 뭉뚱그려 설명했다. 조금만 더 자세히 설명해보겠다. orderId 를 key 로 해서, 같은 order 는 같은 partition 으로 들어간다. 비지니스 프로세스는 이걸 컨슈밍해가지고 db 에서 조회하고 결과를 다시 카프카에 쏘는 구조다. 그러니까 같은 orderId 는 같은 partition 에서 줄 서있기 때문에 아까와 같은 접근이 사실성 거의 없다.

 

문제는 Event 를 consume 하는 consumer 들이 서로 다를때다.

 

그래서 RedisLockRegistry 의 lock 기능을 수정해야 한다. 먼저 Kafka exactly once semantic 글과 Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 글에서 자세히 설명한 kafka transaction afterCommit 에서 db update 를 수행하는데

TransactionSynchronizationManager.registerSynchronization(
  new TransactionSynchronization() {
    @Override
    public void afterCommit() {
      redisOperations.opsForValue().set(
        generateId(accountTransfer.getProcessId()),
        accountTransfer,
        Duration.ofMillis(Instant.now().plus(90, ChronoUnit.DAYS).toEpochMilli())
      );
    }
  }
);

 

redis db unlock 은 afterCommit 이 끝나고 수행되어야 하므로 afterCompletion 에 추가해준다.

TransactionSynchronizationManager.registerSynchronization(
  new TransactionSynchronization() {
    @Override
    public void afterCompletion(int status) {
        lock.unlock();
    }
});

 

마지막 해결부분 설명이 디테일하지 못하다. 하지만 이부분을 차근차근 설명하려면 회사 코드가 많이 노출될 위험이 있다. 우리 도메인 특성인 요소가 많기 때문이다. 아쉽지만 지켜야할건 지켜야해서 어쩔수 없다.

반응형