risingwave_meta/stream/
source_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33    SplitMetaData,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, align_splits, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan};
49use crate::manager::{MetaSrvEnv, MetadataManager};
50use crate::model::{ActorId, FragmentId, StreamJobFragments};
51use crate::rpc::metrics::MetaMetrics;
52use crate::{MetaError, MetaResult};
53
/// Shared, reference-counted handle to the [`SourceManager`].
pub type SourceManagerRef = Arc<SourceManager>;
/// Actor-level split assignment, used inside the barrier worker after actor rendering.
pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;

/// Source-level split assignment: maps source ID to the discovered splits enum.
/// Unlike [`SplitAssignment`], this does not include fragment- or actor-level assignment.
/// The [`DiscoveredSplits`] enum is preserved all the way through to the barrier
/// command, and only resolved to concrete per-fragment, per-actor splits when the
/// barrier worker performs the actual split assignment.
pub type SourceSplitAssignment = HashMap<SourceId, DiscoveredSplits>;
64
/// Discovered splits from the source enumerator.
///
/// For adaptive splits, the single template split is returned and must be expanded
/// to per-actor splits after actor rendering inside the barrier worker.
#[derive(Debug, Clone)]
pub enum DiscoveredSplits {
    /// Fixed splits discovered from the external source.
    /// Keyed by `Arc<str>` — presumably the split id; confirm against the enumerator.
    Fixed(BTreeMap<Arc<str>, SplitImpl>),
    /// A single adaptive split template that needs to be expanded per actor.
    /// The `fill_adaptive_split` function should be called after actor rendering.
    Adaptive(SplitImpl),
}
77
/// Describes how splits should be resolved for a replace stream job.
///
/// This is determined during Phase 1 (before the barrier command) and stored in
/// [`ReplaceStreamJobPlan`](crate::barrier::ReplaceStreamJobPlan).
/// The barrier worker uses this to decide the split resolution strategy in Phase 2.
#[derive(Debug, Clone)]
pub enum ReplaceJobSplitPlan {
    /// Fresh discovered splits. Used when the replace job has no existing downstream
    /// consumers (i.e., `replace_upstream` is empty), or the job is not a source.
    /// Resolved via [`SourceManager::resolve_fragment_to_actor_splits`] in Phase 2.
    Discovered(SourceSplitAssignment),
    /// Splits need to be aligned with the previous source fragment in Phase 2.
    /// This happens when replacing a source that has existing downstream consumers.
    /// Resolved via [`SourceManager::resolve_replace_source_splits`] in Phase 2.
    ///
    /// NOTE(review): this variant carries no payload, so the no-shuffle mapping
    /// needed for alignment must be derived elsewhere during Phase 2 — the
    /// earlier claim that this variant "contains" the mapping was stale.
    AlignFromPrevious,
}
96
/// `ALTER CONNECTOR` parameters, specifying the new parameters to be set for each
/// `job_id` (`source_id`/`sink_id`).
pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;

/// Upper bound for a single enumerator call (e.g. `list_splits`) during
/// validation in [`SourceManager::validate_source_once`].
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
101
/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes detected ([`Self::tick`]).
pub struct SourceManager {
    /// Acquired by [`Self::run`] on every tick; callers of [`Self::pause_tick`]
    /// hold this guard to suspend the tick loop.
    pub paused: Mutex<()>,
    /// Used to push split-change commands down as barrier commands.
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping (worker handles, fragment maps) shared by all operations.
    core: Mutex<SourceManagerCore>,
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable state behind [`SourceManager`]: connector worker handles plus the
/// source-to-fragment bookkeeping used for split (re)assignment.
pub struct SourceManagerCore {
    metadata_manager: MetadataManager,

    /// Managed source loops
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)`
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    env: MetaSrvEnv,
}
122
/// A cloned snapshot of the fragment bookkeeping, as returned by
/// [`SourceManager::get_running_info`].
pub struct SourceManagerRunningInfo {
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
127
128impl SourceManagerCore {
129    fn new(
130        metadata_manager: MetadataManager,
131        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
132        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
133        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
134        env: MetaSrvEnv,
135    ) -> Self {
136        Self {
137            metadata_manager,
138            managed_sources,
139            source_fragments,
140            backfill_fragments,
141            env,
142        }
143    }
144
    /// Updates states after all kinds of source change.
    ///
    /// The incoming [`SourceChange`] is first flattened into a uniform set of
    /// deltas (added/dropped/finished fragments, dropped source ids,
    /// fragment replacements, property updates), which are then applied in a
    /// fixed order below. Exactly one `match` arm populates its deltas; all
    /// others stay at their `Default` (empty) values, so the apply loops for
    /// unrelated deltas are no-ops.
    pub fn apply_source_change(&mut self, source_change: SourceChange) {
        let mut added_source_fragments = Default::default();
        let mut added_backfill_fragments = Default::default();
        let mut finished_backfill_fragments = Default::default();
        let mut fragment_replacements = Default::default();
        let mut dropped_source_fragments = Default::default();
        let mut dropped_source_ids = Default::default();
        let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
            Default::default();

        // Flatten the change enum into the delta variables above.
        match source_change {
            SourceChange::CreateJob {
                added_source_fragments: added_source_fragments_,
                added_backfill_fragments: added_backfill_fragments_,
            } => {
                added_source_fragments = added_source_fragments_;
                added_backfill_fragments = added_backfill_fragments_;
            }
            SourceChange::CreateJobFinished {
                finished_backfill_fragments: finished_backfill_fragments_,
            } => {
                finished_backfill_fragments = finished_backfill_fragments_;
            }

            SourceChange::DropMv {
                dropped_source_fragments: dropped_source_fragments_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
            }
            SourceChange::ReplaceJob {
                dropped_source_fragments: dropped_source_fragments_,
                added_source_fragments: added_source_fragments_,
                fragment_replacements: fragment_replacements_,
            } => {
                dropped_source_fragments = dropped_source_fragments_;
                added_source_fragments = added_source_fragments_;
                fragment_replacements = fragment_replacements_;
            }
            SourceChange::DropSource {
                dropped_source_ids: dropped_source_ids_,
            } => {
                dropped_source_ids = dropped_source_ids_;
            }

            SourceChange::UpdateSourceProps {
                source_id_map_new_props,
            } => {
                for (source_id, new_props) in source_id_map_new_props {
                    recreate_source_id_map_new_props.push((source_id, new_props));
                }
            }
        }

        // Drop whole sources: forget their fragments, terminate the worker
        // loop, and clear any leftover backfill bookkeeping.
        for source_id in dropped_source_ids {
            let dropped_fragments = self.source_fragments.remove(&source_id);

            if let Some(handle) = self.managed_sources.remove(&source_id) {
                handle.terminate(dropped_fragments);
            }
            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
                // TODO: enable this assertion after we implemented cleanup for backfill fragments
                // debug_assert!(
                //     fragments.is_empty(),
                //     "when dropping source, there should be no backfill fragments, got: {:?}",
                //     fragments
                // );
            }
        }

        // Register newly created source fragments.
        for (source_id, fragments) in added_source_fragments {
            self.source_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        // Register newly created backfill fragments.
        for (source_id, fragments) in added_backfill_fragments {
            self.backfill_fragments
                .entry(source_id)
                .or_default()
                .extend(fragments);
        }

        // Notify workers that backfill finished; the worker must already be
        // registered, otherwise this is an invariant violation and we panic.
        for (source_id, fragments) in finished_backfill_fragments {
            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
                panic!(
                    "source {} not found when adding backfill fragments {:?}",
                    source_id, fragments
                );
            });
            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
        }

        // Drop individual fragments of still-existing sources.
        for (source_id, fragment_ids) in dropped_source_fragments {
            self.drop_source_fragments(Some(source_id), fragment_ids);
        }

        // Apply fragment replacements: drop the old fragment everywhere, then
        // rewrite any backfill entry whose upstream pointed at it.
        for (old_fragment_id, new_fragment_id) in fragment_replacements {
            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));

            for fragment_ids in self.backfill_fragments.values_mut() {
                // Rebuild the set instead of mutating it while iterating.
                let mut new_backfill_fragment_ids = fragment_ids.clone();
                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
                    assert_ne!(
                        fragment_id, upstream_fragment_id,
                        "backfill fragment should not be replaced"
                    );
                    if *upstream_fragment_id == old_fragment_id {
                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
                    }
                }
                *fragment_ids = new_backfill_fragment_ids;
            }
        }

        // Push updated connector properties into the running workers in place.
        for (source_id, new_props) in recreate_source_id_map_new_props {
            if let Some(handle) = self.managed_sources.get_mut(&source_id) {
                // the update here should not involve fragments change and split change
                // Or we need to drop and recreate the source worker instead of updating inplace
                let props_wrapper =
                    WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
                let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); // already checked when sending barrier
                handle.update_props(props);
                tracing::info!("update source {source_id} properties in source manager");
            } else {
                tracing::info!("job id {source_id} is not registered in source manager");
            }
        }
    }
277
    /// Removes `dropped_fragment_ids` from the tracked source fragments and
    /// notifies the owning worker handles.
    ///
    /// With `Some(source_id)` only that source's entry is touched (and removed
    /// entirely once empty); with `None` every tracked source is scanned for
    /// the fragments (used for fragment replacement, where the source is unknown).
    ///
    /// NOTE(review): the two branches differ — with `Some`, every requested
    /// fragment id is forwarded to the handle even if it was not actually
    /// tracked, whereas the `None` branch forwards only ids that were really
    /// removed. Confirm this asymmetry is intended.
    fn drop_source_fragments(
        &mut self,
        source_id: Option<SourceId>,
        dropped_fragment_ids: BTreeSet<FragmentId>,
    ) {
        if let Some(source_id) = source_id {
            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
                let mut dropped_ids = vec![];
                let managed_fragment_ids = entry.get_mut();
                for fragment_id in &dropped_fragment_ids {
                    managed_fragment_ids.remove(fragment_id);
                    dropped_ids.push(*fragment_id);
                }
                if let Some(handle) = self.managed_sources.get(&source_id) {
                    handle.drop_fragments(dropped_ids);
                } else {
                    // Missing worker is a bug in debug builds, tolerated in release.
                    panic_if_debug!(
                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
                    );
                }
                if managed_fragment_ids.is_empty() {
                    entry.remove();
                }
            }
        } else {
            for (source_id, fragment_ids) in &mut self.source_fragments {
                let mut dropped_ids = vec![];
                for fragment_id in &dropped_fragment_ids {
                    if fragment_ids.remove(fragment_id) {
                        dropped_ids.push(*fragment_id);
                    }
                }
                if !dropped_ids.is_empty() {
                    if let Some(handle) = self.managed_sources.get(source_id) {
                        handle.drop_fragments(dropped_ids);
                    } else {
                        // Missing worker is a bug in debug builds, tolerated in release.
                        panic_if_debug!(
                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
                        );
                    }
                }
            }
        }
    }
322}
323
324impl SourceManager {
    /// Interval between periodic split-discovery ticks in [`Self::run`].
    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
326
327    pub async fn new(
328        barrier_scheduler: BarrierScheduler,
329        metadata_manager: MetadataManager,
330        metrics: Arc<MetaMetrics>,
331        env: MetaSrvEnv,
332    ) -> MetaResult<Self> {
333        let mut managed_sources = HashMap::new();
334        {
335            let sources = metadata_manager.list_sources().await?;
336            for source in sources {
337                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
338            }
339        }
340
341        let source_fragments = metadata_manager
342            .catalog_controller
343            .load_source_fragment_ids()
344            .await?
345            .into_iter()
346            .map(|(source_id, fragment_ids)| {
347                (
348                    source_id as SourceId,
349                    fragment_ids.into_iter().map(|id| id as _).collect(),
350                )
351            })
352            .collect();
353        let backfill_fragments = metadata_manager
354            .catalog_controller
355            .load_backfill_fragment_ids()
356            .await?;
357
358        let core = Mutex::new(SourceManagerCore::new(
359            metadata_manager,
360            managed_sources,
361            source_fragments,
362            backfill_fragments,
363            env,
364        ));
365
366        Ok(Self {
367            barrier_scheduler,
368            core,
369            paused: Mutex::new(()),
370            metrics,
371        })
372    }
373
374    pub async fn validate_source_once(
375        &self,
376        source_id: SourceId,
377        new_source_props: WithOptionsSecResolved,
378    ) -> MetaResult<()> {
379        let props = ConnectorProperties::extract(new_source_props, false).unwrap();
380
381        {
382            let mut enumerator = props
383                .create_split_enumerator(Arc::new(SourceEnumeratorContext {
384                    metrics: self.metrics.source_enumerator_metrics.clone(),
385                    info: SourceEnumeratorInfo { source_id },
386                }))
387                .await
388                .context("failed to create SplitEnumerator")?;
389
390            let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
391                .await
392                .context("failed to list splits")??;
393        }
394        Ok(())
395    }
396
397    /// For replacing job (alter table/source, create sink into table).
398    #[await_tree::instrument]
399    pub async fn handle_replace_job(
400        &self,
401        dropped_job_fragments: &StreamJobFragments,
402        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
403        replace_plan: &ReplaceStreamJobPlan,
404    ) {
405        // Extract the fragments that include source operators.
406        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
407
408        self.apply_source_change(SourceChange::ReplaceJob {
409            dropped_source_fragments,
410            added_source_fragments,
411            fragment_replacements: replace_plan.fragment_replacements(),
412        })
413        .await;
414    }
415
416    /// Updates states after all kinds of source change.
417    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
418    #[await_tree::instrument("apply_source_change({source_change})")]
419    pub async fn apply_source_change(&self, source_change: SourceChange) {
420        let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
421        let updated_source_ids = if let SourceChange::UpdateSourceProps {
422            ref source_id_map_new_props,
423        } = source_change
424        {
425            source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
426        } else {
427            Vec::new()
428        };
429
430        {
431            let mut core = self.core.lock().await;
432            core.apply_source_change(source_change);
433        }
434
435        // Force tick for updated source workers
436        if need_force_tick {
437            self.force_tick_updated_sources(updated_source_ids).await;
438        }
439    }
440
441    /// create and register connector worker for source.
442    #[await_tree::instrument("register_source({})", source.name)]
443    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
444        tracing::debug!("register_source: {}", source.get_id());
445        let mut core = self.core.lock().await;
446        let source_id = source.get_id();
447        if core.managed_sources.contains_key(&source_id) {
448            tracing::warn!("source {} already registered", source_id);
449            return Ok(());
450        }
451
452        let handle = create_source_worker(source, self.metrics.clone())
453            .await
454            .context("failed to create source worker")?;
455
456        core.managed_sources.insert(source_id, handle);
457
458        Ok(())
459    }
460
461    /// register connector worker for source.
462    pub async fn register_source_with_handle(
463        &self,
464        source_id: SourceId,
465        handle: ConnectorSourceWorkerHandle,
466    ) {
467        let mut core = self.core.lock().await;
468        if core.managed_sources.contains_key(&source_id) {
469            tracing::warn!("source {} already registered", source_id);
470            return;
471        }
472
473        core.managed_sources.insert(source_id, handle);
474    }
475
476    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
477        let core = self.core.lock().await;
478
479        SourceManagerRunningInfo {
480            source_fragments: core.source_fragments.clone(),
481            backfill_fragments: core.backfill_fragments.clone(),
482        }
483    }
484
485    /// Checks whether the external source metadata has changed, and sends a split assignment command
486    /// if it has.
487    ///
488    /// This is also how a newly created `SourceExecutor` is initialized.
489    /// (force `tick` in `Self::create_source_worker`)
490    ///
491    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
492    /// to update states in `SourceManager`.
493    async fn tick(&self) -> MetaResult<()> {
494        let split_states = {
495            let core_guard = self.core.lock().await;
496            core_guard.reassign_splits().await?
497        };
498
499        for (database_id, split_state) in split_states {
500            if !split_state.split_assignment.is_empty() {
501                let command = Command::SourceChangeSplit(split_state);
502                tracing::info!(command = ?command, "pushing down split assignment command");
503                self.barrier_scheduler
504                    .run_command(database_id, command)
505                    .await?;
506            }
507        }
508
509        Ok(())
510    }
511
512    pub async fn run(&self) -> MetaResult<()> {
513        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
514        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
515        loop {
516            ticker.tick().await;
517            let _pause_guard = self.paused.lock().await;
518            if let Err(e) = self.tick().await {
519                tracing::error!(
520                    error = %e.as_report(),
521                    "error happened while running source manager tick",
522                );
523            }
524        }
525    }
526
    /// Pause the tick loop in source manager until the returned guard is dropped.
    ///
    /// The tick loop in [`Self::run`] acquires `paused` on every iteration, so
    /// holding this guard prevents any further ticks from running.
    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
        tracing::debug!("pausing tick lock in source manager");
        self.paused.lock().await
    }
532
533    /// Force tick for specific updated source workers after properties update.
534    async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
535        let core = self.core.lock().await;
536        for source_id in updated_source_ids {
537            if let Some(handle) = core.managed_sources.get(&source_id) {
538                tracing::info!("forcing tick for updated source {}", source_id);
539                if let Err(e) = handle.force_tick().await {
540                    tracing::warn!(
541                        error = %e.as_report(),
542                        "failed to force tick for source {} after properties update",
543                        source_id
544                    );
545                }
546            } else {
547                tracing::warn!(
548                    "source {} not found when trying to force tick after update",
549                    source_id
550                );
551            }
552        }
553    }
554
555    /// Reset source split assignments by clearing the cached split state
556    /// and triggering re-discovery. This is an UNSAFE operation that may
557    /// cause data duplication or loss depending on the connector.
558    pub async fn reset_source_splits(&self, source_id: SourceId) -> MetaResult<()> {
559        tracing::warn!(
560            %source_id,
561            "UNSAFE: Resetting source splits - clearing cached state and triggering re-discovery"
562        );
563
564        let core = self.core.lock().await;
565        if let Some(handle) = core.managed_sources.get(&source_id) {
566            // Clear the cached splits to force re-discovery
567            {
568                let mut splits_guard = handle.splits.lock().await;
569                tracing::info!(
570                    %source_id,
571                    prev_splits = ?splits_guard.splits.as_ref().map(|s| s.len()),
572                    "Clearing cached splits"
573                );
574                splits_guard.splits = None;
575            }
576
577            // Force a tick to re-discover splits
578            tracing::info!(
579                %source_id,
580                "Triggering split re-discovery via force_tick"
581            );
582            handle.force_tick().await.with_context(|| {
583                format!(
584                    "failed to force tick for source {} after split reset",
585                    source_id
586                )
587            })?;
588
589            tracing::info!(
590                %source_id,
591                "Split reset completed - new splits will be assigned on next tick"
592            );
593            Ok(())
594        } else {
595            Err(anyhow::anyhow!("source {} not found in source manager", source_id).into())
596        }
597    }
598
599    /// Validate split offsets before injecting them.
600    /// Returns `Ok(applied_split_ids)` if all validations pass, otherwise returns an error.
601    ///
602    /// Validations performed:
603    /// 1. Source exists in source manager
604    /// 2. All requested split IDs exist in the source's current splits (runtime assignment)
605    pub async fn validate_inject_source_offsets(
606        &self,
607        source_id: SourceId,
608        split_offsets: &HashMap<String, String>,
609    ) -> MetaResult<Vec<String>> {
610        let (fragment_ids, env) = {
611            let core = self.core.lock().await;
612
613            // Check if source exists
614            let _ = core.managed_sources.get(&source_id).ok_or_else(|| {
615                MetaError::invalid_parameter(format!(
616                    "source {} not found in source manager",
617                    source_id
618                ))
619            })?;
620
621            let mut ids = Vec::new();
622            if let Some(src_frags) = core.source_fragments.get(&source_id) {
623                ids.extend(src_frags.iter().copied());
624            }
625            if let Some(backfill_frags) = core.backfill_fragments.get(&source_id) {
626                ids.extend(
627                    backfill_frags
628                        .iter()
629                        .flat_map(|(id, upstream)| [*id, *upstream]),
630                );
631            }
632            (ids, core.env.clone())
633        };
634
635        if fragment_ids.is_empty() {
636            return Err(MetaError::invalid_parameter(format!(
637                "source {} has no running fragments",
638                source_id
639            )));
640        }
641
642        let guard = env.shared_actor_infos().read_guard();
643        let mut assigned_split_ids = HashSet::new();
644        for fragment_id in fragment_ids {
645            if let Some(fragment) = guard.get_fragment(fragment_id) {
646                for actor in fragment.actors.values() {
647                    for split in &actor.splits {
648                        assigned_split_ids.insert(split.id().to_string());
649                    }
650                }
651            }
652        }
653
654        // Validate all requested split IDs exist
655        let mut invalid_splits = Vec::new();
656        for split_id in split_offsets.keys() {
657            if !assigned_split_ids.contains(split_id) {
658                invalid_splits.push(split_id.clone());
659            }
660        }
661
662        if !invalid_splits.is_empty() {
663            return Err(MetaError::invalid_parameter(format!(
664                "invalid split IDs for source {}: {:?}. Valid splits are: {:?}",
665                source_id,
666                invalid_splits,
667                assigned_split_ids.iter().collect::<Vec<_>>()
668            )));
669        }
670
671        tracing::info!(
672            source_id = %source_id,
673            num_splits = split_offsets.len(),
674            "Validated inject source offsets request"
675        );
676
677        Ok(split_offsets.keys().cloned().collect())
678    }
679}
680
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// In-place property update for running source workers; must not affect
    /// split assignment or fragments (see `SourceManagerCore::apply_source_change`).
    UpdateSourceProps {
        // the new properties to be set for each source_id
        // and the props should not affect split assignment and fragments
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// (`source_id`, -> (`source_backfill_fragment_id`, `upstream_source_fragment_id`))
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// `DROP SOURCE` or `DROP MV`
    /// NOTE(review): a separate `DropMv` variant exists below — confirm which
    /// drop paths use each variant; this doc may be stale.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// Dropping an MV: removes the MV's source-reading fragments, keeping the
    /// source itself alive.
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// Replacing a streaming job (see [`SourceManager::handle_replace_job`]).
    ReplaceJob {
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
714
715pub fn build_actor_connector_splits(
716    splits: &HashMap<ActorId, Vec<SplitImpl>>,
717) -> HashMap<ActorId, ConnectorSplits> {
718    splits
719        .iter()
720        .map(|(&actor_id, splits)| {
721            (
722                actor_id,
723                ConnectorSplits {
724                    splits: splits.iter().map(ConnectorSplit::from).collect(),
725                },
726            )
727        })
728        .collect()
729}
730
731pub fn build_actor_split_impls(
732    actor_splits: &HashMap<ActorId, ConnectorSplits>,
733) -> HashMap<ActorId, Vec<SplitImpl>> {
734    actor_splits
735        .iter()
736        .map(|(actor_id, ConnectorSplits { splits })| {
737            (
738                *actor_id,
739                splits
740                    .iter()
741                    .map(|split| SplitImpl::try_from(split).unwrap())
742                    .collect(),
743            )
744        })
745        .collect()
746}