risingwave_meta::stream::source_manager

Function reassign_splits

source
fn reassign_splits<T>(
    fragment_id: FragmentId,
    actor_splits: HashMap<ActorId, Vec<T>>,
    discovered_splits: &BTreeMap<SplitId, T>,
    opts: SplitDiffOptions,
) -> Option<HashMap<ActorId, Vec<T>>>
where T: SplitMetaData + Clone,
Expand description

Reassigns splits if there are new splits or dropped splits, i.e., actor_splits and discovered_splits differ, or actors are rescheduled.

The existing splits will remain unmoved in their currently assigned actor.

If an actor has an upstream actor, it should be a backfill executor, and its splits should be aligned with the upstream actor. reassign_splits should not be used in this case. Use align_backfill_splits instead.

  • fragment_id: just for logging

§Different connectors’ behavior of split change

§Kafka and Pulsar

They only support increasing the number of splits via adding new empty splits. Old data is not moved.

§Kinesis

It supports pairwise shard split and merge.

In both cases, old data remain in the old shard(s) and the old shard is still available. New data are routed to the new shard(s). After the retention period has expired, the old shard will become EXPIRED and isn’t listed any more. In other words, the total number of shards will first increase and then decrease.

See also: