refactor: remove datanode instance (#2342)

* pass nextest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deadcode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename region_alive_keepers

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-06 21:24:08 -05:00
parent 1d83c942a9
commit 6215f124f7
22 changed files with 174 additions and 1397 deletions

View File

@@ -20,8 +20,8 @@ use common_config::WalConfig;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::instance::InstanceRef;
use datanode::region_server::RegionServer;
use datanode::Instance as InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::service_config::{

View File

@@ -18,7 +18,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;
use common_runtime::error::{Error, Result};
use common_runtime::{BoxedTaskFunction, RepeatedTask, Runtime, TaskFunction};
use common_runtime::{BoxedTaskFunction, RepeatedTask, TaskFunction};
use common_telemetry::{debug, info};
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
@@ -49,11 +49,11 @@ impl GreptimeDBTelemetryTask {
GreptimeDBTelemetryTask::Disable
}
pub fn start(&self, runtime: Runtime) -> Result<()> {
pub fn start(&self) -> Result<()> {
match self {
GreptimeDBTelemetryTask::Enable(task) => {
print_anonymous_usage_data_disclaimer();
task.start(runtime)
task.start(common_runtime::bg_runtime())
}
GreptimeDBTelemetryTask::Disable => Ok(()),
}

View File

@@ -23,6 +23,7 @@ use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::info;
@@ -52,6 +53,7 @@ use tokio::fs;
use crate::error::{
CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::RegionServer;
use crate::server::Services;
@@ -399,6 +401,7 @@ pub struct Datanode {
services: Option<Services>,
heartbeat_task: Option<HeartbeatTask>,
region_server: RegionServer,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}
impl Datanode {
@@ -420,7 +423,7 @@ impl Datanode {
.context(RuntimeResourceSnafu)?,
);
let mut region_server = RegionServer::new(query_engine, runtime);
let mut region_server = RegionServer::new(query_engine, runtime.clone());
let log_store = Self::build_log_store(&opts).await?;
let object_store = store::new_object_store(&opts).await?;
let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
@@ -439,12 +442,19 @@ impl Datanode {
}
Mode::Standalone => None,
};
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await;
Ok(Self {
opts,
services,
heartbeat_task,
region_server,
greptimedb_telemetry_task,
})
}
@@ -453,6 +463,7 @@ impl Datanode {
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
let _ = self.greptimedb_telemetry_task.start();
self.start_services().await
}
@@ -476,6 +487,7 @@ impl Datanode {
pub async fn shutdown(&self) -> Result<()> {
// We must shutdown services first
self.shutdown_services().await?;
let _ = self.greptimedb_telemetry_task.stop().await;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()

View File

@@ -16,7 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat};
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -24,7 +25,8 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc;
use tokio::time::Instant;
@@ -35,7 +37,6 @@ use crate::datanode::DatanodeOptions;
use crate::error::{
self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result,
};
use crate::instance::new_metasrv_client;
use crate::region_server::RegionServer;
pub(crate) mod handler;
@@ -306,6 +307,39 @@ fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
}
}
/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
node_id: u64,
meta_config: &MetaClientOptions,
) -> Result<MetaClient> {
let cluster_id = 0; // TODO(hl): read from config
let member_id = node_id;
let config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client
.start(&meta_config.metasrv_addrs)
.await
.context(MetaClientInitSnafu)?;
// required only when the heartbeat_client is enabled
meta_client
.ask_leader()
.await
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}
#[cfg(test)]
mod tests {
#[test]

View File

@@ -1,419 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, path};
use api::v1::meta::Role;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::{debug, info};
use file_table_engine::engine::immutable::ImmutableFileTableEngine;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use session::context::QueryContextBuilder;
use snafu::prelude::*;
use storage::compaction::{CompactionHandler, CompactionSchedulerRef};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use store_api::path_utils::{CLUSTER_DIR, WAL_DIR};
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableEngineProcedureRef};
use table::requests::FlushTableRequest;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ProcedureConfig};
use crate::error::{
self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu,
MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::row_inserter::RowInserter;
use crate::sql::{SqlHandler, SqlRequest};
use crate::store;
mod grpc;
pub mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
// An abstraction to read/write services.
pub struct Instance {
pub(crate) query_engine: QueryEngineRef,
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
row_inserter: RowInserter,
procedure_manager: ProcedureManagerRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
}
pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn with_opts(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let meta_client = new_metasrv_client(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.meta_client_options
.as_ref()
.context(MissingMetasrvOptsSnafu)?,
)
.await?;
Some(Arc::new(meta_client))
}
};
let compaction_scheduler = create_compaction_scheduler(opts);
Self::new(opts, meta_client, compaction_scheduler, plugins).await
}
fn build_heartbeat_task(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
) -> Result<Option<HeartbeatTask>> {
Ok(match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let _node_id = opts.node_id.context(MissingNodeIdSnafu)?;
let _meta_client = meta_client.context(IncorrectInternalStateSnafu {
state: "meta client is not provided when building heartbeat task",
})?;
let _handlers_executor =
HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler)]);
todo!("remove this method")
}
})
}
pub(crate) async fn new(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
plugins: Arc<Plugins>,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let data_home = util::normalize_dir(&opts.storage.data_home);
info!("The working home directory is: {}", data_home);
let object_store = store::new_object_store(opts).await?;
let log_store = Arc::new(create_log_store(&data_home, opts.wal.clone()).await?);
let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig {
compress_manifest: opts.storage.manifest.compress,
},
EngineImpl::new(
StorageEngineConfig::from(opts),
log_store.clone(),
object_store.clone(),
compaction_scheduler,
)
.unwrap(),
object_store.clone(),
));
let immutable_file_engine = Arc::new(ImmutableFileTableEngine::new(
file_table_engine::config::EngineConfig::default(),
object_store.clone(),
));
let engine_procedures = HashMap::from([
(
mito_engine.name().to_string(),
mito_engine.clone() as TableEngineProcedureRef,
),
(
immutable_file_engine.name().to_string(),
immutable_file_engine.clone() as TableEngineProcedureRef,
),
]);
let engine_manager = Arc::new(
MemoryTableEngineManager::with(vec![
mito_engine.clone(),
immutable_file_engine.clone(),
])
.with_engine_procedures(engine_procedures),
);
// create remote catalog manager
let (catalog_manager, table_id_provider) = match opts.mode {
Mode::Standalone => {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
.await
.context(CatalogSnafu)?,
);
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
)
}
Mode::Distributed => (
MemoryCatalogManager::with_default_setup() as CatalogManagerRef,
None,
),
};
let factory =
QueryEngineFactory::new_with_plugins(catalog_manager.clone(), None, false, plugins);
let query_engine = factory.query_engine();
let procedure_manager = create_procedure_manager(
opts.node_id.unwrap_or(0),
&ProcedureConfig::default(),
object_store,
)
.await?;
let sql_handler = SqlHandler::new(
engine_manager.clone(),
catalog_manager.clone(),
procedure_manager.clone(),
);
// Register all procedures.
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&*procedure_manager);
// Register procedures of the file table engine.
immutable_file_engine.register_procedure_loaders(&*procedure_manager);
// Register procedures in table-procedure crate.
table_procedure::register_procedure_loaders(
catalog_manager.clone(),
mito_engine.clone(),
mito_engine.clone(),
&*procedure_manager,
);
let row_inserter = RowInserter::new(catalog_manager.clone());
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(opts.storage.data_home.clone()),
&opts.mode,
opts.enable_telemetry,
)
.await;
let instance = Arc::new(Self {
query_engine: query_engine.clone(),
sql_handler,
catalog_manager: catalog_manager.clone(),
table_id_provider,
row_inserter,
procedure_manager,
greptimedb_telemetry_task,
});
let heartbeat_task = Instance::build_heartbeat_task(opts, meta_client)?;
Ok((instance, heartbeat_task))
}
pub async fn start(&self) -> Result<()> {
self.catalog_manager
.start()
.await
.context(NewCatalogSnafu)?;
// Recover procedures after the catalog manager is started, so we can
// ensure we can access all tables from the catalog manager.
self.procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
self.procedure_manager
.start()
.context(StartProcedureManagerSnafu)?;
let _ = self
.greptimedb_telemetry_task
.start(common_runtime::bg_runtime())
.map_err(|e| {
debug!("Failed to start greptimedb telemetry task: {}", e);
});
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
self.procedure_manager
.stop()
.await
.context(StopProcedureManagerSnafu)?;
self.flush_tables().await?;
self.sql_handler
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)
}
pub async fn flush_tables(&self) -> Result<()> {
info!("going to flush all schemas under {DEFAULT_CATALOG_NAME}");
let schema_list = self
.catalog_manager
.schema_names(DEFAULT_CATALOG_NAME)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
let flush_requests = schema_list
.into_iter()
.map(|schema_name| {
SqlRequest::FlushTable(FlushTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name,
table_name: None,
region_number: None,
wait: Some(true),
})
})
.collect::<Vec<_>>();
let flush_result =
futures::future::try_join_all(flush_requests.into_iter().map(|request| {
self.sql_handler
.execute(request, QueryContextBuilder::default().build())
}))
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu);
info!("Flushed all tables result: {}", flush_result.is_ok());
let _ = flush_result?;
Ok(())
}
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub fn query_engine(&self) -> QueryEngineRef {
self.query_engine.clone()
}
}
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
let config = SchedulerConfig::from(opts);
let handler = CompactionHandler::default();
let scheduler = LocalScheduler::new(config, handler);
Arc::new(scheduler)
}
/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
node_id: u64,
meta_config: &MetaClientOptions,
) -> Result<MetaClient> {
let cluster_id = 0; // TODO(hl): read from config
let member_id = node_id;
let config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client
.start(&meta_config.metasrv_addrs)
.await
.context(MetaClientInitSnafu)?;
// required only when the heartbeat_client is enabled
meta_client
.ask_leader()
.await
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}
pub(crate) async fn create_log_store(
data_home: &str,
wal_config: WalConfig,
) -> Result<RaftEngineLogStore> {
let wal_dir = format!("{}{WAL_DIR}", data_home);
// create WAL directory
fs::create_dir_all(path::Path::new(&wal_dir))
.context(error::CreateDirSnafu { dir: &wal_dir })?;
info!(
"Creating logstore with config: {:?} and storage path: {}",
wal_config, &wal_dir
);
let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)?;
Ok(logstore)
}
pub(crate) async fn create_procedure_manager(
datanode_id: u64,
procedure_config: &ProcedureConfig,
object_store: ObjectStore,
) -> Result<ProcedureManagerRef> {
info!(
"Creating procedure manager with config: {:?}",
procedure_config
);
let state_store = Arc::new(ObjectStateStore::new(object_store));
let dn_store_path = format!("{CLUSTER_DIR}dn-{datanode_id}/");
info!("The datanode internal storage path is: {}", dn_store_path);
let manager_config = ManagerConfig {
parent_path: dn_store_path,
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
..Default::default()
};
Ok(Arc::new(LocalManager::new(manager_config, state_store)))
}

View File

@@ -225,56 +225,6 @@ impl Instance {
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved.
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.
pub fn table_idents_to_full_name(
obj_name: &ObjectName,
query_ctx: QueryContextRef,
) -> Result<(String, String, String)> {
match &obj_name.0[..] {
[table] => Ok((
query_ctx.current_catalog().to_owned(),
query_ctx.current_schema().to_owned(),
table.value.clone(),
)),
[schema, table] => Ok((
query_ctx.current_catalog().to_owned(),
schema.value.clone(),
table.value.clone(),
)),
[catalog, schema, table] => Ok((
catalog.value.clone(),
schema.value.clone(),
table.value.clone(),
)),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
),
}.fail(),
}
}
pub fn idents_to_full_database_name(
obj_name: &ObjectName,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
match &obj_name.0[..] {
[database] => Ok((
query_ctx.current_catalog().to_owned(),
database.value.clone(),
)),
[catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
_ => error::InvalidSqlSnafu {
msg: format!(
"expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
),
}
.fail(),
}
}
#[async_trait]
impl SqlStatementExecutor for Instance {
async fn execute_sql(

View File

@@ -15,20 +15,31 @@
#![feature(assert_matches)]
#![feature(trait_upcasting)]
use query::query_engine::SqlStatementExecutor;
pub mod alive_keeper;
pub mod datanode;
pub mod error;
mod greptimedb_telemetry;
pub mod heartbeat;
pub mod instance;
pub mod metrics;
#[cfg(any(test, feature = "testing"))]
mod mock;
pub mod region_server;
mod row_inserter;
pub mod server;
pub mod sql;
mod store;
#[cfg(test)]
#[allow(dead_code)]
mod tests;
// TODO(ruihang): remove this
pub struct Instance;
#[async_trait::async_trait]
impl SqlStatementExecutor for Instance {
async fn execute_sql(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: session::context::QueryContextRef,
) -> query::error::Result<common_query::Output> {
unreachable!()
}
}

View File

@@ -1,70 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::Role;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_srv::mocks::MockInfo;
use storage::compaction::noop::NoopCompactionScheduler;
use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
impl Instance {
pub async fn with_mock_meta_client(
opts: &DatanodeOptions,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await
}
pub async fn with_mock_meta_server(
opts: &DatanodeOptions,
meta_srv: MockInfo,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
Instance::new(
opts,
Some(meta_client),
compaction_scheduler,
Default::default(),
)
.await
}
}
async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient {
let MockInfo {
server_addr,
channel_manager,
..
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, node_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
// // required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
meta_client
}

View File

@@ -1,143 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{RowInsertRequest, RowInsertRequests};
use catalog::CatalogManagerRef;
use common_query::Output;
use datatypes::data_type::{ConcreteDataType, DataType};
use futures_util::future;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, InsertSnafu, InvalidInsertRowLenSnafu,
JoinTaskSnafu, Result, TableNotFoundSnafu,
};
pub struct RowInserter {
catalog_manager: CatalogManagerRef,
}
impl RowInserter {
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
}
pub async fn handle_inserts(
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> Result<Output> {
let insert_tasks = requests.inserts.into_iter().map(|insert| {
let catalog_manager = self.catalog_manager.clone();
let catalog_name = ctx.current_catalog().to_owned();
let schema_name = ctx.current_schema().to_owned();
let table_name = insert.table_name.clone();
let insert_task = async move {
let Some(request) =
convert_to_table_insert_request(&catalog_name, &schema_name, insert)?
else {
// empty data
return Ok(0usize);
};
let table = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
})?;
table.insert(request).await.with_context(|_| InsertSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
})
};
common_runtime::spawn_write(insert_task)
});
let results = future::try_join_all(insert_tasks)
.await
.context(JoinTaskSnafu)?;
let affected_rows = results.into_iter().sum::<Result<usize>>()?;
Ok(Output::AffectedRows(affected_rows))
}
}
fn convert_to_table_insert_request(
catalog_name: &str,
schema_name: &str,
request: RowInsertRequest,
) -> Result<Option<InsertRequest>> {
let table_name = request.table_name;
let region_number = request.region_number;
let Some(rows) = request.rows else {
return Ok(None);
};
let schema = rows.schema;
let rows = rows.rows;
let num_columns = schema.len();
let num_rows = rows.len();
if num_rows == 0 || num_columns == 0 {
return Ok(None);
}
let mut columns_values = Vec::with_capacity(num_columns);
for column_schema in schema {
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(column_schema.datatype)
.context(ColumnDataTypeSnafu)?
.into();
let mutable_vector = datatype.create_mutable_vector(num_rows);
columns_values.push((column_schema.column_name, mutable_vector));
}
for row in rows {
ensure!(
row.values.len() == num_columns,
InvalidInsertRowLenSnafu {
table_name: format!("{catalog_name}.{schema_name}.{table_name}"),
expected: num_columns,
actual: row.values.len(),
}
);
for ((_, mutable_vector), value) in columns_values.iter_mut().zip(row.values.iter()) {
mutable_vector
.try_push_value_ref(helper::pb_value_to_value_ref(value))
.context(CreateVectorSnafu)?;
}
}
let columns_values = columns_values
.into_iter()
.map(|(k, mut v)| (k, v.to_vector()))
.collect();
let insert_request = InsertRequest {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name,
columns_values,
region_number,
};
Ok(Some(insert_request))
}

View File

@@ -28,8 +28,6 @@ use crate::error::{
};
use crate::region_server::RegionServer;
pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,

View File

@@ -1,321 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr};
use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
use common_catalog::format_full_table_name;
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::info;
use session::context::QueryContextRef;
use snafu::prelude::*;
use table::requests::{DropTableRequest, TruncateTableRequest};
use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(
&self,
expr: CreateTableExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table_name = format!(
"{}.{}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
);
// TODO(LFC): Revisit table id related feature, add more tests.
// Also merge this mod with mod instance::grpc.
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = &expr.table_id {
info!(
"Creating table {table_name} with table id {} from Frontend",
table_id.id
);
table_id.id
} else {
let provider =
self.table_id_provider
.as_ref()
.context(IncorrectInternalStateSnafu {
state: "Table id provider absent in standalone mode",
})?;
let table_id = provider.next_table_id().await.context(BumpTableIdSnafu)?;
info!("Creating table {table_name} with table id {table_id} from TableIdProvider");
table_id
};
let require_time_index = expr.engine != IMMUTABLE_FILE_ENGINE;
let request = create_expr_to_request(table_id, expr, require_time_index)
.context(CreateExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::CreateTable(request), ctx)
.await
}
pub(crate) async fn handle_alter(
&self,
expr: AlterExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table_id = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?
.table_info()
.table_id();
let request = alter_expr_to_request(table_id, expr).context(AlterExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::Alter(request), ctx)
.await
}
pub(crate) async fn handle_drop_table(
&self,
expr: DropTableExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler()
.execute(SqlRequest::DropTable(req), ctx)
.await
}
pub(crate) async fn handle_truncate_table(
&self,
expr: TruncateTableExpr,
ctx: QueryContextRef,
) -> Result<Output> {
let table = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
),
})?;
let req = TruncateTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler()
.execute(SqlRequest::TruncateTable(req), ctx)
.await
}
}
#[cfg(test)]
mod tests {
use api::v1::{column_def, ColumnDataType, ColumnDef, SemanticType, TableId};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
use datatypes::value::Value;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr, true).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla little magic fairy".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr, true);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
"{}",
err_msg
);
}
#[test]
fn test_create_table_schema() {
let mut expr = testing_create_expr();
let schema = create_table_schema(&expr, true).unwrap();
assert_eq!(schema, expected_table_schema());
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr, true);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {err_msg}",
);
}
#[test]
fn test_create_column_schema() {
let column_def = ColumnDef {
name: "a".to_string(),
data_type: 1024,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
};
let result = column_def::try_as_column_schema(&column_def);
assert!(matches!(
result.unwrap_err(),
api::error::Error::UnknownColumnDataType { .. }
));
let column_def = ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
};
let column_schema = column_def::try_as_column_schema(&column_def).unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
let column_def = ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: default_constraint.clone().try_into().unwrap(),
semantic_type: SemanticType::Tag as i32,
};
let column_schema = column_def::try_as_column_schema(&column_def).unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
assert_eq!(
default_constraint,
*column_schema.default_constraint().unwrap()
);
}
fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "ts".to_string(),
data_type: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
},
ColumnDef {
name: "cpu".to_string(),
data_type: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
ColumnDef {
name: "memory".to_string(),
data_type: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
];
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "my-metrics".to_string(),
desc: "blabla little magic fairy".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
region_numbers: vec![0],
engine: MITO_ENGINE.to_string(),
}
}
fn expected_table_schema() -> RawSchema {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
RawSchema::new(column_schemas)
}
}

View File

@@ -1,140 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::error;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;
use crate::error::{
self, CloseTableEngineSnafu, EngineProcedureNotFoundSnafu, Result, TableEngineNotFoundSnafu,
TableNotFoundSnafu,
};
use crate::instance::sql::table_idents_to_full_name;
mod alter;
mod compact_table;
mod create;
mod create_external;
mod drop_table;
mod flush_table;
pub(crate) mod insert;
mod truncate_table;
#[derive(Debug)]
pub enum SqlRequest {
CreateTable(CreateTableRequest),
CreateDatabase(CreateDatabaseRequest),
Alter(AlterTableRequest),
DropTable(DropTableRequest),
FlushTable(FlushTableRequest),
CompactTable(CompactTableRequest),
TruncateTable(TruncateTableRequest),
}
// Handler to execute SQL except query
#[derive(Clone)]
pub struct SqlHandler {
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
procedure_manager: ProcedureManagerRef,
}
impl SqlHandler {
pub fn new(
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
procedure_manager: ProcedureManagerRef,
) -> Self {
Self {
table_engine_manager,
catalog_manager,
procedure_manager,
}
}
pub async fn execute(&self, request: SqlRequest, query_ctx: QueryContextRef) -> Result<Output> {
let result = match request {
SqlRequest::CreateTable(req) => self.create_table(req).await,
SqlRequest::CreateDatabase(req) => self.create_database(req, query_ctx.clone()).await,
SqlRequest::Alter(req) => self.alter_table(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::FlushTable(req) => self.flush_table(req).await,
SqlRequest::CompactTable(req) => self.compact_table(req).await,
SqlRequest::TruncateTable(req) => self.truncate_table(req).await,
};
if let Err(e) = &result {
error!(e; "{query_ctx}");
}
result
}
pub async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
let TableReference {
catalog,
schema,
table,
} = table_ref;
let table = self
.catalog_manager
.table(catalog, schema, table)
.await
.context(error::CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
Ok(table)
}
pub fn table_engine_manager(&self) -> &TableEngineManagerRef {
&self.table_engine_manager
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub fn table_engine(&self, table: TableRef) -> Result<TableEngineRef> {
let engine_name = &table.table_info().meta.engine;
let engine = self
.table_engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
Ok(engine)
}
pub fn engine_procedure(&self, table: TableRef) -> Result<TableEngineProcedureRef> {
let engine_name = &table.table_info().meta.engine;
let engine = self
.table_engine_manager
.engine_procedure(engine_name)
.context(EngineProcedureNotFoundSnafu { engine_name })?;
Ok(engine)
}
pub async fn close(&self) -> Result<()> {
self.table_engine_manager
.close()
.await
.map_err(BoxedError::new)
.context(CloseTableEngineSnafu)
}
}

View File

@@ -15,10 +15,7 @@
use std::any::Any;
use std::sync::Arc;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use async_trait::async_trait;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::FunctionRef;
@@ -31,43 +28,33 @@ use common_meta::instruction::{Instruction, InstructionReply, RegionIdent};
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
use datatypes::prelude::ConcreteDataType;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
use query::query_engine::DescribeResult;
use query::QueryEngine;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use table::engine::manager::TableEngineManagerRef;
use session::context::QueryContextRef;
use table::TableRef;
use test_util::MockInstance;
use tokio::sync::mpsc::{self, Receiver};
use crate::instance::Instance;
use crate::region_server::RegionServer;
pub(crate) mod test_util;
use crate::Instance;
struct HandlerTestGuard {
instance: MockInstance,
instance: Instance,
mailbox: Arc<HeartbeatMailbox>,
rx: Receiver<(MessageMeta, InstructionReply)>,
engine_manager_ref: TableEngineManagerRef,
}
async fn prepare_handler_test(name: &str) -> HandlerTestGuard {
let mock_instance = MockInstance::new(name).await;
let instance = mock_instance.inner();
let engine_manager = instance.sql_handler().table_engine_manager().clone();
async fn prepare_handler_test(_name: &str) -> HandlerTestGuard {
let instance = Instance;
let (tx, rx) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
HandlerTestGuard {
instance: mock_instance,
instance,
mailbox,
rx,
engine_manager_ref: engine_manager,
}
}
@@ -122,43 +109,6 @@ fn open_region_instruction() -> Instruction {
})
}
async fn prepare_table(instance: &Instance) -> TableRef {
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
.await
.unwrap()
}
async fn assert_test_table_not_found(instance: &Instance) {
let query = GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
('host1', 66.6, 1024, 1672201025000),\
('host2', 88.8, 333.3, 1672201026000)"
.to_string(),
)),
});
let output = instance
.do_query(query, QueryContext::arc())
.await
.unwrap_err();
assert_eq!(output.to_string(), "Failed to execute sql, source: Failure during query execution, source: Table not found: greptime.public.demo");
}
async fn assert_test_table_found(instance: &Instance) {
let query = GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
('host1', 66.6, 1024, 1672201025000),\
('host2', 88.8, 333.3, 1672201026000)"
.to_string(),
)),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
}
pub struct MockQueryEngine;
#[async_trait]

View File

@@ -1,135 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::RegisterTableRequest;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_config::WalConfig;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
use table::TableRef;
use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig};
use crate::error::{CreateTableSnafu, Result};
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
pub(crate) struct MockInstance {
instance: InstanceRef,
_heartbeat: Option<HeartbeatTask>,
_guard: TestGuard,
}
impl MockInstance {
pub(crate) async fn new(name: &str) -> Self {
let (opts, _guard) = create_tmp_dir_and_datanode_opts(name);
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
if let Some(task) = heartbeat.as_ref() {
task.start().await.unwrap();
}
MockInstance {
instance,
_guard,
_heartbeat: heartbeat,
}
}
pub(crate) fn inner(&self) -> &Instance {
&self.instance
}
}
struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
}
fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let opts = DatanodeOptions {
wal: WalConfig::default(),
storage: StorageConfig {
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
store: ObjectStoreConfig::File(FileConfig {}),
..Default::default()
},
mode: Mode::Standalone,
..Default::default()
};
(
opts,
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
},
)
}
pub(crate) async fn create_test_table(
instance: &Instance,
ts_type: ConcreteDataType,
) -> Result<TableRef> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ts_type, true).with_time_index(true),
];
let table_name = "demo";
let table_engine: TableEngineRef = instance
.sql_handler()
.table_engine_manager()
.engine(MITO_ENGINE)
.unwrap();
let table = table_engine
.create_table(
&EngineContext::default(),
CreateTableRequest {
id: MIN_USER_TABLE_ID,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: RawSchema::new(column_schemas),
create_if_not_exists: true,
primary_key_indices: vec![0], // "host" is in primary keys
table_options: TableOptions::default(),
region_numbers: vec![0],
engine: MITO_ENGINE.to_string(),
},
)
.await
.context(CreateTableSnafu { table_name })?;
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id: table.table_info().ident.table_id,
table: table.clone(),
};
let _ = instance.catalog_manager.register_table(req).await.unwrap();
Ok(table)
}

View File

@@ -22,7 +22,6 @@ use api::v1::{
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::schema::ColumnSchema;
use file_table_engine::table::immutable::ImmutableFileTableOptions;
use query::sql::prepare_immutable_file_table_files_and_schema;
@@ -41,6 +40,7 @@ use crate::error::{
EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu, InvalidSqlSnafu, NotSupportedSnafu,
ParseSqlSnafu, PrepareImmutableTableSnafu, Result, UnrecognizedTableOptionSnafu,
};
use crate::table::table_idents_to_full_name;
#[derive(Debug, Copy, Clone)]
pub struct CreateExprFactory;

View File

@@ -42,9 +42,8 @@ use common_meta::key::TableMetadataManager;
use common_query::Output;
use common_telemetry::logging::info;
use common_telemetry::{error, timer};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datanode::region_server::RegionServer;
use datanode::Instance as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use partition::manager::PartitionRuleManager;
@@ -75,7 +74,6 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use self::distributed::DistRegionRequestHandler;
use self::standalone::StandaloneRegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::delete::Deleter;
use crate::error::{
@@ -87,11 +85,11 @@ use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::insert::Inserter;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
use crate::script::ScriptExecutor;
use crate::server::{start_server, ServerHandlers, Services};
use crate::statement::StatementExecutor;
use crate::table::table_idents_to_full_name;
#[async_trait]
pub trait FrontendInstance:
@@ -259,37 +257,38 @@ impl Instance {
}
pub async fn try_new_standalone(
dn_instance: DnInstanceRef,
region_server: RegionServer,
_dn_instance: DnInstanceRef,
_region_server: RegionServer,
) -> Result<Self> {
let catalog_manager = dn_instance.catalog_manager();
let query_engine = dn_instance.query_engine();
let script_executor =
Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
todo!()
// let catalog_manager = dn_instance.catalog_manager();
// let query_engine = dn_instance.query_engine();
// let script_executor =
// Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?);
let region_request_handler = StandaloneRegionRequestHandler::arc(region_server);
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
dn_instance.clone(),
region_request_handler.clone(),
));
// let region_request_handler = StandaloneRegionRequestHandler::arc(region_server);
// let statement_executor = Arc::new(StatementExecutor::new(
// catalog_manager.clone(),
// query_engine.clone(),
// dn_instance.clone(),
// region_request_handler.clone(),
// ));
let create_expr_factory = CreateExprFactory;
let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
// let create_expr_factory = CreateExprFactory;
// let grpc_query_handler = StandaloneGrpcQueryHandler::arc(dn_instance.clone());
Ok(Instance {
catalog_manager: catalog_manager.clone(),
script_executor,
create_expr_factory,
statement_executor,
query_engine,
grpc_query_handler,
region_request_handler,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
})
// Ok(Instance {
// catalog_manager: catalog_manager.clone(),
// script_executor,
// create_expr_factory,
// statement_executor,
// query_engine,
// grpc_query_handler,
// region_request_handler,
// plugins: Default::default(),
// servers: Arc::new(HashMap::new()),
// heartbeat_task: None,
// })
}
pub async fn build_servers(&mut self, opts: &FrontendOptions) -> Result<()> {

View File

@@ -42,7 +42,6 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::info;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::manager::PartitionInfo;
@@ -72,7 +71,7 @@ use crate::error::{
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::table::DistTable;
use crate::table::{table_idents_to_full_name, DistTable};
const MAX_VALUE: &str = "MAXVALUE";

View File

@@ -35,12 +35,6 @@ use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};
pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<DatanodeError>);
impl StandaloneGrpcQueryHandler {
pub(crate) fn arc(handler: GrpcQueryHandlerRef<DatanodeError>) -> Arc<Self> {
Arc::new(Self(handler))
}
}
#[async_trait]
impl GrpcQueryHandler for StandaloneGrpcQueryHandler {
type Error = Error;
@@ -58,6 +52,7 @@ pub(crate) struct StandaloneRegionRequestHandler {
}
impl StandaloneRegionRequestHandler {
#[allow(dead_code)]
pub fn arc(region_server: RegionServer) -> Arc<Self> {
Arc::new(Self { region_server })
}

View File

@@ -30,7 +30,6 @@ use common_error::ext::BoxedError;
use common_query::Output;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutorRef;
@@ -39,6 +38,7 @@ use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use table::engine::TableReference;
use table::requests::{
CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest,
@@ -47,10 +47,11 @@ use table::TableRef;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
InvalidSqlSnafu, PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
};
use crate::req_convert::{delete, insert};
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::table::table_idents_to_full_name;
#[derive(Clone)]
pub struct StatementExecutor {
@@ -289,3 +290,22 @@ fn extract_timestamp(map: &HashMap<String, String>, key: &str) -> Result<Option<
})
.transpose()
}
fn idents_to_full_database_name(
obj_name: &ObjectName,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
match &obj_name.0[..] {
[database] => Ok((
query_ctx.current_catalog().to_owned(),
database.value.clone(),
)),
[catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
_ => InvalidSqlSnafu {
err_msg: format!(
"expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
),
}
.fail(),
}
}

View File

@@ -14,7 +14,6 @@
use common_error::ext::BoxedError;
use common_query::Output;
use datanode::instance::sql::table_idents_to_full_name;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
@@ -23,6 +22,7 @@ use crate::error::{
CatalogSnafu, DescribeStatementSnafu, ExternalSnafu, Result, TableNotFoundSnafu,
};
use crate::statement::StatementExecutor;
use crate::table::table_idents_to_full_name;
impl StatementExecutor {
pub(super) async fn describe_table(

View File

@@ -16,13 +16,15 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;
use sqlparser::ast::ObjectName;
use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
use crate::error::NotSupportedSnafu;
use crate::error::{InvalidSqlSnafu, NotSupportedSnafu, Result};
#[derive(Clone)]
pub struct DistTable;
@@ -38,7 +40,10 @@ impl DistTable {
pub struct DummyDataSource;
impl DataSource for DummyDataSource {
fn get_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream, BoxedError> {
fn get_stream(
&self,
_request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
NotSupportedSnafu {
feat: "get stream from a distributed table",
}
@@ -47,6 +52,37 @@ impl DataSource for DummyDataSource {
}
}
// TODO(LFC): Refactor consideration: move this function to some helper mod,
// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved.
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.
pub fn table_idents_to_full_name(
obj_name: &ObjectName,
query_ctx: QueryContextRef,
) -> Result<(String, String, String)> {
match &obj_name.0[..] {
[table] => Ok((
query_ctx.current_catalog().to_owned(),
query_ctx.current_schema().to_owned(),
table.value.clone(),
)),
[schema, table] => Ok((
query_ctx.current_catalog().to_owned(),
schema.value.clone(),
table.value.clone(),
)),
[catalog, schema, table] => Ok((
catalog.value.clone(),
schema.value.clone(),
table.value.clone(),
)),
_ => InvalidSqlSnafu {
err_msg: format!(
"expect table name to be <catalog>.<schema>.<table>, <schema>.<table> or <table>, actual: {obj_name}",
),
}.fail(),
}
}
#[cfg(test)]
pub(crate) mod test {
use std::collections::BTreeMap;

View File

@@ -239,9 +239,10 @@ impl MetaSrv {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
}
let _ = task_handler.start(common_runtime::bg_runtime())
.map_err(|e| {
debug!("Failed to start greptimedb telemetry task, error: {e}");
let _ = task_handler.start().map_err(|e| {
debug!(
"Failed to start greptimedb telemetry task, error: {e}"
);
});
}
LeaderChangeMessage::StepDown(leader) => {