risingwave_meta/manager/sink_coordination/manager.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, join_all, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33use tokio::sync::oneshot::{Receiver, Sender, channel};
34use tokio::sync::{mpsc, oneshot};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Sends `$msg` through `$tx` (any sender whose `send` returns a `Result` synchronously) and
/// logs an error instead of propagating it when the send fails.
///
/// The `Err` value, which may carry the undelivered message, is simply dropped.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let delivered = $tx.send($msg).is_ok();
        if !delivered {
            error!("unable to send msg");
        }
    }};
}
53
/// Async counterpart of `send_with_err_check!`: awaits `$tx.send($msg)` and logs an error
/// instead of propagating it when the send fails.
///
/// The `Err` value, which may carry the undelivered message, is simply dropped.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {{
        let delivered = $tx.send($msg).await.is_ok();
        if !delivered {
            error!("unable to send msg");
        }
    }};
}
61
/// Capacity of the bounded request channel from `SinkCoordinatorManager` handles to the
/// background manager worker.
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
64enum ManagerRequest {
65    NewSinkWriter(SinkWriterCoordinationHandle),
66    StopCoordinator {
67        finish_notifier: Sender<()>,
68        /// sink id to stop. When `None`, stop all sink coordinator
69        sink_ids: Option<Vec<SinkId>>,
70    },
71}
72
73#[derive(Clone)]
74pub struct SinkCoordinatorManager {
75    request_tx: mpsc::Sender<ManagerRequest>,
76}
77
78fn new_committed_epoch_subscriber(
79    hummock_manager: HummockManagerRef,
80    metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82    Arc::new(move |sink_id| {
83        let hummock_manager = hummock_manager.clone();
84        let metadata_manager = metadata_manager.clone();
85        async move {
86            let state_table_ids = metadata_manager
87                .get_sink_state_table_ids(sink_id)
88                .await
89                .map_err(SinkError::from)?;
90            let Some(table_id) = state_table_ids.first() else {
91                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92            };
93            hummock_manager
94                .subscribe_table_committed_epoch(*table_id)
95                .await
96                .map_err(SinkError::from)
97        }
98        .boxed()
99    })
100}
101
102impl SinkCoordinatorManager {
103    pub fn start_worker(
104        db: DatabaseConnection,
105        hummock_manager: HummockManagerRef,
106        metadata_manager: MetadataManager,
107        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
108        await_tree_reg: await_tree::Registry,
109    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
110        let subscriber = new_committed_epoch_subscriber(hummock_manager, metadata_manager);
111        Self::start_worker_with_spawn_worker({
112            move |param, manager_request_stream| {
113                let sink_id = param.sink_id;
114                let fut = CoordinatorWorker::run(
115                    param,
116                    manager_request_stream,
117                    db.clone(),
118                    subscriber.clone(),
119                    iceberg_compact_stat_sender.clone(),
120                );
121                tokio::spawn(
122                    await_tree_reg
123                        .register_derived_root(format!("Sink Coordinator {sink_id}"))
124                        .instrument(fut),
125                )
126            }
127        })
128    }
129
130    fn start_worker_with_spawn_worker(
131        spawn_coordinator_worker: impl SpawnCoordinatorFn,
132    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
133        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
134        let (shutdown_tx, shutdown_rx) = channel();
135        let worker = ManagerWorker::new(request_rx, shutdown_rx);
136        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
137        (
138            SinkCoordinatorManager { request_tx },
139            (join_handle, shutdown_tx),
140        )
141    }
142
143    pub async fn handle_new_request(
144        &self,
145        mut request_stream: SinkWriterRequestStream,
146    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
147        let (param, vnode_bitmap) = match request_stream.try_next().await? {
148            Some(CoordinateRequest {
149                msg:
150                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
151                        param: Some(param),
152                        vnode_bitmap: Some(vnode_bitmap),
153                    })),
154            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
155            msg => {
156                return Err(Status::invalid_argument(format!(
157                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
158                    msg
159                )));
160            }
161        };
162        let (response_tx, response_rx) = mpsc::unbounded_channel();
163        self.request_tx
164            .send(ManagerRequest::NewSinkWriter(
165                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
166            ))
167            .await
168            .map_err(|_| {
169                Status::unavailable(
170                    "unable to send to sink manager worker. The worker may have stopped",
171                )
172            })?;
173
174        Ok(UnboundedReceiverStream::new(response_rx))
175    }
176
177    async fn stop_coordinator(&self, sink_ids: Option<Vec<SinkId>>) {
178        let (tx, rx) = channel();
179        send_await_with_err_check!(
180            self.request_tx,
181            ManagerRequest::StopCoordinator {
182                finish_notifier: tx,
183                sink_ids: sink_ids.clone(),
184            }
185        );
186        if rx.await.is_err() {
187            error!("fail to wait for resetting sink manager worker");
188        }
189        info!("successfully stop coordinator: {:?}", sink_ids);
190    }
191
192    pub async fn reset(&self) {
193        self.stop_coordinator(None).await;
194    }
195
196    pub async fn stop_sink_coordinator(&self, sink_ids: Vec<SinkId>) {
197        self.stop_coordinator(Some(sink_ids)).await;
198    }
199}
200
201struct CoordinatorWorkerHandle {
202    /// Sender to coordinator worker. Drop the sender as a stop signal
203    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
204    /// Notify when the coordinator worker stops
205    finish_notifiers: Vec<Sender<()>>,
206}
207
208struct ManagerWorker {
209    request_rx: mpsc::Receiver<ManagerRequest>,
210    // Make it option so that it can be polled with &mut SinkManagerWorker
211    shutdown_rx: Receiver<()>,
212
213    running_coordinator_worker_join_handles:
214        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
215    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
216}
217
218enum ManagerEvent {
219    NewRequest(ManagerRequest),
220    CoordinatorWorkerFinished {
221        sink_id: SinkId,
222        join_result: Result<(), JoinError>,
223    },
224}
225
226trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
227    + Send
228    + 'static;
229
230impl ManagerWorker {
231    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
232        ManagerWorker {
233            request_rx,
234            shutdown_rx,
235            running_coordinator_worker_join_handles: Default::default(),
236            running_coordinator_worker: Default::default(),
237        }
238    }
239
240    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
241        while let Some(event) = self.next_event().await {
242            match event {
243                ManagerEvent::NewRequest(request) => match request {
244                    ManagerRequest::NewSinkWriter(request) => {
245                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
246                    }
247                    ManagerRequest::StopCoordinator {
248                        finish_notifier,
249                        sink_ids,
250                    } => {
251                        if let Some(sink_ids) = sink_ids {
252                            let mut rxs = Vec::with_capacity(sink_ids.len());
253                            for sink_id in sink_ids {
254                                if let Some(worker_handle) =
255                                    self.running_coordinator_worker.get_mut(&sink_id)
256                                {
257                                    let (tx, rx) = oneshot::channel();
258                                    rxs.push(rx);
259                                    worker_handle.finish_notifiers.push(tx);
260                                    if let Some(sender) = worker_handle.request_sender.take() {
261                                        // drop the sender as a signal to notify the coordinator worker
262                                        // to stop
263                                        drop(sender);
264                                    }
265                                } else {
266                                    debug!(
267                                        "sink coordinator of {} is not running, skip it",
268                                        sink_id
269                                    );
270                                }
271                            }
272                            tokio::spawn(async move {
273                                let notify_res = join_all(rxs).await;
274                                for res in notify_res {
275                                    if let Err(e) = res {
276                                        error!(
277                                            "fail to wait for resetting sink manager worker: {}",
278                                            e.as_report()
279                                        );
280                                    }
281                                }
282                                send_with_err_check!(finish_notifier, ());
283                            });
284                        } else {
285                            self.clean_up().await;
286                            send_with_err_check!(finish_notifier, ());
287                        }
288                    }
289                },
290                ManagerEvent::CoordinatorWorkerFinished {
291                    sink_id,
292                    join_result,
293                } => self.handle_coordinator_finished(sink_id, join_result),
294            }
295        }
296        self.clean_up().await;
297        info!("sink manager worker exited");
298    }
299
300    async fn next_event(&mut self) -> Option<ManagerEvent> {
301        match select(
302            select(
303                pin!(self.request_rx.recv()),
304                pin!(pending_on_none(
305                    self.running_coordinator_worker_join_handles.next()
306                )),
307            ),
308            &mut self.shutdown_rx,
309        )
310        .await
311        {
312            Either::Left((either, _)) => match either {
313                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
314                Either::Left((None, _)) => None,
315                Either::Right(((sink_id, join_result), _)) => {
316                    Some(ManagerEvent::CoordinatorWorkerFinished {
317                        sink_id,
318                        join_result,
319                    })
320                }
321            },
322            Either::Right(_) => None,
323        }
324    }
325
326    async fn clean_up(&mut self) {
327        info!("sink manager worker start cleaning up");
328        for worker_handle in self.running_coordinator_worker.values_mut() {
329            if let Some(sender) = worker_handle.request_sender.take() {
330                // drop the sender to notify the coordinator worker to stop
331                drop(sender);
332            }
333        }
334        while let Some((sink_id, join_result)) =
335            self.running_coordinator_worker_join_handles.next().await
336        {
337            self.handle_coordinator_finished(sink_id, join_result);
338        }
339        info!("sink manager worker finished cleaning up");
340    }
341
342    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
343        let worker_handle = self
344            .running_coordinator_worker
345            .remove(&sink_id)
346            .expect("finished coordinator should have an associated worker handle");
347        for finish_notifier in worker_handle.finish_notifiers {
348            send_with_err_check!(finish_notifier, ());
349        }
350        match join_result {
351            Ok(()) => {
352                info!(
353                    id = %sink_id,
354                    "sink coordinator has gracefully finished",
355                );
356            }
357            Err(err) => {
358                error!(
359                    id = %sink_id,
360                    error = %err.as_report(),
361                    "sink coordinator finished with error",
362                );
363            }
364        }
365    }
366
367    fn handle_new_sink_writer(
368        &mut self,
369        new_writer: SinkWriterCoordinationHandle,
370        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
371    ) {
372        let param = new_writer.param();
373        let sink_id = param.sink_id;
374
375        let handle = self
376            .running_coordinator_worker
377            .entry(param.sink_id)
378            .or_insert_with(|| {
379                // Launch the coordinator worker task if it is the first
380                let (request_tx, request_rx) = unbounded_channel();
381                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
382                self.running_coordinator_worker_join_handles.push(
383                    join_handle
384                        .map(move |join_result| (sink_id, join_result))
385                        .boxed(),
386                );
387                CoordinatorWorkerHandle {
388                    request_sender: Some(request_tx),
389                    finish_notifiers: Vec::new(),
390                }
391            });
392
393        if let Some(sender) = handle.request_sender.as_mut() {
394            send_with_err_check!(sender, new_writer);
395        } else {
396            warn!(
397                "handle a new request while the sink coordinator is being stopped: {:?}",
398                param
399            );
400            new_writer.abort(Status::internal("the sink is being stopped"));
401        }
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use std::future::{Future, poll_fn};
408    use std::pin::pin;
409    use std::sync::Arc;
410    use std::sync::atomic::AtomicI32;
411    use std::task::Poll;
412
413    use anyhow::anyhow;
414    use async_trait::async_trait;
415    use futures::future::{join, try_join};
416    use futures::{FutureExt, StreamExt, TryFutureExt};
417    use itertools::Itertools;
418    use rand::seq::SliceRandom;
419    use risingwave_common::bitmap::BitmapBuilder;
420    use risingwave_common::hash::VirtualNode;
421    use risingwave_connector::sink::catalog::{SinkId, SinkType};
422    use risingwave_connector::sink::{
423        SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkError, SinkParam,
424        TwoPhaseCommitCoordinator,
425    };
426    use risingwave_meta_model::SinkSchemachange;
427    use risingwave_pb::connector_service::SinkMetadata;
428    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
429    use risingwave_pb::data::PbDataType;
430    use risingwave_pb::data::data_type::PbTypeName;
431    use risingwave_pb::plan_common::PbField;
432    use risingwave_pb::stream_plan::sink_schema_change::Op as SinkSchemachangeOp;
433    use risingwave_pb::stream_plan::{PbSinkAddColumnsOp, PbSinkSchemaChange};
434    use risingwave_rpc_client::CoordinatorStreamHandle;
435    use sea_orm::{ConnectionTrait, Database, DatabaseConnection};
436    use tokio::sync::mpsc::unbounded_channel;
437    use tokio_stream::wrappers::ReceiverStream;
438
439    use crate::manager::sink_coordination::SinkCoordinatorManager;
440    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
441    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
442
443    struct MockSinglePhaseCoordinator<
444        C,
445        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>,
446    > {
447        context: C,
448        f: F,
449    }
450
451    impl<
452        C: Send + 'static,
453        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send + 'static,
454    > MockSinglePhaseCoordinator<C, F>
455    {
456        fn new_coordinator(context: C, f: F) -> SinkCommitCoordinator {
457            SinkCommitCoordinator::SinglePhase(Box::new(MockSinglePhaseCoordinator { context, f }))
458        }
459    }
460
461    #[async_trait]
462    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
463        SinglePhaseCommitCoordinator for MockSinglePhaseCoordinator<C, F>
464    {
465        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
466            Ok(())
467        }
468
469        async fn commit_data(
470            &mut self,
471            epoch: u64,
472            metadata: Vec<SinkMetadata>,
473        ) -> risingwave_connector::sink::Result<()> {
474            (self.f)(epoch, metadata, &mut self.context)
475        }
476
477        async fn commit_schema_change(
478            &mut self,
479            _epoch: u64,
480            _schema_change: PbSinkSchemaChange,
481        ) -> risingwave_connector::sink::Result<()> {
482            unreachable!()
483        }
484    }
485
    /// Two writers, each owning a random half of the vnodes, connect through the manager.
    ///
    /// The test checks two things. First, their initial epochs are aligned to the later one
    /// (`epoch1`). Second, each of `epoch1` and `epoch2` is committed exactly once, with both
    /// writers' metadata, and only after both writers have committed that epoch.
    #[tokio::test]
    async fn test_basic() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        // client1 starts at epoch0 and client2 at epoch1; both are expected to align to epoch1.
        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        // Randomly split all vnodes into two disjoint halves, one per writer.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[e][w]: payload committed by writer w (0 = client1, 1 = client2) for the
        // e-th committed epoch (0 = epoch1, 1 = epoch2).
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        // The mock subscriber stores its sender half here and never sends on it. Presumably
        // this keeps the receiver handed to the coordinator open for the whole test.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    // NOTE(review): `1` is presumably the initial committed epoch; confirm
                    // against `SinkCommittedEpochSubscriber`.
                    Ok((1, receiver))
                }
                .boxed()
            })
        };

        // Spawn coordinator workers backed by a mock single-phase coordinator that asserts on
        // every commit it receives.
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    // `count` is the number of commits seen so far.
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the comparison does not depend on the order
                                        // in which the writers' metadata arrived.
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Connects a client whose request stream is routed directly into the manager.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            // A single writer's commit must stay pending until the other writer commits the
            // same epoch.
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // commit epoch2
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        // Same as above, with the writers' roles swapped.
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
700
701    #[tokio::test]
702    async fn test_single_writer() {
703        let db = prepare_db_backend().await;
704        let param = SinkParam {
705            sink_id: SinkId::from(1),
706            sink_name: "test".into(),
707            properties: Default::default(),
708            columns: vec![],
709            downstream_pk: None,
710            sink_type: SinkType::AppendOnly,
711            ignore_delete: false,
712            format_desc: None,
713            db_name: "test".into(),
714            sink_from_name: "test".into(),
715        };
716
717        let epoch1 = 233;
718        let epoch2 = 234;
719
720        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
721        let build_bitmap = |indexes: &[usize]| {
722            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
723            for i in indexes {
724                builder.set(*i, true);
725            }
726            builder.finish()
727        };
728        let vnode = build_bitmap(&all_vnode);
729
730        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
731        let sender = Arc::new(tokio::sync::Mutex::new(None));
732        let mock_subscriber: SinkCommittedEpochSubscriber = {
733            let captured_sender = sender.clone();
734            Arc::new(move |_sink_id: SinkId| {
735                let (sender, receiver) = unbounded_channel();
736                let captured_sender = captured_sender.clone();
737                async move {
738                    let mut guard = captured_sender.lock().await;
739                    *guard = Some(sender);
740                    Ok((1, receiver))
741                }
742                .boxed()
743            })
744        };
745        let (manager, (_join_handle, _stop_tx)) =
746            SinkCoordinatorManager::start_worker_with_spawn_worker({
747                let expected_param = param.clone();
748                let metadata = metadata.clone();
749                let db = db.clone();
750                move |param, new_writer_rx| {
751                    let metadata = metadata.clone();
752                    let expected_param = expected_param.clone();
753                    let db = db.clone();
754                    tokio::spawn({
755                        let subscriber = mock_subscriber.clone();
756                        async move {
757                            // validate the start request
758                            assert_eq!(param, expected_param);
759                            CoordinatorWorker::execute_coordinator(
760                                db,
761                                param.clone(),
762                                new_writer_rx,
763                                MockSinglePhaseCoordinator::new_coordinator(
764                                    0,
765                                    move |epoch, metadata_list, count: &mut usize| {
766                                        *count += 1;
767                                        let mut metadata_list =
768                                            metadata_list
769                                                .into_iter()
770                                                .map(|metadata| match metadata {
771                                                    SinkMetadata {
772                                                        metadata:
773                                                            Some(Metadata::Serialized(
774                                                                SerializedMetadata { metadata },
775                                                            )),
776                                                    } => metadata,
777                                                    _ => unreachable!(),
778                                                })
779                                                .collect_vec();
780                                        metadata_list.sort();
781                                        match *count {
782                                            1 => {
783                                                assert_eq!(epoch, epoch1);
784                                                assert_eq!(1, metadata_list.len());
785                                                assert_eq!(metadata[0], metadata_list[0]);
786                                            }
787                                            2 => {
788                                                assert_eq!(epoch, epoch2);
789                                                assert_eq!(1, metadata_list.len());
790                                                assert_eq!(metadata[1], metadata_list[0]);
791                                            }
792                                            _ => unreachable!(),
793                                        }
794                                        Ok(())
795                                    },
796                                ),
797                                subscriber.clone(),
798                            )
799                            .await;
800                        }
801                    })
802                }
803            });
804
805        let build_client = |vnode| async {
806            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
807                Ok(tonic::Response::new(
808                    manager
809                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
810                        .await
811                        .unwrap()
812                        .boxed(),
813                ))
814            })
815            .await
816            .unwrap()
817            .0
818        };
819
820        let mut client = build_client(vnode).await;
821
822        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
823        assert_eq!(aligned_epoch, epoch1);
824
825        client
826            .commit(
827                epoch1,
828                SinkMetadata {
829                    metadata: Some(Metadata::Serialized(SerializedMetadata {
830                        metadata: metadata[0].clone(),
831                    })),
832                },
833                None,
834            )
835            .await
836            .unwrap();
837
838        client
839            .commit(
840                epoch2,
841                SinkMetadata {
842                    metadata: Some(Metadata::Serialized(SerializedMetadata {
843                        metadata: metadata[1].clone(),
844                    })),
845                },
846                None,
847            )
848            .await
849            .unwrap();
850    }
851
    /// Checks that a commit started by one writer fails once the other writer,
    /// which owns the remaining vnodes, disconnects without committing the same
    /// epoch. The mock coordinator's commit closure is `unreachable!()`, so the
    /// epoch must never be handed to the coordinator.
    #[tokio::test]
    async fn test_partial_commit() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        // Randomly split all vnodes into two disjoint halves, one per writer.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // Mock committed-epoch subscriber: each call creates a fresh unbounded
        // channel, stores its sender in `sender` (not read again in this test),
        // and returns `1` together with the receiver (presumably the initially
        // committed epoch — TODO confirm against SinkCommittedEpochSubscriber).
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param,
                                new_writer_rx,
                                // The epoch never completes, so the coordinator's
                                // commit must never be invoked.
                                MockSinglePhaseCoordinator::new_coordinator(
                                    (),
                                    |_, _, _| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Connects a writer owning `vnode` to the manager through an in-process
        // request stream and keeps only the stream handle (`.0`).
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        // commit epoch
        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        // Poll once: client1's commit must still be pending because client2 has
        // not committed `epoch`.
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        // Disconnecting client2 must surface as an error on client1's
        // in-flight commit.
        drop(client2);
        assert!(commit_future.await.is_err());
    }
960
961    #[tokio::test]
962    async fn test_fail_commit() {
963        let db = prepare_db_backend().await;
964        let param = SinkParam {
965            sink_id: SinkId::from(1),
966            sink_name: "test".into(),
967            properties: Default::default(),
968            columns: vec![],
969            downstream_pk: None,
970            sink_type: SinkType::AppendOnly,
971            ignore_delete: false,
972            format_desc: None,
973            db_name: "test".into(),
974            sink_from_name: "test".into(),
975        };
976
977        let epoch = 233;
978
979        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
980        all_vnode.shuffle(&mut rand::rng());
981        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
982        let build_bitmap = |indexes: &[usize]| {
983            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
984            for i in indexes {
985                builder.set(*i, true);
986            }
987            builder.finish()
988        };
989        let vnode1 = build_bitmap(first);
990        let vnode2 = build_bitmap(second);
991        let sender = Arc::new(tokio::sync::Mutex::new(None));
992        let mock_subscriber: SinkCommittedEpochSubscriber = {
993            let captured_sender = sender.clone();
994            Arc::new(move |_sink_id: SinkId| {
995                let (sender, receiver) = unbounded_channel();
996                let captured_sender = captured_sender.clone();
997                async move {
998                    let mut guard = captured_sender.lock().await;
999                    *guard = Some(sender);
1000                    Ok((1, receiver))
1001                }
1002                .boxed()
1003            })
1004        };
1005        let (manager, (_join_handle, _stop_tx)) =
1006            SinkCoordinatorManager::start_worker_with_spawn_worker({
1007                let expected_param = param.clone();
1008                let db = db.clone();
1009                move |param, new_writer_rx| {
1010                    let expected_param = expected_param.clone();
1011                    let db = db.clone();
1012                    tokio::spawn({
1013                        let subscriber = mock_subscriber.clone();
1014                        {
1015                            async move {
1016                                // validate the start request
1017                                assert_eq!(param, expected_param);
1018                                CoordinatorWorker::execute_coordinator(
1019                                    db,
1020                                    param,
1021                                    new_writer_rx,
1022                                    MockSinglePhaseCoordinator::new_coordinator((), |_, _, _| {
1023                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
1024                                    }),
1025                                    subscriber.clone(),
1026                                )
1027                                .await;
1028                            }
1029                        }
1030                    })
1031                }
1032            });
1033
1034        let build_client = |vnode| async {
1035            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1036                Ok(tonic::Response::new(
1037                    manager
1038                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1039                        .await
1040                        .unwrap()
1041                        .boxed(),
1042                ))
1043            })
1044            .await
1045            .unwrap()
1046            .0
1047        };
1048
1049        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
1050
1051        // commit epoch
1052        let mut commit_future = pin!(client1.commit(
1053            epoch,
1054            SinkMetadata {
1055                metadata: Some(Metadata::Serialized(SerializedMetadata {
1056                    metadata: vec![],
1057                })),
1058            },
1059            None,
1060        ));
1061        assert!(
1062            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
1063                .await
1064                .is_pending()
1065        );
1066        let (result1, result2) = join(
1067            commit_future,
1068            client2.commit(
1069                epoch,
1070                SinkMetadata {
1071                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1072                        metadata: vec![],
1073                    })),
1074                },
1075                None,
1076            ),
1077        )
1078        .await;
1079        assert!(result1.is_err());
1080        assert!(result2.is_err());
1081    }
1082
    /// Exercises writer rescaling across four epochs:
    /// - Two writers commit epoch1 and epoch2.
    /// - A third writer joins (scale-out) and all three commit epoch3.
    /// - client1 stops (scale-in) and the remaining two writers commit epoch4.
    ///
    /// The mock coordinator checks every commit's epoch and sorted metadata in
    /// call order.
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        // Randomly split all vnodes into two disjoint halves for the initial two writers.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[e][w]: payload committed by writer w+1 (client1/client2) for epoch e+1.
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        // epoch3 payloads of client1, client2, client3 (indices 0, 1, 2).
        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        // epoch4 payloads of client2 and client3 (indices 0, 1).
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        // Mock committed-epoch subscriber: each call creates a fresh unbounded
        // channel, stores its sender in `sender` (not read again in this test),
        // and returns `1` together with the receiver.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            // validate the start request
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                // `count` (starting at 0) numbers the commits that reach
                                // the coordinator; each one is checked against the
                                // expected epoch and the sorted expected payloads.
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort so the check does not depend on writer arrival order.
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Connects a writer owning `vnode` to the manager through an in-process
        // request stream. Unlike the sibling tests, the `Result` is kept; its Ok
        // value is a pair of (handle, initial epoch), where the initial epoch is
        // an `Option` (see the `init_epoch` assertion below).
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        // Both writers align on epoch1 as their initial epoch.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // commit epoch1
            // client2 commits first; its commit must stay pending until client1
            // also commits epoch1.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Scale out: re-split the same shuffled vnodes into thirds for
        // client1, client2 and the new client3.
        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        // Registering client3 must still be pending here. It is awaited below
        // together with client1/client2's `update_vnode_bitmap` calls.
        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                // commit epoch2
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            // client3's registration completes together with the bitmap updates
            // of client1/client2. All three report epoch2 as the epoch to resume from.
            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            // commit epoch3: client3's and client1's commits must stay pending
            // until client2, the last of the three writers, also commits.
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        // Scale in: client2 takes the first third of the vnodes and client3
        // takes the rest; client1 is stopped below.
        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            // The remaining writers resume from epoch3, the last committed epoch.
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            // commit epoch4 with the two remaining writers; client2's commit must
            // stay pending until client3 also commits.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
1440
1441    struct MockTwoPhaseCoordinator<
1442        P: FnMut(
1443            u64,
1444            Vec<SinkMetadata>,
1445            Option<PbSinkSchemaChange>,
1446        ) -> Result<Option<Vec<u8>>, SinkError>,
1447        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError>,
1448        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError>,
1449    > {
1450        pre_commit: P,
1451        commit_data: CD,
1452        commit_schema_change: CS,
1453    }
1454
1455    impl<
1456        P: FnMut(
1457                u64,
1458                Vec<SinkMetadata>,
1459                Option<PbSinkSchemaChange>,
1460            ) -> Result<Option<Vec<u8>>, SinkError>
1461            + Send
1462            + 'static,
1463        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
1464        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
1465    > MockTwoPhaseCoordinator<P, CD, CS>
1466    {
1467        fn new_coordinator(
1468            pre_commit: P,
1469            commit_data: CD,
1470            commit_schema_change: CS,
1471        ) -> SinkCommitCoordinator {
1472            SinkCommitCoordinator::TwoPhase(Box::new(MockTwoPhaseCoordinator {
1473                pre_commit,
1474                commit_data,
1475                commit_schema_change,
1476            }))
1477        }
1478    }
1479
1480    #[async_trait]
1481    impl<
1482        P: FnMut(
1483                u64,
1484                Vec<SinkMetadata>,
1485                Option<PbSinkSchemaChange>,
1486            ) -> Result<Option<Vec<u8>>, SinkError>
1487            + Send
1488            + 'static,
1489        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
1490        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
1491    > TwoPhaseCommitCoordinator for MockTwoPhaseCoordinator<P, CD, CS>
1492    {
1493        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
1494            Ok(())
1495        }
1496
1497        async fn pre_commit(
1498            &mut self,
1499            epoch: u64,
1500            metadata: Vec<SinkMetadata>,
1501            schema_change: Option<PbSinkSchemaChange>,
1502        ) -> risingwave_connector::sink::Result<Option<Vec<u8>>> {
1503            (self.pre_commit)(epoch, metadata, schema_change)
1504        }
1505
1506        async fn commit_data(
1507            &mut self,
1508            epoch: u64,
1509            commit_metadata: Vec<u8>,
1510        ) -> risingwave_connector::sink::Result<()> {
1511            (self.commit_data)(epoch, commit_metadata)
1512        }
1513
1514        async fn commit_schema_change(
1515            &mut self,
1516            epoch: u64,
1517            schema_change: PbSinkSchemaChange,
1518        ) -> risingwave_connector::sink::Result<()> {
1519            (self.commit_schema_change)(epoch, schema_change)
1520        }
1521
1522        async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
1523            tracing::debug!("abort called");
1524        }
1525    }
1526
1527    async fn prepare_db_backend() -> DatabaseConnection {
1528        let db: DatabaseConnection = Database::connect("sqlite::memory:").await.unwrap();
1529        let ddl = "
1530            CREATE TABLE IF NOT EXISTS pending_sink_state (
1531                sink_id i32 NOT NULL,
1532                epoch i64 NOT NULL,
1533                sink_state STRING NOT NULL,
1534                metadata BLOB NOT NULL,
1535                schema_change BLOB,
1536                PRIMARY KEY (sink_id, epoch)
1537            )
1538        ";
1539        db.execute(sea_orm::Statement::from_string(
1540            db.get_database_backend(),
1541            ddl.to_owned(),
1542        ))
1543        .await
1544        .unwrap();
1545        db
1546    }
1547
1548    async fn list_rows(
1549        db: &DatabaseConnection,
1550    ) -> Vec<(i32, i64, String, Vec<u8>, Option<PbSinkSchemaChange>)> {
1551        let sql =
1552            "SELECT sink_id, epoch, sink_state, metadata, schema_change FROM pending_sink_state";
1553        let rows = db
1554            .query_all(sea_orm::Statement::from_string(
1555                db.get_database_backend(),
1556                sql.to_owned(),
1557            ))
1558            .await
1559            .unwrap();
1560        rows.into_iter()
1561            .map(|row| {
1562                (
1563                    row.try_get("", "sink_id").unwrap(),
1564                    row.try_get("", "epoch").unwrap(),
1565                    row.try_get("", "sink_state").unwrap(),
1566                    row.try_get("", "metadata").unwrap(),
1567                    row.try_get::<Option<SinkSchemachange>>("", "schema_change")
1568                        .unwrap()
1569                        .map(|v| v.to_protobuf()),
1570                )
1571            })
1572            .collect()
1573    }
1574
1575    async fn set_epoch_aborted(db: &DatabaseConnection, sink_id: SinkId, epoch: u64) {
1576        let sql = format!(
1577            "UPDATE pending_sink_state SET sink_state = 'ABORTED' WHERE sink_id = {} AND epoch = {}",
1578            sink_id, epoch as i64
1579        );
1580        db.execute(sea_orm::Statement::from_string(
1581            db.get_database_backend(),
1582            sql,
1583        ))
1584        .await
1585        .unwrap();
1586    }
1587
1588    #[tokio::test]
1589    async fn test_pre_commit_failed() {
1590        let db = prepare_db_backend().await;
1591
1592        let param = SinkParam {
1593            sink_id: SinkId::from(1),
1594            sink_name: "test".into(),
1595            properties: Default::default(),
1596            columns: vec![],
1597            downstream_pk: None,
1598            sink_type: SinkType::AppendOnly,
1599            ignore_delete: false,
1600            format_desc: None,
1601            db_name: "test".into(),
1602            sink_from_name: "test".into(),
1603        };
1604
1605        let epoch1 = 233;
1606
1607        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1608        let build_bitmap = |indexes: &[usize]| {
1609            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1610            for i in indexes {
1611                builder.set(*i, true);
1612            }
1613            builder.finish()
1614        };
1615        let vnode = build_bitmap(&all_vnode);
1616
1617        let metadata = vec![1u8, 2u8];
1618        let sender = Arc::new(tokio::sync::Mutex::new(None));
1619        let mock_subscriber: SinkCommittedEpochSubscriber = {
1620            let captured_sender = sender.clone();
1621            Arc::new(move |_sink_id: SinkId| {
1622                let (sender, receiver) = unbounded_channel();
1623                let captured_sender = captured_sender.clone();
1624                async move {
1625                    let mut guard = captured_sender.lock().await;
1626                    *guard = Some(sender);
1627                    Ok((epoch1, receiver))
1628                }
1629                .boxed()
1630            })
1631        };
1632
1633        let (manager, (_join_handle, _stop_tx)) =
1634            SinkCoordinatorManager::start_worker_with_spawn_worker({
1635                let expected_param = param.clone();
1636                let db = db.clone();
1637                move |param, new_writer_rx| {
1638                    let expected_param = expected_param.clone();
1639                    let db = db.clone();
1640                    tokio::spawn({
1641                        let subscriber = mock_subscriber.clone();
1642                        async move {
1643                            // validate the start request
1644                            assert_eq!(param, expected_param);
1645                            CoordinatorWorker::execute_coordinator(
1646                                db,
1647                                param.clone(),
1648                                new_writer_rx,
1649                                MockTwoPhaseCoordinator::new_coordinator(
1650                                    move |_epoch, _metadata_list, _schema_change| {
1651                                        Err(SinkError::Coordinator(anyhow!("failed to pre commit")))
1652                                    },
1653                                    move |_epoch, _commit_metadata| unreachable!(),
1654                                    move |_epoch, _schema_change| unreachable!(),
1655                                ),
1656                                subscriber.clone(),
1657                            )
1658                            .await;
1659                        }
1660                    })
1661                }
1662            });
1663
1664        let build_client = |vnode| async {
1665            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1666                Ok(tonic::Response::new(
1667                    manager
1668                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1669                        .await
1670                        .unwrap()
1671                        .boxed(),
1672                ))
1673            })
1674            .await
1675            .unwrap()
1676            .0
1677        };
1678
1679        let mut client = build_client(vnode).await;
1680
1681        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
1682        assert_eq!(aligned_epoch, 1);
1683
1684        let commit_result = client
1685            .commit(
1686                epoch1,
1687                SinkMetadata {
1688                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1689                        metadata: metadata.clone(),
1690                    })),
1691                },
1692                None,
1693            )
1694            .await;
1695        assert!(commit_result.is_err());
1696
1697        let rows = list_rows(&db).await;
1698        assert!(rows.is_empty());
1699    }
1700
1701    #[tokio::test]
1702    async fn test_waiting_on_checkpoint() {
1703        let db = prepare_db_backend().await;
1704
1705        let param = SinkParam {
1706            sink_id: SinkId::from(1),
1707            sink_name: "test".into(),
1708            properties: Default::default(),
1709            columns: vec![],
1710            downstream_pk: None,
1711            sink_type: SinkType::AppendOnly,
1712            ignore_delete: false,
1713            format_desc: None,
1714            db_name: "test".into(),
1715            sink_from_name: "test".into(),
1716        };
1717
1718        let epoch0 = 232;
1719        let epoch1 = 233;
1720
1721        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1722        let build_bitmap = |indexes: &[usize]| {
1723            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1724            for i in indexes {
1725                builder.set(*i, true);
1726            }
1727            builder.finish()
1728        };
1729        let vnode = build_bitmap(&all_vnode);
1730
1731        let metadata = vec![1u8, 2u8];
1732
1733        let sender = Arc::new(tokio::sync::Mutex::new(None));
1734        let mock_subscriber: SinkCommittedEpochSubscriber = {
1735            let captured_sender = sender.clone();
1736            Arc::new(move |_sink_id: SinkId| {
1737                let (sender, receiver) = unbounded_channel();
1738                let captured_sender = captured_sender.clone();
1739                async move {
1740                    let mut guard = captured_sender.lock().await;
1741                    *guard = Some(sender);
1742                    Ok((epoch0, receiver))
1743                }
1744                .boxed()
1745            })
1746        };
1747
1748        let (manager, (_join_handle, _stop_tx)) =
1749            SinkCoordinatorManager::start_worker_with_spawn_worker({
1750                let expected_param = param.clone();
1751                let metadata = metadata.clone();
1752                let db = db.clone();
1753                move |param, new_writer_rx| {
1754                    let metadata = metadata.clone();
1755                    let expected_param = expected_param.clone();
1756                    let db = db.clone();
1757                    tokio::spawn({
1758                        let subscriber = mock_subscriber.clone();
1759                        async move {
1760                            // validate the start request
1761                            assert_eq!(param, expected_param);
1762                            CoordinatorWorker::execute_coordinator(
1763                                db,
1764                                param.clone(),
1765                                new_writer_rx,
1766                                MockTwoPhaseCoordinator::new_coordinator(
1767                                    move |_epoch, metadata_list, _schema_change| {
1768                                        let metadata =
1769                                            metadata_list.into_iter().exactly_one().unwrap();
1770                                        Ok(match metadata.metadata {
1771                                            Some(Metadata::Serialized(SerializedMetadata {
1772                                                metadata,
1773                                            })) => Some(metadata),
1774                                            _ => unreachable!(),
1775                                        })
1776                                    },
1777                                    move |_epoch, commit_metadata| {
1778                                        assert_eq!(commit_metadata, metadata);
1779                                        Ok(())
1780                                    },
1781                                    move |_epoch, _schema_change| unreachable!(),
1782                                ),
1783                                subscriber.clone(),
1784                            )
1785                            .await;
1786                        }
1787                    })
1788                }
1789            });
1790
1791        let build_client = |vnode| async {
1792            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1793                Ok(tonic::Response::new(
1794                    manager
1795                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1796                        .await
1797                        .unwrap()
1798                        .boxed(),
1799                ))
1800            })
1801            .await
1802            .unwrap()
1803            .0
1804        };
1805
1806        let mut client = build_client(vnode).await;
1807
1808        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
1809        assert_eq!(aligned_epoch, 1);
1810
1811        client
1812            .commit(
1813                epoch1,
1814                SinkMetadata {
1815                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1816                        metadata: metadata.clone(),
1817                    })),
1818                },
1819                None,
1820            )
1821            .await
1822            .unwrap();
1823
1824        {
1825            let rows = list_rows(&db).await;
1826            assert_eq!(rows.len(), 1);
1827            assert_eq!(rows[0].1, epoch1 as i64);
1828            assert_eq!(rows[0].2, "PENDING");
1829
1830            let guard = sender.lock().await;
1831            let sender = guard.as_ref().unwrap().clone();
1832            sender.send(233).unwrap();
1833        }
1834
1835        // wait max 5 seconds for the commit to be processed
1836        for _ in 0..50 {
1837            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1838            let rows = list_rows(&db).await;
1839            if rows[0].2 == "COMMITTED" {
1840                break;
1841            }
1842        }
1843
1844        {
1845            let rows = list_rows(&db).await;
1846            assert_eq!(rows.len(), 1);
1847            assert_eq!(rows[0].1, epoch1 as i64);
1848            assert_eq!(rows[0].2, "COMMITTED");
1849        }
1850    }
1851
1852    #[tokio::test]
1853    async fn test_commit_retry_loop() {
1854        let db = prepare_db_backend().await;
1855
1856        let param = SinkParam {
1857            sink_id: SinkId::from(1),
1858            sink_name: "test".into(),
1859            properties: Default::default(),
1860            columns: vec![],
1861            downstream_pk: None,
1862            sink_type: SinkType::AppendOnly,
1863            ignore_delete: false,
1864            format_desc: None,
1865            db_name: "test".into(),
1866            sink_from_name: "test".into(),
1867        };
1868
1869        let epoch1 = 233;
1870
1871        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
1872        let build_bitmap = |indexes: &[usize]| {
1873            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
1874            for i in indexes {
1875                builder.set(*i, true);
1876            }
1877            builder.finish()
1878        };
1879        let vnode = build_bitmap(&all_vnode);
1880
1881        let metadata = vec![1u8, 2u8];
1882        let sender = Arc::new(tokio::sync::Mutex::new(None));
1883        let mock_subscriber: SinkCommittedEpochSubscriber = {
1884            let captured_sender = sender.clone();
1885            Arc::new(move |_sink_id: SinkId| {
1886                let (sender, receiver) = unbounded_channel();
1887                let captured_sender = captured_sender.clone();
1888                async move {
1889                    let mut guard = captured_sender.lock().await;
1890                    *guard = Some(sender);
1891                    Ok((epoch1, receiver))
1892                }
1893                .boxed()
1894            })
1895        };
1896
1897        let commit_attempt = Arc::new(AtomicI32::new(0));
1898
1899        let (manager, (_join_handle, _stop_tx)) =
1900            SinkCoordinatorManager::start_worker_with_spawn_worker({
1901                let expected_param = param.clone();
1902                let metadata = metadata.clone();
1903                let db = db.clone();
1904                let commit_attempt = commit_attempt.clone();
1905                move |param, new_writer_rx| {
1906                    let metadata = metadata.clone();
1907                    let expected_param = expected_param.clone();
1908                    let db = db.clone();
1909                    let commit_attempt = commit_attempt.clone();
1910                    tokio::spawn({
1911                        let subscriber = mock_subscriber.clone();
1912                        async move {
1913                            // validate the start request
1914                            assert_eq!(param, expected_param);
1915                            CoordinatorWorker::execute_coordinator(
1916                                db,
1917                                param.clone(),
1918                                new_writer_rx,
1919                                MockTwoPhaseCoordinator::new_coordinator(
1920                                    move |_epoch, metadata_list, _schema_change| {
1921                                        let metadata =
1922                                            metadata_list.into_iter().exactly_one().unwrap();
1923                                        Ok(match metadata.metadata {
1924                                            Some(Metadata::Serialized(SerializedMetadata {
1925                                                metadata,
1926                                            })) => Some(metadata),
1927                                            _ => unreachable!(),
1928                                        })
1929                                    },
1930                                    move |_epoch, commit_metadata| {
1931                                        assert_eq!(commit_metadata, metadata);
1932                                        if commit_attempt
1933                                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
1934                                            < 2
1935                                        {
1936                                            Err(SinkError::Coordinator(anyhow!("failed to commit")))
1937                                        } else {
1938                                            Ok(())
1939                                        }
1940                                    },
1941                                    move |_epoch, _schema_change| unreachable!(),
1942                                ),
1943                                subscriber.clone(),
1944                            )
1945                            .await;
1946                        }
1947                    })
1948                }
1949            });
1950
1951        let build_client = |vnode| async {
1952            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
1953                Ok(tonic::Response::new(
1954                    manager
1955                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
1956                        .await
1957                        .unwrap()
1958                        .boxed(),
1959                ))
1960            })
1961            .await
1962            .unwrap()
1963            .0
1964        };
1965
1966        let mut client = build_client(vnode).await;
1967
1968        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
1969        assert_eq!(aligned_epoch, 1);
1970
1971        client
1972            .commit(
1973                epoch1,
1974                SinkMetadata {
1975                    metadata: Some(Metadata::Serialized(SerializedMetadata {
1976                        metadata: metadata.clone(),
1977                    })),
1978                },
1979                None,
1980            )
1981            .await
1982            .unwrap();
1983
1984        // wait max 10 seconds for the commit to be processed
1985        for _ in 0..100 {
1986            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1987            let rows = list_rows(&db).await;
1988            if rows[0].2 == "COMMITTED" {
1989                break;
1990            }
1991        }
1992
1993        assert_eq!(commit_attempt.load(std::sync::atomic::Ordering::SeqCst), 3);
1994
1995        {
1996            let rows = list_rows(&db).await;
1997            assert_eq!(rows.len(), 1);
1998            assert_eq!(rows[0].1, epoch1 as i64);
1999            assert_eq!(rows[0].2, "COMMITTED");
2000        }
2001    }
2002
2003    #[tokio::test]
2004    async fn test_aborted() {
2005        let db = prepare_db_backend().await;
2006
2007        let param = SinkParam {
2008            sink_id: SinkId::from(1),
2009            sink_name: "test".into(),
2010            properties: Default::default(),
2011            columns: vec![],
2012            downstream_pk: None,
2013            sink_type: SinkType::AppendOnly,
2014            ignore_delete: false,
2015            format_desc: None,
2016            db_name: "test".into(),
2017            sink_from_name: "test".into(),
2018        };
2019
2020        let epoch0 = 232;
2021        let epoch1 = 233;
2022
2023        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
2024        let build_bitmap = |indexes: &[usize]| {
2025            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
2026            for i in indexes {
2027                builder.set(*i, true);
2028            }
2029            builder.finish()
2030        };
2031        let vnode = build_bitmap(&all_vnode);
2032
2033        let metadata = vec![1u8, 2u8];
2034
2035        let sender = Arc::new(tokio::sync::Mutex::new(None));
2036        let mock_subscriber: SinkCommittedEpochSubscriber = {
2037            let captured_sender = sender.clone();
2038            Arc::new(move |_sink_id: SinkId| {
2039                let (sender, receiver) = unbounded_channel();
2040                let captured_sender = captured_sender.clone();
2041                async move {
2042                    let mut guard = captured_sender.lock().await;
2043                    *guard = Some(sender);
2044                    Ok((epoch0, receiver))
2045                }
2046                .boxed()
2047            })
2048        };
2049
2050        let (manager, (_join_handle, _stop_tx)) =
2051            SinkCoordinatorManager::start_worker_with_spawn_worker({
2052                let expected_param = param.clone();
2053                let metadata = metadata.clone();
2054                let db = db.clone();
2055                move |param, new_writer_rx| {
2056                    let metadata = metadata.clone();
2057                    let expected_param = expected_param.clone();
2058                    let db = db.clone();
2059                    tokio::spawn({
2060                        let subscriber = mock_subscriber.clone();
2061                        async move {
2062                            // validate the start request
2063                            assert_eq!(param, expected_param);
2064                            CoordinatorWorker::execute_coordinator(
2065                                db,
2066                                param.clone(),
2067                                new_writer_rx,
2068                                MockTwoPhaseCoordinator::new_coordinator(
2069                                    move |_epoch, metadata_list, _schema_change| {
2070                                        let metadata =
2071                                            metadata_list.into_iter().exactly_one().unwrap();
2072                                        Ok(match metadata.metadata {
2073                                            Some(Metadata::Serialized(SerializedMetadata {
2074                                                metadata,
2075                                            })) => Some(metadata),
2076                                            _ => unreachable!(),
2077                                        })
2078                                    },
2079                                    move |_epoch, commit_metadata| {
2080                                        assert_eq!(commit_metadata, metadata);
2081                                        Ok(())
2082                                    },
2083                                    move |_epoch, _schema_change| unreachable!(),
2084                                ),
2085                                subscriber.clone(),
2086                            )
2087                            .await;
2088                        }
2089                    })
2090                }
2091            });
2092
2093        let build_client = |vnode| async {
2094            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
2095                Ok(tonic::Response::new(
2096                    manager
2097                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
2098                        .await
2099                        .unwrap()
2100                        .boxed(),
2101                ))
2102            })
2103            .await
2104            .unwrap()
2105            .0
2106        };
2107
2108        let mut client = build_client(vnode.clone()).await;
2109
2110        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
2111        assert_eq!(aligned_epoch, 1);
2112
2113        client
2114            .commit(
2115                epoch1,
2116                SinkMetadata {
2117                    metadata: Some(Metadata::Serialized(SerializedMetadata {
2118                        metadata: metadata.clone(),
2119                    })),
2120                },
2121                None,
2122            )
2123            .await
2124            .unwrap();
2125
2126        manager.stop_sink_coordinator(vec![SinkId::from(1)]).await;
2127
2128        {
2129            let rows = list_rows(&db).await;
2130            assert_eq!(rows.len(), 1);
2131            assert_eq!(rows[0].1, epoch1 as i64);
2132            assert_eq!(rows[0].2, "PENDING");
2133
2134            set_epoch_aborted(&db, SinkId::from(1), epoch1).await;
2135            let rows = list_rows(&db).await;
2136            assert_eq!(rows.len(), 1);
2137            assert_eq!(rows[0].1, epoch1 as i64);
2138            assert_eq!(rows[0].2, "ABORTED");
2139        }
2140
2141        let mut client = build_client(vnode).await;
2142
2143        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
2144        assert_eq!(aligned_epoch, 1);
2145
2146        {
2147            let rows = list_rows(&db).await;
2148            assert!(rows.is_empty());
2149        }
2150    }
2151
    /// After `client1` pre-commits `epoch1` together with a schema change, a second writer for
    /// the same sink (`client2`, the "rescheduled" writer) must not finish initializing while
    /// that epoch is still pending. This holds even after `client1` stops. Once the
    /// committed-epoch subscriber reports `epoch1`, `client2` completes with `Some(epoch1)` as
    /// its init epoch. The row is then `COMMITTED` and still records the schema change.
    #[tokio::test]
    async fn test_flush_when_reschedule() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        // The subscriber starts at `epoch0`; `epoch1` stays pending until the test announces it.
        let epoch0 = 232;
        let epoch1 = 233;

        // Both writers own every vnode.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        // Schema change attached to `epoch1`: adds a VARCHAR column `new_col` to a schema
        // whose original form is a single INT32 column `col_v1`.
        let schema_change = PbSinkSchemaChange {
            original_schema: vec![PbField {
                data_type: Some(PbDataType {
                    type_name: PbTypeName::Int32 as i32,
                    ..Default::default()
                }),
                name: "col_v1".into(),
            }],
            op: Some(SinkSchemachangeOp::AddColumns(PbSinkAddColumnsOp {
                fields: vec![PbField {
                    data_type: Some(PbDataType {
                        type_name: PbTypeName::Varchar as i32,
                        ..Default::default()
                    }),
                    name: "new_col".into(),
                }],
            })),
        };

        // Holds the sender half of the most recent committed-epoch subscription so the test
        // can announce committed epochs later.
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let schema_change = schema_change.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let schema_change_for_pre_commit = schema_change.clone();
                    let schema_change_for_commit = schema_change.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    // Pre-commit: the schema change must be forwarded, and the
                                    // single writer's serialized metadata is echoed back.
                                    move |_epoch, metadata_list, schema_change| {
                                        assert_eq!(
                                            schema_change,
                                            Some(schema_change_for_pre_commit.clone())
                                        );
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    // The committed schema change must match the one sent.
                                    move |_epoch, schema_change| {
                                        assert_eq!(schema_change, schema_change_for_commit.clone());
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Unlike the other tests, this keeps the full `(handle, init_epoch)` result so the
        // second writer's init epoch can be checked.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let (mut client1, _) = build_client(vnode.clone()).await.unwrap();

        let aligned_epoch = client1.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client1
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                Some(schema_change.clone()),
            )
            .await
            .unwrap();

        {
            // `epoch1` is persisted as PENDING together with its schema change.
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }

        // Start connecting the second writer. `poll_fn` wrapping `Poll::Ready(fut.poll(cx))`
        // polls the future exactly once and yields its `Poll` result, which must be pending.
        let mut build_client2_future = pin!(build_client(vnode.clone()));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        client1.stop().await.unwrap();

        // Still pending after the first writer stops: `epoch1` has not been reported as
        // committed yet.
        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        {
            // Announce `epoch1` as committed on the subscriber's channel.
            let guard = sender.lock().await;
            let sender = guard.as_ref().unwrap().clone();
            sender.send(epoch1).unwrap();
        }

        // The second writer can now finish initializing, reporting `epoch1` as its init epoch.
        let (_, init_epoch) = build_client2_future.await.unwrap();
        assert_eq!(init_epoch, Some(epoch1));

        {
            // The pending epoch has been committed and its schema change is still recorded.
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }
    }
2340}