use std::collections::HashMap;
use std::pin::pin;
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, join_all, select};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
use risingwave_pb::connector_service::coordinate_request::Msg;
use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
use rw_futures_util::pending_on_none;
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::{Receiver, Sender, channel};
use tokio::sync::{mpsc, oneshot};
use tokio::task::{JoinError, JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::Status;
use tracing::{debug, error, info, warn};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::sink_coordination::SinkWriterRequestStream;
use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

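/// Send a message through a non-async `send` (e.g. a oneshot or unbounded sender), logging an
/// error instead of panicking if the receiver has been dropped.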
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}

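/// Like `send_with_err_check!`, but for the async `send` of a bounded channel.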
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}

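/// Capacity of the bounded request channel between the manager handle and its background worker.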
const BOUNDED_CHANNEL_SIZE: usize = 16;

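/// Requests forwarded from the [`SinkCoordinatorManager`] handle to the background
/// [`ManagerWorker`]. For `StopCoordinator`, `sink_ids: None` stops all running coordinators
/// (used by [`SinkCoordinatorManager::reset`]).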
enum ManagerRequest {
    NewSinkWriter(SinkWriterCoordinationHandle),
    StopCoordinator {
        finish_notifier: Sender<()>,
        sink_ids: Option<Vec<SinkId>>,
    },
}

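/// A cloneable handle to the sink coordination worker. All methods forward requests over a
/// bounded channel to the [`ManagerWorker`] spawned by [`SinkCoordinatorManager::start_worker`].
///
/// A minimal usage sketch (assuming the surrounding meta node provides the listed dependencies):
///
/// ```ignore
/// let (manager, (join_handle, shutdown_tx)) = SinkCoordinatorManager::start_worker(
///     db,                      // sea_orm::DatabaseConnection
///     hummock_manager,         // HummockManagerRef
///     metadata_manager,        // MetadataManager
///     iceberg_compact_stat_tx, // UnboundedSender<IcebergSinkCompactionUpdate>
/// );
/// // serve `handle_new_request` on incoming coordination streams, then shut down:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```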
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    request_tx: mpsc::Sender<ManagerRequest>,
}

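/// Builds a [`SinkCommittedEpochSubscriber`] that resolves a sink to its first state table and
/// subscribes to that table's committed epochs through the hummock manager.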
fn new_committed_epoch_subscriber(
    hummock_manager: HummockManagerRef,
    metadata_manager: MetadataManager,
) -> SinkCommittedEpochSubscriber {
    Arc::new(move |sink_id| {
        let hummock_manager = hummock_manager.clone();
        let metadata_manager = metadata_manager.clone();
        async move {
            let state_table_ids = metadata_manager
                .get_sink_state_table_ids(sink_id)
                .await
                .map_err(SinkError::from)?;
            let Some(table_id) = state_table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };
            hummock_manager
                .subscribe_table_committed_epoch(*table_id)
                .await
                .map_err(SinkError::from)
        }
        .boxed()
    })
}

impl SinkCoordinatorManager {
    pub fn start_worker(
        db: DatabaseConnection,
        hummock_manager: HummockManagerRef,
        metadata_manager: MetadataManager,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let subscriber = new_committed_epoch_subscriber(hummock_manager, metadata_manager);
        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
            tokio::spawn(CoordinatorWorker::run(
                param,
                manager_request_stream,
                db.clone(),
                subscriber.clone(),
                iceberg_compact_stat_sender.clone(),
            ))
        })
    }

    fn start_worker_with_spawn_worker(
        spawn_coordinator_worker: impl SpawnCoordinatorFn,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let (shutdown_tx, shutdown_rx) = channel();
        let worker = ManagerWorker::new(request_rx, shutdown_rx);
        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
        (
            SinkCoordinatorManager { request_tx },
            (join_handle, shutdown_tx),
        )
    }

    pub async fn handle_new_request(
        &self,
        mut request_stream: SinkWriterRequestStream,
    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
        let (param, vnode_bitmap) = match request_stream.try_next().await? {
            Some(CoordinateRequest {
                msg:
                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
                        param: Some(param),
                        vnode_bitmap: Some(vnode_bitmap),
                    })),
            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
            msg => {
                return Err(Status::invalid_argument(format!(
                    "expected CoordinateRequest::StartRequest in the first request, got {:?}",
                    msg
                )));
            }
        };
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        self.request_tx
            .send(ManagerRequest::NewSinkWriter(
                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
            ))
            .await
            .map_err(|_| {
                Status::unavailable(
                    "unable to send to sink manager worker. The worker may have stopped",
                )
            })?;

        Ok(UnboundedReceiverStream::new(response_rx))
    }

    async fn stop_coordinator(&self, sink_ids: Option<Vec<SinkId>>) {
        let (tx, rx) = channel();
        send_await_with_err_check!(
            self.request_tx,
            ManagerRequest::StopCoordinator {
                finish_notifier: tx,
                sink_ids: sink_ids.clone(),
            }
        );
        if rx.await.is_err() {
            error!("failed to wait for resetting sink manager worker");
        } else {
            info!("successfully stopped coordinator: {:?}", sink_ids);
        }
    }

    pub async fn reset(&self) {
        self.stop_coordinator(None).await;
    }

    pub async fn stop_sink_coordinator(&self, sink_ids: Vec<SinkId>) {
        self.stop_coordinator(Some(sink_ids)).await;
    }
}

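/// Per-sink handle kept by the [`ManagerWorker`] for a running coordinator worker.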
struct CoordinatorWorkerHandle {
    /// Sender for forwarding new writers to the coordinator worker. Dropping it serves as the
    /// stop signal.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notified when the coordinator worker finishes.
    finish_notifiers: Vec<Sender<()>>,
}

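/// Background worker owning the per-sink coordinator workers. It multiplexes manager requests,
/// coordinator terminations, and the shutdown signal in [`ManagerWorker::next_event`].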
struct ManagerWorker {
    request_rx: mpsc::Receiver<ManagerRequest>,
    shutdown_rx: Receiver<()>,

    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}

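/// Events consumed by the [`ManagerWorker`] event loop.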
enum ManagerEvent {
    NewRequest(ManagerRequest),
    CoordinatorWorkerFinished {
        sink_id: SinkId,
        join_result: Result<(), JoinError>,
    },
}

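/// Factory for spawning a coordinator worker, injected so that tests can substitute mock
/// coordinators (see the tests below).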
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;

impl ManagerWorker {
    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
        ManagerWorker {
            request_rx,
            shutdown_rx,
            running_coordinator_worker_join_handles: Default::default(),
            running_coordinator_worker: Default::default(),
        }
    }

    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
        while let Some(event) = self.next_event().await {
            match event {
                ManagerEvent::NewRequest(request) => match request {
                    ManagerRequest::NewSinkWriter(request) => {
                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
                    }
                    ManagerRequest::StopCoordinator {
                        finish_notifier,
                        sink_ids,
                    } => {
                        if let Some(sink_ids) = sink_ids {
                            let mut rxs = Vec::with_capacity(sink_ids.len());
                            for sink_id in sink_ids {
                                if let Some(worker_handle) =
                                    self.running_coordinator_worker.get_mut(&sink_id)
                                {
                                    let (tx, rx) = oneshot::channel();
                                    rxs.push(rx);
                                    worker_handle.finish_notifiers.push(tx);
                                    if let Some(sender) = worker_handle.request_sender.take() {
                                        // drop the sender to signal the coordinator worker to stop
                                        drop(sender);
                                    }
                                } else {
                                    debug!(
                                        "sink coordinator of {} is not running, skipping it",
                                        sink_id
                                    );
                                }
                            }
                            tokio::spawn(async move {
                                let notify_res = join_all(rxs).await;
                                for res in notify_res {
                                    if let Err(e) = res {
                                        error!(
                                            "failed to wait for resetting sink manager worker: {}",
                                            e.as_report()
                                        );
                                    }
                                }
                                send_with_err_check!(finish_notifier, ());
                            });
                        } else {
                            self.clean_up().await;
                            send_with_err_check!(finish_notifier, ());
                        }
                    }
                },
                ManagerEvent::CoordinatorWorkerFinished {
                    sink_id,
                    join_result,
                } => self.handle_coordinator_finished(sink_id, join_result),
            }
        }
        self.clean_up().await;
        info!("sink manager worker exited");
    }

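    /// Waits for the next event: a new manager request, the termination of a coordinator worker,
    /// or shutdown. Returns `None` when the worker should exit.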
    async fn next_event(&mut self) -> Option<ManagerEvent> {
        match select(
            select(
                pin!(self.request_rx.recv()),
                pin!(pending_on_none(
                    self.running_coordinator_worker_join_handles.next()
                )),
            ),
            &mut self.shutdown_rx,
        )
        .await
        {
            Either::Left((either, _)) => match either {
                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
                Either::Left((None, _)) => None,
                Either::Right(((sink_id, join_result), _)) => {
                    Some(ManagerEvent::CoordinatorWorkerFinished {
                        sink_id,
                        join_result,
                    })
                }
            },
            Either::Right(_) => None,
        }
    }

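    /// Stops all coordinator workers by dropping their request senders, then drains their join
    /// handles, notifying any waiting finish notifiers.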
    async fn clean_up(&mut self) {
        info!("sink manager worker started cleaning up");
        for worker_handle in self.running_coordinator_worker.values_mut() {
            if let Some(sender) = worker_handle.request_sender.take() {
                // drop the sender to signal the coordinator worker to stop
                drop(sender);
            }
        }
        while let Some((sink_id, join_result)) =
            self.running_coordinator_worker_join_handles.next().await
        {
            self.handle_coordinator_finished(sink_id, join_result);
        }
        info!("sink manager worker finished cleaning up");
    }

    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
        let worker_handle = self
            .running_coordinator_worker
            .remove(&sink_id)
            .expect("finished coordinator should have an associated worker handle");
        for finish_notifier in worker_handle.finish_notifiers {
            send_with_err_check!(finish_notifier, ());
        }
        match join_result {
            Ok(()) => {
                info!(
                    id = %sink_id,
                    "sink coordinator has gracefully finished",
                );
            }
            Err(err) => {
                error!(
                    id = %sink_id,
                    error = %err.as_report(),
                    "sink coordinator finished with error",
                );
            }
        }
    }

    fn handle_new_sink_writer(
        &mut self,
        new_writer: SinkWriterCoordinationHandle,
        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
    ) {
        let param = new_writer.param();
        let sink_id = param.sink_id;

        let handle = self
            .running_coordinator_worker
            .entry(param.sink_id)
            .or_insert_with(|| {
                let (request_tx, request_rx) = unbounded_channel();
                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                self.running_coordinator_worker_join_handles.push(
                    join_handle
                        .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                );
                CoordinatorWorkerHandle {
                    request_sender: Some(request_tx),
                    finish_notifiers: Vec::new(),
                }
            });

        if let Some(sender) = handle.request_sender.as_mut() {
            send_with_err_check!(sender, new_writer);
        } else {
            warn!(
                "received a new writer request while the sink coordinator is being stopped: {:?}",
                param
            );
            new_writer.abort(Status::internal("the sink is being stopped"));
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicI32;
    use std::task::Poll;

    use anyhow::anyhow;
    use async_trait::async_trait;
    use futures::future::{join, try_join};
    use futures::{FutureExt, StreamExt, TryFutureExt};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use risingwave_common::bitmap::BitmapBuilder;
    use risingwave_common::hash::VirtualNode;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};
    use risingwave_connector::sink::{
        SinglePhaseCommitCoordinator, SinkCommitCoordinator, SinkError, SinkParam,
        TwoPhaseCommitCoordinator,
    };
    use risingwave_meta_model::SinkSchemachange;
    use risingwave_pb::connector_service::SinkMetadata;
    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
    use risingwave_pb::data::PbDataType;
    use risingwave_pb::data::data_type::PbTypeName;
    use risingwave_pb::plan_common::PbField;
    use risingwave_pb::stream_plan::sink_schema_change::Op as SinkSchemachangeOp;
    use risingwave_pb::stream_plan::{PbSinkAddColumnsOp, PbSinkSchemaChange};
    use risingwave_rpc_client::CoordinatorStreamHandle;
    use sea_orm::{ConnectionTrait, Database, DatabaseConnection};
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::manager::sink_coordination::SinkCoordinatorManager;
    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;

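    /// A single-phase mock coordinator: `commit_data` delegates to the provided closure together
    /// with a mutable `context`, so each test can assert on the epoch and metadata it receives.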
    struct MockSinglePhaseCoordinator<
        C,
        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>,
    > {
        context: C,
        f: F,
    }

    impl<
        C: Send + 'static,
        F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send + 'static,
    > MockSinglePhaseCoordinator<C, F>
    {
        fn new_coordinator(context: C, f: F) -> SinkCommitCoordinator {
            SinkCommitCoordinator::SinglePhase(Box::new(MockSinglePhaseCoordinator { context, f }))
        }
    }

    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinglePhaseCommitCoordinator for MockSinglePhaseCoordinator<C, F>
    {
        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
            Ok(())
        }

        async fn commit_data(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }

        async fn commit_schema_change(
            &mut self,
            _epoch: u64,
            _schema_change: PbSinkSchemaChange,
        ) -> risingwave_connector::sink::Result<()> {
            unreachable!()
        }
    }

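    /// Two writers covering disjoint vnode sets commit two epochs in different arrival orders;
    /// the coordinator must only commit an epoch once metadata from both writers has arrived.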
    #[tokio::test]
    async fn test_basic() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }

    #[tokio::test]
    async fn test_single_writer() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
        assert_eq!(aligned_epoch, epoch1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_partial_commit() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param,
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    (),
                                    |_, _, _| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        drop(client2);
        assert!(commit_future.await.is_err());
    }

    #[tokio::test]
    async fn test_fail_commit() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param,
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator((), |_, _, _| {
                                    Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                }),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let (result1, result2) = join(
            commit_future,
            client2.commit(
                epoch,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: vec![],
                    })),
                },
                None,
            ),
        )
        .await;
        assert!(result1.is_err());
        assert!(result2.is_err());
    }

    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let db = prepare_db_backend().await;
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((1, receiver))
                }
                .boxed()
            })
        };
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockSinglePhaseCoordinator::new_coordinator(
                                    0,
                                    move |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }

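    /// A two-phase mock coordinator with injectable `pre_commit`, `commit_data`, and
    /// `commit_schema_change` closures.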
    struct MockTwoPhaseCoordinator<
        P: FnMut(
            u64,
            Vec<SinkMetadata>,
            Option<PbSinkSchemaChange>,
        ) -> Result<Option<Vec<u8>>, SinkError>,
        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError>,
        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError>,
    > {
        pre_commit: P,
        commit_data: CD,
        commit_schema_change: CS,
    }

    impl<
        P: FnMut(
                u64,
                Vec<SinkMetadata>,
                Option<PbSinkSchemaChange>,
            ) -> Result<Option<Vec<u8>>, SinkError>
            + Send
            + 'static,
        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
    > MockTwoPhaseCoordinator<P, CD, CS>
    {
        fn new_coordinator(
            pre_commit: P,
            commit_data: CD,
            commit_schema_change: CS,
        ) -> SinkCommitCoordinator {
            SinkCommitCoordinator::TwoPhase(Box::new(MockTwoPhaseCoordinator {
                pre_commit,
                commit_data,
                commit_schema_change,
            }))
        }
    }

    #[async_trait]
    impl<
        P: FnMut(
                u64,
                Vec<SinkMetadata>,
                Option<PbSinkSchemaChange>,
            ) -> Result<Option<Vec<u8>>, SinkError>
            + Send
            + 'static,
        CD: FnMut(u64, Vec<u8>) -> Result<(), SinkError> + Send + 'static,
        CS: FnMut(u64, PbSinkSchemaChange) -> Result<(), SinkError> + Send + 'static,
    > TwoPhaseCommitCoordinator for MockTwoPhaseCoordinator<P, CD, CS>
    {
        async fn init(&mut self) -> risingwave_connector::sink::Result<()> {
            Ok(())
        }

        async fn pre_commit(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
            schema_change: Option<PbSinkSchemaChange>,
        ) -> risingwave_connector::sink::Result<Option<Vec<u8>>> {
            (self.pre_commit)(epoch, metadata, schema_change)
        }

        async fn commit_data(
            &mut self,
            epoch: u64,
            commit_metadata: Vec<u8>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.commit_data)(epoch, commit_metadata)
        }

        async fn commit_schema_change(
            &mut self,
            epoch: u64,
            schema_change: PbSinkSchemaChange,
        ) -> risingwave_connector::sink::Result<()> {
            (self.commit_schema_change)(epoch, schema_change)
        }

        async fn abort(&mut self, _epoch: u64, _commit_metadata: Vec<u8>) {
            tracing::debug!("abort called");
        }
    }

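    /// Creates an in-memory SQLite backend with the `pending_sink_state` table used by the
    /// coordinator's two-phase commit persistence.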
    async fn prepare_db_backend() -> DatabaseConnection {
        let db: DatabaseConnection = Database::connect("sqlite::memory:").await.unwrap();
        let ddl = "
        CREATE TABLE IF NOT EXISTS pending_sink_state (
            sink_id i32 NOT NULL,
            epoch i64 NOT NULL,
            sink_state STRING NOT NULL,
            metadata BLOB NOT NULL,
            schema_change BLOB,
            PRIMARY KEY (sink_id, epoch)
        )
        ";
        db.execute(sea_orm::Statement::from_string(
            db.get_database_backend(),
            ddl.to_owned(),
        ))
        .await
        .unwrap();
        db
    }

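    /// Reads back all rows of `pending_sink_state`, decoding `schema_change` into its protobuf
    /// form.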
    async fn list_rows(
        db: &DatabaseConnection,
    ) -> Vec<(i32, i64, String, Vec<u8>, Option<PbSinkSchemaChange>)> {
        let sql =
            "SELECT sink_id, epoch, sink_state, metadata, schema_change FROM pending_sink_state";
        let rows = db
            .query_all(sea_orm::Statement::from_string(
                db.get_database_backend(),
                sql.to_owned(),
            ))
            .await
            .unwrap();
        rows.into_iter()
            .map(|row| {
                (
                    row.try_get("", "sink_id").unwrap(),
                    row.try_get("", "epoch").unwrap(),
                    row.try_get("", "sink_state").unwrap(),
                    row.try_get("", "metadata").unwrap(),
                    row.try_get::<Option<SinkSchemachange>>("", "schema_change")
                        .unwrap()
                        .map(|v| v.to_protobuf()),
                )
            })
            .collect()
    }

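    /// Marks a persisted epoch as `ABORTED` directly in the backend, simulating an external
    /// abort decision (see `test_aborted`).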
    async fn set_epoch_aborted(db: &DatabaseConnection, sink_id: SinkId, epoch: u64) {
        let sql = format!(
            "UPDATE pending_sink_state SET sink_state = 'ABORTED' WHERE sink_id = {} AND epoch = {}",
            sink_id, epoch as i64
        );
        db.execute(sea_orm::Statement::from_string(
            db.get_database_backend(),
            sql,
        ))
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn test_pre_commit_failed() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch1, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    move |_epoch, _metadata_list, _schema_change| {
                                        Err(SinkError::Coordinator(anyhow!("failed to pre commit")))
                                    },
                                    move |_epoch, _commit_metadata| unreachable!(),
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        let commit_result = client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await;
        assert!(commit_result.is_err());

        let rows = list_rows(&db).await;
        assert!(rows.is_empty());
    }

    #[tokio::test]
    async fn test_waiting_on_checkpoint() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];

        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    move |_epoch, metadata_list, _schema_change| {
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");

            let guard = sender.lock().await;
            let sender = guard.as_ref().unwrap().clone();
            sender.send(233).unwrap();
        }

        // wait up to ~5s for the background committer to mark the epoch as COMMITTED
        for _ in 0..50 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            let rows = list_rows(&db).await;
            if rows[0].2 == "COMMITTED" {
                break;
            }
        }

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
        }
    }

    #[tokio::test]
    async fn test_commit_retry_loop() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch1, receiver))
                }
                .boxed()
            })
        };

        let commit_attempt = Arc::new(AtomicI32::new(0));

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                let commit_attempt = commit_attempt.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    let commit_attempt = commit_attempt.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    move |_epoch, metadata_list, _schema_change| {
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        if commit_attempt
                                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
                                            < 2
                                        {
                                            Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                        } else {
                                            Ok(())
                                        }
                                    },
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        // wait up to ~10s for the background retry loop to commit the epoch
        for _ in 0..100 {
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            let rows = list_rows(&db).await;
            if rows[0].2 == "COMMITTED" {
                break;
            }
        }

        assert_eq!(commit_attempt.load(std::sync::atomic::Ordering::SeqCst), 3);

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
        }
    }

    #[tokio::test]
    async fn test_aborted() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];

        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    move |_epoch, metadata_list, _schema_change| {
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    move |_epoch, _schema_change| unreachable!(),
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode.clone()).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        manager.stop_sink_coordinator(vec![SinkId::from(1)]).await;

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");

            set_epoch_aborted(&db, SinkId::from(1), epoch1).await;
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "ABORTED");
        }

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        {
            let rows = list_rows(&db).await;
            assert!(rows.is_empty());
        }
    }

    #[tokio::test]
    async fn test_flush_when_reschedule() {
        let db = prepare_db_backend().await;

        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            ignore_delete: false,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = vec![1u8, 2u8];
        let schema_change = PbSinkSchemaChange {
            original_schema: vec![PbField {
                data_type: Some(PbDataType {
                    type_name: PbTypeName::Int32 as i32,
                    ..Default::default()
                }),
                name: "col_v1".into(),
            }],
            op: Some(SinkSchemachangeOp::AddColumns(PbSinkAddColumnsOp {
                fields: vec![PbField {
                    data_type: Some(PbDataType {
                        type_name: PbTypeName::Varchar as i32,
                        ..Default::default()
                    }),
                    name: "new_col".into(),
                }],
            })),
        };

        let sender = Arc::new(tokio::sync::Mutex::new(None));
        let mock_subscriber: SinkCommittedEpochSubscriber = {
            let captured_sender = sender.clone();
            Arc::new(move |_sink_id: SinkId| {
                let (sender, receiver) = unbounded_channel();
                let captured_sender = captured_sender.clone();
                async move {
                    let mut guard = captured_sender.lock().await;
                    *guard = Some(sender);
                    Ok((epoch0, receiver))
                }
                .boxed()
            })
        };

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let schema_change = schema_change.clone();
                let db = db.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let schema_change_for_pre_commit = schema_change.clone();
                    let schema_change_for_commit = schema_change.clone();
                    let expected_param = expected_param.clone();
                    let db = db.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                db,
                                param.clone(),
                                new_writer_rx,
                                MockTwoPhaseCoordinator::new_coordinator(
                                    move |_epoch, metadata_list, schema_change| {
                                        assert_eq!(
                                            schema_change,
                                            Some(schema_change_for_pre_commit.clone())
                                        );
                                        let metadata =
                                            metadata_list.into_iter().exactly_one().unwrap();
                                        Ok(match metadata.metadata {
                                            Some(Metadata::Serialized(SerializedMetadata {
                                                metadata,
                                            })) => Some(metadata),
                                            _ => unreachable!(),
                                        })
                                    },
                                    move |_epoch, commit_metadata| {
                                        assert_eq!(commit_metadata, metadata);
                                        Ok(())
                                    },
                                    move |_epoch, schema_change| {
                                        assert_eq!(schema_change, schema_change_for_commit.clone());
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let (mut client1, _) = build_client(vnode.clone()).await.unwrap();

        let aligned_epoch = client1.align_initial_epoch(1).await.unwrap();
        assert_eq!(aligned_epoch, 1);

        client1
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata.clone(),
                    })),
                },
                Some(schema_change.clone()),
            )
            .await
            .unwrap();

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "PENDING");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }

        let mut build_client2_future = pin!(build_client(vnode.clone()));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        client1.stop().await.unwrap();

        assert!(
            poll_fn(|cx| Poll::Ready(build_client2_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );

        {
            let guard = sender.lock().await;
            let sender = guard.as_ref().unwrap().clone();
            sender.send(epoch1).unwrap();
        }

        let (_, init_epoch) = build_client2_future.await.unwrap();
        assert_eq!(init_epoch, Some(epoch1));

        {
            let rows = list_rows(&db).await;
            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].1, epoch1 as i64);
            assert_eq!(rows[0].2, "COMMITTED");
            assert_eq!(rows[0].4, Some(schema_change.clone()));
        }
    }
}