1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Display, Formatter};
18
19use itertools::Itertools;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::{DatabaseId, TableId};
22use risingwave_common::hash::{ActorMapping, VnodeCountCompat};
23use risingwave_common::id::{JobId, SourceId};
24use risingwave_common::must_match;
25use risingwave_common::types::Timestamptz;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::{CdcTableSnapshotSplitRaw, SplitImpl};
28use risingwave_hummock_sdk::change_log::build_table_change_log_delta;
29use risingwave_hummock_sdk::vector_index::VectorIndexDelta;
30use risingwave_meta_model::{DispatcherType, WorkerId, fragment_relation, streaming_job};
31use risingwave_pb::catalog::CreateType;
32use risingwave_pb::common::PbActorInfo;
33use risingwave_pb::hummock::vector_index_delta::PbVectorIndexInit;
34use risingwave_pb::plan_common::{ColumnCatalog as PbColumnCatalog, PbField};
35use risingwave_pb::source::{
36 ConnectorSplit, ConnectorSplits, PbCdcTableSnapshotSplits,
37 PbCdcTableSnapshotSplitsWithGeneration,
38};
39use risingwave_pb::stream_plan::add_mutation::PbNewUpstreamSink;
40use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
41use risingwave_pb::stream_plan::barrier_mutation::Mutation;
42use risingwave_pb::stream_plan::connector_props_change_mutation::ConnectorPropsInfo;
43use risingwave_pb::stream_plan::sink_schema_change::Op as PbSinkSchemaChangeOp;
44use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
45use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
46use risingwave_pb::stream_plan::{
47 AddMutation, ConnectorPropsChangeMutation, Dispatcher, Dispatchers, DropSubscriptionsMutation,
48 ListFinishMutation, LoadFinishMutation, PauseMutation, PbSinkAddColumnsOp, PbSinkDropColumnsOp,
49 PbSinkSchemaChange, PbUpstreamSinkInfo, ResumeMutation, SourceChangeSplitMutation,
50 StartFragmentBackfillMutation, StopMutation, SubscriptionUpstreamInfo, ThrottleMutation,
51 UpdateMutation,
52};
53use risingwave_pb::stream_service::BarrierCompleteResponse;
54use tracing::warn;
55
56use super::info::InflightDatabaseInfo;
57use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
58use crate::barrier::complete_task::CompleteBarrierTask;
59use crate::barrier::edge_builder::FragmentEdgeBuildResult;
60use crate::barrier::info::BarrierInfo;
61use crate::barrier::partial_graph::PartialGraphBarrierInfo;
62use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
63use crate::barrier::utils::{collect_new_vector_index_info, collect_resp_info};
64use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
65use crate::controller::scale::LoadedFragmentContext;
66use crate::controller::utils::StreamingJobExtraInfo;
67use crate::hummock::NewTableFragmentInfo;
68use crate::manager::{StreamingJob, StreamingJobType};
69use crate::model::{
70 ActorId, ActorUpstreams, DispatcherId, FragmentActorDispatchers, FragmentDownstreamRelation,
71 FragmentId, FragmentReplaceUpstream, StreamActor, StreamActorWithDispatchers,
72 StreamJobActorsToCreate, StreamJobFragments, StreamJobFragmentsToCreate, SubscriptionId,
73};
74use crate::stream::{
75 AutoRefreshSchemaSinkContext, ConnectorPropsChange, ExtendedFragmentBackfillOrder,
76 ReplaceJobSplitPlan, SourceSplitAssignment, SplitAssignment, SplitState, UpstreamSinkInfo,
77 build_actor_connector_splits,
78};
79use crate::{MetaError, MetaResult};
80
#[derive(Debug, Clone)]
/// Rescheduling plan for a single fragment.
///
/// Describes how the fragment's actor set changes and what the neighbouring fragments
/// must be told. `Command::reschedule_to_mutation` turns these fields into
/// `DispatcherUpdate`s (for upstream fragments) and `MergeUpdate`s (for downstream
/// fragments).
pub struct Reschedule {
    /// Actors added to this fragment, keyed by `WorkerId`. They are reported (flattened
    /// across workers) as added downstream actors to every upstream actor that is not
    /// itself being removed.
    pub added_actors: HashMap<WorkerId, Vec<ActorId>>,

    /// Actors removed from this fragment. They are reported as removed downstream actors
    /// to upstream dispatchers. When this fragment is downstream of another rescheduled
    /// fragment, these actors receive no `MergeUpdate`.
    pub removed_actors: HashSet<ActorId>,

    /// New vnode bitmap per actor.
    /// NOTE(review): presumably only actors whose bitmap changes are listed; confirm with the planner.
    pub vnode_bitmap_updates: HashMap<ActorId, Bitmap>,

    /// `(upstream fragment, dispatcher id)` pairs whose dispatchers point at this fragment.
    /// Every actor of each listed upstream fragment receives a `DispatcherUpdate` for that
    /// dispatcher id.
    pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
    /// Actor mapping sent as `DispatcherUpdate::hash_mapping` to the upstream dispatchers.
    /// `None` sends no mapping.
    /// NOTE(review): presumably `None` for non-hash dispatchers; confirm.
    pub upstream_dispatcher_mapping: Option<ActorMapping>,

    /// Downstream fragments whose (non-removed) actors receive a `MergeUpdate` naming this
    /// fragment as their upstream.
    pub downstream_fragment_ids: Vec<FragmentId>,

    /// Source splits per actor after rescheduling.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Newly created actors, keyed by actor id. Each value pairs the actor definition
    /// (including its dispatchers) with a `WorkerId`.
    pub newly_created_actors: HashMap<ActorId, (StreamActorWithDispatchers, WorkerId)>,
}
112
#[derive(Debug, Clone)]
/// A concrete reschedule plan covering one or more fragments.
pub struct ReschedulePlan {
    /// Per-fragment reschedule, keyed by the fragment being rescheduled.
    pub reschedules: HashMap<FragmentId, Reschedule>,
    /// Actor ids of each fragment.
    /// NOTE(review): presumably this is what `Command::reschedule_to_mutation` receives as
    /// `fragment_actors`. That function `expect`s every upstream/downstream fragment of a
    /// reschedule to be present here; confirm.
    pub fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
}
120
#[derive(Debug, Clone)]
/// Metadata loaded for a reschedule.
///
/// Holds the loaded fragments plus per-job extra info and the fragment-graph relations
/// around them. It can be narrowed to, or split by, database via
/// `RescheduleContext::for_database` / `RescheduleContext::into_database_contexts`.
pub struct RescheduleContext {
    /// Loaded jobs and their fragments; this alone decides `is_empty`.
    pub loaded: LoadedFragmentContext,
    /// Extra info per streaming job.
    pub job_extra_info: HashMap<JobId, StreamingJobExtraInfo>,
    /// Per fragment: its upstream fragments and the dispatcher type of each edge.
    /// NOTE(review): direction inferred from the field name; confirm.
    pub upstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Per fragment: its downstream fragments and the dispatcher type of each edge.
    /// NOTE(review): direction inferred from the field name; confirm.
    pub downstream_fragments: HashMap<FragmentId, HashMap<FragmentId, DispatcherType>>,
    /// Relation models keyed by `(source fragment, target fragment)`. When splitting by
    /// database, a relation follows its source fragment.
    pub downstream_relations: HashMap<(FragmentId, FragmentId), fragment_relation::Model>,
}
130
131impl RescheduleContext {
132 pub fn empty() -> Self {
133 Self {
134 loaded: LoadedFragmentContext::default(),
135 job_extra_info: HashMap::new(),
136 upstream_fragments: HashMap::new(),
137 downstream_fragments: HashMap::new(),
138 downstream_relations: HashMap::new(),
139 }
140 }
141
142 pub fn is_empty(&self) -> bool {
143 self.loaded.is_empty()
144 }
145
146 pub fn for_database(&self, database_id: DatabaseId) -> Option<Self> {
147 let loaded = self.loaded.for_database(database_id)?;
148 let job_ids: HashSet<JobId> = loaded.job_map.keys().copied().collect();
149 let fragment_ids: HashSet<FragmentId> = loaded
152 .job_fragments
153 .values()
154 .flat_map(|fragments| fragments.keys().copied())
155 .collect();
156
157 let job_extra_info = self
158 .job_extra_info
159 .iter()
160 .filter(|(job_id, _)| job_ids.contains(*job_id))
161 .map(|(job_id, info)| (*job_id, info.clone()))
162 .collect();
163
164 let upstream_fragments = self
165 .upstream_fragments
166 .iter()
167 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
168 .map(|(fragment_id, upstreams)| (*fragment_id, upstreams.clone()))
169 .collect();
170
171 let downstream_fragments = self
172 .downstream_fragments
173 .iter()
174 .filter(|(fragment_id, _)| fragment_ids.contains(*fragment_id))
175 .map(|(fragment_id, downstreams)| (*fragment_id, downstreams.clone()))
176 .collect();
177
178 let downstream_relations = self
179 .downstream_relations
180 .iter()
181 .filter(|((source_fragment_id, _), _)| fragment_ids.contains(source_fragment_id))
185 .map(|(key, relation)| (*key, relation.clone()))
186 .collect();
187
188 Some(Self {
189 loaded,
190 job_extra_info,
191 upstream_fragments,
192 downstream_fragments,
193 downstream_relations,
194 })
195 }
196
197 pub fn into_database_contexts(self) -> HashMap<DatabaseId, Self> {
200 let Self {
201 loaded,
202 job_extra_info,
203 upstream_fragments,
204 downstream_fragments,
205 downstream_relations,
206 } = self;
207
208 let mut contexts: HashMap<_, _> = loaded
209 .into_database_contexts()
210 .into_iter()
211 .map(|(database_id, loaded)| {
212 (
213 database_id,
214 Self {
215 loaded,
216 job_extra_info: HashMap::new(),
217 upstream_fragments: HashMap::new(),
218 downstream_fragments: HashMap::new(),
219 downstream_relations: HashMap::new(),
220 },
221 )
222 })
223 .collect();
224
225 if contexts.is_empty() {
226 return contexts;
227 }
228
229 let mut job_databases = HashMap::new();
230 let mut fragment_databases = HashMap::new();
231 for (&database_id, context) in &contexts {
232 for job_id in context.loaded.job_map.keys().copied() {
233 job_databases.insert(job_id, database_id);
234 }
235 for fragment_id in context
236 .loaded
237 .job_fragments
238 .values()
239 .flat_map(|fragments| fragments.keys().copied())
240 {
241 fragment_databases.insert(fragment_id, database_id);
242 }
243 }
244
245 for (job_id, info) in job_extra_info {
246 if let Some(database_id) = job_databases.get(&job_id).copied() {
247 contexts
248 .get_mut(&database_id)
249 .expect("database context should exist for job")
250 .job_extra_info
251 .insert(job_id, info);
252 }
253 }
254
255 for (fragment_id, upstreams) in upstream_fragments {
256 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
257 contexts
258 .get_mut(&database_id)
259 .expect("database context should exist for fragment")
260 .upstream_fragments
261 .insert(fragment_id, upstreams);
262 }
263 }
264
265 for (fragment_id, downstreams) in downstream_fragments {
266 if let Some(database_id) = fragment_databases.get(&fragment_id).copied() {
267 contexts
268 .get_mut(&database_id)
269 .expect("database context should exist for fragment")
270 .downstream_fragments
271 .insert(fragment_id, downstreams);
272 }
273 }
274
275 for ((source_fragment_id, target_fragment_id), relation) in downstream_relations {
276 if let Some(database_id) = fragment_databases.get(&source_fragment_id).copied() {
279 contexts
280 .get_mut(&database_id)
281 .expect("database context should exist for relation source")
282 .downstream_relations
283 .insert((source_fragment_id, target_fragment_id), relation);
284 }
285 }
286
287 contexts
288 }
289}
290
#[derive(Debug, Clone)]
/// Plan for replacing the fragments of an existing streaming job with new ones.
pub struct ReplaceStreamJobPlan {
    /// Fragments of the job being replaced. Their actors, as recorded in the in-flight
    /// database info, are passed to the replace-table update mutation.
    pub old_fragments: StreamJobFragments,
    /// Fragments that replace `old_fragments`.
    pub new_fragments: StreamJobFragmentsToCreate,
    pub database_resource_group: String,
    /// Keyed by the fragments whose merge updates belong to this replacement (they are
    /// taken out of the edge build result). Each value maps an old upstream fragment id
    /// to its new id; see `ReplaceStreamJobPlan::fragment_replacements`.
    pub replace_upstream: FragmentReplaceUpstream,
    /// Downstream relations of upstream fragments. Dispatchers of the fragments listed
    /// here are moved into the replace mutation.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub split_plan: ReplaceJobSplitPlan,
    pub streaming_job: StreamingJob,
    pub streaming_job_model: streaming_job::Model,
    /// NOTE(review): presumably the temporary job id used by the replacement; confirm.
    pub tmp_id: JobId,
    /// State tables to drop as part of the replacement.
    pub to_drop_state_table_ids: Vec<TableId>,
    /// Sinks whose schema is auto-refreshed together with this replacement. The actors
    /// of each sink's original fragment are included in the update mutation.
    pub auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
}
320
321impl ReplaceStreamJobPlan {
322 pub fn fragment_replacements(&self) -> HashMap<FragmentId, FragmentId> {
324 let mut fragment_replacements = HashMap::new();
325 for (upstream_fragment_id, new_upstream_fragment_id) in
326 self.replace_upstream.values().flatten()
327 {
328 {
329 let r =
330 fragment_replacements.insert(*upstream_fragment_id, *new_upstream_fragment_id);
331 if let Some(r) = r {
332 assert_eq!(
333 *new_upstream_fragment_id, r,
334 "one fragment is replaced by multiple fragments"
335 );
336 }
337 }
338 }
339 fragment_replacements
340 }
341}
342
#[derive(educe::Educe, Clone)]
#[educe(Debug)]
/// Everything needed to create a streaming job through a barrier command.
pub struct CreateStreamingJobCommandInfo {
    /// Fragments of the new job. This field is excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    pub stream_job_fragments: StreamJobFragmentsToCreate,
    /// Downstream relations of existing upstream fragments. Dispatchers of the fragments
    /// listed here are moved into the job's `AddMutation`.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
    pub database_resource_group: String,
    /// Initial source split assignment.
    /// NOTE(review): the post-collect command carries a separate
    /// `resolved_split_assignment`, so this is presumably resolved later; confirm.
    pub init_split_assignment: SourceSplitAssignment,
    pub definition: String,
    pub job_type: StreamingJobType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    /// Backfill ordering between fragments. Nodes with backfill dependencies are sent as
    /// `backfill_nodes_to_pause` in the `AddMutation`.
    pub fragment_backfill_ordering: ExtendedFragmentBackfillOrder,
    pub cdc_table_snapshot_splits: Option<Vec<CdcTableSnapshotSplitRaw>>,
    pub locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
    pub is_serverless: bool,
    pub streaming_job_model: streaming_job::Model,
    /// Refresh interval; the unit (seconds) is taken from the field name.
    pub refresh_interval_sec: Option<u64>,
}
366
367impl StreamJobFragments {
368 pub(super) fn new_fragment_info<'a>(
371 &'a self,
372 stream_actors: &'a HashMap<FragmentId, Vec<StreamActor>>,
373 actor_location: &'a HashMap<ActorId, WorkerId>,
374 assignment: &'a SplitAssignment,
375 ) -> impl Iterator<Item = (FragmentId, InflightFragmentInfo)> + 'a {
376 self.fragments.values().map(|fragment| {
377 (
378 fragment.fragment_id,
379 InflightFragmentInfo {
380 fragment_id: fragment.fragment_id,
381 distribution_type: fragment.distribution_type.into(),
382 fragment_type_mask: fragment.fragment_type_mask,
383 vnode_count: fragment.vnode_count(),
384 nodes: fragment.nodes.clone(),
385 actors: stream_actors
386 .get(&fragment.fragment_id)
387 .into_iter()
388 .flatten()
389 .map(|actor| {
390 (
391 actor.actor_id,
392 InflightActorInfo {
393 worker_id: actor_location[&actor.actor_id],
394 vnode_bitmap: actor.vnode_bitmap.clone(),
395 splits: assignment
396 .get(&fragment.fragment_id)
397 .and_then(|s| s.get(&actor.actor_id))
398 .cloned()
399 .unwrap_or_default(),
400 },
401 )
402 })
403 .collect(),
404 state_table_ids: fragment.state_table_ids.iter().copied().collect(),
405 },
406 )
407 })
408 }
409}
410
#[derive(Debug, Clone)]
/// Snapshot-backfill parameters of a streaming job.
pub struct SnapshotBackfillInfo {
    /// Upstream MV tables the job backfills from, each with an optional backfill epoch.
    /// The keys become `SubscriptionUpstreamInfo` entries (`subscriptions_to_add`) in the
    /// job's `AddMutation`.
    /// NOTE(review): `None` presumably means the epoch is not decided yet; confirm.
    pub upstream_mv_table_id_to_backfill_epoch: HashMap<TableId, Option<u64>>,
}
418
#[derive(Debug, Clone)]
/// Parameters of a batch-refresh streaming job.
pub struct BatchRefreshInfo {
    /// Snapshot-backfill info. When building the create mutation it is used exactly as
    /// for `CreateStreamingJobType::SnapshotBackfill`.
    pub snapshot_backfill_info: SnapshotBackfillInfo,
    /// Refresh interval; the unit (seconds) is taken from the field name.
    pub refresh_interval_sec: u64,
}
424
#[derive(Debug, Clone)]
/// How a streaming job is created; this affects the `AddMutation` that is built.
pub enum CreateStreamingJobType {
    /// Plain creation: no subscriptions and no new upstream sinks.
    Normal,
    /// Sink into a table. The sink fragment's actors are announced to the target fragment
    /// as a new upstream sink.
    SinkIntoTable(UpstreamSinkInfo),
    /// Snapshot backfill. Subscriptions on the upstream MV tables are added for the job.
    SnapshotBackfill(SnapshotBackfillInfo),
    /// Batch refresh. Subscriptions are added as for `SnapshotBackfill`.
    BatchRefresh(BatchRefreshInfo),
}
432
#[derive(Debug)]
/// A command carried by a barrier.
///
/// Per [`Command::need_checkpoint`], every variant except `Resume` requires a checkpoint
/// barrier. The `*_to_mutation` helpers on `Command` build the barrier mutations for
/// several of these variants.
pub enum Command {
    /// Flush command.
    /// NOTE(review): presumably forces a checkpoint without changing the graph; confirm.
    Flush,

    /// Pause the streaming graph. `Command::pause_to_mutation` only emits a `Pause`
    /// mutation when the graph is not already paused.
    Pause,

    /// Resume a paused graph. `Command::resume_to_mutation` only emits a `Resume`
    /// mutation when the graph is currently paused. This is the only command that does
    /// not need a checkpoint.
    Resume,

    /// Drop streaming jobs (see `Command::drop_streaming_jobs_to_mutation`).
    DropStreamingJobs {
        /// Jobs being dropped; rendered sorted by `Display`.
        streaming_job_ids: HashSet<JobId>,
        /// State tables being unregistered.
        unregistered_state_table_ids: HashSet<TableId>,
        /// Dropped sink fragments, keyed by target fragment (per the field name). The
        /// flattened values become `StopMutation::dropped_sink_fragments`.
        dropped_sink_fragment_by_targets: HashMap<FragmentId, Vec<FragmentId>>,
    },

    /// Create a streaming job (see `Command::create_streaming_job_to_mutation`).
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        /// NOTE(review): per the name, snapshot-backfill info for upstreams in other
        /// databases; confirm.
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
    },

    /// Reschedule fragments. `Display` renders `RescheduleIntent(planned)` when
    /// `reschedule_plan` is set.
    /// NOTE(review): presumably the plan is derived from `context` later; confirm.
    RescheduleIntent {
        context: RescheduleContext,
        reschedule_plan: Option<ReschedulePlan>,
    },

    /// Replace a streaming job's fragments (see `Command::replace_stream_job_to_mutation`).
    ReplaceStreamJob(ReplaceStreamJobPlan),

    /// Change the source split assignment.
    SourceChangeSplit(SplitState),

    /// Update throttle configuration (see `Command::throttle_to_mutation`).
    Throttle {
        /// Jobs affected by the change.
        jobs: HashSet<JobId>,
        /// Throttle config per fragment; copied into `ThrottleMutation::fragment_throttle`.
        config: HashMap<FragmentId, ThrottleConfig>,
    },

    /// Create a subscription on an upstream MV table.
    CreateSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// Retention in seconds (per the field name).
        retention_second: u64,
    },

    /// Drop a subscription on an upstream MV table.
    DropSubscription {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
    },

    /// Change the retention of an existing subscription.
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        upstream_mv_table_id: TableId,
        /// New retention in seconds (per the field name).
        retention_second: u64,
    },

    /// Change connector properties.
    ConnectorPropsChange(ConnectorPropsChange),

    /// Refresh `table_id`, whose associated source is `associated_source_id`.
    Refresh {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Refresh-related "list finished" signal for `table_id` / `associated_source_id`.
    /// NOTE(review): semantics inferred from the name; confirm.
    ListFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },
    /// Refresh-related "load finished" signal for `table_id` / `associated_source_id`.
    /// NOTE(review): semantics inferred from the name; confirm.
    LoadFinish {
        table_id: TableId,
        associated_source_id: SourceId,
    },

    /// Reset a source.
    ResetSource {
        source_id: SourceId,
    },

    /// Resume backfill for a whole job or a single fragment.
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },

    /// Inject offsets into a source's splits.
    InjectSourceOffsets {
        source_id: SourceId,
        /// Offset strings keyed by a string split key.
        /// NOTE(review): presumably the split id; confirm.
        split_offsets: HashMap<String, String>,
    },
}
572
#[derive(Debug, Clone, Copy)]
/// Target of [`Command::ResumeBackfill`]: a whole streaming job or a single fragment.
pub enum ResumeBackfillTarget {
    /// Resume backfill for every backfilling fragment of the job (per the variant name).
    Job(JobId),
    /// Resume backfill for one fragment.
    Fragment(FragmentId),
}
578
579impl std::fmt::Display for Command {
581 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
582 match self {
583 Command::Flush => write!(f, "Flush"),
584 Command::Pause => write!(f, "Pause"),
585 Command::Resume => write!(f, "Resume"),
586 Command::DropStreamingJobs {
587 streaming_job_ids, ..
588 } => {
589 write!(
590 f,
591 "DropStreamingJobs: {}",
592 streaming_job_ids.iter().sorted().join(", ")
593 )
594 }
595 Command::CreateStreamingJob { info, .. } => {
596 write!(f, "CreateStreamingJob: {}", info.streaming_job)
597 }
598 Command::RescheduleIntent {
599 reschedule_plan, ..
600 } => {
601 if reschedule_plan.is_some() {
602 write!(f, "RescheduleIntent(planned)")
603 } else {
604 write!(f, "RescheduleIntent")
605 }
606 }
607 Command::ReplaceStreamJob(plan) => {
608 write!(f, "ReplaceStreamJob: {}", plan.streaming_job)
609 }
610 Command::SourceChangeSplit { .. } => write!(f, "SourceChangeSplit"),
611 Command::Throttle { .. } => write!(f, "Throttle"),
612 Command::CreateSubscription {
613 subscription_id, ..
614 } => write!(f, "CreateSubscription: {subscription_id}"),
615 Command::DropSubscription {
616 subscription_id, ..
617 } => write!(f, "DropSubscription: {subscription_id}"),
618 Command::AlterSubscriptionRetention {
619 subscription_id,
620 retention_second,
621 ..
622 } => write!(
623 f,
624 "AlterSubscriptionRetention: {subscription_id} -> {retention_second}"
625 ),
626 Command::ConnectorPropsChange(_) => write!(f, "ConnectorPropsChange"),
627 Command::Refresh {
628 table_id,
629 associated_source_id,
630 } => write!(
631 f,
632 "Refresh: {} (source: {})",
633 table_id, associated_source_id
634 ),
635 Command::ListFinish {
636 table_id,
637 associated_source_id,
638 } => write!(
639 f,
640 "ListFinish: {} (source: {})",
641 table_id, associated_source_id
642 ),
643 Command::LoadFinish {
644 table_id,
645 associated_source_id,
646 } => write!(
647 f,
648 "LoadFinish: {} (source: {})",
649 table_id, associated_source_id
650 ),
651 Command::ResetSource { source_id } => write!(f, "ResetSource: {source_id}"),
652 Command::ResumeBackfill { target } => match target {
653 ResumeBackfillTarget::Job(job_id) => {
654 write!(f, "ResumeBackfill: job={job_id}")
655 }
656 ResumeBackfillTarget::Fragment(fragment_id) => {
657 write!(f, "ResumeBackfill: fragment={fragment_id}")
658 }
659 },
660 Command::InjectSourceOffsets {
661 source_id,
662 split_offsets,
663 } => write!(
664 f,
665 "InjectSourceOffsets: {} ({} splits)",
666 source_id,
667 split_offsets.len()
668 ),
669 }
670 }
671}
672
673impl Command {
674 pub fn pause() -> Self {
675 Self::Pause
676 }
677
678 pub fn resume() -> Self {
679 Self::Resume
680 }
681
682 pub fn need_checkpoint(&self) -> bool {
683 !matches!(self, Command::Resume)
685 }
686}
687
#[derive(Debug)]
/// Command information kept with a barrier for processing after the barrier is
/// collected. For example, `Command::collect_commit_epoch_info` inspects the
/// `CreateStreamingJob` variant.
pub enum PostCollectCommand {
    /// A command with no post-collect payload, identified only by its name
    /// (`PostCollectCommand::barrier` uses `"barrier"`). It never requires a checkpoint.
    Command(String),
    DropStreamingJobs,
    /// A streaming-job creation together with the split assignment resolved for it.
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        job_type: CreateStreamingJobType,
        cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
        resolved_split_assignment: SplitAssignment,
    },
    /// Per-fragment reschedules applied by the barrier.
    Reschedule {
        reschedules: HashMap<FragmentId, Reschedule>,
    },
    /// A job replacement together with the split assignment resolved for it.
    ReplaceStreamJob {
        plan: ReplaceStreamJobPlan,
        resolved_split_assignment: SplitAssignment,
    },
    SourceChangeSplit {
        split_assignment: SplitAssignment,
    },
    CreateSubscription {
        subscription_id: SubscriptionId,
    },
    ConnectorPropsChange(ConnectorPropsChange),
    ResumeBackfill {
        target: ResumeBackfillTarget,
    },
}
716
717impl PostCollectCommand {
718 pub fn barrier() -> Self {
719 PostCollectCommand::Command("barrier".to_owned())
720 }
721
722 pub fn should_checkpoint(&self) -> bool {
723 match self {
724 PostCollectCommand::DropStreamingJobs
725 | PostCollectCommand::CreateStreamingJob { .. }
726 | PostCollectCommand::Reschedule { .. }
727 | PostCollectCommand::ReplaceStreamJob { .. }
728 | PostCollectCommand::SourceChangeSplit { .. }
729 | PostCollectCommand::CreateSubscription { .. }
730 | PostCollectCommand::ConnectorPropsChange(_)
731 | PostCollectCommand::ResumeBackfill { .. } => true,
732 PostCollectCommand::Command(_) => false,
733 }
734 }
735
736 pub fn command_name(&self) -> &str {
737 match self {
738 PostCollectCommand::Command(name) => name.as_str(),
739 PostCollectCommand::DropStreamingJobs => "DropStreamingJobs",
740 PostCollectCommand::CreateStreamingJob { .. } => "CreateStreamingJob",
741 PostCollectCommand::Reschedule { .. } => "Reschedule",
742 PostCollectCommand::ReplaceStreamJob { .. } => "ReplaceStreamJob",
743 PostCollectCommand::SourceChangeSplit { .. } => "SourceChangeSplit",
744 PostCollectCommand::CreateSubscription { .. } => "CreateSubscription",
745 PostCollectCommand::ConnectorPropsChange(_) => "ConnectorPropsChange",
746 PostCollectCommand::ResumeBackfill { .. } => "ResumeBackfill",
747 }
748 }
749}
750
751impl Display for PostCollectCommand {
752 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753 f.write_str(self.command_name())
754 }
755}
756
#[derive(Debug, Clone, PartialEq, Eq)]
/// Kind of a barrier; see `BarrierKind::to_protobuf` for the wire representation.
pub enum BarrierKind {
    /// The initial barrier.
    Initial,
    /// A plain, non-checkpoint barrier.
    Barrier,
    /// A checkpoint barrier. The `Vec<u64>` is passed to `build_table_change_log_delta`
    /// as this checkpoint's epochs and is not sent over the wire.
    /// NOTE(review): presumably all epochs since the previous checkpoint; confirm.
    Checkpoint(Vec<u64>),
}
764
765impl BarrierKind {
766 pub fn to_protobuf(&self) -> PbBarrierKind {
767 match self {
768 BarrierKind::Initial => PbBarrierKind::Initial,
769 BarrierKind::Barrier => PbBarrierKind::Barrier,
770 BarrierKind::Checkpoint(_) => PbBarrierKind::Checkpoint,
771 }
772 }
773
774 pub fn is_checkpoint(&self) -> bool {
775 matches!(self, BarrierKind::Checkpoint(_))
776 }
777
778 pub fn is_initial(&self) -> bool {
779 matches!(self, BarrierKind::Initial)
780 }
781
782 pub fn as_str_name(&self) -> &'static str {
783 match self {
784 BarrierKind::Initial => "Initial",
785 BarrierKind::Barrier => "Barrier",
786 BarrierKind::Checkpoint(_) => "Checkpoint",
787 }
788 }
789}
790
791fn sink_original_schema_fields(columns: &[PbColumnCatalog]) -> Vec<PbField> {
792 columns
793 .iter()
794 .filter(|col| !col.is_hidden)
795 .map(|col| {
796 let column_desc = col
797 .column_desc
798 .as_ref()
799 .expect("sink column catalog should have a column descriptor");
800 PbField {
801 data_type: Some(
802 column_desc
803 .column_type
804 .as_ref()
805 .expect("sink column descriptor should have a column type")
806 .clone(),
807 ),
808 name: column_desc.name.clone(),
809 }
810 })
811 .collect()
812}
813
814impl BarrierInfo {
815 fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
816 let Some(truncate_timestamptz) = Timestamptz::from_secs(
817 self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
818 ) else {
819 warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
820 return self.prev_epoch.value();
821 };
822 Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
823 }
824}
825
impl Command {
    /// Merges the responses of a collected checkpoint barrier into `task.commit_info`
    /// and `task.iceberg_v3_sink_metadata`.
    ///
    /// Besides forwarding the synced SSTs, table watermarks, SST contexts, truncate tables
    /// and vector-index adds extracted from `resps`, this:
    /// * registers the internal tables (plus the MV table, if any) of a job created by
    ///   this barrier as a `NewTableFragmentInfo`;
    /// * computes one change-log truncate epoch per MV table, taking the minimum of the
    ///   subscription-retention epoch and every backfill-pinned epoch, and builds the
    ///   change-log delta with it;
    /// * marks every table in `barrier_info.table_ids_to_commit` as committed at the
    ///   barrier's `prev_epoch`;
    /// * records a vector-index `Init` delta when the created job defines a vector index.
    ///
    /// # Panics
    /// * if the barrier kind is not `BarrierKind::Checkpoint` (`must_match!`);
    /// * if a job created here is a snapshot-backfill or batch-refresh job;
    /// * if a table id is already present in `tables_to_commit` or `vector_index_delta`;
    /// * if a new vector index table has no `vector_index_info`.
    pub(super) fn collect_commit_epoch_info(
        database_info: &InflightDatabaseInfo,
        barrier_info: &PartialGraphBarrierInfo,
        task: &mut CompleteBarrierTask,
        resps: Vec<BarrierCompleteResponse>,
        backfill_pinned_log_epoch: HashMap<JobId, (u64, HashSet<TableId>)>,
    ) {
        let (
            sst_to_context,
            synced_ssts,
            new_table_watermarks,
            old_value_ssts,
            vector_index_adds,
            truncate_tables,
            iceberg_v3_sink_metadata,
        ) = collect_resp_info(resps);

        // Tables of a job created by this very barrier must be registered with hummock.
        let new_table_fragment_infos =
            if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } =
                &barrier_info.post_collect_command
            {
                // Snapshot-backfill / batch-refresh jobs are rejected here by this assert.
                // NOTE(review): presumably they are registered through another path; confirm.
                assert!(!matches!(
                    job_type,
                    CreateStreamingJobType::SnapshotBackfill(_)
                        | CreateStreamingJobType::BatchRefresh(_)
                ));
                let table_fragments = &info.stream_job_fragments;
                let mut table_ids: HashSet<_> =
                    table_fragments.internal_table_ids().into_iter().collect();
                if let Some(mv_table_id) = table_fragments.mv_table_id() {
                    table_ids.insert(mv_table_id);
                }

                vec![NewTableFragmentInfo { table_ids }]
            } else {
                vec![]
            };

        // Per MV table, keep the *smallest* truncate epoch requested by any subscriber or
        // backfilling job, so that no consumer loses change-log data it still needs.
        let mut mv_log_store_truncate_epoch = HashMap::new();
        let mut update_truncate_epoch =
            |table_id: TableId, truncate_epoch| match mv_log_store_truncate_epoch.entry(table_id) {
                Entry::Occupied(mut entry) => {
                    let prev_truncate_epoch = entry.get_mut();
                    if truncate_epoch < *prev_truncate_epoch {
                        *prev_truncate_epoch = truncate_epoch;
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(truncate_epoch);
                }
            };
        for (mv_table_id, max_retention) in database_info.max_subscription_retention() {
            let truncate_epoch = barrier_info
                .barrier_info
                .get_truncate_epoch(max_retention)
                .0;
            update_truncate_epoch(mv_table_id, truncate_epoch);
        }
        for (_, (backfill_epoch, upstream_mv_table_ids)) in backfill_pinned_log_epoch {
            for mv_table_id in upstream_mv_table_ids {
                update_truncate_epoch(mv_table_id, backfill_epoch);
            }
        }

        // Only valid for checkpoint barriers: `must_match!` panics on any other kind.
        let table_new_change_log = build_table_change_log_delta(
            old_value_ssts.into_iter(),
            synced_ssts.iter().map(|sst| &sst.sst_info),
            must_match!(&barrier_info.barrier_info.kind, BarrierKind::Checkpoint(epochs) => epochs),
            mv_log_store_truncate_epoch.into_iter(),
        );

        let epoch = barrier_info.barrier_info.prev_epoch();
        let info = &mut task.commit_info;
        for table_id in &barrier_info.table_ids_to_commit {
            info.tables_to_commit
                .try_insert(*table_id, epoch)
                .expect("non duplicate");
        }

        info.sstables.extend(synced_ssts);
        info.new_table_watermarks.extend(new_table_watermarks);
        info.sst_to_context.extend(sst_to_context);
        info.new_table_fragment_infos
            .extend(new_table_fragment_infos);
        info.change_log_delta.extend(table_new_change_log);
        for (table_id, vector_index_adds) in vector_index_adds {
            info.vector_index_delta
                .try_insert(table_id, VectorIndexDelta::Adds(vector_index_adds))
                .expect("non-duplicate");
        }
        // A newly created vector index gets an `Init` delta in the same commit.
        if let PostCollectCommand::CreateStreamingJob { info: job_info, .. } =
            &barrier_info.post_collect_command
            && let Some(index_table) = collect_new_vector_index_info(job_info)
        {
            info.vector_index_delta
                .try_insert(
                    index_table.id,
                    VectorIndexDelta::Init(PbVectorIndexInit {
                        info: Some(index_table.vector_index_info.unwrap()),
                    }),
                )
                .expect("non-duplicate");
        }
        info.truncate_tables.extend(truncate_tables);
        task.iceberg_v3_sink_metadata
            .extend(iceberg_v3_sink_metadata);
    }
}
936
937impl Command {
938 pub(super) fn pause_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
940 {
941 {
942 if !is_currently_paused {
945 Some(Mutation::Pause(PauseMutation {}))
946 } else {
947 None
948 }
949 }
950 }
951 }
952
953 pub(super) fn resume_to_mutation(is_currently_paused: bool) -> Option<Mutation> {
955 {
956 {
957 if is_currently_paused {
959 Some(Mutation::Resume(ResumeMutation {}))
960 } else {
961 None
962 }
963 }
964 }
965 }
966
967 pub(super) fn source_change_split_to_mutation(split_assignment: &SplitAssignment) -> Mutation {
969 {
970 {
971 let mut diff = HashMap::new();
972
973 for actor_splits in split_assignment.values() {
974 diff.extend(actor_splits.clone());
975 }
976
977 Mutation::Splits(SourceChangeSplitMutation {
978 actor_splits: build_actor_connector_splits(&diff),
979 })
980 }
981 }
982 }
983
984 pub(super) fn throttle_to_mutation(config: &HashMap<FragmentId, ThrottleConfig>) -> Mutation {
986 {
987 {
988 let config = config.clone();
989 Mutation::Throttle(ThrottleMutation {
990 fragment_throttle: config,
991 })
992 }
993 }
994 }
995
996 pub(super) fn drop_streaming_jobs_to_mutation(
998 actors: &Vec<ActorId>,
999 dropped_sink_fragment_by_targets: &HashMap<FragmentId, Vec<FragmentId>>,
1000 ) -> Mutation {
1001 {
1002 Mutation::Stop(StopMutation {
1003 actors: actors.clone(),
1004 dropped_sink_fragments: dropped_sink_fragment_by_targets
1005 .values()
1006 .flatten()
1007 .cloned()
1008 .collect(),
1009 })
1010 }
1011 }
1012
    /// Builds the `Add` mutation that introduces a newly created streaming job's actors
    /// into the running graph.
    ///
    /// Parameters:
    /// * `info`: the job being created. Its fragments (for the subscriber id), upstream
    ///   relations, backfill ordering and streaming job (for the database id) are used.
    /// * `job_type`: for `SnapshotBackfill` / `BatchRefresh`, one subscription per
    ///   upstream MV table is added, with this job as subscriber. For `SinkIntoTable`, the
    ///   sink fragment's actors are announced to the target fragment as a new upstream
    ///   sink.
    /// * `is_currently_paused`: forwarded as `AddMutation::pause`.
    /// * `edges`: dispatchers of the fragments keyed in `upstream_fragment_downstreams`
    ///   are *removed* from `edges.dispatchers` and moved into the mutation.
    /// * `control_stream_manager`: resolves a worker id to the host address sent with
    ///   each new upstream-sink actor.
    /// * `actor_cdc_table_snapshot_splits`: optional CDC snapshot splits per actor.
    /// * `split_assignment`: source splits per fragment, then per actor.
    /// * `stream_actors` / `actor_location`: the job's actors and the worker of each.
    ///
    /// The current implementation always returns `Ok`.
    ///
    /// # Panics
    /// For `SinkIntoTable`, if any of these holds:
    /// * the sink fragment has no entry in `stream_actors`;
    /// * one of its actors has no entry in `actor_location`.
    pub(super) fn create_streaming_job_to_mutation(
        info: &CreateStreamingJobCommandInfo,
        job_type: &CreateStreamingJobType,
        is_currently_paused: bool,
        edges: &mut FragmentEdgeBuildResult,
        control_stream_manager: &ControlStreamManager,
        actor_cdc_table_snapshot_splits: Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>,
        split_assignment: &SplitAssignment,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> MetaResult<Mutation> {
        {
            {
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    fragment_backfill_ordering,
                    streaming_job,
                    ..
                } = info;
                let database_id = streaming_job.database_id();
                // Every actor of every fragment of the new job.
                let added_actors: Vec<ActorId> = stream_actors
                    .values()
                    .flatten()
                    .map(|actor| actor.actor_id)
                    .collect();
                let actor_splits = split_assignment
                    .values()
                    .flat_map(build_actor_connector_splits)
                    .collect();
                // Snapshot-backfill style jobs subscribe to each upstream MV table they read.
                let subscriptions_to_add =
                    if let CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info)
                    | CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
                        snapshot_backfill_info,
                        ..
                    }) = job_type
                    {
                        snapshot_backfill_info
                            .upstream_mv_table_id_to_backfill_epoch
                            .keys()
                            .map(|table_id| SubscriptionUpstreamInfo {
                                subscriber_id: stream_job_fragments
                                    .stream_job_id()
                                    .as_subscriber_id(),
                                upstream_mv_table_id: *table_id,
                            })
                            .collect()
                    } else {
                        Default::default()
                    };
                let backfill_nodes_to_pause: Vec<_> =
                    get_nodes_with_backfill_dependencies(fragment_backfill_ordering)
                        .into_iter()
                        .collect();

                // For sink-into-table, tell the target fragment about its new upstream
                // sink and where the sink's actors live.
                let new_upstream_sinks =
                    if let CreateStreamingJobType::SinkIntoTable(UpstreamSinkInfo {
                        sink_fragment_id,
                        sink_output_fields,
                        project_exprs,
                        new_sink_downstream,
                        ..
                    }) = job_type
                    {
                        let new_sink_actors = stream_actors
                            .get(sink_fragment_id)
                            .unwrap_or_else(|| {
                                panic!("upstream sink fragment {sink_fragment_id} not exist")
                            })
                            .iter()
                            .map(|actor| {
                                let worker_id = actor_location[&actor.actor_id];
                                PbActorInfo {
                                    actor_id: actor.actor_id,
                                    host: Some(control_stream_manager.host_addr(worker_id)),
                                    partial_graph_id: to_partial_graph_id(database_id, None),
                                }
                            });
                        let new_upstream_sink = PbNewUpstreamSink {
                            info: Some(PbUpstreamSinkInfo {
                                upstream_fragment_id: *sink_fragment_id,
                                sink_output_schema: sink_output_fields.clone(),
                                project_exprs: project_exprs.clone(),
                            }),
                            upstream_actors: new_sink_actors.collect(),
                        };
                        HashMap::from([(
                            new_sink_downstream.downstream_fragment_id,
                            new_upstream_sink,
                        )])
                    } else {
                        HashMap::new()
                    };

                let actor_cdc_table_snapshot_splits = actor_cdc_table_snapshot_splits
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });

                let add_mutation = AddMutation {
                    // `extract_if` removes these dispatchers from `edges`, so they are not
                    // reused elsewhere.
                    actor_dispatchers: edges
                        .dispatchers
                        .extract_if(|fragment_id, _| {
                            upstream_fragment_downstreams.contains_key(fragment_id)
                        })
                        .flat_map(|(_, fragment_dispatchers)| fragment_dispatchers.into_iter())
                        .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
                        .collect(),
                    added_actors,
                    actor_splits,
                    pause: is_currently_paused,
                    subscriptions_to_add,
                    backfill_nodes_to_pause,
                    actor_cdc_table_snapshot_splits,
                    new_upstream_sinks,
                };

                Ok(Mutation::Add(add_mutation))
            }
        }
    }
1134
    /// Builds the update mutation for replacing a streaming job's fragments.
    ///
    /// Steps:
    /// * Merge updates of the fragments keyed in `replace_upstream`, and dispatchers of
    ///   the fragments keyed in `upstream_fragment_downstreams`, are *removed* from
    ///   `edges` and passed on.
    /// * CDC backfill splits for the old job id come from
    ///   `database_info.assign_cdc_backfill_splits`; an error there is propagated.
    /// * The actors of all old fragments, and of every auto-refresh-schema sink's original
    ///   fragment, as currently recorded in `database_info`, are passed to
    ///   `generate_update_mutation_for_replace_table` together with the rest.
    ///   NOTE(review): presumably as the actors to drop; confirm in that function.
    pub(super) fn replace_stream_job_to_mutation(
        ReplaceStreamJobPlan {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            auto_refresh_schema_sinks,
            ..
        }: &ReplaceStreamJobPlan,
        edges: &mut FragmentEdgeBuildResult,
        database_info: &mut InflightDatabaseInfo,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<Option<Mutation>> {
        {
            {
                let merge_updates = edges
                    .merge_updates
                    .extract_if(|fragment_id, _| replace_upstream.contains_key(fragment_id))
                    .collect();
                let dispatchers = edges
                    .dispatchers
                    .extract_if(|fragment_id, _| {
                        upstream_fragment_downstreams.contains_key(fragment_id)
                    })
                    .collect();
                // Must run before the closure below borrows `database_info` immutably.
                let actor_cdc_table_snapshot_splits = database_info
                    .assign_cdc_backfill_splits(old_fragments.stream_job_id)?
                    .map(|splits| PbCdcTableSnapshotSplitsWithGeneration { splits });
                let old_fragments = old_fragments.fragments.keys().copied();
                let auto_refresh_sink_fragment_ids = auto_refresh_schema_sinks
                    .as_ref()
                    .into_iter()
                    .flat_map(|sinks| sinks.iter())
                    .map(|sink| sink.original_fragment.fragment_id);
                Ok(Self::generate_update_mutation_for_replace_table(
                    old_fragments
                        .chain(auto_refresh_sink_fragment_ids)
                        .flat_map(|fragment_id| {
                            database_info.fragment(fragment_id).actors.keys().copied()
                        }),
                    merge_updates,
                    dispatchers,
                    split_assignment,
                    actor_cdc_table_snapshot_splits,
                    auto_refresh_schema_sinks.as_ref(),
                ))
            }
        }
    }
1184
1185 pub(super) fn reschedule_to_mutation(
1187 reschedules: &HashMap<FragmentId, Reschedule>,
1188 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1189 control_stream_manager: &ControlStreamManager,
1190 database_info: &mut InflightDatabaseInfo,
1191 ) -> MetaResult<Option<Mutation>> {
1192 {
1193 {
1194 let database_id = database_info.database_id;
1195 let mut dispatcher_update = HashMap::new();
1196 for reschedule in reschedules.values() {
1197 for &(upstream_fragment_id, dispatcher_id) in
1198 &reschedule.upstream_fragment_dispatcher_ids
1199 {
1200 let upstream_actor_ids = fragment_actors
1202 .get(&upstream_fragment_id)
1203 .expect("should contain");
1204
1205 let upstream_reschedule = reschedules.get(&upstream_fragment_id);
1206
1207 for &actor_id in upstream_actor_ids {
1209 let added_downstream_actor_id = if upstream_reschedule
1210 .map(|reschedule| !reschedule.removed_actors.contains(&actor_id))
1211 .unwrap_or(true)
1212 {
1213 reschedule
1214 .added_actors
1215 .values()
1216 .flatten()
1217 .cloned()
1218 .collect()
1219 } else {
1220 Default::default()
1221 };
1222 dispatcher_update
1224 .try_insert(
1225 (actor_id, dispatcher_id),
1226 DispatcherUpdate {
1227 actor_id,
1228 dispatcher_id,
1229 hash_mapping: reschedule
1230 .upstream_dispatcher_mapping
1231 .as_ref()
1232 .map(|m| m.to_protobuf()),
1233 added_downstream_actor_id,
1234 removed_downstream_actor_id: reschedule
1235 .removed_actors
1236 .iter()
1237 .cloned()
1238 .collect(),
1239 },
1240 )
1241 .unwrap();
1242 }
1243 }
1244 }
1245 let dispatcher_update = dispatcher_update.into_values().collect();
1246
1247 let mut merge_update = HashMap::new();
1248 for (&fragment_id, reschedule) in reschedules {
1249 for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
1250 let downstream_actor_ids = fragment_actors
1252 .get(&downstream_fragment_id)
1253 .expect("should contain");
1254
1255 let downstream_removed_actors: HashSet<_> = reschedules
1259 .get(&downstream_fragment_id)
1260 .map(|downstream_reschedule| {
1261 downstream_reschedule
1262 .removed_actors
1263 .iter()
1264 .copied()
1265 .collect()
1266 })
1267 .unwrap_or_default();
1268
1269 for &actor_id in downstream_actor_ids {
1271 if downstream_removed_actors.contains(&actor_id) {
1272 continue;
1273 }
1274
1275 merge_update
1277 .try_insert(
1278 (actor_id, fragment_id),
1279 MergeUpdate {
1280 actor_id,
1281 upstream_fragment_id: fragment_id,
1282 new_upstream_fragment_id: None,
1283 added_upstream_actors: reschedule
1284 .added_actors
1285 .iter()
1286 .flat_map(|(worker_id, actors)| {
1287 let host =
1288 control_stream_manager.host_addr(*worker_id);
1289 actors.iter().map(move |&actor_id| PbActorInfo {
1290 actor_id,
1291 host: Some(host.clone()),
1292 partial_graph_id: to_partial_graph_id(
1294 database_id,
1295 None,
1296 ),
1297 })
1298 })
1299 .collect(),
1300 removed_upstream_actor_id: reschedule
1301 .removed_actors
1302 .iter()
1303 .cloned()
1304 .collect(),
1305 },
1306 )
1307 .unwrap();
1308 }
1309 }
1310 }
1311 let merge_update = merge_update.into_values().collect();
1312
1313 let mut actor_vnode_bitmap_update = HashMap::new();
1314 for reschedule in reschedules.values() {
1315 for (&actor_id, bitmap) in &reschedule.vnode_bitmap_updates {
1317 let bitmap = bitmap.to_protobuf();
1318 actor_vnode_bitmap_update
1319 .try_insert(actor_id, bitmap)
1320 .unwrap();
1321 }
1322 }
1323 let dropped_actors = reschedules
1324 .values()
1325 .flat_map(|r| r.removed_actors.iter().copied())
1326 .collect();
1327 let mut actor_splits = HashMap::new();
1328 let mut actor_cdc_table_snapshot_splits = HashMap::new();
1329 for (fragment_id, reschedule) in reschedules {
1330 for (actor_id, splits) in &reschedule.actor_splits {
1331 actor_splits.insert(
1332 *actor_id,
1333 ConnectorSplits {
1334 splits: splits.iter().map(ConnectorSplit::from).collect(),
1335 },
1336 );
1337 }
1338
1339 if let Some(assignment) =
1340 database_info.may_assign_fragment_cdc_backfill_splits(*fragment_id)?
1341 {
1342 actor_cdc_table_snapshot_splits.extend(assignment)
1343 }
1344 }
1345
1346 let actor_new_dispatchers = HashMap::new();
1348 let mutation = Mutation::Update(UpdateMutation {
1349 dispatcher_update,
1350 merge_update,
1351 actor_vnode_bitmap_update,
1352 dropped_actors,
1353 actor_splits,
1354 actor_new_dispatchers,
1355 actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
1356 splits: actor_cdc_table_snapshot_splits,
1357 }),
1358 sink_schema_change: Default::default(),
1359 subscriptions_to_drop: vec![],
1360 });
1361 tracing::debug!("update mutation: {mutation:?}");
1362 Ok(Some(mutation))
1363 }
1364 }
1365 }
1366
1367 pub(super) fn create_subscription_to_mutation(
1369 upstream_mv_table_id: TableId,
1370 subscription_id: SubscriptionId,
1371 ) -> Mutation {
1372 {
1373 Mutation::Add(AddMutation {
1374 actor_dispatchers: Default::default(),
1375 added_actors: vec![],
1376 actor_splits: Default::default(),
1377 pause: false,
1378 subscriptions_to_add: vec![SubscriptionUpstreamInfo {
1379 upstream_mv_table_id,
1380 subscriber_id: subscription_id.as_subscriber_id(),
1381 }],
1382 backfill_nodes_to_pause: vec![],
1383 actor_cdc_table_snapshot_splits: None,
1384 new_upstream_sinks: Default::default(),
1385 })
1386 }
1387 }
1388
1389 pub(super) fn drop_subscription_to_mutation(
1391 upstream_mv_table_id: TableId,
1392 subscription_id: SubscriptionId,
1393 ) -> Mutation {
1394 {
1395 Mutation::DropSubscriptions(DropSubscriptionsMutation {
1396 info: vec![SubscriptionUpstreamInfo {
1397 subscriber_id: subscription_id.as_subscriber_id(),
1398 upstream_mv_table_id,
1399 }],
1400 })
1401 }
1402 }
1403
1404 pub(super) fn connector_props_change_to_mutation(config: &ConnectorPropsChange) -> Mutation {
1406 {
1407 {
1408 let mut connector_props_infos = HashMap::default();
1409 for (k, v) in config {
1410 connector_props_infos.insert(
1411 k.as_raw_id(),
1412 ConnectorPropsInfo {
1413 connector_props_info: v.clone(),
1414 },
1415 );
1416 }
1417 Mutation::ConnectorPropsChange(ConnectorPropsChangeMutation {
1418 connector_props_infos,
1419 })
1420 }
1421 }
1422 }
1423
1424 pub(super) fn refresh_to_mutation(
1426 table_id: TableId,
1427 associated_source_id: SourceId,
1428 ) -> Mutation {
1429 Mutation::RefreshStart(risingwave_pb::stream_plan::RefreshStartMutation {
1430 table_id,
1431 associated_source_id,
1432 })
1433 }
1434
1435 pub(super) fn list_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1437 Mutation::ListFinish(ListFinishMutation {
1438 associated_source_id,
1439 })
1440 }
1441
1442 pub(super) fn load_finish_to_mutation(associated_source_id: SourceId) -> Mutation {
1444 Mutation::LoadFinish(LoadFinishMutation {
1445 associated_source_id,
1446 })
1447 }
1448
1449 pub(super) fn reset_source_to_mutation(source_id: SourceId) -> Mutation {
1451 Mutation::ResetSource(risingwave_pb::stream_plan::ResetSourceMutation {
1452 source_id: source_id.as_raw_id(),
1453 })
1454 }
1455
1456 pub(super) fn resume_backfill_to_mutation(
1458 target: &ResumeBackfillTarget,
1459 database_info: &InflightDatabaseInfo,
1460 ) -> MetaResult<Option<Mutation>> {
1461 {
1462 {
1463 let fragment_ids: HashSet<_> = match target {
1464 ResumeBackfillTarget::Job(job_id) => {
1465 database_info.backfill_fragment_ids_for_job(*job_id)?
1466 }
1467 ResumeBackfillTarget::Fragment(fragment_id) => {
1468 if !database_info.is_backfill_fragment(*fragment_id)? {
1469 return Err(MetaError::invalid_parameter(format!(
1470 "fragment {} is not a backfill node",
1471 fragment_id
1472 )));
1473 }
1474 HashSet::from([*fragment_id])
1475 }
1476 };
1477 if fragment_ids.is_empty() {
1478 warn!(
1479 ?target,
1480 "resume backfill command ignored because no backfill fragments found"
1481 );
1482 Ok(None)
1483 } else {
1484 Ok(Some(Mutation::StartFragmentBackfill(
1485 StartFragmentBackfillMutation {
1486 fragment_ids: fragment_ids.into_iter().collect(),
1487 },
1488 )))
1489 }
1490 }
1491 }
1492 }
1493
1494 pub(super) fn inject_source_offsets_to_mutation(
1496 source_id: SourceId,
1497 split_offsets: &HashMap<String, String>,
1498 ) -> Mutation {
1499 Mutation::InjectSourceOffsets(risingwave_pb::stream_plan::InjectSourceOffsetsMutation {
1500 source_id: source_id.as_raw_id(),
1501 split_offsets: split_offsets.clone(),
1502 })
1503 }
1504
1505 pub(super) fn create_streaming_job_actors_to_create(
1507 info: &CreateStreamingJobCommandInfo,
1508 edges: &mut FragmentEdgeBuildResult,
1509 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1510 actor_location: &HashMap<ActorId, WorkerId>,
1511 ) -> StreamJobActorsToCreate {
1512 {
1513 {
1514 edges.collect_actors_to_create(info.stream_job_fragments.fragments.values().map(
1515 |fragment| {
1516 let actors = stream_actors
1517 .get(&fragment.fragment_id)
1518 .into_iter()
1519 .flatten()
1520 .map(|actor| (actor, actor_location[&actor.actor_id]));
1521 (
1522 fragment.fragment_id,
1523 &fragment.nodes,
1524 actors,
1525 [], )
1527 },
1528 ))
1529 }
1530 }
1531 }
1532
1533 pub(super) fn reschedule_actors_to_create(
1535 reschedules: &HashMap<FragmentId, Reschedule>,
1536 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
1537 database_info: &InflightDatabaseInfo,
1538 control_stream_manager: &ControlStreamManager,
1539 ) -> StreamJobActorsToCreate {
1540 {
1541 {
1542 let mut actor_upstreams = Self::collect_database_partial_graph_actor_upstreams(
1543 reschedules.iter().map(|(fragment_id, reschedule)| {
1544 (
1545 *fragment_id,
1546 reschedule.newly_created_actors.values().map(
1547 |((actor, dispatchers), _)| {
1548 (actor.actor_id, dispatchers.as_slice())
1549 },
1550 ),
1551 )
1552 }),
1553 Some((reschedules, fragment_actors)),
1554 database_info,
1555 control_stream_manager,
1556 );
1557 let mut map: HashMap<WorkerId, HashMap<_, (_, Vec<_>, _)>> = HashMap::new();
1558 for (fragment_id, (actor, dispatchers), worker_id) in
1559 reschedules.iter().flat_map(|(fragment_id, reschedule)| {
1560 reschedule
1561 .newly_created_actors
1562 .values()
1563 .map(|(actors, status)| (*fragment_id, actors, status))
1564 })
1565 {
1566 let upstreams = actor_upstreams.remove(&actor.actor_id).unwrap_or_default();
1567 map.entry(*worker_id)
1568 .or_default()
1569 .entry(fragment_id)
1570 .or_insert_with(|| {
1571 let node = database_info.fragment(fragment_id).nodes.clone();
1572 let subscribers =
1573 database_info.fragment_subscribers(fragment_id).collect();
1574 (node, vec![], subscribers)
1575 })
1576 .1
1577 .push((actor.clone(), upstreams, dispatchers.clone()));
1578 }
1579 map
1580 }
1581 }
1582 }
1583
1584 pub(super) fn replace_stream_job_actors_to_create(
1586 replace_table: &ReplaceStreamJobPlan,
1587 edges: &mut FragmentEdgeBuildResult,
1588 database_info: &InflightDatabaseInfo,
1589 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1590 actor_location: &HashMap<ActorId, WorkerId>,
1591 ) -> StreamJobActorsToCreate {
1592 {
1593 {
1594 let mut actors = edges.collect_actors_to_create(
1595 replace_table
1596 .new_fragments
1597 .fragments
1598 .values()
1599 .map(|fragment| {
1600 let actors = stream_actors
1601 .get(&fragment.fragment_id)
1602 .into_iter()
1603 .flatten()
1604 .map(|actor| (actor, actor_location[&actor.actor_id]));
1605 (
1606 fragment.fragment_id,
1607 &fragment.nodes,
1608 actors,
1609 database_info
1610 .job_subscribers(replace_table.old_fragments.stream_job_id),
1611 )
1612 }),
1613 );
1614
1615 if let Some(sinks) = &replace_table.auto_refresh_schema_sinks {
1617 let sink_actors = edges.collect_actors_to_create(sinks.iter().map(|sink| {
1618 (
1619 sink.new_fragment.fragment_id,
1620 &sink.new_fragment.nodes,
1621 stream_actors
1622 .get(&sink.new_fragment.fragment_id)
1623 .into_iter()
1624 .flatten()
1625 .map(|actor| (actor, actor_location[&actor.actor_id])),
1626 database_info.job_subscribers(sink.original_sink.id.as_job_id()),
1627 )
1628 }));
1629 for (worker_id, fragment_actors) in sink_actors {
1630 actors.entry(worker_id).or_default().extend(fragment_actors);
1631 }
1632 }
1633 actors
1634 }
1635 }
1636 }
1637
1638 fn generate_update_mutation_for_replace_table(
1639 dropped_actors: impl IntoIterator<Item = ActorId>,
1640 merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
1641 dispatchers: FragmentActorDispatchers,
1642 split_assignment: &SplitAssignment,
1643 cdc_table_snapshot_split_assignment: Option<PbCdcTableSnapshotSplitsWithGeneration>,
1644 auto_refresh_schema_sinks: Option<&Vec<AutoRefreshSchemaSinkContext>>,
1645 ) -> Option<Mutation> {
1646 let dropped_actors = dropped_actors.into_iter().collect();
1647
1648 let actor_new_dispatchers = dispatchers
1649 .into_values()
1650 .flatten()
1651 .map(|(actor_id, dispatchers)| (actor_id, Dispatchers { dispatchers }))
1652 .collect();
1653
1654 let actor_splits = split_assignment
1655 .values()
1656 .flat_map(build_actor_connector_splits)
1657 .collect();
1658 Some(Mutation::Update(UpdateMutation {
1659 actor_new_dispatchers,
1660 merge_update: merge_updates.into_values().flatten().collect(),
1661 dropped_actors,
1662 actor_splits,
1663 actor_cdc_table_snapshot_splits: cdc_table_snapshot_split_assignment,
1664 sink_schema_change: auto_refresh_schema_sinks
1665 .as_ref()
1666 .into_iter()
1667 .flat_map(|sinks| {
1668 sinks.iter().map(|sink| {
1669 let op = if !sink.removed_column_names.is_empty() {
1670 PbSinkSchemaChangeOp::DropColumns(PbSinkDropColumnsOp {
1671 column_names: sink.removed_column_names.clone(),
1672 })
1673 } else {
1674 PbSinkSchemaChangeOp::AddColumns(PbSinkAddColumnsOp {
1675 fields: sink
1676 .newly_add_fields
1677 .iter()
1678 .map(|field| field.to_prost())
1679 .collect(),
1680 })
1681 };
1682 (
1683 sink.original_sink.id.as_raw_id(),
1684 PbSinkSchemaChange {
1685 original_schema: sink_original_schema_fields(
1686 &sink.original_sink.columns,
1687 ),
1688 op: Some(op),
1689 },
1690 )
1691 })
1692 })
1693 .collect(),
1694 ..Default::default()
1695 }))
1696 }
1697}
1698
1699impl Command {
1700 #[expect(clippy::type_complexity)]
1701 pub(super) fn collect_database_partial_graph_actor_upstreams(
1702 actor_dispatchers: impl Iterator<
1703 Item = (FragmentId, impl Iterator<Item = (ActorId, &[Dispatcher])>),
1704 >,
1705 reschedule_dispatcher_update: Option<(
1706 &HashMap<FragmentId, Reschedule>,
1707 &HashMap<FragmentId, HashSet<ActorId>>,
1708 )>,
1709 database_info: &InflightDatabaseInfo,
1710 control_stream_manager: &ControlStreamManager,
1711 ) -> HashMap<ActorId, ActorUpstreams> {
1712 let mut actor_upstreams: HashMap<ActorId, ActorUpstreams> = HashMap::new();
1713 for (upstream_fragment_id, upstream_actors) in actor_dispatchers {
1714 let upstream_fragment = database_info.fragment(upstream_fragment_id);
1715 for (upstream_actor_id, dispatchers) in upstream_actors {
1716 let upstream_actor_location =
1717 upstream_fragment.actors[&upstream_actor_id].worker_id;
1718 let upstream_actor_host = control_stream_manager.host_addr(upstream_actor_location);
1719 for downstream_actor_id in dispatchers
1720 .iter()
1721 .flat_map(|dispatcher| dispatcher.downstream_actor_id.iter())
1722 {
1723 actor_upstreams
1724 .entry(*downstream_actor_id)
1725 .or_default()
1726 .entry(upstream_fragment_id)
1727 .or_default()
1728 .insert(
1729 upstream_actor_id,
1730 PbActorInfo {
1731 actor_id: upstream_actor_id,
1732 host: Some(upstream_actor_host.clone()),
1733 partial_graph_id: to_partial_graph_id(
1734 database_info.database_id,
1735 None,
1736 ),
1737 },
1738 );
1739 }
1740 }
1741 }
1742 if let Some((reschedules, fragment_actors)) = reschedule_dispatcher_update {
1743 for reschedule in reschedules.values() {
1744 for (upstream_fragment_id, _) in &reschedule.upstream_fragment_dispatcher_ids {
1745 let upstream_fragment = database_info.fragment(*upstream_fragment_id);
1746 let upstream_reschedule = reschedules.get(upstream_fragment_id);
1747 for upstream_actor_id in fragment_actors
1748 .get(upstream_fragment_id)
1749 .expect("should exist")
1750 {
1751 let upstream_actor_location =
1752 upstream_fragment.actors[upstream_actor_id].worker_id;
1753 let upstream_actor_host =
1754 control_stream_manager.host_addr(upstream_actor_location);
1755 if let Some(upstream_reschedule) = upstream_reschedule
1756 && upstream_reschedule
1757 .removed_actors
1758 .contains(upstream_actor_id)
1759 {
1760 continue;
1761 }
1762 for (_, downstream_actor_id) in
1763 reschedule
1764 .added_actors
1765 .iter()
1766 .flat_map(|(worker_id, actors)| {
1767 actors.iter().map(|actor| (*worker_id, *actor))
1768 })
1769 {
1770 actor_upstreams
1771 .entry(downstream_actor_id)
1772 .or_default()
1773 .entry(*upstream_fragment_id)
1774 .or_default()
1775 .insert(
1776 *upstream_actor_id,
1777 PbActorInfo {
1778 actor_id: *upstream_actor_id,
1779 host: Some(upstream_actor_host.clone()),
1780 partial_graph_id: to_partial_graph_id(
1781 database_info.database_id,
1782 None,
1783 ),
1784 },
1785 );
1786 }
1787 }
1788 }
1789 }
1790 }
1791 actor_upstreams
1792 }
1793}
1794
#[cfg(test)]
mod tests {
    use risingwave_pb::data::PbDataType;
    use risingwave_pb::data::data_type::PbTypeName;
    use risingwave_pb::plan_common::{ColumnCatalog as PbColumnCatalog, ColumnDesc};

    use super::sink_original_schema_fields;

    /// Builds a column catalog with only its name, type, and hidden flag set; all other
    /// descriptor fields keep their defaults.
    fn column(name: &str, type_name: PbTypeName, is_hidden: bool) -> PbColumnCatalog {
        let column_type = PbDataType {
            type_name: type_name as i32,
            ..Default::default()
        };
        let column_desc = ColumnDesc {
            column_type: Some(column_type),
            name: name.to_owned(),
            ..Default::default()
        };
        PbColumnCatalog {
            column_desc: Some(column_desc),
            is_hidden,
        }
    }

    #[test]
    fn test_sink_original_schema_fields_skips_hidden_columns() {
        let visible_key = column("k", PbTypeName::Int32, false);
        let visible_value = column("v", PbTypeName::Varchar, false);
        let hidden_row_id = column("_row_id", PbTypeName::Serial, true);
        let columns = vec![visible_key, visible_value, hidden_row_id];

        let fields = sink_original_schema_fields(&columns);
        let field_names: Vec<&str> = fields.iter().map(|f| f.name.as_str()).collect();

        assert_eq!(field_names, ["k", "v"]);
    }
}