Scheduling in Reactor Flux means using publishOn() and subscribeOn() to move work onto separate threads.
Explanations alone make the scheduling plan and the thread separation hard to grasp, so I drew flow diagrams to make them intuitive.
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): main 1
2nd map(): main 1
subscribe():main 1
1st map(): main 2
2nd map(): main 2
subscribe():main 2
1st map(): main 3
2nd map(): main 3
subscribe():main 3
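publishOn() leaves the publisher chain before it untouched and moves only the subscriber chain after it onto a separate thread.
As an analogy, it is close to saying: "I'm breaking with the past and going my own way!"
It is generally used when a chain consists of a fast publisher and a slow subscriber, for example when writing to an external dependency.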
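The hand-off that publishOn() performs can be pictured with plain JDK classes. The sketch below is only an analogy and not how Reactor implements the operator: the class name PublishOnModel is hypothetical, and a single-threaded ExecutorService named "scheduler-a-1" stands in for a Reactor Scheduler. The upstream stage stays on the calling thread, while every item is handed over to the executor for the downstream stage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy of publishOn(); hypothetical names, not Reactor's implementation.
final class PublishOnModel {

    static List<String> run() {
        // Both threads write to the log, so it must be thread-safe.
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService schedulerA =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "scheduler-a-1"));

        for (int i = 1; i <= 3; i++) {
            final int item = i;
            // Upstream stage: runs on whichever thread called run().
            log.add("1st map(): " + Thread.currentThread().getName() + " " + item);
            // Hand-off point: everything after it runs on the executor thread.
            schedulerA.execute(() ->
                    log.add("2nd map(): " + Thread.currentThread().getName() + " " + item));
        }

        // Wait for the downstream stage so the log is complete before returning.
        schedulerA.shutdown();
        try {
            if (!schedulerA.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("downstream stage did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The interleaving of the two stages in the log is not deterministic, but each "2nd map()" line is always produced on the executor thread and the executor processes the items in the order they were handed over.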
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): main 1
1st map(): main 2
1st map(): main 3
2nd map(): scheduler-a-1 1
subscribe():scheduler-a-1 1
2nd map(): scheduler-a-1 2
subscribe():scheduler-a-1 2
2nd map(): scheduler-a-1 3
subscribe():scheduler-a-1 3
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerB)
    .map(i -> {
        System.out.println("3rd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
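Note, however, that the order of the scheduling plan differs from the order in which the threads are actually created.
The output shows that b-1 was created first and a-2 later.
1st map(): main 1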
1st map(): main 2
1st map(): main 3
2nd map(): scheduler-a-2 1
2nd map(): scheduler-a-2 2
2nd map(): scheduler-a-2 3
3rd map(): scheduler-b-1 1
subscribe():scheduler-b-1 1
3rd map(): scheduler-b-1 2
subscribe():scheduler-b-1 2
3rd map(): scheduler-b-1 3
subscribe():scheduler-b-1 3
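subscribeOn() bundles the publisher chain before it and the subscriber chain after it together and moves them onto a separate thread.
As an analogy, it is close to saying: "I'm kicking out everyone who was in charge before and taking full responsibility myself!"
It takes effect regardless of where in the chain subscribeOn() is called.
It is generally used for a chain with a slow publisher and a fast subscriber, for example when reading from an external dependency.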
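By contrast with a per-item hand-off, subscribeOn() can be pictured as submitting the whole pipeline, source included, as one unit of work. The following plain-JDK sketch is an analogy under stated assumptions, not Reactor's implementation: SubscribeOnModel is a hypothetical class name and the single-threaded executor named "scheduler-a-1" stands in for a Scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogy of subscribeOn(); hypothetical names, not Reactor's implementation.
final class SubscribeOnModel {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ExecutorService schedulerA =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "scheduler-a-1"));
        try {
            // The source loop and every stage are submitted together,
            // so nothing in the pipeline runs on the calling thread.
            Future<?> done = schedulerA.submit(() -> {
                for (int i = 1; i <= 3; i++) {
                    String thread = Thread.currentThread().getName();
                    log.add("1st map(): " + thread + " " + i);
                    log.add("2nd map(): " + thread + " " + i);
                    log.add("subscribe():" + thread + " " + i);
                }
            });
            // Wait for the pipeline; get() also makes the log writes visible here.
            done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            schedulerA.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because a single thread runs the whole pipeline, each item passes through every stage before the next item starts, which is the same item-by-item pattern as the subscribeOn(schedulerA) output in this post.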
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): scheduler-a-1 1
2nd map(): scheduler-a-1 1
subscribe():scheduler-a-1 1
1st map(): scheduler-a-1 2
2nd map(): scheduler-a-1 2
subscribe():scheduler-a-1 2
1st map(): scheduler-a-1 3
2nd map(): scheduler-a-1 3
subscribe():scheduler-a-1 3
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerB)
    .map(i -> {
        System.out.println("3rd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): scheduler-a-2 1
2nd map(): scheduler-a-2 1
3rd map(): scheduler-a-2 1
subscribe():scheduler-a-2 1
1st map(): scheduler-a-2 2
2nd map(): scheduler-a-2 2
3rd map(): scheduler-a-2 2
subscribe():scheduler-a-2 2
1st map(): scheduler-a-2 3
2nd map(): scheduler-a-2 3
3rd map(): scheduler-a-2 3
subscribe():scheduler-a-2 3
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerB)
    .map(i -> {
        System.out.println("3rd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): scheduler-b-1 1
1st map(): scheduler-b-1 2
1st map(): scheduler-b-1 3
2nd map(): scheduler-a-2 1
3rd map(): scheduler-a-2 1
subscribe():scheduler-a-2 1
2nd map(): scheduler-a-2 2
3rd map(): scheduler-a-2 2
subscribe():scheduler-a-2 2
2nd map(): scheduler-a-2 3
3rd map(): scheduler-a-2 3
subscribe():scheduler-a-2 3
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerB)
    .map(i -> {
        System.out.println("3rd map(): " + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribe(i -> {
        System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i);
    });
1st map(): scheduler-a-2 1
2nd map(): scheduler-a-2 1
1st map(): scheduler-a-2 2
2nd map(): scheduler-a-2 2
1st map(): scheduler-a-2 3
2nd map(): scheduler-a-2 3
3rd map(): scheduler-b-1 1
subscribe():scheduler-b-1 1
3rd map(): scheduler-b-1 2
subscribe():scheduler-b-1 2
3rd map(): scheduler-b-1 3
subscribe():scheduler-b-1 3
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map():" + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map():" + Thread.currentThread().getName() + " " + i);
        // The inner Flux is subscribed on its own, independently of the outer chain.
        return Flux.fromIterable(Arrays.asList("A", "B"))
            .map(j -> {
                System.out.println("nested 1st map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .publishOn(schedulerB)
            .map(j -> {
                System.out.println("nested 2nd map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribe(j -> {
                System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i + " " + j);
            });
    })
    // The outer Flux needs a subscriber too, otherwise nothing runs.
    .subscribe();
1st map():main 1
1st map():main 2
1st map():main 3
2nd map():scheduler-a-1 1
nested 1st map():scheduler-a-1 1 A
nested 1st map():scheduler-a-1 1 B
2nd map():scheduler-a-1 2
nested 1st map():scheduler-a-1 2 A
nested 1st map():scheduler-a-1 2 B
2nd map():scheduler-a-1 3
nested 2nd map():scheduler-b-2 1 A
nested 2nd map():scheduler-b-3 2 A
subscribe():scheduler-b-2 1 A
nested 2nd map():scheduler-b-2 1 B
subscribe():scheduler-b-2 1 B
subscribe():scheduler-b-3 2 A
nested 1st map():scheduler-a-1 3 A
nested 1st map():scheduler-a-1 3 B
nested 2nd map():scheduler-b-3 2 B
nested 2nd map():scheduler-b-4 3 A
subscribe():scheduler-b-3 2 B
subscribe():scheduler-b-4 3 A
nested 2nd map():scheduler-b-4 3 B
subscribe():scheduler-b-4 3 B
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map():" + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .publishOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map():" + Thread.currentThread().getName() + " " + i);
        // The inner Flux is subscribed on its own, independently of the outer chain.
        return Flux.fromIterable(Arrays.asList("A", "B"))
            .map(j -> {
                System.out.println("nested 1st map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribeOn(schedulerB)
            .map(j -> {
                System.out.println("nested 2nd map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribe(j -> {
                System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i + " " + j);
            });
    })
    // The outer Flux needs a subscriber too, otherwise nothing runs.
    .subscribe();
However, because the outer and inner Flux streams are separate, scheduler B cannot take over the chain that scheduler A is responsible for.
1st map():main 1
1st map():main 2
1st map():main 3
2nd map():scheduler-a-1 1
2nd map():scheduler-a-1 2
2nd map():scheduler-a-1 3
nested 1st map():scheduler-b-3 2 A
nested 1st map():scheduler-b-4 3 A
nested 2nd map():scheduler-b-3 2 A
nested 1st map():scheduler-b-2 1 A
nested 2nd map():scheduler-b-2 1 A
nested 2nd map():scheduler-b-4 3 A
subscribe():scheduler-b-3 2 A
nested 1st map():scheduler-b-3 2 B
nested 2nd map():scheduler-b-3 2 B
subscribe():scheduler-b-3 2 B
subscribe():scheduler-b-4 3 A
nested 1st map():scheduler-b-4 3 B
nested 2nd map():scheduler-b-4 3 B
subscribe():scheduler-b-4 3 B
subscribe():scheduler-b-2 1 A
nested 1st map():scheduler-b-2 1 B
nested 2nd map():scheduler-b-2 1 B
subscribe():scheduler-b-2 1 B
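The separation between the outer and the inner stream in the nested subscribeOn() example can be sketched with plain JDK executors. This is an analogy, not Reactor's implementation: NestedPipelinesModel is a hypothetical class name, and the two single-threaded executors stand in for schedulerA and schedulerB. The outer stage runs on one executor and, for each item, starts an independent inner pipeline on the other, so the inner executor never takes over the outer stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogy of nested, independently subscribed pipelines; hypothetical names.
final class NestedPipelinesModel {

    static List<String> run() {
        // Both executor threads write to the log, so it must be thread-safe.
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService schedulerA =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "scheduler-a-1"));
        ExecutorService schedulerB =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "scheduler-b-1"));
        try {
            // Outer stage: runs on schedulerA and only starts the inner pipelines.
            Future<List<Future<?>>> outer = schedulerA.submit(() -> {
                List<Future<?>> started = new ArrayList<>();
                for (int i = 1; i <= 3; i++) {
                    final int item = i;
                    log.add("2nd map():" + Thread.currentThread().getName() + " " + item);
                    // Inner pipeline: a separate unit of work, submitted whole to schedulerB.
                    started.add(schedulerB.submit(() -> {
                        for (String j : Arrays.asList("A", "B")) {
                            log.add("nested 1st map():" + Thread.currentThread().getName()
                                    + " " + item + " " + j);
                        }
                    }));
                }
                return started;
            });
            // Wait for the outer stage, then for every inner pipeline it started.
            for (Future<?> inner : outer.get()) {
                inner.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            schedulerA.shutdown();
            schedulerB.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch every "2nd map()" line carries the scheduler-a thread name and every "nested" line carries the scheduler-b thread name, whatever the interleaving between the two.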
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map():" + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map():" + Thread.currentThread().getName() + " " + i);
        // The inner Flux is subscribed on its own, independently of the outer chain.
        return Flux.fromIterable(Arrays.asList("A", "B"))
            .map(j -> {
                System.out.println("nested 1st map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribeOn(schedulerB)
            .map(j -> {
                System.out.println("nested 2nd map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribe(j -> {
                System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i + " " + j);
            });
    })
    .blockLast();
However, because the outer and inner Flux streams are separate, scheduler B cannot take over the chain that scheduler A is responsible for.
1st map():scheduler-a-1 1
2nd map():scheduler-a-1 1
1st map():scheduler-a-1 2
2nd map():scheduler-a-1 2
1st map():scheduler-a-1 3
2nd map():scheduler-a-1 3
nested 1st map():scheduler-b-3 2 A
nested 1st map():scheduler-b-2 1 A
nested 1st map():scheduler-b-4 3 A
nested 2nd map():scheduler-b-2 1 A
nested 2nd map():scheduler-b-3 2 A
nested 2nd map():scheduler-b-4 3 A
subscribe():scheduler-b-2 1 A
nested 1st map():scheduler-b-2 1 B
nested 2nd map():scheduler-b-2 1 B
subscribe():scheduler-b-3 2 A
subscribe():scheduler-b-2 1 B
nested 1st map():scheduler-b-3 2 B
subscribe():scheduler-b-4 3 A
nested 2nd map():scheduler-b-3 2 B
nested 1st map():scheduler-b-4 3 B
subscribe():scheduler-b-3 2 B
nested 2nd map():scheduler-b-4 3 B
subscribe():scheduler-b-4 3 B
Flux.range(1, 3)
    .map(i -> {
        System.out.println("1st map():" + Thread.currentThread().getName() + " " + i);
        return i;
    })
    .subscribeOn(schedulerA)
    .map(i -> {
        System.out.println("2nd map():" + Thread.currentThread().getName() + " " + i);
        // The inner Flux is subscribed on its own, independently of the outer chain.
        return Flux.fromIterable(Arrays.asList("A", "B"))
            .map(j -> {
                System.out.println("nested 1st map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .publishOn(schedulerB)
            .map(j -> {
                System.out.println("nested 2nd map():" + Thread.currentThread().getName() + " " + i + " " + j);
                return j;
            })
            .subscribe(j -> {
                System.out.println("subscribe():" + Thread.currentThread().getName() + " " + i + " " + j);
            });
    })
    .blockLast();
1st map():scheduler-a-1 1
2nd map():scheduler-a-1 1
nested 1st map():scheduler-a-1 1 A
nested 1st map():scheduler-a-1 1 B
1st map():scheduler-a-1 2
2nd map():scheduler-a-1 2
nested 1st map():scheduler-a-1 2 A
nested 1st map():scheduler-a-1 2 B
nested 2nd map():scheduler-b-2 1 A
1st map():scheduler-a-1 3
2nd map():scheduler-a-1 3
nested 1st map():scheduler-a-1 3 A
nested 2nd map():scheduler-b-3 2 A
subscribe():scheduler-b-2 1 A
nested 2nd map():scheduler-b-4 3 A
subscribe():scheduler-b-4 3 A
nested 1st map():scheduler-a-1 3 B
nested 2nd map():scheduler-b-2 1 B
subscribe():scheduler-b-2 1 B
nested 2nd map():scheduler-b-4 3 B
subscribe():scheduler-b-3 2 A
subscribe():scheduler-b-4 3 B
nested 2nd map():scheduler-b-3 2 B
subscribe():scheduler-b-3 2 B
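As mentioned briefly above, publishOn() suits a chain with a fast publisher and a slow subscriber, and subscribeOn() suits the opposite: a slow publisher and a fast subscriber.
In general, internal service logic is faster, while external dependencies are slower or may block on I/O. Whenever you read from or write to an external dependency (a web service or a DB server), it is advisable to apply separate scheduling. Otherwise the main thread blocks and the whole service stalls.
When writing to an external dependency, the subscriber-side chain, which sits closest to the dependency, needs to be split off so that it is managed by its own scheduler. publishOn() is the right fit for this.
When reading from an external dependency, the internal service logic has to move at the pace at which the dependency produces data, so both share the same scheduling, but that scheduling should be managed separately from the main thread. subscribeOn() does exactly this.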
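The reading case can be illustrated with a small plain-JDK sketch, offered as an analogy rather than as Reactor code. OffMainRead and slowRead() are hypothetical names, the 50 ms sleep is a stand-in for a blocking call to a web service or DB server, and the executor named "io-1" plays the role of the scheduler passed to subscribeOn(). The slow read runs on the dedicated thread and the follow-up transformation is chained onto its result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy of reading from a slow dependency off the calling thread.
final class OffMainRead {

    // Hypothetical stand-in for a blocking read from an external dependency.
    static String slowRead() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row read on " + Thread.currentThread().getName();
    }

    static String readAndTransform() {
        ExecutorService ioPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        try {
            return CompletableFuture
                    // The blocking read runs on the dedicated I/O thread.
                    .supplyAsync(OffMainRead::slowRead, ioPool)
                    // The fast internal logic is chained onto the read result.
                    .thenApply(row -> "transformed: " + row)
                    // join() blocks the caller only so this demo can return the value.
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAndTransform());
    }
}
```

In a real service the caller would keep the CompletableFuture and continue with other work instead of calling join(), just as blockLast() in the examples above is used only to keep a demo alive.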