Compare commits

..

1 Commits

Author SHA1 Message Date
discord9
13582c9efb bytes trace
Signed-off-by: discord9 <discord9@163.com>
2025-11-04 11:19:07 +08:00
173 changed files with 1769 additions and 7775 deletions

View File

@@ -48,22 +48,22 @@ runs:
path: /tmp/greptime-*.log
retention-days: 3
# - name: Build greptime # Builds standard greptime binary
# uses: ./.github/actions/build-greptime-binary
# with:
# base-image: ubuntu
# features: servers/dashboard
# cargo-profile: ${{ inputs.cargo-profile }}
# artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
# version: ${{ inputs.version }}
# working-dir: ${{ inputs.working-dir }}
# image-registry: ${{ inputs.image-registry }}
# image-namespace: ${{ inputs.image-namespace }}
- name: Build greptime # Builds standard greptime binary
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: servers/dashboard
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }}
working-dir: ${{ inputs.working-dir }}
image-registry: ${{ inputs.image-registry }}
image-namespace: ${{ inputs.image-namespace }}
# - name: Clean up the target directory # Clean up the target directory for the centos7 base image, or it will still use the objects of last build.
# shell: bash
# run: |
# rm -rf ./target/
- name: Clean up the target directory # Clean up the target directory for the centos7 base image, or it will still use the objects of last build.
shell: bash
run: |
rm -rf ./target/
- name: Build greptime on centos base image
uses: ./.github/actions/build-greptime-binary
@@ -78,14 +78,14 @@ runs:
image-registry: ${{ inputs.image-registry }}
image-namespace: ${{ inputs.image-namespace }}
# - name: Build greptime on android base image
# uses: ./.github/actions/build-greptime-binary
# if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds arm64 greptime binary for android if the host machine amd64.
# with:
# base-image: android
# artifacts-dir: greptime-android-arm64-${{ inputs.version }}
# version: ${{ inputs.version }}
# working-dir: ${{ inputs.working-dir }}
# build-android-artifacts: true
# image-registry: ${{ inputs.image-registry }}
# image-namespace: ${{ inputs.image-namespace }}
- name: Build greptime on android base image
uses: ./.github/actions/build-greptime-binary
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds arm64 greptime binary for android if the host machine amd64.
with:
base-image: android
artifacts-dir: greptime-android-arm64-${{ inputs.version }}
version: ${{ inputs.version }}
working-dir: ${{ inputs.working-dir }}
build-android-artifacts: true
image-registry: ${{ inputs.image-registry }}
image-namespace: ${{ inputs.image-namespace }}

View File

@@ -177,7 +177,7 @@ jobs:
cargo-profile: ${{ env.CARGO_PROFILE }}
version: ${{ needs.allocate-runners.outputs.version }}
disable-run-tests: ${{ env.DISABLE_RUN_TESTS }}
dev-mode: false # Only build the standard greptime binary.
dev-mode: true # Only build the standard greptime binary.
working-dir: ${{ env.CHECKOUT_GREPTIMEDB_PATH }}
image-registry: ${{ vars.ECR_IMAGE_REGISTRY }}
image-namespace: ${{ vars.ECR_IMAGE_NAMESPACE }}

View File

@@ -613,9 +613,6 @@ jobs:
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
- name: "Flat format"
opts: "--enable-flat-format"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -811,7 +808,7 @@ jobs:
- name: Setup external services
working-directory: tests-integration/fixtures
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:

View File

@@ -92,6 +92,5 @@ jobs:
mode:
- name: "Basic"
- name: "Remote WAL"
- name: "Flat format"
steps:
- run: 'echo "No action required"'

67
Cargo.lock generated
View File

@@ -214,7 +214,6 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
name = "api"
version = "0.18.0"
dependencies = [
"arrow-schema",
"common-base",
"common-decimal",
"common-error",
@@ -1337,9 +1336,13 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
source = "git+https://github.com/discord9/bytes?rev=1572ab22c3cbad0e9b6681d1f68eca4139322a2a#1572ab22c3cbad0e9b6681d1f68eca4139322a2a"
dependencies = [
"backtrace",
"crossbeam-channel",
"inferno 0.12.2",
"papaya",
"quanta",
"serde",
]
@@ -1630,7 +1633,6 @@ dependencies = [
"chrono",
"chrono-tz-build",
"phf 0.11.3",
"uncased",
]
[[package]]
@@ -1641,8 +1643,6 @@ checksum = "8f10f8c9340e31fc120ff885fcdb54a0b48e474bbd77cab557f0c30a3e569402"
dependencies = [
"parse-zoneinfo",
"phf_codegen 0.11.3",
"phf_shared 0.11.3",
"uncased",
]
[[package]]
@@ -7508,7 +7508,6 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"datafusion",
"datatypes",
"futures-util",
@@ -8867,6 +8866,16 @@ dependencies = [
"unicode-width 0.1.14",
]
[[package]]
name = "papaya"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
dependencies = [
"equivalent",
"seize",
]
[[package]]
name = "parking"
version = "2.2.1"
@@ -9276,7 +9285,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher",
"uncased",
]
[[package]]
@@ -10101,6 +10109,21 @@ dependencies = [
"variadics",
]
[[package]]
name = "quanta"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.1+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "query"
version = "0.18.0"
@@ -10400,6 +10423,15 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "raw-cpuid"
version = "11.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
dependencies = [
"bitflags 2.9.1",
]
[[package]]
name = "rawpointer"
version = "0.2.1"
@@ -11340,6 +11372,16 @@ dependencies = [
"libc",
]
[[package]]
name = "seize"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "semver"
version = "1.0.26"
@@ -13976,15 +14018,6 @@ dependencies = [
"serde",
]
[[package]]
name = "uncased"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
dependencies = [
"version_check",
]
[[package]]
name = "unescaper"
version = "0.1.6"

View File

@@ -118,7 +118,7 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10.1", features = ["case-insensitive"] }
chrono-tz = "0.10.1"
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
@@ -219,7 +219,12 @@ similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
tempfile = "3"
@@ -328,6 +333,7 @@ datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git"
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"
bytes = { git = "https://github.com/discord9/bytes", rev = "1572ab22c3cbad0e9b6681d1f68eca4139322a2a" }
[profile.release]
debug = 1

View File

@@ -12,6 +12,7 @@
<div align="center">
<h3 align="center">
<a href="https://greptime.com/product/cloud">GreptimeCloud</a> |
<a href="https://docs.greptime.com/">User Guide</a> |
<a href="https://greptimedb.rs/">API Docs</a> |
<a href="https://github.com/GreptimeTeam/greptimedb/issues/5446">Roadmap 2025</a>
@@ -104,6 +105,16 @@ Read [more benchmark reports](https://docs.greptime.com/user-guide/concepts/feat
## Try GreptimeDB
### 1. [Live Demo](https://greptime.com/playground)
Experience GreptimeDB directly in your browser.
### 2. [GreptimeCloud](https://console.greptime.cloud/)
Start instantly with a free cluster.
### 3. Docker (Local Quickstart)
```shell
docker pull greptime/greptimedb
```

View File

@@ -92,6 +92,9 @@ curl -X POST localhost:4000/debug/prof/mem > greptime.hprof
curl -X POST "localhost:4000/debug/prof/mem?output=flamegraph" > greptime.svg
# or output pprof format
curl -X POST "localhost:4000/debug/prof/mem?output=proto" > greptime.pprof
curl -X POST "localhost:4000/debug/prof/bytes" > greptime.svg
```
You can periodically dump profiling data and compare them to find the delta memory usage.

View File

@@ -8,7 +8,6 @@ license.workspace = true
workspace = true
[dependencies]
arrow-schema.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true

View File

@@ -14,11 +14,10 @@
use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, JSON_STRUCTURE_SETTINGS_KEY,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -69,14 +68,8 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.to_owned());
}
if let Some(extension_name) = options.options.get(EXTENSION_TYPE_NAME_KEY) {
metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), extension_name.clone());
}
if let Some(extension_metadata) = options.options.get(EXTENSION_TYPE_METADATA_KEY) {
metadata.insert(
EXTENSION_TYPE_METADATA_KEY.to_string(),
extension_metadata.clone(),
);
if let Some(settings) = options.options.get(JSON_STRUCTURE_SETTINGS_KEY) {
metadata.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
}
}
@@ -149,16 +142,10 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), skipping_index.clone());
}
if let Some(extension_name) = column_schema.metadata().get(EXTENSION_TYPE_NAME_KEY) {
if let Some(settings) = column_schema.metadata().get(JSON_STRUCTURE_SETTINGS_KEY) {
options
.options
.insert(EXTENSION_TYPE_NAME_KEY.to_string(), extension_name.clone());
}
if let Some(extension_metadata) = column_schema.metadata().get(EXTENSION_TYPE_METADATA_KEY) {
options.options.insert(
EXTENSION_TYPE_METADATA_KEY.to_string(),
extension_metadata.clone(),
);
.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
}
(!options.options.is_empty()).then_some(options)

View File

@@ -97,6 +97,7 @@ lazy_static! {
ROUTINES,
SCHEMA_PRIVILEGES,
TABLE_PRIVILEGES,
TRIGGERS,
GLOBAL_STATUS,
SESSION_STATUS,
PARTITIONS,
@@ -206,6 +207,7 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
ROUTINES => setup_memory_table!(ROUTINES),
SCHEMA_PRIVILEGES => setup_memory_table!(SCHEMA_PRIVILEGES),
TABLE_PRIVILEGES => setup_memory_table!(TABLE_PRIVILEGES),
TRIGGERS => setup_memory_table!(TRIGGERS),
GLOBAL_STATUS => setup_memory_table!(GLOBAL_STATUS),
SESSION_STATUS => setup_memory_table!(SESSION_STATUS),
KEY_COLUMN_USAGE => Some(Arc::new(InformationSchemaKeyColumnUsage::new(

View File

@@ -15,7 +15,8 @@
use std::sync::Arc;
use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Int64Vector, StringVector, VectorRef};
use crate::system_schema::information_schema::table_names::*;
@@ -365,6 +366,16 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
vec![],
),
TRIGGERS => (
vec![
string_column("TRIGGER_NAME"),
ColumnSchema::new("trigger_id", ConcreteDataType::uint64_datatype(), false),
string_column("TRIGGER_DEFINITION"),
ColumnSchema::new("flownode_id", ConcreteDataType::uint64_datatype(), true),
],
vec![],
),
// TODO: Considering store internal metrics in `global_status` and
// `session_status` tables.
GLOBAL_STATUS => (

View File

@@ -23,8 +23,6 @@ use crate::Tool;
use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
pub(crate) const COPY_PATH_PLACEHOLDER: &str = "<PATH/TO/FILES>";
/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]
pub enum DataCommand {

View File

@@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
use crate::data::default_database;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
@@ -668,26 +668,10 @@ impl Export {
);
// Create copy_from.sql file
let copy_database_from_sql = {
let command_without_connection = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
);
if connection_part.is_empty() {
command_without_connection
} else {
let command_with_connection = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
format!(
"-- {}\n{}",
command_with_connection, command_without_connection
)
}
};
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
export_self.catalog, schema, path, with_options_clone, connection_part
);
let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
export_self

View File

@@ -21,13 +21,13 @@ use clap::{Parser, ValueEnum};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::ext::BoxedError;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
use crate::data::default_database;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::error::{Error, FileIoSnafu, InvalidArgumentsSnafu, Result, SchemaNotFoundSnafu};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{Tool, database};
#[derive(Debug, Default, Clone, ValueEnum)]
@@ -148,15 +148,12 @@ impl Import {
let _permit = semaphore_moved.acquire().await.unwrap();
let database_input_dir = self.catalog_path().join(&schema);
let sql_file = database_input_dir.join(filename);
let mut sql = tokio::fs::read_to_string(sql_file)
let sql = tokio::fs::read_to_string(sql_file)
.await
.context(FileIoSnafu)?;
if sql.trim().is_empty() {
if sql.is_empty() {
info!("Empty `{filename}` {database_input_dir:?}");
} else {
if filename == "copy_from.sql" {
sql = self.rewrite_copy_database_sql(&schema, &sql)?;
}
let db = exec_db.unwrap_or(&schema);
self.database_client.sql(&sql, db).await?;
info!("Imported `{filename}` for database {schema}");
@@ -229,57 +226,6 @@ impl Import {
}
Ok(db_names)
}
fn rewrite_copy_database_sql(&self, schema: &str, sql: &str) -> Result<String> {
let target_location = self.build_copy_database_location(schema);
let escaped_location = target_location.replace('\'', "''");
let mut first_stmt_checked = false;
for line in sql.lines() {
let trimmed = line.trim_start();
if trimmed.is_empty() || trimmed.starts_with("--") {
continue;
}
ensure!(
trimmed.starts_with("COPY DATABASE"),
InvalidArgumentsSnafu {
msg: "Expected COPY DATABASE statement at start of copy_from.sql"
}
);
first_stmt_checked = true;
break;
}
ensure!(
first_stmt_checked,
InvalidArgumentsSnafu {
msg: "COPY DATABASE statement not found in copy_from.sql"
}
);
ensure!(
sql.contains(COPY_PATH_PLACEHOLDER),
InvalidArgumentsSnafu {
msg: format!(
"Placeholder `{}` not found in COPY DATABASE statement",
COPY_PATH_PLACEHOLDER
)
}
);
Ok(sql.replacen(COPY_PATH_PLACEHOLDER, &escaped_location, 1))
}
fn build_copy_database_location(&self, schema: &str) -> String {
let mut path = self.catalog_path();
path.push(schema);
let mut path_str = path.to_string_lossy().into_owned();
if !path_str.ends_with('/') {
path_str.push('/');
}
path_str
}
}
#[async_trait]
@@ -295,52 +241,3 @@ impl Tool for Import {
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
fn build_import(input_dir: &str) -> Import {
Import {
catalog: "catalog".to_string(),
schema: None,
database_client: DatabaseClient::new(
"127.0.0.1:4000".to_string(),
"catalog".to_string(),
None,
Duration::from_secs(0),
None,
),
input_dir: input_dir.to_string(),
parallelism: 1,
target: ImportTarget::Data,
}
}
#[test]
fn rewrite_copy_database_sql_replaces_placeholder() {
let import = build_import("/tmp/export-path");
let comment = "-- COPY DATABASE \"catalog\".\"schema\" FROM 's3://bucket/demo/' WITH (format = 'parquet') CONNECTION (region = 'us-west-2')";
let sql = format!(
"{comment}\nCOPY DATABASE \"catalog\".\"schema\" FROM '{}' WITH (format = 'parquet');",
COPY_PATH_PLACEHOLDER
);
let rewritten = import.rewrite_copy_database_sql("schema", &sql).unwrap();
let expected_location = import.build_copy_database_location("schema");
let escaped = expected_location.replace('\'', "''");
assert!(rewritten.starts_with(comment));
assert!(rewritten.contains(&format!("FROM '{escaped}'")));
assert!(!rewritten.contains(COPY_PATH_PLACEHOLDER));
}
#[test]
fn rewrite_copy_database_sql_requires_placeholder() {
let import = build_import("/tmp/export-path");
let sql = "COPY DATABASE \"catalog\".\"schema\" FROM '/tmp/export-path/catalog/schema/' WITH (format = 'parquet');";
assert!(import.rewrite_copy_database_sql("schema", sql).is_err());
}
}

View File

@@ -20,9 +20,7 @@ use api::v1::health_check_client::HealthCheckClient;
use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
use api::v1::region::region_client::RegionClient as PbRegionClient;
use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::{
ChannelConfig, ChannelManager, ClientTlsOption, load_tls_config,
};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use tonic::codec::CompressionEncoding;
@@ -96,9 +94,8 @@ impl Client {
A: AsRef<[U]>,
{
let channel_config = ChannelConfig::default().client_tls_config(client_tls);
let tls_config = load_tls_config(channel_config.client_tls.as_ref())
let channel_manager = ChannelManager::with_tls_config(channel_config)
.context(error::CreateTlsChannelSnafu)?;
let channel_manager = ChannelManager::with_config(channel_config, tls_config);
Ok(Self::with_manager_and_urls(channel_manager, urls))
}

View File

@@ -74,7 +74,7 @@ impl FlownodeManager for NodeClients {
impl NodeClients {
pub fn new(config: ChannelConfig) -> Self {
Self {
channel_manager: ChannelManager::with_config(config, None),
channel_manager: ChannelManager::with_config(config),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))

View File

@@ -162,7 +162,6 @@ impl ObjbenchCommand {
file_size,
available_indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows,
num_row_groups,
sequence: None,

View File

@@ -45,19 +45,3 @@ pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
header
}
/// Returns the external root cause of the source error (exclude the current error).
pub fn root_source(err: &dyn std::error::Error) -> Option<&dyn std::error::Error> {
// There are some divergence about the behavior of the `sources()` API
// in https://github.com/rust-lang/rust/issues/58520
// So this function iterates the sources manually.
let mut root = err.source();
while let Some(r) = root {
if let Some(s) = r.source() {
root = Some(s);
} else {
break;
}
}
root
}

View File

@@ -104,7 +104,7 @@ impl MetaClientSelector {
let cfg = ChannelConfig::new()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30));
let channel_manager = ChannelManager::with_config(cfg, None);
let channel_manager = ChannelManager::with_config(cfg);
Self {
meta_client,
channel_manager,

View File

@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::aggrs::vector::avg::VectorAvg;
use crate::aggrs::vector::product::VectorProduct;
use crate::aggrs::vector::sum::VectorSum;
use crate::function_registry::FunctionRegistry;
mod avg;
mod product;
mod sum;
@@ -27,6 +25,5 @@ impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(VectorSum::uadf_impl());
registry.register_aggr(VectorProduct::uadf_impl());
registry.register_aggr(VectorAvg::uadf_impl());
}
}

View File

@@ -1,270 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray};
use arrow::compute::sum;
use arrow::datatypes::UInt64Type;
use arrow_schema::{DataType, Field};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
Accumulator, AggregateUDF, Signature, SimpleAggregateUDF, TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
use nalgebra::{Const, DVector, DVectorView, Dyn, OVector};
use crate::scalars::vector::impl_conv::{
binlit_as_veclit, parse_veclit_from_strlit, veclit_to_binlit,
};
/// The accumulator for the `vec_avg` aggregate function.
#[derive(Debug, Default)]
pub struct VectorAvg {
sum: Option<OVector<f32, Dyn>>,
count: u64,
}
impl VectorAvg {
/// Create a new `AggregateUDF` for the `vec_avg` aggregate function.
pub fn uadf_impl() -> AggregateUDF {
let signature = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Utf8]),
TypeSignature::Exact(vec![DataType::LargeUtf8]),
TypeSignature::Exact(vec![DataType::Binary]),
],
Volatility::Immutable,
);
let udaf = SimpleAggregateUDF::new_with_signature(
"vec_avg",
signature,
DataType::Binary,
Arc::new(Self::accumulator),
vec![
Arc::new(Field::new("sum", DataType::Binary, true)),
Arc::new(Field::new("count", DataType::UInt64, true)),
],
);
AggregateUDF::from(udaf)
}
fn accumulator(args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if args.schema.fields().len() != 1 {
return Err(datafusion_common::DataFusionError::Internal(format!(
"expect creating `VEC_AVG` with only one input field, actual {}",
args.schema.fields().len()
)));
}
let t = args.schema.field(0).data_type();
if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) {
return Err(datafusion_common::DataFusionError::Internal(format!(
"unexpected input datatype {t} when creating `VEC_AVG`"
)));
}
Ok(Box::new(VectorAvg::default()))
}
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
}
fn update(&mut self, values: &[ArrayRef], is_update: bool) -> Result<()> {
if values.is_empty() {
return Ok(());
};
let vectors = match values[0].data_type() {
DataType::Utf8 => {
let arr: &StringArray = values[0].as_string();
arr.iter()
.filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into)))
.map(|x| x.map(Cow::Owned))
.collect::<Result<Vec<_>>>()?
}
DataType::LargeUtf8 => {
let arr: &LargeStringArray = values[0].as_string();
arr.iter()
.filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into)))
.map(|x: Result<Vec<f32>>| x.map(Cow::Owned))
.collect::<Result<Vec<_>>>()?
}
DataType::Binary => {
let arr: &BinaryArray = values[0].as_binary();
arr.iter()
.filter_map(|x| x.map(|b| binlit_as_veclit(b).map_err(Into::into)))
.collect::<Result<Vec<_>>>()?
}
_ => {
return Err(datafusion_common::DataFusionError::NotImplemented(format!(
"unsupported data type {} for `VEC_AVG`",
values[0].data_type()
)));
}
};
if vectors.is_empty() {
return Ok(());
}
let len = if is_update {
vectors.len() as u64
} else {
sum(values[1].as_primitive::<UInt64Type>()).unwrap_or_default()
};
let dims = vectors[0].len();
let mut sum = DVector::zeros(dims);
for v in vectors {
if v.len() != dims {
return Err(datafusion_common::DataFusionError::Execution(
"vectors length not match: VEC_AVG".to_string(),
));
}
let v_view = DVectorView::from_slice(&v, dims);
sum += &v_view;
}
*self.inner(dims) += sum;
self.count += len;
Ok(())
}
}
impl Accumulator for VectorAvg {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let vector = match &self.sum {
None => ScalarValue::Binary(None),
Some(sum) => ScalarValue::Binary(Some(veclit_to_binlit(sum.as_slice()))),
};
Ok(vec![vector, ScalarValue::from(self.count)])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
self.update(values, true)
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.update(states, false)
}
fn evaluate(&mut self) -> Result<ScalarValue> {
match &self.sum {
None => Ok(ScalarValue::Binary(None)),
Some(sum) => Ok(ScalarValue::Binary(Some(veclit_to_binlit(
(sum / self.count as f32).as_slice(),
)))),
}
}
fn size(&self) -> usize {
size_of_val(self)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::StringArray;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{ConstantVector, StringVector, Vector};
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut vec_avg = VectorAvg::default();
vec_avg.update_batch(&[]).unwrap();
assert!(vec_avg.sum.is_none());
assert_eq!(ScalarValue::Binary(None), vec_avg.evaluate().unwrap());
// test update one not-null value
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
]))];
vec_avg.update_batch(&v).unwrap();
assert_eq!(
ScalarValue::Binary(Some(veclit_to_binlit(&[2.5, 3.5, 4.5]))),
vec_avg.evaluate().unwrap()
);
// test update one null value
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![Option::<String>::None]))];
vec_avg.update_batch(&v).unwrap();
assert_eq!(ScalarValue::Binary(None), vec_avg.evaluate().unwrap());
// test update no null-value batch
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_avg.update_batch(&v).unwrap();
assert_eq!(
ScalarValue::Binary(Some(veclit_to_binlit(&[4.0, 5.0, 6.0]))),
vec_avg.evaluate().unwrap()
);
// test update null-value batch
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
None,
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_avg.update_batch(&v).unwrap();
assert_eq!(
ScalarValue::Binary(Some(veclit_to_binlit(&[4.0, 5.0, 6.0]))),
vec_avg.evaluate().unwrap()
);
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![Arc::new(StringArray::from(vec![
None,
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_avg.update_batch(&v).unwrap();
assert_eq!(
ScalarValue::Binary(Some(veclit_to_binlit(&[5.5, 6.5, 7.5]))),
vec_avg.evaluate().unwrap()
);
// test update with constant vector
let mut vec_avg = VectorAvg::default();
let v: Vec<ArrayRef> = vec![
Arc::new(ConstantVector::new(
Arc::new(StringVector::from_vec(vec!["[1.0,2.0,3.0]".to_string()])),
4,
))
.to_arrow_array(),
];
vec_avg.update_batch(&v).unwrap();
assert_eq!(
ScalarValue::Binary(Some(veclit_to_binlit(&[1.0, 2.0, 3.0]))),
vec_avg.evaluate().unwrap()
);
}
}

View File

@@ -14,7 +14,6 @@
mod convert;
mod distance;
mod elem_avg;
mod elem_product;
mod elem_sum;
pub mod impl_conv;
@@ -65,7 +64,6 @@ impl VectorFunction {
registry.register_scalar(vector_subvector::VectorSubvectorFunction::default());
registry.register_scalar(elem_sum::ElemSumFunction::default());
registry.register_scalar(elem_product::ElemProductFunction::default());
registry.register_scalar(elem_avg::ElemAvgFunction::default());
}
}

View File

@@ -1,128 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
use crate::scalars::vector::{VectorCalculator, impl_conv};
const NAME: &str = "vec_elem_avg";
#[derive(Debug, Clone)]
pub(crate) struct ElemAvgFunction {
signature: Signature,
}
impl Default for ElemAvgFunction {
fn default() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
),
}
}
}
impl Function for ElemAvgFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Float32)
}
fn signature(&self) -> &Signature {
&self.signature
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let body = |v0: &ScalarValue| -> datafusion_common::Result<ScalarValue> {
let v0 =
impl_conv::as_veclit(v0)?.map(|v0| DVectorView::from_slice(&v0, v0.len()).mean());
Ok(ScalarValue::Float32(v0))
};
let calculator = VectorCalculator {
name: self.name(),
func: body,
};
calculator.invoke_with_single_argument(args)
}
}
impl Display for ElemAvgFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::StringViewArray;
use arrow_schema::Field;
use datafusion::arrow::array::{Array, AsArray};
use datafusion::arrow::datatypes::Float32Type;
use datafusion_common::config::ConfigOptions;
use super::*;
#[test]
fn test_elem_avg() {
let func = ElemAvgFunction::default();
let input = Arc::new(StringViewArray::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let result = func
.invoke_with_args(ScalarFunctionArgs {
args: vec![ColumnarValue::Array(input.clone())],
arg_fields: vec![],
number_rows: input.len(),
return_field: Arc::new(Field::new("x", DataType::Float32, true)),
config_options: Arc::new(ConfigOptions::new()),
})
.and_then(|v| ColumnarValue::values_to_arrays(&[v]))
.map(|mut a| a.remove(0))
.unwrap();
let result = result.as_primitive::<Float32Type>();
assert_eq!(result.len(), 4);
assert_eq!(result.value(0), 2.0);
assert_eq!(result.value(1), 5.0);
assert_eq!(result.value(2), 8.0);
assert!(result.is_null(3));
}
}

View File

@@ -22,14 +22,14 @@ use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use tokio_util::sync::CancellationToken;
use tonic::transport::{
Certificate, Channel as InnerChannel, ClientTlsConfig, Endpoint, Identity, Uri,
};
use tower::Service;
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, Result};
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result};
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
@@ -91,18 +91,57 @@ impl ChannelManager {
Default::default()
}
/// unified with config function that support tls config
/// use [`load_tls_config`] to load tls config from file system
pub fn with_config(config: ChannelConfig, tls_config: Option<ClientTlsConfig>) -> Self {
let mut inner = Inner::with_config(config.clone());
if let Some(tls_config) = tls_config {
inner.client_tls_config = Some(tls_config);
}
pub fn with_config(config: ChannelConfig) -> Self {
let inner = Inner::with_config(config);
Self {
inner: Arc::new(inner),
}
}
/// Read tls cert and key files and create a ChannelManager with TLS config.
pub fn with_tls_config(config: ChannelConfig) -> Result<Self> {
let mut inner = Inner::with_config(config.clone());
// setup tls
let path_config = config.client_tls.context(InvalidTlsConfigSnafu {
msg: "no config input",
})?;
if !path_config.enabled {
// if TLS not enabled, just ignore other tls config
// and not set `client_tls_config` hence not use TLS
return Ok(Self {
inner: Arc::new(inner),
});
}
let mut tls_config = ClientTlsConfig::new();
if let Some(server_ca) = path_config.server_ca_cert_path {
let server_root_ca_cert =
std::fs::read_to_string(server_ca).context(InvalidConfigFilePathSnafu)?;
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
tls_config = tls_config.ca_certificate(server_root_ca_cert);
}
if let (Some(client_cert_path), Some(client_key_path)) =
(&path_config.client_cert_path, &path_config.client_key_path)
{
let client_cert =
std::fs::read_to_string(client_cert_path).context(InvalidConfigFilePathSnafu)?;
let client_key =
std::fs::read_to_string(client_key_path).context(InvalidConfigFilePathSnafu)?;
let client_identity = Identity::from_pem(client_cert, client_key);
tls_config = tls_config.identity(client_identity);
}
inner.client_tls_config = Some(tls_config);
Ok(Self {
inner: Arc::new(inner),
})
}
pub fn config(&self) -> &ChannelConfig {
&self.inner.config
}
@@ -248,34 +287,6 @@ impl ChannelManager {
}
}
pub fn load_tls_config(tls_option: Option<&ClientTlsOption>) -> Result<Option<ClientTlsConfig>> {
let path_config = match tls_option {
Some(path_config) if path_config.enabled => path_config,
_ => return Ok(None),
};
let mut tls_config = ClientTlsConfig::new();
if let Some(server_ca) = &path_config.server_ca_cert_path {
let server_root_ca_cert =
std::fs::read_to_string(server_ca).context(InvalidConfigFilePathSnafu)?;
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
tls_config = tls_config.ca_certificate(server_root_ca_cert);
}
if let (Some(client_cert_path), Some(client_key_path)) =
(&path_config.client_cert_path, &path_config.client_key_path)
{
let client_cert =
std::fs::read_to_string(client_cert_path).context(InvalidConfigFilePathSnafu)?;
let client_key =
std::fs::read_to_string(client_key_path).context(InvalidConfigFilePathSnafu)?;
let client_identity = Identity::from_pem(client_cert, client_key);
tls_config = tls_config.identity(client_identity);
}
Ok(Some(tls_config))
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClientTlsOption {
/// Whether to enable TLS for client.
@@ -648,7 +659,7 @@ mod tests {
.http2_adaptive_window(true)
.tcp_keepalive(Duration::from_secs(2))
.tcp_nodelay(true);
let mgr = ChannelManager::with_config(config, None);
let mgr = ChannelManager::with_config(config);
let res = mgr.build_endpoint("test_addr");

View File

@@ -12,17 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc::channel_manager::{
ChannelConfig, ChannelManager, ClientTlsOption, load_tls_config,
};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
#[tokio::test]
async fn test_mtls_config() {
// test no config
let config = ChannelConfig::new();
let re = load_tls_config(config.client_tls.as_ref());
assert!(re.is_ok());
assert!(re.unwrap().is_none());
let re = ChannelManager::with_tls_config(config);
assert!(re.is_err());
// test wrong file
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
@@ -32,7 +29,7 @@ async fn test_mtls_config() {
client_key_path: Some("tests/tls/wrong_client.key".to_string()),
});
let re = load_tls_config(config.client_tls.as_ref());
let re = ChannelManager::with_tls_config(config);
assert!(re.is_err());
// test corrupted file content
@@ -43,9 +40,7 @@ async fn test_mtls_config() {
client_key_path: Some("tests/tls/corrupted".to_string()),
});
let tls_config = load_tls_config(config.client_tls.as_ref()).unwrap();
let re = ChannelManager::with_config(config, tls_config);
let re = ChannelManager::with_tls_config(config).unwrap();
let re = re.get("127.0.0.1:0");
assert!(re.is_err());
@@ -57,8 +52,7 @@ async fn test_mtls_config() {
client_key_path: Some("tests/tls/client.key".to_string()),
});
let tls_config = load_tls_config(config.client_tls.as_ref()).unwrap();
let re = ChannelManager::with_config(config, tls_config);
let re = ChannelManager::with_tls_config(config).unwrap();
let re = re.get("127.0.0.1:0");
let _ = re.unwrap();
}

View File

@@ -77,10 +77,7 @@ serde_json.workspace = true
serde_with.workspace = true
session.workspace = true
snafu.workspace = true
sqlx = { workspace = true, features = [
"mysql",
"chrono",
], optional = true }
sqlx = { workspace = true, optional = true }
store-api.workspace = true
strum.workspace = true
table = { workspace = true, features = ["testing"] }

View File

@@ -442,7 +442,7 @@ pub fn extract_column_metadatas(
results: &mut [RegionResponse],
key: &str,
) -> Result<Option<Vec<ColumnMetadata>>> {
let mut schemas = results
let schemas = results
.iter_mut()
.map(|r| r.extensions.remove(key))
.collect::<Vec<_>>();
@@ -454,24 +454,20 @@ pub fn extract_column_metadatas(
// Verify all the physical schemas are the same
// Safety: previous check ensures this vec is not empty
let first_column_metadatas = schemas
.swap_remove(0)
.map(|first_bytes| ColumnMetadata::decode_list(&first_bytes).context(DecodeJsonSnafu))
.transpose()?;
let first = schemas.first().unwrap();
ensure!(
schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "The table column metadata schemas from datanodes are not the same."
}
);
for s in schemas {
// check decoded column metadata instead of bytes because it contains extension map.
let column_metadata = s
.map(|bytes| ColumnMetadata::decode_list(&bytes).context(DecodeJsonSnafu))
.transpose()?;
ensure!(
column_metadata == first_column_metadatas,
MetadataCorruptionSnafu {
err_msg: "The table column metadata schemas from datanodes are not the same."
}
);
if let Some(first) = first {
let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
Ok(Some(column_metadatas))
} else {
Ok(None)
}
Ok(first_column_metadatas)
}
#[cfg(test)]

View File

@@ -250,7 +250,7 @@ pub struct UpgradeRegion {
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub replay_timeout: Duration,
pub replay_timeout: Option<Duration>,
/// The hint for replaying memtable.
#[serde(default)]
pub location_id: Option<u64>,
@@ -507,14 +507,13 @@ pub enum Instruction {
/// Closes regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")]
CloseRegions(Vec<RegionIdent>),
/// Upgrades regions.
#[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")]
UpgradeRegions(Vec<UpgradeRegion>),
/// Upgrades a region.
UpgradeRegion(UpgradeRegion),
#[serde(
deserialize_with = "single_or_multiple_from",
alias = "DowngradeRegion"
)]
/// Downgrades regions.
/// Downgrades a region.
DowngradeRegions(Vec<DowngradeRegion>),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
@@ -560,9 +559,9 @@ impl Instruction {
}
/// Converts the instruction into a [UpgradeRegion].
pub fn into_upgrade_regions(self) -> Option<Vec<UpgradeRegion>> {
pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
match self {
Self::UpgradeRegions(upgrade_region) => Some(upgrade_region),
Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
_ => None,
}
}
@@ -585,10 +584,6 @@ impl Instruction {
/// The reply of [UpgradeRegion].
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
/// The [RegionId].
/// For compatibility, it is defaulted to [RegionId::new(0, 0)].
#[serde(default)]
pub region_id: RegionId,
/// Returns true if `last_entry_id` has been replayed to the latest.
pub ready: bool,
/// Indicates whether the region exists.
@@ -640,39 +635,6 @@ where
})
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionsReply {
pub replies: Vec<UpgradeRegionReply>,
}
impl UpgradeRegionsReply {
pub fn new(replies: Vec<UpgradeRegionReply>) -> Self {
Self { replies }
}
pub fn single(reply: UpgradeRegionReply) -> Self {
Self::new(vec![reply])
}
}
#[derive(Deserialize)]
#[serde(untagged)]
enum UpgradeRegionsCompat {
Single(UpgradeRegionReply),
Multiple(UpgradeRegionsReply),
}
fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<UpgradeRegionsReply, D::Error>
where
D: Deserializer<'de>,
{
let helper = UpgradeRegionsCompat::deserialize(deserializer)?;
Ok(match helper {
UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]),
UpgradeRegionsCompat::Multiple(reply) => reply,
})
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -680,11 +642,7 @@ pub enum InstructionReply {
OpenRegions(SimpleReply),
#[serde(alias = "close_region")]
CloseRegions(SimpleReply),
#[serde(
deserialize_with = "upgrade_regions_compat_from",
alias = "upgrade_region"
)]
UpgradeRegions(UpgradeRegionsReply),
UpgradeRegion(UpgradeRegionReply),
#[serde(
alias = "downgrade_region",
deserialize_with = "downgrade_regions_compat_from"
@@ -700,11 +658,9 @@ impl Display for InstructionReply {
match self {
Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
Self::UpgradeRegions(reply) => {
write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies)
}
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::DowngradeRegions(reply) => {
write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies)
write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
}
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
@@ -729,9 +685,9 @@ impl InstructionReply {
}
}
pub fn expect_upgrade_regions_reply(self) -> Vec<UpgradeRegionReply> {
pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
match self {
Self::UpgradeRegions(reply) => reply.replies,
Self::UpgradeRegion(reply) => reply,
_ => panic!("Expected UpgradeRegion reply"),
}
}
@@ -793,58 +749,25 @@ mod tests {
serialized
);
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Duration::from_millis(1000),
location_id: None,
replay_entry_id: None,
metadata_replay_entry_id: None,
}]);
let serialized = serde_json::to_string(&upgrade_region).unwrap();
assert_eq!(
r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#,
serialized
);
}
#[test]
fn test_serialize_instruction_reply() {
let downgrade_region_reply = InstructionReply::DowngradeRegions(
DowngradeRegionsReply::single(DowngradeRegionReply {
let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
DowngradeRegionReply {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: None,
}),
);
},
));
let serialized = serde_json::to_string(&downgrade_region_reply).unwrap();
let serialized = serde_json::to_string(&downgrade_region).unwrap();
assert_eq!(
r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
serialized
);
let upgrade_region_reply =
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(1024, 1),
ready: true,
exists: true,
error: None,
}));
let serialized = serde_json::to_string(&upgrade_region_reply).unwrap();
assert_eq!(
r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#,
serialized
);
)
}
#[test]
fn test_deserialize_instruction() {
// legacy open region instruction
let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#;
let open_region_instruction: Instruction =
serde_json::from_str(open_region_instruction).unwrap();
@@ -862,7 +785,6 @@ mod tests {
)]);
assert_eq!(open_region_instruction, open_region);
// legacy close region instruction
let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#;
let close_region_instruction: Instruction =
serde_json::from_str(close_region_instruction).unwrap();
@@ -874,7 +796,6 @@ mod tests {
}]);
assert_eq!(close_region_instruction, close_region);
// legacy downgrade region instruction
let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#;
let downgrade_region_instruction: Instruction =
serde_json::from_str(downgrade_region_instruction).unwrap();
@@ -884,25 +805,6 @@ mod tests {
}]);
assert_eq!(downgrade_region_instruction, downgrade_region);
// legacy upgrade region instruction
let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#;
let upgrade_region_instruction: Instruction =
serde_json::from_str(upgrade_region_instruction).unwrap();
let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion {
region_id: RegionId::new(1024, 1),
last_entry_id: None,
metadata_last_entry_id: None,
replay_timeout: Duration::from_millis(1000),
location_id: None,
replay_entry_id: None,
metadata_replay_entry_id: None,
}]);
assert_eq!(upgrade_region_instruction, upgrade_region);
}
#[test]
fn test_deserialize_instruction_reply() {
// legacy close region reply
let close_region_instruction_reply =
r#"{"result":true,"error":null,"type":"close_region"}"#;
let close_region_instruction_reply: InstructionReply =
@@ -913,7 +815,6 @@ mod tests {
});
assert_eq!(close_region_instruction_reply, close_region_reply);
// legacy open region reply
let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#;
let open_region_instruction_reply: InstructionReply =
serde_json::from_str(open_region_instruction_reply).unwrap();
@@ -923,7 +824,6 @@ mod tests {
});
assert_eq!(open_region_instruction_reply, open_region_reply);
// legacy downgrade region reply
let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#;
let downgrade_region_instruction_reply: InstructionReply =
serde_json::from_str(downgrade_region_instruction_reply).unwrap();
@@ -937,19 +837,6 @@ mod tests {
}),
);
assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply);
// legacy upgrade region reply
let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#;
let upgrade_region_instruction_reply: InstructionReply =
serde_json::from_str(upgrade_region_instruction_reply).unwrap();
let upgrade_region_reply =
InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(1024, 1),
ready: true,
exists: true,
error: None,
}));
assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply);
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -26,6 +26,7 @@ use datatypes::arrow::datatypes::{
Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::schema::SchemaRef;
fn prepare_record_batch(rows: usize) -> RecordBatch {
let schema = Schema::new(vec![
@@ -55,6 +56,14 @@ fn prepare_record_batch(rows: usize) -> RecordBatch {
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}
fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
let record_batch =
common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
for row in record_batch.rows() {
black_box(row);
}
}
fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
for i in 0..record_batch.num_rows() {
for column in record_batch.columns() {
@@ -116,6 +125,19 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("iter_record_batch");
for rows in [1usize, 10, 100, 1_000, 10_000] {
group.bench_with_input(
BenchmarkId::new("by_greptimedb_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
let schema =
Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
b.iter(|| {
iter_by_greptimedb_values(schema.clone(), record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_loop_rows_and_columns", rows),
&rows,

View File

@@ -23,6 +23,7 @@ use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef a
use datatypes::arrow::array::RecordBatchOptions;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
@@ -193,6 +194,11 @@ impl RecordBatch {
self.df_record_batch.num_rows()
}
/// Create an iterator to traverse the data by row
pub fn rows(&self) -> RecordBatchRowIterator<'_> {
RecordBatchRowIterator::new(self)
}
pub fn column_vectors(
&self,
table_name: &str,
@@ -271,6 +277,44 @@ impl Serialize for RecordBatch {
}
}
pub struct RecordBatchRowIterator<'a> {
record_batch: &'a RecordBatch,
rows: usize,
columns: usize,
row_cursor: usize,
}
impl<'a> RecordBatchRowIterator<'a> {
fn new(record_batch: &'a RecordBatch) -> RecordBatchRowIterator<'a> {
RecordBatchRowIterator {
record_batch,
rows: record_batch.df_record_batch.num_rows(),
columns: record_batch.df_record_batch.num_columns(),
row_cursor: 0,
}
}
}
impl Iterator for RecordBatchRowIterator<'_> {
type Item = Vec<Value>;
fn next(&mut self) -> Option<Self::Item> {
if self.row_cursor == self.rows {
None
} else {
let mut row = Vec::with_capacity(self.columns);
for col in 0..self.columns {
let column = self.record_batch.column(col);
row.push(column.get(self.row_cursor));
}
self.row_cursor += 1;
Some(row)
}
}
}
/// merge multiple recordbatch into a single
pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
let batches_len = batches.len();
@@ -305,9 +349,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow_array::StringArray;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
@@ -365,6 +407,64 @@ mod tests {
);
}
#[test]
fn test_record_batch_visitor() {
let column_schemas = vec![
ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
Arc::new(StringVector::from(vec![
None,
Some("hello"),
Some("greptime"),
None,
])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let mut record_batch_iter = recordbatch.rows();
assert_eq!(
vec![Value::UInt32(1), Value::Null],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(2), Value::String("hello".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(3), Value::String("greptime".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(4), Value::Null],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert!(record_batch_iter.next().is_none());
}
#[test]
fn test_record_batch_slice() {
let column_schemas = vec![
@@ -383,16 +483,26 @@ mod tests {
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");
let mut record_batch_iter = recordbatch.rows();
assert_eq!(
vec![Value::UInt32(2), Value::String("hello".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
let expected = &UInt32Array::from_iter_values([2u32, 3]);
let array = recordbatch.column(0).to_arrow_array();
let actual = array.as_primitive::<UInt32Type>();
assert_eq!(expected, actual);
assert_eq!(
vec![Value::UInt32(3), Value::String("greptime".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
let expected = &StringArray::from(vec!["hello", "greptime"]);
let array = recordbatch.column(1).to_arrow_array();
let actual = array.as_string::<i32>();
assert_eq!(expected, actual);
assert!(record_batch_iter.next().is_none());
assert!(recordbatch.slice(1, 5).is_err());
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt::Display;
use std::str::FromStr;
use chrono::{FixedOffset, TimeZone};
use chrono_tz::{OffsetComponents, Tz};
@@ -101,7 +102,7 @@ impl Timezone {
.parse::<u32>()
.context(ParseOffsetStrSnafu { raw: tz_string })?;
Self::hours_mins_opt(hrs, mins)
} else if let Ok(tz) = Tz::from_str_insensitive(tz_string) {
} else if let Ok(tz) = Tz::from_str(tz_string) {
Ok(Self::Named(tz))
} else {
ParseTimezoneNameSnafu { raw: tz_string }.fail()
@@ -202,10 +203,6 @@ mod tests {
Timezone::Named(Tz::Asia__Shanghai),
Timezone::from_tz_string("Asia/Shanghai").unwrap()
);
assert_eq!(
Timezone::Named(Tz::Asia__Shanghai),
Timezone::from_tz_string("Asia/ShangHai").unwrap()
);
assert_eq!(
Timezone::Named(Tz::UTC),
Timezone::from_tz_string("UTC").unwrap()

View File

@@ -44,6 +44,7 @@ use crate::region_server::RegionServer;
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
open_region_parallelism: usize,
@@ -63,6 +64,7 @@ pub trait InstructionHandler: Send + Sync {
#[derive(Clone)]
pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
gc_tasks: TaskTracker<GcReport>,
@@ -73,6 +75,7 @@ impl HandlerContext {
pub fn new_for_test(region_server: RegionServer) -> Self {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
gc_tasks: TaskTracker::new(),
@@ -85,6 +88,7 @@ impl RegionHeartbeatResponseHandler {
pub fn new(region_server: RegionServer) -> Self {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
// Default to half of the number of CPUs.
@@ -110,12 +114,7 @@ impl RegionHeartbeatResponseHandler {
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegions(_) => Ok(Box::new(
UpgradeRegionsHandler {
upgrade_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())),
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
@@ -195,7 +194,7 @@ dispatch_instr!(
OpenRegions => OpenRegions,
FlushRegions => FlushRegions,
DowngradeRegions => DowngradeRegions,
UpgradeRegions => UpgradeRegions,
UpgradeRegion => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
);
@@ -217,6 +216,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let gc_tasks = self.gc_tasks.clone();
@@ -226,6 +226,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
.handle(
&HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
gc_tasks,
@@ -333,10 +334,10 @@ mod tests {
);
// Upgrade region
let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion {
let instruction = Instruction::UpgradeRegion(UpgradeRegion {
region_id,
..Default::default()
}]);
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);

View File

@@ -12,209 +12,125 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_meta::instruction::{
InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_telemetry::{debug, info, warn};
use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
use store_api::storage::RegionId;
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::{info, warn};
use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
use crate::error::Result;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::heartbeat::task_tracker::WaitResult;
#[derive(Debug, Clone, Copy, Default)]
pub struct UpgradeRegionsHandler {
pub upgrade_region_parallelism: usize,
}
#[cfg(test)]
impl UpgradeRegionsHandler {
fn new_test() -> UpgradeRegionsHandler {
UpgradeRegionsHandler {
upgrade_region_parallelism: 8,
}
}
}
impl UpgradeRegionsHandler {
fn convert_responses_to_replies(
responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
catchup_regions: &[RegionId],
) -> Vec<UpgradeRegionReply> {
match responses {
Ok(responses) => responses
.into_iter()
.map(|(region_id, result)| match result {
Ok(()) => UpgradeRegionReply {
region_id,
ready: true,
exists: true,
error: None,
},
Err(err) => {
if err.status_code() == StatusCode::RegionNotFound {
UpgradeRegionReply {
region_id,
ready: false,
exists: false,
error: Some(format!("{err:?}")),
}
} else {
UpgradeRegionReply {
region_id,
ready: false,
exists: true,
error: Some(format!("{err:?}")),
}
}
}
})
.collect::<Vec<_>>(),
Err(err) => catchup_regions
.iter()
.map(|region_id| UpgradeRegionReply {
region_id: *region_id,
ready: false,
exists: true,
error: Some(format!("{err:?}")),
})
.collect::<Vec<_>>(),
}
}
}
impl UpgradeRegionsHandler {
// Handles upgrade regions instruction.
//
// Returns batch of upgrade region replies, the order of the replies is not guaranteed.
async fn handle_upgrade_regions(
&self,
ctx: &HandlerContext,
upgrade_regions: Vec<UpgradeRegion>,
) -> Vec<UpgradeRegionReply> {
let num_upgrade_regions = upgrade_regions.len();
let mut replies = Vec::with_capacity(num_upgrade_regions);
let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
let mut timeout = None;
for upgrade_region in upgrade_regions {
let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
else {
// Region is not found.
debug!("Region {} is not found", upgrade_region.region_id);
replies.push(UpgradeRegionReply {
region_id: upgrade_region.region_id,
ready: false,
exists: false,
error: None,
});
continue;
};
// Ignores the catchup requests for writable regions.
if writable {
warn!(
"Region {} is writable, ignores the catchup request",
upgrade_region.region_id
);
replies.push(UpgradeRegionReply {
region_id: upgrade_region.region_id,
ready: true,
exists: true,
error: None,
});
} else {
let UpgradeRegion {
last_entry_id,
metadata_last_entry_id,
location_id,
replay_entry_id,
metadata_replay_entry_id,
replay_timeout,
..
} = upgrade_region;
match timeout {
Some(timeout) => {
debug_assert_eq!(timeout, replay_timeout);
}
None => {
// TODO(weny): required the replay_timeout.
timeout = Some(replay_timeout);
}
}
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};
catchup_regions.push(upgrade_region.region_id);
catchup_requests.push((
upgrade_region.region_id,
RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
},
));
}
}
let Some(timeout) = timeout else {
// No replay timeout, so we don't need to catchup the regions.
info!("All regions are writable, no need to catchup");
debug_assert_eq!(replies.len(), num_upgrade_regions);
return replies;
};
match tokio::time::timeout(
timeout,
ctx.region_server
.handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
)
.await
{
Ok(responses) => {
replies.extend(
Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
);
}
Err(_) => {
replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
region_id: *region_id,
ready: false,
exists: true,
error: None,
}));
}
}
replies
}
}
pub struct UpgradeRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for UpgradeRegionsHandler {
type Instruction = Vec<UpgradeRegion>;
type Instruction = UpgradeRegion;
async fn handle(
&self,
ctx: &HandlerContext,
upgrade_regions: Self::Instruction,
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout,
location_id,
replay_entry_id,
metadata_replay_entry_id,
}: UpgradeRegion,
) -> Option<InstructionReply> {
let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
}));
};
Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
replies,
)))
if writable {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}));
}
let region_server_moved = ctx.region_server.clone();
let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
(Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
entry_id,
metadata_entry_id,
}),
_ => None,
};
// The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task.
let register_result = ctx
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
info!(
"Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
);
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
metadata_entry_id: metadata_last_entry_id,
location_id,
checkpoint,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}
// Returns immediately
let Some(replay_timeout) = replay_timeout else {
return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}));
};
// We don't care that it returns a newly registered or running task.
let mut watcher = register_result.into_watcher();
let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await;
match result {
WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
})),
WaitResult::Finish(Ok(_)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}))
}
WaitResult::Finish(Err(err)) => {
Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(format!("{err:?}")),
}))
}
}
}
}
@@ -226,6 +142,7 @@ mod tests {
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
@@ -241,30 +158,21 @@ mod tests {
let handler_context = HandlerContext::new_for_test(mock_region_server);
let region_id = RegionId::new(1024, 1);
let region_id2 = RegionId::new(1024, 2);
let replay_timeout = Duration::from_millis(100u64);
let reply = UpgradeRegionsHandler::new_test()
.handle(
&handler_context,
vec![
let waits = vec![None, Some(Duration::from_millis(100u64))];
for replay_timeout in waits {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
UpgradeRegion {
region_id: region_id2,
replay_timeout,
..Default::default()
},
],
)
.await;
)
.await;
let replies = &reply.unwrap().expect_upgrade_regions_reply();
assert_eq!(replies[0].region_id, region_id);
assert_eq!(replies[1].region_id, region_id2);
for reply in replies {
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.exists);
assert!(reply.error.is_none());
}
@@ -274,7 +182,6 @@ mod tests {
async fn test_region_writable() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let region_id2 = RegionId::new(1024, 2);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
@@ -284,32 +191,25 @@ mod tests {
unreachable!();
}));
});
mock_region_server.register_test_region(region_id, mock_engine.clone());
mock_region_server.register_test_region(region_id2, mock_engine);
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let replay_timeout = Duration::from_millis(100u64);
let reply = UpgradeRegionsHandler::new_test()
.handle(
&handler_context,
vec![
let waits = vec![None, Some(Duration::from_millis(100u64))];
for replay_timeout in waits {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
UpgradeRegion {
region_id: region_id2,
replay_timeout,
..Default::default()
},
],
)
.await;
)
.await;
let replies = &reply.unwrap().expect_upgrade_regions_reply();
assert_eq!(replies[0].region_id, region_id);
assert_eq!(replies[1].region_id, region_id2);
for reply in replies {
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
@@ -332,27 +232,30 @@ mod tests {
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let replay_timeout = Duration::from_millis(100u64);
let reply = UpgradeRegionsHandler::new_test()
.handle(
&handler_context,
vec![UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}],
)
.await;
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none(), "error: {:?}", reply.error);
let waits = vec![None, Some(Duration::from_millis(100u64))];
for replay_timeout in waits {
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
},
)
.await;
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}
#[tokio::test]
async fn test_region_not_ready_with_retry() {
common_telemetry::init_default_ut_logging();
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
@@ -361,48 +264,58 @@ mod tests {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
mock_region_server.register_test_region(region_id, mock_engine);
let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
let waits = vec![
Some(Duration::from_millis(100u64)),
Some(Duration::from_millis(100u64)),
];
let handler_context = HandlerContext::new_for_test(mock_region_server);
for replay_timeout in waits {
let reply = UpgradeRegionsHandler::new_test()
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
vec![UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout,
..Default::default()
}],
},
)
.await;
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none(), "error: {:?}", reply.error);
assert!(reply.error.is_none());
}
let reply = UpgradeRegionsHandler::new_test()
let timer = Instant::now();
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
vec![UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Duration::from_millis(500),
replay_timeout: Some(Duration::from_millis(500)),
..Default::default()
}],
},
)
.await;
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none(), "error: {:?}", reply.error);
assert!(reply.error.is_none());
}
#[tokio::test]
async fn test_region_error() {
common_telemetry::init_default_ut_logging();
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
@@ -422,37 +335,38 @@ mod tests {
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = UpgradeRegionsHandler::new_test()
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
vec![UpgradeRegion {
UpgradeRegion {
region_id,
..Default::default()
}],
},
)
.await;
// It didn't wait for handle returns; it had no idea about the error.
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
let reply = UpgradeRegionsHandler::new_test()
let reply = UpgradeRegionsHandler
.handle(
&handler_context,
vec![UpgradeRegion {
UpgradeRegion {
region_id,
replay_timeout: Duration::from_millis(200),
replay_timeout: Some(Duration::from_millis(200)),
..Default::default()
}],
},
)
.await;
let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
let reply = reply.unwrap().expect_upgrade_region_reply();
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.error.as_ref().unwrap().contains("mock_error"));
assert!(reply.error.unwrap().contains("mock_error"));
}
}

View File

@@ -75,20 +75,4 @@ lazy_static! {
&[RESULT_TYPE]
)
.unwrap();
/// Total count of failed region server requests.
pub static ref REGION_SERVER_REQUEST_FAILURE_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_region_request_fail_count",
"failed region server requests count",
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// Total count of failed insert requests to region server.
pub static ref REGION_SERVER_INSERT_FAIL_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_region_failed_insert_count",
"failed region server insert requests count",
&[REGION_REQUEST_TYPE]
)
.unwrap();
}

View File

@@ -66,8 +66,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
RegionOpenRequest, RegionRequest,
AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -192,17 +191,6 @@ impl RegionServer {
.await
}
#[tracing::instrument(skip_all)]
pub async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
}
#[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
pub async fn handle_request(
&self,
@@ -411,14 +399,6 @@ impl RegionServer {
#[cfg(test)]
/// Registers a region for test purpose.
pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
{
let mut engines = self.inner.engines.write().unwrap();
if !engines.contains_key(engine.name()) {
debug!("Registering test engine: {}", engine.name());
engines.insert(engine.name().to_string(), engine.clone());
}
}
self.inner
.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
@@ -600,8 +580,6 @@ impl RegionServer {
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
.with_label_values(&[request.as_ref()]);
let response = match &request {
region_request::Body::Creates(_)
| region_request::Body::Drops(_)
@@ -619,9 +597,6 @@ impl RegionServerHandler for RegionServer {
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
.inspect_err(|_| {
failed_requests_cnt.inc();
})
.context(ExecuteGrpcRequestSnafu)?;
Ok(RegionResponseV1 {
@@ -997,116 +972,6 @@ impl RegionServerInner {
.collect::<Vec<_>>())
}
pub async fn handle_batch_catchup_requests_inner(
&self,
engine: RegionEngineRef,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
for (region_id, _) in &requests {
self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
}
let region_ids = requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let mut responses = Vec::with_capacity(requests.len());
match engine
.handle_batch_catchup_requests(parallelism, requests)
.await
{
Ok(results) => {
for (region_id, result) in results {
match result {
Ok(_) => {
if let Err(e) = self
.set_region_status_ready(
region_id,
engine.clone(),
RegionChange::Catchup,
)
.await
{
error!(e; "Failed to set region to ready: {}", region_id);
responses.push((region_id, Err(BoxedError::new(e))));
} else {
responses.push((region_id, Ok(())));
}
}
Err(e) => {
self.unset_region_status(region_id, &engine, RegionChange::Catchup);
error!(e; "Failed to catchup region: {}", region_id);
responses.push((region_id, Err(e)));
}
}
}
}
Err(e) => {
for region_id in region_ids {
self.unset_region_status(region_id, &engine, RegionChange::Catchup);
}
error!(e; "Failed to catchup batch regions");
return error::UnexpectedSnafu {
violated: format!("Failed to catchup batch regions: {:?}", e),
}
.fail();
}
}
Ok(responses)
}
pub async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
let mut responses = Vec::with_capacity(requests.len());
for (region_id, request) in requests {
if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
match engine {
CurrentEngine::Engine(engine) => {
engine_grouped_requests
.entry(engine.name().to_string())
.or_default()
.push((region_id, request));
}
CurrentEngine::EarlyReturn(_) => {
return error::UnexpectedSnafu {
violated: format!("Unexpected engine type for region {}", region_id),
}
.fail();
}
}
} else {
responses.push((
region_id,
Err(BoxedError::new(
error::RegionNotFoundSnafu { region_id }.build(),
)),
));
}
}
for (engine, requests) in engine_grouped_requests {
let engine = self
.engines
.read()
.unwrap()
.get(&engine)
.with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
.clone();
responses.extend(
self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
.await?,
);
}
Ok(responses)
}
// Handle requests in batch.
//
// limitation: all create requests must be in the same engine.
@@ -1235,11 +1100,6 @@ impl RegionServerInner {
})
}
Err(err) => {
if matches!(region_change, RegionChange::Ingest) {
crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
.with_label_values(&[request_type])
.inc();
}
// Removes the region status if the operation fails.
self.unset_region_status(region_id, &engine, region_change);
Err(err)

View File

@@ -277,10 +277,6 @@ impl ConcreteDataType {
matches!(self, ConcreteDataType::Null(NullType))
}
pub(crate) fn is_struct(&self) -> bool {
matches!(self, ConcreteDataType::Struct(_))
}
/// Try to cast the type as a [`ListType`].
pub fn as_list(&self) -> Option<&ListType> {
match self {

View File

@@ -266,14 +266,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse or serialize arrow metadata"))]
ArrowMetadata {
#[snafu(source)]
error: arrow::error::ArrowError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -315,8 +307,7 @@ impl ErrorExt for Error {
| ConvertArrowArrayToScalars { .. }
| ConvertScalarToArrowArray { .. }
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. }
| ArrowMetadata { .. } => StatusCode::Internal,
| InconsistentStructFieldsAndItems { .. } => StatusCode::Internal,
}
}

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod json;

View File

@@ -1,104 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_schema::extension::ExtensionType;
use arrow_schema::{ArrowError, DataType};
use serde::{Deserialize, Serialize};
use crate::json::JsonStructureSettings;
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct JsonMetadata {
/// Indicates how to handle JSON is stored in underlying data type
///
/// This field can be `None` for data is converted to complete structured in-memory form.
pub json_structure_settings: Option<JsonStructureSettings>,
}
#[derive(Debug, Clone)]
pub struct JsonExtensionType(Arc<JsonMetadata>);
impl JsonExtensionType {
pub fn new(metadata: Arc<JsonMetadata>) -> Self {
JsonExtensionType(metadata)
}
}
impl ExtensionType for JsonExtensionType {
const NAME: &'static str = "greptime.json";
type Metadata = Arc<JsonMetadata>;
fn metadata(&self) -> &Self::Metadata {
&self.0
}
fn serialize_metadata(&self) -> Option<String> {
serde_json::to_string(self.metadata()).ok()
}
fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> {
if let Some(metadata) = metadata {
let metadata = serde_json::from_str(metadata).map_err(|e| {
ArrowError::ParseError(format!("Failed to deserialize JSON metadata: {}", e))
})?;
Ok(Arc::new(metadata))
} else {
Ok(Arc::new(JsonMetadata::default()))
}
}
fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> {
match data_type {
// object
DataType::Struct(_)
// array
| DataType::List(_)
| DataType::ListView(_)
| DataType::LargeList(_)
| DataType::LargeListView(_)
// string
| DataType::Utf8
| DataType::Utf8View
| DataType::LargeUtf8
// number
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float32
| DataType::Float64
// boolean
| DataType::Boolean
// null
| DataType::Null
// legacy json type
| DataType::Binary => Ok(()),
dt => Err(ArrowError::SchemaError(format!(
"Unexpected data type {dt}"
))),
}
}
fn try_new(data_type: &DataType, metadata: Self::Metadata) -> Result<Self, ArrowError> {
let json = Self(metadata);
json.supports_data_type(data_type)?;
Ok(json)
}
}

View File

@@ -13,13 +13,11 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(box_patterns)]
pub mod arrow_array;
pub mod data_type;
pub mod duration;
pub mod error;
pub mod extension;
pub mod interval;
pub mod json;
pub mod macros;

View File

@@ -32,8 +32,9 @@ pub use crate::schema::column_schema::{
COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY,
JSON_STRUCTURE_SETTINGS_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

View File

@@ -17,17 +17,13 @@ use std::fmt;
use std::str::FromStr;
use arrow::datatypes::Field;
use arrow_schema::extension::{
EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY, ExtensionType,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use sqlparser_derive::{Visit, VisitMut};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{
self, ArrowMetadataSnafu, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result,
};
use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result};
use crate::json::JsonStructureSettings;
use crate::schema::TYPE_KEY;
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::value::Value;
@@ -46,6 +42,7 @@ pub const FULLTEXT_KEY: &str = "greptime:fulltext";
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skip options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
pub const JSON_STRUCTURE_SETTINGS_KEY: &str = "greptime:json:structure_settings";
/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
@@ -397,38 +394,18 @@ impl ColumnSchema {
Ok(())
}
pub fn extension_type<E>(&self) -> Result<Option<E>>
where
E: ExtensionType,
{
let extension_type_name = self.metadata.get(EXTENSION_TYPE_NAME_KEY);
if extension_type_name.map(|s| s.as_str()) == Some(E::NAME) {
let extension_metadata = self.metadata.get(EXTENSION_TYPE_METADATA_KEY);
let extension_metadata =
E::deserialize_metadata(extension_metadata.map(|s| s.as_str()))
.context(ArrowMetadataSnafu)?;
let extension = E::try_new(&self.data_type.as_arrow_type(), extension_metadata)
.context(ArrowMetadataSnafu)?;
Ok(Some(extension))
} else {
Ok(None)
}
pub fn json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
self.metadata
.get(JSON_STRUCTURE_SETTINGS_KEY)
.map(|json| serde_json::from_str(json).context(error::DeserializeSnafu { json }))
.transpose()
}
pub fn with_extension_type<E>(&mut self, extension_type: &E) -> Result<()>
where
E: ExtensionType,
{
self.metadata
.insert(EXTENSION_TYPE_NAME_KEY.to_string(), E::NAME.to_string());
if let Some(extension_metadata) = extension_type.serialize_metadata() {
self.metadata
.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), extension_metadata);
}
pub fn with_json_structure_settings(&mut self, settings: &JsonStructureSettings) -> Result<()> {
self.metadata.insert(
JSON_STRUCTURE_SETTINGS_KEY.to_string(),
serde_json::to_string(settings).context(error::SerializeSnafu)?,
);
Ok(())
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -31,12 +31,9 @@ use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::types::{ListType, StructField, StructType};
use crate::value::Value;
use crate::vectors::json::builder::JsonVectorBuilder;
use crate::vectors::{BinaryVectorBuilder, MutableVector};
pub const JSON_TYPE_NAME: &str = "Json";
const JSON_PLAIN_FIELD_NAME: &str = "__plain__";
const JSON_PLAIN_FIELD_METADATA_KEY: &str = "is_plain_json";
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum JsonFormat {
@@ -57,46 +54,28 @@ impl JsonType {
Self { format }
}
pub(crate) fn empty() -> Self {
Self {
format: JsonFormat::Native(Box::new(ConcreteDataType::null_datatype())),
}
}
// TODO(LFC): remove "allow unused"
#[allow(unused)]
/// Make json type a struct type, by:
/// - if the json is an object, its entries are mapped to struct fields, obviously;
/// - if not, the json is one of bool, number, string or array, make it a special field called
/// [JSON_PLAIN_FIELD_NAME] with metadata [JSON_PLAIN_FIELD_METADATA_KEY] = `"true"` in a
/// struct with only that field.
/// "__plain" in a struct with only that field.
pub(crate) fn as_struct_type(&self) -> StructType {
match &self.format {
JsonFormat::Jsonb => StructType::default(),
JsonFormat::Native(inner) => match inner.as_ref() {
ConcreteDataType::Struct(t) => t.clone(),
x => {
let mut field =
StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), x.clone(), true);
field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true);
StructType::new(Arc::new(vec![field]))
}
x => StructType::new(Arc::new(vec![StructField::new(
"__plain".to_string(),
x.clone(),
true,
)])),
},
}
}
/// Check if this json type is the special "plain" one.
/// See [JsonType::as_struct_type].
pub(crate) fn is_plain_json(&self) -> bool {
let JsonFormat::Native(box ConcreteDataType::Struct(t)) = &self.format else {
return true;
};
let fields = t.fields();
let Some((single, [])) = fields.split_first() else {
return false;
};
single.name() == JSON_PLAIN_FIELD_NAME
&& single.metadata(JSON_PLAIN_FIELD_METADATA_KEY) == Some("true")
}
// TODO(LFC): remove "allow unused"
#[allow(unused)]
/// Try to merge this json type with others, error on datatype conflict.
pub(crate) fn merge(&mut self, other: &JsonType) -> Result<()> {
match (&self.format, &other.format) {
@@ -112,47 +91,6 @@ impl JsonType {
.fail(),
}
}
pub(crate) fn is_mergeable(&self, other: &JsonType) -> bool {
match (&self.format, &other.format) {
(JsonFormat::Jsonb, JsonFormat::Jsonb) => true,
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
is_mergeable(this.as_ref(), that.as_ref())
}
_ => false,
}
}
}
fn is_mergeable(this: &ConcreteDataType, that: &ConcreteDataType) -> bool {
fn is_mergeable_struct(this: &StructType, that: &StructType) -> bool {
let this_fields = this.fields();
let this_fields = this_fields
.iter()
.map(|x| (x.name(), x))
.collect::<HashMap<_, _>>();
for that_field in that.fields().iter() {
if let Some(this_field) = this_fields.get(that_field.name())
&& !is_mergeable(this_field.data_type(), that_field.data_type())
{
return false;
}
}
true
}
match (this, that) {
(this, that) if this == that => true,
(ConcreteDataType::List(this), ConcreteDataType::List(that)) => {
is_mergeable(this.item_type(), that.item_type())
}
(ConcreteDataType::Struct(this), ConcreteDataType::Struct(that)) => {
is_mergeable_struct(this, that)
}
(ConcreteDataType::Null(_), _) | (_, ConcreteDataType::Null(_)) => true,
_ => false,
}
}
fn merge(this: &ConcreteDataType, that: &ConcreteDataType) -> Result<ConcreteDataType> {
@@ -228,10 +166,7 @@ impl DataType for JsonType {
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
match self.format {
JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
JsonFormat::Native(_) => Box::new(JsonVectorBuilder::with_capacity(capacity)),
}
Box::new(BinaryVectorBuilder::with_capacity(capacity))
}
fn try_cast(&self, from: Value) -> Option<Value> {
@@ -291,12 +226,10 @@ mod tests {
let result = json_type.merge(other);
match (result, expected) {
(Ok(()), Ok(expected)) => {
assert_eq!(json_type.name(), expected);
assert!(json_type.is_mergeable(other));
assert_eq!(json_type.name(), expected)
}
(Err(err), Err(expected)) => {
assert_eq!(err.to_string(), expected);
assert!(!json_type.is_mergeable(other));
assert_eq!(err.to_string(), expected)
}
_ => unreachable!(),
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowDataType, Field};
@@ -47,15 +46,6 @@ impl TryFrom<&Fields> for StructType {
}
}
impl<const N: usize> From<[StructField; N]> for StructType {
fn from(value: [StructField; N]) -> Self {
let value: Box<[StructField]> = Box::new(value);
Self {
fields: Arc::new(value.into_vec()),
}
}
}
impl DataType for StructType {
fn name(&self) -> String {
format!(
@@ -118,7 +108,6 @@ pub struct StructField {
name: String,
data_type: ConcreteDataType,
nullable: bool,
metadata: BTreeMap<String, String>,
}
impl StructField {
@@ -127,7 +116,6 @@ impl StructField {
name,
data_type,
nullable,
metadata: BTreeMap::new(),
}
}
@@ -147,25 +135,11 @@ impl StructField {
self.nullable
}
pub(crate) fn insert_metadata(&mut self, key: impl ToString, value: impl ToString) {
self.metadata.insert(key.to_string(), value.to_string());
}
pub(crate) fn metadata(&self, key: &str) -> Option<&str> {
self.metadata.get(key).map(String::as_str)
}
pub fn to_df_field(&self) -> Field {
let metadata = self
.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
Field::new(
self.name.clone(),
self.data_type.as_arrow_type(),
self.nullable,
)
.with_metadata(metadata)
}
}

View File

@@ -873,12 +873,6 @@ impl From<&[u8]> for Value {
}
}
impl From<()> for Value {
fn from(_: ()) -> Self {
Value::Null
}
}
impl TryFrom<Value> for serde_json::Value {
type Error = serde_json::Error;

View File

@@ -35,7 +35,6 @@ mod duration;
mod eq;
mod helper;
mod interval;
pub(crate) mod json;
mod list;
mod null;
pub(crate) mod operations;

View File

@@ -464,14 +464,6 @@ impl Helper {
}
}
#[cfg(test)]
pub(crate) fn pretty_print(vector: VectorRef) -> String {
let array = vector.to_arrow_array();
arrow::util::pretty::pretty_format_columns(&vector.vector_type_name(), &[array])
.map(|x| x.to_string())
.unwrap_or_else(|e| e.to_string())
}
#[cfg(test)]
mod tests {
use arrow::array::{

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod builder;

View File

@@ -1,485 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use snafu::OptionExt;
use crate::data_type::ConcreteDataType;
use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
use crate::prelude::{ValueRef, Vector, VectorRef};
use crate::types::JsonType;
use crate::value::StructValueRef;
use crate::vectors::{MutableVector, StructVectorBuilder};
struct JsonStructsBuilder {
json_type: JsonType,
inner: StructVectorBuilder,
}
impl JsonStructsBuilder {
fn new(json_type: JsonType, capacity: usize) -> Self {
let struct_type = json_type.as_struct_type();
let inner = StructVectorBuilder::with_type_and_capacity(struct_type, capacity);
Self { json_type, inner }
}
fn len(&self) -> usize {
self.inner.len()
}
fn push(&mut self, value: &ValueRef) -> Result<()> {
if self.json_type.is_plain_json() {
let value = ValueRef::Struct(StructValueRef::RefList {
val: vec![value.clone()],
fields: self.json_type.as_struct_type(),
});
self.inner.try_push_value_ref(&value)
} else {
self.inner.try_push_value_ref(value)
}
}
/// Try to merge (and consume the data of) other json vector builder into this one.
/// Note that the other builder's json type must be able to be merged with this one's
/// (this one's json type has all the fields in other one's, and no datatypes conflict).
/// Normally this is guaranteed, as long as json values are pushed through [JsonVectorBuilder].
fn try_merge(&mut self, other: &mut JsonStructsBuilder) -> Result<()> {
debug_assert!(self.json_type.is_mergeable(&other.json_type));
fn helper(this: &mut StructVectorBuilder, that: &mut StructVectorBuilder) -> Result<()> {
let that_len = that.len();
if let Some(x) = that.mut_null_buffer().finish() {
this.mut_null_buffer().append_buffer(&x)
} else {
this.mut_null_buffer().append_n_non_nulls(that_len);
}
let that_fields = that.struct_type().fields();
let mut that_builders = that_fields
.iter()
.zip(that.mut_value_builders().iter_mut())
.map(|(field, builder)| (field.name(), builder))
.collect::<HashMap<_, _>>();
for (field, this_builder) in this
.struct_type()
.fields()
.iter()
.zip(this.mut_value_builders().iter_mut())
{
if let Some(that_builder) = that_builders.get_mut(field.name()) {
if field.data_type().is_struct() {
let this = this_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: a struct datatype field must be corresponding to a struct vector builder.
.unwrap();
let that = that_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: other builder with same field name must have same datatype,
// ensured because the two json types are mergeable.
.unwrap();
helper(this, that)?;
} else {
let vector = that_builder.to_vector();
this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
}
} else {
this_builder.push_nulls(that_len);
}
}
Ok(())
}
helper(&mut self.inner, &mut other.inner)
}
/// Same as [JsonStructsBuilder::try_merge], but does not consume the other builder's data.
fn try_merge_cloned(&mut self, other: &JsonStructsBuilder) -> Result<()> {
debug_assert!(self.json_type.is_mergeable(&other.json_type));
fn helper(this: &mut StructVectorBuilder, that: &StructVectorBuilder) -> Result<()> {
let that_len = that.len();
if let Some(x) = that.null_buffer().finish_cloned() {
this.mut_null_buffer().append_buffer(&x)
} else {
this.mut_null_buffer().append_n_non_nulls(that_len);
}
let that_fields = that.struct_type().fields();
let that_builders = that_fields
.iter()
.zip(that.value_builders().iter())
.map(|(field, builder)| (field.name(), builder))
.collect::<HashMap<_, _>>();
for (field, this_builder) in this
.struct_type()
.fields()
.iter()
.zip(this.mut_value_builders().iter_mut())
{
if let Some(that_builder) = that_builders.get(field.name()) {
if field.data_type().is_struct() {
let this = this_builder
.as_mut_any()
.downcast_mut::<StructVectorBuilder>()
// Safety: a struct datatype field must be corresponding to a struct vector builder.
.unwrap();
let that = that_builder
.as_any()
.downcast_ref::<StructVectorBuilder>()
// Safety: other builder with same field name must have same datatype,
// ensured because the two json types are mergeable.
.unwrap();
helper(this, that)?;
} else {
let vector = that_builder.to_vector_cloned();
this_builder.extend_slice_of(vector.as_ref(), 0, vector.len())?;
}
} else {
this_builder.push_nulls(that_len);
}
}
Ok(())
}
helper(&mut self.inner, &other.inner)
}
}
/// The vector builder for json type values.
///
/// Json type are dynamic, to some degree (as long as they can be merged into each other). So are
/// json values. Json values are physically stored in struct vectors, which require the types of
/// struct values to be fixed inside a certain struct vector. So to resolve "dynamic" vs "fixed"
/// datatype problem, in this builder, each type of json value gets its own struct vector builder.
/// Once new json type value is pushing into this builder, it creates a new "child" builder for it.
///
/// Given the "mixed" nature of the values stored in this builder, to produce the json vector, a
/// "merge" operation is performed. The "merge" is to iterate over all the "child" builders, and fill
/// nulls for missing json fields. The final vector's json type is fixed to be the "merge" of all
/// pushed json types.
pub(crate) struct JsonVectorBuilder {
merged_type: JsonType,
capacity: usize,
builders: Vec<JsonStructsBuilder>,
}
impl JsonVectorBuilder {
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
merged_type: JsonType::empty(),
capacity,
builders: vec![],
}
}
fn try_create_new_builder(&mut self, json_type: &JsonType) -> Result<&mut JsonStructsBuilder> {
self.merged_type.merge(json_type)?;
let builder = JsonStructsBuilder::new(json_type.clone(), self.capacity);
self.builders.push(builder);
let len = self.builders.len();
Ok(&mut self.builders[len - 1])
}
}
impl MutableVector for JsonVectorBuilder {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::Json(self.merged_type.clone())
}
fn len(&self) -> usize {
self.builders.iter().map(|x| x.len()).sum()
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
fn to_vector(&mut self) -> VectorRef {
// Fast path:
if self.builders.len() == 1 {
return self.builders[0].inner.to_vector();
}
let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
for builder in self.builders.iter_mut() {
unified_jsons
.try_merge(builder)
// Safety: the "unified_jsons" has the merged json type from all the builders,
// so it should merge them without errors.
.unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
}
unified_jsons.inner.to_vector()
}
fn to_vector_cloned(&self) -> VectorRef {
// Fast path:
if self.builders.len() == 1 {
return self.builders[0].inner.to_vector_cloned();
}
let mut unified_jsons = JsonStructsBuilder::new(self.merged_type.clone(), self.capacity);
for builder in self.builders.iter() {
unified_jsons
.try_merge_cloned(builder)
// Safety: the "unified_jsons" has the merged json type from all the builders,
// so it should merge them without errors.
.unwrap_or_else(|e| panic!("failed to merge json builders, error: {e}"));
}
unified_jsons.inner.to_vector_cloned()
}
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
let data_type = value.data_type();
let json_type = data_type.as_json().with_context(|| TryFromValueSnafu {
reason: format!("expected json value, got {value:?}"),
})?;
let builder = match self.builders.last_mut() {
Some(last) => {
if &last.json_type != json_type {
self.try_create_new_builder(json_type)?
} else {
last
}
}
None => self.try_create_new_builder(json_type)?,
};
let ValueRef::Json(value) = value else {
// Safety: json datatype value must be the value of json.
unreachable!()
};
builder.push(value)
}
fn push_null(&mut self) {
let null_json_value = ValueRef::Json(Box::new(ValueRef::Null));
self.try_push_value_ref(&null_json_value)
// Safety: learning from the method "try_push_value_ref", a null json value should be
// always able to push into any json vectors.
.unwrap_or_else(|e| {
panic!("failed to push null json value: {null_json_value:?}, error: {e}")
});
}
fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> {
UnsupportedOperationSnafu {
op: "extend_slice_of",
vector_type: "JsonVector",
}
.fail()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data_type::DataType;
use crate::json::JsonStructureSettings;
use crate::vectors::helper::pretty_print;
fn push(json: &str, builder: &mut JsonVectorBuilder, expected: std::result::Result<(), &str>) {
let settings = JsonStructureSettings::Structured(None);
let json: serde_json::Value = serde_json::from_str(json).unwrap();
let value = settings.encode(json).unwrap();
let value = value.as_value_ref();
let result = builder.try_push_value_ref(&value);
match (result, expected) {
(Ok(()), Ok(())) => (),
(Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
_ => unreachable!(),
}
}
#[test]
fn test_push_plain_jsons() -> Result<()> {
let jsons = vec!["1", "2", r#""s""#, "[true]"];
let results = vec![
Ok(()),
Ok(()),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: String",
),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Int64, that: List<Boolean>",
),
];
let mut builder = JsonVectorBuilder::with_capacity(1);
for (json, result) in jsons.into_iter().zip(results.into_iter()) {
push(json, &mut builder, result);
}
let vector = builder.to_vector();
let expected = r#"
+----------------+
| StructVector |
+----------------+
| {__plain__: 1} |
| {__plain__: 2} |
+----------------+"#;
assert_eq!(pretty_print(vector), expected.trim());
Ok(())
}
#[test]
fn test_push_json_objects() -> Result<()> {
let jsons = vec![
r#"{
"s": "a",
"list": [1, 2, 3]
}"#,
r#"{
"list": [4],
"s": "b"
}"#,
r#"{
"s": "c",
"float": 0.9
}"#,
r#"{
"float": 0.8,
"s": "d"
}"#,
r#"{
"float": 0.7,
"int": -1
}"#,
r#"{
"int": 0,
"float": 0.6
}"#,
r#"{
"int": 1,
"object": {"hello": "world", "timestamp": 1761523200000}
}"#,
r#"{
"object": {"hello": "greptime", "timestamp": 1761523201000},
"int": 2
}"#,
r#"{
"object": {"timestamp": 1761523202000},
"nested": {"a": {"b": {"b": {"a": "abba"}}}}
}"#,
r#"{
"nested": {"a": {"b": {"a": {"b": "abab"}}}},
"object": {"timestamp": 1761523203000}
}"#,
];
let mut builder = JsonVectorBuilder::with_capacity(1);
for json in jsons {
push(json, &mut builder, Ok(()));
}
assert_eq!(builder.len(), 10);
// test children builders:
assert_eq!(builder.builders.len(), 6);
let expect_types = [
r#"Json<Struct<"list": List<Int64>, "s": String>>"#,
r#"Json<Struct<"float": Float64, "s": String>>"#,
r#"Json<Struct<"float": Float64, "int": Int64>>"#,
r#"Json<Struct<"int": Int64, "object": Struct<"hello": String, "timestamp": Int64>>>"#,
r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"b": Struct<"a": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
r#"Json<Struct<"nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>>>>, "object": Struct<"timestamp": Int64>>>"#,
];
let expect_vectors = [
r#"
+-------------------------+
| StructVector |
+-------------------------+
| {list: [1, 2, 3], s: a} |
| {list: [4], s: b} |
+-------------------------+"#,
r#"
+--------------------+
| StructVector |
+--------------------+
| {float: 0.9, s: c} |
| {float: 0.8, s: d} |
+--------------------+"#,
r#"
+-----------------------+
| StructVector |
+-----------------------+
| {float: 0.7, int: -1} |
| {float: 0.6, int: 0} |
+-----------------------+"#,
r#"
+---------------------------------------------------------------+
| StructVector |
+---------------------------------------------------------------+
| {int: 1, object: {hello: world, timestamp: 1761523200000}} |
| {int: 2, object: {hello: greptime, timestamp: 1761523201000}} |
+---------------------------------------------------------------+"#,
r#"
+------------------------------------------------------------------------+
| StructVector |
+------------------------------------------------------------------------+
| {nested: {a: {b: {b: {a: abba}}}}, object: {timestamp: 1761523202000}} |
+------------------------------------------------------------------------+"#,
r#"
+------------------------------------------------------------------------+
| StructVector |
+------------------------------------------------------------------------+
| {nested: {a: {b: {a: {b: abab}}}}, object: {timestamp: 1761523203000}} |
+------------------------------------------------------------------------+"#,
];
for (builder, (expect_type, expect_vector)) in builder
.builders
.iter()
.zip(expect_types.into_iter().zip(expect_vectors.into_iter()))
{
assert_eq!(builder.json_type.name(), expect_type);
let vector = builder.inner.to_vector_cloned();
assert_eq!(pretty_print(vector), expect_vector.trim());
}
// test final merged json type:
let expected = r#"Json<Struct<"float": Float64, "int": Int64, "list": List<Int64>, "nested": Struct<"a": Struct<"b": Struct<"a": Struct<"b": String>, "b": Struct<"a": String>>>>, "object": Struct<"hello": String, "timestamp": Int64>, "s": String>>"#;
assert_eq!(builder.data_type().to_string(), expected);
// test final produced vector:
let expected = r#"
+-------------------------------------------------------------------------------------------------------------------+
| StructVector |
+-------------------------------------------------------------------------------------------------------------------+
| {float: , int: , list: [1, 2, 3], nested: , object: , s: a} |
| {float: , int: , list: [4], nested: , object: , s: b} |
| {float: 0.9, int: , list: , nested: , object: , s: c} |
| {float: 0.8, int: , list: , nested: , object: , s: d} |
| {float: 0.7, int: -1, list: , nested: , object: , s: } |
| {float: 0.6, int: 0, list: , nested: , object: , s: } |
| {float: , int: 1, list: , nested: , object: {hello: world, timestamp: 1761523200000}, s: } |
| {float: , int: 2, list: , nested: , object: {hello: greptime, timestamp: 1761523201000}, s: } |
| {float: , int: , list: , nested: {a: {b: {a: , b: {a: abba}}}}, object: {hello: , timestamp: 1761523202000}, s: } |
| {float: , int: , list: , nested: {a: {b: {a: {b: abab}, b: }}}, object: {hello: , timestamp: 1761523203000}, s: } |
+-------------------------------------------------------------------------------------------------------------------+"#;
let vector = builder.to_vector_cloned();
assert_eq!(pretty_print(vector), expected.trim());
let vector = builder.to_vector();
assert_eq!(pretty_print(vector), expected.trim());
Ok(())
}
}

View File

@@ -323,26 +323,6 @@ impl StructVectorBuilder {
}
self.null_buffer.append_null();
}
pub(crate) fn struct_type(&self) -> &StructType {
&self.fields
}
pub(crate) fn value_builders(&self) -> &[Box<dyn MutableVector>] {
&self.value_builders
}
pub(crate) fn mut_value_builders(&mut self) -> &mut [Box<dyn MutableVector>] {
&mut self.value_builders
}
pub(crate) fn null_buffer(&self) -> &NullBufferBuilder {
&self.null_buffer
}
pub(crate) fn mut_null_buffer(&mut self) -> &mut NullBufferBuilder {
&mut self.null_buffer
}
}
impl MutableVector for StructVectorBuilder {

View File

@@ -23,7 +23,7 @@ use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_tls_config};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
@@ -123,10 +123,12 @@ impl FrontendClient {
let cfg = ChannelConfig::new()
.connect_timeout(batch_opts.grpc_conn_timeout)
.timeout(batch_opts.query_timeout);
let tls_config = load_tls_config(batch_opts.frontend_tls.as_ref())
.context(InvalidClientConfigSnafu)?;
ChannelManager::with_config(cfg, tls_config)
if let Some(tls) = &batch_opts.frontend_tls {
let cfg = cfg.client_tls_config(tls.clone());
ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
} else {
ChannelManager::with_config(cfg)
}
},
auth,
query,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
@@ -28,7 +28,6 @@ use common_function::scalars::udf::create_udf;
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::util;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
@@ -43,9 +42,8 @@ use servers::error::{
};
use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams};
use servers::otlp::trace::{
DURATION_NANO_COLUMN, KEY_OTEL_STATUS_ERROR_KEY, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN,
SPAN_KIND_COLUMN, SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, SPAN_STATUS_CODE, SPAN_STATUS_ERROR,
TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
DURATION_NANO_COLUMN, SERVICE_NAME_COLUMN, SPAN_ATTRIBUTES_COLUMN, SPAN_KIND_COLUMN,
SPAN_KIND_PREFIX, SPAN_NAME_COLUMN, TIMESTAMP_COLUMN, TRACE_ID_COLUMN,
};
use servers::query_handler::JaegerQueryHandler;
use session::context::QueryContextRef;
@@ -265,7 +263,7 @@ impl JaegerQueryHandler for Instance {
self.query_engine(),
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
vec![],
None,
None,
vec![],
@@ -324,7 +322,6 @@ async fn query_trace_table(
})?;
let is_data_model_v1 = table
.clone()
.table_info()
.meta
.options
@@ -333,14 +330,6 @@ async fn query_trace_table(
.map(|s| s.as_str())
== Some(TABLE_DATA_MODEL_TRACE_V1);
// collect to set
let col_names = table
.table_info()
.meta
.field_column_names()
.map(|s| format!("\"{}\"", s))
.collect::<HashSet<String>>();
let df_context = create_df_context(query_engine)?;
let dataframe = df_context
@@ -353,7 +342,7 @@ async fn query_trace_table(
let dataframe = filters
.into_iter()
.chain(tags.map_or(Ok(vec![]), |t| {
tags_filters(&dataframe, t, is_data_model_v1, &col_names)
tags_filters(&dataframe, t, is_data_model_v1)
})?)
.try_fold(dataframe, |df, expr| {
df.filter(expr).context(DataFusionSnafu)
@@ -483,73 +472,23 @@ fn json_tag_filters(
Ok(filters)
}
/// Helper function to check if span_key or resource_key exists in col_names and create an expression.
/// If neither exists, logs a warning and returns None.
#[inline]
fn check_col_and_build_expr<F>(
span_key: String,
resource_key: String,
key: &str,
col_names: &HashSet<String>,
expr_builder: F,
) -> Option<Expr>
where
F: FnOnce(String) -> Expr,
{
if col_names.contains(&span_key) {
return Some(expr_builder(span_key));
}
if col_names.contains(&resource_key) {
return Some(expr_builder(resource_key));
}
warn!("tag key {} not found in table columns", key);
None
}
fn flatten_tag_filters(
tags: HashMap<String, JsonValue>,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
fn flatten_tag_filters(tags: HashMap<String, JsonValue>) -> ServerResult<Vec<Expr>> {
let filters = tags
.into_iter()
.filter_map(|(key, value)| {
if key == KEY_OTEL_STATUS_ERROR_KEY && value == JsonValue::Bool(true) {
return Some(col(SPAN_STATUS_CODE).eq(lit(SPAN_STATUS_ERROR)));
}
// TODO(shuiyisong): add more precise mapping from key to col name
let span_key = format!("\"span_attributes.{}\"", key);
let resource_key = format!("\"resource_attributes.{}\"", key);
let key = format!("\"span_attributes.{}\"", key);
match value {
JsonValue::String(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::String(value) => Some(col(key).eq(lit(value))),
JsonValue::Number(value) => {
if value.is_f64() {
// safe to unwrap as checked previously
let value = value.as_f64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
Some(col(key).eq(lit(value.as_f64().unwrap())))
} else {
let value = value.as_i64().unwrap();
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
Some(col(key).eq(lit(value.as_i64().unwrap())))
}
}
JsonValue::Bool(value) => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).eq(lit(value))
})
}
JsonValue::Null => {
check_col_and_build_expr(span_key, resource_key, &key, col_names, |k| {
col(k).is_null()
})
}
JsonValue::Bool(value) => Some(col(key).eq(lit(value))),
JsonValue::Null => Some(col(key).is_null()),
// not supported at the moment
JsonValue::Array(_value) => None,
JsonValue::Object(_value) => None,
@@ -563,10 +502,9 @@ fn tags_filters(
dataframe: &DataFrame,
tags: HashMap<String, JsonValue>,
is_data_model_v1: bool,
col_names: &HashSet<String>,
) -> ServerResult<Vec<Expr>> {
if is_data_model_v1 {
flatten_tag_filters(tags, col_names)
flatten_tag_filters(tags)
} else {
json_tag_filters(dataframe, tags)
}

View File

@@ -36,7 +36,7 @@ async fn run() {
.timeout(Duration::from_secs(3))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config, None);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::datanode_default_options(id)
.channel_manager(channel_manager)
.build();

View File

@@ -101,7 +101,7 @@ pub async fn create_meta_client(
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config, None));
builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
if let Some(plugins) = plugins {
let region_follower = plugins.get::<RegionFollowerClientRef>();
if let Some(region_follower) = region_follower {
@@ -112,8 +112,8 @@ pub async fn create_meta_client(
}
builder = builder
.channel_manager(ChannelManager::with_config(base_config, None))
.heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config, None));
.channel_manager(ChannelManager::with_config(base_config))
.heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config));
let mut meta_client = builder.build();

View File

@@ -72,10 +72,7 @@ serde.workspace = true
serde_json.workspace = true
servers.workspace = true
snafu.workspace = true
sqlx = { workspace = true, features = [
"mysql",
"chrono",
], optional = true }
sqlx = { workspace = true, optional = true }
store-api.workspace = true
strum.workspace = true
table.workspace = true

View File

@@ -375,14 +375,12 @@ pub struct MetasrvNodeInfo {
// The node total cpu millicores
#[serde(default)]
pub total_cpu_millicores: i64,
// The node total memory bytes
#[serde(default)]
// The node total memory bytes
pub total_memory_bytes: i64,
/// The node build cpu usage millicores
#[serde(default)]
pub cpu_usage_millicores: i64,
/// The node build memory usage bytes
#[serde(default)]
pub memory_usage_bytes: i64,
// The node hostname
#[serde(default)]
@@ -860,18 +858,3 @@ impl Metasrv {
}
}
}
#[cfg(test)]
mod tests {
use crate::metasrv::MetasrvNodeInfo;
#[test]
fn test_deserialize_metasrv_node_info() {
let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
assert_eq!(node_info.addr, "127.0.0.1:4002");
assert_eq!(node_info.version, "0.1.0");
assert_eq!(node_info.git_commit, "1234567890");
assert_eq!(node_info.start_time_ms, 1715145600);
}
}

View File

@@ -134,7 +134,7 @@ pub async fn mock(
.timeout(Duration::from_secs(10))
.connect_timeout(Duration::from_secs(10))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config, None);
let channel_manager = ChannelManager::with_config(config);
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.

View File

@@ -17,9 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::ddl::utils::parse_region_wal_options;
use common_meta::instruction::{
Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_meta::lock_key::RemoteWalLock;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_procedure::{Context as ProcedureContext, Status};
@@ -133,19 +131,19 @@ impl UpgradeCandidateRegion {
None
};
let upgrade_instruction = Instruction::UpgradeRegions(vec![
let upgrade_instruction = Instruction::UpgradeRegion(
UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout,
replay_timeout: Some(replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id),
replay_entry_id: None,
metadata_replay_entry_id: None,
}
.with_replay_entry_id(checkpoint.map(|c| c.entry_id))
.with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
]);
);
Ok(upgrade_instruction)
}
@@ -195,7 +193,11 @@ impl UpgradeCandidateRegion {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
let InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -203,13 +205,6 @@ impl UpgradeCandidateRegion {
}
.fail();
};
// TODO(weny): handle multiple replies.
let UpgradeRegionReply {
ready,
exists,
error,
..
} = &replies[0];
// Notes: The order of handling is important.
if error.is_some() {

View File

@@ -18,7 +18,7 @@ use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply,
UpgradeRegionReply, UpgradeRegionsReply,
UpgradeRegionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -212,14 +212,11 @@ pub fn new_upgrade_region_reply(
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::UpgradeRegions(
UpgradeRegionsReply::single(UpgradeRegionReply {
region_id: RegionId::new(0, 0),
ready,
exists,
error,
}),
))
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready,
exists,
error,
}))
.unwrap(),
)),
}

View File

@@ -46,7 +46,6 @@ tracing.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
mito2 = { workspace = true, features = ["test"] }
common-wal = { workspace = true }
[package.metadata.cargo-udeps.ignore]
normal = ["aquamarine"]

View File

@@ -37,7 +37,7 @@ use common_error::status_code::StatusCode;
use common_runtime::RepeatedTask;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
@@ -46,9 +46,7 @@ use store_api::region_engine::{
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use crate::config::EngineConfig;
@@ -144,17 +142,6 @@ impl RegionEngine for MetricEngine {
.map_err(BoxedError::new)
}
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
.map_err(BoxedError::new)
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,
@@ -260,25 +247,7 @@ impl RegionEngine for MetricEngine {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Catchup(_) => {
let mut response = self
.inner
.handle_batch_catchup_requests(
1,
vec![(region_id, RegionCatchupRequest::default())],
)
.await
.map_err(BoxedError::new)?;
debug_assert_eq!(response.len(), 1);
let (resp_region_id, response) = response
.pop()
.context(error::UnexpectedRequestSnafu {
reason: "expected 1 response, but got zero responses",
})
.map_err(BoxedError::new)?;
debug_assert_eq!(region_id, resp_region_id);
return response;
}
RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
RegionRequest::BulkInserts(_) => {
// todo(hl): find a way to support bulk inserts in metric engine.
UnsupportedRegionRequestSnafu { request }.fail()
@@ -527,17 +496,13 @@ mod test {
use std::collections::HashMap;
use common_telemetry::info;
use common_wal::options::{KafkaWalOptions, WalOptions};
use mito2::sst::location::region_dir_from_table_dir;
use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::mito_engine_options::WAL_OPTIONS_KEY;
use store_api::region_request::{
PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
};
use super::*;
use crate::maybe_skip_kafka_log_store_integration_test;
use crate::test_util::TestEnv;
#[tokio::test]
@@ -718,128 +683,4 @@ mod test {
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
}
#[tokio::test]
async fn test_catchup_regions() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_log_store_integration_test!();
let kafka_log_store_factory = kafka_log_store_factory().unwrap();
let mito_env = mito2::test_util::TestEnv::new()
.await
.with_log_store_factory(kafka_log_store_factory.clone());
let env = TestEnv::with_mito_env(mito_env).await;
let table_dir = |region_id| format!("table/{region_id}");
let mut physical_region_ids = vec![];
let mut logical_region_ids = vec![];
let num_topics = 3;
let num_physical_regions = 8;
let num_logical_regions = 16;
let parallelism = 2;
let mut topics = Vec::with_capacity(num_topics);
for _ in 0..num_topics {
let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
.await
.unwrap();
topics.push(topic);
}
let topic_idx = |id| (id as usize) % num_topics;
// Creates physical regions
for i in 0..num_physical_regions {
let physical_region_id = RegionId::new(1, i);
physical_region_ids.push(physical_region_id);
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[topic_idx(i)].clone(),
});
env.create_physical_region(
physical_region_id,
&table_dir(physical_region_id),
vec![(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
)],
)
.await;
// Creates logical regions for each physical region
for j in 0..num_logical_regions {
let logical_region_id = RegionId::new(1024 + i, j);
logical_region_ids.push(logical_region_id);
env.create_logical_region(physical_region_id, logical_region_id)
.await;
}
}
let metric_engine = env.metric();
// Closes all regions
for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
metric_engine
.handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
}
// Opens all regions and skip the wal
let requests = physical_region_ids
.iter()
.enumerate()
.map(|(idx, region_id)| {
let mut options = HashMap::new();
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: topics[topic_idx(idx as u32)].clone(),
});
options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
(
*region_id,
RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
table_dir: table_dir(*region_id),
path_type: PathType::Bare,
options: options.clone(),
skip_wal_replay: true,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
info!("Open batch regions with parallelism: {parallelism}");
metric_engine
.handle_batch_open_requests(parallelism, requests)
.await
.unwrap();
{
let state = metric_engine.inner.state.read().unwrap();
for logical_region in &logical_region_ids {
assert!(!state.logical_regions().contains_key(logical_region));
}
}
let catch_requests = physical_region_ids
.iter()
.map(|region_id| {
(
*region_id,
RegionCatchupRequest {
set_writable: true,
..Default::default()
},
)
})
.collect::<Vec<_>>();
metric_engine
.handle_batch_catchup_requests(parallelism, catch_requests)
.await
.unwrap();
{
let state = metric_engine.inner.state.read().unwrap();
for logical_region in &logical_region_ids {
assert!(state.logical_regions().contains_key(logical_region));
}
}
}
}

View File

@@ -324,9 +324,9 @@ mod test {
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2", vec![])
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 = crate::test_util::create_logical_region_request(

View File

@@ -12,45 +12,51 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use snafu::{OptionExt, ResultExt};
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint,
};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{BatchCatchupMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
use crate::error::{
MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
};
use crate::utils;
impl MetricEngineInner {
pub async fn handle_batch_catchup_requests(
pub async fn catchup_region(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses> {
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_options_list = Vec::with_capacity(requests.len());
region_id: RegionId,
req: RegionCatchupRequest,
) -> Result<AffectedRows> {
if !self.is_physical_region(region_id) {
return UnsupportedRegionRequestSnafu {
request: RegionRequest::Catchup(req),
}
.fail();
}
let data_region_id = utils::to_data_region_id(region_id);
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
for (region_id, req) in requests {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
physical_region_options_list.push((data_region_id, physical_region_options));
all_requests.push((
let metadata_region_id = utils::to_metadata_region_id(region_id);
// TODO(weny): improve the catchup, we can read the wal entries only once.
debug!("Catchup metadata region {metadata_region_id}");
self.mito
.handle_request(
metadata_region_id,
RegionCatchupRequest {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.metadata_entry_id,
metadata_entry_id: None,
@@ -59,11 +65,16 @@ impl MetricEngineInner {
entry_id: c.metadata_entry_id.unwrap_or_default(),
metadata_entry_id: None,
}),
},
));
all_requests.push((
}),
)
.await
.context(MitoCatchupOperationSnafu)?;
debug!("Catchup data region {data_region_id}");
self.mito
.handle_request(
data_region_id,
RegionCatchupRequest {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
metadata_entry_id: None,
@@ -72,45 +83,14 @@ impl MetricEngineInner {
entry_id: c.entry_id,
metadata_entry_id: None,
}),
},
));
}
let mut results = self
.mito
.handle_batch_catchup_requests(parallelism, all_requests)
}),
)
.await
.context(BatchCatchupMitoRegionSnafu {})?
.into_iter()
.collect::<HashMap<_, _>>();
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)?;
let mut responses = Vec::with_capacity(physical_region_options_list.len());
for (physical_region_id, physical_region_options) in physical_region_options_list {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_result = results.remove(&metadata_region_id);
let data_region_result = results.remove(&data_region_id);
// Pass the optional `metadata_region_result` and `data_region_result` to
// `recover_physical_region_with_results`. This function handles errors for each
// catchup physical region request, allowing the process to continue with the
// remaining regions even if some requests fail.
let response = self
.recover_physical_region_with_results(
metadata_region_result,
data_region_result,
physical_region_id,
physical_region_options,
// Note: We intentionally dont close the region if recovery fails.
// Closing it here might confuse the region server since it links RegionIds to Engines.
// If recovery didnt succeed, the region should stay open.
false,
)
.await
.map_err(BoxedError::new);
responses.push((physical_region_id, response));
}
Ok(responses)
self.recover_states(region_id, physical_region_options)
.await?;
Ok(0)
}
}

View File

@@ -828,9 +828,9 @@ mod test {
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2", vec![])
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 =

View File

@@ -76,7 +76,7 @@ mod tests {
];
for (phy_region_id, logi_region_ids) in &phy_to_logi {
env.create_physical_region(*phy_region_id, &TestEnv::default_table_dir(), vec![])
env.create_physical_region(*phy_region_id, &TestEnv::default_table_dir())
.await;
for logi_region_id in logi_region_ids {
env.create_logical_region(*phy_region_id, *logi_region_id)
@@ -119,7 +119,6 @@ mod tests {
.index_file_path
.map(|path| path.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
format!("\n{:?}", e)
})
.sorted()
@@ -128,12 +127,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
);
// list from storage
let storage_entries = mito

View File

@@ -47,7 +47,6 @@ impl MetricEngineInner {
for (region_id, request) in requests {
if !request.is_physical_table() {
warn!("Skipping non-physical table open request: {region_id}");
continue;
}
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
@@ -73,19 +72,17 @@ impl MetricEngineInner {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_result = results.remove(&metadata_region_id);
let data_region_result: Option<std::result::Result<RegionResponse, BoxedError>> =
results.remove(&data_region_id);
let data_region_result = results.remove(&data_region_id);
// Pass the optional `metadata_region_result` and `data_region_result` to
// `recover_physical_region_with_results`. This function handles errors for each
// `open_physical_region_with_results`. This function handles errors for each
// open physical region request, allowing the process to continue with the
// remaining regions even if some requests fail.
let response = self
.recover_physical_region_with_results(
.open_physical_region_with_results(
metadata_region_result,
data_region_result,
physical_region_id,
physical_region_options,
true,
)
.await
.map_err(BoxedError::new);
@@ -110,13 +107,12 @@ impl MetricEngineInner {
}
}
pub(crate) async fn recover_physical_region_with_results(
async fn open_physical_region_with_results(
&self,
metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
physical_region_id: RegionId,
physical_region_options: PhysicalRegionOptions,
close_region_on_failure: bool,
) -> Result<RegionResponse> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
@@ -140,10 +136,8 @@ impl MetricEngineInner {
.recover_states(physical_region_id, physical_region_options)
.await
{
if close_region_on_failure {
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
}
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
return Err(err);
}
Ok(data_region_response)

View File

@@ -50,13 +50,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to batch catchup mito region"))]
BatchCatchupMitoRegion {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("No open region result for region {}", region_id))]
NoOpenRegionResult {
region_id: RegionId,
@@ -156,6 +149,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Mito catchup operation fails"))]
MitoCatchupOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Mito sync operation fails"))]
MitoSyncOperation {
source: BoxedError,
@@ -357,11 +357,11 @@ impl ErrorExt for Error {
| CloseMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. }
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),
| BatchOpenMitoRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -76,17 +76,6 @@ impl TestEnv {
}
}
/// Returns a new env with specific `prefix` and `mito_env` for test.
pub async fn with_mito_env(mut mito_env: MitoTestEnv) -> Self {
let mito = mito_env.create_engine(MitoConfig::default()).await;
let metric = MetricEngine::try_new(mito.clone(), EngineConfig::default()).unwrap();
Self {
mito_env,
mito,
metric,
}
}
pub fn data_home(&self) -> String {
let env_root = self.mito_env.data_home().to_string_lossy().to_string();
join_dir(&env_root, "data")
@@ -136,12 +125,7 @@ impl TestEnv {
}
/// Create regions in [MetricEngine] with specific `physical_region_id`.
pub async fn create_physical_region(
&self,
physical_region_id: RegionId,
table_dir: &str,
options: Vec<(String, String)>,
) {
pub async fn create_physical_region(&self, physical_region_id: RegionId, table_dir: &str) {
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
@@ -167,7 +151,6 @@ impl TestEnv {
primary_key: vec![],
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.chain(options.into_iter())
.collect(),
table_dir: table_dir.to_string(),
path_type: PathType::Bare, // Use Bare path type for engine regions
@@ -248,7 +231,7 @@ impl TestEnv {
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let physical_region_id = self.default_physical_region_id();
self.create_physical_region(physical_region_id, &Self::default_table_dir(), vec![])
self.create_physical_region(physical_region_id, &Self::default_table_dir())
.await;
let logical_region_id = self.default_logical_region_id();
self.create_logical_region(physical_region_id, logical_region_id)
@@ -441,22 +424,6 @@ pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec<Row> {
rows
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```text
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_kafka_log_store_integration_test {
() => {
if std::env::var("GT_KAFKA_ENDPOINTS").is_err() {
common_telemetry::warn!("The kafka endpoints is empty, skipping the test");
return;
}
};
}
#[cfg(test)]
mod test {
use object_store::ObjectStore;

View File

@@ -477,8 +477,6 @@ fn flat_merge_iterator_bench(c: &mut Criterion) {
bulk_part.batch.clone(),
context.clone(),
None, // No sequence filter
1024, // 1024 hosts per part
None, // No mem_scan_metrics
);
iters.push(Box::new(iter) as _);
}
@@ -536,13 +534,8 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
);
// Create and iterate over BulkPartRecordBatchIter with filter
let iter = BulkPartRecordBatchIter::new(
record_batch_with_filter.clone(),
context,
None, // No sequence filter
4096, // 4096 hosts
None, // No mem_scan_metrics
);
let iter =
BulkPartRecordBatchIter::new(record_batch_with_filter.clone(), context, None);
// Consume all batches
for batch_result in iter {
@@ -566,13 +559,7 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
);
// Create and iterate over BulkPartRecordBatchIter
let iter = BulkPartRecordBatchIter::new(
record_batch_no_filter.clone(),
context,
None, // No sequence filter
4096, // 4096 hosts
None, // No mem_scan_metrics
);
let iter = BulkPartRecordBatchIter::new(record_batch_no_filter.clone(), context, None);
// Consume all batches
for batch_result in iter {

View File

@@ -20,11 +20,12 @@ use criterion::{Criterion, black_box, criterion_group, criterion_main};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions};
use mito2::memtable::{KeyValues, Memtable, MemtableRanges};
use mito2::read;
use mito2::read::Source;
use mito2::read::dedup::DedupReader;
use mito2::read::merge::MergeReaderBuilder;
use mito2::read::scan_region::PredicateGroup;
use mito2::region::options::MergeMode;
use mito2::test_util::column_metadata_to_column_schema;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
@@ -125,7 +126,9 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable {
}
async fn flush(mem: &SimpleBulkMemtable) {
let MemtableRanges { ranges, .. } = mem.ranges(None, RangesOptions::for_flush()).unwrap();
let MemtableRanges { ranges, .. } = mem
.ranges(None, PredicateGroup::default(), None, true)
.unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();

View File

@@ -213,11 +213,7 @@ impl AccessLayer {
}
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(
&self,
region_file_id: &RegionFileId,
index_file_id: &RegionFileId,
) -> Result<()> {
pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
.delete(&path)
@@ -226,7 +222,7 @@ impl AccessLayer {
file_id: region_file_id.file_id(),
})?;
let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
.delete(&path)
.await

View File

@@ -305,7 +305,6 @@ impl CompactionScheduler {
&options,
&request.current_version.options.compaction,
request.current_version.options.append_mode,
Some(self.engine_config.max_background_compactions),
);
let region_id = request.region_id();
let CompactionRequest {

View File

@@ -35,7 +35,7 @@ use crate::access_layer::{
};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{PickerOutput, new_picker};
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
use crate::config::MitoConfig;
use crate::error::{
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
@@ -313,126 +313,6 @@ pub trait Compactor: Send + Sync + 'static {
/// DefaultCompactor is the default implementation of Compactor.
pub struct DefaultCompactor;
impl DefaultCompactor {
/// Merge a single compaction output into SST files.
async fn merge_single_output(
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
let region_id = compaction_region.region_id;
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
.current_version
.options
.index_options
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
let builder = CompactionSstReaderBuilder {
metadata: compaction_region.region_metadata.clone(),
sst_layer: compaction_region.access_layer.clone(),
cache: compaction_region.cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
time_range: output.output_time_range,
merge_mode,
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
Either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
Either::Left(Source::Reader(reader))
};
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
.access_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_metadata.clone(),
source,
cache_manager: compaction_region.cache_manager.clone(),
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
index_config,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
&mut metrics,
)
.await?;
// Convert partition expression once outside the map
let partition_expr = match &region_metadata.partition_expr {
None => None,
Some(json_str) if json_str.is_empty() => None,
Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
InvalidPartitionExprSnafu {
expr: json_str.clone(),
}
})?,
};
let output_files = sst_infos
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
partition_expr: partition_expr.clone(),
num_series: sst_info.num_series,
})
.collect::<Vec<_>>();
let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
}
}
#[async_trait::async_trait]
impl Compactor for DefaultCompactor {
async fn merge_ssts(
@@ -444,22 +324,130 @@ impl Compactor for DefaultCompactor {
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);
let compaction_time_window = picker_output.time_window_size;
for output in picker_output.outputs.drain(..) {
let inputs_to_remove: Vec<_> =
output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
compacted_inputs.extend(inputs_to_remove.iter().cloned());
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
max_file_size: picker_output.max_file_size,
..Default::default()
};
futs.push(Self::merge_single_output(
compaction_region.clone(),
output,
write_opts,
));
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
let region_id = compaction_region.region_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
.current_version
.options
.index_options
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
compaction_region.engine_config.bloom_filter_index.clone();
let max_sequence = output
.inputs
.iter()
.map(|f| f.meta_ref().sequence)
.max()
.flatten();
let region_metadata_for_filemeta = region_metadata.clone();
futs.push(async move {
let input_file_names = output
.inputs
.iter()
.map(|f| f.file_id().to_string())
.join(",");
let builder = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
time_range: output.output_time_range,
merge_mode,
};
let source = if flat_format {
let reader = builder.build_flat_sst_reader().await?;
either::Right(FlatSource::Stream(reader))
} else {
let reader = builder.build_sst_reader().await?;
either::Left(Source::Reader(reader))
};
let mut metrics = Metrics::new(WriteType::Compaction);
let sst_infos = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_metadata,
source,
cache_manager,
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
index_config,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,
},
&write_opts,
&mut metrics,
)
.await?;
// Convert partition expression once outside the map
let partition_expr = match &region_metadata_for_filemeta.partition_expr {
None => None,
Some(json_str) if json_str.is_empty() => None,
Some(json_str) => {
PartitionExpr::from_json_str(json_str).with_context(|_| {
InvalidPartitionExprSnafu {
expr: json_str.clone(),
}
})?
}
};
let output_files = sst_infos
.into_iter()
.map(|sst_info| FileMeta {
region_id,
file_id: sst_info.file_id,
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,
partition_expr: partition_expr.clone(),
num_series: sst_info.num_series,
})
.collect::<Vec<_>>();
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
});
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
@@ -477,8 +465,6 @@ impl Compactor for DefaultCompactor {
output_files.extend(metas.into_iter().flatten());
}
// In case of remote compaction, we still allow the region edit after merge to
// clean expired ssts.
let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
inputs.extend(
picker_output
@@ -490,7 +476,7 @@ impl Compactor for DefaultCompactor {
Ok(MergeOutput {
files_to_add: output_files,
files_to_remove: inputs,
compaction_time_window: Some(compaction_time_window),
compaction_time_window: Some(picker_output.time_window_size),
})
}
@@ -535,7 +521,6 @@ impl Compactor for DefaultCompactor {
&compact_request_options,
&compaction_region.region_options.compaction,
compaction_region.region_options.append_mode,
None,
)
.pick(compaction_region);

View File

@@ -125,7 +125,6 @@ pub fn new_picker(
compact_request_options: &compact_request::Options,
compaction_options: &CompactionOptions,
append_mode: bool,
max_background_tasks: Option<usize>,
) -> Arc<dyn Picker> {
if let compact_request::Options::StrictWindow(window) = compact_request_options {
let window = if window.window_seconds == 0 {
@@ -141,7 +140,6 @@ pub fn new_picker(
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
max_background_tasks,
}) as Arc<_>,
}
}

View File

@@ -16,22 +16,19 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Instant;
use common_telemetry::{error, info, warn};
use itertools::Itertools;
use common_telemetry::{error, info};
use snafu::ResultExt;
use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::CompactRegionSnafu;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionLeaderState;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
WorkerRequest, WorkerRequestWithTime,
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
WorkerRequestWithTime,
};
use crate::sst::file::FileMeta;
use crate::worker::WorkerListener;
use crate::{error, metrics};
@@ -81,93 +78,9 @@ impl CompactionTaskImpl {
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
}
/// Remove expired ssts files, update manifest immediately
/// and apply the edit to region version.
///
/// This function logs errors but does not stop the compaction process if removal fails.
async fn remove_expired(
&self,
compaction_region: &CompactionRegion,
expired_files: Vec<FileMeta>,
) {
let region_id = compaction_region.region_id;
let expired_files_str = expired_files.iter().map(|f| f.file_id).join(",");
let (expire_delete_sender, expire_delete_listener) = tokio::sync::oneshot::channel();
// Update manifest to remove expired SSTs
let edit = RegionEdit {
files_to_add: Vec::new(),
files_to_remove: expired_files,
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
};
// 1. Update manifest
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = compaction_region
.manifest_ctx
.update_manifest(RegionLeaderState::Writable, action_list)
.await
{
error!(
e;
"Failed to update manifest for expired files removal, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
);
return;
}
// 2. Notify region worker loop to remove expired files from region version.
self.send_to_worker(WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender: expire_delete_sender,
edit,
result: Ok(()),
}),
})
.await;
if let Err(e) = expire_delete_listener
.await
.context(error::RecvSnafu)
.flatten()
{
warn!(
e;
"Failed to remove expired files from region version, region: {region_id}, files: [{expired_files_str}]. Compaction will continue."
);
return;
}
info!(
"Successfully removed expired files, region: {region_id}, files: [{expired_files_str}]"
);
}
async fn handle_expiration_and_compaction(&mut self) -> error::Result<RegionEdit> {
async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
self.mark_files_compacting(true);
// 1. In case of local compaction, we can delete expired ssts in advance.
if !self.picker_output.expired_ssts.is_empty() {
let remove_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["remove_expired"])
.start_timer();
let expired_ssts = self
.picker_output
.expired_ssts
.drain(..)
.map(|f| f.meta_ref().clone())
.collect();
// remove_expired logs errors but doesn't stop compaction
self.remove_expired(&self.compaction_region, expired_ssts)
.await;
remove_timer.observe_duration();
}
// 2. Merge inputs
let merge_timer = COMPACTION_STAGE_ELAPSED
.with_label_values(&["merge"])
.start_timer();
@@ -239,7 +152,7 @@ impl CompactionTaskImpl {
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
let notify = match self.handle_expiration_and_compaction().await {
let notify = match self.handle_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
senders: std::mem::take(&mut self.waiters),
@@ -265,66 +178,3 @@ impl CompactionTask for CompactionTaskImpl {
.await;
}
}
#[cfg(test)]
mod tests {
use store_api::storage::FileId;
use crate::compaction::picker::PickerOutput;
use crate::compaction::test_util::new_file_handle;
#[test]
fn test_picker_output_with_expired_ssts() {
// Test that PickerOutput correctly includes expired_ssts
// This verifies that expired SSTs are properly identified and included
// in the picker output, which is then handled by handle_expiration_and_compaction
let file_ids = (0..3).map(|_| FileId::random()).collect::<Vec<_>>();
let expired_ssts = vec![
new_file_handle(file_ids[0], 0, 999, 0),
new_file_handle(file_ids[1], 1000, 1999, 0),
];
let picker_output = PickerOutput {
outputs: vec![],
expired_ssts: expired_ssts.clone(),
time_window_size: 3600,
max_file_size: None,
};
// Verify expired_ssts are included
assert_eq!(picker_output.expired_ssts.len(), 2);
assert_eq!(
picker_output.expired_ssts[0].file_id(),
expired_ssts[0].file_id()
);
assert_eq!(
picker_output.expired_ssts[1].file_id(),
expired_ssts[1].file_id()
);
}
#[test]
fn test_picker_output_without_expired_ssts() {
// Test that PickerOutput works correctly when there are no expired SSTs
let picker_output = PickerOutput {
outputs: vec![],
expired_ssts: vec![],
time_window_size: 3600,
max_file_size: None,
};
// Verify empty expired_ssts
assert!(picker_output.expired_ssts.is_empty());
}
// Note: Testing remove_expired() directly requires extensive mocking of:
// - manifest_ctx (ManifestContext)
// - request_sender (mpsc::Sender<WorkerRequestWithTime>)
// - WorkerRequest handling
//
// The behavior is tested indirectly through integration tests:
// - remove_expired() logs errors but doesn't stop compaction
// - handle_expiration_and_compaction() continues even if remove_expired() encounters errors
// - The function is designed to be non-blocking for compaction
}

View File

@@ -76,7 +76,6 @@ pub fn new_file_handle_with_size_and_sequence(
file_size,
available_indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -18,7 +18,7 @@ use std::fmt::Debug;
use std::num::NonZeroU64;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_telemetry::info;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
@@ -48,8 +48,6 @@ pub struct TwcsPicker {
pub max_output_file_size: Option<u64>,
/// Whether the target region is in append mode.
pub append_mode: bool,
/// Max background compaction tasks.
pub max_background_tasks: Option<usize>,
}
impl TwcsPicker {
@@ -90,7 +88,7 @@ impl TwcsPicker {
// because after compaction there will be no overlapping files.
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
if found_runs == 0 {
continue;
return output;
}
let inputs = if found_runs > 1 {
@@ -121,16 +119,6 @@ impl TwcsPicker {
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
if let Some(max_background_tasks) = self.max_background_tasks
&& output.len() >= max_background_tasks
{
debug!(
"Region ({:?}) compaction task size larger than max background tasks({}), remaining tasks discarded",
region_id, max_background_tasks
);
break;
}
}
}
output
@@ -692,7 +680,6 @@ mod tests {
time_window_seconds: None,
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
}
.build_output(RegionId::from_u64(0), &mut windows, active_window);
@@ -844,185 +831,5 @@ mod tests {
}
}
#[test]
fn test_build_output_multiple_windows_with_zero_runs() {
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
let files = [
// Window 0: Contains 3 files but not forming any runs (not enough files in sequence to reach trigger_file_num)
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
// Window 3: Contains files that will form 2 runs
new_file_handle_with_sequence(file_ids[3], 3000, 3999, 0, 4),
new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
];
let mut windows = assign_to_windows(files.iter(), 3);
// Create picker with trigger_file_num of 4 so single files won't form runs in first window
let picker = TwcsPicker {
trigger_file_num: 4, // High enough to prevent runs in first window
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
assert!(
!output.is_empty(),
"Should have output from windows with runs, even when one window has 0 runs"
);
let all_output_files: Vec<_> = output
.iter()
.flat_map(|o| o.inputs.iter())
.map(|f| f.file_id().file_id())
.collect();
assert!(
all_output_files.contains(&file_ids[3])
|| all_output_files.contains(&file_ids[4])
|| all_output_files.contains(&file_ids[5]),
"Output should contain files from the window with runs"
);
}
#[test]
fn test_build_output_single_window_zero_runs() {
let file_ids = (0..2).map(|_| FileId::random()).collect::<Vec<_>>();
let large_file_1 = new_file_handle_with_size_and_sequence(file_ids[0], 0, 999, 0, 1, 2000); // 2000 bytes
let large_file_2 = new_file_handle_with_size_and_sequence(file_ids[1], 0, 999, 0, 2, 2500); // 2500 bytes
let files = [large_file_1, large_file_2];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 2,
time_window_seconds: Some(3),
max_output_file_size: Some(1000),
append_mode: true,
max_background_tasks: None,
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(456), &mut windows, active_window);
// Should return empty output (no compaction needed)
assert!(
output.is_empty(),
"Should return empty output when no runs are found after filtering"
);
}
#[test]
fn test_max_background_tasks_truncation() {
let file_ids = (0..10).map(|_| FileId::random()).collect::<Vec<_>>();
let max_background_tasks = 3;
// Create files across multiple windows that will generate multiple compaction outputs
let files = [
// Window 0: 4 files that will form a run
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
// Window 3: 4 files that will form another run
new_file_handle_with_sequence(file_ids[4], 3000, 3999, 0, 5),
new_file_handle_with_sequence(file_ids[5], 3000, 3999, 0, 6),
new_file_handle_with_sequence(file_ids[6], 3000, 3999, 0, 7),
new_file_handle_with_sequence(file_ids[7], 3000, 3999, 0, 8),
// Window 6: 4 files that will form another run
new_file_handle_with_sequence(file_ids[8], 6000, 6999, 0, 9),
new_file_handle_with_sequence(file_ids[9], 6000, 6999, 0, 10),
];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: Some(max_background_tasks),
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
// Should have at most max_background_tasks outputs
assert!(
output.len() <= max_background_tasks,
"Output should be truncated to max_background_tasks: expected <= {}, got {}",
max_background_tasks,
output.len()
);
// Without max_background_tasks, should have more outputs
let picker_no_limit = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: None,
};
let mut windows_no_limit = assign_to_windows(files.iter(), 3);
let output_no_limit = picker_no_limit.build_output(
RegionId::from_u64(123),
&mut windows_no_limit,
active_window,
);
// Without limit, should have more outputs (if there are enough windows)
if output_no_limit.len() > max_background_tasks {
assert!(
output_no_limit.len() > output.len(),
"Without limit should have more outputs than with limit"
);
}
}
#[test]
fn test_max_background_tasks_no_truncation_when_under_limit() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
let max_background_tasks = 10; // Larger than expected outputs
// Create files in one window that will generate one compaction output
let files = [
new_file_handle_with_sequence(file_ids[0], 0, 999, 0, 1),
new_file_handle_with_sequence(file_ids[1], 0, 999, 0, 2),
new_file_handle_with_sequence(file_ids[2], 0, 999, 0, 3),
new_file_handle_with_sequence(file_ids[3], 0, 999, 0, 4),
];
let mut windows = assign_to_windows(files.iter(), 3);
let picker = TwcsPicker {
trigger_file_num: 4,
time_window_seconds: Some(3),
max_output_file_size: None,
append_mode: false,
max_background_tasks: Some(max_background_tasks),
};
let active_window = find_latest_window_in_seconds(files.iter(), 3);
let output = picker.build_output(RegionId::from_u64(123), &mut windows, active_window);
// Should have all outputs since we're under the limit
assert!(
output.len() <= max_background_tasks,
"Output should be within limit"
);
// Should have at least one output
assert!(!output.is_empty(), "Should have at least one output");
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}

View File

@@ -21,8 +21,6 @@ mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
@@ -93,7 +91,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::provider::Provider;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
@@ -102,9 +100,7 @@ use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};
@@ -303,11 +299,6 @@ impl MitoEngine {
self.inner.workers.is_region_opening(region_id)
}
/// Returns true if the specific region is catching up.
pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_catching_up(region_id)
}
/// Returns the region disk/memory statistic.
pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
self.find_region(region_id)
@@ -509,21 +500,19 @@ impl MitoEngine {
return Vec::new();
};
let Some(index_file_id) = entry.index_file_id.as_ref() else {
return Vec::new();
};
let file_id = match FileId::parse_str(index_file_id) {
let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
index_file_id
entry.file_id
);
return Vec::new();
}
};
let region_file_id = RegionFileId::new(entry.region_id, file_id);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
@@ -533,7 +522,7 @@ impl MitoEngine {
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: index_file_id,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};
@@ -781,122 +770,6 @@ impl EngineInner {
Ok(responses)
}
async fn catchup_topic_regions(
&self,
provider: Provider,
region_requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let now = Instant::now();
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
.collect::<Vec<_>>();
let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
self.wal_raw_entry_reader.clone(),
&region_ids,
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
);
let mut responses = Vec::with_capacity(region_requests.len());
for ((region_id, request), entry_receiver) in
region_requests.into_iter().zip(entry_receivers)
{
let (request, receiver) =
WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
self.workers.submit_to_worker(region_id, request).await?;
responses.push(async move { receiver.await.context(RecvSnafu)? });
}
// Wait for entries distribution.
let distribution =
common_runtime::spawn_global(async move { distributor.distribute().await });
// Wait for worker returns.
let responses = join_all(responses).await;
distribution.await.context(JoinSnafu)??;
let num_failure = responses.iter().filter(|r| r.is_err()).count();
info!(
"Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
region_ids.len() - num_failure,
// Safety: provider is kafka provider.
provider.as_kafka_provider().unwrap(),
num_failure,
now.elapsed(),
);
Ok(region_ids.into_iter().zip(responses).collect())
}
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let mut responses = Vec::with_capacity(requests.len());
let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
let mut remaining_region_requests = vec![];
for (region_id, request) in requests {
match self.workers.get_region(region_id) {
Some(region) => match region.provider.as_kafka_provider() {
Some(provider) => {
topic_regions
.entry(provider.clone())
.or_default()
.push((region_id, request));
}
None => {
remaining_region_requests.push((region_id, request));
}
},
None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
}
}
let semaphore = Arc::new(Semaphore::new(parallelism));
if !topic_regions.is_empty() {
let mut tasks = Vec::with_capacity(topic_regions.len());
for (provider, region_requests) in topic_regions {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
.await
})
}
let r = try_join_all(tasks).await?;
responses.extend(r.into_iter().flatten());
}
if !remaining_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(remaining_region_requests.len());
let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
for (region_id, request) in remaining_region_requests {
let semaphore_moved = semaphore.clone();
region_ids.push(region_id);
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
let (request, receiver) =
WorkerRequest::new_catchup_region_request(region_id, request, None);
self.workers.submit_to_worker(region_id, request).await?;
receiver.await.context(RecvSnafu)?
})
}
let results = join_all(tasks).await;
responses.extend(region_ids.into_iter().zip(results));
}
Ok(responses)
}
/// Handles [RegionRequest] and return its executed result.
async fn handle_request(
&self,
@@ -1039,29 +912,6 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
#[tracing::instrument(skip_all)]
async fn handle_batch_catchup_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionCatchupRequest)>,
) -> Result<BatchResponses, BoxedError> {
self.inner
.handle_batch_catchup_requests(parallelism, requests)
.await
.map(|responses| {
responses
.into_iter()
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
.map_err(BoxedError::new)
}
#[tracing::instrument(skip_all)]
async fn handle_request(
&self,

View File

@@ -220,126 +220,3 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
#[tokio::test]
async fn test_put_single_range() {
test_put_single_range_with_format(false).await;
test_put_single_range_with_format(true).await;
}
async fn test_put_single_range_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("append_mode", "true")
.build();
let table_dir = request.table_dir.clone();
let region_opts = request.options.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// a, field 1, 2
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 1, 3, 1),
};
put_rows(&engine, region_id, rows).await;
// a, field 0, 1
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// b, field 0, 1
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// a, field 2, 3
let rows = Rows {
schema: column_schemas,
rows: build_rows_for_key("a", 2, 4, 2),
};
put_rows(&engine, region_id, rows).await;
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| a | 1.0 | 1970-01-01T00:00:01 |
| a | 2.0 | 1970-01-01T00:00:02 |
| a | 2.0 | 1970-01-01T00:00:02 |
| a | 3.0 | 1970-01-01T00:00:03 |
| b | 0.0 | 1970-01-01T00:00:00 |
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(0, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Flushes and scans.
flush_region(&engine, region_id, None).await;
let mut scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(0, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
)
.await;
// Reopens the region.
reopen_region(&engine, region_id, table_dir, false, region_opts).await;
let stream = engine
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}

View File

@@ -675,7 +675,6 @@ async fn test_region_usage_with_format(flat_format: bool) {
let region_stat = region.region_statistic();
assert!(region_stat.wal_size > 0);
assert_eq!(region_stat.num_rows, 10);
// delete some rows
let rows = Rows {
@@ -686,7 +685,6 @@ async fn test_region_usage_with_format(flat_format: bool) {
let region_stat = region.region_statistic();
assert!(region_stat.wal_size > 0);
assert_eq!(region_stat.num_rows, 13);
// flush region
flush_region(&engine, region_id, None).await;
@@ -861,9 +859,9 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
test_list_ssts_with_format(false, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -871,9 +869,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -945,7 +943,6 @@ async fn test_list_ssts_with_format(
.index_file_path
.map(|p| p.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
format!("\n{:?}", e)
})
.sorted()

View File

@@ -1,250 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use rstest::rstest;
use rstest_reuse::apply;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{PathType, RegionCatchupRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{
CreateRequestBuilder, LogStoreFactory, TestEnv, build_rows, flush_region,
kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows, rows_schema,
single_kafka_log_store_factory,
};
#[apply(single_kafka_log_store_factory)]
async fn test_batch_catchup(factory: Option<LogStoreFactory>) {
test_batch_catchup_with_format(factory.clone(), false).await;
test_batch_catchup_with_format(factory, true).await;
}
async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("catchup-batch-regions")
.await
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
// Prepares 3 topics for 8 regions
let num_topic = 3;
let mut topics = vec![];
for _ in 0..num_topic {
let topic = prepare_test_for_kafka_log_store(&factory).await.unwrap();
topics.push(topic);
}
let num_regions = 8u32;
let table_dir_fn = |region_id| format!("test/{region_id}");
let mut region_schema = HashMap::new();
let get_topic_idx = |id| (id - 1) % num_topic;
// Creates 8 regions and puts data into them
for id in 1..=num_regions {
let engine = engine.clone();
let topic_idx = get_topic_idx(id);
let topic = topics[topic_idx as usize].clone();
let region_id = RegionId::new(1, id);
let request = CreateRequestBuilder::new()
.table_dir(&table_dir_fn(region_id))
.kafka_topic(Some(topic))
.build();
let column_schemas = rows_schema(&request);
region_schema.insert(region_id, column_schemas);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
}
// Puts data into regions
let rows = 30;
for i in 0..rows {
for region_number in 1..=num_regions {
let region_id = RegionId::new(1, region_number);
let rows = Rows {
schema: region_schema[&region_id].clone(),
rows: build_rows(
(region_number as usize) * 120 + i as usize,
(region_number as usize) * 120 + i as usize + 1,
),
};
put_rows(&engine, region_id, rows).await;
if i % region_number == 0 {
flush_region(&engine, region_id, None).await;
}
}
}
let assert_result = |engine: MitoEngine| async move {
for i in 1..=num_regions {
let region_id = RegionId::new(1, i);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let mut expected = String::new();
expected.push_str(
"+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n",
);
for row in 0..rows {
expected.push_str(&format!(
"| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n",
i * 120 + row,
i * 120 + row,
2 * i,
row
));
}
expected.push_str("+-------+---------+---------------------+");
assert_eq!(expected, batches.pretty_print().unwrap());
}
};
assert_result(engine.clone()).await;
// Reopen engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
)
.await;
let requests = (1..=num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
let topic_idx = get_topic_idx(id);
let topic = topics[topic_idx as usize].clone();
let mut options = HashMap::new();
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { topic })).unwrap(),
);
(
region_id,
RegionOpenRequest {
engine: String::new(),
table_dir: table_dir_fn(region_id),
options,
skip_wal_replay: true,
path_type: PathType::Bare,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let parallelism = 2;
let results = engine
.handle_batch_open_requests(parallelism, requests)
.await
.unwrap();
for (_, result) in results {
assert!(result.is_ok());
}
let requests = (1..=num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
(
region_id,
RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let results = engine
.handle_batch_catchup_requests(parallelism, requests)
.await
.unwrap();
for (_, result) in results {
assert!(result.is_ok());
}
assert_result(engine.clone()).await;
}
#[apply(single_kafka_log_store_factory)]
async fn test_batch_catchup_err(factory: Option<LogStoreFactory>) {
test_batch_catchup_err_with_format(factory.clone(), false).await;
test_batch_catchup_err_with_format(factory, true).await;
}
async fn test_batch_catchup_err_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("catchup-regions-err")
.await
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let num_regions = 3u32;
let requests = (1..num_regions)
.map(|id| {
let region_id = RegionId::new(1, id);
(
region_id,
RegionCatchupRequest {
set_writable: true,
entry_id: None,
metadata_entry_id: None,
location_id: None,
checkpoint: None,
},
)
})
.collect::<Vec<_>>();
let results = engine
.handle_batch_catchup_requests(4, requests)
.await
.unwrap();
for (_, result) in results {
assert_eq!(
result.unwrap_err().status_code(),
StatusCode::RegionNotFound
);
}
}

View File

@@ -135,7 +135,6 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
let region = follower_engine.get_region(region_id).unwrap();
assert!(!region.is_writable());
assert!(resp.is_ok());
assert!(!follower_engine.is_region_catching_up(region_id));
// Scans
let request = ScanRequest::default();
@@ -257,7 +256,7 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
.await
.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
assert!(!follower_engine.is_region_catching_up(region_id));
assert_matches!(err, Error::Unexpected { .. });
// It should ignore requests to writable regions.
@@ -720,33 +719,3 @@ async fn test_catchup_not_exist_with_format(flat_format: bool) {
.unwrap_err();
assert_matches!(err.status_code(), StatusCode::RegionNotFound);
}
#[tokio::test]
async fn test_catchup_region_busy() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let worker = engine.inner.workers.worker(region_id);
let catchup_regions = worker.catchup_regions();
catchup_regions.insert_region(region_id);
let err = engine
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
..Default::default()
}),
)
.await
.unwrap_err();
assert_matches!(err.status_code(), StatusCode::RegionBusy);
}

View File

@@ -126,9 +126,9 @@ async fn test_index_build_type_flush() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_memtables(), 1);
assert_eq!(scanner.num_files(), 0);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
assert!(scanner.num_memtables() == 1);
assert!(scanner.num_files() == 0);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 0);
flush_region(&engine, region_id, None).await;
@@ -137,9 +137,9 @@ async fn test_index_build_type_flush() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_memtables(), 0);
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
assert!(scanner.num_memtables() == 0);
assert!(scanner.num_files() == 1);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 0);
let rows = Rows {
schema: column_schemas.clone(),
@@ -155,7 +155,7 @@ async fn test_index_build_type_flush() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 2);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 2);
}
#[tokio::test]
@@ -206,7 +206,7 @@ async fn test_index_build_type_compact() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 4);
assert!(scanner.num_files() == 4);
assert!(num_of_index_files(&engine, &scanner, region_id).await < 4);
// Note: Compaction have been implicitly triggered by the flush operations above.
@@ -219,9 +219,8 @@ async fn test_index_build_type_compact() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 2);
// Compaction is an async task, so it may be finished at this moment.
assert!(num_of_index_files(&engine, &scanner, region_id).await <= 2);
assert!(scanner.num_files() == 2);
assert!(num_of_index_files(&engine, &scanner, region_id).await < 2);
// Wait a while to make sure index build tasks are finished.
listener.wait_stop(5).await; // 4 flush + 1 compaction = some abort + some finish
@@ -229,9 +228,8 @@ async fn test_index_build_type_compact() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 2);
// Index files should be built.
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 2);
assert!(scanner.num_files() == 2);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 2);
}
#[tokio::test]
@@ -276,8 +274,8 @@ async fn test_index_build_type_schema_change() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
assert!(scanner.num_files() == 1);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 0);
// Set Index and make sure index file is built without flush or compaction.
let set_index_request = RegionAlterRequest {
@@ -296,6 +294,6 @@ async fn test_index_build_type_schema_change() {
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
assert!(scanner.num_files() == 1);
assert!(num_of_index_files(&engine, &scanner, region_id).await == 1);
}

View File

@@ -22,10 +22,8 @@ use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_delete_rows, build_rows, delete_rows, delete_rows_schema,
flush_region, put_rows, rows_schema,
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
};
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str, flat_format: bool) {
@@ -379,99 +377,3 @@ async fn test_mem_range_prune_with_format(flat_format: bool) {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_scan_filter_field_after_delete() {
test_scan_filter_field_after_delete_with_format(false).await;
test_scan_filter_field_after_delete_with_format(true).await;
}
async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
// put 1, 2, 3, 4 and flush
put_rows(
&engine,
region_id,
Rows {
schema: column_schemas,
rows: build_rows(1, 5),
},
)
.await;
flush_region(&engine, region_id, None).await;
// delete 2, 3
let delete_schemas = delete_rows_schema(&request);
delete_rows(
&engine,
region_id,
Rows {
schema: delete_schemas,
rows: build_delete_rows(2, 4),
},
)
.await;
// Scans and filter fields, the field should be deleted.
let request = ScanRequest {
filters: vec![col("field_0").eq(lit(3.0f64))],
..Default::default()
};
let stream = engine
.scan_to_stream(region_id, request.clone())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+----+
| tag_0 | field_0 | ts |
+-------+---------+----+
+-------+---------+----+";
assert_eq!(
expected,
sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"])
);
// flush delete op
flush_region(&engine, region_id, None).await;
let stream = engine
.scan_to_stream(region_id, request.clone())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected,
sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"])
);
}

View File

@@ -40,9 +40,7 @@ use crate::error::{
RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{
BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
@@ -51,6 +49,7 @@ use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::PredicateGroup;
use crate::read::{FlatSource, Source};
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
@@ -460,7 +459,7 @@ impl RegionFlushTask {
flush_metrics.compact_memtable += compact_cost;
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
let num_mem_ranges = mem_ranges.ranges.len();
let num_mem_rows = mem_ranges.stats.num_rows();
let memtable_id = mem.id();
@@ -638,7 +637,6 @@ impl RegionFlushTask {
file_size: sst_info.file_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
@@ -743,6 +741,7 @@ struct FlatSources {
encoded: SmallVec<[EncodedRange; 4]>,
}
// TODO(yingwen): Flushes into multiple files in parallel.
/// Returns the max sequence and [FlatSource] for the given memtable.
fn memtable_flat_sources(
schema: SchemaRef,
@@ -764,9 +763,6 @@ fn memtable_flat_sources(
flat_sources.encoded.push(encoded);
} else {
let iter = only_range.build_record_batch_iter(None)?;
// Dedup according to append mode and merge mode.
// Even single range may have duplicate rows.
let iter = maybe_dedup_one(options, field_column_start, iter);
flat_sources.sources.push(FlatSource::Iter(iter));
};
} else {
@@ -834,28 +830,6 @@ fn merge_and_dedup(
Ok(maybe_dedup)
}
fn maybe_dedup_one(
options: &RegionOptions,
field_column_start: usize,
input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
if options.append_mode {
// No dedup in append mode
input_iter
} else {
// Dedup according to merge mode.
match options.merge_mode() {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
}
MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
input_iter,
FlatLastNonNull::new(field_column_start, false),
)),
}
}
}
/// Manages background flushes of a worker.
pub(crate) struct FlushScheduler {
/// Tracks regions need to flush.
@@ -1189,16 +1163,11 @@ impl FlushStatus {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::build_primary_key_codec;
use tokio::sync::oneshot;
use super::*;
use crate::cache::CacheManager;
use crate::memtable::bulk::part::BulkPartConverter;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::{Memtable, RangesOptions};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
@@ -1382,113 +1351,4 @@ mod tests {
assert_eq!(output, 0);
}
}
// Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
#[test]
fn test_memtable_flat_sources_single_range_append_mode_behavior() {
// Build test metadata and flat schema
let metadata = metadata_for_test();
let schema = to_flat_sst_arrow_schema(
&metadata,
&FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
);
// Prepare a bulk part containing duplicate rows for the same PK and timestamp
// Two rows with identical keys and timestamps (ts = 1000), different field values
let capacity = 16;
let pk_codec = build_primary_key_codec(&metadata);
let mut converter =
BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
let kvs = build_key_values_with_ts_seq_values(
&metadata,
"dup_key".to_string(),
1,
vec![1000i64, 1000i64].into_iter(),
vec![Some(1.0f64), Some(2.0f64)].into_iter(),
1,
);
converter.append_key_values(&kvs).unwrap();
let part = converter.convert().unwrap();
// Helper to build MemtableRanges with a single range from one bulk part.
// We use BulkMemtable directly because it produces record batch iterators.
let build_ranges = |append_mode: bool| -> MemtableRanges {
let memtable = crate::memtable::bulk::BulkMemtable::new(
1,
metadata.clone(),
None,
None,
append_mode,
MergeMode::LastRow,
);
memtable.write_bulk(part.clone()).unwrap();
memtable.ranges(None, RangesOptions::for_flush()).unwrap()
};
// Case 1: append_mode = false => dedup happens, total rows should be 1
{
let mem_ranges = build_ranges(false);
assert_eq!(1, mem_ranges.ranges.len());
let options = RegionOptions {
append_mode: false,
merge_mode: Some(MergeMode::LastRow),
..Default::default()
};
let flat_sources = memtable_flat_sources(
schema.clone(),
mem_ranges,
&options,
metadata.primary_key.len(),
)
.unwrap();
assert!(flat_sources.encoded.is_empty());
assert_eq!(1, flat_sources.sources.len());
// Consume the iterator and count rows
let mut total_rows = 0usize;
for source in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {
total_rows += rb.unwrap().num_rows();
}
}
crate::read::FlatSource::Stream(_) => unreachable!(),
}
}
assert_eq!(1, total_rows, "dedup should keep a single row");
}
// Case 2: append_mode = true => no dedup, total rows should be 2
{
let mem_ranges = build_ranges(true);
assert_eq!(1, mem_ranges.ranges.len());
let options = RegionOptions {
append_mode: true,
..Default::default()
};
let flat_sources =
memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
.unwrap();
assert!(flat_sources.encoded.is_empty());
assert_eq!(1, flat_sources.sources.len());
let mut total_rows = 0usize;
for source in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {
total_rows += rb.unwrap().num_rows();
}
}
crate::read::FlatSource::Stream(_) => unreachable!(),
}
}
assert_eq!(2, total_rows, "append_mode should preserve duplicates");
}
}
}

View File

@@ -365,22 +365,7 @@ impl LocalGcWorker {
unused_len, region_id
);
let file_pairs: Vec<(FileId, FileId)> = unused_files
.iter()
.filter_map(|file_id| {
current_files
.get(file_id)
.map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id()))
})
.collect();
info!(
"Found {} unused index files to delete for region {}",
file_pairs.len(),
region_id
);
self.delete_files(region_id, &file_pairs).await?;
self.delete_files(region_id, &unused_files).await?;
debug!(
"Successfully deleted {} unused files for region {}",
@@ -390,7 +375,7 @@ impl LocalGcWorker {
Ok(unused_files)
}
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> {
delete_files(
region_id,
file_ids,

View File

@@ -265,7 +265,6 @@ async fn checkpoint_with_different_compression_types() {
file_size: 1024000,
available_indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -332,7 +331,6 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
file_size: 1024000,
available_indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
num_rows: 0,
num_row_groups: 0,
sequence: None,

View File

@@ -44,7 +44,6 @@ use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;
mod builder;
pub mod bulk;
@@ -74,63 +73,6 @@ pub enum MemtableConfig {
TimeSeries,
}
/// Options for querying ranges from a memtable.
#[derive(Clone)]
pub struct RangesOptions {
/// Whether the ranges are being queried for flush.
pub for_flush: bool,
/// Mode to pre-filter columns in ranges.
pub pre_filter_mode: PreFilterMode,
/// Predicate to filter the data.
pub predicate: PredicateGroup,
/// Sequence range to filter the data.
pub sequence: Option<SequenceRange>,
}
impl Default for RangesOptions {
fn default() -> Self {
Self {
for_flush: false,
pre_filter_mode: PreFilterMode::All,
predicate: PredicateGroup::default(),
sequence: None,
}
}
}
impl RangesOptions {
/// Creates a new [RangesOptions] for flushing.
pub fn for_flush() -> Self {
Self {
for_flush: true,
pre_filter_mode: PreFilterMode::All,
predicate: PredicateGroup::default(),
sequence: None,
}
}
/// Sets the pre-filter mode.
#[must_use]
pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self {
self.pre_filter_mode = pre_filter_mode;
self
}
/// Sets the predicate.
#[must_use]
pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
self.predicate = predicate;
self
}
/// Sets the sequence range.
#[must_use]
pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
self.sequence = sequence;
self
}
}
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
@@ -249,11 +191,14 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the ranges in the memtable.
///
/// The `for_flush` flag is true if the flush job calls this method for flush.
/// The returned map contains the range id and the range after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
options: RangesOptions,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
for_flush: bool,
) -> Result<MemtableRanges>;
/// Returns true if the memtable is empty.

View File

@@ -36,15 +36,13 @@ use tokio::sync::Semaphore;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::{
BulkPart, BulkPartEncodeMetrics, BulkPartEncoder, UnorderedPart,
};
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
@@ -56,8 +54,6 @@ use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
/// All parts in a bulk memtable.
#[derive(Default)]
struct BulkParts {
/// Unordered small parts (< 1024 rows).
unordered_part: UnorderedPart,
/// Raw parts.
parts: Vec<BulkPartWrapper>,
/// Parts encoded as parquets.
@@ -65,15 +61,14 @@ struct BulkParts {
}
impl BulkParts {
/// Total number of parts (raw + encoded + unordered).
/// Total number of parts (raw + encoded).
fn num_parts(&self) -> usize {
let unordered_count = if self.unordered_part.is_empty() { 0 } else { 1 };
self.parts.len() + self.encoded_parts.len() + unordered_count
self.parts.len() + self.encoded_parts.len()
}
/// Returns true if there is no part.
fn is_empty(&self) -> bool {
self.unordered_part.is_empty() && self.parts.is_empty() && self.encoded_parts.is_empty()
self.parts.is_empty() && self.encoded_parts.is_empty()
}
/// Returns true if the bulk parts should be merged.
@@ -94,11 +89,6 @@ impl BulkParts {
unmerged_count >= 8
}
/// Returns true if the unordered_part should be compacted into a BulkPart.
fn should_compact_unordered_part(&self) -> bool {
self.unordered_part.should_compact()
}
/// Collects unmerged parts and marks them as being merged.
/// Returns the collected parts to merge.
fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
@@ -253,6 +243,7 @@ pub struct BulkMemtable {
max_sequence: AtomicU64,
num_rows: AtomicUsize,
/// Cached flat SST arrow schema for memtable compaction.
#[allow(dead_code)]
flat_arrow_schema: SchemaRef,
/// Compactor for merging bulk parts
compactor: Arc<Mutex<MemtableCompactor>>,
@@ -307,29 +298,11 @@ impl Memtable for BulkMemtable {
{
let mut bulk_parts = self.parts.write().unwrap();
// Routes small parts to unordered_part based on threshold
if bulk_parts.unordered_part.should_accept(fragment.num_rows()) {
bulk_parts.unordered_part.push(fragment);
// Compacts unordered_part if threshold is reached
if bulk_parts.should_compact_unordered_part()
&& let Some(bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
{
bulk_parts.parts.push(BulkPartWrapper {
part: bulk_part,
file_id: FileId::random(),
merging: false,
});
bulk_parts.unordered_part.clear();
}
} else {
bulk_parts.parts.push(BulkPartWrapper {
part: fragment,
file_id: FileId::random(),
merging: false,
});
}
bulk_parts.parts.push(BulkPartWrapper {
part: fragment,
file_id: FileId::random(),
merging: false,
});
// Since this operation should be fast, we do it in parts lock scope.
// This ensure the statistics in `ranges()` are correct. What's more,
@@ -358,47 +331,25 @@ impl Memtable for BulkMemtable {
fn ranges(
&self,
projection: Option<&[ColumnId]>,
options: RangesOptions,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
for_flush: bool,
) -> Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
let mut ranges = BTreeMap::new();
let mut range_id = 0;
// TODO(yingwen): Filter ranges by sequence.
let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
let context = Arc::new(BulkIterContext::new(
self.metadata.clone(),
projection,
predicate.predicate().cloned(),
options.for_flush,
options.pre_filter_mode,
for_flush,
)?);
// Adds ranges for regular parts and encoded parts
{
let bulk_parts = self.parts.read().unwrap();
// Adds range for unordered part if not empty
if !bulk_parts.unordered_part.is_empty()
&& let Some(unordered_bulk_part) = bulk_parts.unordered_part.to_bulk_part()?
{
let num_rows = unordered_bulk_part.num_rows();
let range = MemtableRange::new(
Arc::new(MemtableRangeContext::new(
self.id,
Box::new(BulkRangeIterBuilder {
part: unordered_bulk_part,
context: context.clone(),
sequence,
}),
predicate.clone(),
)),
num_rows,
);
ranges.insert(range_id, range);
range_id += 1;
}
// Adds ranges for regular parts
for part_wrapper in bulk_parts.parts.iter() {
// Skips empty parts
@@ -593,26 +544,6 @@ impl BulkMemtable {
}
}
/// Sets the unordered part threshold (for testing).
#[cfg(test)]
pub fn set_unordered_part_threshold(&self, threshold: usize) {
self.parts
.write()
.unwrap()
.unordered_part
.set_threshold(threshold);
}
/// Sets the unordered part compact threshold (for testing).
#[cfg(test)]
pub fn set_unordered_part_compact_threshold(&self, compact_threshold: usize) {
self.parts
.write()
.unwrap()
.unordered_part
.set_compact_threshold(compact_threshold);
}
/// Updates memtable stats.
///
/// Please update this inside the write lock scope.
@@ -688,15 +619,12 @@ impl IterBuilder for BulkRangeIterBuilder {
fn build_record_batch(
&self,
metrics: Option<MemScanMetrics>,
_metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
let series_count = self.part.estimated_series_count();
let iter = BulkPartRecordBatchIter::new(
self.part.batch.clone(),
self.context.clone(),
self.sequence,
series_count,
metrics,
);
Ok(Box::new(iter))
@@ -709,6 +637,7 @@ impl IterBuilder for BulkRangeIterBuilder {
/// Iterator builder for encoded bulk range
struct EncodedBulkRangeIterBuilder {
#[allow(dead_code)]
file_id: FileId,
part: EncodedBulkPart,
context: Arc<BulkIterContext>,
@@ -729,12 +658,9 @@ impl IterBuilder for EncodedBulkRangeIterBuilder {
fn build_record_batch(
&self,
metrics: Option<MemScanMetrics>,
_metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
if let Some(iter) = self
.part
.read(self.context.clone(), self.sequence, metrics)?
{
if let Some(iter) = self.part.read(self.context.clone(), self.sequence)? {
Ok(iter)
} else {
// Return an empty iterator if no data to read
@@ -753,6 +679,7 @@ impl IterBuilder for EncodedBulkRangeIterBuilder {
struct BulkPartWrapper {
part: BulkPart,
/// The unique file id for this part in memtable.
#[allow(dead_code)]
file_id: FileId,
/// Whether this part is currently being merged.
merging: bool,
@@ -761,6 +688,7 @@ struct BulkPartWrapper {
struct EncodedPartWrapper {
part: EncodedBulkPart,
/// The unique file id for this part in memtable.
#[allow(dead_code)]
file_id: FileId,
/// Whether this part is currently being merged.
merging: bool,
@@ -818,17 +746,12 @@ impl PartToMerge {
) -> Result<Option<BoxedRecordBatchIterator>> {
match self {
PartToMerge::Bulk { part, .. } => {
let series_count = part.estimated_series_count();
let iter = BulkPartRecordBatchIter::new(
part.batch,
context,
None, // No sequence filter for merging
series_count,
None, // No metrics for merging
part.batch, context, None, // No sequence filter for merging
);
Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
}
PartToMerge::Encoded { part, .. } => part.read(context, None, None),
PartToMerge::Encoded { part, .. } => part.read(context, None),
}
}
}
@@ -1233,8 +1156,6 @@ mod tests {
let metadata = metadata_for_test();
let memtable =
BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
// Disable unordered_part for this test
memtable.set_unordered_part_threshold(0);
let test_data = [
(
@@ -1271,12 +1192,7 @@ mod tests {
assert_eq!(3000, max_ts.value());
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1318,10 +1234,7 @@ mod tests {
let projection = vec![4u32];
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
Some(&projection),
RangesOptions::default().with_predicate(predicate_group),
)
.ranges(Some(&projection), predicate_group, None, false)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1412,8 +1325,6 @@ mod tests {
let metadata = metadata_for_test();
let memtable =
BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);
// Disable unordered_part for this test
memtable.set_unordered_part_threshold(0);
let parts_data = vec![
(
@@ -1439,12 +1350,7 @@ mod tests {
}
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1477,12 +1383,7 @@ mod tests {
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
let ranges = memtable
.ranges(
None,
RangesOptions::default()
.with_predicate(predicate_group)
.with_sequence(sequence_filter),
)
.ranges(None, predicate_group, sequence_filter, false)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1497,8 +1398,6 @@ mod tests {
let metadata = metadata_for_test();
let memtable =
BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);
// Disable unordered_part for this test
memtable.set_unordered_part_threshold(0);
// Adds enough bulk parts to trigger encoding
for i in 0..10 {
@@ -1516,12 +1415,7 @@ mod tests {
memtable.compact(false).unwrap();
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
// Should have ranges for both bulk parts and encoded parts
assert_eq!(3, ranges.ranges.len());
@@ -1541,229 +1435,4 @@ mod tests {
assert_eq!(total_rows, range.num_rows());
}
}
#[test]
fn test_bulk_memtable_unordered_part() {
let metadata = metadata_for_test();
let memtable = BulkMemtable::new(
1001,
metadata.clone(),
None,
None,
false,
MergeMode::LastRow,
);
// Set smaller thresholds for testing with smaller inputs
// Accept parts with < 5 rows into unordered_part
memtable.set_unordered_part_threshold(5);
// Compact when total rows >= 10
memtable.set_unordered_part_compact_threshold(10);
// Write 3 small parts (each has 2 rows), should be collected in unordered_part
for i in 0..3 {
let part = create_bulk_part_with_converter(
&format!("key_{}", i),
i,
vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
100 + i as u64,
)
.unwrap();
assert_eq!(2, part.num_rows());
memtable.write_bulk(part).unwrap();
}
// Total rows = 6, not yet reaching compact threshold
let stats = memtable.stats();
assert_eq!(6, stats.num_rows);
// Write 2 more small parts (each has 2 rows)
// This should trigger compaction when total >= 10
for i in 3..5 {
let part = create_bulk_part_with_converter(
&format!("key_{}", i),
i,
vec![1000 + i as i64 * 100, 1100 + i as i64 * 100],
vec![Some(i as f64 * 10.0), Some(i as f64 * 10.0 + 1.0)],
100 + i as u64,
)
.unwrap();
memtable.write_bulk(part).unwrap();
}
// Total rows = 10, should have compacted unordered_part into a regular part
let stats = memtable.stats();
assert_eq!(10, stats.num_rows);
// Verify we can read all data correctly
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
// Should have at least 1 range (the compacted part)
assert!(!ranges.ranges.is_empty());
assert_eq!(10, ranges.stats.num_rows);
// Read all data and verify
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
assert!(range.is_record_batch());
let record_batch_iter = range.build_record_batch_iter(None).unwrap();
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
total_rows_read += batch.num_rows();
}
}
assert_eq!(10, total_rows_read);
}
#[test]
fn test_bulk_memtable_unordered_part_mixed_sizes() {
let metadata = metadata_for_test();
let memtable = BulkMemtable::new(
1002,
metadata.clone(),
None,
None,
false,
MergeMode::LastRow,
);
// Set threshold to 4 rows - parts with < 4 rows go to unordered_part
memtable.set_unordered_part_threshold(4);
memtable.set_unordered_part_compact_threshold(8);
// Write small parts (3 rows each) - should go to unordered_part
for i in 0..2 {
let part = create_bulk_part_with_converter(
&format!("small_{}", i),
i,
vec![1000 + i as i64, 2000 + i as i64, 3000 + i as i64],
vec![Some(i as f64), Some(i as f64 + 1.0), Some(i as f64 + 2.0)],
10 + i as u64,
)
.unwrap();
assert_eq!(3, part.num_rows());
memtable.write_bulk(part).unwrap();
}
// Write a large part (5 rows) - should go directly to regular parts
let large_part = create_bulk_part_with_converter(
"large_key",
100,
vec![5000, 6000, 7000, 8000, 9000],
vec![
Some(100.0),
Some(101.0),
Some(102.0),
Some(103.0),
Some(104.0),
],
50,
)
.unwrap();
assert_eq!(5, large_part.num_rows());
memtable.write_bulk(large_part).unwrap();
// Write another small part (2 rows) - should trigger compaction of unordered_part
let part = create_bulk_part_with_converter(
"small_2",
2,
vec![4000, 4100],
vec![Some(20.0), Some(21.0)],
30,
)
.unwrap();
memtable.write_bulk(part).unwrap();
let stats = memtable.stats();
assert_eq!(13, stats.num_rows); // 3 + 3 + 5 + 2 = 13
// Verify all data can be read
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
assert_eq!(13, ranges.stats.num_rows);
let mut total_rows_read = 0;
for (_range_id, range) in ranges.ranges.iter() {
let record_batch_iter = range.build_record_batch_iter(None).unwrap();
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
total_rows_read += batch.num_rows();
}
}
assert_eq!(13, total_rows_read);
}
#[test]
fn test_bulk_memtable_unordered_part_with_ranges() {
let metadata = metadata_for_test();
let memtable = BulkMemtable::new(
1003,
metadata.clone(),
None,
None,
false,
MergeMode::LastRow,
);
// Set small thresholds
memtable.set_unordered_part_threshold(3);
memtable.set_unordered_part_compact_threshold(100); // High threshold to prevent auto-compaction
// Write several small parts that stay in unordered_part
for i in 0..3 {
let part = create_bulk_part_with_converter(
&format!("key_{}", i),
i,
vec![1000 + i as i64 * 100],
vec![Some(i as f64 * 10.0)],
100 + i as u64,
)
.unwrap();
assert_eq!(1, part.num_rows());
memtable.write_bulk(part).unwrap();
}
let stats = memtable.stats();
assert_eq!(3, stats.num_rows);
// Test that ranges() can correctly read from unordered_part
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(
None,
RangesOptions::default().with_predicate(predicate_group),
)
.unwrap();
// Should have 1 range for the unordered_part
assert_eq!(1, ranges.ranges.len());
assert_eq!(3, ranges.stats.num_rows);
// Verify data is sorted correctly in the range
let range = ranges.ranges.get(&0).unwrap();
let record_batch_iter = range.build_record_batch_iter(None).unwrap();
let mut total_rows = 0;
for batch_result in record_batch_iter {
let batch = batch_result.unwrap();
total_rows += batch.num_rows();
// Verify data is properly sorted by primary key
assert!(batch.num_rows() > 0);
}
assert_eq!(3, total_rows);
}
}

View File

@@ -24,7 +24,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
@@ -43,22 +43,6 @@ impl BulkIterContext {
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
skip_auto_convert: bool,
) -> Result<Self> {
Self::new_with_pre_filter_mode(
region_metadata,
projection,
predicate,
skip_auto_convert,
PreFilterMode::All,
)
}
pub fn new_with_pre_filter_mode(
region_metadata: RegionMetadataRef,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
skip_auto_convert: bool,
pre_filter_mode: PreFilterMode,
) -> Result<Self> {
let codec = build_primary_key_codec(&region_metadata);
@@ -89,23 +73,17 @@ impl BulkIterContext {
codec,
// we don't need to compat batch since all batch in memtable have the same schema.
compat_batch: None,
pre_filter_mode,
},
predicate,
})
}
/// Prunes row groups by stats.
pub(crate) fn row_groups_to_read(
&self,
file_meta: &Arc<ParquetMetaData>,
skip_fields: bool,
) -> VecDeque<usize> {
pub(crate) fn row_groups_to_read(&self, file_meta: &Arc<ParquetMetaData>) -> VecDeque<usize> {
let region_meta = self.base.read_format.metadata();
let row_groups = file_meta.row_groups();
// expected_metadata is set to None since we always expect region metadata of memtable is up-to-date.
let stats =
RowGroupPruningStats::new(row_groups, &self.base.read_format, None, skip_fields);
let stats = RowGroupPruningStats::new(row_groups, &self.base.read_format, None);
if let Some(predicate) = self.predicate.as_ref() {
predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
@@ -126,14 +104,4 @@ impl BulkIterContext {
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.base.read_format
}
/// Returns the pre-filter mode.
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
self.base.pre_filter_mode
}
/// Returns the region id.
pub(crate) fn region_id(&self) -> store_api::storage::RegionId {
self.base.read_format.metadata().region_id
}
}

View File

@@ -60,12 +60,11 @@ use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedRecordBatchIterator;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics};
use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
@@ -232,154 +231,6 @@ impl BulkPart {
}
}
/// A collection of small unordered bulk parts.
/// Used to batch small parts together before merging them into a sorted part.
pub struct UnorderedPart {
/// Small bulk parts that haven't been sorted yet.
parts: Vec<BulkPart>,
/// Total number of rows across all parts.
total_rows: usize,
/// Minimum timestamp across all parts.
min_timestamp: i64,
/// Maximum timestamp across all parts.
max_timestamp: i64,
/// Maximum sequence number across all parts.
max_sequence: u64,
/// Row count threshold for accepting parts (default: 1024).
threshold: usize,
/// Row count threshold for compacting (default: 4096).
compact_threshold: usize,
}
impl Default for UnorderedPart {
fn default() -> Self {
Self::new()
}
}
impl UnorderedPart {
/// Creates a new empty UnorderedPart.
pub fn new() -> Self {
Self {
parts: Vec::new(),
total_rows: 0,
min_timestamp: i64::MAX,
max_timestamp: i64::MIN,
max_sequence: 0,
threshold: 1024,
compact_threshold: 4096,
}
}
/// Sets the threshold for accepting parts into unordered_part.
pub fn set_threshold(&mut self, threshold: usize) {
self.threshold = threshold;
}
/// Sets the threshold for compacting unordered_part.
pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
self.compact_threshold = compact_threshold;
}
/// Returns the threshold for accepting parts.
pub fn threshold(&self) -> usize {
self.threshold
}
/// Returns the compact threshold.
pub fn compact_threshold(&self) -> usize {
self.compact_threshold
}
/// Returns true if this part should accept the given row count.
pub fn should_accept(&self, num_rows: usize) -> bool {
num_rows < self.threshold
}
/// Returns true if this part should be compacted.
pub fn should_compact(&self) -> bool {
self.total_rows >= self.compact_threshold
}
/// Adds a BulkPart to this unordered collection.
pub fn push(&mut self, part: BulkPart) {
self.total_rows += part.num_rows();
self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
self.max_sequence = self.max_sequence.max(part.sequence);
self.parts.push(part);
}
/// Returns the total number of rows across all parts.
pub fn num_rows(&self) -> usize {
self.total_rows
}
/// Returns true if there are no parts.
pub fn is_empty(&self) -> bool {
self.parts.is_empty()
}
/// Returns the number of parts in this collection.
pub fn num_parts(&self) -> usize {
self.parts.len()
}
/// Concatenates and sorts all parts into a single RecordBatch.
/// Returns None if the collection is empty.
pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
if self.parts.is_empty() {
return Ok(None);
}
if self.parts.len() == 1 {
// If there's only one part, return its batch directly
return Ok(Some(self.parts[0].batch.clone()));
}
// Get the schema from the first part
let schema = self.parts[0].batch.schema();
// Concatenate all record batches
let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
let concatenated =
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
// Sort the concatenated batch
let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
Ok(Some(sorted_batch))
}
/// Converts all parts into a single sorted BulkPart.
/// Returns None if the collection is empty.
pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
let Some(sorted_batch) = self.concat_and_sort()? else {
return Ok(None);
};
let timestamp_index = self.parts[0].timestamp_index;
Ok(Some(BulkPart {
batch: sorted_batch,
max_timestamp: self.max_timestamp,
min_timestamp: self.min_timestamp,
sequence: self.max_sequence,
timestamp_index,
raw_data: None,
}))
}
/// Clears all parts from this collection.
pub fn clear(&mut self) {
self.parts.clear();
self.total_rows = 0;
self.min_timestamp = i64::MAX;
self.max_timestamp = i64::MIN;
self.max_sequence = 0;
}
}
/// More accurate estimation of the size of a record batch.
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
batch
@@ -720,15 +571,9 @@ impl EncodedBulkPart {
&self,
context: BulkIterContextRef,
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Result<Option<BoxedRecordBatchIterator>> {
// Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
let skip_fields_for_pruning =
Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
// use predicate to find row groups to read.
let row_groups_to_read =
context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
if row_groups_to_read.is_empty() {
// All row groups are filtered.
@@ -736,28 +581,14 @@ impl EncodedBulkPart {
}
let iter = EncodedBulkPartIter::try_new(
self,
context,
row_groups_to_read,
self.metadata.parquet_metadata.clone(),
self.data.clone(),
sequence,
mem_scan_metrics,
)?;
Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
}
/// Computes whether to skip field columns based on PreFilterMode.
fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
match pre_filter_mode {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => {
// Check if any row group contains delete op
(0..parquet_meta.num_row_groups()).any(|rg_idx| {
row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
})
}
}
}
}
#[derive(Debug, Clone)]
@@ -1564,7 +1395,6 @@ mod tests {
.unwrap(),
),
None,
None,
)
.unwrap()
.expect("expect at least one row group");
@@ -1624,7 +1454,7 @@ mod tests {
.unwrap(),
);
let mut reader = part
.read(context, None, None)
.read(context, None)
.unwrap()
.expect("expect at least one row group");
let mut total_rows_read = 0;
@@ -1657,7 +1487,7 @@ mod tests {
)
.unwrap(),
);
assert!(part.read(context, None, None).unwrap().is_none());
assert!(part.read(context, None).unwrap().is_none());
check_prune_row_group(&part, None, 310);

View File

@@ -13,22 +13,20 @@
// limitations under the License.
use std::collections::VecDeque;
use std::time::Instant;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::SequenceRange;
use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;
@@ -40,29 +38,19 @@ pub struct EncodedBulkPartIter {
builder: MemtableRowGroupReaderBuilder,
/// Sequence number filter.
sequence: Option<SequenceRange>,
/// Cached skip_fields for current row group.
current_skip_fields: bool,
/// Metrics for this iterator.
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
mem_scan_metrics: Option<MemScanMetrics>,
}
impl EncodedBulkPartIter {
/// Creates a new [BulkPartIter].
pub(crate) fn try_new(
encoded_part: &EncodedBulkPart,
context: BulkIterContextRef,
mut row_groups_to_read: VecDeque<usize>,
parquet_meta: Arc<ParquetMetaData>,
data: Bytes,
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> error::Result<Self> {
assert!(context.read_format().as_flat().is_some());
let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
let data = encoded_part.data().clone();
let series_count = encoded_part.metadata().num_series as usize;
let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
context.read_format().projection_indices().iter().copied(),
@@ -70,90 +58,46 @@ impl EncodedBulkPartIter {
let builder =
MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
Some(first_row_group) => {
let skip_fields = builder.compute_skip_fields(&context, first_row_group);
let reader = builder.build_row_group_reader(first_row_group, None)?;
(Some(reader), skip_fields)
}
None => (None, false),
};
let init_reader = row_groups_to_read
.pop_front()
.map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
.transpose()?;
Ok(Self {
context,
row_groups_to_read,
current_reader: init_reader,
builder,
sequence,
current_skip_fields,
metrics: MemScanMetricsData {
total_series: series_count,
..Default::default()
},
mem_scan_metrics,
})
}
fn report_mem_scan_metrics(&mut self) {
if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
mem_scan_metrics.merge_inner(&self.metrics);
}
}
/// Fetches next non-empty record batch.
pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
let start = Instant::now();
let Some(current) = &mut self.current_reader else {
// All row group exhausted.
self.metrics.scan_cost += start.elapsed();
return Ok(None);
};
for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
batch,
self.current_skip_fields,
)? {
// Update metrics
self.metrics.num_batches += 1;
self.metrics.num_rows += batch.num_rows();
self.metrics.scan_cost += start.elapsed();
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
return Ok(Some(batch));
}
}
// Previous row group exhausted, read next row group
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
// Compute skip_fields for this row group
self.current_skip_fields = self
.builder
.compute_skip_fields(&self.context, next_row_group);
let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
let current = self.current_reader.insert(next_reader);
for batch in current {
let batch = batch.context(DecodeArrowRowGroupSnafu)?;
if let Some(batch) = apply_combined_filters(
&self.context,
&self.sequence,
batch,
self.current_skip_fields,
)? {
// Update metrics
self.metrics.num_batches += 1;
self.metrics.num_rows += batch.num_rows();
self.metrics.scan_cost += start.elapsed();
if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
return Ok(Some(batch));
}
}
}
self.metrics.scan_cost += start.elapsed();
Ok(None)
}
}
@@ -162,37 +106,7 @@ impl Iterator for EncodedBulkPartIter {
type Item = error::Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
let result = self.next_record_batch().transpose();
// Report metrics when iteration is complete
if result.is_none() {
self.report_mem_scan_metrics();
}
result
}
}
impl Drop for EncodedBulkPartIter {
fn drop(&mut self) {
common_telemetry::debug!(
"EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
self.context.region_id(),
self.metrics.total_series,
self.metrics.num_rows,
self.metrics.num_batches,
self.metrics.scan_cost
);
// Report MemScanMetrics if not already reported
self.report_mem_scan_metrics();
READ_ROWS_TOTAL
.with_label_values(&["bulk_memtable"])
.inc_by(self.metrics.num_rows as u64);
READ_STAGE_ELAPSED
.with_label_values(&["scan_memtable"])
.observe(self.metrics.scan_cost.as_secs_f64());
self.next_record_batch().transpose()
}
}
@@ -204,10 +118,6 @@ pub struct BulkPartRecordBatchIter {
context: BulkIterContextRef,
/// Sequence number filter.
sequence: Option<SequenceRange>,
/// Metrics for this iterator.
metrics: MemScanMetricsData,
/// Optional memory scan metrics to report to.
mem_scan_metrics: Option<MemScanMetrics>,
}
impl BulkPartRecordBatchIter {
@@ -216,8 +126,6 @@ impl BulkPartRecordBatchIter {
record_batch: RecordBatch,
context: BulkIterContextRef,
sequence: Option<SequenceRange>,
series_count: usize,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Self {
assert!(context.read_format().as_flat().is_some());
@@ -225,17 +133,6 @@ impl BulkPartRecordBatchIter {
record_batch: Some(record_batch),
context,
sequence,
metrics: MemScanMetricsData {
total_series: series_count,
..Default::default()
},
mem_scan_metrics,
}
}
fn report_mem_scan_metrics(&mut self) {
if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
mem_scan_metrics.merge_inner(&self.metrics);
}
}
@@ -252,29 +149,15 @@ impl BulkPartRecordBatchIter {
}
fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
let start = Instant::now();
// Apply projection first.
let projected_batch = self.apply_projection(record_batch)?;
// Apply combined filtering (both predicate and sequence filters)
// For BulkPartRecordBatchIter, we don't have row group information.
let skip_fields = match self.context.pre_filter_mode() {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => true,
};
let Some(filtered_batch) =
apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
apply_combined_filters(&self.context, &self.sequence, projected_batch)?
else {
self.metrics.scan_cost += start.elapsed();
return Ok(None);
};
// Update metrics
self.metrics.num_batches += 1;
self.metrics.num_rows += filtered_batch.num_rows();
self.metrics.scan_cost += start.elapsed();
Ok(Some(filtered_batch))
}
}
@@ -283,43 +166,9 @@ impl Iterator for BulkPartRecordBatchIter {
type Item = error::Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
let Some(record_batch) = self.record_batch.take() else {
// `take()` should be cheap, we report the metrics directly.
self.report_mem_scan_metrics();
return None;
};
let record_batch = self.record_batch.take()?;
let result = self.process_batch(record_batch).transpose();
// Reports metrics when iteration is complete
if result.is_none() {
self.report_mem_scan_metrics();
}
result
}
}
impl Drop for BulkPartRecordBatchIter {
fn drop(&mut self) {
common_telemetry::debug!(
"BulkPartRecordBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
self.context.region_id(),
self.metrics.total_series,
self.metrics.num_rows,
self.metrics.num_batches,
self.metrics.scan_cost
);
// Report MemScanMetrics if not already reported
self.report_mem_scan_metrics();
READ_ROWS_TOTAL
.with_label_values(&["bulk_memtable"])
.inc_by(self.metrics.num_rows as u64);
READ_STAGE_ELAPSED
.with_label_values(&["scan_memtable"])
.observe(self.metrics.scan_cost.as_secs_f64());
self.process_batch(record_batch).transpose()
}
}
@@ -332,7 +181,6 @@ fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<SequenceRange>,
record_batch: RecordBatch,
skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
// Converts the format to the flat format first.
let format = context.read_format().as_flat().unwrap();
@@ -343,9 +191,7 @@ fn apply_combined_filters(
// First, apply predicate filters using the shared method.
if !context.base.filters.is_empty() {
let predicate_mask = context
.base
.compute_filter_mask_flat(&record_batch, skip_fields)?;
let predicate_mask = context.base.compute_filter_mask_flat(&record_batch)?;
// If predicate filters out the entire batch, return None early
let Some(mask) = predicate_mask else {
return Ok(None);
@@ -501,8 +347,7 @@ mod tests {
.unwrap(),
);
// Iterates all rows.
let iter =
BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
assert_eq!(1, result.len());
assert_eq!(3, result[0].num_rows());
@@ -513,8 +358,6 @@ mod tests {
record_batch.clone(),
context,
Some(SequenceRange::LtEq { max: 2 }),
0,
None,
);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
assert_eq!(1, result.len());
@@ -535,8 +378,7 @@ mod tests {
.unwrap(),
);
// Creates iter with projection and predicate.
let iter =
BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
assert_eq!(1, result.len());
assert_eq!(1, result[0].num_rows());

View File

@@ -169,23 +169,4 @@ impl MemtableRowGroupReaderBuilder {
)
.context(ReadDataPartSnafu)
}
/// Computes whether to skip field filters for a specific row group based on PreFilterMode.
pub(crate) fn compute_skip_fields(
&self,
context: &BulkIterContextRef,
row_group_idx: usize,
) -> bool {
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
match context.pre_filter_mode() {
PreFilterMode::All => false,
PreFilterMode::SkipFields => true,
PreFilterMode::SkipFieldsOnDelete => {
// Check if this specific row group contains delete op
row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable")
.unwrap_or(true)
}
}
}
}

View File

@@ -44,7 +44,7 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, RangesOptions,
MemtableStats, PredicateGroup,
};
use crate::region::options::MergeMode;
@@ -190,10 +190,10 @@ impl Memtable for PartitionTreeMemtable {
fn ranges(
&self,
projection: Option<&[ColumnId]>,
options: RangesOptions,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),

View File

@@ -27,7 +27,7 @@ use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, SequenceRange};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
@@ -35,11 +35,12 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::Batch;
use crate::read::dedup::LastNonNullIter;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::MergeMode;
use crate::{error, metrics};
@@ -222,7 +223,7 @@ impl Memtable for SimpleBulkMemtable {
&self,
projection: Option<&[ColumnId]>,
_predicate: Option<table::predicate::Predicate>,
sequence: Option<store_api::storage::SequenceRange>,
sequence: Option<SequenceRange>,
) -> error::Result<BoxedBatchIterator> {
let iter = self.create_iter(projection, sequence)?.build(None)?;
@@ -237,10 +238,10 @@ impl Memtable for SimpleBulkMemtable {
fn ranges(
&self,
projection: Option<&[ColumnId]>,
options: RangesOptions,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> error::Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
let start_time = Instant::now();
let projection = Arc::new(self.build_projection(projection));
let values = self.series.read().unwrap().read_to_values();
@@ -411,7 +412,7 @@ mod tests {
use datatypes::value::Value;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::{RegionId, SequenceNumber, SequenceRange};
use store_api::storage::{RegionId, SequenceNumber};
use super::*;
use crate::read;
@@ -616,7 +617,9 @@ mod tests {
let kv = kvs.iter().next().unwrap();
memtable.write_one(kv).unwrap();
let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
source.push(Source::Iter(r.build_iter().unwrap()));
@@ -648,7 +651,9 @@ mod tests {
memtable.write_one(kv).unwrap();
memtable.freeze().unwrap();
let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
source.push(Source::Iter(r.build_iter().unwrap()));
@@ -689,7 +694,9 @@ mod tests {
memtable.write_one(kvs.iter().next().unwrap()).unwrap();
memtable.freeze().unwrap();
let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
assert_eq!(ranges.ranges.len(), 1);
let range = ranges.ranges.into_values().next().unwrap();
let mut reader = range.context.builder.build(None).unwrap();
@@ -903,8 +910,9 @@ mod tests {
raw_data: None,
})
.unwrap();
let MemtableRanges { ranges, .. } =
memtable.ranges(None, RangesOptions::default()).unwrap();
let MemtableRanges { ranges, .. } = memtable
.ranges(None, PredicateGroup::default(), None, false)
.unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();
Source::Iter(only_range.build_iter().unwrap())

View File

@@ -53,7 +53,7 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, RangesOptions,
MemtableStats, PredicateGroup,
};
use crate::metrics::{
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
@@ -303,10 +303,10 @@ impl Memtable for TimeSeriesMemtable {
fn ranges(
&self,
projection: Option<&[ColumnId]>,
options: RangesOptions,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let predicate = options.predicate;
let sequence = options.sequence;
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {

View File

@@ -39,6 +39,7 @@ use crate::sst::{
///
/// This mapper support duplicate and unsorted projection indices.
/// The output schema is determined by the projection indices.
#[allow(dead_code)]
pub struct FlatProjectionMapper {
/// Metadata of the region.
metadata: RegionMetadataRef,

View File

@@ -49,40 +49,33 @@ pub struct PruneReader {
context: FileRangeContextRef,
source: Source,
metrics: ReaderMetrics,
/// Whether to skip field filters for this row group.
skip_fields: bool,
}
impl PruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: RowGroupReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: Source::RowGroup(reader),
metrics: Default::default(),
skip_fields,
}
}
pub(crate) fn new_with_last_row_reader(
ctx: FileRangeContextRef,
reader: RowGroupLastRowCachedReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: Source::LastRow(reader),
metrics: Default::default(),
skip_fields,
}
}
pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) {
pub(crate) fn reset_source(&mut self, source: Source) {
self.source = source;
self.skip_fields = skip_fields;
}
/// Merge metrics with the inner reader and return the merged metrics.
@@ -124,7 +117,7 @@ impl PruneReader {
}
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);
@@ -264,21 +257,17 @@ pub struct FlatPruneReader {
context: FileRangeContextRef,
source: FlatSource,
metrics: ReaderMetrics,
/// Whether to skip field filters for this row group.
skip_fields: bool,
}
impl FlatPruneReader {
pub(crate) fn new_with_row_group_reader(
ctx: FileRangeContextRef,
reader: FlatRowGroupReader,
skip_fields: bool,
) -> Self {
Self {
context: ctx,
source: FlatSource::RowGroup(reader),
metrics: Default::default(),
skip_fields,
}
}
@@ -320,10 +309,7 @@ impl FlatPruneReader {
}
let num_rows_before_filter = record_batch.num_rows();
let Some(filtered_batch) = self
.context
.precise_filter_flat(record_batch, self.skip_fields)?
else {
let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);

View File

@@ -48,7 +48,7 @@ use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
@@ -68,7 +68,6 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
@@ -427,10 +426,6 @@ impl ScanRegion {
let memtables = self.version.memtables.list_memtables();
// Skip empty memtables and memtables out of time range.
let mut mem_range_builders = Vec::new();
let filter_mode = pre_filter_mode(
self.version.options.append_mode,
self.version.options.merge_mode(),
);
for m in memtables {
// check if memtable is empty by reading stats.
@@ -444,13 +439,12 @@ impl ScanRegion {
}
let ranges_in_memtable = m.ranges(
Some(mapper.column_ids()),
RangesOptions::default()
.with_predicate(predicate.clone())
.with_sequence(SequenceRange::new(
self.request.memtable_min_sequence,
self.request.memtable_max_sequence,
))
.with_pre_filter_mode(filter_mode),
predicate.clone(),
SequenceRange::new(
self.request.memtable_min_sequence,
self.request.memtable_max_sequence,
),
false,
)?;
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
// todo: we should add stats to MemtableRange
@@ -472,19 +466,12 @@ impl ScanRegion {
self.version.options.append_mode,
);
let (non_field_filters, field_filters) = self.partition_by_field_filters();
let inverted_index_appliers = [
self.build_invereted_index_applier(&non_field_filters),
self.build_invereted_index_applier(&field_filters),
];
let bloom_filter_appliers = [
self.build_bloom_filter_applier(&non_field_filters),
self.build_bloom_filter_applier(&field_filters),
];
let fulltext_index_appliers = [
self.build_fulltext_index_applier(&non_field_filters),
self.build_fulltext_index_applier(&field_filters),
];
// Remove field filters for LastNonNull mode after logging the request.
self.maybe_remove_field_filters();
let inverted_index_applier = self.build_invereted_index_applier();
let bloom_filter_applier = self.build_bloom_filter_applier();
let fulltext_index_applier = self.build_fulltext_index_applier();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if self.flat_format {
@@ -498,9 +485,9 @@ impl ScanRegion {
.with_memtables(mem_range_builders)
.with_files(files)
.with_cache(self.cache_strategy)
.with_inverted_index_appliers(inverted_index_appliers)
.with_bloom_filter_index_appliers(bloom_filter_appliers)
.with_fulltext_index_appliers(fulltext_index_appliers)
.with_inverted_index_applier(inverted_index_applier)
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
@@ -540,34 +527,40 @@ impl ScanRegion {
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
}
/// Partitions filters into two groups: non-field filters and field filters.
/// Returns `(non_field_filters, field_filters)`.
fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
/// Remove field filters if the merge mode is [MergeMode::LastNonNull].
fn maybe_remove_field_filters(&mut self) {
if self.version.options.merge_mode() != MergeMode::LastNonNull {
return;
}
// TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
let field_columns = self
.version
.metadata
.field_columns()
.map(|col| &col.column_schema.name)
.collect::<HashSet<_>>();
// Columns in the expr.
let mut columns = HashSet::new();
self.request.filters.iter().cloned().partition(|expr| {
self.request.filters.retain(|expr| {
columns.clear();
// `expr_to_columns` won't return error.
if expr_to_columns(expr, &mut columns).is_err() {
// If we can't extract columns, treat it as non-field filter
return true;
return false;
}
// Return true for non-field filters (partition puts true cases in first vec)
!columns
.iter()
.any(|column| field_columns.contains(&column.name))
})
for column in &columns {
if field_columns.contains(&column.name) {
// This expr uses the field column.
return false;
}
}
true
});
}
/// Use the latest schema to build the inverted index applier.
fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
if self.ignore_inverted_index {
return None;
}
@@ -595,7 +588,7 @@ impl ScanRegion {
.with_file_cache(file_cache)
.with_inverted_index_cache(inverted_index_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(filters)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
.ok()
.flatten()
@@ -603,7 +596,7 @@ impl ScanRegion {
}
/// Use the latest schema to build the bloom filter index applier.
fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
if self.ignore_bloom_filter {
return None;
}
@@ -622,7 +615,7 @@ impl ScanRegion {
.with_file_cache(file_cache)
.with_bloom_filter_index_cache(bloom_filter_index_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(filters)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
.ok()
.flatten()
@@ -630,7 +623,7 @@ impl ScanRegion {
}
/// Use the latest schema to build the fulltext index applier.
fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
if self.ignore_fulltext_index {
return None;
}
@@ -648,7 +641,7 @@ impl ScanRegion {
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.with_bloom_filter_cache(bloom_filter_index_cache)
.build(filters)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
.ok()
.flatten()
@@ -692,9 +685,9 @@ pub struct ScanInput {
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
/// Start time of the query.
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
@@ -731,9 +724,9 @@ impl ScanInput {
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
query_start: None,
append_mode: false,
filter_deleted: true,
@@ -810,33 +803,33 @@ impl ScanInput {
self
}
/// Sets inverted index appliers.
/// Sets invereted index applier.
#[must_use]
pub(crate) fn with_inverted_index_appliers(
pub(crate) fn with_inverted_index_applier(
mut self,
appliers: [Option<InvertedIndexApplierRef>; 2],
applier: Option<InvertedIndexApplierRef>,
) -> Self {
self.inverted_index_appliers = appliers;
self.inverted_index_applier = applier;
self
}
/// Sets bloom filter appliers.
/// Sets bloom filter applier.
#[must_use]
pub(crate) fn with_bloom_filter_index_appliers(
pub(crate) fn with_bloom_filter_index_applier(
mut self,
appliers: [Option<BloomFilterIndexApplierRef>; 2],
applier: Option<BloomFilterIndexApplierRef>,
) -> Self {
self.bloom_filter_index_appliers = appliers;
self.bloom_filter_index_applier = applier;
self
}
/// Sets fulltext index appliers.
/// Sets fulltext index applier.
#[must_use]
pub(crate) fn with_fulltext_index_appliers(
pub(crate) fn with_fulltext_index_applier(
mut self,
appliers: [Option<FulltextIndexApplierRef>; 2],
applier: Option<FulltextIndexApplierRef>,
) -> Self {
self.fulltext_index_appliers = appliers;
self.fulltext_index_applier = applier;
self
}
@@ -959,20 +952,18 @@ impl ScanInput {
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let res = self
.access_layer
.read_sst(file.clone())
.predicate(predicate)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_strategy.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {
@@ -1164,17 +1155,6 @@ impl ScanInput {
}
}
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
if append_mode {
return PreFilterMode::All;
}
match merge_mode {
MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
MergeMode::LastNonNull => PreFilterMode::SkipFields,
}
}
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {

Some files were not shown because too many files have changed in this diff Show More