Programming/Scala

스칼라 퓨처(Future)와 동시성(Concurrency) - 1

Data Engineer 2019. 8. 16. 11:04

오늘 멀티 코어 프로세서가 대중화되면서 동시성에 대한 관심도 많이 늘어났습니다. 이러한 동시성 프로그래밍을 위해서 기존의 프로그래밍 언어들을 블로킹을 사용하여 동기화함으로써 동시성을 지원합니다. 자바의 경우도 마찬가지로 공유 메모리와 락을 기반으로 동시성을 지원하고 있습니다. 그러나 이러한 블로킹을 사용한 동기화의 경우 Deadlock이나 Starvation과 같은 문제가 발생할 수 있습니다. 이렇게 블로킹 기반의 동시성 처리는 이러한 어려움이 있습니다. 그래서 비동기적 프로그래밍을 사용하면 이러한 블로킹을 없앨 수 있습니다. 비동기적(asynchronous) 프로그래밍이란 아래의 그림과 같이 메인 프로그램의 흐름이 있고 이와 독립적으로 실행되는 프로그래밍 스타일을 의미합니다.

 

이미지 출처 :  https://medium.com/@santhoshhari/efficient-web-scraping-with-pythons-asynchronous-programming-6b9e730f1ff7

 

스칼라에서는 이러한 비동기적 프로그래밍을 지원하기 위해 Future라는 것을 지원합니다. Future는 스칼라의 표준라이브러리로써 Future를 사용해 변경 불가능한 상태를 비동기적으로 변환하여 블로킹 방식의 동시성 처리의 어려움을 피할 수 있게 해줍니다.

자바에도 Future가 있지만 스칼라의 Future와 다릅니다. 두 Future 모드 비동기적인 연산의 결과를 표현하지만 자바의 경우 블로킹 방식의 get()을 사용해 결과를 얻어와야 합니다. 오히려 자바 8에 추가된 CompletableFuture가 스칼라의 Future와 비슷합니다. CompletableFuture를 정의하고 그 값을 얻었을 때 행동을 정의할 수 있습니다.

반면 스칼라의 Future는 이미 연산 결과의 완료 여부와 관계 없이 결과 값의 변환을 지정할 수 있습니다. Future의 연산을 수행하는 스레드는 암시적으로 제공되는 Execution context를 사용해 결정됩니다. 이러한 방식을 사용해서 불변 값에 대한 일련의 변환으로 비동기적 연산을 표현할 수 있고, 공유 메모리나 락에 대해 신경 쓸 필요가 없는 장점이 있습니다.

1. 낙원의 곤경

공유 데이터와 락 모델을 사용해 멀티 스레드 애플리케이션을 신뢰성 있게 구축하기가 어렵습니다. 프로그램의 각 위치에서 접근하거나 변경하는 데이터 중 어떤 것을 다른 스레드가 변경하거나 접근할 수 있는지를 추론하고, 어떤 락을 현재 가지고 있는지 알아야 합니다. 거기다가 더 어려운점은 프로그램이 실행되는 도중에 새로운 락을 얼마든지 만들 수 있습니다. 즉, 락이 컴파일 시점에 고정되지 않는다는 점이죠.

자바의 java.util.concurrent 라이브러리는 고수준의 동시성 프로그래밍 추상화를 제공해서 저수준의 공유 데이터와 락 모델을 사용하는 것보다는 오류의 여지가 적지만 마찬가지로 내부적으로 공유 데이터와 락 모델을 사용하기 때문에 해당 문제의 근본적인 어려움을 해소하긴 어렵습니다.

스칼라의 Future는 공유 데이터와 락의 필요성을 줄여주는 동시성 처리 방식을 제공합니다. 물론 Future가 모든 것을 해결할 수 있는 도깨비 방망이는 아닙니다. 하지만 위에서 발생하는 문제를 더욱 간단하게 해줍니다.

2. 비동기 실행과 Try

스칼라에서 메소드를 호출하고 그 결과가 Future라면 그것을 비동기적으로 진행할 다른 연산을 표현하는 것입니다. 이러한 Future를 실행하기 위해서는 암시적인 execution context가 필요합니다.

Future에는 isCompleted와 value 메소드를 통해서 polling 기능을 제공합니다. value 메소드의 경우 Option[Try[T]]의 값을 리턴합니다. Try는 성공을 나타내는 Success와 예외가 들어있는 실패를 나타내는 Failure 중에 하나를 표현합니다. Try의 목적은 동기적 계산이 있는 try 식이 하는 역할을 비동기 연산을 할 때 동일한 기능을 제공해주는 역할을 합니다. Try의 계층 구조는 아래의 그림과 같습니다.

 

이미지 출처 : https://jaxenter.com/cheat-sheet-complete-guide-scala-136558.html

 

동기적 연산에서는 try/catch 구문으로 메소드가 던지는 예외를 그 메소드를 실행하는 스레드에서 잡을 수 있습니다. 그러나 비동기적 연산에서는 연산을 시작한 스레드가 다른 작을 계속 진행하는 경우가 있습니다. 그래서 비동기적 연산에서는 Try 객체를 사용해서 예외가 발생하여 갑자기 종료되는 상황에 대비할 수 있습니다.

3. Future의 사용

map

map을 사용해 Future의 연산 결과를 비동기적으로 변환할 수 있다.

scala> val fut = Future { Thread.sleep(10000); 21 + 21 }
fut: scala.concurrent.Future[Int] = ...

scala> val result = fut.map(x => x + 1)
result: scala.concurrent.Future[Int] = ...

scala> result.value
res6: Option[scala.util.Try[Int]] = Some(Success(43))

위의 예제에서 Future의 생성, 21 + 21 계산, 42 + 1 계산이 다른 세가지 스레드에서 실행됩니다.

for 문으로 Future 변환하기

Future에는 flatMap의 정의도 들어있기 때문에 for 표현식을 통해 Future를 변환할 수 있습니다.

scala> val fut1 = Future { Thread.sleep(10000); 21 + 21 }
fut1: scala.concurrent.Future[Int] = ...

scala> val fut2 = Future { Thread.sleep(10000); 23 + 23 }
fut2: scala.concurrent.Future[Int] = ...

scala> for {
        x <- fut1
        y <- fut2
      } yield x + y
res7: scala.concurrent.Future[Int] = ...

scala> result.value
res8: Option[scala.util.Try[Int]] = Some(Success(88))

위와 다르게 두 Future를 미리 만들지 않은 경우 그 두 Future는 병렬실행되지 않으므로 다음 for 문의 최소 20초가 걸립니다. 그와 다르게 미리 생성한 이전의 예제는 완료되기까지 10여초가 걸립니다.

scala> for {
        x <- Future { Thread.sleep(10000); 21 + 21 }
        y <- Future { Thread.sleep(10000); 23 + 23 }
      } yield x + y
res9: scala.concurrent.Future[Int] = ...

// Will need at least 20 seconds to complete

scala> result.value
res28: Option[scala.util.Try[Int]] = Some(Success(88))

Future 만들기

Future의 동반 객체에는 이미 완료된 Future를 만들기 위해 successful, failed, fromTry라는 3가지 팩토리 메소드를 제공합니다. 해당 메소드를 사용해서 완료된 퓨처를 만들 때는 ExecutionContext가 필요하지 않습니다.

scala> Future.successful { 21 + 21 }
res2: scala.concurrent.Future[Int] = ...

scala> Future.failed(new Exception("bummer!"))
res3: scala.concurrent.Future[Nothing] = ...

Future.fromTry(Success { 21 + 21 })
res4: scala.concurrent.Future[Int] = ...

Future.fromTry(Failure(new Exception("bummer!")))
res5: scala.concurrent.Future[Nothing] = ...

또한 Promise를 사용해서 Future를 만들 수 있습니다. 다음 예제를 보겠습니다.

scala> val pro = Promise[Int]
pro: scala.concurrent.Promise[Int] = ...

scala> val fut = pro.future
fut: scala.concurrent.Future[Int] = ...

scala> fut.value
res8: Option[scala.util.Try[Int]] = None

Primise에 success, failure, complete 메소드를 이용해서 Promise를 완료할 수 있습니다.

scala> pro.success(42)
res9: pro.type = ...

scala> fut.value
res10: Option[scala.util.Try[Int]] = Some(Success(42))

Future 필터링

filter 메소드는 Future의 결과를 확인하고 결과가 참인 경우 그 값을 그대로 남겨둡니다.

scala> val fut = Future { 42 }
fut: scala.concurrent.Future[Int] = ...

scala> val valid = fut.filter(res => res > 0)
valid: scala.concurrent.Future[Int] = ...

scala> valid.value
res0: Option[scala.util.Try[Int]] = Some(Success(42))

filter 조건을 충족시키는 것이 하나도 없는 경우 NoSucheElementException으로 실패합니다.

scala> val invalid = fut.filter(res => res < 0)
invalid: scala.concurrent.Future[Int] = ...

scala> invalid.value
res1: Option[scala.util.Try[Int]] =
  Some(Failure(java.util.NoSuchElementException:
  Future.filter predicate is not satisfied))

Future에는 withFilter 메소드를 제공하기 때문에 for에서도 필터로 사용할 수 있습니다.

scala> val valid = for (res <- fut if res > 0) yield res
valid: scala.concurrent.Future[Int] = ...

scala> valid.value
res2: Option[scala.util.Try[Int]] = Some(Success(42))

Future의 collect 메소드를 사용하면 Future 값을 검증하고 변환하는 작업을 한 연산에서 수행할 수 있습니다. collect에 전달되는 부분 함수를 가지고 변환을 해서 처리할 수 있습니다.

scala> val valid = fut collect { case res if res > 0 => res + 46 }
valid: scala.concurrent.Future[Int] = ...

scala> valid.value
res17: Option[scala.util.Try[Int]] = Some(Success(88))

이번 포스트에서는 스칼라에서 제공하는 동시성 도구인 Future에 관해서 살펴보았습니다. 스칼라의 Future를 사용하면 비동기적 프로그래밍이 가능합니다. Future의 사용법의 내용이 길어 다음 포스트에 이어서 작성할 예정입니다.

 

다음 포스트는 이 링크를 클릭하시면 이동하실 수 있습니다.

References

'Programming > Scala' 카테고리의 다른 글

case class (케이스 클래스)  (0) 2019.11.17
스칼라 퓨처(Future)와 동시성(Concurrency) - 2  (0) 2019.08.17