risingwave_meta/stream/stream_graph/actor.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use risingwave_common::bail;
20use risingwave_common::hash::{IsSingleton, VnodeCount, VnodeCountCompat};
21use risingwave_common::id::JobId;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_common::util::stream_graph_visitor::visit_tables;
24use risingwave_pb::stream_plan::stream_node::NodeBody;
25use risingwave_pb::stream_plan::{
26    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
27};
28
29use crate::MetaResult;
30use crate::model::{Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream};
31use crate::stream::stream_graph::fragment::{
32    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
33    StreamFragmentEdge,
34};
35use crate::stream::stream_graph::id::GlobalFragmentId;
36use crate::stream::stream_graph::schedule;
37use crate::stream::stream_graph::schedule::Distribution;
38
39impl FragmentActorBuilder {
40    /// Rewrite the actor body.
41    ///
42    /// During this process, the following things will be done:
43    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
44    ///    compute nodes.
45    fn rewrite(&self) -> MetaResult<StreamNode> {
46        self.rewrite_inner(&self.node, 0)
47    }
48
49    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
50        match stream_node.get_node_body()? {
51            // Leaf node `Exchange`.
52            NodeBody::Exchange(exchange) => {
53                // The exchange node should always be the bottom of the plan node. If we find one
54                // when the depth is 0, it means that the plan node is not well-formed.
55                if depth == 0 {
56                    bail!(
57                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
58                        stream_node
59                    )
60                }
61                assert!(!stream_node.get_fields().is_empty());
62                assert!(stream_node.input.is_empty());
63
64                // Index the upstreams by the an internal edge ID.
65                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
66                    link_id: stream_node.get_operator_id().as_raw_id(),
67                }];
68
69                let upstream_fragment_id = upstream_fragment_id.as_global_id();
70
71                Ok(StreamNode {
72                    node_body: Some(NodeBody::Merge(Box::new({
73                        MergeNode {
74                            upstream_fragment_id,
75                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
76                            ..Default::default()
77                        }
78                    }))),
79                    identity: "MergeExecutor".to_owned(),
80                    ..stream_node.clone()
81                })
82            }
83
84            // "Leaf" node `StreamScan`.
85            NodeBody::StreamScan(stream_scan) => {
86                let input = stream_node.get_input();
87                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
88                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
89                    // and always reads from log store.
90                    assert!(input.is_empty());
91                    return Ok(stream_node.clone());
92                }
93                assert_eq!(input.len(), 2);
94
95                let merge_node = &input[0];
96                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
97                let batch_plan_node = &input[1];
98                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
99
100                // Index the upstreams by the an external edge ID.
101                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
102                    [&EdgeId::UpstreamExternal {
103                        upstream_job_id: stream_scan.table_id.as_job_id(),
104                        downstream_fragment_id: self.fragment_id,
105                    }];
106
107                let is_shuffled_backfill = stream_scan.stream_scan_type
108                    == StreamScanType::ArrangementBackfill as i32
109                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
110                if !is_shuffled_backfill {
111                    assert!(*upstream_no_shuffle_actor);
112                }
113
114                let upstream_dispatcher_type = if is_shuffled_backfill {
115                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
116                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
117                    DispatcherType::Hash as _
118                } else {
119                    DispatcherType::NoShuffle as _
120                };
121
122                let upstream_fragment_id = upstream_fragment_id.as_global_id();
123
124                let input = vec![
125                    // Fill the merge node body with correct upstream info.
126                    StreamNode {
127                        node_body: Some(NodeBody::Merge(Box::new({
128                            MergeNode {
129                                upstream_fragment_id,
130                                upstream_dispatcher_type,
131                                ..Default::default()
132                            }
133                        }))),
134                        ..merge_node.clone()
135                    },
136                    batch_plan_node.clone(),
137                ];
138
139                Ok(StreamNode {
140                    input,
141                    ..stream_node.clone()
142                })
143            }
144
145            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
146            // cdc_filter -> backfill -> mview
147            // source_backfill -> mview
148            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
149                let input = stream_node.get_input();
150                assert_eq!(input.len(), 1);
151
152                let merge_node = &input[0];
153                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
154
155                let upstream_source_id = match stream_node.get_node_body()? {
156                    NodeBody::CdcFilter(node) => node.upstream_source_id,
157                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
158                    _ => unreachable!(),
159                };
160
161                // Index the upstreams by the an external edge ID.
162                let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
163                    [&EdgeId::UpstreamExternal {
164                        upstream_job_id: upstream_source_id.as_share_source_job_id(),
165                        downstream_fragment_id: self.fragment_id,
166                    }];
167
168                assert!(
169                    *upstream_is_no_shuffle,
170                    "Upstream Cdc Source should be singleton. \
171                    SourceBackfill is NoShuffle 1-1 correspondence. \
172                    So they both should have only one upstream actor."
173                );
174
175                let upstream_fragment_id = upstream_fragment_id.as_global_id();
176
177                // rewrite the input
178                let input = vec![
179                    // Fill the merge node body with correct upstream info.
180                    StreamNode {
181                        node_body: Some(NodeBody::Merge(Box::new({
182                            MergeNode {
183                                upstream_fragment_id,
184                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
185                                ..Default::default()
186                            }
187                        }))),
188                        ..merge_node.clone()
189                    },
190                ];
191                Ok(StreamNode {
192                    input,
193                    ..stream_node.clone()
194                })
195            }
196
197            // For other nodes, visit the children recursively.
198            _ => {
199                let mut new_stream_node = stream_node.clone();
200                for (input, new_input) in stream_node
201                    .input
202                    .iter()
203                    .zip_eq_fast(&mut new_stream_node.input)
204                {
205                    *new_input = self.rewrite_inner(input, depth + 1)?;
206                }
207                Ok(new_stream_node)
208            }
209        }
210    }
211}
212
/// The required changes to an existing (external) upstream fragment to build the graph of a
/// streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream fragment, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added: downstream fragment id -> dispatch strategy.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
222
/// The required changes to an existing (external) downstream fragment, i.e. the upstreams of its
/// merge nodes that are to be replaced by fragments of the new graph.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> new upstream fragment id
    new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
}
229
230impl UpstreamFragmentChange {
231    /// Add a dispatcher to the external actor.
232    fn add_dispatcher(
233        &mut self,
234        downstream_fragment_id: GlobalFragmentId,
235        dispatch: DispatchStrategy,
236    ) {
237        self.new_downstreams
238            .try_insert(downstream_fragment_id, dispatch)
239            .unwrap();
240    }
241}
242
243impl DownstreamFragmentChange {
244    /// Add an upstream to the external actor.
245    fn add_upstream(
246        &mut self,
247        edge_id: DownstreamExternalEdgeId,
248        new_upstream_fragment_id: GlobalFragmentId,
249    ) {
250        self.new_upstreams
251            .try_insert(edge_id, new_upstream_fragment_id)
252            .unwrap();
253    }
254}
255
/// Builder for a single fragment of the new graph. It collects the fragment's links to other
/// fragments, which are later used to rewrite its stream node (see `FragmentActorBuilder::rewrite`).
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The id of the fragment being built.
    fragment_id: GlobalFragmentId,
    /// The original (not yet rewritten) stream node of the fragment.
    node: StreamNode,
    /// downstream fragment id -> dispatch strategy of the edge to it.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    // edge_id -> (upstream fragment_id, whether the edge is no shuffle)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
}
264
265impl FragmentActorBuilder {
266    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
267        Self {
268            fragment_id,
269            node,
270            downstreams: Default::default(),
271            upstreams: Default::default(),
272        }
273    }
274}
275
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, a builder is added to this state for
/// every building fragment, and the links between fragments are recorded. As the building process
/// is run on the **complete graph**, which also contains the info of the existing (external)
/// fragments, the changes required on those external fragments are recorded as well.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the fragments to be built, keyed by fragment id.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The required changes to the external downstream fragments. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external fragments that get new (replaced) upstreams.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragments. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external fragments that get new downstreams.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
}
295
/// The information of a fragment, used for parameter passing for
/// [`ActorGraphBuildStateInner::add_link`].
struct FragmentLinkNode {
    /// The id of the fragment at this end of the link.
    fragment_id: GlobalFragmentId,
}
300
301impl ActorGraphBuildStateInner {
302    /// Add the new downstream fragment relation to a fragment.
303    ///
304    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
305    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
306    fn add_dispatcher(
307        &mut self,
308        fragment_id: GlobalFragmentId,
309        downstream_fragment_id: GlobalFragmentId,
310        dispatch: DispatchStrategy,
311    ) {
312        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
313            builder
314                .downstreams
315                .try_insert(downstream_fragment_id, dispatch)
316                .unwrap();
317        } else {
318            self.upstream_fragment_changes
319                .entry(fragment_id)
320                .or_default()
321                .add_dispatcher(downstream_fragment_id, dispatch);
322        }
323    }
324
325    /// Add the new upstream for an actor.
326    ///
327    /// - If the actor is to be built, the upstream will be added to the actor builder.
328    /// - If the actor is an external actor, the upstream will be added to the external changes.
329    fn add_upstream(
330        &mut self,
331        fragment_id: GlobalFragmentId,
332        edge_id: EdgeId,
333        upstream_fragment_id: GlobalFragmentId,
334        is_no_shuffle: bool,
335    ) {
336        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
337            builder
338                .upstreams
339                .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
340                .unwrap();
341        } else {
342            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
343                unreachable!("edge from internal to external must be `DownstreamExternal`")
344            };
345            self.downstream_fragment_changes
346                .entry(fragment_id)
347                .or_default()
348                .add_upstream(edge_id, upstream_fragment_id);
349        }
350    }
351
352    /// Add a "link" between two fragments in the graph.
353    ///
354    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
355    /// based on the distribution and the dispatch strategy. They will be
356    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
357    ///
358    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
359    /// instead of the actor builders.
360    fn add_link(
361        &mut self,
362        upstream: FragmentLinkNode,
363        downstream: FragmentLinkNode,
364        edge: &StreamFragmentEdge,
365    ) {
366        let dt = edge.dispatch_strategy.r#type();
367
368        match dt {
369            // For `NoShuffle`, make n "1-1" links between the actors.
370            DispatcherType::NoShuffle => {
371                // Create a new dispatcher just between these two actors.
372                self.add_dispatcher(
373                    upstream.fragment_id,
374                    downstream.fragment_id,
375                    edge.dispatch_strategy.clone(),
376                );
377
378                // Also record the upstream for the downstream actor.
379                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
380            }
381
382            // Otherwise, make m * n links between the actors.
383            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
384                self.add_dispatcher(
385                    upstream.fragment_id,
386                    downstream.fragment_id,
387                    edge.dispatch_strategy.clone(),
388                );
389                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
390            }
391
392            DispatcherType::Unspecified => unreachable!(),
393        }
394    }
395}
396
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
///
/// A thin wrapper owning the inner state; `finish` hands the inner state back.
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,
}
402
403impl ActorGraphBuildState {
404    /// Create an empty state with the given id generator.
405    fn new() -> Self {
406        Self {
407            inner: Default::default(),
408        }
409    }
410
411    /// Finish the build and return the inner state.
412    fn finish(self) -> ActorGraphBuildStateInner {
413        self.inner
414    }
415}
416
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments of the new streaming job, keyed by fragment id.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    /// Including the fragment relation to external downstream fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The new dispatchers to be added to the existing upstream fragments, keyed by the upstream
    /// fragment id. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the merge nodes of existing downstream fragments: keyed by the
    /// downstream fragment id, each entry maps an original upstream fragment id to its replacement.
    /// Fragments without any update are omitted. Used for schema change (replace table plan).
    pub replace_upstream: FragmentReplaceUpstream,
}
433
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// pre-scheduled distribution of each building fragment.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph, including the existing (external) fragments it connects to.
    fragment_graph: CompleteStreamFragmentGraph,
}
444
445impl ActorGraphBuilder {
446    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
447    /// graph is failed to be scheduled.
448    pub fn new(
449        streaming_job_id: JobId,
450        fragment_graph: CompleteStreamFragmentGraph,
451        default_parallelism: NonZeroUsize,
452    ) -> MetaResult<Self> {
453        let expected_vnode_count = fragment_graph.max_parallelism();
454
455        let scheduler =
456            schedule::Scheduler::new(streaming_job_id, default_parallelism, expected_vnode_count)?;
457
458        let distributions = scheduler.schedule(&fragment_graph)?;
459
460        // Fill the vnode count for each internal table, based on schedule result.
461        let mut fragment_graph = fragment_graph;
462        for (id, fragment) in fragment_graph.building_fragments_mut() {
463            let mut error = None;
464            let fragment_vnode_count = distributions[id].vnode_count();
465            visit_tables(fragment, |table, _| {
466                if error.is_some() {
467                    return;
468                }
469                // There are special cases where a hash-distributed fragment contains singleton
470                // internal tables, e.g., the state table of `Source` executors.
471                let vnode_count = if table.is_singleton() {
472                    if fragment_vnode_count > 1 {
473                        tracing::info!(
474                            table.name,
475                            "found singleton table in hash-distributed fragment"
476                        );
477                    }
478                    1
479                } else {
480                    fragment_vnode_count
481                };
482                match table.vnode_count_inner().value_opt() {
483                    // Vnode count of this table is not set to placeholder, meaning that we are replacing
484                    // a streaming job, and the existing state table requires a specific vnode count.
485                    // Check if it's the same with what we derived from the schedule result.
486                    //
487                    // Typically, inconsistency should not happen as we force to align the vnode count
488                    // when planning the new streaming job in the frontend.
489                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
490                        error = Some(format!(
491                            "failed to align vnode count for table {}({}): required {}, but got {}",
492                            table.id, table.name, required_vnode_count, vnode_count
493                        ));
494                    }
495                    // Normal cases.
496                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
497                }
498            });
499            if let Some(error) = error {
500                bail!(error);
501            }
502        }
503
504        Ok(Self {
505            distributions,
506            fragment_graph,
507        })
508    }
509
510    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
511    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
512    pub fn generate_graph(self) -> MetaResult<ActorGraphBuildResult> {
513        // Build the actor graph and get the final state.
514        let ActorGraphBuildStateInner {
515            fragment_actor_builders,
516            downstream_fragment_changes,
517            upstream_fragment_changes,
518        } = self.build_actor_graph()?;
519
520        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
521        // Serialize the graph into sealed fragments.
522        let graph = {
523            // As all fragments are processed, we can now `rewrite` the stream nodes where the
524            // `Exchange` and `Chain` are rewritten.
525            let mut fragment_nodes: HashMap<GlobalFragmentId, StreamNode> = HashMap::new();
526
527            for (fragment_id, builder) in fragment_actor_builders {
528                let global_fragment_id = fragment_id.as_global_id();
529                let node = builder.rewrite()?;
530                downstream_fragment_relations
531                    .try_insert(
532                        global_fragment_id,
533                        builder
534                            .downstreams
535                            .into_iter()
536                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
537                            .collect(),
538                    )
539                    .expect("non-duplicate");
540                fragment_nodes
541                    .try_insert(fragment_id, node)
542                    .expect("non-duplicate");
543            }
544
545            let mut graph = BTreeMap::new();
546            for (fragment_id, stream_node) in fragment_nodes {
547                let distribution = self.distributions[&fragment_id].clone();
548                let fragment =
549                    self.fragment_graph
550                        .seal_fragment(fragment_id, distribution, stream_node);
551                let fragment_id = fragment_id.as_global_id();
552                graph.insert(fragment_id, fragment);
553            }
554            graph
555        };
556
557        // Extract the new fragment relation from the external changes.
558        let upstream_fragment_downstreams = upstream_fragment_changes
559            .into_iter()
560            .map(|(fragment_id, changes)| {
561                (
562                    fragment_id.as_global_id(),
563                    changes
564                        .new_downstreams
565                        .into_iter()
566                        .map(|(downstream_fragment_id, new_dispatch)| {
567                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
568                        })
569                        .collect(),
570                )
571            })
572            .collect();
573
574        // Extract the updates for merge executors from the external changes.
575        let replace_upstream = downstream_fragment_changes
576            .into_iter()
577            .map(|(fragment_id, changes)| {
578                let fragment_id = fragment_id.as_global_id();
579                (
580                    fragment_id,
581                    changes
582                        .new_upstreams
583                        .into_iter()
584                        .map(move |(edge_id, upstream_fragment_id)| {
585                            let upstream_fragment_id = upstream_fragment_id.as_global_id();
586                            let DownstreamExternalEdgeId {
587                                original_upstream_fragment_id,
588                                ..
589                            } = edge_id;
590                            (
591                                original_upstream_fragment_id.as_global_id(),
592                                upstream_fragment_id,
593                            )
594                        })
595                        .collect(),
596                )
597            })
598            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
599            .collect();
600
601        Ok(ActorGraphBuildResult {
602            graph,
603            downstream_fragment_relations,
604            upstream_fragment_downstreams,
605            replace_upstream,
606        })
607    }
608
609    /// Build actor graph for each fragment, using topological order.
610    fn build_actor_graph(&self) -> MetaResult<ActorGraphBuildStateInner> {
611        let mut state = ActorGraphBuildState::new();
612
613        // Use topological sort to build the graph from downstream to upstream. (The first fragment
614        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
615        for fragment_id in self.fragment_graph.topo_order()? {
616            self.build_actor_graph_fragment(fragment_id, &mut state)?;
617        }
618
619        Ok(state.finish())
620    }
621
622    /// Build actor graph for a specific fragment.
623    fn build_actor_graph_fragment(
624        &self,
625        fragment_id: GlobalFragmentId,
626        state: &mut ActorGraphBuildState,
627    ) -> MetaResult<()> {
628        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
629
630        // First, add or record the actors for the current fragment into the state.
631        match current_fragment {
632            // For building fragments, we need to generate the actor builders.
633            EitherFragment::Building(current_fragment) => {
634                let node = current_fragment.node.clone().unwrap();
635                state
636                    .inner
637                    .fragment_actor_builders
638                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
639                    .expect("non-duplicate");
640            }
641
642            // For existing fragments, we only need to record the actor locations.
643            EitherFragment::Existing => {}
644        };
645
646        // Then, add links between the current fragment and its downstream fragments.
647        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
648            state.inner.add_link(
649                FragmentLinkNode { fragment_id },
650                FragmentLinkNode {
651                    fragment_id: downstream_fragment_id,
652                },
653                edge,
654            );
655        }
656
657        Ok(())
658    }
659}