/// Protobuf message `InjectBarrierRequest`; wire encoding is derived by `::prost::Message`.
///
/// Sent as the `InjectBarrier` variant of `streaming_control_stream_request::Request`.
/// Field tags 7 and 8 are not used by this definition.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InjectBarrierRequest {
    /// Tag 1: request identifier string.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// Tag 2: optional `stream_plan::Barrier` sub-message.
    #[prost(message, optional, tag = "2")]
    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
    /// Tag 3: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
    #[prost(uint32, tag = "3", wrapper = "crate::id::DatabaseId")]
    pub database_id: crate::id::DatabaseId,
    /// Tag 4: repeated actor ids.
    /// NOTE(review): presumably the actors whose barriers must be collected — confirm
    /// against the `.proto` definition.
    #[prost(uint32, repeated, tag = "4")]
    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<u32>,
    /// Tag 5: repeated `uint32` table ids, surfaced as `crate::id::TableId`.
    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Tag 6: partial graph id.
    #[prost(uint32, tag = "6")]
    pub partial_graph_id: u32,
    /// Tag 9: per-fragment descriptions of the actors to build.
    #[prost(message, repeated, tag = "9")]
    pub actors_to_build: ::prost::alloc::vec::Vec<
        inject_barrier_request::FragmentBuildActorInfo,
    >,
}
/// Nested message types used by `InjectBarrierRequest`.
pub mod inject_barrier_request {
    /// One fragment plus the list of actors described for it.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct FragmentBuildActorInfo {
        /// Tag 1: fragment id.
        #[prost(uint32, tag = "1")]
        pub fragment_id: u32,
        /// Tag 2: optional `stream_plan::StreamNode` sub-message.
        #[prost(message, optional, tag = "2")]
        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
        /// Tag 3: the actors belonging to this entry.
        #[prost(message, repeated, tag = "3")]
        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
    }
    /// Description of a single actor inside a `FragmentBuildActorInfo`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct BuildActorInfo {
        /// Tag 1: actor id.
        #[prost(uint32, tag = "1")]
        pub actor_id: u32,
        /// Tag 2: map from a `uint32` key to a list of upstream actors.
        /// NOTE(review): the key is presumably an upstream fragment id — confirm.
        #[prost(map = "uint32, message", tag = "2")]
        pub fragment_upstreams: ::std::collections::HashMap<
            u32,
            build_actor_info::UpstreamActors,
        >,
        /// Tag 3: repeated `stream_plan::Dispatcher` sub-messages.
        #[prost(message, repeated, tag = "3")]
        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
        /// Tag 4: optional `common::Buffer` holding the vnode bitmap.
        #[prost(message, optional, tag = "4")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Tag 5: materialized-view definition string.
        #[prost(string, tag = "5")]
        pub mview_definition: ::prost::alloc::string::String,
        /// Tag 6: optional `plan_common::ExprContext` sub-message.
        #[prost(message, optional, tag = "6")]
        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
        /// Tag 7: repeated subscriber ids.
        #[prost(uint32, repeated, tag = "7")]
        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<u32>,
    }
    /// Nested message types used by `BuildActorInfo`.
    pub mod build_actor_info {
        /// Wrapper message around a list of `common::ActorInfo`; used as the map value
        /// type of `BuildActorInfo::fragment_upstreams`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct UpstreamActors {
            /// Tag 1: the upstream actors.
            #[prost(message, repeated, tag = "1")]
            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
        }
    }
}
/// Protobuf message `BarrierCompleteResponse`; wire encoding is derived by `::prost::Message`.
///
/// Delivered as the `CompleteBarrier` variant of
/// `streaming_control_stream_response::Response`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BarrierCompleteResponse {
    /// Tag 1: request identifier string.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// Tag 2: optional `common::Status` sub-message.
    #[prost(message, optional, tag = "2")]
    pub status: ::core::option::Option<super::common::Status>,
    /// Tag 3: repeated materialized-view creation progress entries.
    #[prost(message, repeated, tag = "3")]
    pub create_mview_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CreateMviewProgress,
    >,
    /// Tag 4: repeated `LocalSstableInfo` entries.
    #[prost(message, repeated, tag = "4")]
    pub synced_sstables: ::prost::alloc::vec::Vec<
        barrier_complete_response::LocalSstableInfo,
    >,
    /// Tag 5: worker id.
    #[prost(uint32, tag = "5")]
    pub worker_id: u32,
    /// Tag 6: map keyed by `uint32` table id (surfaced as `crate::id::TableId`) to
    /// `hummock::TableWatermarks`.
    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
    pub table_watermarks: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::TableWatermarks,
    >,
    /// Tag 7: repeated `hummock::SstableInfo` entries.
    #[prost(message, repeated, tag = "7")]
    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
    /// Tag 8: partial graph id.
    #[prost(uint32, tag = "8")]
    pub partial_graph_id: u32,
    /// Tag 9: epoch value.
    #[prost(uint64, tag = "9")]
    pub epoch: u64,
    /// Tag 10: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
    #[prost(uint32, tag = "10", wrapper = "crate::id::DatabaseId")]
    pub database_id: crate::id::DatabaseId,
    /// Tag 11: repeated `LoadFinishedSource` entries.
    #[prost(message, repeated, tag = "11")]
    pub load_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::LoadFinishedSource,
    >,
    /// Tag 12: map keyed by table id (surfaced as `crate::id::TableId`) to
    /// `hummock::vector_index_delta::VectorIndexAdds`.
    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
    pub vector_index_adds: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::vector_index_delta::VectorIndexAdds,
    >,
    /// Tag 13: repeated CDC table backfill progress entries.
    #[prost(message, repeated, tag = "13")]
    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CdcTableBackfillProgress,
    >,
    /// Tag 14: repeated table ids, surfaced as `crate::id::TableId`.
    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Tag 15: repeated table ids, surfaced as `crate::id::TableId`.
    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Tag 16: repeated `ListFinishedSource` entries.
    #[prost(message, repeated, tag = "16")]
    pub list_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::ListFinishedSource,
    >,
}
/// Nested message types used by `BarrierCompleteResponse`.
pub mod barrier_complete_response {
    /// Progress entry reported for a backfill actor. All fields are scalars, so the
    /// message also derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreateMviewProgress {
        /// Tag 1: id of the backfill actor this entry refers to.
        #[prost(uint32, tag = "1")]
        pub backfill_actor_id: u32,
        /// Tag 2: completion flag.
        #[prost(bool, tag = "2")]
        pub done: bool,
        /// Tag 3: consumed epoch.
        #[prost(uint64, tag = "3")]
        pub consumed_epoch: u64,
        /// Tag 4: consumed row count.
        #[prost(uint64, tag = "4")]
        pub consumed_rows: u64,
        /// Tag 5: pending epoch lag.
        #[prost(uint64, tag = "5")]
        pub pending_epoch_lag: u64,
        /// Tag 6: buffered row count.
        #[prost(uint64, tag = "6")]
        pub buffered_rows: u64,
    }
    /// Progress entry for a CDC table backfill. All fields are scalars, so the message
    /// also derives `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CdcTableBackfillProgress {
        /// Tag 1: actor id.
        #[prost(uint32, tag = "1")]
        pub actor_id: u32,
        /// Tag 2: epoch value.
        #[prost(uint64, tag = "2")]
        pub epoch: u64,
        /// Tag 3: completion flag.
        #[prost(bool, tag = "3")]
        pub done: bool,
        /// Tag 4: first split id of the covered range (inclusive, per the field name).
        #[prost(int64, tag = "4")]
        pub split_id_start_inclusive: i64,
        /// Tag 5: last split id of the covered range (inclusive, per the field name).
        #[prost(int64, tag = "5")]
        pub split_id_end_inclusive: i64,
        /// Tag 6: generation counter.
        #[prost(uint64, tag = "6")]
        pub generation: u64,
        /// Tag 7: fragment id.
        #[prost(uint32, tag = "7")]
        pub fragment_id: u32,
    }
    /// An SST description together with per-table statistics. Tag 1 is not used by
    /// this definition.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct LocalSstableInfo {
        /// Tag 2: optional `hummock::SstableInfo` sub-message.
        #[prost(message, optional, tag = "2")]
        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
        /// Tag 3: map keyed by table id (surfaced as `crate::id::TableId`) to
        /// `hummock::TableStats`.
        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
        pub table_stats_map: ::std::collections::HashMap<
            crate::id::TableId,
            super::super::hummock::TableStats,
        >,
        /// Tag 4: creation timestamp. NOTE(review): unit is not visible here — confirm.
        #[prost(uint64, tag = "4")]
        pub created_at: u64,
    }
    /// Entry of `BarrierCompleteResponse::load_finished_sources`. Has the same field
    /// layout as `ListFinishedSource`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct LoadFinishedSource {
        /// Tag 1: id of the reporting actor.
        #[prost(uint32, tag = "1")]
        pub reporter_actor_id: u32,
        /// Tag 2: table id (plain `u32`, no id wrapper is applied here).
        #[prost(uint32, tag = "2")]
        pub table_id: u32,
        /// Tag 3: id of the associated source.
        #[prost(uint32, tag = "3")]
        pub associated_source_id: u32,
    }
    /// Entry of `BarrierCompleteResponse::list_finished_sources`. Has the same field
    /// layout as `LoadFinishedSource`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ListFinishedSource {
        /// Tag 1: id of the reporting actor.
        #[prost(uint32, tag = "1")]
        pub reporter_actor_id: u32,
        /// Tag 2: table id (plain `u32`, no id wrapper is applied here).
        #[prost(uint32, tag = "2")]
        pub table_id: u32,
        /// Tag 3: id of the associated source.
        #[prost(uint32, tag = "3")]
        pub associated_source_id: u32,
    }
}
/// Envelope message for the request side of the `StreamingControlStream` RPC.
///
/// Holds exactly one `oneof` field; `request` is `None` when no variant is set.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamRequest {
    /// The concrete request variant (oneof tags 1 through 5).
    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
}
/// Nested message types and the `oneof` enum for `StreamingControlStreamRequest`.
pub mod streaming_control_stream_request {
    /// Payload of the `Init` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct InitRequest {
        /// Tag 1: term identifier string.
        #[prost(string, tag = "1")]
        pub term_id: ::prost::alloc::string::String,
    }
    /// Payload of the `CreatePartialGraph` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreatePartialGraphRequest {
        /// Tag 1: id of the partial graph.
        #[prost(uint32, tag = "1")]
        pub partial_graph_id: u32,
        /// Tag 2: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `RemovePartialGraph` request variant; carries a list of ids,
    /// unlike `CreatePartialGraphRequest` which carries a single one.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct RemovePartialGraphRequest {
        /// Tag 1: ids of the partial graphs.
        #[prost(uint32, repeated, tag = "1")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
        /// Tag 2: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `ResetDatabase` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ResetDatabaseRequest {
        /// Tag 1: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
        /// Tag 2: reset request id; `ResetDatabaseResponse` carries a field of the
        /// same name.
        #[prost(uint32, tag = "2")]
        pub reset_request_id: u32,
    }
    /// The `oneof` carried by `StreamingControlStreamRequest::request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1.
        #[prost(message, tag = "1")]
        Init(InitRequest),
        /// Tag 2.
        #[prost(message, tag = "2")]
        InjectBarrier(super::InjectBarrierRequest),
        /// Tag 3.
        #[prost(message, tag = "3")]
        RemovePartialGraph(RemovePartialGraphRequest),
        /// Tag 4.
        #[prost(message, tag = "4")]
        CreatePartialGraph(CreatePartialGraphRequest),
        /// Tag 5.
        #[prost(message, tag = "5")]
        ResetDatabase(ResetDatabaseRequest),
    }
}
/// An error message paired with an integer score.
///
/// Used as the type of `ResetDatabaseResponse::root_err`.
/// NOTE(review): the meaning and ordering of `score` are not visible here — confirm
/// with the producer of this message.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScoredError {
    /// Tag 1: error message text.
    #[prost(string, tag = "1")]
    pub err_msg: ::prost::alloc::string::String,
    /// Tag 2: signed score attached to the error.
    #[prost(int32, tag = "2")]
    pub score: i32,
}
/// Envelope message for the response side of the `StreamingControlStream` RPC.
///
/// Holds exactly one `oneof` field; `response` is `None` when no variant is set.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamResponse {
    /// The concrete response variant (oneof tags 1 through 5).
    #[prost(
        oneof = "streaming_control_stream_response::Response",
        tags = "1, 2, 3, 4, 5"
    )]
    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
}
/// Nested message types and the `oneof` enum for `StreamingControlStreamResponse`.
pub mod streaming_control_stream_response {
    /// Payload of the `Init` response variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct InitResponse {}
    /// Payload of the `Shutdown` response variant; carries no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ShutdownResponse {}
    /// Payload of the `ReportDatabaseFailure` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ReportDatabaseFailureResponse {
        /// Tag 1: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `ResetDatabase` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetDatabaseResponse {
        /// Tag 1: `uint32` on the wire, surfaced through the `crate::id::DatabaseId` wrapper.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
        /// Tag 2: optional `ScoredError` describing the root error, if any was attached.
        #[prost(message, optional, tag = "2")]
        pub root_err: ::core::option::Option<super::ScoredError>,
        /// Tag 3: reset request id; `ResetDatabaseRequest` carries a field of the
        /// same name.
        #[prost(uint32, tag = "3")]
        pub reset_request_id: u32,
    }
    /// The `oneof` carried by `StreamingControlStreamResponse::response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1.
        #[prost(message, tag = "1")]
        Init(InitResponse),
        /// Tag 2.
        #[prost(message, tag = "2")]
        CompleteBarrier(super::BarrierCompleteResponse),
        /// Tag 3.
        #[prost(message, tag = "3")]
        Shutdown(ShutdownResponse),
        /// Tag 4.
        #[prost(message, tag = "4")]
        ReportDatabaseFailure(ReportDatabaseFailureResponse),
        /// Tag 5.
        #[prost(message, tag = "5")]
        ResetDatabase(ResetDatabaseResponse),
    }
}
/// Request message of the unary `GetMinUncommittedObjectId` RPC; carries no fields.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdRequest {}
/// Response message of the unary `GetMinUncommittedObjectId` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdResponse {
    /// Tag 1: the minimum uncommitted object id.
    #[prost(uint64, tag = "1")]
    pub min_uncommitted_object_id: u64,
}
328pub mod stream_service_client {
330 #![allow(
331 unused_variables,
332 dead_code,
333 missing_docs,
334 clippy::wildcard_imports,
335 clippy::let_unit_value,
336 )]
337 use tonic::codegen::*;
338 use tonic::codegen::http::Uri;
    /// gRPC client for `stream_service.StreamService`.
    ///
    /// Thin wrapper around a `tonic::client::Grpc<T>`, where `T` is the underlying
    /// transport/service the calls are issued on.
    #[derive(Debug, Clone)]
    pub struct StreamServiceClient<T> {
        // The generic tonic client every RPC method delegates to.
        inner: tonic::client::Grpc<T>,
    }
343 impl StreamServiceClient<tonic::transport::Channel> {
344 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
346 where
347 D: TryInto<tonic::transport::Endpoint>,
348 D::Error: Into<StdError>,
349 {
350 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
351 Ok(Self::new(conn))
352 }
353 }
354 impl<T> StreamServiceClient<T>
355 where
356 T: tonic::client::GrpcService<tonic::body::BoxBody>,
357 T::Error: Into<StdError>,
358 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
359 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
360 {
361 pub fn new(inner: T) -> Self {
362 let inner = tonic::client::Grpc::new(inner);
363 Self { inner }
364 }
365 pub fn with_origin(inner: T, origin: Uri) -> Self {
366 let inner = tonic::client::Grpc::with_origin(inner, origin);
367 Self { inner }
368 }
369 pub fn with_interceptor<F>(
370 inner: T,
371 interceptor: F,
372 ) -> StreamServiceClient<InterceptedService<T, F>>
373 where
374 F: tonic::service::Interceptor,
375 T::ResponseBody: Default,
376 T: tonic::codegen::Service<
377 http::Request<tonic::body::BoxBody>,
378 Response = http::Response<
379 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
380 >,
381 >,
382 <T as tonic::codegen::Service<
383 http::Request<tonic::body::BoxBody>,
384 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
385 {
386 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
387 }
388 #[must_use]
393 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
394 self.inner = self.inner.send_compressed(encoding);
395 self
396 }
397 #[must_use]
399 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
400 self.inner = self.inner.accept_compressed(encoding);
401 self
402 }
403 #[must_use]
407 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
408 self.inner = self.inner.max_decoding_message_size(limit);
409 self
410 }
411 #[must_use]
415 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
416 self.inner = self.inner.max_encoding_message_size(limit);
417 self
418 }
419 pub async fn streaming_control_stream(
420 &mut self,
421 request: impl tonic::IntoStreamingRequest<
422 Message = super::StreamingControlStreamRequest,
423 >,
424 ) -> std::result::Result<
425 tonic::Response<
426 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
427 >,
428 tonic::Status,
429 > {
430 self.inner
431 .ready()
432 .await
433 .map_err(|e| {
434 tonic::Status::unknown(
435 format!("Service was not ready: {}", e.into()),
436 )
437 })?;
438 let codec = tonic::codec::ProstCodec::default();
439 let path = http::uri::PathAndQuery::from_static(
440 "/stream_service.StreamService/StreamingControlStream",
441 );
442 let mut req = request.into_streaming_request();
443 req.extensions_mut()
444 .insert(
445 GrpcMethod::new(
446 "stream_service.StreamService",
447 "StreamingControlStream",
448 ),
449 );
450 self.inner.streaming(req, path, codec).await
451 }
452 pub async fn get_min_uncommitted_object_id(
453 &mut self,
454 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
455 ) -> std::result::Result<
456 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
457 tonic::Status,
458 > {
459 self.inner
460 .ready()
461 .await
462 .map_err(|e| {
463 tonic::Status::unknown(
464 format!("Service was not ready: {}", e.into()),
465 )
466 })?;
467 let codec = tonic::codec::ProstCodec::default();
468 let path = http::uri::PathAndQuery::from_static(
469 "/stream_service.StreamService/GetMinUncommittedObjectId",
470 );
471 let mut req = request.into_request();
472 req.extensions_mut()
473 .insert(
474 GrpcMethod::new(
475 "stream_service.StreamService",
476 "GetMinUncommittedObjectId",
477 ),
478 );
479 self.inner.unary(req, path, codec).await
480 }
481 }
482}
483pub mod stream_service_server {
485 #![allow(
486 unused_variables,
487 dead_code,
488 missing_docs,
489 clippy::wildcard_imports,
490 clippy::let_unit_value,
491 )]
492 use tonic::codegen::*;
    /// Server-side handler trait for `stream_service.StreamService`.
    ///
    /// Implementors are wrapped in `StreamServiceServer`, which routes incoming
    /// requests to the two methods below.
    #[async_trait]
    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
        /// Stream type returned by `streaming_control_stream`; yields
        /// `StreamingControlStreamResponse` items or a `tonic::Status` error.
        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::StreamingControlStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles the bidirectional-streaming `StreamingControlStream` RPC: receives
        /// the inbound request stream and returns the outbound response stream.
        async fn streaming_control_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::StreamingControlStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::StreamingControlStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `GetMinUncommittedObjectId` RPC.
        async fn get_min_uncommitted_object_id(
            &self,
            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
        ) -> std::result::Result<
            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper that exposes a `StreamService` implementation as a tonic/tower
    /// service, together with its compression and message-size settings.
    #[derive(Debug)]
    pub struct StreamServiceServer<T> {
        // Shared handle to the user-provided service implementation.
        inner: Arc<T>,
        // Encodings enabled through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        // Encodings enabled through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        // `None` until set through `max_decoding_message_size`.
        max_decoding_message_size: Option<usize>,
        // `None` until set through `max_encoding_message_size`.
        max_encoding_message_size: Option<usize>,
    }
530 impl<T> StreamServiceServer<T> {
531 pub fn new(inner: T) -> Self {
532 Self::from_arc(Arc::new(inner))
533 }
534 pub fn from_arc(inner: Arc<T>) -> Self {
535 Self {
536 inner,
537 accept_compression_encodings: Default::default(),
538 send_compression_encodings: Default::default(),
539 max_decoding_message_size: None,
540 max_encoding_message_size: None,
541 }
542 }
543 pub fn with_interceptor<F>(
544 inner: T,
545 interceptor: F,
546 ) -> InterceptedService<Self, F>
547 where
548 F: tonic::service::Interceptor,
549 {
550 InterceptedService::new(Self::new(inner), interceptor)
551 }
552 #[must_use]
554 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
555 self.accept_compression_encodings.enable(encoding);
556 self
557 }
558 #[must_use]
560 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
561 self.send_compression_encodings.enable(encoding);
562 self
563 }
564 #[must_use]
568 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
569 self.max_decoding_message_size = Some(limit);
570 self
571 }
572 #[must_use]
576 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
577 self.max_encoding_message_size = Some(limit);
578 self
579 }
580 }
    /// Routes incoming HTTP requests to the matching `StreamService` method based on
    /// the request URI path.
    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
    where
        T: StreamService,
        B: Body + std::marker::Send + 'static,
        B::Error: Into<StdError> + std::marker::Send + 'static,
    {
        type Response = http::Response<tonic::body::BoxBody>;
        type Error = std::convert::Infallible;
        type Future = BoxFuture<Self::Response, Self::Error>;
        /// Always reports ready; this wrapper applies no backpressure of its own.
        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
        /// Dispatches `req` to the handler for its path. Unknown paths get a response
        /// carrying the gRPC `Unimplemented` status instead of an error.
        fn call(&mut self, req: http::Request<B>) -> Self::Future {
            match req.uri().path() {
                "/stream_service.StreamService/StreamingControlStream" => {
                    // Adapter that lets tonic's generic streaming machinery invoke
                    // `StreamService::streaming_control_stream` on the shared service.
                    #[allow(non_camel_case_types)]
                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
                    impl<
                        T: StreamService,
                    > tonic::server::StreamingService<
                        super::StreamingControlStreamRequest,
                    > for StreamingControlStreamSvc<T> {
                        type Response = super::StreamingControlStreamResponse;
                        type ResponseStream = T::StreamingControlStreamStream;
                        type Future = BoxFuture<
                            tonic::Response<Self::ResponseStream>,
                            tonic::Status,
                        >;
                        fn call(
                            &mut self,
                            request: tonic::Request<
                                tonic::Streaming<super::StreamingControlStreamRequest>,
                            >,
                        ) -> Self::Future {
                            // Clone the Arc so the returned future owns its handle.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as StreamService>::streaming_control_stream(
                                        &inner,
                                        request,
                                    )
                                    .await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy the per-server settings into locals so the async block
                    // below owns them rather than borrowing `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = StreamingControlStreamSvc(inner);
                        let codec = tonic::codec::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.streaming(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
                    // Adapter that lets tonic's generic unary machinery invoke
                    // `StreamService::get_min_uncommitted_object_id`.
                    #[allow(non_camel_case_types)]
                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
                    impl<
                        T: StreamService,
                    > tonic::server::UnaryService<
                        super::GetMinUncommittedObjectIdRequest,
                    > for GetMinUncommittedObjectIdSvc<T> {
                        type Response = super::GetMinUncommittedObjectIdResponse;
                        type Future = BoxFuture<
                            tonic::Response<Self::Response>,
                            tonic::Status,
                        >;
                        fn call(
                            &mut self,
                            request: tonic::Request<
                                super::GetMinUncommittedObjectIdRequest,
                            >,
                        ) -> Self::Future {
                            // Clone the Arc so the returned future owns its handle.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as StreamService>::get_min_uncommitted_object_id(
                                        &inner,
                                        request,
                                    )
                                    .await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Same setup as the streaming route, but driven through
                    // `Grpc::unary`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = GetMinUncommittedObjectIdSvc(inner);
                        let codec = tonic::codec::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.unary(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                _ => {
                    // Unknown method: answer with an empty body whose headers carry
                    // the `Unimplemented` gRPC status and the gRPC content type.
                    Box::pin(async move {
                        let mut response = http::Response::new(empty_body());
                        let headers = response.headers_mut();
                        headers
                            .insert(
                                tonic::Status::GRPC_STATUS,
                                (tonic::Code::Unimplemented as i32).into(),
                            );
                        headers
                            .insert(
                                http::header::CONTENT_TYPE,
                                tonic::metadata::GRPC_CONTENT_TYPE,
                            );
                        Ok(response)
                    })
                }
            }
        }
    }
723 impl<T> Clone for StreamServiceServer<T> {
724 fn clone(&self) -> Self {
725 let inner = self.inner.clone();
726 Self {
727 inner,
728 accept_compression_encodings: self.accept_compression_encodings,
729 send_compression_encodings: self.send_compression_encodings,
730 max_decoding_message_size: self.max_decoding_message_size,
731 max_encoding_message_size: self.max_encoding_message_size,
732 }
733 }
734 }
    /// Fully qualified gRPC service name; it is the prefix of both method paths
    /// matched when routing requests.
    pub const SERVICE_NAME: &str = "stream_service.StreamService";
    /// Exposes `SERVICE_NAME` as the server's `NamedService::NAME`.
    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
        const NAME: &'static str = SERVICE_NAME;
    }
740}