본문 바로가기

공부/빅데이터와 머신러닝 소프트웨어 (K-MOOC)

4주차. 빅데이터 스트림 분석

반응형

빅데이터 스트림 분석

 

4-1. 스트림 처리

  • 기존의 배치 분석은 데이터가 고정
  • 스트림 데이터는 계속해서 데이터가 들어옴
  • 예를 들어 이벤트 타임과 실제 들어오는 시각이 다를 수 있음
  • 이벤트 타임과 프로세싱 타임이 별도의 차이가 발생함

 

  • 스트림 데이터 특성
    • Unbounded data - 꾸준히 끊임없이 들어오는 데이터
    • 이벤트 타임과 프로세싱 타임이 다름
    • late data 처리 중요, 순서가 바뀌어서 들어올 가능성도 있음
    • Window operation - 특정 시간 인터벌 안에 있는 인터벌을 처리하기 위해 사용하는 것

 

  • What
  • How
    • Continuous 질의 시스템 - Strom, Heron, Flink
      • 빠르게 데이터 처리
      • 장애복구가 훨씬 어려움
    • Micro-batch system
      • 작은단위의 배치를 계속 수행하는 것
      • 장애복구에는 장점, 하지만 느림

 

  • 분석 패턴
    • element마다 변환하는것 - map, filter
    • Aggreation - Group by로 특정값에 해당하는 것을 모아서 통계
    • Windowing - 특정 시간에 해당하는 데이터를 모아서 분석
      • 이벤트 개수를 새는등 상황에 따라서 다른 조건으로 만들 수 있음

 

    • Fixed - 전체 시간을 고정된 인터벌로 잘라서 Window 만들음
    • Sliding - 크기 뿐만 아니라 얼마나 자주 나오는지 인터벌이 있음
      • ex) 10분마다 밀리게 하여 겹치게하는 부분 발생
      • 크기와 얼마나 자추 출력할지 결정
    • Session
      • 웹서비스와 같은 서비스에서 중요
      • 그 사용자가 연속적으로 사용한다고 하는 시간에 해당하는 개념
      • Action들이 일어나는 것을 모아서 시작과 끝을 정하고 그 시작과 끝을 하나의 Session이라고 함Windowing

세 유저에 대해서 Session Window를 만듬

 

  • EX) Fixed Window의 예제
    • 전체 시간에서 인터벌로 끊어서 분석
  • Watermarks
    • delay를 감안해서 어떤 시점까지 기다리면 해당하는 이벤트가 다 들어올 것이다 추측을 하는것
    • 즉 Watermark보다 더 이전 timestamp를 가진 데이터는 더 이상 들어오지 않을 것 이라고 추측하는것
    • Watermark는 무조건 정확하지는 않음
    • 시스템을 개발하는 사람이 휴리스틱으로 선언
    • 워터마크 선언이 너무 느리면 결과가 지연
    • 너무 빠르면 몇몇 데이터는 늦게 들어오게됨 - 데이터 고려되지 않음
    •  
  • 스트림 질의 Execution 모델
    • 질의가 계속적으로 수행하면서, 데이터가 질의를 통과하면서 질의의 상태를 변형
    • 배치에서는 데이터가 고정되어있고 질의코드를 보내서 수행함
    • 스트림에서는 질의는 고정되어있고 데이터가 질의를 통과하는 개념
    • Micro-batch 에서는 작은 배치 단위로 나누어 처리
      • 데이터 들어옴
      • 배치 오퍼레이션 통해 immutable data set 생성
      • 다음 인터벌 데이터와 위의 immutable data을 합하여 결과 생성

 

4-2 스파크 구조적 스트리밍

 

 

  • Spark Streaming
    • RDD 기반의 Spark Streaming 시스템
    • 새롭게 나온 DataFame 기반의 Spark Structured Streaming
  • Spark Structured Streaming
    • 끊임없이 들어오는 데이터를 테이블에 계속 추가
    • Unbounded Table로 표현이됨
    • 기존의 table row에 계속 추가됨
  • 질의 수행
    • 데이터가 들어올때 마다 질의수행 trigger
    • 1초 마다 들어온다고 할때 1초밑의 데이터 모두 질의수행후 추가
    • 다음초에는 다음 범위의 데이터 추가하는 방식
  • WordCount Example
    • 기존 Batch 분석에서는 분석해야할 데이터가 고정되어 있어 WordCount 결과가 한 번에 나옴


- 매초마다 갱신되는 과정

  • WordCount Program
    1. Spark Cluster에 연결해서 질의를 수행할 수 있는 SparkSession을 생성
      from pyspark.sql import SparkSession 
      from pyspark.sql.functions import explode 
      from pyspark.sql.functions import split 
      spark = SparkSession \
      	.builder \ 
          .appName("WordCountWithKafka")\ 
          .getOrCreate()
    2. SparkSession을 이용해서 데이터 스트림을 읽어서 lines라는 DataFrame 생성
      • 데이터 스트림은 kafka라는 시스템을 통해서전달
      • 카프카 서버의 주소에 해당하는 IP와 port를 옵션에서 주고 어떤 topic을 구독할지 subscribe로 표현
      • 카프카서비스는 받는쪽은 구독 보내는 쪽에서는 publish
        lines = spark \
        	.readStream
            .format("kafka") \
            .option("kafka.bootstrap.servers", ip_addr:port) \
            .option("subscribe", "wc") \
            .load()​
    3. 만들어진 데이터 프레임을 word 형태로 바꾸는 작업
      words = lines.select(
      		explode(split(lines.value, "")
       ).alias("word")
      )
      
      wordCounts = words.groupBy("word").count()​
    4. WordCount를 출력
      • complete - 처음부터 끝까지 다 보여줌
      • awaitTermination -끝날때 까지 질의는 계속되는 함수
        query = wordCounts \
        	.writeStream \
            .queryName("wordcount_kafka") \
            .outputMode("complete") \
            .format("memory") \
            .start()
            
        query.awaitTermination()​
  • Window Operation
    1. grouping
words = ...


windowedCounts = words.groupBy(
	window(words.ts, "3 minutes", "1 minute"),
	words.word
).count()
  1. Watermarking
words = ...


windowedCounts = words
	.withWatermark("ts", "5minutes")
    .groupBy(window(word.ts, "3minutes", "1 minute"),
            words.word)
    .count()
  • Stream-Static Join
    • 스트림 데이터에 대한 고정된 메타 정보가 있을때 사용하는 것
    • static DataFrame - deviceInfoDF
    • 어떤 col로 join할지 알려줌

Stream-Stream Join

  • 스트림과 스트림 데이터를 조인 할 수 있음

 

Quiz

 

Quiz_1

스트림 처리에서 윈도우에 대한 설명 중 올바른 것은?

윈도우는 시간은 겹치지 않는 구간으로 나누는 것만 가능하다.

세션 윈도우는 이벤트가 어느 간격 이상으로 떨어져서 들어와야 같은 세션 윈도우 안에 들어간다.

윈도우에서 이벤트 시간 기반 윈도우는 이벤트가 만들어진 시점을 기반으로 데이터를 모으는 윈도우이다. 정답

윈도우를 사용하면 늦게 들어오는 데이터를 다룰 수 없다.

 

해설)

정답은 3번입니다. 윈도우는 Fixed window, sliding window, session window등이 있고 sliding window나 session window는 겹칠 수 있다. 세션 윈도우에서 두 event가 근접해서 들어오면 같은 세션 윈도우에 들어간다. 같은 윈도우에 들어가기 위해서 어느 간격 이상으로 떨어져서 들어와야 한다는 조건은 없다. 윈도우를 사용할 때 늦게 들어오는 데이터를 다룰 수 있는 여러 정책들을 사용할 수 있다.

 

Quiz_2

다음과 같이 들어오는 데이터를 10분짜리 fixed window를 사용하여 합을 구하고자 할 때 어떤 결과가 출력되는가? Fixed window의 시작 시점은 11:00이다.

11:01 8
11:04 7
11:08 2
11:12 3
11:15 9
11:24 2

8 7 2 3 9 2

15 5 11

2 12 17

17 12 2 정답

 

해설)

정답은 4번입니다.10분 간격으로 윈도우를 만들면 첫 번째 윈도우에 8, 7, 2가 들어가서 합이 17되고 두번째 윈도우에 3, 9가 들어가서 합이 12가 되고 맨 마지막 윈도우는 2가 들어가서 합이 2가 된다.

 

Quiz_3

Micro-batch에 대한 설명 중 옳은 것을 고르시오.

Batch처리를 위한 데이터 프로세싱 모델이다.

Element를 하나씩 순서대로 처리하는 스트림 프로세싱 모델이다.

작은 시간 단위의 batch로 데이터 스트림을 분할하여 처리하는 모델이다. 정답

Continuous Query모델에 비해 낮은 Latency를 가진다.

 

해설)

정답은 3번입니다. Micro-batch 스트림 처리 모델은 작은 시간 단위의 batch로 데이터 스트림을 분할하여 처리하는 모델이다

 

Quiz_4

스트림 처리에서 워터마크 개념의 설명에 부합되는 것을 모두 고르시오.

가. 워터마크는 시스템에서 워터마크 시점 이후에는 더 이상 지연된 데이터가 안 들어온다고 가정하라는 신호이다.
나. 워터마크가 이르게 설정되면 지연된 데이터를 충분히 잘 반영할 수 있다.
다. 워터마크가 늦게 설정되면 결과를 보는 것이 늦어진다.

가 + 나

가 + 다 정답

가 + 나 + 다

 

해설)

정답은 3번입니다. 워터마크가 이르게 설정되면 지연된 데이터를 받기 전에 워터마크가 설정됨으로 지연된 데이터를 반영할 확률이 적어진다.

 

Quiz_5

tags는 스키마를 가지는 스파크 스트리밍 데이터프레임이다. 다음과 같이 슬라이딩 윈도우 (윈도우 크기 10분, 인터벌 크기 1분)로 hashtag값 별로 count를 하는 스트림 처리 질의를 만들려고 한다. 또한 워터마크는 10분 지연된 데이터를 고려하려고 한다. 아래 밑 줄 친 곳에 들어가야 하는 것은?

windowedCounts = tags
.withWatermark(“ts”, ________ )
.groupBy( window(_______,
_______, ________),
________)
.count()

 

“10 minutes”, tags.hashtags, “10 minutes”, “1 minute”, tags.ts

“10 minutes”, tags.ts, “10 minutes”, “1 minute”, tags.hashtags 정답

“10 minutes”, tags.ts, “1 minute”, “10 minutes”, tags.hashtags

“10 minutes”, tags.hashtags, “1 minute”, “10 minutes”, tags.ts

 

해설)

정답은 2번입니다. 처음에는 워터마크에 해당하는 10 minutes가 들어가고, 그 다음은 window를 정의하는 파라미터인 tags.ts, 10 minutes, 1 minute을 써야 한다. 그리고, 마지막으로 윈도우에서 그룹을 hashtags 값들로 하기때문에 tags.hashtags를 써 준다.

반응형