Kafka Topic 파라미터 설정
- Topic 생성시 다수의 Partition이나 Replica를 주려할 때
- KafkaAdminClient 오브젝트 생성 후 create_topics 함수로 Topic 추가
- create_topics의 인자로는 NewTopic 클래스의 오브젝트 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions, # partition의 수 = 1
replication_factor=replica) # Replication의 수 = 1
client.create_topics([topic])
- Kafka Producer의 파라미터
파라미터 | 의미 | 기본 값 |
bootstrap_servers | 메세지를 보낼 때 사용할 브로커 리스트 (host : port) | localhost:9092 |
client_id | Kafka Producer의 이름 | 'kafka-python-{version} |
key_serializer, value_serializer | 메세지의 키와 값의 serialize 방법 지정(함수) | |
enable_idemporence | 중복 메세지 전송을 막을 것인가? | False (막지 않음) |
acks: 0, 1, 'all' | consistency level. 0 : 바로 리턴 1. 리더에 쓰일 때까지 대기 all : 모든 partition leader/follower에 적용까지 대기 |
0 |
retries delivery.timeout.ms |
메세지 실패 시 재시도 회수 메세지 전송 최대 시간 |
2147483647 120000 |
linger_ms, batch_size | 다수의 메세지를 동시에 보내기 위함 (배치전송) - 메세지 송신 전 대기 시간 - 메세지 송신 전 데이터 크기 |
0 16384 |
- Kafka Producer 동작

- Consumer가 다수의 Partitions들로부터 어떻게 읽어오는가?
- Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야하는 상황
- Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
- 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
- 이를 해결하기 위한 것이 Consumer Group
- 한 프로세스에서 다수의 Topic을 읽는 것 가능
- Topic 수만큼 KafkaConsumer 인스턴스를 생성하고 별도의 Group ID와 Client ID를 지정
- Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야하는 상황
- Consumer Group
- Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당
- Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨
(한 Partition은 한 Consumer에게만 할당됨)- 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure경감
- Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
- Consumer Group Rebalancing
- 기존 Consumer가 어떠한 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정되어야함. 이를 Consumer Group Rebalancing이라고 부르면 Kafka에서 알아서 수행해줌
- Consumer 파라미터
파라미터 | 의미 | 기본 값 |
bootstrap_servers | 메세지를 보낼 때 사용할 브로커 리스트 (host:port) | localhost:9092 |
client_id | Kafka Consumer의 이름 | 'kafka-python-{version} |
group_id | Kafka Consumer Group의 이름 | |
key_deserializer, value_deserializer | 메세지의 키와 값의 deserialize 방법 지정 (함수) | |
auto_offset_reset | earliest, latest | latest |
enable_auto_commit | True이면 소비자의 오프셋이 백그라운드에서 주기적으로 커밋 False이면 명시적으로 커밋을 해주어야함 오프셋은 별도로 리셋이 가능하고 Conduktor Web UI에서도 가능 |
True |
Message Processing Guarantee 방식
- 실시간 메시지 처리 및 전송 관점에서 시스템의 보장 방식에는 크게 3가지가 존재
방식 | 설명 |
Exactly Once | Exactly Once (정확히 한번)는 각 Message가 Consumer에게 정확히 한번만 전달된다는 것을 보장. 네트워크 문제, 장애 또는 재시도 가능성 등이 있어 아주 어려운 문제 1 > Producer 단에는 enable_idempotence를 True로 설정 2 > Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용 |
At Least Once | At Least Once (적어도 한 번 이상)는 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재 해당 경우, Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성) 이는 보통 Consumer가 직접 오프셋을 커밋할 때 발생 |
At Most Once | At Most Once (최대 한 번만)는 메시지 손실 가능성에 중점을 둠. 이는 메시지가 손실될 수는 있지만 중복이 없음을 의미. 가장 흔한 메시지 전송 방법 |
Consumer/Producer 패턴
- 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도함
- 즉, Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임
- 데이터 Transformation, Filtering, Enrichment
- 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment를 수행한 다음, Producer를 사용하여 수정된 데디어틑 다른 Topic으로 푸시 가능
Spark Streaming
- 실시간 데이터 스트림 처리를 위한 Spark API
- Kafka, Kinesis, Flume, TCP 소켓 등의 다양한 소스에서 발생하는 데이터 처리 가능
- Join, Map, Reduce, Window와 같은 고급 함수 사용 가능

Spark Streaming 동작방식
- 데이터를 마이크로 배치로 처리
- 위의 과정을 반복 (루프)
- 읽은 데이터를 앞서 읽은 데이터에 merge
- 배치마다 데이터 위치 관리 (시작과 끝)
- Fault Tolerance와 데이터 재처리 관리 (실패시)
Spark Streaming 내부 동작
- Spark Streaming은 실시간 입력 데이터 스트림을 배치로 나눔
- 이후, Spark Engine에서 처리하여 최종 결과 스트림을 일괄적으로 생성
- DStream과 Structured Streaming 두 종류가 존재

Spark Structured Streaming
- Spark Streaming은 실시간 데이터 스트림을 배치로 나눔
- Spark Engine에서 처리하여 최종 결과 스트림을 일괄적으로 생성
DStream | Structured Streaming |
RDD 기반 스트리밍 처리 | DataFrame 기반 스트리밍 처리 |
Spark SQL 엔진의 최적화 기능 사용불가 | Catalyst 기반 최적화 혜택을 가져감 |
이벤트 발생 시간 기반 처리 불가 | 이벤트 발생 시간 기반을 처리 가능 |
개발이 중단된 상태 | 계속해서 기능이 추가 중 |
Source & Sink
- Source와 Sink는 외부 시스템 (소스)에서 스트리밍 데이터를 수집하고 처리된 데이터를 외부 시스템 (싱크)으로 출력하는 것을 용이하게 하는 구성 요소
Source
- Source는 Kafka, Amazon Kinesis, Apache Flume, TCP/IP 소켓, HDFS, File 등을 Spark Structured Streaming에서 처리할 수 있도록 해줌
- Spark DataFrame으로 변환해줌
- Kafka에서 Spark Structured Streaming으로 데이터를 수집하려는 경우, Kafka Source를 사용하여 Kafka 클러스터에서 하나 이상의 토픽에서 데이터를 가져와 DataFrame으로 변환 가능
- Spark DataFrame과 비교하면 readStream을 사용하는 점이 다름
Sink
- Sink는 Spark Structured Streaming에서 처리된 데이터를 외부 시스템이나 스토리지로 출력 가능하게 해줌
- Sink는 변환되거나 집계된 데이터가 어떻게 쓰이거나 소비되는지를 정의
- Source와 마찬가지로, Sink는 Kafka, HDFS, Amazon S3, Apache Cassandra, JDBC 데이터베이스 등과 같은 다양한 대상에 대해 사용 가능
- Kafka Sink를 사용하여 Spark Structured Streaming에서 처리된 데이터를 Kafka Topic으로 쓰는 것이 가능
- OutputMode : 현재 Micro Batch의 결과가 Sink에 어떻게 쓰일지 결정
- Append
- Update: UPSERT 같은 느낌
- Complete : FULL REFRESH 같은 느낌
Micro Batch Trigger Option
- Unspecified : 디폴드 모드로 현재 Micro Batch가 끝나면 다음 Batch가 바로 시작
- Time Interval : 고정된 시간마다 Micro Batch를 시작. 현재 Batch가 지정된 시간을 넘어서 끝나면 끝나마자마 다음 Batch가 시작됨. 읽을 데이터가 없는 경우 시작되지 않음
- One Time => Available-Now : 지금 있는 데이터를 모두 처리하고 중단
- Continuous : 새로운 저지연 연속 처리 모드에서 실행. (베타버전)
728x90
'ssung_데이터 엔지니어링 > 14주차_Kafka와 Spark Streaming' 카테고리의 다른 글
Kafka와 Spark Streaming_(2, 3) (0) | 2024.01.24 |
---|---|
Kafka와 Spark Streaming_(1) (1) | 2024.01.23 |