2.5.1 KeyedStream与物理分区
通过MapReduce算法可以对接入数据按照指定Key进行数据分区,然后将相同Key值的数据路由到同一分区中,聚合统计算子再基于数据集进行聚合操作,对于Flink也不例外。在DataStream API中主要是通过执行keyBy()方法,并指定对应的KeySelector,来实现按照指定Key对数据进行分区操作,DataStream经过转换后会生成KeyedStream数据集。当然数据集的物理分区操作并不局限于keyBy()方法,还有其他类型的物理转换可以实现将DataStream中的数据按照指定的规则路由到下游的分区,如DataStream.shuffle()分区操作。
下面我们从源码实现的角度深入了解DataStream转换操作中物理分区的实现。
1.KeyedStream设计与实现
我们先看KeyedStream的具体实现。如代码清单2-35所示,根据KeySelector接口实现类创建KeyedStream数据集,KeySelector接口提供了getKey()方法,能够从StreamRecord中获取Key字段信息。
代码清单2-35 DataStream.keyBy()方法定义
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) { Preconditions.checkNotNull(key); return new KeyedStream<>(this, clean(key)); }
如代码清单2-36所示,从KeyedStream构造器中可以看出,最终会创建PartitionTransformation,这里我们称之为物理分区操作,其主要功能就是对数据元素在上下游算子之间进行重新分区。
在PartitionTransformation的创建过程中会同时构建KeyGroupStreamPartitioner实例作为参数。KeyGroupStreamPartitioner是按照Key进行分组发送的分区器。这里的KeyGroupStreamPartitioner实际上继承了ChannelSelector接口,ChannelSelector主要用于任务执行中,算子根据指定Key的分组信息选择下游节点对应的InputChannel,并将数据元素根据指定Key发送到下游指定的InputChannel中,最终实现对数据的分区操作。
代码清单2-36 KeyedStream.构造器方法
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) { this( dataStream, new PartitionTransformation<>( dataStream.getTransformation(), new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator. DEFAULT_LOWER_BOUND_MAX_PARALLELISM)), keySelector, keyType); }
2.StreamPartitioner数据分区
KeyGroupStreamPartitioner实际上就是对数据按照Key进行分组,然后根据Key的分组确定数据被路由到哪个下游的算子中。如图2-16所示,KeyGroupStreamPartitioner实际上继承自StreamPartitioner抽象类,而StreamPartitioner又实现了ChannelSelector接口,用于选择下游的InputChannel。InputChannel的概念我们会在第7章进行介绍,这里可以将其理解为基于Netty中的channel实现的跨网络数据输入管道,经过网络栈传输的数据最终发送到指定下游算子的InputChannel中。
图2-16 StreamPartitioner UML关系图
从图2-16中可以看出,根据分区策略的不同,StreamPartitioner的实现类也有所区别,这些实现类分别被应用在DataStream对应的转换操作中,例如ShufflePartitioner和DataStream.shuffe()对应,我们通过表2-3进行梳理。
表2-3 DataStream的主要物理转换操作
下面我们通过RebalancePartitioner具体实例了解分区器是如何对数据进行物理分区转换的。
如代码清单2-37所示,RebalancePartitioner.selectChannel()方法实现了对InputChannel的选择。在RebalancePartitioner中会记录nextChannelToSendTo,然后通过(nextChannelToSendTo + 1) % numberOfChannels公式计算并选择下一数据需要发送的InputChannel。实际上是对所有下游的InputChannel进行轮询,均匀地将数据发送到下游的Task。
代码清单2-37 RebalancePartitioner.selectChannel()方法定义
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; }
对于RescalePartitioner来讲,上游Task发送到下游Task的数据元素取决于上下游之间Task实例的并行度。数据会在本地进行轮询,然后发送到下游的Task实例中。如果上游Task具有并行性2,而下游Task具有并行性4,则一个上游Task实例会将元素均匀分配到指定的两个下游Task实例中;而另一个上游Task将分配给另外两个下游Task。上游Task的所有数据会在本地对下游的Task进行轮询,然后均匀发送到已经分配的下游Task实例中。
如代码清单2-38所示,在RescalePartitioner.selectChannel()方法中,通过改变nextChannelToSendTo的值选择下一个需要发送的InputChannel,而方法中的numberOfChannels实际上是根据下游操作的并行度确定的。
代码清单2-38 RescalePartitioner.selectChannel()方法定义
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; }