概述
- 输入文件从HDFS进行读取.
- 输出文件会存入本地磁盘.
- Reducer和Mapper间的网络I/O,从Mapper节点得到Reducer的检索文件.
- 使用Reducer实例从本地磁盘回读数据.
- Reducer输出- 回传到HDFS.
串行化(序列化)
传输、存储都需要
Writable接口
Avro框架:IDL,版本支持,跨语言,JSON-linke
压缩
能够减少磁盘的占用空间和网络传输的量
Compressed Size, Speed, Splittable
gzip, bzip2, LZO, LZ4, Snappy
要比较各种压缩算法的压缩比和性能
重点:压缩和拆分一般是冲突的(压缩后的文件的block是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时Map任务就无法处理,所以对于这些压缩,Hadoop往往是直接使用一个Map任务处理整个文件的分析)
Map的输出结果也可以进行压缩,这样可以减少Map结果到Reduce的传输的数据量,加快传输速率
完整性
磁盘和网络很容易出错,保证数据传输的完整性一般是通过CRC32这种校验法
每次写数据到磁盘前都验证一下,同时保存校验码
每次读取数据时,也验证校验码,避免磁盘问题
同时每个datanode都会定时检查每一个block的完整性
当发现某个block数据有问题时,也不是立刻报错,而是先去Namenode找一块该数据的完整备份进行恢复,不能恢复才报错
数据完整性
由于每个磁盘或者网络上的I/O操作可能会对正在读写的数据不慎引入错误,如果通过的数据流量非常大,数据发生损坏的几率很高。
检查损坏数据的常用方法是在第一次进入系统时计算数据的校验和,然后只要数据不是在一个可靠的通道上传输,就可能会发生损坏。如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认为是损坏的。
一个常用的错误检测代码是CRC-32(cyclic redundancy check,循环冗余检查),计算一个32位的任何大小输入的整数校验和。
HDFS的数据完整性
HDFS以透明方式校验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。针对数据的每个io.bytes.per.checksum(默认512字节)字节,都会创建一个单独的校验和。
数据节点负责在存储数据及其校验和之前验证它们收到的数据。 从客户端和其它数据节点复制过来的数据。客户端写入数据并且将它发送到一个数据节点管线中,在管线的最后一个数据节点验证校验和。
客户端读取数据节点上的数据时,会验证校验和,将其与数据节点上存储的校验和进行对比。每个数据节点维护一个连续的校验和验证日志,因此它知道每个数据块最后验证的时间。
每个数据节点还会在后台线程运行一个DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块,为了防止物理存储介质中位衰减锁造成的数据损坏。
HDFS通过复制完整的副本来产生一个新的,无错的副本来“治愈”哪些出错的数据块。工作方式:如果客户端读取数据块时检测到错误,抛出Checksum Exception前报告该坏块以及它试图从名称节点中药读取的数据节点。名称节点将这个块标记为损坏的,不会直接复制给客户端或复制该副本到另一个数据节点。它会从其他副本复制一个新的副本。
使用Open方法来读取文件前,通过setVerifyChecksum()方法来禁用校验和验证。
本地文件系统
Hadoop的本地文件系统执行客户端校验。意味着,在写一个名为filename的文件时,文件系统的客户端以透明的方式创建一个隐藏的文件.filename.crc。在同一个文件夹下,包含每个文件块的校验和。
数据块大小由io.bytes.per.checksum属性控制,块的大小作为元数据存储在.crc文件中。
也可能禁用校验和:底层文件系统原生支持校验和。这里通过RawLocalFileSystem来替代LocalFileSystem完成。要在一个应用中全局使用,只需要设置fs.file.impl值为org.apache.hadoop.fs.RawLocalFileSystem来重新map执行文件的URL。或者只想对某些读取禁用校验和校验。
Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);
ChecksumFileSystem
LocalFileSystem使用ChecksumFileSystem(校验和文件系统)为自己工作,这个类可以很容易添加校验和功能到其他文件系统中。因为ChecksumFileSystem也包含于文件系统中。
FileSystem rawFs = ...
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);
压缩
文件压缩两大好处:减少存储文件所需要的空间且加快了数据在网络上或从磁盘上或到磁盘上的传输速度。
编码和解码
编码和解码器用以执行压缩解压算法。在Hadoop中,编码和解码是通过一个压缩解码器接口实现的。
CompressionCodec有两个方法轻松地压缩和解压数据。使用use the createOutputStream(OutputStream out)创建一个CompressionOutputStream,将其以压缩格式写入底层的流。使用createInputStream(InputStream in) 获取一个CompressionInputStream,从底层的流读取未压缩的数据。
- package com.laos.hadoop;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.CompressionOutputStream;
- import org.apache.hadoop.util.ReflectionUtils;
- public class StreamCompressor {
- public static void main(String[] args) throws Exception {
- String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";
- Class codecClass = Class.forName(codecClassname);
- Configuration conf = new Configuration();
- CompressionCodec codec = (CompressionCodec) ReflectionUtils
- .newInstance(codecClass, conf);
- //将读入数据压缩至System.out
- CompressionOutputStream out = codec.createOutputStream(System.out);
- IOUtils.copyBytes(System.in, out, 4096, false);
- out.finish();
- }
- }
在阅读一个压缩文件时,我们可以从扩展名来推断出它的编码/解码器。以.gz结尾的文件可以用GzipCodec来阅读。CompressionCodecFactory提供了getCodec()方法,从而将文件扩展名映射到相应的CompressionCodec。
- 01 package com.laos.hadoop;
- 02
- 03 import java.io.InputStream;
- 04 import java.io.OutputStream;
- 05 import java.net.URI;
- 06
- 07 import org.apache.hadoop.conf.Configuration;
- 08 import org.apache.hadoop.fs.FileSystem;
- 09 import org.apache.hadoop.fs.Path;
- 10 import org.apache.hadoop.io.IOUtils;
- 11 import org.apache.hadoop.io.compress.CompressionCodec;
- 12 import org.apache.hadoop.io.compress.CompressionCodecFactory;
- 13
- 14 public class FileDecompressor {
- 15 public static void main(String[] args) throws Exception {
- 16 String uri = args[0];
- 17 Configuration conf = new Configuration();
- 18 FileSystem fs = FileSystem.get(URI.create(uri), conf);
- 19
- 20 Path inputPath = new Path(uri);
- 21 CompressionCodecFactory factory = new CompressionCodecFactory(conf);
- 22 CompressionCodec codec = factory.getCodec(inputPath);
- 23 if (codec == null) {
- 24 System.err.println("No codec found for " + uri);
- 25 System.exit(1);
- 26 }
- 27 String outputUri = CompressionCodecFactory.removeSuffix(uri, codec
- 28 .getDefaultExtension());
- 29 InputStream in = null;
- 30 OutputStream out = null;
- 31 try {
- 32 in = codec.createInputStream(fs.open(inputPath));
- 33 out = fs.create(new Path(outputUri));
- 34 IOUtils.copyBytes(in, out, conf);
- 35 } finally {
- 36 IOUtils.closeStream(in);
- 37 IOUtils.closeStream(out);
- 38 }
- 39 }
- 40 }
CompressionCodecFactory从io.compression.codecs配置属性定义的列表中找到编码和解码器,默认情况下,Hadoop给出所有的编码和解码器。每个编码/加码器都知道默认文件扩展名。
压缩和输入分隔
考虑如何压缩哪些将由MapReduce处理的数据时,考虑压缩格式是否支持分隔很重要。
例如,gzip格式使用default来存储压缩过的数据,default将数据作为一系列压缩过的块存储,但是每块的开始没有指定用户在数据流中的任意点定位到下一个块的起始位置,而是自身与数据同步,所以gzip不支持分隔机制。
在MapReduce中使用压缩
如果要压缩MapReduce作业的输出,设置mapred.output.compress
为true,mapred.output.compression.codec
属性指定编码解码器。如果输入的文件时压缩过的,MapReduce读取时,它们会自动解压,根据文件扩展名来决定使用那一个压缩解码器。
- 01 package com.laos.hadoop;
- 02
- 03 import java.io.IOException;
- 04
- 05 import org.apache.hadoop.fs.Path;
- 06 import org.apache.hadoop.io.IntWritable;
- 07 import org.apache.hadoop.io.Text;
- 08 import org.apache.hadoop.io.compress.CompressionCodec;
- 09 import org.apache.hadoop.io.compress.GzipCodec;
- 10 import org.apache.hadoop.mapred.FileInputFormat;
- 11 import org.apache.hadoop.mapred.FileOutputFormat;
- 12 import org.apache.hadoop.mapred.JobClient;
- 13 import org.apache.hadoop.mapred.JobConf;
- 14
- 15
- 16 public class MaxTemperatureWithCompression {
- 17 public static void main(String[] args) throws IOException {
- 18 if (args.length != 2) {
- 19 System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
- 20 "
串行化(序列化)
序列化:将结构化对象转换为字节流以便于通过网络进行传输或写入存储的过程。
反序列化:将字节流转为一系列结构化对象的过程。
序列化用在两个地方:进程间通信和持久存储。在Hadoop中,节点之间的进程间通信是用远程过程调用(RPC)。RPC协议将使用序列化将消息编码为二进制流(发送到远程节点),此后在接收端二进制流被反序列化为消息。Hadoop使用自己的序列化格式Writables。
Writable接口
- package org.apache.hadoop.io;
- import java.io.DataOutput;
- import java.io.DataInput;
- import java.io.IOException;
- public interface Writable {
- void write(DataOutput out) throws IOException; //将状态写入二进制格式的流
- void readFields(DataInput in) throws IOException; //从二进制格式的流读出其状态
- }
WritableComparable和Comparator
IntWritable实现了WritableComparable接口。而WritableComparable继承了Writable和Comparable。
类型的比较对MapReduce而言至关重要,键和键之间的比较是在排序阶段完成的。Hadoop提供饿了优化方法:
该接口允许执行者比较从流中读取的未被反序列化为对象的记录,省去了创建对象的所有开销。例如,IntWritable使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数直接比较。
- package org.apache.hadoop.io;
- import Java.util.Comparator;
- public interface RawComparator
extends Comparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
WritableComparator是RawComparator对WritableComparable类的一个通用实现。第一,它提供一个默认的对原始compare()函数调用,对要比较的对象进行反序列化,然后调用对象的compare()方法。 第二,充当RawComparator实例的一个工厂方法。
RawComparator
Writable的java基本类封装
Java基本类型 | Writable使用 | 序列化大小(字节) |
---|---|---|
布尔类型 | BooleanWritable | 1 |
字节型 | ByteWritable | 1 |
整型 | IntWritable | 4 |
VIntWritable | 1-5 | |
浮点行 | FloatWritable | 4 |
长整型 | LongWritable | 8 |
VLongWritable | 1-9 | |
双精度浮点型 | DoubleWritable | 8 |
基于文件的数据结构
SequenceFile类
SequenceFile为二进制键值对对提供一个持久化的数据结构。
写SequenceFile类
创建SequenceFile类:SequenceFile.createWriter(....)
- package com.laos.hadoop;
- import java.io.IOException;
- import java.net.URI;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- public class SequenceFileWriteDemo {
- private static final String[] DATA = { "One, two, buckle my shoe",
- "Three, four, shut the door", "Five, six, pick up sticks",
- "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
- public static void main(String[] args) throws IOException {
- String uri = args[0];
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(URI.create(uri), conf);
- Path path = new Path(uri);
- IntWritable key = new IntWritable();
- Text value = new Text();
- SequenceFile.Writer writer = null;
- try {
- writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
- value.getClass()); //参数:文件系统,configuration,路径,键的类型和值的类型
- for (int i = 0; i < 100; i++) {
- key.set(100 - i);
- value.set(DATA[i % DATA.length]);
- System.out.printf("[%s]/t%s/t%s/n", writer.getLength(), key,
- value);
- writer.append(key, value);
- }
- } finally {
- IOUtils.closeStream(writer);
- }
- }
- }
读取SequenceFile类
- 01 package com.laos.hadoop;
- 02
- 03 import java.io.IOException;
- 04 import java.net.URI;
- 05
- 06 import org.apache.hadoop.conf.Configuration;
- 07 import org.apache.hadoop.fs.FileSystem;
- 08 import org.apache.hadoop.fs.Path;
- 09 import org.apache.hadoop.io.IOUtils;
- 10 import org.apache.hadoop.io.SequenceFile;
- 11 import org.apache.hadoop.io.Writable;
- 12 import org.apache.hadoop.util.ReflectionUtils;
- 13
- 14 public class SequenceFileReadDemo {
- 15 public static void main(String[] args) throws IOException {
- 16 String uri = args[0];
- 17 Configuration conf = new Configuration();
- 18 FileSystem fs = FileSystem.get(URI.create(uri), conf);
- 19 Path path = new Path(uri);
- 20 SequenceFile.Reader reader = null;
- 21 try {
- 22 reader = new SequenceFile.Reader(fs, path, conf);//创建reader
- 23 Writable key = (Writable) ReflectionUtils.newInstance(reader
- 24 .getKeyClass(), conf); //获取key的类型
- 25 Writable value = (Writable) ReflectionUtils.newInstance(reader
- 26 .getValueClass(), conf);//获取value的类型
- 27 long position = reader.getPosition();//获取位置
- 28 while (reader.next(key, value)) {//遍历
- 29 String syncSeen = reader.syncSeen() ? "*" : "";
- 30 System.out.printf("[%s%s]/t%s/t%s/n", position, syncSeen, key,
- 31 value);
- 32 position = reader.getPosition(); // beginning of next record
- 33 }
- 34 } finally {
- 35 IOUtils.closeStream(reader);
- 36 }
- 37 }
- 38 }
两种方法查找文件中指定的位置,第一种是seak()方法。如果文件中指定位置不是记录边界,reader会在调用next方法是失败。第二种是SequenceFile.Reader.sync(long pposition)把reader定位到下一个同步点
用命令行接口显示序列文件
使用-text选项显示文本格式的序列文件。% hadoop fs -text number.seq
序列文件的格式
SequeceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将
- 支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)
- 本地化任务支持:因为文件可以被切分,因此MapReduce任务时数据的本地化情况应该是非常好的。
- 难度低:因为是Hadoop框架提供的API,业务逻辑侧的修改比较简单。
SequenceFile 是一个由二进制序列化过的key/value的字节流组成的文本存储文件,它可以在map/reduce过程中的input/output 的format时被使用。在map/reduce过程中,map处理文件的临时输出就是使用SequenceFile处理过的。
SequenceFile分别提供了读、写、排序的操作类。
SequenceFile的操作中有三种处理方式:
- 不压缩数据直接存储。 //enum.NONE
- 压缩value值不压缩key值存储的存储方式。//enum.RECORD
- key/value值都压缩的方式存储。//enum.BLOCK
没有压缩和记录压缩的序列文件的内部结构:未压缩和记录压缩的结构是一样的,record由记录长度、键长度、键和值(或压缩过的值)构成。
块压缩的序列文件的内部结构:一个同步点内记录笔数、压缩键的长度、压缩过的键值、压缩过值的长度和压缩值。压缩块一次压缩多个记录。块的最小大小由属性:io.seqfile.compress.blocksize
定义。
MapFile
MapFile是经过排序的带索引的SequenceFile,可以根据键值进行查找。
写MapFile
- 01 package com.laos.hadoop;
- 02
- 03 import java.io.IOException;
- 04 import java.net.URI;
- 05
- 06 import org.apache.hadoop.conf.Configuration;
- 07 import org.apache.hadoop.fs.FileSystem;
- 08 import org.apache.hadoop.io.IOUtils;
- 09 import org.apache.hadoop.io.IntWritable;
- 10 import org.apache.hadoop.io.MapFile;
- 11 import org.apache.hadoop.io.Text;
- 12
- 13 public class MapFileWriteDemo {
- 14 private static final String[] DATA = { "One, two, buckle my shoe",
- 15 "Three, four, shut the door", "Five, six, pick up sticks",
- 16 "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
- 17
- 18 public static void main(String[] args) throws IOException {
- 19 String uri = args[0];
- 20 Configuration conf = new Configuration();
- 21 FileSystem fs = FileSystem.get(URI.create(uri), conf);
- 22 IntWritable key = new IntWritable();
- 23 Text value = new Text();
- 24 MapFile.Writer writer = null;
- 25 try {
- 26 writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value
- 27 .getClass()); //创建MapFile Writer的实例
- 28
- 29 for (int i = 0; i < 1024; i++) {
- 30 key.set(i + 1);
- 31 value.set(DATA[i % DATA.length]);
- 32 writer.append(key, value);
- 33 }
- 34 } finally {
- 35 IOUtils.closeStream(writer);
- 36 }
- 37 }
- 38 }
% hadoop MapFileWriteDemo numbers.map
numbers.map确实是一个目录,包含data和index两个文件。数据文件包括所有的输入,index文件包含一小部分键和键到data文件中偏移量的映射。索引中键的个数由io.map.index.interval属性设置。
读MapFile
顺序遍历MapFile过程和读取SequenceFile过程相似:创建一个MapFile Reader,调用next函数直到返回false。
- public boolean next(WritableComparable key, Writable val) throws IOException
随机访问:
- public Writable get(WritableComparable key, Writable val) throws IOException
将SequenceFile转换成MapFile
关键是给SequenceFile重建索引:使用MapFile的静态方法fix()。
- 01 package com.laos.hadoop;
- 02
- 03 import java.net.URI;
- 04
- 05 import org.apache.hadoop.conf.Configuration;
- 06 import org.apache.hadoop.fs.FileSystem;
- 07 import org.apache.hadoop.fs.Path;
- 08 import org.apache.hadoop.io.MapFile;
- 09 import org.apache.hadoop.io.SequenceFile;
- 10
- 11 public class MapFileFixer {
- 12 public static void main(String[] args) throws Exception {
- 13 String mapUri = args[0];
- 14
- 15 Configuration conf = new Configuration();
- 16
- 17 FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
- 18 Path map = new Path(mapUri);
- 19 Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
- 20
- 21 // Get key and value types from data sequence file
- 22 SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
- 23 Class keyClass = reader.getKeyClass();
- 24 Class valueClass = reader.getValueClass();
- 25 reader.close();
- 26
- 27 // Create the map file index file
- 28 long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
- 29 System.out.printf("Created MapFile %s with %d entries/n", map, entries);
- 30 }
- 31 }
本章节参考:http://blog.csdn.net/sheperd_shu/article/details/6437845
转载本站内容时,请务必注明来自W3xue,违者必究。