Merge branch 'main' into under-utilized-2

Ruihang Xia committed 2026-04-02 02:44:29 +08:00
289 changed files with 11060 additions and 1362 deletions


@@ -30,8 +30,11 @@ update_dev_builder_version() {
--body "This PR updates the dev-builder image tag" \
--base main \
--head $BRANCH_NAME \
--reviewer zyy17 \
--reviewer daviderli614
--reviewer sunng87 \
--reviewer daviderli614 \
--reviewer killme2008 \
--reviewer evenyag \
--reviewer fengjiachun
}
update_dev_builder_version

.gitignore (vendored, 7 changes)

@@ -65,11 +65,12 @@ greptimedb_data
# github
!/.github
# Claude code
# AI related
CLAUDE.md
# AGENTS.md
AGENTS.md
.codex
.gemini
.opencode
# local design docs
docs/specs/

Cargo.lock (generated, 39 changes)

@@ -565,7 +565,7 @@ dependencies = [
"arrow-schema 57.3.0",
"arrow-select 57.3.0",
"flatbuffers",
"lz4_flex 0.12.0",
"lz4_flex 0.12.1",
"zstd",
]
@@ -2055,7 +2055,6 @@ dependencies = [
"common-time",
"common-version",
"common-wal",
"const_format",
"datafusion",
"datafusion-common",
"datafusion-physical-plan",
@@ -5256,6 +5255,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper-util",
"itertools 0.14.0",
"lazy_static",
"log-query",
"meta-client",
@@ -7296,7 +7296,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -7593,18 +7593,18 @@ dependencies = [
[[package]]
name = "lz4_flex"
version = "0.11.5"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a"
checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a"
dependencies = [
"twox-hash",
]
[[package]]
name = "lz4_flex"
version = "0.12.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e"
checksum = "98c23545df7ecf1b16c303910a69b079e8e251d60f7dd2cc9b4177f2afaf1746"
dependencies = [
"twox-hash",
]
@@ -9123,7 +9123,7 @@ dependencies = [
"flate2",
"futures",
"futures-util",
"lz4_flex 0.11.5",
"lz4_flex 0.11.6",
"lzokay-native",
"num",
"prost 0.13.5",
@@ -9346,7 +9346,7 @@ dependencies = [
"futures",
"half",
"hashbrown 0.16.1",
"lz4_flex 0.12.0",
"lz4_flex 0.12.1",
"num-bigint",
"num-integer",
"num-traits",
@@ -9384,6 +9384,7 @@ dependencies = [
"common-macro",
"common-meta",
"common-query",
"common-telemetry",
"criterion 0.7.0",
"datafusion-common",
"datafusion-expr",
@@ -10214,6 +10215,7 @@ dependencies = [
"common-macro",
"common-recordbatch",
"common-telemetry",
"criterion 0.7.0",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -10573,7 +10575,7 @@ dependencies = [
"common-test-util",
"derive_builder 0.20.2",
"futures",
"lz4_flex 0.11.5",
"lz4_flex 0.11.6",
"moka",
"pin-project",
"prometheus 0.14.0",
@@ -11305,9 +11307,9 @@ dependencies = [
[[package]]
name = "rsasl"
version = "2.2.0"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8b534a23662bb559c5c73213be63ecd6524e774d291f3618c2b04b723d184eb"
checksum = "9f1bcb95b531681a622f3d6972eaab523e17e2aad6d6209f0276628eb1cb5038"
dependencies = [
"base64 0.22.1",
"core2",
@@ -11319,7 +11321,7 @@ dependencies = [
"serde_json",
"sha2",
"stringprep",
"thiserror 1.0.69",
"thiserror 2.0.17",
]
[[package]]
@@ -12056,6 +12058,7 @@ dependencies = [
"local-ip-address",
"log-query",
"loki-proto",
"metric-engine",
"mime_guess",
"mysql_async",
"notify",
@@ -12067,6 +12070,7 @@ dependencies = [
"operator",
"otel-arrow-rust",
"parking_lot 0.12.4",
"partition",
"permutation",
"pg_interval_2",
"pgwire",
@@ -12091,6 +12095,7 @@ dependencies = [
"session",
"simd-json",
"simdutf8",
"smallvec",
"snafu 0.8.6",
"snap",
"socket2 0.5.10",
@@ -12306,9 +12311,9 @@ dependencies = [
[[package]]
name = "slab"
version = "0.4.10"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d"
checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5"
[[package]]
name = "slotmap"
@@ -13222,7 +13227,7 @@ dependencies = [
"levenshtein_automata",
"log",
"lru",
"lz4_flex 0.11.5",
"lz4_flex 0.11.6",
"measure_time",
"memmap2",
"once_cell",
@@ -14712,7 +14717,7 @@ dependencies = [
"itertools 0.14.0",
"lalrpop",
"lalrpop-util",
"lz4_flex 0.11.5",
"lz4_flex 0.11.6",
"md-5",
"nom 7.1.3",
"ofb",


@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-10-01-8fe17d43-20251011080129
DEV_BUILDER_IMAGE_TAG ?= 2026-03-21-9c9d9e9e-20260331090344
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu


@@ -7,7 +7,7 @@ RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo
RUN sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo
# Install dependencies
RUN ulimit -n 1024000 && yum groupinstall -y 'Development Tools'
RUN yum groupinstall -y 'Development Tools'
RUN yum install -y epel-release \
openssl \
openssl-devel \

flake.lock (generated, 18 changes)

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1770794449,
"narHash": "sha256-1nFkhcZx9+Sdw5OXwJqp5TxvGncqRqLeK781v0XV3WI=",
"lastModified": 1774250935,
"narHash": "sha256-mWID0WFgTnd9hbEeaPNX+YYWF70JN3r7zBouEqERJOE=",
"owner": "nix-community",
"repo": "fenix",
"rev": "b19d93fdf9761e6101f8cb5765d638bacebd9a1b",
"rev": "64d7705e8c37d650cfb1aa99c24a8ce46597f29e",
"type": "github"
},
"original": {
@@ -41,11 +41,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1770617025,
"narHash": "sha256-1jZvgZoAagZZB6NwGRv2T2ezPy+X6EFDsJm+YSlsvEs=",
"lastModified": 1774244481,
"narHash": "sha256-4XfMXU0DjN83o6HWZoKG9PegCvKvIhNUnRUI19vzTcQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "2db38e08fdadcc0ce3232f7279bab59a15b94482",
"rev": "4590696c8693fea477850fe379a01544293ca4e2",
"type": "github"
},
"original": {
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1770702974,
"narHash": "sha256-CbvWu72rpGHK5QynoXwuOnVzxX7njF2LYgk8wRSiAQ0=",
"lastModified": 1774221325,
"narHash": "sha256-aEIdkqB8gtQZtEbogdUb5iyfcZpKIlD3FkG8ANu73/I=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "07a594815f7c1d6e7e39f21ddeeedb75b21795f4",
"rev": "b42b63f390a4dab14e6efa34a70e67f5b087cc62",
"type": "github"
},
"original": {


@@ -20,7 +20,7 @@
lib = nixpkgs.lib;
rustToolchain = fenix.packages.${system}.fromToolchainName {
name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
sha256 = "sha256-GCGEXGZeJySLND0KU5TdtTrqFV76TF3UdvAHSUegSsk=";
sha256 = "sha256-rboGKQLH4eDuiY01SINOqmXUFUNr9F4awoFZGzib17o=";
};
in
{


@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2025-10-01"
channel = "nightly-2026-03-21"


@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use api::v1::greptime_request::Request;


@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![feature(try_blocks)]
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;


@@ -132,15 +132,13 @@ impl CatalogManager for MemoryCatalogManager {
table_name: &str,
_query_ctx: Option<&QueryContext>,
) -> Result<Option<TableRef>> {
let result = try {
self.catalogs
.read()
.unwrap()
.get(catalog)?
.get(schema)?
.get(table_name)
.cloned()?
};
let catalogs = self.catalogs.read().unwrap();
let result = catalogs
.get(catalog)
.and_then(|c| c.get(schema))
.and_then(|s| s.get(table_name))
.cloned();
Ok(result)
}
@@ -149,8 +147,8 @@ impl CatalogManager for MemoryCatalogManager {
.catalogs
.read()
.unwrap()
.iter()
.flat_map(|(_, schema_entries)| schema_entries.values())
.values()
.flat_map(|schema_entries| schema_entries.values())
.flat_map(|tables| tables.values())
.find(|t| t.table_info().ident.table_id == table_id)
.map(|t| t.table_info()))
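// The and_then chain above relies on Option's short-circuiting, which is what
// the removed nightly `try` block provided. A self-contained sketch of the
// same lookup shape (hypothetical types, not the real CatalogManager API):
use std::collections::HashMap;

fn lookup_table_id(
    catalogs: &HashMap<String, HashMap<String, u32>>,
    catalog: &str,
    schema: &str,
) -> Option<u32> {
    catalogs
        .get(catalog)
        .and_then(|schemas| schemas.get(schema))
        .copied()
}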


@@ -372,22 +372,16 @@ impl InformationSchemaTablesBuilder {
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));
let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
let index_length = region_stats.iter().map(|stat| stat.index_size).sum();
let data_length: u64 = region_stats.iter().map(|stat| stat.sst_size).sum();
let table_rows: u64 = region_stats.iter().map(|stat| stat.num_rows).sum();
let index_length: u64 = region_stats.iter().map(|stat| stat.index_size).sum();
// It's not precise, but it is acceptable for long-term data storage.
let avg_row_length = if table_rows > 0 {
let total_data_length = data_length
+ region_stats
.iter()
.map(|stat| stat.memtable_size)
.sum::<u64>();
total_data_length / table_rows
} else {
0
};
let total_data_length: u64 = data_length
+ region_stats
.iter()
.map(|stat| stat.memtable_size)
.sum::<u64>();
let avg_row_length = total_data_length.checked_div(table_rows).unwrap_or(0);
self.data_length.push(Some(data_length));
self.index_length.push(Some(index_length));
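// The checked_div rewrite above is behavior-preserving: u64::checked_div
// returns None exactly when the divisor is zero, so unwrap_or(0) reproduces
// the old `if table_rows > 0 { ... } else { 0 }` branch. Minimal sketch:
fn avg_row_length(total_data_length: u64, table_rows: u64) -> u64 {
    total_data_length.checked_div(table_rows).unwrap_or(0)
}
// avg_row_length(10, 4) == 2; avg_row_length(10, 0) == 0 (no division panic).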


@@ -74,12 +74,10 @@ impl PGCatalogProvider {
)
.expect("Failed to initialize PgCatalogSchemaProvider");
let mut table_ids = HashMap::new();
let mut table_id = PG_CATALOG_TABLE_ID_START;
for name in PG_CATALOG_TABLES {
table_ids.insert(*name, table_id);
table_id += 1;
}
let table_ids: HashMap<_, _> = (PG_CATALOG_TABLE_ID_START..)
.zip(PG_CATALOG_TABLES.iter())
.map(|(id, name)| (*name, id))
.collect();
let mut provider = Self {
catalog_name,
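// The mutable-counter loop above is replaced by zipping an open-ended id
// range with the table list. Sketch of the same pattern, with a hypothetical
// start id and table names:
use std::collections::HashMap;

const TABLE_ID_START: u32 = 100;

fn assign_table_ids(tables: &[&'static str]) -> HashMap<&'static str, u32> {
    (TABLE_ID_START..)
        .zip(tables.iter())
        .map(|(id, name)| (*name, id))
        .collect()
}
// assign_table_ids(&["pg_class", "pg_namespace"]) maps pg_class -> 100,
// pg_namespace -> 101.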


@@ -15,7 +15,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use common_catalog::format_full_table_name;
use common_query::logical_plan::{SubstraitPlanDecoderRef, rename_logical_plan_columns};
use datafusion::common::{ResolvedTableReference, TableReference};
@@ -151,11 +150,7 @@ impl DfTableSourceProvider {
let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
let logical_plan = self
.plan_decoder
.decode(
Bytes::from(view_info.view_info.clone()),
catalog_list,
false,
)
.decode(view_info.view_info.clone().into(), catalog_list, false)
.await
.context(DecodePlanSnafu {
name: &table.table_info().name,
@@ -195,7 +190,7 @@ impl DfTableSourceProvider {
plan_columns
.iter()
.map(|c| c.as_str())
.zip(columns.into_iter())
.zip(columns)
.collect(),
)
.context(ProjectViewColumnsSnafu)?


@@ -458,8 +458,10 @@ impl Export {
/// build operator with preference for file system
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
if self.storage_type.is_remote_storage() && self.ddl_local_dir.is_some() {
let root = self.ddl_local_dir.as_ref().unwrap().clone();
if self.storage_type.is_remote_storage()
&& let Some(ddl_local_dir) = &self.ddl_local_dir
{
let root = ddl_local_dir.clone();
let op = new_fs_object_store(&root).map_err(|e| Error::Other {
source: e,
location: snafu::location!(),
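// The rewrite above uses a let-chain (`if cond && let Some(x) = expr`), which
// removes the is_some()/unwrap() pair. Sketch, assuming a toolchain where
// let-chains are available (this repo pins a recent nightly):
fn local_root(is_remote: bool, ddl_local_dir: &Option<String>) -> Option<String> {
    if is_remote && let Some(dir) = ddl_local_dir {
        return Some(dir.clone());
    }
    None
}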


@@ -30,7 +30,7 @@
//! --to file:///tmp/snapshot \
//! --schema-only
//!
//! # Export with time range (M2)
//! # Export with time range
//! greptime cli data export-v2 create \
//! --addr 127.0.0.1:4000 \
//! --to s3://bucket/snapshots/prod-20250101 \
@@ -38,7 +38,10 @@
//! --end-time 2025-01-31T23:59:59Z
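//!
//! # Export split into daily chunks (sketch using the --chunk-time-window
//! # flag added in this change; same CLI shape as above)
//! greptime cli data export-v2 create \
//!   --addr 127.0.0.1:4000 \
//!   --to s3://bucket/snapshots/prod-20250101 \
//!   --start-time 2025-01-01T00:00:00Z \
//!   --end-time 2025-01-31T23:59:59Z \
//!   --chunk-time-window 1d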
//! ```
mod chunker;
mod command;
mod coordinator;
mod data;
pub mod error;
pub mod extractor;
pub mod manifest;


@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use chrono::Duration as ChronoDuration;
use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
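/// Splits `time_range` into consecutive `window`-sized chunks.
/// Unbounded ranges and non-positive (or non-convertible) windows collapse to
/// a single chunk, a zero-width range becomes one skipped chunk, and an
/// inverted range yields no chunks.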
pub fn generate_chunks(time_range: &TimeRange, window: Duration) -> Vec<ChunkMeta> {
let (Some(start), Some(end)) = (time_range.start, time_range.end) else {
return vec![ChunkMeta::new(1, time_range.clone())];
};
if start == end {
return vec![ChunkMeta::skipped(1, time_range.clone())];
}
if start > end {
return Vec::new();
}
let window = match ChronoDuration::from_std(window) {
Ok(window) if window > ChronoDuration::zero() => window,
_ => return vec![ChunkMeta::new(1, time_range.clone())],
};
let mut chunks = Vec::new();
let mut cursor = start;
let mut id = 1;
while cursor < end {
let next = cursor
.checked_add_signed(window)
.map_or(end, |timestamp| timestamp.min(end));
chunks.push(ChunkMeta::new(id, TimeRange::new(Some(cursor), Some(next))));
id += 1;
cursor = next;
}
chunks
}
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use super::*;
use crate::data::export_v2::manifest::ChunkStatus;
#[test]
fn test_generate_chunks_unbounded() {
let range = TimeRange::unbounded();
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].time_range, range);
}
#[test]
fn test_generate_chunks_split() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 3);
assert_eq!(chunks[0].time_range.start, Some(start));
assert_eq!(
chunks[2].time_range.end,
Some(Utc.with_ymd_and_hms(2025, 1, 1, 3, 0, 0).unwrap())
);
}
#[test]
fn test_generate_chunks_empty_range() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(start));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].status, ChunkStatus::Skipped);
assert_eq!(chunks[0].time_range, range);
}
#[test]
fn test_generate_chunks_invalid_range_is_empty() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let chunks = generate_chunks(&range, Duration::from_secs(3600));
assert!(chunks.is_empty());
}
}


@@ -26,12 +26,16 @@ use snafu::{OptionExt, ResultExt};
use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::coordinator::export_data;
use crate::data::export_v2::error::{
CannotResumeSchemaOnlySnafu, DataExportNotImplementedSnafu, DatabaseSnafu, EmptyResultSnafu,
ManifestVersionMismatchSnafu, Result, UnexpectedValueTypeSnafu,
ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
SchemaOnlyModeMismatchSnafu, UnexpectedValueTypeSnafu,
};
use crate::data::export_v2::extractor::SchemaExtractor;
use crate::data::export_v2::manifest::{DataFormat, MANIFEST_VERSION, Manifest};
use crate::data::export_v2::manifest::{
ChunkMeta, DataFormat, MANIFEST_VERSION, Manifest, TimeRange,
};
use crate::data::path::ddl_path_for_schema;
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
@@ -84,6 +88,11 @@ pub struct ExportCreateCommand {
#[clap(long)]
end_time: Option<String>,
/// Chunk time window (e.g., 1h, 6h, 1d, 7d).
/// Requires both --start-time and --end-time when specified.
#[clap(long, value_parser = humantime::parse_duration)]
chunk_time_window: Option<Duration>,
/// Data format: parquet, csv, json.
#[clap(long, value_enum, default_value = "parquet")]
format: DataFormat,
@@ -92,7 +101,7 @@ pub struct ExportCreateCommand {
#[clap(long)]
force: bool,
/// Concurrency level (for future use).
/// Parallelism for COPY DATABASE execution (server-side, per schema per chunk).
#[clap(long, default_value = "1")]
parallelism: usize,
@@ -127,11 +136,38 @@ impl ExportCreateCommand {
// Validate URI format
validate_uri(&self.to).map_err(BoxedError::new)?;
if !self.schema_only {
return DataExportNotImplementedSnafu
let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
.map_err(BoxedError::new)?;
if self.chunk_time_window.is_some() && !time_range.is_bounded() {
return ChunkTimeWindowRequiresBoundsSnafu
.fail()
.map_err(BoxedError::new);
}
if self.schema_only {
let mut invalid_args = Vec::new();
if self.start_time.is_some() {
invalid_args.push("--start-time");
}
if self.end_time.is_some() {
invalid_args.push("--end-time");
}
if self.chunk_time_window.is_some() {
invalid_args.push("--chunk-time-window");
}
if self.format != DataFormat::Parquet {
invalid_args.push("--format");
}
if self.parallelism != 1 {
invalid_args.push("--parallelism");
}
if !invalid_args.is_empty() {
return SchemaOnlyArgsNotAllowedSnafu {
args: invalid_args.join(", "),
}
.fail()
.map_err(BoxedError::new);
}
}
// Parse schemas (empty vec means all schemas)
let schemas = if self.schemas.is_empty() {
@@ -155,12 +191,18 @@ impl ExportCreateCommand {
);
Ok(Box::new(ExportCreate {
catalog: self.catalog.clone(),
schemas,
schema_only: self.schema_only,
_format: self.format,
force: self.force,
_parallelism: self.parallelism,
config: ExportConfig {
catalog: self.catalog.clone(),
schemas,
schema_only: self.schema_only,
format: self.format,
force: self.force,
time_range,
chunk_time_window: self.chunk_time_window,
parallelism: self.parallelism,
snapshot_uri: self.to.clone(),
storage_config: self.storage.clone(),
},
storage: Box::new(storage),
database_client,
}))
@@ -169,14 +211,22 @@ impl ExportCreateCommand {
/// Export tool implementation.
pub struct ExportCreate {
config: ExportConfig,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
}
struct ExportConfig {
catalog: String,
schemas: Option<Vec<String>>,
schema_only: bool,
_format: DataFormat,
format: DataFormat,
force: bool,
_parallelism: usize,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
time_range: TimeRange,
chunk_time_window: Option<Duration>,
parallelism: usize,
snapshot_uri: String,
storage_config: ObjectStoreConfig,
}
#[async_trait]
@@ -192,12 +242,12 @@ impl ExportCreate {
let exists = self.storage.exists().await?;
if exists {
if self.force {
if self.config.force {
info!("Deleting existing snapshot (--force)");
self.storage.delete_snapshot().await?;
} else {
// Resume mode - read existing manifest
let manifest = self.storage.read_manifest().await?;
let mut manifest = self.storage.read_manifest().await?;
// Check version compatibility
if manifest.version != MANIFEST_VERSION {
@@ -208,10 +258,7 @@ impl ExportCreate {
.fail();
}
// Cannot resume schema-only with data export
if manifest.schema_only && !self.schema_only {
return CannotResumeSchemaOnlySnafu.fail();
}
validate_resume_config(&manifest, &self.config)?;
info!(
"Resuming existing snapshot: {} (completed: {}/{} chunks)",
@@ -220,22 +267,31 @@ impl ExportCreate {
manifest.chunks.len()
);
// For M1, we only handle schema-only exports
// M2 will add chunk resume logic
if manifest.is_complete() {
info!("Snapshot is already complete");
return Ok(());
}
// TODO: Resume data export in M2
info!("Data export resume not yet implemented (M2)");
if manifest.schema_only {
return Ok(());
}
export_data(
self.storage.as_ref(),
&self.database_client,
&self.config.snapshot_uri,
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
)
.await?;
return Ok(());
}
}
// 2. Get schema list
let extractor = SchemaExtractor::new(&self.database_client, &self.catalog);
let schema_snapshot = extractor.extract(self.schemas.as_deref()).await?;
let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;
let schema_names: Vec<String> = schema_snapshot
.schemas
@@ -245,7 +301,14 @@ impl ExportCreate {
info!("Exporting schemas: {:?}", schema_names);
// 3. Create manifest
let manifest = Manifest::new_schema_only(self.catalog.clone(), schema_names.clone());
let mut manifest = Manifest::new_for_export(
self.config.catalog.clone(),
schema_names.clone(),
self.config.schema_only,
self.config.time_range.clone(),
self.config.format,
self.config.chunk_time_window,
)?;
// 4. Write schema files
self.storage.write_schema(&schema_snapshot).await?;
@@ -259,14 +322,28 @@ impl ExportCreate {
info!("Exported DDL for schema {} to {}", schema, ddl_path);
}
// 6. Write manifest last.
// 6. Write manifest after schema artifacts and before any data export.
//
// The manifest is the snapshot commit point: only write it after the schema
// index and all DDL files are durable, so a crash cannot leave a "valid"
// snapshot that is missing required schema artifacts.
// snapshot that is missing required schema artifacts. For full exports we
// still need the manifest before data copy starts, because chunk resume is
// tracked by updating this manifest in place.
self.storage.write_manifest(&manifest).await?;
info!("Snapshot created: {}", manifest.snapshot_id);
if !self.config.schema_only {
export_data(
self.storage.as_ref(),
&self.database_client,
&self.config.snapshot_uri,
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
)
.await?;
}
Ok(())
}
@@ -321,7 +398,7 @@ impl ExportCreate {
"SELECT table_name, table_type FROM information_schema.tables \
WHERE table_catalog = '{}' AND table_schema = '{}' \
AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
escape_sql_literal(&self.catalog),
escape_sql_literal(&self.config.catalog),
escape_sql_literal(schema)
);
let records: Option<Vec<Vec<Value>>> = self
@@ -359,7 +436,7 @@ impl ExportCreate {
let sql = format!(
"SELECT DISTINCT table_name FROM information_schema.columns \
WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
escape_sql_literal(&self.catalog),
escape_sql_literal(&self.config.catalog),
escape_sql_literal(schema)
);
let records: Option<Vec<Vec<Value>>> = self
@@ -392,14 +469,14 @@ impl ExportCreate {
Some(table) => format!(
r#"SHOW CREATE {} "{}"."{}"."{}""#,
show_type,
escape_sql_identifier(&self.catalog),
escape_sql_identifier(&self.config.catalog),
escape_sql_identifier(schema),
escape_sql_identifier(table)
),
None => format!(
r#"SHOW CREATE {} "{}"."{}""#,
show_type,
escape_sql_identifier(&self.catalog),
escape_sql_identifier(&self.config.catalog),
escape_sql_identifier(schema)
),
};
@@ -442,8 +519,118 @@ fn build_schema_ddl(
ddl
}
fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
if manifest.schema_only != config.schema_only {
return SchemaOnlyModeMismatchSnafu {
existing_schema_only: manifest.schema_only,
requested_schema_only: config.schema_only,
}
.fail();
}
if manifest.catalog != config.catalog {
return ResumeConfigMismatchSnafu {
field: "catalog",
existing: manifest.catalog.clone(),
requested: config.catalog.clone(),
}
.fail();
}
// If no schema filter is provided on resume, inherit the existing snapshot
// selection instead of reinterpreting the request as "all schemas".
if let Some(requested_schemas) = &config.schemas
&& !schema_selection_matches(&manifest.schemas, requested_schemas)
{
return ResumeConfigMismatchSnafu {
field: "schemas",
existing: format_schema_selection(&manifest.schemas),
requested: format_schema_selection(requested_schemas),
}
.fail();
}
if manifest.time_range != config.time_range {
return ResumeConfigMismatchSnafu {
field: "time_range",
existing: format!("{:?}", manifest.time_range),
requested: format!("{:?}", config.time_range),
}
.fail();
}
if manifest.format != config.format {
return ResumeConfigMismatchSnafu {
field: "format",
existing: manifest.format.to_string(),
requested: config.format.to_string(),
}
.fail();
}
let expected_plan = Manifest::new_for_export(
manifest.catalog.clone(),
manifest.schemas.clone(),
config.schema_only,
config.time_range.clone(),
config.format,
config.chunk_time_window,
)?;
if !chunk_plan_matches(manifest, &expected_plan) {
return ResumeConfigMismatchSnafu {
field: "chunk plan",
existing: format_chunk_plan(&manifest.chunks),
requested: format_chunk_plan(&expected_plan.chunks),
}
.fail();
}
Ok(())
}
fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
canonical_schema_selection(existing) == canonical_schema_selection(requested)
}
fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
let mut canonicalized = Vec::new();
let mut seen = HashSet::new();
for schema in schemas {
let normalized = schema.to_ascii_lowercase();
if seen.insert(normalized.clone()) {
canonicalized.push(normalized);
}
}
canonicalized.sort();
canonicalized
}
fn format_schema_selection(schemas: &[String]) -> String {
format!("[{}]", schemas.join(", "))
}
fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
existing.chunks.len() == expected.chunks.len()
&& existing
.chunks
.iter()
.zip(&expected.chunks)
.all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
}
fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
let items = chunks
.iter()
.map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
.collect::<Vec<_>>();
format!("[{}]", items.join(", "))
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use clap::Parser;
use super::*;
@@ -478,19 +665,225 @@ mod tests {
}
#[tokio::test]
async fn test_build_rejects_non_schema_only_export() {
async fn test_build_rejects_chunk_window_without_bounds() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--chunk-time-window",
"1h",
]);
let result = cmd.build().await;
assert!(result.is_err());
let error = result.err().unwrap().to_string();
assert!(error.contains("Data export is not implemented yet"));
assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
}
#[tokio::test]
async fn test_build_rejects_data_export_args_in_schema_only_mode() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--schema-only",
"--start-time",
"2024-01-01T00:00:00Z",
"--end-time",
"2024-01-02T00:00:00Z",
"--chunk-time-window",
"1h",
"--format",
"csv",
"--parallelism",
"2",
]);
let error = cmd.build().await.err().unwrap().to_string();
assert!(error.contains("--schema-only cannot be used with data export arguments"));
assert!(error.contains("--start-time"));
assert!(error.contains("--end-time"));
assert!(error.contains("--chunk-time-window"));
assert!(error.contains("--format"));
assert!(error.contains("--parallelism"));
}
#[test]
fn test_schema_only_mode_mismatch_error_message() {
let error = crate::data::export_v2::error::SchemaOnlyModeMismatchSnafu {
existing_schema_only: false,
requested_schema_only: true,
}
.build()
.to_string();
assert!(error.contains("existing: false"));
assert!(error.contains("requested: true"));
}
#[test]
fn test_validate_resume_config_rejects_catalog_mismatch() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "other".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("catalog"));
}
#[test]
fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string(), "analytics".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: Some(vec![
"ANALYTICS".to_string(),
"PUBLIC".to_string(),
"public".to_string(),
]),
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
assert!(validate_resume_config(&manifest, &config).is_ok());
}
#[test]
fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap();
let time_range = TimeRange::new(Some(start), Some(end));
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
time_range.clone(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range,
chunk_time_window: Some(Duration::from_secs(3600)),
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("chunk plan"));
}
#[test]
fn test_validate_resume_config_rejects_format_mismatch() {
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::unbounded(),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Csv,
force: false,
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("format"));
}
#[test]
fn test_validate_resume_config_rejects_time_range_mismatch() {
let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::new(Some(start), Some(end)),
DataFormat::Parquet,
None,
)
.unwrap();
let config = ExportConfig {
catalog: "greptime".to_string(),
schemas: None,
schema_only: false,
format: DataFormat::Parquet,
force: false,
time_range: TimeRange::new(Some(start), Some(start)),
chunk_time_window: None,
parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
let error = validate_resume_config(&manifest, &config)
.err()
.unwrap()
.to_string();
assert!(error.contains("time_range"));
}
}


@@ -0,0 +1,166 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::info;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
use crate::data::export_v2::error::Result;
use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
use crate::database::DatabaseClient;
struct ExportContext<'a> {
storage: &'a dyn SnapshotStorage,
database_client: &'a DatabaseClient,
snapshot_uri: &'a str,
storage_config: &'a ObjectStoreConfig,
catalog: &'a str,
schemas: &'a [String],
format: DataFormat,
parallelism: usize,
}
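/// Exports every pending chunk in order, persisting the manifest before and
/// after each chunk so that an interrupted run can resume from the first
/// chunk that is neither Completed nor Skipped. The first failure stops the
/// run after its state has been recorded in the manifest.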
pub async fn export_data(
storage: &dyn SnapshotStorage,
database_client: &DatabaseClient,
snapshot_uri: &str,
storage_config: &ObjectStoreConfig,
manifest: &mut Manifest,
parallelism: usize,
) -> Result<()> {
if manifest.chunks.is_empty() {
return Ok(());
}
for idx in 0..manifest.chunks.len() {
if matches!(
manifest.chunks[idx].status,
ChunkStatus::Completed | ChunkStatus::Skipped
) {
continue;
}
let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
manifest.touch();
storage.write_manifest(manifest).await?;
let context = ExportContext {
storage,
database_client,
snapshot_uri,
storage_config,
catalog: &manifest.catalog,
schemas: &manifest.schemas,
format: manifest.format,
parallelism,
};
let export_result = export_chunk(&context, chunk_id, time_range).await;
let result = match export_result {
Ok(files) => {
mark_chunk_completed(manifest, idx, files);
Ok(())
}
Err(err) => {
mark_chunk_failed(manifest, idx, err.to_string());
Err(err)
}
};
manifest.touch();
storage.write_manifest(manifest).await?;
result?;
}
Ok(())
}
fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
let chunk = &mut manifest.chunks[idx];
chunk.mark_in_progress();
(chunk.id, chunk.time_range.clone())
}
fn mark_chunk_completed(manifest: &mut Manifest, idx: usize, files: Vec<String>) {
let chunk = &mut manifest.chunks[idx];
if files.is_empty() {
chunk.mark_skipped();
} else {
chunk.mark_completed(files, None);
}
}
fn mark_chunk_failed(manifest: &mut Manifest, idx: usize, error: String) {
let chunk = &mut manifest.chunks[idx];
chunk.mark_failed(error);
}
async fn export_chunk(
context: &ExportContext<'_>,
chunk_id: u32,
time_range: TimeRange,
) -> Result<Vec<String>> {
let scheme = StorageScheme::from_uri(context.snapshot_uri)?;
let needs_dir = matches!(scheme, StorageScheme::File);
let copy_options = CopyOptions {
format: context.format,
time_range,
parallelism: context.parallelism,
};
for schema in context.schemas {
let prefix = data_dir_for_schema_chunk(schema, chunk_id);
if needs_dir {
context.storage.create_dir_all(&prefix).await?;
}
let target = build_copy_target(
context.snapshot_uri,
context.storage_config,
schema,
chunk_id,
)?;
execute_copy_database(
context.database_client,
context.catalog,
schema,
&target,
&copy_options,
)
.await?;
}
let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
info!("Collected {} files for chunk {}", files.len(), chunk_id);
Ok(files)
}
async fn list_chunk_files(
storage: &dyn SnapshotStorage,
schemas: &[String],
chunk_id: u32,
) -> Result<Vec<String>> {
let mut files = Vec::new();
for schema in schemas {
let prefix = data_dir_for_schema_chunk(schema, chunk_id);
files.extend(storage.list_files_recursive(&prefix).await?);
}
files.sort();
Ok(files)
}


@@ -0,0 +1,440 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::{ExposeSecret, SecretString};
use common_telemetry::info;
use object_store::util::{join_path, normalize_path};
use snafu::ResultExt;
use url::Url;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::error::{DatabaseSnafu, InvalidUriSnafu, Result, UrlParseSnafu};
use crate::data::export_v2::manifest::{DataFormat, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::StorageScheme;
use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
use crate::database::DatabaseClient;
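/// Options rendered into the WITH (...) clause of the COPY DATABASE statement.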
pub(super) struct CopyOptions {
pub(super) format: DataFormat,
pub(super) time_range: TimeRange,
pub(super) parallelism: usize,
}
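/// A resolved COPY destination: object-store location, optional CONNECTION
/// clause, and the secret values embedded in that clause (kept so `mask_sql`
/// can redact them from logs).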
pub(super) struct CopyTarget {
pub(super) location: String,
pub(super) connection: String,
secrets: Vec<Option<String>>,
}
impl CopyTarget {
fn mask_sql(&self, sql: &str) -> String {
mask_secrets(sql, &self.secrets)
}
}
pub(super) fn build_copy_target(
snapshot_uri: &str,
storage: &ObjectStoreConfig,
schema: &str,
chunk_id: u32,
) -> Result<CopyTarget> {
let url = Url::parse(snapshot_uri).context(UrlParseSnafu)?;
let scheme = StorageScheme::from_uri(snapshot_uri)?;
let suffix = data_dir_for_schema_chunk(schema, chunk_id);
match scheme {
StorageScheme::File => {
let root = url.to_file_path().map_err(|_| {
InvalidUriSnafu {
uri: snapshot_uri,
reason: "file:// URI must use an absolute path like file:///tmp/backup",
}
.build()
})?;
let location = normalize_path(&format!("{}/{}", root.to_string_lossy(), suffix));
Ok(CopyTarget {
location,
connection: String::new(),
secrets: Vec::new(),
})
}
StorageScheme::S3 => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("s3://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_s3_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Oss => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("oss://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_oss_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Gcs => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("gcs://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_gcs_connection(storage, snapshot_uri)?;
Ok(CopyTarget {
location,
connection,
secrets,
})
}
StorageScheme::Azblob => {
let (bucket, root) = extract_bucket_root(&url, snapshot_uri)?;
let location = format!("azblob://{}/{}", bucket, join_root(&root, &suffix));
let (connection, secrets) = build_azblob_connection(storage);
Ok(CopyTarget {
location,
connection,
secrets,
})
}
}
}
pub(super) async fn execute_copy_database(
database_client: &DatabaseClient,
catalog: &str,
schema: &str,
target: &CopyTarget,
options: &CopyOptions,
) -> Result<()> {
let with_options = build_with_options(options);
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
escape_sql_identifier(catalog),
escape_sql_identifier(schema),
escape_sql_literal(&target.location),
with_options,
target.connection
);
let safe_sql = target.mask_sql(&sql);
info!("Executing sql: {}", safe_sql);
database_client
.sql_in_public(&sql)
.await
.context(DatabaseSnafu)?;
Ok(())
}
fn build_with_options(options: &CopyOptions) -> String {
let mut parts = vec![format!("FORMAT='{}'", options.format)];
if let Some(start) = options.time_range.start {
parts.push(format!(
"START_TIME='{}'",
escape_sql_literal(&start.to_rfc3339())
));
}
if let Some(end) = options.time_range.end {
parts.push(format!(
"END_TIME='{}'",
escape_sql_literal(&end.to_rfc3339())
));
}
parts.push(format!("PARALLELISM={}", options.parallelism));
parts.join(", ")
}
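// For a bounded Parquet chunk this renders roughly (assuming DataFormat's
// Display is lowercase, matching the clap default "parquet"):
//   FORMAT='parquet', START_TIME='2025-01-01T00:00:00+00:00',
//   END_TIME='2025-01-02T00:00:00+00:00', PARALLELISM=1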
fn extract_bucket_root(url: &Url, snapshot_uri: &str) -> Result<(String, String)> {
let bucket = url.host_str().unwrap_or("").to_string();
if bucket.is_empty() {
return InvalidUriSnafu {
uri: snapshot_uri,
reason: "URI must include bucket/container in host",
}
.fail();
}
let root = url
.path()
.trim_start_matches('/')
.trim_end_matches('/')
.to_string();
Ok((bucket, root))
}
fn join_root(root: &str, suffix: &str) -> String {
join_path(root, suffix).trim_start_matches('/').to_string()
}
fn build_s3_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let access_key_id = expose_optional_secret(&storage.s3.s3_access_key_id);
let secret_access_key = expose_optional_secret(&storage.s3.s3_secret_access_key);
let mut options = Vec::new();
if let Some(access_key_id) = &access_key_id {
options.push(format!(
"ACCESS_KEY_ID='{}'",
escape_sql_literal(access_key_id)
));
}
if let Some(secret_access_key) = &secret_access_key {
options.push(format!(
"SECRET_ACCESS_KEY='{}'",
escape_sql_literal(secret_access_key)
));
}
if let Some(region) = &storage.s3.s3_region {
options.push(format!("REGION='{}'", escape_sql_literal(region)));
}
if let Some(endpoint) = &storage.s3.s3_endpoint {
options.push(format!("ENDPOINT='{}'", escape_sql_literal(endpoint)));
}
let secrets = vec![access_key_id, secret_access_key];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn build_oss_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let access_key_id = expose_optional_secret(&storage.oss.oss_access_key_id);
let access_key_secret = expose_optional_secret(&storage.oss.oss_access_key_secret);
let mut options = Vec::new();
if let Some(access_key_id) = &access_key_id {
options.push(format!(
"ACCESS_KEY_ID='{}'",
escape_sql_literal(access_key_id)
));
}
if let Some(access_key_secret) = &access_key_secret {
options.push(format!(
"ACCESS_KEY_SECRET='{}'",
escape_sql_literal(access_key_secret)
));
}
if !storage.oss.oss_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.oss.oss_endpoint)
));
}
let secrets = vec![access_key_id, access_key_secret];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn build_gcs_connection(
storage: &ObjectStoreConfig,
snapshot_uri: &str,
) -> Result<(String, Vec<Option<String>>)> {
let credential_path = expose_optional_secret(&storage.gcs.gcs_credential_path);
let credential = expose_optional_secret(&storage.gcs.gcs_credential);
if credential.is_none() && credential_path.is_some() {
return InvalidUriSnafu {
uri: snapshot_uri,
reason: "gcs_credential_path is not supported for server-side COPY; provide gcs_credential or rely on server-side ADC",
}
.fail();
}
let mut options = Vec::new();
if let Some(credential) = &credential {
options.push(format!("CREDENTIAL='{}'", escape_sql_literal(credential)));
}
if !storage.gcs.gcs_scope.is_empty() {
options.push(format!(
"SCOPE='{}'",
escape_sql_literal(&storage.gcs.gcs_scope)
));
}
if !storage.gcs.gcs_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.gcs.gcs_endpoint)
));
}
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
let secrets = vec![credential_path, credential];
Ok((connection, secrets))
}
fn build_azblob_connection(storage: &ObjectStoreConfig) -> (String, Vec<Option<String>>) {
let account_name = expose_optional_secret(&storage.azblob.azblob_account_name);
let account_key = expose_optional_secret(&storage.azblob.azblob_account_key);
let sas_token = storage.azblob.azblob_sas_token.clone();
let mut options = Vec::new();
if let Some(account_name) = &account_name {
options.push(format!(
"ACCOUNT_NAME='{}'",
escape_sql_literal(account_name)
));
}
if let Some(account_key) = &account_key {
options.push(format!("ACCOUNT_KEY='{}'", escape_sql_literal(account_key)));
}
if let Some(sas_token) = &sas_token {
options.push(format!("SAS_TOKEN='{}'", escape_sql_literal(sas_token)));
}
if !storage.azblob.azblob_endpoint.is_empty() {
options.push(format!(
"ENDPOINT='{}'",
escape_sql_literal(&storage.azblob.azblob_endpoint)
));
}
let secrets = vec![account_name, account_key, sas_token];
let connection = if options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", options.join(", "))
};
(connection, secrets)
}
fn expose_optional_secret(secret: &Option<SecretString>) -> Option<String> {
secret.as_ref().map(|s| s.expose_secret().to_owned())
}
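/// Replaces every known, non-empty secret value in `sql` with "[REDACTED]"
/// so the generated COPY statement can be logged safely.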
fn mask_secrets(sql: &str, secrets: &[Option<String>]) -> String {
let mut masked = sql.to_string();
for secret in secrets {
if let Some(secret) = secret
&& !secret.is_empty()
{
masked = masked.replace(secret, "[REDACTED]");
}
}
masked
}
#[cfg(test)]
mod tests {
use common_base::secrets::SecretString;
use super::*;
use crate::common::{PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection};
#[test]
fn test_build_oss_connection_includes_endpoint() {
let storage = ObjectStoreConfig {
oss: PrefixedOssConnection {
oss_endpoint: "https://oss.example.com".to_string(),
oss_access_key_id: Some(SecretString::from("key_id".to_string())),
oss_access_key_secret: Some(SecretString::from("key_secret".to_string())),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_oss_connection(&storage);
assert!(connection.contains("ENDPOINT='https://oss.example.com'"));
}
#[test]
fn test_build_gcs_connection_uses_scope_and_inline_credential() {
let storage = ObjectStoreConfig {
gcs: PrefixedGcsConnection {
gcs_scope: "scope-a".to_string(),
gcs_endpoint: "https://storage.googleapis.com".to_string(),
gcs_credential: Some(SecretString::from("credential-json".to_string())),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_gcs_connection(&storage, "gcs://bucket/root").unwrap();
assert!(connection.contains("CREDENTIAL='credential-json'"));
assert!(connection.contains("SCOPE='scope-a'"));
assert!(connection.contains("ENDPOINT='https://storage.googleapis.com'"));
assert!(!connection.contains("CREDENTIAL_PATH"));
}
#[test]
fn test_build_gcs_connection_rejects_credential_path_only() {
let storage = ObjectStoreConfig {
gcs: PrefixedGcsConnection {
gcs_scope: "scope-a".to_string(),
gcs_credential_path: Some(SecretString::from("/tmp/creds.json".to_string())),
..Default::default()
},
..Default::default()
};
let error = build_gcs_connection(&storage, "gcs://bucket/root")
.expect_err("credential_path-only should be rejected")
.to_string();
assert!(error.contains("gcs_credential_path is not supported"));
}
#[test]
fn test_build_azblob_connection_includes_endpoint() {
let storage = ObjectStoreConfig {
azblob: PrefixedAzblobConnection {
azblob_account_name: Some(SecretString::from("account".to_string())),
azblob_account_key: Some(SecretString::from("key".to_string())),
azblob_endpoint: "https://blob.example.com".to_string(),
..Default::default()
},
..Default::default()
};
let (connection, _) = build_azblob_connection(&storage);
assert!(connection.contains("ENDPOINT='https://blob.example.com'"));
}
#[test]
fn test_build_azblob_connection_redacts_sas_token() {
let storage = ObjectStoreConfig {
azblob: PrefixedAzblobConnection {
azblob_account_name: Some(SecretString::from("account".to_string())),
azblob_account_key: Some(SecretString::from("key".to_string())),
azblob_sas_token: Some("sig=secret-token".to_string()),
..Default::default()
},
..Default::default()
};
let (connection, secrets) = build_azblob_connection(&storage);
let masked = mask_secrets(&connection, &secrets);
assert!(connection.contains("SAS_TOKEN='sig=secret-token'"));
assert!(masked.contains("SAS_TOKEN='[REDACTED]'"));
assert!(!masked.contains("sig=secret-token"));
}
#[test]
fn test_build_copy_target_decodes_file_uri_path() {
let storage = ObjectStoreConfig::default();
let target = build_copy_target("file:///tmp/my%20backup", &storage, "public", 7)
.expect("file:// copy target should be built");
assert_eq!(target.location, "/tmp/my backup/data/public/7/");
}
}


@@ -72,17 +72,55 @@ pub enum Error {
},
#[snafu(display(
"Cannot resume schema-only snapshot with data export. Use --force to recreate."
"Cannot resume snapshot with a different schema_only mode (existing: {}, requested: {}). Use --force to recreate.",
existing_schema_only,
requested_schema_only
))]
CannotResumeSchemaOnly {
SchemaOnlyModeMismatch {
existing_schema_only: bool,
requested_schema_only: bool,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Data export is not implemented yet. Use --schema-only to create a schema snapshot."
"Cannot resume snapshot with different {} (existing: {}, requested: {}). Use --force to recreate.",
field,
existing,
requested
))]
DataExportNotImplemented {
ResumeConfigMismatch {
field: String,
existing: String,
requested: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse time: invalid format: {}", input))]
TimeParseInvalidFormat {
input: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse time: end_time is before start_time"))]
TimeParseEndBeforeStart {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"chunk_time_window requires both --start-time and --end-time to be specified"
))]
ChunkTimeWindowRequiresBounds {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("--schema-only cannot be used with data export arguments: {}", args))]
SchemaOnlyArgsNotAllowed {
args: String,
#[snafu(implicit)]
location: Location,
},
@@ -154,9 +192,13 @@ impl ErrorExt for Error {
match self {
Error::InvalidUri { .. }
| Error::UnsupportedScheme { .. }
| Error::CannotResumeSchemaOnly { .. }
| Error::DataExportNotImplemented { .. }
| Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
| Error::SchemaOnlyModeMismatch { .. }
| Error::ResumeConfigMismatch { .. }
| Error::ManifestVersionMismatch { .. }
| Error::SchemaOnlyArgsNotAllowed { .. } => StatusCode::InvalidArguments,
Error::TimeParseInvalidFormat { .. }
| Error::TimeParseEndBeforeStart { .. }
| Error::ChunkTimeWindowRequiresBounds { .. } => StatusCode::InvalidArguments,
Error::StorageOperation { .. }
| Error::ManifestParse { .. }


@@ -14,12 +14,19 @@
//! Manifest data structures for Export/Import V2.
use std::time::Duration;
use std::{fmt, str};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::data::export_v2::chunker::generate_chunks;
use crate::data::export_v2::error::{
ChunkTimeWindowRequiresBoundsSnafu, Result as ExportResult, TimeParseEndBeforeStartSnafu,
TimeParseInvalidFormatSnafu,
};
/// Current manifest format version.
pub const MANIFEST_VERSION: u32 = 1;
@@ -55,6 +62,31 @@ impl TimeRange {
pub fn is_unbounded(&self) -> bool {
self.start.is_none() && self.end.is_none()
}
/// Returns true if both bounds are specified.
pub fn is_bounded(&self) -> bool {
self.start.is_some() && self.end.is_some()
}
/// Parses a time range from optional RFC3339 strings.
pub fn parse(start: Option<&str>, end: Option<&str>) -> ExportResult<Self> {
let start = start.map(parse_time).transpose()?;
let end = end.map(parse_time).transpose()?;
if let (Some(start), Some(end)) = (start, end)
&& end < start
{
return TimeParseEndBeforeStartSnafu.fail();
}
Ok(Self::new(start, end))
}
}
fn parse_time(input: &str) -> ExportResult<DateTime<Utc>> {
DateTime::parse_from_rfc3339(input)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|_| TimeParseInvalidFormatSnafu { input }.build())
}
impl Default for TimeRange {
@@ -74,6 +106,8 @@ pub enum ChunkStatus {
InProgress,
/// Chunk export completed successfully.
Completed,
/// Chunk had no data to export.
Skipped,
/// Chunk export failed.
Failed,
}
@@ -111,6 +145,13 @@ impl ChunkMeta {
}
}
/// Creates a skipped chunk with the given id and time range.
pub fn skipped(id: u32, time_range: TimeRange) -> Self {
let mut chunk = Self::new(id, time_range);
chunk.mark_skipped();
chunk
}
/// Marks this chunk as in progress.
pub fn mark_in_progress(&mut self) {
self.status = ChunkStatus::InProgress;
@@ -125,6 +166,14 @@ impl ChunkMeta {
self.error = None;
}
/// Marks this chunk as skipped because no data files were produced.
pub fn mark_skipped(&mut self) {
self.status = ChunkStatus::Skipped;
self.files.clear();
self.checksum = None;
self.error = None;
}
/// Marks this chunk as failed with the given error message.
pub fn mark_failed(&mut self, error: String) {
self.status = ChunkStatus::Failed;
@@ -210,6 +259,35 @@ pub struct Manifest {
}
impl Manifest {
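/// Builds the manifest for an export run. Schema-only manifests carry no
/// chunks; full exports are planned into chunks, either by the optional
/// `chunk_time_window` or as a single chunk covering the whole range. Fails
/// when a window is given for a time range that is not fully bounded.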
pub fn new_for_export(
catalog: String,
schemas: Vec<String>,
schema_only: bool,
time_range: TimeRange,
format: DataFormat,
chunk_time_window: Option<Duration>,
) -> ExportResult<Self> {
if chunk_time_window.is_some() && !time_range.is_bounded() {
return ChunkTimeWindowRequiresBoundsSnafu.fail();
}
let mut manifest = if schema_only {
Self::new_schema_only(catalog, schemas)
} else {
Self::new_full(catalog, schemas, time_range, format)
};
if !schema_only {
manifest.chunks = match chunk_time_window {
Some(window) => generate_chunks(&manifest.time_range, window),
None => generate_single_chunk(&manifest.time_range),
};
manifest.touch();
}
Ok(manifest)
}
/// Creates a new manifest for schema-only export.
pub fn new_schema_only(catalog: String, schemas: Vec<String>) -> Self {
let now = Utc::now();
@@ -258,7 +336,7 @@ impl Manifest {
&& self
.chunks
.iter()
.all(|c| c.status == ChunkStatus::Completed))
.all(|c| matches!(c.status, ChunkStatus::Completed | ChunkStatus::Skipped)))
}
/// Returns the number of pending chunks.
@@ -285,6 +363,14 @@ impl Manifest {
.count()
}
/// Returns the number of skipped chunks.
pub fn skipped_count(&self) -> usize {
self.chunks
.iter()
.filter(|c| c.status == ChunkStatus::Skipped)
.count()
}
/// Returns the number of failed chunks.
pub fn failed_count(&self) -> usize {
self.chunks
@@ -313,8 +399,24 @@ impl Manifest {
}
}
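/// Fallback single-chunk plan used when no chunk window is given: a
/// zero-width bounded range becomes one skipped chunk and an inverted range
/// yields no chunks at all.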
fn generate_single_chunk(time_range: &TimeRange) -> Vec<ChunkMeta> {
if let (Some(start), Some(end)) = (time_range.start, time_range.end) {
if start == end {
return vec![ChunkMeta::skipped(1, time_range.clone())];
}
if start > end {
return Vec::new();
}
}
vec![ChunkMeta::new(1, time_range.clone())]
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use chrono::{TimeZone, Utc};
use super::*;
#[test]
@@ -338,6 +440,26 @@ mod tests {
assert!(manifest.is_complete());
}
#[test]
fn test_generate_single_chunk_zero_width_range_is_skipped() {
let ts = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let chunks = generate_single_chunk(&TimeRange::new(Some(ts), Some(ts)));
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].status, ChunkStatus::Skipped);
assert_eq!(chunks[0].time_range.start, Some(ts));
assert_eq!(chunks[0].time_range.end, Some(ts));
}
#[test]
fn test_generate_single_chunk_invalid_range_is_empty() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let chunks = generate_single_chunk(&TimeRange::new(Some(start), Some(end)));
assert!(chunks.is_empty());
}
#[test]
fn test_manifest_full() {
let manifest = Manifest::new_full(
@@ -377,5 +499,71 @@ mod tests {
);
assert_eq!(chunk.status, ChunkStatus::Completed);
assert_eq!(chunk.files.len(), 1);
chunk.mark_skipped();
assert_eq!(chunk.status, ChunkStatus::Skipped);
assert!(chunk.files.is_empty());
}
#[test]
fn test_manifest_is_complete_when_chunks_are_completed_or_skipped() {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest.add_chunk(ChunkMeta::new(1, TimeRange::unbounded()));
manifest.add_chunk(ChunkMeta::new(2, TimeRange::unbounded()));
manifest.update_chunk(1, |chunk| {
chunk.mark_completed(vec!["a.parquet".to_string()], None)
});
manifest.update_chunk(2, |chunk| chunk.mark_skipped());
assert!(manifest.is_complete());
assert_eq!(manifest.completed_count(), 1);
assert_eq!(manifest.skipped_count(), 1);
}
#[test]
fn test_manifest_chunk_time_window_none_single_chunk() {
let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
let end = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
let range = TimeRange::new(Some(start), Some(end));
let manifest = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
range.clone(),
DataFormat::Parquet,
None,
)
.unwrap();
assert_eq!(manifest.chunks.len(), 1);
assert_eq!(manifest.chunks[0].time_range, range);
}
#[test]
fn test_time_range_parse_requires_order() {
let result = TimeRange::parse(Some("2025-01-02T00:00:00Z"), Some("2025-01-01T00:00:00Z"));
assert!(result.is_err());
}
#[test]
fn test_new_for_export_with_chunk_window_requires_bounded_range() {
let result = Manifest::new_for_export(
"greptime".to_string(),
vec!["public".to_string()],
false,
TimeRange::new(
None,
Some(Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
),
DataFormat::Parquet,
Some(Duration::from_secs(3600)),
);
assert!(result.is_err());
}
}


@@ -27,7 +27,8 @@ use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::manifest::MANIFEST_VERSION;
use crate::data::import_v2::error::{
ManifestVersionMismatchSnafu, Result, SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
FullSnapshotImportNotSupportedSnafu, ManifestVersionMismatchSnafu, Result,
SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::path::ddl_path_for_schema;
@@ -58,10 +59,6 @@ pub struct ImportV2Command {
#[clap(long)]
dry_run: bool,
/// Concurrency level (for future use).
#[clap(long, default_value = "1")]
parallelism: usize,
/// Basic authentication (user:password).
#[clap(long)]
auth_basic: Option<String>,
@@ -121,7 +118,6 @@ impl ImportV2Command {
Ok(Box::new(Import {
schemas,
dry_run: self.dry_run,
_parallelism: self.parallelism,
storage: Box::new(storage),
database_client,
}))
@@ -132,7 +128,6 @@ impl ImportV2Command {
pub struct Import {
schemas: Option<Vec<String>>,
dry_run: bool,
_parallelism: usize,
storage: Box<dyn SnapshotStorage>,
database_client: DatabaseClient,
}
@@ -169,6 +164,13 @@ impl Import {
info!("Snapshot contains {} schema(s)", manifest.schemas.len());
if !manifest.schema_only && !manifest.chunks.is_empty() {
return FullSnapshotImportNotSupportedSnafu {
chunk_count: manifest.chunks.len(),
}
.fail();
}
// 2. Determine schemas to import
let schemas_to_import = match &self.schemas {
Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
@@ -203,14 +205,6 @@ impl Import {
ddl_statements.len()
);
// 6. Data import would happen here for non-schema-only snapshots (M2/M3)
if !manifest.schema_only && !manifest.chunks.is_empty() {
info!(
"Data import not yet implemented (M3). {} chunks pending.",
manifest.chunks.len()
);
}
Ok(())
}
@@ -403,7 +397,114 @@ fn canonicalize_schema_filter(
#[cfg(test)]
mod tests {
use std::time::Duration;
use async_trait::async_trait;
use super::*;
use crate::Tool;
use crate::data::export_v2::manifest::{ChunkMeta, DataFormat, Manifest, TimeRange};
use crate::data::export_v2::schema::SchemaSnapshot;
use crate::data::snapshot_storage::SnapshotStorage;
use crate::database::DatabaseClient;
struct StubStorage {
manifest: Manifest,
}
#[async_trait]
impl SnapshotStorage for StubStorage {
async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
Ok(true)
}
async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
Ok(self.manifest.clone())
}
async fn write_manifest(
&self,
_manifest: &Manifest,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
unimplemented!("not needed in import_v2::command tests")
}
async fn write_text(
&self,
_path: &str,
_content: &str,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn write_schema(
&self,
_snapshot: &SchemaSnapshot,
) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
async fn list_files_recursive(
&self,
_prefix: &str,
) -> crate::data::export_v2::error::Result<Vec<String>> {
unimplemented!("not needed in import_v2::command tests")
}
async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
unimplemented!("not needed in import_v2::command tests")
}
}
fn test_database_client() -> DatabaseClient {
DatabaseClient::new(
"127.0.0.1:4000".to_string(),
"greptime".to_string(),
None,
Duration::from_secs(1),
None,
false,
)
}
#[tokio::test]
async fn test_import_rejects_full_snapshot_before_schema_execution() {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest
.chunks
.push(ChunkMeta::new(1, TimeRange::unbounded()));
let import = Import {
schemas: None,
dry_run: false,
storage: Box::new(StubStorage { manifest }),
database_client: test_database_client(),
};
let error = import
.do_work()
.await
.expect_err("full snapshot import should fail");
assert!(
error
.to_string()
.contains("Importing data from full snapshots is not implemented yet")
);
}
#[test]
fn test_parse_ddl_statements() {

View File

@@ -45,6 +45,16 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Importing data from full snapshots is not implemented yet (snapshot has {} chunk(s))",
chunk_count
))]
FullSnapshotImportNotSupported {
chunk_count: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Snapshot storage error"))]
SnapshotStorage {
#[snafu(source)]
@@ -67,10 +77,10 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::SnapshotNotFound { .. } | Error::SchemaNotInSnapshot { .. } => {
StatusCode::InvalidArguments
}
Error::ManifestVersionMismatch { .. } => StatusCode::InvalidArguments,
Error::SnapshotNotFound { .. }
| Error::SchemaNotInSnapshot { .. }
| Error::ManifestVersionMismatch { .. }
| Error::FullSnapshotImportNotSupported { .. } => StatusCode::InvalidArguments,
Error::Database { error, .. } => error.status_code(),
Error::SnapshotStorage { error, .. } => error.status_code(),
}

View File

@@ -25,6 +25,10 @@ pub(crate) fn ddl_path_for_schema(schema: &str) -> String {
)
}
pub(crate) fn data_dir_for_schema_chunk(schema: &str, chunk_id: u32) -> String {
format!("data/{}/{}/", encode_path_segment(schema), chunk_id)
}
pub(crate) fn encode_path_segment(value: &str) -> String {
let mut encoded = String::with_capacity(value.len());
for byte in value.bytes() {
@@ -73,4 +77,13 @@ mod tests {
"schema/ddl/%2E%2E%2Fevil.sql"
);
}
#[test]
fn test_data_dir_for_schema_chunk_encodes_schema_segment() {
assert_eq!(data_dir_for_schema_chunk("public", 1), "data/public/1/");
assert_eq!(
data_dir_for_schema_chunk("../evil", 7),
"data/%2E%2E%2Fevil/7/"
);
}
}

View File

@@ -18,9 +18,12 @@
//! to various storage backends (S3, OSS, GCS, Azure Blob, local filesystem).
use async_trait::async_trait;
use futures::TryStreamExt;
use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
use object_store::util::{with_instrument_layers, with_retry_layers};
use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
use object_store::{
AzblobConnection, ErrorKind, GcsConnection, ObjectStore, OssConnection, S3Connection,
};
use snafu::ResultExt;
use url::Url;
@@ -139,14 +142,14 @@ fn extract_file_path_from_uri(uri: &str) -> Result<String> {
.fail(),
_ => url
.to_file_path()
.map(|path| path.to_string_lossy().into_owned())
.map_err(|_| {
InvalidUriSnafu {
uri,
reason: "file:// URI must use a valid absolute filesystem path",
reason: "file:// URI must use an absolute path like file:///tmp/backup",
}
.build()
}),
})
.map(|path| path.to_string_lossy().into_owned()),
}
}
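The refactor above hoists the lossy string conversion out of the match so it applies once to whichever arm produced the PathBuf; the added tests document that `Url::to_file_path` is what percent-decodes the path. A minimal standalone sketch of that behavior (Unix-style paths assumed, `url` crate only):

use url::Url;

fn main() {
    // Url::to_file_path percent-decodes each segment, which is why
    // "file:///tmp/my%20backup" resolves to "/tmp/my backup".
    let url = Url::parse("file:///tmp/my%20backup").unwrap();
    let path = url.to_file_path().unwrap();
    assert_eq!(path.to_string_lossy(), "/tmp/my backup");
}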
@@ -184,6 +187,12 @@ pub trait SnapshotStorage: Send + Sync {
/// Reads a text file from a relative path under the snapshot root.
async fn read_text(&self, path: &str) -> Result<String>;
/// Creates a directory-like prefix under the snapshot root when needed by the backend.
async fn create_dir_all(&self, path: &str) -> Result<()>;
/// Lists files recursively under a relative prefix.
async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>>;
/// Deletes the entire snapshot (for --force).
async fn delete_snapshot(&self) -> Result<()>;
}
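A hedged usage sketch for the three new trait methods; the helper name and the prefix argument are illustrative, not taken from this diff:

// Sketch only: works with any SnapshotStorage impl.
async fn sync_chunk_dir(storage: &dyn SnapshotStorage, prefix: &str) -> Result<Vec<String>> {
    // Backends like a local Fs may need the directory created up front.
    storage.create_dir_all(prefix).await?;
    // Recursive listing; per the OpenDAL impl below, a missing prefix
    // yields Ok(vec![]) rather than an error.
    storage.list_files_recursive(prefix).await
}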
@@ -443,6 +452,38 @@ impl SnapshotStorage for OpenDalStorage {
String::from_utf8(data).context(TextDecodeSnafu)
}
async fn create_dir_all(&self, path: &str) -> Result<()> {
self.object_store
.create_dir(path)
.await
.context(StorageOperationSnafu {
operation: format!("create dir {}", path),
})
}
async fn list_files_recursive(&self, prefix: &str) -> Result<Vec<String>> {
let mut lister = match self.object_store.lister_with(prefix).recursive(true).await {
Ok(lister) => lister,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(error) => {
return Err(error).context(StorageOperationSnafu {
operation: format!("list {}", prefix),
});
}
};
let mut files = Vec::new();
while let Some(entry) = lister.try_next().await.context(StorageOperationSnafu {
operation: format!("list {}", prefix),
})? {
if entry.metadata().is_dir() {
continue;
}
files.push(entry.path().to_string());
}
Ok(files)
}
async fn delete_snapshot(&self) -> Result<()> {
self.object_store
.remove_all("/")
@@ -533,6 +574,14 @@ mod tests {
extract_file_path_from_uri("file://localhost/tmp/backup").unwrap(),
"/tmp/backup"
);
assert_eq!(
extract_file_path_from_uri("file:///tmp/my%20backup").unwrap(),
"/tmp/my backup"
);
assert_eq!(
extract_file_path_from_uri("file://localhost/tmp/my%20backup").unwrap(),
"/tmp/my backup"
);
}
#[test]

View File

@@ -512,7 +512,7 @@ struct FlightContext {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use api::v1::auth_header::AuthScheme;
use api::v1::{AuthHeader, Basic};

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
mod client;
pub mod client_manager;
pub mod database;

View File

@@ -54,7 +54,6 @@ common-telemetry = { workspace = true, features = [
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
const_format.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-physical-plan.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![recursion_limit = "256"]
#![doc = include_str!("../../../../README.md")]
use clap::{Parser, Subcommand};

View File

@@ -21,7 +21,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::options::GlobalOptions;
use crate::{App, Result, error};
pub const APP_NAME: &str = const_format::concatcp!(common_version::product_name(), "-cli");
pub const APP_NAME: &str = "greptime-cli";
use async_trait::async_trait;
pub struct Instance {

View File

@@ -43,7 +43,7 @@ use crate::error::{
};
use crate::options::{GlobalOptions, GreptimeOptions};
pub const APP_NAME: &str = const_format::concatcp!(common_version::product_name(), "-datanode");
pub const APP_NAME: &str = "greptime-datanode";
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
@@ -356,7 +356,7 @@ impl StartCommand {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::io::Write;
use std::time::Duration;

View File

@@ -662,7 +662,7 @@ impl ScanbenchCommand {
// Sort ranges within each partition by start time ascending
for partition in &mut partitions {
partition.sort_by(|a, b| a.start.cmp(&b.start));
partition.sort_by_key(|a| a.start);
}
scanner
@@ -677,7 +677,9 @@ impl ScanbenchCommand {
// Scan all partitions
let num_partitions = scanner.properties().partitions.len();
let ctx = QueryScanContext::default();
let ctx = QueryScanContext {
explain_verbose: self.verbose,
};
let metrics_set = ExecutionPlanMetricsSet::new();
let mut scan_futures = FuturesUnordered::new();

View File

@@ -35,7 +35,6 @@ use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use const_format::concatcp;
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
get_flow_auth_options,
@@ -53,7 +52,7 @@ use crate::error::{
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
pub const APP_NAME: &str = concatcp!(common_version::product_name(), "-flownode");
pub const APP_NAME: &str = "greptime-flownode";
type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;

View File

@@ -72,7 +72,7 @@ pub struct Instance {
_guard: Vec<WorkerGuard>,
}
pub const APP_NAME: &str = const_format::concatcp!(common_version::product_name(), "-frontend");
pub const APP_NAME: &str = "greptime-frontend";
impl Instance {
pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![recursion_limit = "256"]
use async_trait::async_trait;
use common_error::ext::ErrorExt;

View File

@@ -24,7 +24,6 @@ use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use const_format::concatcp;
use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
@@ -36,7 +35,7 @@ use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_hea
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
pub const APP_NAME: &str = concatcp!(common_version::product_name(), "-metasrv");
pub const APP_NAME: &str = "greptime-metasrv";
pub struct Instance {
instance: MetasrvInstance,

View File

@@ -48,7 +48,6 @@ use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use const_format::concatcp;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
@@ -76,7 +75,7 @@ use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
pub const APP_NAME: &str = concatcp!(common_version::product_name(), "-standalone");
pub const APP_NAME: &str = "greptime-standalone";
#[derive(Parser)]
pub struct Command {

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::vec;
use std::{assert_matches, vec};
use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![feature(type_alias_impl_trait)]
pub mod buffered_writer;
pub mod compressed_writer;
pub mod compression;

View File

@@ -52,7 +52,7 @@ pub enum Error {
#[snafu(display("Failed to invoke list process service"))]
CreateChannel {
source: common_grpc::error::Error,
source: Box<common_grpc::error::Error>,
#[snafu(implicit)]
location: Location,
},

View File

@@ -91,6 +91,7 @@ impl FrontendSelector for MetaClientSelector {
let channel = self
.channel_manager
.get(node.peer.addr)
.map_err(Box::new)
.context(error::CreateChannelSnafu)?;
let client = frontend_client::FrontendClient::new(channel);
Ok(Box::new(client) as FrontendClientPtr)
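Boxing the channel-error source looks like the usual large-enum-variant fix; a self-contained sketch of the effect, with an illustrative stand-in for `common_grpc::error::Error`:

// BigSource is a stand-in; the real source type is presumably large.
struct BigSource([u8; 256]);

enum Fat { CreateChannel(BigSource), Other }
enum Slim { CreateChannel(Box<BigSource>), Other }

fn main() {
    // The boxed variant keeps the enum near pointer size, so every
    // Result<T, Error> moved across the selector stays small.
    assert!(std::mem::size_of::<Slim>() < std::mem::size_of::<Fat>());
}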

View File

@@ -13,7 +13,6 @@
// limitations under the License.
#![feature(try_blocks)]
#![feature(assert_matches)]
mod admin;
mod flush_flow;

View File

@@ -794,16 +794,12 @@ impl Tokenizer {
is_quote_present = true;
break;
}
' ' => {
if !is_quoted {
break;
}
' ' if !is_quoted => {
break;
}
'(' | ')' | '+' | '-' => {
if !is_quoted {
self.rewind_one();
break;
}
'(' | ')' | '+' | '-' if !is_quoted => {
self.rewind_one();
break;
}
'\\' => {
let Some(next) = self.consume_next(pattern) else {

View File

@@ -141,7 +141,7 @@ where
results.push((self.func)(v0, v1)?);
}
let results = ScalarValue::iter_to_array(results.into_iter())?;
let results = ScalarValue::iter_to_array(results)?;
Ok(ColumnarValue::Array(results))
}
}
@@ -200,7 +200,7 @@ where
}
}
let results = ScalarValue::iter_to_array(results.into_iter())?;
let results = ScalarValue::iter_to_array(results)?;
Ok(ColumnarValue::Array(results))
}
}
@@ -232,7 +232,7 @@ where
results.push((self.func)(&v)?);
}
let results = ScalarValue::iter_to_array(results.into_iter())?;
let results = ScalarValue::iter_to_array(results)?;
Ok(ColumnarValue::Array(results))
}
}

View File

@@ -167,7 +167,7 @@ mod tests {
"External error: Invalid vector string: [7.0,hello,9.0]",
];
for (input, expected) in inputs.into_iter().zip(expected.into_iter()) {
for (input, expected) in inputs.into_iter().zip(expected) {
let args = ScalarFunctionArgs {
args: vec![ColumnarValue::Array(Arc::new(input))],
arg_fields: vec![],

View File

@@ -172,7 +172,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
true
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
quota.metrics.inc_rejected("try_acquire_additional");
quota.metrics.inc_exhausted("try_acquire_additional");
false
}
}

View File

@@ -45,5 +45,5 @@ impl MemoryMetrics for NoOpMetrics {
fn set_in_use(&self, _: i64) {}
#[inline(always)]
fn inc_rejected(&self, _: &str) {}
fn inc_exhausted(&self, _: &str) {}
}

View File

@@ -29,7 +29,8 @@ use crate::policy::OnExhaustedPolicy;
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
fn set_limit(&self, bytes: i64);
fn set_in_use(&self, bytes: i64);
fn inc_rejected(&self, reason: &str);
/// Record that immediate memory acquisition failed due to exhausted quota.
fn inc_exhausted(&self, reason: &str);
}
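A minimal conforming impl after the rename, as a sketch (the counter is illustrative):

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Counts exhaustion events; Clone + Send + Sync via Arc, per the trait bounds.
#[derive(Clone, Default)]
struct CountingMetrics(Arc<AtomicU64>);

impl MemoryMetrics for CountingMetrics {
    fn set_limit(&self, _bytes: i64) {}
    fn set_in_use(&self, _bytes: i64) {}
    fn inc_exhausted(&self, _reason: &str) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}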
/// Generic memory manager for quota-controlled operations.
@@ -171,7 +172,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
Some(MemoryGuard::limited(quota.clone(), permit))
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
quota.metrics.inc_rejected("try_acquire");
quota.metrics.inc_exhausted("try_acquire");
None
}
}

View File

@@ -303,7 +303,7 @@ impl TryFrom<i32> for Role {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use common_workload::DatanodeWorkloadType;

View File

@@ -72,7 +72,7 @@ impl State for DropDatabaseStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;

View File

@@ -322,7 +322,7 @@ impl DropTableExecutor {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -19,7 +19,7 @@ pub mod datanode_handler;
pub mod flownode_handler;
pub mod region_metadata;
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use api::v1::meta::Partition;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use api::region::RegionResponse;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
@@ -256,7 +256,7 @@ async fn test_on_submit_alter_request() {
results.push(result);
}
rx.try_recv().unwrap_err();
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
results.sort_unstable_by_key(|(a, _)| a.id);
let (peer, request) = results.remove(0);
assert_alter_request(peer, request, 1, RegionId::new(table_id, 1));
@@ -310,7 +310,7 @@ async fn test_on_submit_alter_request_without_sync_request() {
results.push(result);
}
rx.try_recv().unwrap_err();
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
results.sort_unstable_by_key(|(a, _)| a.id);
let (peer, request) = results.remove(0);
assert_alter_request(peer, request, 1, RegionId::new(table_id, 1));

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use api::region::RegionResponse;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -172,7 +172,7 @@ async fn test_on_datanode_drop_regions() {
let result = rx.try_recv().unwrap();
results.push(result);
}
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
results.sort_unstable_by_key(|(a, _)| a.id);
let (peer, request) = results.remove(0);
check(peer, request, 1, RegionId::new(table_id, 1), false);

View File

@@ -987,8 +987,7 @@ impl MySqlElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use std::{assert_matches, env};
use common_telemetry::init_default_ut_logging;
use sqlx::MySqlPool;

View File

@@ -829,8 +829,7 @@ impl PgElection {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use std::{assert_matches, env};
use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;

View File

@@ -708,7 +708,7 @@ impl TableMetadataManager {
// Creates view info
let view_info_value = ViewInfoValue::new(
raw_logical_plan,
raw_logical_plan.into(),
table_names,
columns,
plan_columns,
@@ -1184,7 +1184,7 @@ impl TableMetadataManager {
definition: String,
) -> Result<()> {
let new_view_info_value = current_view_info_value.update(
new_view_info,
new_view_info.into(),
table_names,
columns,
plan_columns,
@@ -2752,7 +2752,7 @@ mod tests {
let new_definition = "CREATE VIEW test AS SELECT * FROM b_table join c_table";
let current_view_info_value = DeserializedValueWithBytes::from_inner(ViewInfoValue::new(
logical_plan.clone(),
logical_plan.clone().into(),
table_names,
columns,
plan_columns,
@@ -2803,7 +2803,7 @@ mod tests {
let wrong_definition = "wrong_definition";
let wrong_view_info_value =
DeserializedValueWithBytes::from_inner(current_view_info_value.update(
wrong_view_info,
wrong_view_info.into(),
new_table_names.clone(),
new_columns.clone(),
new_plan_columns.clone(),

View File

@@ -390,7 +390,7 @@ impl std::fmt::Debug for FlowMetadataManager {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::BTreeMap;
use std::sync::Arc;

View File

@@ -237,7 +237,7 @@ impl TopicNameManager {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use super::*;

View File

@@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
@@ -31,9 +32,6 @@ use crate::kv_backend::KvBackendRef;
use crate::kv_backend::txn::Txn;
use crate::rpc::store::BatchGetRequest;
/// The VIEW logical plan encoded bytes
type RawViewLogicalPlan = Vec<u8>;
/// The key stores the metadata of the view.
///
/// The layout: `__view_info/{view_id}`.
@@ -86,7 +84,7 @@ impl MetadataKey<'_, ViewInfoKey> for ViewInfoKey {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
// The encoded logical plan
pub view_info: RawViewLogicalPlan,
pub view_info: Bytes,
// The resolved fully table names in logical plan
pub table_names: HashSet<TableName>,
// The view columns
@@ -100,7 +98,7 @@ pub struct ViewInfoValue {
impl ViewInfoValue {
pub fn new(
view_info: RawViewLogicalPlan,
view_info: Bytes,
table_names: HashSet<TableName>,
columns: Vec<String>,
plan_columns: Vec<String>,
@@ -118,7 +116,7 @@ impl ViewInfoValue {
pub(crate) fn update(
&self,
new_view_info: RawViewLogicalPlan,
new_view_info: Bytes,
table_names: HashSet<TableName>,
columns: Vec<String>,
plan_columns: Vec<String>,
@@ -305,7 +303,7 @@ mod tests {
};
let value = ViewInfoValue {
view_info: vec![1, 2, 3],
view_info: Bytes::from([1, 2, 3].as_ref()),
version: 1,
table_names,
columns: vec!["a".to_string()],
@@ -316,4 +314,53 @@ mod tests {
let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
assert_eq!(value, deserialized);
}
#[test]
fn test_deserialize_view_info_value_with_vec_u8() {
#[derive(Serialize)]
struct OldViewInfoValue {
view_info: Vec<u8>,
table_names: HashSet<TableName>,
columns: Vec<String>,
plan_columns: Vec<String>,
definition: String,
version: u64,
}
let table_names = {
let mut set = HashSet::new();
set.insert(TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "a_table".to_string(),
});
set.insert(TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "b_table".to_string(),
});
set
};
let old_value = OldViewInfoValue {
view_info: vec![1, 2, 3],
table_names: table_names.clone(),
columns: vec!["a".to_string()],
plan_columns: vec!["number".to_string()],
definition: "CREATE VIEW test AS SELECT * FROM numbers".to_string(),
version: 1,
};
let serialized = serde_json::to_vec(&old_value).unwrap();
let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
assert_eq!(deserialized.view_info, vec![1, 2, 3]);
assert_eq!(deserialized.table_names, table_names);
assert_eq!(deserialized.columns, vec!["a".to_string()]);
assert_eq!(deserialized.plan_columns, vec!["number".to_string()]);
assert_eq!(
deserialized.definition,
"CREATE VIEW test AS SELECT * FROM numbers"
);
}
}

View File

@@ -575,12 +575,12 @@ macro_rules! record_rds_sql_execute_elapsed {
.inspect(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "success", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64())
.observe(timer.elapsed().as_millis() as f64)
})
.inspect_err(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "error", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64());
.observe(timer.elapsed().as_millis() as f64);
})
}};
}
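The swap above trades the nightly `as_millis_f64()` (its `duration_millis_float` feature gate is removed below) for a stable call that truncates to whole milliseconds; a sketch of the difference, plus a stable fraction-preserving alternative:

fn main() {
    let d = std::time::Duration::from_micros(1500);
    // The replacement truncates sub-millisecond precision...
    assert_eq!(d.as_millis() as f64, 1.0);
    // ...while stable as_secs_f64() keeps the fraction if it ever matters.
    assert!((d.as_secs_f64() * 1000.0 - 1.5).abs() < 1e-9);
}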

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![feature(duration_millis_float)]
pub mod cache;
pub mod cache_invalidator;
pub mod cluster;

View File

@@ -187,7 +187,7 @@ impl<T> PaginationStream<T> {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::BTreeMap;
use std::sync::Arc;

View File

@@ -84,7 +84,7 @@ impl State for UpdateTableInfos {
.persistent_ctx
.update_table_infos
.iter()
.zip(table_info_values.into_iter())
.zip(table_info_values)
{
let new_table_info = Self::build_new_table_info(
*table_id,

View File

@@ -949,7 +949,7 @@ impl Display for ReconcileTableMetrics {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -337,7 +337,7 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;

View File

@@ -355,7 +355,7 @@ impl MetadataSnapshotManager {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use common_test_util::temp_dir::{TempDir, create_temp_dir};

View File

@@ -380,9 +380,8 @@ impl PoisonStore for KvStateStore {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use std::sync::Arc;
use std::{assert_matches, env};
use common_procedure::store::state_store::KeyValue;
use common_telemetry::info;

View File

@@ -172,7 +172,7 @@ pub fn extract_topic_from_wal_options(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::config::kafka::common::KafkaTopicConfig;

View File

@@ -136,7 +136,7 @@ impl KafkaTopicPool {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use common_wal::maybe_skip_kafka_integration_test;
use common_wal::test_util::get_kafka_endpoints;

View File

@@ -14,8 +14,6 @@
//! Common traits and structures for the procedure framework.
#![feature(assert_matches)]
pub mod error;
pub mod event;
pub mod local;

View File

@@ -920,7 +920,7 @@ pub(crate) mod test_util {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use common_error::mock::MockError;
use common_error::status_code::StatusCode;

View File

@@ -704,7 +704,7 @@ impl Runner {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

View File

@@ -57,7 +57,7 @@ fn merge_multiple_values(
let (key, value) = pairs.into_iter().next().unwrap();
let prefix = KeySet::with_prefix(&key);
let mut parsed_segments = parse_segments(segments, &prefix)?;
parsed_segments.sort_unstable_by(|a, b| a.0.cmp(&b.0));
parsed_segments.sort_unstable_by_key(|a| a.0);
// Safety: `parsed_segments` must be non-empty.
let segment_num = parsed_segments.last().unwrap().0;
@@ -133,7 +133,7 @@ pub fn multiple_value_stream(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use futures::TryStreamExt;
use futures::stream::{self};

View File

@@ -48,7 +48,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
///
/// This struct contains a normalized predicate expr in the form of
/// `col` `op` `literal`, where `col` is provided from the input.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SimpleFilterEvaluator {
/// Name of the referenced column.
column_name: String,

View File

@@ -437,7 +437,8 @@ impl fmt::Debug for QueryMemoryTracker {
.field("limit", &self.limit())
.field("on_exhausted_policy", &self.on_exhausted_policy)
.field("on_update", &self.metrics.has_on_update())
.field("on_reject", &self.metrics.has_on_reject())
.field("on_exhausted", &self.metrics.has_on_exhausted())
.field("on_rejected", &self.metrics.has_on_rejected())
.finish()
}
}
@@ -452,6 +453,7 @@ impl QueryMemoryTracker {
limit,
on_exhausted_policy,
on_update: None,
on_exhausted: None,
on_reject: None,
}
}
@@ -483,12 +485,16 @@ impl QueryMemoryTracker {
"{} requested, {} used globally ({}%), {} used by this stream, hard limit: {}",
ReadableSize(additional as u64),
ReadableSize(current as u64),
if limit > 0 { current * 100 / limit } else { 0 },
(current * 100).checked_div(limit).unwrap_or(0),
ReadableSize(stream_tracked as u64),
ReadableSize(limit as u64)
);
error::ExceedMemoryLimitSnafu { msg }.build()
}
fn inc_rejected(&self) {
self.metrics.inc_rejected();
}
}
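The `checked_div` rewrite above is behavior-preserving: a zero `limit` (unlimited mode) makes `checked_div` return `None`, which `unwrap_or(0)` maps to 0% rather than dividing by zero:

fn main() {
    let (current, limit) = (512usize * 1024, 0usize);
    assert_eq!((current * 100).checked_div(limit), None);
    assert_eq!((current * 100).checked_div(limit).unwrap_or(0), 0);
}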
/// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks.
@@ -496,6 +502,7 @@ pub struct QueryMemoryTrackerBuilder {
limit: usize,
on_exhausted_policy: OnExhaustedPolicy,
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
}
@@ -514,11 +521,21 @@ impl QueryMemoryTrackerBuilder {
self
}
/// Set a callback to be called when memory allocation is rejected.
/// Set a callback to be called when memory is unavailable for immediate acquisition.
///
/// # Note
/// This is only called when `track()` fails due to exceeding the limit.
/// This is called when the non-blocking allocation fast path fails.
/// Requests using `OnExhaustedPolicy::Wait` may still succeed after waiting.
/// It is never called when `limit == 0` (unlimited mode).
pub fn on_exhausted<F>(mut self, on_exhausted: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.on_exhausted = Some(Arc::new(on_exhausted));
self
}
/// Set a callback to be called when the request ultimately fails due to memory pressure.
pub fn on_reject<F>(mut self, on_reject: F) -> Self
where
F: Fn() + Send + Sync + 'static,
@@ -529,7 +546,7 @@ impl QueryMemoryTrackerBuilder {
/// Build a [`QueryMemoryTracker`] from this builder.
pub fn build(self) -> QueryMemoryTracker {
let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject);
let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_exhausted, self.on_reject);
let manager = MemoryManager::with_granularity(
self.limit as u64,
PermitGranularity::Kilobyte,
@@ -553,6 +570,10 @@ struct StreamMemoryTracker {
type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>;
impl StreamMemoryTracker {
fn inc_rejected(&self) {
self.tracker.inc_rejected();
}
fn try_track(&mut self, additional: usize) -> Result<()> {
if self.guard.try_acquire_additional(additional as u64) {
self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
@@ -592,7 +613,7 @@ impl StreamMemoryTracker {
waited,
ReadableSize(additional as u64),
ReadableSize(current as u64),
if limit > 0 { current * 100 / limit } else { 0 },
(current * 100).checked_div(limit).unwrap_or(0),
ReadableSize(self.tracked_bytes as u64),
ReadableSize(limit as u64)
);
@@ -613,18 +634,25 @@ struct CallbackMemoryMetrics {
}
type UpdateCallback = Arc<dyn Fn(usize) + Send + Sync>;
type RejectCallback = Arc<dyn Fn() + Send + Sync>;
type UnitCallback = Arc<dyn Fn() + Send + Sync>;
type RejectCallback = UnitCallback;
struct CallbackMemoryMetricsInner {
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
}
impl CallbackMemoryMetrics {
fn new(on_update: Option<UpdateCallback>, on_reject: Option<RejectCallback>) -> Self {
fn new(
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
) -> Self {
Self {
inner: Arc::new(CallbackMemoryMetricsInner {
on_update,
on_exhausted,
on_reject,
}),
}
@@ -634,9 +662,19 @@ impl CallbackMemoryMetrics {
self.inner.on_update.is_some()
}
fn has_on_reject(&self) -> bool {
fn has_on_exhausted(&self) -> bool {
self.inner.on_exhausted.is_some()
}
fn has_on_rejected(&self) -> bool {
self.inner.on_reject.is_some()
}
fn inc_rejected(&self) {
if let Some(callback) = &self.inner.on_reject {
callback();
}
}
}
impl MemoryMetrics for CallbackMemoryMetrics {
@@ -648,8 +686,8 @@ impl MemoryMetrics for CallbackMemoryMetrics {
}
}
fn inc_rejected(&self, _: &str) {
if let Some(callback) = &self.inner.on_reject {
fn inc_exhausted(&self, _: &str) {
if let Some(callback) = &self.inner.on_exhausted {
callback();
}
}
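Putting the split callbacks together, a hedged wiring sketch (the limit and counters are illustrative; the builder and callback names match this diff):

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch only. Under Fail, a denied acquisition bumps both counters at
// once; under Wait { .. }, on_exhausted fires on the fast-path miss and
// on_reject only if the wait later times out.
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let (e, r) = (exhausted.clone(), rejected.clone());
let tracker = QueryMemoryTracker::builder(64 * 1024 * 1024, OnExhaustedPolicy::Fail)
    .on_exhausted(move || { e.fetch_add(1, Ordering::Relaxed); })
    .on_reject(move || { r.fetch_add(1, Ordering::Relaxed); })
    .build();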
@@ -712,7 +750,10 @@ impl MemoryTrackedStream {
Poll::Ready((tracker, batch, additional, result)) => {
let output = match result {
Ok(()) => Ok(batch),
Err(error) => Err(tracker.wait_error(additional, error)),
Err(error) => {
tracker.inc_rejected();
Err(tracker.wait_error(additional, error))
}
};
self.waiting = None;
self.tracker = Some(tracker);
@@ -732,7 +773,10 @@ impl MemoryTrackedStream {
if let Err(error) = tracker.try_track(additional) {
match tracker.tracker.on_exhausted_policy {
OnExhaustedPolicy::Fail => return Poll::Ready(Some(Err(error))),
OnExhaustedPolicy::Fail => {
tracker.inc_rejected();
return Poll::Ready(Some(Err(error)));
}
// `Wait` is a deliberate tradeoff: the batch has already been materialized, so we
// keep it in memory while waiting for quota instead of failing immediately. Under
// contention, real memory usage can therefore exceed `scan_memory_limit` by up to
@@ -786,6 +830,7 @@ impl RecordBatchStream for MemoryTrackedStream {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use common_memory_manager::{OnExhaustedPolicy, PermitGranularity};
@@ -988,12 +1033,22 @@ mod tests {
#[tokio::test]
async fn test_memory_tracked_stream_waits_for_capacity() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(200),
},
)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
let expected_bytes = aligned_tracked_bytes(batch.buffer_memory_size());
@@ -1025,16 +1080,28 @@ mod tests {
drop(stream1);
let second = waiter.await.unwrap().unwrap();
assert_eq!(second.num_rows(), 1);
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_memory_tracked_stream_wait_times_out() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(50),
},
)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
@@ -1058,5 +1125,44 @@ mod tests {
.unwrap();
let error = result.unwrap().unwrap_err();
assert!(error.to_string().contains("timed out waiting"));
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn test_memory_tracked_stream_fail_policy_rejects_immediately() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(MB, OnExhaustedPolicy::Fail)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
let mut stream1 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()])
.unwrap()
.as_stream(),
tracker.clone(),
);
let first = stream1.next().await.unwrap().unwrap();
assert_eq!(first.num_rows(), 1);
let mut stream2 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch])
.unwrap()
.as_stream(),
tracker,
);
let result = stream2.next().await.unwrap();
assert!(result.is_err());
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 1);
}
}

View File

@@ -437,7 +437,7 @@ fn maybe_align_json_array_with_schema(
}
let mut aligned = Vec::with_capacity(arrays.len());
for (field, array) in schema.fields().iter().zip(arrays.into_iter()) {
for (field, array) in schema.fields().iter().zip(arrays) {
if !is_json_extension_type(field) {
aligned.push(array);
continue;

View File

@@ -122,7 +122,7 @@ pub fn parse_column_default_constraint(
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use std::assert_matches;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::types::BooleanType;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
pub mod convert;
pub mod default_constraint;
pub mod error;

View File

@@ -21,10 +21,12 @@ mod panic_hook;
pub mod tracing_context;
mod tracing_sampler;
pub use common_error;
pub use logging::{
LOG_RELOAD_HANDLE, TRACE_RELOAD_HANDLE, get_or_init_tracer, init_default_ut_logging,
init_global_logging,
};
pub use metric::dump_metrics;
pub use panic_hook::set_panic_hook;
pub use {common_error, tracing, tracing_subscriber};
pub use tracing;
pub use tracing_subscriber;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
use std::net::SocketAddr;
use error::{EndpointIPV4NotFoundSnafu, ResolveEndpointSnafu, Result};
@@ -59,7 +57,7 @@ async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use common_telemetry::warn;
use rskafka::client::{Credentials, SaslConfig};

View File

@@ -551,14 +551,15 @@ impl DatanodeBuilder {
if kafka_config.create_index && opts.node_id.is_none() {
warn!("The WAL index creation only available in distributed mode.")
}
let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
let global_index_collector = if kafka_config.create_index
&& let Some(node_id) = opts.node_id
{
let operator = new_object_store_without_cache(
&opts.storage.store,
&opts.storage.data_home,
)
.await?;
let path = default_index_file(opts.node_id.unwrap());
let path = default_index_file(node_id);
Some(Self::build_global_index_collector(
kafka_config.dump_index_interval,
operator,
@@ -782,7 +783,7 @@ async fn open_all_regions(
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

View File

@@ -295,7 +295,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -47,7 +47,7 @@ impl InstructionHandler for CloseRegionsHandler {
let results = join_all(futs).await;
let mut errors = vec![];
for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
for (region_id, result) in region_ids.into_iter().zip(results) {
match result {
Ok(_) => (),
Err(error::Error::RegionNotFound { .. }) => {
@@ -79,7 +79,6 @@ mod tests {
use std::assert_matches;
use std::sync::Arc;
use assert_matches::assert_matches;
use common_meta::RegionIdent;
use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use common_meta::heartbeat::mailbox::MessageMeta;

View File

@@ -225,7 +225,7 @@ impl HandlerContext {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::sync::Arc;
use std::time::Duration;

View File

@@ -72,7 +72,7 @@ impl InstructionHandler for OpenRegionsHandler {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -183,9 +183,10 @@ impl UpgradeRegionsHandler {
.await
{
Ok(responses) => {
replies.extend(
Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
);
replies.extend(Self::convert_responses_to_replies(
responses,
&catchup_regions,
));
}
Err(_) => {
replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
pub mod alive_keeper;
pub mod config;
pub mod datanode;

View File

@@ -1667,7 +1667,7 @@ impl RegionAttribute {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::assert_matches;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;

View File

@@ -426,7 +426,7 @@ fn decode_struct_with_context<'a>(
let (items, fields) = struct_value.into_parts();
for (field, field_value) in fields.fields().iter().zip(items.into_iter()) {
for (field, field_value) in fields.fields().iter().zip(items) {
let field_context = context.with_key(field.name());
let json_value = decode_value_with_context(field_value, &field_context)?;
json_object.insert(field.name().to_string(), json_value);
@@ -561,7 +561,7 @@ fn decode_struct_with_settings<'a>(
// Process each field in the struct value
let (struct_data, fields) = struct_value.into_parts();
for (field, value) in fields.fields().iter().zip(struct_data.into_iter()) {
for (field, value) in fields.fields().iter().zip(struct_data) {
let field_context = context.with_key(field.name());
// Check if this field should be treated as unstructured

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#![feature(box_patterns)]
pub mod arrow_array;
pub mod data_type;
pub mod duration;

View File

@@ -773,7 +773,7 @@ mod tests {
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<Number>""#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: ["<Number>"]"#,
];
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
for (json, expect) in jsons.into_iter().zip(expects) {
test(json, json_type, Err(expect))?;
}

View File

@@ -922,7 +922,7 @@ impl TryFrom<Value> for serde_json::Value {
let map = struct_type
.fields()
.iter()
.zip(items.into_iter())
.zip(items)
.map(|(field, value)| {
Ok((
field.name().to_string(),
@@ -2723,26 +2723,26 @@ pub(crate) mod tests {
.unwrap()
);
assert_eq!(
ScalarValue::UInt8(Some(u8::MIN + 1)),
Value::UInt8(u8::MIN + 1)
ScalarValue::UInt8(Some(1)),
Value::UInt8(1)
.try_to_scalar_value(&ConcreteDataType::uint8_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::UInt16(Some(u16::MIN + 2)),
Value::UInt16(u16::MIN + 2)
ScalarValue::UInt16(Some(2)),
Value::UInt16(2)
.try_to_scalar_value(&ConcreteDataType::uint16_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::UInt32(Some(u32::MIN + 3)),
Value::UInt32(u32::MIN + 3)
ScalarValue::UInt32(Some(3)),
Value::UInt32(3)
.try_to_scalar_value(&ConcreteDataType::uint32_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::UInt64(Some(u64::MIN + 4)),
Value::UInt64(u64::MIN + 4)
ScalarValue::UInt64(Some(4)),
Value::UInt64(4)
.try_to_scalar_value(&ConcreteDataType::uint64_datatype())
.unwrap()
);

Some files were not shown because too many files have changed in this diff