평화롭던 서버에서 갑자기 reactor-netty out of direct memory error 다수 발생했다.
따로 설정을 한게 없어서 direct memory max 가 1G 인데 초과된것이다.
ERROR default c.n.s.p.GlobalErrorWebExceptionHandler - [GlobalErrorWebExceptionHandler] ETC
reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1056964615, max: 1073741824)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/file/14c583325e3" [ExceptionHandlingWebHandler]
Stack trace:
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 1056964615, max: 1073741824)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:726)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:681)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
그동안 Direct Buffer Memory 그래프 수치가 일정해서 문제 없다고 생각했는데 아니었다.
Netty의 DirectBuffer 할당구조가, JVM에서 일반적으로 통용되는 DirectBuffer 할당 api를 사용하지 않고 JVM 내부의 숨겨진 Unsafe API를 사용하기 때문에 트래킹이 안됐던 것이다.
일단 트래킹 안된 문제는 접어두고 빨리 메모리가 세는 부분을 찾아야 했다.
아래 api를 통해 로그를 남겨 범인을 찾았다.
io.netty.util.internal.PlatformDependent.usedDirectMemory();
io.netty.util.internal.PlatformDependent.maxDirectMemory();
io.netty.util.internal.PlatformDependent.directBufferPreferred();
문제는 request body 를 읽는 코드였다.
Spring Cloud Gateway 에서는 request body를 요청당 한번밖에 못읽는 제약이 있다.
cf) 이건 Spring Cloud Gateway 만의 문제는 아니다. 일반 요청에서도 제약이 발생할 수 있다.
한번밖에 못읽는데 Spring Cloud Gateway 에서 제공하는 NettyRoutingFilter 에서 request.getBody() 를 써버린다.
그래서 내가 만든 필터에서 getBody를 쓰면 뒤에서 java.lang.IllegalStateException: Only one connection receive subscriber allowed 에러가 발생한다. 그렇다고 NettyRoutingFilter를 안쓰게 뺄 수도 없다.
// 뒷단 서버로 요청 보내는 NettyRoutingFilter 코드
Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
// ... 생략
}).request(method).uri(url).send((req, nettyOutbound) -> {
// ... 생략
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection) -> {
// ... 생략
그래서 이 문제를 getBody 캐싱을 구현해서 해결한다.
// request body 꺼내는 코드(Join all the DataBuffers so we have a single DataBuffer for the body)
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
// request body를 두번 읽을 수 있도록 retain count 를 업데이트
DataBufferUtils.retain(dataBuffer);
// Make a slice for each read so each read has its own read/write indexes
Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
MultiValueMap<String, String> requestBodyMap = parse(StandardCharsets.UTF_8, toRawString(cachedFlux));
exchange.getAttributes().put("REQUEST_BODY", requestBodyMap);
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
private String toRawString(Flux<DataBuffer> body) {
if (body == null) {
return "";
}
AtomicReference<String> rawRef = new AtomicReference<>();
body.subscribe(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
rawRef.set(Strings.fromUTF8ByteArray(bytes));
});
return rawRef.get();
}
이렇게 사용하면 일반적으로 메모리 누수 문제가 발생하진 않는다.
그런데 나는 특정 패턴의 요청일 경우, 아래와 같이 도중에 강제 리턴을 시켰다.
이때 추가적인 DataBufferUtils.release(dataBuffer) 를 해주지 않으면 direct buffer memory 가 계속 쌓인다.
// request body 꺼내는 코드
// Join all the DataBuffers so we have a single DataBuffer for the body
return DataBufferUtils.join(exchange.getRequest().getBody())
.flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
MultiValueMap<String, String> requestBodyMap = parse(StandardCharsets.UTF_8, toRawString(cachedFlux));
exchange.getAttributes().put("REQUEST_BODY", requestBodyMap);
// NettyRoutingFilter 까지 보내지 않고 해당 필터에서 끝내면 따로 release를 해줘야함.
// 안하면 메모리 누수 발생
DataBufferUtils.release(dataBuffer);
// 응답 만들어서 리턴 시킴
return response.writeWith(Flux.just(response.bufferFactory().wrap("YBS".getBytes())));
});
private String toRawString(Flux<DataBuffer> body) {
if (body == null) {
return "";
}
AtomicReference<String> rawRef = new AtomicReference<>();
body.subscribe(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
rawRef.set(Strings.fromUTF8ByteArray(bytes));
});
return rawRef.get();
}
처음엔 toRawString 메서드에서 dataBuffer를 release 해주기 때문에 문제가 없다고 생각했었는데 틀렸다.
확실히 subscribe 코드는 더 주의해서 봐야한다.
현재는 getBody 코드를 아래와 같이 사용한다.
return DataBufferUtils.join(request.getBody())
.flatMap(dataBuffer -> {
byte[] requestBodyByteArray = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(requestBodyByteArray);
String requestBodyString = new String(requestBodyByteArray, StandardCharsets.UTF_8);
DataBufferUtils.release(dataBuffer);
MultiValueMap<String, String> requestBodyMap = parse(requestBodyString);
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return DataBufferUtils.read(new ByteArrayResource(requestBodyByteArray), new NettyDataBufferFactory(ByteBufAllocator.DEFAULT),
requestBodyByteArray.length);
}
};
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
왜 이렇게 바꿨고 뭐가 더 좋은지 설명하면 너무 길어져서 다음 기회로
참고 : github.com/spring-cloud/spring-cloud-gateway/issues/747
참고 : github.com/spring-cloud/spring-cloud-gateway/issues/946
'Spring' 카테고리의 다른 글
Spring Web MVC 구조 논의 1편 (0) | 2021.06.06 |
---|---|
Spring Cloud Gateway CORS 주의사항 (4) | 2021.01.24 |
useInsecureTrustManager 옵션 (0) | 2021.01.20 |
개발 이슈 모음 (0) | 2021.01.19 |
WebClient 사용할때 주의 (1편) (2) | 2021.01.18 |