risingwave_meta/manager/sink_coordination/
coordinator_worker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::fmt::Debug;
17use std::future::{Future, poll_fn};
18use std::pin::pin;
19use std::task::Poll;
20use std::time::{Duration, Instant};
21
22use anyhow::anyhow;
23use futures::future::{Either, select};
24use futures::pin_mut;
25use itertools::Itertools;
26use risingwave_common::bail;
27use risingwave_common::bitmap::Bitmap;
28use risingwave_common::catalog::Field;
29use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
30use risingwave_connector::dispatch_sink;
31use risingwave_connector::sink::{
32    Sink, SinkCommitCoordinator, SinkCommittedEpochSubscriber, SinkParam, build_sink,
33};
34use risingwave_pb::connector_service::{SinkMetadata, coordinate_request};
35use sea_orm::DatabaseConnection;
36use thiserror_ext::AsReport;
37use tokio::select;
38use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39use tokio::time::sleep;
40use tonic::Status;
41use tracing::{error, warn};
42
43use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
44
45async fn run_future_with_periodic_fn<F: Future>(
46    future: F,
47    interval: Duration,
48    mut f: impl FnMut(),
49) -> F::Output {
50    pin_mut!(future);
51    loop {
52        match select(&mut future, pin!(sleep(interval))).await {
53            Either::Left((output, _)) => {
54                break output;
55            }
56            Either::Right(_) => f(),
57        }
58    }
59}
60
/// Key identifying a `SinkWriterCoordinationHandle` inside
/// `CoordinationHandleManager::writer_handles`.
type HandleId = usize;
62
/// Accumulates one request per writer handle until the vnode bitmaps of the
/// contributing handles jointly cover every vnode (see `aligned`).
#[derive(Default)]
struct AligningRequests<R> {
    /// Requests collected so far, in arrival order.
    requests: Vec<R>,
    /// Ids of the handles that have already contributed a request.
    handle_ids: HashSet<HandleId>,
    /// Union of the vnode bitmaps of all contributing handles.
    committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
}
69
70impl<R> AligningRequests<R> {
71    fn add_new_request(
72        &mut self,
73        handle_id: HandleId,
74        request: R,
75        vnode_bitmap: &Bitmap,
76    ) -> anyhow::Result<()>
77    where
78        R: Debug,
79    {
80        let committed_bitmap = self
81            .committed_bitmap
82            .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
83        assert_eq!(committed_bitmap.len(), vnode_bitmap.len());
84
85        let check_bitmap = (&*committed_bitmap) & vnode_bitmap;
86        if check_bitmap.count_ones() > 0 {
87            return Err(anyhow!(
88                "duplicate vnode {:?}. request vnode: {:?}, prev vnode: {:?}. pending request: {:?}, request: {:?}",
89                check_bitmap.iter_ones().collect_vec(),
90                vnode_bitmap,
91                committed_bitmap,
92                self.requests,
93                request
94            ));
95        }
96        *committed_bitmap |= vnode_bitmap;
97        self.requests.push(request);
98        assert!(self.handle_ids.insert(handle_id));
99        Ok(())
100    }
101
102    fn aligned(&self) -> bool {
103        self.committed_bitmap.as_ref().is_some_and(|b| b.all())
104    }
105}
106
/// Owns the writer handles of one sink and turns their incoming messages, as
/// well as newly arriving handles, into `CoordinationHandleManagerEvent`s.
struct CoordinationHandleManager {
    /// Sink parameters; a warning is logged when a newly received handle
    /// carries a different param.
    param: SinkParam,
    /// Registered writer handles, keyed by the id assigned on registration.
    writer_handles: HashMap<HandleId, SinkWriterCoordinationHandle>,
    /// Id given to the next received handle; incremented on every registration.
    next_handle_id: HandleId,
    /// Channel on which new writer handles arrive.
    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
}
113
114impl CoordinationHandleManager {
115    fn start(
116        &mut self,
117        log_store_rewind_start_epoch: Option<u64>,
118        handle_ids: impl IntoIterator<Item = HandleId>,
119    ) -> anyhow::Result<()> {
120        for handle_id in handle_ids {
121            let handle = self
122                .writer_handles
123                .get_mut(&handle_id)
124                .ok_or_else(|| anyhow!("fail to find handle for {} to start", handle_id,))?;
125            handle.start(log_store_rewind_start_epoch).map_err(|_| {
126                anyhow!(
127                    "fail to start {:?} for handle {}",
128                    log_store_rewind_start_epoch,
129                    handle_id
130                )
131            })?;
132        }
133        Ok(())
134    }
135
136    fn ack_aligned_initial_epoch(&mut self, aligned_initial_epoch: u64) -> anyhow::Result<()> {
137        for (handle_id, handle) in &mut self.writer_handles {
138            handle
139                .ack_aligned_initial_epoch(aligned_initial_epoch)
140                .map_err(|_| {
141                    anyhow!(
142                        "fail to ack_aligned_initial_epoch {:?} for handle {}",
143                        aligned_initial_epoch,
144                        handle_id
145                    )
146                })?;
147        }
148        Ok(())
149    }
150
151    fn ack_commit(
152        &mut self,
153        epoch: u64,
154        handle_ids: impl IntoIterator<Item = HandleId>,
155    ) -> anyhow::Result<()> {
156        for handle_id in handle_ids {
157            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
158                anyhow!(
159                    "fail to find handle for {} when ack commit on epoch {}",
160                    handle_id,
161                    epoch
162                )
163            })?;
164            handle.ack_commit(epoch).map_err(|_| {
165                anyhow!(
166                    "fail to ack commit on epoch {} for handle {}",
167                    epoch,
168                    handle_id
169                )
170            })?;
171        }
172        Ok(())
173    }
174
175    async fn next_request_inner(
176        writer_handles: &mut HashMap<HandleId, SinkWriterCoordinationHandle>,
177    ) -> anyhow::Result<(HandleId, coordinate_request::Msg)> {
178        poll_fn(|cx| {
179            for (handle_id, handle) in writer_handles.iter_mut() {
180                if let Poll::Ready(result) = handle.poll_next_request(cx) {
181                    return Poll::Ready(result.map(|request| (*handle_id, request)));
182                }
183            }
184            Poll::Pending
185        })
186        .await
187    }
188}
189
/// Event produced by `CoordinationHandleManager::next_event`, always paired
/// with the id of the handle it concerns.
enum CoordinationHandleManagerEvent {
    /// A new writer handle was received and registered.
    NewHandle,
    /// The handle sent an `UpdateVnodeRequest`.
    UpdateVnodeBitmap,
    /// The handle sent a `Stop` message.
    Stop,
    /// The handle sent a `CommitRequest`.
    CommitRequest {
        /// Epoch carried by the request.
        epoch: u64,
        /// Sink metadata carried by the request.
        metadata: SinkMetadata,
        /// Fields converted from the request's `add_columns`, if present.
        add_columns: Option<Vec<Field>>,
    },
    /// The handle sent an `AlignInitialEpochRequest` carrying this epoch.
    AlignInitialEpoch(u64),
}
201
202impl CoordinationHandleManagerEvent {
203    fn name(&self) -> &'static str {
204        match self {
205            CoordinationHandleManagerEvent::NewHandle => "NewHandle",
206            CoordinationHandleManagerEvent::UpdateVnodeBitmap => "UpdateVnodeBitmap",
207            CoordinationHandleManagerEvent::Stop => "Stop",
208            CoordinationHandleManagerEvent::CommitRequest { .. } => "CommitRequest",
209            CoordinationHandleManagerEvent::AlignInitialEpoch(_) => "AlignInitialEpoch",
210        }
211    }
212}
213
214impl CoordinationHandleManager {
215    async fn next_event(&mut self) -> anyhow::Result<(HandleId, CoordinationHandleManagerEvent)> {
216        {
217            select! {
218                handle = self.request_rx.recv() => {
219                    let handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
220                    if handle.param() != &self.param {
221                        warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
222                    }
223                    let handle_id = self.next_handle_id;
224                    self.next_handle_id += 1;
225                    self.writer_handles.insert(handle_id, handle);
226                    Ok((handle_id, CoordinationHandleManagerEvent::NewHandle))
227                }
228                result = Self::next_request_inner(&mut self.writer_handles) => {
229                    let (handle_id, request) = result?;
230                    let event = match request {
231                        coordinate_request::Msg::CommitRequest(request) => {
232                            CoordinationHandleManagerEvent::CommitRequest {
233                                epoch: request.epoch,
234                                metadata: request.metadata.ok_or_else(|| anyhow!("empty sink metadata"))?,
235                                add_columns: request.add_columns.map(|add_columns| add_columns.fields.into_iter().map(|field| Field::from_prost(&field)).collect()),
236                            }
237                        }
238                        coordinate_request::Msg::AlignInitialEpochRequest(epoch) => {
239                            CoordinationHandleManagerEvent::AlignInitialEpoch(epoch)
240                        }
241                        coordinate_request::Msg::UpdateVnodeRequest(_) => {
242                            CoordinationHandleManagerEvent::UpdateVnodeBitmap
243                        }
244                        coordinate_request::Msg::Stop(_) => {
245                            CoordinationHandleManagerEvent::Stop
246                        }
247                        coordinate_request::Msg::StartRequest(_) => {
248                            unreachable!("should have been handled");
249                        }
250                    };
251                    Ok((handle_id, event))
252                }
253            }
254        }
255    }
256
257    fn vnode_bitmap(&self, handle_id: HandleId) -> &Bitmap {
258        self.writer_handles[&handle_id].vnode_bitmap()
259    }
260
261    fn stop_handle(&mut self, handle_id: HandleId) -> anyhow::Result<()> {
262        self.writer_handles
263            .remove(&handle_id)
264            .expect("should exist")
265            .stop()
266    }
267
268    async fn wait_init_handles(
269        &mut self,
270        log_store_rewind_start_epoch: Option<u64>,
271    ) -> anyhow::Result<HashSet<HandleId>> {
272        assert!(self.writer_handles.is_empty());
273        let mut init_requests = AligningRequests::default();
274        while !init_requests.aligned() {
275            let (handle_id, event) = self.next_event().await?;
276            let unexpected_event = match event {
277                CoordinationHandleManagerEvent::NewHandle => {
278                    init_requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
279                    continue;
280                }
281                event => event.name(),
282            };
283            return Err(anyhow!(
284                "expect new handle during init, but got {}",
285                unexpected_event
286            ));
287        }
288        self.start(
289            log_store_rewind_start_epoch,
290            init_requests.handle_ids.iter().cloned(),
291        )?;
292        if log_store_rewind_start_epoch.is_none() {
293            let mut align_requests = AligningRequests::default();
294            while !align_requests.aligned() {
295                let (handle_id, event) = self.next_event().await?;
296                match event {
297                    CoordinationHandleManagerEvent::AlignInitialEpoch(initial_epoch) => {
298                        align_requests.add_new_request(
299                            handle_id,
300                            initial_epoch,
301                            self.vnode_bitmap(handle_id),
302                        )?;
303                    }
304                    other => {
305                        return Err(anyhow!("expect AlignInitialEpoch but got {}", other.name()));
306                    }
307                }
308            }
309            let aligned_initial_epoch = align_requests
310                .requests
311                .into_iter()
312                .max()
313                .expect("non-empty");
314            self.ack_aligned_initial_epoch(aligned_initial_epoch)?;
315        }
316        Ok(init_requests.handle_ids)
317    }
318
319    async fn alter_parallelisms(
320        &mut self,
321        altered_handles: impl Iterator<Item = HandleId>,
322        prev_commit_epoch: u64,
323    ) -> anyhow::Result<HashSet<HandleId>> {
324        let mut requests = AligningRequests::default();
325        for handle_id in altered_handles {
326            requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
327        }
328        let mut remaining_handles: HashSet<_> = self
329            .writer_handles
330            .keys()
331            .filter(|handle_id| !requests.handle_ids.contains(handle_id))
332            .cloned()
333            .collect();
334        while !remaining_handles.is_empty() || !requests.aligned() {
335            let (handle_id, event) = self.next_event().await?;
336            match event {
337                CoordinationHandleManagerEvent::NewHandle => {
338                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
339                }
340                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
341                    assert!(remaining_handles.remove(&handle_id));
342                    requests.add_new_request(handle_id, (), self.vnode_bitmap(handle_id))?;
343                }
344                CoordinationHandleManagerEvent::Stop => {
345                    assert!(remaining_handles.remove(&handle_id));
346                    self.stop_handle(handle_id)?;
347                }
348                CoordinationHandleManagerEvent::CommitRequest { epoch, .. } => {
349                    bail!(
350                        "receive commit request on epoch {} from handle {} during alter parallelism",
351                        epoch,
352                        handle_id
353                    );
354                }
355                CoordinationHandleManagerEvent::AlignInitialEpoch(epoch) => {
356                    bail!(
357                        "receive AlignInitialEpoch on epoch {} from handle {} during alter parallelism",
358                        epoch,
359                        handle_id
360                    );
361                }
362            }
363        }
364        self.start(Some(prev_commit_epoch), requests.handle_ids.iter().cloned())?;
365        Ok(requests.handle_ids)
366    }
367}
368
/// Worker that runs commit coordination for a single sink, driven by the
/// events of its `CoordinationHandleManager`.
pub struct CoordinatorWorker {
    /// Manager of the writer handles taking part in the coordination.
    handle_manager: CoordinationHandleManager,
}
372
373impl CoordinatorWorker {
374    pub async fn run(
375        param: SinkParam,
376        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
377        db: DatabaseConnection,
378        subscriber: SinkCommittedEpochSubscriber,
379        iceberg_compact_stat_sender: UnboundedSender<IcebergSinkCompactionUpdate>,
380    ) {
381        let sink = match build_sink(param.clone()) {
382            Ok(sink) => sink,
383            Err(e) => {
384                error!(
385                    error = %e.as_report(),
386                    "unable to build sink with param {:?}",
387                    param
388                );
389                return;
390            }
391        };
392
393        dispatch_sink!(sink, sink, {
394            let coordinator = match sink
395                .new_coordinator(db, Some(iceberg_compact_stat_sender))
396                .await
397            {
398                Ok(coordinator) => coordinator,
399                Err(e) => {
400                    error!(
401                        error = %e.as_report(),
402                        "unable to build coordinator with param {:?}",
403                        param
404                    );
405                    return;
406                }
407            };
408            Self::execute_coordinator(param, request_rx, coordinator, subscriber).await
409        });
410    }
411
412    pub async fn execute_coordinator(
413        param: SinkParam,
414        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
415        coordinator: impl SinkCommitCoordinator,
416        subscriber: SinkCommittedEpochSubscriber,
417    ) {
418        let mut worker = CoordinatorWorker {
419            handle_manager: CoordinationHandleManager {
420                param,
421                writer_handles: HashMap::new(),
422                next_handle_id: 0,
423                request_rx,
424            },
425        };
426
427        if let Err(e) = worker.run_coordination(coordinator, subscriber).await {
428            for handle in worker.handle_manager.writer_handles.into_values() {
429                handle.abort(Status::internal(format!(
430                    "failed to run coordination: {:?}",
431                    e.as_report()
432                )))
433            }
434        }
435    }
436
437    async fn run_coordination(
438        &mut self,
439        mut coordinator: impl SinkCommitCoordinator,
440        subscriber: SinkCommittedEpochSubscriber,
441    ) -> anyhow::Result<()> {
442        let initial_log_store_rewind_start_epoch = coordinator.init(subscriber).await?;
443        let sink_id = self.handle_manager.param.sink_id;
444        let mut running_handles = self
445            .handle_manager
446            .wait_init_handles(initial_log_store_rewind_start_epoch)
447            .await?;
448        let mut pending_epochs: BTreeMap<u64, AligningRequests<_>> = BTreeMap::new();
449        let mut pending_new_handles = vec![];
450        let mut prev_commit_epoch = None;
451        loop {
452            let (handle_id, event) = self.handle_manager.next_event().await?;
453            let (epoch, commit_request) = match event {
454                CoordinationHandleManagerEvent::NewHandle => {
455                    pending_new_handles.push(handle_id);
456                    continue;
457                }
458                CoordinationHandleManagerEvent::UpdateVnodeBitmap => {
459                    running_handles = self
460                        .handle_manager
461                        .alter_parallelisms(
462                            pending_new_handles.drain(..).chain([handle_id]),
463                            prev_commit_epoch.ok_or_else(|| {
464                                anyhow!("should have committed once on alter parallelisms")
465                            })?,
466                        )
467                        .await?;
468                    continue;
469                }
470                CoordinationHandleManagerEvent::Stop => {
471                    self.handle_manager.stop_handle(handle_id)?;
472                    running_handles = self
473                        .handle_manager
474                        .alter_parallelisms(
475                            pending_new_handles.drain(..),
476                            prev_commit_epoch.ok_or_else(|| {
477                                anyhow!("should have committed once on alter parallelisms")
478                            })?,
479                        )
480                        .await?;
481                    continue;
482                }
483                CoordinationHandleManagerEvent::CommitRequest {
484                    epoch,
485                    metadata,
486                    add_columns,
487                } => (epoch, (metadata, add_columns)),
488                CoordinationHandleManagerEvent::AlignInitialEpoch(_) => {
489                    bail!("receive AlignInitialEpoch after initialization")
490                }
491            };
492            if !running_handles.contains(&handle_id) {
493                bail!(
494                    "receiving commit request from non-running handle {}, running handles: {:?}",
495                    handle_id,
496                    running_handles
497                );
498            }
499            pending_epochs.entry(epoch).or_default().add_new_request(
500                handle_id,
501                commit_request,
502                self.handle_manager.vnode_bitmap(handle_id),
503            )?;
504            if pending_epochs
505                .first_key_value()
506                .expect("non-empty")
507                .1
508                .aligned()
509            {
510                let (epoch, commit_requests) = pending_epochs.pop_first().expect("non-empty");
511                let mut metadatas = Vec::with_capacity(commit_requests.requests.len());
512                let mut requests = commit_requests.requests.into_iter();
513                let (first_metadata, first_add_columns) = requests.next().expect("non-empty");
514                metadatas.push(first_metadata);
515                for (metadata, add_columns) in requests {
516                    if first_add_columns != add_columns {
517                        return Err(anyhow!(
518                            "got different add columns {:?} to prev add columns {:?}",
519                            add_columns,
520                            first_add_columns
521                        ));
522                    }
523                    metadatas.push(metadata);
524                }
525
526                let start_time = Instant::now();
527                run_future_with_periodic_fn(
528                    coordinator.commit(epoch, metadatas, first_add_columns),
529                    Duration::from_secs(5),
530                    || {
531                        warn!(
532                            elapsed = ?start_time.elapsed(),
533                            sink_id = sink_id.sink_id,
534                            "committing"
535                        );
536                    },
537                )
538                .await
539                .map_err(|e| anyhow!(e))?;
540                self.handle_manager
541                    .ack_commit(epoch, commit_requests.handle_ids)?;
542                prev_commit_epoch = Some(epoch);
543            }
544        }
545    }
546}