Spark
- 빅데이터 처리를 위한 오픈소스 분산 처리 플랫폼
Spark 데이터 처리 흐름
- 데이터프레임(DF)은 작은 파티션들로 구성
- 데이터프레임은 한 번 만들어지면 수정 불가 (Immutable)
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
- sort, group by, filter, map, join ....
- 셔플링 : 파티션 간에 데이터 이동이 필요한 경우 발생
- 셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우 (Ex_파티션 수 줄이기)
- 시스템에 의해 이뤄지는 셔플링 (Ex_그룹핑에서의 aggregation, sorting)
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동
- 파티션의 개수는 spark.sql.shuffle.partitions가 결정
(기본값은 200이며 이는 최대 파티션의 개수) - 오퍼레이션에 따라 파티션 수가 결정 (이 때 Data Skew 발생 가능)
(random, hashing partition, range partition ..), (sorting의 경우 range partition을 사용
- 파티션의 개수는 spark.sql.shuffle.partitions가 결정
- Data Skewness
- Data partitioning은 데이터 처리에 병렬설을 주지만 단점도 존재
- 이는 데이터가 균등하게 분포하지 않는 경우 (주로 데이터 셔플링 후에 발생)
- 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요
- Data partitioning은 데이터 처리에 병렬설을 주지만 단점도 존재
- 셔플링이 발생하는 경우
Spark 데이터 구조
- RDD, DataFrame, Dataset (Immutable Distributed Data)
- DataFrame과 Dataset이 하나의 API로 통합 (2016)
- 모두 파티션으로 나뉘어 Spark에서 처리
RDD (Resilient Distributed Dataset)
- 로우레벨 데이터로 클러스터 내의 서버에 분산된 데이터를 지칭 (map, filter, flatMap 등등 지원)
- 레코드 별로 존재하지만 스키마가 존재하지 않음 (구조화된 데이터나 비구조화된 데이터 모두 지원)
- 변경이 불가능한 분산 저장된 데이터
- RDD는 다수의 파티션으로 구성
- 일반 python 데이터는 parallelize 함수로 RDD로 변환
- 반대는 collect로 python 데이터로 변환 가능
DataFrame, Dataset
- RDD 위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음 (테이블)
- Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용가능 (Scala / Java 에서 사용 가능)
- PySpark에서는 DataFrame을 사용
- 변경이 불가한 분산 저장된 데이터
- RDD와 달리 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- pandas의 데이터 프레임 혹은 RDBMS의 테이블과 거의 유사
- 다양한 데이터소스 지원 : HDFS, Hive, 외부 데이터베이스, RDD 등등
- Scala, Java, Python과 같은 언어에서 지원
Spark Session
- Spark 프로그램의 시작은 SparkSession을 만드는 것
- 프로그램마다 하나를 만들어 Spark Cluster와 통신 : Singleton 객체
- Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
- DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
- config 메소드를 이용해 다양한 환경설정 가능
- 단, RDD와 관련된 작업을 할 때는 SparkSession 밑의 sparkContext 객체 사용
- 환경 변수
- Spark Session을 만들 때 다양한 환경 설정이 가능
- executor 별 메모리 : spark.executor.memory (기본값 : 1G)
- executor 별 CPU 수 : spark.executor.cores (YARN에서는 기본값 1)
- driver 메모리 : spark.driver.memory (기본값 : 1G)
- Shuffle 후 Partition의 수 : spark.sql.shuffle.partitions (기본값 : 최대 200)
- 사용하는 Resource Manager에 따라 환경변수가 많이 달라짐
- Spark Session을 만들 때 다양한 환경 설정이 가능
- 환경 설정 방법
- 환경변수
- $SPARK_HOME/conf/spark_defaults.conf
- spark-submit 명령의 커맨드라인 파라미터
- SparkSession 만들 때 지정 (SparkConf)
- Spark Session이 지원하는 데이터 소스
- spark.read (DataFrameReader)를 사용하여 데이터프레임으로 로드
- DataFrame.write (DataFrameWriter)를 사용하여 데이터프레임을 저장
- 많이 사용되는 데이터 소스
- HDFS 파일
- CSV, JSON, Parquet, ORC, Text, Avro
- Hive 테이블 - JDBC 관계형 데이터베이스
- 클라우드 기반 데이터 시스템
- 스트리밍 시스템
- HDFS 파일
728x90
'ssung_데이터 엔지니어링 > 13주차_하둡과 Spark' 카테고리의 다른 글
하둡&Spark_(1) (0) | 2024.01.18 |
---|