cli/data/
storage_export.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16
17use common_base::secrets::{ExposeSecret, SecretString};
18use common_error::ext::BoxedError;
19
20use crate::common::{
21    PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
22};
23
24/// Helper function to extract secret string from Option<SecretString>.
25/// Returns empty string if None.
26fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
27    secret
28        .as_ref()
29        .map(|s| s.expose_secret().as_str())
30        .unwrap_or("")
31}
32
/// Turns a storage root into the path segment that follows the bucket in a URI.
///
/// An empty root yields an empty segment. Any other root gets a single `/`
/// prepended and is otherwise used as-is (no slash normalization).
fn format_root_path(root: &str) -> String {
    match root {
        "" => String::new(),
        non_empty => format!("/{non_empty}"),
    }
}
41
/// Replaces every occurrence of each non-empty secret in `sql` with `[REDACTED]`.
///
/// Empty secrets are skipped, because replacing `""` would insert the placeholder
/// between every character. Secrets are applied longest-first and de-duplicated.
/// A secret that contains another one is therefore redacted as a whole. With the
/// naive order, masking `abc` before `abcdef` would leave the tail `def` visible.
fn mask_secrets(sql: String, secrets: &[&str]) -> String {
    let mut ordered: Vec<&str> = secrets
        .iter()
        .copied()
        .filter(|secret| !secret.is_empty())
        .collect();
    // Longest first; the secondary lexical key makes equal strings adjacent for `dedup`.
    ordered.sort_unstable_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));
    ordered.dedup();
    ordered
        .into_iter()
        .fold(sql, |masked, secret| masked.replace(secret, "[REDACTED]"))
}
51
52/// Helper function to format storage URI.
53fn format_uri(scheme: &str, bucket: &str, root: &str, path: &str) -> String {
54    let root = format_root_path(root);
55    format!("{}://{}{}/{}", scheme, bucket, root, path)
56}
57
/// A destination (local directory or object store) that exported data can be
/// written to.
pub trait StorageExport: Send + Sync {
    /// Computes where a `COPY DATABASE` of `catalog`.`schema` should write.
    ///
    /// Returns `(path, connection)`. `connection` is the text appended after the
    /// path: a leading-space ` CONNECTION (...)` clause, or an empty string when the
    /// backend needs none.
    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);

    /// Renders `file_path` as a full location string, for log messages.
    fn format_output_path(&self, file_path: &str) -> String;

    /// Returns a copy of `sql` with this backend's credentials redacted, so the
    /// statement can be logged safely.
    fn mask_sensitive_info(&self, sql: &str) -> String;
}
70
/// Declares a remote storage backend struct that wraps a connection config.
///
/// The generated type has a single private `config` field and a `new`
/// constructor that runs the config's `validate` method before building the
/// backend.
macro_rules! define_backend {
    ($name:ident, $config:ty) => {
        #[doc = concat!("Storage export backend configured by `", stringify!($config), "`.")]
        #[derive(Clone)]
        pub struct $name {
            config: $config,
        }

        impl $name {
            /// Validates `config` and wraps it in a new backend.
            ///
            /// # Errors
            ///
            /// Propagates the error returned by the config's `validate` method.
            pub fn new(config: $config) -> Result<Self, BoxedError> {
                config.validate()?;
                let backend = Self { config };
                Ok(backend)
            }
        }
    };
}
86
/// Storage backend that writes exports under a directory on the local file system.
#[derive(Clone)]
pub struct FsBackend {
    /// Directory that receives the exported files.
    output_dir: String,
}

impl FsBackend {
    /// Creates a backend rooted at `output_dir`.
    ///
    /// The value is stored as-is; it is not checked or created here.
    pub fn new(output_dir: String) -> Self {
        FsBackend { output_dir }
    }
}
98
99impl StorageExport for FsBackend {
100    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
101        if self.output_dir.is_empty() {
102            unreachable!("output_dir must be set when not using remote storage")
103        }
104        let path = PathBuf::from(&self.output_dir)
105            .join(catalog)
106            .join(format!("{schema}/"))
107            .to_string_lossy()
108            .to_string();
109        (path, String::new())
110    }
111
112    fn format_output_path(&self, file_path: &str) -> String {
113        format!("{}/{}", self.output_dir, file_path)
114    }
115
116    fn mask_sensitive_info(&self, sql: &str) -> String {
117        sql.to_string()
118    }
119}
120
121define_backend!(S3Backend, PrefixedS3Connection);
122
123impl StorageExport for S3Backend {
124    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
125        let s3_path = format_uri(
126            "s3",
127            &self.config.s3_bucket,
128            &self.config.s3_root,
129            &format!("{}/{}/", catalog, schema),
130        );
131
132        let mut connection_options = vec![
133            format!(
134                "ACCESS_KEY_ID='{}'",
135                expose_optional_secret(&self.config.s3_access_key_id)
136            ),
137            format!(
138                "SECRET_ACCESS_KEY='{}'",
139                expose_optional_secret(&self.config.s3_secret_access_key)
140            ),
141        ];
142
143        if let Some(region) = &self.config.s3_region {
144            connection_options.push(format!("REGION='{}'", region));
145        }
146
147        if let Some(endpoint) = &self.config.s3_endpoint {
148            connection_options.push(format!("ENDPOINT='{}'", endpoint));
149        }
150
151        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
152        (s3_path, connection_str)
153    }
154
155    fn format_output_path(&self, file_path: &str) -> String {
156        format_uri(
157            "s3",
158            &self.config.s3_bucket,
159            &self.config.s3_root,
160            file_path,
161        )
162    }
163
164    fn mask_sensitive_info(&self, sql: &str) -> String {
165        mask_secrets(
166            sql.to_string(),
167            &[
168                expose_optional_secret(&self.config.s3_access_key_id),
169                expose_optional_secret(&self.config.s3_secret_access_key),
170            ],
171        )
172    }
173}
174
175define_backend!(OssBackend, PrefixedOssConnection);
176
177impl StorageExport for OssBackend {
178    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
179        let oss_path = format_uri(
180            "oss",
181            &self.config.oss_bucket,
182            &self.config.oss_root,
183            &format!("{}/{}/", catalog, schema),
184        );
185
186        let connection_options = [
187            format!(
188                "ACCESS_KEY_ID='{}'",
189                expose_optional_secret(&self.config.oss_access_key_id)
190            ),
191            format!(
192                "ACCESS_KEY_SECRET='{}'",
193                expose_optional_secret(&self.config.oss_access_key_secret)
194            ),
195        ];
196
197        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
198        (oss_path, connection_str)
199    }
200
201    fn format_output_path(&self, file_path: &str) -> String {
202        format_uri(
203            "oss",
204            &self.config.oss_bucket,
205            &self.config.oss_root,
206            file_path,
207        )
208    }
209
210    fn mask_sensitive_info(&self, sql: &str) -> String {
211        mask_secrets(
212            sql.to_string(),
213            &[
214                expose_optional_secret(&self.config.oss_access_key_id),
215                expose_optional_secret(&self.config.oss_access_key_secret),
216            ],
217        )
218    }
219}
220
221define_backend!(GcsBackend, PrefixedGcsConnection);
222
223impl StorageExport for GcsBackend {
224    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
225        let gcs_path = format_uri(
226            "gcs",
227            &self.config.gcs_bucket,
228            &self.config.gcs_root,
229            &format!("{}/{}/", catalog, schema),
230        );
231
232        let mut connection_options = Vec::new();
233
234        let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
235        if !credential_path.is_empty() {
236            connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
237        }
238
239        let credential = expose_optional_secret(&self.config.gcs_credential);
240        if !credential.is_empty() {
241            connection_options.push(format!("CREDENTIAL='{}'", credential));
242        }
243
244        if !self.config.gcs_endpoint.is_empty() {
245            connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
246        }
247
248        let connection_str = if connection_options.is_empty() {
249            String::new()
250        } else {
251            format!(" CONNECTION ({})", connection_options.join(", "))
252        };
253
254        (gcs_path, connection_str)
255    }
256
257    fn format_output_path(&self, file_path: &str) -> String {
258        format_uri(
259            "gcs",
260            &self.config.gcs_bucket,
261            &self.config.gcs_root,
262            file_path,
263        )
264    }
265
266    fn mask_sensitive_info(&self, sql: &str) -> String {
267        mask_secrets(
268            sql.to_string(),
269            &[
270                expose_optional_secret(&self.config.gcs_credential_path),
271                expose_optional_secret(&self.config.gcs_credential),
272            ],
273        )
274    }
275}
276
277define_backend!(AzblobBackend, PrefixedAzblobConnection);
278
279impl StorageExport for AzblobBackend {
280    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
281        let azblob_path = format_uri(
282            "azblob",
283            &self.config.azblob_container,
284            &self.config.azblob_root,
285            &format!("{}/{}/", catalog, schema),
286        );
287
288        let mut connection_options = vec![
289            format!(
290                "ACCOUNT_NAME='{}'",
291                expose_optional_secret(&self.config.azblob_account_name)
292            ),
293            format!(
294                "ACCOUNT_KEY='{}'",
295                expose_optional_secret(&self.config.azblob_account_key)
296            ),
297        ];
298
299        if let Some(sas_token) = &self.config.azblob_sas_token {
300            connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
301        }
302
303        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
304        (azblob_path, connection_str)
305    }
306
307    fn format_output_path(&self, file_path: &str) -> String {
308        format_uri(
309            "azblob",
310            &self.config.azblob_container,
311            &self.config.azblob_root,
312            file_path,
313        )
314    }
315
316    fn mask_sensitive_info(&self, sql: &str) -> String {
317        mask_secrets(
318            sql.to_string(),
319            &[
320                expose_optional_secret(&self.config.azblob_account_name),
321                expose_optional_secret(&self.config.azblob_account_key),
322            ],
323        )
324    }
325}
326
327#[derive(Clone)]
328pub enum StorageType {
329    Fs(FsBackend),
330    S3(S3Backend),
331    Oss(OssBackend),
332    Gcs(GcsBackend),
333    Azblob(AzblobBackend),
334}
335
336impl StorageExport for StorageType {
337    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
338        match self {
339            StorageType::Fs(backend) => backend.get_storage_path(catalog, schema),
340            StorageType::S3(backend) => backend.get_storage_path(catalog, schema),
341            StorageType::Oss(backend) => backend.get_storage_path(catalog, schema),
342            StorageType::Gcs(backend) => backend.get_storage_path(catalog, schema),
343            StorageType::Azblob(backend) => backend.get_storage_path(catalog, schema),
344        }
345    }
346
347    fn format_output_path(&self, file_path: &str) -> String {
348        match self {
349            StorageType::Fs(backend) => backend.format_output_path(file_path),
350            StorageType::S3(backend) => backend.format_output_path(file_path),
351            StorageType::Oss(backend) => backend.format_output_path(file_path),
352            StorageType::Gcs(backend) => backend.format_output_path(file_path),
353            StorageType::Azblob(backend) => backend.format_output_path(file_path),
354        }
355    }
356
357    fn mask_sensitive_info(&self, sql: &str) -> String {
358        match self {
359            StorageType::Fs(backend) => backend.mask_sensitive_info(sql),
360            StorageType::S3(backend) => backend.mask_sensitive_info(sql),
361            StorageType::Oss(backend) => backend.mask_sensitive_info(sql),
362            StorageType::Gcs(backend) => backend.mask_sensitive_info(sql),
363            StorageType::Azblob(backend) => backend.mask_sensitive_info(sql),
364        }
365    }
366}
367
368impl StorageType {
369    /// Returns true if the storage backend is remote (not local filesystem).
370    pub fn is_remote_storage(&self) -> bool {
371        !matches!(self, StorageType::Fs(_))
372    }
373}