risingwave_meta/manager/sink_coordination/
manager.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, join_all, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33use tokio::sync::oneshot::{Receiver, Sender, channel};
34use tokio::sync::{mpsc, oneshot};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Sends `$msg` through the synchronous `send` of `$tx`.
///
/// A failed send is not propagated; it is only reported with an error log.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
53
/// Sends `$msg` through the asynchronous `send` of `$tx` and awaits the outcome.
///
/// A failed send is not propagated; it is only reported with an error log.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let send_failed = $tx.send($msg).await.is_err();
        if send_failed {
            error!("unable to send msg");
        }
    }};
}
61
/// Capacity of the bounded channel that carries `ManagerRequest`s to the manager worker.
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
/// Requests submitted by `SinkCoordinatorManager` to the background manager worker.
enum ManagerRequest {
    /// A sink writer has connected and must be routed to the coordinator of its sink.
    NewSinkWriter(SinkWriterCoordinationHandle),
    /// Stop sink coordinators and report back when they have finished.
    StopCoordinator {
        /// Notified once the requested coordinators have stopped.
        finish_notifier: Sender<()>,
        /// sink id to stop. When `None`, stop all sink coordinator
        sink_ids: Option<Vec<SinkId>>,
    },
}
72
/// Cloneable handle used to talk to the sink coordinator manager worker.
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    /// Bounded channel on which requests are submitted to the manager worker.
    request_tx: mpsc::Sender<ManagerRequest>,
}
77
78fn new_committed_epoch_subscriber(
79    hummock_manager: HummockManagerRef,
80    metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82    Arc::new(move |sink_id| {
83        let hummock_manager = hummock_manager.clone();
84        let metadata_manager = metadata_manager.clone();
85        async move {
86            let state_table_ids = metadata_manager
87                .get_sink_state_table_ids(sink_id)
88                .await
89                .map_err(SinkError::from)?;
90            let Some(table_id) = state_table_ids.first() else {
91                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92            };
93            hummock_manager
94                .subscribe_table_committed_epoch(*table_id)
95                .await
96                .map_err(SinkError::from)
97        }
98        .boxed()
99    })
100}
101
102impl SinkCoordinatorManager {
103    pub fn start_worker(
104        db: DatabaseConnection,
105        hummock_manager: HummockManagerRef,
106        metadata_manager: MetadataManager,
107        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
108    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
109        let subscriber = new_committed_epoch_subscriber(hummock_manager, metadata_manager);
110        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
111            tokio::spawn(CoordinatorWorker::run(
112                param,
113                manager_request_stream,
114                db.clone(),
115                subscriber.clone(),
116                iceberg_compact_stat_sender.clone(),
117            ))
118        })
119    }
120
121    fn start_worker_with_spawn_worker(
122        spawn_coordinator_worker: impl SpawnCoordinatorFn,
123    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
124        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
125        let (shutdown_tx, shutdown_rx) = channel();
126        let worker = ManagerWorker::new(request_rx, shutdown_rx);
127        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
128        (
129            SinkCoordinatorManager { request_tx },
130            (join_handle, shutdown_tx),
131        )
132    }
133
134    pub async fn handle_new_request(
135        &self,
136        mut request_stream: SinkWriterRequestStream,
137    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
138        let (param, vnode_bitmap) = match request_stream.try_next().await? {
139            Some(CoordinateRequest {
140                msg:
141                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
142                        param: Some(param),
143                        vnode_bitmap: Some(vnode_bitmap),
144                    })),
145            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
146            msg => {
147                return Err(Status::invalid_argument(format!(
148                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
149                    msg
150                )));
151            }
152        };
153        let (response_tx, response_rx) = mpsc::unbounded_channel();
154        self.request_tx
155            .send(ManagerRequest::NewSinkWriter(
156                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
157            ))
158            .await
159            .map_err(|_| {
160                Status::unavailable(
161                    "unable to send to sink manager worker. The worker may have stopped",
162                )
163            })?;
164
165        Ok(UnboundedReceiverStream::new(response_rx))
166    }
167
168    async fn stop_coordinator(&self, sink_ids: Option<Vec<SinkId>>) {
169        let (tx, rx) = channel();
170        send_await_with_err_check!(
171            self.request_tx,
172            ManagerRequest::StopCoordinator {
173                finish_notifier: tx,
174                sink_ids: sink_ids.clone(),
175            }
176        );
177        if rx.await.is_err() {
178            error!("fail to wait for resetting sink manager worker");
179        }
180        info!("successfully stop coordinator: {:?}", sink_ids);
181    }
182
183    pub async fn reset(&self) {
184        self.stop_coordinator(None).await;
185    }
186
187    pub async fn stop_sink_coordinator(&self, sink_ids: Vec<SinkId>) {
188        self.stop_coordinator(Some(sink_ids)).await;
189    }
190}
191
/// Bookkeeping kept by the manager worker for one running coordinator worker.
struct CoordinatorWorkerHandle {
    /// Sender to coordinator worker. Drop the sender as a stop signal.
    /// `None` once the stop signal has been given.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notify when the coordinator worker stops
    finish_notifiers: Vec<Sender<()>>,
}
198
/// State of the background task that routes sink writers to per-sink coordinator workers.
struct ManagerWorker {
    /// Incoming requests from `SinkCoordinatorManager` handles.
    request_rx: mpsc::Receiver<ManagerRequest>,
    // Shutdown signal. The event loop exits as soon as this receiver resolves, i.e. when a
    // value is sent or when the sender is dropped.
    shutdown_rx: Receiver<()>,

    /// Join handles of the spawned coordinator workers; each resolves to the sink id and the
    /// join result of the finished worker.
    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    /// Handles of the coordinator workers that have not been observed as finished yet.
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}
208
/// Events processed by the manager worker's event loop.
enum ManagerEvent {
    /// A request arrived on the manager request channel.
    NewRequest(ManagerRequest),
    /// The coordinator worker task of `sink_id` has terminated.
    CoordinatorWorkerFinished {
        /// Sink whose coordinator worker finished.
        sink_id: SinkId,
        /// Result of joining the worker task; `Err` when the task did not complete normally.
        join_result: Result<(), JoinError>,
    },
}
216
/// Callback that spawns the coordinator worker task of one sink.
///
/// It receives the sink param and the receiving end of the channel on which new sink writers
/// are delivered, and returns the join handle of the spawned task.
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;
220
221impl ManagerWorker {
222    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
223        ManagerWorker {
224            request_rx,
225            shutdown_rx,
226            running_coordinator_worker_join_handles: Default::default(),
227            running_coordinator_worker: Default::default(),
228        }
229    }
230
231    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
232        while let Some(event) = self.next_event().await {
233            match event {
234                ManagerEvent::NewRequest(request) => match request {
235                    ManagerRequest::NewSinkWriter(request) => {
236                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
237                    }
238                    ManagerRequest::StopCoordinator {
239                        finish_notifier,
240                        sink_ids,
241                    } => {
242                        if let Some(sink_ids) = sink_ids {
243                            let mut rxs = Vec::with_capacity(sink_ids.len());
244                            for sink_id in sink_ids {
245                                if let Some(worker_handle) =
246                                    self.running_coordinator_worker.get_mut(&sink_id)
247                                {
248                                    let (tx, rx) = oneshot::channel();
249                                    rxs.push(rx);
250                                    worker_handle.finish_notifiers.push(tx);
251                                    if let Some(sender) = worker_handle.request_sender.take() {
252                                        // drop the sender as a signal to notify the coordinator worker
253                                        // to stop
254                                        drop(sender);
255                                    }
256                                } else {
257                                    debug!(
258                                        "sink coordinator of {} is not running, skip it",
259                                        sink_id
260                                    );
261                                }
262                            }
263                            tokio::spawn(async move {
264                                let notify_res = join_all(rxs).await;
265                                for res in notify_res {
266                                    if let Err(e) = res {
267                                        error!(
268                                            "fail to wait for resetting sink manager worker: {}",
269                                            e.as_report()
270                                        );
271                                    }
272                                }
273                                send_with_err_check!(finish_notifier, ());
274                            });
275                        } else {
276                            self.clean_up().await;
277                            send_with_err_check!(finish_notifier, ());
278                        }
279                    }
280                },
281                ManagerEvent::CoordinatorWorkerFinished {
282                    sink_id,
283                    join_result,
284                } => self.handle_coordinator_finished(sink_id, join_result),
285            }
286        }
287        self.clean_up().await;
288        info!("sink manager worker exited");
289    }
290
291    async fn next_event(&mut self) -> Option<ManagerEvent> {
292        match select(
293            select(
294                pin!(self.request_rx.recv()),
295                pin!(pending_on_none(
296                    self.running_coordinator_worker_join_handles.next()
297                )),
298            ),
299            &mut self.shutdown_rx,
300        )
301        .await
302        {
303            Either::Left((either, _)) => match either {
304                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
305                Either::Left((None, _)) => None,
306                Either::Right(((sink_id, join_result), _)) => {
307                    Some(ManagerEvent::CoordinatorWorkerFinished {
308                        sink_id,
309                        join_result,
310                    })
311                }
312            },
313            Either::Right(_) => None,
314        }
315    }
316
317    async fn clean_up(&mut self) {
318        info!("sink manager worker start cleaning up");
319        for worker_handle in self.running_coordinator_worker.values_mut() {
320            if let Some(sender) = worker_handle.request_sender.take() {
321                // drop the sender to notify the coordinator worker to stop
322                drop(sender);
323            }
324        }
325        while let Some((sink_id, join_result)) =
326            self.running_coordinator_worker_join_handles.next().await
327        {
328            self.handle_coordinator_finished(sink_id, join_result);
329        }
330        info!("sink manager worker finished cleaning up");
331    }
332
333    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
334        let worker_handle = self
335            .running_coordinator_worker
336            .remove(&sink_id)
337            .expect("finished coordinator should have an associated worker handle");
338        for finish_notifier in worker_handle.finish_notifiers {
339            send_with_err_check!(finish_notifier, ());
340        }
341        match join_result {
342            Ok(()) => {
343                info!(
344                    id = %sink_id,
345                    "sink coordinator has gracefully finished",
346                );
347            }
348            Err(err) => {
349                error!(
350                    id = %sink_id,
351                    error = %err.as_report(),
352                    "sink coordinator finished with error",
353                );
354            }
355        }
356    }
357
358    fn handle_new_sink_writer(
359        &mut self,
360        new_writer: SinkWriterCoordinationHandle,
361        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
362    ) {
363        let param = new_writer.param();
364        let sink_id = param.sink_id;
365
366        let handle = self
367            .running_coordinator_worker
368            .entry(param.sink_id)
369            .or_insert_with(|| {
370                // Launch the coordinator worker task if it is the first
371                let (request_tx, request_rx) = unbounded_channel();
372                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
373                self.running_coordinator_worker_join_handles.push(
374                    join_handle
375                        .map(move |join_result| (sink_id, join_result))
376                        .boxed(),
377                );
378                CoordinatorWorkerHandle {
379                    request_sender: Some(request_tx),
380                    finish_notifiers: Vec::new(),
381                }
382            });
383
384        if let Some(sender) = handle.request_sender.as_mut() {
385            send_with_err_check!(sender, new_writer);
386        } else {
387            warn!(
388                "handle a new request while the sink coordinator is being stopped: {:?}",
389                param
390            );
391            new_writer.abort(Status::internal("the sink is being stopped"));
392        }
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use std::future::{Future, poll_fn};
399    use std::pin::pin;
400    use std::sync::Arc;
401    use std::sync::atomic::AtomicI32;
402    use std::task::Poll;
403
404    use anyhow::anyhow;
405    use async_trait::async_trait;
406    use futures::future::{join, try_join};
407    use futures::{FutureExt, StreamExt, TryFutureExt};
408    use itertools::Itertools;
409    use rand::seq::SliceRandom;
410    use risingwave_common::bitmap::BitmapBuilder;
411    use risingwave_common::hash::VirtualNode;
412    use risingwave_connector::sink::catalog::{SinkId, SinkType};
413    use risingwave_connector::sink::{
414        SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkError, SinkParam,
415        TwoPhaseCommitCoordinator,
416    };
417    use risingwave_meta_model::SinkSchemachange;
418    use risingwave_pb::connector_service::SinkMetadata;
419    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
420    use risingwave_pb::data::PbDataType;
421    use risingwave_pb::data::data_type::PbTypeName;
422    use risingwave_pb::plan_common::PbField;
423    use risingwave_pb::stream_plan::sink_schema_change::Op as SinkSchemachangeOp;
424    use risingwave_pb::stream_plan::{PbSinkAddColumnsOp, PbSinkSchemaChange};
425    use risingwave_rpc_client::CoordinatorStreamHandle;
426    use sea_orm::{ConnectionTrait, Database, DatabaseConnection};
427    use tokio::sync::mpsc::unbounded_channel;
428    use tokio_stream::wrappers::ReceiverStream;
429
430    use crate::manager::sink_coordination::SinkCoordinatorManager;
431    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
432    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
433
    /// Test double for a single-phase commit coordinator.
    ///
    /// Its `commit_data` implementation forwards every call to the closure `f` together with
    /// the mutable `context`, so a test can keep state across commits and assert on what is
    /// being committed.
    struct MockSinglePhaseCoordinator<
        C,
        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>,
    > {
        // State handed to `f` by mutable reference on each commit.
        context: C,
        // Callback invoked as `f(epoch, metadata, &mut context)` on each commit.
        f: F,
    }
441
442    impl<
443        C: Send + 'static,
444        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send + 'static,
445    > MockSinglePhaseCoordinator<C, F>
446    {
447        fn new_coordinator(context: C, f: F) -> SinkCommitCoordinator {
448            SinkCommitCoordinator::SinglePhase(Box::new(MockSinglePhaseCoordinator { context, f }))
449        }
450    }
451
    /// Single-phase coordinator behaviour of the mock: only `commit_data` does real work.
    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinglePhaseCommitCoordinator for MockSinglePhaseCoordinator<C, F>
    {
        /// Nothing to initialize for the mock.
        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
            Ok(())
        }

        /// Delegates the commit to the user-provided closure, passing the mutable context.
        async fn commit_data(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }

        /// Schema changes are not expected with this mock; reaching this method panics.
        async fn commit_schema_change(
            &mut self,
            _epoch: u64,
            _schema_change: PbSinkSchemaChange,
        ) -> risingwave_connector::sink::Result<()> {
            unreachable!()
        }
    }
476
    /// Two writers owning disjoint halves of the vnode space commit through one mocked
    /// single-phase coordinator.
    ///
    /// For each epoch the test checks that a commit issued by only one writer is still pending
    /// after its first poll, and that it completes once the other writer commits as well. The
    /// mock coordinator asserts that every commit carries the metadata of both writers.
    #[tokio::test]
    async fn test_basic() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        // Randomly split all vnodes into two disjoint halves, one per writer.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // `metadata[k][w]` is the payload committed by writer `w` in the k-th commit.
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        // The mock subscriber stores the sender half of the channel it creates in this slot.
        // NOTE(review): presumably this keeps the returned receiver open for the whole test
        // -- confirm.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };

        // Start the manager with a spawn function that runs the mocked coordinator.
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    // `count` is the number of commits seen so far.
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the comparison below does not depend on the
                                        // order in which the writers' metadata is delivered.
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a writer client whose request stream is served by the manager under test.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        // The writers propose different initial epochs; both are expected to be aligned to
        // `epoch1`, the larger of the two.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            // Poll exactly once: with only one writer committed, the commit must be pending.
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // Once the other writer commits the same epoch, both commits complete.
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // commit epoch2
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        // Same check as for epoch1, with the roles of the two writers swapped.
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
691
692    #[tokio::test]
693    async fn test_single_writer() {
694        let db = prepare_db_backend().await;
695        let param = SinkParam {
696            sink_id: SinkId::from(1),
697            sink_name: "test".into(),
698            properties: Default::default(),
699            columns: vec![],
700            downstream_pk: None,
701            sink_type: SinkType::AppendOnly,
702            ignore_delete: false,
703            format_desc: None,
704            db_name: "test".into(),
705            sink_from_name: "test".into(),
706        };
707
708        let epoch1 = 233;
709        let epoch2 = 234;
710
711        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
712        let build_bitmap = |indexes: &[usize]| {
713            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
714            for i in indexes {
715                builder.set(*i, true);
716            }
717            builder.finish()
718        };
719        let vnode = build_bitmap(&all_vnode);
720
721        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
722        let sender = Arc::new(tokio::sync::Mutex::new(None));
723        let mock_subscriber: SinkCommittedEpochSubscriber = {
724            let captured_sender = sender.clone();
725            Arc::new(move |_sink_id: SinkId| {
726                let (sender, receiver) = unbounded_channel();
727                let captured_sender = captured_sender.clone();
728                async move {
729                    let mut guard = captured_sender.lock().await;
730                    *guard = Some(sender);
731                    Ok((1, receiver))
732                }
733                .boxed()
734            })
735        };
736        let (manager, (_join_handle, _stop_tx)) =
737            SinkCoordinatorManager::start_worker_with_spawn_worker({
738                let expected_param = param.clone();
739                let metadata = metadata.clone();
740                let db = db.clone();
741                move |param, new_writer_rx| {
742                    let metadata = metadata.clone();
743                    let expected_param = expected_param.clone();
744                    let db = db.clone();
745                    tokio::spawn({
746                        let subscriber = mock_subscriber.clone();
747                        async move {
748                            // validate the start request
749                            assert_eq!(param, expected_param);
750                            CoordinatorWorker::execute_coordinator(
751                                db,
752                                param.clone(),
753                                new_writer_rx,
754                                MockSinglePhaseCoordinator::new_coordinator(
755                                    0,
756                                    move |epoch, metadata_list, count: &mut usize| {
757                                        *count += 1;
758                                        let mut metadata_list =
759                                            metadata_list
760                                                .into_iter()
761                                                .map(|metadata| match metadata {
762                                                    SinkMetadata {
763                                                        metadata:
764                                                            Some(Metadata::Serialized(
765                                                                SerializedMetadata { metadata },
766                                                            )),
767                                                    } => metadata,
768                                                    _ => unreachable!(),
769                                                })
770                                                .collect_vec();
771                                        metadata_list.sort();
772                                        match *count {
773                                            1 => {
774                                                assert_eq!(epoch, epoch1);
775                                                assert_eq!(1, metadata_list.len());
776                                                assert_eq!(metadata[0], metadata_list[0]);
777                                            }
778                                            2 => {
779                                                assert_eq!(epoch, epoch2);
780                                                assert_eq!(1, metadata_list.len());
781                                                assert_eq!(metadata[1], metadata_list[0]);
782                                            }
783                                            _ => unreachable!(),
784                                        }
785                                        Ok(())
786                                    },
787                                ),
788                                subscriber.clone(),
789                            )
790                            .await;
791                        }
792                    })
793                }
794            });
795
796        let build_client = |vnode| async {
797            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
798                Ok(tonic::Response::new(
799                    manager
800                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
801                        .await
802                        .unwrap()
803                        .boxed(),
804                ))
805            })
806            .await
807            .unwrap()
808            .0
809        };
810
811        let mut client = build_client(vnode).await;
812
813        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
814        assert_eq!(aligned_epoch, epoch1);
815
816        client
817            .commit(
818                epoch1,
819                SinkMetadata {
820                    metadata: Some(Metadata::Serialized(SerializedMetadata {
821                        metadata: metadata[0].clone(),
822                    })),
823                },
824                None,
825            )
826            .await
827            .unwrap();
828
829        client
830            .commit(
831                epoch2,
832                SinkMetadata {
833                    metadata: Some(Metadata::Serialized(SerializedMetadata {
834                        metadata: metadata[1].clone(),
835                    })),
836                },
837                None,
838            )
839            .await
840            .unwrap();
841    }
842
843    #[tokio::test]
844    async fn test_partial_commit() {
845        let db = prepare_db_backend().await;
846        let param = SinkParam {
847            sink_id: SinkId::from(1),
848            sink_name: "test".into(),
849            properties: Default::default(),
850            columns: vec![],
851            downstream_pk: None,
852            sink_type: SinkType::AppendOnly,
853            ignore_delete: false,
854            format_desc: None,
855            db_name: "test".into(),
856            sink_from_name: "test".into(),
857        };
858
859        let epoch = 233;
860
861        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
862        all_vnode.shuffle(&mut rand::rng());
863        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
864        let build_bitmap = |indexes: &[usize]| {
865            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
866            for i in indexes {
867                builder.set(*i, true);
868            }
869            builder.finish()
870        };
871        let vnode1 = build_bitmap(first);
872        let vnode2 = build_bitmap(second);
873
874        let sender = Arc::new(tokio::sync::Mutex::new(None));
875        let mock_subscriber: SinkCommittedEpochSubscriber = {
876            let captured_sender = sender.clone();
877            Arc::new(move |_sink_id: SinkId| {
878                let (sender, receiver) = unbounded_channel();
879                let captured_sender = captured_sender.clone();
880                async move {
881                    let mut guard = captured_sender.lock().await;
882                    *guard = Some(sender);
883                    Ok((1, receiver))
884                }
885                .boxed()
886            })
887        };
888        let (manager, (_join_handle, _stop_tx)) =
889            SinkCoordinatorManager::start_worker_with_spawn_worker({
890                let expected_param = param.clone();
891                let db = db.clone();
892                move |param, new_writer_rx| {
893                    let expected_param = expected_param.clone();
894                    let db = db.clone();
895                    tokio::spawn({
896                        let subscriber = mock_subscriber.clone();
897                        async move {
898                            // validate the start request
899                            assert_eq!(param, expected_param);
900                            CoordinatorWorker::execute_coordinator(
901                                db,
902                                param,
903                                new_writer_rx,
904                                MockSinglePhaseCoordinator::new_coordinator(
905                                    (),
906                                    |_, _, _| unreachable!(),
907                                ),
908                                subscriber.clone(),
909                            )
910                            .await;
911                        }
912                    })
913                }
914            });
915
916        let build_client = |vnode| async {
917            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
918                Ok(tonic::Response::new(
919                    manager
920                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
921                        .await
922                        .unwrap()
923                        .boxed(),
924                ))
925            })
926            .await
927            .unwrap()
928            .0
929        };
930
931        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;
932
933        // commit epoch
934        let mut commit_future = pin!(client1.commit(
935            epoch,
936            SinkMetadata {
937                metadata: Some(Metadata::Serialized(SerializedMetadata {
938                    metadata: vec![],
939                })),
940            },
941            None,
942        ));
943        assert!(
944            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
945                .await
946                .is_pending()
947        );
948        drop(client2);
949        assert!(commit_future.await.is_err());
950    }
951
952    #[tokio::test]
953    async fn test_fail_commit() {
954        let db = prepare_db_backend().await;
955        let param = SinkParam {
956            sink_id: SinkId::from(1),
957            sink_name: "test".into(),
958            properties: Default::default(),
959            columns: vec![],
960            downstream_pk: None,
961            sink_type: SinkType::AppendOnly,
962            ignore_delete: false,
963            format_desc: None,
964            db_name: "test".into(),
965            sink_from_name: "test".into(),
966        };
967
968        let epoch = 233;
969
970        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
971        all_vnode.shuffle(&mut rand::rng());
972        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
973        let build_bitmap = |indexes: &[usize]| {
974            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
975            for i in indexes {
976                builder.set(*i, true);
977            }
978            builder.finish()
979        };
980        let vnode1 = build_bitmap(first);
981        let vnode2 = build_bitmap(second);
982        let sender = Arc::new(tokio::sync::Mutex::new(None));
983        let mock_subscriber: SinkCommittedEpochSubscriber = {
984            let captured_sender = sender.clone();
985            Arc::new(move |_sink_id: SinkId| {
986                let (sender, receiver) = unbounded_channel();
987                let captured_sender = captured_sender.clone();
988                async move {
989                    let mut guard = captured_sender.lock().await;
990                    *guard = Some(sender);
991                    Ok((1, receiver))
992                }
993                .boxed()
994            })
995        };
996        let (manager, (_join_handle, _stop_tx)) =
997            SinkCoordinatorManager::start_worker_with_spawn_worker({
998                let expected_param = param.clone();
999                let db = db.clone();
1000                move |param, new_writer_rx| {
1001                    let expected_param = expected_param.clone();
1002                    let db = db.clone();
1003                    tokio::spawn({
1004                        let subscriber = mock_subscriber.clone();
1005                        {
1006                            async move {
1007                                // validate the start request
1008                                assert_eq!(param, expected_param);
1009                                CoordinatorWorker::execute_coordinator(
1010                                    db,
1011                                    param,
1012                                    new_writer_rx,
1013                                    MockSinglePhaseCoordinator::new_coordinator((), |_, _, _| {
1014                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
1015                                    }),
1016                                    subscriber.clone(),
1017                                )
1018                                .await;
1019                            }
1020                        }
1021                    })
1022                }
1023            });
1024
1025        let build_client = |vnode| async {
1026            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1027                Ok(tonic::Response::new(
1028                    manager
1029                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1030                        .await
1031                        .unwrap()
1032                        .boxed(),
1033                ))
1034            })
1035            .await
1036            .unwrap()
1037            .0
1038        };
1039
1040        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
1041
1042        // commit epoch
1043        let mut commit_future = pin!(client1.commit(
1044            epoch,
1045            SinkMetadata {
1046                metadata: Some(Metadata::Serialized(SerializedMetadata {
1047                    metadata: vec![],
1048                })),
1049            },
1050            None,
1051        ));
1052        assert!(
1053            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1054                .await
1055                .is_pending()
1056        );
1057        let (result1, result2) = join(
1058            commit_future,
1059            client2.commit(
1060                epoch,
1061                SinkMetadata {
1062                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1063                        metadata: vec![],
1064                    })),
1065                },
1066                None,
1067            ),
1068        )
1069        .await;
1070        assert!(result1.is_err());
1071        assert!(result2.is_err());
1072    }
1073
    /// Drives the coordinator through two vnode bitmap changes:
    ///
    /// 1. two writers commit `epoch1` and `epoch2`;
    /// 2. a third writer joins while the two existing writers update their
    ///    bitmaps, then all three commit `epoch3`;
    /// 3. the first writer stops while the other two update their bitmaps,
    ///    then the remaining two commit `epoch4`.
    ///
    /// The mock coordinator asserts, per commit call, the epoch and the sorted
    /// list of metadata payloads it receives.
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        // One epoch per commit round exercised below.
        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        // Initial layout: shuffled vnode indexes split into two halves.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        // Builds a bitmap with exactly the given indexes set.
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // `metadata[r][w]`: payload committed in round `r` (epoch1, epoch2) by
        // writer `w` (client1, client2).
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        // Payloads for epoch3, committed by client1, client2 and client3 in index order.
        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        // Payloads for epoch4, committed by client2 and client3 in index order.
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        // On each call the subscriber creates a fresh unbounded channel, stores the
        // sender in the shared `sender` slot and returns `(1, receiver)`.
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    // Clone per invocation so the outer closure keeps its own copies.
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    // Initial value of the call counter passed to the callback.
                                    0,
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        // Unwrap each payload; only serialized metadata is expected.
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the comparison below does not depend on the
                                        // order in which the payloads were collected.
                                        metadata_list.sort();
                                        // The n-th commit call must carry the n-th epoch and
                                        // the payloads of that round.
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Opens a writer stream for `vnode`. The `Result` is returned as-is (not
        // unwrapped) so the future can be combined with `try_join` below.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        // Both writers align on epoch1 before the first commit.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            // client2's commit alone must still be pending after one poll.
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // It completes once client1 commits the same epoch.
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Scale-out layout: the same shuffled indexes split into three parts
        // (two of size COUNT_FOR_TEST / 3, the third taking the remainder).
        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        // Start connecting the third writer. A single poll shows it is still
        // pending; it is awaited to completion further below, together with the
        // bitmap updates of the two existing writers.
        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                // commit epoch2
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                // client1's commit alone must still be pending after one poll.
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            // Finish the third writer's connection concurrently with the bitmap
            // updates of client1 and client2; all three must report epoch2.
            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            // commit epoch3 with three writers: client3 first, still pending.
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // client1 second: with two of three writers both commits stay pending.
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // client2 last: all three commits are now awaited to success.
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        // Scale-in layout: client2 takes the first COUNT_FOR_TEST / 3 shuffled
        // indexes, client3 takes all the remaining ones.
        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            // client1 stops while the two remaining writers update their bitmaps;
            // both updates must report epoch3.
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            // commit epoch4 with the two remaining writers: client2 first, pending.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            // It completes once client3 commits the same epoch.
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
1431
1432    struct MockTwoPhaseCoordinator<
1433        P: FnMut(
1434            u64,
1435            Vec<SinkMetadata>,
1436            Option<PbSinkSchemaChange>,
1437        ) -> Result<Option<Vec<u8>>, SinkError>,
1438        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError>,
1439        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError>,
1440    > {
1441        pre_commit: P,
1442        commit_data: CD,
1443        commit_schema_change: CS,
1444    }
1445
1446    impl<
1447        P: FnMut(
1448                u64,
1449                Vec<SinkMetadata>,
1450                Option<PbSinkSchemaChange>,
1451            ) -> Result<Option<Vec<u8>>, SinkError>
1452            + Send
1453            + 'static,
1454        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
1455        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
1456    > MockTwoPhaseCoordinator<P, CD, CS>
1457    {
1458        fn new_coordinator(
1459            pre_commit: P,
1460            commit_data: CD,
1461            commit_schema_change: CS,
1462        ) -> SinkCommitCoordinator {
1463            SinkCommitCoordinator::TwoPhase(Box::new(MockTwoPhaseCoordinator {
1464                pre_commit,
1465                commit_data,
1466                commit_schema_change,
1467            }))
1468        }
1469    }
1470
1471    #[async_trait]
1472    impl<
1473        P: FnMut(
1474                u64,
1475                Vec<SinkMetadata>,
1476                Option<PbSinkSchemaChange>,
1477            ) -> Result<Option<Vec<u8>>, SinkError>
1478            + Send
1479            + 'static,
1480        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
1481        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
1482    > TwoPhaseCommitCoordinator for MockTwoPhaseCoordinator<P, CD, CS>
1483    {
1484        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
1485            Ok(())
1486        }
1487
1488        async fn pre_commit(
1489            &mut self,
1490            epoch: u64,
1491            metadata: Vec<SinkMetadata>,
1492            schema_change: Option<PbSinkSchemaChange>,
1493        ) -> risingwave_connector::sink::Result<Option<Vec<u8>>> {
1494            (self.pre_commit)(epoch, metadata, schema_change)
1495        }
1496
1497        async fn commit_data(
1498            &mut self,
1499            epoch: u64,
1500            commit_metadata: Vec<u8>,
1501        ) -> risingwave_connector::sink::Result<()> {
1502            (self.commit_data)(epoch, commit_metadata)
1503        }
1504
1505        async fn commit_schema_change(
1506            &mut self,
1507            epoch: u64,
1508            schema_change: PbSinkSchemaChange,
1509        ) -> risingwave_connector::sink::Result<()> {
1510            (self.commit_schema_change)(epoch, schema_change)
1511        }
1512
1513        async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1514            tracing::debug!("abort called");
1515        }
1516    }
1517
1518    async fn prepare_db_backend() -> DatabaseConnection {
1519        let db: DatabaseConnection = Database::connect("sqlite::memory:").await.unwrap();
1520        let ddl = "
1521            CREATE TABLE IF NOT EXISTS pending_sink_state (
1522                sink_id i32 NOT NULL,
1523                epoch i64 NOT NULL,
1524                sink_state STRING NOT NULL,
1525                metadata BLOB NOT NULL,
1526                schema_change BLOB,
1527                PRIMARY KEY (sink_id, epoch)
1528            )
1529        ";
1530        db.execute(sea_orm::Statement::from_string(
1531            db.get_database_backend(),
1532            ddl.to_owned(),
1533        ))
1534        .await
1535        .unwrap();
1536        db
1537    }
1538
1539    async fn list_rows(
1540        db: &DatabaseConnection,
1541    ) -> Vec<(i32, i64, String, Vec<u8>, Option<PbSinkSchemaChange>)> {
1542        let sql =
1543            "SELECT sink_id, epoch, sink_state, metadata, schema_change FROM pending_sink_state";
1544        let rows = db
1545            .query_all(sea_orm::Statement::from_string(
1546                db.get_database_backend(),
1547                sql.to_owned(),
1548            ))
1549            .await
1550            .unwrap();
1551        rows.into_iter()
1552            .map(|row| {
1553                (
1554                    row.try_get("", "sink_id").unwrap(),
1555                    row.try_get("", "epoch").unwrap(),
1556                    row.try_get("", "sink_state").unwrap(),
1557                    row.try_get("", "metadata").unwrap(),
1558                    row.try_get::<Option<SinkSchemachange>>("", "schema_change")
1559                        .unwrap()
1560                        .map(|v| v.to_protobuf()),
1561                )
1562            })
1563            .collect()
1564    }
1565
1566    async fn set_epoch_aborted(db: &DatabaseConnection, sink_id: SinkId, epoch: u64) {
1567        let sql = format!(
1568            "UPDATE pending_sink_state SET sink_state = 'ABORTED' WHERE sink_id = {} AND epoch = {}",
1569            sink_id, epoch as i64
1570        );
1571        db.execute(sea_orm::Statement::from_string(
1572            db.get_database_backend(),
1573            sql,
1574        ))
1575        .await
1576        .unwrap();
1577    }
1578
    /// Verifies that a failing pre-commit leaves no persisted state behind.
    ///
    /// The mock coordinator's pre-commit callback always returns an error, so the client's
    /// `commit` call must fail and `pending_sink_state` must stay empty. The commit-data and
    /// schema-change callbacks are `unreachable!()` because they must never be invoked here.
    #[tokio::test]
    async fn test_pre_commit_failed() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;

        // A single writer owning every vnode in `0..VirtualNode::COUNT_FOR_TEST`.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        // The mock subscriber stores the sender half of a fresh channel in `sender` and
        // returns `epoch1` together with the receiver half.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch1, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    // Pre-commit always fails in this test.
                                    move |_epoch, _metadata_list, _schema_change| {
                                        Err(SinkError::Coordinator(anyhow!("failed to pre commit")))
                                    },
                                    move |_epoch, _commit_metadata| unreachable!(),
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a client handle whose request stream is served by `manager`; only the
        // handle (element `.0` of the returned tuple) is kept.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        let commit_result = client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await;
        assert!(commit_result.is_err());

        // Nothing may have been persisted for the failed epoch.
        let rows = list_rows(&db).await;
        assert!(rows.is_empty());
    }
1691
1692    #[tokio::test]
1693    async fn test_waiting_on_checkpoint() {
1694        let db = prepare_db_backend().await;
1695
1696        let param = SinkParam {
1697            sink_id: SinkId::from(1),
1698            sink_name: "test".into(),
1699            properties: Default::default(),
1700            columns: vec![],
1701            downstream_pk: None,
1702            sink_type: SinkType::AppendOnly,
1703            ignore_delete: false,
1704            format_desc: None,
1705            db_name: "test".into(),
1706            sink_from_name: "test".into(),
1707        };
1708
1709        let epoch0 = 232;
1710        let epoch1 = 233;
1711
1712        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1713        let build_bitmap = |indexes: &[usize]| {
1714            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1715            for i in indexes {
1716                builder.set(*i, true);
1717            }
1718            builder.finish()
1719        };
1720        let vnode = build_bitmap(&all_vnode);
1721
1722        let metadata = vec![1u8, 2u8];
1723
1724        let sender = Arc::new(tokio::sync::Mutex::new(None));
1725        let mock_subscriber: SinkCommittedEpochSubscriber = {
1726            let captured_sender = sender.clone();
1727            Arc::new(move |_sink_id: SinkId| {
1728                let (sender, receiver) = unbounded_channel();
1729                let captured_sender = captured_sender.clone();
1730                async move {
1731                    let mut guard = captured_sender.lock().await;
1732                    *guard = Some(sender);
1733                    Ok((epoch0, receiver))
1734                }
1735                .boxed()
1736            })
1737        };
1738
1739        let (manager, (_join_handle, _stop_tx)) =
1740            SinkCoordinatorManager::start_worker_with_spawn_worker({
1741                let expected_param = param.clone();
1742                let metadata = metadata.clone();
1743                let db = db.clone();
1744                move |param, new_writer_rx| {
1745                    let metadata = metadata.clone();
1746                    let expected_param = expected_param.clone();
1747                    let db = db.clone();
1748                    tokio::spawn({
1749                        let subscriber = mock_subscriber.clone();
1750                        async move {
1751                            // validate the start request
1752                            assert_eq!(param, expected_param);
1753                            CoordinatorWorker::execute_coordinator(
1754                                db,
1755                                param.clone(),
1756                                new_writer_rx,
1757                                MockTwoPhaseCoordinator::new_coordinator(
1758                                    move |_epoch, metadata_list, _schema_change| {
1759                                        let metadata =
1760                                            metadata_list.into_iter().exactly_one().unwrap();
1761                                        Ok(match metadata.metadata {
1762                                            Some(Metadata::Serialized(SerializedMetadata {
1763                                                metadata,
1764                                            })) => Some(metadata),
1765                                            _ => unreachable!(),
1766                                        })
1767                                    },
1768                                    move |_epoch, commit_metadata| {
1769                                        assert_eq!(commit_metadata, metadata);
1770                                        Ok(())
1771                                    },
1772                                    move |_epoch, _schema_change| unreachable!(),
1773                                ),
1774                                subscriber.clone(),
1775                            )
1776                            .await;
1777                        }
1778                    })
1779                }
1780            });
1781
1782        let build_client = |vnode| async {
1783            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1784                Ok(tonic::Response::new(
1785                    manager
1786                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1787                        .await
1788                        .unwrap()
1789                        .boxed(),
1790                ))
1791            })
1792            .await
1793            .unwrap()
1794            .0
1795        };
1796
1797        let mut client = build_client(vnode).await;
1798
1799        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
1800        assert_eq!(aligned_epoch, 1);
1801
1802        client
1803            .commit(
1804                epoch1,
1805                SinkMetadata {
1806                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1807                        metadata: metadata.clone(),
1808                    })),
1809                },
1810                None,
1811            )
1812            .await
1813            .unwrap();
1814
1815        {
1816            let rows = list_rows(&db).await;
1817            assert_eq!(rows.len(), 1);
1818            assert_eq!(rows[0].1, epoch1 as i64);
1819            assert_eq!(rows[0].2, "PENDING");
1820
1821            let guard = sender.lock().await;
1822            let sender = guard.as_ref().unwrap().clone();
1823            sender.send(233).unwrap();
1824        }
1825
1826        // wait max 5 seconds for the commit to be processed
1827        for _ in 0..50 {
1828            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1829            let rows = list_rows(&db).await;
1830            if rows[0].2 == "COMMITTED" {
1831                break;
1832            }
1833        }
1834
1835        {
1836            let rows = list_rows(&db).await;
1837            assert_eq!(rows.len(), 1);
1838            assert_eq!(rows[0].1, epoch1 as i64);
1839            assert_eq!(rows[0].2, "COMMITTED");
1840        }
1841    }
1842
    /// Verifies that a failing commit is retried until it succeeds.
    ///
    /// The mock commit callback returns an error on its first two invocations and `Ok` on
    /// the third. The test polls the table for up to ten seconds, then asserts that the
    /// callback ran exactly three times and that the row ended up `COMMITTED`.
    #[tokio::test]
    async fn test_commit_retry_loop() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;

        // A single writer owning every vnode in `0..VirtualNode::COUNT_FOR_TEST`.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        // The mock subscriber stores the sender half of a fresh channel in `sender` and
        // returns `epoch1` together with the receiver half; nothing is sent on the channel
        // in this test.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch1, receiver))
                }
                .boxed()
            })
        };

        // Counts how many times the commit callback has been invoked.
        let commit_attempt = Arc::new(AtomicI32::new(0));

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                let commit_attempt = commit_attempt.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    let commit_attempt = commit_attempt.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    // Pre-commit echoes the single writer's serialized
                                    // metadata back as the commit metadata.
                                    move |_epoch, metadata_list, _schema_change| {
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        // `fetch_add` returns the previous count, so the
                                        // first two calls (0 and 1) fail and the third
                                        // succeeds.
                                        if commit_attempt
                                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
                                            < 2
                                        {
                                            Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                        } else {
                                            Ok(())
                                        }
                                    },
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a client handle whose request stream is served by `manager`; only the
        // handle (element `.0` of the returned tuple) is kept.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        // wait max 10 seconds for the commit to be processed
        for _ in 0..100 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            let rows = list_rows(&db).await;
            if rows[0].2 == "COMMITTED" {
                break;
            }
        }

        // Two failed attempts plus the successful one.
        assert_eq!(commit_attempt.load(std::sync::atomic::Ordering::SeqCst), 3);

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
        }
    }
1993
    /// Verifies that a row marked `ABORTED` is gone after a new client connects.
    ///
    /// Steps: commit `epoch1`, stop the sink coordinator while the row is still `PENDING`,
    /// flip the row to `ABORTED` directly in the database, then connect a second client and
    /// align its initial epoch. Afterwards `pending_sink_state` must be empty.
    // NOTE(review): the cleanup of the aborted row presumably happens when the coordinator
    // is started again for the second client; that logic is outside this test — confirm.
    #[tokio::test]
    async fn test_aborted() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;

        // A single writer owning every vnode in `0..VirtualNode::COUNT_FOR_TEST`.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];

        // The mock subscriber stores the sender half of a fresh channel in `sender` and
        // returns `epoch0` together with the receiver half; nothing is sent on the channel
        // in this test.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    // Pre-commit echoes the single writer's serialized
                                    // metadata back as the commit metadata.
                                    move |_epoch, metadata_list, _schema_change| {
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Builds a client handle whose request stream is served by `manager`; only the
        // handle (element `.0` of the returned tuple) is kept.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode.clone()).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        manager.stop_sink_coordinator(vec![SinkId::from(1)]).await;

        {
            // The row is expected to still be PENDING after the coordinator was stopped.
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");

            // Mark the epoch as aborted directly in the database and confirm the update.
            set_epoch_aborted(&db, SinkId::from(1), epoch1).await;
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "ABORTED");
        }

        // Connect a second client for the same sink.
        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        {
            // The aborted row must no longer be present.
            let rows = list_rows(&db).await;
            assert!(rows.is_empty());
        }
    }
2142
    /// Verifies that a second client's handshake only completes once the pending epoch has
    /// been committed, and that the schema change is carried through unchanged.
    ///
    /// Steps: the first client commits `epoch1` with a schema change, leaving a `PENDING`
    /// row. A second client's handshake future is polled once and must be pending, both
    /// before and after the first client stops. After `epoch1` is sent through the
    /// subscriber channel, the handshake completes with an initial epoch of `Some(epoch1)`
    /// and the row is `COMMITTED` with the same schema change.
    #[tokio::test]
    async fn test_flush_when_reschedule() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;

        // A single writer owning every vnode in `0..VirtualNode::COUNT_FOR_TEST`.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        // Schema change attached to the commit: one original `Int32` column `col_v1` plus an
        // add-columns operation introducing a `Varchar` column `new_col`.
        let schema_change = PbSinkSchemaChange {
            original_schema: vec![PbField {
                data_type: Some(PbDataType {
                    type_name: PbTypeName::Int32 as i32,
                    ..Default::default()
                }),
                name: "col_v1".into(),
            }],
            op: Some(SinkSchemachangeOp::AddColumns(PbSinkAddColumnsOp {
                fields: vec![PbField {
                    data_type: Some(PbDataType {
                        type_name: PbTypeName::Varchar as i32,
                        ..Default::default()
                    }),
                    name: "new_col".into(),
                }],
            })),
        };

        // The mock subscriber stores the sender half of a fresh channel in `sender` and
        // returns `epoch0` together with the receiver half.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let schema_change = schema_change.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let schema_change_for_pre_commit = schema_change.clone();
                    let schema_change_for_commit = schema_change.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    // Pre-commit must receive the schema change and echoes
                                    // the single writer's serialized metadata back.
                                    move |_epoch, metadata_list, schema_change| {
                                        assert_eq!(
                                            schema_change,
                                            Some(schema_change_for_pre_commit.clone())
                                        );
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    // The schema-change commit must see the same value.
                                    move |_epoch, schema_change| {
                                        assert_eq!(schema_change, schema_change_for_commit.clone());
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Unlike the other tests, the full result of `new_with_init_stream` is returned so
        // that the second tuple element (the initial epoch) can be inspected.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let (mut client1, _) = build_client(vnode.clone()).await.unwrap();

        let aligned_epoch = client1.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client1
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                Some(schema_change.clone()),
            )
            .await
            .unwrap();

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }

        // Poll the second client's handshake exactly once: wrapping the inner `Poll` in
        // `Poll::Ready` makes the `poll_fn` future resolve immediately with that `Poll`.
        // The handshake is expected to be pending at this point.
        let mut build_client2_future = pin!(build_client(vnode.clone()));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        client1.stop().await.unwrap();

        // Stopping the first client alone must not let the second handshake finish.
        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        {
            // Report `epoch1` through the subscriber channel.
            let guard = sender.lock().await;
            let sender = guard.as_ref().unwrap().clone();
            sender.send(epoch1).unwrap();
        }

        // The second handshake now completes and reports `epoch1` as its initial epoch.
        let (_, init_epoch) = build_client2_future.await.unwrap();
        assert_eq!(init_epoch, Some(epoch1));

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }
    }
2331}