ssung_데이터 엔지니어링/13주차_하둡과 Spark

하둡&Spark_(2)

ssungcohol 2024. 1. 18. 23:40

Spark

  • 빅데이터 처리를 위한 오픈소스 분산 처리 플랫폼

Spark 데이터 처리 흐름

  • 데이터프레임(DF)은 작은 파티션들로 구성
    • 데이터프레임은 한 번 만들어지면 수정 불가 (Immutable)
  • 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
    • sort, group by, filter, map, join ....
  • 셔플링 : 파티션 간에 데이터 이동이 필요한 경우 발생
    • 셔플링이 발생하는 경우
      • 명시적 파티션을 새롭게 하는 경우 (Ex_파티션 수 줄이기)
      • 시스템에 의해 이뤄지는 셔플링 (Ex_그룹핑에서의 aggregation, sorting)
    • 셔플링이 발생할 때 네트워크를 타고 데이터가 이동
      • 파티션의 개수는 spark.sql.shuffle.partitions가 결정
        (기본값은 200이며 이는 최대 파티션의 개수)
      • 오퍼레이션에 따라 파티션 수가 결정 (이 때 Data Skew 발생 가능)
        (random, hashing partition, range partition ..), (sorting의 경우 range partition을 사용
    • Data Skewness
      • Data partitioning은 데이터 처리에 병렬설을 주지만 단점도 존재
        • 이는 데이터가 균등하게 분포하지 않는 경우 (주로 데이터 셔플링 후에 발생)
        • 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요

Spark 데이터 구조

  • RDD, DataFrame, Dataset (Immutable Distributed Data)
    • DataFrame과 Dataset이 하나의 API로 통합 (2016)
    • 모두 파티션으로 나뉘어 Spark에서 처리

RDD (Resilient Distributed Dataset)

  • 로우레벨 데이터로 클러스터 내의 서버에 분산된 데이터를 지칭 (map, filter, flatMap 등등 지원)
  • 레코드 별로 존재하지만 스키마가 존재하지 않음 (구조화된 데이터나 비구조화된 데이터 모두 지원)
  • 변경이 불가능한 분산 저장된 데이터
    • RDD는 다수의 파티션으로 구성
  • 일반 python 데이터는 parallelize 함수로 RDD로 변환
    • 반대는 collect로 python 데이터로 변환 가능

DataFrame, Dataset

  • RDD 위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음 (테이블)
  • Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용가능 (Scala / Java 에서 사용 가능)
  • PySpark에서는 DataFrame을 사용
  • 변경이 불가한 분산 저장된 데이터
  • RDD와 달리 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
    • pandas의 데이터 프레임 혹은 RDBMS의 테이블과 거의 유사
    • 다양한 데이터소스 지원 : HDFS, Hive, 외부 데이터베이스, RDD 등등
  • Scala, Java, Python과 같은 언어에서 지원

Spark Session

  • Spark 프로그램의 시작은 SparkSession을 만드는 것
    • 프로그램마다 하나를 만들어 Spark Cluster와 통신 : Singleton 객체
  • Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
    • DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
    • config 메소드를 이용해 다양한 환경설정 가능
    • 단, RDD와 관련된 작업을 할 때는 SparkSession 밑의 sparkContext 객체 사용
  • 환경 변수
    • Spark Session을 만들 때 다양한 환경 설정이 가능
      • executor 별 메모리 : spark.executor.memory (기본값 : 1G)
      • executor 별 CPU 수 : spark.executor.cores (YARN에서는 기본값 1)
      • driver 메모리 : spark.driver.memory (기본값 : 1G)
      • Shuffle 후 Partition의 수 : spark.sql.shuffle.partitions (기본값 : 최대 200)
    • 사용하는 Resource Manager에 따라 환경변수가 많이 달라짐
  • 환경 설정 방법
    • 환경변수
    • $SPARK_HOME/conf/spark_defaults.conf
    • spark-submit 명령의 커맨드라인 파라미터
    • SparkSession 만들 때 지정 (SparkConf)
  • Spark Session이 지원하는 데이터 소스
    • spark.read (DataFrameReader)를 사용하여 데이터프레임으로 로드
    • DataFrame.write (DataFrameWriter)를 사용하여 데이터프레임을 저장
    • 많이 사용되는 데이터 소스
      • HDFS 파일
         - CSV, JSON, Parquet, ORC, Text, Avro
         - Hive 테이블
      • JDBC 관계형 데이터베이스
      • 클라우드 기반 데이터 시스템
      • 스트리밍 시스템
728x90

'ssung_데이터 엔지니어링 > 13주차_하둡과 Spark' 카테고리의 다른 글

하둡&Spark_(1)  (0) 2024.01.18