risingwave_meta/manager/sink_coordination/
manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
34use tokio::sync::oneshot::{Receiver, Sender, channel};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Sends `$msg` through the synchronous sender `$tx`.
///
/// A failed send (the receiving side is gone) is not propagated; it is only logged.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
53
/// Awaits sending `$msg` through the asynchronous sender `$tx`.
///
/// A failed send (the receiving side is gone) is not propagated; it is only logged.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).await.is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
61
/// Capacity of the bounded request channel between [`SinkCoordinatorManager`] handles and the
/// manager worker task.
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
64enum ManagerRequest {
65    NewSinkWriter(SinkWriterCoordinationHandle),
66    StopCoordinator {
67        finish_notifier: Sender<()>,
68        /// sink id to stop. When `None`, stop all sink coordinator
69        sink_id: Option<SinkId>,
70    },
71}
72
73#[derive(Clone)]
74pub struct SinkCoordinatorManager {
75    request_tx: mpsc::Sender<ManagerRequest>,
76}
77
78fn new_committed_epoch_subscriber(
79    hummock_manager: HummockManagerRef,
80    metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82    Arc::new(move |sink_id| {
83        let hummock_manager = hummock_manager.clone();
84        let metadata_manager = metadata_manager.clone();
85        async move {
86            let state_table_ids = metadata_manager
87                .get_sink_state_table_ids(sink_id.sink_id as _)
88                .await
89                .map_err(SinkError::from)?;
90            let Some(table_id) = state_table_ids.first() else {
91                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92            };
93            hummock_manager
94                .subscribe_table_committed_epoch(*table_id)
95                .await
96                .map_err(SinkError::from)
97        }
98        .boxed()
99    })
100}
101
102impl SinkCoordinatorManager {
103    #[cfg(test)]
104    pub(crate) fn for_test() -> Self {
105        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
106        let _join_handle = tokio::spawn(async move {
107            while let Some(req) = request_rx.recv().await {
108                let ManagerRequest::StopCoordinator {
109                    finish_notifier,
110                    sink_id,
111                } = req
112                else {
113                    unreachable!()
114                };
115                assert_eq!(sink_id, None);
116                finish_notifier.send(()).unwrap();
117            }
118        });
119        SinkCoordinatorManager { request_tx }
120    }
121
122    pub fn start_worker(
123        db: DatabaseConnection,
124        hummock_manager: HummockManagerRef,
125        metadata_manager: MetadataManager,
126        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
127    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
128        let subscriber =
129            new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
130        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
131            tokio::spawn(CoordinatorWorker::run(
132                param,
133                manager_request_stream,
134                db.clone(),
135                subscriber.clone(),
136                iceberg_compact_stat_sender.clone(),
137            ))
138        })
139    }
140
141    fn start_worker_with_spawn_worker(
142        spawn_coordinator_worker: impl SpawnCoordinatorFn,
143    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
144        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
145        let (shutdown_tx, shutdown_rx) = channel();
146        let worker = ManagerWorker::new(request_rx, shutdown_rx);
147        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
148        (
149            SinkCoordinatorManager { request_tx },
150            (join_handle, shutdown_tx),
151        )
152    }
153
154    pub async fn handle_new_request(
155        &self,
156        mut request_stream: SinkWriterRequestStream,
157    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
158        let (param, vnode_bitmap) = match request_stream.try_next().await? {
159            Some(CoordinateRequest {
160                msg:
161                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
162                        param: Some(param),
163                        vnode_bitmap: Some(vnode_bitmap),
164                    })),
165            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
166            msg => {
167                return Err(Status::invalid_argument(format!(
168                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
169                    msg
170                )));
171            }
172        };
173        let (response_tx, response_rx) = mpsc::unbounded_channel();
174        self.request_tx
175            .send(ManagerRequest::NewSinkWriter(
176                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
177            ))
178            .await
179            .map_err(|_| {
180                Status::unavailable(
181                    "unable to send to sink manager worker. The worker may have stopped",
182                )
183            })?;
184
185        Ok(UnboundedReceiverStream::new(response_rx))
186    }
187
188    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
189        let (tx, rx) = channel();
190        send_await_with_err_check!(
191            self.request_tx,
192            ManagerRequest::StopCoordinator {
193                finish_notifier: tx,
194                sink_id,
195            }
196        );
197        if rx.await.is_err() {
198            error!("fail to wait for resetting sink manager worker");
199        }
200        info!("successfully stop coordinator: {:?}", sink_id);
201    }
202
203    pub async fn reset(&self) {
204        self.stop_coordinator(None).await;
205    }
206
207    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
208        self.stop_coordinator(Some(sink_id)).await;
209    }
210}
211
212struct CoordinatorWorkerHandle {
213    /// Sender to coordinator worker. Drop the sender as a stop signal
214    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
215    /// Notify when the coordinator worker stops
216    finish_notifiers: Vec<Sender<()>>,
217}
218
219struct ManagerWorker {
220    request_rx: mpsc::Receiver<ManagerRequest>,
221    // Make it option so that it can be polled with &mut SinkManagerWorker
222    shutdown_rx: Receiver<()>,
223
224    running_coordinator_worker_join_handles:
225        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
226    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
227}
228
229enum ManagerEvent {
230    NewRequest(ManagerRequest),
231    CoordinatorWorkerFinished {
232        sink_id: SinkId,
233        join_result: Result<(), JoinError>,
234    },
235}
236
237trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
238    + Send
239    + 'static;
240
241impl ManagerWorker {
242    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
243        ManagerWorker {
244            request_rx,
245            shutdown_rx,
246            running_coordinator_worker_join_handles: Default::default(),
247            running_coordinator_worker: Default::default(),
248        }
249    }
250
251    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
252        while let Some(event) = self.next_event().await {
253            match event {
254                ManagerEvent::NewRequest(request) => match request {
255                    ManagerRequest::NewSinkWriter(request) => {
256                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
257                    }
258                    ManagerRequest::StopCoordinator {
259                        finish_notifier,
260                        sink_id,
261                    } => {
262                        if let Some(sink_id) = sink_id {
263                            if let Some(worker_handle) =
264                                self.running_coordinator_worker.get_mut(&sink_id)
265                            {
266                                if let Some(sender) = worker_handle.request_sender.take() {
267                                    // drop the sender as a signal to notify the coordinator worker
268                                    // to stop
269                                    drop(sender);
270                                }
271                                worker_handle.finish_notifiers.push(finish_notifier);
272                            } else {
273                                debug!(
274                                    "sink coordinator of {} is not running. Notify finish directly",
275                                    sink_id.sink_id
276                                );
277                                send_with_err_check!(finish_notifier, ());
278                            }
279                        } else {
280                            self.clean_up().await;
281                            send_with_err_check!(finish_notifier, ());
282                        }
283                    }
284                },
285                ManagerEvent::CoordinatorWorkerFinished {
286                    sink_id,
287                    join_result,
288                } => self.handle_coordinator_finished(sink_id, join_result),
289            }
290        }
291        self.clean_up().await;
292        info!("sink manager worker exited");
293    }
294
295    async fn next_event(&mut self) -> Option<ManagerEvent> {
296        match select(
297            select(
298                pin!(self.request_rx.recv()),
299                pin!(pending_on_none(
300                    self.running_coordinator_worker_join_handles.next()
301                )),
302            ),
303            &mut self.shutdown_rx,
304        )
305        .await
306        {
307            Either::Left((either, _)) => match either {
308                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
309                Either::Left((None, _)) => None,
310                Either::Right(((sink_id, join_result), _)) => {
311                    Some(ManagerEvent::CoordinatorWorkerFinished {
312                        sink_id,
313                        join_result,
314                    })
315                }
316            },
317            Either::Right(_) => None,
318        }
319    }
320
321    async fn clean_up(&mut self) {
322        info!("sink manager worker start cleaning up");
323        for worker_handle in self.running_coordinator_worker.values_mut() {
324            if let Some(sender) = worker_handle.request_sender.take() {
325                // drop the sender to notify the coordinator worker to stop
326                drop(sender);
327            }
328        }
329        while let Some((sink_id, join_result)) =
330            self.running_coordinator_worker_join_handles.next().await
331        {
332            self.handle_coordinator_finished(sink_id, join_result);
333        }
334        info!("sink manager worker finished cleaning up");
335    }
336
337    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
338        let worker_handle = self
339            .running_coordinator_worker
340            .remove(&sink_id)
341            .expect("finished coordinator should have an associated worker handle");
342        for finish_notifier in worker_handle.finish_notifiers {
343            send_with_err_check!(finish_notifier, ());
344        }
345        match join_result {
346            Ok(()) => {
347                info!(
348                    id = sink_id.sink_id,
349                    "sink coordinator has gracefully finished",
350                );
351            }
352            Err(err) => {
353                error!(
354                    id = sink_id.sink_id,
355                    error = %err.as_report(),
356                    "sink coordinator finished with error",
357                );
358            }
359        }
360    }
361
362    fn handle_new_sink_writer(
363        &mut self,
364        new_writer: SinkWriterCoordinationHandle,
365        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
366    ) {
367        let param = new_writer.param();
368        let sink_id = param.sink_id;
369
370        let handle = self
371            .running_coordinator_worker
372            .entry(param.sink_id)
373            .or_insert_with(|| {
374                // Launch the coordinator worker task if it is the first
375                let (request_tx, request_rx) = unbounded_channel();
376                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
377                self.running_coordinator_worker_join_handles.push(
378                    join_handle
379                        .map(move |join_result| (sink_id, join_result))
380                        .boxed(),
381                );
382                CoordinatorWorkerHandle {
383                    request_sender: Some(request_tx),
384                    finish_notifiers: Vec::new(),
385                }
386            });
387
388        if let Some(sender) = handle.request_sender.as_mut() {
389            send_with_err_check!(sender, new_writer);
390        } else {
391            warn!(
392                "handle a new request while the sink coordinator is being stopped: {:?}",
393                param
394            );
395            new_writer.abort(Status::internal("the sink is being stopped"));
396        }
397    }
398}
399
400#[cfg(test)]
401mod tests {
402    use std::future::{Future, poll_fn};
403    use std::pin::pin;
404    use std::sync::Arc;
405    use std::task::Poll;
406
407    use anyhow::anyhow;
408    use async_trait::async_trait;
409    use futures::future::{join, try_join};
410    use futures::{FutureExt, StreamExt, TryFutureExt};
411    use itertools::Itertools;
412    use rand::seq::SliceRandom;
413    use risingwave_common::bitmap::BitmapBuilder;
414    use risingwave_common::catalog::Field;
415    use risingwave_common::hash::VirtualNode;
416    use risingwave_connector::sink::catalog::{SinkId, SinkType};
417    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
418    use risingwave_pb::connector_service::SinkMetadata;
419    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
420    use risingwave_rpc_client::CoordinatorStreamHandle;
421    use tokio::sync::mpsc::unbounded_channel;
422    use tokio_stream::wrappers::ReceiverStream;
423
424    use crate::manager::sink_coordination::SinkCoordinatorManager;
425    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
426    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
427
428    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
429        context: C,
430        f: F,
431    }
432
433    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
434        fn new(context: C, f: F) -> Self {
435            MockCoordinator { context, f }
436        }
437    }
438
439    #[async_trait]
440    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
441        SinkCommitCoordinator for MockCoordinator<C, F>
442    {
443        async fn init(
444            &mut self,
445            _subscriber: SinkCommittedEpochSubscriber,
446        ) -> risingwave_connector::sink::Result<Option<u64>> {
447            Ok(None)
448        }
449
450        async fn commit(
451            &mut self,
452            epoch: u64,
453            metadata: Vec<SinkMetadata>,
454            _add_columns: Option<Vec<Field>>,
455        ) -> risingwave_connector::sink::Result<()> {
456            (self.f)(epoch, metadata, &mut self.context)
457        }
458    }
459
460    #[tokio::test]
461    async fn test_basic() {
462        let param = SinkParam {
463            sink_id: SinkId::from(1),
464            sink_name: "test".into(),
465            properties: Default::default(),
466            columns: vec![],
467            downstream_pk: vec![],
468            sink_type: SinkType::AppendOnly,
469            format_desc: None,
470            db_name: "test".into(),
471            sink_from_name: "test".into(),
472        };
473
474        let epoch0 = 232;
475        let epoch1 = 233;
476        let epoch2 = 234;
477
478        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
479        all_vnode.shuffle(&mut rand::rng());
480        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
481        let build_bitmap = |indexes: &[usize]| {
482            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
483            for i in indexes {
484                builder.set(*i, true);
485            }
486            builder.finish()
487        };
488        let vnode1 = build_bitmap(first);
489        let vnode2 = build_bitmap(second);
490
491        let metadata = [
492            [vec![1u8, 2u8], vec![3u8, 4u8]],
493            [vec![5u8, 6u8], vec![7u8, 8u8]],
494        ];
495        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
496            let (_sender, receiver) = unbounded_channel();
497
498            async move { Ok((1, receiver)) }.boxed()
499        });
500
501        let (manager, (_join_handle, _stop_tx)) =
502            SinkCoordinatorManager::start_worker_with_spawn_worker({
503                let expected_param = param.clone();
504                let metadata = metadata.clone();
505                move |param, new_writer_rx| {
506                    let metadata = metadata.clone();
507                    let expected_param = expected_param.clone();
508                    tokio::spawn({
509                        let subscriber = mock_subscriber.clone();
510                        async move {
511                            // validate the start request
512                            assert_eq!(param, expected_param);
513                            CoordinatorWorker::execute_coordinator(
514                                param.clone(),
515                                new_writer_rx,
516                                MockCoordinator::new(
517                                    0,
518                                    |epoch, metadata_list, count: &mut usize| {
519                                        *count += 1;
520                                        let mut metadata_list =
521                                            metadata_list
522                                                .into_iter()
523                                                .map(|metadata| match metadata {
524                                                    SinkMetadata {
525                                                        metadata:
526                                                            Some(Metadata::Serialized(
527                                                                SerializedMetadata { metadata },
528                                                            )),
529                                                    } => metadata,
530                                                    _ => unreachable!(),
531                                                })
532                                                .collect_vec();
533                                        metadata_list.sort();
534                                        match *count {
535                                            1 => {
536                                                assert_eq!(epoch, epoch1);
537                                                assert_eq!(2, metadata_list.len());
538                                                assert_eq!(metadata[0][0], metadata_list[0]);
539                                                assert_eq!(metadata[0][1], metadata_list[1]);
540                                            }
541                                            2 => {
542                                                assert_eq!(epoch, epoch2);
543                                                assert_eq!(2, metadata_list.len());
544                                                assert_eq!(metadata[1][0], metadata_list[0]);
545                                                assert_eq!(metadata[1][1], metadata_list[1]);
546                                            }
547                                            _ => unreachable!(),
548                                        }
549                                        Ok(())
550                                    },
551                                ),
552                                subscriber.clone(),
553                            )
554                            .await;
555                        }
556                    })
557                }
558            });
559
560        let build_client = |vnode| async {
561            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
562                Ok(tonic::Response::new(
563                    manager
564                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
565                        .await
566                        .unwrap()
567                        .boxed(),
568                ))
569            })
570            .await
571            .unwrap()
572            .0
573        };
574
575        let (mut client1, mut client2) =
576            join(build_client(vnode1), pin!(build_client(vnode2))).await;
577
578        let (aligned_epoch1, aligned_epoch2) = try_join(
579            client1.align_initial_epoch(epoch0),
580            client2.align_initial_epoch(epoch1),
581        )
582        .await
583        .unwrap();
584        assert_eq!(aligned_epoch1, epoch1);
585        assert_eq!(aligned_epoch2, epoch1);
586
587        {
588            // commit epoch1
589            let mut commit_future = pin!(
590                client2
591                    .commit(
592                        epoch1,
593                        SinkMetadata {
594                            metadata: Some(Metadata::Serialized(SerializedMetadata {
595                                metadata: metadata[0][1].clone(),
596                            })),
597                        },
598                        None,
599                    )
600                    .map(|result| result.unwrap())
601            );
602            assert!(
603                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
604                    .await
605                    .is_pending()
606            );
607            join(
608                commit_future,
609                client1
610                    .commit(
611                        epoch1,
612                        SinkMetadata {
613                            metadata: Some(Metadata::Serialized(SerializedMetadata {
614                                metadata: metadata[0][0].clone(),
615                            })),
616                        },
617                        None,
618                    )
619                    .map(|result| result.unwrap()),
620            )
621            .await;
622        }
623
624        // commit epoch2
625        let mut commit_future = pin!(
626            client1
627                .commit(
628                    epoch2,
629                    SinkMetadata {
630                        metadata: Some(Metadata::Serialized(SerializedMetadata {
631                            metadata: metadata[1][0].clone(),
632                        })),
633                    },
634                    None,
635                )
636                .map(|result| result.unwrap())
637        );
638        assert!(
639            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
640                .await
641                .is_pending()
642        );
643        join(
644            commit_future,
645            client2
646                .commit(
647                    epoch2,
648                    SinkMetadata {
649                        metadata: Some(Metadata::Serialized(SerializedMetadata {
650                            metadata: metadata[1][1].clone(),
651                        })),
652                    },
653                    None,
654                )
655                .map(|result| result.unwrap()),
656        )
657        .await;
658    }
659
660    #[tokio::test]
661    async fn test_single_writer() {
662        let param = SinkParam {
663            sink_id: SinkId::from(1),
664            sink_name: "test".into(),
665            properties: Default::default(),
666            columns: vec![],
667            downstream_pk: vec![],
668            sink_type: SinkType::AppendOnly,
669            format_desc: None,
670            db_name: "test".into(),
671            sink_from_name: "test".into(),
672        };
673
674        let epoch1 = 233;
675        let epoch2 = 234;
676
677        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
678        let build_bitmap = |indexes: &[usize]| {
679            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
680            for i in indexes {
681                builder.set(*i, true);
682            }
683            builder.finish()
684        };
685        let vnode = build_bitmap(&all_vnode);
686
687        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
688        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
689            let (_sender, receiver) = unbounded_channel();
690
691            async move { Ok((1, receiver)) }.boxed()
692        });
693        let (manager, (_join_handle, _stop_tx)) =
694            SinkCoordinatorManager::start_worker_with_spawn_worker({
695                let expected_param = param.clone();
696                let metadata = metadata.clone();
697                move |param, new_writer_rx| {
698                    let metadata = metadata.clone();
699                    let expected_param = expected_param.clone();
700                    tokio::spawn({
701                        let subscriber = mock_subscriber.clone();
702                        async move {
703                            // validate the start request
704                            assert_eq!(param, expected_param);
705                            CoordinatorWorker::execute_coordinator(
706                                param.clone(),
707                                new_writer_rx,
708                                MockCoordinator::new(
709                                    0,
710                                    |epoch, metadata_list, count: &mut usize| {
711                                        *count += 1;
712                                        let mut metadata_list =
713                                            metadata_list
714                                                .into_iter()
715                                                .map(|metadata| match metadata {
716                                                    SinkMetadata {
717                                                        metadata:
718                                                            Some(Metadata::Serialized(
719                                                                SerializedMetadata { metadata },
720                                                            )),
721                                                    } => metadata,
722                                                    _ => unreachable!(),
723                                                })
724                                                .collect_vec();
725                                        metadata_list.sort();
726                                        match *count {
727                                            1 => {
728                                                assert_eq!(epoch, epoch1);
729                                                assert_eq!(1, metadata_list.len());
730                                                assert_eq!(metadata[0], metadata_list[0]);
731                                            }
732                                            2 => {
733                                                assert_eq!(epoch, epoch2);
734                                                assert_eq!(1, metadata_list.len());
735                                                assert_eq!(metadata[1], metadata_list[0]);
736                                            }
737                                            _ => unreachable!(),
738                                        }
739                                        Ok(())
740                                    },
741                                ),
742                                subscriber.clone(),
743                            )
744                            .await;
745                        }
746                    })
747                }
748            });
749
750        let build_client = |vnode| async {
751            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
752                Ok(tonic::Response::new(
753                    manager
754                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
755                        .await
756                        .unwrap()
757                        .boxed(),
758                ))
759            })
760            .await
761            .unwrap()
762            .0
763        };
764
765        let mut client = build_client(vnode).await;
766
767        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
768        assert_eq!(aligned_epoch, epoch1);
769
770        client
771            .commit(
772                epoch1,
773                SinkMetadata {
774                    metadata: Some(Metadata::Serialized(SerializedMetadata {
775                        metadata: metadata[0].clone(),
776                    })),
777                },
778                None,
779            )
780            .await
781            .unwrap();
782
783        client
784            .commit(
785                epoch2,
786                SinkMetadata {
787                    metadata: Some(Metadata::Serialized(SerializedMetadata {
788                        metadata: metadata[1].clone(),
789                    })),
790                },
791                None,
792            )
793            .await
794            .unwrap();
795    }
796
797    #[tokio::test]
798    async fn test_partial_commit() {
799        let param = SinkParam {
800            sink_id: SinkId::from(1),
801            sink_name: "test".into(),
802            properties: Default::default(),
803            columns: vec![],
804            downstream_pk: vec![],
805            sink_type: SinkType::AppendOnly,
806            format_desc: None,
807            db_name: "test".into(),
808            sink_from_name: "test".into(),
809        };
810
811        let epoch = 233;
812
813        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
814        all_vnode.shuffle(&mut rand::rng());
815        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
816        let build_bitmap = |indexes: &[usize]| {
817            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
818            for i in indexes {
819                builder.set(*i, true);
820            }
821            builder.finish()
822        };
823        let vnode1 = build_bitmap(first);
824        let vnode2 = build_bitmap(second);
825
826        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
827            let (_sender, receiver) = unbounded_channel();
828
829            async move { Ok((1, receiver)) }.boxed()
830        });
831        let (manager, (_join_handle, _stop_tx)) =
832            SinkCoordinatorManager::start_worker_with_spawn_worker({
833                let expected_param = param.clone();
834                move |param, new_writer_rx| {
835                    let expected_param = expected_param.clone();
836                    tokio::spawn({
837                        let subscriber = mock_subscriber.clone();
838                        async move {
839                            // validate the start request
840                            assert_eq!(param, expected_param);
841                            CoordinatorWorker::execute_coordinator(
842                                param,
843                                new_writer_rx,
844                                MockCoordinator::new((), |_, _, _| unreachable!()),
845                                subscriber.clone(),
846                            )
847                            .await;
848                        }
849                    })
850                }
851            });
852
853        let build_client = |vnode| async {
854            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
855                Ok(tonic::Response::new(
856                    manager
857                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
858                        .await
859                        .unwrap()
860                        .boxed(),
861                ))
862            })
863            .await
864            .unwrap()
865            .0
866        };
867
868        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;
869
870        // commit epoch
871        let mut commit_future = pin!(client1.commit(
872            epoch,
873            SinkMetadata {
874                metadata: Some(Metadata::Serialized(SerializedMetadata {
875                    metadata: vec![],
876                })),
877            },
878            None,
879        ));
880        assert!(
881            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
882                .await
883                .is_pending()
884        );
885        drop(client2);
886        assert!(commit_future.await.is_err());
887    }
888
889    #[tokio::test]
890    async fn test_fail_commit() {
891        let param = SinkParam {
892            sink_id: SinkId::from(1),
893            sink_name: "test".into(),
894            properties: Default::default(),
895            columns: vec![],
896            downstream_pk: vec![],
897            sink_type: SinkType::AppendOnly,
898            format_desc: None,
899            db_name: "test".into(),
900            sink_from_name: "test".into(),
901        };
902
903        let epoch = 233;
904
905        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
906        all_vnode.shuffle(&mut rand::rng());
907        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
908        let build_bitmap = |indexes: &[usize]| {
909            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
910            for i in indexes {
911                builder.set(*i, true);
912            }
913            builder.finish()
914        };
915        let vnode1 = build_bitmap(first);
916        let vnode2 = build_bitmap(second);
917        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
918            let (_sender, receiver) = unbounded_channel();
919
920            async move { Ok((1, receiver)) }.boxed()
921        });
922        let (manager, (_join_handle, _stop_tx)) =
923            SinkCoordinatorManager::start_worker_with_spawn_worker({
924                let expected_param = param.clone();
925                move |param, new_writer_rx| {
926                    let expected_param = expected_param.clone();
927                    tokio::spawn({
928                        let subscriber = mock_subscriber.clone();
929                        {
930                            async move {
931                                // validate the start request
932                                assert_eq!(param, expected_param);
933                                CoordinatorWorker::execute_coordinator(
934                                    param,
935                                    new_writer_rx,
936                                    MockCoordinator::new((), |_, _, _| {
937                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
938                                    }),
939                                    subscriber.clone(),
940                                )
941                                .await;
942                            }
943                        }
944                    })
945                }
946            });
947
948        let build_client = |vnode| async {
949            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
950                Ok(tonic::Response::new(
951                    manager
952                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
953                        .await
954                        .unwrap()
955                        .boxed(),
956                ))
957            })
958            .await
959            .unwrap()
960            .0
961        };
962
963        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
964
965        // commit epoch
966        let mut commit_future = pin!(client1.commit(
967            epoch,
968            SinkMetadata {
969                metadata: Some(Metadata::Serialized(SerializedMetadata {
970                    metadata: vec![],
971                })),
972            },
973            None,
974        ));
975        assert!(
976            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
977                .await
978                .is_pending()
979        );
980        let (result1, result2) = join(
981            commit_future,
982            client2.commit(
983                epoch,
984                SinkMetadata {
985                    metadata: Some(Metadata::Serialized(SerializedMetadata {
986                        metadata: vec![],
987                    })),
988                },
989                None,
990            ),
991        )
992        .await;
993        assert!(result1.is_err());
994        assert!(result2.is_err());
995    }
996
    /// Exercises scale-out and scale-in of the writers attached to one sink coordinator.
    ///
    /// Phases, as driven below:
    /// 1. Two writers own disjoint, randomly chosen halves of the vnodes, both align their
    ///    initial epoch to `epoch1`, then commit `epoch1` and `epoch2`.
    /// 2. Scale-out: a third writer starts connecting before `epoch2` is committed (asserted
    ///    pending after one poll). It finishes together with the first two writers'
    ///    `update_vnode_bitmap` calls to a three-way split, reporting `epoch2` as its initial
    ///    epoch. All three writers then commit `epoch3`.
    /// 3. Scale-in: the first writer stops while the other two re-split the vnode space, and
    ///    the remaining two commit `epoch4`.
    ///
    /// The mock coordinator counts its commit calls and asserts that the n-th call receives
    /// the expected epoch and the expected metadata (sorted before comparison).
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        // Initial layout: two writers, each owning a random half of the vnodes.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // Per-writer metadata for epoch1 (`metadata[0]`) and epoch2 (`metadata[1]`).
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        // Metadata for epoch3 (three writers) and epoch4 (two writers). Each array is already
        // in sorted order, matching the sorted list the mock coordinator compares against.
        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        // Mock subscriber: resolves to `(1, receiver)`; the sender is dropped immediately.
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                // `count` starts at 0 and is bumped on every commit, so the
                                // first commit sees `*count == 1`.
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        // Extract the serialized payloads; any other metadata
                                        // variant is unexpected in this test.
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the check does not depend on writer order.
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Unlike sibling tests, this keeps the full result of `new_with_init_stream`, i.e. the
        // handle plus a second value compared against `Some(epoch2)` for the scale-out writer.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        // Both initial writers align to `epoch1`.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            // client2 commits first; one poll must leave it pending because client1 has not
            // committed epoch1 yet.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Scale-out layout: the shuffled vnodes are split into the first third, the second
        // third, and the remainder (client1, client2, client3 respectively).
        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        // The third writer starts connecting before epoch2 is committed and must not be
        // ready after a single poll.
        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                // commit epoch2
                // client1 alone must not complete epoch2; client2's commit is still missing.
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            // The third writer's connection completes together with the existing writers'
            // bitmap updates; all three must report epoch2 as the point of the switch.
            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            // Commit epoch3 with three writers: neither client3 alone nor client3 + client1
            // may complete it (asserted via single polls) until client2 also commits.
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        // Scale-in layout: client2 takes the first third of the shuffled vnodes and client3
        // takes the rest; client1 will leave.
        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            // client1 stops while the remaining writers switch bitmaps; the switch is
            // reported at epoch3, the last committed epoch.
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            // Commit epoch4 with the two remaining writers; client2 alone must stay pending.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
1340}