/// Concrete context type that implements the `GlobalBarrierWorkerContext` trait.
pub(super) struct GlobalBarrierWorkerContextImpl {
scheduled_barriers: ScheduledBarriers,
// Shared status handle; `status()` hands out this same `Arc<ArcSwap<..>>` type.
status: Arc<ArcSwap<BarrierManagerStatus>>,
pub(super) metadata_manager: MetadataManager,
hummock_manager: HummockManagerRef,
source_manager: SourceManagerRef,
scale_controller: ScaleControllerRef,
pub(super) env: MetaSrvEnv,
/// Barrier scheduler for scheduling load finish commands
barrier_scheduler: BarrierScheduler,
}
Fields§
§scheduled_barriers: ScheduledBarriers
§status: Arc<ArcSwap<BarrierManagerStatus>>
§metadata_manager: MetadataManager
§hummock_manager: HummockManagerRef
§source_manager: SourceManagerRef
§scale_controller: ScaleControllerRef
§env: MetaSrvEnv
§barrier_scheduler: BarrierScheduler
Barrier scheduler for scheduling load finish commands
Implementations§
Source§impl GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContextImpl
fn set_status(&self, new_status: BarrierManagerStatus)
Source§impl GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContextImpl
Sourceasync fn clean_dirty_streaming_jobs(
&self,
database_id: Option<DatabaseId>,
) -> MetaResult<()>
async fn clean_dirty_streaming_jobs( &self, database_id: Option<DatabaseId>, ) -> MetaResult<()>
Clean up the catalogs of streaming jobs that are still being created in foreground mode, or whose table fragments have not been persisted.
async fn purge_state_table_from_hummock( &self, all_state_table_ids: &HashSet<TableId>, ) -> MetaResult<()>
async fn list_background_job_progress( &self, ) -> MetaResult<Vec<(String, StreamJobFragments)>>
Sourceasync fn resolve_graph_info(
&self,
database_id: Option<DatabaseId>,
) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>>
async fn resolve_graph_info( &self, database_id: Option<DatabaseId>, ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>>
Resolve actor information from cluster, fragment manager and `ChangedTableId`.
We use `changed_table_id` to modify the actors to be sent or collected, because these actors
will be created or dropped before this barrier flows through them.
fn resolve_hummock_version_epochs( background_jobs: &HashMap<TableId, (String, StreamJobFragments)>, version: &HummockVersion, ) -> MetaResult<(HashMap<TableId, u64>, HashMap<TableId, Vec<(Vec<u64>, u64)>>)>
Sourceasync fn recovery_table_with_upstream_sinks(
&self,
inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>,
) -> MetaResult<()>
async fn recovery_table_with_upstream_sinks( &self, inflight_jobs: &mut HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>, ) -> MetaResult<()>
For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
newly added or deleted upstreams are not persisted in the meta store. Therefore, when restoring jobs, we need to
rebuild the information required by the operator from the current state of its upstream (sink) and
downstream (table).
pub(super) async fn reload_runtime_info_impl( &self, ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>
pub(super) async fn reload_database_runtime_info_impl( &self, database_id: DatabaseId, ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>
Source§impl GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContextImpl
const RECOVERY_FORCE_MIGRATION_TIMEOUT: Duration
Sourceasync fn migrate_actors(
&self,
active_nodes: &mut ActiveStreamingWorkerNodes,
) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>>
async fn migrate_actors( &self, active_nodes: &mut ActiveStreamingWorkerNodes, ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, InflightStreamingJobInfo>>>
Migrate actors on expired compute nodes (CNs) to newly joined ones, returning the in-flight streaming job info grouped by database and table.
async fn scale_actors( &self, active_nodes: &ActiveStreamingWorkerNodes, ) -> MetaResult<()>
fn derive_target_parallelism( available_parallelism: usize, assigned_parallelism: TableParallelism, actual_fragment_parallelism: Option<usize>, default_parallelism: DefaultParallelism, ) -> TableParallelism
Sourceasync fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>>
async fn load_all_actors(&self) -> MetaResult<HashMap<ActorId, StreamActor>>
Update all actors in compute nodes.
Source§impl GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContextImpl
pub(super) fn new( scheduled_barriers: ScheduledBarriers, status: Arc<ArcSwap<BarrierManagerStatus>>, metadata_manager: MetadataManager, hummock_manager: HummockManagerRef, source_manager: SourceManagerRef, scale_controller: ScaleControllerRef, env: MetaSrvEnv, barrier_scheduler: BarrierScheduler, ) -> Self
pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> ⓘ
Source§impl GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContextImpl
pub(super) async fn new_control_stream_impl( &self, node: &WorkerNode, init_request: &PbInitRequest, ) -> MetaResult<StreamingControlHandle>
Trait Implementations§
Source§impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl
async fn commit_epoch( &self, commit_info: CommitEpochInfo, ) -> MetaResult<HummockVersionStats>
async fn next_scheduled(&self) -> Scheduled
fn abort_and_mark_blocked( &self, database_id: Option<DatabaseId>, recovery_reason: RecoveryReason, )
fn mark_ready(&self, options: MarkReadyOptions)
async fn post_collect_command<'a>( &'a self, command: &'a CommandContext, ) -> MetaResult<()>
async fn notify_creating_job_failed( &self, database_id: Option<DatabaseId>, err: String, )
async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()>
async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()>
async fn new_control_stream( &self, node: &WorkerNode, init_request: &PbInitRequest, ) -> MetaResult<StreamingControlHandle>
async fn reload_runtime_info( &self, ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>
async fn reload_database_runtime_info( &self, database_id: DatabaseId, ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>
async fn handle_load_finished_source_ids( &self, load_finished_source_ids: Vec<u32>, ) -> MetaResult<()>
Auto Trait Implementations§
impl Freeze for GlobalBarrierWorkerContextImpl
impl !RefUnwindSafe for GlobalBarrierWorkerContextImpl
impl Send for GlobalBarrierWorkerContextImpl
impl Sync for GlobalBarrierWorkerContextImpl
impl Unpin for GlobalBarrierWorkerContextImpl
impl !UnwindSafe for GlobalBarrierWorkerContextImpl
Blanket Implementations§
§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
dyn Any
reference to the object: Read more§fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
Arc<dyn Any>
reference to the object: Read more§fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
Box<dyn Any>
: Read more§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
std::any::type_name
, since Any
does not provide it and
Any::type_id
is useless as a debugging aid (its Debug
is just a mess of hex digits).Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
Causes `self` to use its `Binary` implementation when `Debug`-formatted.
§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
Causes `self` to use its `Display` implementation when `Debug`-formatted.
§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
Causes `self` to use its `LowerExp` implementation when `Debug`-formatted.
§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
Causes `self` to use its `LowerHex` implementation when `Debug`-formatted.
§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
Causes `self` to use its `Octal` implementation when `Debug`-formatted.
§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
Causes `self` to use its `Pointer` implementation when `Debug`-formatted.
§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
Causes `self` to use its `UpperExp` implementation when `Debug`-formatted.
§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
Causes `self` to use its `UpperHex` implementation when `Debug`-formatted.
§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left` is `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left(&self)` returns `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>` otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`
§impl<T> IntoResult<T> for T
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
Source§impl<M> MetricVecRelabelExt for M
impl<M> MetricVecRelabelExt for M
Source§fn relabel(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to `RelabeledMetricVec::with_metric_level`.
Source§fn relabel_n(
self,
metric_level: MetricLevel,
relabel_threshold: MetricLevel,
relabel_num: usize,
) -> RelabeledMetricVec<M>
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
Equivalent to `RelabeledMetricVec::with_metric_level_relabel_n`.
Source§fn relabel_debug_1(
self,
relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
Equivalent to `RelabeledMetricVec::with_metric_level_relabel_n` with `metric_level` set to
`MetricLevel::Debug` and `relabel_num` set to 1.
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Scope for T
impl<T> Scope for T
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.