
์ปฌ๋ฆฌ, CJ, ์ฟ ํก ๋ฑ ๋ฌผ๋ฅ ์ธํ๋ผ๋ฅผ ๋ณด์ ํ๊ณ ์๋ ํ์ฌ๋ค์ 3PL์ด๋ผ๋ ์๋น์ค๋ฅผ ์ ๊ณตํฉ๋๋ค. 3PL์ด๋ ๋ฌผ๋ฅ ์ธํ๋ผ๋ฅผ ๊ฐ์ถ ํ์ฌ๊ฐ ๊ทธ๋ ์ง ๋ชปํ ํ๋งค์ฒ๋ก๋ถํฐ ๋ฐฐ์ก ์
๋ฌด๋ฅผ ์ํ๋ฐ์ ์ ๊ณตํ๋ ์๋น์ค๋ฅผ ๋งํฉ๋๋ค.
ํ๋งค์ฒ๋ ๋ฐฐ์ก์ด ํ์ํ ์ฃผ๋ฌธ ๋ชฉ๋ก์ 3PL ์์คํ
์ ๋ฑ๋กํ๊ฒ ๋๋๋ฐ, ์ด ๊ณผ์ ์์ ์
๋ ฅ๋ ์ฃผ๋ฌธ์ด ์ ํจํ ์ฃผ๋ฌธ์ธ์ง ํ์ธํ๊ธฐ ์ํด์ ์ฌ๋ฌ ์์คํ
๊ณผ ์ํตํ๊ฒ ๋ฉ๋๋ค.
์ธ๋ถ API ํธ์ถ์ ์ํ ๋๊ตฌ๋ก๋ WebClient๋ฅผ ์ฌ์ฉํ๊ณ ์์๋๋ฐ์. ์๋ ์งํ์์ ๋ณด๋ฏ์ด, ์ธ๋ถ API ํธ์ถ๊น์ง์ ์ง์ฐ ์๊ฐ์ด ์์ธ์ ์ ์ ์์ด ๊ธธ์ด์ง๋ ํ์์ด '๊ฐํ์ '์ผ๋ก ๋ฐ๊ฒฌ๋์์ต๋๋ค.

์ด๋ฒ ํฌ์คํธ์์๋ ํด๋น ํ์์ ์์ธ์ ํ์
ํ๋ฉฐ ์๊ฒ ๋ WebClient์ ๋ด๋ถ ๋์ ์๋ฆฌ์ Reactor Netty์ ์ํคํ
์ฒ, ๊ทธ๋ฆฌ๊ณ ํด๊ฒฐ์ฑ
์ ๊ณต์ ํ๊ณ ์ ํฉ๋๋ค.
์์ธ ํ์
์ ์ํ ๊ฐ์
์ง์ฐ์ด ๋ฐ์ํ๋ค๋ ๊ฒ์ ์์ฒญ์ด ์ด๋๊ฐ์์ ์ฆ์ ์ฒ๋ฆฌ๋์ง ๋ชปํ๊ณ ๋๊ธฐํ๊ณ ์์์ ๊ฐ๋ฅ์ฑ์ด ๋๋ค๋ ์๋ฏธ์
๋๋ค.
์ด์ ์ฌ์ง์์ ๋นจ๊ฐ ๋ฐ์ค๋ก ํ์๋ ๋ฉ์๋๋ ๊ฐ์ฒด ์์ฑ๊ณผ WebClient ํธ์ถ๋ง์ ๋ด๋นํ๊ณ ์์์ต๋๋ค. WebClient ๋ด๋ถ์ ์ด๋ ์ฒ๋ฆฌ ๋จ๊ณ์์ ๋ณ๋ชฉ์ด ๋ฐ์ํ ์ ์๋์ง ๋ช
ํํ ์๋ณํ๋ ค๋ฉด ์ํคํ
์ฒ์ ๋ํ ์ดํด๊ฐ ์ ํ๋์ด์ผ ํ์ต๋๋ค. ์ด๋ฅผ ์ํด WebClient์ ์์ฒญ ์ฒ๋ฆฌ ๋ฐฉ์๊ณผ Reactor Netty์ ์ํคํ
์ฒ๋ฅผ ๋ถ์ํ์ต๋๋ค.
WebClient ์คํ ๋ฉ์ปค๋์ฆ
๋จผ์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ฝ๋์ ๊ตฌ์กฐ๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
OrderRegistrationService.java
private final static int CONCURRENCY_CALL = 10;
List<RefineResult> results = Flux.fromIterable(registerDtos)
.flatMap(dto -> Mono.defer(() ->
omsService.refineAddress(dto.getPrimaryAddress(), dto.getSecondaryAddress())
.defaultIfEmpty(ApiResponse.failure(โNO_RESPONSEโ, false))
).map(resp -> new RefineResult(dto, resp)), CONCURRENCY_CALL)
.collectList()
.block();
OmsService.java
@Override
public Mono<ApiResponse<RefineAddressDto>> refineAddress(String primaryAddress, String secondaryAddress) {
RefineAddressInput request = RefineAddressInput.builder()
.primaryAddress(primaryAddress)
.secondaryAddress(secondaryAddress)
.build();
<span class="k">return</span> <span class="n">omsClient</span><span class="o">.</span><span class="na">post</span><span class="o">()</span>
<span class="o">.</span><span class="na">uri</span><span class="o">(</span><span class="s">"/refine-address"</span><span class="o">)</span>
<span class="o">.</span><span class="na">bodyValue</span><span class="o">(</span><span class="n">request</span><span class="o">)</span>
<span class="o">.</span><span class="na">retrieve</span><span class="o">()</span>
<span class="o">.</span><span class="na">bodyToMono</span><span class="o">(</span><span class="nc">RefineAddressOutput</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">timeout</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
<span class="o">.</span><span class="na">retryWhen</span><span class="o">(</span><span class="nc">RetryPolicy</span><span class="o">.</span><span class="na">fixedDelay</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMillis</span><span class="o">(</span><span class="mi">100</span><span class="o">),</span> <span class="s">"[OMS ์ฃผ์์ ์ ] ์ฌ์๋ ์์ฒญ: "</span> <span class="o">+</span> <span class="n">request</span><span class="o">))</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">output</span> <span class="o">-></span> <span class="n">output</span><span class="o">.</span><span class="na">isSuccess</span><span class="o">()</span> <span class="o">?</span> <span class="nc">ApiResponse</span><span class="o">.</span><span class="na">success</span><span class="o">(</span><span class="nc">RefineAddressDto</span><span class="o">.</span><span class="na">from</span><span class="o">(</span><span class="n">output</span><span class="o">))</span>
<span class="o">:</span> <span class="nc">ApiResponse</span><span class="o">.<</span><span class="nc">RefineAddressDto</span><span class="o">></span><span class="n">failure</span><span class="o">(</span><span class="s">"์๋ต ๊ฒฐ๊ณผ์ ๋ฐ์ดํฐ๊ฐ ์์"</span><span class="o">,</span> <span class="kc">false</span><span class="o">))</span>
<span class="o">.</span><span class="na">onErrorResume</span><span class="o">(</span><span class="n">ex</span> <span class="o">-></span> <span class="nc">ExternalErrorHandler</span><span class="o">.</span><span class="na">handleError</span><span class="o">(</span><span class="n">ex</span><span class="o">,</span> <span class="n">extractOmsErrorMessage</span><span class="o">(</span><span class="n">ex</span><span class="o">),</span> <span class="s">"OMS ์ฃผ์์ ์ "</span><span class="o">));</span>
}
Cold Sequence์ ๊ตฌ๋
์์
์ ์ฝ๋์์ ์ค์ HTTP ์์ฒญ์ด ์ธ์ ๋ฐ์ํ๋์ง ์ดํดํ๋ ค๋ฉด WebClient์ Cold Sequence ํน์ฑ์ ๋จผ์ ์ดํดํด์ผ ํฉ๋๋ค.
WebClient์ ๋ฆฌ์กํฐ๋ธ ์ฒด์ธ์ Cold Sequence๋ก ๋์ํฉ๋๋ค. ๊ตฌ๋
์ด ๋ฐ์ํ๊ธฐ ์ ๊น์ง๋ ํ์ดํ๋ผ์ธ๋ง ์ ์๋ ๋ฟ, ์ค์ ์คํ์ ์ผ์ด๋์ง ์์ต๋๋ค. HTTP ์์ฒญ ๋ฐ์ก ์์ ์ subscribe()๊ฐ ํธ์ถ๋๋ ์๊ฐ์ด๋ฉฐ, ์ฝ๋์์ block()์ด ๋ด๋ถ์ ์ผ๋ก ์ด๋ฅผ ํธ๋ฆฌ๊ฑฐํฉ๋๋ค.
List<RefineResult> results = Flux.fromIterable(registerDtos)
.flatMap(dto -> Mono.defer(() -> ...))
.collectList()
.block(); // โ ๊ตฌ๋
์์์
block()์ ๊ตฌ๋
์ ํธ๋ ์ญ๋ฐฉํฅ(upstream)์ผ๋ก ์ ํ๋ฉ๋๋ค
block() โ collectList() โ flatMap() โ Mono.defer() โ WebClient ์ฒด์ธ
flatMap(Function, int concurrency)์ ์ธ์๋ก ์ ๋ฌ๋ concurrency ์ ๋งํผ์ Mono๋ฅผ ๋์์ ๊ตฌ๋
ํฉ๋๋ค. Mono.defer()๋ ๊ฐ ๊ตฌ๋
์์ ๋ง๋ค ๋ด๋ถ ๋๋ค๋ฅผ ์คํํ์ฌ ์๋ก์ด Mono๋ฅผ ์์ฑํ๋ฏ๋ก, ๊ฐ DTO๋ง๋ค ๋
๋ฆฝ์ ์ธ HTTP ์์ฒญ ํ์ดํ๋ผ์ธ์ด ์์ฑ๋ฉ๋๋ค.
// ๊ตฌ๋
๋ ๋๋ง๋ค ์๋ก์ด WebClient ์ฒด์ธ ์์ฑ
Mono.defer(() -> omsService.refineAddress(...))
TaskQueue๋ก์ ์ ๋ฌ
omsService.refineAddress(...)๊ฐ ๋ฐํํ๋ Mono๊ฐ ๊ตฌ๋
๋๋ฉด ์์ฒญ ์ค์ ์ ๋น๋ํ๊ณ , .retrieve() ์ดํ ์ฒด์ธ์ด ๊ตฌ๋
๋๋ฉด์ ์ฐ๊ธฐ ์์ฒญ์ด TaskQueue์ ์ ์ฅ๋ฉ๋๋ค.
omsClient.post()
.uri("/refine-address")
.bodyValue(request)
.retrieve()
.bodyToMono(RefineAddressOutput.class)
POST ์์ฒญ์ด NioEventLoop์ TaskQueue์ ์ ์ฅ๋๋ฉด, WebClient๋ฅผ ํธ์ถํ ์ค๋ ๋์ ์ญํ ์ ์ฌ๊ธฐ์ ๋๋ฉ๋๋ค. ์ดํ ์์
์ EventLoop ์ค๋ ๋๊ฐ ๋ด๋นํฉ๋๋ค.
Netty EventLoop ์ค๋ ๋์ ๋์ ์๋ฆฌ
WebClient์ HTTP ์์ฒญ์ด TaskQueue์ ์ ์ฅ๋๋ ์ด์ ๋ Netty์ ์ด๋ฒคํธ ๋ฃจํ ๊ธฐ๋ฐ ๋น๋๊ธฐ ์ฒ๋ฆฌ ๋ชจ๋ธ ๋๋ฌธ์
๋๋ค. ์ด ๋ชจ๋ธ์ ์ดํดํ๋ ค๋ฉด ๋จผ์ ๋คํธ์ํฌ ํต์ ์ ๊ธฐ๋ณธ ๊ฐ๋
์ ์ง๊ณ ๋์ด๊ฐ์ผ ํฉ๋๋ค.
User Space์ Kernel Space
์๋ก ๋ค๋ฅธ ๋จธ์ ์ ์ ํ๋ฆฌ์ผ์ด์
์ด ํต์ ํ๋ ค๋ฉด ์์คํ
์ฝ๋ก ์ ์ ๋ชจ๋์ ์ปค๋ ๋ชจ๋๋ฅผ ์ค๊ฐ๋ฉฐ ์ปค๋ ๋ด ์์ผ ๋ฒํผ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฑฐ๋ ์จ์ผ ํฉ๋๋ค.

์์ผ ๋ฒํผ์ ๋ฐ์ดํฐ๋ฅผ ์ด๋ป๊ฒ ์ฝ๊ณ ์ฐ๋๋์ ๋ฐ๋ผ Blocking I/O์ Non-blocking I/O๋ก ๋๋ฉ๋๋ค. ๋์ ์ฐจ์ด๋ ์ค๋ ๋๊ฐ ์์คํ
์ฝ ํ ์๋ต์ ๊ธฐ๋ค๋ฆฌ๋์ง ์ฌ๋ถ์
๋๋ค.
-
Blocking I/O: ๋ฐ์ดํฐ๊ฐ ์ค๋น๋ ๋๊น์ง ์ค๋ ๋๊ฐ ๋๊ธฐ
-
Non-blocking I/O: ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด ์ฆ์ ๋ฐํ, ์ค๋ ๋๋ ๋ค๋ฅธ ์์
์ํ ๊ฐ๋ฅ
ํจ์จ์ ์ธ Non-blocking I/O๋ฅผ ๊ตฌํํ๋ ค๋ฉด ํน์ ์ด๋ฒคํธ๋ฅผ ๋ฑ๋กํด ๋๊ณ ํด๋น ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋๋ง ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด ํ์ํฉ๋๋ค. ์ด๋ ๊ฒ ํ๋ฉด ํ๋์ ์ค๋ ๋๋ก ์ฌ๋ฌ ์ฑ๋์ ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
Multiplexing I/O์ Selector
์ด๋ฒคํธ ๊ธฐ๋ฐ ์์ผ ํต์ ์์๋ ํ๋์ Selector๊ฐ ์ฌ๋ฌ ์์ผ ์ฑ๋์ ๋ณํ๋ฅผ ๊ฐ์งํ๋ฉฐ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋๋ง ์ฒ๋ฆฌํฉ๋๋ค. ์ด๋ฅผ Multiplexing I/O๋ผ๊ณ ํฉ๋๋ค.

Linux์์ ์ด Multiplexing I/O๋ epoll ์์คํ
์ฝ๋ก ๊ตฌํ๋ฉ๋๋ค. Java NIO์ Selector๋ ๋ด๋ถ์ ์ผ๋ก ์ด epoll์ ์ฌ์ฉํฉ๋๋ค.
Selector.select()์ ์ค์ ๋์
OS ์ปค๋์ด ๋ฅ๋์ ์ผ๋ก I/O ์ด๋ฒคํธ๋ฅผ Selector์ ์๋ ค์ฃผ๋ ๊ฒ์ฒ๋ผ ๋ณด์ด์ง๋ง, ์ค์ ๋ก๋ ๊ทธ๋ ์ง ์์ต๋๋ค.
Selector.select()๊ฐ ํธ์ถ๋๋ฉด ์ ์ ๋ชจ๋์์ ์ปค๋ ๋ชจ๋๋ก ์ ํ๋๊ณ , ๋ด๋ถ์ ์ผ๋ก epoll_wait() ์์คํ
์ฝ์ด ํธ์ถ๋๋ฉด์ ํธ์ถ ์ค๋ ๋๋ ์ปค๋์์ ๋ธ๋กํน ์ํ๋ก ๋๊ธฐํฉ๋๋ค.

epoll_wait์ ํธ์ถํ๋ฉด, OS ์ปค๋์ ์ด์ ์ epoll_ctl๋ก ๋ฑ๋ก๋ ํ์ผ ๋์คํฌ๋ฆฝํฐ(์์ผ)๋ค์ ๋ชจ๋ํฐ๋งํ๋ค๊ฐ, ๋คํธ์ํฌ ์นด๋์ ๋ฐ์ดํฐ๊ฐ ๋์ฐฉํ๊ฑฐ๋ ์์ผ ๋ฒํผ์ ์ฐ๊ธฐ๊ฐ ๊ฐ๋ฅํด์ง๋ ๋ฑ์ I/O ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ฉด ์ด๋ฅผ ๊ฐ์งํฉ๋๋ค. I/O๊ฐ ๋ฐ์ํ ์์ผ์ ์ปค๋ ๋ด Ready Queue์ ์ถ๊ฐ๋๊ณ , epoll_wait()์ด ๋ฐํ๋์ด ๋๊ธฐ ์ค์ด๋ ์ค๋ ๋๊ฐ ๊นจ์ด๋ฉ๋๋ค.
์ฆ, User Space๊ฐ ์ปค๋์ ์์ฒญํ๊ณ ์์คํ
์ฝ๋ก ์๋ต๋ฐ๋ pull ๊ตฌ์กฐ์
๋๋ค.
select() ์์ฒด๋ ๋ธ๋กํน ํธ์ถ์ด์ง๋ง, ํ๋์ ์ค๋ ๋๊ฐ ์ฌ๋ฌ ์์ผ์ ๊ฐ์ํ๊ณ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ์์ผ๋ค๋ง ๊ณจ๋ผ์ ์ฒ๋ฆฌํฉ๋๋ค. ๋ฐ๋ผ์ ๊ฐ ์์ผ ์
์ฅ์์๋ ์ ์ฉ ์ค๋ ๋ ์์ด๋ ๋น๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌ๋๋ ๊ฒ๊ณผ ๊ฐ์ ํจ๊ณผ๋ฅผ ์ป๊ฒ ๋ฉ๋๋ค.
NioEventLoop์ ๊ตฌ์กฐ
์ด์ Netty์ EventLoop๊ฐ Selector๋ฅผ ์ด๋ป๊ฒ ํ์ฉํ๋์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
EventLoop์ ๊ตฌํ์ฒด์ธ NioEventLoop๋ 1 Thread + 1 Selector + 1 TaskQueue๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.

EventLoop ์ค๋ ๋๋ ๊ธฐ๋ณธ์ ์ผ๋ก CPU ์ฝ์ด ์๋งํผ ์์ฑ๋ฉ๋๋ค. Math.max(Runtime.getRuntime().availableProcessors(), 4)
๊ฐ EventLoop ์ค๋ ๋๋ ์ ์ฉ NioEventLoop ์ธ์คํด์ค๋ฅผ ์คํํ๋ฉฐ, ๋จ์ผ ์ค๋ ๋๊ฐ ๋ฌดํ ๋ฃจํ๋ฅผ ๋๋ฉด์ ๋ ๊ฐ์ง ์์
์ ์ํํฉ๋๋ค.
- I/O ์ด๋ฒคํธ ์ฒ๋ฆฌ (๋คํธ์ํฌ ์ฝ๊ธฐ/์ฐ๊ธฐ)
- TaskQueue์ ์์
์ฒ๋ฆฌ (์ฌ์ฉ์๊ฐ ๋ฑ๋กํ Runnable)
// ๊ฐ๋
์ ์ธ ์ฝ๋
while (true) {
// 1. ๋คํธ์ํฌ์์ ๋ญ๊ฐ ์ผ์ด๋ฌ๋์ง ํ์ธ
๋คํธ์ํฌ_์ด๋ฒคํธ_ํ์ธ();
// 2. ์ผ์ด๋ ์ผ๋ค ์ฒ๋ฆฌ
์ด๋ฒคํธ๋ค_์ฒ๋ฆฌ();
// 3. ๋๊ฐ ์์ผ๋์ ์์
๋ค ์ฒ๋ฆฌ
์์
ํ์์_์์
๊บผ๋ด์_์คํ();
}
์ค์ Netty ์ฝ๋๋ฅผ ๋ณด๋ฉด (Netty 4.2 ๊ธฐ์ค)
// SingleThreadIoEventLoop.java:153-164
protected void run() {
do {
runIo(); // โ 1+2: I/O ํ์ธ ๋ฐ ์ฒ๋ฆฌ
runAllTasks(maxTasksPerRun); // โ 3: ์์
ํ ์ฒ๋ฆฌ
} while (!confirmShutdown());
}
runIo()๋ ๋ด๋ถ์ ์ผ๋ก NioIoHandler.run()์ ํธ์ถํฉ๋๋ค.
// NioIoHandler.java:420-485
public int run(IoExecutionContext runner) {
// 1๋จ๊ณ: select - I/O ์ด๋ฒคํธ ์กด์ฌ ์ฌ๋ถ ํ์ธ
select(runner, wakenUp.getAndSet(false));
<span class="c1">// 2๋จ๊ณ: ์์ผ๋ฉด ์ฒ๋ฆฌ</span>
<span class="k">return</span> <span class="nf">processSelectedKeys</span><span class="o">();</span>
}
์ด์ ๊ฐ ๋จ๊ณ๋ฅผ ์์ธํ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
- select() - I/O ์ด๋ฒคํธ ๊ฐ์ง
EventLoop๋ I/O ์ด๋ฒคํธ ์ฒ๋ฆฌ์ TaskQueue์ ์์ธ ์์
์ฒ๋ฆฌ, ๋ ๊ฐ์ง ์ญํ ์ ์ํํฉ๋๋ค. ์ด๋ Selector.select()๋ฅผ ์ฌ์ฉํ์ฌ ์ฒ๋ฆฌํ I/O ์ด๋ฒคํธ๊ฐ ์๋์ง ํ์ธํฉ๋๋ค.
select() ๋ฉ์๋๋ TaskQueue์ ์์
์ด ์กด์ฌํ๋์ง ์ฌ๋ถ์ ๋ฐ๋ผ ์ ์ ํ select ๋ฐฉ์์ ๊ฒฐ์ ํฉ๋๋ค.
// NioIoHandler.java
private void select(IoExecutionContext runner, boolean oldWakenUp) {
Selector selector = this.selector;
<span class="k">for</span> <span class="o">(;;)</span> <span class="o">{</span>
<span class="c1">// ํ์คํฌ๊ฐ ์์ผ๋ฉด ์ฆ์ ํ์ธํ๊ณ ๋์ด๊ฐ</span>
<span class="k">if</span> <span class="o">(!</span><span class="n">runner</span><span class="o">.</span><span class="na">canBlock</span><span class="o">()</span> <span class="o">&&</span> <span class="n">wakenUp</span><span class="o">.</span><span class="na">compareAndSet</span><span class="o">(</span><span class="kc">false</span><span class="o">,</span> <span class="kc">true</span><span class="o">))</span> <span class="o">{</span>
<span class="n">selector</span><span class="o">.</span><span class="na">selectNow</span><span class="o">();</span> <span class="c1">// ์์
์์ผ๋ฉด ๋ฐ๋ก ํ์ธ</span>
<span class="k">break</span><span class="o">;</span>
<span class="o">}</span>
<span class="c1">// ํ์คํฌ๊ฐ ์์ผ๋ฉด ์ด๋ฒคํธ ์ฌ ๋๊น์ง ๋๊ธฐ</span>
<span class="kt">int</span> <span class="n">selectedKeys</span> <span class="o">=</span> <span class="n">selector</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">timeoutMillis</span><span class="o">);</span>
<span class="o">}</span>
}
@Override
public boolean canBlock() {
assert inEventLoop();
return !hasTasks() && !hasScheduledTasks();
}
TaskQueue๊ฐ ๋น์์ ๋
TaskQueue๊ฐ ๋น์ด์์ผ๋ฉด Netty๋ select(timeout)์ ํธ์ถํ์ฌ ์ปค๋๋ก ๋ถํฐ I/O ์ด๋ฒคํธ ์ ํธ๋ฅผ ๋ฐ๊ฑฐ๋ ํ์์์์ด ๋ ๋๊น์ง ๋ธ๋กํน ์ํ๋ก ๋๊ธฐํ์ฌ CPU ์ฌ์ฉ์ ์ค์
๋๋ค.
๋ง์ฝ ๋๊ธฐ ์ค TaskQueue์ ์ task๊ฐ ๋ค์ด์ค๋ฉด, wakeup ๋ฉ์ปค๋์ฆ์ ํตํด select()์ ๋ธ๋กํน์ ๊นจ์์ ์ฆ์ ๋ฐํ์ํค๊ณ , ๋ฃจํ๋ฅผ ๋๋ฉฐ TaskQueue๋ฅผ ์ฒ๋ฆฌํ ์ ์๊ฒ ํฉ๋๋ค.
TaskQueue๊ฐ ์์ ๋
TaskQueue์ ์์
์ด ์์ผ๋ฉด selectNow()๋ฅผ ํธ์ถํ์ฌ I/O ์ด๋ฒคํธ๊ฐ ์๋์ง ๋น ๋ฅด๊ฒ ํ์ธํ๊ณ , ๊ณง๋ฐ๋ก ํ
์คํฌ ์คํ์ผ๋ก ๋์ด๊ฐ ์์
์ง์ฐ์ ์ค์
๋๋ค.
๋ง์ฝ TaskQueue์ ์์
์ด ์๋ ์ํฉ์์ select(timeout)์ ํธ์ถํด ๋ธ๋กํน๋๋ฉด, EventLoop ์ค๋ ๋๊ฐ ์ ๋ค์ด ํ
์คํฌ ์ฒ๋ฆฌ๊ฐ ์ง์ฐ๋๊ณ ์๋ต์ฑ์ด ๋จ์ด์ง๊ฒ ๋ฉ๋๋ค. ๋ฐ๋๋ก selectNow()๋ง ๊ณ์ ์ํํ๋ฉด ์ค๋น๋ I/O ์ด๋ฒคํธ๊ฐ ์์ด๋ ๊ณ์ ํ์ธํ๋ฏ๋ก ๋ถํ์ํ ๋ฐ๋ณต์ผ๋ก busy-wait(CPU ๋ญ๋น)์ด ๋ฐ์ํ ์ ์์ต๋๋ค.
์ฆ, Netty์ select()๋ ์ํฉ์ ๋ฐ๋ผ ์ ์ ํ ๋ฐฉ์์ ์ ํํ์ฌ CPU๋ฅผ ๋ญ๋นํ์ง ์๊ณ ํจ์จ์ ์ผ๋ก I/O ์ด๋ฒคํธ๋ฅผ ๋๊ธฐํฉ๋๋ค.
select() ํธ์ถ ์ดํ์ ๋ด๋ถ ๋์
์์ Netty๊ฐ ์ํฉ์ ๋ฐ๋ผ select(timeout) ๋๋ selectNow()๋ฅผ ์ ํ์ ์ผ๋ก ํธ์ถํ๋ค๋ ๊ฒ์ ์ดํด๋ณด์์ต๋๋ค. ์ด์ ์ด ํธ์ถ์ด ์ค์ ๋ก ์ด๋ค ๊ณผ์ ์ ๊ฑฐ์ณ ์ปค๋๊น์ง ๋๋ฌํ๊ณ , ๋ค์ ๋์์ค๋์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
Selector.select()๋ฅผ ํธ์ถํ๋ฉด JDK ๋ด๋ถ์ SelectorImpl ํด๋์ค๊ฐ ์ด๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
// SelectorImpl.java
@Override
public final int select(long timeout) throws IOException {
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
lockAndDoSelect()๋ ๋๊ธฐํ๋ฅผ ์ํํ ๋ค Multiplexing I/O๋ฅผ ๋ด๋นํ๋ doSelect()๋ฅผ ํธ์ถํฉ๋๋ค.
// SelectorImpl.java
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
์ฌ๊ธฐ์ doSelect()๋ ์ถ์ ๋ฉ์๋์
๋๋ค. ์ด์์ฒด์ ๋ง๋ค ํจ์จ์ ์ธ Multiplexing I/O ๋ฉ์ปค๋์ฆ์ด ๋ค๋ฅด๊ธฐ ๋๋ฌธ์, JDK๋ ํ๋ซํผ๋ณ๋ก ๋ค๋ฅธ ๊ตฌํ์ฒด๋ฅผ ์ ๊ณตํฉ๋๋ค.
| OS |
๊ตฌํ ํด๋์ค |
์์คํ
์ฝ |
| Linux |
EPollSelectorImpl |
epoll_wait() |
| macOS |
KQueueSelectorImpl |
kevent() |
| Windows |
WindowsSelectorImpl |
IOCP |
์ด ๊ธ์์๋ ์๋ฒ ํ๊ฒฝ์์ ๊ฐ์ฅ ๋ง์ด ์ฌ์ฉ๋๋ Linux์ epoll ๊ธฐ๋ฐ ๊ตฌํ์ ์ค์ฌ์ผ๋ก ์ดํด๋ณด๊ฒ ์ต๋๋ค. (JDK 21 ๊ธฐ์ค)
EPollSelectorImpl ์ธ์คํด์ค๋ ์ธ์ ์์ฑ๋๋๊ฐ?
EPollSelectorImpl ์ธ์คํด์ค๋ Selector.open() ํธ์ถ ์์ ์ ์ด๊ธฐํ๋ฉ๋๋ค.
- ์ ํ๋ฆฌ์ผ์ด์
์์ new NioEventLoopGroup(n) ํธ์ถ
- ๋ด๋ถ์ ์ผ๋ก n๊ฐ์ NioIoHandler ์์ฑ
- ๊ฐ NioIoHandler ์์ฑ์์์ provider.openSelector() ํธ์ถ
- Linux ํ๊ฒฝ์์๋ EPollSelectorImpl ์ธ์คํด์ค ์์ฑ
EPollSelectorImpl ์์ฑ ์ ๋ค์๊ณผ ๊ฐ์ ์ด๊ธฐํ๊ฐ ์ด๋ฃจ์ด์ง๋๋ค.
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// 1. epoll ์ธ์คํด์ค ์์ฑ (epoll_create ์์คํ
์ฝ)
this.epfd = EPoll.create();
// 2. epoll_wait ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ ๋ค์ดํฐ๋ธ ๋ฉ๋ชจ๋ฆฌ ํ ๋น
this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
// 3. wakeup์ฉ EventFD ์์ฑ
this.eventfd = new EventFD();
IOUtil.configureBlocking(IOUtil.newFD(eventfd.efd()), false);
// 4. EventFD๋ฅผ epoll์ EPOLLIN์ผ๋ก ๋ฑ๋ก
EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN);
}
์ฆ, ํ๋์ EventLoop๋ง๋ค ํ๋์ epoll ์ธ์คํด์ค๊ฐ ๋งคํ๋ฉ๋๋ค.
epoll์ ์ธ ๊ฐ์ง ์์คํ
์ฝ
epoll์ ์ธ ๊ฐ์ง ์์คํ
์ฝ์ ์ ๊ณตํฉ๋๋ค
-
epoll_create: epoll ์ธ์คํด์ค(์ฑ๋ ๊ฐ์ ์ ์ฅ์) ์์ฑ
-
epoll_ctl: ๊ฐ์ํ FD ์ถ๊ฐ/์์ /์ญ์
-
epoll_wait: ์ด๋ฒคํธ(read/write)๊ฐ ๋ฐ์ํ ๋๊น์ง ๋๊ธฐํ๊ณ , ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ FD ๋ชฉ๋ก์ ๋ฐํ
JDK์ EPoll.wait()๋ JNI๋ฅผ ํตํด ์ปค๋์ epoll_wait() ์์คํ
์ฝ์ ์ง์ ํธ์ถํฉ๋๋ค.
epoll_wait()๋ ๋ฏธ๋ฆฌ ํ ๋น๋ ๋ค์ดํฐ๋ธ ๋ฉ๋ชจ๋ฆฌ์ epoll_event ๊ตฌ์กฐ์ฒด ๋ฐฐ์ด์ ์ค๋น๋ ์ด๋ฒคํธ ์ ๋ณด๋ฅผ ์ฑ์ฐ๊ณ , ์ค๋น๋ ์ด๋ฒคํธ ๊ฐ์๋ฅผ ๋ฐํํฉ๋๋ค. ์ด ๋ฐฐ์ด์๋ ๊ฐ FD์ ๋ฐ์ํ ์ด๋ฒคํธ ํ์
(EPOLLIN/EPOLLOUT/EPOLLERR ๋ฑ)์ด ๋ด๊ฒจ ์์ต๋๋ค.
EPollSelectorImpl.doSelect()์์ ์ด ๋ฉ์๋๋ค์ด ์ค์ ๋ก ํธ์ถ๋๋ ํ๋ฆ์ ๋ณด๋ฉด:
// EpollSelectorImpl.java
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout) throws IOException {
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
<span class="kt">int</span> <span class="n">numEntries</span><span class="o">;</span>
<span class="n">processUpdateQueue</span><span class="o">();</span> <span class="c1">// epoll_ctl๋ก ๊ด์ฌ ์ด๋ฒคํธ ๋ณ๊ฒฝ ๋ฐ์</span>
<span class="n">processDeregisterQueue</span><span class="o">();</span>
<span class="k">try</span> <span class="o">{</span>
<span class="n">begin</span><span class="o">(</span><span class="n">blocking</span><span class="o">);</span>
<span class="c1">// epoll_wait ์์คํ
์ฝ ํธ์ถ</span>
<span class="n">numEntries</span> <span class="o">=</span> <span class="nc">EPoll</span><span class="o">.</span><span class="na">wait</span><span class="o">(</span><span class="n">epfd</span><span class="o">,</span> <span class="n">pollArrayAddress</span><span class="o">,</span> <span class="no">NUM_EPOLLEVENTS</span><span class="o">,</span> <span class="n">to</span><span class="o">);</span>
<span class="o">}</span> <span class="k">finally</span> <span class="o">{</span>
<span class="n">end</span><span class="o">(</span><span class="n">blocking</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// ๋ฐํ๋ ์ด๋ฒคํธ ์ฒ๋ฆฌ</span>
<span class="k">return</span> <span class="nf">processEvents</span><span class="o">(</span><span class="n">numEntries</span><span class="o">,</span> <span class="n">action</span><span class="o">);</span>
}
- processSelectedKeys() - I/O ์ด๋ฒคํธ ์ฒ๋ฆฌ
EPoll.wait()๊ฐ ์ด๋ฒคํธ ๊ฐ์๋ฅผ ๋ฐํํ๋ฉด, EPollSelectorImpl.processEvents()๊ฐ ํด๋น ๊ฐ์๋งํผ ์ด๋ฒคํธ ๋ฐฐ์ด์ ์ํํ๋ฉฐ ๊ฐ FD์ ์ฐ๊ฒฐ๋ SelectionKey๋ฅผ ์ฐพ์ selectedKeys์ ์ถ๊ฐํฉ๋๋ค. ์ดํ Netty์ NioIoHandler.processSelectedKeys()๊ฐ seletedKeys๋ฅผ ์ํํ๋ฉฐ ๊ฐ ์ฑ๋์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
// NioIoHandler.java
private int processSelectedKeysOptimized() {
int handled = 0;
for (int i = 0; i < selectedKeys.size; ++i) {
SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null; // GC๋ฅผ ์ํด null ์ฒ๋ฆฌ
<span class="n">processSelectedKey</span><span class="o">(</span><span class="n">k</span><span class="o">);</span> <span class="c1">// ๊ฐ ์ด๋ฒคํธ ์ฒ๋ฆฌ</span>
<span class="o">++</span><span class="n">handled</span><span class="o">;</span>
<span class="o">}</span>
<span class="k">return</span> <span class="n">handled</span><span class="o">;</span>
}
private void processSelectedKey(SelectionKey k) {
final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
<span class="c1">// ์ค๋น๋ ์ด๋ฒคํธ๋ฅผ ํธ๋ค๋ฌ์ ์ ๋ฌ</span>
<span class="c1">// OP_READ โ ๋ฐ์ดํฐ ์์ </span>
<span class="c1">// OP_WRITE โ ๋ฐ์ดํฐ ์ก์ </span>
<span class="c1">// OP_CONNECT โ ์ฐ๊ฒฐ ์๋ฃ</span>
<span class="c1">// OP_ACCEPT โ ์ ์ฐ๊ฒฐ ์์ฒญ</span>
<span class="n">registration</span><span class="o">.</span><span class="na">handle</span><span class="o">(</span><span class="n">k</span><span class="o">.</span><span class="na">readyOps</span><span class="o">());</span>
}
registration.handle()์ ๋ด๋ถ์ ์ผ๋ก AbstractNioChannel.AbstractNioUnsafe.handle()์ ํธ์ถํฉ๋๋ค. ์ด ๋ฉ์๋๋ ์ด๋ฒคํธ ํ์
์ ๋ฐ๋ผ ์ ์ ํ ์ฒ๋ฆฌ๋ฅผ ์ํํฉ๋๋ค.
// AbstractNioChannel.java:420-450
@Override
public void handle(IoRegistration registration, IoEvent event) {
NioIoOps nioReadyOps = ((NioIoEvent) event).ops();
<span class="c1">// 1. OP_CONNECT: ์ฐ๊ฒฐ ์๋ฃ ์ฒ๋ฆฌ (๊ฐ์ฅ ๋จผ์ ์ฒ๋ฆฌ)</span>
<span class="k">if</span> <span class="o">(</span><span class="n">nioReadyOps</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="nc">NioIoOps</span><span class="o">.</span><span class="na">CONNECT</span><span class="o">))</span> <span class="o">{</span>
<span class="n">removeAndSubmit</span><span class="o">(</span><span class="nc">NioIoOps</span><span class="o">.</span><span class="na">CONNECT</span><span class="o">);</span>
<span class="n">unsafe</span><span class="o">().</span><span class="na">finishConnect</span><span class="o">();</span>
<span class="o">}</span>
<span class="c1">// 2. OP_WRITE: ์ฐ๊ธฐ ๊ฐ๋ฅ ์ํ - ๋๊ธฐ ์ค์ธ ๋ฒํผ ์ ์ก</span>
<span class="k">if</span> <span class="o">(</span><span class="n">nioReadyOps</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="nc">NioIoOps</span><span class="o">.</span><span class="na">WRITE</span><span class="o">))</span> <span class="o">{</span>
<span class="n">forceFlush</span><span class="o">();</span>
<span class="o">}</span>
<span class="c1">// 3. OP_READ / OP_ACCEPT: ๋ฐ์ดํฐ ์์ ๋๋ ์ ์ฐ๊ฒฐ ์๋ฝ</span>
<span class="k">if</span> <span class="o">(</span><span class="n">nioReadyOps</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="nc">NioIoOps</span><span class="o">.</span><span class="na">READ_AND_ACCEPT</span><span class="o">)</span> <span class="o">||</span> <span class="n">nioReadyOps</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="nc">NioIoOps</span><span class="o">.</span><span class="na">NONE</span><span class="o">))</span> <span class="o">{</span>
<span class="n">read</span><span class="o">();</span>
<span class="o">}</span>
}
- runAllTasks() - Non-I/O Task ์ฒ๋ฆฌ
I/O ์ด๋ฒคํธ ์ฒ๋ฆฌ๊ฐ ๋๋๋ฉด runAllTasks()๊ฐ ํธ์ถ๋์ด TaskQueue์ ์์ธ ์์
๋ค์ ์ฒ๋ฆฌํฉ๋๋ค. WebClient์ HTTP ์์ฒญ๋ ๋ฐ๋ก ์ด ๋จ๊ณ์์ ์ค์ ๋ก ์ ์ก๋ฉ๋๋ค.
// SingleThreadEventExecutor.java
protected boolean runAllTasks(long timeoutNanos) {
// ์ค์ผ์ค ํ์์ ์คํ ๊ฐ๋ฅํ ํ์คํฌ๋ฅผ TaskQueue๋ก ์ด๋
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
<span class="kd">final</span> <span class="kt">long</span> <span class="n">deadline</span> <span class="o">=</span> <span class="n">timeoutNanos</span> <span class="o">></span> <span class="mi">0</span> <span class="o">?</span> <span class="n">getCurrentTimeNanos</span><span class="o">()</span> <span class="o">+</span> <span class="n">timeoutNanos</span> <span class="o">:</span> <span class="mi">0</span><span class="o">;</span>
<span class="kt">long</span> <span class="n">runTasks</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="k">for</span> <span class="o">(;;)</span> <span class="o">{</span>
<span class="n">safeExecute</span><span class="o">(</span><span class="n">task</span><span class="o">);</span> <span class="c1">// ํ
์คํฌ ์คํ</span>
<span class="n">runTasks</span> <span class="o">++;</span>
<span class="c1">// Check timeout every 64 tasks because nanoTime() is relatively expensive.</span>
<span class="c1">// XXX: Hard-coded value - will make it configurable if it is really a problem.</span>
<span class="k">if</span> <span class="o">((</span><span class="n">runTasks</span> <span class="o">&</span> <span class="mh">0x3F</span><span class="o">)</span> <span class="o">==</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
<span class="n">lastExecutionTime</span> <span class="o">=</span> <span class="n">getCurrentTimeNanos</span><span class="o">();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">lastExecutionTime</span> <span class="o">>=</span> <span class="n">deadline</span><span class="o">)</span> <span class="o">{</span>
<span class="k">break</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">task</span> <span class="o">=</span> <span class="n">pollTask</span><span class="o">();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">task</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
<span class="n">lastExecutionTime</span> <span class="o">=</span> <span class="n">getCurrentTimeNanos</span><span class="o">();</span>
<span class="k">break</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">afterRunningAllTasks</span><span class="o">();</span>
<span class="k">return</span> <span class="kc">true</span><span class="o">;</span>
<span class="o">}</span>
์ ์ฒด ํ๋ฆ ์์ฝ
์ง๊ธ๊น์ง ์ดํด๋ณธ ๋ด์ฉ์ ํ๋์ ๋ค์ด์ด๊ทธ๋จ์ผ๋ก ์ ๋ฆฌํ๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.

๋ณ๋ชฉ์ด ๋ฐ์ํ ์ธ์คํด์ค์ vCPU ์๋ 2๊ฐ์์ต๋๋ค. Netty์ EventLoop ์ค๋ ๋ ์๋ ๊ธฐ๋ณธ์ ์ผ๋ก Math.max(availableProcessors(), 4)๋ก ๊ฒฐ์ ๋๋ฏ๋ก, ์ด ํ๊ฒฝ์์๋ EventLoop ์ค๋ ๋๊ฐ ์ด 4๊ฐ ์กด์ฌํฉ๋๋ค.
์ง๊ธ๊น์ง ์ดํด๋ณธ ๋ฐ์ ๊ฐ์ด Netty์ EventLoop๋ Multiplexing I/O ๋ฐฉ์์ผ๋ก ๋์ํ๊ธฐ ๋๋ฌธ์ ์ ์ ์์ ์ค๋ ๋๋ก๋ ๋ง์ ๋์ ์์ฒญ์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. epoll_wait()์ ์์ฒ ๊ฐ์ ์ฑ๋์ด ๋ฑ๋ก๋์ด ์์ด๋ ์ค์ ๋ก I/O ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ์ฑ๋๋ง ๋ฐํํ๋ฏ๋ก, ๋์ ์์ฒญ ์๊ฐ EventLoop ์ค๋ ๋ ์๋ณด๋ค ๋ง๋ค๊ณ ํด์ ๋ณ๋ชฉ์ด ๋ฐ์ํ์ง๋ ์์ต๋๋ค.
๋ฐ๋ผ์ "๋์ ์์ฒญ 10๊ฐ > EventLoop ์ค๋ ๋ 4๊ฐ"๋ ๋ณ๋ชฉ์ ์์ธ์ด ์๋๋๋ค.
๋ ๋ค๋ฅธ ๊ฐ์ค: Parallel Scheduler ์ค๋ ๋ ๊ฒฝํฉ
๊ทธ๋ ๋ค๋ฉด ๋ฌด์์ด ๋ฌธ์ ์์๊น์? ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ฝ๋๋ฅผ ๋ค์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
return omsClient.post()
.uri("/refine-address")
.bodyValue(request)
.retrieve()
.bodyToMono(RefineAddressOutput.class)
.timeout(Duration.ofSeconds(5)) // โ ์ฌ๊ธฐ
.retryWhen(