본문 바로가기
Java

RxJava - Flux 함수

by sinabeuro 2023. 8. 27.
728x90

 

 

Reactor Core에서 제공하는 Flux는 0개 이상의 아이템을 처리할 수 있는 리액티브 스트림 시퀀스를 나타냅니다. 

Flux에는 데이터 스트림을 생성, 변환, 조합, 처리하는데 사용할 수 있는 다양한 연산자와 메서드가 있습니다.

 

Flux의 대표적인 함수와 연산자

 

생성 연산자:

  • just(...): 주어진 아이템들로 Flux를 생성합니다.
  • fromIterable(...): Iterable로부터 Flux를 생성합니다.
  • range(...): 주어진 범위의 숫자로 Flux를 생성합니다.
  • empty(): 아무 아이템도 발행하지 않는 Flux를 생성합니다.
  • interval(...): 주어진 시간 간격으로 숫자를 발행하는 Flux를 생성합니다.


변환 연산자:

  • map(...): 각 아이템을 다른 형태로 변환합니다.
  • flatMap(...): 각 아이템을 다른 Flux로 변환하고, 이들을 하나의 Flux로 병합합니다.
  • buffer(...): 아이템들을 주어진 크기의 리스트로 그룹화합니다.


필터 연산자:

  • filter(...): 주어진 조건을 만족하는 아이템만을 포함하는 Flux를 반환합니다.


조합 연산자:

  • merge(...): 여러 Flux 스트림을 하나의 스트림으로 병합합니다.
  • zip(...): 여러 Flux 스트림의 아이템들을 조합하여 새로운 아이템을 생성합니다.


오류 처리:

  • onErrorReturn(...): 오류 발생 시, 지정된 값으로 대체합니다.
  • onErrorResume(...): 오류 발생 시, 다른 Flux로 대체합니다.


구독 메서드:

  • subscribe(...): Flux에 구독하여 아이템들을 소비하고, 오류나 완료 신호를 처리합니다.

 

이 외에도 많은 연산자와 메서드가 있습니다. 위의 리스트는 Flux에서 제공하는 기능의 일부만을 나열한 것입니다. 리액티브 프로그래밍은 복잡한 데이터 플로우와 비동기 작업을 우아하게 처리할 수 있게 도와주기 때문에 많은 연산자와 메서드를 학습하는 것이 중요합니다.

 


Mono의 대표적인 함수와 연산자

생성 연산자:

  • just(...): 주어진 값을 사용하여 Mono를 생성합니다.
  • empty(): 아무 값도 발행하지 않는 Mono를 생성합니다.
  • fromCallable(...): Callable에서 반환되는 값을 사용하여 Mono를 생성합니다.
  • fromFuture(...): 주어진 CompletableFuture로부터 Mono를 생성합니다.


변환 연산자:

  • map(...): Mono의 값을 다른 형태나 타입으로 변환합니다.
  • flatMap(...): 현재 Mono의 값으로 다른 Mono를 생성하고 반환합니다.

 

오류 처리:

  • onErrorReturn(...): 에러 발생 시 주어진 값을 반환하는 Mono로 대체합니다.
  • onErrorResume(...): 오류 발생 시 다른 Mono로 대체합니다.

 

조합 연산자:

  • zipWith(...): 두 Mono의 결과를 조합하여 새로운 값을 생성합니다.
  • and(...): 여러 Mono들이 모두 완료될 때까지 기다립니다.

 

블로킹 연산:

  • block(): Mono의 결과를 블로킹 방식으로 받아옵니다. (이 방법은 리액티브 프로그래밍의 본질과는 다소 상반되는 연산이므로 주의해야 합니다.)

 

구독 메서드:

  • subscribe(...): Mono를 구독하고 그 결과를 처리하거나, 오류나 완료 신호를 처리합니다.

 

이 외에도 많은 연산자와 메서드가 있습니다. 위의 리스트는 Mono에서 제공하는 기능의 일부만을 나열한 것입니다. Mono와 Flux는 Reactor Core에서 제공하는 기본 리액티브 타입이므로 이들의 연산자와 메서드를 잘 이해하고 활용하는 것이 중요합니다.

 

 


 

subscribe와 subscribeOn은 Reactor 라이브러리에서 제공하는 리액티브 스트림의 연산자들입니다. 

두 연산자는 다음과 같은 주요 차이점을 가집니다:

용도:
subscribe: 실제로 리액티브 스트림의 실행을 시작하는 메서드입니다. Flux나 Mono와 같은 리액티브 타입은 구독하기 전까지 데이터를 방출하거나 처리하지 않습니다. subscribe를 호출하면 해당 스트림의 처리가 시작됩니다. subscribe는 파라미터로 Consumer 타입의 콜백(데이터 처리, 에러 처리, 완료 처리 등)을 받을 수 있습니다.
subscribeOn: 리액티브 스트림의 연산이 시작될 스레드 또는 스레드 풀을 지정하는 연산자입니다. 이것은 리액티브 스트림의 실행을 시작하는 것이 아니라 어디서 실행될지만 지정합니다.

 

호출 시점:
subscribe: 이 메서드를 호출하는 순간 리액티브 스트림의 처리가 시작됩니다.
subscribeOn: 이 연산자를 사용하는 것만으로 리액티브 스트림의 처리가 시작되지 않습니다. 여전히 스트림을 구독하기 위해 subscribe를 호출해야 합니다.


동작 방식:
subscribe: 리액티브 스트림의 구독자가 됩니다. 데이터, 에러, 완료 이벤트를 처리하는 콜백을 제공할 수 있습니다.
subscribeOn: 스트림의 연산이 실행될 스레드를 지정합니다. 여러 번 호출하더라도 첫 번째 호출이 적용됩니다.
간단히 말해, subscribeOn은 "어디서" 리액티브 작업이 실행될지를 결정하며, subscribe는 "언제" 그 작업이 시작될지를 결정하고 실제로 실행을 시작합니다.

 


 

Flux와 Mono는 Reactor 라이브러리에서 제공하는 두 가지 기본 리액티브 타입입니다. 각각 다음과 같은 케이스에 주로 사용됩니다:

Flux: 0개 이상의 아이템을 처리할 때 사용됩니다. 예를 들어, 데이터 스트림이나 리스트 처리 등에 적합합니다.
Mono: 정확히 0개 또는 1개의 아이템만을 처리할 때 사용됩니다. 예를 들어, 단일 값의 계산이나 단일 데이터베이스 쿼리 결과 등에 사용됩니다.
왜 같이 사용하는가?
다양한 케이스 처리: 어플리케이션 내에서 단일 값 처리와 다중 값 처리가 섞여 있는 경우가 많습니다. Mono는 단일 결과를, Flux는 여러 결과를 모델링하므로, 두 가지를 혼합하여 사용하면 더 다양한 케이스를 쉽게 처리할 수 있습니다.

비동기 작업의 조합: Flux의 각 아이템에 대해 어떤 비동기 작업을 수행하고 그 결과를 다시 스트림으로 만들고 싶을 때, 그 비동기 작업은 종종 Mono로 표현됩니다. 이 경우 flatMap 등의 연산자를 사용하여 Flux와 Mono를 쉽게 조합할 수 있습니다.

프로그램의 일관성: Mono와 Flux는 리액티브 프로그래밍의 일관성을 유지하는 데 도움을 줍니다. 이 두 타입이 제공하는 연산자(예: map, filter, flatMap 등)는 거의 비슷하기 때문에, 리액티브 스트림을 다룰 때 일관된 방식으로 코딩할 수 있습니다.

Error Handling: 둘 다 예외 처리 메커니즘이 잘 구성되어 있어, Flux 스트림에서 발생한 에러를 Mono를 이용하여 처리하거나 반대의 경우도 쉽게 구현할 수 있습니다.

코드의 명확성: Mono와 Flux를 적절히 사용하면, 작업이 단일 값에 대한 것인지, 여러 값에 대한 것인지 코드를 통해 명확히 알 수 있습니다.

따라서, Flux와 Mono를 혼합하여 사용하면 다양한 비동기 처리 요구 사항을 효과적으로 만족시킬 수 있습니다.

 


 

 

https://velog.io/@zenon8485/Reactor-Java-1.-Mono%EC%99%80-Flux%EB%A5%BC-%EC%83%9D%EC%84%B1%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95

 

Reactor Java 1. Mono와 Flux를 생성하는 방법

Reactor는 Reative Streams 명세를 기반으로 JVM에서 반응성 Non-Blocking 애플리케이션을 생성하기 위한 JAVA 라이브러리입니다.처음에는 이 라이브러리로 작업하는것이 어려울 수 있습니다. 이 시리즈는 M

velog.io

 

https://velog.io/@zenon8485/Reactor-Java-2.-Mono%EC%99%80-Flux%EC%9D%98-%EB%82%B4%EB%B6%80-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%A5%BC-%EC%A1%B0%EC%9E%91%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95

 

Reactor Java 2. Mono와 Flux의 내부 데이터를 조작하는 방법

Reactor는 Reactive Streams 사양을 기반으로 JVM에서 반응성 비차단 애플리케이션을 생성하기 위한 Java 라이브러리이다. 이 글은 Mono 및 Flux 클래스를 통해 Reactor를 제공하는 Reactive Streams의 실행을 생성

velog.io

 

https://velog.io/@zenon8485/Reactor-Java-3.-Mono%EC%99%80-Flux%EB%8A%94-%EC%96%B4%EB%96%BB%EA%B2%8C-%EB%8F%99%EC%9E%91%ED%95%98%EB%8A%94%EA%B0%80

 

Reactor Java 3. Mono와 Flux는 어떻게 동작하는가?

Reactor는 Reactive Streams 명세를 기반으로한 JVM에서 Reactive Non-Blocking 애플리케이션을 생성하기 위한 Java 라이브러리 입니다.앞에서는 Mono와 Flux를 생성하는 방법과 그들이 가지고 있는 데이터를 변

velog.io

 

https://velog.io/@zenon8485/Reactor-%EB%8D%B0%EC%9D%B4%ED%84%B0%EC%99%80-%EC%8A%A4%ED%8A%B8%EB%A6%BC-%EC%B2%98%EB%A6%AC

 

Reactor: 데이터와 스트림 처리

Reactor는 데이터를 조작하는 다양한 연산자를 제공한다. 이 연산자는 스트림을 받아서 다른 데이터로 구성된 스트림을 반환한다.본격적으로 연산자를 보기 전에 스트림 데이터를 생성해보자. 아

velog.io

https://velog.io/@zenon8485/Reactor-Flow-Control%EA%B3%BC-Back-Pressure

 

Reactor: Flow Control (흐름 조절)

Flow Control Flow Control은 생산자가 많은 이벤트를 발행하면서 구독자를 압박하지 않고 이벤트를 관리하는 것이다. 빠른 생산자는 많은 이벤트를 구독자에게 전달할 수 있다. 구독자는 이벤트를 받

velog.io

 

https://velog.io/@zenon8485/Reactor-BackPressure-%EB%B0%B0%EC%95%95

 

Reactor: BackPressure (배압)

소개 배압은 Reactor의 중요한 부분이다. 각 구독자는 구독 객체를 사용하여 처리한 이벤트를 요청한다. 발행자는 이벤트 요청의 수와 같거나 더 적은 이벤트를 발행해야한다. 이벤드 요청 수를 Lo

velog.io

 

https://velog.io/@zenon8485/Reactor-Java-2.-Mono%EC%99%80-Flux%EC%9D%98-%EB%82%B4%EB%B6%80-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%A5%BC-%EC%A1%B0%EC%9E%91%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95

 

Reactor Java 2. Mono와 Flux의 내부 데이터를 조작하는 방법

Reactor는 Reactive Streams 사양을 기반으로 JVM에서 반응성 비차단 애플리케이션을 생성하기 위한 Java 라이브러리이다. 이 글은 Mono 및 Flux 클래스를 통해 Reactor를 제공하는 Reactive Streams의 실행을 생성

velog.io

 

https://velog.io/@zenon8485/Reactor-Error-1-Generating-Errors

 

Reactor: Error 1 - Generating Errors

소개 탄력성은 반응형 시스템에서 매우 중요한 관점이다. 반응형 시스템은 실패하는 동안에도 응답 가능한 상태로 남아있어야 한다. 시스템은 에러를 잘 다루며, 사용자의 요청에 적시에 응답

velog.io

 

https://velog.io/@zenon8485/Reactor-Error-2-Recovery

 

Reactor: Error 2 - Recovery

에러를 처리할때, 프로세스를 종료하지 않고 대체할 이벤트 또는 데이터를 발행하고 계속 진행하고 싶을 경우가 있을 수 있다. 여기서는 이러한 목적을 달성하는 방법을 알아보자.Reactor는 예외

velog.io

 

https://velog.io/@zenon8485/Reactor-Execution-Control-1-Scheduler

 

Reactor: Execution Control 1 - Scheduler

Reactor에 대하여 학습하면서 Filtering, Transforming, Collecting 등의 작업을 살펴보았다. 여기서 살펴본 대부분의 작업은 추가적인 쓰레드를 사용하지 않고 Main에서 동작한다. 그러나, Reactor에서는 Schedul

velog.io

 

https://velog.io/@zenon8485/Reactor-Execution-Control-2-Parallel-Processing

 

Reactor: Execution Control 2 - Parallel Processing

Reactor 발행자와 구독자는 쓰레드를 생성하지 않는다. 그러나 이전 글에서 확인했듯이, 이러한 행동을 변경할 수 있는 연산자가 존재한다. 이전 글에서는 delay 연산자가 Reactor chain의 메인 쓰레드

velog.io

 

https://velog.io/@zenon8485/Reactor-Execution-Control-3-Broadcasting

 

728x90

댓글