use std::collections::HashMap;
use std::pin::pin;
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
use risingwave_pb::connector_service::coordinate_request::Msg;
use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
use rw_futures_util::pending_on_none;
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::{Receiver, Sender, channel};
use tokio::task::{JoinError, JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::Status;
use tracing::{debug, error, info, warn};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::sink_coordination::SinkWriterRequestStream;
use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

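// Best-effort send helpers: a failed send means the receiving side has been dropped,
// which is logged rather than treated as fatal.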
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}

macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}

const BOUNDED_CHANNEL_SIZE: usize = 16;

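/// Requests sent from [`SinkCoordinatorManager`] to the background [`ManagerWorker`].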
enum ManagerRequest {
    NewSinkWriter(SinkWriterCoordinationHandle),
    StopCoordinator {
        finish_notifier: Sender<()>,
        /// The sink to stop. `None` means stopping all sink coordinators.
        sink_id: Option<SinkId>,
    },
}

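/// Cloneable handle to the sink coordination service. All methods forward requests to a
/// background [`ManagerWorker`] over a bounded channel.
///
/// A minimal usage sketch, not a doctest; it assumes handles already obtained from the
/// meta node context, and the variable names are illustrative only:
///
/// ```ignore
/// let (manager, (worker_join_handle, shutdown_tx)) = SinkCoordinatorManager::start_worker(
///     db,
///     hummock_manager,
///     metadata_manager,
///     iceberg_compact_stat_tx,
/// );
/// // Serve one sink writer's bidirectional stream:
/// let response_stream = manager.handle_new_request(request_stream).await?;
/// ```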
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    request_tx: mpsc::Sender<ManagerRequest>,
}

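/// Builds a subscriber that resolves a sink to its first state table and subscribes to
/// that table's committed epochs from the hummock manager.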
fn new_committed_epoch_subscriber(
    hummock_manager: HummockManagerRef,
    metadata_manager: MetadataManager,
) -> SinkCommittedEpochSubscriber {
    Arc::new(move |sink_id| {
        let hummock_manager = hummock_manager.clone();
        let metadata_manager = metadata_manager.clone();
        async move {
            let state_table_ids = metadata_manager
                .get_sink_state_table_ids(sink_id.sink_id as _)
                .await
                .map_err(SinkError::from)?;
            let Some(table_id) = state_table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };
            hummock_manager
                .subscribe_table_committed_epoch(*table_id)
                .await
                .map_err(SinkError::from)
        }
        .boxed()
    })
}

impl SinkCoordinatorManager {
    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let _join_handle = tokio::spawn(async move {
            while let Some(req) = request_rx.recv().await {
                let ManagerRequest::StopCoordinator {
                    finish_notifier,
                    sink_id,
                } = req
                else {
                    unreachable!()
                };
                assert_eq!(sink_id, None);
                finish_notifier.send(()).unwrap();
            }
        });
        SinkCoordinatorManager { request_tx }
    }

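    /// Spawns the [`ManagerWorker`] event loop, wiring each newly created sink
    /// coordinator to [`CoordinatorWorker::run`]. Returns the manager handle together
    /// with the worker's join handle and shutdown sender.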
    pub fn start_worker(
        db: DatabaseConnection,
        hummock_manager: HummockManagerRef,
        metadata_manager: MetadataManager,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let subscriber =
            new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
            tokio::spawn(CoordinatorWorker::run(
                param,
                manager_request_stream,
                db.clone(),
                subscriber.clone(),
                iceberg_compact_stat_sender.clone(),
            ))
        })
    }

    fn start_worker_with_spawn_worker(
        spawn_coordinator_worker: impl SpawnCoordinatorFn,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let (shutdown_tx, shutdown_rx) = channel();
        let worker = ManagerWorker::new(request_rx, shutdown_rx);
        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
        (
            SinkCoordinatorManager { request_tx },
            (join_handle, shutdown_tx),
        )
    }

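    /// Handles a new sink writer stream. The first message must be a `StartRequest`
    /// carrying the sink param and the writer's vnode bitmap; the writer is then
    /// registered with the manager worker, and the returned stream yields the
    /// coordinator's responses.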
    pub async fn handle_new_request(
        &self,
        mut request_stream: SinkWriterRequestStream,
    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
        let (param, vnode_bitmap) = match request_stream.try_next().await? {
            Some(CoordinateRequest {
                msg:
                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
                        param: Some(param),
                        vnode_bitmap: Some(vnode_bitmap),
                    })),
            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
            msg => {
                return Err(Status::invalid_argument(format!(
                    "expected CoordinateRequest::StartRequest in the first request, got {:?}",
                    msg
                )));
            }
        };
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        self.request_tx
            .send(ManagerRequest::NewSinkWriter(
                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
            ))
            .await
            .map_err(|_| {
                Status::unavailable(
                    "unable to send to sink manager worker. The worker may have stopped",
                )
            })?;

        Ok(UnboundedReceiverStream::new(response_rx))
    }

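    /// Asks the manager worker to stop the coordinator of `sink_id` (or all
    /// coordinators when `None`) and waits until it has finished.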
    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
        let (tx, rx) = channel();
        send_await_with_err_check!(
            self.request_tx,
            ManagerRequest::StopCoordinator {
                finish_notifier: tx,
                sink_id,
            }
        );
        if rx.await.is_err() {
            error!("failed to wait for sink manager worker to reset");
        }
        info!("successfully stopped coordinator: {:?}", sink_id);
    }

    pub async fn reset(&self) {
        self.stop_coordinator(None).await;
    }

    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
        self.stop_coordinator(Some(sink_id)).await;
    }
}

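/// Bookkeeping for one running coordinator worker, owned by [`ManagerWorker`].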
struct CoordinatorWorkerHandle {
    /// Sender for forwarding new writer handles to the coordinator worker.
    /// Taken and dropped to signal the worker to stop.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notified once the coordinator worker has finished.
    finish_notifiers: Vec<Sender<()>>,
}

struct ManagerWorker {
    request_rx: mpsc::Receiver<ManagerRequest>,
    shutdown_rx: Receiver<()>,

    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}

enum ManagerEvent {
    NewRequest(ManagerRequest),
    CoordinatorWorkerFinished {
        sink_id: SinkId,
        join_result: Result<(), JoinError>,
    },
}

trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;

impl ManagerWorker {
    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
        ManagerWorker {
            request_rx,
            shutdown_rx,
            running_coordinator_worker_join_handles: Default::default(),
            running_coordinator_worker: Default::default(),
        }
    }

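    /// Main event loop: dispatches writer registrations and stop requests until the
    /// request channel closes or the shutdown signal fires, then cleans up all
    /// coordinator workers.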
    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
        while let Some(event) = self.next_event().await {
            match event {
                ManagerEvent::NewRequest(request) => match request {
                    ManagerRequest::NewSinkWriter(request) => {
                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
                    }
                    ManagerRequest::StopCoordinator {
                        finish_notifier,
                        sink_id,
                    } => {
                        if let Some(sink_id) = sink_id {
                            if let Some(worker_handle) =
                                self.running_coordinator_worker.get_mut(&sink_id)
                            {
                                if let Some(sender) = worker_handle.request_sender.take() {
                                    // Drop the sender to close the channel, which signals
                                    // the coordinator worker to stop.
                                    drop(sender);
                                }
                                worker_handle.finish_notifiers.push(finish_notifier);
                            } else {
                                debug!(
                                    "sink coordinator of {} is not running. Notifying finish directly",
                                    sink_id.sink_id
                                );
                                send_with_err_check!(finish_notifier, ());
                            }
                        } else {
                            self.clean_up().await;
                            send_with_err_check!(finish_notifier, ());
                        }
                    }
                },
                ManagerEvent::CoordinatorWorkerFinished {
                    sink_id,
                    join_result,
                } => self.handle_coordinator_finished(sink_id, join_result),
            }
        }
        self.clean_up().await;
        info!("sink manager worker exited");
    }

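    /// Awaits the next event: a manager request or a finished coordinator worker.
    /// Returns `None` when the request channel is closed or the shutdown signal fires.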
    async fn next_event(&mut self) -> Option<ManagerEvent> {
        match select(
            select(
                pin!(self.request_rx.recv()),
                pin!(pending_on_none(
                    self.running_coordinator_worker_join_handles.next()
                )),
            ),
            &mut self.shutdown_rx,
        )
        .await
        {
            Either::Left((either, _)) => match either {
                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
                Either::Left((None, _)) => None,
                Either::Right(((sink_id, join_result), _)) => {
                    Some(ManagerEvent::CoordinatorWorkerFinished {
                        sink_id,
                        join_result,
                    })
                }
            },
            Either::Right(_) => None,
        }
    }

    async fn clean_up(&mut self) {
        info!("sink manager worker start cleaning up");
        for worker_handle in self.running_coordinator_worker.values_mut() {
            if let Some(sender) = worker_handle.request_sender.take() {
                // Close the channel to signal the coordinator worker to stop.
                drop(sender);
            }
        }
        while let Some((sink_id, join_result)) =
            self.running_coordinator_worker_join_handles.next().await
        {
            self.handle_coordinator_finished(sink_id, join_result);
        }
        info!("sink manager worker finished cleaning up");
    }

    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
        let worker_handle = self
            .running_coordinator_worker
            .remove(&sink_id)
            .expect("finished coordinator should have an associated worker handle");
        for finish_notifier in worker_handle.finish_notifiers {
            send_with_err_check!(finish_notifier, ());
        }
        match join_result {
            Ok(()) => {
                info!(
                    id = sink_id.sink_id,
                    "sink coordinator has gracefully finished",
                );
            }
            Err(err) => {
                error!(
                    id = sink_id.sink_id,
                    error = %err.as_report(),
                    "sink coordinator finished with error",
                );
            }
        }
    }

    fn handle_new_sink_writer(
        &mut self,
        new_writer: SinkWriterCoordinationHandle,
        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
    ) {
        let param = new_writer.param();
        let sink_id = param.sink_id;

        let handle = self
            .running_coordinator_worker
            .entry(param.sink_id)
            .or_insert_with(|| {
                // Spawn a coordinator worker lazily on the first writer of the sink.
                let (request_tx, request_rx) = unbounded_channel();
                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                self.running_coordinator_worker_join_handles.push(
                    join_handle
                        .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                );
                CoordinatorWorkerHandle {
                    request_sender: Some(request_tx),
                    finish_notifiers: Vec::new(),
                }
            });

        if let Some(sender) = handle.request_sender.as_mut() {
            send_with_err_check!(sender, new_writer);
        } else {
            warn!(
                "received a new request while the sink coordinator is being stopped: {:?}",
                param
            );
            new_writer.abort(Status::internal("the sink is being stopped"));
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::sync::Arc;
    use std::task::Poll;

    use anyhow::anyhow;
    use async_trait::async_trait;
    use futures::future::{join, try_join};
    use futures::{FutureExt, StreamExt, TryFutureExt};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use risingwave_common::bitmap::BitmapBuilder;
    use risingwave_common::hash::VirtualNode;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};
    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
    use risingwave_pb::connector_service::SinkMetadata;
    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
    use risingwave_rpc_client::CoordinatorStreamHandle;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::manager::sink_coordination::SinkCoordinatorManager;
    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;

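    /// Test coordinator that delegates each `commit` call to a closure over a mutable
    /// context.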
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        context: C,
        f: F,
    }

    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
        fn new(context: C, f: F) -> Self {
            MockCoordinator { context, f }
        }
    }

    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinkCommitCoordinator for MockCoordinator<C, F>
    {
        async fn init(
            &mut self,
            _subscriber: SinkCommittedEpochSubscriber,
        ) -> risingwave_connector::sink::Result<Option<u64>> {
            Ok(None)
        }

        async fn commit(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }
    }

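    /// Two writers split the vnodes. A commit of an epoch completes only once both
    /// writers have committed it; this is exercised for two consecutive epochs.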
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap())
        );
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }

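    /// A single writer owning all vnodes can commit two epochs without waiting on
    /// other writers.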
    #[tokio::test]
    async fn test_single_writer() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
        assert_eq!(aligned_epoch, epoch1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
            )
            .await
            .unwrap();
    }

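    /// If one writer is dropped before committing, the remaining writer's pending
    /// commit fails.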
    #[tokio::test]
    async fn test_partial_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                MockCoordinator::new((), |_, _, _| unreachable!()),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        drop(client2);
        assert!(commit_future.await.is_err());
    }

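    /// An error returned by the coordinator's `commit` propagates to every waiting
    /// writer.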
    #[tokio::test]
    async fn test_fail_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                MockCoordinator::new((), |_, _, _| {
                                    Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                }),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let (result1, result2) = join(
            commit_future,
            client2.commit(
                epoch,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: vec![],
                    })),
                },
            ),
        )
        .await;
        assert!(result1.is_err());
        assert!(result2.is_err());
    }

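    /// Exercises vnode bitmap updates: scale out from two writers to three, then scale
    /// in back to two, committing one epoch in each configuration.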
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) = match *count {
                                            1 => (epoch1, metadata[0].as_slice()),
                                            2 => (epoch2, metadata[1].as_slice()),
                                            3 => (epoch3, metadata_scale_out.as_slice()),
                                            4 => (epoch4, metadata_scale_in.as_slice()),
                                            _ => unreachable!(),
                                        };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                    ),
                )
                .await
                .unwrap();
            }

            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
}