risingwave_frontend/scheduler/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Fragment and schedule batch queries.

use std::sync::Arc;
use std::time::Duration;

use futures::Stream;
use risingwave_common::array::DataChunk;

use crate::error::Result;
use crate::session::SessionImpl;

mod distributed;
pub use distributed::*;
pub mod plan_fragmenter;
pub use plan_fragmenter::BatchPlanFragmenter;
mod snapshot;
pub use snapshot::*;
mod local;
pub use local::*;

use crate::scheduler::task_context::FrontendBatchTaskContext;

mod error;
pub mod streaming_manager;
mod task_context;

pub use self::error::SchedulerError;
/// Shorthand for a [`std::result::Result`] whose error type is [`SchedulerError`].
pub type SchedulerResult<T> = std::result::Result<T, SchedulerError>;

/// Trait alias for any [`Stream`] whose items are `Result<DataChunk>`, where
/// `Result` is this crate's `crate::error::Result` alias.
pub trait DataChunkStream = Stream<Item = Result<DataChunk>>;

/// Context for mpp query execution.
///
/// Bundles the session a query belongs to with an optional timeout.
pub struct ExecutionContext {
    /// Shared handle to the session this context was created with.
    session: Arc<SessionImpl>,
    /// Optional timeout supplied at construction; `None` means none was given.
    // NOTE(review): how (and where) this timeout is enforced is not visible in
    // this module — confirm against the callers of `timeout()`.
    timeout: Option<Duration>,
}

/// Reference-counted, shareable handle to an [`ExecutionContext`].
pub type ExecutionContextRef = Arc<ExecutionContext>;

impl ExecutionContext {
    pub fn new(session: Arc<SessionImpl>, timeout: Option<Duration>) -> Self {
        Self { session, timeout }
    }

    pub fn session(&self) -> &SessionImpl {
        &self.session
    }

    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext {
        FrontendBatchTaskContext::new(self.session.clone())
    }
}