// risingwave_meta/manager/sink_coordination/manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::sink::catalog::SinkId;
25use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
26use risingwave_pb::connector_service::coordinate_request::Msg;
27use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
28use rw_futures_util::pending_on_none;
29use sea_orm::DatabaseConnection;
30use thiserror_ext::AsReport;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33use tokio::sync::oneshot::{Receiver, Sender, channel};
34use tokio::task::{JoinError, JoinHandle};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tonic::Status;
37use tracing::{debug, error, info, warn};
38
39use crate::hummock::HummockManagerRef;
40use crate::manager::MetadataManager;
41use crate::manager::sink_coordination::SinkWriterRequestStream;
42use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
43use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
44
/// Sends `$msg` through `$tx` using a non-async `send`, logging an error instead of
/// propagating it when the message cannot be delivered.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {{
        // The send result (and any message it hands back) is dropped at the end of this
        // statement, before logging.
        let send_failed = $tx.send($msg).is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
52
/// Sends `$msg` through `$tx` using an async `send` (awaited), logging an error instead of
/// propagating it when the message cannot be delivered.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {{
        // The send result is dropped at the end of this statement, before logging.
        let send_failed = $tx.send($msg).await.is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
60
61const BOUNDED_CHANNEL_SIZE: usize = 16;
62
63enum ManagerRequest {
64    NewSinkWriter(SinkWriterCoordinationHandle),
65    StopCoordinator {
66        finish_notifier: Sender<()>,
67        /// sink id to stop. When `None`, stop all sink coordinator
68        sink_id: Option<SinkId>,
69    },
70}
71
72#[derive(Clone)]
73pub struct SinkCoordinatorManager {
74    request_tx: mpsc::Sender<ManagerRequest>,
75}
76
77fn new_committed_epoch_subscriber(
78    hummock_manager: HummockManagerRef,
79    metadata_manager: MetadataManager,
80) -> SinkCommittedEpochSubscriber {
81    Arc::new(move |sink_id| {
82        let hummock_manager = hummock_manager.clone();
83        let metadata_manager = metadata_manager.clone();
84        async move {
85            let state_table_ids = metadata_manager
86                .get_sink_state_table_ids(sink_id.sink_id as _)
87                .await
88                .map_err(SinkError::from)?;
89            let Some(table_id) = state_table_ids.first() else {
90                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
91            };
92            hummock_manager
93                .subscribe_table_committed_epoch(*table_id)
94                .await
95                .map_err(SinkError::from)
96        }
97        .boxed()
98    })
99}
100
101impl SinkCoordinatorManager {
102    pub fn start_worker(
103        db: DatabaseConnection,
104        hummock_manager: HummockManagerRef,
105        metadata_manager: MetadataManager,
106    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
107        let subscriber =
108            new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
109        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
110            tokio::spawn(CoordinatorWorker::run(
111                param,
112                manager_request_stream,
113                db.clone(),
114                subscriber.clone(),
115            ))
116        })
117    }
118
119    fn start_worker_with_spawn_worker(
120        spawn_coordinator_worker: impl SpawnCoordinatorFn,
121    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
122        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
123        let (shutdown_tx, shutdown_rx) = channel();
124        let worker = ManagerWorker::new(request_rx, shutdown_rx);
125        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
126        (
127            SinkCoordinatorManager { request_tx },
128            (join_handle, shutdown_tx),
129        )
130    }
131
132    pub async fn handle_new_request(
133        &self,
134        mut request_stream: SinkWriterRequestStream,
135    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
136        let (param, vnode_bitmap) = match request_stream.try_next().await? {
137            Some(CoordinateRequest {
138                msg:
139                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
140                        param: Some(param),
141                        vnode_bitmap: Some(vnode_bitmap),
142                    })),
143            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
144            msg => {
145                return Err(Status::invalid_argument(format!(
146                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
147                    msg
148                )));
149            }
150        };
151        let (response_tx, response_rx) = mpsc::unbounded_channel();
152        self.request_tx
153            .send(ManagerRequest::NewSinkWriter(
154                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
155            ))
156            .await
157            .map_err(|_| {
158                Status::unavailable(
159                    "unable to send to sink manager worker. The worker may have stopped",
160                )
161            })?;
162
163        Ok(UnboundedReceiverStream::new(response_rx))
164    }
165
166    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
167        let (tx, rx) = channel();
168        send_await_with_err_check!(
169            self.request_tx,
170            ManagerRequest::StopCoordinator {
171                finish_notifier: tx,
172                sink_id,
173            }
174        );
175        if rx.await.is_err() {
176            error!("fail to wait for resetting sink manager worker");
177        }
178        info!("successfully stop coordinator: {:?}", sink_id);
179    }
180
181    pub async fn reset(&self) {
182        self.stop_coordinator(None).await;
183    }
184
185    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
186        self.stop_coordinator(Some(sink_id)).await;
187    }
188}
189
190struct CoordinatorWorkerHandle {
191    /// Sender to coordinator worker. Drop the sender as a stop signal
192    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
193    /// Notify when the coordinator worker stops
194    finish_notifiers: Vec<Sender<()>>,
195}
196
197struct ManagerWorker {
198    request_rx: mpsc::Receiver<ManagerRequest>,
199    // Make it option so that it can be polled with &mut SinkManagerWorker
200    shutdown_rx: Receiver<()>,
201
202    running_coordinator_worker_join_handles:
203        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
204    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
205}
206
207enum ManagerEvent {
208    NewRequest(ManagerRequest),
209    CoordinatorWorkerFinished {
210        sink_id: SinkId,
211        join_result: Result<(), JoinError>,
212    },
213}
214
215trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
216    + Send
217    + 'static;
218
219impl ManagerWorker {
220    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
221        ManagerWorker {
222            request_rx,
223            shutdown_rx,
224            running_coordinator_worker_join_handles: Default::default(),
225            running_coordinator_worker: Default::default(),
226        }
227    }
228
229    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
230        while let Some(event) = self.next_event().await {
231            match event {
232                ManagerEvent::NewRequest(request) => match request {
233                    ManagerRequest::NewSinkWriter(request) => {
234                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
235                    }
236                    ManagerRequest::StopCoordinator {
237                        finish_notifier,
238                        sink_id,
239                    } => {
240                        if let Some(sink_id) = sink_id {
241                            if let Some(worker_handle) =
242                                self.running_coordinator_worker.get_mut(&sink_id)
243                            {
244                                if let Some(sender) = worker_handle.request_sender.take() {
245                                    // drop the sender as a signal to notify the coordinator worker
246                                    // to stop
247                                    drop(sender);
248                                }
249                                worker_handle.finish_notifiers.push(finish_notifier);
250                            } else {
251                                debug!(
252                                    "sink coordinator of {} is not running. Notify finish directly",
253                                    sink_id.sink_id
254                                );
255                                send_with_err_check!(finish_notifier, ());
256                            }
257                        } else {
258                            self.clean_up().await;
259                            send_with_err_check!(finish_notifier, ());
260                        }
261                    }
262                },
263                ManagerEvent::CoordinatorWorkerFinished {
264                    sink_id,
265                    join_result,
266                } => self.handle_coordinator_finished(sink_id, join_result),
267            }
268        }
269        self.clean_up().await;
270        info!("sink manager worker exited");
271    }
272
273    async fn next_event(&mut self) -> Option<ManagerEvent> {
274        match select(
275            select(
276                pin!(self.request_rx.recv()),
277                pin!(pending_on_none(
278                    self.running_coordinator_worker_join_handles.next()
279                )),
280            ),
281            &mut self.shutdown_rx,
282        )
283        .await
284        {
285            Either::Left((either, _)) => match either {
286                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
287                Either::Left((None, _)) => None,
288                Either::Right(((sink_id, join_result), _)) => {
289                    Some(ManagerEvent::CoordinatorWorkerFinished {
290                        sink_id,
291                        join_result,
292                    })
293                }
294            },
295            Either::Right(_) => None,
296        }
297    }
298
299    async fn clean_up(&mut self) {
300        info!("sink manager worker start cleaning up");
301        for worker_handle in self.running_coordinator_worker.values_mut() {
302            if let Some(sender) = worker_handle.request_sender.take() {
303                // drop the sender to notify the coordinator worker to stop
304                drop(sender);
305            }
306        }
307        while let Some((sink_id, join_result)) =
308            self.running_coordinator_worker_join_handles.next().await
309        {
310            self.handle_coordinator_finished(sink_id, join_result);
311        }
312        info!("sink manager worker finished cleaning up");
313    }
314
315    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
316        let worker_handle = self
317            .running_coordinator_worker
318            .remove(&sink_id)
319            .expect("finished coordinator should have an associated worker handle");
320        for finish_notifier in worker_handle.finish_notifiers {
321            send_with_err_check!(finish_notifier, ());
322        }
323        match join_result {
324            Ok(()) => {
325                info!(
326                    id = sink_id.sink_id,
327                    "sink coordinator has gracefully finished",
328                );
329            }
330            Err(err) => {
331                error!(
332                    id = sink_id.sink_id,
333                    error = %err.as_report(),
334                    "sink coordinator finished with error",
335                );
336            }
337        }
338    }
339
340    fn handle_new_sink_writer(
341        &mut self,
342        new_writer: SinkWriterCoordinationHandle,
343        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
344    ) {
345        let param = new_writer.param();
346        let sink_id = param.sink_id;
347
348        let handle = self
349            .running_coordinator_worker
350            .entry(param.sink_id)
351            .or_insert_with(|| {
352                // Launch the coordinator worker task if it is the first
353                let (request_tx, request_rx) = unbounded_channel();
354                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
355                self.running_coordinator_worker_join_handles.push(
356                    join_handle
357                        .map(move |join_result| (sink_id, join_result))
358                        .boxed(),
359                );
360                CoordinatorWorkerHandle {
361                    request_sender: Some(request_tx),
362                    finish_notifiers: Vec::new(),
363                }
364            });
365
366        if let Some(sender) = handle.request_sender.as_mut() {
367            send_with_err_check!(sender, new_writer);
368        } else {
369            warn!(
370                "handle a new request while the sink coordinator is being stopped: {:?}",
371                param
372            );
373            new_writer.abort(Status::internal("the sink is being stopped"));
374        }
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use std::future::{Future, poll_fn};
381    use std::pin::pin;
382    use std::sync::Arc;
383    use std::task::Poll;
384
385    use anyhow::anyhow;
386    use async_trait::async_trait;
387    use futures::future::join;
388    use futures::{FutureExt, StreamExt};
389    use itertools::Itertools;
390    use rand::seq::SliceRandom;
391    use risingwave_common::bitmap::BitmapBuilder;
392    use risingwave_common::hash::VirtualNode;
393    use risingwave_connector::sink::catalog::{SinkId, SinkType};
394    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
395    use risingwave_pb::connector_service::SinkMetadata;
396    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
397    use risingwave_rpc_client::CoordinatorStreamHandle;
398    use tokio::sync::mpsc::unbounded_channel;
399    use tokio_stream::wrappers::ReceiverStream;
400
401    use crate::manager::sink_coordination::SinkCoordinatorManager;
402    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
403    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
404
405    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
406        context: C,
407        f: F,
408    }
409
410    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
411        fn new(context: C, f: F) -> Self {
412            MockCoordinator { context, f }
413        }
414    }
415
416    #[async_trait]
417    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
418        SinkCommitCoordinator for MockCoordinator<C, F>
419    {
420        async fn init(
421            &mut self,
422            _subscriber: SinkCommittedEpochSubscriber,
423        ) -> risingwave_connector::sink::Result<Option<u64>> {
424            Ok(None)
425        }
426
427        async fn commit(
428            &mut self,
429            epoch: u64,
430            metadata: Vec<SinkMetadata>,
431        ) -> risingwave_connector::sink::Result<()> {
432            (self.f)(epoch, metadata, &mut self.context)
433        }
434    }
435
436    #[tokio::test]
437    async fn test_basic() {
438        let param = SinkParam {
439            sink_id: SinkId::from(1),
440            sink_name: "test".into(),
441            properties: Default::default(),
442            columns: vec![],
443            downstream_pk: vec![],
444            sink_type: SinkType::AppendOnly,
445            format_desc: None,
446            db_name: "test".into(),
447            sink_from_name: "test".into(),
448        };
449
450        let epoch1 = 233;
451        let epoch2 = 234;
452
453        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
454        all_vnode.shuffle(&mut rand::rng());
455        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
456        let build_bitmap = |indexes: &[usize]| {
457            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
458            for i in indexes {
459                builder.set(*i, true);
460            }
461            builder.finish()
462        };
463        let vnode1 = build_bitmap(first);
464        let vnode2 = build_bitmap(second);
465
466        let metadata = [
467            [vec![1u8, 2u8], vec![3u8, 4u8]],
468            [vec![5u8, 6u8], vec![7u8, 8u8]],
469        ];
470        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
471            let (_sender, receiver) = unbounded_channel();
472
473            async move { Ok((1, receiver)) }.boxed()
474        });
475
476        let (manager, (_join_handle, _stop_tx)) =
477            SinkCoordinatorManager::start_worker_with_spawn_worker({
478                let expected_param = param.clone();
479                let metadata = metadata.clone();
480                move |param, new_writer_rx| {
481                    let metadata = metadata.clone();
482                    let expected_param = expected_param.clone();
483                    tokio::spawn({
484                        let subscriber = mock_subscriber.clone();
485                        async move {
486                            // validate the start request
487                            assert_eq!(param, expected_param);
488                            CoordinatorWorker::execute_coordinator(
489                                param.clone(),
490                                new_writer_rx,
491                                MockCoordinator::new(
492                                    0,
493                                    |epoch, metadata_list, count: &mut usize| {
494                                        *count += 1;
495                                        let mut metadata_list =
496                                            metadata_list
497                                                .into_iter()
498                                                .map(|metadata| match metadata {
499                                                    SinkMetadata {
500                                                        metadata:
501                                                            Some(Metadata::Serialized(
502                                                                SerializedMetadata { metadata },
503                                                            )),
504                                                    } => metadata,
505                                                    _ => unreachable!(),
506                                                })
507                                                .collect_vec();
508                                        metadata_list.sort();
509                                        match *count {
510                                            1 => {
511                                                assert_eq!(epoch, epoch1);
512                                                assert_eq!(2, metadata_list.len());
513                                                assert_eq!(metadata[0][0], metadata_list[0]);
514                                                assert_eq!(metadata[0][1], metadata_list[1]);
515                                            }
516                                            2 => {
517                                                assert_eq!(epoch, epoch2);
518                                                assert_eq!(2, metadata_list.len());
519                                                assert_eq!(metadata[1][0], metadata_list[0]);
520                                                assert_eq!(metadata[1][1], metadata_list[1]);
521                                            }
522                                            _ => unreachable!(),
523                                        }
524                                        Ok(())
525                                    },
526                                ),
527                                subscriber.clone(),
528                            )
529                            .await;
530                        }
531                    })
532                }
533            });
534
535        let build_client = |vnode| async {
536            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
537                Ok(tonic::Response::new(
538                    manager
539                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
540                        .await
541                        .unwrap()
542                        .boxed(),
543                ))
544            })
545            .await
546            .unwrap()
547            .0
548        };
549
550        let (mut client1, mut client2) =
551            join(build_client(vnode1), pin!(build_client(vnode2))).await;
552
553        {
554            // commit epoch1
555            let mut commit_future = pin!(
556                client2
557                    .commit(
558                        epoch1,
559                        SinkMetadata {
560                            metadata: Some(Metadata::Serialized(SerializedMetadata {
561                                metadata: metadata[0][1].clone(),
562                            })),
563                        },
564                    )
565                    .map(|result| result.unwrap())
566            );
567            assert!(
568                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
569                    .await
570                    .is_pending()
571            );
572            join(
573                commit_future,
574                client1
575                    .commit(
576                        epoch1,
577                        SinkMetadata {
578                            metadata: Some(Metadata::Serialized(SerializedMetadata {
579                                metadata: metadata[0][0].clone(),
580                            })),
581                        },
582                    )
583                    .map(|result| result.unwrap()),
584            )
585            .await;
586        }
587
588        // commit epoch2
589        let mut commit_future = pin!(
590            client1
591                .commit(
592                    epoch2,
593                    SinkMetadata {
594                        metadata: Some(Metadata::Serialized(SerializedMetadata {
595                            metadata: metadata[1][0].clone(),
596                        })),
597                    },
598                )
599                .map(|result| result.unwrap())
600        );
601        assert!(
602            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
603                .await
604                .is_pending()
605        );
606        join(
607            commit_future,
608            client2
609                .commit(
610                    epoch2,
611                    SinkMetadata {
612                        metadata: Some(Metadata::Serialized(SerializedMetadata {
613                            metadata: metadata[1][1].clone(),
614                        })),
615                    },
616                )
617                .map(|result| result.unwrap()),
618        )
619        .await;
620    }
621
622    #[tokio::test]
623    async fn test_single_writer() {
624        let param = SinkParam {
625            sink_id: SinkId::from(1),
626            sink_name: "test".into(),
627            properties: Default::default(),
628            columns: vec![],
629            downstream_pk: vec![],
630            sink_type: SinkType::AppendOnly,
631            format_desc: None,
632            db_name: "test".into(),
633            sink_from_name: "test".into(),
634        };
635
636        let epoch1 = 233;
637        let epoch2 = 234;
638
639        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
640        let build_bitmap = |indexes: &[usize]| {
641            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
642            for i in indexes {
643                builder.set(*i, true);
644            }
645            builder.finish()
646        };
647        let vnode = build_bitmap(&all_vnode);
648
649        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
650        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
651            let (_sender, receiver) = unbounded_channel();
652
653            async move { Ok((1, receiver)) }.boxed()
654        });
655        let (manager, (_join_handle, _stop_tx)) =
656            SinkCoordinatorManager::start_worker_with_spawn_worker({
657                let expected_param = param.clone();
658                let metadata = metadata.clone();
659                move |param, new_writer_rx| {
660                    let metadata = metadata.clone();
661                    let expected_param = expected_param.clone();
662                    tokio::spawn({
663                        let subscriber = mock_subscriber.clone();
664                        async move {
665                            // validate the start request
666                            assert_eq!(param, expected_param);
667                            CoordinatorWorker::execute_coordinator(
668                                param.clone(),
669                                new_writer_rx,
670                                MockCoordinator::new(
671                                    0,
672                                    |epoch, metadata_list, count: &mut usize| {
673                                        *count += 1;
674                                        let mut metadata_list =
675                                            metadata_list
676                                                .into_iter()
677                                                .map(|metadata| match metadata {
678                                                    SinkMetadata {
679                                                        metadata:
680                                                            Some(Metadata::Serialized(
681                                                                SerializedMetadata { metadata },
682                                                            )),
683                                                    } => metadata,
684                                                    _ => unreachable!(),
685                                                })
686                                                .collect_vec();
687                                        metadata_list.sort();
688                                        match *count {
689                                            1 => {
690                                                assert_eq!(epoch, epoch1);
691                                                assert_eq!(1, metadata_list.len());
692                                                assert_eq!(metadata[0], metadata_list[0]);
693                                            }
694                                            2 => {
695                                                assert_eq!(epoch, epoch2);
696                                                assert_eq!(1, metadata_list.len());
697                                                assert_eq!(metadata[1], metadata_list[0]);
698                                            }
699                                            _ => unreachable!(),
700                                        }
701                                        Ok(())
702                                    },
703                                ),
704                                subscriber.clone(),
705                            )
706                            .await;
707                        }
708                    })
709                }
710            });
711
712        let build_client = |vnode| async {
713            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
714                Ok(tonic::Response::new(
715                    manager
716                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
717                        .await
718                        .unwrap()
719                        .boxed(),
720                ))
721            })
722            .await
723            .unwrap()
724            .0
725        };
726
727        let mut client = build_client(vnode).await;
728
729        client
730            .commit(
731                epoch1,
732                SinkMetadata {
733                    metadata: Some(Metadata::Serialized(SerializedMetadata {
734                        metadata: metadata[0].clone(),
735                    })),
736                },
737            )
738            .await
739            .unwrap();
740
741        client
742            .commit(
743                epoch2,
744                SinkMetadata {
745                    metadata: Some(Metadata::Serialized(SerializedMetadata {
746                        metadata: metadata[1].clone(),
747                    })),
748                },
749            )
750            .await
751            .unwrap();
752    }
753
754    #[tokio::test]
755    async fn test_partial_commit() {
756        let param = SinkParam {
757            sink_id: SinkId::from(1),
758            sink_name: "test".into(),
759            properties: Default::default(),
760            columns: vec![],
761            downstream_pk: vec![],
762            sink_type: SinkType::AppendOnly,
763            format_desc: None,
764            db_name: "test".into(),
765            sink_from_name: "test".into(),
766        };
767
768        let epoch = 233;
769
770        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
771        all_vnode.shuffle(&mut rand::rng());
772        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
773        let build_bitmap = |indexes: &[usize]| {
774            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
775            for i in indexes {
776                builder.set(*i, true);
777            }
778            builder.finish()
779        };
780        let vnode1 = build_bitmap(first);
781        let vnode2 = build_bitmap(second);
782
783        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
784            let (_sender, receiver) = unbounded_channel();
785
786            async move { Ok((1, receiver)) }.boxed()
787        });
788        let (manager, (_join_handle, _stop_tx)) =
789            SinkCoordinatorManager::start_worker_with_spawn_worker({
790                let expected_param = param.clone();
791                move |param, new_writer_rx| {
792                    let expected_param = expected_param.clone();
793                    tokio::spawn({
794                        let subscriber = mock_subscriber.clone();
795                        async move {
796                            // validate the start request
797                            assert_eq!(param, expected_param);
798                            CoordinatorWorker::execute_coordinator(
799                                param,
800                                new_writer_rx,
801                                MockCoordinator::new((), |_, _, _| unreachable!()),
802                                subscriber.clone(),
803                            )
804                            .await;
805                        }
806                    })
807                }
808            });
809
810        let build_client = |vnode| async {
811            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
812                Ok(tonic::Response::new(
813                    manager
814                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
815                        .await
816                        .unwrap()
817                        .boxed(),
818                ))
819            })
820            .await
821            .unwrap()
822            .0
823        };
824
825        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;
826
827        // commit epoch
828        let mut commit_future = pin!(client1.commit(
829            epoch,
830            SinkMetadata {
831                metadata: Some(Metadata::Serialized(SerializedMetadata {
832                    metadata: vec![],
833                })),
834            },
835        ));
836        assert!(
837            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
838                .await
839                .is_pending()
840        );
841        drop(client2);
842        assert!(commit_future.await.is_err());
843    }
844
845    #[tokio::test]
846    async fn test_fail_commit() {
847        let param = SinkParam {
848            sink_id: SinkId::from(1),
849            sink_name: "test".into(),
850            properties: Default::default(),
851            columns: vec![],
852            downstream_pk: vec![],
853            sink_type: SinkType::AppendOnly,
854            format_desc: None,
855            db_name: "test".into(),
856            sink_from_name: "test".into(),
857        };
858
859        let epoch = 233;
860
861        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
862        all_vnode.shuffle(&mut rand::rng());
863        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
864        let build_bitmap = |indexes: &[usize]| {
865            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
866            for i in indexes {
867                builder.set(*i, true);
868            }
869            builder.finish()
870        };
871        let vnode1 = build_bitmap(first);
872        let vnode2 = build_bitmap(second);
873        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
874            let (_sender, receiver) = unbounded_channel();
875
876            async move { Ok((1, receiver)) }.boxed()
877        });
878        let (manager, (_join_handle, _stop_tx)) =
879            SinkCoordinatorManager::start_worker_with_spawn_worker({
880                let expected_param = param.clone();
881                move |param, new_writer_rx| {
882                    let expected_param = expected_param.clone();
883                    tokio::spawn({
884                        let subscriber = mock_subscriber.clone();
885                        {
886                            async move {
887                                // validate the start request
888                                assert_eq!(param, expected_param);
889                                CoordinatorWorker::execute_coordinator(
890                                    param,
891                                    new_writer_rx,
892                                    MockCoordinator::new((), |_, _, _| {
893                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
894                                    }),
895                                    subscriber.clone(),
896                                )
897                                .await;
898                            }
899                        }
900                    })
901                }
902            });
903
904        let build_client = |vnode| async {
905            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
906                Ok(tonic::Response::new(
907                    manager
908                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
909                        .await
910                        .unwrap()
911                        .boxed(),
912                ))
913            })
914            .await
915            .unwrap()
916            .0
917        };
918
919        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
920
921        // commit epoch
922        let mut commit_future = pin!(client1.commit(
923            epoch,
924            SinkMetadata {
925                metadata: Some(Metadata::Serialized(SerializedMetadata {
926                    metadata: vec![],
927                })),
928            },
929        ));
930        assert!(
931            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
932                .await
933                .is_pending()
934        );
935        let (result1, result2) = join(
936            commit_future,
937            client2.commit(
938                epoch,
939                SinkMetadata {
940                    metadata: Some(Metadata::Serialized(SerializedMetadata {
941                        metadata: vec![],
942                    })),
943                },
944            ),
945        )
946        .await;
947        assert!(result1.is_err());
948        assert!(result2.is_err());
949    }
950
951    #[tokio::test]
952    async fn test_update_vnode_bitmap() {
953        let param = SinkParam {
954            sink_id: SinkId::from(1),
955            sink_name: "test".into(),
956            properties: Default::default(),
957            columns: vec![],
958            downstream_pk: vec![],
959            sink_type: SinkType::AppendOnly,
960            format_desc: None,
961            db_name: "test".into(),
962            sink_from_name: "test".into(),
963        };
964
965        let epoch1 = 233;
966        let epoch2 = 234;
967        let epoch3 = 235;
968        let epoch4 = 236;
969
970        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
971        all_vnode.shuffle(&mut rand::rng());
972        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
973        let build_bitmap = |indexes: &[usize]| {
974            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
975            for i in indexes {
976                builder.set(*i, true);
977            }
978            builder.finish()
979        };
980        let vnode1 = build_bitmap(first);
981        let vnode2 = build_bitmap(second);
982
983        let metadata = [
984            [vec![1u8, 2u8], vec![3u8, 4u8]],
985            [vec![5u8, 6u8], vec![7u8, 8u8]],
986        ];
987
988        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
989        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
990        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
991            let (_sender, receiver) = unbounded_channel();
992
993            async move { Ok((1, receiver)) }.boxed()
994        });
995        let (manager, (_join_handle, _stop_tx)) =
996            SinkCoordinatorManager::start_worker_with_spawn_worker({
997                let expected_param = param.clone();
998                let metadata = metadata.clone();
999                let metadata_scale_out = metadata_scale_out.clone();
1000                let metadata_scale_in = metadata_scale_in.clone();
1001                move |param, new_writer_rx| {
1002                    let metadata = metadata.clone();
1003                    let metadata_scale_out = metadata_scale_out.clone();
1004                    let metadata_scale_in = metadata_scale_in.clone();
1005                    let expected_param = expected_param.clone();
1006                    tokio::spawn({
1007                        let subscriber = mock_subscriber.clone();
1008                        async move {
1009                            // validate the start request
1010                            assert_eq!(param, expected_param);
1011                            CoordinatorWorker::execute_coordinator(
1012                                param.clone(),
1013                                new_writer_rx,
1014                                MockCoordinator::new(
1015                                    0,
1016                                    |epoch, metadata_list, count: &mut usize| {
1017                                        *count += 1;
1018                                        let mut metadata_list =
1019                                            metadata_list
1020                                                .into_iter()
1021                                                .map(|metadata| match metadata {
1022                                                    SinkMetadata {
1023                                                        metadata:
1024                                                            Some(Metadata::Serialized(
1025                                                                SerializedMetadata { metadata },
1026                                                            )),
1027                                                    } => metadata,
1028                                                    _ => unreachable!(),
1029                                                })
1030                                                .collect_vec();
1031                                        metadata_list.sort();
1032                                        let (expected_epoch, expected_metadata_list) = match *count
1033                                        {
1034                                            1 => (epoch1, metadata[0].as_slice()),
1035                                            2 => (epoch2, metadata[1].as_slice()),
1036                                            3 => (epoch3, metadata_scale_out.as_slice()),
1037                                            4 => (epoch4, metadata_scale_in.as_slice()),
1038                                            _ => unreachable!(),
1039                                        };
1040                                        assert_eq!(expected_epoch, epoch);
1041                                        assert_eq!(expected_metadata_list, &metadata_list);
1042                                        Ok(())
1043                                    },
1044                                ),
1045                                subscriber.clone(),
1046                            )
1047                            .await;
1048                        }
1049                    })
1050                }
1051            });
1052
1053        let build_client = |vnode| async {
1054            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1055                Ok(tonic::Response::new(
1056                    manager
1057                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1058                        .await
1059                        .unwrap()
1060                        .boxed(),
1061                ))
1062            })
1063            .await
1064            .unwrap()
1065            .0
1066        };
1067
1068        let (mut client1, mut client2) =
1069            join(build_client(vnode1), pin!(build_client(vnode2))).await;
1070
1071        {
1072            // commit epoch1
1073            let mut commit_future = pin!(
1074                client2
1075                    .commit(
1076                        epoch1,
1077                        SinkMetadata {
1078                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1079                                metadata: metadata[0][1].clone(),
1080                            })),
1081                        },
1082                    )
1083                    .map(|result| result.unwrap())
1084            );
1085            assert!(
1086                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1087                    .await
1088                    .is_pending()
1089            );
1090            join(
1091                commit_future,
1092                client1
1093                    .commit(
1094                        epoch1,
1095                        SinkMetadata {
1096                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1097                                metadata: metadata[0][0].clone(),
1098                            })),
1099                        },
1100                    )
1101                    .map(|result| result.unwrap()),
1102            )
1103            .await;
1104        }
1105
1106        let (vnode1, vnode2, vnode3) = {
1107            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1108            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1109            (
1110                build_bitmap(first),
1111                build_bitmap(second),
1112                build_bitmap(third),
1113            )
1114        };
1115
1116        let mut client3 = build_client(vnode3).await;
1117        {
1118            let mut commit_future3 = pin!(
1119                client3
1120                    .commit(
1121                        epoch3,
1122                        SinkMetadata {
1123                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1124                                metadata: metadata_scale_out[2].clone(),
1125                            })),
1126                        },
1127                    )
1128                    .map(|result| result.unwrap())
1129            );
1130            assert!(
1131                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1132                    .await
1133                    .is_pending()
1134            );
1135
1136            {
1137                // commit epoch2
1138                let mut commit_future = pin!(
1139                    client1
1140                        .commit(
1141                            epoch2,
1142                            SinkMetadata {
1143                                metadata: Some(Metadata::Serialized(SerializedMetadata {
1144                                    metadata: metadata[1][0].clone(),
1145                                })),
1146                            },
1147                        )
1148                        .map(|result| result.unwrap())
1149                );
1150                assert!(
1151                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1152                        .await
1153                        .is_pending()
1154                );
1155                join(
1156                    commit_future,
1157                    client2
1158                        .commit(
1159                            epoch2,
1160                            SinkMetadata {
1161                                metadata: Some(Metadata::Serialized(SerializedMetadata {
1162                                    metadata: metadata[1][1].clone(),
1163                                })),
1164                            },
1165                        )
1166                        .map(|result| result.unwrap()),
1167                )
1168                .await;
1169            }
1170
1171            client1.update_vnode_bitmap(&vnode1).await.unwrap();
1172            client2.update_vnode_bitmap(&vnode2).await.unwrap();
1173            let mut commit_future1 = pin!(
1174                client1
1175                    .commit(
1176                        epoch3,
1177                        SinkMetadata {
1178                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1179                                metadata: metadata_scale_out[0].clone(),
1180                            })),
1181                        },
1182                    )
1183                    .map(|result| result.unwrap())
1184            );
1185            assert!(
1186                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
1187                    .await
1188                    .is_pending()
1189            );
1190            assert!(
1191                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1192                    .await
1193                    .is_pending()
1194            );
1195            client2
1196                .commit(
1197                    epoch3,
1198                    SinkMetadata {
1199                        metadata: Some(Metadata::Serialized(SerializedMetadata {
1200                            metadata: metadata_scale_out[1].clone(),
1201                        })),
1202                    },
1203                )
1204                .map(|result| result.unwrap())
1205                .await;
1206        }
1207
1208        let (vnode2, vnode3) = {
1209            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1210            (build_bitmap(first), build_bitmap(second))
1211        };
1212
1213        // client1.stop().await.unwrap();
1214        client2.update_vnode_bitmap(&vnode2).await.unwrap();
1215        client3.update_vnode_bitmap(&vnode3).await.unwrap();
1216
1217        {
1218            let mut commit_future = pin!(
1219                client2
1220                    .commit(
1221                        epoch4,
1222                        SinkMetadata {
1223                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1224                                metadata: metadata_scale_in[0].clone(),
1225                            })),
1226                        },
1227                    )
1228                    .map(|result| result.unwrap())
1229            );
1230            assert!(
1231                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1232                    .await
1233                    .is_pending()
1234            );
1235            join(
1236                commit_future,
1237                client3
1238                    .commit(
1239                        epoch4,
1240                        SinkMetadata {
1241                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1242                                metadata: metadata_scale_in[1].clone(),
1243                            })),
1244                        },
1245                    )
1246                    .map(|result| result.unwrap()),
1247            )
1248            .await;
1249        }
1250    }
1251}