pub(super) struct GlobalBarrierWorker<C> {
enable_recovery: bool,
periodic_barriers: PeriodicBarriers,
in_flight_barrier_nums: usize,
pub(super) context: Arc<C>,
env: MetaSrvEnv,
checkpoint_control: CheckpointControl,
completing_task: CompletingTask,
request_rx: UnboundedReceiver<BarrierManagerRequest>,
active_streaming_nodes: ActiveStreamingWorkerNodes,
sink_manager: SinkCoordinatorManager,
control_stream_manager: ControlStreamManager,
}
crate::barrier::worker::GlobalBarrierWorker sends barriers to all registered compute nodes and collects them, with monotonically increasing epoch numbers. On compute nodes, LocalBarrierManager in the risingwave_stream crate serves these requests and dispatches them to source actors.
Configuration changes in our system are achieved by the mutation in the barrier. Thus, crate::barrier::worker::GlobalBarrierWorker provides a set of interfaces like a state machine, accepting a crate::barrier::command::Command that carries the information needed to build a Mutation. To keep the barrier manager and the meta store consistent, some actions like “drop materialized view” or “create mv on mv” must be done transactionally in the barrier manager using crate::barrier::command::Command.
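The inject-and-collect cycle described above can be sketched in a few lines. This is only an illustration under stated assumptions: every type here (BarrierWorkerSketch, Barrier, the two Command variants, the Stop mutation) is a hypothetical stand-in rather than the real RisingWave type, and the epoch is modelled as a plain counter, which may differ from how epochs are actually generated.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical, simplified command; the real `Command` has many more variants.
#[derive(Debug, Clone, PartialEq)]
enum Command {
    /// A plain barrier that carries no configuration change.
    Plain,
    /// Stand-in for an action such as "drop materialized view".
    DropMaterializedView { table_id: u32 },
}

/// Hypothetical mutation attached to a barrier.
#[derive(Debug, Clone, PartialEq)]
enum Mutation {
    Stop { table_id: u32 },
}

impl Command {
    /// Build the mutation (if any) that the barrier should carry.
    fn to_mutation(&self) -> Option<Mutation> {
        match self {
            Command::Plain => None,
            Command::DropMaterializedView { table_id } => {
                Some(Mutation::Stop { table_id: *table_id })
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
struct Barrier {
    prev_epoch: u64,
    curr_epoch: u64,
    mutation: Option<Mutation>,
}

/// Assumed model: worker ids are plain integers and epochs are a counter.
struct BarrierWorkerSketch {
    workers: Vec<u32>,
    last_epoch: u64,
    /// Epoch -> workers that have not reported collection yet.
    pending: BTreeMap<u64, HashSet<u32>>,
}

impl BarrierWorkerSketch {
    fn new(workers: Vec<u32>) -> Self {
        Self { workers, last_epoch: 0, pending: BTreeMap::new() }
    }

    /// Issue the next barrier with a strictly larger epoch.
    fn inject(&mut self, command: &Command) -> Barrier {
        let prev_epoch = self.last_epoch;
        let curr_epoch = prev_epoch + 1;
        self.last_epoch = curr_epoch;
        self.pending
            .insert(curr_epoch, self.workers.iter().copied().collect());
        Barrier { prev_epoch, curr_epoch, mutation: command.to_mutation() }
    }

    /// Record that `worker` collected `epoch`.
    /// Returns true once every worker has reported that epoch.
    fn collect(&mut self, epoch: u64, worker: u32) -> bool {
        let done = match self.pending.get_mut(&epoch) {
            Some(waiting) => {
                waiting.remove(&worker);
                waiting.is_empty()
            }
            None => false,
        };
        if done {
            self.pending.remove(&epoch);
        }
        done
    }
}
```

In this sketch a configuration change only takes effect through the mutation carried by a barrier, which is the property the description relies on to keep the barrier manager and the meta store in step.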
Fields
enable_recovery: bool
Whether to enable recovery on failover.
periodic_barriers: PeriodicBarriers
The queue of scheduled barriers.
in_flight_barrier_nums: usize
The maximum number of barriers in flight.
context: Arc<C>
env: MetaSrvEnv
checkpoint_control: CheckpointControl
completing_task: CompletingTask
Command that has been collected but is still completing. The join handle of the completing future is stored.
request_rx: UnboundedReceiver<BarrierManagerRequest>
active_streaming_nodes: ActiveStreamingWorkerNodes
sink_manager: SinkCoordinatorManager
control_stream_manager: ControlStreamManager
Implementations
impl GlobalBarrierWorker<GlobalBarrierWorkerContextImpl>
pub async fn new(
    scheduled_barriers: ScheduledBarriers,
    env: MetaSrvEnv,
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    source_manager: SourceManagerRef,
    sink_manager: SinkCoordinatorManager,
    scale_controller: ScaleControllerRef,
    request_rx: UnboundedReceiver<BarrierManagerRequest>,
) -> Self
Create a new crate::barrier::worker::GlobalBarrierWorker.
pub fn start(self) -> (JoinHandle<()>, Sender<()>)
async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool>
Check whether we should pause on bootstrap from the system parameter and reset it.
async fn run(self, shutdown_rx: Receiver<()>)
Start an infinite loop to take scheduled barriers and send them.
async fn run_inner(self, shutdown_rx: Receiver<()>)
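The start and run signatures above suggest a background loop that keeps sending barriers until a shutdown signal arrives, with the caller holding a join handle and a shutdown sender. A minimal sketch of that pattern follows, using standard-library threads and channels instead of the async task and channel types the real worker presumably uses. The name start_sketch, the counter, and the interval parameter are hypothetical.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawn a background loop and hand back (join handle, shutdown sender),
/// mirroring the shape of `start`. The handle yields the number of
/// loop iterations so the sketch has something observable to return.
fn start_sketch(interval: Duration) -> (JoinHandle<u64>, Sender<()>) {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut barriers_sent = 0u64;
        loop {
            // Stand-in for "take a scheduled barrier and send it".
            barriers_sent += 1;
            // Wait one interval, or stop early when shutdown is requested
            // or the sender side has been dropped.
            match shutdown_rx.recv_timeout(interval) {
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
                Err(RecvTimeoutError::Timeout) => {}
            }
        }
        barriers_sent
    });
    (handle, shutdown_tx)
}
```

A caller would keep the sender, call send(()) on it to request shutdown, and then join the handle to wait for the loop to exit.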
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C>
pub async fn clear_on_err(&mut self, err: &MetaError)
We need to make sure there are no changes when doing recovery.
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C>
async fn failure_recovery(&mut self, err: MetaError)
Set barrier manager status.
async fn adhoc_recovery(&mut self)
impl<C> GlobalBarrierWorker<C>
pub(super) fn report_collect_failure(&self, error: &MetaError)
Send barrier-complete-rpc and wait for responses from all CNs
impl<C> GlobalBarrierWorker<C>
const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20u64
const RECOVERY_RETRY_MAX_INTERVAL: Duration = _
fn get_retry_strategy() -> impl Iterator<Item = Duration>
Initialize a retry strategy for operation in recovery.
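A retry strategy with the shape impl Iterator<Item = Duration> is typically a stream of delays that starts at a base interval and is capped at a maximum. The sketch below shows one way such an iterator could look, using plain doubling backoff. It is not the actual implementation: the growth rule and the absence of jitter are assumptions, the base of 20 is taken from RECOVERY_RETRY_BASE_INTERVAL but its unit (milliseconds) is assumed, and the 5-second cap is a placeholder because the real RECOVERY_RETRY_MAX_INTERVAL value is not shown on this page.

```rust
use std::time::Duration;

/// Hypothetical doubling backoff that never exceeds `max`.
struct CappedBackoff {
    next: Duration,
    max: Duration,
}

impl CappedBackoff {
    fn new(base: Duration, max: Duration) -> Self {
        Self { next: base.min(max), max }
    }
}

impl Iterator for CappedBackoff {
    type Item = Duration;

    /// Yields the current delay, then doubles it up to the cap.
    /// The iterator is infinite; callers decide when to give up.
    fn next(&mut self) -> Option<Duration> {
        let current = self.next;
        self.next = current.saturating_mul(2).min(self.max);
        Some(current)
    }
}

/// Sketch of a `get_retry_strategy`-like constructor.
fn get_retry_strategy_sketch() -> impl Iterator<Item = Duration> {
    // 20 comes from the documented constant; milliseconds is an assumption.
    const BASE_INTERVAL_MS: u64 = 20;
    // Placeholder cap; the documented value is elided.
    const MAX_INTERVAL: Duration = Duration::from_secs(5);
    CappedBackoff::new(Duration::from_millis(BASE_INTERVAL_MS), MAX_INTERVAL)
}
```

A recovery loop would pull one delay from the iterator after each failed attempt and sleep for that long before retrying.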
impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C>
pub async fn recovery(
    &mut self,
    paused_reason: Option<PausedReason>,
    err: Option<MetaError>,
    recovery_reason: RecoveryReason,
)
Recover the whole cluster from the latest epoch.
If paused_reason is Some, all data sources (including connectors and DMLs) will be immediately paused after recovery, until the user manually resumes them either by restarting the cluster or with a risectl command. Used for debugging purposes.
Returns the new state of the barrier manager after recovery.
async fn recovery_inner(
    &mut self,
    paused_reason: Option<PausedReason>,
    err: Option<MetaError>,
)
Auto Trait Implementations
impl<C> Freeze for GlobalBarrierWorker<C>
impl<C> !RefUnwindSafe for GlobalBarrierWorker<C>
impl<C> Send for GlobalBarrierWorker<C>
impl<C> !Sync for GlobalBarrierWorker<C>
impl<C> Unpin for GlobalBarrierWorker<C>
impl<C> !UnwindSafe for GlobalBarrierWorker<C>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Conv for T
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<T> FmtForward for T
fn fmt_binary(self) -> FmtBinary<Self>
where
    Self: Binary,
Causes self to use its Binary implementation when Debug-formatted.
fn fmt_display(self) -> FmtDisplay<Self>
where
    Self: Display,
Causes self to use its Display implementation when Debug-formatted.
fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where
    Self: LowerExp,
Causes self to use its LowerExp implementation when Debug-formatted.
fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where
    Self: LowerHex,
Causes self to use its LowerHex implementation when Debug-formatted.
fn fmt_octal(self) -> FmtOctal<Self>
where
    Self: Octal,
Causes self to use its Octal implementation when Debug-formatted.
fn fmt_pointer(self) -> FmtPointer<Self>
where
    Self: Pointer,
Causes self to use its Pointer implementation when Debug-formatted.
fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where
    Self: UpperExp,
Causes self to use its UpperExp implementation when Debug-formatted.
fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where
    Self: UpperHex,
Causes self to use its UpperHex implementation when Debug-formatted.
fn fmt_list(self) -> FmtList<Self>
where
    &'a Self: for<'a> IntoIterator,
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
impl<T, U, I> LiftInto<U, I> for T
where
    U: LiftFrom<T, I>,
impl<M> MetricVecRelabelExt for M
fn relabel(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level.
fn relabel_n(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
    relabel_num: usize,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n.
fn relabel_debug_1(
    self,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
Equivalent to RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to MetricLevel::Debug and relabel_num set to 1.
impl<T> Pipe for T
where
    T: ?Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where
    Self: Sized,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where
    R: 'a,
Borrows self and passes that borrow into the pipe function.
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where
    R: 'a,
Mutably borrows self and passes that borrow into the pipe function.
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
Borrows self, then passes self.as_ref() into the pipe function.
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
Mutably borrows self, then passes self.as_mut() into the pipe function.
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
Borrows self, then passes self.deref() into the pipe function.
impl<T> Pointable for T
impl<Source> Sculptor<HNil, HNil> for Source
impl<T> Tap for T
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Immutable access to the Borrow<B> of a value.
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
Mutable access to the BorrowMut<B> of a value.
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
Immutable access to the AsRef<R> view of a value.
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
Mutable access to the AsMut<R> view of a value.
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Immutable access to the Deref::Target of a value.
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Mutable access to the Deref::Target of a value.
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
Calls .tap() only in debug builds, and is erased in release builds.
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
Calls .tap_mut() only in debug builds, and is erased in release builds.
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
Calls .tap_borrow() only in debug builds, and is erased in release builds.
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
Calls .tap_ref() only in debug builds, and is erased in release builds.
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
Calls .tap_deref() only in debug builds, and is erased in release builds.