risingwave_meta/stream/
source_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33    SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52use crate::{MetaError, MetaResult};
53
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Split assignment grouped by fragment: `fragment_id` -> `actor_id` -> splits.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
/// Splits discovered from the external source service, keyed by source id.
pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;

// ALTER CONNECTOR parameters, specifying the new parameters to be set for each job_id (source_id/sink_id)
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;

/// Upper bound on a single split-discovery call (e.g. `list_splits` in
/// [`SourceManager::validate_source_once`]) before it is aborted.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
62
/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes detected ([`Self::tick`]).
pub struct SourceManager {
    /// Lock held by [`Self::pause_tick`] to pause the periodic tick loop in [`Self::run`].
    pub paused: Mutex<()>,
    /// Used by [`Self::tick`] to push split-assignment commands down as barrier commands.
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping state, protected by an async mutex.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// Bookkeeping state of the [`SourceManager`], guarded by `SourceManager::core`.
pub struct SourceManagerCore {
    // Used at construction time to load source/fragment metadata from the catalog.
    metadata_manager: MetadataManager,

    /// Managed source loops
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    // Provides access to shared actor info (see `validate_inject_source_offsets`).
    env: MetaSrvEnv,
}
83
/// Snapshot of the fragment bookkeeping inside [`SourceManagerCore`],
/// returned by [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    /// Fragments associated with each source.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(backfill_fragment_id, upstream_fragment_id)`.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
88
impl SourceManagerCore {
    /// Assembles the core from pre-loaded state; see [`SourceManager::new`] for how
    /// the maps are populated from the catalog.
    fn new(
        metadata_manager: MetadataManager,
        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager,
            managed_sources,
            source_fragments,
            backfill_fragments,
            env,
        }
    }

    /// Updates states after all kinds of source change.
    pub fn apply_source_change(&mut self, source_change: SourceChange) {
        // Destructure the change into a uniform set of deltas first; each variant
        // fills only the deltas relevant to it, the rest stay empty defaults.
        let mut added_source_fragments = Default::default();
        let mut added_backfill_fragments = Default::default();
        let mut finished_backfill_fragments = Default::default();
        let mut fragment_replacements = Default::default();
        let mut dropped_source_fragments = Default::default();
        let mut dropped_source_ids = Default::default();
        let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
            Default::default();

        match source_change {
            SourceChange::CreateJob {
                added_source_fragments: added_source_fragments_,
                added_backfill_fragments: added_backfill_fragments_,
            } => {
                added_source_fragments = added_source_fragments_;
                added_backfill_fragments = added_backfill_fragments_;
            }
            SourceChange::CreateJobFinished {
                finished_backfill_fragments: finished_backfill_fragments_,
            } => {
                finished_backfill_fragments = finished_backfill_fragments_;
            }

            SourceChange::DropMv {
                dropped_source_fragments: dropped_source_fragments_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
            }
            SourceChange::ReplaceJob {
                dropped_source_fragments: dropped_source_fragments_,
                added_source_fragments: added_source_fragments_,
                fragment_replacements: fragment_replacements_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
                added_source_fragments = added_source_fragments_;
                fragment_replacements = fragment_replacements_;
            }
            SourceChange::DropSource {
                dropped_source_ids: dropped_source_ids_,
            } => {
                dropped_source_ids = dropped_source_ids_;
            }

            SourceChange::UpdateSourceProps {
                source_id_map_new_props,
            } => {
                for (source_id, new_props) in source_id_map_new_props {
                    recreate_source_id_map_new_props.push((source_id, new_props));
                }
            }
        }

        // Dropped sources: remove all bookkeeping and terminate the worker,
        // handing it the dropped fragments for cleanup.
        for source_id in dropped_source_ids {
            let dropped_fragments = self.source_fragments.remove(&source_id);

            if let Some(handle) = self.managed_sources.remove(&source_id) {
                handle.terminate(dropped_fragments);
            }
            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
                // TODO: enable this assertion after we implemented cleanup for backfill fragments
                // debug_assert!(
                //     fragments.is_empty(),
                //     "when dropping source, there should be no backfill fragments, got: {:?}",
                //     fragments
                // );
            }
        }

        for (source_id, fragments) in added_source_fragments {
            self.source_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        for (source_id, fragments) in added_backfill_fragments {
            self.backfill_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        // Backfill completion: notify the worker; the source is expected to be
        // registered at this point, so a missing worker is a bug.
        for (source_id, fragments) in finished_backfill_fragments {
            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
                panic!(
                    "source {} not found when adding backfill fragments {:?}",
                    source_id, fragments
                );
            });
            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
        }

        for (source_id, fragment_ids) in dropped_source_fragments {
            self.drop_source_fragments(Some(source_id), fragment_ids);
        }

        for (old_fragment_id, new_fragment_id) in fragment_replacements {
            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));

            // Rewrite upstream fragment ids in the backfill bookkeeping: any entry
            // pointing at the replaced (old) upstream is re-keyed to the new one.
            for fragment_ids in self.backfill_fragments.values_mut() {
                let mut new_backfill_fragment_ids = fragment_ids.clone();
                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
                    assert_ne!(
                        fragment_id, upstream_fragment_id,
                        "backfill fragment should not be replaced"
                    );
                    if *upstream_fragment_id == old_fragment_id {
                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
                    }
                }
                *fragment_ids = new_backfill_fragment_ids;
            }
        }

        for (source_id, new_props) in recreate_source_id_map_new_props {
            if let Some(handle) = self.managed_sources.get_mut(&source_id) {
                // the update here should not involve fragments change and split change
                // Or we need to drop and recreate the source worker instead of updating inplace
                let props_wrapper =
                    WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
                let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); // already checked when sending barrier
                handle.update_props(props);
                tracing::info!("update source {source_id} properties in source manager");
            } else {
                tracing::info!("job id {source_id} is not registered in source manager");
            }
        }
    }

    /// Removes fragments from the bookkeeping and notifies the owning workers.
    ///
    /// When `source_id` is `Some`, only that source's entry is touched; note that in
    /// this branch every requested fragment id is forwarded to the worker, whether or
    /// not it was actually present. When `None` (fragment replacement path, where the
    /// owning source is unknown), all sources are scanned and only fragments actually
    /// removed are forwarded.
    fn drop_source_fragments(
        &mut self,
        source_id: Option<SourceId>,
        dropped_fragment_ids: BTreeSet<FragmentId>,
    ) {
        if let Some(source_id) = source_id {
            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
                let mut dropped_ids = vec![];
                let managed_fragment_ids = entry.get_mut();
                for fragment_id in &dropped_fragment_ids {
                    managed_fragment_ids.remove(fragment_id);
                    dropped_ids.push(*fragment_id);
                }
                if let Some(handle) = self.managed_sources.get(&source_id) {
                    handle.drop_fragments(dropped_ids);
                } else {
                    panic_if_debug!(
                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
                    );
                }
                // Drop the map entry entirely once the source has no fragments left.
                if managed_fragment_ids.is_empty() {
                    entry.remove();
                }
            }
        } else {
            for (source_id, fragment_ids) in &mut self.source_fragments {
                let mut dropped_ids = vec![];
                for fragment_id in &dropped_fragment_ids {
                    if fragment_ids.remove(fragment_id) {
                        dropped_ids.push(*fragment_id);
                    }
                }
                if !dropped_ids.is_empty() {
                    if let Some(handle) = self.managed_sources.get(source_id) {
                        handle.drop_fragments(dropped_ids);
                    } else {
                        panic_if_debug!(
                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
                        );
                    }
                }
            }
        }
    }
}
284
285impl SourceManager {
286    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
287
288    pub async fn new(
289        barrier_scheduler: BarrierScheduler,
290        metadata_manager: MetadataManager,
291        metrics: Arc<MetaMetrics>,
292        env: MetaSrvEnv,
293    ) -> MetaResult<Self> {
294        let mut managed_sources = HashMap::new();
295        {
296            let sources = metadata_manager.list_sources().await?;
297            for source in sources {
298                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
299            }
300        }
301
302        let source_fragments = metadata_manager
303            .catalog_controller
304            .load_source_fragment_ids()
305            .await?
306            .into_iter()
307            .map(|(source_id, fragment_ids)| {
308                (
309                    source_id as SourceId,
310                    fragment_ids.into_iter().map(|id| id as _).collect(),
311                )
312            })
313            .collect();
314        let backfill_fragments = metadata_manager
315            .catalog_controller
316            .load_backfill_fragment_ids()
317            .await?;
318
319        let core = Mutex::new(SourceManagerCore::new(
320            metadata_manager,
321            managed_sources,
322            source_fragments,
323            backfill_fragments,
324            env,
325        ));
326
327        Ok(Self {
328            barrier_scheduler,
329            core,
330            paused: Mutex::new(()),
331            metrics,
332        })
333    }
334
335    pub async fn validate_source_once(
336        &self,
337        source_id: SourceId,
338        new_source_props: WithOptionsSecResolved,
339    ) -> MetaResult<()> {
340        let props = ConnectorProperties::extract(new_source_props, false).unwrap();
341
342        {
343            let mut enumerator = props
344                .create_split_enumerator(Arc::new(SourceEnumeratorContext {
345                    metrics: self.metrics.source_enumerator_metrics.clone(),
346                    info: SourceEnumeratorInfo { source_id },
347                }))
348                .await
349                .context("failed to create SplitEnumerator")?;
350
351            let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
352                .await
353                .context("failed to list splits")??;
354        }
355        Ok(())
356    }
357
358    /// For replacing job (alter table/source, create sink into table).
359    #[await_tree::instrument]
360    pub async fn handle_replace_job(
361        &self,
362        dropped_job_fragments: &StreamJobFragments,
363        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
364        replace_plan: &ReplaceStreamJobPlan,
365    ) {
366        // Extract the fragments that include source operators.
367        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
368
369        self.apply_source_change(SourceChange::ReplaceJob {
370            dropped_source_fragments,
371            added_source_fragments,
372            fragment_replacements: replace_plan.fragment_replacements(),
373        })
374        .await;
375    }
376
377    /// Updates states after all kinds of source change.
378    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
379    #[await_tree::instrument("apply_source_change({source_change})")]
380    pub async fn apply_source_change(&self, source_change: SourceChange) {
381        let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
382        let updated_source_ids = if let SourceChange::UpdateSourceProps {
383            ref source_id_map_new_props,
384        } = source_change
385        {
386            source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
387        } else {
388            Vec::new()
389        };
390
391        {
392            let mut core = self.core.lock().await;
393            core.apply_source_change(source_change);
394        }
395
396        // Force tick for updated source workers
397        if need_force_tick {
398            self.force_tick_updated_sources(updated_source_ids).await;
399        }
400    }
401
402    /// create and register connector worker for source.
403    #[await_tree::instrument("register_source({})", source.name)]
404    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
405        tracing::debug!("register_source: {}", source.get_id());
406        let mut core = self.core.lock().await;
407        let source_id = source.get_id();
408        if core.managed_sources.contains_key(&source_id) {
409            tracing::warn!("source {} already registered", source_id);
410            return Ok(());
411        }
412
413        let handle = create_source_worker(source, self.metrics.clone())
414            .await
415            .context("failed to create source worker")?;
416
417        core.managed_sources.insert(source_id, handle);
418
419        Ok(())
420    }
421
422    /// register connector worker for source.
423    pub async fn register_source_with_handle(
424        &self,
425        source_id: SourceId,
426        handle: ConnectorSourceWorkerHandle,
427    ) {
428        let mut core = self.core.lock().await;
429        if core.managed_sources.contains_key(&source_id) {
430            tracing::warn!("source {} already registered", source_id);
431            return;
432        }
433
434        core.managed_sources.insert(source_id, handle);
435    }
436
437    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
438        let core = self.core.lock().await;
439
440        SourceManagerRunningInfo {
441            source_fragments: core.source_fragments.clone(),
442            backfill_fragments: core.backfill_fragments.clone(),
443        }
444    }
445
446    /// Checks whether the external source metadata has changed, and sends a split assignment command
447    /// if it has.
448    ///
449    /// This is also how a newly created `SourceExecutor` is initialized.
450    /// (force `tick` in `Self::create_source_worker`)
451    ///
452    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
453    /// to update states in `SourceManager`.
454    async fn tick(&self) -> MetaResult<()> {
455        let split_states = {
456            let core_guard = self.core.lock().await;
457            core_guard.reassign_splits().await?
458        };
459
460        for (database_id, split_state) in split_states {
461            if !split_state.split_assignment.is_empty() {
462                let command = Command::SourceChangeSplit(split_state);
463                tracing::info!(command = ?command, "pushing down split assignment command");
464                self.barrier_scheduler
465                    .run_command(database_id, command)
466                    .await?;
467            }
468        }
469
470        Ok(())
471    }
472
473    pub async fn run(&self) -> MetaResult<()> {
474        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
475        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
476        loop {
477            ticker.tick().await;
478            let _pause_guard = self.paused.lock().await;
479            if let Err(e) = self.tick().await {
480                tracing::error!(
481                    error = %e.as_report(),
482                    "error happened while running source manager tick",
483                );
484            }
485        }
486    }
487
488    /// Pause the tick loop in source manager until the returned guard is dropped.
489    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
490        tracing::debug!("pausing tick lock in source manager");
491        self.paused.lock().await
492    }
493
494    /// Force tick for specific updated source workers after properties update.
495    async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
496        let core = self.core.lock().await;
497        for source_id in updated_source_ids {
498            if let Some(handle) = core.managed_sources.get(&source_id) {
499                tracing::info!("forcing tick for updated source {}", source_id.as_raw_id());
500                if let Err(e) = handle.force_tick().await {
501                    tracing::warn!(
502                        error = %e.as_report(),
503                        "failed to force tick for source {} after properties update",
504                        source_id.as_raw_id()
505                    );
506                }
507            } else {
508                tracing::warn!(
509                    "source {} not found when trying to force tick after update",
510                    source_id.as_raw_id()
511                );
512            }
513        }
514    }
515
516    /// Reset source split assignments by clearing the cached split state
517    /// and triggering re-discovery. This is an UNSAFE operation that may
518    /// cause data duplication or loss depending on the connector.
519    pub async fn reset_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
520        tracing::warn!(
521            source_id = source_id.as_raw_id(),
522            "UNSAFE: Resetting source splits - clearing cached state and triggering re-discovery"
523        );
524
525        let core = self.core.lock().await;
526        if let Some(handle) = core.managed_sources.get(&source_id) {
527            // Clear the cached splits to force re-discovery
528            {
529                let mut splits_guard = handle.splits.lock().await;
530                tracing::info!(
531                    source_id = source_id.as_raw_id(),
532                    prev_splits = ?splits_guard.splits.as_ref().map(|s| s.len()),
533                    "Clearing cached splits"
534                );
535                splits_guard.splits = None;
536            }
537
538            // Force a tick to re-discover splits
539            tracing::info!(
540                source_id = source_id.as_raw_id(),
541                "Triggering split re-discovery via force_tick"
542            );
543            handle.force_tick().await.with_context(|| {
544                format!(
545                    "failed to force tick for source {} after split reset",
546                    source_id.as_raw_id()
547                )
548            })?;
549
550            tracing::info!(
551                source_id = source_id.as_raw_id(),
552                "Split reset completed - new splits will be assigned on next tick"
553            );
554            Ok(())
555        } else {
556            Err(anyhow::anyhow!(
557                "source {} not found in source manager",
558                source_id.as_raw_id()
559            )
560            .into())
561        }
562    }
563
564    /// Validate split offsets before injecting them.
565    /// Returns `Ok(applied_split_ids)` if all validations pass, otherwise returns an error.
566    ///
567    /// Validations performed:
568    /// 1. Source exists in source manager
569    /// 2. All requested split IDs exist in the source's current splits (runtime assignment)
570    pub async fn validate_inject_source_offsets(
571        &self,
572        source_id: SourceId,
573        split_offsets: &HashMap<String, String>,
574    ) -> MetaResult<Vec<String>> {
575        let (fragment_ids, env) = {
576            let core = self.core.lock().await;
577
578            // Check if source exists
579            let _ = core.managed_sources.get(&source_id).ok_or_else(|| {
580                MetaError::invalid_parameter(format!(
581                    "source {} not found in source manager",
582                    source_id.as_raw_id()
583                ))
584            })?;
585
586            let mut ids = Vec::new();
587            if let Some(src_frags) = core.source_fragments.get(&source_id) {
588                ids.extend(src_frags.iter().copied());
589            }
590            if let Some(backfill_frags) = core.backfill_fragments.get(&source_id) {
591                ids.extend(
592                    backfill_frags
593                        .iter()
594                        .flat_map(|(id, upstream)| [*id, *upstream]),
595                );
596            }
597            (ids, core.env.clone())
598        };
599
600        if fragment_ids.is_empty() {
601            return Err(MetaError::invalid_parameter(format!(
602                "source {} has no running fragments",
603                source_id.as_raw_id()
604            )));
605        }
606
607        let guard = env.shared_actor_infos().read_guard();
608        let mut assigned_split_ids = HashSet::new();
609        for fragment_id in fragment_ids {
610            if let Some(fragment) = guard.get_fragment(fragment_id) {
611                for actor in fragment.actors.values() {
612                    for split in &actor.splits {
613                        assigned_split_ids.insert(split.id().to_string());
614                    }
615                }
616            }
617        }
618
619        // Validate all requested split IDs exist
620        let mut invalid_splits = Vec::new();
621        for split_id in split_offsets.keys() {
622            if !assigned_split_ids.contains(split_id) {
623                invalid_splits.push(split_id.clone());
624            }
625        }
626
627        if !invalid_splits.is_empty() {
628            return Err(MetaError::invalid_parameter(format!(
629                "invalid split IDs for source {}: {:?}. Valid splits are: {:?}",
630                source_id.as_raw_id(),
631                invalid_splits,
632                assigned_split_ids.iter().collect::<Vec<_>>()
633            )));
634        }
635
636        tracing::info!(
637            source_id = source_id.as_raw_id(),
638            num_splits = split_offsets.len(),
639            "Validated inject source offsets request"
640        );
641
642        Ok(split_offsets.keys().cloned().collect())
643    }
644}
645
// `strum::Display` is used by the `await_tree::instrument` label on
// `SourceManager::apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    UpdateSourceProps {
        // the new properties to be set for each source_id
        // and the props should not affect split assignment and fragments
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// `DROP SOURCE` or `DROP MV`
    DropSource { dropped_source_ids: Vec<SourceId> },
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// Replace a streaming job in place (e.g. `ALTER TABLE`/`ALTER SOURCE`).
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Maps `old_fragment_id` -> `new_fragment_id`.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
679
680pub fn build_actor_connector_splits(
681    splits: &HashMap<ActorId, Vec<SplitImpl>>,
682) -> HashMap<ActorId, ConnectorSplits> {
683    splits
684        .iter()
685        .map(|(&actor_id, splits)| {
686            (
687                actor_id,
688                ConnectorSplits {
689                    splits: splits.iter().map(ConnectorSplit::from).collect(),
690                },
691            )
692        })
693        .collect()
694}
695
696pub fn build_actor_split_impls(
697    actor_splits: &HashMap<ActorId, ConnectorSplits>,
698) -> HashMap<ActorId, Vec<SplitImpl>> {
699    actor_splits
700        .iter()
701        .map(|(actor_id, ConnectorSplits { splits })| {
702            (
703                *actor_id,
704                splits
705                    .iter()
706                    .map(|split| SplitImpl::try_from(split).unwrap())
707                    .collect(),
708            )
709        })
710        .collect()
711}