spring application 내부에서 메세지를 직접 주고받지 않고, event 기반으로 kafka 를 통해 메세지를 주고받는다면 NullPointerException 을 특히 신경써야한다.
아래 그림은, onStartOrder 에서 주문이 시작되서 포인트적립 Command message를 kafka 에 발행하고 onReservePoint 에서 포인트적립 Command message 를 받아서 처리하는 프로세스다.
포인트적립 Command message 에는 pointReserveDto 가 담겨있다. 스펙이 변경되서 새로운 필드가 필요해졌다. 그래서 newObject 를 만들어서 추가로 전달해주고, 포인트적립을 수행하는 onReservePoint 에서 get 해서 사용하게 수정하고 배포했다.
여기서 문제가 생긴다. kafka 에 있던 기존 포인트적립 Command message 의 pointReserveDto 에는 newObject 가 없기 때문이다. spring application 은 새로운 버전으로 배포됐지만 기존 버전 meesage 를 consume 하고 pointReserveDto.getNewObject().getData() 코드를 수행하게 되니 NullPointerException 이 발생한다.
코드로 자세한 설명을 하기위해 전체적인 프로세스를 그렸다. 작업 순서와 코드를 같이 보면 좋다.
cf) 5번 6번에 대한 코드는 생략했다. persist 까지 다루면 얘기가 너무 길어진다.
첫 시작은 kafkaListener 다. kafka 로부터 StartOrder command message 를 받으면 orderService 로 작업을 위임한다.
@Configuration
@RequiredArgsConstructor
public class KafkaListener {
@Component
@RequiredArgsConstructor
public static class KafkaCommandHandler {
private final OrderService orderService;
public void onStartOrder(StartOrder startOrder) {
this.orderService.startOrder(startOrder);
}
}
}
orderService.startOrder 에서 PointReserveDto 를 생성하는데, 스펙 변경으로 NewObject 도 만들어서 넣어준다. 그리고 orderProcessor 로 전달한다.
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderProcessor orderProcessor;
public void startOrder(StartOrder startOrder) {
String orderId = startOrder.getOrderId();
// 스펙 변경으로 NewObject 생성
PointReserveDto pointReserveDto = PointReserveDto.builder()
.newObject(
NewObject.builder()
.data("newData")
.build()
)
.build();
orderProcessor.processStartOrder(orderId, pointReserveDto);
}
}
orderProcessor 에서 비지니스 로직을 수행한다. 먼저 StartOrder command 로부터 전달받은 orderId 를 갖고 db를 조회해서 Order 객체를 얻는다. 그리고 Order 객체를 활용해 본격적인 StartOrder 관련 작업을 수행한다. 다 끝났으면 마지막으로 StartOrder 가 끝났다는 event 를 발행하기 위해 order.applyOrderStarted 를 호출한다.
cf) NullPointerException 이 발생하는 과정을 설명하는게 주 목적이기 때문에 설명없이 넘어가지만, order 에서 event 및 command 발행을 수행하는 구조가 EDA 에서 중요한 포인트 중 하나다.
@Component
public class OrderProcessor {
public void processStartOrder(String orderId, PointReserveDto pointReserveDto) {
Order order = getOrderById(orderId);
// StartOrder 관련 작업 수행
order.applyOrderStarted(pointReserveDto);
}
// 실제론 DB 조회해서 Order 얻어와야함
private Order getOrderById(String orderId) {
return Order.builder().id(orderId).build();
}
}
다음으로 아래 코드를 보자. order.applyOrderStarted 는 전달받은 pointReserveDto 를 OrderStarted event 에 담고 pendingEvents list 에 추가한다. 마지막으로 apply 를 통해 Order 객체의 pointReserveDto 필드를 채운다.
그다음 ReservePoint command 를 생성해서 pointReserveDto 를 담고 마찬가지로 pendingCommands list 에 추가한다.
남은 작업은 pendingEvents 와 pendingCommands 에 있는 메세지들을 kafka 로 발행하고 order 를 db 에 저장하는것이다. 그림 5번, 6번에 대한 내용이고 관련 코드는 생략한다.
@Getter
@Builder
public class Order {
String id;
@Nullable
PointReserveDto pointReserveDto;
List<Event> pendingEvents = new ArrayList<>();
List<Command> pendingCommands = new ArrayList<>();
public void applyOrderStarted(PointReserveDto pointReserveDto) {
this.pendAndApplyEvent(
OrderStarted.builder()
.orderId(this.getId())
.pointReserveDto(pointReserveDto)
.build(),
this::apply
);
this.pendingCommands.add(
ReservePoint.builder()
.orderId(this.getId())
.pointReserveDto(this.pointReserveDto)
.build()
);
}
private <T extends Event> void pendAndApplyEvent(T event, Consumer<T> apply) {
this.pendingEvents.add(event);
apply.accept(event);
}
private void apply(OrderStarted orderStarted) {
this.pointReserveDto = orderStarted.getPointReserveDto();
}
}
위에서 ReservePoint command 를 발행했기 때문에 아래 KafkaCommandHandler 가 onReservePoint 를 통해 다시 consume 한다.
@Configuration
@RequiredArgsConstructor
public class KafkaListener {
@Component
@RequiredArgsConstructor
public static class KafkaCommandHandler {
private final OrderService orderService;
public void onReservePoint(ReservePoint reservePoint) {
this.orderService.reservePoint(reservePoint);
}
}
}
이전 startOrder 프로세스와 비슷하게 흘러가다, orderProcessor 에서 전달받은 pointReserveDto 의 newObject.data 값을 꺼내려다 NullPointerException 을 만나게 된다.
@Component
public class OrderProcessor {
public void processReservePoint(String orderId, PointReserveDto pointReserveDto) {
Order order = getOrderById(orderId);
// 관련 작업 수행
// Fixme: NPE 발생 !! pointReserveSource.getNewObject() 가 Null
String newData = pointReserveDto.getNewObject().getData();
}
// 실제론 DB 조회해서 Order 얻어와야함
private Order getOrderById(String orderId) {
return Order.builder().id(orderId).build();
}
}
'Spring' 카테고리의 다른 글
EDA 에서 Event 중복 발행(동시성 이슈) (0) | 2023.07.22 |
---|---|
WebClient 에서 raw response body 로깅 (0) | 2023.07.15 |
WebClient 인코딩 중복 이슈 (0) | 2023.03.13 |
WebClient + Retry + CircuitBreaker (1) | 2022.09.24 |
WebClient 사용할때 주의 (8편) (0) | 2022.09.18 |