pub struct SourceManager {
pub paused: Mutex<()>,
barrier_scheduler: BarrierScheduler,
core: Mutex<SourceManagerCore>,
pub metrics: Arc<MetaMetrics>,
}
SourceManager keeps fetching the latest split metadata from the external source services (ConnectorSourceWorker::tick), and sends a split assignment command if split changes are detected (Self::tick).
Fields§
§paused: Mutex<()>
§barrier_scheduler: BarrierScheduler
§core: Mutex<SourceManagerCore>
§metrics: Arc<MetaMetrics>
Implementations§
impl SourceManager
const DEFAULT_SOURCE_TICK_INTERVAL: Duration = _
pub async fn new( barrier_scheduler: BarrierScheduler, metadata_manager: MetadataManager, metrics: Arc<MetaMetrics>, ) -> MetaResult<Self>
pub async fn drop_source_fragments( &self, source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>, removed_actors: HashSet<ActorId>, )
pub async fn drop_source_fragments_vec( &self, table_fragments: &[TableFragments], )
For dropping MV.
pub async fn apply_source_change( &self, added_source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>, added_backfill_fragments: Option<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>>, split_assignment: Option<SplitAssignment>, dropped_actors: Option<HashSet<ActorId>>, )
Updates states after split change (post_collect barrier) or scaling (post_apply_reschedule).
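The state update described above can be sketched roughly as follows. This is a simplified illustration, not the actual implementation: `CoreState` and `apply_change` are hypothetical names, splits are plain strings instead of `SplitImpl`, the split assignment is flattened to a per-actor map, and backfill fragments are left out.

```rust
use std::collections::{BTreeSet, HashMap, HashSet};

// Hypothetical simplified ID types; the real ones live in the meta crate.
type SourceId = u32;
type FragmentId = u32;
type ActorId = u32;

/// Hypothetical stand-in for the state kept behind `core`.
#[derive(Default)]
struct CoreState {
    source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
    actor_splits: HashMap<ActorId, Vec<String>>,
}

impl CoreState {
    /// Applies each optional change in turn; `None` means "nothing to do".
    fn apply_change(
        &mut self,
        added_source_fragments: Option<HashMap<SourceId, BTreeSet<FragmentId>>>,
        split_assignment: Option<HashMap<ActorId, Vec<String>>>,
        dropped_actors: Option<HashSet<ActorId>>,
    ) {
        if let Some(added) = added_source_fragments {
            for (source_id, fragments) in added {
                // Merge into any fragments already tracked for this source.
                self.source_fragments
                    .entry(source_id)
                    .or_default()
                    .extend(fragments);
            }
        }
        if let Some(assignment) = split_assignment {
            for (actor_id, splits) in assignment {
                // A new assignment replaces the actor's previous splits.
                self.actor_splits.insert(actor_id, splits);
            }
        }
        if let Some(dropped) = dropped_actors {
            for actor_id in dropped {
                self.actor_splits.remove(&actor_id);
            }
        }
    }
}
```

Taking every argument as an `Option` lets one entry point serve both callers: a split change passes only an assignment, while a reschedule may also pass dropped actors.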
pub async fn migrate_splits_for_source_actors( &self, fragment_id: FragmentId, prev_actor_ids: &[ActorId], curr_actor_ids: &[ActorId], ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>>
Migrates splits from previous actors to the new actors for a rescheduled fragment.
Very occasionally split removal may happen during scaling, in which case we need to use the old splits for reallocation instead of the latest splits (which may be missing), so that we can resolve the split removal in the next command.
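A minimal sketch of the migration idea, assuming a simple "least-loaded actor" policy that the source does not confirm. `migrate_splits` is a hypothetical helper, and splits are plain strings rather than `SplitImpl`. Note that it redistributes the old splits only, matching the remark above about not consulting the latest splits during scaling.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical simplified types for illustration.
type ActorId = u32;
type SplitId = String;

/// Moves the splits of `prev` actors onto the `curr` actors.
/// Actors present in both sets keep their splits; splits of removed actors
/// go to whichever current actor holds the fewest splits at that moment.
/// If `curr` is empty, the result is empty and orphaned splits are dropped.
fn migrate_splits(
    prev: &BTreeMap<ActorId, Vec<SplitId>>,
    curr: &[ActorId],
) -> BTreeMap<ActorId, Vec<SplitId>> {
    let curr_set: BTreeSet<ActorId> = curr.iter().copied().collect();
    let mut result: BTreeMap<ActorId, Vec<SplitId>> =
        curr_set.iter().map(|a| (*a, Vec::new())).collect();

    // Splits whose owning actor no longer exists.
    let mut orphaned: Vec<SplitId> = Vec::new();
    for (actor, splits) in prev {
        match result.get_mut(actor) {
            Some(kept) => kept.extend(splits.iter().cloned()),
            None => orphaned.extend(splits.iter().cloned()),
        }
    }

    for split in orphaned {
        // Ties go to the smallest actor ID: `min_by_key` returns the first
        // minimum and a BTreeMap iterates in key order.
        if let Some((_, splits)) = result.iter_mut().min_by_key(|(_, s)| s.len()) {
            splits.push(split);
        }
    }
    result
}
```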
pub fn migrate_splits_for_backfill_actors( &self, fragment_id: FragmentId, upstream_fragment_ids: &Vec<FragmentId>, curr_actor_ids: &[ActorId], fragment_actor_splits: &HashMap<FragmentId, HashMap<ActorId, Vec<SplitImpl>>>, no_shuffle_upstream_actor_map: &HashMap<ActorId, HashMap<FragmentId, ActorId>>, ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>>
Migrates splits from previous actors to the new actors for a rescheduled fragment.
pub async fn allocate_splits( &self, table_id: &TableId, ) -> MetaResult<SplitAssignment>
Allocates splits to actors for a newly created source executor.
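To illustrate what creating a fresh assignment can look like, here is a round-robin sketch. The round-robin policy and the name `allocate_round_robin` are assumptions made for this example; the source does not say which allocation strategy `allocate_splits` uses.

```rust
use std::collections::BTreeMap;

// Hypothetical simplified actor ID type.
type ActorId = u32;

/// Distributes `splits` over `actors` in round-robin order.
/// Every actor appears in the result, even if it receives no split.
fn allocate_round_robin(
    splits: &[&str],
    actors: &[ActorId],
) -> BTreeMap<ActorId, Vec<String>> {
    let mut assignment: BTreeMap<ActorId, Vec<String>> =
        actors.iter().map(|a| (*a, Vec::new())).collect();
    if actors.is_empty() {
        // Nothing to assign to; avoid a modulo-by-zero below.
        return assignment;
    }
    for (i, split) in splits.iter().enumerate() {
        let actor = actors[i % actors.len()];
        if let Some(assigned) = assignment.get_mut(&actor) {
            assigned.push(split.to_string());
        }
    }
    assignment
}
```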
pub async fn allocate_splits_for_backfill( &self, table_id: &TableId, dispatchers: &HashMap<ActorId, Vec<Dispatcher>>, ) -> MetaResult<SplitAssignment>
Allocates splits to actors for a newly created SourceBackfill executor.
Unlike Self::allocate_splits, which creates a new assignment, this method aligns the splits for backfill fragments with their upstream source fragment (align_backfill_splits).
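The alignment idea can be sketched as follows, assuming each backfill actor is connected to exactly one upstream source actor and simply mirrors that actor's splits. The function below is an illustrative stand-in that borrows the name `align_backfill_splits`; its signature and the string-based split and error types are hypothetical.

```rust
use std::collections::HashMap;

// Hypothetical simplified actor ID type.
type ActorId = u32;

/// Gives every backfill actor a copy of its upstream source actor's splits.
/// `backfill_to_upstream` maps a backfill actor to its single upstream actor.
/// Returns an error if an upstream actor has no recorded assignment.
fn align_backfill_splits(
    upstream_splits: &HashMap<ActorId, Vec<String>>,
    backfill_to_upstream: &HashMap<ActorId, ActorId>,
) -> Result<HashMap<ActorId, Vec<String>>, String> {
    let mut aligned = HashMap::new();
    for (backfill_actor, upstream_actor) in backfill_to_upstream {
        let splits = upstream_splits.get(upstream_actor).ok_or_else(|| {
            format!("upstream actor {} has no split assignment", upstream_actor)
        })?;
        aligned.insert(*backfill_actor, splits.clone());
    }
    Ok(aligned)
}
```

Copying rather than reallocating keeps a backfill actor reading the same splits as the source actor it is paired with, which is the difference from a fresh allocation.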
pub async fn register_source(&self, source: &Source) -> MetaResult<()>
Creates and registers a connector worker for the source.
pub async fn register_source_with_handle( &self, source_id: SourceId, handle: ConnectorSourceWorkerHandle, )
Registers a connector worker for the source.
pub async fn unregister_sources(&self, source_ids: Vec<SourceId>)
Unregisters the connector workers for the given sources.
fn create_source_worker_async( source: Source, managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>, metrics: Arc<MetaMetrics>, ) -> MetaResult<()>
Used on startup (Self::new). Failed sources will not block meta startup.
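One way to get the "failed sources do not block startup" behavior is to record each failure and continue instead of propagating the error. The sketch below shows only that pattern; `WorkerHandle`, `try_create_worker`, and `start_workers` are hypothetical, and the `reachable` flag stands in for whatever can go wrong when contacting the external service.

```rust
use std::collections::HashMap;

// Hypothetical simplified source ID type.
type SourceId = u32;

/// Hypothetical stand-in for a connector worker handle.
#[derive(Debug, PartialEq)]
struct WorkerHandle {
    source_id: SourceId,
}

/// Hypothetical constructor: fails when the external service is unreachable.
fn try_create_worker(source_id: SourceId, reachable: bool) -> Result<WorkerHandle, String> {
    if reachable {
        Ok(WorkerHandle { source_id })
    } else {
        Err(format!("source {} is unreachable", source_id))
    }
}

/// Creates workers for all sources. A failure is recorded and skipped
/// rather than returned early, so one bad source cannot abort startup.
fn start_workers(
    sources: &[(SourceId, bool)],
) -> (HashMap<SourceId, WorkerHandle>, Vec<(SourceId, String)>) {
    let mut managed = HashMap::new();
    let mut failures = Vec::new();
    for &(source_id, reachable) in sources {
        match try_create_worker(source_id, reachable) {
            Ok(handle) => {
                managed.insert(source_id, handle);
            }
            Err(err) => failures.push((source_id, err)),
        }
    }
    (managed, failures)
}
```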
pub async fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>>
pub async fn get_running_info(&self) -> SourceManagerRunningInfo
async fn tick(&self) -> MetaResult<()>
Checks whether the external source metadata has changed, and sends a split assignment command if it has.
This is also how a newly created SourceExecutor is initialized (force tick in Self::create_source_worker).
The command first updates SourceExecutor’s splits, and finally calls Self::apply_source_change to update states in SourceManager.
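The change check at the heart of a tick can be sketched as a set difference between the splits currently assigned and the splits just discovered. `SplitChange` and `diff_splits` are hypothetical names for illustration, and splits are identified by plain strings; returning `None` corresponds to "no command needs to be sent".

```rust
use std::collections::BTreeSet;

/// Hypothetical description of what changed between two ticks.
#[derive(Debug, PartialEq)]
struct SplitChange {
    added: BTreeSet<String>,
    removed: BTreeSet<String>,
}

/// Compares the assigned splits with the freshly discovered ones.
/// Returns `None` when nothing changed.
fn diff_splits(
    assigned: &BTreeSet<String>,
    discovered: &BTreeSet<String>,
) -> Option<SplitChange> {
    let added: BTreeSet<String> = discovered.difference(assigned).cloned().collect();
    let removed: BTreeSet<String> = assigned.difference(discovered).cloned().collect();
    if added.is_empty() && removed.is_empty() {
        None
    } else {
        Some(SplitChange { added, removed })
    }
}
```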
pub async fn run(&self) -> MetaResult<()>
Auto Trait Implementations§
impl !Freeze for SourceManager
impl !RefUnwindSafe for SourceManager
impl Send for SourceManager
impl Sync for SourceManager
impl Unpin for SourceManager
impl !UnwindSafe for SourceManager
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> FmtForward for T
impl<T> FutureExt for T
impl<T> Instrument for T
impl<T> Instrument for T
impl<T> IntoEither for T
impl<T> IntoRequest<T> for T
impl<T> IntoResult<T> for T
impl<T, U, I> LiftInto<U, I> for T where U: LiftFrom<T, I>
impl<M> MetricVecRelabelExt for M
impl<T> Pipe for T where T: ?Sized
impl<T> Pointable for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T