risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

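/// Map an optional creating job id to the id of its partial graph.
///
/// `None` (the database-level graph shared by existing jobs) is encoded as `u32::MAX`;
/// a creating job uses its table id directly, which is asserted to never be `u32::MAX`.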
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

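/// Inverse of [`to_partial_graph_id`]: `u32::MAX` denotes the database-level graph
/// (`None`), and any other value is interpreted as the table id of a creating job.
///
/// A minimal round-trip sketch (not compiled as a doc test):
///
/// ```ignore
/// assert_eq!(from_partial_graph_id(to_partial_graph_id(None)), None);
/// let job = TableId::new(42);
/// assert_eq!(from_partial_graph_id(to_partial_graph_id(Some(job))), Some(job));
/// ```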
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

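/// A single established control stream, i.e. the handle used to exchange streaming
/// control requests and responses with one worker node.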
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

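/// Manages the control streams between the meta node and the worker nodes. `workers`
/// records every registered worker, while `connected_nodes` only contains those with a
/// currently established control stream.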
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

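    /// Try to re-establish the control stream to an already registered worker.
    /// Unlike [`Self::add_worker`], this makes a single attempt and only logs on failure.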
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "failed to create control stream to worker node");
            }
        }
    }

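    /// Register a (possibly new) worker node and establish a control stream to it,
    /// retrying up to `MAX_RETRY` times with exponential backoff.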
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have
                    // propagated to the meta node yet, which can cause the connection to fail.
                    // Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "failed to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "failed to create control stream to worker node after retries");
    }

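    /// Drop all existing control streams and establish a new one to each of the given
    /// workers using an empty init request. Returns the ids of the workers that could
    /// not be connected.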
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

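    /// Poll all connected response streams for the next response. On any error
    /// (including `Shutdown` responses and end-of-stream), the corresponding node is
    /// removed from `connected_nodes` and the error is returned to the caller.
    /// Returns `Poll::Pending` when no node is connected or no response is ready.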
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

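/// Tracks the collection of the initial barrier injected for a single database during
/// recovery. Once the barrier has been collected from all involved workers and all
/// recovered creating jobs, [`Self::finish`] converts it into a
/// [`DatabaseCheckpointControl`].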
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

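        // Resolve the committed epoch shared by all state tables of the given jobs,
        // asserting that every table has been committed at the same epoch.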
        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has a different committed epoch than {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

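        // For each recovered snapshot backfill job that has not yet caught up with its
        // upstream, derive the snapshot epoch from its backfill info and register the
        // job as a subscriber of each upstream table at `max(snapshot_epoch,
        // committed_epoch)`, so that the upstream retains the changes the backfill
        // still needs to consume.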
        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} of upstream {} differs from snapshot epoch {} of a previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicate");
        }

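        // Build the recovered actors of the database-level graph on their workers and
        // inject the initial barrier, recording which workers it must be collected from.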
        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs.values().flatten().chain(
                creating_streaming_job_controls
                    .values()
                    .flat_map(|job| job.graph_info().fragment_infos()),
            ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            subscription_info,
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

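    /// Inject a barrier for the database-level partial graph, deriving the mutation and
    /// the subscription changes from the given command, if any.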
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

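    /// Build an `InjectBarrierRequest` and send it to every connected worker for the
    /// partial graph identified by `creating_table_id`. Returns, for every connected
    /// worker, whether there are no actors on it from which this barrier needs to be
    /// collected.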
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

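    /// Ask every connected worker to create the partial graph for the given database
    /// (and creating job, if any). Send failures are only logged.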
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.values().for_each(|node| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "failed to add partial graph to worker")
            }
        });
    }

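    /// Ask every connected worker to remove the partial graphs of the given creating
    /// jobs. Send failures are only logged.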
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.values().for_each(|node| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

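    /// Send a reset-database request to every connected worker and return the ids of
    /// the workers that the request was successfully sent to.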
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

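/// Merge per-worker RPC errors into a single `MetaError`: a lone error is returned
/// directly with the worker id attached; if any errors carry a `Score`, the highest
/// scored one is chosen; otherwise all errors are concatenated into one message.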
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from a single (worker_id, error) pair.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}