1use std::collections::HashMap;
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::anyhow;
20use futures::future::{BoxFuture, Either, select};
21use futures::stream::FuturesUnordered;
22use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
23use risingwave_common::bitmap::Bitmap;
24use risingwave_connector::sink::catalog::SinkId;
25use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
26use risingwave_pb::connector_service::coordinate_request::Msg;
27use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
28use rw_futures_util::pending_on_none;
29use sea_orm::DatabaseConnection;
30use thiserror_ext::AsReport;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
33use tokio::sync::oneshot::{Receiver, Sender, channel};
34use tokio::task::{JoinError, JoinHandle};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36use tonic::Status;
37use tracing::{debug, error, info, warn};
38
39use crate::hummock::HummockManagerRef;
40use crate::manager::MetadataManager;
41use crate::manager::sink_coordination::SinkWriterRequestStream;
42use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
43use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
44
/// Sends `$msg` on a non-async sender (`$tx`), logging an error instead of
/// panicking when the receiving side has already been dropped.
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}
52
/// Sends `$msg` on an async sender (`$tx`), awaiting the send and logging an
/// error instead of panicking when the receiving side has already been dropped.
macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}
60
/// Capacity of the bounded request channel between `SinkCoordinatorManager`
/// and the background `ManagerWorker`.
const BOUNDED_CHANNEL_SIZE: usize = 16;
62
/// Requests forwarded from the public `SinkCoordinatorManager` handle to the
/// background `ManagerWorker`.
enum ManagerRequest {
    /// Register a newly connected sink writer with its sink's coordinator.
    NewSinkWriter(SinkWriterCoordinationHandle),
    /// Stop the coordinator of one sink (`sink_id: Some(..)`) or all
    /// coordinators (`None`); `finish_notifier` fires once the stop completed.
    StopCoordinator {
        finish_notifier: Sender<()>,
        sink_id: Option<SinkId>,
    },
}
71
/// Cloneable handle to the sink-coordination background worker. All
/// operations are forwarded over `request_tx` to the `ManagerWorker` task.
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    request_tx: mpsc::Sender<ManagerRequest>,
}
76
77fn new_committed_epoch_subscriber(
78 hummock_manager: HummockManagerRef,
79 metadata_manager: MetadataManager,
80) -> SinkCommittedEpochSubscriber {
81 Arc::new(move |sink_id| {
82 let hummock_manager = hummock_manager.clone();
83 let metadata_manager = metadata_manager.clone();
84 async move {
85 let state_table_ids = metadata_manager
86 .get_sink_state_table_ids(sink_id.sink_id as _)
87 .await
88 .map_err(SinkError::from)?;
89 let Some(table_id) = state_table_ids.first() else {
90 return Err(anyhow!("no state table id in sink: {}", sink_id).into());
91 };
92 hummock_manager
93 .subscribe_table_committed_epoch(*table_id)
94 .await
95 .map_err(SinkError::from)
96 }
97 .boxed()
98 })
99}
100
101impl SinkCoordinatorManager {
102 pub fn start_worker(
103 db: DatabaseConnection,
104 hummock_manager: HummockManagerRef,
105 metadata_manager: MetadataManager,
106 ) -> (Self, (JoinHandle<()>, Sender<()>)) {
107 let subscriber =
108 new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
109 Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
110 tokio::spawn(CoordinatorWorker::run(
111 param,
112 manager_request_stream,
113 db.clone(),
114 subscriber.clone(),
115 ))
116 })
117 }
118
119 fn start_worker_with_spawn_worker(
120 spawn_coordinator_worker: impl SpawnCoordinatorFn,
121 ) -> (Self, (JoinHandle<()>, Sender<()>)) {
122 let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
123 let (shutdown_tx, shutdown_rx) = channel();
124 let worker = ManagerWorker::new(request_rx, shutdown_rx);
125 let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
126 (
127 SinkCoordinatorManager { request_tx },
128 (join_handle, shutdown_tx),
129 )
130 }
131
132 pub async fn handle_new_request(
133 &self,
134 mut request_stream: SinkWriterRequestStream,
135 ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
136 let (param, vnode_bitmap) = match request_stream.try_next().await? {
137 Some(CoordinateRequest {
138 msg:
139 Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
140 param: Some(param),
141 vnode_bitmap: Some(vnode_bitmap),
142 })),
143 }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
144 msg => {
145 return Err(Status::invalid_argument(format!(
146 "expected CoordinateRequest::StartRequest in the first request, get {:?}",
147 msg
148 )));
149 }
150 };
151 let (response_tx, response_rx) = mpsc::unbounded_channel();
152 self.request_tx
153 .send(ManagerRequest::NewSinkWriter(
154 SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
155 ))
156 .await
157 .map_err(|_| {
158 Status::unavailable(
159 "unable to send to sink manager worker. The worker may have stopped",
160 )
161 })?;
162
163 Ok(UnboundedReceiverStream::new(response_rx))
164 }
165
166 async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
167 let (tx, rx) = channel();
168 send_await_with_err_check!(
169 self.request_tx,
170 ManagerRequest::StopCoordinator {
171 finish_notifier: tx,
172 sink_id,
173 }
174 );
175 if rx.await.is_err() {
176 error!("fail to wait for resetting sink manager worker");
177 }
178 info!("successfully stop coordinator: {:?}", sink_id);
179 }
180
181 pub async fn reset(&self) {
182 self.stop_coordinator(None).await;
183 }
184
185 pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
186 self.stop_coordinator(Some(sink_id)).await;
187 }
188}
189
/// Book-keeping for one running coordinator worker task.
struct CoordinatorWorkerHandle {
    /// Sender that forwards new writer connections to the coordinator.
    /// Taken (set to `None`) once a stop is requested; dropping the sender
    /// closes the coordinator's input stream.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Oneshot notifiers to fire when the coordinator has fully exited.
    finish_notifiers: Vec<Sender<()>>,
}
196
/// Background worker that owns the lifecycle of all sink coordinator tasks.
struct ManagerWorker {
    request_rx: mpsc::Receiver<ManagerRequest>,
    shutdown_rx: Receiver<()>,

    /// Join futures of spawned coordinator tasks; each resolves to the sink id
    /// paired with the task's join result.
    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}
206
/// Events the `ManagerWorker` event loop reacts to.
enum ManagerEvent {
    /// A request arrived from the `SinkCoordinatorManager` handle.
    NewRequest(ManagerRequest),
    /// One of the spawned coordinator tasks finished (gracefully or not).
    CoordinatorWorkerFinished {
        sink_id: SinkId,
        join_result: Result<(), JoinError>,
    },
}
214
/// Factory that spawns a coordinator task for a sink, given its param and the
/// channel on which new writer handles will arrive. (Uses the unstable
/// `trait_alias` feature.)
trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;
218
impl ManagerWorker {
    /// Creates a worker with no running coordinators yet.
    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
        ManagerWorker {
            request_rx,
            shutdown_rx,
            running_coordinator_worker_join_handles: Default::default(),
            running_coordinator_worker: Default::default(),
        }
    }

    /// Main event loop: dispatches manager requests and coordinator exits
    /// until the request channel closes or shutdown fires, then cleans up all
    /// remaining coordinators.
    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
        while let Some(event) = self.next_event().await {
            match event {
                ManagerEvent::NewRequest(request) => match request {
                    ManagerRequest::NewSinkWriter(request) => {
                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
                    }
                    ManagerRequest::StopCoordinator {
                        finish_notifier,
                        sink_id,
                    } => {
                        if let Some(sink_id) = sink_id {
                            if let Some(worker_handle) =
                                self.running_coordinator_worker.get_mut(&sink_id)
                            {
                                // Dropping the sender closes the coordinator's
                                // input stream so it can exit gracefully; the
                                // notifier fires later when the task finishes.
                                if let Some(sender) = worker_handle.request_sender.take() {
                                    drop(sender);
                                }
                                worker_handle.finish_notifiers.push(finish_notifier);
                            } else {
                                debug!(
                                    "sink coordinator of {} is not running. Notify finish directly",
                                    sink_id.sink_id
                                );
                                send_with_err_check!(finish_notifier, ());
                            }
                        } else {
                            // `sink_id == None`: stop every coordinator and wait
                            // for all of them before notifying.
                            self.clean_up().await;
                            send_with_err_check!(finish_notifier, ());
                        }
                    }
                },
                ManagerEvent::CoordinatorWorkerFinished {
                    sink_id,
                    join_result,
                } => self.handle_coordinator_finished(sink_id, join_result),
            }
        }
        self.clean_up().await;
        info!("sink manager worker exited");
    }

    /// Waits for the next event: a manager request, a finished coordinator
    /// task, or shutdown. Returns `None` when the request channel is closed or
    /// shutdown is signaled (including the shutdown sender being dropped).
    async fn next_event(&mut self) -> Option<ManagerEvent> {
        match select(
            select(
                pin!(self.request_rx.recv()),
                // `FuturesUnordered::next` yields `None` when empty;
                // `pending_on_none` keeps us parked instead of busy-looping.
                pin!(pending_on_none(
                    self.running_coordinator_worker_join_handles.next()
                )),
            ),
            &mut self.shutdown_rx,
        )
        .await
        {
            Either::Left((either, _)) => match either {
                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
                Either::Left((None, _)) => None,
                Either::Right(((sink_id, join_result), _)) => {
                    Some(ManagerEvent::CoordinatorWorkerFinished {
                        sink_id,
                        join_result,
                    })
                }
            },
            Either::Right(_) => None,
        }
    }

    /// Signals every running coordinator to stop (by dropping its request
    /// sender) and waits for all of their tasks to finish.
    async fn clean_up(&mut self) {
        info!("sink manager worker start cleaning up");
        for worker_handle in self.running_coordinator_worker.values_mut() {
            if let Some(sender) = worker_handle.request_sender.take() {
                drop(sender);
            }
        }
        while let Some((sink_id, join_result)) =
            self.running_coordinator_worker_join_handles.next().await
        {
            self.handle_coordinator_finished(sink_id, join_result);
        }
        info!("sink manager worker finished cleaning up");
    }

    /// Removes the finished coordinator's book-keeping entry, fires all of its
    /// pending finish notifiers, and logs the join outcome.
    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
        let worker_handle = self
            .running_coordinator_worker
            .remove(&sink_id)
            .expect("finished coordinator should have an associated worker handle");
        for finish_notifier in worker_handle.finish_notifiers {
            send_with_err_check!(finish_notifier, ());
        }
        match join_result {
            Ok(()) => {
                info!(
                    id = sink_id.sink_id,
                    "sink coordinator has gracefully finished",
                );
            }
            Err(err) => {
                error!(
                    id = sink_id.sink_id,
                    error = %err.as_report(),
                    "sink coordinator finished with error",
                );
            }
        }
    }

    /// Registers a new writer with the sink's coordinator, spawning the
    /// coordinator task first if this is the sink's first writer.
    fn handle_new_sink_writer(
        &mut self,
        new_writer: SinkWriterCoordinationHandle,
        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
    ) {
        let param = new_writer.param();
        let sink_id = param.sink_id;

        let handle = self
            .running_coordinator_worker
            .entry(param.sink_id)
            .or_insert_with(|| {
                // First writer of this sink: spawn the coordinator and track
                // its join handle tagged with the sink id.
                let (request_tx, request_rx) = unbounded_channel();
                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                self.running_coordinator_worker_join_handles.push(
                    join_handle
                        .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                );
                CoordinatorWorkerHandle {
                    request_sender: Some(request_tx),
                    finish_notifiers: Vec::new(),
                }
            });

        if let Some(sender) = handle.request_sender.as_mut() {
            send_with_err_check!(sender, new_writer);
        } else {
            // A stop was already requested for this sink; reject the writer.
            warn!(
                "handle a new request while the sink coordinator is being stopped: {:?}",
                param
            );
            new_writer.abort(Status::internal("the sink is being stopped"));
        }
    }
}
377
378#[cfg(test)]
379mod tests {
380 use std::future::{Future, poll_fn};
381 use std::pin::pin;
382 use std::sync::Arc;
383 use std::task::Poll;
384
385 use anyhow::anyhow;
386 use async_trait::async_trait;
387 use futures::future::join;
388 use futures::{FutureExt, StreamExt};
389 use itertools::Itertools;
390 use rand::seq::SliceRandom;
391 use risingwave_common::bitmap::BitmapBuilder;
392 use risingwave_common::hash::VirtualNode;
393 use risingwave_connector::sink::catalog::{SinkId, SinkType};
394 use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
395 use risingwave_pb::connector_service::SinkMetadata;
396 use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
397 use risingwave_rpc_client::CoordinatorStreamHandle;
398 use tokio::sync::mpsc::unbounded_channel;
399 use tokio_stream::wrappers::ReceiverStream;
400
401 use crate::manager::sink_coordination::SinkCoordinatorManager;
402 use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
403 use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;
404
    /// Test double for `SinkCommitCoordinator`: every `commit` call is
    /// forwarded to the closure `f`, which can assert on the epoch/metadata
    /// while mutating a test-local `context`.
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        context: C,
        f: F,
    }
409
410 impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
411 fn new(context: C, f: F) -> Self {
412 MockCoordinator { context, f }
413 }
414 }
415
    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinkCommitCoordinator for MockCoordinator<C, F>
    {
        // Mock init: no committed-epoch rewind, so report `None`.
        async fn init(
            &mut self,
            _subscriber: SinkCommittedEpochSubscriber,
        ) -> risingwave_connector::sink::Result<Option<u64>> {
            Ok(None)
        }

        // Delegates the commit decision to the user-provided closure.
        async fn commit(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }
    }
435
    /// Two writers splitting the vnodes of one sink: an epoch only commits
    /// once both writers have committed, and the coordinator observes the
    /// union of their metadata.
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        // Randomly split all vnodes between the two writers.
        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // metadata[e][w]: payload committed by writer `w` at epoch index `e`.
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        // Each commit must see both writers' payloads.
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        {
            // One writer committing alone must stay pending until the other
            // writer commits the same epoch.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Second epoch: same pattern with the writers' roles swapped.
        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap())
        );
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }
621
    /// A single writer owning all vnodes: each commit completes immediately
    /// without waiting for other writers.
    #[tokio::test]
    async fn test_single_writer() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        // The single writer covers the complete vnode range.
        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        // Exactly one payload per epoch with a single writer.
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        // Both commits should resolve directly, no coordination partner needed.
        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
            )
            .await
            .unwrap();
    }
753
    /// If one of two writers disconnects while the other's commit is still
    /// pending, the pending commit must fail instead of hanging forever.
    #[tokio::test]
    async fn test_partial_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                // The coordinator must never reach commit in this test.
                                MockCoordinator::new((), |_, _, _| unreachable!()),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        // client1 commits and stays pending waiting for client2 ...
        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        // ... then client2 drops its connection, which must fail the commit.
        drop(client2);
        assert!(commit_future.await.is_err());
    }
844
845 #[tokio::test]
846 async fn test_fail_commit() {
847 let param = SinkParam {
848 sink_id: SinkId::from(1),
849 sink_name: "test".into(),
850 properties: Default::default(),
851 columns: vec![],
852 downstream_pk: vec![],
853 sink_type: SinkType::AppendOnly,
854 format_desc: None,
855 db_name: "test".into(),
856 sink_from_name: "test".into(),
857 };
858
859 let epoch = 233;
860
861 let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
862 all_vnode.shuffle(&mut rand::rng());
863 let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
864 let build_bitmap = |indexes: &[usize]| {
865 let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
866 for i in indexes {
867 builder.set(*i, true);
868 }
869 builder.finish()
870 };
871 let vnode1 = build_bitmap(first);
872 let vnode2 = build_bitmap(second);
873 let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
874 let (_sender, receiver) = unbounded_channel();
875
876 async move { Ok((1, receiver)) }.boxed()
877 });
878 let (manager, (_join_handle, _stop_tx)) =
879 SinkCoordinatorManager::start_worker_with_spawn_worker({
880 let expected_param = param.clone();
881 move |param, new_writer_rx| {
882 let expected_param = expected_param.clone();
883 tokio::spawn({
884 let subscriber = mock_subscriber.clone();
885 {
886 async move {
887 assert_eq!(param, expected_param);
889 CoordinatorWorker::execute_coordinator(
890 param,
891 new_writer_rx,
892 MockCoordinator::new((), |_, _, _| {
893 Err(SinkError::Coordinator(anyhow!("failed to commit")))
894 }),
895 subscriber.clone(),
896 )
897 .await;
898 }
899 }
900 })
901 }
902 });
903
904 let build_client = |vnode| async {
905 CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
906 Ok(tonic::Response::new(
907 manager
908 .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
909 .await
910 .unwrap()
911 .boxed(),
912 ))
913 })
914 .await
915 .unwrap()
916 .0
917 };
918
919 let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;
920
921 let mut commit_future = pin!(client1.commit(
923 epoch,
924 SinkMetadata {
925 metadata: Some(Metadata::Serialized(SerializedMetadata {
926 metadata: vec![],
927 })),
928 },
929 ));
930 assert!(
931 poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
932 .await
933 .is_pending()
934 );
935 let (result1, result2) = join(
936 commit_future,
937 client2.commit(
938 epoch,
939 SinkMetadata {
940 metadata: Some(Metadata::Serialized(SerializedMetadata {
941 metadata: vec![],
942 })),
943 },
944 ),
945 )
946 .await;
947 assert!(result1.is_err());
948 assert!(result2.is_err());
949 }
950
    /// Scaling scenario: writers re-shard their vnode bitmaps between epochs
    /// (2 writers -> 3 on scale-out, 3 -> 2 on scale-in) and the coordinator
    /// must wait for exactly the current writer set on every epoch.
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        // Expected payloads: two epochs with 2 writers, then one epoch with 3
        // writers (scale-out), then one epoch with 2 writers (scale-in).
        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list =
                                            metadata_list
                                                .into_iter()
                                                .map(|metadata| match metadata {
                                                    SinkMetadata {
                                                        metadata:
                                                            Some(Metadata::Serialized(
                                                                SerializedMetadata { metadata },
                                                            )),
                                                    } => metadata,
                                                    _ => unreachable!(),
                                                })
                                                .collect_vec();
                                        metadata_list.sort();
                                        // Pick the expectation by commit ordinal.
                                        let (expected_epoch, expected_metadata_list) = match *count
                                        {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        {
            // Epoch 1: plain two-writer commit.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        // Re-shard into three bitmaps for the scale-out phase.
        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        let mut client3 = build_client(vnode3).await;
        {
            // The new writer commits epoch3 first; it must wait until the
            // existing writers finish epoch2 and update their bitmaps.
            let mut commit_future3 = pin!(
                client3
                    .commit(
                        epoch3,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_out[2].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );

            {
                // Epoch 2 still commits with the original two writers.
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                        )
                        .map(|result| result.unwrap())
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                join(
                    commit_future,
                    client2
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][1].clone(),
                                })),
                            },
                        )
                        .map(|result| result.unwrap()),
                )
                .await;
            }

            // Existing writers switch to the new (smaller) bitmaps; epoch3 now
            // needs all three writers before it can commit.
            client1.update_vnode_bitmap(&vnode1).await.unwrap();
            client2.update_vnode_bitmap(&vnode2).await.unwrap();
            let mut commit_future1 = pin!(
                client1
                    .commit(
                        epoch3,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_out[0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            client2
                .commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap())
                .await;
        }

        // Scale-in: client1 is retired; its vnodes move to client2/client3.
        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        client2.update_vnode_bitmap(&vnode2).await.unwrap();
        client3.update_vnode_bitmap(&vnode3).await.unwrap();

        {
            // Epoch 4 commits with the remaining two writers only.
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
1251}