diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index 88dd007b4d..800ec601af 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -15,7 +15,7 @@ env:
 jobs:
   coverage:
     if: github.event.pull_request.draft == false
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-latest-16-cores
     timeout-minutes: 60
     steps:
       - uses: actions/checkout@v3
diff --git a/src/catalog/tests/mock.rs b/src/catalog/tests/mock.rs
index f41e1da533..cc9f67c7d4 100644
--- a/src/catalog/tests/mock.rs
+++ b/src/catalog/tests/mock.rs
@@ -13,7 +13,7 @@ use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, Schema};
 use datatypes::vectors::StringVector;
 use serde::Serializer;
-use table::engine::{EngineContext, TableEngine};
+use table::engine::{EngineContext, TableEngine, TableReference};
 use table::metadata::TableId;
 use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
 use table::test_util::MemTable;
@@ -175,12 +175,28 @@
         unimplemented!()
     }
 
-    fn get_table(&self, _ctx: &EngineContext, name: &str) -> table::Result<Option<TableRef>> {
-        futures::executor::block_on(async { Ok(self.tables.read().await.get(name).cloned()) })
+    fn get_table<'a>(
+        &self,
+        _ctx: &EngineContext,
+        table_ref: &'a TableReference,
+    ) -> table::Result<Option<TableRef>> {
+        futures::executor::block_on(async {
+            Ok(self
+                .tables
+                .read()
+                .await
+                .get(&table_ref.to_string())
+                .cloned())
+        })
     }
 
-    fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
-        futures::executor::block_on(async { self.tables.read().await.contains_key(name) })
+    fn table_exists<'a>(&self, _ctx: &EngineContext, table_ref: &'a TableReference) -> bool {
+        futures::executor::block_on(async {
+            self.tables
+                .read()
+                .await
+                .contains_key(&table_ref.to_string())
+        })
     }
 
     async fn drop_table(
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 1b38f98290..95247dd305 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -9,6 +9,7 @@ use frontend::frontend::Mode;
 use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
 use meta_client::client::{MetaClient, MetaClientBuilder};
 use meta_client::MetaClientOpts;
+use object_store::layers::LoggingLayer;
 use object_store::{services::fs::Builder, util, ObjectStore};
 use query::query_engine::{QueryEngineFactory, QueryEngineRef};
 use snafu::prelude::*;
@@ -156,7 +157,9 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore>
         .build()
         .context(error::InitBackendSnafu { dir: &data_dir })?;
 
-    Ok(ObjectStore::new(accessor))
+    let object_store = ObjectStore::new(accessor).layer(LoggingLayer); // Add logging
+
+    Ok(object_store)
 }
 
 /// Create metasrv client instance and spawn heartbeat loop.
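Note: both the mock engine above and the Mito engine later in this patch switch their in-memory table maps from being keyed by the bare table name to being keyed by the `TableReference` rendered as a `catalog.schema.table` string. A minimal, self-contained sketch (not GreptimeDB code) of why that keying scheme is what lets same-named tables in different schemas coexist:

```rust
use std::collections::HashMap;

fn main() {
    // Tables are stored under their fully qualified name, mirroring
    // `tables.insert(table_ref.to_string(), table)` in this patch.
    let mut tables: HashMap<String, &str> = HashMap::new();
    tables.insert("greptime.a.demo".to_string(), "table in schema a");
    tables.insert("greptime.b.demo".to_string(), "table in schema b");

    // Before this patch the key was just "demo", so the second insert
    // would have silently replaced the first.
    assert_eq!(tables.len(), 2);
    assert_eq!(tables["greptime.a.demo"], "table in schema a");
}
```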
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index 1dee0a0e27..5e0be36a25 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -4,7 +4,7 @@ use api::v1::{
     ObjectExpr, ObjectResult, SelectExpr,
 };
 use async_trait::async_trait;
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_insert::insertion_expr_to_request;
@@ -152,9 +152,8 @@ impl GrpcQueryHandler for Instance {
     async fn do_query(&self, query: ObjectExpr) -> servers::error::Result<ObjectResult> {
         let object_resp = match query.expr {
             Some(object_expr::Expr::Insert(insert_expr)) => {
-                // TODO(dennis): retrieve schema name from DatabaseRequest
                 let catalog_name = DEFAULT_CATALOG_NAME;
-                let schema_name = DEFAULT_SCHEMA_NAME;
+                let schema_name = &insert_expr.schema_name;
                 let table_name = &insert_expr.table_name;
                 let expr = insert_expr
                     .expr
diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs
index b3cf4794db..9ff6abf76e 100644
--- a/src/datanode/src/sql.rs
+++ b/src/datanode/src/sql.rs
@@ -5,7 +5,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_query::Output;
 use snafu::{OptionExt, ResultExt};
 use sql::statements::show::{ShowDatabases, ShowTables};
-use table::engine::{EngineContext, TableEngineRef};
+use table::engine::{EngineContext, TableEngineRef, TableReference};
 use table::requests::*;
 use table::TableRef;
@@ -54,11 +54,15 @@
         }
     }
 
-    pub(crate) fn get_table(&self, table_name: &str) -> Result<TableRef> {
+    pub(crate) fn get_table<'a>(&self, table_ref: &'a TableReference) -> Result<TableRef> {
         self.table_engine
-            .get_table(&EngineContext::default(), table_name)
-            .context(GetTableSnafu { table_name })?
-            .context(TableNotFoundSnafu { table_name })
+            .get_table(&EngineContext::default(), table_ref)
+            .with_context(|_| GetTableSnafu {
+                table_name: table_ref.to_string(),
+            })?
+            .with_context(|| TableNotFoundSnafu {
+                table_name: table_ref.to_string(),
+            })
     }
 
     pub(crate) fn get_default_catalog(&self) -> Result {
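Note: `get_table` now resolves through a `TableReference` (added near the end of this patch in `src/table/src/engine.rs`). A self-contained sketch of how a caller assembles one — the `resolve` helper is hypothetical, not part of the patch, but the default-name fallback mirrors what the alter/insert handlers below do:

```rust
use std::fmt::{self, Display};

const DEFAULT_CATALOG_NAME: &str = "greptime";
const DEFAULT_SCHEMA_NAME: &str = "public";

// Re-statement of the TableReference added by this patch.
pub struct TableReference<'a> {
    pub catalog: &'a str,
    pub schema: &'a str,
    pub table: &'a str,
}

impl<'a> Display for TableReference<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
    }
}

// Hypothetical helper: fall back to default catalog/schema when unset.
fn resolve<'a>(
    catalog: Option<&'a str>,
    schema: Option<&'a str>,
    table: &'a str,
) -> TableReference<'a> {
    TableReference {
        catalog: catalog.unwrap_or(DEFAULT_CATALOG_NAME),
        schema: schema.unwrap_or(DEFAULT_SCHEMA_NAME),
        table,
    }
}

fn main() {
    let table_ref = resolve(None, Some("prometheus"), "demo");
    assert_eq!("greptime.prometheus.demo", table_ref.to_string());
}
```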
diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs
index 63d98a40dc..87ca827850 100644
--- a/src/datanode/src/sql/alter.rs
+++ b/src/datanode/src/sql/alter.rs
@@ -1,8 +1,9 @@
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_query::Output;
 use snafu::prelude::*;
 use sql::statements::alter::{AlterTable, AlterTableOperation};
 use sql::statements::{column_def_to_schema, table_idents_to_full_name};
-use table::engine::EngineContext;
+use table::engine::{EngineContext, TableReference};
 use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
 
 use crate::error::{self, Result};
@@ -11,15 +12,29 @@ use crate::sql::SqlHandler;
 
 impl SqlHandler {
     pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
         let ctx = EngineContext {};
-        let table_name = &req.table_name.clone();
+        let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+        let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
+        let table_name = &req.table_name.to_string();
+        let table_ref = TableReference {
+            catalog: catalog_name,
+            schema: schema_name,
+            table: table_name,
+        };
+
+        let full_table_name = table_ref.to_string();
+
         ensure!(
-            self.table_engine.table_exists(&ctx, table_name),
-            error::TableNotFoundSnafu { table_name }
+            self.table_engine.table_exists(&ctx, &table_ref),
+            error::TableNotFoundSnafu {
+                table_name: &full_table_name,
+            }
         );
         self.table_engine
             .alter_table(&ctx, req)
             .await
-            .context(error::AlterTableSnafu { table_name })?;
+            .context(error::AlterTableSnafu {
+                table_name: full_table_name,
+            })?;
         // Tried in MySQL, it really prints "Affected Rows: 0".
         Ok(Output::AffectedRows(0))
     }
diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs
index 556842c514..fd1a7e6fe8 100644
--- a/src/datanode/src/sql/insert.rs
+++ b/src/datanode/src/sql/insert.rs
@@ -7,6 +7,7 @@ use snafu::OptionExt;
 use snafu::ResultExt;
 use sql::ast::Value as SqlValue;
 use sql::statements::{self, insert::Insert};
+use table::engine::TableReference;
 use table::requests::*;
 
 use crate::error::{
@@ -17,13 +18,19 @@ use crate::sql::{SqlHandler, SqlRequest};
 
 impl SqlHandler {
     pub(crate) async fn insert(&self, req: InsertRequest) -> Result<Output> {
-        let table_name = &req.table_name.to_string();
-        let table = self.get_table(table_name)?;
+        // FIXME(dennis): table_ref is used in InsertSnafu and the req is consumed
+        // in `insert`, so we have to clone catalog_name etc.
+        let table_ref = TableReference {
+            catalog: &req.catalog_name.to_string(),
+            schema: &req.schema_name.to_string(),
+            table: &req.table_name.to_string(),
+        };
 
-        let affected_rows = table
-            .insert(req)
-            .await
-            .context(InsertSnafu { table_name })?;
+        let table = self.get_table(&table_ref)?;
+
+        let affected_rows = table.insert(req).await.with_context(|_| InsertSnafu {
+            table_name: table_ref.to_string(),
+        })?;
 
         Ok(Output::AffectedRows(affected_rows))
     }
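Note: the `FIXME(dennis)` above exists because `TableReference` only borrows, while `table.insert(req)` consumes the request. A toy reproduction (not GreptimeDB code; all names hypothetical) of the borrow problem that forces the clones:

```rust
// A request that will be moved into the insert call.
struct Request {
    table_name: String,
}

// A borrowing reference, standing in for TableReference<'a>.
struct Ref<'a> {
    table: &'a str,
}

// Consumes the request, like `table.insert(req)` does.
fn consume(req: Request) -> usize {
    req.table_name.len()
}

fn main() {
    let req = Request {
        table_name: "demo".to_string(),
    };

    // Clone first so the reference borrows from a value that outlives the move;
    // borrowing `&req.table_name` directly would not compile once `req` is moved.
    let owned = req.table_name.clone();
    let table_ref = Ref { table: &owned };

    let affected = consume(req); // `req` is moved here
    println!("{} rows into {}", affected, table_ref.table);
}
```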
diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs
index ed0437fa7e..d123540353 100644
--- a/src/datanode/src/tests/grpc_test.rs
+++ b/src/datanode/src/tests/grpc_test.rs
@@ -24,12 +24,13 @@ use crate::tests::test_util::{self, TestGuard};
 
 async fn setup_grpc_server(
     name: &str,
-    port: usize,
+    datanode_port: usize,
+    frontend_port: usize,
 ) -> (String, TestGuard, Arc, Arc) {
     common_telemetry::init_default_ut_logging();
     let (mut opts, guard) = test_util::create_tmp_dir_and_datanode_opts(name);
 
-    let datanode_grpc_addr = format!("127.0.0.1:{}", port);
+    let datanode_grpc_addr = format!("127.0.0.1:{}", datanode_port);
     opts.rpc_addr = datanode_grpc_addr.clone();
     let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
     instance.start().await.unwrap();
@@ -43,7 +44,7 @@ async fn setup_grpc_server(
             .unwrap(),
     );
 
-    let fe_grpc_addr = format!("127.0.0.1:{}", port + 1);
+    let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
     let fe_opts = FrontendOptions {
         mode: Standalone,
         datanode_rpc_addr: datanode_grpc_addr.clone(),
@@ -95,7 +96,7 @@ async fn setup_grpc_server(
 #[tokio::test(flavor = "multi_thread")]
 async fn test_auto_create_table() {
     let (addr, _guard, fe_grpc_server, dn_grpc_server) =
-        setup_grpc_server("auto_create_table", 3991).await;
+        setup_grpc_server("auto_create_table", 3992, 3993).await;
 
     let grpc_client = Client::with_urls(vec![addr]);
     let db = Database::new("greptime", grpc_client);
@@ -162,7 +163,7 @@ fn expect_data() -> (Column, Column, Column, Column) {
 async fn test_insert_and_select() {
     common_telemetry::init_default_ut_logging();
     let (addr, _guard, fe_grpc_server, dn_grpc_server) =
-        setup_grpc_server("insert_and_select", 3990).await;
+        setup_grpc_server("insert_and_select", 3990, 3991).await;
 
     let grpc_client = Client::with_urls(vec![addr]);
diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs
index 7e5c761082..d75bff672b 100644
--- a/src/datanode/src/tests/instance_test.rs
+++ b/src/datanode/src/tests/instance_test.rs
@@ -1,4 +1,4 @@
-use arrow::array::{Int64Array, UInt64Array};
+use arrow::array::{Int64Array, UInt64Array, Utf8Array};
 use common_query::Output;
 use common_recordbatch::util;
 use datafusion::arrow_print;
@@ -64,6 +64,106 @@ async fn test_create_database_and_insert_query() {
         _ => unreachable!(),
     }
 }
+#[tokio::test(flavor = "multi_thread")]
+async fn test_issue477_same_table_name_in_different_databases() {
+    common_telemetry::init_default_ut_logging();
+
+    let (opts, _guard) =
+        test_util::create_tmp_dir_and_datanode_opts("create_database_and_insert_query");
+    let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
+    instance.start().await.unwrap();
+
+    // Create database a and b
+    let output = instance.execute_sql("create database a").await.unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+    let output = instance.execute_sql("create database b").await.unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+
+    // Create table a.demo and b.demo
+    let output = instance
+        .execute_sql(
+            r#"create table a.demo(
+             host STRING,
+             ts bigint,
+             TIME INDEX(ts)
+)"#,
+        )
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+
+    let output = instance
+        .execute_sql(
+            r#"create table b.demo(
+             host STRING,
+             ts bigint,
+             TIME INDEX(ts)
+)"#,
+        )
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+
+    // Insert different data into a.demo and b.demo
+    let output = instance
+        .execute_sql(
+            r#"insert into a.demo(host, ts) values
+                           ('host1', 1655276557000)
+                           "#,
+        )
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+    let output = instance
+        .execute_sql(
+            r#"insert into b.demo(host, ts) values
+                           ('host2',1655276558000)
+                           "#,
+        )
+        .await
+        .unwrap();
+    assert!(matches!(output, Output::AffectedRows(1)));
+
+    // Query data and assert
+    assert_query_result(
+        &instance,
+        "select host,ts from a.demo order by ts",
+        1655276557000,
+        "host1",
+    )
+    .await;
+
+    assert_query_result(
+        &instance,
+        "select host,ts from b.demo order by ts",
+        1655276558000,
+        "host2",
+    )
+    .await;
+}
+
+async fn assert_query_result(instance: &Instance, sql: &str, ts: i64, host: &str) {
+    let query_output = instance.execute_sql(sql).await.unwrap();
+    match query_output {
+        Output::Stream(s) => {
+            let batches = util::collect(s).await.unwrap();
+            let columns = batches[0].df_recordbatch.columns();
+            assert_eq!(2, columns.len());
+            assert_eq!(
+                &Utf8Array::<i32>::from_slice(&[host]),
+                columns[0]
+                    .as_any()
+                    .downcast_ref::<Utf8Array<i32>>()
+                    .unwrap()
+            );
+            assert_eq!(
+                &Int64Array::from_slice(&[ts]),
+                columns[1].as_any().downcast_ref::<Int64Array>().unwrap()
+            );
+        }
+        _ => unreachable!(),
+    }
+}
 
 #[tokio::test(flavor = "multi_thread")]
 async fn test_execute_insert() {
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 6310faa906..2f0aae5438 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -12,14 +12,15 @@ use api::v1::alter_expr::Kind;
 use api::v1::codec::InsertBatch;
 use api::v1::object_expr::Expr;
 use api::v1::{
-    insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr,
-    InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
+    admin_expr, insert_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr,
+    CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult,
 };
 use async_trait::async_trait;
 use catalog::remote::MetaKvBackend;
 use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
 use client::admin::{admin_result_to_output, Admin};
 use client::{Client, Database, Select};
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::{BoxedError, StatusCode};
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_query::Output;
@@ -155,14 +156,12 @@ impl Instance {
         Ok(instance)
     }
 
-    // TODO(fys): temporarily hard code
-    pub fn database(&self) -> Database {
-        Database::new("greptime", self.client.clone())
+    pub fn database(&self, database: &str) -> Database {
+        Database::new(database, self.client.clone())
     }
 
-    // TODO(fys): temporarily hard code
-    pub fn admin(&self) -> Admin {
-        Admin::new("greptime", self.client.clone())
+    pub fn admin(&self, database: &str) -> Admin {
+        Admin::new(database, self.client.clone())
     }
 
     pub fn set_catalog_manager(&mut self, catalog_manager: CatalogManagerRef) {
@@ -173,7 +172,9 @@
         if let Some(dist_instance) = &self.dist_instance {
            dist_instance.handle_select(expr).await
        } else {
-            self.database()
+            // TODO(LFC): Find a better way to execute query between Frontend and Datanode in standalone mode.
+            // Otherwise we have to parse SQL first to get schema name. Maybe not GRPC.
+            self.database(DEFAULT_SCHEMA_NAME)
                .select(expr)
                .await
                .and_then(Output::try_from)
@@ -191,7 +192,10 @@
             v.create_table(&mut expr, partitions).await
         } else {
             // Currently standalone mode does not support multi partitions/regions.
-            let result = self.admin().create(expr.clone()).await;
+            let result = self
+                .admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
+                .create(expr.clone())
+                .await;
             if let Err(e) = &result {
                 error!(e; "Failed to create table by expr: {:?}", expr);
             }
@@ -203,7 +207,7 @@
 
     /// Handle create database expr.
     pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
-        self.admin()
+        self.admin(DEFAULT_SCHEMA_NAME)
             .create_database(expr)
             .await
             .and_then(admin_result_to_output)
@@ -212,7 +216,7 @@
 
     /// Handle alter expr
     pub async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
-        self.admin()
+        self.admin(expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME))
             .alter(expr)
             .await
             .and_then(admin_result_to_output)
@@ -234,8 +238,8 @@
     /// Handle insert. for 'values' insertion, create/alter the destination table on demand.
     pub async fn handle_insert(&self, insert_expr: &InsertExpr) -> Result<Output> {
         let table_name = &insert_expr.table_name;
-        let catalog_name = "greptime";
-        let schema_name = "public";
+        let catalog_name = DEFAULT_CATALOG_NAME;
+        let schema_name = &insert_expr.schema_name;
 
         if let Some(expr) = &insert_expr.expr {
             match expr {
@@ -253,7 +257,7 @@
                 }
                 api::v1::insert_expr::Expr::Sql(_) => {
                     // Frontend does not comprehend insert request that is raw SQL string
-                    self.database()
+                    self.database(schema_name)
                         .insert(insert_expr.clone())
                         .await
                         .and_then(Output::try_from)
@@ -286,7 +290,7 @@
             &insert_batches,
         )
         .await?;
-        self.database()
+        self.database(schema_name)
             .insert(InsertExpr {
                 schema_name: schema_name.to_string(),
                 table_name: table_name.to_string(),
@@ -350,8 +354,13 @@
             "Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
             add_columns, catalog_name, schema_name, table_name
         );
-        self.add_new_columns_to_table(table_name, add_columns)
-            .await?;
+        self.add_new_columns_to_table(
+            catalog_name,
+            schema_name,
+            table_name,
+            add_columns,
+        )
+        .await?;
         info!(
             "Successfully altered table on insertion: {}.{}.{}",
             catalog_name, schema_name, table_name
@@ -386,6 +395,8 @@
 
     async fn add_new_columns_to_table(
         &self,
+        catalog_name: &str,
+        schema_name: &str,
         table_name: &str,
         add_columns: AddColumns,
     ) -> Result<Output> {
@@ -395,11 +406,11 @@
         );
         let expr = AlterExpr {
             table_name: table_name.to_string(),
-            schema_name: None,
-            catalog_name: None,
+            schema_name: Some(schema_name.to_string()),
+            catalog_name: Some(catalog_name.to_string()),
             kind: Some(Kind::AddColumns(add_columns)),
         };
-        self.admin()
+        self.admin(schema_name)
             .alter(expr)
             .await
             .and_then(admin_result_to_output)
@@ -608,8 +619,10 @@
                     query: format!("{:?}", query),
                 })
             }
+
+            // FIXME(hl): refactor
             _ => self
-                .database()
+                .database(DEFAULT_SCHEMA_NAME)
                 .object(query.clone())
                 .await
                 .map_err(BoxedError::new)
                 .with_context(|_| server_error::ExecuteQuerySnafu {
                     query: format!("{:?}", query),
                 }),
            }
        } else {
-            // why?
-            self.database()
-                .object(query.clone())
-                .await
-                .map_err(BoxedError::new)
-                .with_context(|_| server_error::ExecuteQuerySnafu {
-                    query: format!("{:?}", query),
-                })
+            server_error::InvalidQuerySnafu {
+                reason: "empty query",
+            }
+            .fail()
         }
     }
 }
 
+fn get_schema_name(expr: &AdminExpr) -> &str {
+    let schema_name = match &expr.expr {
+        Some(admin_expr::Expr::Create(expr)) => expr.schema_name.as_deref(),
+        Some(admin_expr::Expr::Alter(expr)) => expr.schema_name.as_deref(),
+        Some(admin_expr::Expr::CreateDatabase(_)) | None => Some(DEFAULT_SCHEMA_NAME),
+    };
+    schema_name.unwrap_or(DEFAULT_SCHEMA_NAME)
+}
+
 #[async_trait]
 impl GrpcAdminHandler for Instance {
     async fn exec_admin_request(&self, expr: AdminExpr) -> server_error::Result<AdminResult> {
-        self.admin()
+        self.admin(get_schema_name(&expr))
             .do_request(expr.clone())
             .await
             .map_err(BoxedError::new)
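Note: `get_schema_name` above extracts the schema from create/alter admin expressions and falls back to the default otherwise. The `Option::as_deref().unwrap_or(...)` chain it (and the `handle_*` methods) relies on works like this — a self-contained sketch where the protobuf `AdminExpr` types are replaced by a plain `Option<String>` for illustration:

```rust
const DEFAULT_SCHEMA_NAME: &str = "public";

// `as_deref` turns &Option<String> into Option<&str> without cloning,
// so the default &'static str can be substituted when the field is unset.
fn schema_or_default(schema_name: &Option<String>) -> &str {
    schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME)
}

fn main() {
    assert_eq!(
        "prometheus",
        schema_or_default(&Some("prometheus".to_string()))
    );
    assert_eq!("public", schema_or_default(&None));
}
```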
diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs
index e3dc41aa0d..6c2ec04f00 100644
--- a/src/frontend/src/instance/prometheus.rs
+++ b/src/frontend/src/instance/prometheus.rs
@@ -68,7 +68,7 @@ async fn handle_remote_queries(
     let mut results = Vec::with_capacity(queries.len());
 
     for q in queries {
-        let (table_name, sql) = prometheus::query_to_sql(q)?;
+        let (table_name, sql) = prometheus::query_to_sql(db.name(), q)?;
 
         logging::debug!(
             "prometheus remote read, table: {}, sql: {}",
@@ -90,10 +90,10 @@ async fn handle_remote_queries(
 
 #[async_trait]
 impl PrometheusProtocolHandler for Instance {
-    async fn write(&self, request: WriteRequest) -> ServerResult<()> {
+    async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
         match self.mode {
             Mode::Standalone => {
-                let exprs = prometheus::write_request_to_insert_exprs(request)?;
+                let exprs = prometheus::write_request_to_insert_exprs(database, request)?;
                 let futures = exprs
                     .iter()
                     .map(|e| self.handle_insert(e))
@@ -108,7 +108,7 @@
                 })?;
             }
             Mode::Distributed(_) => {
-                let inserts = prometheus::write_request_to_insert_reqs(request)?;
+                let inserts = prometheus::write_request_to_insert_reqs(database, request)?;
 
                 self.dist_insert(inserts)
                     .await
@@ -122,11 +122,11 @@
         Ok(())
     }
 
-    async fn read(&self, request: ReadRequest) -> ServerResult<PrometheusResponse> {
+    async fn read(&self, database: &str, request: ReadRequest) -> ServerResult<PrometheusResponse> {
         let response_type = negotiate_response_type(&request.accepted_response_types)?;
 
         // TODO(dennis): use read_hints to speedup query if possible
-        let results = handle_remote_queries(&self.database(), &request.queries).await?;
+        let results = handle_remote_queries(&self.database(database), &request.queries).await?;
 
         match response_type {
             ResponseType::Samples => {
@@ -165,6 +165,7 @@ mod tests {
     use api::prometheus::remote::{
         label_matcher::Type as MatcherType, Label, LabelMatcher, Sample,
     };
+    use api::v1::CreateDatabaseExpr;
 
     use super::*;
     use crate::tests;
@@ -179,7 +180,16 @@
             ..Default::default()
         };
 
-        instance.write(write_request).await.unwrap();
+        let db = "prometheus";
+
+        instance
+            .handle_create_database(CreateDatabaseExpr {
+                database_name: db.to_string(),
+            })
+            .await
+            .unwrap();
+
+        instance.write(db, write_request).await.unwrap();
 
         let read_request = ReadRequest {
             queries: vec![
@@ -214,7 +224,7 @@
             ..Default::default()
         };
 
-        let resp = instance.read(read_request).await.unwrap();
+        let resp = instance.read(db, read_request).await.unwrap();
 
         assert_eq!(resp.content_type, "application/x-protobuf");
         assert_eq!(resp.content_encoding, "snappy");
         let body = prometheus::snappy_decompress(&resp.body).unwrap();
diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs
index 1b645695f0..03e47feb37 100644
--- a/src/object-store/src/lib.rs
+++ b/src/object-store/src/lib.rs
@@ -1,5 +1,5 @@
 pub use opendal::{
-    io_util::SeekableReader, services, Accessor, DirEntry, DirStreamer, Layer, Object,
+    io_util::SeekableReader, layers, services, Accessor, DirEntry, DirStreamer, Layer, Object,
     ObjectMetadata, ObjectMode, Operator as ObjectStore,
 };
 pub mod backend;
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index ab28bf798e..ae360262d1 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -329,7 +329,6 @@ impl HttpServer {
             router = router.nest(&format!("/{}/opentsdb", HTTP_API_VERSION), opentsdb_router);
         }
 
-        // TODO(fys): Creating influxdb's database when we can create greptime schema.
         if let Some(influxdb_handler) = self.influxdb_handler.clone() {
             let influxdb_router =
                 Router::with_state(influxdb_handler).route("/write", routing::post(influxdb_write));
diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs
index cef1716b7a..9a561fac92 100644
--- a/src/servers/src/http/influxdb.rs
+++ b/src/servers/src/http/influxdb.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 
 use axum::extract::{Query, State};
 use axum::http::StatusCode;
+use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_grpc::writer::Precision;
 
 use crate::error::Result;
@@ -12,14 +13,22 @@ use crate::query_handler::InfluxdbLineProtocolHandlerRef;
 #[axum_macros::debug_handler]
 pub async fn influxdb_write(
     State(handler): State<InfluxdbLineProtocolHandlerRef>,
-    Query(params): Query<HashMap<String, String>>,
+    Query(mut params): Query<HashMap<String, String>>,
     lines: String,
 ) -> Result<(StatusCode, ())> {
+    let db = params
+        .remove("db")
+        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
+
     let precision = params
         .get("precision")
         .map(|val| parse_time_precision(val))
         .transpose()?;
-    let request = InfluxdbRequest { precision, lines };
+
+    let request = InfluxdbRequest {
+        precision,
+        lines,
+        db,
+    };
     handler.exec(&request).await?;
     Ok((StatusCode::NO_CONTENT, ()))
 }
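Note: `influxdb_write` above removes `db` from the query map (defaulting to the `public` schema) before the remaining parameters, such as `precision`, are interpreted. A self-contained sketch of that parameter handling — `extract_db` is a hypothetical helper, not part of the patch:

```rust
use std::collections::HashMap;

const DEFAULT_SCHEMA_NAME: &str = "public";

// Mirrors `params.remove("db").unwrap_or_else(...)` in the handler above.
fn extract_db(params: &mut HashMap<String, String>) -> String {
    params
        .remove("db")
        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string())
}

fn main() {
    let mut params: HashMap<String, String> =
        [("db".to_string(), "influxdb".to_string())].into();
    assert_eq!("influxdb", extract_db(&mut params));
    assert!(params.is_empty()); // `db` is consumed, not left among other params

    let mut empty: HashMap<String, String> = HashMap::new();
    assert_eq!("public", extract_db(&mut empty));
}
```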
diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs
index f740eb322f..e286bcf75c 100644
--- a/src/servers/src/http/prometheus.rs
+++ b/src/servers/src/http/prometheus.rs
@@ -1,24 +1,43 @@
 use api::prometheus::remote::{ReadRequest, WriteRequest};
-use axum::extract::{RawBody, State};
+use axum::extract::{Query, RawBody, State};
 use axum::http::header;
 use axum::http::StatusCode;
 use axum::response::IntoResponse;
+use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use hyper::Body;
 use prost::Message;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
 use snafu::prelude::*;
 
 use crate::error::{self, Result};
 use crate::prometheus::snappy_decompress;
 use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse};
 
+#[derive(Debug, Serialize, Deserialize, JsonSchema)]
+pub struct DatabaseQuery {
+    pub db: Option<String>,
+}
+
+impl Default for DatabaseQuery {
+    fn default() -> DatabaseQuery {
+        Self {
+            db: Some(DEFAULT_SCHEMA_NAME.to_string()),
+        }
+    }
+}
+
 #[axum_macros::debug_handler]
 pub async fn remote_write(
     State(handler): State<PrometheusProtocolHandlerRef>,
+    Query(params): Query<DatabaseQuery>,
     RawBody(body): RawBody,
 ) -> Result<(StatusCode, ())> {
     let request = decode_remote_write_request(body).await?;
-    handler.write(request).await?;
+    handler
+        .write(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
+        .await?;
     Ok((StatusCode::NO_CONTENT, ()))
 }
 
@@ -39,11 +58,14 @@ impl IntoResponse for PrometheusResponse {
 #[axum_macros::debug_handler]
 pub async fn remote_read(
     State(handler): State<PrometheusProtocolHandlerRef>,
+    Query(params): Query<DatabaseQuery>,
     RawBody(body): RawBody,
 ) -> Result<PrometheusResponse> {
     let request = decode_remote_read_request(body).await?;
 
-    handler.read(request).await
+    handler
+        .read(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
+        .await
 }
 
 async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs
index 47ced11122..d09029bbc9 100644
--- a/src/servers/src/influxdb.rs
+++ b/src/servers/src/influxdb.rs
@@ -4,7 +4,6 @@ use api::v1::{
     insert_expr::{self, Expr},
     InsertExpr,
 };
-use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_grpc::writer::{LinesWriter, Precision};
 use influxdb_line_protocol::{parse_lines, FieldValue};
 use snafu::ResultExt;
@@ -18,6 +17,7 @@ pub const DEFAULT_TIME_PRECISION: Precision = Precision::NANOSECOND;
 
 pub struct InfluxdbRequest {
     pub precision: Option<Precision>,
+    pub db: String,
     pub lines: String,
 }
 
@@ -32,12 +32,13 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertRequest> {
             .context(InfluxdbLineProtocolSnafu)?;
         let line_len = lines.len();
         let mut writers: HashMap<String, LineWriter> = HashMap::new();
+        let db = &value.db;
 
         for line in lines {
             let table_name = line.series.measurement;
             let writer = writers
                 .entry(table_name.to_string())
-                .or_insert_with(|| LineWriter::with_lines(table_name, line_len));
+                .or_insert_with(|| LineWriter::with_lines(db, table_name, line_len));
 
             let tags = line.series.tag_set;
             if let Some(tags) = tags {
@@ -81,8 +82,7 @@ impl TryFrom<&InfluxdbRequest> for Vec<InsertExpr> {
     type Error = Error;
 
     fn try_from(value: &InfluxdbRequest) -> Result<Vec<InsertExpr>> {
-        // InfluxDB uses default catalog name and schema name
-        let schema_name = DEFAULT_SCHEMA_NAME.to_string();
+        let schema_name = value.db.to_string();
         let mut writers: HashMap<String, LinesWriter> = HashMap::new();
         let lines = parse_lines(&value.lines)
@@ -192,12 +192,14 @@ monitor2,host=host3 cpu=66.5 1663840496100023102
 monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
 
         let influxdb_req = &InfluxdbRequest {
+            db: "influxdb".to_string(),
             precision: None,
             lines: lines.to_string(),
         };
 
         let insert_reqs: Vec<InsertRequest> = influxdb_req.try_into().unwrap();
 
         for insert_req in insert_reqs {
+            assert_eq!("influxdb", insert_req.schema_name);
             match &insert_req.table_name[..] {
{ "monitor1" => assert_table_1(&insert_req), "monitor2" => assert_table_2(&insert_req), @@ -216,6 +218,7 @@ monitor2,host=host3 cpu=66.5 1663840496100023102 monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; let influxdb_req = &InfluxdbRequest { + db: "public".to_string(), precision: None, lines: lines.to_string(), }; @@ -225,6 +228,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; assert_eq!(2, insert_exprs.len()); for expr in insert_exprs { + assert_eq!("public", expr.schema_name); let values = match expr.expr.unwrap() { Expr::Values(vals) => vals, Expr::Sql(_) => panic!(), diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index ed6b6c5ab4..a2196b45b6 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_grpc::writer::{to_ms_ts, Precision}; use common_time::{timestamp::TimeUnit::Millisecond, Timestamp}; use datatypes::{ @@ -15,6 +15,7 @@ type ColumnLen = usize; type ColumnName = String; pub struct LineWriter { + db: String, table_name: String, expected_rows: usize, current_rows: usize, @@ -22,8 +23,9 @@ pub struct LineWriter { } impl LineWriter { - pub fn with_lines(table_name: impl Into, lines: usize) -> Self { + pub fn with_lines(db: impl Into, table_name: impl Into, lines: usize) -> Self { Self { + db: db.into(), table_name: table_name.into(), expected_rows: lines, current_rows: 0, @@ -122,8 +124,7 @@ impl LineWriter { .collect(); InsertRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), - // TODO(dennis): supports database - schema_name: DEFAULT_SCHEMA_NAME.to_string(), + schema_name: self.db, table_name: self.table_name, columns_values, } @@ -134,6 +135,7 @@ impl LineWriter { mod tests { use std::sync::Arc; + use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_time::Timestamp; use datatypes::{value::Value, vectors::Vector}; @@ -141,7 +143,7 @@ mod tests { #[test] fn test_writer() { - let mut writer = LineWriter::with_lines("demo".to_string(), 4); + let mut writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, "demo".to_string(), 4); writer.write_ts("ts", (1665893727685, Precision::MILLISECOND)); writer.write_tag("host", "host-1"); writer.write_i64("memory", 10_i64); @@ -162,6 +164,7 @@ mod tests { let insert_request = writer.finish(); assert_eq!("demo", insert_request.table_name); + assert_eq!(DEFAULT_SCHEMA_NAME, insert_request.schema_name); let columns = insert_request.columns_values; assert_eq!(5, columns.len()); diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index aa743324c6..cd7f2ea2aa 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -117,7 +117,7 @@ impl DataPoint { } pub fn as_insert_request(&self) -> InsertRequest { - let mut line_writer = LineWriter::with_lines(self.metric.clone(), 1); + let mut line_writer = LineWriter::with_lines(DEFAULT_SCHEMA_NAME, self.metric.clone(), 1); line_writer.write_ts( OPENTSDB_TIMESTAMP_COLUMN_NAME, (self.ts_millis(), Precision::MILLISECOND), diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 1070aba65e..4dd642dfb5 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -11,7 +11,6 @@ use api::v1::{ codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType, InsertExpr, }; -use 
common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision::MILLISECOND; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; @@ -32,7 +31,7 @@ pub struct Metrics { /// Generate a sql from a remote request query /// TODO(dennis): maybe use logical plan in future to prevent sql injection -pub fn query_to_sql(q: &Query) -> Result<(String, String)> { +pub fn query_to_sql(db: &str, q: &Query) -> Result<(String, String)> { let start_timestamp_ms = q.start_timestamp_ms; let end_timestamp_ms = q.end_timestamp_ms; @@ -93,8 +92,8 @@ pub fn query_to_sql(q: &Query) -> Result<(String, String)> { Ok(( table_name.to_string(), format!( - "select * from {} where {} order by {}", - table_name, conditions, TIMESTAMP_COLUMN_NAME, + "select * from {}.{} where {} order by {}", + db, table_name, conditions, TIMESTAMP_COLUMN_NAME, ), )) } @@ -280,16 +279,19 @@ pub fn select_result_to_timeseries( } /// Cast a remote write request into InsertRequest -pub fn write_request_to_insert_reqs(mut request: WriteRequest) -> Result> { +pub fn write_request_to_insert_reqs( + db: &str, + mut request: WriteRequest, +) -> Result> { let timeseries = std::mem::take(&mut request.timeseries); timeseries .into_iter() - .map(timeseries_to_insert_request) + .map(|timeseries| timeseries_to_insert_request(db, timeseries)) .collect() } -fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result { +fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result { // TODO(dennis): save exemplars into a column let labels = std::mem::take(&mut timeseries.labels); let samples = std::mem::take(&mut timeseries.samples); @@ -306,7 +308,7 @@ fn timeseries_to_insert_request(mut timeseries: TimeSeries) -> Result Result Result> { +pub fn write_request_to_insert_exprs( + database: &str, + mut request: WriteRequest, +) -> Result> { let timeseries = std::mem::take(&mut request.timeseries); timeseries .into_iter() - .map(timeseries_to_insert_expr) + .map(|timeseries| timeseries_to_insert_expr(database, timeseries)) .collect() } // TODO(fys): it will remove in the future. -fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result { - let schema_name = DEFAULT_SCHEMA_NAME.to_string(); +fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Result { + let schema_name = database.to_string(); // TODO(dennis): save exemplars into a column let labels = std::mem::take(&mut timeseries.labels); @@ -518,7 +523,7 @@ mod tests { matchers: vec![], ..Default::default() }; - let err = query_to_sql(&q).unwrap_err(); + let err = query_to_sql("public", &q).unwrap_err(); assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. 
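Note: `query_to_sql` now renders a schema-qualified table name in the generated SQL. A minimal sketch of just the formatting change (conditions and column name shortened/hard-coded for illustration; `to_sql` is a hypothetical stand-in, not the real function):

```rust
// Mirrors the `format!("select * from {}.{} where {} order by {}", ...)`
// change above, with the timestamp column fixed for brevity.
fn to_sql(db: &str, table: &str, conditions: &str) -> String {
    format!(
        "select * from {}.{} where {} order by greptime_timestamp",
        db, table, conditions
    )
}

fn main() {
    let sql = to_sql("public", "test", "greptime_timestamp>=1000");
    assert_eq!(
        "select * from public.test where greptime_timestamp>=1000 order by greptime_timestamp",
        sql
    );
}
```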
 }));
 
         let q = Query {
             start_timestamp_ms: 1000,
             end_timestamp_ms: 2000,
             matchers: vec![LabelMatcher {
                 name: METRIC_NAME_LABEL.to_string(),
                 value: "test".to_string(),
                 r#type: 0,
             }],
             ..Default::default()
         };
-        let (table, sql) = query_to_sql(&q).unwrap();
+        let (table, sql) = query_to_sql("public", &q).unwrap();
         assert_eq!("test", table);
-        assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
+        assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
 
         let q = Query {
             start_timestamp_ms: 1000,
@@ -557,9 +562,9 @@
             ],
             ..Default::default()
         };
-        let (table, sql) = query_to_sql(&q).unwrap();
+        let (table, sql) = query_to_sql("public", &q).unwrap();
         assert_eq!("test", table);
-        assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
+        assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
     }
 
     #[test]
@@ -569,11 +574,12 @@
             ..Default::default()
         };
 
-        let reqs = write_request_to_insert_reqs(write_request).unwrap();
+        let reqs = write_request_to_insert_reqs("public", write_request).unwrap();
 
         assert_eq!(3, reqs.len());
 
         let req1 = reqs.get(0).unwrap();
+        assert_eq!("public", req1.schema_name);
         assert_eq!("metric1", req1.table_name);
 
         let columns = &req1.columns_values;
@@ -593,6 +599,7 @@
         assert_vector(&expected, val);
 
         let req2 = reqs.get(1).unwrap();
+        assert_eq!("public", req2.schema_name);
         assert_eq!("metric2", req2.table_name);
 
         let columns = &req2.columns_values;
@@ -616,6 +623,7 @@
         assert_vector(&expected, val);
 
         let req3 = reqs.get(2).unwrap();
+        assert_eq!("public", req3.schema_name);
         assert_eq!("metric3", req3.table_name);
 
         let columns = &req3.columns_values;
@@ -654,8 +662,11 @@
             ..Default::default()
         };
 
-        let exprs = write_request_to_insert_exprs(write_request).unwrap();
+        let exprs = write_request_to_insert_exprs("prometheus", write_request).unwrap();
 
         assert_eq!(3, exprs.len());
+        assert_eq!("prometheus", exprs[0].schema_name);
+        assert_eq!("prometheus", exprs[1].schema_name);
+        assert_eq!("prometheus", exprs[2].schema_name);
         assert_eq!("metric1", exprs[0].table_name);
         assert_eq!("metric2", exprs[1].table_name);
         assert_eq!("metric3", exprs[2].table_name);
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 46b194169b..e466f61214 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -67,9 +67,9 @@ pub struct PrometheusResponse {
 #[async_trait]
 pub trait PrometheusProtocolHandler {
     /// Handling prometheus remote write requests
-    async fn write(&self, request: WriteRequest) -> Result<()>;
+    async fn write(&self, database: &str, request: WriteRequest) -> Result<()>;
     /// Handling prometheus remote read requests
-    async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse>;
+    async fn read(&self, database: &str, request: ReadRequest) -> Result<PrometheusResponse>;
     /// Handling push gateway requests
     async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
 }
diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs
index fe75c5bae5..e86f9b1cd2 100644
--- a/src/servers/tests/http/influxdb_test.rs
+++ b/src/servers/tests/http/influxdb_test.rs
@@ -12,7 +12,7 @@ use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler};
 use tokio::sync::mpsc;
 
 struct DummyInstance {
-    tx: mpsc::Sender<String>,
+    tx: mpsc::Sender<(String, String)>,
 }
 
 #[async_trait]
 impl InfluxdbLineProtocolHandler for DummyInstance {
@@ -21,7 +21,7 @@
         let exprs: Vec<InsertExpr> = request.try_into()?;
 
         for expr in exprs {
-            let _ = self.tx.send(expr.table_name).await;
+            let _ = self.tx.send((expr.schema_name, expr.table_name)).await;
         }
 
         Ok(())
@@ -43,7 +43,7 @@
     }
 }
 
-fn make_test_app(tx: mpsc::Sender<String>) -> Router {
+fn make_test_app(tx: mpsc::Sender<(String, String)>) -> Router {
     let instance = Arc::new(DummyInstance { tx });
     let mut server = HttpServer::new(instance.clone());
     server.set_influxdb_handler(instance);
@@ -66,6 +66,14 @@
     assert_eq!(result.status(), 204);
     assert!(result.text().await.is_empty());
 
+    let result = client
+        .post("/v1/influxdb/write?db=influxdb")
+        .body("monitor,host=host1 cpu=1.2 1664370459457010101")
+        .send()
+        .await;
+    assert_eq!(result.status(), 204);
+    assert!(result.text().await.is_empty());
+
     // bad request
     let result = client
         .post("/v1/influxdb/write")
@@ -79,5 +87,11 @@
     while let Ok(s) = rx.try_recv() {
         metrics.push(s);
     }
-    assert_eq!(metrics, vec!["monitor".to_string()]);
+    assert_eq!(
+        metrics,
+        vec![
+            ("public".to_string(), "monitor".to_string()),
+            ("influxdb".to_string(), "monitor".to_string())
+        ]
+    );
 }
diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs
index 3415110be7..ce85c56c8c 100644
--- a/src/servers/tests/http/prometheus_test.rs
+++ b/src/servers/tests/http/prometheus_test.rs
@@ -17,18 +17,24 @@ use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler};
 use tokio::sync::mpsc;
 
 struct DummyInstance {
-    tx: mpsc::Sender<Vec<u8>>,
+    tx: mpsc::Sender<(String, Vec<u8>)>,
 }
 
 #[async_trait]
 impl PrometheusProtocolHandler for DummyInstance {
-    async fn write(&self, request: WriteRequest) -> Result<()> {
-        let _ = self.tx.send(request.encode_to_vec()).await;
+    async fn write(&self, db: &str, request: WriteRequest) -> Result<()> {
+        let _ = self
+            .tx
+            .send((db.to_string(), request.encode_to_vec()))
+            .await;
 
         Ok(())
     }
-    async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse> {
-        let _ = self.tx.send(request.encode_to_vec()).await;
+    async fn read(&self, db: &str, request: ReadRequest) -> Result<PrometheusResponse> {
+        let _ = self
+            .tx
+            .send((db.to_string(), request.encode_to_vec()))
+            .await;
 
         let response = ReadResponse {
             results: vec![QueryResult {
@@ -63,7 +69,7 @@
     }
 }
 
-fn make_test_app(tx: mpsc::Sender<Vec<u8>>) -> Router {
+fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
     let instance = Arc::new(DummyInstance { tx });
     let mut server = HttpServer::new(instance.clone());
     server.set_prom_handler(instance);
@@ -82,6 +88,7 @@
         ..Default::default()
     };
 
+    // Write to public database
     let result = client
         .post("/v1/prometheus/write")
         .body(snappy_compress(&write_request.clone().encode_to_vec()[..]).unwrap())
         .send()
         .await;
     assert_eq!(result.status(), 204);
     assert!(result.text().await.is_empty());
+    // Write to prometheus database
+    let result = client
+        .post("/v1/prometheus/write?db=prometheus")
+        .body(snappy_compress(&write_request.clone().encode_to_vec()[..]).unwrap())
+        .send()
+        .await;
+    assert_eq!(result.status(), 204);
+    assert!(result.text().await.is_empty());
 
     let read_request = ReadRequest {
         queries: vec![Query {
@@ -104,8 +119,9 @@
         ..Default::default()
     };
 
+    // Read from prometheus database
     let mut result = client
-        .post("/v1/prometheus/read")
+        .post("/v1/prometheus/read?db=prometheus")
         .body(snappy_compress(&read_request.clone().encode_to_vec()[..]).unwrap())
         .send()
         .await;
@@ -127,16 +143,41 @@
         prometheus::mock_timeseries()
     );
 
-    let mut requests = vec![];
+    // Read from public database
+    let result = client
+        .post("/v1/prometheus/read")
+        .body(snappy_compress(&read_request.clone().encode_to_vec()[..]).unwrap())
+        .send()
+        .await;
+    assert_eq!(result.status(), 200);
+
+    let mut requests: Vec<(String, Vec<u8>)> = vec![];
     while let Ok(s) = rx.try_recv() {
         requests.push(s);
     }
 
-    assert_eq!(2, requests.len());
+    assert_eq!(4, requests.len());
+
+    assert_eq!("public", requests[0].0);
+    assert_eq!("prometheus", requests[1].0);
+    assert_eq!("prometheus", requests[2].0);
+    assert_eq!("public", requests[3].0);
 
     assert_eq!(
         write_request,
-        WriteRequest::decode(&requests[0][..]).unwrap()
+        WriteRequest::decode(&(requests[0].1)[..]).unwrap()
+    );
+    assert_eq!(
+        write_request,
+        WriteRequest::decode(&(requests[1].1)[..]).unwrap()
+    );
+
+    assert_eq!(
+        read_request,
+        ReadRequest::decode(&(requests[2].1)[..]).unwrap()
+    );
+    assert_eq!(
+        read_request,
+        ReadRequest::decode(&(requests[3].1)[..]).unwrap()
     );
-    assert_eq!(read_request, ReadRequest::decode(&requests[1][..]).unwrap());
 }
diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs
index da0fe797f2..8ebbbf2938 100644
--- a/src/table-engine/src/engine.rs
+++ b/src/table-engine/src/engine.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use std::sync::RwLock;
 
 use async_trait::async_trait;
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
 use common_telemetry::logging;
 use datatypes::schema::SchemaRef;
@@ -13,7 +14,7 @@ use store_api::storage::{
     CreateOptions, EngineContext as StorageEngineContext, OpenOptions, RegionDescriptorBuilder,
     RegionId, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
 };
-use table::engine::{EngineContext, TableEngine};
+use table::engine::{EngineContext, TableEngine, TableReference};
 use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
 use table::Result as TableResult;
 use table::{
@@ -46,8 +47,8 @@ fn region_id(table_id: TableId, n: u32) -> RegionId {
 }
 
 #[inline]
-fn table_dir(table_name: &str) -> String {
-    format!("{}/", table_name)
+fn table_dir(schema_name: &str, table_name: &str) -> String {
+    format!("{}/{}/", schema_name, table_name)
 }
 
 /// [TableEngine] implementation.
@@ -97,12 +98,16 @@ impl TableEngine for MitoEngine {
         Ok(self.inner.alter_table(ctx, req).await?)
     }
 
-    fn get_table(&self, _ctx: &EngineContext, name: &str) -> TableResult<Option<TableRef>> {
-        Ok(self.inner.get_table(name))
+    fn get_table<'a>(
+        &self,
+        _ctx: &EngineContext,
+        table_ref: &'a TableReference,
+    ) -> TableResult<Option<TableRef>> {
+        Ok(self.inner.get_table(table_ref))
     }
 
-    fn table_exists(&self, _ctx: &EngineContext, name: &str) -> bool {
-        self.inner.get_table(name).is_some()
+    fn table_exists<'a>(&self, _ctx: &EngineContext, table_ref: &'a TableReference) -> bool {
+        self.inner.get_table(table_ref).is_some()
     }
 
     async fn drop_table(
@@ -114,7 +119,6 @@
     }
 }
 
-/// FIXME(dennis) impl system catalog to keep table metadata.
 struct MitoEngineInner<S: StorageEngine> {
     /// All tables opened by the engine.
     ///
@@ -243,8 +247,13 @@
         let catalog_name = &request.catalog_name;
         let schema_name = &request.schema_name;
         let table_name = &request.table_name;
+        let table_ref = TableReference {
+            catalog: catalog_name,
+            schema: schema_name,
+            table: table_name,
+        };
 
-        if let Some(table) = self.get_table(table_name) {
+        if let Some(table) = self.get_table(&table_ref) {
             if request.create_if_not_exists {
                 return Ok(table);
             } else {
@@ -290,7 +299,7 @@
         let _lock = self.table_mutex.lock().await;
         // Checks again, read lock should be enough since we are guarded by the mutex.
-        if let Some(table) = self.get_table(table_name) {
+        if let Some(table) = self.get_table(&table_ref) {
             if request.create_if_not_exists {
                 return Ok(table);
             } else {
@@ -298,8 +307,9 @@
             }
         }
 
+        let table_dir = table_dir(schema_name, table_name);
         let opts = CreateOptions {
-            parent_dir: table_dir(table_name),
+            parent_dir: table_dir.clone(),
         };
 
         let region = self
@@ -329,7 +339,14 @@
             .context(error::BuildTableInfoSnafu { table_name })?;
 
         let table = Arc::new(
-            MitoTable::create(table_name, table_info, region, self.object_store.clone()).await?,
+            MitoTable::create(
+                table_name,
+                &table_dir,
+                table_info,
+                region,
+                self.object_store.clone(),
+            )
+            .await?,
         );
 
         logging::info!("Mito engine created table: {:?}.", table.table_info());
@@ -337,19 +354,26 @@
         self.tables
             .write()
             .unwrap()
-            .insert(table_name.clone(), table.clone());
+            .insert(table_ref.to_string(), table.clone());
 
         Ok(table)
     }
 
-    // TODO(yingwen): Support catalog and schema name.
     async fn open_table(
         &self,
         _ctx: &EngineContext,
         request: OpenTableRequest,
     ) -> TableResult<Option<TableRef>> {
+        let catalog_name = &request.catalog_name;
+        let schema_name = &request.schema_name;
         let table_name = &request.table_name;
-        if let Some(table) = self.get_table(table_name) {
+        let table_ref = TableReference {
+            catalog: catalog_name,
+            schema: schema_name,
+            table: table_name,
+        };
+
+        if let Some(table) = self.get_table(&table_ref) {
             // Table has already been opened.
             return Ok(Some(table));
         }
@@ -358,13 +382,14 @@
         let table = {
             let _lock = self.table_mutex.lock().await;
             // Checks again, read lock should be enough since we are guarded by the mutex.
-            if let Some(table) = self.get_table(table_name) {
+            if let Some(table) = self.get_table(&table_ref) {
                 return Ok(Some(table));
             }
 
             let engine_ctx = StorageEngineContext::default();
+            let table_dir = table_dir(schema_name, table_name);
             let opts = OpenOptions {
-                parent_dir: table_dir(table_name),
+                parent_dir: table_dir.to_string(),
             };
 
             let table_id = request.table_id;
@@ -383,13 +408,14 @@
                 Some(region) => region,
             };
 
-            let table =
-                Arc::new(MitoTable::open(table_name, region, self.object_store.clone()).await?);
+            let table = Arc::new(
+                MitoTable::open(table_name, &table_dir, region, self.object_store.clone()).await?,
+            );
 
             self.tables
                 .write()
                 .unwrap()
-                .insert(table_name.to_string(), table.clone());
+                .insert(table_ref.to_string(), table.clone());
             Some(table as _)
         };
 
@@ -398,14 +424,26 @@
         Ok(table)
     }
 
-    fn get_table(&self, name: &str) -> Option<TableRef> {
-        self.tables.read().unwrap().get(name).cloned()
+    fn get_table<'a>(&self, table_ref: &'a TableReference) -> Option<TableRef> {
+        self.tables
+            .read()
+            .unwrap()
+            .get(&table_ref.to_string())
+            .cloned()
     }
 
     async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result<TableRef> {
+        let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+        let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
         let table_name = &req.table_name.clone();
+
+        let table_ref = TableReference {
+            catalog: catalog_name,
+            schema: schema_name,
+            table: table_name,
+        };
         let table = self
-            .get_table(table_name)
+            .get_table(&table_ref)
             .context(error::TableNotFoundSnafu { table_name })?;
 
         logging::info!("start altering table {} with request {:?}", table_name, req);
@@ -585,8 +623,8 @@
 
     #[test]
     fn test_table_dir() {
-        assert_eq!("test_table/", table_dir("test_table"));
-        assert_eq!("demo/", table_dir("demo"));
+        assert_eq!("public/test_table/", table_dir("public", "test_table"));
+        assert_eq!("prometheus/demo/", table_dir("prometheus", "demo"));
     }
 
     #[tokio::test]
@@ -771,8 +809,8 @@
         let ctx = EngineContext::default();
         let open_req = OpenTableRequest {
-            catalog_name: String::new(),
-            schema_name: String::new(),
+            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
             table_name: test_util::TABLE_NAME.to_string(),
             // the test table id is 1
             table_id: 1,
diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs
index b61722ea63..0e1c9b2003 100644
--- a/src/table-engine/src/table.rs
+++ b/src/table-engine/src/table.rs
@@ -41,8 +41,8 @@ use crate::manifest::action::*;
 use crate::manifest::TableManifest;
 
 #[inline]
-fn table_manifest_dir(table_name: &str) -> String {
-    format!("{}/manifest/", table_name)
+fn table_manifest_dir(table_dir: &str) -> String {
+    format!("{}/manifest/", table_dir)
 }
 
 /// [Table] implementation.
@@ -341,11 +341,12 @@
 
     pub async fn create(
         table_name: &str,
+        table_dir: &str,
         table_info: TableInfo,
         region: R,
         object_store: ObjectStore,
     ) -> Result<MitoTable<R>> {
-        let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
+        let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store);
 
         // TODO(dennis): save manifest version into catalog?
         let _manifest_version = manifest
@@ -377,10 +378,11 @@
 
     pub async fn open(
         table_name: &str,
+        table_dir: &str,
         region: R,
         object_store: ObjectStore,
     ) -> Result<MitoTable<R>> {
-        let manifest = TableManifest::new(&table_manifest_dir(table_name), object_store);
+        let manifest = TableManifest::new(&table_manifest_dir(table_dir), object_store);
 
         let mut table_info = Self::recover_table_info(table_name, &manifest)
             .await?
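Note: with `table_dir` now including the schema and `table_manifest_dir` deriving from the full table directory, data lands under `<schema>/<table>/` and the manifest under that directory. A runnable sketch combining the two helpers exactly as changed by this patch; the composed path keeps the doubled `/` that results from joining a trailing-slash directory, shown as-is:

```rust
// The two path helpers as changed by this patch.
fn table_dir(schema_name: &str, table_name: &str) -> String {
    format!("{}/{}/", schema_name, table_name)
}

fn table_manifest_dir(table_dir: &str) -> String {
    format!("{}/manifest/", table_dir)
}

fn main() {
    let dir = table_dir("public", "demo");
    assert_eq!("public/demo/", dir);

    // Note the doubled separator produced by composing the two helpers;
    // whether the storage layer normalizes it is up to the backend.
    assert_eq!("public/demo//manifest/", table_manifest_dir(&dir));
}
```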
diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs
index db418f2529..a2ed36bbc7 100644
--- a/src/table/src/engine.rs
+++ b/src/table/src/engine.rs
@@ -1,9 +1,23 @@
+use std::fmt::{self, Display};
 use std::sync::Arc;
 
 use crate::error::Result;
 use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
 use crate::TableRef;
 
+/// Represents a resolved path to a table of the form “catalog.schema.table”
+pub struct TableReference<'a> {
+    pub catalog: &'a str,
+    pub schema: &'a str,
+    pub table: &'a str,
+}
+
+impl<'a> Display for TableReference<'a> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}.{}.{}", self.catalog, self.schema, self.table)
+    }
+}
+
 /// Table engine abstraction.
 #[async_trait::async_trait]
 pub trait TableEngine: Send + Sync {
@@ -37,11 +51,14 @@
     ) -> Result<TableRef>;
 
     /// Returns the table by it's name.
-    fn get_table(&self, ctx: &EngineContext, name: &str) -> Result<Option<TableRef>>;
+    fn get_table<'a>(
+        &self,
+        ctx: &EngineContext,
+        table_ref: &'a TableReference,
+    ) -> Result<Option<TableRef>>;
 
     /// Returns true when the given table is exists.
-    /// TODO(hl): support catalog and schema
-    fn table_exists(&self, ctx: &EngineContext, name: &str) -> bool;
+    fn table_exists<'a>(&self, ctx: &EngineContext, table_ref: &'a TableReference) -> bool;
 
     /// Drops the given table.
     async fn drop_table(&self, ctx: &EngineContext, request: DropTableRequest) -> Result<()>;
@@ -52,3 +69,19 @@ pub type TableEngineRef = Arc<dyn TableEngine>;
 
 /// Storage engine context.
 #[derive(Debug, Clone, Default)]
 pub struct EngineContext {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_table_reference() {
+        let table_ref = TableReference {
+            catalog: "greptime",
+            schema: "public",
+            table: "test",
+        };
+
+        assert_eq!("greptime.public.test", table_ref.to_string());
+    }
+}
diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs
index ff05380a63..75c038f356 100644
--- a/src/table/src/test_util/mock_engine.rs
+++ b/src/table/src/test_util/mock_engine.rs
@@ -6,7 +6,7 @@ use tokio::sync::Mutex;
 
 use crate::test_util::EmptyTable;
 use crate::{
-    engine::{EngineContext, TableEngine},
+    engine::{EngineContext, TableEngine, TableReference},
     requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
     Result, TableRef,
 };
@@ -73,11 +73,15 @@ impl TableEngine for MockTableEngine {
         unimplemented!()
     }
 
-    fn get_table(&self, _ctx: &EngineContext, _name: &str) -> Result<Option<TableRef>> {
+    fn get_table<'a>(
+        &self,
+        _ctx: &EngineContext,
+        _ref: &'a TableReference,
+    ) -> Result<Option<TableRef>> {
         unimplemented!()
     }
 
-    fn table_exists(&self, _ctx: &EngineContext, _name: &str) -> bool {
+    fn table_exists<'a>(&self, _ctx: &EngineContext, _name: &'a TableReference) -> bool {
         unimplemented!()
     }