datanode/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use common_error::define_into_tonic_status;
19use common_error::ext::{BoxedError, ErrorExt};
20use common_error::status_code::StatusCode;
21use common_macro::stack_trace_debug;
22use snafu::{Location, Snafu};
23use store_api::storage::RegionId;
24use table::error::Error as TableError;
25use tokio::time::error::Elapsed;
26
27/// Business error of datanode.
28#[derive(Snafu)]
29#[snafu(visibility(pub))]
30#[stack_trace_debug]
31pub enum Error {
32    #[snafu(display("Failed to execute async task"))]
33    AsyncTaskExecute {
34        #[snafu(implicit)]
35        location: Location,
36        source: Arc<Error>,
37    },
38
39    #[snafu(display("Failed to watch change"))]
40    WatchAsyncTaskChange {
41        #[snafu(implicit)]
42        location: Location,
43        #[snafu(source)]
44        error: tokio::sync::watch::error::RecvError,
45    },
46
47    #[snafu(display("Failed to handle heartbeat response"))]
48    HandleHeartbeatResponse {
49        #[snafu(implicit)]
50        location: Location,
51        source: common_meta::error::Error,
52    },
53
54    #[snafu(display("Failed to get info from meta server"))]
55    GetMetadata {
56        #[snafu(implicit)]
57        location: Location,
58        source: common_meta::error::Error,
59    },
60
61    #[snafu(display("Failed to execute logical plan"))]
62    ExecuteLogicalPlan {
63        #[snafu(implicit)]
64        location: Location,
65        source: query::error::Error,
66    },
67
68    #[snafu(display("Failed to create plan decoder"))]
69    NewPlanDecoder {
70        #[snafu(implicit)]
71        location: Location,
72        source: query::error::Error,
73    },
74
75    #[snafu(display("Failed to decode logical plan"))]
76    DecodeLogicalPlan {
77        #[snafu(implicit)]
78        location: Location,
79        source: common_query::error::Error,
80    },
81
82    #[snafu(display("Schema not found: {}", name))]
83    SchemaNotFound {
84        name: String,
85        #[snafu(implicit)]
86        location: Location,
87    },
88
89    #[snafu(display("Missing timestamp column in request"))]
90    MissingTimestampColumn {
91        #[snafu(implicit)]
92        location: Location,
93    },
94
95    #[snafu(display("Failed to delete value from table: {}", table_name))]
96    Delete {
97        table_name: String,
98        #[snafu(implicit)]
99        location: Location,
100        source: TableError,
101    },
102
103    #[snafu(display("Failed to start server"))]
104    StartServer {
105        #[snafu(implicit)]
106        location: Location,
107        source: servers::error::Error,
108    },
109
110    #[snafu(display("Failed to parse address {}", addr))]
111    ParseAddr {
112        addr: String,
113        #[snafu(source)]
114        error: std::net::AddrParseError,
115    },
116
117    #[snafu(display("Failed to create directory {}", dir))]
118    CreateDir {
119        dir: String,
120        #[snafu(source)]
121        error: std::io::Error,
122    },
123
124    #[snafu(display("Failed to remove directory {}", dir))]
125    RemoveDir {
126        dir: String,
127        #[snafu(source)]
128        error: std::io::Error,
129    },
130
131    #[snafu(display("Failed to open log store"))]
132    OpenLogStore {
133        #[snafu(implicit)]
134        location: Location,
135        source: Box<log_store::error::Error>,
136    },
137
138    #[snafu(display("Invalid SQL, error: {}", msg))]
139    InvalidSql { msg: String },
140
141    #[snafu(display("Illegal primary keys definition: {}", msg))]
142    IllegalPrimaryKeysDef {
143        msg: String,
144        #[snafu(implicit)]
145        location: Location,
146    },
147
148    #[snafu(display("Schema {} already exists", name))]
149    SchemaExists {
150        name: String,
151        #[snafu(implicit)]
152        location: Location,
153    },
154
155    #[snafu(display("Failed to initialize meta client"))]
156    MetaClientInit {
157        #[snafu(implicit)]
158        location: Location,
159        source: meta_client::error::Error,
160    },
161
162    #[snafu(display("Missing node id in Datanode config"))]
163    MissingNodeId {
164        #[snafu(implicit)]
165        location: Location,
166    },
167
168    #[snafu(display("Failed to build datanode"))]
169    BuildDatanode {
170        #[snafu(implicit)]
171        location: Location,
172        source: BoxedError,
173    },
174
175    #[snafu(display("Failed to build http client"))]
176    BuildHttpClient {
177        #[snafu(implicit)]
178        location: Location,
179        #[snafu(source)]
180        error: reqwest::Error,
181    },
182
183    #[snafu(display("Missing required field: {}", name))]
184    MissingRequiredField {
185        name: String,
186        #[snafu(implicit)]
187        location: Location,
188    },
189
190    #[snafu(display(
191        "No valid default value can be built automatically, column: {}",
192        column,
193    ))]
194    ColumnNoneDefaultValue {
195        column: String,
196        #[snafu(implicit)]
197        location: Location,
198    },
199
200    #[snafu(display("Failed to shutdown server"))]
201    ShutdownServer {
202        #[snafu(implicit)]
203        location: Location,
204        #[snafu(source)]
205        source: servers::error::Error,
206    },
207
208    #[snafu(display("Failed to shutdown instance"))]
209    ShutdownInstance {
210        #[snafu(implicit)]
211        location: Location,
212        #[snafu(source)]
213        source: BoxedError,
214    },
215
216    #[snafu(display("Payload not exist"))]
217    PayloadNotExist {
218        #[snafu(implicit)]
219        location: Location,
220    },
221
222    #[snafu(display("Unexpected, violated: {}", violated))]
223    Unexpected {
224        violated: String,
225        #[snafu(implicit)]
226        location: Location,
227    },
228
229    #[snafu(display("Failed to handle request for region {}", region_id))]
230    HandleRegionRequest {
231        region_id: RegionId,
232        #[snafu(implicit)]
233        location: Location,
234        source: BoxedError,
235    },
236
237    #[snafu(display("Failed to open batch regions"))]
238    HandleBatchOpenRequest {
239        #[snafu(implicit)]
240        location: Location,
241        source: BoxedError,
242    },
243
244    #[snafu(display("Failed to handle batch ddl request, ddl_type: {}", ddl_type))]
245    HandleBatchDdlRequest {
246        #[snafu(implicit)]
247        location: Location,
248        source: BoxedError,
249        ddl_type: String,
250    },
251
252    #[snafu(display("RegionId {} not found", region_id))]
253    RegionNotFound {
254        region_id: RegionId,
255        #[snafu(implicit)]
256        location: Location,
257    },
258
259    #[snafu(display("Region {} not ready", region_id))]
260    RegionNotReady {
261        region_id: RegionId,
262        #[snafu(implicit)]
263        location: Location,
264    },
265
266    #[snafu(display("Region {} is busy", region_id))]
267    RegionBusy {
268        region_id: RegionId,
269        #[snafu(implicit)]
270        location: Location,
271    },
272
273    #[snafu(display("Region engine {} is not registered", name))]
274    RegionEngineNotFound {
275        name: String,
276        #[snafu(implicit)]
277        location: Location,
278    },
279
280    #[snafu(display("Unsupported output type, expected: {}", expected))]
281    UnsupportedOutput {
282        expected: String,
283        #[snafu(implicit)]
284        location: Location,
285    },
286
287    #[snafu(display("Failed to build region requests"))]
288    BuildRegionRequests {
289        #[snafu(implicit)]
290        location: Location,
291        source: store_api::metadata::MetadataError,
292    },
293
294    #[snafu(display("Failed to stop region engine {}", name))]
295    StopRegionEngine {
296        name: String,
297        #[snafu(implicit)]
298        location: Location,
299        source: BoxedError,
300    },
301
302    #[snafu(display(
303        "Failed to find logical regions in physical region {}",
304        physical_region_id
305    ))]
306    FindLogicalRegions {
307        physical_region_id: RegionId,
308        source: metric_engine::error::Error,
309        #[snafu(implicit)]
310        location: Location,
311    },
312
313    #[snafu(display("Failed to build mito engine"))]
314    BuildMitoEngine {
315        source: mito2::error::Error,
316        #[snafu(implicit)]
317        location: Location,
318    },
319
320    #[snafu(display("Failed to build metric engine"))]
321    BuildMetricEngine {
322        source: metric_engine::error::Error,
323        #[snafu(implicit)]
324        location: Location,
325    },
326
327    #[snafu(display("Failed to run gc for region {}", region_id))]
328    GcMitoEngine {
329        region_id: RegionId,
330        source: mito2::error::Error,
331        #[snafu(implicit)]
332        location: Location,
333    },
334
335    #[snafu(display("Invalid arguments for GC: {}", msg))]
336    InvalidGcArgs {
337        msg: String,
338        #[snafu(implicit)]
339        location: Location,
340    },
341
342    #[snafu(display("Failed to list SST entries from storage"))]
343    ListStorageSsts {
344        #[snafu(implicit)]
345        location: Location,
346        source: mito2::error::Error,
347    },
348
349    #[snafu(display("Failed to serialize options to TOML"))]
350    TomlFormat {
351        #[snafu(implicit)]
352        location: Location,
353        #[snafu(source(from(common_config::error::Error, Box::new)))]
354        source: Box<common_config::error::Error>,
355    },
356
357    #[snafu(display(
358        "Failed to get region metadata from engine {} for region_id {}",
359        engine,
360        region_id,
361    ))]
362    GetRegionMetadata {
363        engine: String,
364        region_id: RegionId,
365        #[snafu(implicit)]
366        location: Location,
367        source: BoxedError,
368    },
369
370    #[snafu(display("DataFusion"))]
371    DataFusion {
372        #[snafu(source)]
373        error: datafusion::error::DataFusionError,
374        #[snafu(implicit)]
375        location: Location,
376    },
377
378    #[snafu(display("Failed to acquire permit, source closed"))]
379    ConcurrentQueryLimiterClosed {
380        #[snafu(source)]
381        error: tokio::sync::AcquireError,
382        #[snafu(implicit)]
383        location: Location,
384    },
385
386    #[snafu(display("Failed to acquire permit under timeouts"))]
387    ConcurrentQueryLimiterTimeout {
388        #[snafu(source)]
389        error: Elapsed,
390        #[snafu(implicit)]
391        location: Location,
392    },
393
394    #[snafu(display("Cache not found in registry"))]
395    MissingCache {
396        #[snafu(implicit)]
397        location: Location,
398    },
399
400    #[snafu(display("Failed to serialize json"))]
401    SerializeJson {
402        #[snafu(source)]
403        error: serde_json::Error,
404        #[snafu(implicit)]
405        location: Location,
406    },
407
408    #[snafu(display("Failed object store operation"))]
409    ObjectStore {
410        source: object_store::error::Error,
411        #[snafu(implicit)]
412        location: Location,
413    },
414
415    #[snafu(display("Not yet implemented: {what}"))]
416    NotYetImplemented { what: String },
417}
418
419pub type Result<T> = std::result::Result<T, Error>;
420
421impl ErrorExt for Error {
422    fn status_code(&self) -> StatusCode {
423        use Error::*;
424        match self {
425            NewPlanDecoder { source, .. } | ExecuteLogicalPlan { source, .. } => {
426                source.status_code()
427            }
428
429            BuildRegionRequests { source, .. } => source.status_code(),
430            HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => {
431                source.status_code()
432            }
433
434            DecodeLogicalPlan { source, .. } => source.status_code(),
435
436            Delete { source, .. } => source.status_code(),
437
438            InvalidSql { .. }
439            | IllegalPrimaryKeysDef { .. }
440            | MissingTimestampColumn { .. }
441            | SchemaNotFound { .. }
442            | SchemaExists { .. }
443            | MissingNodeId { .. }
444            | ColumnNoneDefaultValue { .. }
445            | MissingRequiredField { .. }
446            | RegionEngineNotFound { .. }
447            | ParseAddr { .. }
448            | TomlFormat { .. }
449            | BuildDatanode { .. } => StatusCode::InvalidArguments,
450
451            PayloadNotExist { .. }
452            | Unexpected { .. }
453            | WatchAsyncTaskChange { .. }
454            | BuildHttpClient { .. } => StatusCode::Unexpected,
455
456            AsyncTaskExecute { source, .. } => source.status_code(),
457
458            CreateDir { .. }
459            | RemoveDir { .. }
460            | ShutdownInstance { .. }
461            | DataFusion { .. }
462            | InvalidGcArgs { .. } => StatusCode::Internal,
463
464            RegionNotFound { .. } => StatusCode::RegionNotFound,
465            RegionNotReady { .. } => StatusCode::RegionNotReady,
466            RegionBusy { .. } => StatusCode::RegionBusy,
467
468            StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
469
470            OpenLogStore { source, .. } => source.status_code(),
471            MetaClientInit { source, .. } => source.status_code(),
472            UnsupportedOutput { .. } | NotYetImplemented { .. } => StatusCode::Unsupported,
473            HandleRegionRequest { source, .. }
474            | GetRegionMetadata { source, .. }
475            | HandleBatchOpenRequest { source, .. }
476            | HandleBatchDdlRequest { source, .. } => source.status_code(),
477            StopRegionEngine { source, .. } => source.status_code(),
478
479            FindLogicalRegions { source, .. } => source.status_code(),
480            BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
481            BuildMetricEngine { source, .. } => source.status_code(),
482            ListStorageSsts { source, .. } => source.status_code(),
483            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
484                StatusCode::RegionBusy
485            }
486            MissingCache { .. } => StatusCode::Internal,
487            SerializeJson { .. } => StatusCode::Internal,
488
489            ObjectStore { source, .. } => source.status_code(),
490        }
491    }
492
493    fn as_any(&self) -> &dyn Any {
494        self
495    }
496}
497
498define_into_tonic_status!(Error);