ํ๋ก์ ๋ฌธ์ ์ ์ ๋ณด์ํ๊ธฐ ์ํด ์คํํฌ ์๊น
ํ๋ก์ ๋ฌธ์ ๋
1. ๋ฐ๋ณต์ ์ธ ์์
์๋ ๋นํจ์จ์ ์
2. ๋งต๋ฆฌ๋์ค์ ๋คํธ์ํฌ ํธ๋ํฝ์ผ๋ก ์ธํด ์ฑ๋ฅ์ ํ๋จ.
์คํํฌ๋?
๊ธฐ์กด ๋งต๋ฆฌ๋์ค์ ๋์คํฌ ์
์ถ๋ ฅ์ ๋ณด์ํ์ฌ
์ธ ๋ฉ๋ชจ๋ฆฌ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํ๋ ์ ์ํฌ ์ด๋ค.
์ธ ๋ฉ๋ชจ๋ฆฌ - ์ต์ด ๋ฐ์ดํฐ ์
๋ ฅ, ์ถ๋ ฅ์๋ง ๋์คํฌ์ ์์ฑํจ์ผ๋ก ๋คํธ์ํฌ ํธ๋ํฝ ๋ฐ์ ๋ฎ์ถค, ์ค๊ฐ ๊ฒฐ๊ณผ๋ ๋ณ๋ ฌ์ฒ๋ฆฌํจ
์คํํฌ์ ์ฃผ์๊ธฐ๋ฅ - ์คํํฌ SQL, ์คํํฌ ์คํธ๋ฆฌ๋ฐ, ์คํํฌ MLlib, ์คํํฌ GraphX, ์คํํฌ ์ฝ์ด, ์คํํฌ ์์
์ฒ๋ฆฌ
์คํํฌ ์ํคํ
์ฒ
๋
ธ๋๋งค๋์ ์์ ๋๋ผ์ด๋ฒ ํ๋ก๊ทธ๋จ์ด ์์.
1. ๋๋ผ์ด๋ฒ ํ๋ก๊ทธ๋จ์ด SparkContext ์ธ์คํด์ค ์์ฑํจ(์ด๋ yarn๊ณผ ์ฐ๊ฒฐ)
2. executors ๋ฅผ ์๊ตฌ
3. ์ ํ๋ฆฌ์ผ์ด์
์ฝ๋๋ฅผ executors์ ๋ณด๋ผ ๊ฒ
4. SparkContext ๋ executors๋ฅผ ์คํํ๊ธฐ ์ํด task๋ฅผ ๋ณด๋
Spark์ Driver๋ YARN์์ Application Master์ ๊ฐ์
์คํํฌ ์ค์น
|
$ wget https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
$ tar xvzf spark-2.4.7-bin-hadoop2.7.tgz
ln -s spark-2.4.7-bin-hadoop2.7/ spark
|
cs |
์คํํฌ์ RDD
RDD์ ๊ฐ๋
(Resilient Distributed Datasets)
-์คํํฌ ๋ด์ ์ ์ฅ๋๋ ๋ฐ์ดํฐ ์
ํ์
-๋ด๋ถ์ ์ผ๋ก ์ฐ์ฐํ๋ ๋ฐ์ดํฐ๋ค์ ๋ชจ๋ RDD ํ์
์ผ๋ก ์ฒ๋ฆฌ
Immutable, Partitioned Collections of Records
์ฌ๋ฌ ๋ถ์ฐ ๋
ธ๋์ ๋๋์ด์ง๋ฉฐ
๋ค์์ ํํฐ์
์ผ๋ก ๊ด๋ฆฌ๋จ
๋ณ๊ฒฝ์ด ๋ถ๊ฐ๋ฅํ ๋ฐ์ดํฐ ์
RDD์ ์์ฑ
1. ์ธ๋ถ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ๋
2. ์ฝ๋์์ ์์ฑ๋๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋
RDD๋ฅผ ์ ์ดํ๋ 2๊ฐ์ ์ฐ์ฐ ํ์
1.Transformation : RDD์์ ์๋ก์ด RDD ์์ฑํ๋ ํจ์ (filter, map)
2.Action : RDD์์ RDD๊ฐ ์๋ ๋ค๋ฅธ ํ์
์ ๋ฐ์ดํฐ๋ก ๋ณํํ๋ ํจ์(count, collect)
RDD ๋ถ์ฐ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ
1.Immutable : ๋ฐ์ดํฐ์
์์ฑ ๋ค ๋ณํ์ง ์์
2.Partitoned : ๋ฐ์ดํฐ์
์ ์๊ฒ ์๋ฆ
RDD Partitioning
ํ๋์ RDD๋ ์ฌ๋ฌ ๊ฐ์ ํํฐ์
์ผ๋ก ๋๋๋ค.
ํํฐ์
์ ๊ฐ์์ ํํฐ์
์ ์ ํํ ์ ์๋ค.
RDD Dependency
-Narrow Dependency
ํํฐ์
์ด 1:1๋ก ๋งคํ ๋์ด ๋คํธ์ํฌ ํ์ ์๊ณ ํ๋์ ๋
ธ๋์์ ์์
๊ฐ๋ฅํ๋ค, ๊ทธ๋ฆฌ๊ณ ํํฐ์
๋ณต์ ์ฌ์
-Wide Dependency
ํํฐ์
์ด 1:N๋ก ๋งคํ ๋์ด ํํฐ์
์ฌ๊ณ์ฐ ๋น์ฉ ๋น์ธ๋ฉฐ, ๋คํธ์ํฌ๋ฅผ ์ฌ์ฉํ๋ค.
RDD Lineage
RDD์ฐ์ฐ ์์ ๊ธฐ๋ก -> DAG (์ํ๋์ง์์)
Fault tolerant : ๊ณ๋ณด๋ก ๋๊ฐ์ RDD ์์ฑ ๊ฐ๋ฅํจ
Lazy execution
-Transformation ์ฐ์ฐ ์ ๊ณ๋ณด ์์ ๋จ
-Action ์ฐ์ฐ ์ ๊ณ๋ณด ์คํ๋จ
๋ฏธ๋ฆฌ ์์ฑ๋ ๊ณ๋ณด๋ก ์์ํ ๋น ์ฐธ๊ณ ํ ์ ์์
ํ์ฌ ์ฐ๊ณ ์๋ ์์, ์์ผ๋ก ์ฌ์ฉํ ์์, Dependency๋ก ์์
์ค์ผ์ค๋ง์ ํ์ฉ ๊ฐ๋ฅํจ
Spark YARN ์คํ
|
scala : spark-shell --master yarn --queue queue_name
python : pyspark --master yarn --queue queue_name
--driver-memory 3G : spark driver๊ฐ ์ฌ์ฉํ ๋ฉ๋ชจ๋ฆฌ default = 1024M
--executor-memory 3G : ๊ฐ spark executor๊ฐ ์ฌ์ฉํ ๋ฉ๋ชจ๋ฆฌ์
--executor-cores NUM : ๊ฐ spark executor์ ์ฝ์ด์ ์
|
cs |
Spark Shell ์
๋ ฅ ํ ์ฝ๋ ์์ฑํด๋ณด๊ธฐ
|
x=sc.parallelize([“spark”, ”rdd”, ”example”, “sample”, “example”], 3)
y=x.map(lambda x:(x,1))
y.collect()
[(‘spark’,1), (‘rdd’,1), (‘example’,1), (‘sample’,1), (‘example’,1)]
|
cs |
x์ ์คํํฌ ์ฝํ
์คํธ ๋ณ๋ ฌํ๋ก ์์ฑํจ
y์ x๋ฅผ ๋งตํ์์ผ๋ก x๊ฐ๊ณผ 1 ์ ์ฅํจ
collect ์จ์ ์งํฉ ์ถ๋ ฅํจ
์์ฑํ ์คํํฌ์์ ์คํํ๋ ๋ฐฉ๋ฒ
ํ์ด์ฌ ํ์ผ (num์ ์ฐ๋ ๋ ๊ฐ์, default ๊ฐ์ 2~4๊ฐ ์ ๋)
|
spark-submit –master local[num] ํ์ผ๋ช
.py
|
cs |
์๋ฐ,์ค์นผ๋ผ ํ์ผ
|
spark-submit \ --class “SimpleApp”\ --master local[num] /location~/name.jar
|
cs |
์คํํฌ์์ ๋งต๋ฆฌ๋์ค
|
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
|
cs |
RDD (K1,V1) ๋ฐ์ดํฐ ์
๋ ฅ
flatMap()์ฐ์ฐ ์ํํ์ฌ RDD (K2,V2) ์ถ๋ ฅ
RDD (K2,V2) ๊ฐ์ผ๋ก ์
ํํจ , groupByKey()๊ณผ sortByKey()์ฐ์ฐ ์ํํจ
RDD (K3, V3) ๊ฐ์ shuffledํ flatMap() ์ฐ์ฐํ ๊ฒฐ๊ณผ๊ฐ ์ ์ฅํจ