1use std::path::PathBuf;
16
17use common_base::secrets::{ExposeSecret, SecretString};
18use common_error::ext::BoxedError;
19
20use crate::common::{
21 PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
22};
23
24fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
27 secret
28 .as_ref()
29 .map(|s| s.expose_secret().as_str())
30 .unwrap_or("")
31}
32
/// Turns a storage root into a URI path segment.
///
/// An empty `root` yields an empty string; any other value is returned with a
/// single `/` prepended (the value itself is not normalized).
fn format_root_path(root: &str) -> String {
    match root {
        "" => String::new(),
        non_empty => format!("/{non_empty}"),
    }
}
41
/// Replaces every occurrence of each non-empty secret in `sql` with
/// `[REDACTED]` and returns the masked statement.
///
/// Secrets are masked longest-first. If one secret is a prefix or substring of
/// another (for example a key id that also starts the secret key), replacing
/// the shorter one first would break the longer match and leave the remainder
/// of the longer secret visible in the output.
///
/// Empty entries in `secrets` are ignored.
fn mask_secrets(mut sql: String, secrets: &[&str]) -> String {
    let mut ordered: Vec<&str> = secrets
        .iter()
        .copied()
        .filter(|secret| !secret.is_empty())
        .collect();
    // Stable sort: secrets of equal length keep the caller's order.
    ordered.sort_by_key(|secret| std::cmp::Reverse(secret.len()));

    for secret in ordered {
        // Skip the reallocation when there is nothing to replace.
        if sql.contains(secret) {
            sql = sql.replace(secret, "[REDACTED]");
        }
    }
    sql
}
51
52fn format_uri(scheme: &str, bucket: &str, root: &str, path: &str) -> String {
54 let root = format_root_path(root);
55 format!("{}://{}{}/{}", scheme, bucket, root, path)
56}
57
/// Operations an export destination must provide.
///
/// Implementors in this file are the local filesystem backend and the
/// S3 / OSS / GCS / Azure Blob backends.
pub trait StorageExport: Send + Sync {
    /// Returns `(path, connection_clause)` for the given catalog and schema.
    ///
    /// The second element is the text to append after the path (a
    /// ` CONNECTION (...)` clause for remote backends); it may be empty.
    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);

    /// Returns the full location of `file_path` inside this storage.
    fn format_output_path(&self, file_path: &str) -> String;

    /// Returns a copy of `sql` with this backend's credentials masked.
    fn mask_sensitive_info(&self, sql: &str) -> String;
}
70
/// Generates a cloneable backend struct that wraps a connection config.
///
/// `define_backend!(Name, ConfigType)` expands to `pub struct Name` with a
/// private `config: ConfigType` field and a `Name::new(config)` constructor
/// that calls `config.validate()` and propagates its error before storing the
/// config.
macro_rules! define_backend {
    ($backend:ident, $connection:ty) => {
        #[derive(Clone)]
        pub struct $backend {
            config: $connection,
        }

        impl $backend {
            pub fn new(config: $connection) -> Result<Self, BoxedError> {
                config.validate()?;
                Ok($backend { config })
            }
        }
    };
}
86
/// Export backend that writes to a directory on the local filesystem.
#[derive(Clone)]
pub struct FsBackend {
    /// Base directory under which exported data is placed.
    output_dir: String,
}

impl FsBackend {
    /// Creates a backend rooted at `output_dir`. No validation is performed here.
    pub fn new(output_dir: String) -> Self {
        FsBackend { output_dir }
    }
}
98
99impl StorageExport for FsBackend {
100 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
101 if self.output_dir.is_empty() {
102 unreachable!("output_dir must be set when not using remote storage")
103 }
104 let path = PathBuf::from(&self.output_dir)
105 .join(catalog)
106 .join(format!("{schema}/"))
107 .to_string_lossy()
108 .to_string();
109 (path, String::new())
110 }
111
112 fn format_output_path(&self, file_path: &str) -> String {
113 format!("{}/{}", self.output_dir, file_path)
114 }
115
116 fn mask_sensitive_info(&self, sql: &str) -> String {
117 sql.to_string()
118 }
119}
120
121define_backend!(S3Backend, PrefixedS3Connection);
122
123impl StorageExport for S3Backend {
124 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
125 let s3_path = format_uri(
126 "s3",
127 &self.config.s3_bucket,
128 &self.config.s3_root,
129 &format!("{}/{}/", catalog, schema),
130 );
131
132 let mut connection_options = vec![
133 format!(
134 "ACCESS_KEY_ID='{}'",
135 expose_optional_secret(&self.config.s3_access_key_id)
136 ),
137 format!(
138 "SECRET_ACCESS_KEY='{}'",
139 expose_optional_secret(&self.config.s3_secret_access_key)
140 ),
141 ];
142
143 if let Some(region) = &self.config.s3_region {
144 connection_options.push(format!("REGION='{}'", region));
145 }
146
147 if let Some(endpoint) = &self.config.s3_endpoint {
148 connection_options.push(format!("ENDPOINT='{}'", endpoint));
149 }
150
151 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
152 (s3_path, connection_str)
153 }
154
155 fn format_output_path(&self, file_path: &str) -> String {
156 format_uri(
157 "s3",
158 &self.config.s3_bucket,
159 &self.config.s3_root,
160 file_path,
161 )
162 }
163
164 fn mask_sensitive_info(&self, sql: &str) -> String {
165 mask_secrets(
166 sql.to_string(),
167 &[
168 expose_optional_secret(&self.config.s3_access_key_id),
169 expose_optional_secret(&self.config.s3_secret_access_key),
170 ],
171 )
172 }
173}
174
175define_backend!(OssBackend, PrefixedOssConnection);
176
177impl StorageExport for OssBackend {
178 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
179 let oss_path = format_uri(
180 "oss",
181 &self.config.oss_bucket,
182 &self.config.oss_root,
183 &format!("{}/{}/", catalog, schema),
184 );
185
186 let connection_options = [
187 format!(
188 "ACCESS_KEY_ID='{}'",
189 expose_optional_secret(&self.config.oss_access_key_id)
190 ),
191 format!(
192 "ACCESS_KEY_SECRET='{}'",
193 expose_optional_secret(&self.config.oss_access_key_secret)
194 ),
195 ];
196
197 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
198 (oss_path, connection_str)
199 }
200
201 fn format_output_path(&self, file_path: &str) -> String {
202 format_uri(
203 "oss",
204 &self.config.oss_bucket,
205 &self.config.oss_root,
206 file_path,
207 )
208 }
209
210 fn mask_sensitive_info(&self, sql: &str) -> String {
211 mask_secrets(
212 sql.to_string(),
213 &[
214 expose_optional_secret(&self.config.oss_access_key_id),
215 expose_optional_secret(&self.config.oss_access_key_secret),
216 ],
217 )
218 }
219}
220
221define_backend!(GcsBackend, PrefixedGcsConnection);
222
223impl StorageExport for GcsBackend {
224 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
225 let gcs_path = format_uri(
226 "gcs",
227 &self.config.gcs_bucket,
228 &self.config.gcs_root,
229 &format!("{}/{}/", catalog, schema),
230 );
231
232 let mut connection_options = Vec::new();
233
234 let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
235 if !credential_path.is_empty() {
236 connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
237 }
238
239 let credential = expose_optional_secret(&self.config.gcs_credential);
240 if !credential.is_empty() {
241 connection_options.push(format!("CREDENTIAL='{}'", credential));
242 }
243
244 if !self.config.gcs_endpoint.is_empty() {
245 connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
246 }
247
248 let connection_str = if connection_options.is_empty() {
249 String::new()
250 } else {
251 format!(" CONNECTION ({})", connection_options.join(", "))
252 };
253
254 (gcs_path, connection_str)
255 }
256
257 fn format_output_path(&self, file_path: &str) -> String {
258 format_uri(
259 "gcs",
260 &self.config.gcs_bucket,
261 &self.config.gcs_root,
262 file_path,
263 )
264 }
265
266 fn mask_sensitive_info(&self, sql: &str) -> String {
267 mask_secrets(
268 sql.to_string(),
269 &[
270 expose_optional_secret(&self.config.gcs_credential_path),
271 expose_optional_secret(&self.config.gcs_credential),
272 ],
273 )
274 }
275}
276
277define_backend!(AzblobBackend, PrefixedAzblobConnection);
278
279impl StorageExport for AzblobBackend {
280 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
281 let azblob_path = format_uri(
282 "azblob",
283 &self.config.azblob_container,
284 &self.config.azblob_root,
285 &format!("{}/{}/", catalog, schema),
286 );
287
288 let mut connection_options = vec![
289 format!(
290 "ACCOUNT_NAME='{}'",
291 expose_optional_secret(&self.config.azblob_account_name)
292 ),
293 format!(
294 "ACCOUNT_KEY='{}'",
295 expose_optional_secret(&self.config.azblob_account_key)
296 ),
297 ];
298
299 if let Some(sas_token) = &self.config.azblob_sas_token {
300 connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
301 }
302
303 let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
304 (azblob_path, connection_str)
305 }
306
307 fn format_output_path(&self, file_path: &str) -> String {
308 format_uri(
309 "azblob",
310 &self.config.azblob_container,
311 &self.config.azblob_root,
312 file_path,
313 )
314 }
315
316 fn mask_sensitive_info(&self, sql: &str) -> String {
317 mask_secrets(
318 sql.to_string(),
319 &[
320 expose_optional_secret(&self.config.azblob_account_name),
321 expose_optional_secret(&self.config.azblob_account_key),
322 ],
323 )
324 }
325}
326
327#[derive(Clone)]
328pub enum StorageType {
329 Fs(FsBackend),
330 S3(S3Backend),
331 Oss(OssBackend),
332 Gcs(GcsBackend),
333 Azblob(AzblobBackend),
334}
335
336impl StorageExport for StorageType {
337 fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
338 match self {
339 StorageType::Fs(backend) => backend.get_storage_path(catalog, schema),
340 StorageType::S3(backend) => backend.get_storage_path(catalog, schema),
341 StorageType::Oss(backend) => backend.get_storage_path(catalog, schema),
342 StorageType::Gcs(backend) => backend.get_storage_path(catalog, schema),
343 StorageType::Azblob(backend) => backend.get_storage_path(catalog, schema),
344 }
345 }
346
347 fn format_output_path(&self, file_path: &str) -> String {
348 match self {
349 StorageType::Fs(backend) => backend.format_output_path(file_path),
350 StorageType::S3(backend) => backend.format_output_path(file_path),
351 StorageType::Oss(backend) => backend.format_output_path(file_path),
352 StorageType::Gcs(backend) => backend.format_output_path(file_path),
353 StorageType::Azblob(backend) => backend.format_output_path(file_path),
354 }
355 }
356
357 fn mask_sensitive_info(&self, sql: &str) -> String {
358 match self {
359 StorageType::Fs(backend) => backend.mask_sensitive_info(sql),
360 StorageType::S3(backend) => backend.mask_sensitive_info(sql),
361 StorageType::Oss(backend) => backend.mask_sensitive_info(sql),
362 StorageType::Gcs(backend) => backend.mask_sensitive_info(sql),
363 StorageType::Azblob(backend) => backend.mask_sensitive_info(sql),
364 }
365 }
366}
367
368impl StorageType {
369 pub fn is_remote_storage(&self) -> bool {
371 !matches!(self, StorageType::Fs(_))
372 }
373}