risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{
    AddMutation, Barrier, BarrierMutation, FragmentTypeFlag, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

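/// Encode an optional creating-job id into a partial graph id. `None` denotes the
/// database-level ("global") partial graph and is encoded as `u32::MAX`; a creating
/// streaming job uses its table id directly, which is asserted to never collide with
/// the sentinel. [`from_partial_graph_id`] is the inverse mapping.
///
/// Illustrative round trip (sketch only, not a doctest):
///
/// ```ignore
/// assert_eq!(from_partial_graph_id(to_partial_graph_id(None)), None);
/// let job = TableId::new(42);
/// assert_eq!(from_partial_graph_id(to_partial_graph_id(Some(job))), Some(job));
/// ```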
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

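/// A live control stream to a single worker node: the worker's id and host together
/// with the RPC handle used to exchange streaming control messages.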
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

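/// Manages the control streams from the barrier manager to worker nodes. `workers`
/// tracks every known worker, while `connected_nodes` holds only the workers with a
/// currently live control stream.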
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

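    /// Try to re-establish the control stream to an already-registered worker after
    /// its previous stream failed. Unlike [`Self::add_worker`], this makes a single
    /// attempt and only logs an error on failure.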
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

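    /// Register a newly added worker node and establish a control stream to it. The
    /// connection is retried a few times with exponential backoff, since the DNS
    /// record of a freshly registered worker may not have propagated yet.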
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have
                    // been propagated to the meta node yet, causing an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

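    /// Reset the manager to a new set of worker nodes for the given `term_id`: drop
    /// all existing control streams and connect to every node in `nodes`. Returns the
    /// ids of the workers that could not be connected.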
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

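    /// Poll all connected control streams for the next response. Returns `Pending`
    /// when no worker is connected. On an error (closed stream, RPC error, or a
    /// `Shutdown` response), the worker is removed from `connected_nodes` before the
    /// error is returned to the caller.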
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

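    /// Build the `PbInitRequest` used to initialize a new control stream: for each
    /// database, one database-level partial graph carrying its subscriptions plus one
    /// subscription-free partial graph per job that is still being created.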
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

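/// Tracks collection of the initial barrier injected for a single database during
/// recovery. Once all worker responses (including those of recovered snapshot
/// backfill jobs) have been collected, [`Self::finish`] converts it into a
/// [`DatabaseCheckpointControl`].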
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    (fragment.fragment_type_mask
                        & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                        != 0
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (*table_id, (definition.clone(), stream_job_fragments))
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

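    /// Inject a barrier for a command on the database-level (global) partial graph,
    /// deriving the subscriptions to add or remove from the command's mutation.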
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

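    /// Send an `InjectBarrierRequest` for the given partial graph to every connected
    /// worker. The actors to collect are derived from `pre_applied_graph_info` and the
    /// tables to sync from `applied_graph_info`. Returns a [`NodeToCollect`] map from
    /// worker id to whether that worker has no actors to collect for this barrier.
    /// Fails if any worker hosting actors to collect is not connected.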
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

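    /// Broadcast a `CreatePartialGraphRequest` for the given (database, creating job)
    /// pair to all connected workers. Send failures are only logged.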
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

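    /// Broadcast a `RemovePartialGraphRequest` for the partial graphs of the given
    /// creating jobs to all connected workers. No-op when `creating_job_ids` is empty;
    /// send failures are only logged.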
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

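    /// Send a `ResetDatabaseRequest` with the given `reset_request_id` to all connected
    /// workers and return the ids of the workers the request was successfully sent to.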
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

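/// Merge per-worker RPC errors into a single `MetaError` prefixed with `message`.
/// A single error is returned as-is with context; if any error carries a `Score`,
/// the highest-scored one wins; otherwise all errors are concatenated into one message.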
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::error::request_value;
    use std::fmt::Write;

    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Build the returned error from a single (worker id, error) pair.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| request_value::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}