use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::{BoxFuture, join_all};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::PartialGraphId;
use risingwave_pb::source::{PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbCreatePartialGraphRequest, PbInitRequest,
    RemovePartialGraphRequest, ResetPartialGraphsRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::{Instant, sleep};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, TracedEpoch};
use crate::barrier::BackfillOrderState;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SubscriberType,
};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{
    ActorId, FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
    SubscriptionId,
};
use crate::stream::cdc::{
    CdcTableSnapshotSplits, is_parallelized_backfill_enabled_cdc_scan_fragment,
};
use crate::stream::{
    ExtendedFragmentBackfillOrder, StreamFragmentGraph, UserDefinedFragmentBackfillOrder,
    build_actor_connector_splits,
};
use crate::{MetaError, MetaResult};
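
/// Encodes a `(database_id, creating_job_id)` pair into a single [`PartialGraphId`]: the database
/// id occupies the upper 32 bits and the job id the lower 32 bits, with `u32::MAX` in the lower
/// half reserved for the database-level partial graph (no creating job).
///
/// A round-trip sketch, mirroring the unit test at the bottom of this file (the `.into()`
/// conversions assume the same `From` impls the test relies on):
///
/// ```ignore
/// let database_id: DatabaseId = 233.into();
/// let graph_id = to_partial_graph_id(database_id, None);
/// assert_eq!(from_partial_graph_id(graph_id), (database_id, None));
/// ```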
pub(super) fn to_partial_graph_id(
    database_id: DatabaseId,
    creating_job_id: Option<JobId>,
) -> PartialGraphId {
    let raw_job_id = creating_job_id
        .map(|job_id| {
            assert_ne!(job_id, u32::MAX);
            job_id.as_raw_id()
        })
        .unwrap_or(u32::MAX);
    (((database_id.as_raw_id() as u64) << 32) | (raw_job_id as u64)).into()
}
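
/// Inverse of [`to_partial_graph_id`]: splits a [`PartialGraphId`] back into the database id and
/// the optional creating job id (`None` when the lower 32 bits are `u32::MAX`).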
pub(super) fn from_partial_graph_id(
    partial_graph_id: PartialGraphId,
) -> (DatabaseId, Option<JobId>) {
    let id = partial_graph_id.as_raw_id();
    let database_id = (id >> 32) as u32;
    let raw_creating_job_id = (id & ((1 << 32) - 1)) as u32;
    let creating_job_id = if raw_creating_job_id == u32::MAX {
        None
    } else {
        Some(JobId::new(raw_creating_job_id))
    };
    (database_id.into(), creating_job_id)
}
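
/// Builds a mapping from fragment id to the state table id(s) of the `LocalityProvider` nodes
/// found in that fragment's stream node tree. Fragments without a `LocalityProvider` are omitted
/// from the returned mapping.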
pub(super) fn build_locality_fragment_state_table_mapping(
    fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
) -> HashMap<FragmentId, Vec<TableId>> {
    let mut mapping = HashMap::new();

    for (fragment_id, fragment_info) in fragment_infos {
        let mut state_table_ids = Vec::new();
        visit_stream_node_cont(&fragment_info.nodes, |stream_node| {
            if let Some(NodeBody::LocalityProvider(locality_provider)) =
                stream_node.node_body.as_ref()
            {
                let state_table_id = locality_provider
                    .state_table
                    .as_ref()
                    .expect("must have state table")
                    .id;
                state_table_ids.push(state_table_id);
                false
            } else {
                true
            }
        });
        if !state_table_ids.is_empty() {
            mapping.insert(*fragment_id, state_table_ids);
        }
    }

    mapping
}
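
/// A control stream to a single compute node, together with the worker id and host address it was
/// established against.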
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}
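
/// Connection state of a worker: either an established control stream (with a flag marking
/// whether the worker has been removed from the cluster while the stream is still open), or a
/// pending reconnect attempt that resolves to a new stream handle.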
enum WorkerNodeState {
    Connected {
        control_stream: ControlStreamNode,
        removed: bool,
    },
    Reconnecting(BoxFuture<'static, StreamingControlHandle>),
}
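
/// Manages the per-worker streaming control streams used by the global barrier worker to inject
/// barriers and collect responses from compute nodes.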
pub(super) struct ControlStreamManager {
    workers: HashMap<WorkerId, (WorkerNode, WorkerNodeState)>,
    pub env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].0.host.clone().unwrap()
    }

    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id;
        if let Entry::Occupied(entry) = self.workers.entry(node_id) {
            let (existing_node, worker_state) = entry.get();
            assert_eq!(existing_node.host, node.host);
            warn!(id = %node.id, host = ?node.host, "node already exists");
            match worker_state {
                WorkerNodeState::Connected { .. } => {
                    warn!(id = %node.id, host = ?node.host, "new node already connected");
                    return;
                }
                WorkerNodeState::Reconnecting(_) => {
                    warn!(id = %node.id, host = ?node.host, "remove previous pending worker connect request and reconnect");
                    entry.remove();
                }
            }
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context
                .new_control_stream(
                    &node,
                    &PbInitRequest {
                        term_id: term_id.clone(),
                    },
                )
                .await
            {
                Ok(mut handle) => {
                    WorkerNodeConnected {
                        handle: &mut handle,
                        node: &node,
                    }
                    .initialize(inflight_infos);
                    info!(?node_host, "add control stream worker");
                    assert!(
                        self.workers
                            .insert(
                                node_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream: ControlStreamNode {
                                            worker_id: node_id as _,
                                            host: node_host,
                                            handle,
                                        },
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                    return;
                }
                Err(e) => {
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    pub(super) fn remove_worker(&mut self, node: WorkerNode) {
        if let Entry::Occupied(mut entry) = self.workers.entry(node.id) {
            let (_, worker_state) = entry.get_mut();
            match worker_state {
                WorkerNodeState::Connected { removed, .. } => {
                    info!(worker_id = %node.id, "mark connected worker as removed");
                    *removed = true;
                }
                WorkerNodeState::Reconnecting(_) => {
                    info!(worker_id = %node.id, "remove worker");
                    entry.remove();
                }
            }
        }
    }

    fn retry_connect(
        node: WorkerNode,
        term_id: String,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> BoxFuture<'static, StreamingControlHandle> {
        async move {
            let mut attempt = 0;
            let backoff = ExponentialBackoff::from_millis(100)
                .max_delay(Duration::from_mins(1))
                .factor(5);
            let init_request = PbInitRequest { term_id };
            for delay in backoff {
                attempt += 1;
                sleep(delay).await;
                match context.new_control_stream(&node, &init_request).await {
                    Ok(handle) => {
                        return handle;
                    }
                    Err(e) => {
                        warn!(e = %e.as_report(), ?node, attempt, "fail to create control stream worker");
                    }
                }
            }
            unreachable!("end of retry backoff")
        }
        .boxed()
    }

    pub(super) async fn recover(
        env: MetaSrvEnv,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: &str,
        context: Arc<impl GlobalBarrierWorkerContext>,
    ) -> Self {
        let reset_start_time = Instant::now();
        let init_request = PbInitRequest {
            term_id: term_id.to_owned(),
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        let mut unconnected_workers = HashSet::new();
        let mut workers = HashMap::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    let control_stream = ControlStreamNode {
                        worker_id: node.id,
                        host: node.host.clone().unwrap(),
                        handle,
                    };
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node,
                                    WorkerNodeState::Connected {
                                        control_stream,
                                        removed: false
                                    }
                                )
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    unconnected_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        %worker_id,
                        ?node,
                        "failed to connect to node"
                    );
                    assert!(
                        workers
                            .insert(
                                worker_id,
                                (
                                    node.clone(),
                                    WorkerNodeState::Reconnecting(Self::retry_connect(
                                        node,
                                        term_id.to_owned(),
                                        context.clone()
                                    ))
                                )
                            )
                            .is_none()
                    );
                }
            }
        }

        info!(elapsed = ?reset_start_time.elapsed(), ?unconnected_workers, "control stream reset");

        Self { workers, env }
    }

    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }
}
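
/// A mutable borrow of the control stream handle of a newly (re)connected worker.
/// [`Self::initialize`] replays `CreatePartialGraph` requests for all inflight databases and
/// creating jobs so the worker can rebuild its partial graphs.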
pub(super) struct WorkerNodeConnected<'a> {
    node: &'a WorkerNode,
    handle: &'a mut StreamingControlHandle,
}

impl<'a> WorkerNodeConnected<'a> {
    pub(super) fn initialize(
        self,
        inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) {
        for request in ControlStreamManager::collect_init_partial_graph(inflight_infos) {
            if let Err(e) = self.handle.send_request(StreamingControlStreamRequest {
                request: Some(
                    streaming_control_stream_request::Request::CreatePartialGraph(request),
                ),
            }) {
                warn!(e = %e.as_report(), node = ?self.node, "failed to send initial partial graph request");
            }
        }
    }
}
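
/// Event returned by [`ControlStreamManager::next_event`]: either a response (or error) received
/// on an established control stream, or notification that a worker has just been (re)connected.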
pub(super) enum WorkerNodeEvent<'a> {
    Response(MetaResult<streaming_control_stream_response::Response>),
    Connected(WorkerNodeConnected<'a>),
}

impl ControlStreamManager {
    fn poll_next_event<'a>(
        this_opt: &mut Option<&'a mut Self>,
        cx: &mut Context<'_>,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
        poll_reconnect: bool,
    ) -> Poll<(WorkerId, WorkerNodeEvent<'a>)> {
        let this = this_opt.as_mut().expect("Future polled after completion");
        if this.workers.is_empty() {
            return Poll::Pending;
        }
        {
            for (&worker_id, (node, worker_state)) in &mut this.workers {
                let control_stream = match worker_state {
                    WorkerNodeState::Connected { control_stream, .. } => control_stream,
                    WorkerNodeState::Reconnecting(_) if !poll_reconnect => {
                        continue;
                    }
                    WorkerNodeState::Reconnecting(join_handle) => {
                        match join_handle.poll_unpin(cx) {
                            Poll::Ready(handle) => {
                                info!(id = %node.id, host = ?node.host, "reconnected to worker");
                                *worker_state = WorkerNodeState::Connected {
                                    control_stream: ControlStreamNode {
                                        worker_id: node.id,
                                        host: node.host.clone().unwrap(),
                                        handle,
                                    },
                                    removed: false,
                                };
                                let this = this_opt.take().expect("should exist");
                                let (node, worker_state) =
                                    this.workers.get_mut(&worker_id).expect("should exist");
                                let WorkerNodeState::Connected { control_stream, .. } =
                                    worker_state
                                else {
                                    unreachable!()
                                };
                                return Poll::Ready((
                                    worker_id,
                                    WorkerNodeEvent::Connected(WorkerNodeConnected {
                                        node,
                                        handle: &mut control_stream.handle,
                                    }),
                                ));
                            }
                            Poll::Pending => {
                                continue;
                            }
                        }
                    }
                };
                match control_stream.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        {
                            let result = result
                                .ok_or_else(|| (false, anyhow!("end of stream").into()))
                                .and_then(|result| {
                                    result.map_err(|err| -> (bool, MetaError) { (false, err.into()) }).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| (false, anyhow!("empty response").into()))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err((true, anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into())),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err((false, anyhow!("get unexpected init response").into()))
                                            }
                                            resp => {
                                                if let streaming_control_stream_response::Response::CompleteBarrier(barrier_resp) = &resp {
                                                    assert_eq!(worker_id, barrier_resp.worker_id);
                                                }
                                                Ok(resp)
                                            }
                                        }
                                    })
                                });
                            let result = match result {
                                Ok(resp) => Ok(resp),
                                Err((shutdown, err)) => {
                                    warn!(worker_id = %node.id, host = ?node.host, err = %err.as_report(), "get error from response stream");
                                    let WorkerNodeState::Connected { removed, .. } = worker_state
                                    else {
                                        unreachable!("checked connected")
                                    };
                                    if *removed || shutdown {
                                        this.workers.remove(&worker_id);
                                    } else {
                                        *worker_state = WorkerNodeState::Reconnecting(
                                            ControlStreamManager::retry_connect(
                                                node.clone(),
                                                term_id.to_owned(),
                                                context.clone(),
                                            ),
                                        );
                                    }
                                    Err(err)
                                }
                            };
                            return Poll::Ready((worker_id, WorkerNodeEvent::Response(result)));
                        }
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        Poll::Pending
    }

    #[await_tree::instrument("control_stream_next_event")]
    pub(super) async fn next_event<'a>(
        &'a mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (WorkerId, WorkerNodeEvent<'a>) {
        let mut this = Some(self);
        poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, true)).await
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
        term_id: &str,
        context: &Arc<impl GlobalBarrierWorkerContext>,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        let mut this = Some(self);
        let (worker_id, event) =
            poll_fn(|cx| Self::poll_next_event(&mut this, cx, term_id, context, false)).await;
        match event {
            WorkerNodeEvent::Response(result) => (worker_id, result),
            WorkerNodeEvent::Connected(_) => {
                unreachable!("set poll_reconnect=false")
            }
        }
    }

    fn collect_init_partial_graph(
        initial_inflight_infos: impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId>)>,
    ) -> impl Iterator<Item = PbCreatePartialGraphRequest> {
        initial_inflight_infos.flat_map(|(database_id, creating_job_ids)| {
            [PbCreatePartialGraphRequest {
                partial_graph_id: to_partial_graph_id(database_id, None),
            }]
            .into_iter()
            .chain(
                creating_job_ids.map(move |job_id| PbCreatePartialGraphRequest {
                    partial_graph_id: to_partial_graph_id(database_id, Some(job_id)),
                }),
            )
        })
    }
}
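
/// Tracks collection of the initial barrier injected for a database during recovery. Once every
/// worker and every recovered creating job has reported back ([`Self::is_collected`]), it can be
/// converted into a [`DatabaseCheckpointControl`] via [`Self::finish`].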
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    database_info: InflightDatabaseInfo,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn creating_job_ids(&self) -> impl Iterator<Item = JobId> {
        self.creating_streaming_job_controls.keys().copied()
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(creating_job_id) = creating_job_id {
            assert!(
                !self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp),
                "unlikely to finish backfill since just recovered"
            );
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(self.node_to_collect.remove(&resp.worker_id));
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.database_state,
            self.committed_epoch,
            self.database_info,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        fragment_relations: &FragmentDownstreamRelation,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashSet<JobId>,
        mv_depended_subscriptions: &mut HashMap<TableId, HashMap<SubscriptionId, u64>>,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
        cdc_table_snapshot_splits: &mut HashMap<JobId, CdcTableSnapshotSplits>,
        injected_creating_jobs: &mut HashSet<JobId>,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);

        fn collect_source_splits(
            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
            source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        ) -> HashMap<ActorId, Vec<SplitImpl>> {
            fragment_infos
                .flat_map(|info| info.actors.keys())
                .filter_map(|actor_id| {
                    let actor_id = *actor_id as ActorId;
                    source_splits
                        .remove(&actor_id)
                        .map(|splits| (actor_id, splits))
                })
                .collect()
        }

        fn build_mutation(
            splits: &HashMap<ActorId, Vec<SplitImpl>>,
            cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits>,
            backfill_orders: &ExtendedFragmentBackfillOrder,
            is_paused: bool,
        ) -> Mutation {
            let backfill_nodes_to_pause = get_nodes_with_backfill_dependencies(backfill_orders)
                .into_iter()
                .collect();
            Mutation::Add(AddMutation {
                actor_dispatchers: Default::default(),
                added_actors: Default::default(),
                actor_splits: build_actor_connector_splits(splits),
                actor_cdc_table_snapshot_splits: Some(PbCdcTableSnapshotSplitsWithGeneration {
                    splits: cdc_table_snapshot_split_assignment,
                }),
                pause: is_paused,
                subscriptions_to_add: Default::default(),
                backfill_nodes_to_pause,
                new_upstream_sinks: Default::default(),
            })
        }

        fn resolve_jobs_committed_epoch<'a>(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            fragments: impl Iterator<Item = &'a InflightFragmentInfo> + 'a,
        ) -> u64 {
            let mut epochs = InflightFragmentInfo::existing_table_ids(fragments).map(|table_id| {
                (
                    table_id,
                    state_table_committed_epochs
                        .remove(&table_id)
                        .expect("should exist"),
                )
            });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        fn job_backfill_orders(
            job_extra_info: &HashMap<JobId, StreamingJobExtraInfo>,
            job_id: JobId,
        ) -> UserDefinedFragmentBackfillOrder {
            UserDefinedFragmentBackfillOrder::new(
                job_extra_info
                    .get(&job_id)
                    .and_then(|info| info.backfill_orders.clone())
                    .map_or_else(HashMap::new, |orders| orders.0),
            )
        }

        let mut subscribers: HashMap<_, HashMap<_, _>> = jobs
            .keys()
            .filter_map(|job_id| {
                mv_depended_subscriptions
                    .remove(&job_id.as_mv_table_id())
                    .map(|subscriptions| {
                        (
                            job_id.as_mv_table_id(),
                            subscriptions
                                .into_iter()
                                .map(|(subscription_id, retention)| {
                                    (
                                        subscription_id.as_subscriber_id(),
                                        SubscriberType::Subscription(retention),
                                    )
                                })
                                .collect(),
                        )
                    })
            })
            .collect();

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();

        for (job_id, job_fragments) in jobs {
            if background_jobs.remove(&job_id) {
                if job_fragments.values().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, job_fragments);
                } else {
                    database_jobs.insert(job_id, (job_fragments, true));
                }
            } else {
                database_jobs.insert(job_id, (job_fragments, false));
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(&job_id.as_mv_table_id())
                    .map(|epochs| (job_id.as_mv_table_id(), epochs))
            })
            .collect();

        let prev_epoch = resolve_jobs_committed_epoch(
            state_table_committed_epochs,
            database_jobs.values().flat_map(|(job, _)| job.values()),
        );
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<JobId, _> = HashMap::new();
        for (job_id, fragment_infos) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, fragment_infos.values());
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                database_jobs
                    .try_insert(job_id, (fragment_infos, true))
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                fragment_infos
                    .values()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscribers
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.as_subscriber_id(), SubscriberType::SnapshotBackfill)
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        fragment_infos,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let mut cdc_table_snapshot_split_assignment: HashMap<ActorId, PbCdcTableSnapshotSplits> =
            HashMap::new();

        let database_jobs: HashMap<JobId, InflightStreamingJobInfo> = {
            database_jobs
                .into_iter()
                .map(|(job_id, (fragment_infos, is_background_creating))| {
                    let status = if is_background_creating {
                        let backfill_ordering = job_backfill_orders(job_extra_info, job_id);
                        let backfill_ordering = StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                            backfill_ordering,
                            fragment_relations,
                            || fragment_infos.iter().map(|(fragment_id, fragment)| {
                                (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                            }));
                        let locality_fragment_state_table_mapping =
                            build_locality_fragment_state_table_mapping(&fragment_infos);
                        let backfill_order_state = BackfillOrderState::recover_from_fragment_infos(
                            &backfill_ordering,
                            &fragment_infos,
                            locality_fragment_state_table_mapping,
                        );
                        CreateStreamingJobStatus::Creating {
                            tracker: CreateMviewProgressTracker::recover(
                                job_id,
                                &fragment_infos,
                                backfill_order_state,
                                hummock_version_stats,
                            ),
                        }
                    } else {
                        CreateStreamingJobStatus::Created
                    };
                    let cdc_table_backfill_tracker =
                        if let Some(splits) = cdc_table_snapshot_splits.remove(&job_id) {
                            let cdc_fragment = fragment_infos
                                .values()
                                .find(|fragment| {
                                    is_parallelized_backfill_enabled_cdc_scan_fragment(
                                        fragment.fragment_type_mask,
                                        &fragment.nodes,
                                    )
                                    .is_some()
                                })
                                .expect("should have parallel cdc fragment");
                            let cdc_actors = cdc_fragment.actors.keys().copied().collect();
                            let mut tracker =
                                CdcTableBackfillTracker::restore(cdc_fragment.fragment_id, splits);
                            cdc_table_snapshot_split_assignment
                                .extend(tracker.reassign_splits(cdc_actors)?);
                            Some(tracker)
                        } else {
                            None
                        };
                    Ok((
                        job_id,
                        InflightStreamingJobInfo {
                            job_id,
                            fragment_infos,
                            subscribers: subscribers
                                .remove(&job_id.as_mv_table_id())
                                .unwrap_or_default(),
                            status,
                            cdc_table_backfill_tracker,
                        },
                    ))
                })
                .try_collect::<_, _, MetaError>()
        }?;

        let mut builder = FragmentEdgeBuilder::new(
            database_jobs
                .values()
                .flat_map(|job| {
                    job.fragment_infos()
                        .map(|info| (info, to_partial_graph_id(database_id, None)))
                })
                .chain(ongoing_snapshot_backfill_jobs.iter().flat_map(
                    |(job_id, (fragments, ..))| {
                        let partial_graph_id = to_partial_graph_id(database_id, Some(*job_id));
                        fragments
                            .values()
                            .map(move |fragment| (fragment, partial_graph_id))
                    },
                )),
            self,
        );
        builder.add_relations(fragment_relations);
        let mut edges = builder.build();

        let node_to_collect = {
            let new_actors =
                edges.collect_actors_to_create(database_jobs.values().flat_map(move |job| {
                    job.fragment_infos.values().map(move |fragment_infos| {
                        (
                            fragment_infos.fragment_id,
                            &fragment_infos.nodes,
                            fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                                (
                                    stream_actors.get(actor_id).expect("should exist"),
                                    actor.worker_id,
                                )
                            }),
                            job.subscribers.keys().copied(),
                        )
                    })
                }));

            let nodes_actors =
                InflightFragmentInfo::actor_ids_to_collect(database_jobs.values().flatten());
            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            let database_backfill_orders =
                UserDefinedFragmentBackfillOrder::merge(database_jobs.values().map(|job| {
                    if matches!(job.status, CreateStreamingJobStatus::Creating { .. }) {
                        job_backfill_orders(job_extra_info, job.job_id)
                    } else {
                        UserDefinedFragmentBackfillOrder::default()
                    }
                }));
            let database_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    database_backfill_orders,
                    fragment_relations,
                    || {
                        database_jobs.values().flat_map(|job_fragments| {
                            job_fragments
                                .fragment_infos
                                .iter()
                                .map(|(fragment_id, fragment)| {
                                    (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                                })
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                cdc_table_snapshot_split_assignment,
                &database_backfill_orders,
                is_paused,
            );

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation),
                &barrier_info,
                &nodes_actors,
                InflightFragmentInfo::existing_table_ids(database_jobs.values().flatten()),
                nodes_actors.keys().copied(),
                Some(new_actors),
            )?;
            debug!(
                ?node_to_collect,
                %database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let mut creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl> =
            HashMap::new();
        for (job_id, (info, upstream_table_ids, committed_epoch, snapshot_epoch)) in
            ongoing_snapshot_backfill_jobs
        {
            let node_actors = edges.collect_actors_to_create(info.values().map(|fragment_infos| {
                (
                    fragment_infos.fragment_id,
                    &fragment_infos.nodes,
                    fragment_infos.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                    vec![],
                )
            }));

            let database_job_source_splits =
                collect_source_splits(database_jobs.values().flatten(), source_splits);
            assert!(
                !cdc_table_snapshot_splits.contains_key(&job_id),
                "snapshot backfill job {job_id} should not have cdc backfill"
            );
            if is_paused {
                bail!("should not pause when having snapshot backfill job {job_id}");
            }
            let job_backfill_orders = job_backfill_orders(job_extra_info, job_id);
            let job_backfill_orders =
                StreamFragmentGraph::extend_fragment_backfill_ordering_with_locality_backfill(
                    job_backfill_orders,
                    fragment_relations,
                    || {
                        info.iter().map(|(fragment_id, fragment)| {
                            (*fragment_id, fragment.fragment_type_mask, &fragment.nodes)
                        })
                    },
                );
            let mutation = build_mutation(
                &database_job_source_splits,
                Default::default(),
                &job_backfill_orders,
                false,
            );

            injected_creating_jobs.insert(job_id);
            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    &barrier_info,
                    info,
                    job_backfill_orders,
                    fragment_relations,
                    hummock_version_stats,
                    node_actors,
                    mutation,
                    self,
                )?,
            );
        }

        self.env.shared_actor_infos().recover_database(
            database_id,
            database_jobs
                .values()
                .flat_map(|info| {
                    info.fragment_infos()
                        .map(move |fragment| (fragment, info.job_id))
                })
                .chain(
                    creating_streaming_job_controls
                        .values()
                        .flat_map(|job| job.fragment_infos_with_job_id()),
                ),
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_info = InflightDatabaseInfo::recover(
            database_id,
            database_jobs.into_values(),
            self.env.shared_actor_infos().clone(),
        );
        let database_state = BarrierWorkerState::recovery(new_epoch, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            database_info,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

    fn connected_workers(&self) -> impl Iterator<Item = (WorkerId, &ControlStreamNode)> + '_ {
        self.workers
            .iter()
            .filter_map(|(worker_id, (_, worker_state))| match worker_state {
                WorkerNodeState::Connected { control_stream, .. } => {
                    Some((*worker_id, control_stream))
                }
                WorkerNodeState::Reconnecting(_) => None,
            })
    }

    pub(super) fn inject_barrier(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        table_ids_to_sync: impl Iterator<Item = TableId>,
        nodes_to_sync_table: impl Iterator<Item = WorkerId>,
        mut new_actors: Option<StreamJobActorsToCreate>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let nodes_to_sync_table: HashSet<_> = nodes_to_sync_table.collect();

        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);

        nodes_to_sync_table.iter().for_each(|worker_id| {
            assert!(node_actors.contains_key(worker_id), "worker_id {worker_id} in nodes_to_sync_table {nodes_to_sync_table:?} but not in node_actors {node_actors:?}");
        });

        let mut node_need_collect = NodeToCollect::new();
        let table_ids_to_sync = table_ids_to_sync.collect_vec();

        node_actors.iter()
            .try_for_each(|(worker_id, actor_ids_to_collect)| {
                assert!(!actor_ids_to_collect.is_empty(), "empty actor_ids_to_collect on worker {worker_id} in node_actors {node_actors:?}");
                let table_ids_to_sync = if nodes_to_sync_table.contains(worker_id) {
                    table_ids_to_sync.clone()
                } else {
                    vec![]
                };

                let node = if let Some((_, worker_state)) = self.workers.get(worker_id)
                    && let WorkerNodeState::Connected { control_stream, .. } = worker_state
                {
                    control_stream
                } else {
                    return Err(anyhow!("unconnected worker node {}", worker_id).into());
                };

                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch(),
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        actor_ids_to_collect: actor_ids_to_collect.iter().copied().collect(),
                                        table_ids_to_sync,
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(worker_id))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors, initial_subscriber_ids))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                                config_override: actor.config_override.to_string(),
                                                                initial_subscriber_ids: initial_subscriber_ids.iter().copied().collect(),
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*worker_id);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch(),
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<JobId>,
    ) {
        let partial_graph_id = to_partial_graph_id(database_id, creating_job_id);
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest { partial_graph_id },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = %node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<JobId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(database_id, Some(job_id)))
            .collect_vec();
        self.connected_workers().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = %node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

    pub(super) fn reset_partial_graphs(
        &mut self,
        partial_graph_ids: Vec<PartialGraphId>,
    ) -> HashSet<WorkerId> {
        self.connected_workers()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(
                            streaming_control_stream_request::Request::ResetPartialGraphs(
                                ResetPartialGraphsRequest {
                                    partial_graph_ids: partial_graph_ids.clone(),
                                },
                            ),
                        ),
                    })
                    .is_err()
                {
                    warn!(%worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}
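
/// Merges the RPC errors reported by multiple worker nodes into one [`MetaError`]. A single error
/// is wrapped with context naming the worker; if several errors carry a score, the highest-scored
/// one is preferred; otherwise all errors are concatenated into a single message.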
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}

#[cfg(test)]
mod test_partial_graph_id {
    use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};

    #[test]
    fn test_partial_graph_id_conversion() {
        let database_id = 233.into();
        let job_id = 233.into();
        assert_eq!(
            (database_id, None),
            from_partial_graph_id(to_partial_graph_id(database_id, None))
        );
        assert_eq!(
            (database_id, Some(job_id)),
            from_partial_graph_id(to_partial_graph_id(database_id, Some(job_id)))
        );
    }
}