feat(remote_wal): add skeleton for remote wal related to meta srv (#2933)

* feat: introduce wal config and kafka config

* feat: introduce kafka topic manager and selector

* feat: introduce region wal options

* chore: build region wal options upon starting meta srv

* feat: integrate region wal options allocator into table meta allocator

* chore: add wal config to metasrv.example.toml

* chore: add region wal options map to create table procedure

* feat: augment region create request with wal options

* feat: augment DatanodeTableValue with region wal options map

* chore: encode region wal options upon constructing table creator

* feat: persist region wal options when creating table meta

* fix: sqlness test

* chore: set default wal provider to raft-engine

* refactor: refactor wal options

* chore: update wal options allocator

* refactor: rename region wal options to wal options

* chore: update usages of region wal options

* chore: add some comments to kafka

* chore: fill in kafka config

* test: add tests for serde wal config

* test: add tests for wal options

* refactor: refactor wal options allocator to enum

* refactor: store wal options into the request options instead

* fix: typo

* fix: typo

* refactor: move wal options map to region info

* refactor: refactor serialization and deserialization of wal options

* refactor: use serde_json to encode wal options

* chore: rename wal_options_map to region_wal_options

* chore: resolve some review comments

* fix: typo

* refactor: replace kebab-case with snake_case

* fix: sqlness and coverage tests

* fix: typo

* fix: coverage test

* fix: coverage test

* chore: resolve some review conversations

* fix: resolve some review conversations

* chore: format comments in metasrv.example.toml

* chore: update import style

* feat: integrate wal options allocator to standalone mode

* test: add compatible test for OpenRegion

* test: add compatible test for UpdateRegionMetadata

* chore: remove based suffix from topic selector type

Author:       niebayes
Date:         2023-12-19 20:43:47 +08:00
Committed by: GitHub
Parent:       c7b36779c1
Commit:       839e653e0d

45 changed files with 972 additions and 109 deletions

Cargo.lock (generated)

@@ -1560,6 +1560,7 @@ dependencies = [
"servers",
"session",
"snafu",
"store-api",
"substrait 0.4.4",
"table",
"temp-env",
@@ -1831,11 +1832,13 @@ dependencies = [
"regex",
"serde",
"serde_json",
"serde_with",
"snafu",
"store-api",
"strum 0.25.0",
"table",
"tokio",
"toml 0.7.8",
"tonic 0.10.2",
]


@@ -42,3 +42,27 @@ first_heartbeat_estimate = "1000ms"
# timeout = "10s"
# connect_timeout = "10s"
# tcp_nodelay = true
[wal]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"
# There is no raft-engine-specific wal config here since metasrv currently only participates in remote wal.
# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_kafka_wal"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
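
For illustration, a minimal sketch of how this [wal] section maps onto the new `WalConfig` enum introduced in src/common/meta/src/wal.rs below (it mirrors the serde test added in that file; it parses the section body on its own, not the whole metasrv.example.toml):

use common_meta::wal::WalConfig;

fn parse_wal_section() {
    // The `provider` key selects the enum variant; the remaining keys fill `KafkaConfig`.
    let toml_str = r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9090"]
        num_topics = 64
        selector_type = "round_robin"
        topic_name_prefix = "greptimedb_kafka_wal"
        num_partitions = 1
        replication_factor = 3
    "#;
    let wal_config: WalConfig = toml::from_str(toml_str).expect("valid [wal] section");
    assert!(matches!(wal_config, WalConfig::Kafka(_)));
}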


@@ -58,6 +58,7 @@ serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
@@ -28,6 +28,7 @@ use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use self::metadata::TableMetadataBencher;
@@ -137,12 +138,12 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
}
}
fn create_region_routes() -> Vec<RegionRoute> {
let mut regions = Vec::with_capacity(100);
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::thread_rng();
for region_id in 0..64u64 {
regions.push(RegionRoute {
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
region: Region {
id: region_id.into(),
name: String::new(),
@@ -158,5 +159,11 @@ fn create_region_routes() -> Vec<RegionRoute> {
});
}
regions
region_routes
}
fn create_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
// TODO(niebayes): construct region wal options for benchmark.
let _ = regions;
HashMap::default()
}


@@ -17,7 +17,9 @@ use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;
use super::{bench_self_recorded, create_region_routes, create_table_info};
use crate::cli::bench::{
bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info,
};
pub struct TableMetadataBencher {
table_metadata_manager: TableMetadataManagerRef,
@@ -43,12 +45,15 @@ impl TableMetadataBencher {
let table_name = format!("bench_table_name_{}", i);
let table_name = TableName::new("bench_catalog", "bench_schema", table_name);
let table_info = create_table_info(i, table_name);
let region_routes = create_region_routes();
let regions: Vec<_> = (0..64).collect();
let region_routes = create_region_routes(regions.clone());
let region_wal_options = create_region_wal_options(regions);
let start = Instant::now();
self.table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, region_wal_options)
.await
.unwrap();


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
@@ -395,6 +396,9 @@ impl MigrateTableMetadata {
let region_distribution: RegionDistribution =
value.regions_id_map.clone().into_iter().collect();
// TODO(niebayes): properly fetch or construct wal options.
let region_wal_options = HashMap::default();
let datanode_table_kvs = region_distribution
.into_iter()
.map(|(datanode_id, regions)| {
@@ -409,6 +413,7 @@ impl MigrateTableMetadata {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: (&value.table_info.meta.options).into(),
region_wal_options: region_wal_options.clone(),
},
),
)


@@ -14,7 +14,7 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use config::ConfigError;
@@ -231,6 +231,12 @@ pub enum Error {
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Other error"))]
Other {
source: BoxedError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -276,6 +282,8 @@ impl ErrorExt for Error {
Error::StartCatalogManager { source, .. } => source.status_code(),
Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
}
}


@@ -19,6 +19,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -27,6 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
@@ -35,7 +37,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
@@ -48,7 +50,7 @@ use servers::Mode;
use snafu::ResultExt;
use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
@@ -372,7 +374,16 @@ impl StartCommand {
.step(10)
.build(),
);
let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence));
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
));
let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),


@@ -34,11 +34,13 @@ prost.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
tonic.workspace = true
[dev-dependencies]


@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::TableId;
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;
use crate::cache_invalidator::CacheInvalidatorRef;
@@ -54,6 +55,17 @@ pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
}
/// Metadata allocated to a table.
pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,
}
#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
@@ -61,7 +73,7 @@ pub trait TableMetadataAllocator: Send + Sync {
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
) -> Result<TableMetadata>;
}
pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
@@ -30,7 +32,7 @@ use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
@@ -45,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::wal::WAL_OPTIONS_KEY;
pub struct CreateTableProcedure {
pub context: DdlContext,
@@ -58,11 +61,12 @@ impl CreateTableProcedure {
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes),
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
}
}
@@ -94,6 +98,10 @@ impl CreateTableProcedure {
&self.creator.data.region_routes
}
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}
/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
@@ -193,6 +201,7 @@ impl CreateTableProcedure {
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
@@ -211,12 +220,12 @@ impl CreateTableProcedure {
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.await?;
@@ -262,8 +271,9 @@ impl CreateTableProcedure {
let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes)
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");
@@ -316,13 +326,19 @@ pub struct TableCreator {
}
impl TableCreator {
pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec<RegionRoute>) -> Self {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
region_wal_options,
},
opening_regions: vec![],
}
@@ -371,6 +387,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}
@@ -406,11 +423,20 @@ impl CreateRequestBuilder {
create_expr: &CreateTableExpr,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<PbCreateRegionRequest> {
let mut request = self.template.clone();
request.region_id = region_id.as_u64();
request.path = storage_path;
// Stores the encoded wal options into the request options.
region_wal_options
.get(&region_id.region_number())
.and_then(|wal_options| {
request
.options
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});
if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
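
For context, the receiving side can recover the wal options from the request map by reversing the step above. A minimal sketch, assuming the serde_json encoding defined in src/common/meta/src/wal.rs; the helper name `decode_wal_options` is hypothetical and not part of this commit:

use std::collections::HashMap;

use common_meta::wal::{WalOptions, WAL_OPTIONS_KEY};

// Hypothetical helper: look up the encoded wal options stored under
// WAL_OPTIONS_KEY by `build_one` above and decode them back into `WalOptions`.
fn decode_wal_options(request_options: &HashMap<String, String>) -> Option<WalOptions> {
    request_options
        .get(WAL_OPTIONS_KEY)
        .and_then(|encoded| serde_json::from_str(encoded).ok())
}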


@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
@@ -26,7 +28,7 @@ use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext,
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
@@ -52,7 +54,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
}
@@ -63,7 +65,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
@@ -71,7 +73,7 @@ impl DdlManager {
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
table_metadata_allocator,
memory_region_keeper,
};
manager.register_loaders()?;
@@ -176,11 +178,17 @@ impl DdlManager {
cluster_id: u64,
create_table_task: CreateTableTask,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<ProcedureId> {
let context = self.create_context();
let procedure =
CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context);
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
region_routes,
region_wal_options,
context,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -374,8 +382,8 @@ async fn handle_create_table_task(
cluster_id: u64,
mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let (table_id, region_routes) = ddl_manager
.table_meta_allocator
let table_meta = ddl_manager
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,
@@ -383,8 +391,19 @@ async fn handle_create_table_task(
)
.await?;
let TableMetadata {
table_id,
region_routes,
region_wal_options,
} = table_meta;
let id = ddl_manager
.submit_create_table_task(cluster_id, create_table_task, region_routes)
.submit_create_table_task(
cluster_id,
create_table_task,
region_routes,
region_wal_options,
)
.await?;
info!("Table: {table_id:?} is created via procedure_id {id:?}");
@@ -437,7 +456,7 @@ mod tests {
use api::v1::meta::Partition;
use common_procedure::local::LocalManager;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::RawTableInfo;
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
@@ -446,13 +465,12 @@ mod tests {
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::rpc::router::RegionRoute;
use crate::state_store::KvStateStore;
/// A dummy implemented [DatanodeManager].
@@ -475,7 +493,7 @@ mod tests {
_ctx: &TableMetadataAllocatorContext,
_table_info: &mut RawTableInfo,
_partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)> {
) -> Result<TableMetadata> {
unimplemented!()
}
}


@@ -23,6 +23,7 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::peer::Peer;
use crate::wal::WalOptions;
use crate::DatanodeId;
#[derive(Snafu)]
@@ -284,6 +285,17 @@ pub enum Error {
#[snafu(display("Retry later"))]
RetryLater { source: BoxedError },
#[snafu(display(
"Failed to encode a wal options to json string, wal_options: {:?}",
wal_options
))]
EncodeWalOptionsToJson {
wal_options: WalOptions,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -318,7 +330,8 @@ impl ErrorExt for Error {
| BuildTableMeta { .. }
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. }
| RegionOperatingRace { .. } => StatusCode::Unexpected,
| RegionOperatingRace { .. }
| EncodeWalOptionsToJson { .. } => StatusCode::Unexpected,
SendMessage { .. }
| GetKvCache { .. }


@@ -91,19 +91,27 @@ impl Display for OpenRegion {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
pub options: HashMap<String, String>,
pub region_options: HashMap<String, String>,
#[serde(default)]
pub region_wal_options: HashMap<String, String>,
}
impl OpenRegion {
pub fn new(region_ident: RegionIdent, path: &str, options: HashMap<String, String>) -> Self {
pub fn new(
region_ident: RegionIdent,
path: &str,
region_options: HashMap<String, String>,
region_wal_options: HashMap<String, String>,
) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
options,
region_options,
region_wal_options,
}
}
}
@@ -218,12 +226,13 @@ mod tests {
},
"test/foo",
HashMap::new(),
HashMap::new(),
));
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{}}}"#,
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{}}}"#,
serialized
);
@@ -242,4 +251,45 @@ mod tests {
serialized
);
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyOpenRegion {
region_ident: RegionIdent,
region_storage_path: String,
region_options: HashMap<String, String>,
}
#[test]
fn test_compatible_serialize_open_region() {
let region_ident = RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
};
let region_storage_path = "test/foo".to_string();
let region_options = HashMap::from([
("a".to_string(), "aa".to_string()),
("b".to_string(), "bb".to_string()),
]);
// Serialize a legacy OpenRegion.
let legacy_open_region = LegacyOpenRegion {
region_ident: region_ident.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
};
let serialized = serde_json::to_string(&legacy_open_region).unwrap();
// Deserialize to OpenRegion.
let deserialized = serde_json::from_str(&serialized).unwrap();
let expected = OpenRegion {
region_ident,
region_storage_path,
region_options,
region_wal_options: HashMap::new(),
};
assert_eq!(expected, deserialized);
}
}


@@ -364,6 +364,7 @@ impl TableMetadataManager {
&self,
mut table_info: RawTableInfo,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = region_routes
.iter()
@@ -399,6 +400,7 @@ impl TableMetadataManager {
&engine,
&region_storage_path,
region_options,
region_wal_options,
distribution,
)?;
@@ -587,6 +589,7 @@ impl TableMetadataManager {
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<String, String>,
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
@@ -599,6 +602,7 @@ impl TableMetadataManager {
current_region_distribution,
new_region_distribution,
new_region_options,
new_region_wal_options,
)?;
// Updates the table_route.
@@ -827,19 +831,31 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
// if metadata was already created, it should be ok.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let mut modified_region_routes = region_routes.clone();
modified_region_routes.push(region_route.clone());
// if the remote metadata already exists, it should return an error.
assert!(table_metadata_manager
.create_table_metadata(table_info.clone(), modified_region_routes)
.create_table_metadata(
table_info.clone(),
modified_region_routes,
HashMap::default()
)
.await
.is_err());
@@ -873,7 +889,11 @@ mod tests {
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
@@ -944,7 +964,11 @@ mod tests {
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let new_table_name = "another_name".to_string();
@@ -1012,7 +1036,11 @@ mod tests {
let table_id = table_info.ident.table_id;
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
let mut new_table_info = table_info.clone();
@@ -1089,7 +1117,11 @@ mod tests {
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
@@ -1155,7 +1187,11 @@ mod tests {
DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));
// creates metadata.
table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();
assert_datanode_table(&table_metadata_manager, table_id, &region_routes).await;
@@ -1172,10 +1208,12 @@ mod tests {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
},
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
&HashMap::new(),
)
.await
.unwrap();
@@ -1189,10 +1227,12 @@ mod tests {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
},
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
&HashMap::new(),
)
.await
.unwrap();
@@ -1211,10 +1251,12 @@ mod tests {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
},
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
&HashMap::new(),
)
.await
.unwrap();
@@ -1236,10 +1278,12 @@ mod tests {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
region_wal_options: HashMap::new(),
},
&wrong_table_route_value,
new_region_routes,
&HashMap::new(),
&HashMap::new(),
)
.await
.is_err());


@@ -38,14 +38,18 @@ use crate::DatanodeId;
/// For compatibility reasons, DON'T modify the field name.
pub struct RegionInfo {
#[serde(default)]
// The table engine, it SHOULD be immutable after created.
/// The table engine, it SHOULD be immutable after created.
pub engine: String,
// The region storage path, it SHOULD be immutable after created.
/// The region storage path, it SHOULD be immutable after created.
#[serde(default)]
pub region_storage_path: String,
// The region options.
/// The region options.
#[serde(default)]
pub region_options: HashMap<String, String>,
/// The per-region wal options.
/// Key: region number (in string representation). Value: the encoded wal options of the region.
#[serde(default)]
pub region_wal_options: HashMap<String, String>,
}
pub struct DatanodeTableKey {
@@ -165,11 +169,21 @@ impl DatanodeTableManager {
engine: &str,
region_storage_path: &str,
region_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
distribution: RegionDistribution,
) -> Result<Txn> {
let txns = distribution
.into_iter()
.map(|(datanode_id, regions)| {
let filtered_region_wal_options = regions
.iter()
.filter_map(|region_number| {
region_wal_options
.get(region_number)
.map(|wal_options| (region_number.to_string(), wal_options.clone()))
})
.collect();
let key = DatanodeTableKey::new(datanode_id, table_id);
let val = DatanodeTableValue::new(
table_id,
@@ -178,6 +192,7 @@ impl DatanodeTableManager {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: region_options.clone(),
region_wal_options: filtered_region_wal_options,
},
);
@@ -198,6 +213,7 @@ impl DatanodeTableManager {
current_region_distribution: RegionDistribution,
new_region_distribution: RegionDistribution,
new_region_options: &HashMap<String, String>,
new_region_wal_options: &HashMap<String, String>,
) -> Result<Txn> {
let mut opts = Vec::new();
@@ -209,12 +225,15 @@ impl DatanodeTableManager {
opts.push(TxnOp::Delete(raw_key))
}
}
let need_update_options = region_info.region_options != *new_region_options;
let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
for (datanode, regions) in new_region_distribution.into_iter() {
let need_update =
if let Some(current_region) = current_region_distribution.get(&datanode) {
// Updates if need.
*current_region != regions || need_update_options
*current_region != regions || need_update_options || need_update_wal_options
} else {
true
};
@@ -272,7 +291,7 @@ mod tests {
region_info: RegionInfo::default(),
version: 1,
};
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"version":1}"#;
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
let raw_value = value.try_as_raw_value().unwrap();
assert_eq!(raw_value, literal);
@@ -286,6 +305,41 @@ mod tests {
assert!(parsed.is_ok());
}
// This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
// and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
// Warning: if the key of `region_wal_options` is of type non-String, this test would fail.
#[test]
fn test_serde_with_region_info() {
let region_info = RegionInfo {
engine: "test_engine".to_string(),
region_storage_path: "test_storage_path".to_string(),
region_options: HashMap::from([
("a".to_string(), "aa".to_string()),
("b".to_string(), "bb".to_string()),
("c".to_string(), "cc".to_string()),
]),
region_wal_options: HashMap::from([
("1".to_string(), "aaa".to_string()),
("2".to_string(), "bbb".to_string()),
("3".to_string(), "ccc".to_string()),
]),
};
let table_value = DatanodeTableValue {
table_id: 1,
regions: vec![],
region_info,
version: 1,
};
let encoded = serde_json::to_string(&table_value).unwrap();
let decoded = serde_json::from_str(&encoded).unwrap();
assert_eq!(table_value, decoded);
let encoded = serde_json::to_vec(&table_value).unwrap();
let decoded = serde_json::from_slice(&encoded).unwrap();
assert_eq!(table_value, decoded);
}
#[test]
fn test_strip_table_id() {
fn test_err(raw_key: &[u8]) {


@@ -35,6 +35,8 @@ pub mod sequence;
pub mod state_store;
pub mod table_name;
pub mod util;
#[allow(unused)]
pub mod wal;
pub type ClusterId = u64;
pub type DatanodeId = u64;

src/common/meta/src/wal.rs (new file)

@@ -0,0 +1,127 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod kafka;
pub mod options_allocator;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator};
/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
/// and inserted into the options of a `RegionCreateRequest`.
pub const WAL_OPTIONS_KEY: &str = "wal_options";
/// Wal configurations for bootstrapping metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "provider")]
pub enum WalConfig {
#[default]
#[serde(rename = "raft_engine")]
RaftEngine,
#[serde(rename = "kafka")]
Kafka(KafkaConfig),
}
/// Wal options allocated to a region.
/// A wal options is encoded by metasrv into a `String` with `serde_json::to_string`.
/// It's then decoded by datanode to a `HashMap<String, String>` with `serde_json::from_str`.
/// Such an encoding/decoding scheme is inspired by the encoding/decoding of `RegionOptions`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
pub enum WalOptions {
#[default]
#[serde(rename = "raft_engine")]
RaftEngine,
#[serde(rename = "kafka")]
#[serde(with = "prefix_wal_kafka")]
Kafka(KafkaWalOptions),
}
with_prefix!(prefix_wal_kafka "wal.kafka.");
#[cfg(test)]
mod tests {
use super::*;
use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType;
#[test]
fn test_serde_wal_config() {
// Test serde raft-engine wal config with no other wal config.
let toml_str = r#"
provider = "raft_engine"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(wal_config, WalConfig::RaftEngine);
// Test serde raft-engine wal config with extra wal config keys.
let toml_str = r#"
provider = "raft_engine"
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
assert_eq!(wal_config, WalConfig::RaftEngine);
// Test serde kafka wal config.
let toml_str = r#"
provider = "kafka"
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_kafka_wal"
num_partitions = 1
replication_factor = 3
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
let expected_kafka_config = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
selector_type: KafkaTopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
num_partitions: 1,
replication_factor: 3,
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}
#[test]
fn test_serde_wal_options() {
// Test serde raft-engine wal options.
let wal_options = WalOptions::RaftEngine;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"raft_engine"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
// Test serde kafka wal options.
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: "test_topic".to_string(),
});
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}


@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod topic;
pub mod topic_manager;
pub mod topic_selector;
use serde::{Deserialize, Serialize};
pub use crate::wal::kafka::topic::Topic;
pub use crate::wal::kafka::topic_manager::TopicManager;
use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType;
/// Configurations for bootstrapping a kafka wal.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct KafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// Number of topics to be created upon start.
pub num_topics: usize,
/// The type of the topic selector with which to select a topic for a region.
pub selector_type: TopicSelectorType,
/// Topic name prefix.
pub topic_name_prefix: String,
/// Number of partitions per topic.
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
}
impl Default for KafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
num_partitions: 1,
replication_factor: 3,
}
}
}
/// Kafka wal options allocated to a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaOptions {
/// Kafka wal topic.
pub topic: Topic,
}


@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Kafka wal topic.
/// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
/// A topic is simply a string right now. But it may be more complex in the future.
pub type Topic = String;


@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef};
use crate::wal::kafka::KafkaConfig;
/// Manages topic initialization and selection.
pub struct TopicManager {
topic_pool: Vec<Topic>,
topic_selector: TopicSelectorRef,
kv_backend: KvBackendRef,
}
impl TopicManager {
/// Creates a new topic manager.
pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self {
let selector = match config.selector_type {
SelectorType::RoundRobin => RoundRobinTopicSelector::new(),
};
Self {
topic_pool: Vec::new(),
topic_selector: Arc::new(selector),
kv_backend,
}
}
/// Tries to initialize the topic pool.
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics are retrieved, the initializer will try to contact the Kafka cluster and request more topics.
pub async fn try_init(&mut self) -> Result<()> {
todo!()
}
/// Selects one topic from the topic pool through the topic selector.
pub fn select(&self) -> &Topic {
self.topic_selector.select(&self.topic_pool)
}
/// Selects a batch of topics from the topic pool through the topic selector.
pub fn select_batch(&self, num_topics: usize) -> Vec<Topic> {
// TODO(niebayes): calls `select` to select a collection of topics in a batching manner.
vec!["tmp_topic".to_string(); num_topics]
}
}
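
`try_init` is still a `todo!()` in this skeleton. Based on the comment in metasrv.example.toml ("A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`"), the name construction alone might look roughly like the sketch below; restoring persisted topics from the kv backend and creating topics on the Kafka cluster are elided, and the underscore separator is an assumption:

use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::KafkaConfig;

// Sketch only: derive the topic pool names from the config.
fn build_topic_pool(config: &KafkaConfig) -> Vec<Topic> {
    (0..config.num_topics)
        .map(|topic_id| format!("{}_{}", config.topic_name_prefix, topic_id))
        .collect()
}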


@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::wal::kafka::topic::Topic;
/// The type of the topic selector, i.e. with which strategy to select a topic.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SelectorType {
#[default]
#[serde(rename = "round_robin")]
RoundRobin,
}
/// Controls topic selection.
pub(super) trait TopicSelector: Send + Sync {
/// Selects a topic from the topic pool.
fn select(&self, topic_pool: &[Topic]) -> &Topic;
}
pub(super) type TopicSelectorRef = Arc<dyn TopicSelector>;
/// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner.
pub(super) struct RoundRobinTopicSelector;
impl RoundRobinTopicSelector {
/// Creates a new round-robin topic selector.
pub(super) fn new() -> Self {
todo!()
}
}
impl TopicSelector for RoundRobinTopicSelector {
fn select(&self, topic_pool: &[Topic]) -> &Topic {
todo!()
}
}
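
Both `new` and `select` are left as `todo!()` here. One possible shape for the round-robin strategy, sketched under the assumption that a simple atomic cursor suffices (not necessarily what the follow-up implementation will do):

use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch only: cycle through the pool with an atomic cursor.
pub(super) struct RoundRobinTopicSelector {
    cursor: AtomicUsize,
}

impl RoundRobinTopicSelector {
    pub(super) fn new() -> Self {
        Self {
            cursor: AtomicUsize::new(0),
        }
    }
}

impl TopicSelector for RoundRobinTopicSelector {
    fn select(&self, topic_pool: &[Topic]) -> &Topic {
        // Assumes a non-empty pool; a real implementation would handle the empty case.
        let which = self.cursor.fetch_add(1, Ordering::Relaxed) % topic_pool.len();
        &topic_pool[which]
    }
}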


@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use crate::error::{EncodeWalOptionsToJsonSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager};
use crate::wal::{WalConfig, WalOptions};
/// Allocates wal options in region granularity.
#[derive(Default)]
pub enum WalOptionsAllocator {
#[default]
RaftEngine,
Kafka(KafkaTopicManager),
}
impl WalOptionsAllocator {
/// Creates a WalOptionsAllocator.
pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self {
todo!()
}
/// Tries to initialize the allocator.
pub fn try_init(&self) -> Result<()> {
todo!()
}
/// Allocates a wal options for a region.
pub fn alloc(&self) -> WalOptions {
todo!()
}
/// Allocates a batch of wal options where each wal options goes to a region.
pub fn alloc_batch(&self, num_regions: usize) -> Vec<WalOptions> {
match self {
WalOptionsAllocator::RaftEngine => vec![WalOptions::RaftEngine; num_regions],
WalOptionsAllocator::Kafka(topic_manager) => {
let topics = topic_manager.select_batch(num_regions);
topics
.into_iter()
.map(|topic| WalOptions::Kafka(KafkaOptions { topic }))
.collect()
}
}
}
}
/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn build_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len())
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options })
})
.collect::<Result<Vec<_>>>()?;
Ok(regions.into_iter().zip(wal_options).collect())
}
/// Builds a wal options allocator.
// TODO(niebayes): implement.
pub async fn build_wal_options_allocator(
config: &WalConfig,
kv_backend: &KvBackendRef,
) -> Result<WalOptionsAllocator> {
let _ = config;
let _ = kv_backend;
Ok(WalOptionsAllocator::default())
}
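
The standalone and metasrv builders further down wire these two helpers together; roughly, as a sketch (the function name `allocate_for_regions` is hypothetical):

use common_meta::error::Result;
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::{build_wal_options_allocator, WalConfig};

// Sketch only: build an allocator from the wal config, then allocate one
// encoded wal options string per region number.
async fn allocate_for_regions(kv_backend: &KvBackendRef) -> Result<()> {
    let allocator = build_wal_options_allocator(&WalConfig::default(), kv_backend).await?;
    let region_wal_options = build_region_wal_options((0..4u32).collect(), &allocator)?;
    assert_eq!(region_wal_options.len(), 4);
    Ok(())
}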


@@ -534,6 +534,7 @@ mod tests {
"mock",
"foo/bar/weny",
HashMap::from([("foo".to_string(), "bar".to_string())]),
HashMap::default(),
BTreeMap::from([(0, vec![0, 1, 2])]),
)
.unwrap();


@@ -53,14 +53,17 @@ impl RegionHeartbeatResponseHandler {
Instruction::OpenRegion(OpenRegion {
region_ident,
region_storage_path,
options,
region_options,
region_wal_options,
}) => Ok(Box::new(|region_server| {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
let _ = region_wal_options;
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options,
options: region_options,
});
let result = region_server.handle_request(region_id, request).await;
@@ -239,6 +242,7 @@ mod tests {
},
path,
HashMap::new(),
HashMap::new(),
))
}


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
@@ -20,18 +21,20 @@ use async_trait::async_trait;
use client::region::check_response_header;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
@@ -104,24 +107,28 @@ impl Datanode for RegionInvoker {
}
}
pub struct StandaloneTableMetadataCreator {
pub struct StandaloneTableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
}
impl StandaloneTableMetadataCreator {
pub fn new(table_id_sequence: SequenceRef) -> Self {
Self { table_id_sequence }
impl StandaloneTableMetadataAllocator {
pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
}
}
}
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataCreator {
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> MetaResult<(TableId, Vec<RegionRoute>)> {
) -> MetaResult<TableMetadata> {
let table_id = self.table_id_sequence.next().await? as u32;
raw_table_info.ident.table_id = table_id;
let region_routes = partitions
@@ -144,6 +151,22 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator {
})
.collect::<Vec<_>>();
Ok((table_id, region_routes))
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",
region_wal_options, table_id
);
Ok(TableMetadata {
table_id,
region_routes,
region_wal_options: HashMap::default(),
})
}
}


@@ -161,7 +161,7 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -303,7 +303,7 @@ mod test {
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();


@@ -26,6 +26,7 @@ use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal::WalConfig;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
@@ -70,6 +71,7 @@ pub struct MetaSrvOptions {
pub datanode: DatanodeOptions,
pub enable_telemetry: bool,
pub data_home: String,
pub wal: WalConfig,
}
impl Default for MetaSrvOptions {
@@ -94,6 +96,7 @@ impl Default for MetaSrvOptions {
datanode: DatanodeOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
}
}
}


@@ -19,6 +19,7 @@ use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::TableMetadataAllocatorRef;
@@ -30,13 +31,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::wal::build_wal_options_allocator;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::error::{self, OtherSnafu, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
@@ -204,6 +206,10 @@ impl MetaSrvBuilder {
table_id: None,
};
let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -214,7 +220,8 @@ impl MetaSrvBuilder {
Arc::new(MetaSrvTableMetadataAllocator::new(
selector_ctx.clone(),
selector.clone(),
sequence,
sequence.clone(),
wal_options_allocator,
))
});


@@ -621,6 +621,7 @@ mod tests {
opening_region,
&path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))


@@ -41,8 +41,10 @@ pub(super) struct ActivateRegion {
// the new leader node needs to remark the failed region as "inactive"
// to prevent it from renewing the lease.
remark_inactive_region: bool,
// A `None` option stands for uninitialized.
region_storage_path: Option<String>,
region_options: Option<HashMap<String, String>>,
region_wal_options: Option<HashMap<String, String>>,
}
impl ActivateRegion {
@@ -52,6 +54,7 @@ impl ActivateRegion {
remark_inactive_region: false,
region_storage_path: None,
region_options: None,
region_wal_options: None,
}
}
@@ -81,14 +84,19 @@ impl ActivateRegion {
};
info!("Activating region: {candidate_ident:?}");
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::new();
let instruction = Instruction::OpenRegion(OpenRegion::new(
candidate_ident.clone(),
&region_storage_path,
region_options.clone(),
region_wal_options.clone(),
));
self.region_storage_path = Some(region_storage_path);
self.region_options = Some(region_options);
self.region_wal_options = Some(region_wal_options);
let msg = MailboxMessage::json_message(
"Activate Region",
&format!("Metasrv@{}", ctx.selector_ctx.server_addr),
@@ -137,6 +145,11 @@ impl ActivateRegion {
.context(error::UnexpectedSnafu {
violated: "expected region_options",
})?,
self.region_wal_options
.clone()
.context(error::UnexpectedSnafu {
violated: "expected region_wal_options",
})?,
)))
} else {
// The region could be just indeed cannot be opened by the candidate, retry
@@ -222,6 +235,7 @@ mod tests {
},
&env.path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))
@@ -256,7 +270,7 @@ mod tests {
.unwrap();
assert_eq!(
format!("{next_state:?}"),
r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {} }"#
r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {}, region_wal_options: {} }"#
);
}
@@ -292,6 +306,7 @@ mod tests {
},
&env.path,
HashMap::new(),
HashMap::new(),
)))
.unwrap(),
))


@@ -277,7 +277,7 @@ mod tests {
.unwrap();
assert_eq!(
format!("{next_state:?}"),
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"#
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"#
);
}
@@ -319,7 +319,7 @@ mod tests {
// Timeout or not, proceed to `ActivateRegion`.
assert_eq!(
format!("{next_state:?}"),
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"#
r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"#
);
}
}


@@ -30,11 +30,13 @@ use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu};
use crate::lock::keys::table_metadata_lock_key;
use crate::lock::Opts;
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub(super) struct UpdateRegionMetadata {
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
#[serde(default)]
region_wal_options: HashMap<String, String>,
}
impl UpdateRegionMetadata {
@@ -42,11 +44,13 @@ impl UpdateRegionMetadata {
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
region_wal_options: HashMap<String, String>,
) -> Self {
Self {
candidate,
region_storage_path,
region_options,
region_wal_options,
}
}
@@ -104,10 +108,12 @@ impl UpdateRegionMetadata {
engine: engine.to_string(),
region_storage_path: self.region_storage_path.to_string(),
region_options: self.region_options.clone(),
region_wal_options: self.region_wal_options.clone(),
},
&table_route_value,
new_region_routes,
&self.region_options,
&self.region_wal_options,
)
.await
.context(error::UpdateTableRouteSnafu)?;
@@ -188,8 +194,12 @@ mod tests {
let env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;
let mut state =
UpdateRegionMetadata::new(Peer::new(2, ""), env.path.clone(), HashMap::new());
let mut state = UpdateRegionMetadata::new(
Peer::new(2, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
let next_state = state.next(&env.context, &failed_region).await.unwrap();
assert_eq!(format!("{next_state:?}"), "InvalidateCache");
@@ -206,6 +216,7 @@ mod tests {
Peer::new(candidate, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
state
.update_table_route(&env.context, &failed_region)
@@ -348,7 +359,12 @@ mod tests {
let path = env.path.clone();
let _ = futures::future::join_all(vec![
tokio::spawn(async move {
let state = UpdateRegionMetadata::new(Peer::new(2, ""), path, HashMap::new());
let state = UpdateRegionMetadata::new(
Peer::new(2, ""),
path,
HashMap::new(),
HashMap::new(),
);
state
.update_metadata(&ctx_1, &failed_region_1)
.await
@@ -359,6 +375,7 @@ mod tests {
Peer::new(3, ""),
env.path.clone(),
HashMap::new(),
HashMap::new(),
);
state
.update_metadata(&ctx_2, &failed_region_2)
@@ -431,4 +448,40 @@ mod tests {
assert_eq!(tables[0].regions, vec![2, 4]);
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyUpdateRegionMetadata {
candidate: Peer,
region_storage_path: String,
region_options: HashMap<String, String>,
}
#[test]
fn test_compatible_serialize_update_region_metadata() {
let candidate = Peer::new(1, "test_addr");
let region_storage_path = "test_path".to_string();
let region_options = HashMap::from([
("a".to_string(), "aa".to_string()),
("b".to_string(), "bb".to_string()),
]);
let legacy_update_region_metadata = LegacyUpdateRegionMetadata {
candidate: candidate.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
};
// Serialize a LegacyUpdateRegionMetadata.
let serialized = serde_json::to_string(&legacy_update_region_metadata).unwrap();
// Deserialize to UpdateRegionMetadata.
let deserialized = serde_json::from_str(&serialized).unwrap();
let expected = UpdateRegionMetadata {
candidate,
region_storage_path,
region_options,
region_wal_options: HashMap::new(),
};
assert_eq!(expected, deserialized);
}
}
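
For readers unfamiliar with the `#[serde(default)]` trick relied on above, here is a minimal, self-contained sketch of the same backward-compatibility pattern using only serde and serde_json; the struct and field names are illustrative, not the GreptimeDB types:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Old persisted form: no wal options field.
#[derive(Serialize)]
struct LegacyState {
    region_storage_path: String,
    region_options: HashMap<String, String>,
}

// New form: the added field falls back to an empty map when absent.
#[derive(Deserialize, Debug)]
struct State {
    region_storage_path: String,
    region_options: HashMap<String, String>,
    #[serde(default)]
    region_wal_options: HashMap<String, String>,
}

fn main() {
    let legacy = LegacyState {
        region_storage_path: "greptime/public".to_string(),
        region_options: HashMap::new(),
    };
    // JSON written by an older version still deserializes into the new struct.
    let json = serde_json::to_string(&legacy).unwrap();
    let state: State = serde_json::from_str(&json).unwrap();
    assert!(state.region_wal_options.is_empty());
}
```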

View File

@@ -137,6 +137,7 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -187,7 +188,7 @@ mod tests {
};
env.table_metadata_manager()
.create_table_metadata(table_info, vec![region_route])
.create_table_metadata(table_info, vec![region_route], HashMap::default())
.await
.unwrap();
@@ -221,7 +222,7 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -254,7 +255,7 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -281,7 +282,7 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -76,6 +76,8 @@ impl OpenCandidateRegion {
let engine = table_info.meta.engine.clone();
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::new();
let open_instruction = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
@@ -87,6 +89,7 @@ impl OpenCandidateRegion {
},
&region_storage_path,
region_options,
region_wal_options,
));
Ok(open_instruction)
@@ -210,7 +213,8 @@ mod tests {
engine: MITO2_ENGINE.to_string(),
},
region_storage_path: "/bar/foo/region/".to_string(),
options: Default::default(),
region_options: Default::default(),
region_wal_options: Default::default(),
})
}
@@ -403,7 +407,7 @@ mod tests {
}];
env.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -368,7 +369,7 @@ impl ProcedureMigrationTestSuite {
) {
self.env
.table_metadata_manager()
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
}

View File

@@ -74,6 +74,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -137,7 +138,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -191,7 +192,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -234,7 +235,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -59,6 +59,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -129,7 +130,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -215,7 +216,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -136,6 +136,9 @@ impl UpdateMetadata {
let engine = table_info.meta.engine.clone();
let region_options: HashMap<String, String> = (&table_info.meta.options).into();
// TODO(niebayes): properly fetch or construct region wal options.
let region_wal_options = HashMap::new();
// No remote fetch.
let table_route_value = ctx.get_table_route_value().await?;
@@ -146,10 +149,12 @@ impl UpdateMetadata {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: region_options.clone(),
region_wal_options: region_wal_options.clone(),
},
table_route_value,
region_routes,
&region_options,
&region_wal_options,
)
.await
.context(error::TableMetadataManagerSnafu)
@@ -171,6 +176,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -221,7 +227,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -250,7 +256,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -281,7 +287,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -322,7 +328,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -381,7 +387,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -407,7 +413,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -433,7 +439,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
@@ -466,7 +472,7 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();

View File

@@ -101,6 +101,7 @@ fn test_region_request_builder() {
1,
create_table_task(),
test_data::new_region_routes(),
HashMap::default(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);
@@ -191,6 +192,7 @@ async fn test_on_datanode_create_regions() {
1,
create_table_task(),
region_routes,
HashMap::default(),
test_data::new_ddl_context(datanode_manager),
);
@@ -369,7 +371,11 @@ async fn test_submit_alter_region_requests() {
let table_info = test_data::new_table_info();
context
.table_metadata_manager
.create_table_metadata(table_info.clone(), region_routes.clone())
.create_table_metadata(
table_info.clone(),
region_routes.clone(),
HashMap::default(),
)
.await
.unwrap();

View File

@@ -291,7 +291,7 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()])
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.await
.unwrap();
@@ -378,7 +378,7 @@ mod tests {
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(table_info, vec![region_route.clone()])
.create_table_metadata(table_info, vec![region_route.clone()], HashMap::default())
.await
.unwrap();

View File

@@ -15,11 +15,13 @@
use api::v1::meta::Partition;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_telemetry::warn;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
use table::metadata::RawTableInfo;
@@ -32,6 +34,7 @@ pub struct MetaSrvTableMetadataAllocator {
ctx: SelectorContext,
selector: SelectorRef,
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
}
impl MetaSrvTableMetadataAllocator {
@@ -39,11 +42,13 @@ impl MetaSrvTableMetadataAllocator {
ctx: SelectorContext,
selector: SelectorRef,
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocator,
) -> Self {
Self {
ctx,
selector,
table_id_sequence,
wal_options_allocator,
}
}
}
@@ -55,8 +60,8 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> MetaResult<(TableId, Vec<RegionRoute>)> {
handle_create_region_routes(
) -> MetaResult<TableMetadata> {
let (table_id, region_routes) = handle_create_region_routes(
ctx.cluster_id,
raw_table_info,
partitions,
@@ -66,7 +71,25 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
.context(meta_error::ExternalSnafu)?;
let region_numbers = region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
let region_wal_options =
build_region_wal_options(region_numbers, &self.wal_options_allocator)?;
debug!(
"Allocated region wal options {:?} for table {}",
region_wal_options, table_id
);
Ok(TableMetadata {
table_id,
region_routes,
region_wal_options,
})
}
}
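
The allocator above asks the wal options allocator for one option string per region number. As a rough sketch of what that can mean with a round-robin Kafka topic selector, here is a self-contained example; the function name, JSON keys, and topic list are hypothetical stand-ins for the real `common_meta::wal` machinery:

```rust
use std::collections::HashMap;

/// Assigns Kafka topics to region numbers round-robin and returns the
/// per-region wal options as JSON strings keyed by region number.
/// `topics` stands in for whatever the topic manager created at start-up.
fn allocate_region_wal_options(
    region_numbers: &[u32],
    topics: &[String],
) -> HashMap<u32, String> {
    region_numbers
        .iter()
        .enumerate()
        .map(|(i, &region)| {
            let topic = &topics[i % topics.len()];
            // One possible JSON shape; the real encoding is defined in common_meta::wal.
            let options = serde_json::json!({ "provider": "kafka", "topic": topic });
            (region, options.to_string())
        })
        .collect()
}

fn main() {
    let topics: Vec<String> = (0..3).map(|i| format!("greptimedb_kafka_wal_{i}")).collect();
    let options = allocate_region_wal_options(&[1, 2, 3, 4], &topics);
    assert_eq!(options.len(), 4);
    println!("{options:?}");
}
```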

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
@@ -144,7 +145,7 @@ pub(crate) async fn prepare_table_region_and_info_value(
region_route_factory(4, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, HashMap::default())
.await
.unwrap();
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
@@ -80,6 +80,12 @@ pub fn new_test_table_info(
.unwrap()
}
fn new_test_region_wal_options(regions: Vec<RegionNumber>) -> HashMap<RegionNumber, String> {
// TODO(niebayes): construct region wal options for test.
let _ = regions;
HashMap::default()
}
/// Creates a partition rule manager with two tables: one partitioned by a single column,
/// the other by two. The tables are under the default catalog and schema.
///
@@ -102,9 +108,12 @@ pub(crate) async fn create_partition_rule_manager(
let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend));
let regions = vec![1u32, 2, 3];
let region_wal_options = new_test_region_wal_options(regions.clone());
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(),
new_test_table_info(1, "table_1", regions.clone().into_iter()).into(),
vec![
RegionRoute {
region: Region {
@@ -161,13 +170,14 @@ pub(crate) async fn create_partition_rule_manager(
leader_status: None,
},
],
region_wal_options.clone(),
)
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(),
new_test_table_info(2, "table_2", regions.clone().into_iter()).into(),
vec![
RegionRoute {
region: Region {
@@ -230,6 +240,7 @@ pub(crate) async fn create_partition_rule_manager(
leader_status: None,
},
],
region_wal_options,
)
.await
.unwrap();

View File

@@ -23,13 +23,14 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataCreator;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -117,7 +118,15 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence));
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.unwrap();
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
));
let ddl_task_executor = Arc::new(
DdlManager::try_new(