risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16
17use assert_matches::assert_matches;
18use risingwave_common::bail;
19use risingwave_common::hash::{IsSingleton, VnodeCount, VnodeCountCompat};
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_common::util::stream_graph_visitor::visit_tables;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23use risingwave_pb::stream_plan::{
24    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
25};
26
27use crate::MetaResult;
28use crate::model::{Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream};
29use crate::stream::stream_graph::fragment::{
30    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
31    StreamFragmentEdge,
32};
33use crate::stream::stream_graph::id::GlobalFragmentId;
34use crate::stream::stream_graph::schedule;
35use crate::stream::stream_graph::schedule::Distribution;
36
impl FragmentActorBuilder {
    /// Rewrite the actor body.
    ///
    /// During this process, the following things will be done:
    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
    ///    compute nodes.
    fn rewrite(&self) -> MetaResult<StreamNode> {
        self.rewrite_inner(&self.node, 0)
    }

    /// Recursively rewrite `stream_node`, with `depth` being 0 at the root of the plan.
    ///
    /// Leaf-like nodes (`Exchange`, `StreamScan`, `CdcFilter`, `SourceBackfill`) get their
    /// `Merge` (or body) filled with the upstream fragment info recorded in `self.upstreams`;
    /// all other nodes are visited recursively.
    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
        match stream_node.get_node_body()? {
            // Leaf node `Exchange`.
            NodeBody::Exchange(exchange) => {
                // The exchange node should always be the bottom of the plan node. If we find one
                // when the depth is 0, it means that the plan node is not well-formed.
                if depth == 0 {
                    bail!(
                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
                        stream_node
                    )
                }
                assert!(!stream_node.get_fields().is_empty());
                assert!(stream_node.input.is_empty());

                // Index the upstreams by an internal edge ID.
                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
                    link_id: stream_node.get_operator_id().as_raw_id(),
                }];

                let upstream_fragment_id = upstream_fragment_id.as_global_id();

                // Replace the `Exchange` with a `Merge` pulling from the upstream fragment,
                // keeping the dispatcher type of the original exchange strategy.
                Ok(StreamNode {
                    node_body: Some(NodeBody::Merge(Box::new({
                        MergeNode {
                            upstream_fragment_id,
                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
                            ..Default::default()
                        }
                    }))),
                    identity: "MergeExecutor".to_owned(),
                    ..stream_node.clone()
                })
            }

            // "Leaf" node `StreamScan`.
            NodeBody::StreamScan(stream_scan) => {
                let input = stream_node.get_input();
                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
                    // and always reads from log store.
                    assert!(input.is_empty());
                    return Ok(stream_node.clone());
                }
                // A `StreamScan` has exactly two inputs: the `Merge` from the upstream
                // fragment and the `BatchPlan` for the snapshot read.
                assert_eq!(input.len(), 2);

                let merge_node = &input[0];
                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
                let batch_plan_node = &input[1];
                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));

                // Index the upstreams by an external edge ID.
                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
                    [&EdgeId::UpstreamExternal {
                        upstream_job_id: stream_scan.table_id.as_job_id(),
                        downstream_fragment_id: self.fragment_id,
                    }];

                let is_shuffled_backfill = stream_scan.stream_scan_type
                    == StreamScanType::ArrangementBackfill as i32
                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
                // A non-shuffled backfill must be connected to its upstream via a
                // no-shuffle edge; this was recorded when the link was added.
                if !is_shuffled_backfill {
                    assert!(*upstream_no_shuffle_actor);
                }

                let upstream_dispatcher_type = if is_shuffled_backfill {
                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
                    DispatcherType::Hash as _
                } else {
                    DispatcherType::NoShuffle as _
                };

                let upstream_fragment_id = upstream_fragment_id.as_global_id();

                let input = vec![
                    // Fill the merge node body with correct upstream info.
                    StreamNode {
                        node_body: Some(NodeBody::Merge(Box::new({
                            MergeNode {
                                upstream_fragment_id,
                                upstream_dispatcher_type,
                                ..Default::default()
                            }
                        }))),
                        ..merge_node.clone()
                    },
                    batch_plan_node.clone(),
                ];

                Ok(StreamNode {
                    input,
                    ..stream_node.clone()
                })
            }

            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
            // cdc_filter -> backfill -> mview
            // source_backfill -> mview
            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
                let input = stream_node.get_input();
                assert_eq!(input.len(), 1);

                let merge_node = &input[0];
                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));

                let upstream_source_id = match stream_node.get_node_body()? {
                    NodeBody::CdcFilter(node) => node.upstream_source_id,
                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
                    _ => unreachable!(),
                };

                // Index the upstreams by an external edge ID.
                let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
                    [&EdgeId::UpstreamExternal {
                        upstream_job_id: upstream_source_id.as_share_source_job_id(),
                        downstream_fragment_id: self.fragment_id,
                    }];

                assert!(
                    *upstream_is_no_shuffle,
                    "Upstream Cdc Source should be singleton. \
                    SourceBackfill is NoShuffle 1-1 correspondence. \
                    So they both should have only one upstream actor."
                );

                let upstream_fragment_id = upstream_fragment_id.as_global_id();

                // rewrite the input
                let input = vec![
                    // Fill the merge node body with correct upstream info.
                    StreamNode {
                        node_body: Some(NodeBody::Merge(Box::new({
                            MergeNode {
                                upstream_fragment_id,
                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
                                ..Default::default()
                            }
                        }))),
                        ..merge_node.clone()
                    },
                ];
                Ok(StreamNode {
                    input,
                    ..stream_node.clone()
                })
            }

            // For other nodes, visit the children recursively.
            _ => {
                let mut new_stream_node = stream_node.clone();
                for (input, new_input) in stream_node
                    .input
                    .iter()
                    .zip_eq_fast(&mut new_stream_node.input)
                {
                    *new_input = self.rewrite_inner(input, depth + 1)?;
                }
                Ok(new_stream_node)
            }
        }
    }
}
210
/// The required changes to an existing external actor to build the graph of a streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream actors, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added, indexed by the downstream fragment id.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
220
/// The required changes to an existing external downstream fragment, i.e., the upstreams its
/// merge executors should be switched to. See also [`UpstreamFragmentChange`] for the
/// upstream-side counterpart.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> new upstream fragment id
    new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
}
227
228impl UpstreamFragmentChange {
229    /// Add a dispatcher to the external actor.
230    fn add_dispatcher(
231        &mut self,
232        downstream_fragment_id: GlobalFragmentId,
233        dispatch: DispatchStrategy,
234    ) {
235        self.new_downstreams
236            .try_insert(downstream_fragment_id, dispatch)
237            .unwrap();
238    }
239}
240
241impl DownstreamFragmentChange {
242    /// Add an upstream to the external actor.
243    fn add_upstream(
244        &mut self,
245        edge_id: DownstreamExternalEdgeId,
246        new_upstream_fragment_id: GlobalFragmentId,
247    ) {
248        self.new_upstreams
249            .try_insert(edge_id, new_upstream_fragment_id)
250            .unwrap();
251    }
252}
253
/// The builder of a fragment's actors, accumulating the links to its upstream and downstream
/// fragments before the stream node is rewritten via `rewrite`.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The id of the fragment being built.
    fragment_id: GlobalFragmentId,
    /// The stream node of the fragment, with logical `Exchange`s yet to be rewritten.
    node: StreamNode,
    /// The downstream fragments and the dispatch strategy towards each of them.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    // edge_id -> (upstream fragment_id, whether the edge is no shuffle)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
}
262
263impl FragmentActorBuilder {
264    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
265        Self {
266            fragment_id,
267            node,
268            downstreams: Default::default(),
269            upstreams: Default::default(),
270        }
271    }
272}
273
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, actor builders will be added to this
/// state and the scheduled locations will be added. As the building process is run on the
/// **complete graph** which also contains the info of the existing (external) fragments, the info
/// of them will be also recorded.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the actors to be built.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of fragments that have updates on its downstream.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of fragments that have updates on its upstream.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
}
293
/// The information of a fragment, used for parameter passing for `Inner::add_link`.
struct FragmentLinkNode {
    /// The id of the fragment on this end of the link.
    fragment_id: GlobalFragmentId,
}
298
299impl ActorGraphBuildStateInner {
300    /// Add the new downstream fragment relation to a fragment.
301    ///
302    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
303    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
304    fn add_dispatcher(
305        &mut self,
306        fragment_id: GlobalFragmentId,
307        downstream_fragment_id: GlobalFragmentId,
308        dispatch: DispatchStrategy,
309    ) {
310        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
311            builder
312                .downstreams
313                .try_insert(downstream_fragment_id, dispatch)
314                .unwrap();
315        } else {
316            self.upstream_fragment_changes
317                .entry(fragment_id)
318                .or_default()
319                .add_dispatcher(downstream_fragment_id, dispatch);
320        }
321    }
322
323    /// Add the new upstream for an actor.
324    ///
325    /// - If the actor is to be built, the upstream will be added to the actor builder.
326    /// - If the actor is an external actor, the upstream will be added to the external changes.
327    fn add_upstream(
328        &mut self,
329        fragment_id: GlobalFragmentId,
330        edge_id: EdgeId,
331        upstream_fragment_id: GlobalFragmentId,
332        is_no_shuffle: bool,
333    ) {
334        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
335            builder
336                .upstreams
337                .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
338                .unwrap();
339        } else {
340            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
341                unreachable!("edge from internal to external must be `DownstreamExternal`")
342            };
343            self.downstream_fragment_changes
344                .entry(fragment_id)
345                .or_default()
346                .add_upstream(edge_id, upstream_fragment_id);
347        }
348    }
349
350    /// Add a "link" between two fragments in the graph.
351    ///
352    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
353    /// based on the distribution and the dispatch strategy. They will be
354    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
355    ///
356    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
357    /// instead of the actor builders.
358    fn add_link(
359        &mut self,
360        upstream: FragmentLinkNode,
361        downstream: FragmentLinkNode,
362        edge: &StreamFragmentEdge,
363    ) {
364        let dt = edge.dispatch_strategy.r#type();
365
366        match dt {
367            // For `NoShuffle`, make n "1-1" links between the actors.
368            DispatcherType::NoShuffle => {
369                // Create a new dispatcher just between these two actors.
370                self.add_dispatcher(
371                    upstream.fragment_id,
372                    downstream.fragment_id,
373                    edge.dispatch_strategy.clone(),
374                );
375
376                // Also record the upstream for the downstream actor.
377                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
378            }
379
380            // Otherwise, make m * n links between the actors.
381            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
382                self.add_dispatcher(
383                    upstream.fragment_id,
384                    downstream.fragment_id,
385                    edge.dispatch_strategy.clone(),
386                );
387                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
388            }
389
390            DispatcherType::Unspecified => unreachable!(),
391        }
392    }
393}
394
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,
}
400
401impl ActorGraphBuildState {
402    /// Create an empty state with the given id generator.
403    fn new() -> Self {
404        Self {
405            inner: Default::default(),
406        }
407    }
408
409    /// Finish the build and return the inner state.
410    fn finish(self) -> ActorGraphBuildStateInner {
411        self.inner
412    }
413}
414
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments, including all actors.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    /// Including the fragment relation to external downstream fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
    /// table plan). Maps fragment id -> (original upstream fragment id -> new upstream fragment id).
    pub replace_upstream: FragmentReplaceUpstream,
}
431
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// current cluster info and the required parallelism.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph, with vnode counts of internal tables filled in
    /// according to the schedule result.
    fragment_graph: CompleteStreamFragmentGraph,
}
442
impl ActorGraphBuilder {
    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
    /// graph is failed to be scheduled.
    pub fn new(fragment_graph: CompleteStreamFragmentGraph) -> MetaResult<Self> {
        let expected_vnode_count = fragment_graph.max_parallelism();
        let scheduler = schedule::Scheduler::new(expected_vnode_count)?;

        let distributions = scheduler.schedule(&fragment_graph)?;

        // Fill the vnode count for each internal table, based on schedule result.
        let mut fragment_graph = fragment_graph;
        for (id, fragment) in fragment_graph.building_fragments_mut() {
            // `visit_tables` takes an infallible closure, so record the first error here
            // and bail out after the traversal.
            let mut error = None;
            let fragment_vnode_count = distributions[id].vnode_count();
            visit_tables(fragment, |table, _| {
                if error.is_some() {
                    return;
                }
                // There are special cases where a hash-distributed fragment contains singleton
                // internal tables, e.g., the state table of `Source` executors.
                let vnode_count = if table.is_singleton() {
                    if fragment_vnode_count > 1 {
                        tracing::info!(
                            table.name,
                            "found singleton table in hash-distributed fragment"
                        );
                    }
                    1
                } else {
                    fragment_vnode_count
                };
                match table.vnode_count_inner().value_opt() {
                    // Vnode count of this table is not set to placeholder, meaning that we are replacing
                    // a streaming job, and the existing state table requires a specific vnode count.
                    // Check if it's the same with what we derived from the schedule result.
                    //
                    // Typically, inconsistency should not happen as we force to align the vnode count
                    // when planning the new streaming job in the frontend.
                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
                        error = Some(format!(
                            "failed to align vnode count for table {}({}): required {}, but got {}",
                            table.id, table.name, required_vnode_count, vnode_count
                        ));
                    }
                    // Normal cases.
                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
                }
            });
            if let Some(error) = error {
                bail!(error);
            }
        }

        Ok(Self {
            distributions,
            fragment_graph,
        })
    }

    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
    pub fn generate_graph(self) -> MetaResult<ActorGraphBuildResult> {
        // Build the actor graph and get the final state.
        let ActorGraphBuildStateInner {
            fragment_actor_builders,
            downstream_fragment_changes,
            upstream_fragment_changes,
        } = self.build_actor_graph()?;

        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
        // Serialize the graph into sealed fragments.
        let graph = {
            // As all fragments are processed, we can now `rewrite` the stream nodes where the
            // `Exchange` and `Chain` are rewritten.
            let mut fragment_nodes: HashMap<GlobalFragmentId, StreamNode> = HashMap::new();

            for (fragment_id, builder) in fragment_actor_builders {
                let global_fragment_id = fragment_id.as_global_id();
                let node = builder.rewrite()?;
                // Record the downstream relations of this building fragment, converting
                // the internal fragment ids to global ones.
                downstream_fragment_relations
                    .try_insert(
                        global_fragment_id,
                        builder
                            .downstreams
                            .into_iter()
                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
                            .collect(),
                    )
                    .expect("non-duplicate");
                fragment_nodes
                    .try_insert(fragment_id, node)
                    .expect("non-duplicate");
            }

            // Seal each fragment with its scheduled distribution and the rewritten node.
            let mut graph = BTreeMap::new();
            for (fragment_id, stream_node) in fragment_nodes {
                let distribution = self.distributions[&fragment_id].clone();
                let fragment =
                    self.fragment_graph
                        .seal_fragment(fragment_id, distribution, stream_node);
                let fragment_id = fragment_id.as_global_id();
                graph.insert(fragment_id, fragment);
            }
            graph
        };

        // Extract the new fragment relation from the external changes.
        let upstream_fragment_downstreams = upstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                (
                    fragment_id.as_global_id(),
                    changes
                        .new_downstreams
                        .into_iter()
                        .map(|(downstream_fragment_id, new_dispatch)| {
                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
                        })
                        .collect(),
                )
            })
            .collect();

        // Extract the updates for merge executors from the external changes.
        let replace_upstream = downstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                let fragment_id = fragment_id.as_global_id();
                (
                    fragment_id,
                    changes
                        .new_upstreams
                        .into_iter()
                        .map(move |(edge_id, upstream_fragment_id)| {
                            let upstream_fragment_id = upstream_fragment_id.as_global_id();
                            // The update is keyed by the original upstream fragment that the
                            // merge executor should be switched away from.
                            let DownstreamExternalEdgeId {
                                original_upstream_fragment_id,
                                ..
                            } = edge_id;
                            (
                                original_upstream_fragment_id.as_global_id(),
                                upstream_fragment_id,
                            )
                        })
                        .collect(),
                )
            })
            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
            .collect();

        Ok(ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            upstream_fragment_downstreams,
            replace_upstream,
        })
    }

    /// Build actor graph for each fragment, using topological order.
    fn build_actor_graph(&self) -> MetaResult<ActorGraphBuildStateInner> {
        let mut state = ActorGraphBuildState::new();

        // Use topological sort to build the graph from downstream to upstream. (The first fragment
        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
        for fragment_id in self.fragment_graph.topo_order()? {
            self.build_actor_graph_fragment(fragment_id, &mut state)?;
        }

        Ok(state.finish())
    }

    /// Build actor graph for a specific fragment.
    fn build_actor_graph_fragment(
        &self,
        fragment_id: GlobalFragmentId,
        state: &mut ActorGraphBuildState,
    ) -> MetaResult<()> {
        let current_fragment = self.fragment_graph.get_fragment(fragment_id);

        // First, add or record the actors for the current fragment into the state.
        match current_fragment {
            // For building fragments, we need to generate the actor builders.
            EitherFragment::Building(current_fragment) => {
                let node = current_fragment.node.clone().unwrap();
                state
                    .inner
                    .fragment_actor_builders
                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
                    .expect("non-duplicate");
            }

            // For existing fragments, we only need to record the actor locations.
            EitherFragment::Existing => {}
        };

        // Then, add links between the current fragment and its downstream fragments.
        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
            state.inner.add_link(
                FragmentLinkNode { fragment_id },
                FragmentLinkNode {
                    fragment_id: downstream_fragment_id,
                },
                edge,
            );
        }

        Ok(())
    }
}
651}