risingwave_meta/stream/
source_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod split_assignment;
16mod worker;
17use std::borrow::BorrowMut;
18use std::cmp::Ordering;
19use std::collections::hash_map::Entry;
20use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet};
21use std::sync::Arc;
22use std::time::Duration;
23
24use anyhow::Context;
25use risingwave_common::catalog::DatabaseId;
26use risingwave_common::id::ObjectId;
27use risingwave_common::metrics::LabelGuardedIntGauge;
28use risingwave_common::panic_if_debug;
29use risingwave_connector::WithOptionsSecResolved;
30use risingwave_connector::error::ConnectorResult;
31use risingwave_connector::source::{
32    ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SplitId, SplitImpl,
33    SplitMetaData, fill_adaptive_split,
34};
35use risingwave_meta_model::SourceId;
36use risingwave_pb::catalog::Source;
37use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
38pub use split_assignment::{SplitDiffOptions, SplitState, reassign_splits};
39use thiserror_ext::AsReport;
40use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
41use tokio::sync::{Mutex, MutexGuard, oneshot};
42use tokio::task::JoinHandle;
43use tokio::time::MissedTickBehavior;
44use tokio::{select, time};
45pub use worker::create_source_worker;
46use worker::{ConnectorSourceWorkerHandle, create_source_worker_async};
47
48use crate::MetaResult;
49use crate::barrier::{BarrierScheduler, Command, ReplaceStreamJobPlan, SharedActorInfos};
50use crate::manager::{MetaSrvEnv, MetadataManager};
51use crate::model::{ActorId, FragmentId, StreamJobFragments};
52use crate::rpc::metrics::MetaMetrics;
53
54pub type SourceManagerRef = Arc<SourceManager>;
55pub type SplitAssignment = HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>;
56pub type DiscoveredSourceSplits = HashMap<SourceId, Vec<SplitImpl>>;
57// ALTER CONNECTOR parameters, specifying the new parameters to be set for each job_id (source_id/sink_id)
58pub type ConnectorPropsChange = HashMap<ObjectId, HashMap<String, String>>;
59
60const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
61
/// `SourceManager` keeps fetching the latest split metadata from the external source services ([`worker::ConnectorSourceWorker::tick`]),
/// and sends a split assignment command if split changes detected ([`Self::tick`]).
pub struct SourceManager {
    /// Tick gate: [`Self::run`] holds this lock while performing each tick, and
    /// [`Self::pause_tick`] hands out the guard so callers can block ticking.
    pub paused: Mutex<()>,
    /// Used by [`Self::tick`] to run `Command::SourceChangeSplit` on the owning database.
    barrier_scheduler: BarrierScheduler,
    /// Mutable bookkeeping (worker handles and fragment maps); every public method locks it.
    core: Mutex<SourceManagerCore>,
    /// Meta metrics; `source_enumerator_metrics` is handed to split enumerators and the
    /// whole value is passed to newly created source workers.
    pub metrics: Arc<MetaMetrics>,
}
/// Mutable state of the source manager; [`SourceManager`] keeps it behind a `Mutex`.
pub struct SourceManagerCore {
    /// Metadata manager handle, moved in from `SourceManager::new`.
    metadata_manager: MetadataManager,

    /// Managed source loops: one connector source worker handle per registered source.
    managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
    /// Fragments associated with each source (fragments containing the source operator).
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(fragment_id, upstream_fragment_id)` for source backfill fragments.
    backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,

    /// Meta service environment.
    env: MetaSrvEnv,
}
82
/// Snapshot of the source manager's fragment maps, as returned by
/// `SourceManager::get_running_info` (both maps are cloned out of the core).
pub struct SourceManagerRunningInfo {
    /// `source_id` -> fragments that contain the source operator.
    pub source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    /// `source_id` -> `(backfill_fragment_id, upstream_fragment_id)` pairs.
    pub backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
}
87
88impl SourceManagerCore {
89    fn new(
90        metadata_manager: MetadataManager,
91        managed_sources: HashMap<SourceId, ConnectorSourceWorkerHandle>,
92        source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
93        backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
94        env: MetaSrvEnv,
95    ) -> Self {
96        Self {
97            metadata_manager,
98            managed_sources,
99            source_fragments,
100            backfill_fragments,
101            env,
102        }
103    }
104
105    /// Updates states after all kinds of source change.
106    pub fn apply_source_change(&mut self, source_change: SourceChange) {
107        let mut added_source_fragments = Default::default();
108        let mut added_backfill_fragments = Default::default();
109        let mut finished_backfill_fragments = Default::default();
110        let mut fragment_replacements = Default::default();
111        let mut dropped_source_fragments = Default::default();
112        let mut dropped_source_ids = Default::default();
113        let mut recreate_source_id_map_new_props: Vec<(SourceId, HashMap<String, String>)> =
114            Default::default();
115
116        match source_change {
117            SourceChange::CreateJob {
118                added_source_fragments: added_source_fragments_,
119                added_backfill_fragments: added_backfill_fragments_,
120            } => {
121                added_source_fragments = added_source_fragments_;
122                added_backfill_fragments = added_backfill_fragments_;
123            }
124            SourceChange::CreateJobFinished {
125                finished_backfill_fragments: finished_backfill_fragments_,
126            } => {
127                finished_backfill_fragments = finished_backfill_fragments_;
128            }
129
130            SourceChange::DropMv {
131                dropped_source_fragments: dropped_source_fragments_,
132            } => {
133                dropped_source_fragments = dropped_source_fragments_;
134            }
135            SourceChange::ReplaceJob {
136                dropped_source_fragments: dropped_source_fragments_,
137                added_source_fragments: added_source_fragments_,
138                fragment_replacements: fragment_replacements_,
139            } => {
140                dropped_source_fragments = dropped_source_fragments_;
141                added_source_fragments = added_source_fragments_;
142                fragment_replacements = fragment_replacements_;
143            }
144            SourceChange::DropSource {
145                dropped_source_ids: dropped_source_ids_,
146            } => {
147                dropped_source_ids = dropped_source_ids_;
148            }
149
150            SourceChange::UpdateSourceProps {
151                source_id_map_new_props,
152            } => {
153                for (source_id, new_props) in source_id_map_new_props {
154                    recreate_source_id_map_new_props.push((source_id, new_props));
155                }
156            }
157        }
158
159        for source_id in dropped_source_ids {
160            let dropped_fragments = self.source_fragments.remove(&source_id);
161
162            if let Some(handle) = self.managed_sources.remove(&source_id) {
163                handle.terminate(dropped_fragments);
164            }
165            if let Some(_fragments) = self.backfill_fragments.remove(&source_id) {
166                // TODO: enable this assertion after we implemented cleanup for backfill fragments
167                // debug_assert!(
168                //     fragments.is_empty(),
169                //     "when dropping source, there should be no backfill fragments, got: {:?}",
170                //     fragments
171                // );
172            }
173        }
174
175        for (source_id, fragments) in added_source_fragments {
176            self.source_fragments
177                .entry(source_id)
178                .or_default()
179                .extend(fragments);
180        }
181
182        for (source_id, fragments) in added_backfill_fragments {
183            self.backfill_fragments
184                .entry(source_id)
185                .or_default()
186                .extend(fragments);
187        }
188
189        for (source_id, fragments) in finished_backfill_fragments {
190            let handle = self.managed_sources.get(&source_id).unwrap_or_else(|| {
191                panic!(
192                    "source {} not found when adding backfill fragments {:?}",
193                    source_id, fragments
194                );
195            });
196            handle.finish_backfill(fragments.iter().map(|(id, _up_id)| *id).collect());
197        }
198
199        for (source_id, fragment_ids) in dropped_source_fragments {
200            self.drop_source_fragments(Some(source_id), fragment_ids);
201        }
202
203        for (old_fragment_id, new_fragment_id) in fragment_replacements {
204            // TODO: add source_id to the fragment_replacements to avoid iterating all sources
205            self.drop_source_fragments(None, BTreeSet::from([old_fragment_id]));
206
207            for fragment_ids in self.backfill_fragments.values_mut() {
208                let mut new_backfill_fragment_ids = fragment_ids.clone();
209                for (fragment_id, upstream_fragment_id) in fragment_ids.iter() {
210                    assert_ne!(
211                        fragment_id, upstream_fragment_id,
212                        "backfill fragment should not be replaced"
213                    );
214                    if *upstream_fragment_id == old_fragment_id {
215                        new_backfill_fragment_ids.remove(&(*fragment_id, *upstream_fragment_id));
216                        new_backfill_fragment_ids.insert((*fragment_id, new_fragment_id));
217                    }
218                }
219                *fragment_ids = new_backfill_fragment_ids;
220            }
221        }
222
223        for (source_id, new_props) in recreate_source_id_map_new_props {
224            if let Some(handle) = self.managed_sources.get_mut(&(source_id as _)) {
225                // the update here should not involve fragments change and split change
226                // Or we need to drop and recreate the source worker instead of updating inplace
227                let props_wrapper =
228                    WithOptionsSecResolved::without_secrets(new_props.into_iter().collect());
229                let props = ConnectorProperties::extract(props_wrapper, false).unwrap(); // already checked when sending barrier
230                handle.update_props(props);
231                tracing::info!("update source {source_id} properties in source manager");
232            } else {
233                tracing::info!("job id {source_id} is not registered in source manager");
234            }
235        }
236    }
237
238    fn drop_source_fragments(
239        &mut self,
240        source_id: Option<SourceId>,
241        dropped_fragment_ids: BTreeSet<FragmentId>,
242    ) {
243        if let Some(source_id) = source_id {
244            if let Entry::Occupied(mut entry) = self.source_fragments.entry(source_id) {
245                let mut dropped_ids = vec![];
246                let managed_fragment_ids = entry.get_mut();
247                for fragment_id in &dropped_fragment_ids {
248                    managed_fragment_ids.remove(fragment_id);
249                    dropped_ids.push(*fragment_id);
250                }
251                if let Some(handle) = self.managed_sources.get(&source_id) {
252                    handle.drop_fragments(dropped_ids);
253                } else {
254                    panic_if_debug!(
255                        "source {source_id} not found when dropping fragment {dropped_ids:?}",
256                    );
257                }
258                if managed_fragment_ids.is_empty() {
259                    entry.remove();
260                }
261            }
262        } else {
263            for (source_id, fragment_ids) in &mut self.source_fragments {
264                let mut dropped_ids = vec![];
265                for fragment_id in &dropped_fragment_ids {
266                    if fragment_ids.remove(fragment_id) {
267                        dropped_ids.push(*fragment_id);
268                    }
269                }
270                if !dropped_ids.is_empty() {
271                    if let Some(handle) = self.managed_sources.get(source_id) {
272                        handle.drop_fragments(dropped_ids);
273                    } else {
274                        panic_if_debug!(
275                            "source {source_id} not found when dropping fragment {dropped_ids:?}",
276                        );
277                    }
278                }
279            }
280        }
281    }
282}
283
284impl SourceManager {
285    const DEFAULT_SOURCE_TICK_INTERVAL: Duration = Duration::from_secs(10);
286
287    pub async fn new(
288        barrier_scheduler: BarrierScheduler,
289        metadata_manager: MetadataManager,
290        metrics: Arc<MetaMetrics>,
291        env: MetaSrvEnv,
292    ) -> MetaResult<Self> {
293        let mut managed_sources = HashMap::new();
294        {
295            let sources = metadata_manager.list_sources().await?;
296            for source in sources {
297                create_source_worker_async(source, &mut managed_sources, metrics.clone())?
298            }
299        }
300
301        let source_fragments = metadata_manager
302            .catalog_controller
303            .load_source_fragment_ids()
304            .await?
305            .into_iter()
306            .map(|(source_id, fragment_ids)| {
307                (
308                    source_id as SourceId,
309                    fragment_ids.into_iter().map(|id| id as _).collect(),
310                )
311            })
312            .collect();
313        let backfill_fragments = metadata_manager
314            .catalog_controller
315            .load_backfill_fragment_ids()
316            .await?;
317
318        let core = Mutex::new(SourceManagerCore::new(
319            metadata_manager,
320            managed_sources,
321            source_fragments,
322            backfill_fragments,
323            env,
324        ));
325
326        Ok(Self {
327            barrier_scheduler,
328            core,
329            paused: Mutex::new(()),
330            metrics,
331        })
332    }
333
334    pub async fn validate_source_once(
335        &self,
336        source_id: SourceId,
337        new_source_props: WithOptionsSecResolved,
338    ) -> MetaResult<()> {
339        let props = ConnectorProperties::extract(new_source_props, false).unwrap();
340
341        {
342            let mut enumerator = props
343                .create_split_enumerator(Arc::new(SourceEnumeratorContext {
344                    metrics: self.metrics.source_enumerator_metrics.clone(),
345                    info: SourceEnumeratorInfo { source_id },
346                }))
347                .await
348                .context("failed to create SplitEnumerator")?;
349
350            let _ = tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, enumerator.list_splits())
351                .await
352                .context("failed to list splits")??;
353        }
354        Ok(())
355    }
356
357    /// For replacing job (alter table/source, create sink into table).
358    #[await_tree::instrument]
359    pub async fn handle_replace_job(
360        &self,
361        dropped_job_fragments: &StreamJobFragments,
362        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
363        replace_plan: &ReplaceStreamJobPlan,
364    ) {
365        // Extract the fragments that include source operators.
366        let dropped_source_fragments = dropped_job_fragments.stream_source_fragments();
367
368        self.apply_source_change(SourceChange::ReplaceJob {
369            dropped_source_fragments,
370            added_source_fragments,
371            fragment_replacements: replace_plan.fragment_replacements(),
372        })
373        .await;
374    }
375
376    /// Updates states after all kinds of source change.
377    /// e.g., split change (`post_collect` barrier) or scaling (`post_apply_reschedule`).
378    #[await_tree::instrument("apply_source_change({source_change})")]
379    pub async fn apply_source_change(&self, source_change: SourceChange) {
380        let need_force_tick = matches!(source_change, SourceChange::UpdateSourceProps { .. });
381        let updated_source_ids = if let SourceChange::UpdateSourceProps {
382            ref source_id_map_new_props,
383        } = source_change
384        {
385            source_id_map_new_props.keys().cloned().collect::<Vec<_>>()
386        } else {
387            Vec::new()
388        };
389
390        {
391            let mut core = self.core.lock().await;
392            core.apply_source_change(source_change);
393        }
394
395        // Force tick for updated source workers
396        if need_force_tick {
397            self.force_tick_updated_sources(updated_source_ids).await;
398        }
399    }
400
401    /// create and register connector worker for source.
402    #[await_tree::instrument("register_source({})", source.name)]
403    pub async fn register_source(&self, source: &Source) -> MetaResult<()> {
404        tracing::debug!("register_source: {}", source.get_id());
405        let mut core = self.core.lock().await;
406        let source_id = source.get_id();
407        if core.managed_sources.contains_key(&source_id) {
408            tracing::warn!("source {} already registered", source_id);
409            return Ok(());
410        }
411
412        let handle = create_source_worker(source, self.metrics.clone())
413            .await
414            .context("failed to create source worker")?;
415
416        core.managed_sources.insert(source_id, handle);
417
418        Ok(())
419    }
420
421    /// register connector worker for source.
422    pub async fn register_source_with_handle(
423        &self,
424        source_id: SourceId,
425        handle: ConnectorSourceWorkerHandle,
426    ) {
427        let mut core = self.core.lock().await;
428        if core.managed_sources.contains_key(&source_id) {
429            tracing::warn!("source {} already registered", source_id);
430            return;
431        }
432
433        core.managed_sources.insert(source_id, handle);
434    }
435
436    pub async fn get_running_info(&self) -> SourceManagerRunningInfo {
437        let core = self.core.lock().await;
438
439        SourceManagerRunningInfo {
440            source_fragments: core.source_fragments.clone(),
441            backfill_fragments: core.backfill_fragments.clone(),
442        }
443    }
444
445    /// Checks whether the external source metadata has changed, and sends a split assignment command
446    /// if it has.
447    ///
448    /// This is also how a newly created `SourceExecutor` is initialized.
449    /// (force `tick` in `Self::create_source_worker`)
450    ///
451    /// The command will first updates `SourceExecutor`'s splits, and finally calls `Self::apply_source_change`
452    /// to update states in `SourceManager`.
453    async fn tick(&self) -> MetaResult<()> {
454        let split_states = {
455            let core_guard = self.core.lock().await;
456            core_guard.reassign_splits().await?
457        };
458
459        for (database_id, split_state) in split_states {
460            if !split_state.split_assignment.is_empty() {
461                let command = Command::SourceChangeSplit(split_state);
462                tracing::info!(command = ?command, "pushing down split assignment command");
463                self.barrier_scheduler
464                    .run_command(database_id, command)
465                    .await?;
466            }
467        }
468
469        Ok(())
470    }
471
472    pub async fn run(&self) -> MetaResult<()> {
473        let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
474        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
475        loop {
476            ticker.tick().await;
477            let _pause_guard = self.paused.lock().await;
478            if let Err(e) = self.tick().await {
479                tracing::error!(
480                    error = %e.as_report(),
481                    "error happened while running source manager tick",
482                );
483            }
484        }
485    }
486
487    /// Pause the tick loop in source manager until the returned guard is dropped.
488    pub async fn pause_tick(&self) -> MutexGuard<'_, ()> {
489        tracing::debug!("pausing tick lock in source manager");
490        self.paused.lock().await
491    }
492
493    /// Force tick for specific updated source workers after properties update.
494    async fn force_tick_updated_sources(&self, updated_source_ids: Vec<SourceId>) {
495        let core = self.core.lock().await;
496        for source_id in updated_source_ids {
497            if let Some(handle) = core.managed_sources.get(&source_id) {
498                tracing::info!("forcing tick for updated source {}", source_id.as_raw_id());
499                if let Err(e) = handle.force_tick().await {
500                    tracing::warn!(
501                        error = %e.as_report(),
502                        "failed to force tick for source {} after properties update",
503                        source_id.as_raw_id()
504                    );
505                }
506            } else {
507                tracing::warn!(
508                    "source {} not found when trying to force tick after update",
509                    source_id.as_raw_id()
510                );
511            }
512        }
513    }
514}
515
/// A change to the set of sources / source fragments tracked by the source manager, applied
/// via `SourceManager::apply_source_change`.
#[derive(strum::Display, Debug)]
pub enum SourceChange {
    /// `CREATE SOURCE` (shared), or `CREATE MV`.
    /// This is applied after the job is successfully created (`post_collect` barrier).
    CreateJob {
        /// `source_id` -> source fragment ids to register for that source.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `source_id` -> set of (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
        added_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// Updates connector properties of already-registered source workers in place.
    UpdateSourceProps {
        /// The new properties to be set for each `source_id`.
        /// The props should not affect split assignment and fragments.
        source_id_map_new_props: HashMap<SourceId, HashMap<String, String>>,
    },
    /// `CREATE SOURCE` (shared), or `CREATE MV` is _finished_ (backfill is done).
    /// This is applied after `wait_streaming_job_finished`.
    /// XXX: Should we merge `CreateJob` into this?
    CreateJobFinished {
        /// `source_id` -> set of (`source_backfill_fragment_id`, `upstream_source_fragment_id`).
        finished_backfill_fragments: HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>,
    },
    /// `DROP SOURCE` or `DROP MV`: terminates and forgets the workers of these sources,
    /// together with their source and backfill fragment records.
    DropSource { dropped_source_ids: Vec<SourceId> },
    /// Unregisters the given source fragments while keeping the source workers alive.
    DropMv {
        // FIXME: we should consider source backfill fragments here for MV on shared source.
        /// `source_id` -> source fragment ids to unregister.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    },
    /// Replacing a job (alter table/source, create sink into table).
    ReplaceJob {
        /// `source_id` -> source fragment ids of the old job to unregister.
        dropped_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// `source_id` -> source fragment ids of the new job to register.
        added_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
        /// Old fragment id -> new fragment id. Backfill fragments whose upstream is the old
        /// fragment are re-pointed to the new one.
        fragment_replacements: HashMap<FragmentId, FragmentId>,
    },
}
549
550pub fn build_actor_connector_splits(
551    splits: &HashMap<ActorId, Vec<SplitImpl>>,
552) -> HashMap<ActorId, ConnectorSplits> {
553    splits
554        .iter()
555        .map(|(&actor_id, splits)| {
556            (
557                actor_id,
558                ConnectorSplits {
559                    splits: splits.iter().map(ConnectorSplit::from).collect(),
560                },
561            )
562        })
563        .collect()
564}
565
566pub fn build_actor_split_impls(
567    actor_splits: &HashMap<ActorId, ConnectorSplits>,
568) -> HashMap<ActorId, Vec<SplitImpl>> {
569    actor_splits
570        .iter()
571        .map(|(actor_id, ConnectorSplits { splits })| {
572            (
573                *actor_id,
574                splits
575                    .iter()
576                    .map(|split| SplitImpl::try_from(split).unwrap())
577                    .collect(),
578            )
579        })
580        .collect()
581}