common_meta/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35    #[snafu(display("Empty key is not allowed"))]
36    EmptyKey {
37        #[snafu(implicit)]
38        location: Location,
39    },
40
41    #[snafu(display(
42        "Another procedure is operating the region: {} on peer: {}",
43        region_id,
44        peer_id
45    ))]
46    RegionOperatingRace {
47        #[snafu(implicit)]
48        location: Location,
49        peer_id: DatanodeId,
50        region_id: RegionId,
51    },
52
53    #[snafu(display("Failed to connect to Etcd"))]
54    ConnectEtcd {
55        #[snafu(source)]
56        error: etcd_client::Error,
57        #[snafu(implicit)]
58        location: Location,
59    },
60
61    #[snafu(display("Failed to execute via Etcd"))]
62    EtcdFailed {
63        #[snafu(source)]
64        error: etcd_client::Error,
65        #[snafu(implicit)]
66        location: Location,
67    },
68
69    #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70    EtcdTxnFailed {
71        max_operations: usize,
72        #[snafu(source)]
73        error: etcd_client::Error,
74        #[snafu(implicit)]
75        location: Location,
76    },
77
78    #[snafu(display("Failed to get sequence: {}", err_msg))]
79    NextSequence {
80        err_msg: String,
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Unexpected sequence value: {}", err_msg))]
86    UnexpectedSequenceValue {
87        err_msg: String,
88        #[snafu(implicit)]
89        location: Location,
90    },
91
92    #[snafu(display("Table info not found: {}", table))]
93    TableInfoNotFound {
94        table: String,
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100    RegisterProcedureLoader {
101        type_name: String,
102        #[snafu(implicit)]
103        location: Location,
104        source: common_procedure::error::Error,
105    },
106
107    #[snafu(display("Failed to register repartition procedure loader"))]
108    RegisterRepartitionProcedureLoader {
109        #[snafu(implicit)]
110        location: Location,
111        source: BoxedError,
112    },
113
114    #[snafu(display("Failed to create repartition procedure"))]
115    CreateRepartitionProcedure {
116        source: BoxedError,
117        #[snafu(implicit)]
118        location: Location,
119    },
120
121    #[snafu(display("Failed to submit procedure"))]
122    SubmitProcedure {
123        #[snafu(implicit)]
124        location: Location,
125        source: common_procedure::Error,
126    },
127
128    #[snafu(display("Failed to query procedure"))]
129    QueryProcedure {
130        #[snafu(implicit)]
131        location: Location,
132        source: common_procedure::Error,
133    },
134
135    #[snafu(display("Procedure not found: {pid}"))]
136    ProcedureNotFound {
137        #[snafu(implicit)]
138        location: Location,
139        pid: String,
140    },
141
142    #[snafu(display("Failed to parse procedure id: {key}"))]
143    ParseProcedureId {
144        #[snafu(implicit)]
145        location: Location,
146        key: String,
147        #[snafu(source)]
148        error: common_procedure::ParseIdError,
149    },
150
151    #[snafu(display("Unsupported operation {}", operation))]
152    Unsupported {
153        operation: String,
154        #[snafu(implicit)]
155        location: Location,
156    },
157
158    #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159    ProcedureStateReceiver {
160        procedure_id: ProcedureId,
161        #[snafu(implicit)]
162        location: Location,
163        source: common_procedure::Error,
164    },
165
166    #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167    ProcedureStateReceiverNotFound {
168        procedure_id: ProcedureId,
169        #[snafu(implicit)]
170        location: Location,
171    },
172
173    #[snafu(display("Failed to wait procedure done"))]
174    WaitProcedure {
175        #[snafu(implicit)]
176        location: Location,
177        source: common_procedure::Error,
178    },
179
180    #[snafu(display("Failed to start procedure manager"))]
181    StartProcedureManager {
182        #[snafu(implicit)]
183        location: Location,
184        source: common_procedure::Error,
185    },
186
187    #[snafu(display("Failed to stop procedure manager"))]
188    StopProcedureManager {
189        #[snafu(implicit)]
190        location: Location,
191        source: common_procedure::Error,
192    },
193
194    #[snafu(display(
195        "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196    ))]
197    ProcedureOutput {
198        procedure_id: String,
199        err_msg: String,
200        #[snafu(implicit)]
201        location: Location,
202    },
203
204    #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
205    ConvertRawTableInfo {
206        #[snafu(implicit)]
207        location: Location,
208        source: datatypes::Error,
209    },
210
211    #[snafu(display("Primary key '{key}' not found when creating region request"))]
212    PrimaryKeyNotFound {
213        key: String,
214        #[snafu(implicit)]
215        location: Location,
216    },
217
218    #[snafu(display("Failed to build table meta for table: {}", table_name))]
219    BuildTableMeta {
220        table_name: String,
221        #[snafu(source)]
222        error: table::metadata::TableMetaBuilderError,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Table occurs error"))]
228    Table {
229        #[snafu(implicit)]
230        location: Location,
231        source: table::error::Error,
232    },
233
234    #[snafu(display("Failed to find table route for table id {}", table_id))]
235    TableRouteNotFound {
236        table_id: TableId,
237        #[snafu(implicit)]
238        location: Location,
239    },
240
241    #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
242    TableRepartNotFound {
243        table_id: TableId,
244        #[snafu(implicit)]
245        location: Location,
246    },
247
248    #[snafu(display("Failed to decode protobuf"))]
249    DecodeProto {
250        #[snafu(implicit)]
251        location: Location,
252        #[snafu(source)]
253        error: prost::DecodeError,
254    },
255
256    #[snafu(display("Failed to encode object into json"))]
257    EncodeJson {
258        #[snafu(implicit)]
259        location: Location,
260        #[snafu(source)]
261        error: JsonError,
262    },
263
264    #[snafu(display("Failed to decode object from json"))]
265    DecodeJson {
266        #[snafu(implicit)]
267        location: Location,
268        #[snafu(source)]
269        error: JsonError,
270    },
271
272    #[snafu(display("Failed to serialize to json: {}", input))]
273    SerializeToJson {
274        input: String,
275        #[snafu(source)]
276        error: serde_json::error::Error,
277        #[snafu(implicit)]
278        location: Location,
279    },
280
281    #[snafu(display("Failed to deserialize from json: {}", input))]
282    DeserializeFromJson {
283        input: String,
284        #[snafu(source)]
285        error: serde_json::error::Error,
286        #[snafu(implicit)]
287        location: Location,
288    },
289
290    #[snafu(display("Payload not exist"))]
291    PayloadNotExist {
292        #[snafu(implicit)]
293        location: Location,
294    },
295
296    #[snafu(display("Failed to serde json"))]
297    SerdeJson {
298        #[snafu(source)]
299        error: serde_json::error::Error,
300        #[snafu(implicit)]
301        location: Location,
302    },
303
304    #[snafu(display("Failed to parse value {} into key {}", value, key))]
305    ParseOption {
306        key: String,
307        value: String,
308        #[snafu(implicit)]
309        location: Location,
310    },
311
312    #[snafu(display("Corrupted table route data, err: {}", err_msg))]
313    RouteInfoCorrupted {
314        err_msg: String,
315        #[snafu(implicit)]
316        location: Location,
317    },
318
319    #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
320    IllegalServerState {
321        code: i32,
322        err_msg: String,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Failed to convert alter table request"))]
328    ConvertAlterTableRequest {
329        source: common_grpc_expr::error::Error,
330        #[snafu(implicit)]
331        location: Location,
332    },
333
334    #[snafu(display("Invalid protobuf message: {err_msg}"))]
335    InvalidProtoMsg {
336        err_msg: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340
341    #[snafu(display("Unexpected: {err_msg}"))]
342    Unexpected {
343        err_msg: String,
344        #[snafu(implicit)]
345        location: Location,
346    },
347
348    #[snafu(display("Table already exists, table: {}", table_name))]
349    TableAlreadyExists {
350        table_name: String,
351        #[snafu(implicit)]
352        location: Location,
353    },
354
355    #[snafu(display("View already exists, view: {}", view_name))]
356    ViewAlreadyExists {
357        view_name: String,
358        #[snafu(implicit)]
359        location: Location,
360    },
361
362    #[snafu(display("Flow already exists: {}", flow_name))]
363    FlowAlreadyExists {
364        flow_name: String,
365        #[snafu(implicit)]
366        location: Location,
367    },
368
369    #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
370    SchemaAlreadyExists {
371        catalog: String,
372        schema: String,
373        #[snafu(implicit)]
374        location: Location,
375    },
376
377    #[snafu(display("Failed to convert raw key to str"))]
378    ConvertRawKey {
379        #[snafu(implicit)]
380        location: Location,
381        #[snafu(source)]
382        error: Utf8Error,
383    },
384
385    #[snafu(display("Table not found: '{}'", table_name))]
386    TableNotFound {
387        table_name: String,
388        #[snafu(implicit)]
389        location: Location,
390    },
391
392    #[snafu(display("Region not found: {}", region_id))]
393    RegionNotFound {
394        region_id: RegionId,
395        #[snafu(implicit)]
396        location: Location,
397    },
398
399    #[snafu(display("View not found: '{}'", view_name))]
400    ViewNotFound {
401        view_name: String,
402        #[snafu(implicit)]
403        location: Location,
404    },
405
406    #[snafu(display("Flow not found: '{}'", flow_name))]
407    FlowNotFound {
408        flow_name: String,
409        #[snafu(implicit)]
410        location: Location,
411    },
412
413    #[snafu(display("Flow route not found: '{}'", flow_name))]
414    FlowRouteNotFound {
415        flow_name: String,
416        #[snafu(implicit)]
417        location: Location,
418    },
419
420    #[snafu(display("Schema nod found, schema: {}", table_schema))]
421    SchemaNotFound {
422        table_schema: String,
423        #[snafu(implicit)]
424        location: Location,
425    },
426
427    #[snafu(display("Catalog not found, catalog: {}", catalog))]
428    CatalogNotFound {
429        catalog: String,
430        #[snafu(implicit)]
431        location: Location,
432    },
433
434    #[snafu(display("Invalid metadata, err: {}", err_msg))]
435    InvalidMetadata {
436        err_msg: String,
437        #[snafu(implicit)]
438        location: Location,
439    },
440
441    #[snafu(display("Invalid view info, err: {}", err_msg))]
442    InvalidViewInfo {
443        err_msg: String,
444        #[snafu(implicit)]
445        location: Location,
446    },
447
448    #[snafu(display("Invalid flow request body: {:?}", body))]
449    InvalidFlowRequestBody {
450        body: Box<Option<api::v1::flow::flow_request::Body>>,
451        #[snafu(implicit)]
452        location: Location,
453    },
454
455    #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
456    GetKvCache { err_msg: String },
457
458    #[snafu(display("Get null from cache, key: {}", key))]
459    CacheNotGet {
460        key: String,
461        #[snafu(implicit)]
462        location: Location,
463    },
464
465    #[snafu(display("Etcd txn error: {err_msg}"))]
466    EtcdTxnOpResponse {
467        err_msg: String,
468        #[snafu(implicit)]
469        location: Location,
470    },
471
472    #[snafu(display("External error"))]
473    External {
474        #[snafu(implicit)]
475        location: Location,
476        source: BoxedError,
477    },
478
479    #[snafu(display("The response exceeded size limit"))]
480    ResponseExceededSizeLimit {
481        #[snafu(implicit)]
482        location: Location,
483        source: BoxedError,
484    },
485
486    #[snafu(display("Invalid heartbeat response"))]
487    InvalidHeartbeatResponse {
488        #[snafu(implicit)]
489        location: Location,
490    },
491
492    #[snafu(display("Failed to operate on datanode: {}", peer))]
493    OperateDatanode {
494        #[snafu(implicit)]
495        location: Location,
496        peer: Peer,
497        source: BoxedError,
498    },
499
500    #[snafu(display("Retry later"))]
501    RetryLater {
502        source: BoxedError,
503        clean_poisons: bool,
504    },
505
506    #[snafu(display("Abort procedure"))]
507    AbortProcedure {
508        #[snafu(implicit)]
509        location: Location,
510        source: BoxedError,
511        clean_poisons: bool,
512    },
513
514    #[snafu(display(
515        "Failed to encode a wal options to json string, wal_options: {:?}",
516        wal_options
517    ))]
518    EncodeWalOptions {
519        wal_options: WalOptions,
520        #[snafu(source)]
521        error: serde_json::Error,
522        #[snafu(implicit)]
523        location: Location,
524    },
525
526    #[snafu(display("Invalid number of topics {}", num_topics))]
527    InvalidNumTopics {
528        num_topics: usize,
529        #[snafu(implicit)]
530        location: Location,
531    },
532
533    #[snafu(display(
534        "Failed to build a Kafka client, broker endpoints: {:?}",
535        broker_endpoints
536    ))]
537    BuildKafkaClient {
538        broker_endpoints: Vec<String>,
539        #[snafu(implicit)]
540        location: Location,
541        #[snafu(source)]
542        error: rskafka::client::error::Error,
543    },
544
545    #[snafu(display("Failed to create TLS Config"))]
546    TlsConfig {
547        #[snafu(implicit)]
548        location: Location,
549        source: common_wal::error::Error,
550    },
551
552    #[snafu(display("Failed to build a Kafka controller client"))]
553    BuildKafkaCtrlClient {
554        #[snafu(implicit)]
555        location: Location,
556        #[snafu(source)]
557        error: rskafka::client::error::Error,
558    },
559
560    #[snafu(display(
561        "Failed to get a Kafka partition client, topic: {}, partition: {}",
562        topic,
563        partition
564    ))]
565    KafkaPartitionClient {
566        topic: String,
567        partition: i32,
568        #[snafu(implicit)]
569        location: Location,
570        #[snafu(source)]
571        error: rskafka::client::error::Error,
572    },
573
574    #[snafu(display(
575        "Failed to get offset from Kafka, topic: {}, partition: {}",
576        topic,
577        partition
578    ))]
579    KafkaGetOffset {
580        topic: String,
581        partition: i32,
582        #[snafu(implicit)]
583        location: Location,
584        #[snafu(source)]
585        error: rskafka::client::error::Error,
586    },
587
588    #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
589    ProduceRecord {
590        topic: String,
591        #[snafu(implicit)]
592        location: Location,
593        #[snafu(source)]
594        error: rskafka::client::error::Error,
595    },
596
597    #[snafu(display("Failed to create a Kafka wal topic"))]
598    CreateKafkaWalTopic {
599        #[snafu(implicit)]
600        location: Location,
601        #[snafu(source)]
602        error: rskafka::client::error::Error,
603    },
604
605    #[snafu(display("The topic pool is empty"))]
606    EmptyTopicPool {
607        #[snafu(implicit)]
608        location: Location,
609    },
610
611    #[snafu(display("Unexpected table route type: {}", err_msg))]
612    UnexpectedLogicalRouteTable {
613        #[snafu(implicit)]
614        location: Location,
615        err_msg: String,
616    },
617
618    #[snafu(display("The tasks of {} cannot be empty", name))]
619    EmptyDdlTasks {
620        name: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display("Metadata corruption: {}", err_msg))]
626    MetadataCorruption {
627        err_msg: String,
628        #[snafu(implicit)]
629        location: Location,
630    },
631
632    #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
633    AlterLogicalTablesInvalidArguments {
634        err_msg: String,
635        #[snafu(implicit)]
636        location: Location,
637    },
638
639    #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
640    CreateLogicalTablesInvalidArguments {
641        err_msg: String,
642        #[snafu(implicit)]
643        location: Location,
644    },
645
646    #[snafu(display("Invalid node info key: {}", key))]
647    InvalidNodeInfoKey {
648        key: String,
649        #[snafu(implicit)]
650        location: Location,
651    },
652
653    #[snafu(display("Invalid node stat key: {}", key))]
654    InvalidStatKey {
655        key: String,
656        #[snafu(implicit)]
657        location: Location,
658    },
659
660    #[snafu(display("Failed to parse number: {}", err_msg))]
661    ParseNum {
662        err_msg: String,
663        #[snafu(source)]
664        error: std::num::ParseIntError,
665        #[snafu(implicit)]
666        location: Location,
667    },
668
669    #[snafu(display("Invalid role: {}", role))]
670    InvalidRole {
671        role: i32,
672        #[snafu(implicit)]
673        location: Location,
674    },
675
676    #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
677    InvalidSetDatabaseOption {
678        key: String,
679        value: String,
680        #[snafu(implicit)]
681        location: Location,
682    },
683
684    #[snafu(display("Invalid unset database option, key: {}", key))]
685    InvalidUnsetDatabaseOption {
686        key: String,
687        #[snafu(implicit)]
688        location: Location,
689    },
690
691    #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
692    MismatchPrefix {
693        prefix: String,
694        key: String,
695        #[snafu(implicit)]
696        location: Location,
697    },
698
699    #[snafu(display("Failed to move values: {err_msg}"))]
700    MoveValues {
701        err_msg: String,
702        #[snafu(implicit)]
703        location: Location,
704    },
705
706    #[snafu(display("Failed to parse {} from utf8", name))]
707    FromUtf8 {
708        name: String,
709        #[snafu(source)]
710        error: std::string::FromUtf8Error,
711        #[snafu(implicit)]
712        location: Location,
713    },
714
715    #[snafu(display("Value not exists"))]
716    ValueNotExist {
717        #[snafu(implicit)]
718        location: Location,
719    },
720
721    #[snafu(display("Failed to get cache"))]
722    GetCache { source: Arc<Error> },
723
724    #[cfg(feature = "pg_kvbackend")]
725    #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
726    PostgresExecution {
727        sql: String,
728        #[snafu(source)]
729        error: tokio_postgres::Error,
730        #[snafu(implicit)]
731        location: Location,
732    },
733
734    #[cfg(feature = "pg_kvbackend")]
735    #[snafu(display("Failed to create connection pool for Postgres"))]
736    CreatePostgresPool {
737        #[snafu(source)]
738        error: deadpool_postgres::CreatePoolError,
739        #[snafu(implicit)]
740        location: Location,
741    },
742
743    #[cfg(feature = "pg_kvbackend")]
744    #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
745    GetPostgresConnection {
746        reason: String,
747        #[snafu(implicit)]
748        location: Location,
749    },
750
751    #[cfg(feature = "pg_kvbackend")]
752    #[snafu(display("Failed to {} Postgres transaction", operation))]
753    PostgresTransaction {
754        #[snafu(source)]
755        error: tokio_postgres::Error,
756        #[snafu(implicit)]
757        location: Location,
758        operation: String,
759    },
760
761    #[cfg(feature = "pg_kvbackend")]
762    #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
763    PostgresTlsConfig {
764        reason: String,
765        #[snafu(implicit)]
766        location: Location,
767    },
768
769    #[snafu(display("Failed to load TLS certificate from path: {}", path))]
770    LoadTlsCertificate {
771        path: String,
772        #[snafu(source)]
773        error: std::io::Error,
774        #[snafu(implicit)]
775        location: Location,
776    },
777
778    #[cfg(feature = "pg_kvbackend")]
779    #[snafu(display("Invalid TLS configuration: {}", reason))]
780    InvalidTlsConfig {
781        reason: String,
782        #[snafu(implicit)]
783        location: Location,
784    },
785
786    #[cfg(feature = "mysql_kvbackend")]
787    #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
788    MySqlExecution {
789        sql: String,
790        #[snafu(source)]
791        error: sqlx::Error,
792        #[snafu(implicit)]
793        location: Location,
794    },
795
796    #[cfg(feature = "mysql_kvbackend")]
797    #[snafu(display("Failed to create connection pool for MySql"))]
798    CreateMySqlPool {
799        #[snafu(source)]
800        error: sqlx::Error,
801        #[snafu(implicit)]
802        location: Location,
803    },
804
805    #[cfg(feature = "mysql_kvbackend")]
806    #[snafu(display("Failed to {} MySql transaction", operation))]
807    MySqlTransaction {
808        #[snafu(source)]
809        error: sqlx::Error,
810        #[snafu(implicit)]
811        location: Location,
812        operation: String,
813    },
814
815    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
816    #[snafu(display("Rds transaction retry failed"))]
817    RdsTransactionRetryFailed {
818        #[snafu(implicit)]
819        location: Location,
820    },
821
822    #[snafu(display(
823        "Datanode table info not found, table id: {}, datanode id: {}",
824        table_id,
825        datanode_id
826    ))]
827    DatanodeTableInfoNotFound {
828        datanode_id: DatanodeId,
829        table_id: TableId,
830        #[snafu(implicit)]
831        location: Location,
832    },
833
834    #[snafu(display("Invalid topic name prefix: {}", prefix))]
835    InvalidTopicNamePrefix {
836        prefix: String,
837        #[snafu(implicit)]
838        location: Location,
839    },
840
841    #[snafu(display("Failed to parse wal options: {}", wal_options))]
842    ParseWalOptions {
843        wal_options: String,
844        #[snafu(implicit)]
845        location: Location,
846        #[snafu(source)]
847        error: serde_json::Error,
848    },
849
850    #[snafu(display("No leader found for table_id: {}", table_id))]
851    NoLeader {
852        table_id: TableId,
853        #[snafu(implicit)]
854        location: Location,
855    },
856
857    #[snafu(display(
858        "Procedure poison key already exists with a different value, key: {}, value: {}",
859        key,
860        value
861    ))]
862    ProcedurePoisonConflict {
863        key: String,
864        value: String,
865        #[snafu(implicit)]
866        location: Location,
867    },
868
869    #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
870    PutPoison {
871        #[snafu(implicit)]
872        location: Location,
873        #[snafu(source)]
874        source: common_procedure::error::Error,
875    },
876
877    #[snafu(display("Failed to parse timezone"))]
878    InvalidTimeZone {
879        #[snafu(implicit)]
880        location: Location,
881        #[snafu(source)]
882        error: common_time::error::Error,
883    },
884    #[snafu(display("Invalid file path: {}", file_path))]
885    InvalidFilePath {
886        #[snafu(implicit)]
887        location: Location,
888        file_path: String,
889    },
890
891    #[snafu(display("Failed to serialize flexbuffers"))]
892    SerializeFlexbuffers {
893        #[snafu(implicit)]
894        location: Location,
895        #[snafu(source)]
896        error: flexbuffers::SerializationError,
897    },
898
899    #[snafu(display("Failed to deserialize flexbuffers"))]
900    DeserializeFlexbuffers {
901        #[snafu(implicit)]
902        location: Location,
903        #[snafu(source)]
904        error: flexbuffers::DeserializationError,
905    },
906
907    #[snafu(display("Failed to read flexbuffers"))]
908    ReadFlexbuffers {
909        #[snafu(implicit)]
910        location: Location,
911        #[snafu(source)]
912        error: flexbuffers::ReaderError,
913    },
914
915    #[snafu(display("Invalid file name: {}", reason))]
916    InvalidFileName {
917        #[snafu(implicit)]
918        location: Location,
919        reason: String,
920    },
921
922    #[snafu(display("Invalid file extension: {}", reason))]
923    InvalidFileExtension {
924        #[snafu(implicit)]
925        location: Location,
926        reason: String,
927    },
928
929    #[snafu(display("Failed to write object, file path: {}", file_path))]
930    WriteObject {
931        #[snafu(implicit)]
932        location: Location,
933        file_path: String,
934        #[snafu(source)]
935        error: object_store::Error,
936    },
937
938    #[snafu(display("Failed to read object, file path: {}", file_path))]
939    ReadObject {
940        #[snafu(implicit)]
941        location: Location,
942        file_path: String,
943        #[snafu(source)]
944        error: object_store::Error,
945    },
946
947    #[snafu(display("Missing column ids"))]
948    MissingColumnIds {
949        #[snafu(implicit)]
950        location: Location,
951    },
952
953    #[snafu(display(
954        "Missing column in column metadata: {}, table: {}, table_id: {}",
955        column_name,
956        table_name,
957        table_id,
958    ))]
959    MissingColumnInColumnMetadata {
960        column_name: String,
961        #[snafu(implicit)]
962        location: Location,
963        table_name: String,
964        table_id: TableId,
965    },
966
967    #[snafu(display(
968        "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
969        column_name,
970        column_id,
971        table_name,
972        table_id,
973    ))]
974    MismatchColumnId {
975        column_name: String,
976        column_id: u32,
977        #[snafu(implicit)]
978        location: Location,
979        table_name: String,
980        table_id: TableId,
981    },
982
983    #[snafu(display("Failed to convert column def, column: {}", column))]
984    ConvertColumnDef {
985        column: String,
986        #[snafu(implicit)]
987        location: Location,
988        source: api::error::Error,
989    },
990
991    #[snafu(display("Failed to convert time ranges"))]
992    ConvertTimeRanges {
993        #[snafu(implicit)]
994        location: Location,
995        source: api::error::Error,
996    },
997
998    #[snafu(display(
999        "Column metadata inconsistencies found in table: {}, table_id: {}",
1000        table_name,
1001        table_id
1002    ))]
1003    ColumnMetadataConflicts {
1004        table_name: String,
1005        table_id: TableId,
1006    },
1007
1008    #[snafu(display(
1009        "Column not found in column metadata, column_name: {}, column_id: {}",
1010        column_name,
1011        column_id
1012    ))]
1013    ColumnNotFound { column_name: String, column_id: u32 },
1014
1015    #[snafu(display(
1016        "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1017        column_name,
1018        expected_column_id,
1019        actual_column_id
1020    ))]
1021    ColumnIdMismatch {
1022        column_name: String,
1023        expected_column_id: u32,
1024        actual_column_id: u32,
1025    },
1026
1027    #[snafu(display(
1028        "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1029        expected_column_name,
1030        expected_column_id,
1031        actual_column_name,
1032        actual_column_id,
1033    ))]
1034    TimestampMismatch {
1035        expected_column_name: String,
1036        expected_column_id: u32,
1037        actual_column_name: String,
1038        actual_column_id: u32,
1039    },
1040
1041    #[cfg(feature = "enterprise")]
1042    #[snafu(display("Too large duration"))]
1043    TooLargeDuration {
1044        #[snafu(source)]
1045        error: prost_types::DurationError,
1046        #[snafu(implicit)]
1047        location: Location,
1048    },
1049
1050    #[cfg(feature = "enterprise")]
1051    #[snafu(display("Negative duration"))]
1052    NegativeDuration {
1053        #[snafu(source)]
1054        error: prost_types::DurationError,
1055        #[snafu(implicit)]
1056        location: Location,
1057    },
1058
1059    #[cfg(feature = "enterprise")]
1060    #[snafu(display("Missing interval field"))]
1061    MissingInterval {
1062        #[snafu(implicit)]
1063        location: Location,
1064    },
1065}
1066
1067pub type Result<T> = std::result::Result<T, Error>;
1068
1069impl ErrorExt for Error {
1070    fn status_code(&self) -> StatusCode {
1071        use Error::*;
1072        match self {
1073            IllegalServerState { .. }
1074            | EtcdTxnOpResponse { .. }
1075            | EtcdFailed { .. }
1076            | EtcdTxnFailed { .. }
1077            | ConnectEtcd { .. }
1078            | MoveValues { .. }
1079            | GetCache { .. }
1080            | SerializeToJson { .. }
1081            | DeserializeFromJson { .. } => StatusCode::Internal,
1082
1083            NoLeader { .. } => StatusCode::TableUnavailable,
1084            ValueNotExist { .. }
1085            | ProcedurePoisonConflict { .. }
1086            | ProcedureStateReceiverNotFound { .. }
1087            | MissingColumnIds { .. }
1088            | MissingColumnInColumnMetadata { .. }
1089            | MismatchColumnId { .. }
1090            | ColumnMetadataConflicts { .. }
1091            | ColumnNotFound { .. }
1092            | ColumnIdMismatch { .. }
1093            | TimestampMismatch { .. } => StatusCode::Unexpected,
1094
1095            Unsupported { .. } => StatusCode::Unsupported,
1096            WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1097
1098            SerdeJson { .. }
1099            | ParseOption { .. }
1100            | RouteInfoCorrupted { .. }
1101            | InvalidProtoMsg { .. }
1102            | InvalidMetadata { .. }
1103            | Unexpected { .. }
1104            | TableInfoNotFound { .. }
1105            | NextSequence { .. }
1106            | UnexpectedSequenceValue { .. }
1107            | InvalidHeartbeatResponse { .. }
1108            | EncodeJson { .. }
1109            | DecodeJson { .. }
1110            | PayloadNotExist { .. }
1111            | ConvertRawKey { .. }
1112            | DecodeProto { .. }
1113            | BuildTableMeta { .. }
1114            | TableRouteNotFound { .. }
1115            | TableRepartNotFound { .. }
1116            | ConvertRawTableInfo { .. }
1117            | RegionOperatingRace { .. }
1118            | EncodeWalOptions { .. }
1119            | BuildKafkaClient { .. }
1120            | BuildKafkaCtrlClient { .. }
1121            | KafkaPartitionClient { .. }
1122            | ProduceRecord { .. }
1123            | CreateKafkaWalTopic { .. }
1124            | EmptyTopicPool { .. }
1125            | UnexpectedLogicalRouteTable { .. }
1126            | ProcedureOutput { .. }
1127            | FromUtf8 { .. }
1128            | MetadataCorruption { .. }
1129            | ParseWalOptions { .. }
1130            | KafkaGetOffset { .. }
1131            | ReadFlexbuffers { .. }
1132            | SerializeFlexbuffers { .. }
1133            | DeserializeFlexbuffers { .. }
1134            | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1135
1136            GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1137
1138            SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1139
1140            ProcedureNotFound { .. }
1141            | InvalidViewInfo { .. }
1142            | PrimaryKeyNotFound { .. }
1143            | EmptyKey { .. }
1144            | AlterLogicalTablesInvalidArguments { .. }
1145            | CreateLogicalTablesInvalidArguments { .. }
1146            | MismatchPrefix { .. }
1147            | TlsConfig { .. }
1148            | InvalidSetDatabaseOption { .. }
1149            | InvalidUnsetDatabaseOption { .. }
1150            | InvalidTopicNamePrefix { .. }
1151            | InvalidTimeZone { .. }
1152            | InvalidFileExtension { .. }
1153            | InvalidFileName { .. }
1154            | InvalidFlowRequestBody { .. }
1155            | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1156
1157            #[cfg(feature = "enterprise")]
1158            MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1159                StatusCode::InvalidArguments
1160            }
1161
1162            FlowNotFound { .. } => StatusCode::FlowNotFound,
1163            FlowRouteNotFound { .. } => StatusCode::Unexpected,
1164            FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1165
1166            ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1167                StatusCode::TableNotFound
1168            }
1169            ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1170
1171            SubmitProcedure { source, .. }
1172            | QueryProcedure { source, .. }
1173            | WaitProcedure { source, .. }
1174            | StartProcedureManager { source, .. }
1175            | StopProcedureManager { source, .. } => source.status_code(),
1176            RegisterProcedureLoader { source, .. } => source.status_code(),
1177            External { source, .. } => source.status_code(),
1178            ResponseExceededSizeLimit { source, .. } => source.status_code(),
1179            OperateDatanode { source, .. } => source.status_code(),
1180            Table { source, .. } => source.status_code(),
1181            RetryLater { source, .. } => source.status_code(),
1182            AbortProcedure { source, .. } => source.status_code(),
1183            ConvertAlterTableRequest { source, .. } => source.status_code(),
1184            PutPoison { source, .. } => source.status_code(),
1185            ConvertColumnDef { source, .. } => source.status_code(),
1186            ProcedureStateReceiver { source, .. } => source.status_code(),
1187            RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1188            CreateRepartitionProcedure { source, .. } => source.status_code(),
1189
1190            ParseProcedureId { .. }
1191            | InvalidNumTopics { .. }
1192            | SchemaNotFound { .. }
1193            | CatalogNotFound { .. }
1194            | InvalidNodeInfoKey { .. }
1195            | InvalidStatKey { .. }
1196            | ParseNum { .. }
1197            | InvalidRole { .. }
1198            | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1199
1200            LoadTlsCertificate { .. } => StatusCode::Internal,
1201
1202            #[cfg(feature = "pg_kvbackend")]
1203            PostgresExecution { .. }
1204            | CreatePostgresPool { .. }
1205            | GetPostgresConnection { .. }
1206            | PostgresTransaction { .. }
1207            | PostgresTlsConfig { .. }
1208            | InvalidTlsConfig { .. } => StatusCode::Internal,
1209            #[cfg(feature = "mysql_kvbackend")]
1210            MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1211                StatusCode::Internal
1212            }
1213            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1214            RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1215            DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1216        }
1217    }
1218
1219    fn as_any(&self) -> &dyn std::any::Any {
1220        self
1221    }
1222}
1223
1224impl Error {
1225    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1226    /// Check if the error is a serialization error.
1227    pub fn is_serialization_error(&self) -> bool {
1228        match self {
1229            #[cfg(feature = "pg_kvbackend")]
1230            Error::PostgresTransaction { error, .. } => {
1231                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1232            }
1233            #[cfg(feature = "pg_kvbackend")]
1234            Error::PostgresExecution { error, .. } => {
1235                error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1236            }
1237            #[cfg(feature = "mysql_kvbackend")]
1238            Error::MySqlExecution {
1239                error: sqlx::Error::Database(database_error),
1240                ..
1241            } => {
1242                matches!(
1243                    database_error.message(),
1244                    "Deadlock found when trying to get lock; try restarting transaction"
1245                        | "can't serialize access for this transaction"
1246                )
1247            }
1248            _ => false,
1249        }
1250    }
1251
1252    /// Creates a new [Error::RetryLater] error from source `err`.
1253    pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1254        Error::RetryLater {
1255            source: BoxedError::new(err),
1256            clean_poisons: false,
1257        }
1258    }
1259
1260    /// Determine whether it is a retry later type through [StatusCode]
1261    pub fn is_retry_later(&self) -> bool {
1262        matches!(self, Error::RetryLater { .. })
1263    }
1264
1265    /// Determine whether it needs to clean poisons.
1266    pub fn need_clean_poisons(&self) -> bool {
1267        matches!(
1268            self,
1269            Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1270        ) || matches!(
1271            self,
1272            Error::RetryLater { clean_poisons, .. } if *clean_poisons
1273        )
1274    }
1275
1276    /// Returns true if the response exceeds the size limit.
1277    pub fn is_exceeded_size_limit(&self) -> bool {
1278        match self {
1279            Error::EtcdFailed {
1280                error: etcd_client::Error::GRpcStatus(status),
1281                ..
1282            } => status.code() == tonic::Code::OutOfRange,
1283            Error::ResponseExceededSizeLimit { .. } => true,
1284            _ => false,
1285        }
1286    }
1287}