Big Data/Apache Flink

이벤트 시간 처리(Event Time Processing)와 워터마크(Watermark) - (feat. Apache Flink)

Data Engineer 2020. 11. 29. 21:44

이벤트 시간 처리(Event Time Processing)와 워터마크(Watermark)

스트림 처리에서 바라보는 시간적 측면 중에 이벤트 시간(Event time) 기반으로 처리하는 방식에 대해 살펴보겠습니다. 최근에 데이터 처리 분야에서 스트리밍 애플리케이션 개발의 중요성이 더욱 커지고 있습니다. 만약 스트리밍 애플리케이션을 개발하게 되는 경우 애플리케이션의 목적에 따라 이벤트 시간(Event time)을 기준으로 처리할 것인지 처리 시간(Processing time) 기준으로 처리할 것인지 선택을 해야 할 것입니다. 각각의 시간이 갖는 특성을 이해하고 있으면 스트리밍 애플리케이션의 요구사항에 알맞는 시간을 선택하여 개발할 수 있을 것입니다.

스트림 처리에서 바라보는 시간

  • 이벤트 시간(event time)
    • 이벤트 시간이란 데이터에 의존적인 타임스탬프입니다. 즉, 데이터 내에 존재하는 데이터 발생 시간과 같은 것입니다. 데이터에 의존적이기 때문에 어떠한 값을 타임스탬프로 넣을 것인지 다르겠지만 주로 이벤트가 발생한 시간을 많이 사용합니다.
  • 처리 시간(processing time)
    • 처리 시간이란 실제 스트림 처리 엔진에서 데이터를 처리하는 시간입니다. 즉, 해당 데이터를 받아 처리할 때 처리하는 서버의 시간입니다.
  • 수집 시간(ingestion time)
    • 스트림 처리 엔진에 데이터가 처음으로 수집된, 즉, 들어온 시간을 나타냅니다.

글로만 보았을 때는 헷깔리는 부분이 있습니다. 다음 그림을 보면 이해하기 쉬울 것입니다.

스트림에서 바라보는 시간들이 잘 구별되어 있습니다. 이 중에 이벤트 시간과 처리 시간이 가장 많이 사용됩니다. 스토리지 시간(Storage time)의 경우 많이 사용되진 않습니다.

이벤트 시간은 위에서 설명했듯이 실세계에서 발생한 이벤트의 시간이고 처리 시간은 스트림 시스템에서 이벤트를 처리한 시간입니다. 포스트의 제목처럼 이벤트 시간 처리의 이해도를 높이기 위해서 먼저 처리 시간 기반의 시스템에 대해 살펴보겠습니다. 이 포스트에선 따로 스트림 처리에서 사용하는 윈도우의 개념에 대해서는 설명하지 않습니다. 윈도우에 대한 개념은 해당 블로그 내에 있는 스트림 프로세싱 포스트에서 확인하실 수 있습니다.

처리 시간 기반 시스템

처리 시간 기반 시스템의 예제는 윈도우 사이즈가 10이고 5초마다 슬라이딩 되어 윈도우가 처리됩니다. 본 예제는 간단히 윈도우 내에서 키를 기준으로 워드 카운트를 하는 예제입니다. 플링크 기반의 예제로 설명을 하는데 대부분의 스트림 처리 엔진에서도 비슷한 방식의 연산들을 제공합니다.

val senv = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)

val counts = text.map {(m: String) => (m.split(",")(0), 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(10), Time.seconds(5))
    .sum(1)

counts.print
senv.execute("ProcessingTime example")

이 예제에서 메시지가 정시에 도착하는 경우와 메시지가 지연이 발생해서 늦게 도착하는 경우를 나누어서 사례를 살펴보겠습니다. 본 예제에서는 아래의 그림과 같이 3개의 메세지에 대한 처리를 합니다. 메시지는 CSV 형태이며 value,eventTime의 문자열입니다. 첫 번째는 13초에 a라는 키를 가진 이벤트가 2개 발생하고 16초에 1개 발생합니다.

메시지가 정시에 도착하는 경우

위의 소스 코드에서 처럼 윈도우 사이즈는 10이며 5초마다 슬라이딩 됩니다. 메시지 지연이 없는 경우 아래의 그림과 같이 처리가 됩니다.

일반적으로 개발자는 메시지를 위의 그림과 같이 처리하기를 기대합니다. 그러나 메시지는 네트워크 끊김과 같은 상황으로 인해 지연될 수 있습니다. 그럼 메시지가 시스템에 지연되어 도착하는 경우 처리 시간 기반 시스템에선 어떻게 처리되는지 살펴보겠습니다.

메시지가 지연이 발생해서 늦게 도착하는 경우

13초에 발생한 메시지가 6초의 지연이 발생하여 19초에 도착했다라고 가정해봅시다. 그러면 처리 시간 기반 시스템에서는 어떻게 처리될까요? 바로 다음과 같이 처리가 될 것입니다.

위의 그림에서 처럼 첫 번째 윈도우의 키 합의 결과가 1이되고 윈도우 3의 결과가 2가 되어 우리가 기대했던 결과와 다른 형태로 처리가 되었습니다. 이와 같이 메시지 지연이 발생하는 경우 우리가 기대했던 결과값과 다른 결과가 나올 수 있습니다. 그럼 이 문제는 어떻게 해결해야 할까요? 이러한 문제를 해결하기 위해서는 처리 시간을 사용하는 것이 아닌 이벤트 시간 기준으로 처리해야 합니다.

이벤트 시간 기반 처리 시스템

이벤트 시간은 메시지가 생성된 시간이 대표적입니다. 그러나 스트림 처리 시스템에서는 데이터에서 어떤 것이 이벤트 시간인지 알 수 없습니다. 그래서 데이터 내에 시간 부분을 추출하는 방법을 알려줘야 합니다. 플링크에서는 이벤트 시간을 추출하는 클래스를 정의해주어야 합니다. 정의하는 방법은 다음과 같습니다.

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }

  override def getCurrentWatermark(): Watermark = { 
    new Watermark(System.currentTimeMillis)
  }
}

위의 코드와 같이 이벤트 시간 추출기를 정의해줍니다. 실제 이벤트 시간 추출은 extractTimestamp라는 메소드에서 처리되서 나온 결과로 사용합니다. getCurrentWatermark 메소드는 뒤에서 설명드리도록 하겠습니다. 이제 이벤트 시간을 기반으로 하는 스트림 애플리케이션을 작성해보겠습니다.

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 

val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

counts.print
senv.execute("EventTime example")

이벤트 시간을 기반으로 처리하는 경우 TimeCharacteristicEventTime으로 설정합니다. 그리고 스트림 소스에 이벤트 시간 추출기를 셋팅해주었습니다. 그 외의 로직 코드는 동일합니다. 자, 그럼 위의 코드를 사용하는 경우 메세지 지연이 발생했을 때 어떻게 처리가 되는지 한번 살펴보겠습니다.

메세지 지연이 발생하는 경우 처리 시간 기반 시스템과의 결과가 다릅니다. 이벤트 시간 기반 처리에서는 윈도우 3에서 지연된 메시지의 이벤트 시간이 맞지 않기 때문에 포함되지 않은 결과를 올바르게 내보냅니다. 하지만 여전히 윈도우 1의 결과는 우리가 기대하는 결과값과 다릅니다. 윈도우 1의 완료 시간은 15초인데 메시지는 이미 윈도우가 완료된 후인 19초에 들어왔기 때문입니다. 이렇게 메시지 지연으로 윈도우 1에서 처리해야 할 메시지를 처리하지 못한 것입니다. 이러한 문제를 해결하기 위해서 워터마크(Watermark)라는 기능을 사용합니다.

워터마크(Watermark)

워터마크는 이러한 지연된 메세지를 처리하기 위한 흥미로운 아이디어입니다. 워터마크는 하나의 타임스탬프입니다. 이 워터마크를 통해 플링크에서는 워터마크보다 지연된 메시지는 도착하지 않을 것이라고 가정하고 결과를 처리합니다. 이러한 워터마크의 개념은 플링크 뿐만 아니라 다른 스트림 시스템에서도 이와 같이 지연된 메시지를 처리하는 방법으로 제공하고 있습니다.
우리는 이미 위의 예제에서 워터마크를 설정하는 것을 확인하였습니다. 바로 이벤트 시간 추출기에서 워터마크에 관한 정보도 함께 지정해주었습니다. 이제 워터마크를 통해서 위에서 지연된 메시지를 정상적으로 처리하는 방법에 관해 살펴보겠습니다.

override def getCurrentWatermark(): Watermark = { 
  new Watermark(System.currentTimeMillis - 5000)
}

시간 추출기 클래스에서 워터마크 설정을 (현재 시간 - 5초)로 설정하겠습니다. 이렇게 설정하게 되면 시스템에게 메시지의 지연 시간을 5초까지는 허용하겠다고 알려주는 것입니다. 이와 같이 설정이 되면 위에서 윈도우 1 [5초 - 15초]의 결과는 20초에 나오게 됩니다. 마찬가지로 윈도우 2 [10초 - 20초]는 5초 뒤인 25초에 결과가 나오게 됩니다.

이와 같이 워터마크를 사용하여 지연된 메시지를 받은 경우에도 정상적으로 결과를 만들어 내도록 처리하였습니다.

이번 포스트에서 스트림 시스템에서 사용되는 시간의 종류에 대해 살펴보았습니다. 그리고 그 시간을 기반으로 메시지가 어떻게 처리되는지도 함께 알아보았습니다. 스트림 시스템에서 메세지 지연은 어쩔 수 없이 발생합니다. 이러한 메세지 지연을 처리하기 위해서 워터마크라는 기능을 제공합니다. 스트림 애플리케이션을 개발할 때 애플리케이션에서 메시지 지연을 허용할 지, 혹은 지연된 메시지를 그냥 무시할 것인지는 요구사항에 따라 다릅니다. 그래서 개발하려는 애플리케이션의 특성을 정확히 파악하고 특성에 맞도록 메시지를 처리하도록 해야 합니다.

References