Spring

kafka transaction(exactly once semantic)

ybs 2022. 1. 9. 17:46
반응형

kafka transaction 설명에 앞서 kafka 통신을 위한 기본적인 코드부터 설명하려 한다. 테스트 코드에서 EmbeddedKafkaBroker 를 활용해 broker 역할을 대신했다. 그리고 json 데이터를 보내기 위해 objectMapper 를 생성했다. 마지막으로 데이터 전달을 위해 Product  객체를 생성했다.

public class KafkaTest {
  static EmbeddedKafkaBroker BROKER;
  static final String TOPIC_NAME = "ybs-topic";
  ObjectMapper objectMapper = JsonMapper.builder().build();

  static {
    // broker 갯수 1개, 파티션 갯수 1개
    BROKER = new EmbeddedKafkaBroker(1, true, 1)
      .kafkaPorts(9092)
      .brokerProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), (short)1)
      .brokerProperty("transaction.state.log.replication.factor", (short)1)
      .brokerProperty("transaction.state.log.min.isr", 1);
    ROKER.afterPropertiesSet();
  }

  @Getter
  @ToString
  private static class Product {
    private String name;
    private int price;

    @ConstructorProperties({"name", "price"})
    public Product(String name, int price) {
      this.name = name;
      this.price = price;
    }
  }
}

 

다음으로 kafka transaction 개념부터 살펴보자. kafka transaction 은 consuming 한 것에 대해 다시 producing 할 때, 하나로 묶는 역할로써 의미가 있다.

 

아래 그림에서 Service1 이 produce 한 메세지를 Service2 가 consume 하고, 그 메세지를 갖고 추가 작업을 수행한 Service3 에서 다시 producing 했을 때, producing 이 성공해야만 이전에 consuming 한것도 offset 이 성공했다고 찍힌다(ack가).

 

다시말해 producing 성공과 consume ack offset 이 transaction 으로 묶여, producing 이 실패하면 offset 도 성공했다고 안찍히기 때문에 다시 consming 을 하게 된다.

 

cf) Service2 에서 Service3 으로 delegate 하는 것은 같은 애플리케이션 내 메서드 이동을 생각하고 표현했다. 만약 Service3 이 외부 모듈이고 네트워크를 타는 API가 필요하면 실패 했을 때 고려해야 될게 더 많아진다.

 

이제 kafka producer, consumer property 설정부터 살펴보자. 모든 설정을 언급하진 않고 몇개만 골라서 설명하려 한다.

@Test
void kafkaExactlyOnceSemanticTest() throws JsonProcessingException {
  /*
   * producer
   */
  Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER.getBrokersAsString());
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

  // producer transaction 설정
  producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
  producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

  /*
   * consumer
   */
  Properties consumerProps = new Properties();
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER.getBrokersAsString());
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

  // 같은 그룹의 consumer 들은 offset 공유하므로 같은 메세지를 받지 않음
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-consumer-group");

  // latest : consumer group에 새로운 consumer가 붙으면, 새로운 consumer 붙은 순간부터 읽겠다
  // earliest : consumer group에 새로운 consumer가 붙으면, topic 에 해당하는 것들을 처음부터 다 읽겠다.
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

  // 필수값은 아닌데, 문제 생겼을 때 디버깅할 때 좋음. 다른 컨슈버들간 구분하기 좋음.
  consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());

  // consumer transaction 설정
  // default : true
  consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 

  // read_committed: 커밋된 데이터만 읽는다.
  // default : read_uncommitted
  consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); 
}

 

먼저 ENABLE_AUTO_COMMIT_CONFIG 설정을 좀 자세히 살펴보자. ENABLE_AUTO_COMMIT_CONFIG 이 false 면 consume 을 한 후에 비지니스 처리 하고 commit 을 직접 찍어줘야한다. 만약 true 면 자동으로 offset에 commit을 시킨다. 하지만 위험하다. 만약 작업 중간에 에러가 발생하면 그 때의 offset 부터 다시 읽어야 하는데 자동적으로 커밋을 해버리니까 다음콜을 했을 때 이미 이전꺼는 commit 이 되버렸기 때문에 메세지가 유실될 위험이 있다. 그래서 직접 커밋을 제어하는 방향이 안전하다. 유실보다는 중복이 낫다.

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

 

다음으로 kafka transaction 을 위해 필요한 설정들을 살펴보자. producer 에서 필요한 설정은 두개다. 첫째,  TRANSACTIONAL_ID_CONFIG 는 producer transaction 을 구분하기 위한 id 이고 여기선 UUID 를 사용했다.

// producer transaction 설정
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());

 

둘째, ENABLE_IDEMPOTENCE_CONFIG 는 true 로 지정한다. produce send 해서 메세지가 kafka broker 에 들어는 갔는데 timeout 이나 broker failure 로 실패응답을 받았다고 해보자. 그러면 producer 는 같은 메세지를 재발행 한다. producer 입장에서는 실패 응답을 받았으므로 메세지가 kafka broker 에 들어갔는지 안들어갔는지 알 수가 없기 때문이다. consumer 입장에서는 중복 발행이 된거기 때문에 똑같은 메세지를 2번 받게 된다. 코드상으로는 한번만 send 했지만 내부 구조상 재발행 하게 되어있다. 여기서 ENABLE_IDEMPOTENCE_CONFIG 가 true 로 되어 있으면, 재발행되서 두번째 메세지가 갔을 때 같은 메세지인지 비교한다. 그리고 두번째로 온 메세지가 이전과 같은 메세지라면 버린다.

// producer transaction 설정
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

 

consumer 에서 필요한 설정도 두개다. ENABLE_AUTO_COMMIT_CONFIG 는 위에서 설명했다.

ISOLATION_LEVEL_CONFIG 는 read_committed 으로 설정해, producer 가 broker 로 보낸 데이터 중 transaction 이 완벽하게 완료된(커밋된) 데이터에 대해서만 읽을 수 있다. 만약 read_committed 가 아니면, 메세지는 보내졌는데 커밋이 실패했을 때 consumer poll 했을 때 메세지를 읽어오게 된다. 이거는 transaction 을 사용하는 의도와 맞지 않기 때문에 read_committed 를 해줘야 poll 할 때, 이미 나가버린 메세지가 커밋이 안되 있으면 안읽고 skip 시킨다.

// consumer transaction 설정
// default : true
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 

// read_committed: 커밋된 데이터만 읽는다.
// default : read_uncommitted
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

 

이제 kafka transaction 작업한 코드를 보자. 먼저 consumer 객체를 만들어서 while 문 안에서 poll 작업을 수행한다. 그런데 transaction 시나리오 상 consume 하고 다시 produce 해야하므로, consume 이 동작하기 위한 최초의 produce 작업이 필요하다. 그래서 flag 값으로 한번만 수행되는 produce 작업을 만들었다.

 

그 후로는 produce 된 메세지를 consumer 가 읽어 records.forEach 메서드를 통해 다시 producing 하는 순환구조다. 그리고 initTransactions 메서드로 transaction 처리 준비를 하고 send 전  beginTransaction 메서드, send 후  sendOffsetsToTransaction, commitTransaction 메서드를 수행한다. 

 

cf) 실제로 만들때는 produce 작업과 consume 작업이 분리시킨다. 그래서 한곳에서 같이 쓸일은 없지만 transation 테스트를 위해 만들었다. 위의 그림에서 Service1, Service2, Service3 역할을 한번에 다 하기 때문에 코드가 복잡하다.

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

  consumer.subscribe(Collections.singletonList(TOPIC_NAME));
  // cf) consumer.assign() 으로 consume 할 특정 토픽 특정 파티션을 직접 지정할 수 있다.

  boolean flag = true;
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

    Producer<String, String> producer = new KafkaProducer<>(producerProps);
    producer.initTransactions();
    producer.beginTransaction();

    // 트랜잭션 시나리오는 consumer 부터 시작이므로, 최초 한번 producing 작업이 필요하다.
    if (flag) {
      producer.send(makeProducerRecord("toy", 1000));
      producer.flush();
      flag = false;
    }

    records.forEach(rec -> {
      try {
        UUID key = this.objectMapper.readValue(rec.key(), UUID.class);
        Product value = this.objectMapper.readValue(rec.value(), Product.class);
        int partition = rec.partition();
        System.out.println("key : " + key);
        System.out.println("value : " + value);
        System.out.println("partition : " + partition);

        producer.send(makeProducerRecord(value.getName(), value.getPrice() + 1));

      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    });

    producer.sendOffsetsToTransaction(
      Collections.singletonMap(new TopicPartition(TOPIC_NAME, 0), new OffsetAndMetadata(1)),
        new ConsumerGroupMetadata("group")
      );

    producer.commitTransaction(); // 커밋하면 자동으로 flush 해주게 됌
  }
}

private ProducerRecord<String, String> makeProducerRecord(String name, int price) throws JsonProcessingException {
  return new ProducerRecord<>(
    TOPIC_NAME,
    0,
    Instant.now().toEpochMilli(),
    this.objectMapper.writeValueAsString(UUID.randomUUID()),
    this.objectMapper.writeValueAsString(new Product(name, price))
  );
}

 

 

결과는 아래와 같이 출력되 나온다.

key : f843dfec-1fcc-4be2-a57c-93d54e490c7f
value : Product{name='toy', price=1000}
partition : 0
key : 5a0e4619-8d21-4960-996c-35611a7d5314
value : Product{name='toy', price=1001}
partition : 0
key : 37aa39f9-1eb7-41ea-8c7d-2ff714b9570a
value : Product{name='toy', price=1002}
partition : 0
key : 76d4affd-c0d0-4e40-a55a-13ec86cd0462
value : Product{name='toy', price=1003}
partition : 0

 

cf ) 추가한 의존성.

dependencies {

    implementation("org.springframework.kafka:spring-kafka:2.8.1")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.13.1")

    testImplementation("org.springframework.kafka:spring-kafka:2.8.1")
    testImplementation("org.springframework.kafka:spring-kafka-test:2.8.1")

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

 

반응형