1use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
27use risingwave_pb::connector_service::coordinate_request::Msg;
28use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
29use rw_futures_util::pending_on_none;
30use sea_orm::DatabaseConnection;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
34use tokio::sync::oneshot::{Receiver, Sender, channel};
35use tokio::task::{JoinError, JoinHandle};
36use tokio_stream::wrappers::UnboundedReceiverStream;
37use tonic::Status;
38use tracing::{debug, error, info, warn};
39
40use crate::hummock::HummockManagerRef;
41use crate::manager::MetadataManager;
42use crate::manager::sink_coordination::SinkWriterRequestStream;
43use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
44use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
45
/// Sends `$msg` on a non-async sender (e.g. a oneshot or unbounded sender),
/// logging instead of panicking when the receiver has already been dropped.
/// Losing the message is acceptable here: it only means the peer went away.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}
53
/// Async counterpart of `send_with_err_check!`: awaits a bounded-channel send
/// and logs instead of panicking when the receiver has already been dropped.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}
61
/// Capacity of the bounded request channel between the public
/// [`SinkCoordinatorManager`] handle and its background [`ManagerWorker`].
const BOUNDED_CHANNEL_SIZE: usize = 16;
63
/// Requests accepted by the background [`ManagerWorker`].
enum ManagerRequest {
    /// Register a newly connected sink writer with its coordinator.
    NewSinkWriter(SinkWriterCoordinationHandle),
    /// Stop one coordinator (`sink_id = Some(..)`) or all of them (`None`);
    /// `finish_notifier` fires once the stop has completed.
    StopCoordinator {
        finish_notifier: Sender<()>,
        sink_id: Option<SinkId>,
    },
}
72
/// Cheaply cloneable handle to the sink coordination service; all clones
/// forward requests to the same background [`ManagerWorker`] task.
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    // Bounded channel into the worker's event loop.
    request_tx: mpsc::Sender<ManagerRequest>,
}
77
78fn new_committed_epoch_subscriber(
79 hummock_manager: HummockManagerRef,
80 metadata_manager: MetadataManager,
81) -> SinkCommittedEpochSubscriber {
82 Arc::new(move |sink_id| {
83 let hummock_manager = hummock_manager.clone();
84 let metadata_manager = metadata_manager.clone();
85 async move {
86 let state_table_ids = metadata_manager
87 .get_sink_state_table_ids(sink_id)
88 .await
89 .map_err(SinkError::from)?;
90 let Some(table_id) = state_table_ids.first() else {
91 return Err(anyhow!("no state table id in sink: {}", sink_id).into());
92 };
93 hummock_manager
94 .subscribe_table_committed_epoch(*table_id)
95 .await
96 .map_err(SinkError::from)
97 }
98 .boxed()
99 })
100}
101
impl SinkCoordinatorManager {
    /// Test-only manager whose background task accepts *only*
    /// `StopCoordinator { sink_id: None }` (i.e. [`Self::reset`]) and
    /// acknowledges it immediately; any other request panics.
    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let _join_handle = tokio::spawn(async move {
            while let Some(req) = request_rx.recv().await {
                let ManagerRequest::StopCoordinator {
                    finish_notifier,
                    sink_id,
                } = req
                else {
                    unreachable!()
                };
                assert_eq!(sink_id, None);
                finish_notifier.send(()).unwrap();
            }
        });
        SinkCoordinatorManager { request_tx }
    }

    /// Starts the manager with the production coordinator-worker factory.
    /// Returns the handle plus the worker's join handle and shutdown trigger.
    pub fn start_worker(
        db: DatabaseConnection,
        hummock_manager: HummockManagerRef,
        metadata_manager: MetadataManager,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let subscriber = new_committed_epoch_subscriber(hummock_manager, metadata_manager);
        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
            tokio::spawn(CoordinatorWorker::run(
                param,
                manager_request_stream,
                db.clone(),
                subscriber.clone(),
                iceberg_compact_stat_sender.clone(),
            ))
        })
    }

    /// Spawns the [`ManagerWorker`] event loop with a caller-supplied factory
    /// for per-sink coordinator tasks (injectable for tests).
    fn start_worker_with_spawn_worker(
        spawn_coordinator_worker: impl SpawnCoordinatorFn,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let (shutdown_tx, shutdown_rx) = channel();
        let worker = ManagerWorker::new(request_rx, shutdown_rx);
        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
        (
            SinkCoordinatorManager { request_tx },
            (join_handle, shutdown_tx),
        )
    }

    /// Handles a new writer's bidi-stream connection: the first message must
    /// be a `StartRequest` carrying the sink param and the writer's vnode
    /// bitmap; the writer is then registered with its coordinator and the
    /// response stream is returned to the RPC layer.
    pub async fn handle_new_request(
        &self,
        mut request_stream: SinkWriterRequestStream,
    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
        let (param, vnode_bitmap) = match request_stream.try_next().await? {
            Some(CoordinateRequest {
                msg:
                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
                        param: Some(param),
                        vnode_bitmap: Some(vnode_bitmap),
                    })),
            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
            msg => {
                return Err(Status::invalid_argument(format!(
                    "expected CoordinateRequest::StartRequest in the first request, get {:?}",
                    msg
                )));
            }
        };
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        self.request_tx
            .send(ManagerRequest::NewSinkWriter(
                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
            ))
            .await
            .map_err(|_| {
                Status::unavailable(
                    "unable to send to sink manager worker. The worker may have stopped",
                )
            })?;

        Ok(UnboundedReceiverStream::new(response_rx))
    }

    /// Asks the worker to stop one coordinator (or all, when `sink_id` is
    /// `None`) and waits until the stop has fully completed.
    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
        let (tx, rx) = channel();
        send_await_with_err_check!(
            self.request_tx,
            ManagerRequest::StopCoordinator {
                finish_notifier: tx,
                sink_id,
            }
        );
        // A dropped notifier means the worker died; log rather than panic.
        if rx.await.is_err() {
            error!("fail to wait for resetting sink manager worker");
        }
        info!("successfully stop coordinator: {:?}", sink_id);
    }

    /// Stops every running coordinator (e.g. on recovery).
    pub async fn reset(&self) {
        self.stop_coordinator(None).await;
    }

    /// Stops the coordinator of a single sink (e.g. on sink drop).
    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
        self.stop_coordinator(Some(sink_id)).await;
    }
}
210
/// Book-keeping the manager keeps for one running coordinator worker task.
struct CoordinatorWorkerHandle {
    // Forwards newly connected writers to the worker; taking it to `None`
    // (dropping the sender) signals the worker to stop.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    // Callers waiting for this worker to finish; all are notified on exit.
    finish_notifiers: Vec<Sender<()>>,
}
217
/// Background event loop owning all per-sink coordinator worker tasks.
struct ManagerWorker {
    // Requests from `SinkCoordinatorManager` handles.
    request_rx: mpsc::Receiver<ManagerRequest>,
    // Fires (or is dropped) to shut the whole manager down.
    shutdown_rx: Receiver<()>,

    // Join handles of spawned coordinator tasks, tagged with their sink id so
    // completions can be routed back to `running_coordinator_worker`.
    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}
227
/// Internal events driving the [`ManagerWorker`] loop.
enum ManagerEvent {
    /// A request arrived from a manager handle.
    NewRequest(ManagerRequest),
    /// A spawned coordinator task completed (gracefully or not).
    CoordinatorWorkerFinished {
        sink_id: SinkId,
        join_result: Result<(), JoinError>,
    },
}
235
/// Factory that spawns a coordinator task for one sink: receives the sink's
/// param and the channel of incoming writer handles. Trait alias so tests can
/// inject a mock spawner.
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;
239
impl ManagerWorker {
    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
        ManagerWorker {
            request_rx,
            shutdown_rx,
            running_coordinator_worker_join_handles: Default::default(),
            running_coordinator_worker: Default::default(),
        }
    }

    /// Main loop: dispatches requests and worker completions until all
    /// request senders are dropped or shutdown fires, then drains every
    /// remaining coordinator task before exiting.
    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
        while let Some(event) = self.next_event().await {
            match event {
                ManagerEvent::NewRequest(request) => match request {
                    ManagerRequest::NewSinkWriter(request) => {
                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
                    }
                    ManagerRequest::StopCoordinator {
                        finish_notifier,
                        sink_id,
                    } => {
                        if let Some(sink_id) = sink_id {
                            if let Some(worker_handle) =
                                self.running_coordinator_worker.get_mut(&sink_id)
                            {
                                // Dropping the sender tells the worker to stop;
                                // the notifier fires when its task joins.
                                if let Some(sender) = worker_handle.request_sender.take() {
                                    drop(sender);
                                }
                                worker_handle.finish_notifiers.push(finish_notifier);
                            } else {
                                debug!(
                                    "sink coordinator of {} is not running. Notify finish directly",
                                    sink_id
                                );
                                send_with_err_check!(finish_notifier, ());
                            }
                        } else {
                            // `None` means stop everything; notify only after
                            // all workers have fully joined.
                            self.clean_up().await;
                            send_with_err_check!(finish_notifier, ());
                        }
                    }
                },
                ManagerEvent::CoordinatorWorkerFinished {
                    sink_id,
                    join_result,
                } => self.handle_coordinator_finished(sink_id, join_result),
            }
        }
        self.clean_up().await;
        info!("sink manager worker exited");
    }

    /// Waits for the next request or worker completion; returns `None` when
    /// the manager should exit (all request senders dropped, or shutdown
    /// signal received/sender dropped).
    async fn next_event(&mut self) -> Option<ManagerEvent> {
        match select(
            select(
                pin!(self.request_rx.recv()),
                // `pending_on_none`: an empty FuturesUnordered yields None
                // immediately; treat that as "no completion yet" instead.
                pin!(pending_on_none(
                    self.running_coordinator_worker_join_handles.next()
                )),
            ),
            &mut self.shutdown_rx,
        )
        .await
        {
            Either::Left((either, _)) => match either {
                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
                Either::Left((None, _)) => None,
                Either::Right(((sink_id, join_result), _)) => {
                    Some(ManagerEvent::CoordinatorWorkerFinished {
                        sink_id,
                        join_result,
                    })
                }
            },
            Either::Right(_) => None,
        }
    }

    /// Signals every running worker to stop (by dropping its sender) and
    /// waits for all of their tasks to join, notifying finish notifiers.
    async fn clean_up(&mut self) {
        info!("sink manager worker start cleaning up");
        for worker_handle in self.running_coordinator_worker.values_mut() {
            if let Some(sender) = worker_handle.request_sender.take() {
                drop(sender);
            }
        }
        while let Some((sink_id, join_result)) =
            self.running_coordinator_worker_join_handles.next().await
        {
            self.handle_coordinator_finished(sink_id, join_result);
        }
        info!("sink manager worker finished cleaning up");
    }

    /// Removes the finished worker's handle, fires its finish notifiers and
    /// logs the join outcome.
    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
        let worker_handle = self
            .running_coordinator_worker
            .remove(&sink_id)
            .expect("finished coordinator should have an associated worker handle");
        for finish_notifier in worker_handle.finish_notifiers {
            send_with_err_check!(finish_notifier, ());
        }
        match join_result {
            Ok(()) => {
                info!(
                    id = %sink_id,
                    "sink coordinator has gracefully finished",
                );
            }
            Err(err) => {
                error!(
                    id = %sink_id,
                    error = %err.as_report(),
                    "sink coordinator finished with error",
                );
            }
        }
    }

    /// Routes a newly connected writer to its sink's coordinator worker,
    /// spawning the worker on first contact. A writer arriving while the
    /// worker is being stopped is aborted.
    fn handle_new_sink_writer(
        &mut self,
        new_writer: SinkWriterCoordinationHandle,
        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
    ) {
        let param = new_writer.param();
        let sink_id = param.sink_id;

        let handle = self
            .running_coordinator_worker
            .entry(param.sink_id)
            .or_insert_with(|| {
                // First writer of this sink: spawn its coordinator and track
                // the join handle tagged with the sink id.
                let (request_tx, request_rx) = unbounded_channel();
                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                self.running_coordinator_worker_join_handles.push(
                    join_handle
                        .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                );
                CoordinatorWorkerHandle {
                    request_sender: Some(request_tx),
                    finish_notifiers: Vec::new(),
                }
            });

        if let Some(sender) = handle.request_sender.as_mut() {
            send_with_err_check!(sender, new_writer);
        } else {
            warn!(
                "handle a new request while the sink coordinator is being stopped: {:?}",
                param
            );
            new_writer.abort(Status::internal("the sink is being stopped"));
        }
    }
}
398
399#[cfg(test)]
400mod tests {
401 use std::future::{Future, poll_fn};
402 use std::pin::pin;
403 use std::sync::Arc;
404 use std::task::Poll;
405
406 use anyhow::anyhow;
407 use async_trait::async_trait;
408 use futures::future::{join, try_join};
409 use futures::{FutureExt, StreamExt, TryFutureExt};
410 use itertools::Itertools;
411 use rand::seq::SliceRandom;
412 use risingwave_common::bitmap::BitmapBuilder;
413 use risingwave_common::catalog::Field;
414 use risingwave_common::hash::VirtualNode;
415 use risingwave_connector::sink::catalog::{SinkId, SinkType};
416 use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
417 use risingwave_pb::connector_service::SinkMetadata;
418 use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
419 use risingwave_rpc_client::CoordinatorStreamHandle;
420 use tokio::sync::mpsc::unbounded_channel;
421 use tokio_stream::wrappers::ReceiverStream;
422
423 use crate::manager::sink_coordination::SinkCoordinatorManager;
424 use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
425 use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
426
    /// Test double for a commit coordinator: `f` is invoked on every commit
    /// with the epoch, the collected metadata and mutable state `context`.
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        context: C,
        f: F,
    }
431
432 impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
433 fn new(context: C, f: F) -> Self {
434 MockCoordinator { context, f }
435 }
436 }
437
    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinkCommitCoordinator for MockCoordinator<C, F>
    {
        /// No recovery in tests: report no previously committed epoch.
        async fn init(
            &mut self,
            _subscriber: SinkCommittedEpochSubscriber,
        ) -> risingwave_connector::sink::Result<Option<u64>> {
            Ok(None)
        }

        /// Delegates the commit to the injected callback.
        async fn commit(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
            _add_columns: Option<Vec<Field>>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }
    }
458
    /// Two writers covering disjoint halves of the vnode space commit two
    /// epochs; the mock coordinator checks that each epoch's commit only
    /// fires once both writers reported, with the expected aggregated
    /// metadata. Also checks initial-epoch alignment picks the max (epoch1).
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        // Randomly split all vnodes between the two writers.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[epoch_index][writer_index]
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        // Sort to make assertion order-independent.
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        // Alignment resolves to the maximum of the proposed initial epochs.
        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // One writer's commit must stay pending until the other commits.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Same pattern for the second epoch, with the roles swapped.
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
658
    /// A single writer owning the entire vnode space: each commit completes
    /// on its own, without waiting for peers.
    #[tokio::test]
    async fn test_single_writer() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        // The lone writer covers every vnode.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        // Exactly one metadata entry per epoch.
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
        assert_eq!(aligned_epoch, epoch1);

        // Commits resolve immediately: no other writer to wait for.
        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();
    }
795
    /// If one writer disconnects while another's commit is pending, the
    /// pending commit must fail instead of hanging; the coordinator's commit
    /// callback must never run.
    #[tokio::test]
    async fn test_partial_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                // Commit must never be reached in this test.
                                MockCoordinator::new((), |_, _, _| unreachable!()),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        // client1's commit waits for client2, which never commits...
        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        // ...and is failed once client2's connection drops.
        drop(client2);
        assert!(commit_future.await.is_err());
    }
887
    /// When the coordinator's commit itself fails, every participating
    /// writer's commit call must observe the error.
    #[tokio::test]
    async fn test_fail_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        {
                            async move {
                                assert_eq!(param, expected_param);
                                CoordinatorWorker::execute_coordinator(
                                    param,
                                    new_writer_rx,
                                    // The coordinator always rejects commits.
                                    MockCoordinator::new((), |_, _, _| {
                                        Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                    }),
                                    subscriber.clone(),
                                )
                                .await;
                            }
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        // Once both writers commit, the coordinator's failure propagates to both.
        let (result1, result2) = join(
            commit_future,
            client2.commit(
                epoch,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: vec![],
                    })),
                },
                None,
            ),
        )
        .await;
        assert!(result1.is_err());
        assert!(result2.is_err());
    }
995
    /// Exercises vnode-bitmap rescaling: two writers commit normally, a third
    /// joins (scale-out, its registration stays pending until the existing
    /// writers shrink their bitmaps), all three commit, then one writer stops
    /// (scale-in) and the remaining two take over its vnodes and commit again.
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // Expected aggregated metadata: per epoch before rescale, then for
        // the three-writer (scale-out) and two-writer (scale-in) epochs.
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        // Unlike the other tests, keeps the full (handle, init_epoch) result.
        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            // Normal two-writer commit for epoch1.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Scale out: re-split the vnode space three ways.
        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        // The new writer cannot finish registration until the existing
        // writers release the overlapping vnodes via update_vnode_bitmap.
        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                // Epoch2 still commits with the old two-writer layout.
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            client3 = {
                // Registration of client3 completes together with the bitmap
                // updates of client1/client2; all observe epoch2 as the last
                // committed epoch.
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            // Epoch3 needs all three writers before the commit resolves.
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        // Scale in: client1 leaves; client2/client3 take over its vnodes.
        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            // Final commit with the two remaining writers.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
1339}