risingwave_frontend/scheduler/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Fragment and schedule batch queries.
16
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::Stream;
21use risingwave_batch::task::BatchTaskContext;
22use risingwave_common::array::DataChunk;
23
24use crate::error::Result;
25use crate::session::SessionImpl;
26
27mod distributed;
28pub use distributed::*;
29pub mod plan_fragmenter;
30pub use plan_fragmenter::BatchPlanFragmenter;
31mod snapshot;
32pub use snapshot::*;
33mod local;
34pub use local::*;
35mod fast_insert;
36pub use fast_insert::*;
37
38use crate::scheduler::task_context::FrontendBatchTaskContext;
39
40mod error;
41pub mod streaming_manager;
42mod task_context;
43
44pub use self::error::SchedulerError;
45pub type SchedulerResult<T> = std::result::Result<T, SchedulerError>;
46
47pub trait DataChunkStream = Stream<Item = Result<DataChunk>>;
48
49/// Context for mpp query execution.
50pub struct ExecutionContext {
51    session: Arc<SessionImpl>,
52    timeout: Option<Duration>,
53}
54
55pub type ExecutionContextRef = Arc<ExecutionContext>;
56
57impl ExecutionContext {
58    pub fn new(session: Arc<SessionImpl>, timeout: Option<Duration>) -> Self {
59        Self { session, timeout }
60    }
61
62    pub fn session(&self) -> &SessionImpl {
63        &self.session
64    }
65
66    pub fn timeout(&self) -> Option<Duration> {
67        self.timeout
68    }
69
70    pub fn to_batch_task_context(&self) -> Arc<dyn BatchTaskContext> {
71        FrontendBatchTaskContext::create(self.session.clone())
72    }
73}