Spark源码分析(2)

分布式计算平台 Spark 源码分析(2)

Posted by UNMOVE on February 1, 2018

支持新类的修改流程

最近对代码做了封装,将对对象的转化部分封装成了一个新类FlintSerializer,同时这个新类也会用于shuffle read和shuffle write的序列化和反序列化操作。上层代码只需将对象或者字节数组传入这个类,FlintSerialize会根据其所持有的handle自动将对象转化为字节数组或是将字节数组转化为对象,做了封装之后,上层代码的逻辑更加清晰,而且在添加对新类的支持时,不需要对上层代码做任何修改。接下来我会介绍如何在代码中添加对新类的支持。

FlintType.scala

新类必须声明在FlintType.scala文件中(感觉很low就对了),在静态类FlintType中给类创建别名是为了统一名称,不创建也没关系。

FlintSerializer.scala

新类在声明之后,必须在在此文件中添加其字节化和反字节化信息,文件中主要涉及的4个方法:writeKey、writeValue、readKey和readValue。分别对应于:对key的字节化、对value的字节化、对key的对象化和对value的对象化。根据声明的类是将用作key,还是用作value,添加对应逻辑即可。 注意:部分基本类型(Int/Long/Double/String)的字节化/对象化逻辑已经实现,要添加对新类的支持只需用基本逻辑作组合即可 因为String的长度是可变的,所以暂时规定String的长度不能超过20个字符。

ShuffleHandle.scala

之所以上层可以脱离对handle中的type依赖,是因为我们在handle中封装了对象转化为字节数组之后的长度信息,该长度信息很容易可以预先解析出来,上层对堆外内存的读写都是依赖于该长度信息的。所以在整合之前(整合之后可以自动计算长度信息),我们想添加对新类的支持,需要在ShuffleHandle的_size方法中添加新类所对应type的长度

注意:若新类中包含字符串,那么一个字符串占20个字符,即40个字节

最后一步 目前还是手动版所以我们还需去Dependency.scala或FlintHashShuffleReader.scala中设置类型,此处可用字符串替换数字,看起来可能更直观,不过并没有什么实质影响。

待完善的逻辑

在shuffle write需要进行map端聚合时,目前还未实现获取用户的聚合函数,并且此部分逻辑还未与FlintSerializer相关联,还是单独的聚合逻辑,所以测试代码中若包含reduce操作,目前还需在UnsafeShuffleExternalSorter.java中手动添加聚合逻辑,或者关掉map端的聚合直接等到reduce端再一起聚合。