risingwave_pb/
connector_service.rs

// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5    #[prost(message, repeated, tag = "1")]
6    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7    #[prost(uint32, repeated, tag = "2")]
8    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, ::prost::Message)]
12pub struct ValidationError {
13    #[prost(string, tag = "1")]
14    pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
20    pub sink_id: crate::id::SinkId,
21    #[prost(btree_map = "string, string", tag = "2")]
22    pub properties: ::prost::alloc::collections::BTreeMap<
23        ::prost::alloc::string::String,
24        ::prost::alloc::string::String,
25    >,
26    #[prost(message, optional, tag = "3")]
27    pub table_schema: ::core::option::Option<TableSchema>,
28    /// to be deprecated
29    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30    pub sink_type: i32,
31    #[prost(string, tag = "5")]
32    pub db_name: ::prost::alloc::string::String,
33    #[prost(string, tag = "6")]
34    pub sink_from_name: ::prost::alloc::string::String,
35    #[prost(message, optional, tag = "7")]
36    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37    #[prost(string, tag = "8")]
38    pub sink_name: ::prost::alloc::string::String,
39    /// Backward-compatible replacement for `SINK_TYPE_FORCE_APPEND_ONLY`.
40    ///
41    /// Should not directly access this field. Use method `ignore_delete()` instead.
42    #[prost(bool, tag = "9")]
43    pub raw_ignore_delete: bool,
44}
45#[derive(prost_helpers::AnyPB)]
46#[derive(Clone, PartialEq, ::prost::Message)]
47pub struct SinkWriterStreamRequest {
48    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
49    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
50}
51/// Nested message and enum types in `SinkWriterStreamRequest`.
52pub mod sink_writer_stream_request {
53    #[derive(prost_helpers::AnyPB)]
54    #[derive(Clone, PartialEq, ::prost::Message)]
55    pub struct StartSink {
56        #[prost(message, optional, tag = "1")]
57        pub sink_param: ::core::option::Option<super::SinkParam>,
58        #[prost(message, optional, tag = "3")]
59        pub payload_schema: ::core::option::Option<super::TableSchema>,
60    }
61    #[derive(prost_helpers::AnyPB)]
62    #[derive(Clone, PartialEq, ::prost::Message)]
63    pub struct WriteBatch {
64        #[prost(uint64, tag = "3")]
65        pub batch_id: u64,
66        #[prost(uint64, tag = "4")]
67        pub epoch: u64,
68        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
69        pub payload: ::core::option::Option<write_batch::Payload>,
70    }
71    /// Nested message and enum types in `WriteBatch`.
72    pub mod write_batch {
73        #[derive(prost_helpers::AnyPB)]
74        #[derive(Clone, PartialEq, ::prost::Message)]
75        pub struct StreamChunkPayload {
76            #[prost(bytes = "vec", tag = "1")]
77            pub binary_data: ::prost::alloc::vec::Vec<u8>,
78        }
79        #[derive(prost_helpers::AnyPB)]
80        #[derive(Clone, PartialEq, ::prost::Oneof)]
81        pub enum Payload {
82            #[prost(message, tag = "2")]
83            StreamChunkPayload(StreamChunkPayload),
84            /// This is a reference pointer to a StreamChunk. The StreamChunk is owned
85            /// by the JniSinkWriterStreamRequest, which should handle the release of StreamChunk.
86            /// Index set to 5 because 3 and 4 have been occupied by `batch_id` and `epoch`
87            #[prost(int64, tag = "5")]
88            StreamChunkRefPointer(i64),
89        }
90    }
91    #[derive(prost_helpers::AnyPB)]
92    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
93    pub struct Barrier {
94        #[prost(uint64, tag = "1")]
95        pub epoch: u64,
96        #[prost(bool, tag = "2")]
97        pub is_checkpoint: bool,
98    }
99    #[derive(prost_helpers::AnyPB)]
100    #[derive(Clone, PartialEq, ::prost::Oneof)]
101    pub enum Request {
102        #[prost(message, tag = "1")]
103        Start(StartSink),
104        #[prost(message, tag = "3")]
105        WriteBatch(WriteBatch),
106        #[prost(message, tag = "4")]
107        Barrier(Barrier),
108    }
109}
110#[derive(prost_helpers::AnyPB)]
111#[derive(Clone, PartialEq, ::prost::Message)]
112pub struct SinkWriterStreamResponse {
113    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
114    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
115}
116/// Nested message and enum types in `SinkWriterStreamResponse`.
117pub mod sink_writer_stream_response {
118    #[derive(prost_helpers::AnyPB)]
119    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
120    pub struct StartResponse {}
121    #[derive(prost_helpers::AnyPB)]
122    #[derive(Clone, PartialEq, ::prost::Message)]
123    pub struct CommitResponse {
124        #[prost(uint64, tag = "1")]
125        pub epoch: u64,
126        #[prost(message, optional, tag = "2")]
127        pub metadata: ::core::option::Option<super::SinkMetadata>,
128    }
129    #[derive(prost_helpers::AnyPB)]
130    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
131    pub struct BatchWrittenResponse {
132        #[prost(uint64, tag = "1")]
133        pub epoch: u64,
134        #[prost(uint64, tag = "2")]
135        pub batch_id: u64,
136    }
137    #[derive(prost_helpers::AnyPB)]
138    #[derive(Clone, PartialEq, ::prost::Oneof)]
139    pub enum Response {
140        #[prost(message, tag = "1")]
141        Start(StartResponse),
142        #[prost(message, tag = "2")]
143        Commit(CommitResponse),
144        #[prost(message, tag = "3")]
145        Batch(BatchWrittenResponse),
146    }
147}
148#[derive(prost_helpers::AnyPB)]
149#[derive(Clone, PartialEq, ::prost::Message)]
150pub struct ValidateSinkRequest {
151    #[prost(message, optional, tag = "1")]
152    pub sink_param: ::core::option::Option<SinkParam>,
153}
154#[derive(prost_helpers::AnyPB)]
155#[derive(Clone, PartialEq, ::prost::Message)]
156pub struct ValidateSinkResponse {
157    /// On validation failure, we return the error.
158    #[prost(message, optional, tag = "1")]
159    pub error: ::core::option::Option<ValidationError>,
160}
161#[derive(prost_helpers::AnyPB)]
162#[derive(Clone, PartialEq, ::prost::Message)]
163pub struct SinkMetadata {
164    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
165    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
166}
167/// Nested message and enum types in `SinkMetadata`.
168pub mod sink_metadata {
169    #[derive(prost_helpers::AnyPB)]
170    #[derive(Clone, PartialEq, ::prost::Message)]
171    pub struct SerializedMetadata {
172        #[prost(bytes = "vec", tag = "1")]
173        pub metadata: ::prost::alloc::vec::Vec<u8>,
174    }
175    #[derive(prost_helpers::AnyPB)]
176    #[derive(Clone, PartialEq, ::prost::Oneof)]
177    pub enum Metadata {
178        #[prost(message, tag = "1")]
179        Serialized(SerializedMetadata),
180    }
181}
182#[derive(prost_helpers::AnyPB)]
183#[derive(Clone, PartialEq, ::prost::Message)]
184pub struct SinkCoordinatorStreamRequest {
185    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
186    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
187}
188/// Nested message and enum types in `SinkCoordinatorStreamRequest`.
189pub mod sink_coordinator_stream_request {
190    #[derive(prost_helpers::AnyPB)]
191    #[derive(Clone, PartialEq, ::prost::Message)]
192    pub struct StartCoordinator {
193        #[prost(message, optional, tag = "1")]
194        pub param: ::core::option::Option<super::SinkParam>,
195    }
196    #[derive(prost_helpers::AnyPB)]
197    #[derive(Clone, PartialEq, ::prost::Message)]
198    pub struct CommitMetadata {
199        #[prost(uint64, tag = "1")]
200        pub epoch: u64,
201        #[prost(message, repeated, tag = "2")]
202        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
203    }
204    #[derive(prost_helpers::AnyPB)]
205    #[derive(Clone, PartialEq, ::prost::Oneof)]
206    pub enum Request {
207        #[prost(message, tag = "1")]
208        Start(StartCoordinator),
209        #[prost(message, tag = "2")]
210        Commit(CommitMetadata),
211    }
212}
213#[derive(prost_helpers::AnyPB)]
214#[derive(Clone, Copy, PartialEq, ::prost::Message)]
215pub struct SinkCoordinatorStreamResponse {
216    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
217    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
218}
219/// Nested message and enum types in `SinkCoordinatorStreamResponse`.
220pub mod sink_coordinator_stream_response {
221    #[derive(prost_helpers::AnyPB)]
222    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
223    pub struct StartResponse {}
224    #[derive(prost_helpers::AnyPB)]
225    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
226    pub struct CommitResponse {
227        #[prost(uint64, tag = "1")]
228        pub epoch: u64,
229    }
230    #[derive(prost_helpers::AnyPB)]
231    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
232    pub enum Response {
233        #[prost(message, tag = "1")]
234        Start(StartResponse),
235        #[prost(message, tag = "2")]
236        Commit(CommitResponse),
237    }
238}
239#[derive(prost_helpers::AnyPB)]
240#[derive(Clone, PartialEq, ::prost::Message)]
241pub struct CdcMessage {
242    /// The value of the Debezium message
243    #[prost(string, tag = "1")]
244    pub payload: ::prost::alloc::string::String,
245    #[prost(string, tag = "2")]
246    pub partition: ::prost::alloc::string::String,
247    #[prost(string, tag = "3")]
248    pub offset: ::prost::alloc::string::String,
249    #[prost(string, tag = "4")]
250    pub full_table_name: ::prost::alloc::string::String,
251    #[prost(int64, tag = "5")]
252    pub source_ts_ms: i64,
253    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
254    pub msg_type: i32,
255    /// The key of the Debezium message, which only used by `mongodb-cdc` connector.
256    #[prost(string, tag = "7")]
257    pub key: ::prost::alloc::string::String,
258}
259/// Nested message and enum types in `CdcMessage`.
260pub mod cdc_message {
261    #[derive(prost_helpers::AnyPB)]
262    #[derive(
263        Clone,
264        Copy,
265        Debug,
266        PartialEq,
267        Eq,
268        Hash,
269        PartialOrd,
270        Ord,
271        ::prost::Enumeration
272    )]
273    #[repr(i32)]
274    pub enum CdcMessageType {
275        Unspecified = 0,
276        Heartbeat = 1,
277        Data = 2,
278        TransactionMeta = 3,
279        SchemaChange = 4,
280    }
281    impl CdcMessageType {
282        /// String value of the enum field names used in the ProtoBuf definition.
283        ///
284        /// The values are not transformed in any way and thus are considered stable
285        /// (if the ProtoBuf definition does not change) and safe for programmatic use.
286        pub fn as_str_name(&self) -> &'static str {
287            match self {
288                Self::Unspecified => "UNSPECIFIED",
289                Self::Heartbeat => "HEARTBEAT",
290                Self::Data => "DATA",
291                Self::TransactionMeta => "TRANSACTION_META",
292                Self::SchemaChange => "SCHEMA_CHANGE",
293            }
294        }
295        /// Creates an enum from field names used in the ProtoBuf definition.
296        pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
297            match value {
298                "UNSPECIFIED" => Some(Self::Unspecified),
299                "HEARTBEAT" => Some(Self::Heartbeat),
300                "DATA" => Some(Self::Data),
301                "TRANSACTION_META" => Some(Self::TransactionMeta),
302                "SCHEMA_CHANGE" => Some(Self::SchemaChange),
303                _ => None,
304            }
305        }
306    }
307}
308#[derive(prost_helpers::AnyPB)]
309#[derive(Clone, PartialEq, ::prost::Message)]
310pub struct GetEventStreamRequest {
311    #[prost(uint64, tag = "1")]
312    pub source_id: u64,
313    #[prost(enumeration = "SourceType", tag = "2")]
314    pub source_type: i32,
315    #[prost(string, tag = "3")]
316    pub start_offset: ::prost::alloc::string::String,
317    #[prost(btree_map = "string, string", tag = "4")]
318    pub properties: ::prost::alloc::collections::BTreeMap<
319        ::prost::alloc::string::String,
320        ::prost::alloc::string::String,
321    >,
322    #[prost(bool, tag = "5")]
323    pub snapshot_done: bool,
324    #[prost(bool, tag = "6")]
325    pub is_source_job: bool,
326}
327#[derive(prost_helpers::AnyPB)]
328#[derive(Clone, PartialEq, ::prost::Message)]
329pub struct GetEventStreamResponse {
330    #[prost(uint64, tag = "1")]
331    pub source_id: u64,
332    #[prost(message, repeated, tag = "2")]
333    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
334    #[prost(message, optional, tag = "3")]
335    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
336}
337/// Nested message and enum types in `GetEventStreamResponse`.
338pub mod get_event_stream_response {
339    #[derive(prost_helpers::AnyPB)]
340    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
341    pub struct ControlInfo {
342        #[prost(bool, tag = "1")]
343        pub handshake_ok: bool,
344    }
345}
346#[derive(prost_helpers::AnyPB)]
347#[derive(Clone, PartialEq, ::prost::Message)]
348pub struct ValidateSourceRequest {
349    #[prost(uint64, tag = "1")]
350    pub source_id: u64,
351    #[prost(enumeration = "SourceType", tag = "2")]
352    pub source_type: i32,
353    #[prost(btree_map = "string, string", tag = "3")]
354    pub properties: ::prost::alloc::collections::BTreeMap<
355        ::prost::alloc::string::String,
356        ::prost::alloc::string::String,
357    >,
358    #[prost(message, optional, tag = "4")]
359    pub table_schema: ::core::option::Option<TableSchema>,
360    #[prost(bool, tag = "5")]
361    pub is_source_job: bool,
362    #[prost(bool, tag = "6")]
363    pub is_backfill_table: bool,
364}
365#[derive(prost_helpers::AnyPB)]
366#[derive(Clone, PartialEq, ::prost::Message)]
367pub struct ValidateSourceResponse {
368    /// On validation failure, we return the error.
369    #[prost(message, optional, tag = "1")]
370    pub error: ::core::option::Option<ValidationError>,
371}
372#[derive(prost_helpers::AnyPB)]
373#[derive(Clone, PartialEq, ::prost::Message)]
374pub struct CoordinateRequest {
375    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
376    pub msg: ::core::option::Option<coordinate_request::Msg>,
377}
378/// Nested message and enum types in `CoordinateRequest`.
379pub mod coordinate_request {
380    /// The first request that starts a coordination between sink writer and coordinator.
381    /// The service will respond after sink writers of all vnodes have sent the request.
382    #[derive(prost_helpers::AnyPB)]
383    #[derive(Clone, PartialEq, ::prost::Message)]
384    pub struct StartCoordinationRequest {
385        #[prost(message, optional, tag = "1")]
386        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
387        #[prost(message, optional, tag = "2")]
388        pub param: ::core::option::Option<super::SinkParam>,
389    }
390    #[derive(prost_helpers::AnyPB)]
391    #[derive(Clone, PartialEq, ::prost::Message)]
392    pub struct CommitRequest {
393        #[prost(uint64, tag = "1")]
394        pub epoch: u64,
395        #[prost(message, optional, tag = "2")]
396        pub metadata: ::core::option::Option<super::SinkMetadata>,
397        #[prost(message, optional, tag = "3")]
398        pub schema_change: ::core::option::Option<
399            super::super::stream_plan::SinkSchemaChange,
400        >,
401    }
402    #[derive(prost_helpers::AnyPB)]
403    #[derive(Clone, PartialEq, ::prost::Message)]
404    pub struct UpdateVnodeBitmapRequest {
405        #[prost(message, optional, tag = "1")]
406        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
407    }
408    #[derive(prost_helpers::AnyPB)]
409    #[derive(Clone, PartialEq, ::prost::Oneof)]
410    pub enum Msg {
411        #[prost(message, tag = "1")]
412        StartRequest(StartCoordinationRequest),
413        #[prost(message, tag = "2")]
414        CommitRequest(CommitRequest),
415        #[prost(message, tag = "3")]
416        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
417        #[prost(bool, tag = "4")]
418        Stop(bool),
419        #[prost(uint64, tag = "5")]
420        AlignInitialEpochRequest(u64),
421    }
422}
423#[derive(prost_helpers::AnyPB)]
424#[derive(Clone, Copy, PartialEq, ::prost::Message)]
425pub struct CoordinateResponse {
426    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
427    pub msg: ::core::option::Option<coordinate_response::Msg>,
428}
429/// Nested message and enum types in `CoordinateResponse`.
430pub mod coordinate_response {
431    #[derive(prost_helpers::AnyPB)]
432    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
433    pub struct StartCoordinationResponse {
434        #[prost(uint64, optional, tag = "1")]
435        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
436    }
437    #[derive(prost_helpers::AnyPB)]
438    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
439    pub struct CommitResponse {
440        #[prost(uint64, tag = "1")]
441        pub epoch: u64,
442    }
443    #[derive(prost_helpers::AnyPB)]
444    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
445    pub enum Msg {
446        #[prost(message, tag = "1")]
447        StartResponse(StartCoordinationResponse),
448        #[prost(message, tag = "2")]
449        CommitResponse(CommitResponse),
450        #[prost(bool, tag = "3")]
451        Stopped(bool),
452        #[prost(uint64, tag = "4")]
453        AlignInitialEpochResponse(u64),
454    }
455}
456#[derive(prost_helpers::AnyPB)]
457#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
458#[repr(i32)]
459pub enum SourceType {
460    Unspecified = 0,
461    Mysql = 1,
462    Postgres = 2,
463    Citus = 3,
464    Mongodb = 4,
465    SqlServer = 5,
466}
467impl SourceType {
468    /// String value of the enum field names used in the ProtoBuf definition.
469    ///
470    /// The values are not transformed in any way and thus are considered stable
471    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
472    pub fn as_str_name(&self) -> &'static str {
473        match self {
474            Self::Unspecified => "UNSPECIFIED",
475            Self::Mysql => "MYSQL",
476            Self::Postgres => "POSTGRES",
477            Self::Citus => "CITUS",
478            Self::Mongodb => "MONGODB",
479            Self::SqlServer => "SQL_SERVER",
480        }
481    }
482    /// Creates an enum from field names used in the ProtoBuf definition.
483    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
484        match value {
485            "UNSPECIFIED" => Some(Self::Unspecified),
486            "MYSQL" => Some(Self::Mysql),
487            "POSTGRES" => Some(Self::Postgres),
488            "CITUS" => Some(Self::Citus),
489            "MONGODB" => Some(Self::Mongodb),
490            "SQL_SERVER" => Some(Self::SqlServer),
491            _ => None,
492        }
493    }
494}
495/// Generated client implementations.
496pub mod connector_service_client {
497    #![allow(
498        unused_variables,
499        dead_code,
500        missing_docs,
501        clippy::wildcard_imports,
502        clippy::let_unit_value,
503    )]
504    use tonic::codegen::*;
505    use tonic::codegen::http::Uri;
506    #[derive(Debug, Clone)]
507    pub struct ConnectorServiceClient<T> {
508        inner: tonic::client::Grpc<T>,
509    }
510    impl ConnectorServiceClient<tonic::transport::Channel> {
511        /// Attempt to create a new client by connecting to a given endpoint.
512        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
513        where
514            D: TryInto<tonic::transport::Endpoint>,
515            D::Error: Into<StdError>,
516        {
517            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
518            Ok(Self::new(conn))
519        }
520    }
521    impl<T> ConnectorServiceClient<T>
522    where
523        T: tonic::client::GrpcService<tonic::body::BoxBody>,
524        T::Error: Into<StdError>,
525        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
526        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
527    {
528        pub fn new(inner: T) -> Self {
529            let inner = tonic::client::Grpc::new(inner);
530            Self { inner }
531        }
532        pub fn with_origin(inner: T, origin: Uri) -> Self {
533            let inner = tonic::client::Grpc::with_origin(inner, origin);
534            Self { inner }
535        }
536        pub fn with_interceptor<F>(
537            inner: T,
538            interceptor: F,
539        ) -> ConnectorServiceClient<InterceptedService<T, F>>
540        where
541            F: tonic::service::Interceptor,
542            T::ResponseBody: Default,
543            T: tonic::codegen::Service<
544                http::Request<tonic::body::BoxBody>,
545                Response = http::Response<
546                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
547                >,
548            >,
549            <T as tonic::codegen::Service<
550                http::Request<tonic::body::BoxBody>,
551            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
552        {
553            ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
554        }
555        /// Compress requests with the given encoding.
556        ///
557        /// This requires the server to support it otherwise it might respond with an
558        /// error.
559        #[must_use]
560        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
561            self.inner = self.inner.send_compressed(encoding);
562            self
563        }
564        /// Enable decompressing responses.
565        #[must_use]
566        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
567            self.inner = self.inner.accept_compressed(encoding);
568            self
569        }
570        /// Limits the maximum size of a decoded message.
571        ///
572        /// Default: `4MB`
573        #[must_use]
574        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
575            self.inner = self.inner.max_decoding_message_size(limit);
576            self
577        }
578        /// Limits the maximum size of an encoded message.
579        ///
580        /// Default: `usize::MAX`
581        #[must_use]
582        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
583            self.inner = self.inner.max_encoding_message_size(limit);
584            self
585        }
586        pub async fn sink_writer_stream(
587            &mut self,
588            request: impl tonic::IntoStreamingRequest<
589                Message = super::SinkWriterStreamRequest,
590            >,
591        ) -> std::result::Result<
592            tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
593            tonic::Status,
594        > {
595            self.inner
596                .ready()
597                .await
598                .map_err(|e| {
599                    tonic::Status::unknown(
600                        format!("Service was not ready: {}", e.into()),
601                    )
602                })?;
603            let codec = tonic::codec::ProstCodec::default();
604            let path = http::uri::PathAndQuery::from_static(
605                "/connector_service.ConnectorService/SinkWriterStream",
606            );
607            let mut req = request.into_streaming_request();
608            req.extensions_mut()
609                .insert(
610                    GrpcMethod::new(
611                        "connector_service.ConnectorService",
612                        "SinkWriterStream",
613                    ),
614                );
615            self.inner.streaming(req, path, codec).await
616        }
617        pub async fn sink_coordinator_stream(
618            &mut self,
619            request: impl tonic::IntoStreamingRequest<
620                Message = super::SinkCoordinatorStreamRequest,
621            >,
622        ) -> std::result::Result<
623            tonic::Response<
624                tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
625            >,
626            tonic::Status,
627        > {
628            self.inner
629                .ready()
630                .await
631                .map_err(|e| {
632                    tonic::Status::unknown(
633                        format!("Service was not ready: {}", e.into()),
634                    )
635                })?;
636            let codec = tonic::codec::ProstCodec::default();
637            let path = http::uri::PathAndQuery::from_static(
638                "/connector_service.ConnectorService/SinkCoordinatorStream",
639            );
640            let mut req = request.into_streaming_request();
641            req.extensions_mut()
642                .insert(
643                    GrpcMethod::new(
644                        "connector_service.ConnectorService",
645                        "SinkCoordinatorStream",
646                    ),
647                );
648            self.inner.streaming(req, path, codec).await
649        }
650        pub async fn validate_sink(
651            &mut self,
652            request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
653        ) -> std::result::Result<
654            tonic::Response<super::ValidateSinkResponse>,
655            tonic::Status,
656        > {
657            self.inner
658                .ready()
659                .await
660                .map_err(|e| {
661                    tonic::Status::unknown(
662                        format!("Service was not ready: {}", e.into()),
663                    )
664                })?;
665            let codec = tonic::codec::ProstCodec::default();
666            let path = http::uri::PathAndQuery::from_static(
667                "/connector_service.ConnectorService/ValidateSink",
668            );
669            let mut req = request.into_request();
670            req.extensions_mut()
671                .insert(
672                    GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
673                );
674            self.inner.unary(req, path, codec).await
675        }
676        pub async fn get_event_stream(
677            &mut self,
678            request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
679        ) -> std::result::Result<
680            tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
681            tonic::Status,
682        > {
683            self.inner
684                .ready()
685                .await
686                .map_err(|e| {
687                    tonic::Status::unknown(
688                        format!("Service was not ready: {}", e.into()),
689                    )
690                })?;
691            let codec = tonic::codec::ProstCodec::default();
692            let path = http::uri::PathAndQuery::from_static(
693                "/connector_service.ConnectorService/GetEventStream",
694            );
695            let mut req = request.into_request();
696            req.extensions_mut()
697                .insert(
698                    GrpcMethod::new(
699                        "connector_service.ConnectorService",
700                        "GetEventStream",
701                    ),
702                );
703            self.inner.server_streaming(req, path, codec).await
704        }
705        pub async fn validate_source(
706            &mut self,
707            request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
708        ) -> std::result::Result<
709            tonic::Response<super::ValidateSourceResponse>,
710            tonic::Status,
711        > {
712            self.inner
713                .ready()
714                .await
715                .map_err(|e| {
716                    tonic::Status::unknown(
717                        format!("Service was not ready: {}", e.into()),
718                    )
719                })?;
720            let codec = tonic::codec::ProstCodec::default();
721            let path = http::uri::PathAndQuery::from_static(
722                "/connector_service.ConnectorService/ValidateSource",
723            );
724            let mut req = request.into_request();
725            req.extensions_mut()
726                .insert(
727                    GrpcMethod::new(
728                        "connector_service.ConnectorService",
729                        "ValidateSource",
730                    ),
731                );
732            self.inner.unary(req, path, codec).await
733        }
734    }
735}
736/// Generated client implementations.
737pub mod sink_coordination_service_client {
738    #![allow(
739        unused_variables,
740        dead_code,
741        missing_docs,
742        clippy::wildcard_imports,
743        clippy::let_unit_value,
744    )]
745    use tonic::codegen::*;
746    use tonic::codegen::http::Uri;
747    #[derive(Debug, Clone)]
748    pub struct SinkCoordinationServiceClient<T> {
749        inner: tonic::client::Grpc<T>,
750    }
751    impl SinkCoordinationServiceClient<tonic::transport::Channel> {
752        /// Attempt to create a new client by connecting to a given endpoint.
753        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
754        where
755            D: TryInto<tonic::transport::Endpoint>,
756            D::Error: Into<StdError>,
757        {
758            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
759            Ok(Self::new(conn))
760        }
761    }
762    impl<T> SinkCoordinationServiceClient<T>
763    where
764        T: tonic::client::GrpcService<tonic::body::BoxBody>,
765        T::Error: Into<StdError>,
766        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
767        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
768    {
769        pub fn new(inner: T) -> Self {
770            let inner = tonic::client::Grpc::new(inner);
771            Self { inner }
772        }
773        pub fn with_origin(inner: T, origin: Uri) -> Self {
774            let inner = tonic::client::Grpc::with_origin(inner, origin);
775            Self { inner }
776        }
777        pub fn with_interceptor<F>(
778            inner: T,
779            interceptor: F,
780        ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
781        where
782            F: tonic::service::Interceptor,
783            T::ResponseBody: Default,
784            T: tonic::codegen::Service<
785                http::Request<tonic::body::BoxBody>,
786                Response = http::Response<
787                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
788                >,
789            >,
790            <T as tonic::codegen::Service<
791                http::Request<tonic::body::BoxBody>,
792            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
793        {
794            SinkCoordinationServiceClient::new(
795                InterceptedService::new(inner, interceptor),
796            )
797        }
798        /// Compress requests with the given encoding.
799        ///
800        /// This requires the server to support it otherwise it might respond with an
801        /// error.
802        #[must_use]
803        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
804            self.inner = self.inner.send_compressed(encoding);
805            self
806        }
807        /// Enable decompressing responses.
808        #[must_use]
809        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
810            self.inner = self.inner.accept_compressed(encoding);
811            self
812        }
813        /// Limits the maximum size of a decoded message.
814        ///
815        /// Default: `4MB`
816        #[must_use]
817        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
818            self.inner = self.inner.max_decoding_message_size(limit);
819            self
820        }
821        /// Limits the maximum size of an encoded message.
822        ///
823        /// Default: `usize::MAX`
824        #[must_use]
825        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
826            self.inner = self.inner.max_encoding_message_size(limit);
827            self
828        }
829        pub async fn coordinate(
830            &mut self,
831            request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
832        ) -> std::result::Result<
833            tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
834            tonic::Status,
835        > {
836            self.inner
837                .ready()
838                .await
839                .map_err(|e| {
840                    tonic::Status::unknown(
841                        format!("Service was not ready: {}", e.into()),
842                    )
843                })?;
844            let codec = tonic::codec::ProstCodec::default();
845            let path = http::uri::PathAndQuery::from_static(
846                "/connector_service.SinkCoordinationService/Coordinate",
847            );
848            let mut req = request.into_streaming_request();
849            req.extensions_mut()
850                .insert(
851                    GrpcMethod::new(
852                        "connector_service.SinkCoordinationService",
853                        "Coordinate",
854                    ),
855                );
856            self.inner.streaming(req, path, codec).await
857        }
858    }
859}
860/// Generated server implementations.
861pub mod connector_service_server {
862    #![allow(
863        unused_variables,
864        dead_code,
865        missing_docs,
866        clippy::wildcard_imports,
867        clippy::let_unit_value,
868    )]
869    use tonic::codegen::*;
    /// Generated trait containing gRPC methods that should be implemented for use with ConnectorServiceServer.
    ///
    /// Implementors must be `Send + Sync + 'static`. Each method receives a
    /// `tonic::Request` and returns either a `tonic::Response` or a
    /// `tonic::Status` describing the failure.
    #[async_trait]
    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
        /// Server streaming response type for the SinkWriterStream method.
        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkWriterStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler for the `SinkWriterStream` method: takes an inbound stream of
        /// `SinkWriterStreamRequest` messages and answers with a
        /// `Self::SinkWriterStreamStream` of responses.
        async fn sink_writer_stream(
            &self,
            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
        ) -> std::result::Result<
            tonic::Response<Self::SinkWriterStreamStream>,
            tonic::Status,
        >;
        /// Server streaming response type for the SinkCoordinatorStream method.
        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkCoordinatorStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handler for the `SinkCoordinatorStream` method: takes an inbound stream
        /// of `SinkCoordinatorStreamRequest` messages and answers with a
        /// `Self::SinkCoordinatorStreamStream` of responses.
        async fn sink_coordinator_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::SinkCoordinatorStreamStream>,
            tonic::Status,
        >;
        /// Handler for the `ValidateSink` method: a single
        /// `ValidateSinkRequest` in, a single `ValidateSinkResponse` out.
        async fn validate_sink(
            &self,
            request: tonic::Request<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        >;
        /// Server streaming response type for the GetEventStream method.
        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handler for the `GetEventStream` method: a single
        /// `GetEventStreamRequest` in, a `Self::GetEventStreamStream` of responses out.
        async fn get_event_stream(
            &self,
            request: tonic::Request<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::GetEventStreamStream>,
            tonic::Status,
        >;
        /// Handler for the `ValidateSource` method: a single
        /// `ValidateSourceRequest` in, a single `ValidateSourceResponse` out.
        async fn validate_source(
            &self,
            request: tonic::Request<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper around a service implementation `T`, carrying the
    /// compression and message-size settings applied to each dispatched call.
    #[derive(Debug)]
    pub struct ConnectorServiceServer<T> {
        // Shared handle to the wrapped service implementation.
        inner: Arc<T>,
        // Encodings turned on through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        // Encodings turned on through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        // Limit set through `max_decoding_message_size`; `None` until configured.
        max_decoding_message_size: Option<usize>,
        // Limit set through `max_encoding_message_size`; `None` until configured.
        max_encoding_message_size: Option<usize>,
    }
943    impl<T> ConnectorServiceServer<T> {
944        pub fn new(inner: T) -> Self {
945            Self::from_arc(Arc::new(inner))
946        }
947        pub fn from_arc(inner: Arc<T>) -> Self {
948            Self {
949                inner,
950                accept_compression_encodings: Default::default(),
951                send_compression_encodings: Default::default(),
952                max_decoding_message_size: None,
953                max_encoding_message_size: None,
954            }
955        }
956        pub fn with_interceptor<F>(
957            inner: T,
958            interceptor: F,
959        ) -> InterceptedService<Self, F>
960        where
961            F: tonic::service::Interceptor,
962        {
963            InterceptedService::new(Self::new(inner), interceptor)
964        }
965        /// Enable decompressing requests with the given encoding.
966        #[must_use]
967        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
968            self.accept_compression_encodings.enable(encoding);
969            self
970        }
971        /// Compress responses with the given encoding, if the client supports it.
972        #[must_use]
973        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
974            self.send_compression_encodings.enable(encoding);
975            self
976        }
977        /// Limits the maximum size of a decoded message.
978        ///
979        /// Default: `4MB`
980        #[must_use]
981        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
982            self.max_decoding_message_size = Some(limit);
983            self
984        }
985        /// Limits the maximum size of an encoded message.
986        ///
987        /// Default: `usize::MAX`
988        #[must_use]
989        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
990            self.max_encoding_message_size = Some(limit);
991            self
992        }
993    }
994    impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
995    where
996        T: ConnectorService,
997        B: Body + std::marker::Send + 'static,
998        B::Error: Into<StdError> + std::marker::Send + 'static,
999    {
1000        type Response = http::Response<tonic::body::BoxBody>;
1001        type Error = std::convert::Infallible;
1002        type Future = BoxFuture<Self::Response, Self::Error>;
1003        fn poll_ready(
1004            &mut self,
1005            _cx: &mut Context<'_>,
1006        ) -> Poll<std::result::Result<(), Self::Error>> {
1007            Poll::Ready(Ok(()))
1008        }
1009        fn call(&mut self, req: http::Request<B>) -> Self::Future {
1010            match req.uri().path() {
1011                "/connector_service.ConnectorService/SinkWriterStream" => {
1012                    #[allow(non_camel_case_types)]
1013                    struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1014                    impl<
1015                        T: ConnectorService,
1016                    > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1017                    for SinkWriterStreamSvc<T> {
1018                        type Response = super::SinkWriterStreamResponse;
1019                        type ResponseStream = T::SinkWriterStreamStream;
1020                        type Future = BoxFuture<
1021                            tonic::Response<Self::ResponseStream>,
1022                            tonic::Status,
1023                        >;
1024                        fn call(
1025                            &mut self,
1026                            request: tonic::Request<
1027                                tonic::Streaming<super::SinkWriterStreamRequest>,
1028                            >,
1029                        ) -> Self::Future {
1030                            let inner = Arc::clone(&self.0);
1031                            let fut = async move {
1032                                <T as ConnectorService>::sink_writer_stream(&inner, request)
1033                                    .await
1034                            };
1035                            Box::pin(fut)
1036                        }
1037                    }
1038                    let accept_compression_encodings = self.accept_compression_encodings;
1039                    let send_compression_encodings = self.send_compression_encodings;
1040                    let max_decoding_message_size = self.max_decoding_message_size;
1041                    let max_encoding_message_size = self.max_encoding_message_size;
1042                    let inner = self.inner.clone();
1043                    let fut = async move {
1044                        let method = SinkWriterStreamSvc(inner);
1045                        let codec = tonic::codec::ProstCodec::default();
1046                        let mut grpc = tonic::server::Grpc::new(codec)
1047                            .apply_compression_config(
1048                                accept_compression_encodings,
1049                                send_compression_encodings,
1050                            )
1051                            .apply_max_message_size_config(
1052                                max_decoding_message_size,
1053                                max_encoding_message_size,
1054                            );
1055                        let res = grpc.streaming(method, req).await;
1056                        Ok(res)
1057                    };
1058                    Box::pin(fut)
1059                }
1060                "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1061                    #[allow(non_camel_case_types)]
1062                    struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1063                    impl<
1064                        T: ConnectorService,
1065                    > tonic::server::StreamingService<
1066                        super::SinkCoordinatorStreamRequest,
1067                    > for SinkCoordinatorStreamSvc<T> {
1068                        type Response = super::SinkCoordinatorStreamResponse;
1069                        type ResponseStream = T::SinkCoordinatorStreamStream;
1070                        type Future = BoxFuture<
1071                            tonic::Response<Self::ResponseStream>,
1072                            tonic::Status,
1073                        >;
1074                        fn call(
1075                            &mut self,
1076                            request: tonic::Request<
1077                                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1078                            >,
1079                        ) -> Self::Future {
1080                            let inner = Arc::clone(&self.0);
1081                            let fut = async move {
1082                                <T as ConnectorService>::sink_coordinator_stream(
1083                                        &inner,
1084                                        request,
1085                                    )
1086                                    .await
1087                            };
1088                            Box::pin(fut)
1089                        }
1090                    }
1091                    let accept_compression_encodings = self.accept_compression_encodings;
1092                    let send_compression_encodings = self.send_compression_encodings;
1093                    let max_decoding_message_size = self.max_decoding_message_size;
1094                    let max_encoding_message_size = self.max_encoding_message_size;
1095                    let inner = self.inner.clone();
1096                    let fut = async move {
1097                        let method = SinkCoordinatorStreamSvc(inner);
1098                        let codec = tonic::codec::ProstCodec::default();
1099                        let mut grpc = tonic::server::Grpc::new(codec)
1100                            .apply_compression_config(
1101                                accept_compression_encodings,
1102                                send_compression_encodings,
1103                            )
1104                            .apply_max_message_size_config(
1105                                max_decoding_message_size,
1106                                max_encoding_message_size,
1107                            );
1108                        let res = grpc.streaming(method, req).await;
1109                        Ok(res)
1110                    };
1111                    Box::pin(fut)
1112                }
1113                "/connector_service.ConnectorService/ValidateSink" => {
1114                    #[allow(non_camel_case_types)]
1115                    struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1116                    impl<
1117                        T: ConnectorService,
1118                    > tonic::server::UnaryService<super::ValidateSinkRequest>
1119                    for ValidateSinkSvc<T> {
1120                        type Response = super::ValidateSinkResponse;
1121                        type Future = BoxFuture<
1122                            tonic::Response<Self::Response>,
1123                            tonic::Status,
1124                        >;
1125                        fn call(
1126                            &mut self,
1127                            request: tonic::Request<super::ValidateSinkRequest>,
1128                        ) -> Self::Future {
1129                            let inner = Arc::clone(&self.0);
1130                            let fut = async move {
1131                                <T as ConnectorService>::validate_sink(&inner, request)
1132                                    .await
1133                            };
1134                            Box::pin(fut)
1135                        }
1136                    }
1137                    let accept_compression_encodings = self.accept_compression_encodings;
1138                    let send_compression_encodings = self.send_compression_encodings;
1139                    let max_decoding_message_size = self.max_decoding_message_size;
1140                    let max_encoding_message_size = self.max_encoding_message_size;
1141                    let inner = self.inner.clone();
1142                    let fut = async move {
1143                        let method = ValidateSinkSvc(inner);
1144                        let codec = tonic::codec::ProstCodec::default();
1145                        let mut grpc = tonic::server::Grpc::new(codec)
1146                            .apply_compression_config(
1147                                accept_compression_encodings,
1148                                send_compression_encodings,
1149                            )
1150                            .apply_max_message_size_config(
1151                                max_decoding_message_size,
1152                                max_encoding_message_size,
1153                            );
1154                        let res = grpc.unary(method, req).await;
1155                        Ok(res)
1156                    };
1157                    Box::pin(fut)
1158                }
1159                "/connector_service.ConnectorService/GetEventStream" => {
1160                    #[allow(non_camel_case_types)]
1161                    struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1162                    impl<
1163                        T: ConnectorService,
1164                    > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1165                    for GetEventStreamSvc<T> {
1166                        type Response = super::GetEventStreamResponse;
1167                        type ResponseStream = T::GetEventStreamStream;
1168                        type Future = BoxFuture<
1169                            tonic::Response<Self::ResponseStream>,
1170                            tonic::Status,
1171                        >;
1172                        fn call(
1173                            &mut self,
1174                            request: tonic::Request<super::GetEventStreamRequest>,
1175                        ) -> Self::Future {
1176                            let inner = Arc::clone(&self.0);
1177                            let fut = async move {
1178                                <T as ConnectorService>::get_event_stream(&inner, request)
1179                                    .await
1180                            };
1181                            Box::pin(fut)
1182                        }
1183                    }
1184                    let accept_compression_encodings = self.accept_compression_encodings;
1185                    let send_compression_encodings = self.send_compression_encodings;
1186                    let max_decoding_message_size = self.max_decoding_message_size;
1187                    let max_encoding_message_size = self.max_encoding_message_size;
1188                    let inner = self.inner.clone();
1189                    let fut = async move {
1190                        let method = GetEventStreamSvc(inner);
1191                        let codec = tonic::codec::ProstCodec::default();
1192                        let mut grpc = tonic::server::Grpc::new(codec)
1193                            .apply_compression_config(
1194                                accept_compression_encodings,
1195                                send_compression_encodings,
1196                            )
1197                            .apply_max_message_size_config(
1198                                max_decoding_message_size,
1199                                max_encoding_message_size,
1200                            );
1201                        let res = grpc.server_streaming(method, req).await;
1202                        Ok(res)
1203                    };
1204                    Box::pin(fut)
1205                }
1206                "/connector_service.ConnectorService/ValidateSource" => {
1207                    #[allow(non_camel_case_types)]
1208                    struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1209                    impl<
1210                        T: ConnectorService,
1211                    > tonic::server::UnaryService<super::ValidateSourceRequest>
1212                    for ValidateSourceSvc<T> {
1213                        type Response = super::ValidateSourceResponse;
1214                        type Future = BoxFuture<
1215                            tonic::Response<Self::Response>,
1216                            tonic::Status,
1217                        >;
1218                        fn call(
1219                            &mut self,
1220                            request: tonic::Request<super::ValidateSourceRequest>,
1221                        ) -> Self::Future {
1222                            let inner = Arc::clone(&self.0);
1223                            let fut = async move {
1224                                <T as ConnectorService>::validate_source(&inner, request)
1225                                    .await
1226                            };
1227                            Box::pin(fut)
1228                        }
1229                    }
1230                    let accept_compression_encodings = self.accept_compression_encodings;
1231                    let send_compression_encodings = self.send_compression_encodings;
1232                    let max_decoding_message_size = self.max_decoding_message_size;
1233                    let max_encoding_message_size = self.max_encoding_message_size;
1234                    let inner = self.inner.clone();
1235                    let fut = async move {
1236                        let method = ValidateSourceSvc(inner);
1237                        let codec = tonic::codec::ProstCodec::default();
1238                        let mut grpc = tonic::server::Grpc::new(codec)
1239                            .apply_compression_config(
1240                                accept_compression_encodings,
1241                                send_compression_encodings,
1242                            )
1243                            .apply_max_message_size_config(
1244                                max_decoding_message_size,
1245                                max_encoding_message_size,
1246                            );
1247                        let res = grpc.unary(method, req).await;
1248                        Ok(res)
1249                    };
1250                    Box::pin(fut)
1251                }
1252                _ => {
1253                    Box::pin(async move {
1254                        let mut response = http::Response::new(empty_body());
1255                        let headers = response.headers_mut();
1256                        headers
1257                            .insert(
1258                                tonic::Status::GRPC_STATUS,
1259                                (tonic::Code::Unimplemented as i32).into(),
1260                            );
1261                        headers
1262                            .insert(
1263                                http::header::CONTENT_TYPE,
1264                                tonic::metadata::GRPC_CONTENT_TYPE,
1265                            );
1266                        Ok(response)
1267                    })
1268                }
1269            }
1270        }
1271    }
1272    impl<T> Clone for ConnectorServiceServer<T> {
1273        fn clone(&self) -> Self {
1274            let inner = self.inner.clone();
1275            Self {
1276                inner,
1277                accept_compression_encodings: self.accept_compression_encodings,
1278                send_compression_encodings: self.send_compression_encodings,
1279                max_decoding_message_size: self.max_decoding_message_size,
1280                max_encoding_message_size: self.max_encoding_message_size,
1281            }
1282        }
1283    }
    /// Generated gRPC service name
    ///
    /// This is the string that forms the first segment of every method path
    /// matched by `ConnectorServiceServer` (`/<SERVICE_NAME>/<Method>`).
    pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
    /// Publishes `SERVICE_NAME` as the server's `NamedService::NAME`.
    impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
        const NAME: &'static str = SERVICE_NAME;
    }
1289}
1290/// Generated server implementations.
1291pub mod sink_coordination_service_server {
1292    #![allow(
1293        unused_variables,
1294        dead_code,
1295        missing_docs,
1296        clippy::wildcard_imports,
1297        clippy::let_unit_value,
1298    )]
1299    use tonic::codegen::*;
    /// Generated trait containing gRPC methods that should be implemented for use with SinkCoordinationServiceServer.
    ///
    /// Implementors must be `Send + Sync + 'static`.
    #[async_trait]
    pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
        /// Server streaming response type for the Coordinate method.
        type CoordinateStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handler for the `Coordinate` method: takes an inbound stream of
        /// `CoordinateRequest` messages and answers with a
        /// `Self::CoordinateStream` of responses, or a `tonic::Status` on failure.
        async fn coordinate(
            &self,
            request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
        ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
    }
    /// Server wrapper around a service implementation `T`, carrying the
    /// compression and message-size settings applied to each dispatched call.
    #[derive(Debug)]
    pub struct SinkCoordinationServiceServer<T> {
        // Shared handle to the wrapped service implementation.
        inner: Arc<T>,
        // Encodings turned on through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        // Encodings turned on through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        // Limit set through `max_decoding_message_size`; `None` until configured.
        max_decoding_message_size: Option<usize>,
        // Limit set through `max_encoding_message_size`; `None` until configured.
        max_encoding_message_size: Option<usize>,
    }
1322    impl<T> SinkCoordinationServiceServer<T> {
1323        pub fn new(inner: T) -> Self {
1324            Self::from_arc(Arc::new(inner))
1325        }
1326        pub fn from_arc(inner: Arc<T>) -> Self {
1327            Self {
1328                inner,
1329                accept_compression_encodings: Default::default(),
1330                send_compression_encodings: Default::default(),
1331                max_decoding_message_size: None,
1332                max_encoding_message_size: None,
1333            }
1334        }
1335        pub fn with_interceptor<F>(
1336            inner: T,
1337            interceptor: F,
1338        ) -> InterceptedService<Self, F>
1339        where
1340            F: tonic::service::Interceptor,
1341        {
1342            InterceptedService::new(Self::new(inner), interceptor)
1343        }
1344        /// Enable decompressing requests with the given encoding.
1345        #[must_use]
1346        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1347            self.accept_compression_encodings.enable(encoding);
1348            self
1349        }
1350        /// Compress responses with the given encoding, if the client supports it.
1351        #[must_use]
1352        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1353            self.send_compression_encodings.enable(encoding);
1354            self
1355        }
1356        /// Limits the maximum size of a decoded message.
1357        ///
1358        /// Default: `4MB`
1359        #[must_use]
1360        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1361            self.max_decoding_message_size = Some(limit);
1362            self
1363        }
1364        /// Limits the maximum size of an encoded message.
1365        ///
1366        /// Default: `usize::MAX`
1367        #[must_use]
1368        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1369            self.max_encoding_message_size = Some(limit);
1370            self
1371        }
1372    }
1373    impl<T, B> tonic::codegen::Service<http::Request<B>>
1374    for SinkCoordinationServiceServer<T>
1375    where
1376        T: SinkCoordinationService,
1377        B: Body + std::marker::Send + 'static,
1378        B::Error: Into<StdError> + std::marker::Send + 'static,
1379    {
1380        type Response = http::Response<tonic::body::BoxBody>;
1381        type Error = std::convert::Infallible;
1382        type Future = BoxFuture<Self::Response, Self::Error>;
1383        fn poll_ready(
1384            &mut self,
1385            _cx: &mut Context<'_>,
1386        ) -> Poll<std::result::Result<(), Self::Error>> {
1387            Poll::Ready(Ok(()))
1388        }
1389        fn call(&mut self, req: http::Request<B>) -> Self::Future {
1390            match req.uri().path() {
1391                "/connector_service.SinkCoordinationService/Coordinate" => {
1392                    #[allow(non_camel_case_types)]
1393                    struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1394                    impl<
1395                        T: SinkCoordinationService,
1396                    > tonic::server::StreamingService<super::CoordinateRequest>
1397                    for CoordinateSvc<T> {
1398                        type Response = super::CoordinateResponse;
1399                        type ResponseStream = T::CoordinateStream;
1400                        type Future = BoxFuture<
1401                            tonic::Response<Self::ResponseStream>,
1402                            tonic::Status,
1403                        >;
1404                        fn call(
1405                            &mut self,
1406                            request: tonic::Request<
1407                                tonic::Streaming<super::CoordinateRequest>,
1408                            >,
1409                        ) -> Self::Future {
1410                            let inner = Arc::clone(&self.0);
1411                            let fut = async move {
1412                                <T as SinkCoordinationService>::coordinate(&inner, request)
1413                                    .await
1414                            };
1415                            Box::pin(fut)
1416                        }
1417                    }
1418                    let accept_compression_encodings = self.accept_compression_encodings;
1419                    let send_compression_encodings = self.send_compression_encodings;
1420                    let max_decoding_message_size = self.max_decoding_message_size;
1421                    let max_encoding_message_size = self.max_encoding_message_size;
1422                    let inner = self.inner.clone();
1423                    let fut = async move {
1424                        let method = CoordinateSvc(inner);
1425                        let codec = tonic::codec::ProstCodec::default();
1426                        let mut grpc = tonic::server::Grpc::new(codec)
1427                            .apply_compression_config(
1428                                accept_compression_encodings,
1429                                send_compression_encodings,
1430                            )
1431                            .apply_max_message_size_config(
1432                                max_decoding_message_size,
1433                                max_encoding_message_size,
1434                            );
1435                        let res = grpc.streaming(method, req).await;
1436                        Ok(res)
1437                    };
1438                    Box::pin(fut)
1439                }
1440                _ => {
1441                    Box::pin(async move {
1442                        let mut response = http::Response::new(empty_body());
1443                        let headers = response.headers_mut();
1444                        headers
1445                            .insert(
1446                                tonic::Status::GRPC_STATUS,
1447                                (tonic::Code::Unimplemented as i32).into(),
1448                            );
1449                        headers
1450                            .insert(
1451                                http::header::CONTENT_TYPE,
1452                                tonic::metadata::GRPC_CONTENT_TYPE,
1453                            );
1454                        Ok(response)
1455                    })
1456                }
1457            }
1458        }
1459    }
1460    impl<T> Clone for SinkCoordinationServiceServer<T> {
1461        fn clone(&self) -> Self {
1462            let inner = self.inner.clone();
1463            Self {
1464                inner,
1465                accept_compression_encodings: self.accept_compression_encodings,
1466                send_compression_encodings: self.send_compression_encodings,
1467                max_decoding_message_size: self.max_decoding_message_size,
1468                max_encoding_message_size: self.max_encoding_message_size,
1469            }
1470        }
1471    }
    /// Generated gRPC service name
    ///
    /// This is the string that forms the first segment of the method path
    /// matched by `SinkCoordinationServiceServer` (`/<SERVICE_NAME>/Coordinate`).
    pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
    /// Publishes `SERVICE_NAME` as the server's `NamedService::NAME`.
    impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
        const NAME: &'static str = SERVICE_NAME;
    }
1477}