risingwave_meta/manager/sink_coordination/
coordinator_worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use futures::future::{Either, select};
24use futures::pin_mut;
25use itertools::Itertools;
26use risingwave_common::bail;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
29use risingwave_connector::dispatch_sink;
30use risingwave_connector::sink::{
31    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
32};
33use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
34use sea_orm::DatabaseConnection;
35use thiserror_ext::AsReport;
36use tokio::select;
37use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
38use tokio::time::sleep;
39use tonic::Status;
40use tracing::{error, warn};
41
42use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
43
44async fn run_future_with_periodic_fn<F: Future>(
45    future: F,
46    interval: Duration,
47    mut f: impl FnMut(),
48) -> F::Output {
49    pin_mut!(future);
50    loop {
51        match select(&mut future, pin!(sleep(interval))).await {
52            Either::Left((output, _)) => {
53                break output;
54            }
55            Either::Right(_) => f(),
56        }
57    }
58}
59
/// Identifier assigned by the coordinator to each registered writer handle.
type HandleId = usize;
61
62#[derive(Default)]
63struct AligningRequests<R> {
64    requests: Vec<R>,
65    handle_ids: HashSet<HandleId>,
66    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
67}
68
69impl<R> AligningRequests<R> {
70    fn add_new_request(
71        &mut self,
72        handle_id: HandleId,
73        request: R,
74        vnode_bitmap: &Bitmap,
75    ) -> anyhow::Result<()>
76    where
77        R: Debug,
78    {
79        let committed_bitmap = self
80            .committed_bitmap
81            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
82        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
83
84        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
85        if check_bitmap.count_ones() > 0 {
86            return Err(anyhow!(
87                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
88                check_bitmap.iter_ones().collect_vec(),
89                vnode_bitmap,
90                committed_bitmap,
91                self.requests,
92                request
93            ));
94        }
95        *committed_bitmap |= vnode_bitmap;
96        self.requests.push(request);
97        assert!(self.handle_ids.insert(handle_id));
98        Ok(())
99    }
100
101    fn aligned(&self) -> bool {
102        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
103    }
104}
105
106struct CoordinationHandleManager {
107    param: SinkParam,
108    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
109    next_handle_id: HandleId,
110    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
111}
112
113impl CoordinationHandleManager {
114    fn start(
115        &mut self,
116        log_store_rewind_start_epoch: Option<u64>,
117        handle_ids: impl IntoIterator<Item = HandleId>,
118    ) -> anyhow::Result<()> {
119        for handle_id in handle_ids {
120            let handle = self
121                .writer_handles
122                .get_mut(&handle_id)
123                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
124            handle.start(log_store_rewind_start_epoch).map_err(|_| {
125                anyhow!(
126                    "fail to start {:?} for handle {}",
127                    log_store_rewind_start_epoch,
128                    handle_id
129                )
130            })?;
131        }
132        Ok(())
133    }
134
135    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
136        for (handle_id, handle) in &mut self.writer_handles {
137            handle
138                .ack_aligned_initial_epoch(aligned_initial_epoch)
139                .map_err(|_| {
140                    anyhow!(
141                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
142                        aligned_initial_epoch,
143                        handle_id
144                    )
145                })?;
146        }
147        Ok(())
148    }
149
150    fn ack_commit(
151        &mut self,
152        epoch: u64,
153        handle_ids: impl IntoIterator<Item = HandleId>,
154    ) -> anyhow::Result<()> {
155        for handle_id in handle_ids {
156            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
157                anyhow!(
158                    "fail to find handle for {} when ack commit on epoch {}",
159                    handle_id,
160                    epoch
161                )
162            })?;
163            handle.ack_commit(epoch).map_err(|_| {
164                anyhow!(
165                    "fail to ack commit on epoch {} for handle {}",
166                    epoch,
167                    handle_id
168                )
169            })?;
170        }
171        Ok(())
172    }
173
174    async fn next_request_inner(
175        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
176    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
177        poll_fn(|cx| {
178            for (handle_id, handle) in writer_handles.iter_mut() {
179                if let Poll::Ready(result) = handle.poll_next_request(cx) {
180                    return Poll::Ready(result.map(|request| (*handle_id, request)));
181                }
182            }
183            Poll::Pending
184        })
185        .await
186    }
187}
188
189enum CoordinationHandleManagerEvent {
190    NewHandle,
191    UpdateVnodeBitmap,
192    Stop,
193    CommitRequest { epoch: u64, metadata: SinkMetadata },
194    AlignInitialEpoch(u64),
195}
196
197impl CoordinationHandleManagerEvent {
198    fn name(&self) -> &'static str {
199        match self {
200            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
201            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
202            CoordinationHandleManagerEvent::Stop => "Stop",
203            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
204            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
205        }
206    }
207}
208
209impl CoordinationHandleManager {
210    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
211        {
212            select! {
213                handle = self.request_rx.recv() => {
214                    let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
215                    if handle.param() != &self.param {
216                        warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
217                    }
218                    let handle_id = self.next_handle_id;
219                    self.next_handle_id += 1;
220                    self.writer_handles.insert(handle_id, handle);
221                    Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
222                }
223                result = Self::next_request_inner(&mut self.writer_handles) => {
224                    let (handle_id, request) = result?;
225                    let event = match request {
226                        coordinate_request::Msg::CommitRequest(request) => {
227                            CoordinationHandleManagerEvent::CommitRequest {
228                                epoch: request.epoch,
229                                metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
230                            }
231                        }
232                        coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
233                            CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
234                        }
235                        coordinate_request::Msg::UpdateVnodeRequest(_) => {
236                            CoordinationHandleManagerEvent::UpdateVnodeBitmap
237                        }
238                        coordinate_request::Msg::Stop(_) => {
239                            CoordinationHandleManagerEvent::Stop
240                        }
241                        coordinate_request::Msg::StartRequest(_) => {
242                            unreachable!("should have been handled");
243                        }
244                    };
245                    Ok((handle_id, event))
246                }
247            }
248        }
249    }
250
251    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
252        self.writer_handles[&handle_id].vnode_bitmap()
253    }
254
255    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
256        self.writer_handles
257            .remove(&handle_id)
258            .expect("should exist")
259            .stop()
260    }
261
262    async fn wait_init_handles(
263        &mut self,
264        log_store_rewind_start_epoch: Option<u64>,
265    ) -> anyhow::Result<HashSet<HandleId>> {
266        assert!(self.writer_handles.is_empty());
267        let mut init_requests = AligningRequests::default();
268        while !init_requests.aligned() {
269            let (handle_id, event) = self.next_event().await?;
270            let unexpected_event = match event {
271                CoordinationHandleManagerEvent::NewHandle => {
272                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
273                    continue;
274                }
275                event => event.name(),
276            };
277            return Err(anyhow!(
278                "expect new handle during init, but got {}",
279                unexpected_event
280            ));
281        }
282        self.start(
283            log_store_rewind_start_epoch,
284            init_requests.handle_ids.iter().cloned(),
285        )?;
286        if log_store_rewind_start_epoch.is_none() {
287            let mut align_requests = AligningRequests::default();
288            while !align_requests.aligned() {
289                let (handle_id, event) = self.next_event().await?;
290                match event {
291                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
292                        align_requests.add_new_request(
293                            handle_id,
294                            initial_epoch,
295                            self.vnode_bitmap(handle_id),
296                        )?;
297                    }
298                    other => {
299                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
300                    }
301                }
302            }
303            let aligned_initial_epoch = align_requests
304                .requests
305                .into_iter()
306                .max()
307                .expect("non-empty");
308            self.ack_aligned_initial_epoch(aligned_initial_epoch)?;
309        }
310        Ok(init_requests.handle_ids)
311    }
312
313    async fn alter_parallelisms(
314        &mut self,
315        altered_handles: impl Iterator<Item = HandleId>,
316        prev_commit_epoch: u64,
317    ) -> anyhow::Result<HashSet<HandleId>> {
318        let mut requests = AligningRequests::default();
319        for handle_id in altered_handles {
320            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
321        }
322        let mut remaining_handles: HashSet<_> = self
323            .writer_handles
324            .keys()
325            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
326            .cloned()
327            .collect();
328        while !remaining_handles.is_empty() || !requests.aligned() {
329            let (handle_id, event) = self.next_event().await?;
330            match event {
331                CoordinationHandleManagerEvent::NewHandle => {
332                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
333                }
334                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
335                    assert!(remaining_handles.remove(&handle_id));
336                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
337                }
338                CoordinationHandleManagerEvent::Stop => {
339                    assert!(remaining_handles.remove(&handle_id));
340                    self.stop_handle(handle_id)?;
341                }
342                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
343                    bail!(
344                        "receive commit request on epoch {} from handle {} during alter parallelism",
345                        epoch,
346                        handle_id
347                    );
348                }
349                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
350                    bail!(
351                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
352                        epoch,
353                        handle_id
354                    );
355                }
356            }
357        }
358        self.start(Some(prev_commit_epoch), requests.handle_ids.iter().cloned())?;
359        Ok(requests.handle_ids)
360    }
361}
362
363pub struct CoordinatorWorker {
364    handle_manager: CoordinationHandleManager,
365}
366
367impl CoordinatorWorker {
368    pub async fn run(
369        param: SinkParam,
370        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
371        db: DatabaseConnection,
372        subscriber: SinkCommittedEpochSubscriber,
373        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
374    ) {
375        let sink = match build_sink(param.clone()) {
376            Ok(sink) => sink,
377            Err(e) => {
378                error!(
379                    error = %e.as_report(),
380                    "unable to build sink with param {:?}",
381                    param
382                );
383                return;
384            }
385        };
386
387        dispatch_sink!(sink, sink, {
388            let coordinator = match sink
389                .new_coordinator(db, Some(iceberg_compact_stat_sender))
390                .await
391            {
392                Ok(coordinator) => coordinator,
393                Err(e) => {
394                    error!(
395                        error = %e.as_report(),
396                        "unable to build coordinator with param {:?}",
397                        param
398                    );
399                    return;
400                }
401            };
402            Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
403        });
404    }
405
406    pub async fn execute_coordinator(
407        param: SinkParam,
408        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
409        coordinator: impl SinkCommitCoordinator,
410        subscriber: SinkCommittedEpochSubscriber,
411    ) {
412        let mut worker = CoordinatorWorker {
413            handle_manager: CoordinationHandleManager {
414                param,
415                writer_handles: HashMap::new(),
416                next_handle_id: 0,
417                request_rx,
418            },
419        };
420
421        if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
422            for handle in worker.handle_manager.writer_handles.into_values() {
423                handle.abort(Status::internal(format!(
424                    "failed to run coordination: {:?}",
425                    e.as_report()
426                )))
427            }
428        }
429    }
430
431    async fn run_coordination(
432        &mut self,
433        mut coordinator: impl SinkCommitCoordinator,
434        subscriber: SinkCommittedEpochSubscriber,
435    ) -> anyhow::Result<()> {
436        let initial_log_store_rewind_start_epoch = coordinator.init(subscriber).await?;
437        let sink_id = self.handle_manager.param.sink_id;
438        let mut running_handles = self
439            .handle_manager
440            .wait_init_handles(initial_log_store_rewind_start_epoch)
441            .await?;
442        let mut pending_epochs: BTreeMap<u64, AligningRequests<SinkMetadata>> = BTreeMap::new();
443        let mut pending_new_handles = vec![];
444        let mut prev_commit_epoch = None;
445        loop {
446            let (handle_id, event) = self.handle_manager.next_event().await?;
447            let (epoch, metadata) = match event {
448                CoordinationHandleManagerEvent::NewHandle => {
449                    pending_new_handles.push(handle_id);
450                    continue;
451                }
452                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
453                    running_handles = self
454                        .handle_manager
455                        .alter_parallelisms(
456                            pending_new_handles.drain(..).chain([handle_id]),
457                            prev_commit_epoch.ok_or_else(|| {
458                                anyhow!("should have committed once on alter parallelisms")
459                            })?,
460                        )
461                        .await?;
462                    continue;
463                }
464                CoordinationHandleManagerEvent::Stop => {
465                    self.handle_manager.stop_handle(handle_id)?;
466                    running_handles = self
467                        .handle_manager
468                        .alter_parallelisms(
469                            pending_new_handles.drain(..),
470                            prev_commit_epoch.ok_or_else(|| {
471                                anyhow!("should have committed once on alter parallelisms")
472                            })?,
473                        )
474                        .await?;
475                    continue;
476                }
477                CoordinationHandleManagerEvent::CommitRequest { epoch, metadata } => {
478                    (epoch, metadata)
479                }
480                CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
481                    bail!("receive AlignInitialEpoch after initialization")
482                }
483            };
484            if !running_handles.contains(&handle_id) {
485                bail!(
486                    "receiving commit request from non-running handle {}, running handles: {:?}",
487                    handle_id,
488                    running_handles
489                );
490            }
491            pending_epochs.entry(epoch).or_default().add_new_request(
492                handle_id,
493                metadata,
494                self.handle_manager.vnode_bitmap(handle_id),
495            )?;
496            if pending_epochs
497                .first_key_value()
498                .expect("non-empty")
499                .1
500                .aligned()
501            {
502                let (epoch, requests) = pending_epochs.pop_first().expect("non-empty");
503
504                let start_time = Instant::now();
505                run_future_with_periodic_fn(
506                    coordinator.commit(epoch, requests.requests),
507                    Duration::from_secs(5),
508                    || {
509                        warn!(
510                            elapsed = ?start_time.elapsed(),
511                            sink_id = sink_id.sink_id,
512                            "committing"
513                        );
514                    },
515                )
516                .await
517                .map_err(|e| anyhow!(e))?;
518                self.handle_manager.ack_commit(epoch, requests.handle_ids)?;
519                prev_commit_epoch = Some(epoch);
520            }
521        }
522    }
523}