use std::collections::HashMap;
use std::pin::pin;
use std::sync::Arc;

use anyhow::anyhow;
use futures::future::{BoxFuture, Either, select};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::{SinkCommittedEpochSubscriber, SinkError, SinkParam};
use risingwave_pb::connector_service::coordinate_request::Msg;
use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse, coordinate_request};
use rw_futures_util::pending_on_none;
use sea_orm::DatabaseConnection;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::{Receiver, Sender, channel};
use tokio::task::{JoinError, JoinHandle};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::Status;
use tracing::{debug, error, info, warn};

use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::sink_coordination::SinkWriterRequestStream;
use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;

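// Note: the following macros log and otherwise swallow send errors. A failed
// send here only means the receiving side has already been dropped, which is
// expected during shutdown and not worth propagating.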
macro_rules! send_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).is_err() {
            error!("unable to send msg");
        }
    };
}

macro_rules! send_await_with_err_check {
    ($tx:expr, $msg:expr) => {
        if $tx.send($msg).await.is_err() {
            error!("unable to send msg");
        }
    };
}

const BOUNDED_CHANNEL_SIZE: usize = 16;

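/// Control messages sent from [`SinkCoordinatorManager`] to the background
/// [`ManagerWorker`]: either register a new sink writer with its coordinator,
/// or stop one (or all) running coordinators.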
enum ManagerRequest {
    NewSinkWriter(SinkWriterCoordinationHandle),
    StopCoordinator {
        finish_notifier: Sender<()>,
        /// `None` means stopping all running sink coordinators.
        sink_id: Option<SinkId>,
    },
}

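/// Cheaply cloneable front-end that forwards requests to the background
/// [`ManagerWorker`] over a bounded channel.
///
/// A minimal usage sketch (assuming `db`, `hummock_manager`, `metadata_manager`
/// and `iceberg_compact_stat_sender` are already constructed elsewhere):
///
/// ```ignore
/// let (manager, (join_handle, shutdown_tx)) = SinkCoordinatorManager::start_worker(
///     db,
///     hummock_manager,
///     metadata_manager,
///     iceberg_compact_stat_sender,
/// );
/// // Serve incoming writer streams with `manager.handle_new_request(..)`,
/// // stop a single coordinator with `manager.stop_sink_coordinator(sink_id)`,
/// // or stop all of them with `manager.reset()`.
/// let _ = shutdown_tx.send(()); // ask the worker to shut down
/// join_handle.await.unwrap();
/// ```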
#[derive(Clone)]
pub struct SinkCoordinatorManager {
    request_tx: mpsc::Sender<ManagerRequest>,
}

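/// Builds a [`SinkCommittedEpochSubscriber`] that resolves a sink to its first
/// state table and subscribes to that table's committed epochs in hummock.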
fn new_committed_epoch_subscriber(
    hummock_manager: HummockManagerRef,
    metadata_manager: MetadataManager,
) -> SinkCommittedEpochSubscriber {
    Arc::new(move |sink_id| {
        let hummock_manager = hummock_manager.clone();
        let metadata_manager = metadata_manager.clone();
        async move {
            let state_table_ids = metadata_manager
                .get_sink_state_table_ids(sink_id.sink_id as _)
                .await
                .map_err(SinkError::from)?;
            let Some(table_id) = state_table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };
            hummock_manager
                .subscribe_table_committed_epoch(*table_id)
                .await
                .map_err(SinkError::from)
        }
        .boxed()
    })
}

impl SinkCoordinatorManager {
    #[cfg(test)]
    pub(crate) fn for_test() -> Self {
        let (request_tx, mut request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let _join_handle = tokio::spawn(async move {
            while let Some(req) = request_rx.recv().await {
                let ManagerRequest::StopCoordinator {
                    finish_notifier,
                    sink_id,
                } = req
                else {
                    unreachable!()
                };
                assert_eq!(sink_id, None);
                finish_notifier.send(()).unwrap();
            }
        });
        SinkCoordinatorManager { request_tx }
    }

    pub fn start_worker(
        db: DatabaseConnection,
        hummock_manager: HummockManagerRef,
        metadata_manager: MetadataManager,
        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let subscriber =
            new_committed_epoch_subscriber(hummock_manager.clone(), metadata_manager.clone());
        Self::start_worker_with_spawn_worker(move |param, manager_request_stream| {
            tokio::spawn(CoordinatorWorker::run(
                param,
                manager_request_stream,
                db.clone(),
                subscriber.clone(),
                iceberg_compact_stat_sender.clone(),
            ))
        })
    }

    fn start_worker_with_spawn_worker(
        spawn_coordinator_worker: impl SpawnCoordinatorFn,
    ) -> (Self, (JoinHandle<()>, Sender<()>)) {
        let (request_tx, request_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
        let (shutdown_tx, shutdown_rx) = channel();
        let worker = ManagerWorker::new(request_rx, shutdown_rx);
        let join_handle = tokio::spawn(worker.execute(spawn_coordinator_worker));
        (
            SinkCoordinatorManager { request_tx },
            (join_handle, shutdown_tx),
        )
    }

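    /// Accepts a new writer stream. The first message must be a `StartRequest`
    /// carrying the `SinkParam` and the writer's vnode bitmap; the writer is
    /// then registered with the manager worker, and the returned stream yields
    /// the coordinator's responses.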
    pub async fn handle_new_request(
        &self,
        mut request_stream: SinkWriterRequestStream,
    ) -> Result<impl Stream<Item = Result<CoordinateResponse, Status>> + use<>, Status> {
        let (param, vnode_bitmap) = match request_stream.try_next().await? {
            Some(CoordinateRequest {
                msg:
                    Some(Msg::StartRequest(coordinate_request::StartCoordinationRequest {
                        param: Some(param),
                        vnode_bitmap: Some(vnode_bitmap),
                    })),
            }) => (SinkParam::from_proto(param), Bitmap::from(&vnode_bitmap)),
            msg => {
                return Err(Status::invalid_argument(format!(
                    "expected CoordinateRequest::StartRequest in the first request, got {:?}",
                    msg
                )));
            }
        };
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        self.request_tx
            .send(ManagerRequest::NewSinkWriter(
                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
            ))
            .await
            .map_err(|_| {
                Status::unavailable(
                    "unable to send to sink manager worker. The worker may have stopped",
                )
            })?;

        Ok(UnboundedReceiverStream::new(response_rx))
    }

    async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
        let (tx, rx) = channel();
        send_await_with_err_check!(
            self.request_tx,
            ManagerRequest::StopCoordinator {
                finish_notifier: tx,
                sink_id,
            }
        );
        if rx.await.is_err() {
            error!("failed to wait for the sink manager worker to stop the coordinator");
        } else {
            info!("successfully stopped coordinator: {:?}", sink_id);
        }
    }

    pub async fn reset(&self) {
        self.stop_coordinator(None).await;
    }

    pub async fn stop_sink_coordinator(&self, sink_id: SinkId) {
        self.stop_coordinator(Some(sink_id)).await;
    }
}

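/// Per-sink bookkeeping kept by the [`ManagerWorker`].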
struct CoordinatorWorkerHandle {
    /// Sends requests to the coordinator worker. Dropping it stops the worker.
    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
    /// Notified when the coordinator worker stops.
    finish_notifiers: Vec<Sender<()>>,
}

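/// Background event loop owning the per-sink coordinator workers. It
/// multiplexes manager requests, coordinator-worker exits, and the shutdown
/// signal.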
struct ManagerWorker {
    request_rx: mpsc::Receiver<ManagerRequest>,
    shutdown_rx: Receiver<()>,

    running_coordinator_worker_join_handles:
        FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
    running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
}

enum ManagerEvent {
    NewRequest(ManagerRequest),
    CoordinatorWorkerFinished {
        sink_id: SinkId,
        join_result: Result<(), JoinError>,
    },
}

trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
    + Send
    + 'static;

impl ManagerWorker {
    fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
        ManagerWorker {
            request_rx,
            shutdown_rx,
            running_coordinator_worker_join_handles: Default::default(),
            running_coordinator_worker: Default::default(),
        }
    }

    async fn execute(mut self, mut spawn_coordinator_worker: impl SpawnCoordinatorFn) {
        while let Some(event) = self.next_event().await {
            match event {
                ManagerEvent::NewRequest(request) => match request {
                    ManagerRequest::NewSinkWriter(request) => {
                        self.handle_new_sink_writer(request, &mut spawn_coordinator_worker)
                    }
                    ManagerRequest::StopCoordinator {
                        finish_notifier,
                        sink_id,
                    } => {
                        if let Some(sink_id) = sink_id {
                            if let Some(worker_handle) =
                                self.running_coordinator_worker.get_mut(&sink_id)
                            {
                                if let Some(sender) = worker_handle.request_sender.take() {
                                    // Drop the sender to close the channel and
                                    // notify the coordinator worker to stop.
                                    drop(sender);
                                }
                                worker_handle.finish_notifiers.push(finish_notifier);
                            } else {
                                debug!(
                                    "sink coordinator of {} is not running. Notify finish directly",
                                    sink_id.sink_id
                                );
                                send_with_err_check!(finish_notifier, ());
                            }
                        } else {
                            self.clean_up().await;
                            send_with_err_check!(finish_notifier, ());
                        }
                    }
                },
                ManagerEvent::CoordinatorWorkerFinished {
                    sink_id,
                    join_result,
                } => self.handle_coordinator_finished(sink_id, join_result),
            }
        }
        self.clean_up().await;
        info!("sink manager worker exited");
    }

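    /// Waits for the next event: a new manager request, the exit of a
    /// coordinator worker, or shutdown. Returns `None` when the request channel
    /// is closed or shutdown is signalled, which terminates the `execute` loop.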
    async fn next_event(&mut self) -> Option<ManagerEvent> {
        match select(
            select(
                pin!(self.request_rx.recv()),
                pin!(pending_on_none(
                    self.running_coordinator_worker_join_handles.next()
                )),
            ),
            &mut self.shutdown_rx,
        )
        .await
        {
            Either::Left((either, _)) => match either {
                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
                Either::Left((None, _)) => None,
                Either::Right(((sink_id, join_result), _)) => {
                    Some(ManagerEvent::CoordinatorWorkerFinished {
                        sink_id,
                        join_result,
                    })
                }
            },
            Either::Right(_) => None,
        }
    }

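    /// Closes the request channel of every running coordinator worker and waits
    /// for all of them to exit, notifying any pending finish notifiers.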
    async fn clean_up(&mut self) {
        info!("sink manager worker started cleaning up");
        for worker_handle in self.running_coordinator_worker.values_mut() {
            if let Some(sender) = worker_handle.request_sender.take() {
                // Drop the sender to notify the coordinator worker to stop.
                drop(sender);
            }
        }
        while let Some((sink_id, join_result)) =
            self.running_coordinator_worker_join_handles.next().await
        {
            self.handle_coordinator_finished(sink_id, join_result);
        }
        info!("sink manager worker finished cleaning up");
    }

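    /// Removes the bookkeeping of a finished coordinator and notifies all
    /// waiters, logging whether the worker exited gracefully or with an error.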
    fn handle_coordinator_finished(&mut self, sink_id: SinkId, join_result: Result<(), JoinError>) {
        let worker_handle = self
            .running_coordinator_worker
            .remove(&sink_id)
            .expect("finished coordinator should have an associated worker handle");
        for finish_notifier in worker_handle.finish_notifiers {
            send_with_err_check!(finish_notifier, ());
        }
        match join_result {
            Ok(()) => {
                info!(
                    id = sink_id.sink_id,
                    "sink coordinator has gracefully finished",
                );
            }
            Err(err) => {
                error!(
                    id = sink_id.sink_id,
                    error = %err.as_report(),
                    "sink coordinator finished with error",
                );
            }
        }
    }

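    /// Routes a new writer to the coordinator worker of its sink, spawning the
    /// worker first if this is the sink's first writer.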
    fn handle_new_sink_writer(
        &mut self,
        new_writer: SinkWriterCoordinationHandle,
        spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
    ) {
        let param = new_writer.param();
        let sink_id = param.sink_id;

        let handle = self
            .running_coordinator_worker
            .entry(param.sink_id)
            .or_insert_with(|| {
                // Spawn the coordinator worker for this sink on its first writer.
                let (request_tx, request_rx) = unbounded_channel();
                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                self.running_coordinator_worker_join_handles.push(
                    join_handle
                        .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                );
                CoordinatorWorkerHandle {
                    request_sender: Some(request_tx),
                    finish_notifiers: Vec::new(),
                }
            });

        if let Some(sender) = handle.request_sender.as_mut() {
            send_with_err_check!(sender, new_writer);
        } else {
            warn!(
                "received a new writer request while the sink coordinator is being stopped: {:?}",
                param
            );
            new_writer.abort(Status::internal("the sink is being stopped"));
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::pin::pin;
    use std::sync::Arc;
    use std::task::Poll;

    use anyhow::anyhow;
    use async_trait::async_trait;
    use futures::future::{join, try_join};
    use futures::{FutureExt, StreamExt, TryFutureExt};
    use itertools::Itertools;
    use rand::seq::SliceRandom;
    use risingwave_common::bitmap::BitmapBuilder;
    use risingwave_common::catalog::Field;
    use risingwave_common::hash::VirtualNode;
    use risingwave_connector::sink::catalog::{SinkId, SinkType};
    use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam};
    use risingwave_pb::connector_service::SinkMetadata;
    use risingwave_pb::connector_service::sink_metadata::{Metadata, SerializedMetadata};
    use risingwave_rpc_client::CoordinatorStreamHandle;
    use tokio::sync::mpsc::unbounded_channel;
    use tokio_stream::wrappers::ReceiverStream;

    use crate::manager::sink_coordination::SinkCoordinatorManager;
    use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
    use crate::manager::sink_coordination::manager::SinkCommittedEpochSubscriber;

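    /// A scripted `SinkCommitCoordinator`: each `commit` call is forwarded to
    /// the closure `f` together with a mutable test context `C`.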
    struct MockCoordinator<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> {
        context: C,
        f: F,
    }

    impl<C, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError>> MockCoordinator<C, F> {
        fn new(context: C, f: F) -> Self {
            MockCoordinator { context, f }
        }
    }

    #[async_trait]
    impl<C: Send, F: FnMut(u64, Vec<SinkMetadata>, &mut C) -> Result<(), SinkError> + Send>
        SinkCommitCoordinator for MockCoordinator<C, F>
    {
        async fn init(
            &mut self,
            _subscriber: SinkCommittedEpochSubscriber,
        ) -> risingwave_connector::sink::Result<Option<u64>> {
            Ok(None)
        }

        async fn commit(
            &mut self,
            epoch: u64,
            metadata: Vec<SinkMetadata>,
            _add_columns: Option<Vec<Field>>,
        ) -> risingwave_connector::sink::Result<()> {
            (self.f)(epoch, metadata, &mut self.context)
        }
    }

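    /// Two writers cover disjoint halves of the vnode space; a commit only
    /// completes once both writers have committed the epoch, and the
    /// coordinator sees the combined metadata exactly once per epoch.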
    #[tokio::test]
    async fn test_basic() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch0 = 232;
        let epoch1 = 233;
        let epoch2 = 234;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });

        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[0][0], metadata_list[0]);
                                                assert_eq!(metadata[0][1], metadata_list[1]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(2, metadata_list.len());
                                                assert_eq!(metadata[1][0], metadata_list[0]);
                                                assert_eq!(metadata[1][1], metadata_list[1]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) =
            join(build_client(vnode1), pin!(build_client(vnode2))).await;

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch0),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let mut commit_future = pin!(
            client1
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][0].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap())
        );
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        join(
            commit_future,
            client2
                .commit(
                    epoch2,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata[1][1].clone(),
                        })),
                    },
                    None,
                )
                .map(|result| result.unwrap()),
        )
        .await;
    }

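    /// A single writer covering every vnode commits alone, without waiting for
    /// any peer.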
    #[tokio::test]
    async fn test_single_writer() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;

        let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode = build_bitmap(&all_vnode);

        let metadata = [vec![1u8, 2u8], vec![3u8, 4u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        match *count {
                                            1 => {
                                                assert_eq!(epoch, epoch1);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[0], metadata_list[0]);
                                            }
                                            2 => {
                                                assert_eq!(epoch, epoch2);
                                                assert_eq!(1, metadata_list.len());
                                                assert_eq!(metadata[1], metadata_list[0]);
                                            }
                                            _ => unreachable!(),
                                        }
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let mut client = build_client(vnode).await;

        let aligned_epoch = client.align_initial_epoch(epoch1).await.unwrap();
        assert_eq!(aligned_epoch, epoch1);

        client
            .commit(
                epoch1,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[0].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();

        client
            .commit(
                epoch2,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata[1].clone(),
                    })),
                },
                None,
            )
            .await
            .unwrap();
    }

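    /// If a peer writer is dropped while an epoch is only partially committed,
    /// the remaining writer's in-flight commit fails.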
    #[tokio::test]
    async fn test_partial_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                MockCoordinator::new((), |_, _, _| unreachable!()),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        drop(client2);
        assert!(commit_future.await.is_err());
    }

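    /// When the coordinator itself fails the commit, the error is propagated to
    /// every participating writer.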
    #[tokio::test]
    async fn test_fail_commit() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch = 233;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                move |param, new_writer_rx| {
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param,
                                new_writer_rx,
                                MockCoordinator::new((), |_, _, _| {
                                    Err(SinkError::Coordinator(anyhow!("failed to commit")))
                                }),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
            .unwrap()
            .0
        };

        let (mut client1, mut client2) = join(build_client(vnode1), build_client(vnode2)).await;

        let mut commit_future = pin!(client1.commit(
            epoch,
            SinkMetadata {
                metadata: Some(Metadata::Serialized(SerializedMetadata {
                    metadata: vec![],
                })),
            },
            None,
        ));
        assert!(
            poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let (result1, result2) = join(
            commit_future,
            client2.commit(
                epoch,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: vec![],
                    })),
                },
                None,
            ),
        )
        .await;
        assert!(result1.is_err());
        assert!(result2.is_err());
    }

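    /// Exercises vnode bitmap updates: scaling from two writers to three and
    /// back to two, checking that epoch alignment and commits still line up
    /// across the reconfigurations.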
    #[tokio::test]
    async fn test_update_vnode_bitmap() {
        let param = SinkParam {
            sink_id: SinkId::from(1),
            sink_name: "test".into(),
            properties: Default::default(),
            columns: vec![],
            downstream_pk: vec![],
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "test".into(),
            sink_from_name: "test".into(),
        };

        let epoch1 = 233;
        let epoch2 = 234;
        let epoch3 = 235;
        let epoch4 = 236;

        let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
        all_vnode.shuffle(&mut rand::rng());
        let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
        let build_bitmap = |indexes: &[usize]| {
            let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
            for i in indexes {
                builder.set(*i, true);
            }
            builder.finish()
        };
        let vnode1 = build_bitmap(first);
        let vnode2 = build_bitmap(second);

        let metadata = [
            [vec![1u8, 2u8], vec![3u8, 4u8]],
            [vec![5u8, 6u8], vec![7u8, 8u8]],
        ];

        let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]];
        let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]];
        let mock_subscriber: SinkCommittedEpochSubscriber = Arc::new(move |_sink_id: SinkId| {
            let (_sender, receiver) = unbounded_channel();

            async move { Ok((1, receiver)) }.boxed()
        });
        let (manager, (_join_handle, _stop_tx)) =
            SinkCoordinatorManager::start_worker_with_spawn_worker({
                let expected_param = param.clone();
                let metadata = metadata.clone();
                let metadata_scale_out = metadata_scale_out.clone();
                let metadata_scale_in = metadata_scale_in.clone();
                move |param, new_writer_rx| {
                    let metadata = metadata.clone();
                    let metadata_scale_out = metadata_scale_out.clone();
                    let metadata_scale_in = metadata_scale_in.clone();
                    let expected_param = expected_param.clone();
                    tokio::spawn({
                        let subscriber = mock_subscriber.clone();
                        async move {
                            assert_eq!(param, expected_param);
                            CoordinatorWorker::execute_coordinator(
                                param.clone(),
                                new_writer_rx,
                                MockCoordinator::new(
                                    0,
                                    |epoch, metadata_list, count: &mut usize| {
                                        *count += 1;
                                        let mut metadata_list = metadata_list
                                            .into_iter()
                                            .map(|metadata| match metadata {
                                                SinkMetadata {
                                                    metadata:
                                                        Some(Metadata::Serialized(
                                                            SerializedMetadata { metadata },
                                                        )),
                                                } => metadata,
                                                _ => unreachable!(),
                                            })
                                            .collect_vec();
                                        metadata_list.sort();
                                        let (expected_epoch, expected_metadata_list) =
                                            match *count {
                                                1 => (epoch1, metadata[0].as_slice()),
                                                2 => (epoch2, metadata[1].as_slice()),
                                                3 => (epoch3, metadata_scale_out.as_slice()),
                                                4 => (epoch4, metadata_scale_in.as_slice()),
                                                _ => unreachable!(),
                                            };
                                        assert_eq!(expected_epoch, epoch);
                                        assert_eq!(expected_metadata_list, &metadata_list);
                                        Ok(())
                                    },
                                ),
                                subscriber.clone(),
                            )
                            .await;
                        }
                    })
                }
            });

        let build_client = |vnode| async {
            CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async {
                Ok(tonic::Response::new(
                    manager
                        .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed())
                        .await
                        .unwrap()
                        .boxed(),
                ))
            })
            .await
        };

        let ((mut client1, _), (mut client2, _)) =
            try_join(build_client(vnode1), pin!(build_client(vnode2)))
                .await
                .unwrap();

        let (aligned_epoch1, aligned_epoch2) = try_join(
            client1.align_initial_epoch(epoch1),
            client2.align_initial_epoch(epoch1),
        )
        .await
        .unwrap();
        assert_eq!(aligned_epoch1, epoch1);
        assert_eq!(aligned_epoch2, epoch1);

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client1
                    .commit(
                        epoch1,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[0][0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }

        let (vnode1, vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (
                build_bitmap(first),
                build_bitmap(second),
                build_bitmap(third),
            )
        };

        let mut build_client3_future = pin!(build_client(vnode3));
        assert!(
            poll_fn(|cx| Poll::Ready(build_client3_future.as_mut().poll(cx)))
                .await
                .is_pending()
        );
        let mut client3;
        {
            {
                let mut commit_future = pin!(
                    client1
                        .commit(
                            epoch2,
                            SinkMetadata {
                                metadata: Some(Metadata::Serialized(SerializedMetadata {
                                    metadata: metadata[1][0].clone(),
                                })),
                            },
                            None,
                        )
                        .map_err(Into::into)
                );
                assert!(
                    poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                        .await
                        .is_pending()
                );
                try_join(
                    commit_future,
                    client2.commit(
                        epoch2,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata[1][1].clone(),
                            })),
                        },
                        None,
                    ),
                )
                .await
                .unwrap();
            }

            client3 = {
                let (
                    (client3, init_epoch),
                    (update_vnode_bitmap_epoch1, update_vnode_bitmap_epoch2),
                ) = try_join(
                    build_client3_future,
                    try_join(
                        client1.update_vnode_bitmap(&vnode1),
                        client2.update_vnode_bitmap(&vnode2),
                    )
                    .map_err(Into::into),
                )
                .await
                .unwrap();
                assert_eq!(init_epoch, Some(epoch2));
                assert_eq!(update_vnode_bitmap_epoch1, epoch2);
                assert_eq!(update_vnode_bitmap_epoch2, epoch2);
                client3
            };
            let mut commit_future3 = pin!(client3.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[2].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            let mut commit_future1 = pin!(client1.commit(
                epoch3,
                SinkMetadata {
                    metadata: Some(Metadata::Serialized(SerializedMetadata {
                        metadata: metadata_scale_out[0].clone(),
                    })),
                },
                None,
            ));
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            try_join(
                client2.commit(
                    epoch3,
                    SinkMetadata {
                        metadata: Some(Metadata::Serialized(SerializedMetadata {
                            metadata: metadata_scale_out[1].clone(),
                        })),
                    },
                    None,
                ),
                try_join(commit_future1, commit_future3),
            )
            .await
            .unwrap();
        }

        let (vnode2, vnode3) = {
            let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
            (build_bitmap(first), build_bitmap(second))
        };

        {
            let (_, (update_vnode_bitmap_epoch2, update_vnode_bitmap_epoch3)) = try_join(
                client1.stop(),
                try_join(
                    client2.update_vnode_bitmap(&vnode2),
                    client3.update_vnode_bitmap(&vnode3),
                ),
            )
            .await
            .unwrap();
            assert_eq!(update_vnode_bitmap_epoch2, epoch3);
            assert_eq!(update_vnode_bitmap_epoch3, epoch3);
        }

        {
            let mut commit_future = pin!(
                client2
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[0].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap())
            );
            assert!(
                poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx)))
                    .await
                    .is_pending()
            );
            join(
                commit_future,
                client3
                    .commit(
                        epoch4,
                        SinkMetadata {
                            metadata: Some(Metadata::Serialized(SerializedMetadata {
                                metadata: metadata_scale_in[1].clone(),
                            })),
                        },
                        None,
                    )
                    .map(|result| result.unwrap()),
            )
            .await;
        }
    }
}