-
Notifications
You must be signed in to change notification settings - Fork 413
2.4_HDFS Writer
Hdfs-Writer插件主要用于将其他类型数据源(如RDBMS和HBase)的增量数据进行处理和转换,写入HDFS相应的文件路径下。数据处理平台每天凌晨对T-1的数据进行清洗、去重处理,把同步过去的增量数据更新到spark-hive表中,供大数据分析和查询使用。
Hdfs-Writer插件自定义的BaseRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据源端数据源类型扩展了两个子类RdbEventRecordHandler和HRecordHandler,分别负责将RDBMS和HBase类型的数据进行转换,最终写入HDFS相应路径的文件中。以源端为mysql数据库为例,具体的数据写入流程如下图所示(源端为HBase时,Hdfs-Writer的数据处理流程相同,只是写入Hadoop的文件路径不同):
-
【Records分组】
> 首先将Records按目标端库名、表名进行分组,不同分组并发写入。
> 然后将每张表的Records根据mapping-id再次进行聚合,因为多表合一的场景下,会存在同一个TableGroup下mapping-id不相同的情况。
> 聚合之后的mappingRecords中,所有Record的namespace和name都相同,并且所有Record对应的MediaMappingInfo对象都相同。 -
【Records转换】
> 将mappingRecords按batchSize进行拆分,分批对数据进行转换。首先根据目标端HDFS数据源信息和Hdfs-Writer参数构建HdfsConfig,包括HDFS地址、ZK配置、hadoopUser、写数据包大小、高可用设置等。接着将batchRecords构造为一个Map对象,key为Hdfs文件路径,value为构造好的待写入数据,准备向HDFS执行写入。
> 构建Hdfs文件路径:RDBMS的数据类型为delete时,为其设置单独的写入文件路径;HBase的schemaName为"default"时,Hdfs文件路径中省略schemaName。
> 构造写入数据:将每个RdbEventRecord/HRecord中的各列转换为map并序列化,其中key为columnName,value为columnValue,并为RDBMS类型的数据另外新增三个字段:binlog_ts和binlog_seq组合来标识数据写入的先后顺序,binlog_eventtime标识数据的实际产生时间。 -
【数据写入】
> 单表支持按BatchSize拆分写入。即将每张表按mapping-id聚合后的Records根据batchSize进行拆分,分批进行数据转换和写入。将每批次的batchRecords构造好对应的数据之后,通过调用写入对应的HDFS文件路径。
> 向HDFS的某个文件路径写入数据之前,首先要从FileStreamHolder管理的文件流tokens中获取其文件流,若不存在,则由HDFS的DistributedFileSystem实例来创建该路径的文件流,然后通过文件流写入数据(数据大小不能大于Hdfs-Writer设置的写入HDFS的数据包大小),并根据HDFS的提交方式进行hflush&hsync,二者可动态切换,前者保证把数据刷到OS级缓存,后者保证数据落盘。若写入过程中出现异常,则进行关流操作。 -
【文件流管理】
> FileStreamHolder是HDFS文件流管理类,由TaskWriter主线程调用。一个文件对应了一个文件流,创建文件流时FSNamesystem会把流对应的path放到Lease中,关闭文件流时FSNamesystem会把流对应的path从Lease中移除。
> 对文件流的管理需要保证以下几个原则:
1)流的生命周期应该和Task保持一致,Task运行过程中流随用随创建,Task关闭时把其占有的所有流也关闭,这样才能保证在发生Reblance后,不会出现租约被其它DFSClient占用的问题。
2)超时不用的流要及时清理,保证其它使用者有机会获取权限,比如发生日切之后,所有的数据都写到新文件中了,前一天的文件不会再有写入操作,那么应该及时关闭前一天的文件流。
-
【按表并发写入】
> 一批次的Records支持按表分组并发写入。Hdfs-Writer按表开启了多个线程,每个线程负责一张表的HDFS写入操作,提高写数据的吞吐量。 -
【解决租约问题】
> HDFS的读写模式为 "write-once-read-many",为了实现write-once,需要设计一种互斥机制,即某个文件在同一时刻只能被一个客户端写入,租约应运而生,保证了数据的一致性。
> DataLink的应用场景为:一个Worker进程中运行着一批Task,每个Task的Hdfs-Writer负责管理Hdfs中的N个文件,当发生(Re-)balance的时候,Task关闭之前会先进行关流操作,及时释放租约,保证其它Worker可以接管文件写入。关流不成功的情况下,Task重新分配可能会触发Other-Create或者Re-Create问题。
> 解决方法:
1)Other-Create异常中包含了other-Dfsclient的IP信息,我们可以调用other-worker提供的接口,远程关闭出问题的流,如果关闭失败或者访问出现超时(宕机的时候会超时),再进行force recovery操作。
2)流关闭时可能会出现Re-Create异常,如果出现异常,需要进行force recovery操作,否则的话租约将一直不可释放,一直报Re-Create异常。
3)FileStreamHolder在关闭的时候有些流可能会关闭失败,此时token不能被remove,而是需要把tokenSize大于0的Holder放到leakHolders队列中,后台线程定时清理这些holder,避免引发Lease的ReCreate或OtherCreate问题。
4)后台定时线程进行的另一项工作是,文件流超时自动关闭。定时对文件流进行空闲时间进行检测,某个流的空闲时间超过指定最大空闲时间(默认60秒),会自动关闭。
注:DataLink租约问题详解和源码分析参见文章链接:https://www.cnblogs.com/ucarinc/p/8064447.html -
【流的并发处理】
> FileStreamHolder的close方法和getStreamToken方法会被不同的线程调用,tokens存在并发问题,引入读写锁,进行并发处理。
> 具体的并发场景(问题)为:
close方法被TaskWriter主线程调用,getStreamToken方法被AbstractHandler的executorService线程调用,如果不进行并发控制,可能会导致close执行结束后,仍然有新的FileStreamToken(ps:假设其对应的pathString名字为p1)加入进来,那么新加入的token对应的FileStream没有机会被关闭。待Task重新启动之后,会实例化一个新的FileStreamHolder对象,新对象的tokens里面并没有p1对应的FileStreamToken,那么会尝试去创建,因为之前的那个FileStream没有被关闭,本次创建会导致hdfs的租约异常:because current leaseholder is trying to recreate file,并且无法自动恢复。
> 注:close方法被调用的时候TaskWriter主线程都已经接近尾声了,AbstractHandler的executorService为什么还有线程在运行??
参见AbstractHandler的submitAndWait方法,我们用到了ExecutorCompletionService,当出现异常的时候,会尝试执行Future.cancel,但这并不会导致线程的立即结束(参见Thread.interrupt()方法),所以当主线程接近尾声的时候,线程池中的操作是有可能还未结束的。
-
【访问多个Hadoop集群】
> 在一个Worker进程中,可能运行着多个Task的Hdfs-Writer,若要一个进程内同时访问多个hadoop集群,那就需要针对每个集群分别创建各自的FileSystem实例,需要做的有两点:
1)保证针对这多个集群的Configuration实例中的 "fs.defaultFS" 的配置是不同的;
2)HADOOP_USER_NAME属性不能通过System.setProperty方法设置,应该调用FileSystem的get方法时动态传入。 -
【缓存资源】
> 对于同一个HDFS,可能会有多次写入,但是不能每次访问都创建一个Configuration,为了提高性能,Hdfs-Writer使用CacheBuilder为每个HDFS缓存一份配置,从而减少后续的网络传输开销,加快查找连接速度。
> 对于同一个Hadoop集群,不能每次写入都要创建其FileSystem实例,因此同样使用CacheBuilder进行缓存,减少重复创建开销。
> 对于同一个HDFS文件,可能会有不同的客户端写入,为了实现互斥机制,每个客户端进行写入时都会生成一个ReentrantLock,同样使用CacheBuilder进行缓存,减少内存开销。
> 对于同一个HDFS文件,可能会有多次写入,即要多次用到该文件的文件流,所以将各个HDFS文件的文件流在内存中缓存起来,如果不进行缓存,而是频繁的创建和关闭文件流,存在两个问题:
1)性能问题:打开和关闭流都是耗费资源的操作,每接收到一次请求都创建和关闭一次文件流,不合理。
2)租约释放问题:HDFS文件流的关闭是一个较复杂的过程,针对同一文件,如果刚刚执行完关闭操作,立刻又开启一个文件流,频繁如此操作容易出现租约冲突,最终会积压一堆冲突,甚至无法恢复。 -
【文件拆分方式】
> HDFS的文件拆分方式支持DAY, HOUR, HALFHOUR三种(同一张表每天/每小时/每半小时生成一个文件)。可以在映射中配置HdfsFileParameter参数,设置多种FileSplitStrategy,每种文件拆分方式有不同的生效日期,自动取距离当前时间最近的生效时间对应的文件拆分方式生成文件。没有明确配置的话,文件以天为单位进行管理,同一张表每天对应一个文件。
在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,HdfsWriterParameter还针对HDFS类型数据库的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。
HdfsWriterParameter扩展参数 | 参数描述 | 默认值 | 备注 |
---|---|---|---|
CommitMode |
HDFS的提交方式 |
Hflush |
CommitMode分为Hflush, Hsync两种 |
streamLeisureLimit |
流的空闲时间最大值 |
600000 |
单位:毫秒,用于流的超时关闭 |
hdfsPacketSize |
客户端写入HDFS的数据包大小 |
20971520 |
20*1024*1024,packet-size设置为20M,保证数据一次性发送到hdfs-server端,避免被截断 |
hsyncInterval |
当CommitMode为Hflush的时候,进行hsync的时间间隔 |
30000 |
单位:毫秒,用于控制当CommitMode为Hflush的时候,多长时间进行一次hsync操作 |
hbasePath |
HBase增量数据在HDFS的路径前缀 |
"user/hbase" |
根据文件拆分方式写入具体的路径 |
mysqlBinlogPath |
Mysql增量数据在HDFS的路径前缀 |
"user/mysql/binlog" |
根据文件拆分方式写入具体的路径 |
在Mapping通用配置参数基础上,Hdfs-Writer插件还定义了自己的参数类HdfsFileParameter,用来设置写入HDFS路径的文件拆分策略。相关Mapping配置参数如下:
Mapping参数 |
参数描述 |
默认值 |
备注 |
---|---|---|---|
targetMediaSourceId |
目标端数据源的id |
无 | 目标数据源,这里指要同步到的HDFS集群 |
targetMediaName |
HDFS中表的名字 |
无 |
支持目标端有表别名,用于源端和目标端表名不一致的情况 |
targetMediaNamespace |
目标端数据源的namespace |
无 |
目标端数据源schema,对HDFS来说默认为空 |
ColumnMappingMode |
列映射模式 |
NONE |
支持列名黑白名单与列别名: NONE,//所有列均同步到目标端 INCLUDE,//只同步白名单中的列,可以设置列别名 EXCLUDE;//黑名单中的列不同步 |
writePriority |
同步优先级 |
5 |
数值越小优先级越高 |
interceptorId |
拦截器id |
无 |
拦截器可以对Records进行特殊处理,满足少数特定功能的需求 |
skipIds |
主键黑名单 |
无 |
可以通过指定主键id来过滤源端的某些异常Records |
valid |
是否有效 |
是 |
同步映射有效时,才进行同步 |
HdfsFileParameter |
设置写入HDFS路径的文件拆分策略 |
每天一个文件 |
FileSplitMode支持DAY, HOUR, HALFHOUR三种文件拆分方式,默认按天生成文件。用户可以根据需要在映射中进行参数配置,参数配置格式举例: {"@type":"com.ucar.datalink.domain.plugin.writer.hdfs.HdfsFileParameter","fileSplitStrategieList":[{"effectiveDate":"2016-11-30","fileSplitMode":"HALFHOUR"}]} |
关联技术 | 稳定版本 | 待测版本 |
---|---|---|
hadoop-hdfs |
2.6.3 |
参考资料: