// risingwave_meta/stream/source_manager/worker.rs
#[cfg(not(debug_assertions))]
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::AnySplitEnumerator;
use super::*;
/// Number of consecutive `tick` failures after which the worker re-creates its
/// split enumerator (see `ConnectorSourceWorker::run`).
const MAX_FAIL_CNT: u32 = 10;
/// Upper bound on the synchronous first tick performed in `create_source_worker`.
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);
/// WITH-option key carrying hard-coded splits for debugging.
/// Only honored in debug builds; rejected in release builds.
const DEBUG_SPLITS_KEY: &str = "debug_splits";
/// Latest splits discovered by a source worker, shared with the source manager.
pub struct SharedSplitMap {
    /// `None` until the worker's first successful tick populates it.
    pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}
/// Shared, async-mutex-protected reference to a [`SharedSplitMap`].
type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;
/// Background worker that periodically discovers splits for one source and
/// publishes them into `current_splits`.
pub struct ConnectorSourceWorker {
    source_id: SourceId,
    source_name: String,
    /// Latest discovered splits, shared with the source manager.
    current_splits: SharedSplitMapRef,
    /// Connector-specific split enumerator; re-created by `refresh` after
    /// repeated tick failures.
    enumerator: Box<dyn AnySplitEnumerator>,
    /// Interval between periodic discovery ticks.
    period: Duration,
    metrics: Arc<MetaMetrics>,
    /// Retained so the enumerator can be rebuilt (see `refresh`).
    connector_properties: ConnectorProperties,
    /// Consecutive `tick` failures; reset to 0 on success.
    fail_cnt: u32,
    /// Gauge set to 1 on a successful tick and 0 on a failed one.
    source_is_up: LabelGuardedIntGauge<2>,
    /// Debug-build-only fixed splits parsed from the `debug_splits` WITH
    /// option; when `Some`, `tick` uses these instead of the enumerator.
    debug_splits: Option<Vec<SplitImpl>>,
}
/// Build [`ConnectorProperties`] for a source that already exists in the catalog.
///
/// The second argument to `extract` is `false` here, versus `true` in
/// [`extract_prop_from_new_source`] — presumably a less strict validation mode
/// for options that were already accepted at creation time; see
/// `ConnectorProperties::extract` for the exact semantics.
fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
    let with_options = source.with_properties.clone();
    let secrets = source.secret_refs.clone();
    let resolved = WithOptionsSecResolved::new(with_options, secrets);
    let mut props = ConnectorProperties::extract(resolved, false)?;
    props.init_from_pb_source(source);
    Ok(props)
}
/// Build [`ConnectorProperties`] for a source that is being created.
///
/// Strips the `debug_splits` entry from the WITH options before extraction so
/// it never reaches the connector; in release builds its presence is rejected
/// outright.
fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
    let mut with_properties = source.with_properties.clone();
    // `debug_splits` is a meta-only debugging knob, not a real connector option.
    let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
    #[cfg(not(debug_assertions))]
    {
        // Hard-coded splits must never be used in production builds.
        if _removed.is_some() {
            return Err(ConnectorError::from(anyhow::anyhow!(
                "`debug_splits` is not allowed in release mode"
            )));
        }
    }
    let options_with_secret =
        WithOptionsSecResolved::new(with_properties, source.secret_refs.clone());
    // `true`: stricter creation-time validation (contrast with the existing-source path).
    let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
    properties.init_from_pb_source(source);
    Ok(properties)
}
/// Spawn a dedicated split-discovery worker for a newly created `source`.
///
/// The worker is ticked once synchronously (bounded by
/// `DEFAULT_SOURCE_TICK_TIMEOUT`) so split metadata is available before this
/// function returns; creation fails if that first tick errors or times out.
///
/// # Errors
/// Propagates property-extraction failures, worker-creation failures, and a
/// timeout or error of the initial tick.
pub async fn create_source_worker(
    source: &Source,
    metrics: Arc<MetaMetrics>,
) -> MetaResult<ConnectorSourceWorkerHandle> {
    tracing::info!("spawning new watcher for source {}", source.id);

    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
    let connector_properties = extract_prop_from_new_source(source)?;
    let enable_scale_in = connector_properties.enable_drop_split();
    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();

    let mut worker = ConnectorSourceWorker::create(
        source,
        connector_properties,
        DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
        splits.clone(),
        metrics,
    )
    .await?;

    // Run one tick eagerly so callers observe fresh metadata; a timeout is
    // surfaced as an error instead of blocking indefinitely.
    tokio::time::timeout(DEFAULT_SOURCE_TICK_TIMEOUT, worker.tick())
        .await
        .ok()
        .with_context(|| {
            format!(
                "failed to fetch meta info for source {}, timeout {:?}",
                source.id, DEFAULT_SOURCE_TICK_TIMEOUT
            )
        })??;

    let handle = tokio::spawn(async move { worker.run(command_rx).await });

    Ok(ConnectorSourceWorkerHandle {
        handle,
        command_tx,
        splits,
        enable_drop_split: enable_scale_in,
        enable_adaptive_splits,
    })
}
/// Spawn a split-discovery worker for an existing source without waiting for
/// its first successful tick.
///
/// Worker creation is retried forever on the tick interval inside the spawned
/// task; until it succeeds, the shared split map stays `None`. The handle is
/// registered in `managed_sources` immediately.
///
/// # Errors
/// Only property extraction can fail here; everything after that runs
/// asynchronously in the spawned task.
pub fn create_source_worker_async(
    source: Source,
    managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
    metrics: Arc<MetaMetrics>,
) -> MetaResult<()> {
    tracing::info!("spawning new watcher for source {}", source.id);

    let source_id = source.id;
    let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
    let shared_splits = splits.clone();
    let connector_properties = extract_prop_from_existing_source(&source)?;
    let enable_drop_split = connector_properties.enable_drop_split();
    let enable_adaptive_splits = connector_properties.enable_adaptive_splits();
    let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();

    let handle = tokio::spawn(async move {
        let mut ticker = time::interval(DEFAULT_SOURCE_WORKER_TICK_INTERVAL);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

        // Retry creation indefinitely: transient connector errors (e.g. the
        // external system being unreachable) should not kill this task.
        let mut worker = loop {
            ticker.tick().await;
            let created = ConnectorSourceWorker::create(
                &source,
                connector_properties.clone(),
                DEFAULT_SOURCE_WORKER_TICK_INTERVAL,
                shared_splits.clone(),
                metrics.clone(),
            )
            .await;
            match created {
                Ok(worker) => break worker,
                Err(e) => {
                    tracing::warn!(error = %e.as_report(), "failed to create source worker");
                }
            }
        };

        worker.run(command_rx).await
    });

    managed_sources.insert(
        source_id as SourceId,
        ConnectorSourceWorkerHandle {
            handle,
            command_tx,
            splits,
            enable_drop_split,
            enable_adaptive_splits,
        },
    );
    Ok(())
}
const DEFAULT_SOURCE_WORKER_TICK_INTERVAL: Duration = Duration::from_secs(30);
impl ConnectorSourceWorker {
    /// Rebuild the split enumerator from the stored connector properties and
    /// reset the failure counter. Invoked by `run` once `fail_cnt` exceeds
    /// `MAX_FAIL_CNT`.
    async fn refresh(&mut self) -> MetaResult<()> {
        let enumerator = self
            .connector_properties
            .clone()
            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
                metrics: self.metrics.source_enumerator_metrics.clone(),
                info: SourceEnumeratorInfo {
                    source_id: self.source_id as u32,
                },
            }))
            .await
            .context("failed to create SplitEnumerator")?;
        self.enumerator = enumerator;
        self.fail_cnt = 0;
        tracing::info!("refreshed source enumerator: {}", self.source_name);
        Ok(())
    }
    /// Construct a worker for `source`.
    ///
    /// Builds the initial split enumerator and, in debug builds only, parses
    /// the optional `debug_splits` WITH option (a JSON array of serialized
    /// splits) into `debug_splits`. In release builds the mere presence of
    /// `debug_splits` is an error.
    ///
    /// # Errors
    /// Fails if the enumerator cannot be created, if `debug_splits` is present
    /// in a release build, or (debug builds) if it cannot be parsed.
    pub async fn create(
        source: &Source,
        connector_properties: ConnectorProperties,
        period: Duration,
        splits: Arc<Mutex<SharedSplitMap>>,
        metrics: Arc<MetaMetrics>,
    ) -> MetaResult<Self> {
        let enumerator = connector_properties
            .clone()
            .create_split_enumerator(Arc::new(SourceEnumeratorContext {
                metrics: metrics.source_enumerator_metrics.clone(),
                info: SourceEnumeratorInfo {
                    source_id: source.id,
                },
            }))
            .await
            .context("failed to create SplitEnumerator")?;
        let source_is_up = metrics
            .source_is_up
            .with_guarded_label_values(&[source.id.to_string().as_str(), &source.name]);
        Ok(Self {
            source_id: source.id as SourceId,
            source_name: source.name.clone(),
            current_splits: splits,
            enumerator,
            period,
            metrics,
            connector_properties,
            fail_cnt: 0,
            source_is_up,
            debug_splits: {
                let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
                #[cfg(not(debug_assertions))]
                {
                    // Hard-coded splits must never be used in production builds.
                    if debug_splits.is_some() {
                        return Err(ConnectorError::from(anyhow::anyhow!(
                            "`debug_splits` is not allowed in release mode"
                        ))
                        .into());
                    }
                    None
                }
                #[cfg(debug_assertions)]
                {
                    use risingwave_common::types::JsonbVal;
                    if let Some(debug_splits) = debug_splits {
                        let mut splits = Vec::new();
                        // Expect a JSON array; each element is one serialized `SplitImpl`.
                        let debug_splits_value =
                            jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
                                .context("failed to parse split impl")?;
                        for split_impl_value in debug_splits_value.as_array().unwrap() {
                            splits.push(SplitImpl::restore_from_json(JsonbVal::from(
                                split_impl_value.clone(),
                            ))?);
                        }
                        Some(splits)
                    } else {
                        None
                    }
                }
            },
        })
    }
    /// Main worker loop: serve commands and run a periodic `tick` every
    /// `self.period`. Returns only on a `Terminate` command.
    pub async fn run(&mut self, mut command_rx: UnboundedReceiver<SourceWorkerCommand>) {
        let mut interval = time::interval(self.period);
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            select! {
                biased;
                // `biased`: commands take priority over the periodic tick.
                // NOTE(review): if every sender is dropped, `recv()` resolves to
                // `None` immediately and this arm is taken on every iteration,
                // busy-looping — confirm a sender always outlives the worker,
                // or consider returning on `None`.
                cmd = command_rx.borrow_mut().recv() => {
                    if let Some(cmd) = cmd {
                        match cmd {
                            SourceWorkerCommand::Tick(tx) => {
                                // Caller may have gone away; ignore a failed send.
                                let _ = tx.send(self.tick().await);
                            }
                            SourceWorkerCommand::DropFragments(fragment_ids) => {
                                if let Err(e) = self.drop_fragments(fragment_ids).await {
                                    tracing::warn!(error = %e.as_report(), "error happened when drop fragment");
                                }
                            }
                            SourceWorkerCommand::FinishBackfill(fragment_ids) => {
                                if let Err(e) = self.finish_backfill(fragment_ids).await {
                                    tracing::warn!(error = %e.as_report(), "error happened when finish backfill");
                                }
                            }
                            SourceWorkerCommand::Terminate => {
                                return;
                            }
                        }
                    }
                }
                _ = interval.tick() => {
                    // After too many consecutive failures, rebuild the enumerator first.
                    if self.fail_cnt > MAX_FAIL_CNT {
                        if let Err(e) = self.refresh().await {
                            tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker");
                        }
                    }
                    if let Err(e) = self.tick().await {
                        tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker");
                    }
                }
            }
        }
    }
    /// Fetch current splits — from `debug_splits` when set (debug builds),
    /// otherwise from the enumerator — and publish them into the shared map.
    /// Updates the `source_is_up` gauge and the failure counter.
    async fn tick(&mut self) -> MetaResult<()> {
        let source_is_up = |res: i64| {
            self.source_is_up.set(res);
        };
        let splits = {
            if let Some(debug_splits) = &self.debug_splits {
                debug_splits.clone()
            } else {
                self.enumerator.list_splits().await.inspect_err(|_| {
                    // Mark the source down and count the failure before propagating.
                    source_is_up(0);
                    self.fail_cnt += 1;
                })?
            }
        };
        source_is_up(1);
        self.fail_cnt = 0;
        let mut current_splits = self.current_splits.lock().await;
        current_splits.splits.replace(
            splits
                .into_iter()
                .map(|split| (split.id(), split))
                .collect(),
        );
        Ok(())
    }
    /// Forward fragment-drop notifications to the enumerator.
    async fn drop_fragments(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
        self.enumerator.on_drop_fragments(fragment_ids).await?;
        Ok(())
    }
    /// Forward backfill-finished notifications to the enumerator.
    async fn finish_backfill(&mut self, fragment_ids: Vec<FragmentId>) -> MetaResult<()> {
        self.enumerator.on_finish_backfill(fragment_ids).await?;
        Ok(())
    }
}
/// Handle held by the source manager for one running [`ConnectorSourceWorker`] task.
pub struct ConnectorSourceWorkerHandle {
    // Kept only to tie the task's lifetime to the handle; never awaited here.
    #[expect(dead_code)]
    handle: JoinHandle<()>,
    /// Sends commands into the worker's `run` loop.
    command_tx: UnboundedSender<SourceWorkerCommand>,
    /// Splits discovered by the worker; the inner `splits` is `None` before
    /// the first successful tick.
    pub splits: SharedSplitMapRef,
    pub enable_drop_split: bool,
    pub enable_adaptive_splits: bool,
}
impl ConnectorSourceWorkerHandle {
    /// Whether adaptive split assignment is enabled for this source.
    pub fn get_enable_adaptive_splits(&self) -> bool {
        self.enable_adaptive_splits
    }

    /// Return the splits discovered so far, keyed by split id.
    ///
    /// Yields an empty map (with an info log) while the worker has not
    /// finished its first tick. For adaptive-split sources, the single
    /// discovered split is expanded across `actors` via `fill_adaptive_split`.
    pub async fn discovered_splits(
        &self,
        source_id: SourceId,
        actors: &HashSet<ActorId>,
    ) -> MetaResult<BTreeMap<Arc<str>, SplitImpl>> {
        let snapshot = self.splits.lock().await.splits.clone();
        let mut discovered_splits = match snapshot {
            Some(splits) => splits,
            None => {
                // The worker hasn't produced anything yet — not an error.
                tracing::info!(
                    "The discover loop for source {} is not ready yet; we'll wait for the next run",
                    source_id
                );
                return Ok(BTreeMap::new());
            }
        };

        if discovered_splits.is_empty() {
            tracing::warn!("No splits discovered for source {}", source_id);
        }

        if self.enable_adaptive_splits {
            // Adaptive mode implies drop-split support and a single template split.
            debug_assert!(self.enable_drop_split);
            debug_assert!(discovered_splits.len() == 1);
            let template = discovered_splits.values().next().unwrap();
            discovered_splits = fill_adaptive_split(template, actors)?;
        }

        Ok(discovered_splits)
    }

    /// Send `command` to the worker task, embedding the command's `Debug`
    /// rendering in the error context on failure.
    fn send_command(&self, command: SourceWorkerCommand) -> MetaResult<()> {
        // Render first: `send` consumes the command.
        let rendered = format!("{:?}", command);
        self.command_tx
            .send(command)
            .with_context(|| format!("failed to send {rendered} command to source worker"))?;
        Ok(())
    }

    /// Ask the worker to tick immediately and wait for the tick's result.
    pub async fn force_tick(&self) -> MetaResult<()> {
        let (tx, rx) = oneshot::channel();
        self.send_command(SourceWorkerCommand::Tick(tx))?;
        let tick_result = rx
            .await
            .context("failed to receive tick command response from source worker")?;
        tick_result.context("source worker tick failed")?;
        Ok(())
    }

    /// Best-effort notification that `fragment_ids` were dropped.
    pub fn drop_fragments(&self, fragment_ids: Vec<FragmentId>) {
        tracing::debug!("drop_fragments: {:?}", fragment_ids);
        let sent = self.send_command(SourceWorkerCommand::DropFragments(fragment_ids));
        if let Err(e) = sent {
            tracing::warn!(error = %e.as_report(), "failed to drop fragments");
        }
    }

    /// Best-effort notification that backfill finished for `fragment_ids`.
    pub fn finish_backfill(&self, fragment_ids: Vec<FragmentId>) {
        tracing::debug!("finish_backfill: {:?}", fragment_ids);
        let sent = self.send_command(SourceWorkerCommand::FinishBackfill(fragment_ids));
        if let Err(e) = sent {
            tracing::warn!(error = %e.as_report(), "failed to finish backfill");
        }
    }

    /// Optionally report dropped fragments, then ask the worker to exit.
    pub fn terminate(&self, dropped_fragments: Option<BTreeSet<FragmentId>>) {
        tracing::debug!("terminate: {:?}", dropped_fragments);
        if let Some(fragments) = dropped_fragments {
            self.drop_fragments(fragments.into_iter().collect());
        }
        if let Err(e) = self.send_command(SourceWorkerCommand::Terminate) {
            tracing::warn!(error = %e.as_report(), "failed to terminate source worker");
        }
    }
}
/// Commands accepted by [`ConnectorSourceWorker::run`].
#[derive(educe::Educe)]
#[educe(Debug)]
pub enum SourceWorkerCommand {
    /// Run a tick immediately and report the result through the sender.
    Tick(#[educe(Debug(ignore))] oneshot::Sender<MetaResult<()>>),
    /// Notify the enumerator that these fragments were dropped.
    DropFragments(Vec<FragmentId>),
    /// Notify the enumerator that backfill finished for these fragments.
    FinishBackfill(Vec<FragmentId>),
    /// Stop the worker loop.
    Terminate,
}