본문 바로가기

Spring34

Kafka consumer 와 partition 개수 증가시키기 사용자 트래픽이 높아짐에 따라, 특정 시간대에 Lag 이 쌓이면서 처리 지연이 발생했다. 그래서 consumer 와 partition 개수를 증가시키기로 했고 운영중인 상황에서 증가시키는 순서를 정리했다. 먼저 우리 시스템은 1:1:1 구조로 되어 있다. ex) consumer 서버 5대, producer 서버 5대, topic 의 partion 개수 5개 이제 Kafka consumer 와 partition 개수를 각각 2배씩 증가 시키려고 한다. 먼저 consumer concurrency 를 2로 올린다. 각 서버당 2니까 consumer 는 총 10이 된다. [아파치 카프카 애플리케이션 프로그래밍 p217] partition 을 여러 개로 운영하는 경우 데이터를 병렬처리하기 위해서 partition.. 2023. 11. 11.
WebClient "Only one connection receive subscriber allowed" 에러 아직 정확한 원인은 파악되지 않았고, 기록차원으로 작성했다. WebClient 에서 간헐적으로 "Only one connection receive subscriber allowed" 에러가 계속 발생하는것을 경험했다. 엄밀히 말하면 reactor-netty 에서 에러가 발생한다. Exception:java.lang.IllegalStateException: Only one connection receive subscriber allowed. at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182) 샘플 프로젝트를 만들고 에러를 재현해서 reactor-netty 에 문의 해봤는데 Spring Framework issue 라는 답변만 줬.. 2023. 8. 15.
EDA 에서 Event 중복 발행(동시성 이슈) 먼저 spring application 서버는 kafka 메세지를 consuming 하고 producing 하는것을 반복하는 구조다. 왜 Event 가 중복 발행됐는지를 이해하려면, Kafka 이벤트 발행과 DB 저장(redis) 트랜잭션 을 이해해야 한다. 메세지 유실을 막기위해 db update 를 두번하는데 그 과정이 이해 안가면 아래 흐름을 받아들이기 어렵다. 핵심은 9번(redis 에 update) 이 끝나기전, 10번에서 order 가 조회된다는 부분이다. consume 타이밍을 제어할 수 없으므로 결국 reids db lock 이 필요하다. 9번(redis 에 update) 이 끝날 때까지 lock 을 잡고, 10번(order 조회) 을 기다리도록 해서 문제를 해결했다. lock 기능은 s.. 2023. 7. 22.
WebClient 에서 raw response body 로깅 모니터링 부서에서 특정 webClient api 요청과 응답 로그를 찍어주길 희망했다. 해당 webClient 는 retrieve 를 사용하고 있었기 때문에 doOnSuccess 와 onErrorMap 에 성공/실패 로그를 남겼다. cf) 응답 response body 는 xml 로 전달해주는 api 다. this.webClient.get() .uri(uri) .retrieve() .bodyToMono(YbsResponse.class) .doOnSuccess(ybsResponse -> { log.info(시간, uri, ybsResponse, 성공메세지) }) .onErrorMap(throwable -> { log.info(시간, uri, throwable.getMessage(), 실패메세지) }) ... 2023. 7. 15.
EDA 에서 NullPointerException 조심하기 spring application 내부에서 메세지를 직접 주고받지 않고, event 기반으로 kafka 를 통해 메세지를 주고받는다면 NullPointerException 을 특히 신경써야한다. 아래 그림은, onStartOrder 에서 주문이 시작되서 포인트적립 Command message를 kafka 에 발행하고 onReservePoint 에서 포인트적립 Command message 를 받아서 처리하는 프로세스다. 포인트적립 Command message 에는 pointReserveDto 가 담겨있다. 스펙이 변경되서 새로운 필드가 필요해졌다. 그래서 newObject 를 만들어서 추가로 전달해주고, 포인트적립을 수행하는 onReservePoint 에서 get 해서 사용하게 수정하고 배포했다. 여기서 .. 2023. 4. 15.
WebClient 인코딩 중복 이슈 이전글 에서 WebClient 인코딩 중복에 대해서 한번 다룬적 있다. 간단히 요약하면, uriBuilder 를 이용해서 만들면 이미 인코딩된 query string 값을 또 인코딩 해서 이중 인코딩 한 결과가 나온다. 그래서 미리 인코딩 할 필요가 없다. // 아래처럼 만들면, 최종 request 는 이중 인코딩된 결과로 보내진다. this.webClient .get() .uri(uriBuilder -> uriBuilder .path("/api") //.query("name=양봉수") .query("name=%EC%96%91%EB%B4%89%EC%88%98") .build() ) .retrieve() 그런데 query string 이 json 일때는 미리 인코딩 해야만 한다. 안하면 에러가 발생한다... 2023. 3. 13.