1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5 #[prost(string, tag = "1")]
6 pub request_id: ::prost::alloc::string::String,
7 #[prost(message, optional, tag = "2")]
8 pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9 #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
10 pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
11 #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
12 pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
13 #[prost(uint64, tag = "6", wrapper = "crate::id::PartialGraphId")]
14 pub partial_graph_id: crate::id::PartialGraphId,
15 #[prost(message, repeated, tag = "9")]
16 pub actors_to_build: ::prost::alloc::vec::Vec<
17 inject_barrier_request::FragmentBuildActorInfo,
18 >,
19}
20pub mod inject_barrier_request {
22 #[derive(prost_helpers::AnyPB)]
23 #[derive(Clone, PartialEq, ::prost::Message)]
24 pub struct FragmentBuildActorInfo {
25 #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
26 pub fragment_id: crate::id::FragmentId,
27 #[prost(message, optional, tag = "2")]
28 pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
29 #[prost(message, repeated, tag = "3")]
30 pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
31 }
32 #[derive(prost_helpers::AnyPB)]
33 #[derive(Clone, PartialEq, ::prost::Message)]
34 pub struct BuildActorInfo {
35 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
36 pub actor_id: crate::id::ActorId,
37 #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
38 pub fragment_upstreams: ::std::collections::HashMap<
39 crate::id::FragmentId,
40 build_actor_info::UpstreamActors,
41 >,
42 #[prost(message, repeated, tag = "3")]
43 pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
44 #[prost(message, optional, tag = "4")]
45 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
46 #[prost(string, tag = "5")]
47 pub mview_definition: ::prost::alloc::string::String,
48 #[prost(message, optional, tag = "6")]
49 pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
50 #[prost(string, tag = "9")]
51 pub config_override: ::prost::alloc::string::String,
52 #[prost(uint32, repeated, tag = "7", wrapper = "crate::id::SubscriberId")]
53 pub initial_subscriber_ids: ::prost::alloc::vec::Vec<crate::id::SubscriberId>,
54 }
55 pub mod build_actor_info {
57 #[derive(prost_helpers::AnyPB)]
58 #[derive(Clone, PartialEq, ::prost::Message)]
59 pub struct UpstreamActors {
60 #[prost(message, repeated, tag = "1")]
61 pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
62 }
63 }
64}
65#[derive(prost_helpers::AnyPB)]
66#[derive(Clone, PartialEq, ::prost::Message)]
67pub struct BarrierCompleteResponse {
68 #[prost(string, tag = "1")]
69 pub request_id: ::prost::alloc::string::String,
70 #[prost(message, optional, tag = "2")]
71 pub status: ::core::option::Option<super::common::Status>,
72 #[prost(message, repeated, tag = "3")]
73 pub create_mview_progress: ::prost::alloc::vec::Vec<
74 barrier_complete_response::CreateMviewProgress,
75 >,
76 #[prost(message, repeated, tag = "4")]
77 pub synced_sstables: ::prost::alloc::vec::Vec<
78 barrier_complete_response::LocalSstableInfo,
79 >,
80 #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
81 pub worker_id: crate::id::WorkerId,
82 #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
83 pub table_watermarks: ::std::collections::HashMap<
84 crate::id::TableId,
85 super::hummock::TableWatermarks,
86 >,
87 #[prost(message, repeated, tag = "7")]
88 pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
89 #[prost(uint64, tag = "8", wrapper = "crate::id::PartialGraphId")]
90 pub partial_graph_id: crate::id::PartialGraphId,
91 #[prost(uint64, tag = "9")]
93 pub epoch: u64,
94 #[prost(message, repeated, tag = "11")]
95 pub load_finished_sources: ::prost::alloc::vec::Vec<
96 barrier_complete_response::LoadFinishedSource,
97 >,
98 #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
99 pub vector_index_adds: ::std::collections::HashMap<
100 crate::id::TableId,
101 super::hummock::vector_index_delta::VectorIndexAdds,
102 >,
103 #[prost(message, repeated, tag = "13")]
104 pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
105 barrier_complete_response::CdcTableBackfillProgress,
106 >,
107 #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
111 pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
112 #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
116 pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
117 #[prost(message, repeated, tag = "16")]
118 pub list_finished_sources: ::prost::alloc::vec::Vec<
119 barrier_complete_response::ListFinishedSource,
120 >,
121 #[prost(message, repeated, tag = "17")]
122 pub cdc_source_offset_updated: ::prost::alloc::vec::Vec<
123 barrier_complete_response::CdcSourceOffsetUpdated,
124 >,
125 #[prost(message, repeated, tag = "18")]
126 pub iceberg_v3_sink_metadata: ::prost::alloc::vec::Vec<
127 barrier_complete_response::IcebergV3SinkMetadata,
128 >,
129}
130pub mod barrier_complete_response {
132 #[derive(prost_helpers::AnyPB)]
133 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
134 pub struct CreateMviewProgress {
135 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
138 pub backfill_actor_id: crate::id::ActorId,
139 #[prost(bool, tag = "2")]
140 pub done: bool,
141 #[prost(uint64, tag = "3")]
143 pub consumed_epoch: u64,
144 #[prost(uint64, tag = "4")]
146 pub consumed_rows: u64,
147 #[prost(uint64, tag = "5")]
148 pub pending_epoch_lag: u64,
149 #[prost(uint64, tag = "6")]
151 pub buffered_rows: u64,
152 #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
153 pub fragment_id: crate::id::FragmentId,
154 }
155 #[derive(prost_helpers::AnyPB)]
156 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
157 pub struct CdcTableBackfillProgress {
158 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
159 pub actor_id: crate::id::ActorId,
160 #[prost(uint64, tag = "2")]
161 pub epoch: u64,
162 #[prost(bool, tag = "3")]
163 pub done: bool,
164 #[prost(int64, tag = "4")]
165 pub split_id_start_inclusive: i64,
166 #[prost(int64, tag = "5")]
167 pub split_id_end_inclusive: i64,
168 #[prost(uint64, tag = "6")]
169 pub generation: u64,
170 #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
171 pub fragment_id: crate::id::FragmentId,
172 }
173 #[derive(prost_helpers::AnyPB)]
174 #[derive(Clone, PartialEq, ::prost::Message)]
175 pub struct LocalSstableInfo {
176 #[prost(message, optional, tag = "2")]
177 pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
178 #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
179 pub table_stats_map: ::std::collections::HashMap<
180 crate::id::TableId,
181 super::super::hummock::TableStats,
182 >,
183 #[prost(uint64, tag = "4")]
184 pub created_at: u64,
185 }
186 #[derive(prost_helpers::AnyPB)]
190 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
191 pub struct LoadFinishedSource {
192 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
194 pub reporter_actor_id: crate::id::ActorId,
195 #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
197 pub table_id: crate::id::TableId,
198 #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
200 pub associated_source_id: crate::id::SourceId,
201 }
202 #[derive(prost_helpers::AnyPB)]
205 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
206 pub struct ListFinishedSource {
207 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
209 pub reporter_actor_id: crate::id::ActorId,
210 #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
212 pub table_id: crate::id::TableId,
213 #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
215 pub associated_source_id: crate::id::SourceId,
216 }
217 #[derive(prost_helpers::AnyPB)]
220 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
221 pub struct CdcSourceOffsetUpdated {
222 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
224 pub reporter_actor_id: crate::id::ActorId,
225 #[prost(uint32, tag = "2", wrapper = "crate::id::SourceId")]
227 pub source_id: crate::id::SourceId,
228 }
229 #[derive(prost_helpers::AnyPB)]
230 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
231 pub struct IcebergV3SinkMetadata {
232 #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
233 pub reporter_actor_id: crate::id::ActorId,
234 #[prost(uint32, tag = "2", wrapper = "crate::id::SinkId")]
235 pub sink_id: crate::id::SinkId,
236 #[prost(uint64, tag = "3")]
237 pub prev_epoch: u64,
238 #[prost(enumeration = "super::PbIcebergV3SinkRole", tag = "4")]
239 pub role: i32,
240 #[prost(message, optional, tag = "5")]
241 pub metadata: ::core::option::Option<
242 super::super::connector_service::SinkMetadata,
243 >,
244 }
245}
246#[derive(prost_helpers::AnyPB)]
247#[derive(Clone, PartialEq, ::prost::Message)]
248pub struct StreamingControlStreamRequest {
249 #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
250 pub request: ::core::option::Option<streaming_control_stream_request::Request>,
251}
252pub mod streaming_control_stream_request {
254 #[derive(prost_helpers::AnyPB)]
255 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
256 pub struct InitRequest {
257 #[prost(string, tag = "1")]
258 pub term_id: ::prost::alloc::string::String,
259 }
260 #[derive(prost_helpers::AnyPB)]
261 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
262 pub struct CreatePartialGraphRequest {
263 #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
264 pub partial_graph_id: crate::id::PartialGraphId,
265 }
266 #[derive(prost_helpers::AnyPB)]
267 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
268 pub struct RemovePartialGraphRequest {
269 #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
270 pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
271 }
272 #[derive(prost_helpers::AnyPB)]
273 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
274 pub struct ResetPartialGraphsRequest {
275 #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
276 pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
277 }
278 #[derive(prost_helpers::AnyPB)]
279 #[derive(Clone, PartialEq, ::prost::Oneof)]
280 pub enum Request {
281 #[prost(message, tag = "1")]
282 Init(InitRequest),
283 #[prost(message, tag = "2")]
284 InjectBarrier(super::InjectBarrierRequest),
285 #[prost(message, tag = "3")]
286 RemovePartialGraph(RemovePartialGraphRequest),
287 #[prost(message, tag = "4")]
288 CreatePartialGraph(CreatePartialGraphRequest),
289 #[prost(message, tag = "5")]
290 ResetPartialGraphs(ResetPartialGraphsRequest),
291 }
292}
293#[derive(prost_helpers::AnyPB)]
294#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
295pub struct ScoredError {
296 #[prost(string, tag = "1")]
297 pub err_msg: ::prost::alloc::string::String,
298 #[prost(int32, tag = "2")]
299 pub score: i32,
300}
301#[derive(prost_helpers::AnyPB)]
302#[derive(Clone, PartialEq, ::prost::Message)]
303pub struct StreamingControlStreamResponse {
304 #[prost(
305 oneof = "streaming_control_stream_response::Response",
306 tags = "1, 2, 3, 4, 5"
307 )]
308 pub response: ::core::option::Option<streaming_control_stream_response::Response>,
309}
310pub mod streaming_control_stream_response {
312 #[derive(prost_helpers::AnyPB)]
313 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
314 pub struct InitResponse {}
315 #[derive(prost_helpers::AnyPB)]
316 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
317 pub struct ShutdownResponse {}
318 #[derive(prost_helpers::AnyPB)]
319 #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
320 pub struct ReportPartialGraphFailureResponse {
321 #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
322 pub partial_graph_id: crate::id::PartialGraphId,
323 }
324 #[derive(prost_helpers::AnyPB)]
325 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
326 pub struct ResetPartialGraphResponse {
327 #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
328 pub partial_graph_id: crate::id::PartialGraphId,
329 #[prost(message, optional, tag = "2")]
330 pub root_err: ::core::option::Option<super::ScoredError>,
331 }
332 #[derive(prost_helpers::AnyPB)]
333 #[derive(Clone, PartialEq, ::prost::Oneof)]
334 pub enum Response {
335 #[prost(message, tag = "1")]
336 Init(InitResponse),
337 #[prost(message, tag = "2")]
338 CompleteBarrier(super::BarrierCompleteResponse),
339 #[prost(message, tag = "3")]
340 Shutdown(ShutdownResponse),
341 #[prost(message, tag = "4")]
342 ReportPartialGraphFailure(ReportPartialGraphFailureResponse),
343 #[prost(message, tag = "5")]
344 ResetPartialGraph(ResetPartialGraphResponse),
345 }
346}
347#[derive(prost_helpers::AnyPB)]
348#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
349pub struct GetMinUncommittedObjectIdRequest {}
350#[derive(prost_helpers::AnyPB)]
351#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
352pub struct GetMinUncommittedObjectIdResponse {
353 #[prost(uint64, tag = "1", wrapper = "crate::id::HummockRawObjectId")]
354 pub min_uncommitted_object_id: crate::id::HummockRawObjectId,
355}
356#[derive(prost_helpers::AnyPB)]
357#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
358#[repr(i32)]
359pub enum PbIcebergV3SinkRole {
360 Unspecified = 0,
361 Writer = 1,
362 DvMerger = 2,
363}
364impl PbIcebergV3SinkRole {
365 pub fn as_str_name(&self) -> &'static str {
370 match self {
371 Self::Unspecified => "PB_ICEBERG_V3_SINK_ROLE_UNSPECIFIED",
372 Self::Writer => "PB_ICEBERG_V3_SINK_ROLE_WRITER",
373 Self::DvMerger => "PB_ICEBERG_V3_SINK_ROLE_DV_MERGER",
374 }
375 }
376 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
378 match value {
379 "PB_ICEBERG_V3_SINK_ROLE_UNSPECIFIED" => Some(Self::Unspecified),
380 "PB_ICEBERG_V3_SINK_ROLE_WRITER" => Some(Self::Writer),
381 "PB_ICEBERG_V3_SINK_ROLE_DV_MERGER" => Some(Self::DvMerger),
382 _ => None,
383 }
384 }
385}
386pub mod stream_service_client {
388 #![allow(
389 unused_variables,
390 dead_code,
391 missing_docs,
392 clippy::wildcard_imports,
393 clippy::let_unit_value,
394 )]
395 use tonic::codegen::*;
396 use tonic::codegen::http::Uri;
397 #[derive(Debug, Clone)]
398 pub struct StreamServiceClient<T> {
399 inner: tonic::client::Grpc<T>,
400 }
401 impl StreamServiceClient<tonic::transport::Channel> {
402 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
404 where
405 D: TryInto<tonic::transport::Endpoint>,
406 D::Error: Into<StdError>,
407 {
408 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
409 Ok(Self::new(conn))
410 }
411 }
412 impl<T> StreamServiceClient<T>
413 where
414 T: tonic::client::GrpcService<tonic::body::Body>,
415 T::Error: Into<StdError>,
416 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
417 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
418 {
419 pub fn new(inner: T) -> Self {
420 let inner = tonic::client::Grpc::new(inner);
421 Self { inner }
422 }
423 pub fn with_origin(inner: T, origin: Uri) -> Self {
424 let inner = tonic::client::Grpc::with_origin(inner, origin);
425 Self { inner }
426 }
427 pub fn with_interceptor<F>(
428 inner: T,
429 interceptor: F,
430 ) -> StreamServiceClient<InterceptedService<T, F>>
431 where
432 F: tonic::service::Interceptor,
433 T::ResponseBody: Default,
434 T: tonic::codegen::Service<
435 http::Request<tonic::body::Body>,
436 Response = http::Response<
437 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
438 >,
439 >,
440 <T as tonic::codegen::Service<
441 http::Request<tonic::body::Body>,
442 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
443 {
444 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
445 }
446 #[must_use]
451 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
452 self.inner = self.inner.send_compressed(encoding);
453 self
454 }
455 #[must_use]
457 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
458 self.inner = self.inner.accept_compressed(encoding);
459 self
460 }
461 #[must_use]
465 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
466 self.inner = self.inner.max_decoding_message_size(limit);
467 self
468 }
469 #[must_use]
473 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
474 self.inner = self.inner.max_encoding_message_size(limit);
475 self
476 }
477 pub async fn streaming_control_stream(
478 &mut self,
479 request: impl tonic::IntoStreamingRequest<
480 Message = super::StreamingControlStreamRequest,
481 >,
482 ) -> std::result::Result<
483 tonic::Response<
484 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
485 >,
486 tonic::Status,
487 > {
488 self.inner
489 .ready()
490 .await
491 .map_err(|e| {
492 tonic::Status::unknown(
493 format!("Service was not ready: {}", e.into()),
494 )
495 })?;
496 let codec = tonic_prost::ProstCodec::default();
497 let path = http::uri::PathAndQuery::from_static(
498 "/stream_service.StreamService/StreamingControlStream",
499 );
500 let mut req = request.into_streaming_request();
501 req.extensions_mut()
502 .insert(
503 GrpcMethod::new(
504 "stream_service.StreamService",
505 "StreamingControlStream",
506 ),
507 );
508 self.inner.streaming(req, path, codec).await
509 }
510 pub async fn get_min_uncommitted_object_id(
511 &mut self,
512 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
513 ) -> std::result::Result<
514 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
515 tonic::Status,
516 > {
517 self.inner
518 .ready()
519 .await
520 .map_err(|e| {
521 tonic::Status::unknown(
522 format!("Service was not ready: {}", e.into()),
523 )
524 })?;
525 let codec = tonic_prost::ProstCodec::default();
526 let path = http::uri::PathAndQuery::from_static(
527 "/stream_service.StreamService/GetMinUncommittedObjectId",
528 );
529 let mut req = request.into_request();
530 req.extensions_mut()
531 .insert(
532 GrpcMethod::new(
533 "stream_service.StreamService",
534 "GetMinUncommittedObjectId",
535 ),
536 );
537 self.inner.unary(req, path, codec).await
538 }
539 }
540}
541pub mod stream_service_server {
543 #![allow(
544 unused_variables,
545 dead_code,
546 missing_docs,
547 clippy::wildcard_imports,
548 clippy::let_unit_value,
549 )]
550 use tonic::codegen::*;
551 #[async_trait]
553 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
554 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
556 Item = std::result::Result<
557 super::StreamingControlStreamResponse,
558 tonic::Status,
559 >,
560 >
561 + std::marker::Send
562 + 'static;
563 async fn streaming_control_stream(
564 &self,
565 request: tonic::Request<
566 tonic::Streaming<super::StreamingControlStreamRequest>,
567 >,
568 ) -> std::result::Result<
569 tonic::Response<Self::StreamingControlStreamStream>,
570 tonic::Status,
571 >;
572 async fn get_min_uncommitted_object_id(
573 &self,
574 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
575 ) -> std::result::Result<
576 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
577 tonic::Status,
578 >;
579 }
580 #[derive(Debug)]
581 pub struct StreamServiceServer<T> {
582 inner: Arc<T>,
583 accept_compression_encodings: EnabledCompressionEncodings,
584 send_compression_encodings: EnabledCompressionEncodings,
585 max_decoding_message_size: Option<usize>,
586 max_encoding_message_size: Option<usize>,
587 }
588 impl<T> StreamServiceServer<T> {
589 pub fn new(inner: T) -> Self {
590 Self::from_arc(Arc::new(inner))
591 }
592 pub fn from_arc(inner: Arc<T>) -> Self {
593 Self {
594 inner,
595 accept_compression_encodings: Default::default(),
596 send_compression_encodings: Default::default(),
597 max_decoding_message_size: None,
598 max_encoding_message_size: None,
599 }
600 }
601 pub fn with_interceptor<F>(
602 inner: T,
603 interceptor: F,
604 ) -> InterceptedService<Self, F>
605 where
606 F: tonic::service::Interceptor,
607 {
608 InterceptedService::new(Self::new(inner), interceptor)
609 }
610 #[must_use]
612 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
613 self.accept_compression_encodings.enable(encoding);
614 self
615 }
616 #[must_use]
618 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
619 self.send_compression_encodings.enable(encoding);
620 self
621 }
622 #[must_use]
626 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
627 self.max_decoding_message_size = Some(limit);
628 self
629 }
630 #[must_use]
634 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
635 self.max_encoding_message_size = Some(limit);
636 self
637 }
638 }
639 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
640 where
641 T: StreamService,
642 B: Body + std::marker::Send + 'static,
643 B::Error: Into<StdError> + std::marker::Send + 'static,
644 {
645 type Response = http::Response<tonic::body::Body>;
646 type Error = std::convert::Infallible;
647 type Future = BoxFuture<Self::Response, Self::Error>;
648 fn poll_ready(
649 &mut self,
650 _cx: &mut Context<'_>,
651 ) -> Poll<std::result::Result<(), Self::Error>> {
652 Poll::Ready(Ok(()))
653 }
654 fn call(&mut self, req: http::Request<B>) -> Self::Future {
655 match req.uri().path() {
656 "/stream_service.StreamService/StreamingControlStream" => {
657 #[allow(non_camel_case_types)]
658 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
659 impl<
660 T: StreamService,
661 > tonic::server::StreamingService<
662 super::StreamingControlStreamRequest,
663 > for StreamingControlStreamSvc<T> {
664 type Response = super::StreamingControlStreamResponse;
665 type ResponseStream = T::StreamingControlStreamStream;
666 type Future = BoxFuture<
667 tonic::Response<Self::ResponseStream>,
668 tonic::Status,
669 >;
670 fn call(
671 &mut self,
672 request: tonic::Request<
673 tonic::Streaming<super::StreamingControlStreamRequest>,
674 >,
675 ) -> Self::Future {
676 let inner = Arc::clone(&self.0);
677 let fut = async move {
678 <T as StreamService>::streaming_control_stream(
679 &inner,
680 request,
681 )
682 .await
683 };
684 Box::pin(fut)
685 }
686 }
687 let accept_compression_encodings = self.accept_compression_encodings;
688 let send_compression_encodings = self.send_compression_encodings;
689 let max_decoding_message_size = self.max_decoding_message_size;
690 let max_encoding_message_size = self.max_encoding_message_size;
691 let inner = self.inner.clone();
692 let fut = async move {
693 let method = StreamingControlStreamSvc(inner);
694 let codec = tonic_prost::ProstCodec::default();
695 let mut grpc = tonic::server::Grpc::new(codec)
696 .apply_compression_config(
697 accept_compression_encodings,
698 send_compression_encodings,
699 )
700 .apply_max_message_size_config(
701 max_decoding_message_size,
702 max_encoding_message_size,
703 );
704 let res = grpc.streaming(method, req).await;
705 Ok(res)
706 };
707 Box::pin(fut)
708 }
709 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
710 #[allow(non_camel_case_types)]
711 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
712 impl<
713 T: StreamService,
714 > tonic::server::UnaryService<
715 super::GetMinUncommittedObjectIdRequest,
716 > for GetMinUncommittedObjectIdSvc<T> {
717 type Response = super::GetMinUncommittedObjectIdResponse;
718 type Future = BoxFuture<
719 tonic::Response<Self::Response>,
720 tonic::Status,
721 >;
722 fn call(
723 &mut self,
724 request: tonic::Request<
725 super::GetMinUncommittedObjectIdRequest,
726 >,
727 ) -> Self::Future {
728 let inner = Arc::clone(&self.0);
729 let fut = async move {
730 <T as StreamService>::get_min_uncommitted_object_id(
731 &inner,
732 request,
733 )
734 .await
735 };
736 Box::pin(fut)
737 }
738 }
739 let accept_compression_encodings = self.accept_compression_encodings;
740 let send_compression_encodings = self.send_compression_encodings;
741 let max_decoding_message_size = self.max_decoding_message_size;
742 let max_encoding_message_size = self.max_encoding_message_size;
743 let inner = self.inner.clone();
744 let fut = async move {
745 let method = GetMinUncommittedObjectIdSvc(inner);
746 let codec = tonic_prost::ProstCodec::default();
747 let mut grpc = tonic::server::Grpc::new(codec)
748 .apply_compression_config(
749 accept_compression_encodings,
750 send_compression_encodings,
751 )
752 .apply_max_message_size_config(
753 max_decoding_message_size,
754 max_encoding_message_size,
755 );
756 let res = grpc.unary(method, req).await;
757 Ok(res)
758 };
759 Box::pin(fut)
760 }
761 _ => {
762 Box::pin(async move {
763 let mut response = http::Response::new(
764 tonic::body::Body::default(),
765 );
766 let headers = response.headers_mut();
767 headers
768 .insert(
769 tonic::Status::GRPC_STATUS,
770 (tonic::Code::Unimplemented as i32).into(),
771 );
772 headers
773 .insert(
774 http::header::CONTENT_TYPE,
775 tonic::metadata::GRPC_CONTENT_TYPE,
776 );
777 Ok(response)
778 })
779 }
780 }
781 }
782 }
783 impl<T> Clone for StreamServiceServer<T> {
784 fn clone(&self) -> Self {
785 let inner = self.inner.clone();
786 Self {
787 inner,
788 accept_compression_encodings: self.accept_compression_encodings,
789 send_compression_encodings: self.send_compression_encodings,
790 max_decoding_message_size: self.max_decoding_message_size,
791 max_encoding_message_size: self.max_encoding_message_size,
792 }
793 }
794 }
795 pub const SERVICE_NAME: &str = "stream_service.StreamService";
797 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
798 const NAME: &'static str = SERVICE_NAME;
799 }
800}