/// Prost-generated protobuf message `InjectBarrierRequest`.
///
/// Bundles a barrier with the actor ids to collect, the table ids to sync, the
/// partial graph it refers to, and per-fragment descriptions of actors to build.
/// Tags 3, 7 and 8 are not used by any field of this struct
/// (NOTE(review): presumably reserved or removed in the `.proto` — confirm).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InjectBarrierRequest {
    /// String identifier of this request (tag 1).
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// The `stream_plan::Barrier` payload (tag 2); `None` when the field is absent.
    #[prost(message, optional, tag = "2")]
    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
    /// Ids of the actors to collect (tag 4), encoded as `uint32` and exposed as `ActorId`.
    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
    /// Ids of the tables to sync (tag 5), encoded as `uint32` and exposed as `TableId`.
    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Id of the partial graph this request refers to (tag 6), encoded as `uint64`.
    #[prost(uint64, tag = "6", wrapper = "crate::id::PartialGraphId")]
    pub partial_graph_id: crate::id::PartialGraphId,
    /// Per-fragment build information for actors to build (tag 9).
    #[prost(message, repeated, tag = "9")]
    pub actors_to_build: ::prost::alloc::vec::Vec<
        inject_barrier_request::FragmentBuildActorInfo,
    >,
}
/// Nested message types used by `InjectBarrierRequest`.
pub mod inject_barrier_request {
    /// Build description for one fragment: its id, an optional stream node,
    /// and the list of actors to build.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct FragmentBuildActorInfo {
        /// Id of the fragment (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
        /// `stream_plan::StreamNode` attached to this fragment (tag 2); `None` when absent.
        /// NOTE(review): presumably the plan shared by all `actors` below — confirm.
        #[prost(message, optional, tag = "2")]
        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
        /// Per-actor build information (tag 3).
        #[prost(message, repeated, tag = "3")]
        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
    }
    /// Build description for a single actor.
    ///
    /// Tag 8 is not used by any field; `config_override` (tag 9) is declared
    /// before `initial_subscriber_ids` (tag 7).
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct BuildActorInfo {
        /// Id of the actor (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Map from fragment id to the `UpstreamActors` of that fragment (tag 2).
        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
        pub fragment_upstreams: ::std::collections::HashMap<
            crate::id::FragmentId,
            build_actor_info::UpstreamActors,
        >,
        /// `stream_plan::Dispatcher` entries for this actor (tag 3).
        #[prost(message, repeated, tag = "3")]
        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
        /// Vnode bitmap carried as a `common::Buffer` (tag 4); `None` when absent.
        #[prost(message, optional, tag = "4")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Materialized view definition as a string (tag 5).
        /// NOTE(review): presumably the SQL definition text — confirm.
        #[prost(string, tag = "5")]
        pub mview_definition: ::prost::alloc::string::String,
        /// `plan_common::ExprContext` for this actor (tag 6); `None` when absent.
        #[prost(message, optional, tag = "6")]
        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
        /// Configuration override as a string (tag 9).
        /// NOTE(review): the string's format is not visible here — confirm with the consumer.
        #[prost(string, tag = "9")]
        pub config_override: ::prost::alloc::string::String,
        /// Initial subscriber ids (tag 7), encoded as `uint32` and exposed as `SubscriberId`.
        #[prost(uint32, repeated, tag = "7", wrapper = "crate::id::SubscriberId")]
        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<crate::id::SubscriberId>,
    }
    /// Nested message types used by `BuildActorInfo`.
    pub mod build_actor_info {
        /// List of upstream actors, used as the value type of
        /// `BuildActorInfo::fragment_upstreams`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct UpstreamActors {
            /// `common::ActorInfo` entries of the upstream actors (tag 1).
            #[prost(message, repeated, tag = "1")]
            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
        }
    }
}
/// Prost-generated protobuf message `BarrierCompleteResponse`.
///
/// Carried as the `CompleteBarrier` variant of
/// `streaming_control_stream_response::Response`. Tag 10 is not used by any
/// field of this struct.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BarrierCompleteResponse {
    /// String identifier of the request (tag 1).
    /// NOTE(review): presumably echoes `InjectBarrierRequest::request_id` — confirm.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// `common::Status` of the operation (tag 2); `None` when absent.
    #[prost(message, optional, tag = "2")]
    pub status: ::core::option::Option<super::common::Status>,
    /// Progress reports for materialized view creation (tag 3).
    #[prost(message, repeated, tag = "3")]
    pub create_mview_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CreateMviewProgress,
    >,
    /// Synced SSTables with their per-table statistics (tag 4).
    #[prost(message, repeated, tag = "4")]
    pub synced_sstables: ::prost::alloc::vec::Vec<
        barrier_complete_response::LocalSstableInfo,
    >,
    /// Id of the worker (tag 5), encoded as `uint32`.
    /// NOTE(review): presumably the worker that produced this response — confirm.
    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
    pub worker_id: crate::id::WorkerId,
    /// Map from table id to that table's `hummock::TableWatermarks` (tag 6).
    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
    pub table_watermarks: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::TableWatermarks,
    >,
    /// `hummock::SstableInfo` entries for old-value SSTables (tag 7).
    #[prost(message, repeated, tag = "7")]
    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
    /// Id of the partial graph this response refers to (tag 8), encoded as `uint64`.
    #[prost(uint64, tag = "8", wrapper = "crate::id::PartialGraphId")]
    pub partial_graph_id: crate::id::PartialGraphId,
    /// Epoch value (tag 9).
    /// NOTE(review): presumably the epoch of the completed barrier — confirm.
    #[prost(uint64, tag = "9")]
    pub epoch: u64,
    /// Reports of sources that finished loading (tag 11).
    #[prost(message, repeated, tag = "11")]
    pub load_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::LoadFinishedSource,
    >,
    /// Map from table id to `VectorIndexAdds` for that table (tag 12).
    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
    pub vector_index_adds: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::vector_index_delta::VectorIndexAdds,
    >,
    /// Progress reports for CDC table backfill (tag 13).
    #[prost(message, repeated, tag = "13")]
    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CdcTableBackfillProgress,
    >,
    /// Ids of tables to truncate (tag 14), encoded as `uint32`.
    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Ids of tables whose refresh has finished (tag 15), encoded as `uint32`.
    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Reports of sources that finished listing (tag 16).
    #[prost(message, repeated, tag = "16")]
    pub list_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::ListFinishedSource,
    >,
    /// Reports of CDC sources whose offset was updated (tag 17).
    #[prost(message, repeated, tag = "17")]
    pub cdc_source_offset_updated: ::prost::alloc::vec::Vec<
        barrier_complete_response::CdcSourceOffsetUpdated,
    >,
}
/// Nested message types used by `BarrierCompleteResponse`.
pub mod barrier_complete_response {
    /// Progress report of one backfill actor during materialized view creation.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreateMviewProgress {
        /// Id of the backfill actor reporting progress (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub backfill_actor_id: crate::id::ActorId,
        /// Completion flag (tag 2).
        #[prost(bool, tag = "2")]
        pub done: bool,
        /// Consumed epoch (tag 3).
        #[prost(uint64, tag = "3")]
        pub consumed_epoch: u64,
        /// Number of consumed rows (tag 4).
        #[prost(uint64, tag = "4")]
        pub consumed_rows: u64,
        /// Pending epoch lag (tag 5).
        /// NOTE(review): unit and exact definition are not visible here — confirm.
        #[prost(uint64, tag = "5")]
        pub pending_epoch_lag: u64,
        /// Number of buffered rows (tag 6).
        #[prost(uint64, tag = "6")]
        pub buffered_rows: u64,
        /// Id of the fragment (tag 7), encoded as `uint32`.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// Progress report of one actor during CDC table backfill.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CdcTableBackfillProgress {
        /// Id of the reporting actor (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Epoch value (tag 2).
        #[prost(uint64, tag = "2")]
        pub epoch: u64,
        /// Completion flag (tag 3).
        #[prost(bool, tag = "3")]
        pub done: bool,
        /// First split id of the reported range, inclusive per the field name (tag 4).
        #[prost(int64, tag = "4")]
        pub split_id_start_inclusive: i64,
        /// Last split id of the reported range, inclusive per the field name (tag 5).
        #[prost(int64, tag = "5")]
        pub split_id_end_inclusive: i64,
        /// Generation counter (tag 6).
        /// NOTE(review): what advances the generation is not visible here — confirm.
        #[prost(uint64, tag = "6")]
        pub generation: u64,
        /// Id of the fragment (tag 7), encoded as `uint32`.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// An SSTable together with per-table statistics and a creation timestamp.
    ///
    /// Tag 1 is not used by any field of this struct.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct LocalSstableInfo {
        /// The `hummock::SstableInfo` (tag 2); `None` when absent.
        #[prost(message, optional, tag = "2")]
        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
        /// Map from table id to the `hummock::TableStats` of that table (tag 3).
        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
        pub table_stats_map: ::std::collections::HashMap<
            crate::id::TableId,
            super::super::hummock::TableStats,
        >,
        /// Creation time (tag 4).
        /// NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
        #[prost(uint64, tag = "4")]
        pub created_at: u64,
    }
    /// Report that a source has finished loading.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct LoadFinishedSource {
        /// Id of the actor making the report (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Id of the table (tag 2), encoded as `uint32`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Id of the associated source (tag 3), encoded as `uint32`.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
    /// Report that a source has finished listing; same field layout as
    /// `LoadFinishedSource`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ListFinishedSource {
        /// Id of the actor making the report (tag 1), encoded as `uint32`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Id of the table (tag 2), encoded as `uint32`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Id of the associated source (tag 3), encoded as `uint32`.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
    /// Report that a CDC source's offset was updated.
    ///
    /// NOTE(review): unlike the sibling messages, both ids here are plain `u32`
    /// rather than `crate::id` wrapper types — confirm this is intentional.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CdcSourceOffsetUpdated {
        /// Id of the actor making the report (tag 1), as a raw `u32`.
        #[prost(uint32, tag = "1")]
        pub reporter_actor_id: u32,
        /// Id of the source (tag 2), as a raw `u32`.
        #[prost(uint32, tag = "2")]
        pub source_id: u32,
    }
}
/// Request message of the `StreamingControlStream` RPC: a wrapper around the
/// `streaming_control_stream_request::Request` oneof.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamRequest {
    /// The concrete request variant (oneof over tags 1-5); `None` when no variant is set.
    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
}
/// Nested message types and the oneof enum used by `StreamingControlStreamRequest`.
pub mod streaming_control_stream_request {
    /// Payload of the `Init` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct InitRequest {
        /// Term identifier as a string (tag 1).
        /// NOTE(review): the meaning of a "term" is not visible here — confirm with the sender.
        #[prost(string, tag = "1")]
        pub term_id: ::prost::alloc::string::String,
    }
    /// Payload of the `CreatePartialGraph` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreatePartialGraphRequest {
        /// Id of the partial graph to create (tag 1), encoded as `uint64`.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
    }
    /// Payload of the `RemovePartialGraph` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct RemovePartialGraphRequest {
        /// Ids of the partial graphs to remove (tag 1), encoded as `uint64`.
        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
    }
    /// Payload of the `ResetPartialGraphs` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetPartialGraphsRequest {
        /// Ids of the partial graphs to reset (tag 1), encoded as `uint64`.
        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
    }
    /// The oneof carried by `StreamingControlStreamRequest::request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1: an `InitRequest`.
        #[prost(message, tag = "1")]
        Init(InitRequest),
        /// Tag 2: an `InjectBarrierRequest`.
        #[prost(message, tag = "2")]
        InjectBarrier(super::InjectBarrierRequest),
        /// Tag 3: a `RemovePartialGraphRequest`.
        #[prost(message, tag = "3")]
        RemovePartialGraph(RemovePartialGraphRequest),
        /// Tag 4: a `CreatePartialGraphRequest`.
        #[prost(message, tag = "4")]
        CreatePartialGraph(CreatePartialGraphRequest),
        /// Tag 5: a `ResetPartialGraphsRequest`.
        #[prost(message, tag = "5")]
        ResetPartialGraphs(ResetPartialGraphsRequest),
    }
}
/// An error message paired with an integer score; used as
/// `streaming_control_stream_response::ResetPartialGraphResponse::root_err`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScoredError {
    /// Error message text (tag 1).
    #[prost(string, tag = "1")]
    pub err_msg: ::prost::alloc::string::String,
    /// Score attached to the error (tag 2).
    /// NOTE(review): how the score is computed or compared is not visible here — confirm.
    #[prost(int32, tag = "2")]
    pub score: i32,
}
/// Response message of the `StreamingControlStream` RPC: a wrapper around the
/// `streaming_control_stream_response::Response` oneof.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamResponse {
    /// The concrete response variant (oneof over tags 1-5); `None` when no variant is set.
    #[prost(
        oneof = "streaming_control_stream_response::Response",
        tags = "1, 2, 3, 4, 5"
    )]
    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
}
/// Nested message types and the oneof enum used by `StreamingControlStreamResponse`.
pub mod streaming_control_stream_response {
    /// Payload of the `Init` response variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct InitResponse {}
    /// Payload of the `Shutdown` response variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ShutdownResponse {}
    /// Payload of the `ReportPartialGraphFailure` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ReportPartialGraphFailureResponse {
        /// Id of the partial graph the failure report refers to (tag 1), encoded as `uint64`.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
    }
    /// Payload of the `ResetPartialGraph` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetPartialGraphResponse {
        /// Id of the partial graph that was reset (tag 1), encoded as `uint64`.
        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
        pub partial_graph_id: crate::id::PartialGraphId,
        /// Root error as a `ScoredError` (tag 2); `None` when absent.
        /// NOTE(review): presumably absent when the reset saw no error — confirm.
        #[prost(message, optional, tag = "2")]
        pub root_err: ::core::option::Option<super::ScoredError>,
    }
    /// The oneof carried by `StreamingControlStreamResponse::response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1: an `InitResponse`.
        #[prost(message, tag = "1")]
        Init(InitResponse),
        /// Tag 2: a `BarrierCompleteResponse`.
        #[prost(message, tag = "2")]
        CompleteBarrier(super::BarrierCompleteResponse),
        /// Tag 3: a `ShutdownResponse`.
        #[prost(message, tag = "3")]
        Shutdown(ShutdownResponse),
        /// Tag 4: a `ReportPartialGraphFailureResponse`.
        #[prost(message, tag = "4")]
        ReportPartialGraphFailure(ReportPartialGraphFailureResponse),
        /// Tag 5: a `ResetPartialGraphResponse`.
        #[prost(message, tag = "5")]
        ResetPartialGraph(ResetPartialGraphResponse),
    }
}
/// Request message of the `GetMinUncommittedObjectId` RPC; carries no fields.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdRequest {}
/// Response message of the `GetMinUncommittedObjectId` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdResponse {
    /// The minimum uncommitted object id (tag 1), encoded as `uint64` and
    /// exposed as `HummockRawObjectId`.
    #[prost(uint64, tag = "1", wrapper = "crate::id::HummockRawObjectId")]
    pub min_uncommitted_object_id: crate::id::HummockRawObjectId,
}
336pub mod stream_service_client {
338 #![allow(
339 unused_variables,
340 dead_code,
341 missing_docs,
342 clippy::wildcard_imports,
343 clippy::let_unit_value,
344 )]
345 use tonic::codegen::*;
346 use tonic::codegen::http::Uri;
347 #[derive(Debug, Clone)]
348 pub struct StreamServiceClient<T> {
349 inner: tonic::client::Grpc<T>,
350 }
351 impl StreamServiceClient<tonic::transport::Channel> {
352 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
354 where
355 D: TryInto<tonic::transport::Endpoint>,
356 D::Error: Into<StdError>,
357 {
358 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
359 Ok(Self::new(conn))
360 }
361 }
362 impl<T> StreamServiceClient<T>
363 where
364 T: tonic::client::GrpcService<tonic::body::BoxBody>,
365 T::Error: Into<StdError>,
366 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
367 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
368 {
369 pub fn new(inner: T) -> Self {
370 let inner = tonic::client::Grpc::new(inner);
371 Self { inner }
372 }
373 pub fn with_origin(inner: T, origin: Uri) -> Self {
374 let inner = tonic::client::Grpc::with_origin(inner, origin);
375 Self { inner }
376 }
377 pub fn with_interceptor<F>(
378 inner: T,
379 interceptor: F,
380 ) -> StreamServiceClient<InterceptedService<T, F>>
381 where
382 F: tonic::service::Interceptor,
383 T::ResponseBody: Default,
384 T: tonic::codegen::Service<
385 http::Request<tonic::body::BoxBody>,
386 Response = http::Response<
387 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
388 >,
389 >,
390 <T as tonic::codegen::Service<
391 http::Request<tonic::body::BoxBody>,
392 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
393 {
394 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
395 }
396 #[must_use]
401 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
402 self.inner = self.inner.send_compressed(encoding);
403 self
404 }
405 #[must_use]
407 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
408 self.inner = self.inner.accept_compressed(encoding);
409 self
410 }
411 #[must_use]
415 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
416 self.inner = self.inner.max_decoding_message_size(limit);
417 self
418 }
419 #[must_use]
423 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
424 self.inner = self.inner.max_encoding_message_size(limit);
425 self
426 }
427 pub async fn streaming_control_stream(
428 &mut self,
429 request: impl tonic::IntoStreamingRequest<
430 Message = super::StreamingControlStreamRequest,
431 >,
432 ) -> std::result::Result<
433 tonic::Response<
434 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
435 >,
436 tonic::Status,
437 > {
438 self.inner
439 .ready()
440 .await
441 .map_err(|e| {
442 tonic::Status::unknown(
443 format!("Service was not ready: {}", e.into()),
444 )
445 })?;
446 let codec = tonic::codec::ProstCodec::default();
447 let path = http::uri::PathAndQuery::from_static(
448 "/stream_service.StreamService/StreamingControlStream",
449 );
450 let mut req = request.into_streaming_request();
451 req.extensions_mut()
452 .insert(
453 GrpcMethod::new(
454 "stream_service.StreamService",
455 "StreamingControlStream",
456 ),
457 );
458 self.inner.streaming(req, path, codec).await
459 }
460 pub async fn get_min_uncommitted_object_id(
461 &mut self,
462 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
463 ) -> std::result::Result<
464 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
465 tonic::Status,
466 > {
467 self.inner
468 .ready()
469 .await
470 .map_err(|e| {
471 tonic::Status::unknown(
472 format!("Service was not ready: {}", e.into()),
473 )
474 })?;
475 let codec = tonic::codec::ProstCodec::default();
476 let path = http::uri::PathAndQuery::from_static(
477 "/stream_service.StreamService/GetMinUncommittedObjectId",
478 );
479 let mut req = request.into_request();
480 req.extensions_mut()
481 .insert(
482 GrpcMethod::new(
483 "stream_service.StreamService",
484 "GetMinUncommittedObjectId",
485 ),
486 );
487 self.inner.unary(req, path, codec).await
488 }
489 }
490}
491pub mod stream_service_server {
493 #![allow(
494 unused_variables,
495 dead_code,
496 missing_docs,
497 clippy::wildcard_imports,
498 clippy::let_unit_value,
499 )]
500 use tonic::codegen::*;
501 #[async_trait]
503 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
504 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
506 Item = std::result::Result<
507 super::StreamingControlStreamResponse,
508 tonic::Status,
509 >,
510 >
511 + std::marker::Send
512 + 'static;
513 async fn streaming_control_stream(
514 &self,
515 request: tonic::Request<
516 tonic::Streaming<super::StreamingControlStreamRequest>,
517 >,
518 ) -> std::result::Result<
519 tonic::Response<Self::StreamingControlStreamStream>,
520 tonic::Status,
521 >;
522 async fn get_min_uncommitted_object_id(
523 &self,
524 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
525 ) -> std::result::Result<
526 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
527 tonic::Status,
528 >;
529 }
530 #[derive(Debug)]
531 pub struct StreamServiceServer<T> {
532 inner: Arc<T>,
533 accept_compression_encodings: EnabledCompressionEncodings,
534 send_compression_encodings: EnabledCompressionEncodings,
535 max_decoding_message_size: Option<usize>,
536 max_encoding_message_size: Option<usize>,
537 }
538 impl<T> StreamServiceServer<T> {
539 pub fn new(inner: T) -> Self {
540 Self::from_arc(Arc::new(inner))
541 }
542 pub fn from_arc(inner: Arc<T>) -> Self {
543 Self {
544 inner,
545 accept_compression_encodings: Default::default(),
546 send_compression_encodings: Default::default(),
547 max_decoding_message_size: None,
548 max_encoding_message_size: None,
549 }
550 }
551 pub fn with_interceptor<F>(
552 inner: T,
553 interceptor: F,
554 ) -> InterceptedService<Self, F>
555 where
556 F: tonic::service::Interceptor,
557 {
558 InterceptedService::new(Self::new(inner), interceptor)
559 }
560 #[must_use]
562 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
563 self.accept_compression_encodings.enable(encoding);
564 self
565 }
566 #[must_use]
568 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
569 self.send_compression_encodings.enable(encoding);
570 self
571 }
572 #[must_use]
576 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
577 self.max_decoding_message_size = Some(limit);
578 self
579 }
580 #[must_use]
584 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
585 self.max_encoding_message_size = Some(limit);
586 self
587 }
588 }
589 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
590 where
591 T: StreamService,
592 B: Body + std::marker::Send + 'static,
593 B::Error: Into<StdError> + std::marker::Send + 'static,
594 {
595 type Response = http::Response<tonic::body::BoxBody>;
596 type Error = std::convert::Infallible;
597 type Future = BoxFuture<Self::Response, Self::Error>;
598 fn poll_ready(
599 &mut self,
600 _cx: &mut Context<'_>,
601 ) -> Poll<std::result::Result<(), Self::Error>> {
602 Poll::Ready(Ok(()))
603 }
604 fn call(&mut self, req: http::Request<B>) -> Self::Future {
605 match req.uri().path() {
606 "/stream_service.StreamService/StreamingControlStream" => {
607 #[allow(non_camel_case_types)]
608 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
609 impl<
610 T: StreamService,
611 > tonic::server::StreamingService<
612 super::StreamingControlStreamRequest,
613 > for StreamingControlStreamSvc<T> {
614 type Response = super::StreamingControlStreamResponse;
615 type ResponseStream = T::StreamingControlStreamStream;
616 type Future = BoxFuture<
617 tonic::Response<Self::ResponseStream>,
618 tonic::Status,
619 >;
620 fn call(
621 &mut self,
622 request: tonic::Request<
623 tonic::Streaming<super::StreamingControlStreamRequest>,
624 >,
625 ) -> Self::Future {
626 let inner = Arc::clone(&self.0);
627 let fut = async move {
628 <T as StreamService>::streaming_control_stream(
629 &inner,
630 request,
631 )
632 .await
633 };
634 Box::pin(fut)
635 }
636 }
637 let accept_compression_encodings = self.accept_compression_encodings;
638 let send_compression_encodings = self.send_compression_encodings;
639 let max_decoding_message_size = self.max_decoding_message_size;
640 let max_encoding_message_size = self.max_encoding_message_size;
641 let inner = self.inner.clone();
642 let fut = async move {
643 let method = StreamingControlStreamSvc(inner);
644 let codec = tonic::codec::ProstCodec::default();
645 let mut grpc = tonic::server::Grpc::new(codec)
646 .apply_compression_config(
647 accept_compression_encodings,
648 send_compression_encodings,
649 )
650 .apply_max_message_size_config(
651 max_decoding_message_size,
652 max_encoding_message_size,
653 );
654 let res = grpc.streaming(method, req).await;
655 Ok(res)
656 };
657 Box::pin(fut)
658 }
659 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
660 #[allow(non_camel_case_types)]
661 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
662 impl<
663 T: StreamService,
664 > tonic::server::UnaryService<
665 super::GetMinUncommittedObjectIdRequest,
666 > for GetMinUncommittedObjectIdSvc<T> {
667 type Response = super::GetMinUncommittedObjectIdResponse;
668 type Future = BoxFuture<
669 tonic::Response<Self::Response>,
670 tonic::Status,
671 >;
672 fn call(
673 &mut self,
674 request: tonic::Request<
675 super::GetMinUncommittedObjectIdRequest,
676 >,
677 ) -> Self::Future {
678 let inner = Arc::clone(&self.0);
679 let fut = async move {
680 <T as StreamService>::get_min_uncommitted_object_id(
681 &inner,
682 request,
683 )
684 .await
685 };
686 Box::pin(fut)
687 }
688 }
689 let accept_compression_encodings = self.accept_compression_encodings;
690 let send_compression_encodings = self.send_compression_encodings;
691 let max_decoding_message_size = self.max_decoding_message_size;
692 let max_encoding_message_size = self.max_encoding_message_size;
693 let inner = self.inner.clone();
694 let fut = async move {
695 let method = GetMinUncommittedObjectIdSvc(inner);
696 let codec = tonic::codec::ProstCodec::default();
697 let mut grpc = tonic::server::Grpc::new(codec)
698 .apply_compression_config(
699 accept_compression_encodings,
700 send_compression_encodings,
701 )
702 .apply_max_message_size_config(
703 max_decoding_message_size,
704 max_encoding_message_size,
705 );
706 let res = grpc.unary(method, req).await;
707 Ok(res)
708 };
709 Box::pin(fut)
710 }
711 _ => {
712 Box::pin(async move {
713 let mut response = http::Response::new(empty_body());
714 let headers = response.headers_mut();
715 headers
716 .insert(
717 tonic::Status::GRPC_STATUS,
718 (tonic::Code::Unimplemented as i32).into(),
719 );
720 headers
721 .insert(
722 http::header::CONTENT_TYPE,
723 tonic::metadata::GRPC_CONTENT_TYPE,
724 );
725 Ok(response)
726 })
727 }
728 }
729 }
730 }
731 impl<T> Clone for StreamServiceServer<T> {
732 fn clone(&self) -> Self {
733 let inner = self.inner.clone();
734 Self {
735 inner,
736 accept_compression_encodings: self.accept_compression_encodings,
737 send_compression_encodings: self.send_compression_encodings,
738 max_decoding_message_size: self.max_decoding_message_size,
739 max_encoding_message_size: self.max_encoding_message_size,
740 }
741 }
742 }
743 pub const SERVICE_NAME: &str = "stream_service.StreamService";
745 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
746 const NAME: &'static str = SERVICE_NAME;
747 }
748}