use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_connector::source::cdc::{
    CdcTableSnapshotSplitAssignmentWithGeneration,
    build_pb_actor_cdc_table_snapshot_splits_with_generation,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, TracedEpoch};
use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, FragmentId, StreamActor, StreamJobActorsToCreate, SubscriptionId};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

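/// Encode an optional creating-job id into the partial graph id used in stream RPCs.
/// `None` (the database-level global graph) is encoded as `u32::MAX`; see
/// `from_partial_graph_id` for the inverse mapping.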
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

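/// Connection state of a worker node: either an established control stream, or a pending
/// reconnect future that resolves to a fresh [`StreamingControlHandle`].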
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}

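/// Manages the streaming control channels between the meta barrier worker and all worker nodes.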
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

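    /// Connect to a newly added worker node and register its control stream, retrying the
    /// initial connection a bounded number of times before giving up.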
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = TableId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id as _) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

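    /// Build a future that keeps trying to re-establish the control stream to `node` with
    /// exponential backoff, resolving once a new handle is obtained.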
    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

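    /// Rebuild the manager during recovery by connecting to all known worker nodes
    /// concurrently. Workers that cannot be reached immediately are left in the
    /// `Reconnecting` state with a pending retry future.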
    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id as _,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed=?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}

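/// A borrowed view of a freshly (re)connected worker, used to send it the initial
/// `CreatePartialGraph` requests for all inflight databases and creating jobs.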
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = TableId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}

pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
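    /// Poll all workers for the next event: a response on a connected control stream, or,
    /// when `poll_reconnect` is set, the completion of a pending reconnect. On a stream
    /// error the worker is removed if it was marked removed or is shutting down, and is
    /// otherwise moved back to `Reconnecting`.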
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id=node.id, host=?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id as _,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) {(false, err.into())}).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                                .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id as WorkerId);
                                                }
                                                Ok(resp)
                                            },
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

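    /// Await the next worker event, also driving any pending reconnects.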
    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = TableId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(None),
                database_id: database_id.into(),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(Some(job_id)),
                    database_id: database_id.into(),
                }),
            )
        })
    }
}

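/// Tracks collection of the initial barrier injected for a single database during recovery.
/// Once fully collected, it is converted into a [`DatabaseCheckpointControl`] via `finish`.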
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
            self.cdc_table_backfill_tracker,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
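    /// Inject the initial barrier of a database during recovery: register the database-level
    /// partial graph, reassign source and CDC snapshot splits, resolve the committed epoch,
    /// and re-create controls for ongoing snapshot-backfill jobs.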
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, String>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_split_assignment: &mut CdcTableSnapshotSplitAssignmentWithGeneration,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment = jobs
            .values()
            .flat_map(|fragments| fragments.values())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                cdc_table_snapshot_split_assignment
                    .splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let database_cdc_table_snapshot_split_assignment =
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                database_cdc_table_snapshot_split_assignment,
                cdc_table_snapshot_split_assignment.generation,
            );
        let mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    database_cdc_table_snapshot_split_assignment,
                )
                .into(),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause: Default::default(),
            new_upstream_sinks: Default::default(),
        });

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(job_id)
                    .map(|subscriptions| {
                        (
                            *job_id,
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (subscription_id, SubscriberType::Subscription(retention))
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if let Some(definition) = background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job_fragments, definition));
                } else {
                    database_jobs.insert(job_id, job_fragments);
                    background_mviews.insert(job_id, definition);
                }
            } else {
                database_jobs.insert(job_id, job_fragments);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|job| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (fragment_infos, definition)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, definition)
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, fragment_infos)
                    .expect("non-duplicate");
                continue;
            }
            let info = InflightStreamingJobInfo {
                job_id,
                fragment_infos,
                subscribers: Default::default(),
            };
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                info.fragment_infos()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let database_jobs: HashMap<TableId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, fragment_infos)| {
                    (
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers.remove(&job_id).unwrap_or_default(),
                        },
                    )
                })
                .collect()
        };

        let node_to_collect = {
            let node_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_info| {
                        (
                            fragment_info.fragment_id,
                            &fragment_info.nodes,
                            fragment_info.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews.iter().map(|(table_id, definition)| {
                (
                    *table_id,
                    (
                        definition.clone(),
                        &database_jobs[table_id],
                        Default::default(),
                    ),
                )
            }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, definition, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(|fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                        info.subscribers.keys().copied(),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .map(|job| job.graph_info()),
                )
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                }),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state = BarrierWorkerState::recovery(
            database_id,
            self.env.shared_actor_infos().clone(),
            new_epoch,
            database_jobs.into_values(),
            is_paused,
        );
        let cdc_table_backfill_tracker = self.env.cdc_table_backfill_tracker();
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
            cdc_table_backfill_tracker,
        })
    }

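    /// Inject a barrier for a command into the database-level partial graph, deriving the
    /// barrier mutation and the actors to create from the command.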
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
        )
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if let Some((_, worker_state)) = self.workers.get(worker_id)
                && let WorkerNodeState::Connected { .. } = worker_state
            {
            } else {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_workers()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(&node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

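    /// Register a new partial graph (the database-level graph or a creating job's graph) on
    /// all connected workers.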
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

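    /// Ask all connected workers to drop the partial graphs of the given creating jobs.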
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

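    /// Send a `ResetDatabase` request to all connected workers, returning the set of workers
    /// to which the request was successfully sent.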
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

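/// Merge per-worker RPC errors into a single [`MetaError`]. A single error is returned with
/// worker context attached; if any error carries a score (`Score`), the highest-scored one
/// wins; otherwise all errors are concatenated into one message.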
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}