risingwave_meta/manager/sink_coordination/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
34use tokio::sync::oneshot::{Receiver, Sender, channel};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Calls `$tx.send($msg)` synchronously and logs an error if the call reports
/// a failure. The failure is not propagated to the caller.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        // Best-effort send: a rejected message is only logged.
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}
53
/// Awaits `$tx.send($msg)` and logs an error if the send reports a failure.
/// The failure is not propagated to the caller. Must be used in an async
/// context because the expansion contains `.await`.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        // Best-effort send: a rejected message is only logged.
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}
61
/// Capacity of the bounded `mpsc` channel that carries `ManagerRequest`s from
/// `SinkCoordinatorManager` to its background worker.
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
/// Requests sent from `SinkCoordinatorManager` to the manager worker.
enum ManagerRequest {
    /// A sink writer has connected and should be handed to the coordinator
    /// worker of its sink.
    NewSinkWriter(SinkWriterCoordinationHandle),
    /// Stop one coordinator, or all of them.
    StopCoordinator {
        /// Notified once the requested stop has completed.
        finish_notifier: Sender<()>,
        /// sink id to stop. When `None`, stop all sink coordinator
        sink_id: Option<SinkId>,
    },
}
72
/// Cloneable front-end of the sink coordination manager. All operations are
/// forwarded as `ManagerRequest`s over `request_tx` to a background worker.
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    /// Sending half of the bounded request channel to the manager worker.
    request_tx: mpsc::Sender<ManagerRequest>,
}
77
78fn new_committed_epoch_subscriber(
79    hummock_manager: HummockManagerRef,
80    metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82    Arc::new(move |sink_id| {
83        let hummock_manager = hummock_manager.clone();
84        let metadata_manager = metadata_manager.clone();
85        async move {
86            let state_table_ids = metadata_manager
87                .get_sink_state_table_ids(sink_id.sink_id as _)
88                .await
89                .map_err(SinkError::from)?;
90            let Some(table_id) = state_table_ids.first() else {
91                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92            };
93            hummock_manager
94                .subscribe_table_committed_epoch(*table_id)
95                .await
96                .map_err(SinkError::from)
97        }
98        .boxed()
99    })
100}
101
102impl SinkCoordinatorManager {
103    #[cfg(test)]
104    pub(crate) fn for_test() -> Self {
105        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
106        let _join_handle = tokio::spawn(async move {
107            while let Some(req) = request_rx.recv().await {
108                let ManagerRequest::StopCoordinator {
109                    finish_notifier,
110                    sink_id,
111                } = req
112                else {
113                    unreachable!()
114                };
115                assert_eq!(sink_id, None);
116                finish_notifier.send(()).unwrap();
117            }
118        });
119        SinkCoordinatorManager { request_tx }
120    }
121
122    pub fn start_worker(
123        db: DatabaseConnection,
124        hummock_manager: HummockManagerRef,
125        metadata_manager: MetadataManager,
126        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
127    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
128        let subscriber = new_committed_epoch_subscriber(hummock_manager, metadata_manager);
129        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
130            tokio::spawn(CoordinatorWorker::run(
131                param,
132                manager_request_stream,
133                db.clone(),
134                subscriber.clone(),
135                iceberg_compact_stat_sender.clone(),
136            ))
137        })
138    }
139
140    fn start_worker_with_spawn_worker(
141        spawn_coordinator_worker: impl SpawnCoordinatorFn,
142    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
143        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
144        let (shutdown_tx, shutdown_rx) = channel();
145        let worker = ManagerWorker::new(request_rx, shutdown_rx);
146        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
147        (
148            SinkCoordinatorManager { request_tx },
149            (join_handle, shutdown_tx),
150        )
151    }
152
153    pub async fn handle_new_request(
154        &self,
155        mut request_stream: SinkWriterRequestStream,
156    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
157        let (param, vnode_bitmap) = match request_stream.try_next().await? {
158            Some(CoordinateRequest {
159                msg:
160                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
161                        param: Some(param),
162                        vnode_bitmap: Some(vnode_bitmap),
163                    })),
164            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
165            msg => {
166                return Err(Status::invalid_argument(format!(
167                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
168                    msg
169                )));
170            }
171        };
172        let (response_tx, response_rx) = mpsc::unbounded_channel();
173        self.request_tx
174            .send(ManagerRequest::NewSinkWriter(
175                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
176            ))
177            .await
178            .map_err(|_| {
179                Status::unavailable(
180                    "unable to send to sink manager worker. The worker may have stopped",
181                )
182            })?;
183
184        Ok(UnboundedReceiverStream::new(response_rx))
185    }
186
187    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
188        let (tx, rx) = channel();
189        send_await_with_err_check!(
190            self.request_tx,
191            ManagerRequest::StopCoordinator {
192                finish_notifier: tx,
193                sink_id,
194            }
195        );
196        if rx.await.is_err() {
197            error!("fail to wait for resetting sink manager worker");
198        }
199        info!("successfully stop coordinator: {:?}", sink_id);
200    }
201
202    pub async fn reset(&self) {
203        self.stop_coordinator(None).await;
204    }
205
206    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
207        self.stop_coordinator(Some(sink_id)).await;
208    }
209}
210
/// Bookkeeping the manager worker keeps for each running coordinator worker.
struct CoordinatorWorkerHandle {
    /// Sender to coordinator worker. Drop the sender as a stop signal
    /// (the field becomes `None` once the stop signal has been given).
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notify when the coordinator worker stops
    finish_notifiers: Vec<Sender<()>>,
}
217
/// Background worker that owns all coordinator workers and serves
/// `ManagerRequest`s.
struct ManagerWorker {
    /// Incoming requests from `SinkCoordinatorManager`.
    request_rx: mpsc::Receiver<ManagerRequest>,
    /// Shutdown signal. It is polled through `&mut` in `next_event`; any
    /// resolution of this receiver ends the worker's event loop.
    shutdown_rx: Receiver<()>,

    /// Join futures of the running coordinator workers, each tagged with the
    /// sink id it belongs to.
    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    /// Per-sink handle of every coordinator worker that has not finished yet.
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}
227
/// Events produced by `ManagerWorker::next_event` and handled in
/// `ManagerWorker::execute`.
enum ManagerEvent {
    /// A request arrived on the request channel.
    NewRequest(ManagerRequest),
    /// The task of a coordinator worker has completed.
    CoordinatorWorkerFinished {
        /// Sink whose coordinator worker finished.
        sink_id: SinkId,
        /// Result of joining the worker task.
        join_result: Result<(), JoinError>,
    },
}
235
/// Factory the manager worker calls to start a coordinator worker for a sink.
/// It receives the sink's param and the receiving end of the channel on which
/// new writer handles are delivered, and returns the join handle of the
/// spawned task.
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;
239
240impl ManagerWorker {
241    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
242        ManagerWorker {
243            request_rx,
244            shutdown_rx,
245            running_coordinator_worker_join_handles: Default::default(),
246            running_coordinator_worker: Default::default(),
247        }
248    }
249
250    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
251        while let Some(event) = self.next_event().await {
252            match event {
253                ManagerEvent::NewRequest(request) => match request {
254                    ManagerRequest::NewSinkWriter(request) => {
255                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
256                    }
257                    ManagerRequest::StopCoordinator {
258                        finish_notifier,
259                        sink_id,
260                    } => {
261                        if let Some(sink_id) = sink_id {
262                            if let Some(worker_handle) =
263                                self.running_coordinator_worker.get_mut(&sink_id)
264                            {
265                                if let Some(sender) = worker_handle.request_sender.take() {
266                                    // drop the sender as a signal to notify the coordinator worker
267                                    // to stop
268                                    drop(sender);
269                                }
270                                worker_handle.finish_notifiers.push(finish_notifier);
271                            } else {
272                                debug!(
273                                    "sink coordinator of {} is not running. Notify finish directly",
274                                    sink_id.sink_id
275                                );
276                                send_with_err_check!(finish_notifier, ());
277                            }
278                        } else {
279                            self.clean_up().await;
280                            send_with_err_check!(finish_notifier, ());
281                        }
282                    }
283                },
284                ManagerEvent::CoordinatorWorkerFinished {
285                    sink_id,
286                    join_result,
287                } => self.handle_coordinator_finished(sink_id, join_result),
288            }
289        }
290        self.clean_up().await;
291        info!("sink manager worker exited");
292    }
293
294    async fn next_event(&mut self) -> Option<ManagerEvent> {
295        match select(
296            select(
297                pin!(self.request_rx.recv()),
298                pin!(pending_on_none(
299                    self.running_coordinator_worker_join_handles.next()
300                )),
301            ),
302            &mut self.shutdown_rx,
303        )
304        .await
305        {
306            Either::Left((either, _)) => match either {
307                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
308                Either::Left((None, _)) => None,
309                Either::Right(((sink_id, join_result), _)) => {
310                    Some(ManagerEvent::CoordinatorWorkerFinished {
311                        sink_id,
312                        join_result,
313                    })
314                }
315            },
316            Either::Right(_) => None,
317        }
318    }
319
320    async fn clean_up(&mut self) {
321        info!("sink manager worker start cleaning up");
322        for worker_handle in self.running_coordinator_worker.values_mut() {
323            if let Some(sender) = worker_handle.request_sender.take() {
324                // drop the sender to notify the coordinator worker to stop
325                drop(sender);
326            }
327        }
328        while let Some((sink_id, join_result)) =
329            self.running_coordinator_worker_join_handles.next().await
330        {
331            self.handle_coordinator_finished(sink_id, join_result);
332        }
333        info!("sink manager worker finished cleaning up");
334    }
335
336    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
337        let worker_handle = self
338            .running_coordinator_worker
339            .remove(&sink_id)
340            .expect("finished coordinator should have an associated worker handle");
341        for finish_notifier in worker_handle.finish_notifiers {
342            send_with_err_check!(finish_notifier, ());
343        }
344        match join_result {
345            Ok(()) => {
346                info!(
347                    id = sink_id.sink_id,
348                    "sink coordinator has gracefully finished",
349                );
350            }
351            Err(err) => {
352                error!(
353                    id = sink_id.sink_id,
354                    error = %err.as_report(),
355                    "sink coordinator finished with error",
356                );
357            }
358        }
359    }
360
361    fn handle_new_sink_writer(
362        &mut self,
363        new_writer: SinkWriterCoordinationHandle,
364        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
365    ) {
366        let param = new_writer.param();
367        let sink_id = param.sink_id;
368
369        let handle = self
370            .running_coordinator_worker
371            .entry(param.sink_id)
372            .or_insert_with(|| {
373                // Launch the coordinator worker task if it is the first
374                let (request_tx, request_rx) = unbounded_channel();
375                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
376                self.running_coordinator_worker_join_handles.push(
377                    join_handle
378                        .map(move |join_result| (sink_id, join_result))
379                        .boxed(),
380                );
381                CoordinatorWorkerHandle {
382                    request_sender: Some(request_tx),
383                    finish_notifiers: Vec::new(),
384                }
385            });
386
387        if let Some(sender) = handle.request_sender.as_mut() {
388            send_with_err_check!(sender, new_writer);
389        } else {
390            warn!(
391                "handle a new request while the sink coordinator is being stopped: {:?}",
392                param
393            );
394            new_writer.abort(Status::internal("the sink is being stopped"));
395        }
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    use std::future::{Future, poll_fn};
402    use std::pin::pin;
403    use std::sync::Arc;
404    use std::task::Poll;
405
406    use anyhow::anyhow;
407    use async_trait::async_trait;
408    use futures::future::{join, try_join};
409    use futures::{FutureExt, StreamExt, TryFutureExt};
410    use itertools::Itertools;
411    use rand::seq::SliceRandom;
412    use risingwave_common::bitmap::BitmapBuilder;
413    use risingwave_common::catalog::Field;
414    use risingwave_common::hash::VirtualNode;
415    use risingwave_connector::sink::catalog::{SinkId, SinkType};
416    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
417    use risingwave_pb::connector_service::SinkMetadata;
418    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
419    use risingwave_rpc_client::CoordinatorStreamHandle;
420    use tokio::sync::mpsc::unbounded_channel;
421    use tokio_stream::wrappers::ReceiverStream;
422
423    use crate::manager::sink_coordination::SinkCoordinatorManager;
424    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
425    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
426
    /// Test coordinator whose `commit` is delegated to the closure `f`, which
    /// also receives mutable access to the caller-supplied `context`.
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        /// State passed by `&mut` to `f` on every commit.
        context: C,
        /// Commit callback: `(epoch, metadata, &mut context) -> Result`.
        f: F,
    }
431
432    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
433        fn new(context: C, f: F) -> Self {
434            MockCoordinator { context, f }
435        }
436    }
437
438    #[async_trait]
439    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
440        SinkCommitCoordinator for MockCoordinator<C, F>
441    {
442        async fn init(
443            &mut self,
444            _subscriber: SinkCommittedEpochSubscriber,
445        ) -> risingwave_connector::sink::Result<Option<u64>> {
446            Ok(None)
447        }
448
449        async fn commit(
450            &mut self,
451            epoch: u64,
452            metadata: Vec<SinkMetadata>,
453            _add_columns: Option<Vec<Field>>,
454        ) -> risingwave_connector::sink::Result<()> {
455            (self.f)(epoch, metadata, &mut self.context)
456        }
457    }
458
    /// Two writers, each owning half of the vnodes, align their initial epoch
    /// and then commit two epochs. A commit from one writer must stay pending
    /// until the other writer has committed the same epoch, and the mock
    /// coordinator must see both writers' metadata for each epoch.
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        // Shuffle the vnodes and split them into two disjoint halves, one per writer.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[e][w]: payload committed at the e-th commit by the w-th writer.
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        // Subscriber stub: returns `(1, receiver)`; the sender is dropped right
        // away, so nothing is ever sent on the receiver.
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    // `count` is the number of commits seen so far.
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the comparison does not depend on
                                        // the order in which the writers committed.
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a writer client whose request stream is fed directly into the
        // manager, with no network layer in between.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        // The writers propose different initial epochs; both are expected to be
        // told `epoch1`.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            // Poll once: the commit must not complete before client1 has
            // committed the same epoch.
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // commit epoch2
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        // Same check with the roles swapped: client1 alone must stay pending.
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
658
659    #[tokio::test]
660    async fn test_single_writer() {
661        let param = SinkParam {
662            sink_id: SinkId::from(1),
663            sink_name: "test".into(),
664            properties: Default::default(),
665            columns: vec![],
666            downstream_pk: None,
667            sink_type: SinkType::AppendOnly,
668            format_desc: None,
669            db_name: "test".into(),
670            sink_from_name: "test".into(),
671        };
672
673        let epoch1 = 233;
674        let epoch2 = 234;
675
676        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
677        let build_bitmap = |indexes: &[usize]| {
678            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
679            for i in indexes {
680                builder.set(*i, true);
681            }
682            builder.finish()
683        };
684        let vnode = build_bitmap(&all_vnode);
685
686        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
687        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
688            let (_sender, receiver) = unbounded_channel();
689
690            async move { Ok((1, receiver)) }.boxed()
691        });
692        let (manager, (_join_handle, _stop_tx)) =
693            SinkCoordinatorManager::start_worker_with_spawn_worker({
694                let expected_param = param.clone();
695                let metadata = metadata.clone();
696                move |param, new_writer_rx| {
697                    let metadata = metadata.clone();
698                    let expected_param = expected_param.clone();
699                    tokio::spawn({
700                        let subscriber = mock_subscriber.clone();
701                        async move {
702                            // validate the start request
703                            assert_eq!(param, expected_param);
704                            CoordinatorWorker::execute_coordinator(
705                                param.clone(),
706                                new_writer_rx,
707                                MockCoordinator::new(
708                                    0,
709                                    |epoch, metadata_list, count: &mut usize| {
710                                        *count += 1;
711                                        let mut metadata_list =
712                                            metadata_list
713                                                .into_iter()
714                                                .map(|metadata| match metadata {
715                                                    SinkMetadata {
716                                                        metadata:
717                                                            Some(Metadata::Serialized(
718                                                                SerializedMetadata { metadata },
719                                                            )),
720                                                    } => metadata,
721                                                    _ => unreachable!(),
722                                                })
723                                                .collect_vec();
724                                        metadata_list.sort();
725                                        match *count {
726                                            1 => {
727                                                assert_eq!(epoch, epoch1);
728                                                assert_eq!(1, metadata_list.len());
729                                                assert_eq!(metadata[0], metadata_list[0]);
730                                            }
731                                            2 => {
732                                                assert_eq!(epoch, epoch2);
733                                                assert_eq!(1, metadata_list.len());
734                                                assert_eq!(metadata[1], metadata_list[0]);
735                                            }
736                                            _ => unreachable!(),
737                                        }
738                                        Ok(())
739                                    },
740                                ),
741                                subscriber.clone(),
742                            )
743                            .await;
744                        }
745                    })
746                }
747            });
748
749        let build_client = |vnode| async {
750            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
751                Ok(tonic::Response::new(
752                    manager
753                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
754                        .await
755                        .unwrap()
756                        .boxed(),
757                ))
758            })
759            .await
760            .unwrap()
761            .0
762        };
763
764        let mut client = build_client(vnode).await;
765
766        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
767        assert_eq!(aligned_epoch, epoch1);
768
769        client
770            .commit(
771                epoch1,
772                SinkMetadata {
773                    metadata: Some(Metadata::Serialized(SerializedMetadata {
774                        metadata: metadata[0].clone(),
775                    })),
776                },
777                None,
778            )
779            .await
780            .unwrap();
781
782        client
783            .commit(
784                epoch2,
785                SinkMetadata {
786                    metadata: Some(Metadata::Serialized(SerializedMetadata {
787                        metadata: metadata[1].clone(),
788                    })),
789                },
790                None,
791            )
792            .await
793            .unwrap();
794    }
795
796    #[tokio::test]
797    async fn test_partial_commit() {
798        let param = SinkParam {
799            sink_id: SinkId::from(1),
800            sink_name: "test".into(),
801            properties: Default::default(),
802            columns: vec![],
803            downstream_pk: None,
804            sink_type: SinkType::AppendOnly,
805            format_desc: None,
806            db_name: "test".into(),
807            sink_from_name: "test".into(),
808        };
809
810        let epoch = 233;
811
812        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
813        all_vnode.shuffle(&mut rand::rng());
814        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
815        let build_bitmap = |indexes: &[usize]| {
816            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
817            for i in indexes {
818                builder.set(*i, true);
819            }
820            builder.finish()
821        };
822        let vnode1 = build_bitmap(first);
823        let vnode2 = build_bitmap(second);
824
825        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
826            let (_sender, receiver) = unbounded_channel();
827
828            async move { Ok((1, receiver)) }.boxed()
829        });
830        let (manager, (_join_handle, _stop_tx)) =
831            SinkCoordinatorManager::start_worker_with_spawn_worker({
832                let expected_param = param.clone();
833                move |param, new_writer_rx| {
834                    let expected_param = expected_param.clone();
835                    tokio::spawn({
836                        let subscriber = mock_subscriber.clone();
837                        async move {
838                            // validate the start request
839                            assert_eq!(param, expected_param);
840                            CoordinatorWorker::execute_coordinator(
841                                param,
842                                new_writer_rx,
843                                MockCoordinator::new((), |_, _, _| unreachable!()),
844                                subscriber.clone(),
845                            )
846                            .await;
847                        }
848                    })
849                }
850            });
851
852        let build_client = |vnode| async {
853            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
854                Ok(tonic::Response::new(
855                    manager
856                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
857                        .await
858                        .unwrap()
859                        .boxed(),
860                ))
861            })
862            .await
863            .unwrap()
864            .0
865        };
866
867        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;
868
869        // commit epoch
870        let mut commit_future = pin!(client1.commit(
871            epoch,
872            SinkMetadata {
873                metadata: Some(Metadata::Serialized(SerializedMetadata {
874                    metadata: vec![],
875                })),
876            },
877            None,
878        ));
879        assert!(
880            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
881                .await
882                .is_pending()
883        );
884        drop(client2);
885        assert!(commit_future.await.is_err());
886    }
887
888    #[tokio::test]
889    async fn test_fail_commit() {
890        let param = SinkParam {
891            sink_id: SinkId::from(1),
892            sink_name: "test".into(),
893            properties: Default::default(),
894            columns: vec![],
895            downstream_pk: None,
896            sink_type: SinkType::AppendOnly,
897            format_desc: None,
898            db_name: "test".into(),
899            sink_from_name: "test".into(),
900        };
901
902        let epoch = 233;
903
904        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
905        all_vnode.shuffle(&mut rand::rng());
906        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
907        let build_bitmap = |indexes: &[usize]| {
908            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
909            for i in indexes {
910                builder.set(*i, true);
911            }
912            builder.finish()
913        };
914        let vnode1 = build_bitmap(first);
915        let vnode2 = build_bitmap(second);
916        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
917            let (_sender, receiver) = unbounded_channel();
918
919            async move { Ok((1, receiver)) }.boxed()
920        });
921        let (manager, (_join_handle, _stop_tx)) =
922            SinkCoordinatorManager::start_worker_with_spawn_worker({
923                let expected_param = param.clone();
924                move |param, new_writer_rx| {
925                    let expected_param = expected_param.clone();
926                    tokio::spawn({
927                        let subscriber = mock_subscriber.clone();
928                        {
929                            async move {
930                                // validate the start request
931                                assert_eq!(param, expected_param);
932                                CoordinatorWorker::execute_coordinator(
933                                    param,
934                                    new_writer_rx,
935                                    MockCoordinator::new((), |_, _, _| {
936                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
937                                    }),
938                                    subscriber.clone(),
939                                )
940                                .await;
941                            }
942                        }
943                    })
944                }
945            });
946
947        let build_client = |vnode| async {
948            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
949                Ok(tonic::Response::new(
950                    manager
951                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
952                        .await
953                        .unwrap()
954                        .boxed(),
955                ))
956            })
957            .await
958            .unwrap()
959            .0
960        };
961
962        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
963
964        // commit epoch
965        let mut commit_future = pin!(client1.commit(
966            epoch,
967            SinkMetadata {
968                metadata: Some(Metadata::Serialized(SerializedMetadata {
969                    metadata: vec![],
970                })),
971            },
972            None,
973        ));
974        assert!(
975            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
976                .await
977                .is_pending()
978        );
979        let (result1, result2) = join(
980            commit_future,
981            client2.commit(
982                epoch,
983                SinkMetadata {
984                    metadata: Some(Metadata::Serialized(SerializedMetadata {
985                        metadata: vec![],
986                    })),
987                },
988                None,
989            ),
990        )
991        .await;
992        assert!(result1.is_err());
993        assert!(result2.is_err());
994    }
995
996    #[tokio::test]
997    async fn test_update_vnode_bitmap() {
998        let param = SinkParam {
999            sink_id: SinkId::from(1),
1000            sink_name: "test".into(),
1001            properties: Default::default(),
1002            columns: vec![],
1003            downstream_pk: None,
1004            sink_type: SinkType::AppendOnly,
1005            format_desc: None,
1006            db_name: "test".into(),
1007            sink_from_name: "test".into(),
1008        };
1009
1010        let epoch1 = 233;
1011        let epoch2 = 234;
1012        let epoch3 = 235;
1013        let epoch4 = 236;
1014
1015        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1016        all_vnode.shuffle(&mut rand::rng());
1017        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
1018        let build_bitmap = |indexes: &[usize]| {
1019            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1020            for i in indexes {
1021                builder.set(*i, true);
1022            }
1023            builder.finish()
1024        };
1025        let vnode1 = build_bitmap(first);
1026        let vnode2 = build_bitmap(second);
1027
1028        let metadata = [
1029            [vec![1u8, 2u8], vec![3u8, 4u8]],
1030            [vec![5u8, 6u8], vec![7u8, 8u8]],
1031        ];
1032
1033        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
1034        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
1035        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
1036            let (_sender, receiver) = unbounded_channel();
1037
1038            async move { Ok((1, receiver)) }.boxed()
1039        });
1040        let (manager, (_join_handle, _stop_tx)) =
1041            SinkCoordinatorManager::start_worker_with_spawn_worker({
1042                let expected_param = param.clone();
1043                let metadata = metadata.clone();
1044                let metadata_scale_out = metadata_scale_out.clone();
1045                let metadata_scale_in = metadata_scale_in.clone();
1046                move |param, new_writer_rx| {
1047                    let metadata = metadata.clone();
1048                    let metadata_scale_out = metadata_scale_out.clone();
1049                    let metadata_scale_in = metadata_scale_in.clone();
1050                    let expected_param = expected_param.clone();
1051                    tokio::spawn({
1052                        let subscriber = mock_subscriber.clone();
1053                        async move {
1054                            // validate the start request
1055                            assert_eq!(param, expected_param);
1056                            CoordinatorWorker::execute_coordinator(
1057                                param.clone(),
1058                                new_writer_rx,
1059                                MockCoordinator::new(
1060                                    0,
1061                                    |epoch, metadata_list, count: &mut usize| {
1062                                        *count += 1;
1063                                        let mut metadata_list =
1064                                            metadata_list
1065                                                .into_iter()
1066                                                .map(|metadata| match metadata {
1067                                                    SinkMetadata {
1068                                                        metadata:
1069                                                            Some(Metadata::Serialized(
1070                                                                SerializedMetadata { metadata },
1071                                                            )),
1072                                                    } => metadata,
1073                                                    _ => unreachable!(),
1074                                                })
1075                                                .collect_vec();
1076                                        metadata_list.sort();
1077                                        let (expected_epoch, expected_metadata_list) = match *count
1078                                        {
1079                                            1 => (epoch1, metadata[0].as_slice()),
1080                                            2 => (epoch2, metadata[1].as_slice()),
1081                                            3 => (epoch3, metadata_scale_out.as_slice()),
1082                                            4 => (epoch4, metadata_scale_in.as_slice()),
1083                                            _ => unreachable!(),
1084                                        };
1085                                        assert_eq!(expected_epoch, epoch);
1086                                        assert_eq!(expected_metadata_list, &metadata_list);
1087                                        Ok(())
1088                                    },
1089                                ),
1090                                subscriber.clone(),
1091                            )
1092                            .await;
1093                        }
1094                    })
1095                }
1096            });
1097
1098        let build_client = |vnode| async {
1099            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1100                Ok(tonic::Response::new(
1101                    manager
1102                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1103                        .await
1104                        .unwrap()
1105                        .boxed(),
1106                ))
1107            })
1108            .await
1109        };
1110
1111        let ((mut client1, _), (mut client2, _)) =
1112            try_join(build_client(vnode1), pin!(build_client(vnode2)))
1113                .await
1114                .unwrap();
1115
1116        let (aligned_epoch1, aligned_epoch2) = try_join(
1117            client1.align_initial_epoch(epoch1),
1118            client2.align_initial_epoch(epoch1),
1119        )
1120        .await
1121        .unwrap();
1122        assert_eq!(aligned_epoch1, epoch1);
1123        assert_eq!(aligned_epoch2, epoch1);
1124
1125        {
1126            // commit epoch1
1127            let mut commit_future = pin!(
1128                client2
1129                    .commit(
1130                        epoch1,
1131                        SinkMetadata {
1132                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1133                                metadata: metadata[0][1].clone(),
1134                            })),
1135                        },
1136                        None,
1137                    )
1138                    .map(|result| result.unwrap())
1139            );
1140            assert!(
1141                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1142                    .await
1143                    .is_pending()
1144            );
1145            join(
1146                commit_future,
1147                client1
1148                    .commit(
1149                        epoch1,
1150                        SinkMetadata {
1151                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1152                                metadata: metadata[0][0].clone(),
1153                            })),
1154                        },
1155                        None,
1156                    )
1157                    .map(|result| result.unwrap()),
1158            )
1159            .await;
1160        }
1161
1162        let (vnode1, vnode2, vnode3) = {
1163            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1164            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1165            (
1166                build_bitmap(first),
1167                build_bitmap(second),
1168                build_bitmap(third),
1169            )
1170        };
1171
1172        let mut build_client3_future = pin!(build_client(vnode3));
1173        assert!(
1174            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
1175                .await
1176                .is_pending()
1177        );
1178        let mut client3;
1179        {
1180            {
1181                // commit epoch2
1182                let mut commit_future = pin!(
1183                    client1
1184                        .commit(
1185                            epoch2,
1186                            SinkMetadata {
1187                                metadata: Some(Metadata::Serialized(SerializedMetadata {
1188                                    metadata: metadata[1][0].clone(),
1189                                })),
1190                            },
1191                            None,
1192                        )
1193                        .map_err(Into::into)
1194                );
1195                assert!(
1196                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1197                        .await
1198                        .is_pending()
1199                );
1200                try_join(
1201                    commit_future,
1202                    client2.commit(
1203                        epoch2,
1204                        SinkMetadata {
1205                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1206                                metadata: metadata[1][1].clone(),
1207                            })),
1208                        },
1209                        None,
1210                    ),
1211                )
1212                .await
1213                .unwrap();
1214            }
1215
1216            client3 = {
1217                let (
1218                    (client3, init_epoch),
1219                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
1220                ) = try_join(
1221                    build_client3_future,
1222                    try_join(
1223                        client1.update_vnode_bitmap(&vnode1),
1224                        client2.update_vnode_bitmap(&vnode2),
1225                    )
1226                    .map_err(Into::into),
1227                )
1228                .await
1229                .unwrap();
1230                assert_eq!(init_epoch, Some(epoch2));
1231                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
1232                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
1233                client3
1234            };
1235            let mut commit_future3 = pin!(client3.commit(
1236                epoch3,
1237                SinkMetadata {
1238                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1239                        metadata: metadata_scale_out[2].clone(),
1240                    })),
1241                },
1242                None,
1243            ));
1244            assert!(
1245                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1246                    .await
1247                    .is_pending()
1248            );
1249            let mut commit_future1 = pin!(client1.commit(
1250                epoch3,
1251                SinkMetadata {
1252                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1253                        metadata: metadata_scale_out[0].clone(),
1254                    })),
1255                },
1256                None,
1257            ));
1258            assert!(
1259                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
1260                    .await
1261                    .is_pending()
1262            );
1263            assert!(
1264                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
1265                    .await
1266                    .is_pending()
1267            );
1268            try_join(
1269                client2.commit(
1270                    epoch3,
1271                    SinkMetadata {
1272                        metadata: Some(Metadata::Serialized(SerializedMetadata {
1273                            metadata: metadata_scale_out[1].clone(),
1274                        })),
1275                    },
1276                    None,
1277                ),
1278                try_join(commit_future1, commit_future3),
1279            )
1280            .await
1281            .unwrap();
1282        }
1283
1284        let (vnode2, vnode3) = {
1285            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
1286            (build_bitmap(first), build_bitmap(second))
1287        };
1288
1289        {
1290            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
1291                client1.stop(),
1292                try_join(
1293                    client2.update_vnode_bitmap(&vnode2),
1294                    client3.update_vnode_bitmap(&vnode3),
1295                ),
1296            )
1297            .await
1298            .unwrap();
1299            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
1300            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
1301        }
1302
1303        {
1304            let mut commit_future = pin!(
1305                client2
1306                    .commit(
1307                        epoch4,
1308                        SinkMetadata {
1309                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1310                                metadata: metadata_scale_in[0].clone(),
1311                            })),
1312                        },
1313                        None,
1314                    )
1315                    .map(|result| result.unwrap())
1316            );
1317            assert!(
1318                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1319                    .await
1320                    .is_pending()
1321            );
1322            join(
1323                commit_future,
1324                client3
1325                    .commit(
1326                        epoch4,
1327                        SinkMetadata {
1328                            metadata: Some(Metadata::Serialized(SerializedMetadata {
1329                                metadata: metadata_scale_in[1].clone(),
1330                            })),
1331                        },
1332                        None,
1333                    )
1334                    .map(|result| result.unwrap()),
1335            )
1336            .await;
1337        }
1338    }
1339}