refactor: introduce DdlTaskExecutor and refactor statement executor (#2341)

* feat: add kv store option

* refactor: refactor statement executor

* refactor: refactor standalone table creator

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: move ShowCreateTable and CreateDatabase to StatementExecutor

* fix: fix RegionDistribution

* feat: build standalone

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-09-08 12:16:59 +08:00
committed by Ruihang Xia
parent 3f6d557b8d
commit f386329e29
45 changed files with 1597 additions and 1394 deletions

12
Cargo.lock generated
View File

@@ -1591,6 +1591,7 @@ dependencies = [
"common-config",
"common-error",
"common-meta",
"common-procedure",
"common-query",
"common-recordbatch",
"common-telemetry",
@@ -1840,6 +1841,7 @@ name = "common-meta"
version = "0.4.0-nightly"
dependencies = [
"api",
"arrow-flight",
"async-stream",
"async-trait",
"chrono",
@@ -1847,6 +1849,7 @@ dependencies = [
"common-error",
"common-grpc-expr",
"common-procedure",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-time",
@@ -3242,12 +3245,14 @@ dependencies = [
"client",
"common-base",
"common-catalog",
"common-config",
"common-datasource",
"common-error",
"common-function",
"common-grpc",
"common-grpc-expr",
"common-meta",
"common-procedure",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -3264,6 +3269,7 @@ dependencies = [
"futures-util",
"humantime-serde",
"itertools 0.10.5",
"log-store",
"meta-client",
"meta-srv",
"meter-core",
@@ -3277,6 +3283,7 @@ dependencies = [
"partition",
"prost",
"query",
"raft-engine",
"regex",
"script",
"serde",
@@ -7275,8 +7282,9 @@ dependencies = [
[[package]]
name = "raft-engine"
version = "0.3.0"
source = "git+https://github.com/tikv/raft-engine.git?rev=2dcaf5beeea3d5de9ec9c7133a2451d00f508f52#2dcaf5beeea3d5de9ec9c7133a2451d00f508f52"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e02bdc8cba47cb7062b433f56700a8edbc9fcd6d706389120d20aa1827e5ba7b"
dependencies = [
"byteorder",
"crc32fast",

View File

@@ -153,6 +153,7 @@ object-store = { path = "src/object-store" }
partition = { path = "src/partition" }
promql = { path = "src/promql" }
query = { path = "src/query" }
raft-engine = { version = "0.4" }
script = { path = "src/script" }
servers = { path = "src/servers" }
session = { path = "src/session" }

View File

@@ -82,8 +82,6 @@ enable = true
# WAL options.
[wal]
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
file_size = "256MB"
# WAL purge threshold.
@@ -95,6 +93,20 @@ read_batch_size = 128
# Whether to sync log file after every write.
sync_write = false
# Kv options.
[kv_store]
# Kv file size in bytes.
file_size = "256MB"
# Kv purge threshold.
purge_threshold = "4GB"
# Procedure storage options.
[procedure]
# Procedure max retry time.
max_retry_times = 3
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
# Storage options.
[storage]
# The working home directory.

View File

@@ -27,3 +27,10 @@ pub trait KvCacheInvalidator: Send + Sync {
}
pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
pub struct DummyKvCacheInvalidator;
#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
async fn invalidate_key(&self, _key: &[u8]) {}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::region::{RegionRequest, RegionResponse};
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
use arrow_flight::Ticket;
use async_stream::stream;
@@ -25,6 +25,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_telemetry::{error, timer};
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
@@ -56,6 +57,16 @@ impl Datanode for RegionRequester {
}
})
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
self.do_get_inner(ticket)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
}
impl RegionRequester {
@@ -63,7 +74,7 @@ impl RegionRequester {
Self { client }
}
pub async fn do_get(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
let mut flight_client = self.client.make_flight_client()?;
let response = flight_client
.mut_inner()

View File

@@ -26,6 +26,7 @@ common-base = { workspace = true }
common-config = { workspace = true }
common-error = { workspace = true }
common-meta = { workspace = true }
common-procedure = { workspace = true }
common-query = { workspace = true }
common-recordbatch = { workspace = true }
common-telemetry = { workspace = true, features = [

View File

@@ -116,7 +116,7 @@ impl SubCommand {
Ok(Application::Metasrv(app))
}
(SubCommand::Standalone(cmd), Options::Standalone(opts)) => {
let app = cmd.build(opts.fe_opts, opts.dn_opts).await?;
let app = cmd.build(*opts).await?;
Ok(Application::Standalone(app))
}
(SubCommand::Cli(cmd), Options::Cli(_)) => {

View File

@@ -21,15 +21,12 @@ use client::client_manager::DatanodeClients;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_meta::key::TableMetadataManager;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use either::Either;
use frontend::catalog::FrontendCatalogManager;
use meta_client::client::MetaClientBuilder;
use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use query::datafusion::DatafusionQueryEngine;
use query::logical_optimizer::LogicalOptimizer;
use query::parser::QueryLanguageParser;
@@ -254,17 +251,12 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let table_routes = Arc::new(TableRoutes::new(meta_client));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let datanode_clients = Arc::new(DatanodeClients::default());
let catalog_list = Arc::new(FrontendCatalogManager::new(
cached_meta_backend.clone(),
cached_meta_backend.clone(),
partition_manager,
datanode_clients,
Arc::new(TableMetadataManager::new(cached_meta_backend)),
));
let plugins: Arc<Plugins> = Default::default();
let state = Arc::new(QueryEngineState::new(

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_config::KvStoreConfig;
use common_telemetry::logging::LoggingOptions;
use config::{Config, Environment, File, FileFormat};
use datanode::datanode::DatanodeOptions;
use datanode::datanode::{DatanodeOptions, ProcedureConfig};
use frontend::frontend::FrontendOptions;
use meta_srv::metasrv::MetaSrvOptions;
use serde::{Deserialize, Serialize};
@@ -26,9 +27,12 @@ pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
pub struct MixOptions {
pub data_home: String,
pub procedure_cfg: ProcedureConfig,
pub kv_store_cfg: KvStoreConfig,
pub fe_opts: FrontendOptions,
pub dn_opts: DatanodeOptions,
pub logging: LoggingOptions,
pub logging_opts: LoggingOptions,
}
pub enum Options {
@@ -51,7 +55,7 @@ impl Options {
Options::Datanode(opts) => &opts.logging,
Options::Frontend(opts) => &opts.logging,
Options::Metasrv(opts) => &opts.logging,
Options::Standalone(opts) => &opts.logging,
Options::Standalone(opts) => &opts.logging_opts,
Options::Cli(opts) => opts,
}
}

View File

@@ -14,19 +14,24 @@
use std::sync::Arc;
use catalog::remote::DummyKvCacheInvalidator;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::WalConfig;
use common_config::{kv_store_dir, KvStoreConfig, WalConfig};
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::region_server::RegionServer;
use datanode::Instance as InstanceRef;
use frontend::catalog::FrontendCatalogManager;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use query::QueryEngineRef;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -47,12 +52,8 @@ pub struct Command {
}
impl Command {
pub async fn build(
self,
fe_opts: FrontendOptions,
dn_opts: DatanodeOptions,
) -> Result<Instance> {
self.subcmd.build(fe_opts, dn_opts).await
pub async fn build(self, opts: MixOptions) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, top_level_options: TopLevelOptions) -> Result<Options> {
@@ -66,9 +67,9 @@ enum SubCommand {
}
impl SubCommand {
async fn build(self, fe_opts: FrontendOptions, dn_opts: DatanodeOptions) -> Result<Instance> {
async fn build(self, opts: MixOptions) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(fe_opts, dn_opts).await,
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
@@ -93,6 +94,7 @@ pub struct StandaloneOptions {
pub prom_store_options: PromStoreOptions,
pub wal: WalConfig,
pub storage: StorageConfig,
pub kv_store: KvStoreConfig,
pub procedure: ProcedureConfig,
pub logging: LoggingOptions,
}
@@ -111,6 +113,7 @@ impl Default for StandaloneOptions {
prom_store_options: PromStoreOptions::default(),
wal: WalConfig::default(),
storage: StorageConfig::default(),
kv_store: KvStoreConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
}
@@ -265,23 +268,29 @@ impl StartCommand {
if self.influxdb_enable {
opts.influxdb_options.enable = self.influxdb_enable;
}
let kv_store_cfg = opts.kv_store.clone();
let procedure_cfg = opts.procedure.clone();
let fe_opts = opts.clone().frontend_options();
let logging = opts.logging.clone();
let logging_opts = opts.logging.clone();
let dn_opts = opts.datanode_options();
Ok(Options::Standalone(Box::new(MixOptions {
procedure_cfg,
kv_store_cfg,
data_home: dn_opts.storage.data_home.to_string(),
fe_opts,
dn_opts,
logging,
logging_opts,
})))
}
#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(self, fe_opts: FrontendOptions, dn_opts: DatanodeOptions) -> Result<Instance> {
async fn build(self, opts: MixOptions) -> Result<Instance> {
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let fe_opts = opts.fe_opts;
let dn_opts = opts.dn_opts;
info!("Standalone start command: {:#?}", self);
info!(
@@ -289,13 +298,36 @@ impl StartCommand {
fe_opts, dn_opts
);
let datanode = Datanode::new(dn_opts.clone(), Default::default())
let kv_dir = kv_store_dir(&opts.data_home);
let (kv_store, procedure_manager) = FeInstance::try_build_standalone_components(
kv_dir,
opts.kv_store_cfg,
opts.procedure_cfg,
)
.await
.context(StartFrontendSnafu)?;
let datanode = Datanode::new(dn_opts.clone(), plugins.clone())
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
let catalog_manager = Arc::new(FrontendCatalogManager::new(
kv_store.clone(),
Arc::new(DummyKvCacheInvalidator),
Arc::new(StandaloneDatanodeManager(region_server.clone())),
));
// TODO: build frontend instance like in distributed mode
let mut frontend =
build_frontend(plugins.clone(), todo!(), datanode.region_server()).await?;
let mut frontend = build_frontend(
plugins,
kv_store,
procedure_manager,
catalog_manager,
datanode.query_engine(),
region_server,
)
.await?;
frontend
.build_servers(&fe_opts)
@@ -309,12 +341,21 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
plugins: Arc<Plugins>,
datanode_instance: InstanceRef,
kv_store: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_server: RegionServer,
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::try_new_standalone(datanode_instance, region_server)
.await
.context(StartFrontendSnafu)?;
let mut frontend_instance = FeInstance::try_new_standalone(
kv_store,
procedure_manager,
catalog_manager,
query_engine,
region_server,
)
.await
.context(StartFrontendSnafu)?;
frontend_instance.set_plugins(plugins.clone());
Ok(frontend_instance)
}
@@ -411,7 +452,7 @@ mod tests {
};
let fe_opts = options.fe_opts;
let dn_opts = options.dn_opts;
let logging_opts = options.logging;
let logging_opts = options.logging_opts;
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http_options.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http_options.timeout);
@@ -456,8 +497,8 @@ mod tests {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
assert_eq!("/tmp/greptimedb/test/logs", opts.logging_opts.dir);
assert_eq!("debug", opts.logging_opts.level.unwrap());
}
#[test]
@@ -526,10 +567,10 @@ mod tests {
};
// Should be read from env, env > default values.
assert_eq!(opts.logging.dir, "/other/log/dir");
assert_eq!(opts.logging_opts.dir, "/other/log/dir");
// Should be read from config file, config file > env > default values.
assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
assert_eq!(opts.logging_opts.level.as_ref().unwrap(), "debug");
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.fe_opts.http_options.addr, "127.0.0.1:14000");

View File

@@ -44,3 +44,27 @@ impl Default for WalConfig {
}
}
}
pub fn kv_store_dir(store_dir: &str) -> String {
format!("{store_dir}/kv")
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KvStoreConfig {
// Kv file size in bytes
pub file_size: ReadableSize,
// Kv purge threshold in bytes
pub purge_threshold: ReadableSize,
}
impl Default for KvStoreConfig {
fn default() -> Self {
Self {
// log file size 256MB
file_size: ReadableSize::mb(256),
// purge threshold 4GB
purge_threshold: ReadableSize::gb(4),
}
}
}

View File

@@ -6,12 +6,14 @@ license.workspace = true
[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
common-catalog = { workspace = true }
common-error = { workspace = true }
common-grpc-expr.workspace = true
common-procedure = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }

View File

@@ -18,6 +18,7 @@ use crate::error::Result;
use crate::ident::TableIdent;
/// Places context of invalidating cache. e.g., span id, trace id etc.
#[derive(Debug, Default)]
pub struct Context {
pub subject: Option<String>,
}
@@ -29,3 +30,12 @@ pub trait CacheInvalidator: Send + Sync {
}
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
pub struct DummyCacheInvalidator;
#[async_trait::async_trait]
impl CacheInvalidator for DummyCacheInvalidator {
async fn invalidate_table(&self, _ctx: &Context, _table_ident: TableIdent) -> Result<()> {
Ok(())
}
}

View File

@@ -14,7 +14,8 @@
use std::sync::Arc;
use api::v1::region::RegionRequest;
use api::v1::region::{QueryRequest, RegionRequest};
use common_recordbatch::SendableRecordBatchStream;
use crate::error::Result;
use crate::peer::Peer;
@@ -25,6 +26,8 @@ pub type AffectedRows = u64;
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
pub type DatanodeRef = Arc<dyn Datanode>;

View File

@@ -36,7 +36,7 @@ pub struct ExecutorContext {
}
#[async_trait::async_trait]
pub trait DdlExecutor: Send + Sync {
pub trait DdlTaskExecutor: Send + Sync {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
@@ -44,23 +44,23 @@ pub trait DdlExecutor: Send + Sync {
) -> Result<SubmitDdlTaskResponse>;
}
pub type DdlExecutorRef = Arc<dyn DdlExecutor>;
pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
pub struct TableCreatorContext {
pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
}
#[async_trait::async_trait]
pub trait TableCreator: Send + Sync {
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableCreatorContext,
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
}
pub type TableCreatorRef = Arc<dyn TableCreator>;
pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;
#[derive(Clone)]
pub struct DdlContext {

View File

@@ -23,7 +23,10 @@ use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::{DdlContext, DdlExecutor, ExecutorContext, TableCreatorContext, TableCreatorRef};
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu,
UnsupportedSnafu, WaitProcedureSnafu,
@@ -46,7 +49,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableCreatorRef,
table_creator: TableMetadataAllocatorRef,
}
impl DdlManager {
@@ -55,7 +58,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_creator: TableCreatorRef,
table_creator: TableMetadataAllocatorRef,
) -> Self {
Self {
procedure_manager,
@@ -332,7 +335,7 @@ async fn handle_create_table_task(
let (table_id, region_routes) = ddl_manager
.table_creator
.create(
&TableCreatorContext { cluster_id },
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,
&create_table_task.partitions,
)
@@ -342,7 +345,7 @@ async fn handle_create_table_task(
.submit_create_table_task(cluster_id, create_table_task, region_routes)
.await?;
info!("Table: {table_id} is created via procedure_id {id:?}");
info!("Table: {table_id:?} is created via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
@@ -351,7 +354,7 @@ async fn handle_create_table_task(
}
#[async_trait::async_trait]
impl DdlExecutor for DdlManager {
impl DdlTaskExecutor for DdlManager {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,

View File

@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
pub id: u64,

View File

@@ -77,16 +77,10 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
pub fn region_distribution(region_routes: &[RegionRoute]) -> Result<RegionDistribution> {
let mut regions_id_map = RegionDistribution::new();
for route in region_routes.iter() {
let node_id = route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "leader not found",
})?
.id;
let region_id = route.region.id.region_number();
regions_id_map.entry(node_id).or_default().push(region_id);
if let Some(peer) = route.leader_peer.as_ref() {
let region_id = route.region.id.region_number();
regions_id_map.entry(peer.id).or_default().push(region_id);
}
}
for (_, regions) in regions_id_map.iter_mut() {
// id asc
@@ -110,6 +104,24 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}
pub fn convert_to_region_map(region_routes: &[RegionRoute]) -> HashMap<u32, &Peer> {
region_routes
.iter()
.filter_map(|x| {
x.leader_peer
.as_ref()
.map(|leader| (x.region.id.region_number(), leader))
})
.collect::<HashMap<_, _>>()
}
pub fn find_region_leader(region_routes: &[RegionRoute], region_number: u32) -> Option<&Peer> {
region_routes
.iter()
.find(|x| x.region.id.region_number() == region_number)
.and_then(|r| r.leader_peer.as_ref())
}
pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
region_routes
.iter()
@@ -333,6 +345,18 @@ pub struct RegionRoute {
pub follower_peers: Vec<Peer>,
}
pub struct RegionRoutes(pub Vec<RegionRoute>);
impl RegionRoutes {
pub fn region_map(&self) -> HashMap<u32, &Peer> {
convert_to_region_map(&self.0)
}
pub fn find_region_leader(&self, region_number: u32) -> Option<&Peer> {
self.region_map().get(&region_number).copied()
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
pub id: RegionId,

View File

@@ -33,7 +33,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::{QueryEngineFactory, QueryEngineRef};
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::heartbeat_options::HeartbeatOptions;
@@ -401,6 +401,7 @@ pub struct Datanode {
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_server: RegionServer,
query_engine: QueryEngineRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}
@@ -423,7 +424,7 @@ impl Datanode {
.context(RuntimeResourceSnafu)?,
);
let mut region_server = RegionServer::new(query_engine, runtime.clone());
let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let log_store = Self::build_log_store(&opts).await?;
let object_store = store::new_object_store(&opts).await?;
let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
@@ -454,6 +455,7 @@ impl Datanode {
services,
heartbeat_task,
region_server,
query_engine,
greptimedb_telemetry_task,
})
}
@@ -502,6 +504,10 @@ impl Datanode {
self.region_server.clone()
}
pub fn query_engine(&self) -> QueryEngineRef {
self.query_engine.clone()
}
// internal utils
/// Build [RaftEngineLogStore]

View File

@@ -21,12 +21,14 @@ chrono.workspace = true
client = { workspace = true }
common-base = { workspace = true }
common-catalog = { workspace = true }
common-config = { workspace = true }
common-datasource = { workspace = true }
common-error = { workspace = true }
common-function = { workspace = true }
common-grpc = { workspace = true }
common-grpc-expr = { workspace = true }
common-meta = { workspace = true }
common-procedure = { workspace = true }
common-query = { workspace = true }
common-recordbatch = { workspace = true }
common-runtime = { workspace = true }
@@ -42,7 +44,9 @@ futures = "0.3"
futures-util.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
log-store = { workspace = true }
meta-client = { workspace = true }
raft-engine = { workspace = true }
# Although it is not used, please do not delete it.
meter-core.workspace = true
meter-macros.workspace = true

View File

@@ -26,22 +26,25 @@ use catalog::{
CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, RegisterSchemaRequest,
RegisterTableRequest, RenameTableRequest,
};
use client::client_manager::DatanodeClients;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::ident::TableIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetaKey, TableMetadataManagerRef};
use common_meta::key::table_route::NextTableRouteKey;
use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::debug;
use futures_util::TryStreamExt;
use partition::manager::PartitionRuleManagerRef;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;
@@ -52,57 +55,19 @@ pub struct FrontendCatalogManager {
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
}
impl FrontendCatalogManager {
pub fn new(
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self {
backend,
backend_cache_invalidator,
partition_manager,
datanode_clients,
table_metadata_manager,
}
}
pub fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
self.partition_manager.clone()
}
pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
pub fn datanode_clients(&self) -> Arc<DatanodeClients> {
self.datanode_clients.clone()
}
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.backend_cache_invalidator.invalidate_key(&key).await;
}
pub async fn invalidate_table(
&self,
catalog: &str,
schema: &str,
table: &str,
table_id: TableId,
) {
let key = TableNameKey::new(catalog, schema, table);
#[async_trait::async_trait]
impl CacheInvalidator for FrontendCatalogManager {
async fn invalidate_table(&self, _ctx: &Context, table_ident: TableIdent) -> MetaResult<()> {
let table_id = table_ident.table_id;
let key = TableNameKey::new(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
@@ -120,10 +85,54 @@ impl FrontendCatalogManager {
String::from_utf8_lossy(&key.as_raw_key())
);
self.partition_manager
.table_routes()
.invalidate_table_route(table_id)
let key = &NextTableRouteKey { table_id };
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
Ok(())
}
}
impl FrontendCatalogManager {
pub fn new(
backend: KvBackendRef,
backend_cache_invalidator: KvCacheInvalidatorRef,
datanode_manager: DatanodeManagerRef,
) -> Self {
Self {
backend: backend.clone(),
backend_cache_invalidator,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
datanode_manager,
}
}
pub fn backend(&self) -> KvBackendRef {
self.backend.clone()
}
pub fn partition_manager(&self) -> PartitionRuleManagerRef {
self.partition_manager.clone()
}
pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
pub fn datanode_manager(&self) -> DatanodeManagerRef {
self.datanode_manager.clone()
}
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.backend_cache_invalidator.invalidate_key(&key).await;
}
}

View File

@@ -27,6 +27,18 @@ use store_api::storage::RegionNumber;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to invalidate table cache, source: {}", source))]
InvalidateTableCache {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to open raft engine backend, source: {}", source))]
OpenRaftEngineBackend {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to execute ddl, source: {}", source))]
ExecuteDdl {
location: Location,
@@ -57,6 +69,12 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Failed to query, source: {}", source))]
RequestQuery {
#[snafu(backtrace)]
source: common_meta::error::Error,
},
#[snafu(display("Failed to insert data, source: {}", source))]
RequestInserts {
#[snafu(backtrace)]
@@ -710,6 +728,8 @@ impl ErrorExt for Error {
source.status_code()
}
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => {
source.status_code()
}
@@ -722,7 +742,10 @@ impl ErrorExt for Error {
| Error::CreateTableInfo { source }
| Error::IntoVectors { source } => source.status_code(),
Error::OpenRaftEngineBackend { .. } => StatusCode::StorageUnavailable,
Error::RequestDatanode { source } => source.status_code(),
Error::RequestQuery { source } => source.status_code(),
Error::RequestInserts { source } => source.status_code(),
Error::RequestDeletes { source } => source.status_code(),

View File

@@ -22,14 +22,13 @@ use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::NextTableRouteKey;
use common_meta::key::TableMetaKey;
use common_telemetry::error;
use partition::manager::TableRouteCacheInvalidatorRef;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
backend_cache_invalidator: KvCacheInvalidatorRef,
table_route_cache_invalidator: TableRouteCacheInvalidatorRef,
}
#[async_trait]
@@ -73,13 +72,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}
impl InvalidateTableCacheHandler {
pub fn new(
backend_cache_invalidator: KvCacheInvalidatorRef,
table_route_cache_invalidator: TableRouteCacheInvalidatorRef,
) -> Self {
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self {
backend_cache_invalidator,
table_route_cache_invalidator,
}
}
@@ -100,8 +95,8 @@ impl InvalidateTableCacheHandler {
)
.await;
self.table_route_cache_invalidator
.invalidate_table_route(table_id)
self.backend_cache_invalidator
.invalidate_key(&NextTableRouteKey { table_id }.as_raw_key())
.await;
}
}

View File

@@ -64,13 +64,8 @@ async fn test_invalidate_table_cache_handler() {
inner: Mutex::new(inner),
});
let inner = HashMap::from([(table_id, 1)]);
let table_route = Arc::new(MockTableRouteCacheInvalidator {
inner: Mutex::new(inner),
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone(), table_route.clone()),
InvalidateTableCacheHandler::new(backend.clone()),
)]));
let (tx, mut rx) = mpsc::channel(8);
@@ -101,8 +96,6 @@ async fn test_invalidate_table_cache_handler() {
.unwrap()
.contains_key(&table_info_key.as_raw_key()));
assert!(!table_route.inner.lock().unwrap().contains_key(&table_id));
// removes a invalid key
handle_instruction(
executor,

View File

@@ -13,12 +13,8 @@
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::region::region_request;
use api::v1::{
AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests,
};
use api::v1::{AlterExpr, ColumnSchema, InsertRequests, RowInsertRequest, RowInsertRequests};
use catalog::CatalogManager;
use client::region_handler::RegionRequestHandler;
use common_catalog::consts::default_engine;
@@ -26,23 +22,23 @@ use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
use common_telemetry::info;
use datatypes::schema::Schema;
use servers::query_handler::grpc::GrpcQueryHandlerRef;
use session::context::QueryContextRef;
use snafu::prelude::*;
use table::engine::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu,
RequestDatanodeSnafu, Result,
CatalogSnafu, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, RequestDatanodeSnafu,
Result,
};
use crate::expr_factory::CreateExprFactory;
use crate::req_convert::insert::{ColumnToRow, RowToRegion};
use crate::statement::StatementExecutor;
pub(crate) struct Inserter<'a> {
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
statement_executor: &'a StatementExecutor,
region_request_handler: &'a dyn RegionRequestHandler,
}
@@ -50,13 +46,13 @@ impl<'a> Inserter<'a> {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
create_expr_factory: &'a CreateExprFactory,
grpc_query_handler: &'a GrpcQueryHandlerRef<Error>,
statement_executor: &'a StatementExecutor,
region_request_handler: &'a dyn RegionRequestHandler,
) -> Self {
Self {
catalog_manager,
create_expr_factory,
grpc_query_handler,
statement_executor,
region_request_handler,
}
}
@@ -164,10 +160,9 @@ impl<'a> Inserter<'a> {
kind: Some(Kind::AddColumns(add_columns)),
};
let req = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(alter_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx.clone()).await?;
self.statement_executor
.alter_table_inner(alter_table_expr)
.await?;
info!(
"Successfully added new columns to table: {}.{}.{}",
@@ -187,14 +182,14 @@ impl<'a> Inserter<'a> {
table_ref.catalog, table_ref.schema, table_ref.table,
);
let create_table_expr = self
let mut create_table_expr = self
.create_expr_factory
.create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?;
let req = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(create_table_expr)),
});
self.grpc_query_handler.do_query(req, ctx.clone()).await?;
// TODO(weny): multiple regions table.
self.statement_executor
.create_table_inner(&mut create_table_expr, None)
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",

View File

@@ -20,7 +20,6 @@ mod otlp;
mod prom_store;
mod script;
mod standalone;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -34,32 +33,38 @@ use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_config::KvStoreConfig;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl_manager::DdlManager;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::logging::info;
use common_telemetry::{error, timer};
use datanode::region_server::RegionServer;
use datanode::Instance as DnInstanceRef;
use distributed::DistInstance;
use log_store::raft_engine::RaftEngineBackend;
use meta_client::client::{MetaClient, MetaClientBuilder};
use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
@@ -72,8 +77,10 @@ use sql::parser::ParserContext;
use sql::statements::copy::CopyTable;
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use self::distributed::DistRegionRequestHandler;
use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator};
use crate::catalog::FrontendCatalogManager;
use crate::delete::Deleter;
use crate::error::{
@@ -117,7 +124,6 @@ pub struct Instance {
script_executor: Arc<ScriptExecutor>,
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
region_request_handler: RegionRequestHandlerRef,
create_expr_factory: CreateExprFactory,
/// plugins: this map holds extensions to customize query or auth
@@ -146,21 +152,11 @@ impl Instance {
opts: &FrontendOptions,
) -> Result<Self> {
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone()));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
partition_manager.clone(),
datanode_clients.clone(),
table_metadata_manager.clone(),
));
let dist_instance = Arc::new(DistInstance::new(
meta_client.clone(),
catalog_manager.clone(),
));
let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
@@ -173,26 +169,25 @@ impl Instance {
)
.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
dist_instance.clone(),
region_request_handler.clone(),
meta_client.clone(),
meta_backend.clone(),
catalog_manager.clone(),
));
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
meta_backend,
partition_manager,
)),
Arc::new(InvalidateTableCacheHandler::new(meta_backend)),
]);
let heartbeat_task = Some(HeartbeatTask::new(
@@ -212,7 +207,6 @@ impl Instance {
statement_executor,
query_engine,
region_request_handler,
grpc_query_handler: dist_instance,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
heartbeat_task,
@@ -256,39 +250,81 @@ impl Instance {
Ok(Arc::new(meta_client))
}
pub async fn try_build_standalone_components(
dir: String,
kv_store_config: KvStoreConfig,
procedure_config: ProcedureConfig,
) -> Result<(KvBackendRef, ProcedureManagerRef)> {
let kv_store = Arc::new(
RaftEngineBackend::try_open_with_cfg(Config {
dir,
purge_threshold: ReadableSize(kv_store_config.purge_threshold.0),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(kv_store_config.file_size.0),
..Default::default()
})
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?,
);
let state_store = Arc::new(KvStateStore::new(kv_store.clone()));
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
Ok((kv_store, procedure_manager))
}
pub async fn try_new_standalone(
_dn_instance: DnInstanceRef,
_region_server: RegionServer,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
region_server: RegionServer,
) -> Result<Self> {
todo!()
// let catalog_manager = dn_instance.catalog_manager();
// let query_engine = dn_instance.query_engine();
// let script_executor =
// Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
// let region_request_handler = StandaloneRegionRequestHandler::arc(region_server);
// let statement_executor = Arc::new(StatementExecutor::new(
// catalog_manager.clone(),
// query_engine.clone(),
// dn_instance.clone(),
// region_request_handler.clone(),
// ));
let region_request_handler = StandaloneRegionRequestHandler::arc(region_server.clone());
// let create_expr_factory = CreateExprFactory;
// let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
// Ok(Instance {
// catalog_manager: catalog_manager.clone(),
// script_executor,
// create_expr_factory,
// statement_executor,
// query_engine,
// grpc_query_handler,
// region_request_handler,
// plugins: Default::default(),
// servers: Arc::new(HashMap::new()),
// heartbeat_task: None,
// })
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
Arc::new(StandaloneDatanodeManager(region_server)),
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
region_request_handler.clone(),
ddl_executor,
kv_backend.clone(),
cache_invalidator,
));
let create_expr_factory = CreateExprFactory;
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
create_expr_factory,
statement_executor,
query_engine,
region_request_handler,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
})
}
pub async fn build_servers(&mut self, opts: &FrontendOptions) -> Result<()> {
@@ -311,7 +347,7 @@ impl Instance {
let inserter = Inserter::new(
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.grpc_query_handler,
&self.statement_executor,
self.region_request_handler.as_ref(),
);
inserter.handle_row_inserts(requests, ctx).await
@@ -326,7 +362,7 @@ impl Instance {
let inserter = Inserter::new(
self.catalog_manager.as_ref(),
&self.create_expr_factory,
&self.grpc_query_handler,
&self.statement_executor,
self.region_request_handler.as_ref(),
);
inserter.handle_column_inserts(requests, ctx).await

View File

@@ -15,551 +15,25 @@
pub mod deleter;
pub(crate) mod inserter;
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, QueryRequest};
use api::v1::{column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, TruncateTableExpr};
use arrow_flight::Ticket;
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
use chrono::DateTime;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::RegionRequester;
use client::region_handler::RegionRequestHandler;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_meta::ddl::{DdlExecutorRef, ExecutorContext};
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::manager::PartitionInfo;
use partition::partition::{PartitionBound, PartitionDef};
use prost::Message;
use query::error::QueryExecutionSnafu;
use query::query_engine::SqlStatementExecutor;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu,
ParseSqlSnafu, RequestDatanodeSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, RequestQuerySnafu, Result,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::{table_idents_to_full_name, DistTable};
const MAX_VALUE: &str = "MAXVALUE";
#[derive(Clone)]
pub struct DistInstance {
ddl_executor: DdlExecutorRef,
pub(crate) catalog_manager: Arc<FrontendCatalogManager>,
}
impl DistInstance {
pub fn new(ddl_executor: DdlExecutorRef, catalog_manager: Arc<FrontendCatalogManager>) -> Self {
Self {
ddl_executor,
catalog_manager,
}
}
pub async fn create_table(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<TableRef> {
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE);
// 1. get schema info
let schema_value = self
.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_value)?;
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
.await?;
let table_id = resp.table_id.context(error::UnexpectedSnafu {
violated: "expected table_id",
})?;
info!("Successfully created distributed table '{table_name}' with table id {table_id}");
table_info.ident.table_id = table_id;
let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
let request = RegisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id,
table: table.clone(),
};
ensure!(
self.catalog_manager
.register_table(request)
.await
.context(CatalogSnafu)?,
TableAlreadyExistSnafu {
table: table_name.to_string()
}
);
// Invalidates local cache ASAP.
self.catalog_manager
.invalidate_table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
table_id,
)
.await;
Ok(table)
}
async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().table_id();
self.drop_table_procedure(&table_name, table_id).await?;
let request = DeregisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?;
// Invalidates local cache ASAP.
self.catalog_manager()
.invalidate_table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
table_id,
)
.await;
Ok(Output::AffectedRows(1))
}
async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().ident.table_id;
let expr = TruncateTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id: Some(api::v1::TableId { id: table_id }),
};
self.truncate_table_procedure(&expr).await?;
Ok(Output::AffectedRows(0))
}
async fn handle_statement(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output> {
match stmt {
Statement::CreateDatabase(stmt) => {
let expr = CreateDatabaseExpr {
database_name: stmt.name.to_string(),
create_if_not_exists: stmt.if_not_exists,
options: Default::default(),
};
self.handle_create_database(expr, query_ctx).await
}
Statement::CreateTable(stmt) => {
let create_expr = &mut expr_factory::create_to_expr(&stmt, query_ctx)?;
let _ = self.create_table(create_expr, stmt.partitions).await?;
Ok(Output::AffectedRows(0))
}
Statement::CreateExternalTable(stmt) => {
let create_expr = &mut expr_factory::create_external_expr(stmt, query_ctx).await?;
let _ = self.create_table(create_expr, None).await?;
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => {
let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?;
self.handle_alter_table(expr).await
}
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name).await
}
Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, query_ctx.clone())
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_ref = self
.catalog_manager
.table(&catalog, &schema, &table)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name: &table })?;
let table_name = TableName::new(catalog, schema, table);
self.show_create_table(table_name, table_ref, query_ctx.clone())
.await
}
Statement::TruncateTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.truncate_table(table_name).await
}
_ => NotSupportedSnafu {
feat: format!("{stmt:?}"),
}
.fail(),
}
}
async fn show_create_table(
&self,
table_name: TableName,
table: TableRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let partitions = self
.catalog_manager
.partition_manager()
.find_table_partitions(table.table_info().table_id())
.await
.context(error::FindTablePartitionRuleSnafu {
table_name: &table_name.table_name,
})?;
let partitions = create_partitions_stmt(partitions)?;
query::sql::show_create_table(table, partitions, query_ctx)
.context(error::ExecuteStatementSnafu)
}
/// Handles distributed database creation
async fn handle_create_database(
&self,
expr: CreateDatabaseExpr,
query_ctx: QueryContextRef,
) -> Result<Output> {
let catalog = query_ctx.current_catalog();
if self
.catalog_manager
.schema_exist(catalog, &expr.database_name)
.await
.context(CatalogSnafu)?
{
return if expr.create_if_not_exists {
Ok(Output::AffectedRows(1))
} else {
SchemaExistsSnafu {
name: &expr.database_name,
}
.fail()
};
}
let schema = SchemaNameKey::new(catalog, &expr.database_name);
let exist = self
.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.exist(schema)
.await
.context(error::TableMetadataManagerSnafu)?;
ensure!(
!exist,
SchemaExistsSnafu {
name: schema.to_string(),
}
);
let schema_value =
SchemaNameValue::try_from(&expr.options).context(error::TableMetadataManagerSnafu)?;
self.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.create(schema, Some(schema_value))
.await
.context(error::TableMetadataManagerSnafu)?;
// Since the database created on meta does not go through KvBackend, so we manually
// invalidate the cache here.
//
// TODO(fys): when the meta invalidation cache mechanism is established, remove it.
self.catalog_manager()
.invalidate_schema(catalog, &expr.database_name)
.await;
Ok(Output::AffectedRows(1))
}
fn verify_alter(
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterExpr,
) -> Result<()> {
let request: table::requests::AlterTableRequest =
common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
let AlterTableRequest { table_name, .. } = &request;
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(())
}
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
expr.catalog_name.as_str()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME
} else {
expr.schema_name.as_str()
};
let table_name = expr.table_name.as_str();
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let table_id = table.table_info().ident.table_id;
self.verify_alter(table_id, table.table_info(), expr.clone())?;
let req = SubmitDdlTaskRequest {
task: DdlTask::new_alter_table(expr.clone()),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.catalog_manager()
.invalidate_table(catalog_name, schema_name, table_name, table_id)
.await;
Ok(Output::AffectedRows(0))
}
async fn create_table_procedure(
&self,
create_table: &CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn drop_table_procedure(
&self,
table_name: &TableName,
table_id: TableId,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_drop_table(
table_name.catalog_name.to_string(),
table_name.schema_name.to_string(),
table_name.table_name.to_string(),
table_id,
),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn truncate_table_procedure(
&self,
truncate_table: &TruncateTableExpr,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_truncate_table(truncate_table.clone()),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
pub fn catalog_manager(&self) -> Arc<FrontendCatalogManager> {
self.catalog_manager.clone()
}
}
#[async_trait]
impl SqlStatementExecutor for DistInstance {
async fn execute_sql(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> query::error::Result<Output> {
self.handle_statement(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)
}
}
#[async_trait]
impl GrpcQueryHandler for DistInstance {
type Error = error::Error;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(_) => NotSupportedSnafu { feat: "inserts" }.fail(),
Request::Deletes(_) => NotSupportedSnafu { feat: "deletes" }.fail(),
Request::RowInserts(_) => NotSupportedSnafu {
feat: "row inserts",
}
.fail(),
Request::RowDeletes(_) => NotSupportedSnafu {
feat: "row deletes",
}
.fail(),
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}
Request::Ddl(request) => {
let expr = request.expr.context(error::IncompleteGrpcResultSnafu {
err_msg: "Missing 'expr' in DDL request",
})?;
match expr {
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, ctx).await,
DdlExpr::CreateTable(mut expr) => {
let _ = self.create_table(&mut expr, None).await?;
Ok(Output::AffectedRows(0))
}
DdlExpr::Alter(expr) => self.handle_alter_table(expr).await,
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.drop_table(table_name).await
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.truncate_table(table_name).await
}
}
}
}
}
}
pub(crate) struct DistRegionRequestHandler {
catalog_manager: Arc<FrontendCatalogManager>,
@@ -655,288 +129,12 @@ impl DistRegionRequestHandler {
.context(FindDatanodeSnafu {
region: region_id.region_number(),
})?;
let client = self
.catalog_manager
.datanode_clients()
.get_client(peer)
.await;
let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
let region_requester = RegionRequester::new(client);
region_requester
.do_get(ticket)
let client = self.catalog_manager.datanode_manager().datanode(peer).await;
client
.handle_query(request)
.await
.context(RequestDatanodeSnafu)
}
}
fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
if partitions.is_empty() {
return Ok(None);
}
let column_list: Vec<Ident> = partitions[0]
.partition
.partition_columns()
.iter()
.map(|name| name[..].into())
.collect();
let entries = partitions
.into_iter()
.map(|info| {
// Generated the partition name from id
let name = &format!("r{}", info.id.region_number());
let bounds = info.partition.partition_bounds();
let value_list = bounds
.iter()
.map(|b| match b {
PartitionBound::Value(v) => statements::value_to_sql_value(v)
.with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }),
PartitionBound::MaxValue => Ok(SqlValue::Number(MAX_VALUE.to_string(), false)),
})
.collect::<Result<Vec<_>>>()?;
Ok(PartitionEntry {
name: name[..].into(),
value_list,
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(Partitions {
column_list,
entries,
}))
}
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: Option<SchemaNameValue>,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
for (idx, column) in create_table.column_defs.iter().enumerate() {
let schema =
column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
column: &column.name,
})?;
let schema = schema.with_time_index(column.name == create_table.time_index);
column_schemas.push(schema);
let _ = column_name_to_index_map.insert(column.name.clone(), idx);
}
let timestamp_index = column_name_to_index_map
.get(&create_table.time_index)
.cloned();
let raw_schema = RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index,
version: 0,
};
let primary_key_indices = create_table
.primary_keys
.iter()
.map(|name| {
column_name_to_index_map
.get(name)
.cloned()
.context(ColumnNotFoundSnafu { msg: name })
})
.collect::<Result<Vec<_>>>()?;
let partition_key_indices = partition_columns
.into_iter()
.map(|col_name| {
column_name_to_index_map
.get(&col_name)
.cloned()
.context(ColumnNotFoundSnafu { msg: col_name })
})
.collect::<Result<Vec<_>>>()?;
let table_options = TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);
let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
value_indices: vec![],
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: table_options,
created_on: DateTime::default(),
partition_key_indices,
};
let desc = if create_table.desc.is_empty() {
None
} else {
Some(create_table.desc.clone())
};
let table_info = RawTableInfo {
ident: TableIdent {
// The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
table_id: 0,
version: 0,
},
name: create_table.table_name.clone(),
desc,
catalog_name: create_table.catalog_name.clone(),
schema_name: create_table.schema_name.clone(),
meta,
table_type: TableType::Base,
};
Ok(table_info)
}
fn merge_options(
mut table_opts: TableOptions,
schema_opts: Option<SchemaNameValue>,
) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.and_then(|s| s.ttl));
table_opts
}
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
// the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?;
let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?;
Ok((
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)?,
partition_columns,
))
}
fn find_partition_entries(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = partitions {
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.column_defs
.iter()
.find(|c| &c.name == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<_>>();
let mut column_name_and_type = Vec::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?,
);
column_name_and_type.push((column_name, data_type));
}
let mut entries = Vec::with_capacity(partitions.entries.len());
for e in partitions.entries.iter() {
let mut values = Vec::with_capacity(e.value_list.len());
for (i, v) in e.value_list.iter().enumerate() {
// indexing is safe here because we have checked that "value_list" and "column_list" are matched in size
let (column_name, data_type) = &column_name_and_type[i];
let v = match v {
SqlValue::Number(n, _) if n == MAX_VALUE => PartitionBound::MaxValue,
_ => PartitionBound::Value(
sql_value_to_value(column_name, data_type, v).context(ParseSqlSnafu)?,
),
};
values.push(v);
}
entries.push(values);
}
entries
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<_>>()
} else {
vec![]
};
Ok(columns)
}
#[cfg(test)]
mod test {
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use super::*;
#[tokio::test]
async fn test_parse_partitions() {
common_telemetry::init_default_ut_logging();
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
),
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 10),
PARTITION r1 VALUES LESS THAN ('sh', 20),
PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
),
];
for (sql, expected) in cases {
let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
match &result[0] {
Statement::CreateTable(c) => {
let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap();
let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
}
_ => unreachable!(),
}
}
.context(RequestQuerySnafu)
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::{AffectedRows, DatanodeManager};
use common_meta::datanode_manager::AffectedRows;
use common_meta::peer::Peer;
use futures::future;
use metrics::counter;
@@ -62,7 +62,7 @@ impl<'a> DistDeleter<'a> {
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let datanode_clients = self.catalog_manager.datanode_manager();
common_runtime::spawn_write(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
@@ -135,15 +135,11 @@ mod tests {
use common_meta::helper::{CatalogValue, SchemaValue};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::store::PutRequest;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema};
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int32Vector;
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
@@ -174,72 +170,15 @@ mod tests {
backend
}
async fn create_testing_table(
table_name: &str,
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
DtColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),
]));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![1])
.next_column_id(1)
.build()
.unwrap();
let table_id = 1;
let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta)
.table_id(table_id)
.build()
.unwrap()
.into();
let region_route_factory = |region_id: u64, peer: u64| RegionRoute {
region: Region {
id: region_id.into(),
..Default::default()
},
leader_peer: Some(Peer {
id: peer,
addr: String::new(),
}),
follower_peers: vec![],
};
let region_routes = vec![
region_route_factory(1, 1),
region_route_factory(2, 2),
region_route_factory(3, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
}
#[tokio::test]
async fn test_split_deletes() {
let backend = prepare_mocked_backend().await;
let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
let table_name = "one_column_partitioning_table";
create_testing_table(table_name, &table_metadata_manager).await;
create_partition_rule_manager(backend.clone()).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
create_partition_rule_manager().await,
Arc::new(DatanodeClients::default()),
table_metadata_manager,
));
let new_delete_request = |vector: VectorRef| -> DeleteRequest {

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader};
use common_meta::datanode_manager::{AffectedRows, DatanodeManager};
use common_meta::datanode_manager::AffectedRows;
use common_meta::peer::Peer;
use futures_util::future;
use metrics::counter;
@@ -62,7 +62,7 @@ impl<'a> DistInserter<'a> {
let trace_id = self.trace_id.unwrap_or_default();
let span_id = self.span_id.unwrap_or_default();
let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| {
let datanode_clients = self.catalog_manager.datanode_clients();
let datanode_clients = self.catalog_manager.datanode_manager();
common_runtime::spawn_write(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader { trace_id, span_id }),
@@ -102,14 +102,12 @@ impl<'a> DistInserter<'a> {
.find_table_route(table_id)
.await
.context(FindTableRouteSnafu { table_id })?;
let region_map = table_route.region_map();
for (region_number, insert) in req_splits {
let peer =
table_route
.find_region_leader(region_number)
.context(FindDatanodeSnafu {
region: region_number,
})?;
let peer = *region_map.get(&region_number).context(FindDatanodeSnafu {
region: region_number,
})?;
inserts
.entry(peer.clone())
.or_default()
@@ -133,14 +131,10 @@ mod tests {
use client::client_manager::DatanodeClients;
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::router::{Region, RegionRoute};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema as DtColumnSchema, Schema};
use datatypes::prelude::VectorRef;
use datatypes::vectors::Int32Vector;
use table::metadata::{RawTableInfo, TableInfoBuilder, TableMetaBuilder};
use super::*;
use crate::heartbeat::handler::tests::MockKvCacheInvalidator;
@@ -164,71 +158,15 @@ mod tests {
backend
}
async fn create_testing_table(
table_name: &str,
table_metadata_manager: &TableMetadataManagerRef,
) {
let schema = Arc::new(Schema::new(vec![
DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"current_timestamp()".to_string(),
)))
.unwrap(),
DtColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
]));
let table_meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_id = 1;
let table_info: RawTableInfo = TableInfoBuilder::new(table_name, table_meta)
.table_id(table_id)
.build()
.unwrap()
.into();
let region_route_factory = |region_id: u64, peer: u64| RegionRoute {
region: Region {
id: region_id.into(),
..Default::default()
},
leader_peer: Some(Peer {
id: peer,
addr: String::new(),
}),
follower_peers: vec![],
};
let region_routes = vec![
region_route_factory(1, 1),
region_route_factory(2, 2),
region_route_factory(3, 3),
];
table_metadata_manager
.create_table_metadata(table_info, region_routes)
.await
.unwrap();
}
#[tokio::test]
async fn test_split_inserts() {
let backend = prepare_mocked_backend().await;
let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone()));
let table_name = "one_column_partitioning_table";
create_testing_table(table_name, &table_metadata_manager).await;
create_partition_rule_manager(backend.clone()).await;
let catalog_manager = Arc::new(FrontendCatalogManager::new(
backend,
Arc::new(MockKvCacheInvalidator::default()),
create_partition_rule_manager().await,
Arc::new(DatanodeClients::default()),
table_metadata_manager,
));
let inserter = DistInserter::new(&catalog_manager);

View File

@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_meta::table_name::TableName;
use common_query::Output;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
@@ -24,7 +26,9 @@ use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, PermissionSnafu, Result};
use crate::error::{
self, Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, PermissionSnafu, Result,
};
use crate::instance::Instance;
#[async_trait]
@@ -87,9 +91,41 @@ impl GrpcQueryHandler for Instance {
}
}
}
Request::Ddl(_) => {
GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone())
.await?
Request::Ddl(request) => {
let expr = request.expr.context(error::UnexpectedSnafu {
violated: "expected expr",
})?;
match expr {
DdlExpr::CreateTable(mut expr) => {
// TODO(weny): supports to create multiple region table.
let _ = self
.statement_executor
.create_table_inner(&mut expr, None)
.await?;
Output::AffectedRows(0)
}
DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?,
DdlExpr::CreateDatabase(expr) => {
self.statement_executor
.create_database(
ctx.current_catalog(),
&expr.database_name,
expr.create_if_not_exists,
)
.await?
}
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.drop_table(table_name).await?
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.truncate_table(table_name).await?
}
}
}
};

View File

@@ -14,38 +14,31 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::region::{region_request, QueryRequest};
use api::v1::meta::Partition;
use api::v1::region::{region_request, QueryRequest, RegionRequest};
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::check_response_header;
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::AffectedRows;
use common_query::Output;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::{Sequence, SequenceRef};
use common_recordbatch::SendableRecordBatchStream;
use datanode::error::Error as DatanodeError;
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};
use crate::error::InvokeRegionServerSnafu;
pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<DatanodeError>);
#[async_trait]
impl GrpcQueryHandler for StandaloneGrpcQueryHandler {
type Error = Error;
async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
self.0
.do_query(query, ctx)
.await
.context(InvokeDatanodeSnafu)
}
}
const TABLE_ID_SEQ: &str = "table_id";
pub(crate) struct StandaloneRegionRequestHandler {
region_server: RegionServer,
@@ -85,3 +78,84 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler {
.context(HandleRequestSnafu)
}
}
pub(crate) struct StandaloneDatanode(pub(crate) RegionServer);
#[async_trait]
impl Datanode for StandaloneDatanode {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
let body = request.body.context(meta_error::UnexpectedSnafu {
err_msg: "body not found",
})?;
let resp = self
.0
.handle(body)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(resp.affected_rows)
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
self.0
.handle_read(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
}
pub struct StandaloneDatanodeManager(pub RegionServer);
#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
Arc::new(StandaloneDatanode(self.0.clone()))
}
}
pub(crate) struct StandaloneTableMetadataCreator {
table_id_sequence: SequenceRef,
}
impl StandaloneTableMetadataCreator {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
table_id_sequence: Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend)),
}
}
}
#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataCreator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> MetaResult<(TableId, Vec<RegionRoute>)> {
let table_id = self.table_id_sequence.next().await? as u32;
raw_table_info.ident.table_id = table_id;
let region_routes = partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};
// It's only a placeholder.
let peer = Peer::default();
RegionRoute {
region,
leader_peer: Some(peer),
follower_peers: vec![],
}
})
.collect::<Vec<_>>();
Ok((table_id, region_routes))
}
}

View File

@@ -30,3 +30,5 @@ mod server;
pub mod service_config;
pub mod statement;
pub mod table;
pub const MAX_VALUE: &str = "MAXVALUE";

View File

@@ -15,6 +15,7 @@
mod backup;
mod copy_table_from;
mod copy_table_to;
mod ddl;
mod describe;
mod dml;
mod show;
@@ -22,17 +23,23 @@ mod tql;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use api::v1::region::region_request;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_query::Output;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -46,8 +53,8 @@ use table::requests::{
use table::TableRef;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
InvalidSqlSnafu, PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
RequestDatanodeSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::{delete, insert};
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
@@ -57,22 +64,30 @@ use crate::table::table_idents_to_full_name;
pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
sql_stmt_executor: SqlStatementExecutorRef,
region_request_handler: RegionRequestHandlerRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
}
impl StatementExecutor {
pub(crate) fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
sql_stmt_executor: SqlStatementExecutorRef,
region_request_handler: RegionRequestHandlerRef,
ddl_task_executor: DdlTaskExecutorRef,
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
catalog_manager,
query_engine,
sql_stmt_executor,
region_request_handler,
ddl_executor: ddl_task_executor,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,
}
}
@@ -123,17 +138,59 @@ impl StatementExecutor {
self.copy_database(to_copy_database_request(arg, &query_ctx)?)
.await
}
Statement::CreateTable(_)
| Statement::CreateExternalTable(_)
| Statement::CreateDatabase(_)
| Statement::Alter(_)
| Statement::DropTable(_)
| Statement::TruncateTable(_)
| Statement::ShowCreateTable(_) => self
.sql_stmt_executor
.execute_sql(stmt, query_ctx)
Statement::CreateTable(stmt) => {
let _ = self.create_table(stmt, query_ctx).await?;
Ok(Output::AffectedRows(0))
}
Statement::CreateExternalTable(stmt) => {
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::AffectedRows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name).await
}
Statement::TruncateTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.truncate_table(table_name).await
}
Statement::CreateDatabase(stmt) => {
self.create_database(
query_ctx.current_catalog(),
&stmt.name.to_string(),
stmt.if_not_exists,
)
.await
.context(ExecuteStatementSnafu),
}
Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, query_ctx.clone())
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_ref = self
.catalog_manager
.table(&catalog, &schema, &table)
.await
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name: &table })?;
let table_name = TableName::new(catalog, schema, table);
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
}
}

View File

@@ -0,0 +1,626 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{column_def, AlterExpr, CreateTableExpr, TruncateTableExpr};
use catalog::{CatalogManagerRef, DeregisterTableRequest, RegisterTableRequest};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::ident::TableIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::partition::{PartitionBound, PartitionDef};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, Partitions};
use sql::statements::sql_value_to_value;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, ParseSqlSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::table::DistTable;
use crate::{expr_factory, MAX_VALUE};
impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
}
pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<TableRef> {
let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx)?;
self.create_table_inner(create_expr, stmt.partitions).await
}
pub async fn create_external_table(
&self,
create_expr: CreateExternalTable,
ctx: QueryContextRef,
) -> Result<TableRef> {
let create_expr = &mut expr_factory::create_external_expr(create_expr, ctx).await?;
self.create_table_inner(create_expr, None).await
}
pub async fn create_table_inner(
&self,
create_table: &mut CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<TableRef> {
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE);
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
let Some(schema_opts) = schema else {
return SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
.fail();
};
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
.await?;
let table_id = resp.table_id.context(error::UnexpectedSnafu {
violated: "expected table_id",
})?;
info!("Successfully created distributed table '{table_name}' with table id {table_id}");
table_info.ident.table_id = table_id;
let engine = table_info.meta.engine.to_string();
let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?);
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = DistTable::table(table_info);
let request = RegisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id,
table: table.clone(),
};
ensure!(
self.catalog_manager
.register_table(request)
.await
.context(CatalogSnafu)?,
TableAlreadyExistSnafu {
table: table_name.to_string()
}
);
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table(
&Context::default(),
TableIdent {
catalog: table_name.catalog_name.to_string(),
schema: table_name.schema_name.to_string(),
table: table_name.table_name.to_string(),
table_id,
engine,
},
)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(table)
}
pub async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().table_id();
let engine = table.table_info().meta.engine.to_string();
self.drop_table_procedure(&table_name, table_id).await?;
let request = DeregisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table(
&Context::default(),
TableIdent {
catalog: table_name.catalog_name.to_string(),
schema: table_name.schema_name.to_string(),
table: table_name.table_name.to_string(),
table_id,
engine,
},
)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::AffectedRows(1))
}
pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
let table_id = table.table_info().ident.table_id;
let expr = TruncateTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
table_id: Some(api::v1::TableId { id: table_id }),
};
self.truncate_table_procedure(&expr).await?;
Ok(Output::AffectedRows(0))
}
fn verify_alter(
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterExpr,
) -> Result<()> {
let request: table::requests::AlterTableRequest =
common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
let AlterTableRequest { table_name, .. } = &request;
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
Ok(())
}
pub async fn alter_table(
&self,
alter_table: AlterTable,
query_ctx: QueryContextRef,
) -> Result<Output> {
let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?;
self.alter_table_inner(expr).await
}
pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result<Output> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
expr.catalog_name.as_str()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME
} else {
expr.schema_name.as_str()
};
let table_name = expr.table_name.as_str();
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let engine = table.table_info().meta.engine.to_string();
self.verify_alter(table_id, table.table_info(), expr.clone())?;
let req = SubmitDdlTaskRequest {
task: DdlTask::new_alter_table(expr.clone()),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table(
&Context::default(),
TableIdent {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: table_name.to_string(),
table_id,
engine,
},
)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::AffectedRows(0))
}
async fn create_table_procedure(
&self,
create_table: &CreateTableExpr,
partitions: Vec<Partition>,
table_info: RawTableInfo,
) -> Result<SubmitDdlTaskResponse> {
let partitions = partitions.into_iter().map(Into::into).collect();
let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn drop_table_procedure(
&self,
table_name: &TableName,
table_id: TableId,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_drop_table(
table_name.catalog_name.to_string(),
table_name.schema_name.to_string(),
table_name.table_name.to_string(),
table_id,
),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
async fn truncate_table_procedure(
&self,
truncate_table: &TruncateTableExpr,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_truncate_table(truncate_table.clone()),
};
self.ddl_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
pub async fn create_database(
&self,
catalog: &str,
database: &str,
create_if_not_exists: bool,
) -> Result<Output> {
// TODO(weny): considers executing it in the procedures.
let schema_key = SchemaNameKey::new(catalog, database);
let exists = self
.table_metadata_manager
.schema_manager()
.exist(schema_key)
.await
.context(error::TableMetadataManagerSnafu)?;
if exists {
return if create_if_not_exists {
Ok(Output::AffectedRows(1))
} else {
error::SchemaExistsSnafu { name: database }.fail()
};
}
self.table_metadata_manager
.schema_manager()
.create(schema_key, None)
.await
.context(error::TableMetadataManagerSnafu)?;
Ok(Output::AffectedRows(1))
}
}
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
// the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?;
let partition_entries = find_partition_entries(create_table, &partitions, &partition_columns)?;
Ok((
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)?,
partition_columns,
))
}
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: SchemaNameValue,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
for (idx, column) in create_table.column_defs.iter().enumerate() {
let schema =
column_def::try_as_column_schema(column).context(error::InvalidColumnDefSnafu {
column: &column.name,
})?;
let schema = schema.with_time_index(column.name == create_table.time_index);
column_schemas.push(schema);
let _ = column_name_to_index_map.insert(column.name.clone(), idx);
}
let timestamp_index = column_name_to_index_map
.get(&create_table.time_index)
.cloned();
let raw_schema = RawSchema {
column_schemas: column_schemas.clone(),
timestamp_index,
version: 0,
};
let primary_key_indices = create_table
.primary_keys
.iter()
.map(|name| {
column_name_to_index_map
.get(name)
.cloned()
.context(ColumnNotFoundSnafu { msg: name })
})
.collect::<Result<Vec<_>>>()?;
let partition_key_indices = partition_columns
.into_iter()
.map(|col_name| {
column_name_to_index_map
.get(&col_name)
.cloned()
.context(ColumnNotFoundSnafu { msg: col_name })
})
.collect::<Result<Vec<_>>>()?;
let table_options = TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);
let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
value_indices: vec![],
engine: create_table.engine.clone(),
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: table_options,
created_on: DateTime::default(),
partition_key_indices,
};
let desc = if create_table.desc.is_empty() {
None
} else {
Some(create_table.desc.clone())
};
let table_info = RawTableInfo {
ident: metadata::TableIdent {
// The table id of distributed table is assigned by Meta, set "0" here as a placeholder.
table_id: 0,
version: 0,
},
name: create_table.table_name.clone(),
desc,
catalog_name: create_table.catalog_name.clone(),
schema_name: create_table.schema_name.clone(),
meta,
table_type: TableType::Base,
};
Ok(table_info)
}
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<_>>()
} else {
vec![]
};
Ok(columns)
}
fn find_partition_entries(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = partitions {
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.column_defs
.iter()
.find(|c| &c.name == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<_>>();
let mut column_name_and_type = Vec::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type).context(ColumnDataTypeSnafu)?,
);
column_name_and_type.push((column_name, data_type));
}
let mut entries = Vec::with_capacity(partitions.entries.len());
for e in partitions.entries.iter() {
let mut values = Vec::with_capacity(e.value_list.len());
for (i, v) in e.value_list.iter().enumerate() {
// indexing is safe here because we have checked that "value_list" and "column_list" are matched in size
let (column_name, data_type) = &column_name_and_type[i];
let v = match v {
SqlValue::Number(n, _) if n == MAX_VALUE => PartitionBound::MaxValue,
_ => PartitionBound::Value(
sql_value_to_value(column_name, data_type, v).context(ParseSqlSnafu)?,
),
};
values.push(v);
}
entries.push(values);
}
entries
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
}
#[cfg(test)]
mod test {
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use super::*;
use crate::expr_factory;
#[tokio::test]
async fn test_parse_partitions() {
common_telemetry::init_default_ut_logging();
let cases = [
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
),
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 10),
PARTITION r1 VALUES LESS THAN ('sh', 20),
PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,
),
];
for (sql, expected) in cases {
let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
match &result[0] {
Statement::CreateTable(c) => {
let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap();
let (partitions, _) = parse_partitions(&expr, c.partitions.clone()).unwrap();
let json = serde_json::to_string(&partitions).unwrap();
assert_eq!(json, expected);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -12,13 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::table_name::TableName;
use common_query::Output;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{Ident, Value as SqlValue};
use sql::statements;
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::show::{ShowDatabases, ShowTables};
use table::TableRef;
use crate::error::{ExecuteStatementSnafu, Result};
use crate::error::{self, ExecuteStatementSnafu, Result};
use crate::statement::StatementExecutor;
use crate::MAX_VALUE;
impl StatementExecutor {
pub(super) async fn show_databases(
@@ -40,4 +48,64 @@ impl StatementExecutor {
.await
.context(ExecuteStatementSnafu)
}
pub async fn show_create_table(
&self,
table_name: TableName,
table: TableRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let partitions = self
.partition_manager
.find_table_partitions(table.table_info().table_id())
.await
.context(error::FindTablePartitionRuleSnafu {
table_name: &table_name.table_name,
})?;
let partitions = create_partitions_stmt(partitions)?;
query::sql::show_create_table(table, partitions, query_ctx)
.context(error::ExecuteStatementSnafu)
}
}
fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
if partitions.is_empty() {
return Ok(None);
}
let column_list: Vec<Ident> = partitions[0]
.partition
.partition_columns()
.iter()
.map(|name| name[..].into())
.collect();
let entries = partitions
.into_iter()
.map(|info| {
// Generated the partition name from id
let name = &format!("r{}", info.id.region_number());
let bounds = info.partition.partition_bounds();
let value_list = bounds
.iter()
.map(|b| match b {
PartitionBound::Value(v) => statements::value_to_sql_value(v)
.with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }),
PartitionBound::MaxValue => Ok(SqlValue::Number(MAX_VALUE.to_string(), false)),
})
.collect::<Result<Vec<_>>>()?;
Ok(PartitionEntry {
name: name[..].into(),
value_list,
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(Partitions {
column_list,
entries,
}))
}

View File

@@ -88,13 +88,17 @@ pub(crate) mod test {
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU32, Ordering};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use catalog::remote::MetaKvBackend;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute};
use common_meta::table_name::TableName;
use common_meta::rpc::router::{Region, RegionRoute};
use common_query::prelude::Expr;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::{lit, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use meta_client::client::MetaClient;
use meter_core::collect::Collect;
use meter_core::data::{ReadRecord, WriteRecord};
@@ -104,14 +108,52 @@ pub(crate) mod test {
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::partition::{PartitionBound, PartitionDef};
use partition::range::RangePartitionRule;
use partition::route::TableRoutes;
use partition::PartitionRuleRef;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
use table::meter_insert_request;
use table::requests::InsertRequest;
use super::*;
fn new_test_table_info(
table_id: u32,
table_name: &str,
region_numbers: impl Iterator<Item = u32>,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
let schema = SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap();
let meta = TableMetaBuilder::default()
.schema(Arc::new(schema))
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(table_id)
.table_version(5)
.name(table_name)
.meta(meta)
.build()
.unwrap()
}
/// Create a partition rule manager with two tables, one is partitioned by single column, and
/// the other one is two. The tables are under default catalog and schema.
///
@@ -128,160 +170,145 @@ pub(crate) mod test {
/// PARTITION r2 VALUES LESS THAN (50, 'sh'),
/// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
/// )
pub(crate) async fn create_partition_rule_manager() -> PartitionRuleManagerRef {
let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone()));
pub(crate) async fn create_partition_rule_manager(
kv_backend: KvBackendRef,
) -> PartitionRuleManagerRef {
let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend));
let table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"one_column_partitioning_table",
);
let table_route = TableRoute::new(
Table {
id: 1,
table_name: table_name.clone(),
table_schema: vec![],
},
vec![
RegionRoute {
region: Region {
id: 3.into(),
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(10_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
table_metadata_manager
.create_table_metadata(
new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(),
vec![
RegionRoute {
region: Region {
id: 3.into(),
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(10_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(3, "")),
follower_peers: vec![],
},
leader_peer: Some(Peer::new(3, "")),
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(50_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(50_i32.into())],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(2, "")),
follower_peers: vec![],
},
leader_peer: Some(Peer::new(2, "")),
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 1.into(),
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
RegionRoute {
region: Region {
id: 1.into(),
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
},
leader_peer: Some(Peer::new(1, "")),
follower_peers: vec![],
},
],
);
table_routes
.insert_table_route(1, Arc::new(table_route))
.await;
],
)
.await
.unwrap();
let table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"two_column_partitioning_table",
);
let table_route = TableRoute::new(
Table {
id: 2,
table_name: table_name.clone(),
table_schema: vec![],
},
vec![
RegionRoute {
region: Region {
id: 1.into(),
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(),
vec![
RegionRoute {
region: Region {
id: 1.into(),
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
leader_peer: None,
follower_peers: vec![],
},
RegionRoute {
region: Region {
id: 3.into(),
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
RegionRoute {
region: Region {
id: 3.into(),
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
},
leader_peer: None,
follower_peers: vec![],
},
],
);
table_routes
.insert_table_route(2, Arc::new(table_route))
.await;
],
)
.await
.unwrap();
partition_manager
}
#[tokio::test(flavor = "multi_thread")]
async fn test_find_partition_rule() {
let partition_manager = create_partition_rule_manager().await;
let partition_manager =
create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await;
// "one_column_partitioning_table" has id 1
let partition_rule = partition_manager
@@ -325,9 +352,10 @@ pub(crate) mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new(
Arc::new(MetaClient::default()),
))));
let kv_backend = MetaKvBackend {
client: Arc::new(MetaClient::default()),
};
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(kv_backend)));
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),

View File

@@ -24,7 +24,7 @@ common-telemetry = { workspace = true }
futures-util.workspace = true
futures.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "2dcaf5beeea3d5de9ec9c7133a2451d00f508f52" }
raft-engine = { workspace = true }
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { workspace = true }
tokio-util.workspace = true

View File

@@ -23,6 +23,9 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
mod backend;
pub mod log_store;
pub use backend::RaftEngineBackend;
pub use raft_engine::Config;
pub mod protos {
include!(concat!(env!("OUT_DIR"), concat!("/", "protos/", "mod.rs")));
}

View File

@@ -28,10 +28,10 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use raft_engine::{Engine, LogBatch};
use raft_engine::{Config, Engine, LogBatch};
use snafu::ResultExt;
use crate::error::RaftEngineSnafu;
use crate::error::{self, RaftEngineSnafu};
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
@@ -40,6 +40,15 @@ pub struct RaftEngineBackend {
engine: RwLock<Engine>,
}
impl RaftEngineBackend {
pub fn try_open_with_cfg(config: Config) -> error::Result<Self> {
let engine = Engine::open(config).context(RaftEngineSnafu)?;
Ok(Self {
engine: RwLock::new(engine),
})
}
}
#[async_trait::async_trait]
impl TxnService for RaftEngineBackend {
type Error = meta_error::Error;

View File

@@ -23,7 +23,7 @@ mod store;
use api::v1::meta::Role;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::ddl::{DdlExecutor, ExecutorContext};
use common_meta::ddl::{DdlTaskExecutor, ExecutorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
@@ -178,7 +178,7 @@ pub struct MetaClient {
}
#[async_trait::async_trait]
impl DdlExecutor for MetaClient {
impl DdlTaskExecutor for MetaClient {
async fn submit_ddl_task(
&self,
_ctx: &ExecutorContext,

View File

@@ -22,7 +22,7 @@ use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlExecutorRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
@@ -198,7 +198,7 @@ pub struct MetaSrv {
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
ddl_executor: DdlExecutorRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
@@ -350,7 +350,7 @@ impl MetaSrv {
&self.mailbox
}
pub fn ddl_executor(&self) -> &DdlExecutorRef {
pub fn ddl_executor(&self) -> &DdlTaskExecutorRef {
&self.ddl_executor
}

View File

@@ -54,7 +54,7 @@ use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef};
use crate::service::store::memory::MemStore;
use crate::table_creator::MetaSrvTableCreator;
use crate::table_creator::MetaSrvTableMetadataAllocator;
// TODO(fys): try use derive_builder macro
pub struct MetaSrvBuilder {
@@ -366,7 +366,7 @@ fn build_ddl_manager(
},
));
let table_creator = Arc::new(MetaSrvTableCreator::new(
let table_creator = Arc::new(MetaSrvTableMetadataAllocator::new(
selector_ctx.clone(),
selector.clone(),
table_id_sequence.clone(),

View File

@@ -15,7 +15,7 @@
use api::v1::meta::Partition;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableCreator, TableCreatorContext};
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
@@ -27,13 +27,13 @@ use table::metadata::RawTableInfo;
use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
pub struct MetaSrvTableCreator {
pub struct MetaSrvTableMetadataAllocator {
ctx: SelectorContext,
selector: SelectorRef,
table_id_sequence: SequenceRef,
}
impl MetaSrvTableCreator {
impl MetaSrvTableMetadataAllocator {
pub fn new(
ctx: SelectorContext,
selector: SelectorRef,
@@ -48,10 +48,10 @@ impl MetaSrvTableCreator {
}
#[async_trait::async_trait]
impl TableCreator for MetaSrvTableCreator {
impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
async fn create(
&self,
ctx: &TableCreatorContext,
ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> MetaResult<(TableId, Vec<RegionRoute>)> {

View File

@@ -25,6 +25,12 @@ use table::metadata::TableId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Table route manager error: {}", source))]
TableRouteManager {
source: common_meta::error::Error,
location: Location,
},
#[snafu(display("Failed to get meta info from cache, error: {}", err_msg))]
GetCache { err_msg: String, location: Location },
@@ -156,6 +162,7 @@ impl ErrorExt for Error {
Error::ConvertScalarValue { .. } => StatusCode::Internal,
Error::FindDatanode { .. } => StatusCode::InvalidArguments,
Error::CreateDefaultToRead { source, .. } => source.status_code(),
Error::TableRouteManager { source, .. } => source.status_code(),
Error::MissingDefaultValue { .. } => StatusCode::Internal,
}
}

View File

@@ -16,8 +16,10 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::region::{DeleteRequest, InsertRequest};
use common_meta::key::table_route::TableRouteManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_meta::rpc::router::{convert_to_region_map, RegionRoutes};
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
@@ -29,7 +31,6 @@ use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter};
use crate::{error, PartitionRuleRef};
@@ -47,7 +48,7 @@ pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
/// - values (in case of insertion)
/// - filters (in case of select, deletion and update)
pub struct PartitionRuleManager {
table_routes: Arc<TableRoutes>,
table_route_manager: TableRouteManager,
}
#[derive(Debug)]
@@ -56,25 +57,23 @@ pub struct PartitionInfo {
pub partition: PartitionDef,
}
#[async_trait::async_trait]
impl TableRouteCacheInvalidator for PartitionRuleManager {
async fn invalidate_table_route(&self, table: TableId) {
self.table_routes.invalidate_table_route(table).await
}
}
impl PartitionRuleManager {
pub fn new(table_routes: Arc<TableRoutes>) -> Self {
Self { table_routes }
}
pub fn table_routes(&self) -> &TableRoutes {
self.table_routes.as_ref()
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
table_route_manager: TableRouteManager::new(kv_backend),
}
}
/// Find table route of given table name.
pub async fn find_table_route(&self, table: TableId) -> Result<Arc<TableRoute>> {
self.table_routes.get_route(table).await
pub async fn find_table_route(&self, table_id: TableId) -> Result<RegionRoutes> {
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
Ok(RegionRoutes(route.region_routes))
}
/// Find datanodes of corresponding regions of given table.
@@ -83,15 +82,19 @@ impl PartitionRuleManager {
table_id: TableId,
regions: Vec<RegionNumber>,
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self.table_routes.get_route(table_id).await?;
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
let mut datanodes = HashMap::with_capacity(regions.len());
let region_map = convert_to_region_map(&route.region_routes);
for region in regions.iter() {
let datanode = route
.find_region_leader(*region)
.context(error::FindDatanodeSnafu {
table_id,
region: *region,
})?;
let datanode = *region_map.get(region).context(error::FindDatanodeSnafu {
table_id,
region: *region,
})?;
datanodes
.entry(datanode.clone())
.or_insert_with(Vec::new)
@@ -102,8 +105,14 @@ impl PartitionRuleManager {
/// Find all leader peers of given table.
pub async fn find_table_region_leaders(&self, table_id: TableId) -> Result<Vec<Peer>> {
let route = self.table_routes.get_route(table_id).await?;
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
let mut peers = Vec::with_capacity(route.region_routes.len());
for peer in &route.region_routes {
peers.push(peer.leader_peer.clone().with_context(|| FindLeaderSnafu {
region_id: peer.region.id,
@@ -115,7 +124,12 @@ impl PartitionRuleManager {
}
pub async fn find_table_partitions(&self, table_id: TableId) -> Result<Vec<PartitionInfo>> {
let route = self.table_routes.get_route(table_id).await?;
let route = self
.table_route_manager
.get(table_id)
.await
.context(error::TableRouteManagerSnafu)?
.context(error::FindTableRoutesSnafu { table_id })?;
ensure!(
!route.region_routes.is_empty(),
error::FindTableRoutesSnafu { table_id }