๊ด€๋ฆฌ ๋ฉ”๋‰ด

data_lab

[ํ•˜๋‘ก์—์ฝ”์‹œ์Šคํ…œ] ์ŠคํŒŒํฌ / apache spark ๋ณธ๋ฌธ

BIGDATA/ํ•˜๋‘ก์—์ฝ”์‹œ์Šคํ…œ

[ํ•˜๋‘ก์—์ฝ”์‹œ์Šคํ…œ] ์ŠคํŒŒํฌ / apache spark

๐Ÿฐํžˆํžˆ 2021. 3. 30. 21:37

ํ•˜๋‘ก์˜ ๋ฌธ์ œ์ ์„ ๋ณด์™„ํ•˜๊ธฐ ์œ„ํ•ด ์ŠคํŒŒํฌ ์ƒ๊น€

 

ํ•˜๋‘ก์˜ ๋ฌธ์ œ๋Š”

1. ๋ฐ˜๋ณต์ ์ธ ์ž‘์—…์—๋Š” ๋น„ํšจ์œจ์ ์ž„

2. ๋งต๋ฆฌ๋“€์Šค์‹œ ๋„คํŠธ์›Œํฌ ํŠธ๋ž˜ํ”ฝ์œผ๋กœ ์ธํ•ด ์„ฑ๋Šฅ์ €ํ•˜๋จ.

 

์ŠคํŒŒํฌ๋ž€?

๊ธฐ์กด ๋งต๋ฆฌ๋“€์Šค์˜ ๋””์Šคํฌ ์ž…์ถœ๋ ฅ์„ ๋ณด์™„ํ•˜์—ฌ

์ธ ๋ฉ”๋ชจ๋ฆฌ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ํ”„๋ ˆ์ž„ ์›Œํฌ ์ด๋‹ค.

 

์ธ ๋ฉ”๋ชจ๋ฆฌ - ์ตœ์ดˆ ๋ฐ์ดํ„ฐ ์ž…๋ ฅ, ์ถœ๋ ฅ์—๋งŒ ๋””์Šคํฌ์— ์ž‘์„ฑํ•จ์œผ๋กœ ๋„คํŠธ์›Œํฌ ํŠธ๋ž˜ํ”ฝ ๋ฐœ์ƒ ๋‚ฎ์ถค, ์ค‘๊ฐ„ ๊ฒฐ๊ณผ๋Š” ๋ณ‘๋ ฌ์ฒ˜๋ฆฌํ•จ

 

์ŠคํŒŒํฌ์˜ ์ฃผ์š”๊ธฐ๋Šฅ - ์ŠคํŒŒํฌ SQL, ์ŠคํŒŒํฌ ์ŠคํŠธ๋ฆฌ๋ฐ, ์ŠคํŒŒํฌ MLlib, ์ŠคํŒŒํฌ GraphX, ์ŠคํŒŒํฌ ์ฝ”์–ด, ์ŠคํŒŒํฌ ์ž‘์—… ์ฒ˜๋ฆฌ

 

์ŠคํŒŒํฌ ์•„ํ‚คํ…์ฒ˜

๋…ธ๋“œ๋งค๋‹ˆ์ € ์•ˆ์—  ๋“œ๋ผ์ด๋ฒ„ ํ”„๋กœ๊ทธ๋žจ์ด ์žˆ์Œ.

1. ๋“œ๋ผ์ด๋ฒ„ ํ”„๋กœ๊ทธ๋žจ์ด SparkContext ์ธ์Šคํ„ด์Šค ์ƒ์„ฑํ•จ(์ด๋•Œ yarn๊ณผ ์—ฐ๊ฒฐ)

2. executors ๋ฅผ ์š”๊ตฌ

3. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ฝ”๋“œ๋ฅผ executors์— ๋ณด๋‚ผ ๊ฒƒ

4. SparkContext ๋Š” executors๋ฅผ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•ด task๋ฅผ ๋ณด๋ƒ„

 

Spark์˜ Driver๋Š” YARN์—์„œ Application Master์™€ ๊ฐ™์Œ

 

์ŠคํŒŒํฌ ์„ค์น˜

1
2
3
$ wget https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
$ tar xvzf spark-2.4.7-bin-hadoop2.7.tgz
ln -s spark-2.4.7-bin-hadoop2.7/ spark
cs

 

์ŠคํŒŒํฌ์˜ RDD

RDD์˜ ๊ฐœ๋…(Resilient Distributed Datasets)

-์ŠคํŒŒํฌ ๋‚ด์— ์ €์žฅ๋˜๋Š” ๋ฐ์ดํ„ฐ ์…‹ ํƒ€์ž…

-๋‚ด๋ถ€์ ์œผ๋กœ ์—ฐ์‚ฐํ•˜๋Š” ๋ฐ์ดํ„ฐ๋“ค์„ ๋ชจ๋‘ RDD ํƒ€์ž…์œผ๋กœ ์ฒ˜๋ฆฌ

Immutable, Partitioned Collections of Records

์—ฌ๋Ÿฌ ๋ถ„์‚ฐ ๋…ธ๋“œ์— ๋‚˜๋ˆ„์–ด์ง€๋ฉฐ

๋‹ค์ˆ˜์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ด€๋ฆฌ๋จ

๋ณ€๊ฒฝ์ด ๋ถˆ๊ฐ€๋Šฅํ•œ ๋ฐ์ดํ„ฐ ์…‹

 

RDD์˜ ์ƒ์„ฑ

1. ์™ธ๋ถ€๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ ๋•Œ

2. ์ฝ”๋“œ์—์„œ ์ƒ์„ฑ๋˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•  ๋•Œ

 

RDD๋ฅผ ์ œ์–ดํ•˜๋Š” 2๊ฐœ์˜ ์—ฐ์‚ฐ ํƒ€์ž…

1.Transformation : RDD์—์„œ ์ƒˆ๋กœ์šด RDD ์ƒ์„ฑํ•˜๋Š” ํ•จ์ˆ˜ (filter, map)

2.Action : RDD์—์„œ RDD๊ฐ€ ์•„๋‹Œ ๋‹ค๋ฅธ ํƒ€์ž…์˜ ๋ฐ์ดํ„ฐ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ํ•จ์ˆ˜(count, collect)

 

RDD ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ ๋ฐฉ๋ฒ•

1.Immutable : ๋ฐ์ดํ„ฐ์…‹ ์ƒ์„ฑ ๋’ค ๋ณ€ํ•˜์ง€ ์•Š์Œ

2.Partitoned : ๋ฐ์ดํ„ฐ์…‹์„ ์ž˜๊ฒŒ ์ž๋ฆ„

 

RDD Partitioning

ํ•˜๋‚˜์˜ RDD๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋‚˜๋‰œ๋‹ค.

ํŒŒํ‹ฐ์…˜์˜ ๊ฐœ์ˆ˜์™€ ํŒŒํ‹ฐ์…˜์„ ์„ ํƒํ•  ์ˆ˜ ์žˆ๋‹ค.

 

RDD Dependency

-Narrow Dependency

ํŒŒํ‹ฐ์…˜์ด 1:1๋กœ ๋งคํ•‘ ๋˜์–ด ๋„คํŠธ์›Œํฌ ํ•„์š” ์—†๊ณ  ํ•˜๋‚˜์˜ ๋…ธ๋“œ์—์„œ ์ž‘์—… ๊ฐ€๋Šฅํ•˜๋‹ค, ๊ทธ๋ฆฌ๊ณ  ํŒŒํ‹ฐ์…˜ ๋ณต์› ์‰ฌ์›€  

-Wide Dependency

ํŒŒํ‹ฐ์…˜์ด 1:N๋กœ ๋งคํ•‘ ๋˜์–ด ํŒŒํ‹ฐ์…˜ ์žฌ๊ณ„์‚ฐ ๋น„์šฉ ๋น„์‹ธ๋ฉฐ, ๋„คํŠธ์›Œํฌ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

RDD Lineage

RDD์—ฐ์‚ฐ ์ˆœ์„œ ๊ธฐ๋ก -> DAG (์ˆœํ™˜๋˜์ง€์•Š์Œ)

 Fault tolerant : ๊ณ„๋ณด๋กœ ๋˜‘๊ฐ™์€ RDD ์ƒ์„ฑ ๊ฐ€๋Šฅํ•จ

 Lazy execution

-Transformation ์—ฐ์‚ฐ ์‹œ ๊ณ„๋ณด ์ž‘์ • ๋จ

-Action ์—ฐ์‚ฐ ์‹œ ๊ณ„๋ณด ์‹คํ–‰๋จ

๋ฏธ๋ฆฌ ์ž‘์„ฑ๋œ ๊ณ„๋ณด๋กœ ์ž์›ํ• ๋‹น ์ฐธ๊ณ ํ•  ์ˆ˜ ์žˆ์Œ

ํ˜„์žฌ ์“ฐ๊ณ ์žˆ๋Š” ์ž์›, ์•ž์œผ๋กœ ์‚ฌ์šฉํ•  ์ž์›, Dependency๋กœ ์ž‘์—… ์Šค์ผ€์ค„๋ง์— ํ™œ์šฉ ๊ฐ€๋Šฅํ•จ

 

Spark YARN ์‹คํ–‰

1
2
3
4
5
6
scala : spark-shell --master yarn --queue queue_name
python : pyspark --master yarn --queue queue_name
 
--driver-memory 3G : spark driver๊ฐ€ ์‚ฌ์šฉํ•  ๋ฉ”๋ชจ๋ฆฌ default = 1024M
--executor-memory 3G : ๊ฐ spark executor๊ฐ€ ์‚ฌ์šฉํ•  ๋ฉ”๋ชจ๋ฆฌ์–‘
--executor-cores NUM : ๊ฐ spark executor์˜ ์ฝ”์–ด์˜ ์–‘
cs

Spark Shell ์ž…๋ ฅ ํ›„ ์ฝ”๋“œ ์ž‘์„ฑํ•ด๋ณด๊ธฐ

1
2
3
4
x=sc.parallelize([“spark”, ”rdd”, ”example”, “sample”, “example”], 3)
y=x.map(lambda x:(x,1))
y.collect()
[(‘spark’,1), (‘rdd’,1), (‘example’,1), (‘sample’,1), (‘example’,1)]
cs

x์— ์ŠคํŒŒํฌ ์ฝ˜ํ…์ŠคํŠธ ๋ณ‘๋ ฌํ™”๋กœ ์ƒ์„ฑํ•จ

y์— x๋ฅผ ๋งตํ˜•์‹์œผ๋กœ x๊ฐ’๊ณผ 1 ์ €์žฅํ•จ

collect ์จ์„œ ์ง‘ํ•ฉ ์ถœ๋ ฅํ•จ

 

์ž‘์„ฑํ•œ ์ŠคํŒŒํฌ์—์„œ ์‹คํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•

ํŒŒ์ด์ฌ ํŒŒ์ผ  (num์€ ์“ฐ๋ ˆ๋“œ ๊ฐœ์ˆ˜, default ๊ฐ’์€ 2~4๊ฐœ ์ •๋„)

1
spark-submit –master local[num] ํŒŒ์ผ๋ช….py 
cs

์ž๋ฐ”,์Šค์นผ๋ผ ํŒŒ์ผ

1
spark-submit \ --class “SimpleApp”\ --master local[num] /location~/name.jar
cs

 

์ŠคํŒŒํฌ์—์„œ ๋งต๋ฆฌ๋“€์Šค

1
2
3
4
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
cs

RDD (K1,V1) ๋ฐ์ดํ„ฐ ์ž…๋ ฅ

flatMap()์—ฐ์‚ฐ ์ˆ˜ํ–‰ํ•˜์—ฌ RDD (K2,V2) ์ถœ๋ ฅ

RDD (K2,V2) ๊ฐ’์œผ๋กœ ์…”ํ”Œํ•จ , groupByKey()๊ณผ sortByKey()์—ฐ์‚ฐ ์ˆ˜ํ–‰ํ•จ

RDD (K3, V3) ๊ฐ’์— shuffledํ›„ flatMap() ์—ฐ์‚ฐํ•œ ๊ฒฐ๊ณผ๊ฐ’ ์ €์žฅํ•จ

 

 

728x90
๋ฐ˜์‘ํ˜•