risingwave_meta/manager/sink_coordination/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
34use tokio::sync::oneshot::{Receiver, Sender, channel};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Sends `$msg` through `$tx` with a synchronous `send` call.
///
/// A failed send is only logged; the error is not propagated to the caller.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
53
/// Sends `$msg` through `$tx` with an asynchronous `send(..).await` call.
///
/// Must be invoked inside an async context. A failed send is only logged; the error is not
/// propagated to the caller.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).await.is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
61
/// Capacity of the bounded channel that carries `ManagerRequest`s to the manager worker.
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
/// Requests sent from `SinkCoordinatorManager` to the manager worker over the request channel.
enum ManagerRequest {
    /// A new sink writer has connected. The worker forwards it to the coordinator worker of
    /// its sink, spawning that coordinator worker first if none is running.
    NewSinkWriter(SinkWriterCoordinationHandle),
    /// Stop one sink coordinator, or all of them.
    StopCoordinator {
        /// Signalled once the requested coordinator(s) have stopped.
        finish_notifier: Sender<()>,
        /// sink id to stop. When `None`, stop all sink coordinator
        sink_id: Option<SinkId>,
    },
}
72
/// Cloneable front-end of the sink coordination manager.
///
/// It only holds the sending half of the request channel; the actual work is done by the
/// manager worker task that owns the receiving half.
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    /// Channel used to submit `ManagerRequest`s to the manager worker.
    request_tx: mpsc::Sender<ManagerRequest>,
}
77
78fn new_committed_epoch_subscriber(
79    hummock_manager: HummockManagerRef,
80    metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82    Arc::new(move |sink_id| {
83        let hummock_manager = hummock_manager.clone();
84        let metadata_manager = metadata_manager.clone();
85        async move {
86            let state_table_ids = metadata_manager
87                .get_sink_state_table_ids(sink_id.sink_id as _)
88                .await
89                .map_err(SinkError::from)?;
90            let Some(table_id) = state_table_ids.first() else {
91                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92            };
93            hummock_manager
94                .subscribe_table_committed_epoch(*table_id)
95                .await
96                .map_err(SinkError::from)
97        }
98        .boxed()
99    })
100}
101
102impl SinkCoordinatorManager {
103    #[cfg(test)]
104    pub(crate) fn for_test() -> Self {
105        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
106        let _join_handle = tokio::spawn(async move {
107            while let Some(req) = request_rx.recv().await {
108                let ManagerRequest::StopCoordinator {
109                    finish_notifier,
110                    sink_id,
111                } = req
112                else {
113                    unreachable!()
114                };
115                assert_eq!(sink_id, None);
116                finish_notifier.send(()).unwrap();
117            }
118        });
119        SinkCoordinatorManager { request_tx }
120    }
121
122    pub fn start_worker(
123        db: DatabaseConnection,
124        hummock_manager: HummockManagerRef,
125        metadata_manager: MetadataManager,
126        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
127    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
128        let subscriber =
129            new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
130        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
131            tokio::spawn(CoordinatorWorker::run(
132                param,
133                manager_request_stream,
134                db.clone(),
135                subscriber.clone(),
136                iceberg_compact_stat_sender.clone(),
137            ))
138        })
139    }
140
141    fn start_worker_with_spawn_worker(
142        spawn_coordinator_worker: impl SpawnCoordinatorFn,
143    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
144        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
145        let (shutdown_tx, shutdown_rx) = channel();
146        let worker = ManagerWorker::new(request_rx, shutdown_rx);
147        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
148        (
149            SinkCoordinatorManager { request_tx },
150            (join_handle, shutdown_tx),
151        )
152    }
153
154    pub async fn handle_new_request(
155        &self,
156        mut request_stream: SinkWriterRequestStream,
157    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
158        let (param, vnode_bitmap) = match request_stream.try_next().await? {
159            Some(CoordinateRequest {
160                msg:
161                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
162                        param: Some(param),
163                        vnode_bitmap: Some(vnode_bitmap),
164                    })),
165            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
166            msg => {
167                return Err(Status::invalid_argument(format!(
168                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
169                    msg
170                )));
171            }
172        };
173        let (response_tx, response_rx) = mpsc::unbounded_channel();
174        self.request_tx
175            .send(ManagerRequest::NewSinkWriter(
176                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
177            ))
178            .await
179            .map_err(|_| {
180                Status::unavailable(
181                    "unable to send to sink manager worker. The worker may have stopped",
182                )
183            })?;
184
185        Ok(UnboundedReceiverStream::new(response_rx))
186    }
187
188    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
189        let (tx, rx) = channel();
190        send_await_with_err_check!(
191            self.request_tx,
192            ManagerRequest::StopCoordinator {
193                finish_notifier: tx,
194                sink_id,
195            }
196        );
197        if rx.await.is_err() {
198            error!("fail to wait for resetting sink manager worker");
199        }
200        info!("successfully stop coordinator: {:?}", sink_id);
201    }
202
203    pub async fn reset(&self) {
204        self.stop_coordinator(None).await;
205    }
206
207    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
208        self.stop_coordinator(Some(sink_id)).await;
209    }
210}
211
/// Bookkeeping the manager worker keeps for one running coordinator worker.
struct CoordinatorWorkerHandle {
    /// Sender to coordinator worker. Drop the sender as a stop signal
    /// (`None` therefore means the coordinator worker has already been asked to stop).
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notify when the coordinator worker stops
    finish_notifiers: Vec<Sender<()>>,
}
218
/// State of the manager worker task: it receives `ManagerRequest`s, spawns one coordinator
/// worker per sink, and tracks them until they finish.
struct ManagerWorker {
    /// Incoming requests from `SinkCoordinatorManager`.
    request_rx: mpsc::Receiver<ManagerRequest>,
    /// Shutdown signal. `next_event` treats any completion of this receiver (a sent value or
    /// a dropped sender) as a request to stop the worker.
    shutdown_rx: Receiver<()>,

    /// Join handles of the running coordinator workers, each tagged with its sink id.
    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    /// Per-sink handle of every coordinator worker that has not been reported finished yet.
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}
228
/// Events the manager worker reacts to in its main loop.
enum ManagerEvent {
    /// A request arrived on the request channel.
    NewRequest(ManagerRequest),
    /// The task of a coordinator worker has completed.
    CoordinatorWorkerFinished {
        /// Sink whose coordinator worker finished.
        sink_id: SinkId,
        /// Result of joining the coordinator worker task.
        join_result: Result<(), JoinError>,
    },
}
236
/// Function used by the manager worker to launch the coordinator worker of a sink.
///
/// It receives the sink param and the receiving end of the channel through which new sink
/// writers are handed to that coordinator worker, and returns the join handle of the spawned
/// task.
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;
240
241impl ManagerWorker {
242    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
243        ManagerWorker {
244            request_rx,
245            shutdown_rx,
246            running_coordinator_worker_join_handles: Default::default(),
247            running_coordinator_worker: Default::default(),
248        }
249    }
250
251    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
252        while let Some(event) = self.next_event().await {
253            match event {
254                ManagerEvent::NewRequest(request) => match request {
255                    ManagerRequest::NewSinkWriter(request) => {
256                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
257                    }
258                    ManagerRequest::StopCoordinator {
259                        finish_notifier,
260                        sink_id,
261                    } => {
262                        if let Some(sink_id) = sink_id {
263                            if let Some(worker_handle) =
264                                self.running_coordinator_worker.get_mut(&sink_id)
265                            {
266                                if let Some(sender) = worker_handle.request_sender.take() {
267                                    // drop the sender as a signal to notify the coordinator worker
268                                    // to stop
269                                    drop(sender);
270                                }
271                                worker_handle.finish_notifiers.push(finish_notifier);
272                            } else {
273                                debug!(
274                                    "sink coordinator of {} is not running. Notify finish directly",
275                                    sink_id.sink_id
276                                );
277                                send_with_err_check!(finish_notifier, ());
278                            }
279                        } else {
280                            self.clean_up().await;
281                            send_with_err_check!(finish_notifier, ());
282                        }
283                    }
284                },
285                ManagerEvent::CoordinatorWorkerFinished {
286                    sink_id,
287                    join_result,
288                } => self.handle_coordinator_finished(sink_id, join_result),
289            }
290        }
291        self.clean_up().await;
292        info!("sink manager worker exited");
293    }
294
295    async fn next_event(&mut self) -> Option<ManagerEvent> {
296        match select(
297            select(
298                pin!(self.request_rx.recv()),
299                pin!(pending_on_none(
300                    self.running_coordinator_worker_join_handles.next()
301                )),
302            ),
303            &mut self.shutdown_rx,
304        )
305        .await
306        {
307            Either::Left((either, _)) => match either {
308                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
309                Either::Left((None, _)) => None,
310                Either::Right(((sink_id, join_result), _)) => {
311                    Some(ManagerEvent::CoordinatorWorkerFinished {
312                        sink_id,
313                        join_result,
314                    })
315                }
316            },
317            Either::Right(_) => None,
318        }
319    }
320
321    async fn clean_up(&mut self) {
322        info!("sink manager worker start cleaning up");
323        for worker_handle in self.running_coordinator_worker.values_mut() {
324            if let Some(sender) = worker_handle.request_sender.take() {
325                // drop the sender to notify the coordinator worker to stop
326                drop(sender);
327            }
328        }
329        while let Some((sink_id, join_result)) =
330            self.running_coordinator_worker_join_handles.next().await
331        {
332            self.handle_coordinator_finished(sink_id, join_result);
333        }
334        info!("sink manager worker finished cleaning up");
335    }
336
337    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
338        let worker_handle = self
339            .running_coordinator_worker
340            .remove(&sink_id)
341            .expect("finished coordinator should have an associated worker handle");
342        for finish_notifier in worker_handle.finish_notifiers {
343            send_with_err_check!(finish_notifier, ());
344        }
345        match join_result {
346            Ok(()) => {
347                info!(
348                    id = sink_id.sink_id,
349                    "sink coordinator has gracefully finished",
350                );
351            }
352            Err(err) => {
353                error!(
354                    id = sink_id.sink_id,
355                    error = %err.as_report(),
356                    "sink coordinator finished with error",
357                );
358            }
359        }
360    }
361
362    fn handle_new_sink_writer(
363        &mut self,
364        new_writer: SinkWriterCoordinationHandle,
365        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
366    ) {
367        let param = new_writer.param();
368        let sink_id = param.sink_id;
369
370        let handle = self
371            .running_coordinator_worker
372            .entry(param.sink_id)
373            .or_insert_with(|| {
374                // Launch the coordinator worker task if it is the first
375                let (request_tx, request_rx) = unbounded_channel();
376                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
377                self.running_coordinator_worker_join_handles.push(
378                    join_handle
379                        .map(move |join_result| (sink_id, join_result))
380                        .boxed(),
381                );
382                CoordinatorWorkerHandle {
383                    request_sender: Some(request_tx),
384                    finish_notifiers: Vec::new(),
385                }
386            });
387
388        if let Some(sender) = handle.request_sender.as_mut() {
389            send_with_err_check!(sender, new_writer);
390        } else {
391            warn!(
392                "handle a new request while the sink coordinator is being stopped: {:?}",
393                param
394            );
395            new_writer.abort(Status::internal("the sink is being stopped"));
396        }
397    }
398}
399
400#[cfg(test)]
401mod tests {
402    use std::future::{Future, poll_fn};
403    use std::pin::pin;
404    use std::sync::Arc;
405    use std::task::Poll;
406
407    use anyhow::anyhow;
408    use async_trait::async_trait;
409    use futures::future::{join, try_join};
410    use futures::{FutureExt, StreamExt, TryFutureExt};
411    use itertools::Itertools;
412    use rand::seq::SliceRandom;
413    use risingwave_common::bitmap::BitmapBuilder;
414    use risingwave_common::hash::VirtualNode;
415    use risingwave_connector::sink::catalog::{SinkId, SinkType};
416    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
417    use risingwave_pb::connector_service::SinkMetadata;
418    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
419    use risingwave_rpc_client::CoordinatorStreamHandle;
420    use tokio::sync::mpsc::unbounded_channel;
421    use tokio_stream::wrappers::ReceiverStream;
422
423    use crate::manager::sink_coordination::SinkCoordinatorManager;
424    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
425    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
426
    /// Test coordinator whose commit logic is a closure.
    ///
    /// `f` is called on every commit with the epoch, the metadata list and a mutable reference
    /// to `context`, so a test can keep state (such as a call counter) across commits.
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        // State handed to `f` on every commit.
        context: C,
        // Commit callback.
        f: F,
    }
431
432    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
433        fn new(context: C, f: F) -> Self {
434            MockCoordinator { context, f }
435        }
436    }
437
438    #[async_trait]
439    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
440        SinkCommitCoordinator for MockCoordinator<C, F>
441    {
442        async fn init(
443            &mut self,
444            _subscriber: SinkCommittedEpochSubscriber,
445        ) -> risingwave_connector::sink::Result<Option<u64>> {
446            Ok(None)
447        }
448
449        async fn commit(
450            &mut self,
451            epoch: u64,
452            metadata: Vec<SinkMetadata>,
453        ) -> risingwave_connector::sink::Result<()> {
454            (self.f)(epoch, metadata, &mut self.context)
455        }
456    }
457
    /// Two writers of the same sink, each owning half of the vnodes, align their initial
    /// epoch and then commit two epochs. The mock coordinator asserts that every commit
    /// carries the metadata of both writers for the expected epoch.
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        // Split the shuffled vnodes into two disjoint halves, one bitmap per writer.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[epoch index][writer index]
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        // `count` is the number of commits seen so far.
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the comparison does not depend on arrival order.
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a client whose request stream is served by `manager.handle_new_request`.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        // The writers start from different epochs; both are expected to be aligned to epoch1.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            // Poll once: the commit must stay pending while the other writer has not committed.
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // commit epoch2
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap())
        );
        // Same check as for epoch1, with the roles of the two writers swapped.
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
653
    /// A single writer that owns all vnodes aligns its initial epoch and commits two epochs.
    /// The mock coordinator asserts that each commit carries exactly that writer's metadata.
    #[tokio::test]
    async fn test_single_writer() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        // One bitmap with every vnode set.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        // metadata[epoch index]
        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        // `count` is the number of commits seen so far.
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a client whose request stream is served by `manager.handle_new_request`.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
        assert_eq!(aligned_epoch, epoch1);

        // With a single writer each commit is awaited to completion directly.
        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
            )
            .await
            .unwrap();
    }
788
789    #[tokio::test]
790    async fn test_partial_commit() {
791        let param = SinkParam {
792            sink_id: SinkId::from(1),
793            sink_name: "test".into(),
794            properties: Default::default(),
795            columns: vec![],
796            downstream_pk: vec![],
797            sink_type: SinkType::AppendOnly,
798            format_desc: None,
799            db_name: "test".into(),
800            sink_from_name: "test".into(),
801        };
802
803        let epoch = 233;
804
805        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
806        all_vnode.shuffle(&mut rand::rng());
807        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
808        let build_bitmap = |indexes: &[usize]| {
809            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
810            for i in indexes {
811                builder.set(*i, true);
812            }
813            builder.finish()
814        };
815        let vnode1 = build_bitmap(first);
816        let vnode2 = build_bitmap(second);
817
818        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
819            let (_sender, receiver) = unbounded_channel();
820
821            async move { Ok((1, receiver)) }.boxed()
822        });
823        let (manager, (_join_handle, _stop_tx)) =
824            SinkCoordinatorManager::start_worker_with_spawn_worker({
825                let expected_param = param.clone();
826                move |param, new_writer_rx| {
827                    let expected_param = expected_param.clone();
828                    tokio::spawn({
829                        let subscriber = mock_subscriber.clone();
830                        async move {
831                            // validate the start request
832                            assert_eq!(param, expected_param);
833                            CoordinatorWorker::execute_coordinator(
834                                param,
835                                new_writer_rx,
836                                MockCoordinator::new((), |_, _, _| unreachable!()),
837                                subscriber.clone(),
838                            )
839                            .await;
840                        }
841                    })
842                }
843            });
844
845        let build_client = |vnode| async {
846            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
847                Ok(tonic::Response::new(
848                    manager
849                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
850                        .await
851                        .unwrap()
852                        .boxed(),
853                ))
854            })
855            .await
856            .unwrap()
857            .0
858        };
859
860        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;
861
862        // commit epoch
863        let mut commit_future = pin!(client1.commit(
864            epoch,
865            SinkMetadata {
866                metadata: Some(Metadata::Serialized(SerializedMetadata {
867                    metadata: vec![],
868                })),
869            },
870        ));
871        assert!(
872            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
873                .await
874                .is_pending()
875        );
876        drop(client2);
877        assert!(commit_future.await.is_err());
878    }
879
880    #[tokio::test]
881    async fn test_fail_commit() {
882        let param = SinkParam {
883            sink_id: SinkId::from(1),
884            sink_name: "test".into(),
885            properties: Default::default(),
886            columns: vec![],
887            downstream_pk: vec![],
888            sink_type: SinkType::AppendOnly,
889            format_desc: None,
890            db_name: "test".into(),
891            sink_from_name: "test".into(),
892        };
893
894        let epoch = 233;
895
896        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
897        all_vnode.shuffle(&mut rand::rng());
898        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
899        let build_bitmap = |indexes: &[usize]| {
900            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
901            for i in indexes {
902                builder.set(*i, true);
903            }
904            builder.finish()
905        };
906        let vnode1 = build_bitmap(first);
907        let vnode2 = build_bitmap(second);
908        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
909            let (_sender, receiver) = unbounded_channel();
910
911            async move { Ok((1, receiver)) }.boxed()
912        });
913        let (manager, (_join_handle, _stop_tx)) =
914            SinkCoordinatorManager::start_worker_with_spawn_worker({
915                let expected_param = param.clone();
916                move |param, new_writer_rx| {
917                    let expected_param = expected_param.clone();
918                    tokio::spawn({
919                        let subscriber = mock_subscriber.clone();
920                        {
921                            async move {
922                                // validate the start request
923                                assert_eq!(param, expected_param);
924                                CoordinatorWorker::execute_coordinator(
925                                    param,
926                                    new_writer_rx,
927                                    MockCoordinator::new((), |_, _, _| {
928                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
929                                    }),
930                                    subscriber.clone(),
931                                )
932                                .await;
933                            }
934                        }
935                    })
936                }
937            });
938
939        let build_client = |vnode| async {
940            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
941                Ok(tonic::Response::new(
942                    manager
943                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
944                        .await
945                        .unwrap()
946                        .boxed(),
947                ))
948            })
949            .await
950            .unwrap()
951            .0
952        };
953
954        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
955
956        // commit epoch
957        let mut commit_future = pin!(client1.commit(
958            epoch,
959            SinkMetadata {
960                metadata: Some(Metadata::Serialized(SerializedMetadata {
961                    metadata: vec![],
962                })),
963            },
964        ));
965        assert!(
966            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
967                .await
968                .is_pending()
969        );
970        let (result1, result2) = join(
971            commit_future,
972            client2.commit(
973                epoch,
974                SinkMetadata {
975                    metadata: Some(Metadata::Serialized(SerializedMetadata {
976                        metadata: vec![],
977                    })),
978                },
979            ),
980        )
981        .await;
982        assert!(result1.is_err());
983        assert!(result2.is_err());
984    }
985
986    #[tokio::test]
987    async fn test_update_vnode_bitmap() {
988        let param = SinkParam {
989            sink_id: SinkId::from(1),
990            sink_name: "test".into(),
991            properties: Default::default(),
992            columns: vec![],
993            downstream_pk: vec![],
994            sink_type: SinkType::AppendOnly,
995            format_desc: None,
996            db_name: "test".into(),
997            sink_from_name: "test".into(),
998        };
999
1000        let epoch1 = 233;
1001        let epoch2 = 234;
1002        let epoch3 = 235;
1003        let epoch4 = 236;
1004
1005        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1006        all_vnode.shuffle(&mut rand::rng());
1007        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
1008        let build_bitmap = |indexes: &[usize]| {
1009            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1010            for i in indexes {
1011                builder.set(*i, true);
1012            }
1013            builder.finish()
1014        };
1015        let vnode1 = build_bitmap(first);
1016        let vnode2 = build_bitmap(second);
1017
1018        let metadata = [
1019            [vec![1u8, 2u8], vec![3u8, 4u8]],
1020            [vec![5u8, 6u8], vec![7u8, 8u8]],
1021        ];
1022
1023        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
1024        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
1025        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
1026            let (_sender, receiver) = unbounded_channel();
1027
1028            async move { Ok((1, receiver)) }.boxed()
1029        });
1030        let (manager, (_join_handle, _stop_tx)) =
1031            SinkCoordinatorManager::start_worker_with_spawn_worker({
1032                let expected_param = param.clone();
1033                let metadata = metadata.clone();
1034                let metadata_scale_out = metadata_scale_out.clone();
1035                let metadata_scale_in = metadata_scale_in.clone();
1036                move |param, new_writer_rx| {
1037                    let metadata = metadata.clone();
1038                    let metadata_scale_out = metadata_scale_out.clone();
1039                    let metadata_scale_in = metadata_scale_in.clone();
1040                    let expected_param = expected_param.clone();
1041                    tokio::spawn({
1042                        let subscriber = mock_subscriber.clone();
1043                        async move {
1044                            // validate the start request
1045                            assert_eq!(param, expected_param);
1046                            CoordinatorWorker::execute_coordinator(
1047                                param.clone(),
1048                                new_writer_rx,
1049                                MockCoordinator::new(
1050                                    0,
1051                                    |epoch, metadata_list, count: &mut usize| {
1052                                        *count += 1;
1053                                        let mut metadata_list =
1054                                            metadata_list
1055                                                .into_iter()
1056                                                .map(|metadata| match metadata {
1057                                                    SinkMetadata {
1058                                                        metadata:
1059                                                            Some(Metadata::Serialized(
1060                                                                SerializedMetadata { metadata },
1061                                                            )),
1062                                                    } => metadata,
1063                                                    _ => unreachable!(),
1064                                                })
1065                                                .collect_vec();
1066                                        metadata_list.sort();
1067                                        let (expected_epoch, expected_metadata_list) = match *count
1068                                        {
1069                                            1 => (epoch1, metadata[0].as_slice()),
1070                                            2 => (epoch2, metadata[1].as_slice()),
1071                                            3 => (epoch3, metadata_scale_out.as_slice()),
1072                                            4 => (epoch4, metadata_scale_in.as_slice()),
1073                                            _ => unreachable!(),
1074                                        };
1075                                        assert_eq!(expected_epoch, epoch);
1076                                        assert_eq!(expected_metadata_list, &metadata_list);
1077                                        Ok(())
1078                                    },
1079                                ),
1080                                subscriber.clone(),
1081                            )
1082                            .await;
1083                        }
1084                    })
1085                }
1086            });
1087
1088        let build_client = |vnode| async {
1089            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1090                Ok(tonic::Response::new(
1091                    manager
1092                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1093                        .await
1094                        .unwrap()
1095                        .boxed(),
1096                ))
1097            })
1098            .await
1099        };
1100
1101        let ((mut client1, _), (mut client2, _)) =
1102            try_join(build_client(vnode1), pin!(build_client(vnode2)))
1103                .await
1104                .unwrap();
1105
1106        let (aligned_epoch1, aligned_epoch2) = try_join(
1107            client1.align_initial_epoch(epoch1),
1108            client2.align_initial_epoch(epoch1),
1109        )
1110        .await
1111        .unwrap();
1112        assert_eq!(aligned_epoch1, epoch1);
1113        assert_eq!(aligned_epoch2, epoch1);
1114
1115        {
1116            // commit epoch1
1117            let mut commit_future = pin!(
1118                client2
1119                    .commit(
1120                        epoch1,
1121                        SinkMetadata {
1122                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1123                                metadata: metadata[0][1].clone(),
1124                            })),
1125                        },
1126                    )
1127                    .map(|result| result.unwrap())
1128            );
1129            assert!(
1130                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1131                    .await
1132                    .is_pending()
1133            );
1134            join(
1135                commit_future,
1136                client1
1137                    .commit(
1138                        epoch1,
1139                        SinkMetadata {
1140                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1141                                metadata: metadata[0][0].clone(),
1142                            })),
1143                        },
1144                    )
1145                    .map(|result| result.unwrap()),
1146            )
1147            .await;
1148        }
1149
1150        let (vnode1, vnode2, vnode3) = {
1151            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1152            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1153            (
1154                build_bitmap(first),
1155                build_bitmap(second),
1156                build_bitmap(third),
1157            )
1158        };
1159
1160        let mut build_client3_future = pin!(build_client(vnode3));
1161        assert!(
1162            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
1163                .await
1164                .is_pending()
1165        );
1166        let mut client3;
1167        {
1168            {
1169                // commit epoch2
1170                let mut commit_future = pin!(
1171                    client1
1172                        .commit(
1173                            epoch2,
1174                            SinkMetadata {
1175                                metadata: Some(Metadata::Serialized(SerializedMetadata {
1176                                    metadata: metadata[1][0].clone(),
1177                                })),
1178                            },
1179                        )
1180                        .map_err(Into::into)
1181                );
1182                assert!(
1183                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1184                        .await
1185                        .is_pending()
1186                );
1187                try_join(
1188                    commit_future,
1189                    client2.commit(
1190                        epoch2,
1191                        SinkMetadata {
1192                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1193                                metadata: metadata[1][1].clone(),
1194                            })),
1195                        },
1196                    ),
1197                )
1198                .await
1199                .unwrap();
1200            }
1201
1202            client3 = {
1203                let (
1204                    (client3, init_epoch),
1205                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
1206                ) = try_join(
1207                    build_client3_future,
1208                    try_join(
1209                        client1.update_vnode_bitmap(&vnode1),
1210                        client2.update_vnode_bitmap(&vnode2),
1211                    )
1212                    .map_err(Into::into),
1213                )
1214                .await
1215                .unwrap();
1216                assert_eq!(init_epoch, Some(epoch2));
1217                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
1218                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
1219                client3
1220            };
1221            let mut commit_future3 = pin!(client3.commit(
1222                epoch3,
1223                SinkMetadata {
1224                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1225                        metadata: metadata_scale_out[2].clone(),
1226                    })),
1227                },
1228            ));
1229            assert!(
1230                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1231                    .await
1232                    .is_pending()
1233            );
1234            let mut commit_future1 = pin!(client1.commit(
1235                epoch3,
1236                SinkMetadata {
1237                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1238                        metadata: metadata_scale_out[0].clone(),
1239                    })),
1240                },
1241            ));
1242            assert!(
1243                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
1244                    .await
1245                    .is_pending()
1246            );
1247            assert!(
1248                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1249                    .await
1250                    .is_pending()
1251            );
1252            try_join(
1253                client2.commit(
1254                    epoch3,
1255                    SinkMetadata {
1256                        metadata: Some(Metadata::Serialized(SerializedMetadata {
1257                            metadata: metadata_scale_out[1].clone(),
1258                        })),
1259                    },
1260                ),
1261                try_join(commit_future1, commit_future3),
1262            )
1263            .await
1264            .unwrap();
1265        }
1266
1267        let (vnode2, vnode3) = {
1268            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1269            (build_bitmap(first), build_bitmap(second))
1270        };
1271
1272        {
1273            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
1274                client1.stop(),
1275                try_join(
1276                    client2.update_vnode_bitmap(&vnode2),
1277                    client3.update_vnode_bitmap(&vnode3),
1278                ),
1279            )
1280            .await
1281            .unwrap();
1282            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
1283            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
1284        }
1285
1286        {
1287            let mut commit_future = pin!(
1288                client2
1289                    .commit(
1290                        epoch4,
1291                        SinkMetadata {
1292                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1293                                metadata: metadata_scale_in[0].clone(),
1294                            })),
1295                        },
1296                    )
1297                    .map(|result| result.unwrap())
1298            );
1299            assert!(
1300                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1301                    .await
1302                    .is_pending()
1303            );
1304            join(
1305                commit_future,
1306                client3
1307                    .commit(
1308                        epoch4,
1309                        SinkMetadata {
1310                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1311                                metadata: metadata_scale_in[1].clone(),
1312                            })),
1313                        },
1314                    )
1315                    .map(|result| result.unwrap()),
1316            )
1317            .await;
1318        }
1319    }
1320}