risingwave_meta/manager/sink_coordination/
coordinator_worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::future::{Future, poll_fn};
17use std::pin::pin;
18use std::task::Poll;
19use std::time::{Duration, Instant};
20
21use anyhow::anyhow;
22use futures::future::{Either, select};
23use futures::pin_mut;
24use itertools::Itertools;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_connector::dispatch_sink;
27use risingwave_connector::sink::{
28    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
29};
30use risingwave_pb::connector_service::SinkMetadata;
31use sea_orm::DatabaseConnection;
32use thiserror_ext::AsReport;
33use tokio::select;
34use tokio::sync::mpsc::UnboundedReceiver;
35use tokio::time::sleep;
36use tonic::Status;
37use tracing::{error, warn};
38
39use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
40
41async fn run_future_with_periodic_fn<F: Future>(
42    future: F,
43    interval: Duration,
44    mut f: impl FnMut(),
45) -> F::Output {
46    pin_mut!(future);
47    loop {
48        match select(&mut future, pin!(sleep(interval))).await {
49            Either::Left((output, _)) => {
50                break output;
51            }
52            Either::Right(_) => f(),
53        }
54    }
55}
56
57struct EpochCommitRequests {
58    epoch: u64,
59    metadatas: Vec<SinkMetadata>,
60    handle_ids: HashSet<usize>,
61    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
62}
63
64impl EpochCommitRequests {
65    fn new(epoch: u64) -> Self {
66        Self {
67            epoch,
68            metadatas: vec![],
69            handle_ids: Default::default(),
70            committed_bitmap: None,
71        }
72    }
73
74    fn add_new_request(
75        &mut self,
76        handle_id: usize,
77        metadata: SinkMetadata,
78        vnode_bitmap: Bitmap,
79    ) -> anyhow::Result<()> {
80        let committed_bitmap = self
81            .committed_bitmap
82            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
83        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
84
85        self.metadatas.push(metadata);
86        assert!(self.handle_ids.insert(handle_id));
87        let check_bitmap = (&*committed_bitmap) & &vnode_bitmap;
88        if check_bitmap.count_ones() > 0 {
89            return Err(anyhow!(
90                "duplicate vnode {:?} on epoch {}. request vnode: {:?}, prev vnode: {:?}",
91                check_bitmap.iter_ones().collect_vec(),
92                self.epoch,
93                vnode_bitmap,
94                committed_bitmap
95            ));
96        }
97        *committed_bitmap |= &vnode_bitmap;
98        Ok(())
99    }
100
101    fn can_commit(&self) -> bool {
102        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
103    }
104}
105
106struct CoordinationHandleManager {
107    param: SinkParam,
108    writer_handles: HashMap<usize, SinkWriterCoordinationHandle>,
109    next_handle_id: usize,
110    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
111    initial_log_store_rewind_start_epoch: Option<u64>,
112}
113
114impl CoordinationHandleManager {
115    fn ack_commit(
116        &mut self,
117        epoch: u64,
118        handle_ids: impl IntoIterator<Item = usize>,
119    ) -> anyhow::Result<()> {
120        for handle_id in handle_ids {
121            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
122                anyhow!(
123                    "fail to find handle for {} when ack commit on epoch {}",
124                    handle_id,
125                    epoch
126                )
127            })?;
128            handle.ack_commit(epoch).map_err(|_| {
129                anyhow!(
130                    "fail to ack commit on epoch {} for handle {}",
131                    epoch,
132                    handle_id
133                )
134            })?;
135        }
136        Ok(())
137    }
138
139    async fn next_commit_request_inner(
140        writer_handles: &mut HashMap<usize, SinkWriterCoordinationHandle>,
141    ) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
142        poll_fn(|cx| {
143            'outer: loop {
144                for (handle_id, handle) in writer_handles.iter_mut() {
145                    if let Poll::Ready(result) = handle.poll_next_commit_request(cx) {
146                        match result {
147                            Ok(Some((epoch, metadata))) => {
148                                return Poll::Ready(Ok((
149                                    *handle_id,
150                                    handle.vnode_bitmap().clone(),
151                                    epoch,
152                                    metadata,
153                                )));
154                            }
155                            Ok(None) => {
156                                let handle_id = *handle_id;
157                                writer_handles.remove(&handle_id);
158                                continue 'outer;
159                            }
160                            Err(e) => {
161                                return Poll::Ready(Err(e));
162                            }
163                        }
164                    }
165                }
166                return Poll::Pending;
167            }
168        })
169        .await
170    }
171
172    async fn next_commit_request(&mut self) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
173        loop {
174            select! {
175                handle = self.request_rx.recv() => {
176                    let mut handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
177                    if handle.param() != &self.param {
178                        warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
179                    }
180                    handle.start(self.initial_log_store_rewind_start_epoch)?;
181                    let handle_id = self.next_handle_id;
182                    self.next_handle_id += 1;
183                    self.writer_handles.insert(handle_id, handle);
184                }
185                result = Self::next_commit_request_inner(&mut self.writer_handles) => {
186                    break result;
187                }
188            }
189        }
190    }
191}
192
193pub struct CoordinatorWorker {
194    handle_manager: CoordinationHandleManager,
195    pending_epochs: BTreeMap<u64, EpochCommitRequests>,
196}
197
198impl CoordinatorWorker {
199    pub async fn run(
200        param: SinkParam,
201        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
202        db: DatabaseConnection,
203        subscriber: SinkCommittedEpochSubscriber,
204    ) {
205        let sink = match build_sink(param.clone()) {
206            Ok(sink) => sink,
207            Err(e) => {
208                error!(
209                    error = %e.as_report(),
210                    "unable to build sink with param {:?}",
211                    param
212                );
213                return;
214            }
215        };
216
217        dispatch_sink!(sink, sink, {
218            let coordinator = match sink.new_coordinator(db).await {
219                Ok(coordinator) => coordinator,
220                Err(e) => {
221                    error!(
222                        error = %e.as_report(),
223                        "unable to build coordinator with param {:?}",
224                        param
225                    );
226                    return;
227                }
228            };
229            Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
230        });
231    }
232
233    pub async fn execute_coordinator(
234        param: SinkParam,
235        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
236        coordinator: impl SinkCommitCoordinator,
237        subscriber: SinkCommittedEpochSubscriber,
238    ) {
239        let mut worker = CoordinatorWorker {
240            handle_manager: CoordinationHandleManager {
241                param,
242                writer_handles: HashMap::new(),
243                next_handle_id: 0,
244                request_rx,
245                initial_log_store_rewind_start_epoch: None,
246            },
247            pending_epochs: Default::default(),
248        };
249
250        if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
251            for handle in worker.handle_manager.writer_handles.into_values() {
252                handle.abort(Status::internal(format!(
253                    "failed to run coordination: {:?}",
254                    e.as_report()
255                )))
256            }
257        }
258    }
259
260    async fn run_coordination(
261        &mut self,
262        mut coordinator: impl SinkCommitCoordinator,
263        subscriber: SinkCommittedEpochSubscriber,
264    ) -> anyhow::Result<()> {
265        self.handle_manager.initial_log_store_rewind_start_epoch =
266            coordinator.init(subscriber).await?;
267        loop {
268            let (handle_id, vnode_bitmap, epoch, metadata) =
269                self.handle_manager.next_commit_request().await?;
270            self.pending_epochs
271                .entry(epoch)
272                .or_insert_with(|| EpochCommitRequests::new(epoch))
273                .add_new_request(handle_id, metadata, vnode_bitmap)?;
274            if self
275                .pending_epochs
276                .first_key_value()
277                .expect("non-empty")
278                .1
279                .can_commit()
280            {
281                let (epoch, requests) = self.pending_epochs.pop_first().expect("non-empty");
282
283                let start_time = Instant::now();
284                run_future_with_periodic_fn(
285                    coordinator.commit(epoch, requests.metadatas),
286                    Duration::from_secs(5),
287                    || {
288                        warn!(
289                            elapsed = ?start_time.elapsed(),
290                            sink_id = self.handle_manager.param.sink_id.sink_id,
291                            "committing"
292                        );
293                    },
294                )
295                .await
296                .map_err(|e| anyhow!(e))?;
297                self.handle_manager.ack_commit(epoch, requests.handle_ids)?;
298            }
299        }
300    }
301}