1use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::{Parser, Subcommand};
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::Tool;
28use crate::common::ObjectStoreConfig;
29use crate::data::export_v2::error::{
30 CannotResumeSchemaOnlySnafu, DataExportNotImplementedSnafu, DatabaseSnafu, EmptyResultSnafu,
31 ManifestVersionMismatchSnafu, Result, UnexpectedValueTypeSnafu,
32};
33use crate::data::export_v2::extractor::SchemaExtractor;
34use crate::data::export_v2::manifest::{DataFormat, MANIFEST_VERSION, Manifest};
35use crate::data::path::ddl_path_for_schema;
36use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
37use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
38use crate::database::{DatabaseClient, parse_proxy_opts};
39
// Subcommands of the export v2 tool.
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// derived items into `--help` text, and this edit must not change the CLI.
#[derive(Debug, Subcommand)]
pub enum ExportV2Command {
    // Create a snapshot at a destination URI; options are in `ExportCreateCommand`.
    Create(ExportCreateCommand),
}
46
47impl ExportV2Command {
48 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
49 match self {
50 ExportV2Command::Create(cmd) => cmd.build().await,
51 }
52 }
53}
54
// Command-line options for creating an export v2 snapshot.
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// derived structs/fields into `--help` text, and this edit must not change the CLI.
#[derive(Debug, Parser)]
pub struct ExportCreateCommand {
    // Server address handed to `DatabaseClient::new` (e.g. `127.0.0.1:4000`
    // in the tests — presumably `host:port`; confirm against DatabaseClient).
    #[clap(long)]
    addr: String,

    // Destination URI; checked by `validate_uri` and opened with
    // `OpenDalStorage::from_uri` in `build`.
    #[clap(long)]
    to: String,

    // Catalog to export; also passed to the database client.
    #[clap(long, default_value = "greptime")]
    catalog: String,

    // Comma-separated schema filter; when no value is given, no filter is
    // passed to the schema extractor.
    #[clap(long, value_delimiter = ',')]
    schemas: Vec<String>,

    // Export only schema metadata and DDL. `build` currently rejects exports
    // without this flag because data export is not implemented yet.
    #[clap(long)]
    schema_only: bool,

    // Not read by `build` yet (only relevant once data export exists).
    #[clap(long)]
    start_time: Option<String>,

    // Not read by `build` yet (only relevant once data export exists).
    #[clap(long)]
    end_time: Option<String>,

    // Data file format; stored on the tool but not used yet.
    #[clap(long, value_enum, default_value = "parquet")]
    format: DataFormat,

    // Delete an existing snapshot at the destination instead of resuming it.
    #[clap(long)]
    force: bool,

    // Stored on the tool but not used yet.
    #[clap(long, default_value = "1")]
    parallelism: usize,

    // Basic-auth credentials forwarded to `DatabaseClient::new`
    // (format defined by the client — presumably `user:password`; verify).
    #[clap(long)]
    auth_basic: Option<String>,

    // Request timeout parsed with `humantime`; `build` uses 60s when unset.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    // Proxy setting, interpreted together with `--no-proxy` by `parse_proxy_opts`.
    #[clap(long)]
    proxy: Option<String>,

    // Passed to both `parse_proxy_opts` and `DatabaseClient::new`.
    #[clap(long)]
    no_proxy: bool,

    // Object-store backend options used to open the destination URI.
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
124
125impl ExportCreateCommand {
126 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
127 validate_uri(&self.to).map_err(BoxedError::new)?;
129
130 if !self.schema_only {
131 return DataExportNotImplementedSnafu
132 .fail()
133 .map_err(BoxedError::new);
134 }
135
136 let schemas = if self.schemas.is_empty() {
138 None
139 } else {
140 Some(self.schemas.clone())
141 };
142
143 let storage = OpenDalStorage::from_uri(&self.to, &self.storage).map_err(BoxedError::new)?;
145
146 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
148 let database_client = DatabaseClient::new(
149 self.addr.clone(),
150 self.catalog.clone(),
151 self.auth_basic.clone(),
152 self.timeout.unwrap_or(Duration::from_secs(60)),
153 proxy,
154 self.no_proxy,
155 );
156
157 Ok(Box::new(ExportCreate {
158 catalog: self.catalog.clone(),
159 schemas,
160 schema_only: self.schema_only,
161 _format: self.format,
162 force: self.force,
163 _parallelism: self.parallelism,
164 storage: Box::new(storage),
165 database_client,
166 }))
167 }
168}
169
/// Export tool produced by [`ExportCreateCommand::build`]; the work happens in
/// its [`Tool::do_work`] implementation.
pub struct ExportCreate {
    /// Catalog whose schemas are exported and recorded in the manifest.
    catalog: String,
    /// Optional schema filter, passed as-is to `SchemaExtractor::extract`
    /// (`None` when no filter was given).
    schemas: Option<Vec<String>>,
    /// Whether only schema metadata/DDL is exported.
    schema_only: bool,
    /// Requested data format; kept for data export, not read yet.
    _format: DataFormat,
    /// Delete an existing snapshot at the destination before exporting.
    force: bool,
    /// Requested parallelism; kept for data export, not read yet.
    _parallelism: usize,
    /// Destination snapshot storage.
    storage: Box<dyn SnapshotStorage>,
    /// Client used for the `information_schema` and `SHOW CREATE` queries.
    database_client: DatabaseClient,
}
181
182#[async_trait]
183impl Tool for ExportCreate {
184 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
185 self.run().await.map_err(BoxedError::new)
186 }
187}
188
189impl ExportCreate {
190 async fn run(&self) -> Result<()> {
191 let exists = self.storage.exists().await?;
193
194 if exists {
195 if self.force {
196 info!("Deleting existing snapshot (--force)");
197 self.storage.delete_snapshot().await?;
198 } else {
199 let manifest = self.storage.read_manifest().await?;
201
202 if manifest.version != MANIFEST_VERSION {
204 return ManifestVersionMismatchSnafu {
205 expected: MANIFEST_VERSION,
206 found: manifest.version,
207 }
208 .fail();
209 }
210
211 if manifest.schema_only && !self.schema_only {
213 return CannotResumeSchemaOnlySnafu.fail();
214 }
215
216 info!(
217 "Resuming existing snapshot: {} (completed: {}/{} chunks)",
218 manifest.snapshot_id,
219 manifest.completed_count(),
220 manifest.chunks.len()
221 );
222
223 if manifest.is_complete() {
226 info!("Snapshot is already complete");
227 return Ok(());
228 }
229
230 info!("Data export resume not yet implemented (M2)");
232 return Ok(());
233 }
234 }
235
236 let extractor = SchemaExtractor::new(&self.database_client, &self.catalog);
238 let schema_snapshot = extractor.extract(self.schemas.as_deref()).await?;
239
240 let schema_names: Vec<String> = schema_snapshot
241 .schemas
242 .iter()
243 .map(|s| s.name.clone())
244 .collect();
245 info!("Exporting schemas: {:?}", schema_names);
246
247 let manifest = Manifest::new_schema_only(self.catalog.clone(), schema_names.clone());
249
250 self.storage.write_schema(&schema_snapshot).await?;
252 info!("Exported {} schemas", schema_snapshot.schemas.len());
253
254 let ddl_by_schema = self.build_ddl_by_schema(&schema_names).await?;
256 for (schema, ddl) in ddl_by_schema {
257 let ddl_path = ddl_path_for_schema(&schema);
258 self.storage.write_text(&ddl_path, &ddl).await?;
259 info!("Exported DDL for schema {} to {}", schema, ddl_path);
260 }
261
262 self.storage.write_manifest(&manifest).await?;
268 info!("Snapshot created: {}", manifest.snapshot_id);
269
270 Ok(())
271 }
272
273 async fn build_ddl_by_schema(&self, schema_names: &[String]) -> Result<Vec<(String, String)>> {
274 let mut schemas = schema_names.to_vec();
275 schemas.sort();
276
277 let mut ddl_by_schema = Vec::with_capacity(schemas.len());
278 for schema in schemas {
279 let create_database = self.show_create("DATABASE", &schema, None).await?;
280
281 let (mut physical_tables, mut tables, mut views) =
282 self.get_schema_objects(&schema).await?;
283 physical_tables.sort();
284 let mut physical_ddls = Vec::with_capacity(physical_tables.len());
285 for table in physical_tables {
286 physical_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
287 }
288
289 tables.sort();
290 let mut table_ddls = Vec::with_capacity(tables.len());
291 for table in tables {
292 table_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
293 }
294
295 views.sort();
296 let mut view_ddls = Vec::with_capacity(views.len());
297 for view in views {
298 view_ddls.push(self.show_create("VIEW", &schema, Some(&view)).await?);
299 }
300
301 let ddl = build_schema_ddl(
302 &schema,
303 create_database,
304 physical_ddls,
305 table_ddls,
306 view_ddls,
307 );
308 ddl_by_schema.push((schema, ddl));
309 }
310
311 Ok(ddl_by_schema)
312 }
313
314 async fn get_schema_objects(
315 &self,
316 schema: &str,
317 ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
318 let physical_tables = self.get_metric_physical_tables(schema).await?;
319 let physical_set: HashSet<&str> = physical_tables.iter().map(String::as_str).collect();
320 let sql = format!(
321 "SELECT table_name, table_type FROM information_schema.tables \
322 WHERE table_catalog = '{}' AND table_schema = '{}' \
323 AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
324 escape_sql_literal(&self.catalog),
325 escape_sql_literal(schema)
326 );
327 let records: Option<Vec<Vec<Value>>> = self
328 .database_client
329 .sql_in_public(&sql)
330 .await
331 .context(DatabaseSnafu)?;
332
333 let mut tables = Vec::new();
334 let mut views = Vec::new();
335 if let Some(rows) = records {
336 for row in rows {
337 let name = match row.first() {
338 Some(Value::String(name)) => name.clone(),
339 _ => return UnexpectedValueTypeSnafu.fail(),
340 };
341 let table_type = match row.get(1) {
342 Some(Value::String(table_type)) => table_type.as_str(),
343 _ => return UnexpectedValueTypeSnafu.fail(),
344 };
345 if !physical_set.contains(name.as_str()) {
346 if table_type == "VIEW" {
347 views.push(name);
348 } else {
349 tables.push(name);
350 }
351 }
352 }
353 }
354
355 Ok((physical_tables, tables, views))
356 }
357
358 async fn get_metric_physical_tables(&self, schema: &str) -> Result<Vec<String>> {
359 let sql = format!(
360 "SELECT DISTINCT table_name FROM information_schema.columns \
361 WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
362 escape_sql_literal(&self.catalog),
363 escape_sql_literal(schema)
364 );
365 let records: Option<Vec<Vec<Value>>> = self
366 .database_client
367 .sql_in_public(&sql)
368 .await
369 .context(DatabaseSnafu)?;
370
371 let mut tables = HashSet::new();
372 if let Some(rows) = records {
373 for row in rows {
374 let name = match row.first() {
375 Some(Value::String(name)) => name.clone(),
376 _ => return UnexpectedValueTypeSnafu.fail(),
377 };
378 tables.insert(name);
379 }
380 }
381
382 Ok(tables.into_iter().collect())
383 }
384
385 async fn show_create(
386 &self,
387 show_type: &str,
388 schema: &str,
389 table: Option<&str>,
390 ) -> Result<String> {
391 let sql = match table {
392 Some(table) => format!(
393 r#"SHOW CREATE {} "{}"."{}"."{}""#,
394 show_type,
395 escape_sql_identifier(&self.catalog),
396 escape_sql_identifier(schema),
397 escape_sql_identifier(table)
398 ),
399 None => format!(
400 r#"SHOW CREATE {} "{}"."{}""#,
401 show_type,
402 escape_sql_identifier(&self.catalog),
403 escape_sql_identifier(schema)
404 ),
405 };
406
407 let records: Option<Vec<Vec<Value>>> = self
408 .database_client
409 .sql_in_public(&sql)
410 .await
411 .context(DatabaseSnafu)?;
412 let rows = records.context(EmptyResultSnafu)?;
413 let row = rows.first().context(EmptyResultSnafu)?;
414 let Some(Value::String(create)) = row.get(1) else {
415 return UnexpectedValueTypeSnafu.fail();
416 };
417
418 Ok(format!("{};\n", create))
419 }
420}
421
/// Assembles the DDL script for one schema.
///
/// The output is a `-- Schema: <name>` header line, then `create_database`,
/// then every physical-table, table, and view statement in that order,
/// followed by one extra trailing newline. Statements are concatenated
/// verbatim, so each is expected to carry its own terminator.
fn build_schema_ddl(
    schema: &str,
    create_database: String,
    physical_tables: Vec<String>,
    tables: Vec<String>,
    views: Vec<String>,
) -> String {
    let statements = physical_tables.into_iter().chain(tables).chain(views);

    let mut script = format!("-- Schema: {}\n", schema);
    script.push_str(&create_database);
    script.extend(statements);
    script.push('\n');
    script
}
444
#[cfg(test)]
mod tests {
    use clap::Parser;

    use super::*;
    use crate::data::path::ddl_path_for_schema;

    #[test]
    fn test_ddl_path_for_schema() {
        // Plain names map directly; path-traversal characters are percent-encoded.
        let cases = [
            ("public", "schema/ddl/public.sql"),
            ("../evil", "schema/ddl/%2E%2E%2Fevil.sql"),
        ];
        for (schema, expected) in cases {
            assert_eq!(ddl_path_for_schema(schema), expected, "schema = {schema:?}");
        }
    }

    #[test]
    fn test_build_schema_ddl_order() {
        let ddl = build_schema_ddl(
            "public",
            "CREATE DATABASE public;\n".to_string(),
            vec!["PHYSICAL;\n".to_string()],
            vec!["TABLE;\n".to_string()],
            vec!["VIEW;\n".to_string()],
        );

        let positions: Vec<usize> = ["CREATE DATABASE", "PHYSICAL;", "TABLE;", "VIEW;"]
            .iter()
            .map(|&needle| ddl.find(needle).expect("statement missing from DDL"))
            .collect();
        assert!(
            positions.windows(2).all(|pair| pair[0] < pair[1]),
            "unexpected statement order: {ddl}"
        );
    }

    #[tokio::test]
    async fn test_build_rejects_non_schema_only_export() {
        let cmd = ExportCreateCommand::parse_from([
            "export-v2-create",
            "--addr",
            "127.0.0.1:4000",
            "--to",
            "file:///tmp/export-v2-test",
        ]);

        let Err(error) = cmd.build().await else {
            panic!("building a non schema-only export should fail");
        };
        assert!(error.to_string().contains("Data export is not implemented yet"));
    }
}