본문 바로가기
Spring

EDA 에서 NullPointerException 조심하기

by ybs 2023. 4. 15.
반응형

spring application 내부에서 메세지를 직접 주고받지 않고, event 기반으로 kafka 를 통해 메세지를 주고받는다면 NullPointerException 을 특히 신경써야한다.

 

아래 그림은, onStartOrder 에서 주문이 시작되서 포인트적립 Command message를 kafka 에 발행하고 onReservePoint 에서 포인트적립 Command message 를 받아서 처리하는 프로세스다.

 

포인트적립 Command message 에는 pointReserveDto 가 담겨있다. 스펙이 변경되서 새로운 필드가 필요해졌다. 그래서 newObject 를 만들어서 추가로 전달해주고, 포인트적립을 수행하는 onReservePoint 에서 get 해서 사용하게 수정하고 배포했다.

 

여기서 문제가 생긴다. kafka 에 있던 기존 포인트적립 Command message 의 pointReserveDto 에는 newObject 가 없기 때문이다. spring application 은 새로운 버전으로 배포됐지만 기존 버전 meesage 를 consume 하고 pointReserveDto.getNewObject().getData() 코드를 수행하게 되니 NullPointerException 이 발생한다.

 

코드로 자세한 설명을 하기위해 전체적인 프로세스를 그렸다. 작업 순서와 코드를 같이 보면 좋다.

cf) 5번 6번에 대한 코드는 생략했다. persist 까지 다루면 얘기가 너무 길어진다.

 

첫 시작은 kafkaListener 다. kafka 로부터 StartOrder command message 를 받으면 orderService 로 작업을 위임한다.

@Configuration
@RequiredArgsConstructor
public class KafkaListener {

	@Component
	@RequiredArgsConstructor
	public static class KafkaCommandHandler {
		private final OrderService orderService;

		public void onStartOrder(StartOrder startOrder) {
			this.orderService.startOrder(startOrder);
		}
	}
}

 

orderService.startOrder 에서 PointReserveDto 를 생성하는데, 스펙 변경으로 NewObject 도 만들어서 넣어준다. 그리고 orderProcessor 로 전달한다.

@Service
@RequiredArgsConstructor
public class OrderService {

	private final OrderProcessor orderProcessor;

	public void startOrder(StartOrder startOrder) {
		String orderId = startOrder.getOrderId();

		// 스펙 변경으로 NewObject 생성
		PointReserveDto pointReserveDto = PointReserveDto.builder()
			.newObject(
				NewObject.builder()
					.data("newData")
					.build()
			)
			.build();

		orderProcessor.processStartOrder(orderId, pointReserveDto);
	}
}

 

 

orderProcessor 에서 비지니스 로직을 수행한다. 먼저 StartOrder command 로부터 전달받은 orderId 를 갖고 db를 조회해서 Order  객체를 얻는다. 그리고 Order 객체를 활용해 본격적인 StartOrder 관련 작업을 수행한다. 다 끝났으면 마지막으로 StartOrder 가 끝났다는 event 를 발행하기 위해 order.applyOrderStarted 를 호출한다.

cf) NullPointerException 이 발생하는 과정을 설명하는게 주 목적이기 때문에 설명없이 넘어가지만, order 에서 event 및 command 발행을 수행하는 구조가 EDA 에서 중요한 포인트 중 하나다.

@Component
public class OrderProcessor {
	public void processStartOrder(String orderId, PointReserveDto pointReserveDto) {
		Order order = getOrderById(orderId);

		// StartOrder 관련 작업 수행

		order.applyOrderStarted(pointReserveDto);
	}

	// 실제론 DB 조회해서 Order 얻어와야함
	private Order getOrderById(String orderId) {
		return Order.builder().id(orderId).build();
	}
}

 

 

다음으로 아래 코드를 보자. order.applyOrderStarted 는 전달받은 pointReserveDto 를 OrderStarted event 에 담고 pendingEvents list 에 추가한다. 마지막으로 apply 를 통해 Order 객체의 pointReserveDto 필드를 채운다.

 

그다음 ReservePoint command 를 생성해서 pointReserveDto 를 담고 마찬가지로 pendingCommands list 에 추가한다.

남은 작업은 pendingEvents 와 pendingCommands 에 있는 메세지들을 kafka 로 발행하고 order 를 db 에 저장하는것이다. 그림 5번, 6번에 대한 내용이고 관련 코드는 생략한다. 

@Getter
@Builder
public class Order {
	String id;

	@Nullable
	PointReserveDto pointReserveDto;

	List<Event> pendingEvents = new ArrayList<>();
	List<Command> pendingCommands = new ArrayList<>();

	public void applyOrderStarted(PointReserveDto pointReserveDto) {
		this.pendAndApplyEvent(
			OrderStarted.builder()
				.orderId(this.getId())
				.pointReserveDto(pointReserveDto)
				.build(),
			this::apply
		);

		this.pendingCommands.add(
			ReservePoint.builder()
				.orderId(this.getId())
				.pointReserveDto(this.pointReserveDto)
				.build()
		);
	}

	private <T extends Event> void pendAndApplyEvent(T event, Consumer<T> apply) {
		this.pendingEvents.add(event);
		apply.accept(event);
	}

	private void apply(OrderStarted orderStarted) {
		this.pointReserveDto = orderStarted.getPointReserveDto();
	}
}

 

위에서 ReservePoint command 를 발행했기 때문에 아래 KafkaCommandHandler 가 onReservePoint 를 통해 다시 consume 한다. 

@Configuration
@RequiredArgsConstructor
public class KafkaListener {

	@Component
	@RequiredArgsConstructor
	public static class KafkaCommandHandler {
		private final OrderService orderService;

		public void onReservePoint(ReservePoint reservePoint) {
			this.orderService.reservePoint(reservePoint);
		}
	}
}

 

이전 startOrder 프로세스와 비슷하게 흘러가다, orderProcessor 에서 전달받은 pointReserveDto 의 newObject.data 값을 꺼내려다 NullPointerException 을 만나게 된다.

@Component
public class OrderProcessor {
	public void processReservePoint(String orderId, PointReserveDto pointReserveDto) {
		Order order = getOrderById(orderId);

		// 관련 작업 수행

		// Fixme: NPE 발생 !! pointReserveSource.getNewObject() 가 Null
		String newData = pointReserveDto.getNewObject().getData();
	}

	// 실제론 DB 조회해서 Order 얻어와야함
	private Order getOrderById(String orderId) {
		return Order.builder().id(orderId).build();
	}
}

 

반응형