[Hadoop Ecosystem] Apache Avro
What is Apache Avro?
- A language-neutral data serialization system that is not tied to any particular language
- Created to address the main drawback of Hadoop Writables: the lack of language portability
Avro has features that set it apart from Apache Thrift and Google Protocol Buffers
As in those systems, data is described using a language-independent schema
Code generation is optional in Avro
Avro assumes the schema is always present when data is read and written, which allows a very compact encoding because values do not need to be tagged with field identifiers
Schemas are usually written in JSON
Data is usually encoded in a binary format
Avro specification - describes in detail the binary format that every implementation must support
API - left out of the Avro specification; each language's API is written in its own way. Because there is only one binary format, new language bindings are easier to build and interoperability does not suffer
Schema resolution - within carefully defined constraints, the schema used to read data need not be identical to the schema used to write it (this is Avro's schema evolution mechanism)
e.g. A new field can be added to the schema that was used to read old data. New and old clients can both read the old data without problems, and new clients can write new data that includes the new field. Old clients that see the new data simply ignore the new field and process it as they did the old data.
An object container format is provided (similar to Hadoop sequence files)
An Avro data file contains a metadata section that stores the schema, which makes the file self-describing
Avro data files support compression and are splittable
Avro data types and schemas
Avro primitive types
{"type":"null"}
{"type":"boolean"}
{"type":"int"}
{"type":"long"}
{"type":"float"}
{"type":"double"}
{"type":"bytes"}
{"type":"string"}
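Per the Avro specification, int and long values are written as variable-length zig-zag integers, so small magnitudes take a single byte. The sketch below re-implements that rule in plain Java, without the Avro library, just to show the bytes involved. The class and method names (ZigZagSketch, encodeLong, decodeLong, hex) are made up for illustration and are not Avro API.

```java
import java.io.ByteArrayOutputStream;

// Illustrative only: a hand-rolled version of the zig-zag varint rule that
// Avro uses for int and long. All names here are hypothetical.
class ZigZagSketch {

    // Zig-zag maps signed values to unsigned ones (0, -1, 1, -2 -> 0, 1, 2, 3),
    // then emits 7 bits per byte, low-order group first, with the high bit
    // set on every byte except the last.
    static byte[] encodeLong(long value) {
        long n = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
        return out.toByteArray();
    }

    // Reverses encodeLong: reassembles the 7-bit groups, then undoes zig-zag.
    static long decodeLong(byte[] bytes) {
        long n = 0;
        int shift = 0;
        for (byte b : bytes) {
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) {
                break;
            }
        }
        return (n >>> 1) ^ -(n & 1);
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (long v : new long[] {0, -1, 1, 64, 1949}) {
            System.out.println(v + " -> " + hex(encodeLong(v)));
        }
    }
}
```

Values between -64 and 63 fit in one byte, which is part of why Avro data stays small when most numbers are small.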
Avro complex types
array: an ordered collection of objects, all of the same type
{
  "type": "array",
  "items": "long"
}
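On the wire, the Avro specification encodes an array as a series of blocks: each block is an item count followed by that many items, and a count of zero ends the array. The sketch below hand-encodes an array of long in plain Java to make that layout visible. It always writes a single block, and the names (ArrayEncodingSketch, encodeLongArray) are hypothetical, not Avro API.

```java
import java.io.ByteArrayOutputStream;

// Illustrative only: hand-encodes an Avro array of long values.
// Class and method names are hypothetical, not Avro API.
class ArrayEncodingSketch {

    // Avro writes int and long values as zig-zag varints.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // One block holding every item, followed by the zero count that
    // terminates the array. An empty array is just the terminator.
    static byte[] encodeLongArray(long[] items) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (items.length > 0) {
            writeLong(out, items.length);
            for (long item : items) {
                writeLong(out, item);
            }
        }
        writeLong(out, 0);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encodeLongArray(new long[] {3, 27})));
    }
}
```

A map is encoded the same way, except that each item is a string key followed by its value.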
map: an unordered collection of key-value pairs; keys are strings and all values share the same type
{
  "type": "map",
  "values": "string"
}
record: a collection of named fields, each of any type
{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}
enum: a set of named values
{
  "type": "enum",
  "name": "Cutlery",
  "doc": "An eating utensil.",
  "symbols": ["KNIFE", "FORK", "SPOON"]
}
fixed: a fixed number of 8-bit unsigned bytes
{
  "type": "fixed",
  "name": "Md5Hash",
  "size": 16
}
union: a union of schemas, written as a JSON array in which each element is a schema
[
  "null",
  "string",
  {"type": "map", "values": "string"}
]
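Per the Avro specification, a union value is encoded as the zero-based index of the chosen branch followed by the value itself, encoded according to that branch's schema. The sketch below covers only the first two branches of the union above ("null" and "string") in plain Java. The names (UnionEncodingSketch, encodeNullableString) are hypothetical, not Avro API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: hand-encodes a value of the union ["null", "string"].
// Class and method names are hypothetical, not Avro API.
class UnionEncodingSketch {

    // Avro writes int and long values as zig-zag varints.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Branch index first, then the value for that branch.
    static byte[] encodeNullableString(String value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (value == null) {
            writeLong(out, 0);                // branch 0: "null" has no body
        } else {
            writeLong(out, 1);                // branch 1: "string"
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            writeLong(out, utf8.length);      // a string is its byte length...
            out.write(utf8, 0, utf8.length);  // ...followed by its UTF-8 bytes
        }
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("null -> " + hex(encodeNullableString(null)));
        System.out.println("\"a\"  -> " + hex(encodeNullableString("a")));
    }
}
```

This is why an optional field declared as ["null", "string"] costs a single byte when it is absent.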
Avro types must be mapped to the data types of each programming language
Java - generic mapping: used when the schema is not known before runtime
Java, C++ - specific mapping: generates code that represents the data described by a schema
Java - reflect mapping: maps Avro types onto existing Java types
In-memory serialization and deserialization
Example Avro schema, saved in StringPair.avsc
{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}
Save the file on the classpath, then load it
// Parse the schema from the StringPair.avsc resource on the classpath
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(
    getClass().getResourceAsStream("StringPair.avsc"));
Create an instance of an Avro record using the generic API
// Fields are set by name; the names must match those in the schema
GenericRecord datum = new GenericData.Record(schema);
datum.put("left", "L");
datum.put("right", "R");
Serialize the record to an output stream
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
// null: there is no existing encoder to reuse
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(datum, encoder);
encoder.flush();
out.close();
DatumWriter: translates data objects into the types an Encoder understands
GenericDatumWriter: passes the fields of a GenericRecord to the Encoder
null is passed to the encoder factory because no previously constructed encoder is being reused
write() can be called more times before the stream is closed, if needed
encoder.flush(): after calling write(), flush the encoder and then close the output stream
Reversing the process reads the object back from the byte buffer
DatumReader<GenericRecord> reader =
    new GenericDatumReader<GenericRecord>(schema);
// null: there is no existing decoder to reuse
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(),
    null);
// null: there is no existing record object to reuse
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
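To make the round trip above more concrete, here is a sketch of the bytes that end up in the buffer for a StringPair. Per the Avro specification, a record is simply its fields' encodings concatenated in schema order, with no field names or tags, which is why the reader needs a schema to make sense of the bytes. This is a plain-Java re-implementation for illustration only; the names (StringPairBytesSketch, encode, decode) are hypothetical, not Avro API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: hand-encodes and decodes a StringPair record.
// Class and method names are hypothetical, not Avro API.
class StringPairBytesSketch {

    // Avro writes int and long values as zig-zag varints.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    static long readLong(ByteArrayInputStream in) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) {
                throw new IllegalStateException("unexpected end of input");
            }
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // A string is its UTF-8 byte length followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static String readString(ByteArrayInputStream in) {
        int length = (int) readLong(in);
        byte[] utf8 = new byte[length];
        in.read(utf8, 0, length);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Fields are written in schema order: left, then right.
    static byte[] encode(String left, String right) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, left);
        writeString(out, right);
        return out.toByteArray();
    }

    // Reading in the same order recovers the fields.
    static String[] decode(byte[] bytes) {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        return new String[] {readString(in), readString(in)};
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("L", "R");
        String[] pair = decode(bytes);
        System.out.println(hex(bytes) + " -> " + pair[0] + ", " + pair[1]);
    }
}
```

The whole record takes four bytes: a length and one character for each field.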
ꡬ체μ μΈ API
λ©μ΄λΈμ μΆκ°νμ¬ μλ°λ‘λ μ€ν€λ§ μ½λ μμ±ν μ μλ€.
<project>
  ...
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <includes>
                <include>StringPair.avsc</include>
              </includes>
              <stringType>String</stringType>
              <sourceDirectory>src/main/resources</sourceDirectory>
              <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  ...
</project>
Avro data files
From in-memory streams to files
Data file = header (metadata, including the Avro schema, plus a sync marker) + a series of blocks (holding the serialized Avro objects)
Objects written to a data file must conform to the file's schema.
If they do not, an exception is thrown when append() is called.
File file = new File("data.avro");
DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter =
    new DataFileWriter<GenericRecord>(writer);
// create() writes the header, including the schema, to the file
dataFileWriter.create(schema, file);
dataFileWriter.append(datum);
dataFileWriter.close();
The reader uses the metadata stored in the file, so no schema needs to be specified separately
getSchema() returns the schema held by the DataFileReader instance, which can be checked against the schema used to write the original object
// No schema is passed to the reader: it is read from the file's metadata
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader<GenericRecord> dataFileReader =
    new DataFileReader<GenericRecord>(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()));
DataFileReader is a regular Java iterator, so all the data objects can be traversed by repeatedly calling hasNext() and next().
The snippet below checks that there is exactly one record and that it has the expected fields.
assertThat(dataFileReader.hasNext(), is(true));
GenericRecord result = dataFileReader.next();
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));
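The sync marker in the file layout above is what makes data files splittable: a reader dropped at an arbitrary byte offset can scan forward to the next marker and start at a block boundary. The sketch below shows that idea on a deliberately simplified layout, assuming blocks are just payload bytes followed by the marker. Real Avro files also have the header, prefix each block with an object count and byte size, and use a randomly generated 16-byte marker; the fixed marker and all names here (SyncMarkerSketch, nextBlockStart, demoFile) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: a simplified block layout showing how a sync marker
// lets a reader find a block boundary from any offset. Not the real format.
class SyncMarkerSketch {

    static final int SYNC_SIZE = 16;

    // Assumption for the demo: a fixed marker (bytes 0xF0..0xFF) so the
    // output is deterministic. Avro generates the marker randomly per file.
    static final byte[] SYNC = new byte[SYNC_SIZE];
    static {
        for (int i = 0; i < SYNC_SIZE; i++) {
            SYNC[i] = (byte) (0xF0 + i);
        }
    }

    // Three 7-byte blocks, each followed by the 16-byte marker:
    // block 0 at 0, block 1 at 23, block 2 at 46, total length 69.
    static byte[] demoFile() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String payload : new String[] {"block-0", "block-1", "block-2"}) {
            byte[] bytes = payload.getBytes(StandardCharsets.US_ASCII);
            out.write(bytes, 0, bytes.length);
            out.write(SYNC, 0, SYNC.length);
        }
        return out.toByteArray();
    }

    // Scans forward from 'from' and returns the offset just past the first
    // complete marker, i.e. the start of the next block, or -1 if none.
    static int nextBlockStart(byte[] file, byte[] sync, int from) {
        for (int i = Math.max(from, 0); i + sync.length <= file.length; i++) {
            boolean match = true;
            for (int j = 0; j < sync.length; j++) {
                if (file[i + j] != sync[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return i + sync.length;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] file = demoFile();
        // A split that begins in the middle of block 0 resumes at block 1.
        System.out.println(nextBlockStart(file, SYNC, 3));
    }
}
```

In a MapReduce job, each input split can use this kind of scan to find its first whole block, so splits do not need to line up with block boundaries.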
Interoperability
Python API
import os
import string
import sys
from avro import schema
from avro import io
from avro import datafile
Avro tools
% java -jar $AVRO_HOME/avro-tools-*.jar tojson pairs.avro
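Schema resolution
Data can be read back with a reader's schema that differs from the writer's schema it was written with.
Added field, new reader (old writer): the reader uses the default value of the new field
Added field, new writer (old reader): the reader does not know about the new field, so it ignores it
Removed field, new reader (old writer): the reader ignores the removed field
Removed field, new writer (old reader): the writer does not write the removed field. If the old schema declares a default for that field the reader uses it; otherwise reading fails, so update the reader's schema at the same time as, or before, the writer's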
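The four cases above can be mimicked with ordinary maps. The sketch below is not how Avro implements resolution; it only models the rule that the reader's schema decides which fields come out, falling back to a default when the writer did not supply a value. The description field, its empty-string default, and all names (SchemaResolutionSketch, resolve) are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: models reader/writer schema resolution with maps.
// Class, method, and field names are hypothetical, not Avro API.
class SchemaResolutionSketch {

    // 'written' holds the fields the writer produced. 'readerFields' maps
    // each field in the reader's schema to its optional default value.
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Optional<Object>> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> field : readerFields.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));       // written: use it
            } else if (field.getValue().isPresent()) {
                result.put(name, field.getValue().get());  // missing: use default
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        return result;  // writer fields unknown to the reader are dropped
    }

    // Added field, new reader: the missing field takes its default.
    static Map<String, Object> oldWriterNewReader() {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("left", "L");
        written.put("right", "R");
        Map<String, Optional<Object>> reader = new LinkedHashMap<>();
        reader.put("left", Optional.empty());
        reader.put("right", Optional.empty());
        reader.put("description", Optional.of(""));
        return resolve(written, reader);
    }

    // Added field, old reader: the unknown field is ignored.
    static Map<String, Object> newWriterOldReader() {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("left", "L");
        written.put("right", "R");
        written.put("description", "a pair");
        Map<String, Optional<Object>> reader = new LinkedHashMap<>();
        reader.put("left", Optional.empty());
        reader.put("right", Optional.empty());
        return resolve(written, reader);
    }

    public static void main(String[] args) {
        System.out.println(oldWriterNewReader());
        System.out.println(newWriterOldReader());
    }
}
```

The exception branch corresponds to the last case: an old reader that still expects a removed field and has no default for it cannot read the new data.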
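Sort order
Every type except record has a predefined sort order
For a record, sorting can be controlled by setting the order attribute on its fields
ascending (the default)
descending: the field compares in reverse order
ignore: the field is skipped when comparing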
Avro MapReduce
Weather record schema
{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}
A MapReduce program that finds the maximum temperature and produces Avro output
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// NcdcRecordParser is a helper class that is not shown in this post.
public class AvroGenericMaxTemperature extends Configured implements Tool {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{" +
      " \"type\": \"record\"," +
      " \"name\": \"WeatherRecord\"," +
      " \"doc\": \"A weather reading.\"," +
      " \"fields\": [" +
      " {\"name\": \"year\", \"type\": \"int\"}," +
      " {\"name\": \"temperature\", \"type\": \"int\"}," +
      " {\"name\": \"stationId\", \"type\": \"string\"}" +
      " ]" +
      "}"
  );

  // Emits (year, weather record) for every line with a valid temperature
  public static class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, AvroKey<Integer>,
          AvroValue<GenericRecord>> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearInt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationId", parser.getStationId());
        context.write(new AvroKey<Integer>(parser.getYearInt()),
            new AvroValue<GenericRecord>(record));
      }
    }
  }

  // Keeps the record with the highest temperature for each year
  public static class MaxTemperatureReducer
      extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
          AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<Integer> key, Iterable<AvroValue<GenericRecord>>
        values, Context context) throws IOException, InterruptedException {
      GenericRecord max = null;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord record = value.datum();
        if (max == null ||
            (Integer) record.get("temperature") > (Integer) max.get("temperature")) {
          // Copy the record, because the value object is reused between iterations
          max = newWeatherRecord(record);
        }
      }
      context.write(new AvroKey<GenericRecord>(max), NullWritable.get());
    }

    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year", value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationId", value.get("stationId"));
      return record;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = Job.getInstance(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    job.getConfiguration().setBoolean(
        Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setMapOutputValueSchema(job, SCHEMA);
    AvroJob.setOutputKeySchema(job, SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}
Running the program
% export HADOOP_CLASSPATH=avro-examples.jar
% export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
  input/ncdc/sample.txt output
Printing the results
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}
Sorting with Avro MapReduce
A MapReduce program that sorts an Avro data file
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroSort extends Configured implements Tool {

  // Emits each record as both key and value, so duplicates survive the shuffle
  static class SortMapper<K> extends Mapper<AvroKey<K>, NullWritable,
      AvroKey<K>, AvroValue<K>> {
    @Override
    protected void map(AvroKey<K> key, NullWritable value,
        Context context) throws IOException, InterruptedException {
      context.write(key, new AvroValue<K>(key.datum()));
    }
  }

  // Writes out every value for each key, in the order the shuffle delivers them
  static class SortReducer<K> extends Reducer<AvroKey<K>, AvroValue<K>,
      AvroKey<K>, NullWritable> {
    @Override
    protected void reduce(AvroKey<K> key, Iterable<AvroValue<K>> values,
        Context context) throws IOException, InterruptedException {
      for (AvroValue<K> value : values) {
        context.write(new AvroKey<K>(value.datum()), NullWritable.get());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.printf(
          "Usage: %s [generic options] <input> <output> <schema-file>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    String input = args[0];
    String output = args[1];
    String schemaFile = args[2];

    Job job = Job.getInstance(getConf(), "Avro sort");
    job.setJarByClass(getClass());
    job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    FileInputFormat.addInputPath(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));

    AvroJob.setDataModelClass(job, GenericData.class);

    // The same schema is used for input, intermediate, and output records
    Schema schema = new Schema.Parser().parse(new File(schemaFile));
    AvroJob.setInputKeySchema(job, schema);
    AvroJob.setMapOutputKeySchema(job, schema);
    AvroJob.setMapOutputValueSchema(job, schema);
    AvroJob.setOutputKeySchema(job, schema);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroSort(), args);
    System.exit(exitCode);
  }
}
Sorting happens during the MapReduce shuffle, and the sort function is determined by the Avro schema
Inspecting the input data
% java -jar $AVRO_HOME/avro-tools-*.jar tojson input/avro/pairs.avro
{"left":"a","right":"1"}
{"left":"c","right":"2"}
{"left":"b","right":"3"}
{"left":"b","right":"2"}
Sorting with the program
% hadoop jar avro-examples.jar AvroSort input/avro/pairs.avro output \
  ch12-avro/src/main/resources/SortedStringPair.avsc
Printing the file saved after sorting
% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"left":"b","right":"3"}
{"left":"b","right":"2"}
{"left":"c","right":"2"}
{"left":"a","right":"1"}