1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use futures::future::{Either, select};
24use futures::pin_mut;
25use itertools::Itertools;
26use risingwave_common::bail;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::Field;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::{
32 Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
33};
34use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
35use sea_orm::DatabaseConnection;
36use thiserror_ext::AsReport;
37use tokio::select;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::time::sleep;
40use tonic::Status;
41use tracing::{error, warn};
42
43use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
44
45async fn run_future_with_periodic_fn<F: Future>(
46 future: F,
47 interval: Duration,
48 mut f: impl FnMut(),
49) -> F::Output {
50 pin_mut!(future);
51 loop {
52 match select(&mut future, pin!(sleep(interval))).await {
53 Either::Left((output, _)) => {
54 break output;
55 }
56 Either::Right(_) => f(),
57 }
58 }
59}
60
/// Identifier assigned to each writer handle registered with the
/// `CoordinationHandleManager`.
type HandleId = usize;
62
63#[derive(Default)]
64struct AligningRequests<R> {
65 requests: Vec<R>,
66 handle_ids: HashSet<HandleId>,
67 committed_bitmap: Option<Bitmap>, }
69
70impl<R> AligningRequests<R> {
71 fn add_new_request(
72 &mut self,
73 handle_id: HandleId,
74 request: R,
75 vnode_bitmap: &Bitmap,
76 ) -> anyhow::Result<()>
77 where
78 R: Debug,
79 {
80 let committed_bitmap = self
81 .committed_bitmap
82 .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
83 assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
84
85 let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
86 if check_bitmap.count_ones() > 0 {
87 return Err(anyhow!(
88 "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
89 check_bitmap.iter_ones().collect_vec(),
90 vnode_bitmap,
91 committed_bitmap,
92 self.requests,
93 request
94 ));
95 }
96 *committed_bitmap |= vnode_bitmap;
97 self.requests.push(request);
98 assert!(self.handle_ids.insert(handle_id));
99 Ok(())
100 }
101
102 fn aligned(&self) -> bool {
103 self.committed_bitmap.as_ref().is_some_and(|b| b.all())
104 }
105}
106
/// Keeps track of the writer handles of a sink and turns what they send into
/// `CoordinationHandleManagerEvent`s.
struct CoordinationHandleManager {
    /// Sink parameters; the param of every newly received handle is compared
    /// against this value.
    param: SinkParam,
    /// Registered writer handles, keyed by the id assigned on registration.
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    /// Id that will be assigned to the next handle received from `request_rx`.
    next_handle_id: HandleId,
    /// Channel from which new writer handles are received.
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}
113
114impl CoordinationHandleManager {
115 fn start(
116 &mut self,
117 log_store_rewind_start_epoch: Option<u64>,
118 handle_ids: impl IntoIterator<Item = HandleId>,
119 ) -> anyhow::Result<()> {
120 for handle_id in handle_ids {
121 let handle = self
122 .writer_handles
123 .get_mut(&handle_id)
124 .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
125 handle.start(log_store_rewind_start_epoch).map_err(|_| {
126 anyhow!(
127 "fail to start {:?} for handle {}",
128 log_store_rewind_start_epoch,
129 handle_id
130 )
131 })?;
132 }
133 Ok(())
134 }
135
136 fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
137 for (handle_id, handle) in &mut self.writer_handles {
138 handle
139 .ack_aligned_initial_epoch(aligned_initial_epoch)
140 .map_err(|_| {
141 anyhow!(
142 "fail to ack_aligned_initial_epoch {:?} for handle {}",
143 aligned_initial_epoch,
144 handle_id
145 )
146 })?;
147 }
148 Ok(())
149 }
150
151 fn ack_commit(
152 &mut self,
153 epoch: u64,
154 handle_ids: impl IntoIterator<Item = HandleId>,
155 ) -> anyhow::Result<()> {
156 for handle_id in handle_ids {
157 let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
158 anyhow!(
159 "fail to find handle for {} when ack commit on epoch {}",
160 handle_id,
161 epoch
162 )
163 })?;
164 handle.ack_commit(epoch).map_err(|_| {
165 anyhow!(
166 "fail to ack commit on epoch {} for handle {}",
167 epoch,
168 handle_id
169 )
170 })?;
171 }
172 Ok(())
173 }
174
175 async fn next_request_inner(
176 writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
177 ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
178 poll_fn(|cx| {
179 for (handle_id, handle) in writer_handles.iter_mut() {
180 if let Poll::Ready(result) = handle.poll_next_request(cx) {
181 return Poll::Ready(result.map(|request| (*handle_id, request)));
182 }
183 }
184 Poll::Pending
185 })
186 .await
187 }
188}
189
/// Event yielded by `CoordinationHandleManager::next_event`, always paired
/// with the id of the handle it concerns.
enum CoordinationHandleManagerEvent {
    /// A new writer handle was received and registered.
    NewHandle,
    /// The handle sent an `UpdateVnodeRequest`.
    UpdateVnodeBitmap,
    /// The handle sent a `Stop` message.
    Stop,
    /// The handle sent a `CommitRequest`.
    CommitRequest {
        /// Epoch carried by the commit request.
        epoch: u64,
        /// Metadata carried by the commit request.
        metadata: SinkMetadata,
        /// Fields converted from the request's `add_columns`, if present.
        add_columns: Option<Vec<Field>>,
    },
    /// The handle sent an `AlignInitialEpochRequest` carrying this epoch.
    AlignInitialEpoch(u64),
}
201
202impl CoordinationHandleManagerEvent {
203 fn name(&self) -> &'static str {
204 match self {
205 CoordinationHandleManagerEvent::NewHandle => "NewHandle",
206 CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
207 CoordinationHandleManagerEvent::Stop => "Stop",
208 CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
209 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
210 }
211 }
212}
213
214impl CoordinationHandleManager {
215 async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
216 {
217 select! {
218 handle = self.request_rx.recv() => {
219 let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
220 if handle.param() != &self.param {
221 warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
222 }
223 let handle_id = self.next_handle_id;
224 self.next_handle_id += 1;
225 self.writer_handles.insert(handle_id, handle);
226 Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
227 }
228 result = Self::next_request_inner(&mut self.writer_handles) => {
229 let (handle_id, request) = result?;
230 let event = match request {
231 coordinate_request::Msg::CommitRequest(request) => {
232 CoordinationHandleManagerEvent::CommitRequest {
233 epoch: request.epoch,
234 metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
235 add_columns: request.add_columns.map(|add_columns| add_columns.fields.into_iter().map(|field| Field::from_prost(&field)).collect()),
236 }
237 }
238 coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
239 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
240 }
241 coordinate_request::Msg::UpdateVnodeRequest(_) => {
242 CoordinationHandleManagerEvent::UpdateVnodeBitmap
243 }
244 coordinate_request::Msg::Stop(_) => {
245 CoordinationHandleManagerEvent::Stop
246 }
247 coordinate_request::Msg::StartRequest(_) => {
248 unreachable!("should have been handled");
249 }
250 };
251 Ok((handle_id, event))
252 }
253 }
254 }
255 }
256
257 fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
258 self.writer_handles[&handle_id].vnode_bitmap()
259 }
260
261 fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
262 self.writer_handles
263 .remove(&handle_id)
264 .expect("should exist")
265 .stop()
266 }
267
268 async fn wait_init_handles(
269 &mut self,
270 log_store_rewind_start_epoch: Option<u64>,
271 ) -> anyhow::Result<HashSet<HandleId>> {
272 assert!(self.writer_handles.is_empty());
273 let mut init_requests = AligningRequests::default();
274 while !init_requests.aligned() {
275 let (handle_id, event) = self.next_event().await?;
276 let unexpected_event = match event {
277 CoordinationHandleManagerEvent::NewHandle => {
278 init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
279 continue;
280 }
281 event => event.name(),
282 };
283 return Err(anyhow!(
284 "expect new handle during init, but got {}",
285 unexpected_event
286 ));
287 }
288 self.start(
289 log_store_rewind_start_epoch,
290 init_requests.handle_ids.iter().cloned(),
291 )?;
292 if log_store_rewind_start_epoch.is_none() {
293 let mut align_requests = AligningRequests::default();
294 while !align_requests.aligned() {
295 let (handle_id, event) = self.next_event().await?;
296 match event {
297 CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
298 align_requests.add_new_request(
299 handle_id,
300 initial_epoch,
301 self.vnode_bitmap(handle_id),
302 )?;
303 }
304 other => {
305 return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
306 }
307 }
308 }
309 let aligned_initial_epoch = align_requests
310 .requests
311 .into_iter()
312 .max()
313 .expect("non-empty");
314 self.ack_aligned_initial_epoch(aligned_initial_epoch)?;
315 }
316 Ok(init_requests.handle_ids)
317 }
318
319 async fn alter_parallelisms(
320 &mut self,
321 altered_handles: impl Iterator<Item = HandleId>,
322 prev_commit_epoch: u64,
323 ) -> anyhow::Result<HashSet<HandleId>> {
324 let mut requests = AligningRequests::default();
325 for handle_id in altered_handles {
326 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
327 }
328 let mut remaining_handles: HashSet<_> = self
329 .writer_handles
330 .keys()
331 .filter(|handle_id| !requests.handle_ids.contains(handle_id))
332 .cloned()
333 .collect();
334 while !remaining_handles.is_empty() || !requests.aligned() {
335 let (handle_id, event) = self.next_event().await?;
336 match event {
337 CoordinationHandleManagerEvent::NewHandle => {
338 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
339 }
340 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
341 assert!(remaining_handles.remove(&handle_id));
342 requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
343 }
344 CoordinationHandleManagerEvent::Stop => {
345 assert!(remaining_handles.remove(&handle_id));
346 self.stop_handle(handle_id)?;
347 }
348 CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
349 bail!(
350 "receive commit request on epoch {} from handle {} during alter parallelism",
351 epoch,
352 handle_id
353 );
354 }
355 CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
356 bail!(
357 "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
358 epoch,
359 handle_id
360 );
361 }
362 }
363 }
364 self.start(Some(prev_commit_epoch), requests.handle_ids.iter().cloned())?;
365 Ok(requests.handle_ids)
366 }
367}
368
/// Worker that coordinates the commits of the writers of one sink.
pub struct CoordinatorWorker {
    /// Tracks the writer handles and produces their events.
    handle_manager: CoordinationHandleManager,
}
372
373impl CoordinatorWorker {
374 pub async fn run(
375 param: SinkParam,
376 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
377 db: DatabaseConnection,
378 subscriber: SinkCommittedEpochSubscriber,
379 iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
380 ) {
381 let sink = match build_sink(param.clone()) {
382 Ok(sink) => sink,
383 Err(e) => {
384 error!(
385 error = %e.as_report(),
386 "unable to build sink with param {:?}",
387 param
388 );
389 return;
390 }
391 };
392
393 dispatch_sink!(sink, sink, {
394 let coordinator = match sink
395 .new_coordinator(db, Some(iceberg_compact_stat_sender))
396 .await
397 {
398 Ok(coordinator) => coordinator,
399 Err(e) => {
400 error!(
401 error = %e.as_report(),
402 "unable to build coordinator with param {:?}",
403 param
404 );
405 return;
406 }
407 };
408 Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
409 });
410 }
411
412 pub async fn execute_coordinator(
413 param: SinkParam,
414 request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
415 coordinator: impl SinkCommitCoordinator,
416 subscriber: SinkCommittedEpochSubscriber,
417 ) {
418 let mut worker = CoordinatorWorker {
419 handle_manager: CoordinationHandleManager {
420 param,
421 writer_handles: HashMap::new(),
422 next_handle_id: 0,
423 request_rx,
424 },
425 };
426
427 if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
428 for handle in worker.handle_manager.writer_handles.into_values() {
429 handle.abort(Status::internal(format!(
430 "failed to run coordination: {:?}",
431 e.as_report()
432 )))
433 }
434 }
435 }
436
437 async fn run_coordination(
438 &mut self,
439 mut coordinator: impl SinkCommitCoordinator,
440 subscriber: SinkCommittedEpochSubscriber,
441 ) -> anyhow::Result<()> {
442 let initial_log_store_rewind_start_epoch = coordinator.init(subscriber).await?;
443 let sink_id = self.handle_manager.param.sink_id;
444 let mut running_handles = self
445 .handle_manager
446 .wait_init_handles(initial_log_store_rewind_start_epoch)
447 .await?;
448 let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
449 let mut pending_new_handles = vec![];
450 let mut prev_commit_epoch = None;
451 loop {
452 let (handle_id, event) = self.handle_manager.next_event().await?;
453 let (epoch, commit_request) = match event {
454 CoordinationHandleManagerEvent::NewHandle => {
455 pending_new_handles.push(handle_id);
456 continue;
457 }
458 CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
459 running_handles = self
460 .handle_manager
461 .alter_parallelisms(
462 pending_new_handles.drain(..).chain([handle_id]),
463 prev_commit_epoch.ok_or_else(|| {
464 anyhow!("should have committed once on alter parallelisms")
465 })?,
466 )
467 .await?;
468 continue;
469 }
470 CoordinationHandleManagerEvent::Stop => {
471 self.handle_manager.stop_handle(handle_id)?;
472 running_handles = self
473 .handle_manager
474 .alter_parallelisms(
475 pending_new_handles.drain(..),
476 prev_commit_epoch.ok_or_else(|| {
477 anyhow!("should have committed once on alter parallelisms")
478 })?,
479 )
480 .await?;
481 continue;
482 }
483 CoordinationHandleManagerEvent::CommitRequest {
484 epoch,
485 metadata,
486 add_columns,
487 } => (epoch, (metadata, add_columns)),
488 CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
489 bail!("receive AlignInitialEpoch after initialization")
490 }
491 };
492 if !running_handles.contains(&handle_id) {
493 bail!(
494 "receiving commit request from non-running handle {}, running handles: {:?}",
495 handle_id,
496 running_handles
497 );
498 }
499 pending_epochs.entry(epoch).or_default().add_new_request(
500 handle_id,
501 commit_request,
502 self.handle_manager.vnode_bitmap(handle_id),
503 )?;
504 if pending_epochs
505 .first_key_value()
506 .expect("non-empty")
507 .1
508 .aligned()
509 {
510 let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
511 let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
512 let mut requests = commit_requests.requests.into_iter();
513 let (first_metadata, first_add_columns) = requests.next().expect("non-empty");
514 metadatas.push(first_metadata);
515 for (metadata, add_columns) in requests {
516 if first_add_columns != add_columns {
517 return Err(anyhow!(
518 "got different add columns {:?} to prev add columns {:?}",
519 add_columns,
520 first_add_columns
521 ));
522 }
523 metadatas.push(metadata);
524 }
525
526 let start_time = Instant::now();
527 run_future_with_periodic_fn(
528 coordinator.commit(epoch, metadatas, first_add_columns),
529 Duration::from_secs(5),
530 || {
531 warn!(
532 elapsed = ?start_time.elapsed(),
533 sink_id = sink_id.sink_id,
534 "committing"
535 );
536 },
537 )
538 .await
539 .map_err(|e| anyhow!(e))?;
540 self.handle_manager
541 .ack_commit(epoch, commit_requests.handle_ids)?;
542 prev_commit_epoch = Some(epoch);
543 }
544 }
545 }
546}