risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignment, build_pb_actor_cdc_table_snapshot_splits,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

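/// Encodes an optional creating-job id as a partial graph id. `u32::MAX` is reserved as the
/// sentinel for the database-level partial graph (no creating job), hence the assertion that a
/// real job id never equals `u32::MAX`. `from_partial_graph_id` below is the inverse mapping.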
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

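/// Manages the streaming control RPC connections between the meta node and the compute workers.
/// `workers` records every known worker, while `connected_nodes` holds only those with an
/// established control stream.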
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "failed to create control stream to worker node");
            }
        }
    }

    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS record of a newly registered worker node may not have been
                    // propagated to the meta node yet, which causes the connection to fail.
                    // Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to create control stream to worker node after retries");
    }

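    /// Reset the manager to the given set of workers: replace the worker map, attempt to open a
    /// fresh control stream to each worker with an empty init request, and return the ids of the
    /// workers that could not be connected.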
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

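    /// Poll all connected control streams for the next response. A worker that yields an error
    /// (end of stream, a `Shutdown` response, or an unexpected `Init` response) is removed from
    /// `connected_nodes` before the error is returned. Returns `Pending` when no node is connected.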
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

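/// Tracks collection of the initial barrier injected for a single database during recovery,
/// along with the state needed to build the `DatabaseCheckpointControl` once every worker and
/// creating streaming job has reported back.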
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignment,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                database_cdc_table_snapshot_split_assignment,
            ),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has a different committed epoch from {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} differs from snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicate");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs.values().flatten().chain(
                creating_streaming_job_controls
                    .values()
                    .flat_map(|job| job.graph_info().fragment_infos()),
            ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            subscription_info,
            is_paused,
        );
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

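    /// Inject a barrier into the database-level partial graph, carrying the mutation derived from
    /// `command` (if any) together with the subscription changes implied by `Add` and
    /// `DropSubscriptions` mutations.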
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

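    /// Broadcast a barrier to all connected workers for the given partial graph. Actors to collect
    /// are derived from `pre_applied_graph_info` and tables to sync from `applied_graph_info`. The
    /// returned map records, per worker, whether it has no actors to collect for this barrier.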
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

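    /// Request every connected worker to create the partial graph for the given database and
    /// optional creating job. Send failures are logged and otherwise ignored.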
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "failed to add partial graph to worker")
            }
        });
    }

    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

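/// Merge per-worker RPC errors into a single `MetaError`: a single error is returned with its
/// worker context attached; if scores are attached, the highest-scored error wins; otherwise all
/// errors are concatenated into one message.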
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from the single error.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}