ssung_데이터 엔지니어링/14주차_Kafka와 Spark Streaming

Kafka와 Spark Streaming_(4, 5)

ssungcohol 2024. 1. 30. 17:06

Kafka Topic 파라미터 설정

  • Topic 생성시 다수의 Partition이나 Replica를 주려할 때
    • KafkaAdminClient 오브젝트 생성 후 create_topics 함수로 Topic 추가
    • create_topics의 인자로는 NewTopic 클래스의 오브젝트 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
 name=name,
 num_partitions=partitions, # partition의 수 = 1
 replication_factor=replica) # Replication의 수 = 1
client.create_topics([topic])
  • Kafka Producer의 파라미터
파라미터 의미 기본 값
bootstrap_servers 메세지를 보낼 때 사용할 브로커 리스트 (host : port) localhost:9092
client_id Kafka Producer의 이름 'kafka-python-{version}
key_serializer, value_serializer 메세지의 키와 값의 serialize 방법 지정(함수)  
enable_idemporence 중복 메세지 전송을 막을 것인가? False (막지 않음)
acks: 0, 1, 'all' consistency level. 0 : 바로 리턴
1. 리더에 쓰일 때까지 대기
all : 모든 partition leader/follower에 적용까지 대기
0
retries
delivery.timeout.ms
메세지 실패 시 재시도 회수
메세지 전송 최대 시간
2147483647
120000
linger_ms, batch_size 다수의 메세지를 동시에 보내기 위함 (배치전송)
 - 메세지 송신 전 대기 시간
 - 메세지 송신 전 데이터 크기
0
16384
  • Kafka Producer 동작


  • Consumer가 다수의 Partitions들로부터 어떻게 읽어오는가?
    • Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야하는 상황
      • Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
      • 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
      • 이를 해결하기 위한 것이 Consumer Group
    • 한 프로세스에서 다수의 Topic을 읽는 것 가능
      • Topic 수만큼 KafkaConsumer 인스턴스를 생성하고 별도의 Group ID와 Client ID를 지정
  • Consumer Group
    • Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당
    • Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨
      (한 Partition은 한 Consumer에게만 할당됨)
      • 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure경감
      • Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
    • Consumer Group Rebalancing
      • 기존 Consumer가 어떠한 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정되어야함. 이를 Consumer Group Rebalancing이라고 부르면 Kafka에서 알아서 수행해줌
  • Consumer 파라미터
파라미터 의미 기본 값
bootstrap_servers 메세지를 보낼 때 사용할 브로커 리스트 (host:port) localhost:9092
client_id Kafka Consumer의 이름 'kafka-python-{version}
group_id Kafka Consumer Group의 이름  
key_deserializer, value_deserializer 메세지의 키와 값의 deserialize 방법 지정 (함수)  
auto_offset_reset earliest, latest latest
enable_auto_commit True이면 소비자의 오프셋이 백그라운드에서 주기적으로 커밋
False이면 명시적으로 커밋을 해주어야함
오프셋은 별도로 리셋이 가능하고 Conduktor Web UI에서도 가능
True

Message Processing Guarantee 방식

  • 실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식에는 크게 3가지가 존재
방식 설명
Exactly Once Exactly Once (정확히 한번)는 각 Message가 Consumer에게 정확히 한번만 전달된다는 것을 보장.
네트워크 문제, 장애 또는 재시도 가능성 등이 있어 아주 어려운 문제
1 > Producer 단에는 enable_idempotence를 True로 설정
2 > Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용
At Least Once At Least Once (적어도 한 번 이상)는 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재
해당 경우, Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성)
이는 보통 Consumer가 직접 오프셋을 커밋할 때 발생
At Most Once At Most Once (최대 한 번만)는 메시지 손실 가능성에 중점을 둠.
이는 메시지가 손실될 수는 있지만 중복이 없음을 의미. 가장 흔한 메시지 전송 방법

Consumer/Producer 패턴

  • 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도함
  • 즉, Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임
  • 데이터 Transformation, Filtering, Enrichment
    • 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment를 수행한 다음, Producer를 사용하여 수정된 데디어틑 다른 Topic으로 푸시 가능

Spark Streaming

  • 실시간 데이터 스트림 처리를 위한 Spark API
  • Kafka, Kinesis, Flume, TCP 소켓 등의 다양한 소스에서 발생하는 데이터 처리 가능
  • Join, Map, Reduce, Window와 같은 고급 함수 사용 가능

 

Spark Streaming 동작방식

  • 데이터를 마이크로 배치로 처리
  • 위의 과정을 반복 (루프)
  • 읽은 데이터를 앞서 읽은 데이터에 merge
  • 배치마다 데이터 위치 관리 (시작과 끝)
  • Fault Tolerance와 데이터 재처리 관리 (실패시)

Spark Streaming 내부 동작

  • Spark Streaming은 실시간 입력 데이터 스트림을 배치로 나눔
  • 이후, Spark Engine에서 처리하여 최종 결과 스트림을 일괄적으로 생성
    • DStream과 Structured Streaming 두 종류가 존재


Spark Structured Streaming

  • Spark Streaming은 실시간 데이터 스트림을 배치로 나눔
  • Spark Engine에서 처리하여 최종 결과 스트림을 일괄적으로 생성
DStream Structured Streaming
RDD 기반 스트리밍 처리 DataFrame 기반 스트리밍 처리
Spark SQL 엔진의 최적화 기능 사용불가 Catalyst 기반 최적화 혜택을 가져감
이벤트 발생 시간 기반 처리 불가 이벤트 발생 시간 기반을 처리 가능
개발이 중단된 상태 계속해서 기능이 추가 중

Source & Sink

  • Source와 Sink는 외부 시스템 (소스)에서 스트리밍 데이터를 수집하고 처리된 데이터를 외부 시스템 (싱크)으로 출력하는 것을 용이하게 하는 구성 요소

 

Source

  • Source는 Kafka, Amazon Kinesis, Apache Flume, TCP/IP 소켓, HDFS, File 등을 Spark Structured Streaming에서 처리할 수 있도록 해줌
    • Spark DataFrame으로 변환해줌
    • Kafka에서 Spark Structured Streaming으로 데이터를 수집하려는 경우, Kafka Source를 사용하여 Kafka 클러스터에서 하나 이상의 토픽에서 데이터를 가져와 DataFrame으로 변환 가능
  • Spark DataFrame과 비교하면 readStream을 사용하는 점이 다름

 

Sink

  • Sink는 Spark Structured Streaming에서 처리된 데이터를 외부 시스템이나 스토리지로 출력 가능하게 해줌
  • Sink는 변환되거나 집계된 데이터가 어떻게 쓰이거나 소비되는지를 정의
    • Source와 마찬가지로, Sink는 Kafka, HDFS, Amazon S3, Apache Cassandra, JDBC 데이터베이스 등과 같은 다양한 대상에 대해 사용 가능
    • Kafka Sink를 사용하여 Spark Structured Streaming에서 처리된 데이터를 Kafka Topic으로 쓰는 것이 가능
  • OutputMode : 현재 Micro Batch의 결과가 Sink에 어떻게 쓰일지 결정
    • Append
    • Update: UPSERT 같은 느낌
    • Complete : FULL REFRESH 같은 느낌

 

Micro Batch Trigger Option

  • Unspecified : 디폴드 모드로 현재 Micro Batch가 끝나면 다음 Batch가 바로 시작
  • Time Interval : 고정된 시간마다 Micro Batch를 시작. 현재 Batch가 지정된 시간을 넘어서 끝나면 끝나마자마 다음 Batch가 시작됨. 읽을 데이터가 없는 경우 시작되지 않음
  • One Time => Available-Now : 지금 있는 데이터를 모두 처리하고 중단
  • Continuous : 새로운 저지연 연속 처리 모드에서 실행.  (베타버전)

 

728x90