// risingwave_pb/connector_service.rs

// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5    #[prost(message, repeated, tag = "1")]
6    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7    #[prost(uint32, repeated, tag = "2")]
8    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, ::prost::Message)]
12pub struct ValidationError {
13    #[prost(string, tag = "1")]
14    pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
20    pub sink_id: crate::id::SinkId,
21    #[prost(btree_map = "string, string", tag = "2")]
22    pub properties: ::prost::alloc::collections::BTreeMap<
23        ::prost::alloc::string::String,
24        ::prost::alloc::string::String,
25    >,
26    #[prost(message, optional, tag = "3")]
27    pub table_schema: ::core::option::Option<TableSchema>,
28    /// to be deprecated
29    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30    pub sink_type: i32,
31    #[prost(string, tag = "5")]
32    pub db_name: ::prost::alloc::string::String,
33    #[prost(string, tag = "6")]
34    pub sink_from_name: ::prost::alloc::string::String,
35    #[prost(message, optional, tag = "7")]
36    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37    #[prost(string, tag = "8")]
38    pub sink_name: ::prost::alloc::string::String,
39    /// Backward-compatible replacement for `SINK_TYPE_FORCE_APPEND_ONLY`.
40    ///
41    /// Should not directly access this field. Use method `ignore_delete()` instead.
42    #[prost(bool, tag = "9")]
43    pub raw_ignore_delete: bool,
44}
45#[derive(prost_helpers::AnyPB)]
46#[derive(Clone, PartialEq, ::prost::Message)]
47pub struct SinkWriterStreamRequest {
48    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
49    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
50}
51/// Nested message and enum types in `SinkWriterStreamRequest`.
52pub mod sink_writer_stream_request {
53    #[derive(prost_helpers::AnyPB)]
54    #[derive(Clone, PartialEq, ::prost::Message)]
55    pub struct StartSink {
56        #[prost(message, optional, tag = "1")]
57        pub sink_param: ::core::option::Option<super::SinkParam>,
58        #[prost(message, optional, tag = "3")]
59        pub payload_schema: ::core::option::Option<super::TableSchema>,
60    }
61    #[derive(prost_helpers::AnyPB)]
62    #[derive(Clone, PartialEq, ::prost::Message)]
63    pub struct WriteBatch {
64        #[prost(uint64, tag = "3")]
65        pub batch_id: u64,
66        #[prost(uint64, tag = "4")]
67        pub epoch: u64,
68        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
69        pub payload: ::core::option::Option<write_batch::Payload>,
70    }
71    /// Nested message and enum types in `WriteBatch`.
72    pub mod write_batch {
73        #[derive(prost_helpers::AnyPB)]
74        #[derive(Clone, PartialEq, ::prost::Message)]
75        pub struct StreamChunkPayload {
76            #[prost(bytes = "vec", tag = "1")]
77            pub binary_data: ::prost::alloc::vec::Vec<u8>,
78        }
79        #[derive(prost_helpers::AnyPB)]
80        #[derive(Clone, PartialEq, ::prost::Oneof)]
81        pub enum Payload {
82            #[prost(message, tag = "2")]
83            StreamChunkPayload(StreamChunkPayload),
84            /// This is a reference pointer to a StreamChunk. The StreamChunk is owned
85            /// by the JniSinkWriterStreamRequest, which should handle the release of StreamChunk.
86            /// Index set to 5 because 3 and 4 have been occupied by `batch_id` and `epoch`
87            #[prost(int64, tag = "5")]
88            StreamChunkRefPointer(i64),
89        }
90    }
91    #[derive(prost_helpers::AnyPB)]
92    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
93    pub struct Barrier {
94        #[prost(uint64, tag = "1")]
95        pub epoch: u64,
96        #[prost(bool, tag = "2")]
97        pub is_checkpoint: bool,
98    }
99    #[derive(prost_helpers::AnyPB)]
100    #[derive(Clone, PartialEq, ::prost::Oneof)]
101    pub enum Request {
102        #[prost(message, tag = "1")]
103        Start(StartSink),
104        #[prost(message, tag = "3")]
105        WriteBatch(WriteBatch),
106        #[prost(message, tag = "4")]
107        Barrier(Barrier),
108    }
109}
110#[derive(prost_helpers::AnyPB)]
111#[derive(Clone, PartialEq, ::prost::Message)]
112pub struct SinkWriterStreamResponse {
113    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
114    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
115}
116/// Nested message and enum types in `SinkWriterStreamResponse`.
117pub mod sink_writer_stream_response {
118    #[derive(prost_helpers::AnyPB)]
119    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
120    pub struct StartResponse {}
121    #[derive(prost_helpers::AnyPB)]
122    #[derive(Clone, PartialEq, ::prost::Message)]
123    pub struct CommitResponse {
124        #[prost(uint64, tag = "1")]
125        pub epoch: u64,
126        #[prost(message, optional, tag = "2")]
127        pub metadata: ::core::option::Option<super::SinkMetadata>,
128    }
129    #[derive(prost_helpers::AnyPB)]
130    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
131    pub struct BatchWrittenResponse {
132        #[prost(uint64, tag = "1")]
133        pub epoch: u64,
134        #[prost(uint64, tag = "2")]
135        pub batch_id: u64,
136    }
137    #[derive(prost_helpers::AnyPB)]
138    #[derive(Clone, PartialEq, ::prost::Oneof)]
139    pub enum Response {
140        #[prost(message, tag = "1")]
141        Start(StartResponse),
142        #[prost(message, tag = "2")]
143        Commit(CommitResponse),
144        #[prost(message, tag = "3")]
145        Batch(BatchWrittenResponse),
146    }
147}
148#[derive(prost_helpers::AnyPB)]
149#[derive(Clone, PartialEq, ::prost::Message)]
150pub struct ValidateSinkRequest {
151    #[prost(message, optional, tag = "1")]
152    pub sink_param: ::core::option::Option<SinkParam>,
153}
154#[derive(prost_helpers::AnyPB)]
155#[derive(Clone, PartialEq, ::prost::Message)]
156pub struct ValidateSinkResponse {
157    /// On validation failure, we return the error.
158    #[prost(message, optional, tag = "1")]
159    pub error: ::core::option::Option<ValidationError>,
160}
161#[derive(prost_helpers::AnyPB)]
162#[derive(Clone, PartialEq, ::prost::Message)]
163pub struct SinkMetadata {
164    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
165    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
166}
167/// Nested message and enum types in `SinkMetadata`.
168pub mod sink_metadata {
169    #[derive(prost_helpers::AnyPB)]
170    #[derive(Clone, PartialEq, ::prost::Message)]
171    pub struct SerializedMetadata {
172        #[prost(bytes = "vec", tag = "1")]
173        pub metadata: ::prost::alloc::vec::Vec<u8>,
174    }
175    #[derive(prost_helpers::AnyPB)]
176    #[derive(Clone, PartialEq, ::prost::Oneof)]
177    pub enum Metadata {
178        #[prost(message, tag = "1")]
179        Serialized(SerializedMetadata),
180    }
181}
182#[derive(prost_helpers::AnyPB)]
183#[derive(Clone, PartialEq, ::prost::Message)]
184pub struct SinkCoordinatorStreamRequest {
185    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
186    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
187}
188/// Nested message and enum types in `SinkCoordinatorStreamRequest`.
189pub mod sink_coordinator_stream_request {
190    #[derive(prost_helpers::AnyPB)]
191    #[derive(Clone, PartialEq, ::prost::Message)]
192    pub struct StartCoordinator {
193        #[prost(message, optional, tag = "1")]
194        pub param: ::core::option::Option<super::SinkParam>,
195    }
196    #[derive(prost_helpers::AnyPB)]
197    #[derive(Clone, PartialEq, ::prost::Message)]
198    pub struct CommitMetadata {
199        #[prost(uint64, tag = "1")]
200        pub epoch: u64,
201        #[prost(message, repeated, tag = "2")]
202        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
203    }
204    #[derive(prost_helpers::AnyPB)]
205    #[derive(Clone, PartialEq, ::prost::Oneof)]
206    pub enum Request {
207        #[prost(message, tag = "1")]
208        Start(StartCoordinator),
209        #[prost(message, tag = "2")]
210        Commit(CommitMetadata),
211    }
212}
213#[derive(prost_helpers::AnyPB)]
214#[derive(Clone, Copy, PartialEq, ::prost::Message)]
215pub struct SinkCoordinatorStreamResponse {
216    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
217    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
218}
219/// Nested message and enum types in `SinkCoordinatorStreamResponse`.
220pub mod sink_coordinator_stream_response {
221    #[derive(prost_helpers::AnyPB)]
222    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
223    pub struct StartResponse {}
224    #[derive(prost_helpers::AnyPB)]
225    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
226    pub struct CommitResponse {
227        #[prost(uint64, tag = "1")]
228        pub epoch: u64,
229    }
230    #[derive(prost_helpers::AnyPB)]
231    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
232    pub enum Response {
233        #[prost(message, tag = "1")]
234        Start(StartResponse),
235        #[prost(message, tag = "2")]
236        Commit(CommitResponse),
237    }
238}
239#[derive(prost_helpers::AnyPB)]
240#[derive(Clone, PartialEq, ::prost::Message)]
241pub struct CdcMessage {
242    /// The value of the Debezium message
243    #[prost(string, tag = "1")]
244    pub payload: ::prost::alloc::string::String,
245    #[prost(string, tag = "2")]
246    pub partition: ::prost::alloc::string::String,
247    #[prost(string, tag = "3")]
248    pub offset: ::prost::alloc::string::String,
249    #[prost(string, tag = "4")]
250    pub full_table_name: ::prost::alloc::string::String,
251    #[prost(int64, tag = "5")]
252    pub source_ts_ms: i64,
253    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
254    pub msg_type: i32,
255    /// The key of the Debezium message, which only used by `mongodb-cdc` connector.
256    #[prost(string, tag = "7")]
257    pub key: ::prost::alloc::string::String,
258    /// The CDC source type of this message. This is required to disambiguate the parsing logic for
259    /// `full_table_name` (e.g. `db.table` in MySQL vs `schema.table` in Postgres/SQL Server).
260    #[prost(enumeration = "SourceType", tag = "8")]
261    pub source_type: i32,
262}
263/// Nested message and enum types in `CdcMessage`.
264pub mod cdc_message {
265    #[derive(prost_helpers::AnyPB)]
266    #[derive(
267        Clone,
268        Copy,
269        Debug,
270        PartialEq,
271        Eq,
272        Hash,
273        PartialOrd,
274        Ord,
275        ::prost::Enumeration
276    )]
277    #[repr(i32)]
278    pub enum CdcMessageType {
279        Unspecified = 0,
280        Heartbeat = 1,
281        Data = 2,
282        TransactionMeta = 3,
283        SchemaChange = 4,
284    }
285    impl CdcMessageType {
286        /// String value of the enum field names used in the ProtoBuf definition.
287        ///
288        /// The values are not transformed in any way and thus are considered stable
289        /// (if the ProtoBuf definition does not change) and safe for programmatic use.
290        pub fn as_str_name(&self) -> &'static str {
291            match self {
292                Self::Unspecified => "UNSPECIFIED",
293                Self::Heartbeat => "HEARTBEAT",
294                Self::Data => "DATA",
295                Self::TransactionMeta => "TRANSACTION_META",
296                Self::SchemaChange => "SCHEMA_CHANGE",
297            }
298        }
299        /// Creates an enum from field names used in the ProtoBuf definition.
300        pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
301            match value {
302                "UNSPECIFIED" => Some(Self::Unspecified),
303                "HEARTBEAT" => Some(Self::Heartbeat),
304                "DATA" => Some(Self::Data),
305                "TRANSACTION_META" => Some(Self::TransactionMeta),
306                "SCHEMA_CHANGE" => Some(Self::SchemaChange),
307                _ => None,
308            }
309        }
310    }
311}
312#[derive(prost_helpers::AnyPB)]
313#[derive(Clone, PartialEq, ::prost::Message)]
314pub struct GetEventStreamRequest {
315    #[prost(uint64, tag = "1")]
316    pub source_id: u64,
317    #[prost(enumeration = "SourceType", tag = "2")]
318    pub source_type: i32,
319    #[prost(string, tag = "3")]
320    pub start_offset: ::prost::alloc::string::String,
321    #[prost(btree_map = "string, string", tag = "4")]
322    pub properties: ::prost::alloc::collections::BTreeMap<
323        ::prost::alloc::string::String,
324        ::prost::alloc::string::String,
325    >,
326    #[prost(bool, tag = "5")]
327    pub snapshot_done: bool,
328    #[prost(bool, tag = "6")]
329    pub is_source_job: bool,
330}
331#[derive(prost_helpers::AnyPB)]
332#[derive(Clone, PartialEq, ::prost::Message)]
333pub struct GetEventStreamResponse {
334    #[prost(uint64, tag = "1")]
335    pub source_id: u64,
336    #[prost(message, repeated, tag = "2")]
337    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
338    #[prost(message, optional, tag = "3")]
339    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
340}
341/// Nested message and enum types in `GetEventStreamResponse`.
342pub mod get_event_stream_response {
343    #[derive(prost_helpers::AnyPB)]
344    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
345    pub struct ControlInfo {
346        #[prost(bool, tag = "1")]
347        pub handshake_ok: bool,
348    }
349}
350#[derive(prost_helpers::AnyPB)]
351#[derive(Clone, PartialEq, ::prost::Message)]
352pub struct ValidateSourceRequest {
353    #[prost(uint64, tag = "1")]
354    pub source_id: u64,
355    #[prost(enumeration = "SourceType", tag = "2")]
356    pub source_type: i32,
357    #[prost(btree_map = "string, string", tag = "3")]
358    pub properties: ::prost::alloc::collections::BTreeMap<
359        ::prost::alloc::string::String,
360        ::prost::alloc::string::String,
361    >,
362    #[prost(message, optional, tag = "4")]
363    pub table_schema: ::core::option::Option<TableSchema>,
364    #[prost(bool, tag = "5")]
365    pub is_source_job: bool,
366    #[prost(bool, tag = "6")]
367    pub is_backfill_table: bool,
368}
369#[derive(prost_helpers::AnyPB)]
370#[derive(Clone, PartialEq, ::prost::Message)]
371pub struct ValidateSourceResponse {
372    /// On validation failure, we return the error.
373    #[prost(message, optional, tag = "1")]
374    pub error: ::core::option::Option<ValidationError>,
375}
376#[derive(prost_helpers::AnyPB)]
377#[derive(Clone, PartialEq, ::prost::Message)]
378pub struct CoordinateRequest {
379    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
380    pub msg: ::core::option::Option<coordinate_request::Msg>,
381}
382/// Nested message and enum types in `CoordinateRequest`.
383pub mod coordinate_request {
384    /// The first request that starts a coordination between sink writer and coordinator.
385    /// The service will respond after sink writers of all vnodes have sent the request.
386    #[derive(prost_helpers::AnyPB)]
387    #[derive(Clone, PartialEq, ::prost::Message)]
388    pub struct StartCoordinationRequest {
389        #[prost(message, optional, tag = "1")]
390        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
391        #[prost(message, optional, tag = "2")]
392        pub param: ::core::option::Option<super::SinkParam>,
393    }
394    #[derive(prost_helpers::AnyPB)]
395    #[derive(Clone, PartialEq, ::prost::Message)]
396    pub struct CommitRequest {
397        #[prost(uint64, tag = "1")]
398        pub epoch: u64,
399        #[prost(message, optional, tag = "2")]
400        pub metadata: ::core::option::Option<super::SinkMetadata>,
401        #[prost(message, optional, tag = "3")]
402        pub schema_change: ::core::option::Option<
403            super::super::stream_plan::SinkSchemaChange,
404        >,
405    }
406    #[derive(prost_helpers::AnyPB)]
407    #[derive(Clone, PartialEq, ::prost::Message)]
408    pub struct UpdateVnodeBitmapRequest {
409        #[prost(message, optional, tag = "1")]
410        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
411    }
412    #[derive(prost_helpers::AnyPB)]
413    #[derive(Clone, PartialEq, ::prost::Oneof)]
414    pub enum Msg {
415        #[prost(message, tag = "1")]
416        StartRequest(StartCoordinationRequest),
417        #[prost(message, tag = "2")]
418        CommitRequest(CommitRequest),
419        #[prost(message, tag = "3")]
420        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
421        #[prost(bool, tag = "4")]
422        Stop(bool),
423        #[prost(uint64, tag = "5")]
424        AlignInitialEpochRequest(u64),
425    }
426}
427#[derive(prost_helpers::AnyPB)]
428#[derive(Clone, Copy, PartialEq, ::prost::Message)]
429pub struct CoordinateResponse {
430    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
431    pub msg: ::core::option::Option<coordinate_response::Msg>,
432}
433/// Nested message and enum types in `CoordinateResponse`.
434pub mod coordinate_response {
435    #[derive(prost_helpers::AnyPB)]
436    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
437    pub struct StartCoordinationResponse {
438        #[prost(uint64, optional, tag = "1")]
439        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
440    }
441    #[derive(prost_helpers::AnyPB)]
442    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
443    pub struct CommitResponse {
444        #[prost(uint64, tag = "1")]
445        pub epoch: u64,
446    }
447    #[derive(prost_helpers::AnyPB)]
448    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
449    pub enum Msg {
450        #[prost(message, tag = "1")]
451        StartResponse(StartCoordinationResponse),
452        #[prost(message, tag = "2")]
453        CommitResponse(CommitResponse),
454        #[prost(bool, tag = "3")]
455        Stopped(bool),
456        #[prost(uint64, tag = "4")]
457        AlignInitialEpochResponse(u64),
458    }
459}
460#[derive(prost_helpers::AnyPB)]
461#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
462#[repr(i32)]
463pub enum SourceType {
464    Unspecified = 0,
465    Mysql = 1,
466    Postgres = 2,
467    Citus = 3,
468    Mongodb = 4,
469    SqlServer = 5,
470}
471impl SourceType {
472    /// String value of the enum field names used in the ProtoBuf definition.
473    ///
474    /// The values are not transformed in any way and thus are considered stable
475    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
476    pub fn as_str_name(&self) -> &'static str {
477        match self {
478            Self::Unspecified => "UNSPECIFIED",
479            Self::Mysql => "MYSQL",
480            Self::Postgres => "POSTGRES",
481            Self::Citus => "CITUS",
482            Self::Mongodb => "MONGODB",
483            Self::SqlServer => "SQL_SERVER",
484        }
485    }
486    /// Creates an enum from field names used in the ProtoBuf definition.
487    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
488        match value {
489            "UNSPECIFIED" => Some(Self::Unspecified),
490            "MYSQL" => Some(Self::Mysql),
491            "POSTGRES" => Some(Self::Postgres),
492            "CITUS" => Some(Self::Citus),
493            "MONGODB" => Some(Self::Mongodb),
494            "SQL_SERVER" => Some(Self::SqlServer),
495            _ => None,
496        }
497    }
498}
499/// Generated client implementations.
500pub mod connector_service_client {
501    #![allow(
502        unused_variables,
503        dead_code,
504        missing_docs,
505        clippy::wildcard_imports,
506        clippy::let_unit_value,
507    )]
508    use tonic::codegen::*;
509    use tonic::codegen::http::Uri;
510    #[derive(Debug, Clone)]
511    pub struct ConnectorServiceClient<T> {
512        inner: tonic::client::Grpc<T>,
513    }
514    impl ConnectorServiceClient<tonic::transport::Channel> {
515        /// Attempt to create a new client by connecting to a given endpoint.
516        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
517        where
518            D: TryInto<tonic::transport::Endpoint>,
519            D::Error: Into<StdError>,
520        {
521            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
522            Ok(Self::new(conn))
523        }
524    }
525    impl<T> ConnectorServiceClient<T>
526    where
527        T: tonic::client::GrpcService<tonic::body::BoxBody>,
528        T::Error: Into<StdError>,
529        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
530        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
531    {
532        pub fn new(inner: T) -> Self {
533            let inner = tonic::client::Grpc::new(inner);
534            Self { inner }
535        }
536        pub fn with_origin(inner: T, origin: Uri) -> Self {
537            let inner = tonic::client::Grpc::with_origin(inner, origin);
538            Self { inner }
539        }
540        pub fn with_interceptor<F>(
541            inner: T,
542            interceptor: F,
543        ) -> ConnectorServiceClient<InterceptedService<T, F>>
544        where
545            F: tonic::service::Interceptor,
546            T::ResponseBody: Default,
547            T: tonic::codegen::Service<
548                http::Request<tonic::body::BoxBody>,
549                Response = http::Response<
550                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
551                >,
552            >,
553            <T as tonic::codegen::Service<
554                http::Request<tonic::body::BoxBody>,
555            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
556        {
557            ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
558        }
559        /// Compress requests with the given encoding.
560        ///
561        /// This requires the server to support it otherwise it might respond with an
562        /// error.
563        #[must_use]
564        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
565            self.inner = self.inner.send_compressed(encoding);
566            self
567        }
568        /// Enable decompressing responses.
569        #[must_use]
570        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
571            self.inner = self.inner.accept_compressed(encoding);
572            self
573        }
574        /// Limits the maximum size of a decoded message.
575        ///
576        /// Default: `4MB`
577        #[must_use]
578        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
579            self.inner = self.inner.max_decoding_message_size(limit);
580            self
581        }
582        /// Limits the maximum size of an encoded message.
583        ///
584        /// Default: `usize::MAX`
585        #[must_use]
586        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
587            self.inner = self.inner.max_encoding_message_size(limit);
588            self
589        }
590        pub async fn sink_writer_stream(
591            &mut self,
592            request: impl tonic::IntoStreamingRequest<
593                Message = super::SinkWriterStreamRequest,
594            >,
595        ) -> std::result::Result<
596            tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
597            tonic::Status,
598        > {
599            self.inner
600                .ready()
601                .await
602                .map_err(|e| {
603                    tonic::Status::unknown(
604                        format!("Service was not ready: {}", e.into()),
605                    )
606                })?;
607            let codec = tonic::codec::ProstCodec::default();
608            let path = http::uri::PathAndQuery::from_static(
609                "/connector_service.ConnectorService/SinkWriterStream",
610            );
611            let mut req = request.into_streaming_request();
612            req.extensions_mut()
613                .insert(
614                    GrpcMethod::new(
615                        "connector_service.ConnectorService",
616                        "SinkWriterStream",
617                    ),
618                );
619            self.inner.streaming(req, path, codec).await
620        }
621        pub async fn sink_coordinator_stream(
622            &mut self,
623            request: impl tonic::IntoStreamingRequest<
624                Message = super::SinkCoordinatorStreamRequest,
625            >,
626        ) -> std::result::Result<
627            tonic::Response<
628                tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
629            >,
630            tonic::Status,
631        > {
632            self.inner
633                .ready()
634                .await
635                .map_err(|e| {
636                    tonic::Status::unknown(
637                        format!("Service was not ready: {}", e.into()),
638                    )
639                })?;
640            let codec = tonic::codec::ProstCodec::default();
641            let path = http::uri::PathAndQuery::from_static(
642                "/connector_service.ConnectorService/SinkCoordinatorStream",
643            );
644            let mut req = request.into_streaming_request();
645            req.extensions_mut()
646                .insert(
647                    GrpcMethod::new(
648                        "connector_service.ConnectorService",
649                        "SinkCoordinatorStream",
650                    ),
651                );
652            self.inner.streaming(req, path, codec).await
653        }
654        pub async fn validate_sink(
655            &mut self,
656            request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
657        ) -> std::result::Result<
658            tonic::Response<super::ValidateSinkResponse>,
659            tonic::Status,
660        > {
661            self.inner
662                .ready()
663                .await
664                .map_err(|e| {
665                    tonic::Status::unknown(
666                        format!("Service was not ready: {}", e.into()),
667                    )
668                })?;
669            let codec = tonic::codec::ProstCodec::default();
670            let path = http::uri::PathAndQuery::from_static(
671                "/connector_service.ConnectorService/ValidateSink",
672            );
673            let mut req = request.into_request();
674            req.extensions_mut()
675                .insert(
676                    GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
677                );
678            self.inner.unary(req, path, codec).await
679        }
680        pub async fn get_event_stream(
681            &mut self,
682            request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
683        ) -> std::result::Result<
684            tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
685            tonic::Status,
686        > {
687            self.inner
688                .ready()
689                .await
690                .map_err(|e| {
691                    tonic::Status::unknown(
692                        format!("Service was not ready: {}", e.into()),
693                    )
694                })?;
695            let codec = tonic::codec::ProstCodec::default();
696            let path = http::uri::PathAndQuery::from_static(
697                "/connector_service.ConnectorService/GetEventStream",
698            );
699            let mut req = request.into_request();
700            req.extensions_mut()
701                .insert(
702                    GrpcMethod::new(
703                        "connector_service.ConnectorService",
704                        "GetEventStream",
705                    ),
706                );
707            self.inner.server_streaming(req, path, codec).await
708        }
709        pub async fn validate_source(
710            &mut self,
711            request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
712        ) -> std::result::Result<
713            tonic::Response<super::ValidateSourceResponse>,
714            tonic::Status,
715        > {
716            self.inner
717                .ready()
718                .await
719                .map_err(|e| {
720                    tonic::Status::unknown(
721                        format!("Service was not ready: {}", e.into()),
722                    )
723                })?;
724            let codec = tonic::codec::ProstCodec::default();
725            let path = http::uri::PathAndQuery::from_static(
726                "/connector_service.ConnectorService/ValidateSource",
727            );
728            let mut req = request.into_request();
729            req.extensions_mut()
730                .insert(
731                    GrpcMethod::new(
732                        "connector_service.ConnectorService",
733                        "ValidateSource",
734                    ),
735                );
736            self.inner.unary(req, path, codec).await
737        }
738    }
739}
740/// Generated client implementations.
741pub mod sink_coordination_service_client {
742    #![allow(
743        unused_variables,
744        dead_code,
745        missing_docs,
746        clippy::wildcard_imports,
747        clippy::let_unit_value,
748    )]
749    use tonic::codegen::*;
750    use tonic::codegen::http::Uri;
751    #[derive(Debug, Clone)]
752    pub struct SinkCoordinationServiceClient<T> {
753        inner: tonic::client::Grpc<T>,
754    }
755    impl SinkCoordinationServiceClient<tonic::transport::Channel> {
756        /// Attempt to create a new client by connecting to a given endpoint.
757        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
758        where
759            D: TryInto<tonic::transport::Endpoint>,
760            D::Error: Into<StdError>,
761        {
762            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
763            Ok(Self::new(conn))
764        }
765    }
766    impl<T> SinkCoordinationServiceClient<T>
767    where
768        T: tonic::client::GrpcService<tonic::body::BoxBody>,
769        T::Error: Into<StdError>,
770        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
771        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
772    {
773        pub fn new(inner: T) -> Self {
774            let inner = tonic::client::Grpc::new(inner);
775            Self { inner }
776        }
777        pub fn with_origin(inner: T, origin: Uri) -> Self {
778            let inner = tonic::client::Grpc::with_origin(inner, origin);
779            Self { inner }
780        }
781        pub fn with_interceptor<F>(
782            inner: T,
783            interceptor: F,
784        ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
785        where
786            F: tonic::service::Interceptor,
787            T::ResponseBody: Default,
788            T: tonic::codegen::Service<
789                http::Request<tonic::body::BoxBody>,
790                Response = http::Response<
791                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
792                >,
793            >,
794            <T as tonic::codegen::Service<
795                http::Request<tonic::body::BoxBody>,
796            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
797        {
798            SinkCoordinationServiceClient::new(
799                InterceptedService::new(inner, interceptor),
800            )
801        }
802        /// Compress requests with the given encoding.
803        ///
804        /// This requires the server to support it otherwise it might respond with an
805        /// error.
806        #[must_use]
807        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
808            self.inner = self.inner.send_compressed(encoding);
809            self
810        }
811        /// Enable decompressing responses.
812        #[must_use]
813        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
814            self.inner = self.inner.accept_compressed(encoding);
815            self
816        }
817        /// Limits the maximum size of a decoded message.
818        ///
819        /// Default: `4MB`
820        #[must_use]
821        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
822            self.inner = self.inner.max_decoding_message_size(limit);
823            self
824        }
825        /// Limits the maximum size of an encoded message.
826        ///
827        /// Default: `usize::MAX`
828        #[must_use]
829        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
830            self.inner = self.inner.max_encoding_message_size(limit);
831            self
832        }
833        pub async fn coordinate(
834            &mut self,
835            request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
836        ) -> std::result::Result<
837            tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
838            tonic::Status,
839        > {
840            self.inner
841                .ready()
842                .await
843                .map_err(|e| {
844                    tonic::Status::unknown(
845                        format!("Service was not ready: {}", e.into()),
846                    )
847                })?;
848            let codec = tonic::codec::ProstCodec::default();
849            let path = http::uri::PathAndQuery::from_static(
850                "/connector_service.SinkCoordinationService/Coordinate",
851            );
852            let mut req = request.into_streaming_request();
853            req.extensions_mut()
854                .insert(
855                    GrpcMethod::new(
856                        "connector_service.SinkCoordinationService",
857                        "Coordinate",
858                    ),
859                );
860            self.inner.streaming(req, path, codec).await
861        }
862    }
863}
864/// Generated server implementations.
865pub mod connector_service_server {
866    #![allow(
867        unused_variables,
868        dead_code,
869        missing_docs,
870        clippy::wildcard_imports,
871        clippy::let_unit_value,
872    )]
873    use tonic::codegen::*;
874    /// Generated trait containing gRPC methods that should be implemented for use with ConnectorServiceServer.
875    #[async_trait]
876    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
877        /// Server streaming response type for the SinkWriterStream method.
878        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
879                Item = std::result::Result<
880                    super::SinkWriterStreamResponse,
881                    tonic::Status,
882                >,
883            >
884            + std::marker::Send
885            + 'static;
886        async fn sink_writer_stream(
887            &self,
888            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
889        ) -> std::result::Result<
890            tonic::Response<Self::SinkWriterStreamStream>,
891            tonic::Status,
892        >;
893        /// Server streaming response type for the SinkCoordinatorStream method.
894        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
895                Item = std::result::Result<
896                    super::SinkCoordinatorStreamResponse,
897                    tonic::Status,
898                >,
899            >
900            + std::marker::Send
901            + 'static;
902        async fn sink_coordinator_stream(
903            &self,
904            request: tonic::Request<
905                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
906            >,
907        ) -> std::result::Result<
908            tonic::Response<Self::SinkCoordinatorStreamStream>,
909            tonic::Status,
910        >;
911        async fn validate_sink(
912            &self,
913            request: tonic::Request<super::ValidateSinkRequest>,
914        ) -> std::result::Result<
915            tonic::Response<super::ValidateSinkResponse>,
916            tonic::Status,
917        >;
918        /// Server streaming response type for the GetEventStream method.
919        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
920                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
921            >
922            + std::marker::Send
923            + 'static;
924        async fn get_event_stream(
925            &self,
926            request: tonic::Request<super::GetEventStreamRequest>,
927        ) -> std::result::Result<
928            tonic::Response<Self::GetEventStreamStream>,
929            tonic::Status,
930        >;
931        async fn validate_source(
932            &self,
933            request: tonic::Request<super::ValidateSourceRequest>,
934        ) -> std::result::Result<
935            tonic::Response<super::ValidateSourceResponse>,
936            tonic::Status,
937        >;
938    }
939    #[derive(Debug)]
940    pub struct ConnectorServiceServer<T> {
941        inner: Arc<T>,
942        accept_compression_encodings: EnabledCompressionEncodings,
943        send_compression_encodings: EnabledCompressionEncodings,
944        max_decoding_message_size: Option<usize>,
945        max_encoding_message_size: Option<usize>,
946    }
947    impl<T> ConnectorServiceServer<T> {
948        pub fn new(inner: T) -> Self {
949            Self::from_arc(Arc::new(inner))
950        }
951        pub fn from_arc(inner: Arc<T>) -> Self {
952            Self {
953                inner,
954                accept_compression_encodings: Default::default(),
955                send_compression_encodings: Default::default(),
956                max_decoding_message_size: None,
957                max_encoding_message_size: None,
958            }
959        }
960        pub fn with_interceptor<F>(
961            inner: T,
962            interceptor: F,
963        ) -> InterceptedService<Self, F>
964        where
965            F: tonic::service::Interceptor,
966        {
967            InterceptedService::new(Self::new(inner), interceptor)
968        }
969        /// Enable decompressing requests with the given encoding.
970        #[must_use]
971        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
972            self.accept_compression_encodings.enable(encoding);
973            self
974        }
975        /// Compress responses with the given encoding, if the client supports it.
976        #[must_use]
977        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
978            self.send_compression_encodings.enable(encoding);
979            self
980        }
981        /// Limits the maximum size of a decoded message.
982        ///
983        /// Default: `4MB`
984        #[must_use]
985        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
986            self.max_decoding_message_size = Some(limit);
987            self
988        }
989        /// Limits the maximum size of an encoded message.
990        ///
991        /// Default: `usize::MAX`
992        #[must_use]
993        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
994            self.max_encoding_message_size = Some(limit);
995            self
996        }
997    }
998    impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
999    where
1000        T: ConnectorService,
1001        B: Body + std::marker::Send + 'static,
1002        B::Error: Into<StdError> + std::marker::Send + 'static,
1003    {
1004        type Response = http::Response<tonic::body::BoxBody>;
1005        type Error = std::convert::Infallible;
1006        type Future = BoxFuture<Self::Response, Self::Error>;
1007        fn poll_ready(
1008            &mut self,
1009            _cx: &mut Context<'_>,
1010        ) -> Poll<std::result::Result<(), Self::Error>> {
1011            Poll::Ready(Ok(()))
1012        }
1013        fn call(&mut self, req: http::Request<B>) -> Self::Future {
1014            match req.uri().path() {
1015                "/connector_service.ConnectorService/SinkWriterStream" => {
1016                    #[allow(non_camel_case_types)]
1017                    struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1018                    impl<
1019                        T: ConnectorService,
1020                    > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1021                    for SinkWriterStreamSvc<T> {
1022                        type Response = super::SinkWriterStreamResponse;
1023                        type ResponseStream = T::SinkWriterStreamStream;
1024                        type Future = BoxFuture<
1025                            tonic::Response<Self::ResponseStream>,
1026                            tonic::Status,
1027                        >;
1028                        fn call(
1029                            &mut self,
1030                            request: tonic::Request<
1031                                tonic::Streaming<super::SinkWriterStreamRequest>,
1032                            >,
1033                        ) -> Self::Future {
1034                            let inner = Arc::clone(&self.0);
1035                            let fut = async move {
1036                                <T as ConnectorService>::sink_writer_stream(&inner, request)
1037                                    .await
1038                            };
1039                            Box::pin(fut)
1040                        }
1041                    }
1042                    let accept_compression_encodings = self.accept_compression_encodings;
1043                    let send_compression_encodings = self.send_compression_encodings;
1044                    let max_decoding_message_size = self.max_decoding_message_size;
1045                    let max_encoding_message_size = self.max_encoding_message_size;
1046                    let inner = self.inner.clone();
1047                    let fut = async move {
1048                        let method = SinkWriterStreamSvc(inner);
1049                        let codec = tonic::codec::ProstCodec::default();
1050                        let mut grpc = tonic::server::Grpc::new(codec)
1051                            .apply_compression_config(
1052                                accept_compression_encodings,
1053                                send_compression_encodings,
1054                            )
1055                            .apply_max_message_size_config(
1056                                max_decoding_message_size,
1057                                max_encoding_message_size,
1058                            );
1059                        let res = grpc.streaming(method, req).await;
1060                        Ok(res)
1061                    };
1062                    Box::pin(fut)
1063                }
1064                "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1065                    #[allow(non_camel_case_types)]
1066                    struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1067                    impl<
1068                        T: ConnectorService,
1069                    > tonic::server::StreamingService<
1070                        super::SinkCoordinatorStreamRequest,
1071                    > for SinkCoordinatorStreamSvc<T> {
1072                        type Response = super::SinkCoordinatorStreamResponse;
1073                        type ResponseStream = T::SinkCoordinatorStreamStream;
1074                        type Future = BoxFuture<
1075                            tonic::Response<Self::ResponseStream>,
1076                            tonic::Status,
1077                        >;
1078                        fn call(
1079                            &mut self,
1080                            request: tonic::Request<
1081                                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1082                            >,
1083                        ) -> Self::Future {
1084                            let inner = Arc::clone(&self.0);
1085                            let fut = async move {
1086                                <T as ConnectorService>::sink_coordinator_stream(
1087                                        &inner,
1088                                        request,
1089                                    )
1090                                    .await
1091                            };
1092                            Box::pin(fut)
1093                        }
1094                    }
1095                    let accept_compression_encodings = self.accept_compression_encodings;
1096                    let send_compression_encodings = self.send_compression_encodings;
1097                    let max_decoding_message_size = self.max_decoding_message_size;
1098                    let max_encoding_message_size = self.max_encoding_message_size;
1099                    let inner = self.inner.clone();
1100                    let fut = async move {
1101                        let method = SinkCoordinatorStreamSvc(inner);
1102                        let codec = tonic::codec::ProstCodec::default();
1103                        let mut grpc = tonic::server::Grpc::new(codec)
1104                            .apply_compression_config(
1105                                accept_compression_encodings,
1106                                send_compression_encodings,
1107                            )
1108                            .apply_max_message_size_config(
1109                                max_decoding_message_size,
1110                                max_encoding_message_size,
1111                            );
1112                        let res = grpc.streaming(method, req).await;
1113                        Ok(res)
1114                    };
1115                    Box::pin(fut)
1116                }
1117                "/connector_service.ConnectorService/ValidateSink" => {
1118                    #[allow(non_camel_case_types)]
1119                    struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1120                    impl<
1121                        T: ConnectorService,
1122                    > tonic::server::UnaryService<super::ValidateSinkRequest>
1123                    for ValidateSinkSvc<T> {
1124                        type Response = super::ValidateSinkResponse;
1125                        type Future = BoxFuture<
1126                            tonic::Response<Self::Response>,
1127                            tonic::Status,
1128                        >;
1129                        fn call(
1130                            &mut self,
1131                            request: tonic::Request<super::ValidateSinkRequest>,
1132                        ) -> Self::Future {
1133                            let inner = Arc::clone(&self.0);
1134                            let fut = async move {
1135                                <T as ConnectorService>::validate_sink(&inner, request)
1136                                    .await
1137                            };
1138                            Box::pin(fut)
1139                        }
1140                    }
1141                    let accept_compression_encodings = self.accept_compression_encodings;
1142                    let send_compression_encodings = self.send_compression_encodings;
1143                    let max_decoding_message_size = self.max_decoding_message_size;
1144                    let max_encoding_message_size = self.max_encoding_message_size;
1145                    let inner = self.inner.clone();
1146                    let fut = async move {
1147                        let method = ValidateSinkSvc(inner);
1148                        let codec = tonic::codec::ProstCodec::default();
1149                        let mut grpc = tonic::server::Grpc::new(codec)
1150                            .apply_compression_config(
1151                                accept_compression_encodings,
1152                                send_compression_encodings,
1153                            )
1154                            .apply_max_message_size_config(
1155                                max_decoding_message_size,
1156                                max_encoding_message_size,
1157                            );
1158                        let res = grpc.unary(method, req).await;
1159                        Ok(res)
1160                    };
1161                    Box::pin(fut)
1162                }
1163                "/connector_service.ConnectorService/GetEventStream" => {
1164                    #[allow(non_camel_case_types)]
1165                    struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1166                    impl<
1167                        T: ConnectorService,
1168                    > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1169                    for GetEventStreamSvc<T> {
1170                        type Response = super::GetEventStreamResponse;
1171                        type ResponseStream = T::GetEventStreamStream;
1172                        type Future = BoxFuture<
1173                            tonic::Response<Self::ResponseStream>,
1174                            tonic::Status,
1175                        >;
1176                        fn call(
1177                            &mut self,
1178                            request: tonic::Request<super::GetEventStreamRequest>,
1179                        ) -> Self::Future {
1180                            let inner = Arc::clone(&self.0);
1181                            let fut = async move {
1182                                <T as ConnectorService>::get_event_stream(&inner, request)
1183                                    .await
1184                            };
1185                            Box::pin(fut)
1186                        }
1187                    }
1188                    let accept_compression_encodings = self.accept_compression_encodings;
1189                    let send_compression_encodings = self.send_compression_encodings;
1190                    let max_decoding_message_size = self.max_decoding_message_size;
1191                    let max_encoding_message_size = self.max_encoding_message_size;
1192                    let inner = self.inner.clone();
1193                    let fut = async move {
1194                        let method = GetEventStreamSvc(inner);
1195                        let codec = tonic::codec::ProstCodec::default();
1196                        let mut grpc = tonic::server::Grpc::new(codec)
1197                            .apply_compression_config(
1198                                accept_compression_encodings,
1199                                send_compression_encodings,
1200                            )
1201                            .apply_max_message_size_config(
1202                                max_decoding_message_size,
1203                                max_encoding_message_size,
1204                            );
1205                        let res = grpc.server_streaming(method, req).await;
1206                        Ok(res)
1207                    };
1208                    Box::pin(fut)
1209                }
1210                "/connector_service.ConnectorService/ValidateSource" => {
1211                    #[allow(non_camel_case_types)]
1212                    struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1213                    impl<
1214                        T: ConnectorService,
1215                    > tonic::server::UnaryService<super::ValidateSourceRequest>
1216                    for ValidateSourceSvc<T> {
1217                        type Response = super::ValidateSourceResponse;
1218                        type Future = BoxFuture<
1219                            tonic::Response<Self::Response>,
1220                            tonic::Status,
1221                        >;
1222                        fn call(
1223                            &mut self,
1224                            request: tonic::Request<super::ValidateSourceRequest>,
1225                        ) -> Self::Future {
1226                            let inner = Arc::clone(&self.0);
1227                            let fut = async move {
1228                                <T as ConnectorService>::validate_source(&inner, request)
1229                                    .await
1230                            };
1231                            Box::pin(fut)
1232                        }
1233                    }
1234                    let accept_compression_encodings = self.accept_compression_encodings;
1235                    let send_compression_encodings = self.send_compression_encodings;
1236                    let max_decoding_message_size = self.max_decoding_message_size;
1237                    let max_encoding_message_size = self.max_encoding_message_size;
1238                    let inner = self.inner.clone();
1239                    let fut = async move {
1240                        let method = ValidateSourceSvc(inner);
1241                        let codec = tonic::codec::ProstCodec::default();
1242                        let mut grpc = tonic::server::Grpc::new(codec)
1243                            .apply_compression_config(
1244                                accept_compression_encodings,
1245                                send_compression_encodings,
1246                            )
1247                            .apply_max_message_size_config(
1248                                max_decoding_message_size,
1249                                max_encoding_message_size,
1250                            );
1251                        let res = grpc.unary(method, req).await;
1252                        Ok(res)
1253                    };
1254                    Box::pin(fut)
1255                }
1256                _ => {
1257                    Box::pin(async move {
1258                        let mut response = http::Response::new(empty_body());
1259                        let headers = response.headers_mut();
1260                        headers
1261                            .insert(
1262                                tonic::Status::GRPC_STATUS,
1263                                (tonic::Code::Unimplemented as i32).into(),
1264                            );
1265                        headers
1266                            .insert(
1267                                http::header::CONTENT_TYPE,
1268                                tonic::metadata::GRPC_CONTENT_TYPE,
1269                            );
1270                        Ok(response)
1271                    })
1272                }
1273            }
1274        }
1275    }
1276    impl<T> Clone for ConnectorServiceServer<T> {
1277        fn clone(&self) -> Self {
1278            let inner = self.inner.clone();
1279            Self {
1280                inner,
1281                accept_compression_encodings: self.accept_compression_encodings,
1282                send_compression_encodings: self.send_compression_encodings,
1283                max_decoding_message_size: self.max_decoding_message_size,
1284                max_encoding_message_size: self.max_encoding_message_size,
1285            }
1286        }
1287    }
1288    /// Generated gRPC service name
1289    pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1290    impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1291        const NAME: &'static str = SERVICE_NAME;
1292    }
1293}
1294/// Generated server implementations.
1295pub mod sink_coordination_service_server {
1296    #![allow(
1297        unused_variables,
1298        dead_code,
1299        missing_docs,
1300        clippy::wildcard_imports,
1301        clippy::let_unit_value,
1302    )]
1303    use tonic::codegen::*;
1304    /// Generated trait containing gRPC methods that should be implemented for use with SinkCoordinationServiceServer.
1305    #[async_trait]
1306    pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1307        /// Server streaming response type for the Coordinate method.
1308        type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1309                Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1310            >
1311            + std::marker::Send
1312            + 'static;
1313        async fn coordinate(
1314            &self,
1315            request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1316        ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1317    }
1318    #[derive(Debug)]
1319    pub struct SinkCoordinationServiceServer<T> {
1320        inner: Arc<T>,
1321        accept_compression_encodings: EnabledCompressionEncodings,
1322        send_compression_encodings: EnabledCompressionEncodings,
1323        max_decoding_message_size: Option<usize>,
1324        max_encoding_message_size: Option<usize>,
1325    }
1326    impl<T> SinkCoordinationServiceServer<T> {
1327        pub fn new(inner: T) -> Self {
1328            Self::from_arc(Arc::new(inner))
1329        }
1330        pub fn from_arc(inner: Arc<T>) -> Self {
1331            Self {
1332                inner,
1333                accept_compression_encodings: Default::default(),
1334                send_compression_encodings: Default::default(),
1335                max_decoding_message_size: None,
1336                max_encoding_message_size: None,
1337            }
1338        }
1339        pub fn with_interceptor<F>(
1340            inner: T,
1341            interceptor: F,
1342        ) -> InterceptedService<Self, F>
1343        where
1344            F: tonic::service::Interceptor,
1345        {
1346            InterceptedService::new(Self::new(inner), interceptor)
1347        }
1348        /// Enable decompressing requests with the given encoding.
1349        #[must_use]
1350        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1351            self.accept_compression_encodings.enable(encoding);
1352            self
1353        }
1354        /// Compress responses with the given encoding, if the client supports it.
1355        #[must_use]
1356        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1357            self.send_compression_encodings.enable(encoding);
1358            self
1359        }
1360        /// Limits the maximum size of a decoded message.
1361        ///
1362        /// Default: `4MB`
1363        #[must_use]
1364        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1365            self.max_decoding_message_size = Some(limit);
1366            self
1367        }
1368        /// Limits the maximum size of an encoded message.
1369        ///
1370        /// Default: `usize::MAX`
1371        #[must_use]
1372        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1373            self.max_encoding_message_size = Some(limit);
1374            self
1375        }
1376    }
1377    impl<T, B> tonic::codegen::Service<http::Request<B>>
1378    for SinkCoordinationServiceServer<T>
1379    where
1380        T: SinkCoordinationService,
1381        B: Body + std::marker::Send + 'static,
1382        B::Error: Into<StdError> + std::marker::Send + 'static,
1383    {
1384        type Response = http::Response<tonic::body::BoxBody>;
1385        type Error = std::convert::Infallible;
1386        type Future = BoxFuture<Self::Response, Self::Error>;
1387        fn poll_ready(
1388            &mut self,
1389            _cx: &mut Context<'_>,
1390        ) -> Poll<std::result::Result<(), Self::Error>> {
1391            Poll::Ready(Ok(()))
1392        }
1393        fn call(&mut self, req: http::Request<B>) -> Self::Future {
1394            match req.uri().path() {
1395                "/connector_service.SinkCoordinationService/Coordinate" => {
1396                    #[allow(non_camel_case_types)]
1397                    struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1398                    impl<
1399                        T: SinkCoordinationService,
1400                    > tonic::server::StreamingService<super::CoordinateRequest>
1401                    for CoordinateSvc<T> {
1402                        type Response = super::CoordinateResponse;
1403                        type ResponseStream = T::CoordinateStream;
1404                        type Future = BoxFuture<
1405                            tonic::Response<Self::ResponseStream>,
1406                            tonic::Status,
1407                        >;
1408                        fn call(
1409                            &mut self,
1410                            request: tonic::Request<
1411                                tonic::Streaming<super::CoordinateRequest>,
1412                            >,
1413                        ) -> Self::Future {
1414                            let inner = Arc::clone(&self.0);
1415                            let fut = async move {
1416                                <T as SinkCoordinationService>::coordinate(&inner, request)
1417                                    .await
1418                            };
1419                            Box::pin(fut)
1420                        }
1421                    }
1422                    let accept_compression_encodings = self.accept_compression_encodings;
1423                    let send_compression_encodings = self.send_compression_encodings;
1424                    let max_decoding_message_size = self.max_decoding_message_size;
1425                    let max_encoding_message_size = self.max_encoding_message_size;
1426                    let inner = self.inner.clone();
1427                    let fut = async move {
1428                        let method = CoordinateSvc(inner);
1429                        let codec = tonic::codec::ProstCodec::default();
1430                        let mut grpc = tonic::server::Grpc::new(codec)
1431                            .apply_compression_config(
1432                                accept_compression_encodings,
1433                                send_compression_encodings,
1434                            )
1435                            .apply_max_message_size_config(
1436                                max_decoding_message_size,
1437                                max_encoding_message_size,
1438                            );
1439                        let res = grpc.streaming(method, req).await;
1440                        Ok(res)
1441                    };
1442                    Box::pin(fut)
1443                }
1444                _ => {
1445                    Box::pin(async move {
1446                        let mut response = http::Response::new(empty_body());
1447                        let headers = response.headers_mut();
1448                        headers
1449                            .insert(
1450                                tonic::Status::GRPC_STATUS,
1451                                (tonic::Code::Unimplemented as i32).into(),
1452                            );
1453                        headers
1454                            .insert(
1455                                http::header::CONTENT_TYPE,
1456                                tonic::metadata::GRPC_CONTENT_TYPE,
1457                            );
1458                        Ok(response)
1459                    })
1460                }
1461            }
1462        }
1463    }
1464    impl<T> Clone for SinkCoordinationServiceServer<T> {
1465        fn clone(&self) -> Self {
1466            let inner = self.inner.clone();
1467            Self {
1468                inner,
1469                accept_compression_encodings: self.accept_compression_encodings,
1470                send_compression_encodings: self.send_compression_encodings,
1471                max_decoding_message_size: self.max_decoding_message_size,
1472                max_encoding_message_size: self.max_encoding_message_size,
1473            }
1474        }
1475    }
1476    /// Generated gRPC service name
1477    pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1478    impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1479        const NAME: &'static str = SERVICE_NAME;
1480    }
1481}