diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index 36fe628787..8fb6e4609a 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -256,8 +256,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
         Some(Expr::Alter(_)) => "ddl.alter",
         Some(Expr::DropTable(_)) => "ddl.drop_table",
         Some(Expr::FlushTable(_)) => "ddl.flush_table",
+        Some(Expr::CompactTable(_)) => "ddl.compact_table",
         None => "ddl.empty",
-        _ => unreachable!("https://github.com/GreptimeTeam/greptimedb/pull/1912"),
     }
 }
diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs
index 7b36cb7413..1c522a322f 100644
--- a/src/catalog/src/remote/manager.rs
+++ b/src/catalog/src/remote/manager.rs
@@ -115,7 +115,7 @@ impl RemoteCatalogManager {
             joins.push(self.initiate_schemas(node_id, backend, engine_manager, catalog_name));
         }
 
-        let _ = futures::future::try_join_all(joins).await?;
+        futures::future::try_join_all(joins).await?;
 
         Ok(())
     }
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index b48e06becb..c3c9749fde 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -17,9 +17,9 @@ use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
 use api::v1::{
-    greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest,
-    DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery,
-    QueryRequest, RequestHeader,
+    greptime_response, AffectedRows, AlterExpr, AuthHeader, CompactTableExpr, CreateTableExpr,
+    DdlRequest, DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests,
+    PromRangeQuery, QueryRequest, RequestHeader,
 };
 use arrow_flight::{FlightData, Ticket};
 use common_error::prelude::*;
@@ -234,6 +234,14 @@ impl Database {
         .await
     }
 
+    pub async fn compact_table(&self, expr: CompactTableExpr) -> Result<Output> {
+        let _timer = timer!(metrics::METRIC_GRPC_COMPACT_TABLE);
+        self.do_get(Request::Ddl(DdlRequest {
+            expr: Some(DdlExpr::CompactTable(expr)),
+        }))
+        .await
+    }
+
     async fn do_get(&self, request: Request) -> Result<Output> {
         // FIXME(paomian): should be added some labels for metrics
         let _timer = timer!(metrics::METRIC_GRPC_DO_GET);
diff --git a/src/client/src/metrics.rs b/src/client/src/metrics.rs
index 370b17eb58..cba5ec1441 100644
--- a/src/client/src/metrics.rs
+++ b/src/client/src/metrics.rs
@@ -22,4 +22,5 @@ pub const METRIC_GRPC_LOGICAL_PLAN: &str = "grpc.logical_plan";
 pub const METRIC_GRPC_ALTER: &str = "grpc.alter";
 pub const METRIC_GRPC_DROP_TABLE: &str = "grpc.drop_table";
 pub const METRIC_GRPC_FLUSH_TABLE: &str = "grpc.flush_table";
+pub const METRIC_GRPC_COMPACT_TABLE: &str = "grpc.compact_table";
 pub const METRIC_GRPC_DO_GET: &str = "grpc.do_get";
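The client-side surface of this change is the new `Database::compact_table` method and its metric. A minimal usage sketch follows; it assumes an already-connected `Database` handle and illustrative catalog/schema/table names, none of which come from this diff:

```rust
use api::v1::CompactTableExpr;
use client::Database;
use common_query::Output;

/// Ask the datanodes to compact every region of `greptime.public.metrics`.
/// `db` is an already constructed `Database` handle (setup not shown here).
async fn compact_metrics_table(db: &Database) -> Result<Output, Box<dyn std::error::Error>> {
    let output = db
        .compact_table(CompactTableExpr {
            catalog_name: "greptime".to_string(),
            schema_name: "public".to_string(),
            table_name: "metrics".to_string(),
            region_number: None, // None means: compact all regions of the table
        })
        .await?;
    Ok(output)
}
```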
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 381c0aa212..a023e8871c 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -346,13 +346,6 @@ pub enum Error {
         source: catalog::error::Error,
     },
 
-    #[snafu(display("Failed to find table {} from catalog, source: {}", table_name, source))]
-    FindTable {
-        table_name: String,
-        location: Location,
-        source: catalog::error::Error,
-    },
-
     #[snafu(display("Failed to initialize meta client, source: {}", source))]
     MetaClientInit {
         location: Location,
         source: meta_client::error::Error,
     },
@@ -499,7 +492,6 @@ impl ErrorExt for Error {
             DecodeLogicalPlan { source, .. } => source.status_code(),
             NewCatalog { source, .. } | RegisterSchema { source, .. } => source.status_code(),
 
-            FindTable { source, .. } => source.status_code(),
             CreateTable { source, .. } => source.status_code(),
             DropTable { source, .. } => source.status_code(),
             FlushTable { source, .. } => source.status_code(),
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index 209b42060e..5cd0433003 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -198,9 +198,7 @@ impl Instance {
             DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr, query_ctx).await,
             DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await,
             DdlExpr::FlushTable(expr) => self.handle_flush_table(expr).await,
-            Expr::CompactTable(_) => {
-                unreachable!("https://github.com/GreptimeTeam/greptimedb/pull/1912")
-            }
+            Expr::CompactTable(expr) => self.handle_compact_table(expr).await,
         }
     }
 }
diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs
index 89096c5e17..59615964f2 100644
--- a/src/datanode/src/server/grpc.rs
+++ b/src/datanode/src/server/grpc.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
+use api::v1::{AlterExpr, CompactTableExpr, CreateTableExpr, DropTableExpr, FlushTableExpr};
 use common_catalog::consts::IMMUTABLE_FILE_ENGINE;
 use common_catalog::format_full_table_name;
 use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
@@ -20,7 +20,7 @@ use common_query::Output;
 use common_telemetry::info;
 use session::context::QueryContext;
 use snafu::prelude::*;
-use table::requests::{DropTableRequest, FlushTableRequest};
+use table::requests::{CompactTableRequest, DropTableRequest, FlushTableRequest};
 
 use crate::error::{
     AlterExprToRequestSnafu, BumpTableIdSnafu, CatalogSnafu, CreateExprToRequestSnafu,
@@ -133,6 +133,25 @@ impl Instance {
             .execute(SqlRequest::FlushTable(req), QueryContext::arc())
             .await
     }
+
+    pub(crate) async fn handle_compact_table(&self, expr: CompactTableExpr) -> Result<Output> {
+        let table_name = if expr.table_name.trim().is_empty() {
+            None
+        } else {
+            Some(expr.table_name)
+        };
+
+        let req = CompactTableRequest {
+            catalog_name: expr.catalog_name,
+            schema_name: expr.schema_name,
+            table_name,
+            region_number: expr.region_number,
+            wait: None,
+        };
+        self.sql_handler()
+            .execute(SqlRequest::CompactTable(req), QueryContext::arc())
+            .await
+    }
 }
 
 #[cfg(test)]
diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs
index 611bfe7f56..d4b8400d76 100644
--- a/src/datanode/src/sql.rs
+++ b/src/datanode/src/sql.rs
@@ -31,6 +31,7 @@ use crate::error::{
 use crate::instance::sql::table_idents_to_full_name;
 
 mod alter;
+mod compact_table;
 mod create;
 mod create_external;
 mod drop_table;
@@ -44,6 +45,7 @@ pub enum SqlRequest {
     Alter(AlterTableRequest),
     DropTable(DropTableRequest),
     FlushTable(FlushTableRequest),
+    CompactTable(CompactTableRequest),
 }
 
 // Handler to execute SQL except query
@@ -74,6 +76,7 @@ impl SqlHandler {
             SqlRequest::Alter(req) => self.alter_table(req).await,
             SqlRequest::DropTable(req) => self.drop_table(req).await,
             SqlRequest::FlushTable(req) => self.flush_table(req).await,
+            SqlRequest::CompactTable(req) => self.compact_table(req).await,
         };
         if let Err(e) = &result {
            error!(e; "{query_ctx}");
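On the datanode side, the gRPC `CompactTableExpr` is turned into the new `SqlRequest::CompactTable` variant. The sketch below shows roughly the `CompactTableRequest` that `handle_compact_table` builds for a whole-schema compaction (an empty `table_name` in the expression becomes `None`); the catalog and schema values are placeholders, not values from this diff:

```rust
use table::requests::CompactTableRequest;

/// Roughly what handle_compact_table produces when no table name is given.
fn whole_schema_compaction(catalog: &str, schema: &str) -> CompactTableRequest {
    CompactTableRequest {
        catalog_name: catalog.to_string(),
        schema_name: schema.to_string(),
        table_name: None,    // empty table name in the gRPC expr maps to None
        region_number: None, // no specific region: compact them all
        wait: None,          // left unset here; engines fall back to their default
    }
}
```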
diff --git a/src/datanode/src/sql/compact_table.rs b/src/datanode/src/sql/compact_table.rs
new file mode 100644
index 0000000000..88c2bb9d50
--- /dev/null
+++ b/src/datanode/src/sql/compact_table.rs
@@ -0,0 +1,77 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use catalog::CatalogManagerRef;
+use common_query::Output;
+use snafu::{OptionExt, ResultExt};
+use table::requests::CompactTableRequest;
+
+use crate::error;
+use crate::error::{CatalogSnafu, Result};
+use crate::sql::SqlHandler;
+
+impl SqlHandler {
+    pub(crate) async fn compact_table(&self, req: CompactTableRequest) -> Result<Output> {
+        if let Some(table) = &req.table_name {
+            self.compact_table_inner(
+                &self.catalog_manager,
+                &req.catalog_name,
+                &req.schema_name,
+                table,
+                req.region_number,
+                req.wait,
+            )
+            .await?;
+        } else {
+            let all_table_names = self
+                .catalog_manager
+                .table_names(&req.catalog_name, &req.schema_name)
+                .await
+                .context(CatalogSnafu)?;
+            let _ = futures::future::join_all(all_table_names.iter().map(|table| {
+                self.compact_table_inner(
+                    &self.catalog_manager,
+                    &req.catalog_name,
+                    &req.schema_name,
+                    table,
+                    req.region_number,
+                    req.wait,
+                )
+            }))
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+        }
+        Ok(Output::AffectedRows(0))
+    }
+
+    async fn compact_table_inner(
+        &self,
+        catalog_manager: &CatalogManagerRef,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+        region: Option<RegionNumber>,
+        wait: Option<bool>,
+    ) -> Result<()> {
+        catalog_manager
+            .table(catalog_name, schema_name, table_name)
+            .await
+            .context(CatalogSnafu)?
+            .context(error::TableNotFoundSnafu { table_name })?
+            .compact(region, wait)
+            .await
+            .context(error::FlushTableSnafu { table_name })
+    }
+}
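One detail of the whole-schema branch in the new handler that is easy to miss: `join_all` drives every per-table compaction to completion, and only afterwards does collecting into a `Result` surface the first error. A small self-contained illustration of that collect idiom (not GreptimeDB code):

```rust
/// Collecting an iterator of Results into a Result<Vec<_>> keeps every value on
/// success and short-circuits to the first Err otherwise.
fn first_error_wins(results: Vec<Result<u32, String>>) -> Result<Vec<u32>, String> {
    results.into_iter().collect()
}

#[test]
fn collect_into_result() {
    assert_eq!(first_error_wins(vec![Ok(1), Ok(2)]), Ok(vec![1, 2]));
    assert_eq!(
        first_error_wins(vec![Ok(1), Err("boom".to_string()), Err("later".to_string())]),
        Err("boom".to_string())
    );
}
```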
diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs
index bfdc76fe72..b7c4459826 100644
--- a/src/datanode/src/sql/flush_table.rs
+++ b/src/datanode/src/sql/flush_table.rs
@@ -67,7 +67,7 @@ impl SqlHandler {
         catalog_manager
             .table(catalog_name, schema_name, table_name)
             .await
-            .context(error::FindTableSnafu { table_name })?
+            .context(CatalogSnafu)?
             .context(error::TableNotFoundSnafu { table_name })?
             .flush(region, wait)
             .await
diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index b6c728c7a5..0456254a94 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -22,8 +22,8 @@ use api::helper::ColumnDataTypeWrapper;
 use api::v1::ddl_request::{Expr as DdlExpr, Expr};
 use api::v1::greptime_request::Request;
 use api::v1::{
-    column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr,
-    FlushTableExpr, InsertRequests, TableId,
+    column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest,
+    DropTableExpr, FlushTableExpr, InsertRequests, TableId,
 };
 use async_trait::async_trait;
 use catalog::helper::{SchemaKey, SchemaValue};
@@ -34,6 +34,7 @@ use client::Database;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::format_full_table_name;
 use common_error::prelude::BoxedError;
+use common_meta::peer::Peer;
 use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use common_meta::rpc::router::{
     DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest,
@@ -234,6 +235,66 @@ impl DistInstance {
         table_name: TableName,
         region_number: Option<RegionNumber>,
     ) -> Result<Output> {
+        let candidates = self
+            .find_flush_or_compaction_candidates(&table_name, region_number)
+            .await?;
+
+        let expr = FlushTableExpr {
+            catalog_name: table_name.catalog_name.clone(),
+            schema_name: table_name.schema_name.clone(),
+            table_name: table_name.table_name.clone(),
+            region_number,
+            ..Default::default()
+        };
+
+        for candidate in candidates {
+            debug!("Flushing table {table_name} on Datanode {candidate:?}");
+
+            let client = self.datanode_clients.get_client(&candidate).await;
+            let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
+            client
+                .flush_table(expr.clone())
+                .await
+                .context(RequestDatanodeSnafu)?;
+        }
+
+        Ok(Output::AffectedRows(0))
+    }
+
+    async fn compact_table(
+        &self,
+        table_name: TableName,
+        region_number: Option<RegionNumber>,
+    ) -> Result<Output> {
+        let candidates = self
+            .find_flush_or_compaction_candidates(&table_name, region_number)
+            .await?;
+
+        let expr = CompactTableExpr {
+            catalog_name: table_name.catalog_name.clone(),
+            schema_name: table_name.schema_name.clone(),
+            table_name: table_name.table_name.clone(),
+            region_number,
+        };
+
+        for candidate in candidates {
+            debug!("Compacting table {table_name} on Datanode {candidate:?}");
+
+            let client = self.datanode_clients.get_client(&candidate).await;
+            let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
+            client
+                .compact_table(expr.clone())
+                .await
+                .context(RequestDatanodeSnafu)?;
+        }
+        Ok(Output::AffectedRows(0))
+    }
+
+    async fn find_flush_or_compaction_candidates(
+        &self,
+        table_name: &TableName,
+        region_number: Option<RegionNumber>,
+    ) -> Result<Vec<Peer>> {
         let _ = self
             .catalog_manager
             .table(
@@ -255,38 +316,18 @@ impl DistInstance {
             .await
             .context(RequestMetaSnafu)?;
 
-        let expr = FlushTableExpr {
-            catalog_name: table_name.catalog_name.clone(),
-            schema_name: table_name.schema_name.clone(),
-            table_name: table_name.table_name.clone(),
-            region_number,
-            ..Default::default()
-        };
-
-        for table_route in &route_response.table_routes {
-            let should_send_rpc = table_route.region_routes.iter().any(|route| {
-                if let Some(n) = region_number {
-                    n == route.region.id.region_number()
-                } else {
-                    true
-                }
-            });
-
-            if !should_send_rpc {
-                continue;
-            }
-            for datanode in table_route.find_leaders() {
-                debug!("Flushing table {table_name} on Datanode {datanode:?}");
-
-                let client = self.datanode_clients.get_client(&datanode).await;
-                let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
-                let _ = client
-                    .flush_table(expr.clone())
-                    .await
-                    .context(RequestDatanodeSnafu)?;
-            }
-        }
-        Ok(Output::AffectedRows(0))
+        let res = route_response
+            .table_routes
+            .iter()
+            .filter(|route| {
+                route.region_routes.iter().any(|r| {
+                    let Some(n) = region_number else { return true; };
+                    n == r.region.id.region_number()
+                })
+            })
+            .flat_map(|route| route.find_leaders().into_iter())
+            .collect::<Vec<_>>();
+        Ok(res)
     }
 
     async fn handle_statement(
@@ -597,8 +638,10 @@ impl GrpcQueryHandler for DistInstance {
                     TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                 self.flush_table(table_name, expr.region_number).await
             }
-            Expr::CompactTable(_) => {
-                unreachable!("https://github.com/GreptimeTeam/greptimedb/pull/1912")
+            Expr::CompactTable(expr) => {
+                let table_name =
+                    TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
+                self.compact_table(table_name, expr.region_number).await
            }
         }
     }
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index 57c0e012ef..4d8a3133f2 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -33,8 +33,9 @@ use object_store::ObjectStore;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
 use store_api::storage::{
-    AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, FlushReason, ReadContext,
-    Region, RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
+    AddColumn, AlterOperation, AlterRequest, ChunkReader, CompactContext, FlushContext,
+    FlushReason, ReadContext, Region, RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot,
+    WriteContext, WriteRequest,
 };
 use table::error::{
     InvalidTableSnafu, RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu,
@@ -332,6 +333,33 @@ impl<R: Region> Table for MitoTable<R> {
         Ok(())
     }
 
+    async fn compact(
+        &self,
+        region_number: Option<RegionNumber>,
+        wait: Option<bool>,
+    ) -> TableResult<()> {
+        let compact_ctx = wait.map(|wait| CompactContext { wait }).unwrap_or_default();
+        let regions = self.regions.load();
+
+        if let Some(region_number) = region_number {
+            if let Some(region) = regions.get(&region_number) {
+                region
+                    .compact(&compact_ctx)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(table_error::TableOperationSnafu)?;
+            }
+        } else {
+            let _ = futures::future::try_join_all(
+                regions.values().map(|region| region.compact(&compact_ctx)),
+            )
+            .await
+            .map_err(BoxedError::new)
+            .context(TableOperationSnafu)?;
+        }
+        Ok(())
+    }
+
     fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
         let regions = self.regions.load();
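`MitoTable` maps the optional `wait` flag onto a `CompactContext` and either compacts the single requested region or fans out over all regions with `try_join_all`. From a caller's point of view the new trait method looks like the sketch below; obtaining the `TableRef` from the catalog is assumed and not shown:

```rust
use table::TableRef;

/// Compact only region 0 and block until the compaction finishes.
async fn compact_region_zero(table: &TableRef) -> table::error::Result<()> {
    table.compact(Some(0), Some(true)).await
}

/// Compact every region of the table, using the engine's default wait behaviour.
async fn compact_all_regions(table: &TableRef) -> table::error::Result<()> {
    table.compact(None, None).await
}
```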
diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs
index 1dfcf97832..4dadedd7b2 100644
--- a/src/mito/src/table/test_util/mock_engine.rs
+++ b/src/mito/src/table/test_util/mock_engine.rs
@@ -26,9 +26,10 @@ use datatypes::schema::{ColumnSchema, Schema};
 use storage::metadata::{RegionMetaImpl, RegionMetadata};
 use storage::write_batch::WriteBatch;
 use store_api::storage::{
-    AlterRequest, Chunk, ChunkReader, CloseOptions, CreateOptions, EngineContext, FlushContext,
-    GetRequest, GetResponse, OpenOptions, ReadContext, Region, RegionDescriptor, RegionId,
-    ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext, WriteResponse,
+    AlterRequest, Chunk, ChunkReader, CloseOptions, CompactContext, CreateOptions, EngineContext,
+    FlushContext, GetRequest, GetResponse, OpenOptions, ReadContext, Region, RegionDescriptor,
+    RegionId, ScanRequest, ScanResponse, SchemaRef, Snapshot, StorageEngine, WriteContext,
+    WriteResponse,
 };
 
 pub type Result<T> = std::result::Result<T, MockError>;
@@ -204,6 +205,10 @@ impl Region for MockRegion {
     async fn flush(&self, _ctx: &FlushContext) -> Result<()> {
         unimplemented!()
     }
+
+    async fn compact(&self, _ctx: &CompactContext) -> std::result::Result<(), Self::Error> {
+        unimplemented!()
+    }
 }
 
 impl MockRegionInner {
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 0d03465787..19cf2bd066 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -64,7 +64,7 @@ use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb
 use crate::auth::UserProviderRef;
 use crate::configurator::ConfiguratorRef;
 use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
-use crate::http::admin::flush;
+use crate::http::admin::{compact, flush};
 use crate::metrics::{
     METRIC_HTTP_REQUESTS_ELAPSED, METRIC_HTTP_REQUESTS_TOTAL, METRIC_METHOD_LABEL,
     METRIC_PATH_LABEL, METRIC_STATUS_LABEL,
@@ -650,6 +650,7 @@ impl HttpServer {
     fn route_admin(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router {
         Router::new()
             .route("/flush", routing::post(flush))
+            .route("/compact", routing::post(compact))
             .with_state(grpc_handler)
     }
 
diff --git a/src/servers/src/http/admin.rs b/src/servers/src/http/admin.rs
index 156cae2517..a6e25329e4 100644
--- a/src/servers/src/http/admin.rs
+++ b/src/servers/src/http/admin.rs
@@ -16,9 +16,10 @@ use std::collections::HashMap;
 
 use api::v1::ddl_request::Expr;
 use api::v1::greptime_request::Request;
-use api::v1::{DdlRequest, FlushTableExpr};
+use api::v1::{CompactTableExpr, DdlRequest, FlushTableExpr};
 use axum::extract::{Query, RawBody, State};
 use axum::http::StatusCode;
+use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use session::context::QueryContext;
 use snafu::OptionExt;
@@ -35,7 +36,7 @@ pub async fn flush(
     let catalog_name = params
         .get("catalog")
         .cloned()
-        .unwrap_or("greptime".to_string());
+        .unwrap_or(DEFAULT_CATALOG_NAME.to_string());
     let schema_name = params
         .get("db")
         .cloned()
@@ -63,6 +64,46 @@ pub async fn flush(
         })),
     });
 
-    let _ = grpc_handler.do_query(request, QueryContext::arc()).await?;
+    grpc_handler.do_query(request, QueryContext::arc()).await?;
+    Ok((StatusCode::NO_CONTENT, ()))
+}
+
+#[axum_macros::debug_handler]
+pub async fn compact(
+    State(grpc_handler): State<ServerGrpcQueryHandlerRef>,
+    Query(params): Query<HashMap<String, String>>,
+    RawBody(_): RawBody,
+) -> Result<(StatusCode, ())> {
+    let catalog_name = params
+        .get("catalog")
+        .cloned()
+        .unwrap_or(DEFAULT_CATALOG_NAME.to_string());
+    let schema_name = params
+        .get("db")
+        .cloned()
+        .context(error::InvalidFlushArgumentSnafu {
+            err_msg: "db is not present",
+        })?;
+
+    // if table name is not present, flush all tables inside schema
+    let table_name = params.get("table").cloned().unwrap_or_default();
+
+    let region_number: Option<u32> = params
+        .get("region")
+        .map(|v| v.parse())
+        .transpose()
+        .ok()
+        .flatten();
+
+    let request = Request::Ddl(DdlRequest {
+        expr: Some(Expr::CompactTable(CompactTableExpr {
+            catalog_name: catalog_name.clone(),
+            schema_name: schema_name.clone(),
+            table_name: table_name.clone(),
+            region_number,
+        })),
+    });
+
+    grpc_handler.do_query(request, QueryContext::arc()).await?;
     Ok((StatusCode::NO_CONTENT, ()))
 }
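The new `/compact` admin route mirrors `/flush`: `db` is required, `table` is optional (empty means the whole schema), and the `region` query parameter is parsed leniently. The snippet below isolates that parsing chain to show what "leniently" means; it is an illustration, not code from this change:

```rust
/// Mirrors `params.get("region").map(|v| v.parse()).transpose().ok().flatten()`.
fn parse_region(raw: Option<&str>) -> Option<u32> {
    raw.map(|v| v.parse()).transpose().ok().flatten()
}

#[test]
fn region_parameter_is_parsed_leniently() {
    assert_eq!(parse_region(Some("3")), Some(3));
    // A malformed region value is silently dropped rather than rejected,
    // so such a request falls back to "compact all regions".
    assert_eq!(parse_region(Some("not-a-number")), None);
    assert_eq!(parse_region(None), None);
}
```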
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index eec4aecf12..ec3e1091a5 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -20,14 +20,14 @@ use common_telemetry::{debug, error, info, timer};
 use itertools::Itertools;
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
-use store_api::storage::RegionId;
+use store_api::storage::{CompactContext, RegionId};
 
 use crate::compaction::writer::build_sst_reader;
 use crate::error;
 use crate::error::Result;
 use crate::manifest::action::RegionEdit;
 use crate::manifest::region::RegionManifest;
-use crate::region::{CompactContext, RegionWriterRef, SharedDataRef, WriterCompactRequest};
+use crate::region::{RegionWriterRef, SharedDataRef, WriterCompactRequest};
 use crate::schema::RegionSchemaRef;
 use crate::sst::{
     AccessLayerRef, FileHandle, FileId, FileMeta, Level, Source, SstInfo, WriteOptions,
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index a94aa4f3a4..a35ec67129 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -32,8 +32,8 @@ use store_api::manifest::{
     self, Manifest, ManifestLogStorage, ManifestVersion, MetaActionIterator,
 };
 use store_api::storage::{
-    AlterRequest, CloseContext, CompactionStrategy, FlushContext, FlushReason, OpenOptions,
-    ReadContext, Region, RegionId, SequenceNumber, WriteContext, WriteResponse,
+    AlterRequest, CloseContext, CompactContext, CompactionStrategy, FlushContext, FlushReason,
+    OpenOptions, ReadContext, Region, RegionId, SequenceNumber, WriteContext, WriteResponse,
 };
 
 use crate::compaction::{
@@ -150,6 +150,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
     async fn flush(&self, ctx: &FlushContext) -> Result<()> {
         self.inner.flush(ctx).await
     }
+
+    async fn compact(&self, ctx: &CompactContext) -> std::result::Result<(), Self::Error> {
+        self.inner.compact(ctx).await
+    }
 }
 
 /// Storage related config for region.
@@ -174,18 +178,6 @@ pub struct StoreConfig<S: LogStore> {
 pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
 pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, RecoveredMetadata>;
 
-#[derive(Debug)]
-pub struct CompactContext {
-    /// Whether to wait the compaction result.
-    pub wait: bool,
-}
-
-impl Default for CompactContext {
-    fn default() -> CompactContext {
-        CompactContext { wait: true }
-    }
-}
-
 impl<S: LogStore> RegionImpl<S> {
     /// Create a new region and also persist the region metadata to manifest.
     ///
@@ -557,7 +549,7 @@ impl<S: LogStore> RegionImpl<S> {
     }
 
     /// Compact the region manually.
-    pub async fn compact(&self, ctx: CompactContext) -> Result<()> {
+    pub async fn compact(&self, ctx: &CompactContext) -> Result<()> {
         self.inner.compact(ctx).await
     }
 
@@ -765,7 +757,7 @@ impl<S: LogStore> RegionInner<S> {
     }
 
     /// Compact the region manually.
-    async fn compact(&self, compact_ctx: CompactContext) -> Result<()> {
+    async fn compact(&self, compact_ctx: &CompactContext) -> Result<()> {
         self.writer
             .compact(WriterCompactRequest {
                 shared_data: self.shared.clone(),
@@ -773,7 +765,7 @@ impl<S: LogStore> RegionInner<S> {
                 manifest: self.manifest.clone(),
                 wal: self.wal.clone(),
                 region_writer: self.writer.clone(),
-                compact_ctx,
+                compact_ctx: *compact_ctx,
             })
             .await
     }
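`CompactContext` now lives in store-api (see the hunks further below) and is `Copy`, which is why region-level `compact` can take it by reference and the writer request simply dereferences it with `*compact_ctx`. A tiny illustration of the two ways a caller might fill it in, assuming only the definition shown in this diff:

```rust
use store_api::storage::CompactContext;

fn compaction_contexts() -> (CompactContext, CompactContext) {
    // The default waits for the compaction to finish.
    let wait_for_result = CompactContext::default();
    debug_assert!(wait_for_result.wait);

    // Fire-and-forget variant; because the type is Copy it can be passed around freely.
    let fire_and_forget = CompactContext { wait: false };
    (wait_for_result, fire_and_forget)
}
```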
diff --git a/src/storage/src/region/tests/compact.rs b/src/storage/src/region/tests/compact.rs
index 9063206229..bf49fbe608 100644
--- a/src/storage/src/region/tests/compact.rs
+++ b/src/storage/src/region/tests/compact.rs
@@ -223,7 +223,7 @@ impl CompactionTester {
         // Trigger compaction and wait until it is done.
         self.base()
             .region
-            .compact(CompactContext::default())
+            .compact(&CompactContext::default())
             .await
             .unwrap();
     }
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index 3e7acf4d39..dff799a51f 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -37,7 +37,9 @@ pub use self::engine::{
     TwcsOptions,
 };
 pub use self::metadata::RegionMeta;
-pub use self::region::{CloseContext, FlushContext, FlushReason, Region, RegionStat, WriteContext};
+pub use self::region::{
+    CloseContext, CompactContext, FlushContext, FlushReason, Region, RegionStat, WriteContext,
+};
 pub use self::requests::{
     AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
 };
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
index 04e8de9688..ac6037e066 100644
--- a/src/store-api/src/storage/region.rs
+++ b/src/store-api/src/storage/region.rs
@@ -86,6 +86,8 @@ pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
 
     /// Flush memtable of the region to disk.
     async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>;
+
+    async fn compact(&self, ctx: &CompactContext) -> Result<(), Self::Error>;
 }
 
 #[derive(Default, Debug)]
@@ -132,6 +134,18 @@ impl Default for FlushContext {
     }
 }
 
+#[derive(Debug, Copy, Clone)]
+pub struct CompactContext {
+    /// Whether to wait the compaction result.
+    pub wait: bool,
+}
+
+impl Default for CompactContext {
+    fn default() -> CompactContext {
+        CompactContext { wait: true }
+    }
+}
+
 /// Reason of flush operation.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum FlushReason {
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 532241a49e..c5c99bdba9 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -285,6 +285,16 @@ pub struct FlushTableRequest {
     pub wait: Option<bool>,
 }
 
+#[derive(Debug, Clone, Default)]
+pub struct CompactTableRequest {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: Option<String>,
+    pub region_number: Option<RegionNumber>,
+    /// Wait until the compaction is done.
+    pub wait: Option<bool>,
+}
+
 #[macro_export]
 macro_rules! meter_insert_request {
     ($req: expr) => {
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index 8b63f33b5b..34d7265a9b 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -122,6 +122,14 @@ pub trait Table: Send + Sync {
     fn statistics(&self) -> Option<TableStatistics> {
         None
     }
+
+    async fn compact(&self, region_number: Option<RegionNumber>, wait: Option<bool>) -> Result<()> {
+        let _ = (region_number, wait);
+        UnsupportedSnafu {
+            operation: "COMPACTION",
+        }
+        .fail()?
+    }
 }
 
 pub type TableRef = Arc<dyn Table>;
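Finally, `Table::compact` ships with a default body that fails with `UnsupportedSnafu { operation: "COMPACTION" }`, so table engines that never compact need no changes. A hedged sketch of what a caller sees against such an engine:

```rust
use table::TableRef;

/// For engines that keep the default implementation, compaction surfaces an
/// "Unsupported" error instead of silently doing nothing.
async fn try_compact(table: &TableRef) {
    if let Err(e) = table.compact(None, None).await {
        eprintln!("compaction not supported by this engine: {e}");
    }
}
```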