1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{AddAssign, Deref};
17
18use itertools::Itertools;
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, TableId};
21use risingwave_common::hash::{
22 ActorAlignmentId, IsSingleton, VirtualNode, VnodeCount, VnodeCountCompat,
23};
24use risingwave_common::util::stream_graph_visitor::{self, visit_stream_node_body};
25use risingwave_connector::source::SplitImpl;
26use risingwave_meta_model::{DispatcherType, SourceId, StreamingParallelism, WorkerId};
27use risingwave_pb::catalog::Table;
28use risingwave_pb::common::{ActorInfo, PbActorLocation};
29use risingwave_pb::meta::table_fragments::actor_status::ActorState;
30use risingwave_pb::meta::table_fragments::fragment::{
31 FragmentDistributionType, PbFragmentDistributionType,
32};
33use risingwave_pb::meta::table_fragments::{ActorStatus, PbFragment, State};
34use risingwave_pb::meta::table_parallelism::{
35 FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
36 PbParallelism,
37};
38use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
39use risingwave_pb::plan_common::PbExprContext;
40use risingwave_pb::stream_plan::stream_node::NodeBody;
41use risingwave_pb::stream_plan::{
42 DispatchStrategy, Dispatcher, PbDispatchOutputMapping, PbDispatcher, PbStreamActor,
43 PbStreamContext, StreamNode,
44};
45
46use super::{ActorId, FragmentId};
47use crate::stream::{SplitAssignment, build_actor_connector_splits};
48
/// Parallelism setting assigned to a streaming job.
///
/// Convertible to and from the protobuf `PbTableParallelism` and the meta-model
/// `StreamingParallelism` via the `From` impls in this file.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// Adaptive parallelism. Both the protobuf `Adaptive` and `Auto` variants
    /// decode to this value.
    Adaptive,
    /// Fixed parallelism with the given value (encoded as `u32` in protobuf).
    Fixed(usize),
    /// Custom parallelism.
    ///
    /// NOTE(review): presumably marks jobs whose fragments were rescheduled
    /// individually so that no single number describes them — confirm.
    Custom,
}
65
66impl From<PbTableParallelism> for TableParallelism {
67 fn from(value: PbTableParallelism) -> Self {
68 use Parallelism::*;
69 match &value.parallelism {
70 Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
71 Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
72 Some(Custom(_)) => Self::Custom,
73 _ => unreachable!(),
74 }
75 }
76}
77
78impl From<TableParallelism> for PbTableParallelism {
79 fn from(value: TableParallelism) -> Self {
80 use TableParallelism::*;
81
82 let parallelism = match value {
83 Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
84 Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
85 parallelism: n as u32,
86 }),
87 Custom => PbParallelism::Custom(PbCustomParallelism {}),
88 };
89
90 Self {
91 parallelism: Some(parallelism),
92 }
93 }
94}
95
96impl From<StreamingParallelism> for TableParallelism {
97 fn from(value: StreamingParallelism) -> Self {
98 match value {
99 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
100 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n),
101 StreamingParallelism::Custom => TableParallelism::Custom,
102 }
103 }
104}
105
106impl From<TableParallelism> for StreamingParallelism {
107 fn from(value: TableParallelism) -> Self {
108 match value {
109 TableParallelism::Adaptive => StreamingParallelism::Adaptive,
110 TableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
111 TableParallelism::Custom => StreamingParallelism::Custom,
112 }
113 }
114}
115
/// Actor info grouped by fragment id, then by actor id.
///
/// NOTE(review): presumably the keys are the *upstream* fragment / actor ids of one
/// actor — confirm against the builders of this map.
pub type ActorUpstreams = BTreeMap<FragmentId, HashMap<ActorId, ActorInfo>>;
/// A [`StreamActor`] paired with a list of dispatchers.
pub type StreamActorWithDispatchers = (StreamActor, Vec<PbDispatcher>);
/// A [`StreamActor`] together with an [`ActorUpstreams`] map and a list of dispatchers.
pub type StreamActorWithUpDownstreams = (StreamActor, ActorUpstreams, Vec<PbDispatcher>);
/// Dispatchers grouped by fragment id, then by actor id.
pub type FragmentActorDispatchers = HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>;

/// For each fragment id, the list of its [`DownstreamFragmentRelation`]s.
pub type FragmentDownstreamRelation = HashMap<FragmentId, Vec<DownstreamFragmentRelation>>;
/// Fragment id -> (fragment id -> fragment id).
///
/// NOTE(review): presumably downstream fragment -> original upstream fragment -> new
/// upstream fragment — confirm.
pub type FragmentReplaceUpstream = HashMap<FragmentId, HashMap<FragmentId, FragmentId>>;
/// Fragment id -> (fragment id -> (actor id -> actor id)).
///
/// NOTE(review): presumably upstream fragment -> downstream fragment -> upstream actor
/// -> downstream actor for newly added no-shuffle edges — confirm.
pub type FragmentNewNoShuffle = HashMap<FragmentId, HashMap<FragmentId, HashMap<ActorId, ActorId>>>;
127
/// Describes how data is dispatched to one downstream fragment.
///
/// Can be built from a `(FragmentId, DispatchStrategy)` pair.
#[derive(Debug, Clone)]
pub struct DownstreamFragmentRelation {
    /// Id of the downstream fragment.
    pub downstream_fragment_id: FragmentId,
    /// Dispatcher type, converted from the type stored in the `DispatchStrategy`.
    pub dispatcher_type: DispatcherType,
    /// Distribution key indices, taken verbatim from the `DispatchStrategy`.
    pub dist_key_indices: Vec<u32>,
    /// Output mapping, taken from the `DispatchStrategy`.
    pub output_mapping: PbDispatchOutputMapping,
}
135
136impl From<(FragmentId, DispatchStrategy)> for DownstreamFragmentRelation {
137 fn from((fragment_id, dispatch): (FragmentId, DispatchStrategy)) -> Self {
138 Self {
139 downstream_fragment_id: fragment_id,
140 dispatcher_type: dispatch.get_type().unwrap().into(),
141 dist_key_indices: dispatch.dist_key_indices,
142 output_mapping: dispatch.output_mapping.unwrap(),
143 }
144 }
145}
146
/// A [`StreamJobFragments`] bundled with a [`FragmentDownstreamRelation`] map.
///
/// Dereferences to the inner [`StreamJobFragments`].
#[derive(Debug, Clone)]
pub struct StreamJobFragmentsToCreate {
    /// The fragments of the streaming job.
    pub inner: StreamJobFragments,
    /// Downstream relations, keyed by fragment id.
    pub downstreams: FragmentDownstreamRelation,
}
152
// Lets callers use a `StreamJobFragmentsToCreate` wherever a shared reference to
// `StreamJobFragments` is expected.
impl Deref for StreamJobFragmentsToCreate {
    type Target = StreamJobFragments;

    /// Returns the wrapped [`StreamJobFragments`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
160
/// In-memory description of one stream actor.
///
/// Unlike `PbStreamActor` it carries no dispatchers; they are supplied separately
/// when the actor is converted to protobuf.
#[derive(Clone, Debug)]
pub struct StreamActor {
    /// Id of the actor.
    pub actor_id: u32,
    /// Id of the fragment the actor belongs to.
    pub fragment_id: u32,
    /// Optional vnode bitmap of the actor.
    pub vnode_bitmap: Option<Bitmap>,
    /// Definition string copied into the protobuf `mview_definition` field.
    pub mview_definition: String,
    /// Optional expression context; `StreamJobFragments::fill_expr_context` fills in
    /// missing ones.
    pub expr_context: Option<PbExprContext>,
}
169
170impl StreamActor {
171 fn to_protobuf(&self, dispatchers: impl Iterator<Item = Dispatcher>) -> PbStreamActor {
172 PbStreamActor {
173 actor_id: self.actor_id,
174 fragment_id: self.fragment_id,
175 dispatcher: dispatchers.collect(),
176 vnode_bitmap: self
177 .vnode_bitmap
178 .as_ref()
179 .map(|bitmap| bitmap.to_protobuf()),
180 mview_definition: self.mview_definition.clone(),
181 expr_context: self.expr_context.clone(),
182 }
183 }
184}
185
/// In-memory description of one fragment of a streaming job.
#[derive(Clone, Debug, Default)]
pub struct Fragment {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Set of `FragmentTypeFlag`s describing the fragment.
    pub fragment_type_mask: FragmentTypeMask,
    /// Distribution type; `Single` makes the fragment a singleton.
    pub distribution_type: PbFragmentDistributionType,
    /// Actors of the fragment.
    pub actors: Vec<StreamActor>,
    /// Ids of the state tables listed for this fragment.
    pub state_table_ids: Vec<u32>,
    /// Raw, optional vnode count as stored in protobuf; read it through
    /// `VnodeCountCompat` rather than directly.
    pub maybe_vnode_count: Option<u32>,
    /// Root of the fragment's stream node tree.
    pub nodes: StreamNode,
}
196
197impl Fragment {
198 pub fn to_protobuf(
199 &self,
200 upstream_fragments: impl Iterator<Item = FragmentId>,
201 dispatchers: Option<&HashMap<ActorId, Vec<Dispatcher>>>,
202 ) -> PbFragment {
203 PbFragment {
204 fragment_id: self.fragment_id,
205 fragment_type_mask: self.fragment_type_mask.into(),
206 distribution_type: self.distribution_type as _,
207 actors: self
208 .actors
209 .iter()
210 .map(|actor| {
211 actor.to_protobuf(
212 dispatchers
213 .and_then(|dispatchers| dispatchers.get(&(actor.actor_id as _)))
214 .into_iter()
215 .flatten()
216 .cloned(),
217 )
218 })
219 .collect(),
220 state_table_ids: self.state_table_ids.clone(),
221 upstream_fragment_ids: upstream_fragments.collect(),
222 maybe_vnode_count: self.maybe_vnode_count,
223 nodes: Some(self.nodes.clone()),
224 }
225 }
226}
227
impl VnodeCountCompat for Fragment {
    /// Derives the fragment's [`VnodeCount`] from the stored `maybe_vnode_count`.
    fn vnode_count_inner(&self) -> VnodeCount {
        // The singleton check is passed as a closure so `from_protobuf` decides when
        // to evaluate it.
        // NOTE(review): presumably only consulted when the stored count is absent — confirm.
        VnodeCount::from_protobuf(self.maybe_vnode_count, || self.is_singleton())
    }
}
233
234impl IsSingleton for Fragment {
235 fn is_singleton(&self) -> bool {
236 matches!(self.distribution_type, FragmentDistributionType::Single)
237 }
238}
239
/// Fragments of one streaming job together with the runtime status of its actors.
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// Id of the streaming job.
    pub stream_job_id: TableId,

    /// State of the job; starts as `State::Initial` when built through `new`.
    pub state: State,

    /// Fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,

    /// Status (location and state) of each actor, keyed by actor id.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,

    /// Splits assigned to each actor, keyed by actor id.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

    /// Streaming context of the job.
    pub ctx: StreamContext,

    /// Parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,

    /// Max parallelism of the job.
    pub max_parallelism: usize,
}
281
/// Context shared by a streaming job.
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// Time zone of the job, if any. `None` is encoded as an empty string in protobuf.
    pub timezone: Option<String>,
}
287
288impl StreamContext {
289 pub fn to_protobuf(&self) -> PbStreamContext {
290 PbStreamContext {
291 timezone: self.timezone.clone().unwrap_or("".into()),
292 }
293 }
294
295 pub fn to_expr_context(&self) -> PbExprContext {
296 PbExprContext {
297 time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
299 strict_mode: false,
300 }
301 }
302
303 pub fn from_protobuf(prost: &PbStreamContext) -> Self {
304 Self {
305 timezone: if prost.get_timezone().is_empty() {
306 None
307 } else {
308 Some(prost.get_timezone().clone())
309 },
310 }
311 }
312}
313
314impl StreamJobFragments {
315 pub fn to_protobuf(
316 &self,
317 fragment_upstreams: &HashMap<FragmentId, HashSet<FragmentId>>,
318 fragment_dispatchers: &FragmentActorDispatchers,
319 ) -> PbTableFragments {
320 PbTableFragments {
321 table_id: self.stream_job_id.table_id(),
322 state: self.state as _,
323 fragments: self
324 .fragments
325 .iter()
326 .map(|(id, fragment)| {
327 (
328 *id,
329 fragment.to_protobuf(
330 fragment_upstreams.get(id).into_iter().flatten().cloned(),
331 fragment_dispatchers.get(&(*id as _)),
332 ),
333 )
334 })
335 .collect(),
336 actor_status: self.actor_status.clone().into_iter().collect(),
337 actor_splits: build_actor_connector_splits(&self.actor_splits),
338 ctx: Some(self.ctx.to_protobuf()),
339 parallelism: Some(self.assigned_parallelism.into()),
340 node_label: "".to_owned(),
341 backfill_done: true,
342 max_parallelism: Some(self.max_parallelism as _),
343 }
344 }
345}
346
/// Actors to create, grouped by worker id and then by fragment id. Each fragment entry
/// holds the fragment's stream node and its actors with their upstreams and dispatchers.
pub type StreamJobActorsToCreate =
    HashMap<WorkerId, HashMap<FragmentId, (StreamNode, Vec<StreamActorWithUpDownstreams>)>>;
349
350impl StreamJobFragments {
351 pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
353 Self::new(
354 table_id,
355 fragments,
356 &BTreeMap::new(),
357 StreamContext::default(),
358 TableParallelism::Adaptive,
359 VirtualNode::COUNT_FOR_TEST,
360 )
361 }
362
363 pub fn new(
366 stream_job_id: TableId,
367 fragments: BTreeMap<FragmentId, Fragment>,
368 actor_locations: &BTreeMap<ActorId, ActorAlignmentId>,
369 ctx: StreamContext,
370 table_parallelism: TableParallelism,
371 max_parallelism: usize,
372 ) -> Self {
373 let actor_status = actor_locations
374 .iter()
375 .map(|(&actor_id, alignment_id)| {
376 (
377 actor_id,
378 ActorStatus {
379 location: PbActorLocation::from_worker(alignment_id.worker_id()),
380 state: ActorState::Inactive as i32,
381 },
382 )
383 })
384 .collect();
385
386 Self {
387 stream_job_id,
388 state: State::Initial,
389 fragments,
390 actor_status,
391 actor_splits: HashMap::default(),
392 ctx,
393 assigned_parallelism: table_parallelism,
394 max_parallelism,
395 }
396 }
397
398 pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
399 self.fragments.keys().cloned()
400 }
401
402 pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
403 self.fragments.values()
404 }
405
406 pub fn fragment_actors(&self, fragment_id: FragmentId) -> &[StreamActor] {
407 self.fragments
408 .get(&fragment_id)
409 .map(|f| f.actors.as_slice())
410 .unwrap_or_default()
411 }
412
413 pub fn stream_job_id(&self) -> TableId {
415 self.stream_job_id
416 }
417
418 pub fn state(&self) -> State {
420 self.state
421 }
422
423 pub fn timezone(&self) -> Option<String> {
425 self.ctx.timezone.clone()
426 }
427
428 pub fn is_created(&self) -> bool {
430 self.state == State::Created
431 }
432
433 pub fn is_initial(&self) -> bool {
435 self.state == State::Initial
436 }
437
438 pub fn set_state(&mut self, state: State) {
440 self.state = state;
441 }
442
443 pub fn update_actors_state(&mut self, state: ActorState) {
445 for actor_status in self.actor_status.values_mut() {
446 actor_status.set_state(state);
447 }
448 }
449
450 pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
451 self.actor_splits = split_assignment.into_values().flatten().collect();
452 }
453
454 pub fn actor_ids(&self) -> Vec<ActorId> {
456 self.fragments
457 .values()
458 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
459 .collect()
460 }
461
462 pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
463 self.fragments
464 .values()
465 .flat_map(|fragment| {
466 fragment
467 .actors
468 .iter()
469 .map(|actor| (actor.actor_id, fragment.fragment_id))
470 })
471 .collect()
472 }
473
474 #[cfg(test)]
476 pub fn actors(&self) -> Vec<StreamActor> {
477 self.fragments
478 .values()
479 .flat_map(|fragment| fragment.actors.clone())
480 .collect()
481 }
482
483 pub fn filter_actor_ids(
485 &self,
486 check_type: impl Fn(FragmentTypeMask) -> bool + 'static,
487 ) -> impl Iterator<Item = ActorId> + '_ {
488 self.fragments
489 .values()
490 .filter(move |fragment| check_type(fragment.fragment_type_mask))
491 .flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
492 }
493
494 pub fn mview_actor_ids(&self) -> Vec<ActorId> {
496 Self::filter_actor_ids(self, |fragment_type_mask| {
497 fragment_type_mask.contains(FragmentTypeFlag::Mview)
498 })
499 .collect()
500 }
501
502 pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
504 let mut actor_ids = vec![];
505 for fragment in self.fragments.values() {
506 if fragment
507 .fragment_type_mask
508 .contains(FragmentTypeFlag::CdcFilter)
509 {
510 return vec![];
513 }
514 if fragment.fragment_type_mask.contains_any([
515 FragmentTypeFlag::Values,
516 FragmentTypeFlag::StreamScan,
517 FragmentTypeFlag::SourceScan,
518 ]) {
519 actor_ids.extend(fragment.actors.iter().map(|actor| {
520 (
521 actor.actor_id,
522 BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
523 )
524 }));
525 }
526 }
527 actor_ids
528 }
529
530 pub fn root_fragment(&self) -> Option<Fragment> {
531 self.mview_fragment()
532 .or_else(|| self.sink_fragment())
533 .or_else(|| self.source_fragment())
534 }
535
536 pub fn mview_fragment(&self) -> Option<Fragment> {
538 self.fragments
539 .values()
540 .find(|fragment| {
541 fragment
542 .fragment_type_mask
543 .contains(FragmentTypeFlag::Mview)
544 })
545 .cloned()
546 }
547
548 pub fn source_fragment(&self) -> Option<Fragment> {
549 self.fragments
550 .values()
551 .find(|fragment| {
552 fragment
553 .fragment_type_mask
554 .contains(FragmentTypeFlag::Source)
555 })
556 .cloned()
557 }
558
559 pub fn sink_fragment(&self) -> Option<Fragment> {
560 self.fragments
561 .values()
562 .find(|fragment| fragment.fragment_type_mask.contains(FragmentTypeFlag::Sink))
563 .cloned()
564 }
565
566 pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
567 Self::filter_actor_ids(self, |mask| {
568 mask.contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
569 })
570 .collect()
571 }
572
573 pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
576 let mut source_fragments = HashMap::new();
577
578 for fragment in self.fragments() {
579 {
580 if let Some(source_id) = fragment.nodes.find_stream_source() {
581 source_fragments
582 .entry(source_id as SourceId)
583 .or_insert(BTreeSet::new())
584 .insert(fragment.fragment_id as FragmentId);
585 }
586 }
587 }
588 source_fragments
589 }
590
591 pub fn source_backfill_fragments(
596 &self,
597 ) -> HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>> {
598 let mut source_backfill_fragments = HashMap::new();
599
600 for fragment in self.fragments() {
601 {
602 if let Some((source_id, upstream_source_fragment_id)) =
603 fragment.nodes.find_source_backfill()
604 {
605 source_backfill_fragments
606 .entry(source_id as SourceId)
607 .or_insert(BTreeSet::new())
608 .insert((fragment.fragment_id, upstream_source_fragment_id));
609 }
610 }
611 }
612 source_backfill_fragments
613 }
614
615 pub fn union_fragment_for_table(&mut self) -> &mut Fragment {
618 let mut union_fragment_id = None;
619 for (fragment_id, fragment) in &self.fragments {
620 {
621 {
622 visit_stream_node_body(&fragment.nodes, |body| {
623 if let NodeBody::Union(_) = body {
624 if let Some(union_fragment_id) = union_fragment_id.as_mut() {
625 assert_eq!(*union_fragment_id, *fragment_id);
627 } else {
628 union_fragment_id = Some(*fragment_id);
629 }
630 }
631 })
632 }
633 }
634 }
635
636 let union_fragment_id =
637 union_fragment_id.expect("fragment of placeholder merger not found");
638
639 (self
640 .fragments
641 .get_mut(&union_fragment_id)
642 .unwrap_or_else(|| panic!("fragment {} not found", union_fragment_id))) as _
643 }
644
645 fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
647 let table_id = match stream_node.node_body.as_ref() {
648 Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
649 Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
650 _ => None,
651 };
652 if let Some(table_id) = table_id {
653 table_ids.entry(table_id).or_default().add_assign(1);
654 }
655
656 for child in &stream_node.input {
657 Self::resolve_dependent_table(child, table_ids);
658 }
659 }
660
661 pub fn upstream_table_counts(&self) -> HashMap<TableId, usize> {
663 let mut table_ids = HashMap::new();
664 self.fragments.values().for_each(|fragment| {
665 Self::resolve_dependent_table(&fragment.nodes, &mut table_ids);
666 });
667
668 table_ids
669 }
670
671 pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
673 let mut map = BTreeMap::default();
674 for (&actor_id, actor_status) in &self.actor_status {
675 let node_id = actor_status.worker_id() as WorkerId;
676 map.entry(node_id)
677 .or_insert_with(Vec::new)
678 .push((actor_id, actor_status.state()));
679 }
680 map
681 }
682
683 pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
685 let mut map = BTreeMap::default();
686 for (&actor_id, actor_status) in &self.actor_status {
687 let node_id = actor_status.worker_id() as WorkerId;
688 map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
689 }
690 map
691 }
692
693 pub fn active_actors(&self) -> Vec<StreamActor> {
695 let mut actors = vec![];
696 for fragment in self.fragments.values() {
697 for actor in &fragment.actors {
698 if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
699 continue;
700 }
701 actors.push(actor.clone());
702 }
703 }
704 actors
705 }
706
707 pub fn actors_to_create(
708 &self,
709 ) -> impl Iterator<
710 Item = (
711 FragmentId,
712 &StreamNode,
713 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
714 ),
715 > + '_ {
716 self.fragments.values().map(move |fragment| {
717 (
718 fragment.fragment_id,
719 &fragment.nodes,
720 fragment.actors.iter().map(move |actor| {
721 let worker_id = self
722 .actor_status
723 .get(&actor.actor_id)
724 .expect("should exist")
725 .worker_id() as WorkerId;
726 (actor, worker_id)
727 }),
728 )
729 })
730 }
731
732 pub fn mv_table_id(&self) -> Option<u32> {
733 if self
734 .fragments
735 .values()
736 .flat_map(|f| f.state_table_ids.iter())
737 .any(|table_id| *table_id == self.stream_job_id.table_id)
738 {
739 Some(self.stream_job_id.table_id)
740 } else {
741 None
742 }
743 }
744
745 pub fn collect_tables(fragments: impl Iterator<Item = &Fragment>) -> BTreeMap<u32, Table> {
746 let mut tables = BTreeMap::new();
747 for fragment in fragments {
748 stream_graph_visitor::visit_stream_node_tables_inner(
749 &mut fragment.nodes.clone(),
750 false,
751 true,
752 |table, _| {
753 let table_id = table.id;
754 tables
755 .try_insert(table_id, table.clone())
756 .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
757 },
758 );
759 }
760 tables
761 }
762
763 pub fn internal_table_ids(&self) -> Vec<u32> {
765 self.fragments
766 .values()
767 .flat_map(|f| f.state_table_ids.clone())
768 .filter(|&t| t != self.stream_job_id.table_id)
769 .collect_vec()
770 }
771
772 pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
774 self.fragments
775 .values()
776 .flat_map(|f| f.state_table_ids.clone())
777 }
778
779 pub fn fill_expr_context(mut self) -> Self {
781 self.fragments.values_mut().for_each(|fragment| {
782 fragment.actors.iter_mut().for_each(|actor| {
783 if actor.expr_context.is_none() {
784 actor.expr_context = Some(self.ctx.to_expr_context());
785 }
786 });
787 });
788 self
789 }
790}
791
/// Kind of upstream a backfill fragment reads from, derived from the fragment's type mask.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
    /// The mask contains `FragmentTypeFlag::StreamScan`.
    MView,
    /// The mask contains `FragmentTypeFlag::Values`.
    Values,
    /// The mask contains `FragmentTypeFlag::SourceScan`.
    Source,
}
798
799impl BackfillUpstreamType {
800 pub fn from_fragment_type_mask(mask: FragmentTypeMask) -> Self {
801 let is_mview = mask.contains(FragmentTypeFlag::StreamScan);
802 let is_values = mask.contains(FragmentTypeFlag::Values);
803 let is_source = mask.contains(FragmentTypeFlag::SourceScan);
804
805 debug_assert!(
808 is_mview as u8 + is_values as u8 + is_source as u8 == 1,
809 "a backfill fragment should either be mview, value or source, found {:?}",
810 mask
811 );
812
813 if is_mview {
814 BackfillUpstreamType::MView
815 } else if is_values {
816 BackfillUpstreamType::Values
817 } else if is_source {
818 BackfillUpstreamType::Source
819 } else {
820 unreachable!("invalid fragment type mask: {:?}", mask);
821 }
822 }
823}