Programming/Scala

스칼라 퓨처(Future)와 동시성(Concurrency) - 2

Data Engineer 2019. 8. 17. 11:11

이전 포스트에 이어서 스칼라 Future의 다른 사용법에 대해 살펴보도록 하겠습니다.

Future 실패 처리하기

스칼라 Future는 실패한 퓨처를 다룰 수 있는 failed, fallBackTo, recover, recoverWith 메소드를 제공합니다.

failed

Future[T]에서 실패한 Future를 예외를 저장한 Future[Throwable] 타입의 성공한 Future로 바꿔줍니다.

scala> val failure = Future { 42 / 0 }
failure: scala.concurrent.Future[Int] = ...

scala> failure.value
res23: Option[scala.util.Try[Int]] =
  Some(Failure(java.lang.ArithmeticException: / by zero))

scala> val expectedFailure = failure.failed
expectedFailure: scala.concurrent.Future[Throwable] = ...

scala> expectedFailure.value
res25: Option[scala.util.Try[Throwable]] =
  Some(Success(java.lang.ArithmeticException: / by zero))

만일 성공한 Future에 대해 failed 메소드를 호출하면 NoSuchElementException으로 실패합니다. failed 메소드는 Future가 실패하리라 예상하는 경우에 사용한게 더 적합합니다.

fallbackTo

fallbackTo를 호출한 Future가 실패한 경우 사용할 수 있는 다른 Future를 지정할 수 있습니다. 실패한 Future 대신 성공한 Future로 바꿔주는 것이죠.

scala> val fallback = failure.fallbackTo(success)
fallback: scala.concurrent.Future[Int] = ...

scala> fallback.value
res27: Option[scala.util.Try[Int]] = Some(Success(42))

위 예제에서 실패한 failure를 fallbackTo 메소드를 이용해서 성공한 Future로 바꿔주었습니다. 만약 fallbackTo에 전달된 Future도 실패하는 경우에는 fallbackTo 호출한 Future에서 발생한 예외를 가지고 실패합니다. 다음 예를 통해 살펴보겠습니다.

scala> val failedFallback = failure.fallbackTo(
         Future { val res = 42; require(res < 0); res }
        )
failedFallback: scala.concurrent.Future[Int] = ...

scala> failedFallback.value
res28: Option[scala.util.Try[Int]] =
  Some(Failure(java.lang.ArithmeticException: / by zero))

위의 예에서 보면 fallbackTo 안에 있는 Future의 경우 IllegalArgumentException을 발생합니다. 그러나 fallbackTo안에 있는 Future의 예외가 아닌 failure Future에서 발생한 ArithmeticException을 발생하는 것을 확인할 수 있습니다.

recover

recover 메소드를 사용하면 실패한 Future를 성공한 Future로 변환할 수 있습니다. 만일 원래 Future가 성공하면 그 결과는 그대로 유지됩니다. 예를 살펴보겠습니다.

scala> val recovered = failedFallback recover {
          case ex: ArithmeticException => -1
        }
 recovered: scala.concurrent.Future[Int] = ...

 scala> recovered.value
res32: Option[scala.util.Try[Int]] = Some(Success(-1))

이전에 살펴보았던 failedFallback는 ArithmeticException을 빌생하는데 해당 예외를 -1로 변환해서 성공한 Future로 변경해줍니다.
만일 원래의 Future가 성공이면 원래의 값을 그대로 리턴해주는데 예를 통해 살펴보겠습니다.

scala> val unrecovered = fallback recover {
         case ex: ArithmeticException => -1
       }
unrecovered: scala.concurrent.Future[Int] = ...

scala> unrecovered.value
res33: Option[scala.util.Try[Int]] = Some(Success(42))

위의 예와 같이 원래의 성공한 값을 돌려줍니다.

recoverWith

recoverWith는 recover 메소드와 비슷합니다. 차이는 recover는 문제가 생기면 값으로 복구하는데 반해, recoverWith는 Future 값으로 복구하는 차이가 있습니다. map과 flatMap의 차이라고 볼 수 있겠습니다.

scala> val alsoRecovered = failedFallback recoverWith {
         case ex: ArithmeticException => Future { 42 + 46 }
       }
alsoRecovered: scala.concurrent.Future[Int] = ...

scala> alsoRecovered.value
res35: Option[scala.util.Try[Int]] = Some(Success(88))

recover와 마찬가지로 원래의 Future가 실패하지 않는 경우 원래의 값을 반환합니다. 만일 recoverWith에 전달된 부분 함수내에 원래 Future에서 발생한 예외를 정의하지 않는 경우에도 기존 Future에서 발생한 예외가 그대로 전달됩니다.

transform

Future의 transform 메소드는 Future를 변환하기 위해 두 개의 함수를 받아서 처리합니다. 하나는 Success인 경우의 변환이고, 다른 하나는 Failure에 대한 변환입니다. 먼저 예제를 살펴보도록 하겠습니다.

scala> val first = success.transform(
         res => res * -1,
         ex => new Exception("see cause", ex)
       )
first: scala.concurrent.Future[Int] = ...

퓨처가 성공하면 첫 번째 함수가 사용되고, 실패하면 두 번째 함수가 사용됩니다. transform 메소드는 성공한 Future를 실패로 만들거나 실패한 Future를 성공으로 변환할 수는 없습니다. 그러나 스칼라 2.12부터 Try를 받아서 Try를 돌려주는 transform 메소드가 추가되었습니다. 다음 예제는 실패를 성공으로 변환하는 예를 보여줍니다.

scala> val nonNegative = failure.transform { // Scala 2.12
         case Success(res) => Success(res.abs + 1)
         case Failure(_) => Success(0)
       }
nonNegative: scala.concurrent.Future[Int] = ...

scala> nonNegative.value
res11: Option[scala.util.Try[Int]] = Some(Success(0))

Future 조합하기

Future의 Companion object에는 여러 Future를 조합할 수 있는 메소드를 제공합니다.

zip

2개의 성공적인 Future를 2개의 값을 가진 tuple로 만들어줍니다.

scala> val zippedSuccess = success zip recovered
zippedSuccess: scala.concurrent.Future[(Int, Int)] = ...

scala> zippedSuccess.value
res46: Option[scala.util.Try[(Int, Int)]] =
     Some(Success((42,-1)))

만일 두 Future 중에 하나라도 실패하면 zip이 반환하는 Future도 같은 예외로 실패합니다. 만약 두 개의 Future가 모두 실패하는 경우에는 zip을 호출한 Future의 예외를 가지고 실패합니다.

fold

Future의 동반 객체에서 Future의 TraversableOnce 컬렉션 결과를 누적할 수 있는 fold 메소드 제공합니다. 컬렉션 안의 모든 Future가 성공하면 결과 Future도 누적 값을 가지고 성공하지만 컬렉션에 있는 Future 중 하나가 실패하면 결과 Future도 실패합니다. 결과 Future는 가장 먼저 실패한 예외 값을 가지고 실패합니다. fold를 사용하는 예제는 다음과 같습니다.

scala> val fortyTwo = Future { 21 + 21 }
fortyTwo: scala.concurrent.Future[Int] = ...

scala> val fortySix = Future { 23 + 23 }
fortySix: scala.concurrent.Future[Int] = ...

scala> val futureNums = List(fortyTwo, fortySix)
futureNums: List[scala.concurrent.Future[Int]] = ...

scala> val folded =
        Future.fold(futureNums)(0) { (acc, num) =>
          acc + num
        }
folded: scala.concurrent.Future[Int] = ...

scala> folded.value
res53: Option[scala.util.Try[Int]] = Some(Success(88))

reduce

Future.reduce 메소드는 초기값 없이 fold를 진행합니다. 대신 초기값으로 첫 번째 Future의 값을 사용합니다.

scala> val reduced =
        Future.reduce(futureNums) { (acc, num) =>
          acc + num
        }
reduced: scala.concurrent.Future[Int] = ...

scala> reduced.value
res54: Option[scala.util.Try[Int]] = Some(Success(88))

sequence

Future.sequence 메소드는 Future로 이루어진 TraversableOnce 컬렉션을 TraversableOnce 값이 담긴 Future로 변환해줍니다. 예를 들어 List[Future[Int]]를 Future[List[Int]]로 변환시켜줍니다. 다음은 sequence를 사용하는 예입니다.

scala> val futureList = Future.sequence(futureNums)
futureList: scala.concurrent.Future[List[Int]] = ...

scala> futureList.value
res55: Option[scala.util.Try[List[Int]]] =
  Some(Success(List(42, 46)))

traverse

Future.traverse 메소드는 TraversableOnce를 Future의 TraversableOnce로 변환하고, 값의 TraversableOnce를 완료하는 Future로 시퀀스화해줍니다.. 다시 말해서 List[Int] -> List[Future[Int]] -> Future[List[Int]]로 바꾸어 처리합니다. 다음 예를 보면 이해가 더 수월하실 것입니다.

scala> val traversed =
        Future.traverse(List(1, 2, 3)) { i => Future(i) }
traversed: scala.concurrent.Future[List[Int]] = ...

scala> traversed.value
res58: Option[scala.util.Try[List[Int]]] =
  Some(Success(List(1, 2, 3)))

Side-effects 처리하기

Future가 완료된 다음 side-effect를 수행해야 할 때가 있습니다. 이러한 side-effect를 처리하기 위해서 Future는 다양한 함수를 제공합니다.

foreach

Future가 성공적으로 완료된 경우 side-effect를 수행합니다. 다음 예에서 Future가 실패하는 경우 println이 실행되지 않고, 성공한 경우에만 실행됩니다.

scala> failure.foreach(ex => println(ex))

scala> success.foreach(res => println(res))
42

yield가 없는 for는 foreach로 바뀌기 때문에 for를 사용해도 마찬가지 효과를 얻을 수 있습니다.

scala> for (res <- failure) println(res)

scala> for (res <- success) println(res)
42

callback 함수

Future는 콜백 함수를 등록하기 위해 두 가지 메소드르 제공합니다. 하나는 onComplete 메소드입니다. onComplete 메소드는 Future의 성공 여부와 관계 없이 완료 시 실행됩니다. callback 함수에는 Try가 전달되므로 성공과 실패 여부에 따라 다르게 처리할 수 있습니다.

scala> success onComplete {
        case Success(res) => println(res)
        case Failure(ex) => println(ex)
      }
42

scala> failure onComplete {
        case Success(res) => println(res)
        case Failure(ex) => println(ex)
      }
java.lang.ArithmeticException: / by zero

Future는 onComplete로 등록한 콜백 함수의 실행 순서에 대해 아무것도 보장해주지 않습니다. 이렇게 콜백 함수의 호출되는 순서를 보장하고 싶다면 andThen 메소드를 대신 사용해야 합니다. andThen 메소드는 andThen을 호출한 Future를 그대로 반영하는 새로운 Future를 반환합니다. 그러나 콜백 함수가 완전히 실행될 때까지는 완료되지 않습니다.

scala> val newFuture = success andThen {
        case Success(res) => println(res)
        case Failure(ex) => println(ex)
      }
42
newFuture: scala.concurrent.Future[Int] = ...

scala> newFuture.value
res76: Option[scala.util.Try[Int]] = Some(Success(42))

2.12에 추가된 메소드

flatten

flatten은 Future[Future[Int]]를 Future[Int]로 변환해줍니다.

scala> val nestedFuture = Future { Future { 42 } }
nestedFuture: Future[Future[Int]] = ...

scala> val flattened = nestedFuture.flatten // Scala 2.12
flattened: scala.concurrent.Future[Int] = Future(Success(42))

zipWith 메소드는 2개의 Future를 묶는 zip과 map을 조합한 메소드입니다.

scala> val futNum = Future { 21 + 21 }
futNum: scala.concurrent.Future[Int] = ...

scala> val futStr = Future { "ans" + "wer" }
futStr: scala.concurrent.Future[String] = ...

scala> val zipped = futNum zip futStr
zipped: scala.concurrent.Future[(Int, String)] = ...

scala> val mapped = zipped map {
         case (num, str) => s"$num is the $str"
       }
mapped: scala.concurrent.Future[String] = ...

scala> mapped.value
res2: Option[scala.util.Try[String]] =
    Some(Success(42 is the answer))

위의 예는 zip과 map을 조합하여 사용한 방식입니다. zipWith를 사용하면 더 간결하게 표현할 수 있습니다.

scala> val fut = futNum.zipWith(futStr) { // Scala 2.12
         case (num, str) => s"$num is the $str"
       }
zipWithed: scala.concurrent.Future[String] = ...

scala> fut.value
res3: Option[scala.util.Try[String]] =
    Some(Success(42 is the answer))

transformWith 메소드를 이용해서 Future를 얻을 수 있습니다. transformWith는 transform 메소드와 유사하지만 transform에 전달하는 함수는 Try를 만들어내지만, transformWith는 Future를 만들어낸다는 점이 다릅니다.

scala> val flipped = success.transformWith { // Scala 2.12
         case Success(res) =>
           Future { throw new Exception(res.toString) }
         case Failure(ex) => Future { 21 + 21 }
       }
flipped: scala.concurrent.Future[Int] = ...

scala> flipped.value
res5: Option[scala.util.Try[Int]] =
     Some(Failure(java.lang.Exception: 42))

스칼라의 Future는 동시성 프로그래밍을 간결하게 할 수 있는 강력한 도구입니다. 이를 사용하면 코드를 단순화하면서 멀티 프로세서의 이점을 누릴 수 있습니다. 기존의 동시성 프로그래밍을 지원하기 위한 스레드, 락, 모니터는 데드락과 레이스 컨디션을 처리하기 어려운 문제가 있었습니다. 그러나 Future는 데드락이나 레이스 컨디션의 큰 위험 없이도 동시성 프로그램을 작성할 수 있게 해줍니다. 이번 포스팅에서는 Future의 생성, 변환 등 Future를 사용하기 위한 기본적인 요소들에 대해 살펴보았습니다.

References

'Programming > Scala' 카테고리의 다른 글

case class (케이스 클래스)  (0) 2019.11.17
스칼라 퓨처(Future)와 동시성(Concurrency) - 1  (0) 2019.08.16