risingwave_stream/task/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::ActorInfo;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::error::StreamResult;
use crate::executor::exchange::permit::{self, Receiver, Sender};

mod barrier_manager;
mod env;
mod stream_manager;

pub use barrier_manager::*;
pub use env::*;
pub use stream_manager::*;

/// The sender/receiver ends of an exchange channel between a pair of actors.
/// Each end is wrapped in `Option` so it can be `take`n out exactly once
/// (see `SharedContext::take_sender` / `SharedContext::take_receiver`).
pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
/// Unique id of a streaming actor.
pub type ActorId = u32;
/// Unique id of a fragment.
pub type FragmentId = u32;
/// Unique id of a dispatcher.
pub type DispatcherId = u64;
/// `(upstream_actor_id, downstream_actor_id)` — identifies one exchange channel.
pub type UpDownActorIds = (ActorId, ActorId);
/// `(upstream_fragment_id, downstream_fragment_id)`.
pub type UpDownFragmentIds = (FragmentId, FragmentId);

/// Newtype identifying a partial graph by its raw `u64` id.
#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct PartialGraphId(u64);

impl PartialGraphId {
    /// Wrap a raw `u64` id as a `PartialGraphId`.
    fn new(id: u64) -> Self {
        PartialGraphId(id)
    }
}

impl From<PartialGraphId> for u64 {
    /// Unwrap back to the raw `u64` id.
    fn from(val: PartialGraphId) -> u64 {
        let PartialGraphId(raw) = val;
        raw
    }
}

/// Stores the information which may be modified from the data plane.
///
/// The data structure is created in `LocalBarrierWorker` and is shared by actors created
/// between two recoveries. In every recovery, the `LocalBarrierWorker` will create a new instance of
/// `SharedContext`, and the original one becomes stale. The new one is shared by actors created after
/// recovery.
pub struct SharedContext {
    /// Stores the senders and receivers for later `Processor`'s usage.
    ///
    /// Each actor has several senders and several receivers. Senders and receivers are created
    /// during `update_actors` and stored in a channel map. Upon `build_actors`, all these channels
    /// will be taken out and built into the executors and outputs.
    /// One sender or one receiver can be uniquely determined by the upstream and downstream actor
    /// id.
    ///
    /// There are three cases when we need local channels to pass around messages:
    /// 1. pass `Message` between two local actors
    /// 2. The RPC client at the downstream actor forwards received `Message` to one channel in
    ///    `ReceiverExecutor` or `MergerExecutor`.
    /// 3. The RPC `Output` at the upstream actor forwards received `Message` to
    ///    `ExchangeServiceImpl`.
    ///
    /// The channel serves as a buffer because `ExchangeServiceImpl`
    /// is on the server-side and we will also introduce backpressure.
    pub(crate) channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,

    /// Stores all actor information.
    pub(crate) actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,

    /// Stores the local address.
    ///
    /// It is used to test whether an actor is local or not,
    /// thus determining whether we should setup local channel only or remote rpc connection
    /// between two actors/actors.
    pub(crate) addr: HostAddr,

    /// Compute client pool for streaming gRPC exchange.
    // TODO: currently the client pool won't be cleared. Should remove compute clients when
    // disconnected.
    pub(crate) compute_client_pool: ComputeClientPoolRef,

    /// Streaming config, cloned from the environment at construction
    /// (see `SharedContext::new`); used e.g. for exchange channel permits.
    pub(crate) config: StreamingConfig,

    /// The local barrier manager supplied at construction.
    pub(super) local_barrier_manager: LocalBarrierManager,
}

impl std::fmt::Debug for SharedContext {
    /// Debug-formats only the local address; all other fields are elided via
    /// `finish_non_exhaustive` (rendered as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SharedContext");
        builder.field("addr", &self.addr);
        builder.finish_non_exhaustive()
    }
}

impl SharedContext {
    /// Create a `SharedContext` from the stream environment and the local barrier manager.
    pub fn new(env: &StreamEnvironment, local_barrier_manager: LocalBarrierManager) -> Self {
        Self {
            channel_map: Default::default(),
            actor_infos: Default::default(),
            addr: env.server_address().clone(),
            config: env.config().as_ref().to_owned(),
            compute_client_pool: env.client_pool(),
            local_barrier_manager,
        }
    }

    /// Construct a context suitable for unit tests, with test permit settings
    /// and a test-only compute client pool.
    #[cfg(test)]
    pub fn for_test() -> Self {
        use std::sync::Arc;

        use risingwave_common::config::StreamingDeveloperConfig;
        use risingwave_rpc_client::ComputeClientPool;

        let developer = StreamingDeveloperConfig {
            exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
            exchange_batched_permits: permit::for_test::BATCHED_PERMITS,
            exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS,
            ..Default::default()
        };

        Self {
            channel_map: Default::default(),
            actor_infos: Default::default(),
            addr: LOCAL_TEST_ADDR.clone(),
            config: StreamingConfig {
                developer,
                ..Default::default()
            },
            compute_client_pool: Arc::new(ComputeClientPool::for_test()),
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }

    /// Get the channel pair for the given actor ids. If the channel pair does not exist,
    /// create one with the permits configured in `self.config.developer`.
    ///
    /// The returned guard keeps the whole channel map locked while held.
    fn get_or_insert_channels(
        &self,
        ids: UpDownActorIds,
    ) -> MappedMutexGuard<'_, ConsumableChannelPair> {
        MutexGuard::map(self.channel_map.lock(), |map| {
            map.entry(ids).or_insert_with(|| {
                let developer = &self.config.developer;
                let (tx, rx) = permit::channel(
                    developer.exchange_initial_permits,
                    developer.exchange_batched_permits,
                    developer.exchange_concurrent_barriers,
                );
                (Some(tx), Some(rx))
            })
        })
    }

    /// Take the sender end of the channel for `ids`.
    ///
    /// Errors if the sender was already taken; each end may be taken only once.
    pub fn take_sender(&self, ids: &UpDownActorIds) -> StreamResult<Sender> {
        match self.get_or_insert_channels(*ids).0.take() {
            Some(sender) => Ok(sender),
            None => Err(anyhow!("sender for {ids:?} has already been taken").into()),
        }
    }

    /// Take the receiver end of the channel for `ids`.
    ///
    /// Errors if the receiver was already taken; each end may be taken only once.
    pub fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult<Receiver> {
        match self.get_or_insert_channels(ids).1.take() {
            Some(receiver) => Ok(receiver),
            None => Err(anyhow!("receiver for {ids:?} has already been taken").into()),
        }
    }

    /// Look up the `ActorInfo` registered for `actor_id`, cloning it out of the table.
    pub fn get_actor_info(&self, actor_id: &ActorId) -> StreamResult<ActorInfo> {
        let actor_infos = self.actor_infos.read();
        match actor_infos.get(actor_id) {
            Some(info) => Ok(info.clone()),
            None => Err(anyhow!("actor {} not found in info table", actor_id).into()),
        }
    }

    /// The streaming config of this context.
    pub fn config(&self) -> &StreamingConfig {
        &self.config
    }

    /// Remove state owned by the given actors: their entries in the actor info
    /// table, and every channel whose *upstream* actor is among them.
    pub(super) fn drop_actors(&self, actors: &HashSet<ActorId>) {
        self.channel_map
            .lock()
            .retain(|&(up_id, _), _| !actors.contains(&up_id));
        let mut actor_infos = self.actor_infos.write();
        for actor_id in actors {
            actor_infos.remove(actor_id);
        }
    }
}

/// Generate a globally unique executor id by packing the actor id into the
/// high 32 bits and the operator id into the low 32 bits.
///
/// # Panics
/// Panics if `operator_id` does not fit in 32 bits.
pub fn unique_executor_id(actor_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    // `operator_id` occupies only the low word, so `|` equals the original `+`.
    (u64::from(actor_id) << 32) | operator_id
}

/// Generate a globally unique operator id by packing the fragment id into the
/// high 32 bits and the operator id into the low 32 bits.
///
/// # Panics
/// Panics if `operator_id` does not fit in 32 bits.
pub fn unique_operator_id(fragment_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    // `operator_id` occupies only the low word, so `|` equals the original `+`.
    (u64::from(fragment_id) << 32) | operator_id
}