Compare commits


3 Commits

Author    SHA1        Message                           Date
Ning Sun  8853e08a7d  chore: use pinned prost 0.14.1    2026-01-15 16:58:24 +08:00
Ning Sun  9cba14f904  chore: update otel libraries      2026-01-15 16:12:34 +08:00
Ning Sun  09ba24b7a9  chore: update otel-arrow          2026-01-15 15:46:40 +08:00
41 changed files with 315 additions and 1829 deletions

Cargo.lock (generated, 227 lines changed)
View File

@@ -725,7 +725,7 @@ dependencies = [
"memchr",
"num",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
]
[[package]]
@@ -742,7 +742,7 @@ dependencies = [
"memchr",
"num-traits",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
]
[[package]]
@@ -1482,7 +1482,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
dependencies = [
"memchr",
"regex-automata 0.4.13",
"regex-automata",
"serde",
]
@@ -2013,7 +2013,7 @@ dependencies = [
"humantime",
"meta-client",
"meta-srv",
"nu-ansi-term",
"nu-ansi-term 0.46.0",
"object-store",
"operator",
"paste",
@@ -2155,7 +2155,7 @@ dependencies = [
"metric-engine",
"mito2",
"moka",
"nu-ansi-term",
"nu-ansi-term 0.46.0",
"object-store",
"parquet",
"plugins",
@@ -2844,10 +2844,10 @@ dependencies = [
"humantime-serde",
"lazy_static",
"once_cell",
"opentelemetry 0.30.0",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.30.0",
"opentelemetry_sdk",
"parking_lot 0.12.4",
"prometheus",
"serde",
@@ -4074,7 +4074,7 @@ dependencies = [
"log",
"recursive",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
]
[[package]]
@@ -4976,8 +4976,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298"
dependencies = [
"bit-set",
"regex-automata 0.4.13",
"regex-syntax 0.8.7",
"regex-automata",
"regex-syntax",
]
[[package]]
@@ -5323,7 +5323,7 @@ dependencies = [
"meta-client",
"meta-srv",
"num_cpus",
"opentelemetry-proto 0.31.0",
"opentelemetry-proto",
"operator",
"otel-arrow-rust",
"partition",
@@ -6526,8 +6526,8 @@ dependencies = [
"rand 0.9.1",
"rand_chacha 0.9.0",
"regex",
"regex-automata 0.4.13",
"roaring",
"regex-automata",
"roaring 0.10.12",
"serde",
"serde_json",
"snafu 0.8.6",
@@ -7119,7 +7119,7 @@ dependencies = [
"lalrpop-util",
"petgraph 0.7.1",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
"sha3",
"string_cache",
"term",
@@ -7133,7 +7133,7 @@ version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5baa5e9ff84f1aefd264e6869907646538a52147a755d494517a8007fb48733"
dependencies = [
"regex-automata 0.4.13",
"regex-automata",
"rustversion",
]
@@ -7527,7 +7527,7 @@ dependencies = [
"num-traits",
"quote",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
"serde",
"vergen",
]
@@ -7653,11 +7653,11 @@ checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata 0.1.10",
"regex-automata",
]
[[package]]
@@ -8061,7 +8061,7 @@ dependencies = [
"rand 0.9.1",
"rayon",
"regex",
"roaring",
"roaring 0.10.12",
"rskafka",
"rstest 0.25.0",
"rstest_reuse",
@@ -8523,6 +8523,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num"
version = "0.4.3"
@@ -8942,20 +8951,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "opentelemetry"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 2.0.17",
"tracing",
]
[[package]]
name = "opentelemetry"
version = "0.31.0"
@@ -8972,48 +8967,36 @@ dependencies = [
[[package]]
name = "opentelemetry-http"
version = "0.30.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
dependencies = [
"async-trait",
"bytes",
"http 1.3.1",
"opentelemetry 0.30.0",
"opentelemetry",
"reqwest",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.30.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
dependencies = [
"http 1.3.1",
"opentelemetry 0.30.0",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto 0.30.0",
"opentelemetry_sdk 0.30.0",
"prost 0.13.5",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost 0.14.1",
"reqwest",
"thiserror 2.0.17",
"tokio",
"tonic 0.13.1",
"tonic 0.14.2",
"tracing",
]
[[package]]
name = "opentelemetry-proto"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
dependencies = [
"opentelemetry 0.30.0",
"opentelemetry_sdk 0.30.0",
"prost 0.13.5",
"tonic 0.13.1",
]
[[package]]
name = "opentelemetry-proto"
version = "0.31.0"
@@ -9022,8 +9005,8 @@ checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
dependencies = [
"base64 0.22.1",
"const-hex",
"opentelemetry 0.31.0",
"opentelemetry_sdk 0.31.0",
"opentelemetry",
"opentelemetry_sdk",
"prost 0.14.1",
"serde",
"serde_json",
@@ -9033,27 +9016,9 @@ dependencies = [
[[package]]
name = "opentelemetry-semantic-conventions"
version = "0.30.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
[[package]]
name = "opentelemetry_sdk"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"opentelemetry 0.30.0",
"percent-encoding",
"rand 0.9.1",
"serde_json",
"thiserror 2.0.17",
"tokio",
"tokio-stream",
]
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
[[package]]
name = "opentelemetry_sdk"
@@ -9064,10 +9029,12 @@ dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"opentelemetry 0.31.0",
"opentelemetry",
"percent-encoding",
"rand 0.9.1",
"thiserror 2.0.17",
"tokio",
"tokio-stream",
]
[[package]]
@@ -9227,7 +9194,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "otel-arrow-rust"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
dependencies = [
"ahash 0.8.12",
"arrow 56.2.0",
@@ -9243,6 +9210,7 @@ dependencies = [
"prost-build 0.14.1",
"rand 0.9.1",
"replace_with",
"roaring 0.11.3",
"serde",
"smallvec",
"snafu 0.8.6",
@@ -9255,7 +9223,7 @@ dependencies = [
[[package]]
name = "otlp-derive"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
dependencies = [
"convert_case 0.8.0",
"otlp-model",
@@ -9267,7 +9235,7 @@ dependencies = [
[[package]]
name = "otlp-model"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=5da284414e9b14f678344b51e5292229e4b5f8d2#5da284414e9b14f678344b51e5292229e4b5f8d2"
source = "git+https://github.com/GreptimeTeam/otel-arrow?rev=452821e455b16e9a397a09d299340e197eb91571#452821e455b16e9a397a09d299340e197eb91571"
dependencies = [
"tonic-prost-build",
]
@@ -10281,7 +10249,7 @@ dependencies = [
"rand 0.9.1",
"rand_chacha 0.9.0",
"rand_xorshift",
"regex-syntax 0.8.7",
"regex-syntax",
"unarray",
]
@@ -11041,17 +11009,8 @@ checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.13",
"regex-syntax 0.8.7",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
"regex-automata",
"regex-syntax",
]
[[package]]
@@ -11062,7 +11021,7 @@ checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.7",
"regex-syntax",
]
[[package]]
@@ -11076,7 +11035,7 @@ dependencies = [
"itertools 0.13.0",
"nohash",
"regex",
"regex-syntax 0.8.7",
"regex-syntax",
]
[[package]]
@@ -11085,12 +11044,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.7"
@@ -11295,6 +11248,16 @@ dependencies = [
"byteorder",
]
[[package]]
name = "roaring"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ba9ce64a8f45d7fc86358410bb1a82e8c987504c0d4900e9141d69a9f26c885"
dependencies = [
"bytemuck",
"byteorder",
]
[[package]]
name = "robust"
version = "1.2.0"
@@ -12174,7 +12137,7 @@ dependencies = [
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.31.0",
"opentelemetry-proto",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.4",
@@ -13434,7 +13397,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d60769b80ad7953d8a7b2c70cdfe722bbcdcac6bccc8ac934c40c034d866fc18"
dependencies = [
"byteorder",
"regex-syntax 0.8.7",
"regex-syntax",
"utf8-ranges",
]
@@ -13659,7 +13622,7 @@ dependencies = [
"moka",
"mysql_async",
"object-store",
"opentelemetry-proto 0.31.0",
"opentelemetry-proto",
"operator",
"otel-arrow-rust",
"partition",
@@ -14124,32 +14087,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-timeout 0.5.2",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"tokio",
"tokio-stream",
"tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic"
version = "0.14.2"
@@ -14340,9 +14277,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.41"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
@@ -14364,9 +14301,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.28"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
@@ -14375,9 +14312,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.34"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
@@ -14396,14 +14333,12 @@ dependencies = [
[[package]]
name = "tracing-opentelemetry"
version = "0.31.0"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry 0.30.0",
"opentelemetry_sdk 0.30.0",
"opentelemetry",
"smallvec",
"tracing",
"tracing-core",
@@ -14424,14 +14359,14 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
"nu-ansi-term 0.50.3",
"once_cell",
"regex",
"regex-automata",
"serde",
"serde_json",
"sharded-slab",

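Note on the Cargo.lock churn above: a dependency entry in the lockfile carries a version suffix only while more than one version of that crate exists in the graph. This commit range retires the duplicate opentelemetry 0.30.0, opentelemetry-proto 0.30.0, opentelemetry_sdk 0.30.0, tonic 0.13.1, regex-automata 0.1.10, and regex-syntax 0.6.29 packages, so those suffixes collapse to bare crate names; conversely, nu-ansi-term and roaring gain suffixes because a second version of each enters the graph. For example:

    dependencies = ["regex-syntax 0.8.7"]   # two regex-syntax versions in the graph
    dependencies = ["regex-syntax"]         # one version left: suffix dropped

Any duplicates that remain can be listed with `cargo tree --duplicates`.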
View File

@@ -181,7 +181,7 @@ opentelemetry-proto = { version = "0.31", features = [
"logs",
] }
ordered-float = { version = "4.3", features = ["serde"] }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5da284414e9b14f678344b51e5292229e4b5f8d2", features = [
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "452821e455b16e9a397a09d299340e197eb91571", features = [
"server",
] }
parking_lot = "0.12"
@@ -191,8 +191,8 @@ pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "0.14", features = ["no-recursion-limit"] }
prost-types = "0.14"
prost = { version = "=0.14.1", features = ["no-recursion-limit"] }
prost-types = "=0.14.1"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -240,7 +240,7 @@ tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-appender = "0.2"
tracing-opentelemetry = "0.31.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }

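Two things worth noting in this Cargo.toml hunk. First, `"0.14"` is a caret requirement (any compatible 0.14.x), while `"=0.14.1"` is an exact requirement that resolves only to 0.14.1:

    prost = { version = "0.14" }    # caret: 0.14.x allowed to float
    prost = { version = "=0.14.1" } # exact: locked to 0.14.1

Pinning `prost` and `prost-types` together, per the "use pinned prost 0.14.1" commit, presumably keeps them in lockstep with the prost 0.14.1 that tonic 0.14.2 and the new otel-arrow revision resolve to in the lockfile above. Second, the `otel-arrow-rust` git dependency moves to a new pinned revision, and `tracing-opentelemetry` steps to 0.32.1, which (per the lockfile) is the release built against the opentelemetry 0.31 line.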
View File

@@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, false)?;
let template = build_template_from_raw_table_info(raw_table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -20,9 +20,7 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column,
};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
@@ -32,45 +30,34 @@ use crate::wal_provider::prepare_wal_options;
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
skip_internal_columns: bool,
) -> Result<CreateRequest> {
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let filtered = raw_table_info
let column_defs = raw_table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
.filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name))
.map(|(i, c)| {
let is_primary_key = primary_key_indices.contains(&i);
let column_def = try_as_column_def(c, is_primary_key)
.context(error::ConvertColumnDefSnafu { column: &c.name })?;
Ok((
is_primary_key.then_some(i),
RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
},
))
})
.collect::<Result<Vec<(Option<usize>, RegionColumnDef)>>>()?;
let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip();
Ok(RegionColumnDef {
column_def: Some(column_def),
// The column id will be overridden by the metric engine.
// So we just use the index as the column id.
column_id: i as u32,
})
})
.collect::<Result<Vec<_>>>()?;
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: new_primary_key_indices
.iter()
.flatten()
.map(|i| *i as u32)
.collect(),
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
path: String::new(),
options,
partition: None,

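Net effect of this hunk: with the `skip_internal_columns` flag gone nothing is filtered out, so the `(Option<usize>, RegionColumnDef)` pairs and the `unzip` step become dead weight, and `primary_key` maps `primary_key_indices` directly. A condensed, self-contained sketch of the removed pattern (hypothetical names; the real code builds `RegionColumnDef` values rather than strings) shows why the pairing existed: enumeration happens before filtering, so the original indices of surviving primary-key columns have to be carried alongside the definitions:

    // Sketch of the removed filter-and-unzip pattern. Enumerate first,
    // filter second: surviving primary-key columns keep their original
    // indices, so they are collected next to the column definitions.
    fn split_columns(
        names: &[&str],
        pk_indices: &[usize],
        skip: impl Fn(&str) -> bool,
    ) -> (Vec<u32>, Vec<String>) {
        let pairs: Vec<(Option<usize>, String)> = names
            .iter()
            .enumerate()
            .filter(|(_, name)| !skip(name))
            .map(|(i, name)| (pk_indices.contains(&i).then_some(i), name.to_string()))
            .collect();
        let (pk, defs): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
        (pk.into_iter().flatten().map(|i| i as u32).collect(), defs)
    }

Once the filter goes away, the whole detour reduces to the single map/collect left in the diff.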
View File

@@ -17,7 +17,6 @@ use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -531,25 +530,6 @@ impl Display for EnterStagingRegion {
}
}
/// Instruction payload for syncing a region from a manifest or another region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SyncRegion {
/// Region id to sync.
pub region_id: RegionId,
/// Request to sync the region.
pub request: SyncRegionFromRequest,
}
impl Display for SyncRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SyncRegion(region_id={}, request={:?})",
self.region_id, self.request
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
pub region_id: RegionId,
@@ -622,11 +602,8 @@ pub enum Instruction {
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
/// Syncs regions.
SyncRegions(Vec<SyncRegion>),
/// Remaps manifests for a region.
RemapManifest(RemapManifest),
/// Applies staging manifests for a region.
ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
@@ -692,13 +669,6 @@ impl Instruction {
_ => None,
}
}
pub fn into_sync_regions(self) -> Option<Vec<SyncRegion>> {
match self {
Self::SyncRegions(sync_regions) => Some(sync_regions),
_ => None,
}
}
}
/// The reply of [UpgradeRegion].
@@ -814,31 +784,6 @@ impl EnterStagingRegionsReply {
}
}
/// Reply for a single region sync request.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionReply {
/// Region id of the synced region.
pub region_id: RegionId,
/// Returns true if the region is successfully synced and ready.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error message if any during the operation.
pub error: Option<String>,
}
/// Reply for a batch of region sync requests.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SyncRegionsReply {
pub replies: Vec<SyncRegionReply>,
}
impl SyncRegionsReply {
pub fn new(replies: Vec<SyncRegionReply>) -> Self {
Self { replies }
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
/// Returns false if the region does not exist.
@@ -902,7 +847,6 @@ pub enum InstructionReply {
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
SyncRegions(SyncRegionsReply),
RemapManifest(RemapManifestReply),
ApplyStagingManifests(ApplyStagingManifestsReply),
}
@@ -928,9 +872,6 @@ impl Display for InstructionReply {
reply.replies
)
}
Self::SyncRegions(reply) => {
write!(f, "InstructionReply::SyncRegions({:?})", reply.replies)
}
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
Self::ApplyStagingManifests(reply) => write!(
f,
@@ -985,13 +926,6 @@ impl InstructionReply {
}
}
pub fn expect_sync_regions_reply(self) -> Vec<SyncRegionReply> {
match self {
Self::SyncRegions(reply) => reply.replies,
_ => panic!("Expected SyncRegion reply"),
}
}
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
match self {
Self::RemapManifest(reply) => reply,

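Together with the datanode hunks further below, this removes region syncing from the instruction protocol end to end: the `SyncRegion` payload and its `Display` impl, the `Instruction::SyncRegions` variant with its `into_sync_regions` accessor, the `SyncRegionReply`/`SyncRegionsReply` types, and the matching `InstructionReply` arm here; the datanode side drops the `sync_region` handler module (including its tests), the mock engine's sync hooks, and the repartition group procedure's `sync_region` flag.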
View File

@@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info, false)?;
let template = build_template_from_raw_table_info(raw_table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -21,12 +21,12 @@ greptime-proto.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
opentelemetry = { version = "0.30.0", default-features = false, features = [
opentelemetry = { version = "0.31.0", default-features = false, features = [
"trace",
] }
opentelemetry-otlp = { version = "0.30.0", features = ["trace", "grpc-tonic", "http-proto"] }
opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version = "0.31.0", features = ["trace", "grpc-tonic", "http-proto"] }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "trace"] }
parking_lot.workspace = true
prometheus.workspace = true
serde.workspace = true

View File

@@ -73,7 +73,7 @@ impl TracingContext {
/// Attach the given span as a child of the context. Returns the attached span.
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
span.set_parent(self.0.clone());
let _ = span.set_parent(self.0.clone());
span
}

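The `let _ =` above tracks the tracing-opentelemetry 0.32 upgrade: `set_parent` evidently no longer returns plain `()`, so the must-use result is discarded explicitly while the span is still handed back to the caller. A usage sketch, assuming only the `TracingContext::attach` API shown in this hunk:

    // `ctx` is a TracingContext carrying a propagated parent context; `attach`
    // is the helper above. Spans entered afterwards become its children.
    let span = ctx.attach(tracing::info_span!("handle_request"));
    let _enter = span.enter();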
View File

@@ -31,7 +31,6 @@ mod flush_region;
mod gc_worker;
mod open_region;
mod remap_manifest;
mod sync_region;
mod upgrade_region;
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
@@ -43,7 +42,6 @@ use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
@@ -134,7 +132,6 @@ impl RegionHeartbeatResponseHandler {
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
Instruction::ApplyStagingManifests(_) => {
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
@@ -153,7 +150,6 @@ pub enum InstructionHandlers {
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
SyncRegions(SyncRegionHandler),
RemapManifest(RemapManifestHandler),
ApplyStagingManifests(ApplyStagingManifestsHandler),
}
@@ -179,7 +175,6 @@ impl_from_handler!(
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions,
SyncRegionHandler => SyncRegions,
RemapManifestHandler => RemapManifest,
ApplyStagingManifestsHandler => ApplyStagingManifests
);
@@ -227,7 +222,6 @@ dispatch_instr!(
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions,
SyncRegions => SyncRegions,
RemapManifest => RemapManifest,
ApplyStagingManifests => ApplyStagingManifests,
);

View File

@@ -48,32 +48,19 @@ impl ApplyStagingManifestsHandler {
ctx: &HandlerContext,
request: ApplyStagingManifest,
) -> ApplyStagingManifestReply {
let ApplyStagingManifest {
region_id,
ref partition_expr,
central_region_id,
ref manifest_path,
} = request;
common_telemetry::info!(
"Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
region_id,
central_region_id,
partition_expr,
manifest_path
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
warn!("Region: {} is not found", request.region_id);
return ApplyStagingManifestReply {
region_id,
region_id: request.region_id,
exists: false,
ready: false,
error: None,
};
};
if !leader {
warn!("Region: {} is not leader", region_id);
warn!("Region: {} is not leader", request.region_id);
return ApplyStagingManifestReply {
region_id,
region_id: request.region_id,
exists: true,
ready: false,
error: Some("Region is not leader".into()),
@@ -83,25 +70,25 @@ impl ApplyStagingManifestsHandler {
match ctx
.region_server
.handle_request(
region_id,
request.region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: partition_expr.clone(),
central_region_id,
manifest_path: manifest_path.clone(),
partition_expr: request.partition_expr,
central_region_id: request.central_region_id,
manifest_path: request.manifest_path,
}),
)
.await
{
Ok(_) => ApplyStagingManifestReply {
region_id,
region_id: request.region_id,
exists: true,
ready: true,
error: None,
},
Err(err) => {
error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
error!(err; "Failed to apply staging manifest");
ApplyStagingManifestReply {
region_id,
region_id: request.region_id,
exists: true,
ready: false,
error: Some(format!("{err:?}")),

View File

@@ -51,11 +51,6 @@ impl EnterStagingRegionsHandler {
partition_expr,
}: EnterStagingRegion,
) -> EnterStagingRegionReply {
common_telemetry::info!(
"Datanode received enter staging region: {}, partition_expr: {}",
region_id,
partition_expr
);
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return EnterStagingRegionReply {
@@ -90,7 +85,7 @@ impl EnterStagingRegionsHandler {
error: None,
},
Err(err) => {
error!(err; "Failed to enter staging region, region_id: {}", region_id);
error!(err; "Failed to enter staging region");
EnterStagingRegionReply {
region_id,
ready: false,

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
use common_telemetry::{error, info, warn};
use common_telemetry::warn;
use store_api::region_engine::RemapManifestsRequest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
@@ -34,12 +34,6 @@ impl InstructionHandler for RemapManifestHandler {
region_mapping,
new_partition_exprs,
} = request;
info!(
"Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
region_id,
input_regions.len(),
new_partition_exprs.len()
);
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
@@ -73,18 +67,11 @@ impl InstructionHandler for RemapManifestHandler {
manifest_paths: result.manifest_paths,
error: None,
}),
Err(e) => {
error!(
e;
"Remap manifests failed on datanode, region_id: {}",
region_id
);
InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
})
}
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
}),
};
Some(reply)

View File

@@ -1,192 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, SyncRegion, SyncRegionReply, SyncRegionsReply};
use common_telemetry::{error, info, warn};
use futures::future::join_all;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Handler for [SyncRegion] instruction.
/// It syncs the region from a manifest or another region.
#[derive(Debug, Clone, Copy, Default)]
pub struct SyncRegionHandler;
#[async_trait::async_trait]
impl InstructionHandler for SyncRegionHandler {
type Instruction = Vec<SyncRegion>;
/// Handles a batch of [SyncRegion] instructions.
async fn handle(
&self,
ctx: &HandlerContext,
regions: Self::Instruction,
) -> Option<InstructionReply> {
let futures = regions
.into_iter()
.map(|sync_region| Self::handle_sync_region(ctx, sync_region));
let results = join_all(futures).await;
Some(InstructionReply::SyncRegions(SyncRegionsReply::new(
results,
)))
}
}
impl SyncRegionHandler {
/// Handles a single [SyncRegion] instruction.
async fn handle_sync_region(
ctx: &HandlerContext,
SyncRegion { region_id, request }: SyncRegion,
) -> SyncRegionReply {
let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: false,
error: None,
};
};
if !writable {
warn!("Region: {} is not writable", region_id);
return SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some("Region is not writable".into()),
};
}
match ctx.region_server.sync_region(region_id, request).await {
Ok(_) => {
info!("Successfully synced region: {}", region_id);
SyncRegionReply {
region_id,
ready: true,
exists: true,
error: None,
}
}
Err(e) => {
error!(e; "Failed to sync region: {}", region_id);
SyncRegionReply {
region_id,
ready: false,
exists: true,
error: Some(format!("{:?}", e)),
}
}
}
}
}
#[cfg(test)]
mod tests {
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionRole, SyncRegionFromRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::sync_region::SyncRegionHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::tests::{MockRegionEngine, mock_region_server};
#[tokio::test]
async fn test_handle_sync_region_not_found() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let region_id = RegionId::new(1024, 1);
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(!reply[0].exists);
assert!(!reply[0].ready);
}
#[tokio::test]
async fn test_handle_sync_region_not_writable() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Follower));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(!reply[0].ready);
assert!(reply[0].error.is_some());
}
#[tokio::test]
async fn test_handle_sync_region_success() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(METRIC_ENGINE_NAME, |r| {
r.mock_role = Some(Some(RegionRole::Leader));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let handler = SyncRegionHandler;
let sync_region = common_meta::instruction::SyncRegion {
region_id,
request: SyncRegionFromRequest::from_manifest(Default::default()),
};
let reply = handler
.handle(&handler_context, vec![sync_region])
.await
.unwrap()
.expect_sync_regions_reply();
assert_eq!(reply.len(), 1);
assert_eq!(reply[0].region_id, region_id);
assert!(reply[0].exists);
assert!(reply[0].ready);
assert!(reply[0].error.is_none());
}
}

View File

@@ -115,17 +115,12 @@ pub type MockSetReadonlyGracefullyHandler =
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub type MockSyncRegionHandler = Box<
dyn Fn(RegionId, SyncRegionFromRequest) -> Result<SyncRegionFromResponse, Error> + Send + Sync,
>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) handle_sync_region_mock_fn: Option<MockSyncRegionHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -141,7 +136,6 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -162,7 +156,6 @@ impl MockRegionEngine {
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -183,7 +176,6 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -205,7 +197,6 @@ impl MockRegionEngine {
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
handle_sync_region_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -295,14 +286,10 @@ impl RegionEngine for MockRegionEngine {
async fn sync_region(
&self,
region_id: RegionId,
request: SyncRegionFromRequest,
_region_id: RegionId,
_request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
if let Some(mock_fn) = &self.handle_sync_region_mock_fn {
return mock_fn(region_id, request).map_err(BoxedError::new);
};
Ok(SyncRegionFromResponse::Mito { synced: true })
unimplemented!()
}
async fn remap_manifests(

View File

@@ -14,15 +14,19 @@
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use tokio::time::Instant;
use crate::error::{self, Result};
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::procedure::utils;
use crate::service::mailbox::Channel;
/// Flushes the leader region before downgrading it.
///
@@ -57,6 +61,15 @@ impl State for PreFlushRegion {
}
impl PreFlushRegion {
/// Builds flush leader region instruction.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
Instruction::FlushRegions(FlushRegions::sync_batch(
pc.region_ids.clone(),
FlushErrorStrategy::TryAll,
))
}
/// Tries to flush a leader region.
///
/// Ignore:
@@ -76,18 +89,109 @@ impl PreFlushRegion {
.context(error::ExceededDeadlineSnafu {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_ids = &ctx.persistent_ctx.region_ids;
let leader = &ctx.persistent_ctx.from_peer;
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
leader,
operation_timeout,
utils::ErrorStrategy::Ignore,
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.await
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(leader.id);
let now = Instant::now();
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let reply_result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
let (result, error) = reply_result;
if let Some(error) = error {
warn!(
"Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
region_ids, leader, &error
);
} else if result {
info!(
"The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
leader,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader regions",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, leader
);
Ok(())
}
Err(err) => Err(err),
}
}
}
@@ -98,13 +202,11 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
};
use crate::service::mailbox::Channel;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))

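The large hunk above trades the shared `utils::flush_region` helper for an inlined mailbox round trip. Condensed, with the reply-parsing block elided behind a hypothetical `handle_reply`, the control flow is:

    // Condensed control flow of the inlined flush-and-wait (types and error
    // variants as in the diff; handle_reply stands in for the parsing block).
    match ctx.mailbox.send(&ch, msg, operation_timeout).await {
        Ok(receiver) => match receiver.await {
            // Parse InstructionReply::FlushRegions; per-region flush failures
            // are logged and tolerated rather than failing the migration.
            Ok(msg) => handle_reply(msg),
            Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
                operation: "Flush leader regions",
            }
            .fail(),
            Err(err) => Err(err),
        },
        // The datanode is unreachable: skip the flush and continue.
        Err(Error::PusherNotFound { .. }) => Ok(()),
        Err(err) => Err(err),
    }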
View File

@@ -47,7 +47,7 @@ use common_procedure::{
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -232,10 +232,7 @@ impl Context {
&new_region_routes,
table_id,
)?;
info!(
"Updating table route for table: {}, new region routes: {:?}",
table_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,
@@ -265,13 +262,6 @@ impl Context {
.await;
Ok(())
}
/// Returns the next operation timeout.
///
/// If the next operation timeout is not set, it will return `None`.
pub fn next_operation_timeout(&self) -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(10))
}
}
#[async_trait::async_trait]
@@ -345,13 +335,6 @@ impl Procedure for RepartitionProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition procedure executing state: {}, table_id: {}",
state_name,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;

View File

@@ -65,12 +65,6 @@ impl State for AllocateRegion {
&mut next_region_number,
&self.plan_entries,
);
let plan_count = repartition_plan_entries.len();
let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
info!(
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
table_id, plan_count, to_allocate
);
// If no region to allocate, directly dispatch the plan.
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
@@ -105,20 +99,6 @@ impl State for AllocateRegion {
.await
.context(error::AllocateWalOptionsSnafu { table_id })?;
let new_region_count = new_allocated_region_routes.len();
let new_regions_brief: Vec<_> = new_allocated_region_routes
.iter()
.map(|route| {
let region_id = route.region.id;
let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
format!("region_id: {}, peer: {}", region_id, peer)
})
.collect();
info!(
"Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
table_id, new_region_count, new_regions_brief
);
let _operating_guards = Self::register_operating_regions(
&ctx.memory_region_keeper,
&new_allocated_region_routes,
@@ -157,6 +137,7 @@ impl AllocateRegion {
Self { plan_entries }
}
#[allow(dead_code)]
fn register_operating_regions(
memory_region_keeper: &MemoryRegionKeeperRef,
region_routes: &[RegionRoute],
@@ -174,6 +155,7 @@ impl AllocateRegion {
Ok(operating_guards)
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
new_allocated_region_ids: &[RegionRoute],
@@ -195,6 +177,7 @@ impl AllocateRegion {
///
/// This method takes the allocation plan entries and converts them to repartition plan entries,
/// updating `next_region_number` for each newly allocated region.
#[allow(dead_code)]
fn convert_to_repartition_plans(
table_id: TableId,
next_region_number: &mut RegionNumber,
@@ -213,6 +196,7 @@ impl AllocateRegion {
}
/// Collects all regions that need to be allocated from the repartition plan entries.
#[allow(dead_code)]
fn collect_allocate_regions(
repartition_plan_entries: &[RepartitionPlanEntry],
) -> Vec<&RegionDescriptor> {
@@ -223,6 +207,7 @@ impl AllocateRegion {
}
/// Prepares region allocation data: region numbers and their partition expressions.
#[allow(dead_code)]
fn prepare_region_allocation_data(
allocate_regions: &[&RegionDescriptor],
) -> Result<Vec<(RegionNumber, String)>> {
@@ -240,6 +225,7 @@ impl AllocateRegion {
}
/// Calculates the total number of regions that need to be allocated.
#[allow(dead_code)]
fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
repartition_plan_entries
.iter()
@@ -248,10 +234,12 @@ impl AllocateRegion {
}
/// Gets the next region number from the physical table route.
#[allow(dead_code)]
fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
max_region_number + 1
}
#[allow(dead_code)]
async fn allocate_regions(
node_manager: &NodeManagerRef,
raw_table_info: &RawTableInfo,
@@ -264,14 +252,12 @@ impl AllocateRegion {
&raw_table_info.name,
);
let table_id = raw_table_info.ident.table_id;
let request = build_template_from_raw_table_info(raw_table_info, true)
let request = build_template_from_raw_table_info(raw_table_info)
.context(error::BuildCreateRequestSnafu { table_id })?;
let builder = CreateRequestBuilder::new(request, None);
let region_count = region_routes.len();
let wal_region_count = wal_options.len();
info!(
"Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
table_id, region_count, wal_region_count
"Allocating regions for table: {}, region_routes: {:?}, wal_options: {:?}",
table_id, region_routes, wal_options
);
let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
executor

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
use common_telemetry::{error, info};
use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -64,10 +64,9 @@ impl Collect {
impl State for Collect {
async fn next(
&mut self,
ctx: &mut Context,
_ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
for procedure_meta in self.inflight_procedures.iter() {
let procedure_id = procedure_meta.procedure_id;
let group_id = procedure_meta.group_id;
@@ -94,16 +93,7 @@ impl State for Collect {
}
}
let inflight = self.inflight_procedures.len();
let succeeded = self.succeeded_procedures.len();
let failed = self.failed_procedures.len();
let unknown = self.unknown_procedures.len();
info!(
"Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
table_id, inflight, succeeded, failed, unknown
);
if failed > 0 || unknown > 0 {
if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
// TODO(weny): retry the failed or unknown procedures.
}

View File

@@ -62,10 +62,9 @@ impl State for DeallocateRegion {
.flat_map(|p| p.pending_deallocate_region_ids.iter())
.cloned()
.collect::<HashSet<_>>();
let dealloc_count = pending_deallocate_region_ids.len();
info!(
"Deallocating regions for repartition, table_id: {}, count: {}, regions: {:?}",
table_id, dealloc_count, pending_deallocate_region_ids
"Deallocating regions: {:?} for table: {} during repartition procedure",
pending_deallocate_region_ids, table_id
);
let table_lock = TableLock::Write(table_id).into();
@@ -112,6 +111,7 @@ impl State for DeallocateRegion {
}
impl DeallocateRegion {
#[allow(dead_code)]
async fn deallocate_regions(
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
@@ -136,6 +136,7 @@ impl DeallocateRegion {
Ok(())
}
#[allow(dead_code)]
fn filter_deallocatable_region_routes(
table_id: TableId,
region_routes: &[RegionRoute],
@@ -160,6 +161,7 @@ impl DeallocateRegion {
.collect::<Vec<_>>()
}
#[allow(dead_code)]
fn generate_region_routes(
region_routes: &[RegionRoute],
pending_deallocate_region_ids: &HashSet<RegionId>,

View File

@@ -16,9 +16,7 @@ use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::RegionId;
use crate::error::Result;
@@ -30,6 +28,7 @@ use crate::procedure::repartition::{self, Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
#[allow(dead_code)]
fn build_region_mapping(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
@@ -58,12 +57,8 @@ impl State for Dispatch {
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
let table_info_value = ctx.get_table_info_value().await?;
let table_engine = table_info_value.table_info.meta.engine;
let sync_region = table_engine == METRIC_ENGINE_NAME;
let plan_count = ctx.persistent_ctx.plans.len();
let mut procedures = Vec::with_capacity(plan_count);
let mut procedure_metas = Vec::with_capacity(plan_count);
let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
let region_mapping = build_region_mapping(
&plan.source_regions,
@@ -78,9 +73,6 @@ impl State for Dispatch {
plan.source_regions.clone(),
plan.target_regions.clone(),
region_mapping,
sync_region,
plan.allocated_region_ids.clone(),
plan.pending_deallocate_region_ids.clone(),
);
let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);
@@ -93,14 +85,6 @@ impl State for Dispatch {
procedures.push(procedure);
}
let group_ids: Vec<_> = procedure_metas.iter().map(|m| m.group_id).collect();
info!(
"Dispatch repartition groups for table_id: {}, group_count: {}, group_ids: {:?}",
table_id,
group_ids.len(),
group_ids
);
Ok((
Box::new(Collect::new(procedure_metas)),
Status::suspended(procedures, true),

View File

@@ -17,7 +17,6 @@ pub(crate) mod enter_staging_region;
pub(crate) mod remap_manifest;
pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod sync_region;
pub(crate) mod update_metadata;
pub(crate) mod utils;
@@ -41,7 +40,7 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -56,6 +55,7 @@ use crate::service::mailbox::MailboxRef;
pub type GroupId = Uuid;
#[allow(dead_code)]
pub struct RepartitionGroupProcedure {
state: Box<dyn State>,
context: Context,
@@ -113,14 +113,6 @@ impl Procedure for RepartitionGroupProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let state_name = state.name();
// Log state transition
common_telemetry::info!(
"Repartition group procedure executing state: {}, group id: {}, table id: {}",
state_name,
self.context.persistent_ctx.group_id,
self.context.persistent_ctx.table_id
);
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
@@ -229,16 +221,9 @@ pub struct PersistentContext {
/// The staging manifest paths of the repartition group.
/// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
pub staging_manifest_paths: HashMap<RegionId, String>,
/// Whether sync region is needed for this group.
pub sync_region: bool,
/// The region ids of the newly allocated regions.
pub allocated_region_ids: Vec<RegionId>,
/// The region ids of the regions that are pending deallocation.
pub pending_deallocate_region_ids: Vec<RegionId>,
}
impl PersistentContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
group_id: GroupId,
table_id: TableId,
@@ -247,9 +232,6 @@ impl PersistentContext {
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
sync_region: bool,
allocated_region_ids: Vec<RegionId>,
pending_deallocate_region_ids: Vec<RegionId>,
) -> Self {
Self {
group_id,
@@ -261,9 +243,6 @@ impl PersistentContext {
region_mapping,
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region,
allocated_region_ids,
pending_deallocate_region_ids,
}
}
@@ -355,7 +334,6 @@ impl Context {
new_region_routes: Vec<RegionRoute>,
) -> Result<()> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
@@ -367,10 +345,6 @@ impl Context {
..
} = &central_region_datanode_table_value.region_info;
info!(
"Updating table route for table: {}, group_id: {}, new region routes: {:?}",
table_id, group_id, new_region_routes
);
self.table_metadata_manager
.update_table_route(
table_id,

View File

@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -52,10 +52,7 @@ impl State for ApplyStagingManifest {
) -> Result<(Box<dyn State>, Status)> {
self.apply_staging_manifests(ctx).await?;
Ok((
Box::new(UpdateMetadata::ExitStaging),
Status::executing(true),
))
Ok((Box::new(RepartitionEnd), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
@@ -128,6 +125,7 @@ impl ApplyStagingManifest {
})
}
#[allow(dead_code)]
async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -152,7 +150,6 @@ impl ApplyStagingManifest {
operation: "Apply staging manifests",
})?;
let instruction_region_count: usize = instructions.values().map(|v| v.len()).sum();
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, apply_staging_manifests)| {
@@ -169,11 +166,8 @@ impl ApplyStagingManifest {
})
.unzip();
info!(
"Sent apply staging manifests instructions, table_id: {}, group_id: {}, peers: {}, regions: {}",
table_id,
group_id,
peers.len(),
instruction_region_count
"Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -298,7 +292,11 @@ impl ApplyStagingManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let elapsed = now.elapsed();
info!(
"Received apply staging manifests reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
reply
else {
@@ -308,23 +306,9 @@ impl ApplyStagingManifest {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
let region_ids = replies.iter().map(|r| r.region_id).collect::<Vec<_>>();
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
}
info!(
"Received apply staging manifests reply, peer: {:?}, total_regions: {}, regions:{:?}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, region_ids, ready, not_ready, with_error, elapsed
);
Ok(())
}

View File

@@ -23,7 +23,7 @@ use common_meta::instruction::{
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::{join_all, try_join_all};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
@@ -35,7 +35,6 @@ use crate::procedure::repartition::group::utils::{
};
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::utils::{self, ErrorStrategy};
use crate::service::mailbox::{Channel, MailboxRef};
#[derive(Debug, Serialize, Deserialize)]
@@ -49,7 +48,6 @@ impl State for EnterStagingRegion {
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
self.flush_pending_deallocate_regions(ctx).await?;
self.enter_staging_regions(ctx).await?;
Ok((Box::new(RemapManifest), Status::executing(true)))
@@ -96,6 +94,7 @@ impl EnterStagingRegion {
Ok(instructions)
}
#[allow(dead_code)]
async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -103,8 +102,6 @@ impl EnterStagingRegion {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let targets = &ctx.persistent_ctx.targets;
let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
let target_region_count = targets.len();
let peer_count = instructions.len();
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
@@ -126,8 +123,8 @@ impl EnterStagingRegion {
})
.unzip();
info!(
"Sent enter staging regions instructions, table_id: {}, group_id: {}, peers: {}, target_regions: {}",
table_id, group_id, peer_count, target_region_count
"Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}",
peers, table_id, group_id
);
let format_err_msg = |idx: usize, error: &Error| {
@@ -245,7 +242,11 @@ impl EnterStagingRegion {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let elapsed = now.elapsed();
info!(
"Received enter staging regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) =
reply
else {
@@ -255,22 +256,9 @@ impl EnterStagingRegion {
}
.fail();
};
let total = replies.len();
let (mut ready, mut not_ready, mut with_error) = (0, 0, 0);
for reply in replies {
if reply.error.is_some() {
with_error += 1;
} else if reply.ready {
ready += 1;
} else {
not_ready += 1;
}
Self::handle_enter_staging_region_reply(&reply, &now, peer)?;
}
info!(
"Received enter staging regions reply, peer: {:?}, total_regions: {}, ready: {}, not_ready: {}, with_error: {}, elapsed: {:?}",
peer, total, ready, not_ready, with_error, elapsed
);
Ok(())
}
@@ -332,61 +320,6 @@ impl EnterStagingRegion {
Ok(())
}
async fn flush_pending_deallocate_regions(&self, ctx: &mut Context) -> Result<()> {
let pending_deallocate_region_ids = &ctx.persistent_ctx.pending_deallocate_region_ids;
if pending_deallocate_region_ids.is_empty() {
return Ok(());
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush pending deallocate regions",
})?;
let result = &ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let source_routes = result
.source_routes
.iter()
.filter(|route| pending_deallocate_region_ids.contains(&route.region.id))
.cloned()
.collect::<Vec<_>>();
let peer_region_ids_map = group_region_routes_by_peer(&source_routes);
info!(
"Flushing pending deallocate regions, table_id: {}, group_id: {}, peer_region_ids_map: {:?}",
table_id, group_id, peer_region_ids_map
);
let now = Instant::now();
let tasks = peer_region_ids_map
.iter()
.map(|(peer, region_ids)| {
utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
region_ids,
peer,
operation_timeout,
ErrorStrategy::Retry,
)
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
info!(
"Flushed pending deallocate regions: {:?}, table_id: {}, group_id: {}, elapsed: {:?}",
source_routes
.iter()
.map(|route| route.region.id)
.collect::<Vec<_>>(),
table_id,
group_id,
now.elapsed()
);
Ok(())
}
}
#[cfg(test)]
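The deleted flush_pending_deallocate_regions step reduces to grouping region ids by their leader peer and flushing each group concurrently, failing fast on the first error. A minimal sketch under simplified types (Peer, RegionId, and flush_one_peer are illustrative stand-ins, not the real metasrv API):

use std::collections::HashMap;

use futures::future::try_join_all;

type Peer = u64;
type RegionId = u64;

// Stand-in for the mailbox round-trip performed by the flush helper.
async fn flush_one_peer(peer: Peer, regions: Vec<RegionId>) -> Result<(), String> {
    let _ = (peer, regions);
    Ok(())
}

// Group routes by peer, then flush all groups concurrently; try_join_all
// aborts on the first error, which a caller can surface as retryable.
async fn flush_pending(routes: &[(Peer, RegionId)]) -> Result<(), String> {
    let mut by_peer: HashMap<Peer, Vec<RegionId>> = HashMap::new();
    for (peer, region) in routes {
        by_peer.entry(*peer).or_default().push(*region);
    }
    let tasks = by_peer
        .into_iter()
        .map(|(peer, regions)| flush_one_peer(peer, regions));
    try_join_all(tasks).await?;
    Ok(())
}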

View File

@@ -65,13 +65,6 @@ impl State for RemapManifest {
.await?;
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let manifest_count = manifest_paths.len();
let input_region_count = ctx.persistent_ctx.sources.len();
let target_region_count = ctx.persistent_ctx.targets.len();
info!(
"Remap manifests finished for repartition, table_id: {}, group_id: {}, input_regions: {}, target_regions: {}, manifest_paths: {}",
table_id, group_id, input_region_count, target_region_count, manifest_count
);
if manifest_paths.len() != ctx.persistent_ctx.targets.len() {
warn!(
@@ -163,7 +156,11 @@ impl RemapManifest {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let elapsed = now.elapsed();
info!(
"Received remap manifest reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::RemapManifest(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
@@ -171,11 +168,6 @@ impl RemapManifest {
}
.fail();
};
let manifest_count = reply.manifest_paths.len();
info!(
"Received remap manifest reply for central_region: {}, manifest_paths: {}, elapsed: {:?}",
remap.region_id, manifest_count, elapsed
);
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
}
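The mismatch warning kept above encodes the key invariant of this state: a remap is complete only when it produced exactly one manifest per target region. A hedged sketch of the same guard over simplified inputs:

// Sketch: accept a remap result only when manifest and target counts match.
fn check_remap(manifest_paths: &[String], target_count: usize) -> Result<(), String> {
    if manifest_paths.len() != target_count {
        return Err(format!(
            "expected {} remapped manifests, got {}",
            target_count,
            manifest_paths.len()
        ));
    }
    Ok(())
}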

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
@@ -22,7 +22,6 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
@@ -57,6 +56,7 @@ impl RepartitionStart {
/// Ensures that both source and target regions are present in the region routes.
///
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
#[allow(dead_code)]
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
@@ -172,28 +172,6 @@ impl State for RepartitionStart {
ctx.persistent_ctx.targets.len()
);
if ctx.persistent_ctx.sync_region {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let allocated_region_ids: HashSet<_> = ctx
.persistent_ctx
.allocated_region_ids
.iter()
.copied()
.collect();
let region_routes: Vec<_> = prepare_result
.target_routes
.iter()
.filter(|route| allocated_region_ids.contains(&route.region.id))
.cloned()
.collect();
if !region_routes.is_empty() {
return Ok((
Box::new(SyncRegion { region_routes }),
Status::executing(true),
));
}
}
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),

View File

@@ -1,445 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::utils::ErrorStrategy;
use crate::service::mailbox::{Channel, MailboxRef};
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
/// The state of syncing regions for a repartition group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
pub region_routes: Vec<RegionRoute>,
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for SyncRegion {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Self::flush_central_region(ctx).await?;
self.sync_regions(ctx).await?;
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl SyncRegion {
async fn flush_central_region(ctx: &mut Context) -> Result<()> {
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Flush central region",
})?;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
crate::procedure::utils::flush_region(
&ctx.mailbox,
&ctx.server_addr,
&[prepare_result.central_region],
&prepare_result.central_region_datanode,
operation_timeout,
ErrorStrategy::Retry,
)
.await
}
/// Builds instructions to sync regions on datanodes.
fn build_sync_region_instructions(
central_region: RegionId,
region_routes: &[RegionRoute],
) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
for (peer, region_ids) in target_region_routes_by_peer {
let sync_regions = region_ids
.into_iter()
.map(|region_id| {
let request = SyncRegionFromRequest::FromRegion {
source_region_id: central_region,
parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
};
common_meta::instruction::SyncRegion { region_id, request }
})
.collect();
instructions.insert((*peer).clone(), sync_regions);
}
instructions
}
/// Syncs regions on datanodes.
async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let instructions = Self::build_sync_region_instructions(
prepare_result.central_region,
&self.region_routes,
);
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
operation: "Sync regions",
})?;
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, sync_regions)| {
(
peer,
Self::sync_region(
&ctx.mailbox,
&ctx.server_addr,
peer,
sync_regions,
operation_timeout,
),
)
})
.unzip();
info!(
"Sent sync regions instructions to peers: {:?} for repartition table {}",
peers, table_id
);
let format_err_msg = |idx: usize, error: &Error| {
let peer = peers[idx];
format!(
"Failed to sync regions on datanode {:?}, error: {:?}",
peer, error
)
};
let results = join_all(tasks).await;
let result = handle_multiple_results(&results);
match result {
HandleMultipleResult::AllSuccessful => Ok(()),
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
reason: format!(
"All retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
violated: format!(
"All non retryable errors during syncing regions for repartition table {}: {:?}",
table_id,
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(",")
),
}
.fail(),
HandleMultipleResult::PartialRetryable {
retryable_errors,
non_retryable_errors,
} => error::UnexpectedSnafu {
violated: format!(
"Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
table_id,
retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
non_retryable_errors
.iter()
.map(|(idx, error)| format_err_msg(*idx, error))
.collect::<Vec<_>>()
.join(","),
),
}
.fail(),
}
}
/// Syncs regions on a datanode.
async fn sync_region(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
sync_regions: &[common_meta::instruction::SyncRegion],
timeout: Duration,
) -> Result<()> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::SyncRegions(sync_regions.to_vec());
let message = MailboxMessage::json_message(
&format!(
"Sync regions: {:?}",
sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let now = std::time::Instant::now();
let receiver = mailbox.send(&ch, message, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received sync regions reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect sync regions reply",
}
.fail();
};
for reply in replies {
Self::handle_sync_region_reply(&reply, &now, peer)?;
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
fn handle_sync_region_reply(
SyncRegionReply {
region_id,
ready,
exists,
error,
}: &SyncRegionReply,
now: &Instant,
peer: &Peer,
) -> Result<()> {
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
)
}
);
if let Some(error) = error {
return error::RetryLaterSnafu {
reason: format!(
"Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id,
peer,
error,
now.elapsed()
),
}
.fail();
}
ensure!(
ready,
error::RetryLaterSnafu {
reason: format!(
"Region {} failed to sync on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
),
}
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::region_engine::SyncRegionFromRequest;
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::repartition::group::GroupPrepareResult;
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
use crate::service::mailbox::Channel;
#[test]
fn test_build_sync_region_instructions() {
let table_id = 1024;
let central_region = RegionId::new(table_id, 1);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let instructions =
SyncRegion::build_sync_region_instructions(central_region, &region_routes);
assert_eq!(instructions.len(), 1);
let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
assert_eq!(peer_instructions.len(), 1);
assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
let SyncRegionFromRequest::FromRegion {
source_region_id, ..
} = &peer_instructions[0].request
else {
panic!("expect from region request");
};
assert_eq!(*source_region_id, central_region);
}
fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
GroupPrepareResult {
source_routes: vec![],
target_routes: vec![],
central_region: RegionId::new(table_id, 1),
central_region_datanode: Peer::empty(1),
}
}
#[tokio::test]
async fn test_sync_regions_all_successful() {
let mut env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let (tx, rx) = tokio::sync::mpsc::channel(1);
env.mailbox_ctx
.insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
.await;
send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
Ok(new_sync_region_reply(
id,
RegionId::new(1024, 3),
true,
true,
None,
))
});
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
sync_region.sync_regions(&mut ctx).await.unwrap();
}
#[tokio::test]
async fn test_sync_regions_retryable() {
let env = TestingEnv::new();
let table_id = 1024;
let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
let mut ctx = env.create_context(persistent_context);
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let sync_region = SyncRegion { region_routes };
let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::RetryLater { .. });
}
}
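The handle_multiple_results match in sync_regions above follows a generic pattern: partition the failed tasks into retryable and non-retryable buckets, then pick one of four outcomes. A self-contained sketch (Outcome and the is_retryable predicate are simplified stand-ins for the metasrv types):

// Classify a batch of task results the way sync_regions consumes them.
enum Outcome<'a, E> {
    AllSuccessful,
    AllRetryable(Vec<(usize, &'a E)>),
    AllNonRetryable(Vec<(usize, &'a E)>),
    PartialRetryable {
        retryable_errors: Vec<(usize, &'a E)>,
        non_retryable_errors: Vec<(usize, &'a E)>,
    },
}

fn classify<'a, T, E>(
    results: &'a [Result<T, E>],
    is_retryable: impl Fn(&E) -> bool,
) -> Outcome<'a, E> {
    let (mut retryable, mut non_retryable) = (Vec::new(), Vec::new());
    for (idx, result) in results.iter().enumerate() {
        if let Err(err) = result {
            if is_retryable(err) {
                retryable.push((idx, err));
            } else {
                non_retryable.push((idx, err));
            }
        }
    }
    match (retryable.is_empty(), non_retryable.is_empty()) {
        (true, true) => Outcome::AllSuccessful,
        (false, true) => Outcome::AllRetryable(retryable),
        (true, false) => Outcome::AllNonRetryable(non_retryable),
        (false, false) => Outcome::PartialRetryable {
            retryable_errors: retryable,
            non_retryable_errors: non_retryable,
        },
    }
}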

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub(crate) mod apply_staging_region;
pub(crate) mod exit_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
@@ -29,14 +28,11 @@ use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::enum_variant_names)]
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
/// Exits the staging regions.
ExitStaging,
}
#[async_trait::async_trait]
@@ -66,18 +62,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during the rollback staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))
}
UpdateMetadata::ExitStaging => {
self.exit_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during the exit staging regions"
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))
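Both remaining branches share the best-effort cache invalidation shown above: a failed broadcast is logged but never fails the state transition. A minimal sketch of the pattern (the stub stands in for ctx.invalidate_table_cache):

// Best-effort invalidation: the procedure advances even if the broadcast
// fails, and stale frontend caches are repaired lazily.
async fn invalidate_table_cache() -> Result<(), String> {
    Ok(())
}

async fn finish_update_metadata() -> &'static str {
    if let Err(err) = invalidate_table_cache().await {
        eprintln!("Failed to broadcast the invalidate table cache message, error: {err:?}");
    }
    "RepartitionEnd"
}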

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -77,6 +77,7 @@ impl UpdateMetadata {
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn apply_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -89,13 +90,6 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Apply staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
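The error arm that follows (visible in full in the exit-staging file below) logs the failure and converts it into a retryable error rather than aborting the procedure. A sketch of that wrapping with simplified error types:

// Stand-ins for the snafu-based error plumbing used by the procedure.
#[derive(Debug)]
enum ProcError {
    RetryLater { reason: String },
}

async fn update_table_route() -> Result<(), String> {
    Err("table route backend unavailable".to_string())
}

async fn apply_staging(table_id: u32, group_id: u64) -> Result<(), ProcError> {
    if let Err(err) = update_table_route().await {
        eprintln!("Failed to update the table route for repartition: {table_id}, group_id: {group_id}, error: {err}");
        // Retryable: the procedure framework will re-run this state later.
        return Err(ProcError::RetryLater {
            reason: format!(
                "Failed to update the table route for repartition: {table_id}, group_id: {group_id}"
            ),
        });
    }
    Ok(())
}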

View File

@@ -1,104 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
fn exit_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for target in targets {
let region_route = region_routes_map.get_mut(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region_id,
},
)?;
region_route.clear_leader_staging();
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Exits the staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Target region not found.
/// - Source region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
pub(crate) async fn exit_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
let new_region_routes = Self::exit_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
region_routes,
)?;
let source_count = ctx.persistent_ctx.sources.len();
let target_count = ctx.persistent_ctx.targets.len();
info!(
"Exit staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
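The deleted exit_staging_region_routes uses a common in-place update pattern: clone the routes, index them by region id with mutable references, and clear the staging flag for every source and target. A minimal sketch with a simplified Route type standing in for RegionRoute:

use std::collections::HashMap;

#[derive(Clone)]
struct Route {
    region_id: u64,
    leader_staging: bool,
}

// Clear the staging flag for the given region ids; a missing id aborts the
// update, mirroring the context(...)? lookups above. Returns None on a miss.
fn clear_staging(current: &[Route], ids: &[u64]) -> Option<Vec<Route>> {
    let mut routes = current.to_vec();
    let mut by_id: HashMap<u64, &mut Route> = routes
        .iter_mut()
        .map(|route| (route.region_id, route))
        .collect();
    for id in ids {
        by_id.get_mut(id)?.leader_staging = false;
    }
    drop(by_id);
    Some(routes)
}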

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use common_telemetry::error;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -29,6 +29,7 @@ impl UpdateMetadata {
/// Abort:
/// - Source region not found.
/// - Target region not found.
#[allow(dead_code)]
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
@@ -73,6 +74,7 @@ impl UpdateMetadata {
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
#[allow(dead_code)]
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -87,13 +89,6 @@ impl UpdateMetadata {
region_routes,
)?;
let source_count = prepare_result.source_routes.len();
let target_count = prepare_result.target_routes.len();
info!(
"Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await

View File

@@ -16,7 +16,6 @@ use std::any::Any;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
@@ -70,17 +69,6 @@ impl State for RepartitionStart {
);
let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
let plan_count = plans.len();
let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
let total_target_regions: usize =
plans.iter().map(|p| p.target_partition_exprs.len()).sum();
common_telemetry::info!(
"Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
table_id,
plan_count,
total_source_regions,
total_target_regions
);
if plans.is_empty() {
return Ok((Box::new(RepartitionEnd), Status::done()));
@@ -98,6 +86,7 @@ impl State for RepartitionStart {
}
impl RepartitionStart {
#[allow(dead_code)]
fn build_plan(
physical_route: &PhysicalTableRouteValue,
from_exprs: &[PartitionExpr],
@@ -117,6 +106,7 @@ impl RepartitionStart {
))
}
#[allow(dead_code)]
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
@@ -169,9 +159,8 @@ impl RepartitionStart {
.find_map(|(region_id, existing_expr)| {
(existing_expr == &expr_json).then_some(*region_id)
})
.with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
.inspect_err(|_| {
debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
.with_context(|| error::RepartitionSourceExprMismatchSnafu {
expr: expr_json,
})?;
Ok(RegionDescriptor {
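The find_map lookup above matches each requested source expression against the serialized expression stored for an existing region. A tiny runnable sketch over simplified (region_id, expr_json) pairs:

// Find the region whose stored partition expression matches expr_json.
fn find_source_region(existing: &[(u64, String)], expr_json: &str) -> Option<u64> {
    existing.iter().find_map(|(region_id, existing_expr)| {
        (existing_expr.as_str() == expr_json).then_some(*region_id)
    })
}

fn main() {
    let existing = vec![
        (1, "host < 'h1'".to_string()),
        (2, "host >= 'h1'".to_string()),
    ];
    assert_eq!(find_source_region(&existing, "host >= 'h1'"), Some(2));
    // No match maps to the RepartitionSourceExprMismatch error above.
    assert_eq!(find_source_region(&existing, "host >= 'h9'"), None);
}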

View File

@@ -96,8 +96,5 @@ pub fn new_persistent_context(
region_mapping: HashMap::new(),
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
sync_region: false,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
}
}

View File

@@ -18,8 +18,7 @@ use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply,
FlushRegionReply, InstructionReply, SimpleReply, SyncRegionReply, SyncRegionsReply,
UpgradeRegionReply, UpgradeRegionsReply,
FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -254,34 +253,6 @@ pub fn new_enter_staging_region_reply(
}
}
/// Generates an [InstructionReply::SyncRegions] reply.
pub fn new_sync_region_reply(
id: u64,
region_id: RegionId,
ready: bool,
exists: bool,
error: Option<String>,
) -> MailboxMessage {
MailboxMessage {
id,
subject: "mock".to_string(),
from: "datanode".to_string(),
to: "meta".to_string(),
timestamp_millis: current_time_millis(),
payload: Some(Payload::Json(
serde_json::to_string(&InstructionReply::SyncRegions(SyncRegionsReply::new(vec![
SyncRegionReply {
region_id,
ready,
exists,
error,
},
])))
.unwrap(),
)),
}
}
/// Mock the test data for WAL pruning.
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,

View File

@@ -12,185 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_meta::peer::Peer;
use common_telemetry::{info, warn};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
pub(crate) enum ErrorStrategy {
Ignore,
Retry,
}
fn handle_flush_region_reply(
reply: &InstructionReply,
region_ids: &[RegionId],
msg: &MailboxMessage,
) -> Result<(bool, Option<String>)> {
let result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {:?}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect flush region reply",
}
.fail();
}
};
Ok(result)
}
/// Flushes the regions on the datanode.
///
/// Retry or Ignore:
/// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
/// - Failed to flush region on the datanode.
///
/// Abort:
/// - [MailboxTimeout](error::Error::MailboxTimeout): timeout.
/// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
/// - [ExceededDeadline](error::Error::ExceededDeadline).
/// - Invalid JSON.
pub(crate) async fn flush_region(
mailbox: &MailboxRef,
server_addr: &str,
region_ids: &[RegionId],
datanode: &Peer,
timeout: Duration,
error_strategy: ErrorStrategy,
) -> Result<()> {
let flush_instruction = Instruction::FlushRegions(FlushRegions::sync_batch(
region_ids.to_vec(),
FlushErrorStrategy::TryAll,
));
let msg = MailboxMessage::json_message(
&format!("Flush regions: {:?}", region_ids),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", datanode.id, datanode.addr),
common_time::util::current_time_millis(),
&flush_instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: flush_instruction.to_string(),
})?;
let ch = Channel::Datanode(datanode.id);
let now = Instant::now();
let receiver = mailbox.send(&ch, msg, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, datanode
);
return Ok(());
}
ErrorStrategy::Retry => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for flush regions on datanode {:?}, elapsed: {:?}",
datanode,
now.elapsed()
),
}
.fail()?,
},
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush region reply: {:?}, regions: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let (result, error) = handle_flush_region_reply(&reply, region_ids, &msg)?;
if let Some(error) = error {
match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions {:?}, the datanode({}) error is ignored: {}",
region_ids, datanode, error
);
}
ErrorStrategy::Retry => {
return error::RetryLaterSnafu {
reason: format!(
"Failed to flush regions {:?}, the datanode({}) error is retried: {}",
region_ids,
datanode,
error,
),
}
.fail()?;
}
}
} else if result {
info!(
"The flush regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
datanode,
now.elapsed()
);
}
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush regions",
}
.fail(),
Err(err) => Err(err),
}
}
#[cfg(any(test, feature = "mock"))]
pub mod mock {
use std::io::Error;
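The removed helper's ErrorStrategy knob decides whether a failed flush is merely logged or surfaced as a retryable error. A minimal sketch of that branch:

// Ignore downgrades a flush failure to a warning; Retry propagates it so the
// procedure framework can re-run the step.
enum ErrorStrategy {
    Ignore,
    Retry,
}

fn handle_flush_error(strategy: &ErrorStrategy, detail: &str) -> Result<(), String> {
    match strategy {
        ErrorStrategy::Ignore => {
            eprintln!("Failed to flush regions, error ignored: {detail}");
            Ok(())
        }
        ErrorStrategy::Retry => Err(format!("Failed to flush regions, retry later: {detail}")),
    }
}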

View File

@@ -14,7 +14,7 @@
//! Drop a metric region
use common_telemetry::{debug, info};
use common_telemetry::info;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
@@ -46,15 +46,6 @@ impl MetricEngineInner {
.physical_region_states()
.get(&data_region_id)
{
debug!(
"Physical region {} is busy, there are still some logical regions: {:?}",
data_region_id,
state
.logical_regions()
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
);
(true, !state.logical_regions().is_empty())
} else {
// the second argument is not used, just pass in a dummy value

View File

@@ -314,8 +314,11 @@ impl MitoRegion {
/// Sets the dropping state.
/// You should call this method in the worker loop.
pub(crate) fn set_dropping(&self, expect: RegionLeaderState) -> Result<()> {
self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Dropping))
pub(crate) fn set_dropping(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::Dropping),
)
}
/// Sets the truncating state.
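compare_exchange_state only lets set_dropping succeed from the expected leader state. A hedged sketch of the underlying compare-and-swap idea (the u8 encoding and state names are assumptions for illustration, not mito2's actual representation):

use std::sync::atomic::{AtomicU8, Ordering};

const WRITABLE: u8 = 0;
const DROPPING: u8 = 1;

struct RegionState(AtomicU8);

impl RegionState {
    // Succeeds only when the current state is WRITABLE; otherwise returns
    // the observed state so the caller can report the conflict.
    fn set_dropping(&self) -> Result<(), u8> {
        self.0
            .compare_exchange(WRITABLE, DROPPING, Ordering::AcqRel, Ordering::Acquire)
            .map(|_| ())
    }
}

fn main() {
    let state = RegionState(AtomicU8::new(WRITABLE));
    assert!(state.set_dropping().is_ok());
    assert_eq!(state.set_dropping(), Err(DROPPING));
}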

View File

@@ -31,7 +31,7 @@ impl<S> RegionWorkerLoop<S> {
let region_id = request.region_id;
let source_region_id = request.source_region_id;
let sender = request.sender;
let region = match self.regions.writable_non_staging_region(region_id) {
let region = match self.regions.writable_region(region_id) {
Ok(region) => region,
Err(e) => {
let _ = sender.send(Err(e));

View File

@@ -42,18 +42,12 @@ where
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;
let region = self.regions.writable_non_staging_region(region_id)?;
info!("Try to drop region: {}, worker: {}", region_id, self.id);
let is_staging = region.is_staging();
let expect_state = if is_staging {
RegionLeaderState::Staging
} else {
RegionLeaderState::Writable
};
// Marks the region as dropping.
region.set_dropping(expect_state)?;
region.set_dropping()?;
// Writes dropping marker
// We rarely drop a region so we still operate in the worker loop.
let region_dir = region.access_layer.build_region_dir(region_id);

View File

@@ -638,7 +638,7 @@ impl RegionStatistic {
}
/// Request to sync the region from a manifest or a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone)]
pub enum SyncRegionFromRequest {
/// Syncs the region using manifest information.
/// Used in leader-follower manifest sync scenarios.

View File

@@ -99,155 +99,3 @@ DROP TABLE alter_repartition_table;
Affected Rows: 0
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,
host STRING,
cpu DOUBLE,
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) (
host < 'h1',
host >= 'h1' AND host < 'h2',
host >= 'h2'
)
ENGINE = metric
WITH (
physical_metric_table = "true"
);
Affected Rows: 0
CREATE TABLE logical_table_v1 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
Affected Rows: 0
CREATE TABLE logical_table_v2 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
Affected Rows: 0
-- Split physical table partition
ALTER TABLE metric_physical_table SPLIT PARTITION (
host < 'h1'
) INTO (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
Affected Rows: 0
SHOW CREATE TABLE metric_physical_table;
+-----------------------+------------------------------------------------------+
| Table | Create Table |
+-----------------------+------------------------------------------------------+
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "host" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'h0', |
| | host >= 'h1' AND host < 'h2', |
| | host >= 'h2', |
| | host >= 'h0' AND host < 'h1' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = 'true' |
| | ) |
+-----------------------+------------------------------------------------------+
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
++
++
SELECT * FROM logical_table_v1;
++
++
SELECT * FROM logical_table_v2;
++
++
-- Merge physical table partition
ALTER TABLE metric_physical_table MERGE PARTITION (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
Affected Rows: 0
SHOW CREATE TABLE metric_physical_table;
+-----------------------+------------------------------------------------------+
| Table | Create Table |
+-----------------------+------------------------------------------------------+
| metric_physical_table | CREATE TABLE IF NOT EXISTS "metric_physical_table" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "host" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'h0' OR host >= 'h0' AND host < 'h1', |
| | host >= 'h1' AND host < 'h2', |
| | host >= 'h2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = 'true' |
| | ) |
+-----------------------+------------------------------------------------------+
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
++
++
SELECT * FROM logical_table_v1;
++
++
SELECT * FROM logical_table_v2;
++
++
DROP TABLE logical_table_v1;
Affected Rows: 0
DROP TABLE logical_table_v2;
Affected Rows: 0
DROP TABLE metric_physical_table;
Affected Rows: 0

View File

@@ -46,78 +46,3 @@ ALTER TABLE alter_repartition_table REPARTITION (
);
DROP TABLE alter_repartition_table;
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,
host STRING,
cpu DOUBLE,
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) (
host < 'h1',
host >= 'h1' AND host < 'h2',
host >= 'h2'
)
ENGINE = metric
WITH (
physical_metric_table = "true"
);
CREATE TABLE logical_table_v1 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
CREATE TABLE logical_table_v2 (
ts TIMESTAMP TIME INDEX,
host STRING PRIMARY KEY,
cpu DOUBLE,
)
ENGINE = metric
WITH (
on_physical_table = "metric_physical_table"
);
-- Split physical table partition
ALTER TABLE metric_physical_table SPLIT PARTITION (
host < 'h1'
) INTO (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
SHOW CREATE TABLE metric_physical_table;
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
SELECT * FROM logical_table_v1;
SELECT * FROM logical_table_v2;
-- Merge physical table partition
ALTER TABLE metric_physical_table MERGE PARTITION (
host < 'h0',
host >= 'h0' AND host < 'h1'
);
SHOW CREATE TABLE metric_physical_table;
-- Verify select * works and returns empty
SELECT * FROM metric_physical_table;
SELECT * FROM logical_table_v1;
SELECT * FROM logical_table_v2;
DROP TABLE logical_table_v1;
DROP TABLE logical_table_v2;
DROP TABLE metric_physical_table;