Compare commits

...

25 Commits

Author SHA1 Message Date
Lei, HUANG
68593ae92a ### Added Deduplication and Merge Functionality
- Introduced `DedupReader` and `MergeReader` in `src/mito2/src/read/sync/dedup.rs` and `src/mito2/src/read/sync/merge.rs` to handle deduplication and merging of sorted batches.
 ### Enhanced `BulkMemtable` Iteration
 - Updated `BulkMemtable` in `src/mito2/src/memtable/bulk.rs` to support deduplication and merge modes during iteration.
 - Added `BulkIterContext` to manage iteration context.
 ### Testing Enhancements
 - Added comprehensive tests for `BulkMemtable` and `BulkPart` in `src/mito2/src/memtable/bulk.rs` and `src/mito2/src/memtable/bulk/part.rs`.
 ### Code Refactoring
 - Made `BulkPart` and `BulkPartMeta` cloneable in `src/mito2/src/memtable/bulk/part.rs`.
 - Exposed internal test modules for better test coverage in `src/mito2/src/memtable/time_series.rs` and `src/mito2/src/read/merge.rs`.
 ### New Modules
 - Created `sync` module in `src/mito2/src/read.rs` to organize synchronous read operations.
2025-02-20 08:54:44 +00:00
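
The two readers introduced above compose into the memtable read path: a merge pass interleaves the sorted parts, then a dedup pass drops older duplicates of the same key. A minimal, self-contained sketch of that two-stage idea (toy types only; the real `MergeReader`/`DedupReader` in `src/mito2/src/read/sync/` operate on `Batch`es and pluggable dedup strategies such as `LastRow`):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge over already-sorted inputs of (key, sequence) pairs,
/// ordered by key ascending and sequence descending (newest first).
fn merge_sorted(sources: Vec<Vec<(i64, u64)>>) -> Vec<(i64, u64)> {
    let mut heap = BinaryHeap::new();
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some((k, seq)) = it.next() {
            heap.push(Reverse((k, Reverse(seq), idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((k, Reverse(seq), idx))) = heap.pop() {
        out.push((k, seq));
        if let Some((k, seq)) = iters[idx].next() {
            heap.push(Reverse((k, Reverse(seq), idx)));
        }
    }
    out
}

/// "Last row wins" dedup over the merged stream: because duplicates arrive
/// newest-sequence first, keeping the first occurrence per key keeps the newest row.
fn dedup_last_row(merged: Vec<(i64, u64)>) -> Vec<(i64, u64)> {
    let mut out: Vec<(i64, u64)> = Vec::new();
    for (k, seq) in merged {
        match out.last() {
            Some((last_k, _)) if *last_k == k => {} // older duplicate, skip
            _ => out.push((k, seq)),
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![vec![(1, 1), (3, 1)], vec![(1, 2), (2, 2)]]);
    // Keeps (1, 2) over (1, 1): the newer sequence wins for key 1.
    println!("{:?}", dedup_last_row(merged));
}
```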
evenyag
91d755d9b5 feat: use spawn 2025-02-13 22:49:02 +08:00
Lei, HUANG
2566d254ad poc-write-path:
**Enhance BulkPart Metadata Handling**

 - Updated `BulkMemtable` in `bulk.rs` to track and update `max_sequence`, `max_timestamp`, `min_timestamp`, and `num_rows` using `BulkPart` metadata.
 - Extended `BulkPartMeta` in `bulk/part.rs` to include `max_sequence`.
 - Modified `mutations_to_record_batch` function to return `max_sequence` along with timestamps in `bulk/part.rs`.
 - Adjusted `BulkPartEncoder` to handle the new `max_sequence` metadata in `bulk/part.rs`.
2025-02-10 15:14:45 +00:00
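
A hedged sketch of the metadata aggregation this commit threads through the bulk write path: `mutations_to_record_batch` now returns the maximum sequence alongside the timestamp bounds, and the memtable folds those into its own counters. The `PartMeta` struct below is an illustrative stand-in, not the actual `BulkPartMeta`:

```rust
/// Illustrative stand-in for the per-part metadata tracked in `bulk/part.rs`.
#[derive(Debug)]
struct PartMeta {
    min_timestamp: i64,
    max_timestamp: i64,
    max_sequence: u64,
    num_rows: usize,
}

/// Fold (timestamp, sequence) pairs into part-level metadata.
fn collect_meta(rows: &[(i64, u64)]) -> PartMeta {
    let mut meta = PartMeta {
        min_timestamp: i64::MAX,
        max_timestamp: i64::MIN,
        max_sequence: 0,
        num_rows: rows.len(),
    };
    for &(ts, seq) in rows {
        meta.min_timestamp = meta.min_timestamp.min(ts);
        meta.max_timestamp = meta.max_timestamp.max(ts);
        meta.max_sequence = meta.max_sequence.max(seq);
    }
    meta
}

fn main() {
    // min_timestamp = 5, max_timestamp = 10, max_sequence = 3, num_rows = 3
    println!("{:?}", collect_meta(&[(10, 1), (5, 3), (7, 2)]));
}
```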
WenyXu
0ec4ed804d fix: fix alter logic bug 2025-02-10 14:24:13 +00:00
evenyag
cc435234a4 chore: error handling in handle_batch_body 2025-02-10 21:38:10 +08:00
WenyXu
9c4aa81f85 fix: fix add column validation 2025-02-10 13:28:30 +00:00
evenyag
bdbb5435ea feat: region server use handle_batch_body 2025-02-10 21:20:14 +08:00
WenyXu
fd9940a253 fix: fix compile 2025-02-10 13:18:01 +00:00
WenyXu
e0bafd661c chore: unused error 2025-02-10 12:57:55 +00:00
WenyXu
99baa86b6a feat(metric-engine): introduce batch alter request handling 2025-02-10 12:57:54 +00:00
WenyXu
76d69901ea feat(metric-engine): introduce batch create request handling 2025-02-10 12:57:04 +00:00
WenyXu
764a57b80a refactor: introduce RegionRequestBundle::Vector 2025-02-10 12:55:30 +00:00
WenyXu
95b388d819 refactor: refactor region server requests handling 2025-02-10 12:55:28 +00:00
evenyag
c2b556e321 feat: batch put logical regions 2025-02-10 19:32:21 +08:00
evenyag
06ebe6b3fb feat: handle body 2025-02-10 19:32:14 +08:00
Lei, HUANG
bec8245e75 poc-write-path: Enhance Memtable Handling with Primary Key Encoding
• Introduced PrimaryKeyEncoding to differentiate between dense and sparse primary key encodings.
 • Updated BulkMemtableBuilder to conditionally create memtables based on primary key encoding.
 • Integrated PartitionTreeMemtableBuilder as a fallback for dense encodings.
 • Modified RegionWriteCtx to handle mutations differently based on primary key encoding.
 • Adjusted RegionWorkerLoop to skip bulk encoding for dense primary key mutations.
 • Refactored SparseEncoder to support conditional compilation for testing purposes.
2025-02-10 14:38:11 +08:00
Lei, HUANG
3cb2343f7f poc-write-path:
### Implement Sparse Primary Key Encoding

 - **Added `SparseEncoder`**: Introduced a new module `encoder.rs` to implement sparse primary key encoding, replacing the previous dense encoding approach.
 - **Updated `BulkPartEncoder`**: Modified `BulkPartEncoder` in `bulk/part.rs` to utilize `SparseEncoder` for encoding primary keys.
 - **Refactored `PartitionTree`**: Updated `partition_tree/tree.rs` to use the new `SparseEncoder` for primary key encoding.
 - **Code Adjustments**: Removed redundant code and adjusted imports in `key_values.rs` and `partition_tree/tree.rs` to align with the new encoding strategy.
2025-02-08 08:10:16 +00:00
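
To illustrate the idea behind this change: a dense primary key reserves a slot for every tag column in the schema, while a sparse key only stores the (column id, value) pairs that are actually present, which suits metric workloads where each series uses few of the many labels. The toy string encoding below is purely conceptual and does not reproduce the memcomparable byte format the real `SparseEncoder` emits:

```rust
/// Dense: one slot per schema column, empty when the tag is absent.
fn encode_dense(schema: &[&str], tags: &[(&str, &str)]) -> String {
    schema
        .iter()
        .map(|col| {
            tags.iter()
                .find(|(name, _)| name == col)
                .map(|(_, v)| *v)
                .unwrap_or("")
        })
        .collect::<Vec<_>>()
        .join("\u{1}")
}

/// Sparse: only the (column id, value) pairs that are present.
fn encode_sparse(column_ids: &[(&str, u32)], tags: &[(&str, &str)]) -> String {
    let mut parts = Vec::new();
    for (name, value) in tags {
        if let Some((_, id)) = column_ids.iter().find(|(n, _)| n == name) {
            parts.push(format!("{id}={value}"));
        }
    }
    parts.join("\u{1}")
}

fn main() {
    let schema = ["host", "region", "dc", "pod", "job"];
    let ids = [("host", 1), ("region", 2), ("dc", 3), ("pod", 4), ("job", 5)];
    let tags = [("host", "h1"), ("job", "api")];
    // Dense output carries empty slots for region/dc/pod; sparse output does not.
    println!("dense:  {:?}", encode_dense(&schema, &tags));
    println!("sparse: {:?}", encode_sparse(&ids, &tags));
}
```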
Lei, HUANG
d10c207371 poc-write-path:
Add allocation tracking to `BulkMemtable` methods

 - Updated `write_bulk` in `bulk.rs` to track memory allocation using `alloc_tracker.on_allocation`.
 - Modified `freeze` in `bulk.rs` to signal completion of allocation with `alloc_tracker.done_allocating`.
2025-02-06 09:17:37 +00:00
Lei, HUANG
1a73a40bd9 Merge remote-tracking branch 'GreptimeTeam/poc-write-path' into poc-write-path 2025-02-06 08:55:39 +00:00
evenyag
713a73e9b2 feat: encode bulk before writing wal
* store bulk in wal
* write the BulkPart to the memtable directly
2025-02-06 16:36:41 +08:00
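
A rough sketch of the ordering this commit describes: rows are encoded into a bulk part once, the encoded payload is stored in the WAL mutation, and the same part is handed to the memtable without re-encoding. Types and names below are illustrative, not the mito2 API:

```rust
struct BulkPart { data: Vec<u8> }
struct Wal { entries: Vec<Vec<u8>> }
struct Memtable { parts: Vec<BulkPart> }

impl Wal {
    fn append(&mut self, payload: &[u8]) {
        self.entries.push(payload.to_vec());
    }
}

impl Memtable {
    fn write_bulk(&mut self, part: BulkPart) {
        self.parts.push(part);
    }
}

fn write_path(rows: &[u8], wal: &mut Wal, memtable: &mut Memtable) {
    // 1. Encode the rows into a bulk part (done once, up front).
    let part = BulkPart { data: rows.to_vec() };
    // 2. Persist the encoded payload in the WAL so replay can rebuild the part.
    wal.append(&part.data);
    // 3. Write the already-encoded part straight into the memtable.
    memtable.write_bulk(part);
}

fn main() {
    let (mut wal, mut memtable) = (Wal { entries: vec![] }, Memtable { parts: vec![] });
    write_path(b"encoded rows", &mut wal, &mut memtable);
    println!("wal entries: {}, memtable parts: {}", wal.entries.len(), memtable.parts.len());
}
```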
Lei, HUANG
65a88a63db poc-write-path:
**feat(memtable): Add BulkMemtableBuilder and BulkMemtable**

 - Introduced `BulkMemtableBuilder` and `BulkMemtable` in `memtable.rs` and `bulk.rs` to support bulk operations.
 - Added environment variable check for `enable_bulk_memtable` to conditionally use `BulkMemtableBuilder`.
 - Implemented `MemtableBuilder` for `BulkMemtableBuilder` and `Memtable` for `BulkMemtable`.
 - Included new fields `dedup` and `merge_mode` in `BulkMemtable` to handle deduplication and merge operations.
 - Temporarily disabled reads in `BulkMemtable` with `EmptyIter` as a placeholder iterator.
2025-02-06 07:36:37 +00:00
Lei, HUANG
5ad1436a8f Merge remote-tracking branch 'GreptimeTeam/poc-write-path' into poc-write-path 2025-02-06 06:44:33 +00:00
Lei, HUANG
ae59206caf feat: impl stats 2025-02-06 06:38:44 +00:00
evenyag
094d0fcdf5 feat: add bulk to wal Mutation 2025-02-05 21:16:18 +08:00
evenyag
7170120de6 chore: update proto (add bulk to Mutation) 2025-02-05 21:05:01 +08:00
42 changed files with 2478 additions and 323 deletions

Cargo.lock

@@ -4723,7 +4723,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=683e9d10ae7f3dfb8aaabd89082fc600c17e3795#683e9d10ae7f3dfb8aaabd89082fc600c17e3795"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1915576b113a494f5352fd61f211d899b7f87aab#1915576b113a494f5352fd61f211d899b7f87aab"
dependencies = [
 "prost 0.13.3",
 "serde",


@@ -127,7 +127,8 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" }
+# branch: poc-write-path
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1915576b113a494f5352fd61f211d899b7f87aab" }
hex = "0.4"
http = "1"
humantime = "2.1"


@@ -260,6 +260,13 @@ pub enum Error {
        source: BoxedError,
    },

+    #[snafu(display("Failed to handle batch request"))]
+    HandleBatchRequest {
+        #[snafu(implicit)]
+        location: Location,
+        source: BoxedError,
+    },
+
    #[snafu(display("RegionId {} not found", region_id))]
    RegionNotFound {
        region_id: RegionId,
@@ -438,7 +445,8 @@ impl ErrorExt for Error {
            UnsupportedOutput { .. } => StatusCode::Unsupported,
            HandleRegionRequest { source, .. }
            | GetRegionMetadata { source, .. }
-            | HandleBatchOpenRequest { source, .. } => source.status_code(),
+            | HandleBatchOpenRequest { source, .. }
+            | HandleBatchRequest { source, .. } => source.status_code(),
            StopRegionEngine { source, .. } => source.status_code(),
            FindLogicalRegions { source, .. } => source.status_code(),


@@ -38,7 +38,7 @@ use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_expr::{LogicalPlan, TableSource};
-use futures_util::future::try_join_all;
+use futures::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::MITO_ENGINE_NAME;
use prost::Message;
@@ -59,7 +59,8 @@ use store_api::region_engine::{
    SettableRegionRoleState,
};
use store_api::region_request::{
-    AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, convert_body_to_requests,
+    AffectedRows, BatchRegionRequest, RegionCloseRequest,
+    RegionOpenRequest, RegionPutRequest, RegionRequest, RegionRequestBundle,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -70,8 +71,9 @@ use crate::error::{
    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
-    HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
-    RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
+    HandleBatchRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
+    RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
+    StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -158,6 +160,18 @@ impl RegionServer {
        self.inner.handle_request(region_id, request).await
    }

+    #[tracing::instrument(skip_all, fields(request_type = "Put"))]
+    pub async fn handle_batch_body(&self, body: region_request::Body) -> Result<RegionResponse> {
+        self.inner.handle_batch_body(body).await
+    }
+
+    pub async fn handle_batch_request(
+        &self,
+        batch_request: BatchRegionRequest,
+    ) -> Result<RegionResponse> {
+        self.inner.handle_batch_request(batch_request).await
+    }
+
    async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
        let status = self
            .inner
@@ -344,62 +358,41 @@
        .region_map
        .insert(region_id, RegionEngineWithStatus::Ready(engine));
    }
-}
-
-#[async_trait]
-impl RegionServerHandler for RegionServer {
-    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
-        let is_parallel = matches!(
-            request,
-            region_request::Body::Inserts(_) | region_request::Body::Deletes(_)
-        );
-        let requests = RegionRequest::try_from_request_body(request)
-            .context(BuildRegionRequestsSnafu)
-            .map_err(BoxedError::new)
-            .context(ExecuteGrpcRequestSnafu)?;
-
-        let tracing_context = TracingContext::from_current_span();
-        let results = if is_parallel {
-            let join_tasks = requests.into_iter().map(|(region_id, req)| {
-                let self_to_move = self.clone();
-                let span = tracing_context.attach(info_span!(
-                    "RegionServer::handle_region_request",
-                    region_id = region_id.to_string()
-                ));
-                async move {
-                    self_to_move
-                        .handle_request(region_id, req)
-                        .trace(span)
-                        .await
-                }
-            });
-            try_join_all(join_tasks)
-                .await
-                .map_err(BoxedError::new)
-                .context(ExecuteGrpcRequestSnafu)?
-        } else {
-            let mut results = Vec::with_capacity(requests.len());
-            // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
-            // modify this part to avoid inefficient serial loop calls.
-            for (region_id, req) in requests {
-                let span = tracing_context.attach(info_span!(
-                    "RegionServer::handle_region_request",
-                    region_id = region_id.to_string()
-                ));
-                let result = self
-                    .handle_request(region_id, req)
-                    .trace(span)
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExecuteGrpcRequestSnafu)?;
-                results.push(result);
-            }
-            results
-        };
-
-        // merge results by sum up affected rows and merge extensions.
+
+    async fn handle_single_request(
+        &self,
+        region_id: RegionId,
+        request: RegionRequest,
+    ) -> Result<RegionResponse> {
+        let tracing_context = TracingContext::from_current_span();
+        let span = tracing_context.attach(info_span!(
+            "RegionServer::handle_region_request",
+            region_id = region_id.to_string()
+        ));
+        self.handle_request(region_id, request).trace(span).await
+    }
+
+    async fn handle_vector_request(
+        &self,
+        requests: Vec<(RegionId, RegionRequest)>,
+    ) -> Result<RegionResponse> {
+        let tracing_context = TracingContext::from_current_span();
+        let join_tasks = requests.into_iter().map(|(region_id, req)| {
+            let self_to_move = self.clone();
+            let span = tracing_context.attach(info_span!(
+                "RegionServer::handle_region_request",
+                region_id = region_id.to_string()
+            ));
+            async move {
+                self_to_move
+                    .handle_request(region_id, req)
+                    .trace(span)
+                    .await
+            }
+        });
+
+        let results = try_join_all(join_tasks).await?;
+
        let mut affected_rows = 0;
        let mut extensions = HashMap::new();
        for result in results {
@@ -407,6 +400,57 @@ impl RegionServerHandler for RegionServer {
            extensions.extend(result.extensions);
        }
Ok(RegionResponse {
affected_rows,
extensions,
})
}
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
if matches!(request, region_request::Body::Inserts(_)) {
let resp = self
.handle_batch_body(request)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
return Ok(RegionResponseV1 {
header: Some(ResponseHeader {
status: Some(Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
affected_rows: resp.affected_rows as _,
extensions: resp.extensions,
});
}
let bundle = convert_body_to_requests(request)
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let result = match bundle {
RegionRequestBundle::Single((region_id, request)) => self
.handle_single_request(region_id, request)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?,
RegionRequestBundle::Vector(requests) => self
.handle_vector_request(requests)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?,
RegionRequestBundle::Batch(requests) => self
.handle_batch_request(requests)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?,
};
        Ok(RegionResponseV1 {
            header: Some(ResponseHeader {
                status: Some(Status {
@@ -414,8 +458,8 @@ impl RegionServerHandler for RegionServer {
                    ..Default::default()
                }),
            }),
-            affected_rows: affected_rows as _,
-            extensions,
+            affected_rows: result.affected_rows as _,
+            extensions: result.extensions,
        })
    }
}
@@ -727,6 +771,72 @@ impl RegionServerInner {
            .collect::<Vec<_>>())
    }
// Handle requests in batch.
//
// limitation: all create requests must be in the same engine.
pub async fn handle_batch_request(
&self,
batch_request: BatchRegionRequest,
) -> Result<RegionResponse> {
let region_changes = match &batch_request {
BatchRegionRequest::Create(requests) => requests
.iter()
.map(|(region_id, create)| {
let attribute = parse_region_attribute(&create.engine, &create.options)?;
Ok((*region_id, RegionChange::Register(attribute)))
})
.collect::<Result<Vec<_>>>()?,
BatchRegionRequest::Drop(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
.collect::<Vec<_>>(),
BatchRegionRequest::Alter(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
BatchRegionRequest::Put(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
};
let (first_region_id, first_region_change) = region_changes.first().unwrap();
let engine = match self.get_engine(*first_region_id, first_region_change)? {
CurrentEngine::Engine(engine) => engine,
CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
};
for (region_id, region_change) in region_changes.iter() {
self.set_region_status_not_ready(*region_id, &engine, region_change);
}
let result = engine
.handle_batch_request(batch_request)
.await
.context(HandleBatchRequestSnafu {});
match result {
Ok(result) => {
for (region_id, region_change) in region_changes {
self.set_region_status_ready(region_id, engine.clone(), region_change)
.await?;
}
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
})
}
Err(err) => {
for (region_id, region_change) in region_changes {
self.unset_region_status(region_id, region_change);
}
Err(err)
}
}
}
    pub async fn handle_request(
        &self,
        region_id: RegionId,
@@ -786,6 +896,71 @@ impl RegionServerInner {
        }
    }
async fn handle_batch_body(&self, body: region_request::Body) -> Result<RegionResponse> {
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&["Put"])
.start_timer();
// Group requests by engine.
let mut engine_requests: HashMap<
String,
(RegionEngineRef, Vec<(RegionId, RegionPutRequest)>),
> = HashMap::with_capacity(1);
match body {
region_request::Body::Inserts(inserts) => {
let num_requests = inserts.requests.len();
for request in inserts.requests {
let region_id = RegionId::from_u64(request.region_id);
let CurrentEngine::Engine(engine) =
self.get_engine(region_id, &RegionChange::None)?
else {
continue;
};
let Some(rows) = request.rows else {
continue;
};
match engine_requests.get_mut(engine.name()) {
Some((_, requests)) => {
requests.push((region_id, RegionPutRequest { rows, hint: None }))
}
None => {
let mut requests = Vec::with_capacity(num_requests);
requests.push((region_id, RegionPutRequest { rows, hint: None }));
engine_requests.insert(engine.name().to_string(), (engine, requests));
}
}
}
}
_ => unreachable!(),
}
for (_, (engine, request)) in engine_requests {
engine
.handle_batch_request(BatchRegionRequest::Put(request))
.await
.context(HandleBatchRequestSnafu)?;
}
// match engine
// .handle_request(region_id, request)
// .await
// .with_context(|_| HandleRegionRequestSnafu { region_id })
// {
// Ok(result) => {
// Ok(RegionResponse {
// affected_rows: result.affected_rows,
// extensions: result.extensions,
// })
// }
// Err(err) => {
// Err(err)
// }
// }
Ok(RegionResponse::new(0))
}
    fn set_region_status_not_ready(
        &self,
        region_id: RegionId,


@@ -42,7 +42,7 @@ use store_api::region_engine::{
    RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
    SettableRegionRoleState,
};
-use store_api::region_request::RegionRequest;
+use store_api::region_request::{BatchRegionRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};

use self::state::MetricEngineState;
@@ -127,6 +127,60 @@ impl RegionEngine for MetricEngine {
        METRIC_ENGINE_NAME
    }
async fn handle_batch_request(
&self,
batch_request: BatchRegionRequest,
) -> Result<RegionResponse, BoxedError> {
match batch_request {
BatchRegionRequest::Put(requests) => {
let rows = self
.inner
.batch_put_region(requests)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: HashMap::new(),
})
}
BatchRegionRequest::Create(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.create_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionRequest::Alter(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.alter_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionRequest::Drop(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
)
.await
}
}
}
    /// Handles non-query request to the region. Returns the count of affected rows.
    async fn handle_request(
        &self,
@@ -258,6 +312,24 @@ impl RegionEngine for MetricEngine {
}

impl MetricEngine {
async fn handle_requests(
&self,
requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
) -> Result<RegionResponse, BoxedError> {
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for (region_id, request) in requests {
let response = self.handle_request(region_id, request).await?;
affected_rows += response.affected_rows;
extensions.extend(response.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
    pub fn new(mito: MitoEngine, config: EngineConfig) -> Self {
        let metadata_region = MetadataRegion::new(mito.clone());
        let data_region = DataRegion::new(mito.clone());


@@ -12,15 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::collections::HashMap;
+mod extract_new_columns;
+mod validate;
+
+use std::collections::{HashMap, HashSet};

use common_telemetry::error;
+use extract_new_columns::extract_new_columns;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
+use validate::validate_alter_region_requests;

+use crate::engine::create::{
+    add_columns_to_physical_data_region, add_logical_regions_to_meta_region,
+};
use crate::engine::MetricEngineInner;
use crate::error::{
    LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
@@ -28,6 +36,133 @@ use crate::error::{
use crate::utils::{to_data_region_id, to_metadata_region_id};

impl MetricEngineInner {
pub async fn alter_regions(
&self,
requests: Vec<(RegionId, RegionAlterRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
if requests.is_empty() {
return Ok(0);
}
let first_region_id = &requests.first().unwrap().0;
if self.is_physical_region(*first_region_id) {
for (region_id, request) in requests {
self.alter_physical_region(region_id, request).await?;
}
} else {
self.alter_logical_regions(requests, extension_return_value)
.await?;
}
Ok(0)
}
/// Alter multiple logical regions on the same physical region.
pub async fn alter_logical_regions(
&self,
requests: Vec<(RegionId, RegionAlterRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
validate_alter_region_requests(&requests)?;
let first_logical_region_id = requests[0].0;
// Finds new columns to add
let mut new_column_names = HashSet::new();
let mut new_columns_to_add = vec![];
let (physical_region_id, index_options) = {
let state = &self.state.read().unwrap();
let physical_region_id = state
.get_physical_region_id(first_logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {first_logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: first_logical_region_id,
}
})?;
let region_state = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
let physical_columns = region_state.physical_columns();
extract_new_columns(
&requests,
physical_columns,
&mut new_column_names,
&mut new_columns_to_add,
)?;
(physical_region_id, region_state.options().index)
};
let data_region_id = to_data_region_id(physical_region_id);
add_columns_to_physical_data_region(
data_region_id,
index_options,
&mut new_columns_to_add,
&self.data_region,
)
.await?;
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
let logical_region_columns = requests.iter().map(|(region_id, request)| {
let AlterKind::AddColumns { columns } = &request.kind else {
unreachable!()
};
(
*region_id,
columns
.iter()
.map(|col| {
let column_name = col.column_metadata.column_schema.name.as_str();
let column_metadata = *physical_schema_map.get(column_name).unwrap();
(column_name, column_metadata)
})
.collect::<HashMap<_, _>>(),
)
});
let new_add_columns = new_columns_to_add.iter().map(|metadata| {
// Safety: previous steps ensure the physical region exist
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(
metadata.column_schema.name.to_string(),
column_metadata.column_id,
)
});
// Writes logical regions metadata to metadata region
add_logical_regions_to_meta_region(
&self.metadata_region,
physical_region_id,
false,
logical_region_columns,
)
.await?;
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
);
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.invalid_logical_regions_cache(requests.iter().map(|(region_id, _)| *region_id));
Ok(0)
}
    /// Dispatch region alter request
    pub async fn alter_region(
        &self,


@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::{ColumnId, RegionId};
use crate::error::Result;
/// Extract new columns from the create requests.
///
/// # Panics
///
/// This function will panic if the alter kind is not `AddColumns`.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionAlterRequest)],
physical_columns: &HashMap<String, ColumnId>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
for (_, request) in requests {
let AlterKind::AddColumns { columns } = &request.kind else {
unreachable!()
};
for col in columns {
let column_name = col.column_metadata.column_schema.name.as_str();
if !physical_columns.contains_key(column_name)
&& !new_column_names.contains(column_name)
{
new_column_names.insert(column_name);
// TODO(weny): avoid clone
new_columns.push(col.column_metadata.clone());
}
}
}
Ok(())
}


@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ensure;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::RegionId;
use crate::error::{Result, UnsupportedAlterKindSnafu};
/// Validate the alter region requests.
pub fn validate_alter_region_requests(requests: &[(RegionId, RegionAlterRequest)]) -> Result<()> {
for (_, request) in requests {
ensure!(
matches!(request.kind, AlterKind::AddColumns { .. }),
UnsupportedAlterKindSnafu {
kind: request.kind.as_ref()
}
);
}
Ok(())
}


@@ -12,8 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::collections::HashMap;
+mod add_columns;
+mod add_logical_regions;
+mod extract_new_columns;
+mod validate;
+
+use std::collections::{HashMap, HashSet};

+pub(crate) use add_columns::add_columns_to_physical_data_region;
+pub(crate) use add_logical_regions::add_logical_regions_to_meta_region;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::{info, warn};
@@ -39,7 +46,9 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
+use validate::validate_create_logical_regions;

+use crate::engine::create::extract_new_columns::extract_new_columns;
use crate::engine::options::{set_data_region_options, IndexOptions, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{
@@ -50,9 +59,41 @@ use crate::error::{
    Result, SerializeColumnMetadataSnafu,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
-use crate::utils::{to_data_region_id, to_metadata_region_id};
+use crate::utils::{self, to_data_region_id, to_metadata_region_id};

impl MetricEngineInner {
pub async fn create_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<AffectedRows> {
if requests.is_empty() {
return Ok(0);
}
for (_, request) in requests.iter() {
Self::verify_region_create_request(request)?;
}
let first_request = &requests.first().unwrap().1;
if first_request.is_physical_table() {
for (region_id, request) in requests {
self.create_physical_region(region_id, request).await?;
}
return Ok(0);
} else if first_request
.options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
self.create_logical_regions(requests, extension_return_value)
.await?;
} else {
return MissingRegionOptionSnafu {}.fail();
}
Ok(0)
}
    /// Dispatch region creation request to physical region creation or logical
    pub async fn create_region(
        &self,
@@ -144,6 +185,116 @@ impl MetricEngineInner {
        Ok(())
    }
/// Create multiple logical regions on the same physical region.
async fn create_logical_regions(
&self,
requests: Vec<(RegionId, RegionCreateRequest)>,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_id = validate_create_logical_regions(&requests)?;
let data_region_id = utils::to_data_region_id(physical_region_id);
// Filters out the requests that the logical region already exists
let requests = {
let state = self.state.read().unwrap();
let logical_region_exists = state.logical_region_exists_filter(data_region_id);
// TODO(weny): log the skipped logical regions
requests
.into_iter()
.filter(|(region_id, _)| !logical_region_exists(region_id))
.collect::<Vec<_>>()
};
// Finds new columns to add to physical region
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();
let index_option = {
let state = &self.state.read().unwrap();
let region_state = state
.physical_region_states()
.get(&data_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?;
let physical_columns = region_state.physical_columns();
extract_new_columns(
&requests,
physical_columns,
&mut new_column_names,
&mut new_columns,
)?;
region_state.options().index
};
// TODO(weny): we dont need to pass a mutable new_columns here.
add_columns_to_physical_data_region(
data_region_id,
index_option,
&mut new_columns,
&self.data_region,
)
.await?;
let physical_columns = self.data_region.physical_columns(data_region_id).await?;
let physical_schema_map = physical_columns
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
let logical_regions = requests
.iter()
.map(|(region_id, _)| (*region_id))
.collect::<Vec<_>>();
let logical_region_columns = requests.iter().map(|(region_id, request)| {
(
*region_id,
request
.column_metadatas
.iter()
.map(|metadata| {
// Safety: previous steps ensure the physical region exist
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(metadata.column_schema.name.as_str(), column_metadata)
})
.collect::<HashMap<_, _>>(),
)
});
let new_add_columns = new_columns.iter().map(|metadata| {
// Safety: previous steps ensure the physical region exist
let column_metadata = *physical_schema_map
.get(metadata.column_schema.name.as_str())
.unwrap();
(
metadata.column_schema.name.to_string(),
column_metadata.column_id,
)
});
extension_return_value.insert(
ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
);
// Writes logical regions metadata to metadata region
add_logical_regions_to_meta_region(
&self.metadata_region,
physical_region_id,
true,
logical_region_columns,
)
.await?;
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
Ok(())
}
    /// Create a logical region.
    ///
    /// Physical table and logical table can have multiple regions, and their
@@ -293,16 +444,16 @@ impl MetricEngineInner {
        new_columns: &mut [ColumnMetadata],
        index_options: IndexOptions,
    ) -> Result<()> {
-        // alter data region
-        self.data_region
-            .add_columns(data_region_id, new_columns, index_options)
-            .await?;
        // Return early if no new columns are added.
        if new_columns.is_empty() {
            return Ok(());
        }

+        // alter data region
+        self.data_region
+            .add_columns(data_region_id, new_columns, index_options)
+            .await?;
        // correct the column id
        let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
        let after_alter_physical_schema_map = after_alter_physical_schema


@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
use crate::engine::IndexOptions;
use crate::error::Result;
use crate::metrics::PHYSICAL_COLUMN_COUNT;
/// Add new columns to the physical data region.
pub(crate) async fn add_columns_to_physical_data_region(
data_region_id: RegionId,
index_options: IndexOptions,
new_columns: &mut [ColumnMetadata],
data_region: &DataRegion,
) -> Result<()> {
// Return early if no new columns are added.
if new_columns.is_empty() {
return Ok(());
}
data_region
.add_columns(data_region_id, new_columns, index_options)
.await?;
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
Ok(())
}


@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::storage::RegionId;
use crate::error::{MitoWriteOperationSnafu, Result};
use crate::metadata_region::MetadataRegion;
use crate::utils;
/// Add logical regions to the metadata region.
pub async fn add_logical_regions_to_meta_region(
metadata_region: &MetadataRegion,
physical_region_id: RegionId,
write_region_id: bool,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter =
logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
if write_region_id {
Some((
MetadataRegion::concat_region_key(logical_region_id),
String::new(),
))
} else {
None
}
.into_iter()
.chain(column_metadatas.into_iter().map(
move |(name, column_metadata)| {
(
MetadataRegion::concat_column_key(logical_region_id, name),
MetadataRegion::serialize_column_metadata(column_metadata),
)
},
))
})
.collect::<Vec<_>>();
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
metadata_region
.mito
.handle_request(
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
Ok(())
}


@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::{ColumnId, RegionId};
use crate::error::{AddingFieldColumnSnafu, Result};
/// Extract new columns from the create requests.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionCreateRequest)],
physical_columns: &HashMap<String, ColumnId>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
for (_, request) in requests {
for col in &request.column_metadatas {
if !physical_columns.contains_key(&col.column_schema.name)
&& !new_column_names.contains(&col.column_schema.name.as_str())
{
ensure!(
col.semantic_type != SemanticType::Field,
AddingFieldColumnSnafu {
name: col.column_schema.name.to_string(),
}
);
new_column_names.insert(&col.column_schema.name);
// TODO(weny): avoid clone
new_columns.push(col.clone());
}
}
}
Ok(())
}


@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::RegionId;
use crate::error::{
ConflictRegionOptionSnafu, MissingRegionOptionSnafu, ParseRegionIdSnafu, Result,
};
/// Validate the create logical regions request.
///
/// Returns extracted physical region id from the first request.
pub fn validate_create_logical_regions(
requests: &[(RegionId, RegionCreateRequest)],
) -> Result<RegionId> {
let (_, request) = requests.first().unwrap();
let first_physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_region_id: RegionId = first_physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: first_physical_region_id_raw,
})?
.into();
// TODO(weny): Can we remove the check?
for (_, request) in requests.iter().skip(1) {
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
ensure!(
physical_region_id_raw == first_physical_region_id_raw,
ConflictRegionOptionSnafu {}
);
}
Ok(physical_region_id)
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

+use std::collections::HashMap;
+
use api::v1::{Rows, WriteHint};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt};
@@ -50,6 +52,26 @@ impl MetricEngineInner {
        }
    }
/// Dispatch batch region put request
pub async fn batch_put_region(
&self,
requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<AffectedRows> {
{
let state = self.state.read().unwrap();
for region_id in requests.iter().map(|(region_id, _)| region_id) {
if state.physical_region_states().contains_key(region_id) {
info!("Metric region received put request on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();
return ForbiddenPhysicalAlterSnafu.fail();
}
}
}
self.batch_put_logical_regions(requests).await
}
    async fn put_logical_region(
        &self,
        logical_region_id: RegionId,
@@ -98,6 +120,110 @@ impl MetricEngineInner {
        self.data_region.write_data(data_region_id, request).await
    }
async fn batch_put_logical_regions(
&self,
requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<AffectedRows> {
let _timer = MITO_OPERATION_ELAPSED
.with_label_values(&["put"])
.start_timer();
let mut physical_requests = HashMap::with_capacity(1);
// Group requests by physical region, also verify put requests.
{
let state = self.state.read().unwrap();
for (logical_region_id, request) in requests {
let physical_region_id = *state
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?;
let data_region_id = to_data_region_id(physical_region_id);
// Check if a physical column exists.
let physical_columns = state
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.physical_columns();
for col in &request.rows.schema {
ensure!(
physical_columns.contains_key(&col.column_name),
ColumnNotFoundSnafu {
name: col.column_name.clone(),
region_id: logical_region_id,
}
);
}
physical_requests
.entry(physical_region_id)
.or_insert_with(Vec::new)
.push((logical_region_id, request));
}
}
let mut affected_rows = 0;
for (physical_region_id, mut requests) in physical_requests {
if requests.is_empty() {
continue;
}
let data_region_id = to_data_region_id(physical_region_id);
let primary_key_encoding = {
let state = self.state.read().unwrap();
state.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?
};
for (logical_region_id, request) in &mut requests {
self.modify_rows(
physical_region_id,
logical_region_id.table_id(),
&mut request.rows,
primary_key_encoding,
)?;
}
let total_rows = requests
.iter()
.map(|(_, request)| request.rows.rows.len())
.sum::<usize>();
if primary_key_encoding == PrimaryKeyEncoding::Sparse {
let mut merged_request = RegionPutRequest {
rows: Rows {
schema: requests[0].1.rows.schema.clone(),
rows: Vec::with_capacity(total_rows),
},
hint: None,
};
merged_request.hint = Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
});
for (_, mut request) in requests {
merged_request.rows.rows.append(&mut request.rows.rows);
}
self.data_region
.write_data(data_region_id, merged_request)
.await?;
} else {
for (_, request) in requests {
self.data_region.write_data(data_region_id, request).await?;
}
}
affected_rows += total_rows;
}
Ok(affected_rows)
}
    /// Verifies a put request for a logical region against its corresponding metadata region.
    ///
    /// Includes:


@@ -83,6 +83,18 @@ pub(crate) struct MetricEngineState {
}

impl MetricEngineState {
pub fn logical_region_exists_filter(
&self,
physical_region_id: RegionId,
) -> impl for<'a> Fn(&'a RegionId) -> bool + use<'_> {
let state = self
.physical_region_states()
.get(&physical_region_id)
.unwrap();
move |logical_region_id| state.logical_regions().contains(logical_region_id)
}
    pub fn add_physical_region(
        &mut self,
        physical_region_id: RegionId,
@@ -111,6 +123,31 @@ impl MetricEngineState {
        }
    }
/// # Panic
/// if the physical region does not exist
pub fn add_logical_regions(
&mut self,
physical_region_id: RegionId,
logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
for logical_region_id in logical_region_ids {
state.logical_regions.insert(logical_region_id);
self.logical_regions
.insert(logical_region_id, physical_region_id);
}
}
pub fn invalid_logical_regions_cache(
&mut self,
logical_region_ids: impl IntoIterator<Item = RegionId>,
) {
for logical_region_id in logical_region_ids {
self.logical_columns.remove(&logical_region_id);
}
}
    /// # Panic
    /// if the physical region does not exist
    pub fn add_logical_region(


@@ -219,6 +219,13 @@ pub enum Error {
        location: Location,
    },

+    #[snafu(display("Unsupported alter kind: {}", kind))]
+    UnsupportedAlterKind {
+        kind: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
    #[snafu(display("Multiple field column found: {} and {}", previous, current))]
    MultipleFieldColumn {
        previous: String,
@@ -263,7 +270,8 @@ impl ErrorExt for Error {
            | MultipleFieldColumn { .. }
            | NoFieldColumn { .. }
            | AddingFieldColumn { .. }
-            | ParseRegionOptions { .. } => StatusCode::InvalidArguments,
+            | ParseRegionOptions { .. }
+            | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments,
            ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
                StatusCode::Unsupported


@@ -57,7 +57,7 @@ const COLUMN_PREFIX: &str = "__column_";
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
-    mito: MitoEngine,
+    pub(crate) mito: MitoEngine,
    /// Logical lock for operations that need to be serialized. Like update & read region columns.
    ///
    /// Region entry will be registered on creating and opening logical region, and deregistered on
@@ -474,6 +474,52 @@ impl MetadataRegion {
        }
    }
pub(crate) fn build_put_request_from_iter(
kv: impl Iterator<Item = (String, String)>,
) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Tag as _,
..Default::default()
},
ColumnSchema {
column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Field as _,
..Default::default()
},
];
let rows = Rows {
schema: cols,
rows: kv
.into_iter()
.map(|(key, value)| Row {
values: vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0)),
},
Value {
value_data: Some(ValueData::StringValue(key)),
},
Value {
value_data: Some(ValueData::StringValue(value)),
},
],
})
.collect(),
};
RegionPutRequest { rows, hint: None }
}
    fn build_put_request(key: &str, value: &str) -> RegionPutRequest {
        let cols = vec![
            ColumnSchema {


@@ -276,6 +276,7 @@ impl CpuDataGenerator {
                rows,
            }),
            write_hint: None,
+            bulk: Vec::new(),
        };

        KeyValues::new(&self.metadata, mutation).unwrap()


@@ -16,6 +16,7 @@
use std::collections::BTreeMap;
use std::fmt;
+use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -29,6 +30,7 @@ use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
+use crate::memtable::bulk::BulkMemtableBuilder;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
@@ -40,6 +42,7 @@ use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

pub mod bulk;
+mod encoder;
pub mod key_values;
pub mod partition_tree;
mod stats;
@@ -290,6 +293,19 @@ impl MemtableBuilderProvider {
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
+        // todo(hl): make it an option
+        if std::env::var("enable_bulk_memtable")
+            .ok()
+            .and_then(|v| bool::from_str(&v).ok())
+            .unwrap_or(false)
+        {
+            return Arc::new(BulkMemtableBuilder::new(
+                self.write_buffer_manager.clone(),
+                dedup,
+                merge_mode,
+            ));
+        }
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),


@@ -14,18 +14,27 @@
//! Memtable implementation for bulk load

+use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

+use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::Result;
+use crate::flush::WriteBufferManagerRef;
+use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
+use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::{
-    BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
+    AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
+    MemtableRanges, MemtableRef, MemtableStats,
};
+use crate::read::dedup::{LastNonNull, LastRow};
+use crate::read::sync::dedup::DedupReader;
+use crate::region::options::MergeMode;

#[allow(unused)]
mod context;
@@ -34,10 +43,86 @@ pub(crate) mod part;
mod part_reader;
mod row_group_reader;
#[derive(Debug)]
pub struct BulkMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
fallback_builder: PartitionTreeMemtableBuilder,
}
impl MemtableBuilder for BulkMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
//todo(hl): create different memtables according to region type (metadata/physical)
if metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
self.fallback_builder.build(id, metadata)
} else {
Arc::new(BulkMemtable::new(
metadata.clone(),
id,
self.write_buffer_manager.clone(),
self.dedup,
self.merge_mode,
)) as MemtableRef
}
}
}
impl BulkMemtableBuilder {
pub fn new(
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
) -> Self {
let builder = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig::default(),
write_buffer_manager.clone(),
);
Self {
write_buffer_manager,
dedup,
merge_mode,
fallback_builder: builder,
}
}
}
#[derive(Debug)]
pub struct BulkMemtable {
    id: MemtableId,
    parts: RwLock<Vec<BulkPart>>,
region_metadata: RegionMetadataRef,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
max_sequence: AtomicU64,
num_rows: AtomicUsize,
dedup: bool,
merge_mode: MergeMode,
}
impl BulkMemtable {
pub fn new(
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
merge_mode: MergeMode,
) -> Self {
Self {
id,
parts: RwLock::new(vec![]),
region_metadata,
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
max_sequence: Default::default(),
num_rows: Default::default(),
dedup,
merge_mode,
}
}
} }
impl Memtable for BulkMemtable { impl Memtable for BulkMemtable {
@@ -54,18 +139,56 @@ impl Memtable for BulkMemtable {
} }
fn write_bulk(&self, fragment: BulkPart) -> Result<()> { fn write_bulk(&self, fragment: BulkPart) -> Result<()> {
self.alloc_tracker.on_allocation(fragment.data.len());
let mut parts = self.parts.write().unwrap(); let mut parts = self.parts.write().unwrap();
let part_metadata = fragment.metadata();
if self.max_timestamp.load(Ordering::Relaxed) < part_metadata.max_timestamp {
self.max_timestamp
.store(part_metadata.max_timestamp, Ordering::Relaxed);
}
if self.min_timestamp.load(Ordering::Relaxed) > part_metadata.min_timestamp {
self.min_timestamp
.store(part_metadata.min_timestamp, Ordering::Relaxed);
}
if self.max_sequence.load(Ordering::Relaxed) < part_metadata.max_sequence {
self.max_sequence
.store(part_metadata.max_sequence, Ordering::Relaxed);
}
self.num_rows
.fetch_add(part_metadata.num_rows, Ordering::Relaxed);
parts.push(fragment); parts.push(fragment);
Ok(()) Ok(())
} }
fn iter( fn iter(
&self, &self,
_projection: Option<&[ColumnId]>, projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>, predicate: Option<Predicate>,
_sequence: Option<SequenceNumber>, sequence: Option<SequenceNumber>,
) -> Result<BoxedBatchIterator> { ) -> Result<BoxedBatchIterator> {
todo!() let mut readers = Vec::new();
let parts = self.parts.read().unwrap();
let ctx = Arc::new(BulkIterContext::new(
self.region_metadata.clone(),
&projection,
predicate.clone(),
));
for part in parts.as_slice() {
if let Some(reader) = part.read(ctx.clone(), sequence)? {
readers.push(reader);
}
}
let merge_reader = crate::read::sync::merge::MergeReader::new(readers)?;
let reader = match self.merge_mode {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(self.dedup))) as BoxedBatchIterator
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(self.dedup))) as BoxedBatchIterator
}
};
Ok(reader)
} }
fn ranges( fn ranges(
@@ -82,17 +205,175 @@ impl Memtable for BulkMemtable {
} }
fn freeze(&self) -> Result<()> { fn freeze(&self) -> Result<()> {
self.alloc_tracker.done_allocating();
Ok(()) Ok(())
} }
fn stats(&self) -> MemtableStats { fn stats(&self) -> MemtableStats {
todo!() let estimated_bytes = self.alloc_tracker.bytes_allocated();
if estimated_bytes == 0 {
// no rows ever written
return MemtableStats {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
max_sequence: 0,
};
}
let ts_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.clone()
.as_timestamp()
.expect("Timestamp column must have timestamp type");
let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1, // todo(hl): we should consider bulk parts as different ranges.
max_sequence: self.max_sequence.load(Ordering::Relaxed),
}
} }
fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(Self { Arc::new(Self::new(
metadata.clone(),
id, id,
parts: RwLock::new(vec![]), self.alloc_tracker.write_buffer_manager(),
}) self.dedup,
self.merge_mode,
))
}
}
#[cfg(test)]
mod tests {
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, Row, Rows, SemanticType};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use std::sync::Arc;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::RegionId;
use crate::memtable::bulk::part::BulkPartEncoder;
use crate::memtable::bulk::BulkMemtable;
use crate::memtable::{BulkPart, Memtable};
use crate::region::options::MergeMode;
fn metrics_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("k0", ConcreteDataType::binary_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.primary_key(vec![0]);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
}
fn metrics_column_schema() -> Vec<api::v1::ColumnSchema> {
let schema = metrics_region_metadata();
schema
.column_metadatas
.iter()
.map(|c| api::v1::ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.unwrap()
.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
.collect()
}
fn build_metrics_bulk_part(
k: &str,
ts: &[i64],
v0: &[Option<f64>],
v1: &[Option<f64>],
seq: u64,
) -> BulkPart {
assert_eq!(ts.len(), v0.len());
assert_eq!(ts.len(), v1.len());
let rows = ts
.iter()
.zip(v0.iter())
.zip(v1.iter())
.map(|((ts, v0), v1)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::BinaryValue(k.as_bytes().to_vec())),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(*ts as i64)),
},
api::v1::Value {
value_data: v0.map(ValueData::F64Value),
},
api::v1::Value {
value_data: v1.map(ValueData::F64Value),
},
],
})
.collect::<Vec<_>>();
let mutation = api::v1::Mutation {
op_type: OpType::Put as i32,
sequence: seq,
rows: Some(Rows {
schema: metrics_column_schema(),
rows,
}),
write_hint: None,
bulk: Vec::new(),
};
let encoder = BulkPartEncoder::new(metrics_region_metadata(), true, 1024);
encoder.encode_mutations(&[mutation]).unwrap().unwrap()
}
#[test]
fn test_bulk_iter() {
let schema = metrics_region_metadata();
let memtable = BulkMemtable::new(schema, 0, None, true, MergeMode::LastRow);
memtable.write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0)).unwrap();
// write duplicated rows
memtable.write_bulk(build_metrics_bulk_part("a", &[1], &[None], &[Some(1.0)], 0)).unwrap();
let iter = memtable.iter(None, None, None).unwrap();
let total_rows = iter.map(|b| {
b.unwrap().num_rows()
}).sum::<usize>();
assert_eq!(1, total_rows);
} }
} }

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::v1::Mutation; use api::v1::Mutation;
use bytes::Bytes; use bytes::Bytes;
use common_telemetry::error;
use common_time::timestamp::TimeUnit; use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder}; use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow; use datatypes::arrow;
@@ -32,6 +33,7 @@ use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray; use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType; use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::ValueRef;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter; use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes; use parquet::data_type::AsBytes;
@@ -46,16 +48,17 @@ use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result}; use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef; use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter; use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::encoder::{FieldWithId, SparseEncoder};
use crate::memtable::key_values::KeyValuesRef; use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator; use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt}; use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat}; use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema; use crate::sst::to_sst_arrow_schema;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct BulkPart { pub struct BulkPart {
data: Bytes, pub(crate) data: Bytes,
metadata: BulkPartMeta, metadata: BulkPartMeta,
} }
@@ -92,7 +95,7 @@ impl BulkPart {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct BulkPartMeta { pub struct BulkPartMeta {
/// Total rows in part. /// Total rows in part.
pub num_rows: usize, pub num_rows: usize,
@@ -100,6 +103,8 @@ pub struct BulkPartMeta {
pub max_timestamp: i64, pub max_timestamp: i64,
/// Min timestamp in part. /// Min timestamp in part.
pub min_timestamp: i64, pub min_timestamp: i64,
/// Max sequence number in part.
pub max_sequence: u64,
/// Part file metadata. /// Part file metadata.
pub parquet_metadata: Arc<ParquetMetaData>, pub parquet_metadata: Arc<ParquetMetaData>,
/// Part region schema. /// Part region schema.
@@ -108,7 +113,7 @@ pub struct BulkPartMeta {
pub struct BulkPartEncoder { pub struct BulkPartEncoder {
metadata: RegionMetadataRef, metadata: RegionMetadataRef,
pk_encoder: DensePrimaryKeyCodec, pk_encoder: SparseEncoder,
row_group_size: usize, row_group_size: usize,
dedup: bool, dedup: bool,
writer_props: Option<WriterProperties>, writer_props: Option<WriterProperties>,
@@ -120,7 +125,7 @@ impl BulkPartEncoder {
dedup: bool, dedup: bool,
row_group_size: usize, row_group_size: usize,
) -> BulkPartEncoder { ) -> BulkPartEncoder {
let codec = DensePrimaryKeyCodec::new(&metadata); let encoder = SparseEncoder::new(&metadata);
let writer_props = Some( let writer_props = Some(
WriterProperties::builder() WriterProperties::builder()
.set_write_batch_size(row_group_size) .set_write_batch_size(row_group_size)
@@ -129,7 +134,7 @@ impl BulkPartEncoder {
); );
Self { Self {
metadata, metadata,
pk_encoder: codec, pk_encoder: encoder,
row_group_size, row_group_size,
dedup, dedup,
writer_props, writer_props,
@@ -139,9 +144,9 @@ impl BulkPartEncoder {
impl BulkPartEncoder { impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`. /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> { pub(crate) fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) = let Some((arrow_record_batch, min_ts, max_ts, max_sequence)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)? mutations_to_record_batch(mutations, &self.metadata, self.dedup)?
else { else {
return Ok(None); return Ok(None);
}; };
@@ -168,6 +173,7 @@ impl BulkPartEncoder {
num_rows: arrow_record_batch.num_rows(), num_rows: arrow_record_batch.num_rows(),
max_timestamp: max_ts, max_timestamp: max_ts,
min_timestamp: min_ts, min_timestamp: min_ts,
max_sequence,
parquet_metadata, parquet_metadata,
region_metadata: self.metadata.clone(), region_metadata: self.metadata.clone(),
}, },
@@ -179,9 +185,8 @@ impl BulkPartEncoder {
fn mutations_to_record_batch( fn mutations_to_record_batch(
mutations: &[Mutation], mutations: &[Mutation],
metadata: &RegionMetadataRef, metadata: &RegionMetadataRef,
pk_encoder: &DensePrimaryKeyCodec,
dedup: bool, dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> { ) -> Result<Option<(RecordBatch, i64, i64, u64)>> {
let total_rows: usize = mutations let total_rows: usize = mutations
.iter() .iter()
.map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0)) .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
@@ -206,18 +211,29 @@ fn mutations_to_record_batch(
.map(|f| f.column_schema.data_type.create_mutable_vector(total_rows)) .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
.collect(); .collect();
let mut pk_buffer = vec![]; let mut max_sequence = u64::MIN;
for m in mutations { for m in mutations {
let Some(key_values) = KeyValuesRef::new(metadata, m) else { let Some(key_values) = KeyValuesRef::new(metadata, m) else {
continue; continue;
}; };
for row in key_values.iter() { for row in key_values.iter() {
pk_buffer.clear(); assert_eq!(1, row.num_primary_keys());
pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?; let first_primary_key_col = row.primary_keys().next().unwrap();
pk_builder.append_value(pk_buffer.as_bytes());
let bytes = match first_primary_key_col {
ValueRef::Binary(b) => b,
_ => {
unreachable!(
"Primary key must be encoded binary type, found: {:?}",
first_primary_key_col
);
}
};
pk_builder.append_value(bytes);
ts_vector.push_value_ref(row.timestamp()); ts_vector.push_value_ref(row.timestamp());
sequence_builder.append_value(row.sequence()); sequence_builder.append_value(row.sequence());
max_sequence = max_sequence.max(row.sequence());
op_type_builder.append_value(row.op_type() as u8); op_type_builder.append_value(row.op_type() as u8);
for (builder, field) in field_builders.iter_mut().zip(row.fields()) { for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
builder.push_value_ref(field); builder.push_value_ref(field);
@@ -247,7 +263,9 @@ fn mutations_to_record_batch(
arrow_schema, arrow_schema,
}; };
sorter.sort().map(Some) sorter.sort().map(|(batch, min, max)|{
Some((batch, min, max, max_sequence))
})
} }
struct ArraysSorter<I> { struct ArraysSorter<I> {
@@ -263,7 +281,7 @@ struct ArraysSorter<I> {
impl<I> ArraysSorter<I> impl<I> ArraysSorter<I>
where where
I: Iterator<Item = ArrayRef>, I: Iterator<Item=ArrayRef>,
{ {
/// Converts arrays to record batch. /// Converts arrays to record batch.
fn sort(self) -> Result<(RecordBatch, i64, i64)> { fn sort(self) -> Result<(RecordBatch, i64, i64)> {
@@ -311,10 +329,10 @@ where
check_bounds: false, check_bounds: false,
}), }),
) )
.context(ComputeArrowSnafu)? .context(ComputeArrowSnafu)?
.as_any() .as_any()
.downcast_ref::<BinaryArray>() .downcast_ref::<BinaryArray>()
.unwrap(), .unwrap(),
)?) as ArrayRef; )?) as ArrayRef;
let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len()); let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
@@ -327,7 +345,7 @@ where
check_bounds: false, check_bounds: false,
}), }),
) )
.context(ComputeArrowSnafu)?, .context(ComputeArrowSnafu)?,
); );
} }
@@ -338,7 +356,7 @@ where
check_bounds: false, check_bounds: false,
}), }),
) )
.context(ComputeArrowSnafu)?; .context(ComputeArrowSnafu)?;
arrays.push(timestamp); arrays.push(timestamp);
arrays.push(pk_dictionary); arrays.push(pk_dictionary);
@@ -350,7 +368,7 @@ where
check_bounds: false, check_bounds: false,
}), }),
) )
.context(ComputeArrowSnafu)?, .context(ComputeArrowSnafu)?,
); );
arrays.push( arrays.push(
@@ -361,7 +379,7 @@ where
check_bounds: false, check_bounds: false,
}), }),
) )
.context(ComputeArrowSnafu)?, .context(ComputeArrowSnafu)?,
); );
let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?; let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
@@ -373,7 +391,7 @@ where
fn timestamp_array_to_iter( fn timestamp_array_to_iter(
timestamp_unit: TimeUnit, timestamp_unit: TimeUnit,
timestamp: &ArrayRef, timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> { ) -> impl Iterator<Item=&i64> {
match timestamp_unit { match timestamp_unit {
// safety: timestamp column must be valid. // safety: timestamp column must be valid.
TimeUnit::Second => timestamp TimeUnit::Second => timestamp
@@ -435,7 +453,7 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub(crate) mod tests {
use std::collections::VecDeque; use std::collections::VecDeque;
use datafusion_common::ScalarValue; use datafusion_common::ScalarValue;
@@ -501,19 +519,19 @@ mod tests {
); );
} }
struct MutationInput<'a> { pub(crate) struct MutationInput<'a> {
k0: &'a str, pub(crate) k0: &'a str,
k1: u32, pub(crate) k1: u32,
timestamps: &'a [i64], pub(crate) timestamps: &'a [i64],
v1: &'a [Option<f64>], pub(crate) v1: &'a [Option<f64>],
sequence: u64, pub(crate) sequence: u64,
} }
#[derive(Debug, PartialOrd, PartialEq)] #[derive(Debug, PartialOrd, PartialEq)]
struct BatchOutput<'a> { pub(crate) struct BatchOutput<'a> {
pk_values: &'a [Value], pub(crate) pk_values: &'a [Value],
timestamps: &'a [i64], pub(crate) timestamps: &'a [i64],
v1: &'a [Option<f64>], pub(crate) v1: &'a [Option<f64>],
} }
fn check_mutations_to_record_batches( fn check_mutations_to_record_batches(
@@ -534,7 +552,7 @@ mod tests {
m.v1.iter().copied(), m.v1.iter().copied(),
m.sequence, m.sequence,
) )
.mutation .mutation
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let total_rows: usize = mutations let total_rows: usize = mutations
@@ -543,9 +561,9 @@ mod tests {
.map(|r| r.rows.len()) .map(|r| r.rows.len())
.sum(); .sum();
let pk_encoder = DensePrimaryKeyCodec::new(&metadata); let pk_encoder = SparseEncoder::new(&metadata);
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup) let (batch, _, _,_) = mutations_to_record_batch(&mutations, &metadata, dedup)
.unwrap() .unwrap()
.unwrap(); .unwrap();
let read_format = ReadFormat::new_with_all_columns(metadata.clone()); let read_format = ReadFormat::new_with_all_columns(metadata.clone());
@@ -562,7 +580,7 @@ mod tests {
let batch_values = batches let batch_values = batches
.into_iter() .into_iter()
.map(|b| { .map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense(); let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
let timestamps = b let timestamps = b
.timestamps() .timestamps()
.as_any() .as_any()
@@ -742,7 +760,7 @@ mod tests {
); );
} }
fn encode(input: &[MutationInput]) -> BulkPart { pub(crate) fn encode(input: &[MutationInput]) -> BulkPart {
let metadata = metadata_for_test(); let metadata = metadata_for_test();
let mutations = input let mutations = input
.iter() .iter()
@@ -755,7 +773,7 @@ mod tests {
m.v1.iter().copied(), m.v1.iter().copied(),
m.sequence, m.sequence,
) )
.mutation .mutation
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let encoder = BulkPartEncoder::new(metadata, true, 1024); let encoder = BulkPartEncoder::new(metadata, true, 1024);

View File

@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sparse primary key encoder.
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::SerializeFieldSnafu;
use crate::row_converter::SortField;
pub(crate) struct FieldWithId {
pub(crate) field: SortField,
pub(crate) column_id: ColumnId,
}
pub(crate) struct SparseEncoder {
pub(crate) columns: Vec<FieldWithId>,
#[cfg(test)]
pub(crate) column_id_to_field: std::collections::HashMap<ColumnId, (SortField, usize)>,
}
impl SparseEncoder {
pub(crate) fn new(metadata: &RegionMetadataRef) -> Self {
let mut columns = Vec::with_capacity(metadata.primary_key.len());
#[cfg(test)]
let mut column_id_to_field =
std::collections::HashMap::with_capacity(metadata.primary_key.len());
for (_idx, c) in metadata.primary_key_columns().enumerate() {
let sort_field = SortField::new(c.column_schema.data_type.clone());
let field = FieldWithId {
field: sort_field.clone(),
column_id: c.column_id,
};
columns.push(field);
#[cfg(test)]
column_id_to_field.insert(c.column_id, (sort_field, _idx));
}
Self {
columns,
#[cfg(test)]
column_id_to_field,
}
}
pub fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> crate::error::Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.columns.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
#[cfg(test)]
pub fn decode(&self, bytes: &[u8]) -> crate::error::Result<Vec<datatypes::value::Value>> {
use serde::Deserialize;
let mut deserializer = memcomparable::Deserializer::new(bytes);
let mut values = vec![datatypes::value::Value::Null; self.columns.len()];
while deserializer.has_remaining() {
let column_id =
u32::deserialize(&mut deserializer).context(crate::error::DeserializeFieldSnafu)?;
let (field, idx) = self.column_id_to_field.get(&column_id).unwrap();
let value = field.deserialize(&mut deserializer)?;
values[*idx] = value;
}
Ok(values)
}
}
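A simplified illustration of the sparse layout produced by `encode_to_vec` above: each non-null primary-key column contributes its column id followed by its encoded value, and null columns are skipped entirely. The real encoder serializes both with memcomparable via `SortField`; the plain byte concatenation below is only a sketch.

// Conceptual sketch only (not the crate's memcomparable encoding).
fn encode_sparse(pairs: &[(u32, Option<&[u8]>)]) -> Vec<u8> {
    let mut buf = Vec::new();
    for (column_id, value) in pairs {
        if let Some(v) = value {
            buf.extend_from_slice(&column_id.to_be_bytes()); // column id first
            buf.extend_from_slice(v);                        // then the encoded value
        }
    }
    buf
}

fn main() {
    // Column 2 is null and contributes nothing to the key.
    let key = encode_sparse(&[(1, Some(b"host-1")), (2, None)]);
    assert!(key.starts_with(&1u32.to_be_bytes()));
}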

View File

@@ -394,6 +394,7 @@ mod tests {
sequence: START_SEQ, sequence: START_SEQ,
rows: Some(rows), rows: Some(rows),
write_hint: None, write_hint: None,
bulk: Vec::new(),
} }
} }
@@ -432,6 +433,7 @@ mod tests {
sequence: 100, sequence: 100,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}; };
let kvs = KeyValues::new(&meta, mutation); let kvs = KeyValues::new(&meta, mutation);
assert!(kvs.is_none()); assert!(kvs.is_none());

View File

@@ -731,6 +731,7 @@ mod tests {
rows, rows,
}), }),
write_hint: None, write_hint: None,
bulk: Vec::new(),
}; };
KeyValues::new(metadata.as_ref(), mutation).unwrap() KeyValues::new(metadata.as_ref(), mutation).unwrap()
} }

View File

@@ -23,18 +23,15 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp; use common_time::Timestamp;
use datafusion_common::ScalarValue; use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef; use datatypes::prelude::ValueRef;
use memcomparable::Serializer; use snafu::ensure;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding; use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef; use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber}; use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate; use table::predicate::Predicate;
use crate::error::{ use crate::error::{EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result};
EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
};
use crate::flush::WriteBufferManagerRef; use crate::flush::WriteBufferManagerRef;
use crate::memtable::encoder::SparseEncoder;
use crate::memtable::key_values::KeyValue; use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{ use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext, Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
@@ -46,7 +43,7 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
use crate::read::dedup::LastNonNullIter; use crate::read::dedup::LastNonNullIter;
use crate::read::Batch; use crate::read::Batch;
use crate::region::options::MergeMode; use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField}; use crate::row_converter::PrimaryKeyCodec;
/// The partition tree. /// The partition tree.
pub struct PartitionTree { pub struct PartitionTree {
@@ -73,15 +70,7 @@ impl PartitionTree {
config: &PartitionTreeConfig, config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>, write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self { ) -> Self {
let sparse_encoder = SparseEncoder { let sparse_encoder = SparseEncoder::new(&metadata);
fields: metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
let is_partitioned = Partition::has_multi_partitions(&metadata); let is_partitioned = Partition::has_multi_partitions(&metadata);
let mut config = config.clone(); let mut config = config.clone();
if config.merge_mode == MergeMode::LastNonNull { if config.merge_mode == MergeMode::LastNonNull {
@@ -436,34 +425,6 @@ impl PartitionTree {
} }
} }
struct FieldWithId {
field: SortField,
column_id: ColumnId,
}
struct SparseEncoder {
fields: Vec<FieldWithId>,
}
impl SparseEncoder {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
if !value.is_null() {
field
.column_id
.serialize(&mut serializer)
.context(SerializeFieldSnafu)?;
field.field.serialize(&mut serializer, &value)?;
}
}
Ok(())
}
}
#[derive(Default)] #[derive(Default)]
struct TreeIterMetrics { struct TreeIterMetrics {
iter_elapsed: Duration, iter_elapsed: Duration,

View File

@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
use crate::error::{InvalidRequestSnafu, Result}; use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::key_values::KeyValue; use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec; use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef}; use crate::memtable::{BulkPart, KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
/// A partition holds rows with timestamps between `[min, max)`. /// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -141,6 +141,18 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts) self.write_multi_parts(kvs, &parts)
} }
/// Write bulk to the memtables.
///
/// It creates new partitions if necessary.
pub fn write_bulk(&self, bulk_part: BulkPart) -> Result<()> {
// Get all parts.
let parts = self.list_partitions();
// TODO(yingwen): Now we never flush so we always have a partition.
let last_part = parts.last().unwrap();
last_part.memtable.write_bulk(bulk_part)
}
/// Append memtables in partitions to `memtables`. /// Append memtables in partitions to `memtables`.
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) { pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();

View File

@@ -912,7 +912,7 @@ impl IterBuilder for TimeSeriesIterBuilder {
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub(crate) mod tests {
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper; use api::helper::ColumnDataTypeWrapper;
@@ -929,7 +929,7 @@ mod tests {
use super::*; use super::*;
use crate::row_converter::SortField; use crate::row_converter::SortField;
fn schema_for_test() -> RegionMetadataRef { pub(crate) fn schema_for_test() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder builder
.push_column_metadata(ColumnMetadata { .push_column_metadata(ColumnMetadata {
@@ -1143,7 +1143,7 @@ mod tests {
) )
} }
fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues { pub(crate) fn build_key_values(schema: &RegionMetadataRef, k0: String, k1: i64, len: usize) -> KeyValues {
let column_schema = schema let column_schema = schema
.column_metadatas .column_metadatas
.iter() .iter()
@@ -1186,6 +1186,7 @@ mod tests {
rows, rows,
}), }),
write_hint: None, write_hint: None,
bulk: Vec::new(),
}; };
KeyValues::new(schema.as_ref(), mutation).unwrap() KeyValues::new(schema.as_ref(), mutation).unwrap()
} }

View File

@@ -26,6 +26,8 @@ pub(crate) mod scan_util;
pub(crate) mod seq_scan; pub(crate) mod seq_scan;
pub(crate) mod unordered_scan; pub(crate) mod unordered_scan;
pub(crate) mod sync;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;

View File

@@ -284,23 +284,23 @@ impl MergeReaderBuilder {
/// Metrics for the merge reader. /// Metrics for the merge reader.
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Metrics { pub(crate) struct Metrics {
/// Total scan cost of the reader. /// Total scan cost of the reader.
scan_cost: Duration, pub(crate) scan_cost: Duration,
/// Number of times to fetch batches. /// Number of times to fetch batches.
num_fetch_by_batches: usize, pub(crate) num_fetch_by_batches: usize,
/// Number of times to fetch rows. /// Number of times to fetch rows.
num_fetch_by_rows: usize, pub(crate) num_fetch_by_rows: usize,
/// Number of input rows. /// Number of input rows.
num_input_rows: usize, pub(crate) num_input_rows: usize,
/// Number of output rows. /// Number of output rows.
num_output_rows: usize, pub(crate) num_output_rows: usize,
/// Cost to fetch batches from sources. /// Cost to fetch batches from sources.
fetch_cost: Duration, pub(crate) fetch_cost: Duration,
} }
/// A `Node` represent an individual input data source to be merged. /// A `Node` represent an individual input data source to be merged.
struct Node { pub(crate) struct Node {
/// Data source of this `Node`. /// Data source of this `Node`.
source: Source, source: Source,
/// Current batch to be read. The node ensures the batch is not empty. /// Current batch to be read. The node ensures the batch is not empty.
@@ -313,7 +313,7 @@ impl Node {
/// Initialize a node. /// Initialize a node.
/// ///
/// It tries to fetch one batch from the `source`. /// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> { pub(crate) async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
// Ensures batch is not empty. // Ensures batch is not empty.
let start = Instant::now(); let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst); let current_batch = source.next_batch().await?.map(CompareFirst);
@@ -432,7 +432,7 @@ impl Ord for Node {
/// Type to compare [Batch] by first row. /// Type to compare [Batch] by first row.
/// ///
/// It ignores op type as sequence is enough to distinguish different rows. /// It ignores op type as sequence is enough to distinguish different rows.
struct CompareFirst(Batch); pub(crate) struct CompareFirst(pub(crate) Batch);
impl PartialEq for CompareFirst { impl PartialEq for CompareFirst {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {

View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod merge;
pub mod dedup;

View File

@@ -0,0 +1,84 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sync dedup reader implementation
use common_telemetry::debug;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::dedup::{DedupMetrics, DedupStrategy};
use crate::read::Batch;
/// A sync version of the reader that dedups sorted batches from a source based on the
/// dedup strategy.
pub(crate) struct DedupReader<R, S> {
source: R,
strategy: S,
metrics: DedupMetrics,
}
impl<R, S> DedupReader<R, S> {
/// Creates a new dedup reader.
pub(crate) fn new(source: R, strategy: S) -> Self {
Self {
source,
strategy,
metrics: DedupMetrics::default(),
}
}
}
impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> DedupReader<R, S> {
/// Returns the next deduplicated batch.
fn fetch_next_batch(&mut self) -> Option<crate::error::Result<Batch>> {
while let Some(res) = self.source.next() {
match res {
Ok(batch) => {
if let Some(batch) = self
.strategy
.push_batch(batch, &mut self.metrics)
.transpose()
{
return Some(batch);
}
}
Err(err) => return Some(Err(err)),
}
}
self.strategy.finish(&mut self.metrics).transpose()
}
}
impl<R: Iterator<Item = crate::error::Result<Batch>>, S: DedupStrategy> Iterator
for DedupReader<R, S>
{
type Item = crate::error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
self.fetch_next_batch()
}
}
impl<R, S> Drop for DedupReader<R, S> {
fn drop(&mut self) {
debug!("Sync dedup reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_unselected_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
}
}
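A toy illustration of the iterator-adapter shape used above, with plain integers in place of `Batch` and a hard-coded last-value check in place of `DedupStrategy`; all names below are invented for the sketch:

struct ToyDedup<I: Iterator<Item = i64>> {
    source: I,
    last: Option<i64>,
}

impl<I: Iterator<Item = i64>> Iterator for ToyDedup<I> {
    type Item = i64;
    fn next(&mut self) -> Option<i64> {
        // Pull from the inner iterator and skip consecutive duplicates.
        for v in self.source.by_ref() {
            if self.last != Some(v) {
                self.last = Some(v);
                return Some(v);
            }
        }
        None
    }
}

fn main() {
    let deduped: Vec<_> = ToyDedup { source: [1, 1, 2, 2, 3].into_iter(), last: None }.collect();
    assert_eq!(deduped, vec![1, 2, 3]);
}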

View File

@@ -0,0 +1,384 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sync merge reader implementation.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::Instant;
use common_telemetry::debug;
use crate::error;
use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch};
use crate::read::merge::{CompareFirst, Metrics};
/// A `Node` represent an individual input data source to be merged.
pub(crate) struct Node {
/// Data source of this `Node`.
source: BoxedBatchIterator,
/// Current batch to be read. The node ensures the batch is not empty.
///
/// `None` means the `source` has reached EOF.
current_batch: Option<CompareFirst>,
}
impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
pub(crate) fn new(
mut source: BoxedBatchIterator,
metrics: &mut Metrics,
) -> error::Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next().transpose()?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
source,
current_batch,
})
}
/// Returns whether the node still has batch to read.
fn is_eof(&self) -> bool {
self.current_batch.is_none()
}
/// Returns the primary key of current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn primary_key(&self) -> &[u8] {
self.current_batch().primary_key()
}
/// Returns current batch.
///
/// # Panics
/// Panics if the node has reached EOF.
fn current_batch(&self) -> &Batch {
&self.current_batch.as_ref().unwrap().0
}
/// Returns current batch and fetches next batch
/// from the source.
///
/// # Panics
/// Panics if the node has reached EOF.
fn fetch_batch(&mut self, metrics: &mut Metrics) -> error::Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next().transpose()?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()
.map(|b| b.0.num_rows())
.unwrap_or(0);
Ok(current.0)
}
/// Returns true if the key range of current batch in `self` is behind (exclusive) current
/// batch in `other`.
///
/// # Panics
/// Panics if either `self` or `other` is EOF.
fn is_behind(&self, other: &Node) -> bool {
debug_assert!(!self.current_batch().is_empty());
debug_assert!(!other.current_batch().is_empty());
// We only compare pk and timestamp so nodes in the cold
// heap don't have overlapping timestamps with the hottest node
// in the hot heap.
self.primary_key().cmp(other.primary_key()).then_with(|| {
self.current_batch()
.first_timestamp()
.cmp(&other.current_batch().last_timestamp())
}) == Ordering::Greater
}
/// Skips the first `num_to_skip` rows from the node's current batch. If nothing remains, it
/// fetches the next batch from the source.
///
/// # Panics
/// Panics if the node is EOF.
fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> error::Result<()> {
let batch = self.current_batch();
debug_assert!(batch.num_rows() >= num_to_skip);
let remaining = batch.num_rows() - num_to_skip;
if remaining == 0 {
// Nothing remains, we need to fetch next batch to ensure the batch is not empty.
self.fetch_batch(metrics)?;
} else {
debug_assert!(!batch.is_empty());
self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
}
Ok(())
}
}
impl PartialEq for Node {
fn eq(&self, other: &Node) -> bool {
self.current_batch == other.current_batch
}
}
impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Node) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Node {
fn cmp(&self, other: &Node) -> Ordering {
// The std binary heap is a max heap, but we want the nodes ordered in
// ascending order, so we compare the nodes in reverse order.
other.current_batch.cmp(&self.current_batch)
}
}
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can
/// ignore op type as sequence is already unique).
/// 2. Batches from sources **must** not be empty.
///
/// The reader won't concatenate batches. Each batch returned by the reader also doesn't
/// contain duplicate rows. But the last (primary key, timestamp) of a batch may be the same
/// as the first one in the next batch.
pub struct MergeReader {
/// Holds [Node]s whose key range of current batch **is** overlapped with the merge window.
/// Each node yields batches from a `source`.
///
/// [Node] in this heap **must** not be empty. A `merge window` is the (primary key, timestamp)
/// range of the **root node** in the `hot` heap.
hot: BinaryHeap<Node>,
/// Holds `Node` whose key range of current batch **isn't** overlapped with the merge window.
///
/// `Node` in this heap **must** not be empty.
cold: BinaryHeap<Node>,
/// Batch to output.
output_batch: Option<Batch>,
/// Local metrics.
metrics: Metrics,
}
impl Drop for MergeReader {
fn drop(&mut self) {
debug!("Merge reader(sync) finished, metrics: {:?}", self.metrics);
READ_STAGE_ELAPSED
.with_label_values(&["merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
}
}
impl Iterator for MergeReader {
type Item = error::Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
let start = Instant::now();
while !self.hot.is_empty() && self.output_batch.is_none() {
if self.hot.len() == 1 {
// No need to do merge sort if only one batch in the hot heap.
if let Err(e) = self.fetch_batch_from_hottest() {
return Some(Err(e));
}
self.metrics.num_fetch_by_batches += 1;
} else {
// We can only fetch rows from the hottest node that are less than the first row of the next node.
if let Err(e) = self.fetch_rows_from_hottest() {
return Some(Err(e));
}
self.metrics.num_fetch_by_rows += 1;
}
}
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.num_output_rows += batch.num_rows();
Some(Ok(batch))
} else {
// Nothing fetched.
self.metrics.scan_cost += start.elapsed();
None
}
}
}
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub fn new(sources: Vec<BoxedBatchIterator>) -> error::Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
let mut cold = BinaryHeap::with_capacity(sources.len());
let hot = BinaryHeap::with_capacity(sources.len());
for source in sources {
let node = Node::new(source, &mut metrics)?;
if !node.is_eof() {
// Ensure `cold` don't have eof nodes.
cold.push(node);
}
}
let mut reader = MergeReader {
hot,
cold,
output_batch: None,
metrics,
};
// Initializes the reader.
reader.refill_hot();
reader.metrics.scan_cost += start.elapsed();
Ok(reader)
}
/// Moves nodes in `cold` heap, whose key range is overlapped with current merge
/// window to `hot` heap.
fn refill_hot(&mut self) {
while !self.cold.is_empty() {
if let Some(merge_window) = self.hot.peek() {
let warmest = self.cold.peek().unwrap();
if warmest.is_behind(merge_window) {
// if the warmest node in the `cold` heap is totally after the
// `merge_window`, then no need to add more nodes into the `hot`
// heap for merge sorting.
break;
}
}
let warmest = self.cold.pop().unwrap();
self.hot.push(warmest);
}
}
/// Fetches one batch from the hottest node.
fn fetch_batch_from_hottest(&mut self) -> error::Result<()> {
assert_eq!(1, self.hot.len());
let mut hottest = self.hot.pop().unwrap();
let batch = hottest.fetch_batch(&mut self.metrics)?;
Self::maybe_output_batch(batch, &mut self.output_batch)?;
self.reheap(hottest)
}
/// Fetches non-duplicated rows from the hottest node.
fn fetch_rows_from_hottest(&mut self) -> error::Result<()> {
// Safety: `next()` only calls this when the hot heap has more than 1 element.
// Pop hottest node.
let mut top_node = self.hot.pop().unwrap();
let top = top_node.current_batch();
// Min timestamp and its sequence in the next batch.
let next_min_ts = {
let next_node = self.hot.peek().unwrap();
let next = next_node.current_batch();
// top and next have overlapping rows so they must have the same primary key.
debug_assert_eq!(top.primary_key(), next.primary_key());
// Safety: Batches in the heap are not empty, so we can use unwrap here.
next.first_timestamp().unwrap()
};
// Safety: Batches in the heap are not empty, so we can use unwrap here.
let timestamps = top.timestamps_native().unwrap();
// Binary searches the timestamp in the top batch.
// Safety: Batches should have the same timestamp resolution so we can compare the native
// value directly.
let duplicate_pos = match timestamps.binary_search(&next_min_ts.value()) {
Ok(pos) => pos,
Err(pos) => {
// No duplicate timestamp. Outputs timestamp before `pos`.
Self::maybe_output_batch(top.slice(0, pos), &mut self.output_batch)?;
top_node.skip_rows(pos, &mut self.metrics)?;
return self.reheap(top_node);
}
};
// No need to remove duplicate timestamps.
let output_end = if duplicate_pos == 0 {
// If the first timestamp of the top node is a duplicate, we can simply return the first row
// as the heap ensures it is the one with the largest sequence.
1
} else {
// We don't know which one has the larger sequence so we use the range before
// the duplicate pos.
duplicate_pos
};
Self::maybe_output_batch(top.slice(0, output_end), &mut self.output_batch)?;
top_node.skip_rows(output_end, &mut self.metrics)?;
self.reheap(top_node)
}
/// Push the node popped from `hot` back to a proper heap.
fn reheap(&mut self, node: Node) -> crate::error::Result<()> {
if node.is_eof() {
// If the node is EOF, don't put it into the heap again.
// The merge window would be updated, need to refill the hot heap.
self.refill_hot();
} else {
// Find a proper heap for this node.
let node_is_cold = if let Some(hottest) = self.hot.peek() {
// If the key range of this node is behind the hottest node's, then we can
// push it to the cold heap. Otherwise we should push it to the hot heap.
node.is_behind(hottest)
} else {
// The hot heap is empty, but we don't know whether the current
// batch of this node is still the hottest.
true
};
if node_is_cold {
self.cold.push(node);
} else {
self.hot.push(node);
}
// Anyway, the merge window has been changed, we need to refill the hot heap.
self.refill_hot();
}
Ok(())
}
/// Sets the `batch` as the `output_batch`.
///
/// Ignores the `batch` if it is empty.
fn maybe_output_batch(
batch: Batch,
output_batch: &mut Option<Batch>,
) -> crate::error::Result<()> {
debug_assert!(output_batch.is_none());
if batch.is_empty() {
return Ok(());
}
*output_batch = Some(batch);
Ok(())
}
}
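For intuition, a toy k-way merge over already-sorted inputs using the same min-heap idea; the real reader additionally keeps a cold heap of non-overlapping sources and slices batches at duplicate timestamps, and everything below is an invented sketch:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge sorted integer sources by always popping the smallest head.
fn merge_sorted(sources: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}

fn main() {
    assert_eq!(merge_sorted(vec![vec![1, 4], vec![2, 3]]), vec![1, 2, 3, 4]);
}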

View File

@@ -544,10 +544,12 @@ where
.as_ref() .as_ref()
.map(|rows| rows.rows.len()) .map(|rows| rows.rows.len())
.unwrap_or(0); .unwrap_or(0);
// TODO(yingwen): We need to support schema change as bulk may have different schema.
region_write_ctx.push_mutation( region_write_ctx.push_mutation(
mutation.op_type, mutation.op_type,
mutation.rows, mutation.rows,
mutation.write_hint, mutation.write_hint,
mutation.bulk,
OptionOutputTx::none(), OptionOutputTx::none(),
); );
} }

View File

@@ -16,15 +16,19 @@ use std::mem;
use std::sync::Arc; use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint}; use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::future::try_join_all;
use snafu::ResultExt; use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider; use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore; use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber}; use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, Result, WriteGroupSnafu}; use crate::error::{Error, JoinSnafu, Result, WriteGroupSnafu};
use crate::memtable::KeyValues; use crate::memtable::bulk::part::BulkPartEncoder;
use crate::memtable::{BulkPart, KeyValues};
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx; use crate::request::OptionOutputTx;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::wal::{EntryId, WalWriter}; use crate::wal::{EntryId, WalWriter};
/// Notifier to notify write result on drop. /// Notifier to notify write result on drop.
@@ -93,6 +97,8 @@ pub(crate) struct RegionWriteCtx {
notifiers: Vec<WriteNotify>, notifiers: Vec<WriteNotify>,
/// The write operation is failed and we should not write to the mutable memtable. /// The write operation is failed and we should not write to the mutable memtable.
failed: bool, failed: bool,
/// Bulk parts to write to the memtable.
bulk_parts: Vec<Option<BulkPart>>,
// Metrics: // Metrics:
/// Rows to put. /// Rows to put.
@@ -125,6 +131,7 @@ impl RegionWriteCtx {
provider, provider,
notifiers: Vec::new(), notifiers: Vec::new(),
failed: false, failed: false,
bulk_parts: Vec::new(),
put_num: 0, put_num: 0,
delete_num: 0, delete_num: 0,
} }
@@ -136,6 +143,7 @@ impl RegionWriteCtx {
op_type: i32, op_type: i32,
rows: Option<Rows>, rows: Option<Rows>,
write_hint: Option<WriteHint>, write_hint: Option<WriteHint>,
bulk: Vec<u8>,
tx: OptionOutputTx, tx: OptionOutputTx,
) { ) {
let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0); let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
@@ -144,6 +152,7 @@ impl RegionWriteCtx {
sequence: self.next_sequence, sequence: self.next_sequence,
rows, rows,
write_hint, write_hint,
bulk,
}); });
let notify = WriteNotify::new(tx, num_rows); let notify = WriteNotify::new(tx, num_rows);
@@ -205,15 +214,35 @@ impl RegionWriteCtx {
} }
let mutable = &self.version.memtables.mutable; let mutable = &self.version.memtables.mutable;
// Takes mutations from the wal entry.
let mutations = mem::take(&mut self.wal_entry.mutations); if self.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { let mutations = mem::take(&mut self.wal_entry.mutations);
// Write mutation to the memtable. for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { // Write mutation to the memtable.
continue; let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
}; continue;
if let Err(e) = mutable.write(&kvs) { };
notify.err = Some(Arc::new(e)); if let Err(e) = mutable.write(&kvs) {
notify.err = Some(Arc::new(e));
}
}
} else {
// Takes mutations from the wal entry.
let bulk_parts = mem::take(&mut self.bulk_parts);
for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(bulk_part) = bulk_part else {
continue;
};
if let Err(e) = mutable.write_bulk(bulk_part) {
notify.err = Some(Arc::new(e));
}
} }
} }
@@ -222,4 +251,35 @@ impl RegionWriteCtx {
self.version_control self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1); .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
} }
/// Encodes mutations into bulks and clears rows.
pub(crate) async fn encode_bulks(&mut self) -> Result<()> {
let mut tasks = Vec::with_capacity(self.wal_entry.mutations.len());
for mutation in self.wal_entry.mutations.drain(..) {
let metadata = self.version.metadata.clone();
let task = common_runtime::spawn_global(async move {
let encoder = BulkPartEncoder::new(metadata, true, DEFAULT_ROW_GROUP_SIZE);
let mutations = [mutation];
let part_opt = encoder.encode_mutations(&mutations)?;
let [mut mutation] = mutations;
// TODO(yingwen): This requires cloning the data; we should avoid it.
mutation.bulk = part_opt
.as_ref()
.map(|part| part.data.to_vec())
.unwrap_or_default();
mutation.rows = None;
Ok((part_opt, mutation))
});
tasks.push(task);
}
let results = try_join_all(tasks).await.context(JoinSnafu)?;
for result in results {
let (part_opt, mutation) = result?;
self.wal_entry.mutations.push(mutation);
self.bulk_parts.push(part_opt);
}
Ok(())
}
} }
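The fan-out/fan-in shape of `encode_bulks` (one encoding task per mutation, results collected in input order) in a minimal, self-contained form; the local executor and error type below are stand-ins for `common_runtime` and the crate's `Result`:

use futures::executor::block_on;
use futures::future::try_join_all;

fn main() {
    // One future per item; try_join_all preserves input order and fails fast on the first error.
    let tasks = (0..4).map(|i| async move { Ok::<_, String>(i * 2) });
    let results = block_on(try_join_all(tasks)).unwrap();
    assert_eq!(results, vec![0, 2, 4, 6]);
}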

View File

@@ -290,6 +290,7 @@ pub(crate) fn build_key_values_with_ts_seq_values(
rows, rows,
}), }),
write_hint: None, write_hint: None,
bulk: Vec::new(),
}; };
KeyValues::new(metadata.as_ref(), mutation).unwrap() KeyValues::new(metadata.as_ref(), mutation).unwrap()
} }

View File

@@ -166,6 +166,7 @@ pub(crate) fn write_rows_to_version(
sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test. sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
rows: Some(rows), rows: Some(rows),
write_hint: None, write_hint: None,
bulk: Vec::new(),
}; };
let key_values = KeyValues::new(&version.metadata, mutation).unwrap(); let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
version.memtables.mutable.write(&key_values).unwrap(); version.memtables.mutable.write(&key_values).unwrap();

View File

@@ -288,6 +288,7 @@ mod tests {
sequence, sequence,
rows: Some(Rows { schema, rows }), rows: Some(Rows { schema, rows }),
write_hint: None, write_hint: None,
bulk: Vec::new(),
} }
} }

View File

@@ -176,7 +176,7 @@ pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048;
/// | /// |
/// // may deadlock | /// // may deadlock |
/// distributor.distribute().await; | /// distributor.distribute().await; |
/// | /// |
/// | /// |
/// receivers[0].read().await | /// receivers[0].read().await |
/// ``` /// ```
@@ -280,6 +280,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -294,6 +295,7 @@ mod tests {
sequence: 2u64, sequence: 2u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -308,6 +310,7 @@ mod tests {
sequence: 3u64, sequence: 3u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -352,6 +355,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
)] )]
@@ -372,6 +376,7 @@ mod tests {
sequence: 2u64, sequence: 2u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
)] )]
@@ -388,6 +393,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
}; };
let region2 = RegionId::new(1, 2); let region2 = RegionId::new(1, 2);
@@ -397,6 +403,7 @@ mod tests {
sequence: 3u64, sequence: 3u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
}; };
let region3 = RegionId::new(1, 3); let region3 = RegionId::new(1, 3);
@@ -406,6 +413,7 @@ mod tests {
sequence: 3u64, sequence: 3u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
}; };
let provider = Provider::kafka_provider("my_topic".to_string()); let provider = Provider::kafka_provider("my_topic".to_string());
@@ -484,6 +492,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
}; };
let region2 = RegionId::new(1, 2); let region2 = RegionId::new(1, 2);
@@ -561,6 +570,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -575,6 +585,7 @@ mod tests {
sequence: 2u64, sequence: 2u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -589,6 +600,7 @@ mod tests {
sequence: 3u64, sequence: 3u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -603,6 +615,7 @@ mod tests {
sequence: 4u64, sequence: 4u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
.encode_to_vec(), .encode_to_vec(),
@@ -638,6 +651,7 @@ mod tests {
sequence: 4u64, sequence: 4u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
} }
)] )]

View File

@@ -116,6 +116,7 @@ mod tests {
sequence: 1u64, sequence: 1u64,
rows: None, rows: None,
write_hint: None, write_hint: None,
bulk: Vec::new(),
}], }],
}; };
let encoded_entry = wal_entry.encode_to_vec(); let encoded_entry = wal_entry.encode_to_vec();

View File

@@ -69,6 +69,24 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.prepare_region_write_ctx(write_requests) self.prepare_region_write_ctx(write_requests)
}; };
// Encodes all data into bulk parts before writing into the wal.
{
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["encode_bulk"])
.start_timer();
for region_ctx in region_ctxs.values_mut() {
// Avoid encoding to bulk part when mutations are dense encoded.
if region_ctx.version().metadata.primary_key_encoding == PrimaryKeyEncoding::Dense {
continue;
}
// TODO(yingwen): We don't do region level parallelism as we only test
// one region now.
if let Err(e) = region_ctx.encode_bulks().await.map_err(Arc::new) {
region_ctx.set_error(e);
}
}
}
// Write to WAL.
{
let _timer = WRITE_STAGE_ELAPSED
@@ -246,10 +264,12 @@ impl<S> RegionWorkerLoop<S> {
}
// Collect requests by region.
// TODO(yingwen): Encode into bulk.
region_ctx.push_mutation(
sender_req.request.op_type as i32,
Some(sender_req.request.rows),
sender_req.request.hint,
Vec::new(),
sender_req.sender,
);
}
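To make the new write-path ordering easier to follow, here is a minimal, self-contained sketch of the bulk-encoding stage that now runs before the WAL write. It is a sketch only: `PrimaryKeyEncoding`, `RegionCtx`, `encode_bulks`, and `set_error` below are simplified stand-ins for the worker's real types, not the actual implementation.

// Sketch only: simplified stand-ins for the worker's region write context.
#[derive(PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

struct RegionCtx {
    encoding: PrimaryKeyEncoding,
    error: Option<String>,
}

impl RegionCtx {
    // Stand-in for the real bulk-encoding call on the write context.
    async fn encode_bulks(&mut self) -> Result<(), String> {
        Ok(())
    }
    fn set_error(&mut self, err: String) {
        self.error = Some(err);
    }
}

// Encode mutations into bulk parts before the WAL write; dense-encoded
// regions skip this stage and keep the existing write path.
async fn encode_bulk_stage(region_ctxs: &mut [RegionCtx]) {
    for ctx in region_ctxs.iter_mut() {
        if ctx.encoding == PrimaryKeyEncoding::Dense {
            continue;
        }
        if let Err(err) = ctx.encode_bulks().await {
            // A failed encode marks the context so later stages can skip it.
            ctx.set_error(err);
        }
    }
}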


@@ -15,6 +15,7 @@
//! Region Engine's definition
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};
@@ -27,13 +28,13 @@ use common_recordbatch::SendableRecordBatchStream;
use common_time::Timestamp;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
use futures::future::{join_all, try_join_all};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::region_request::{BatchRegionRequest, RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};
/// The settable region role state.
@@ -407,6 +408,28 @@ pub trait RegionEngine: Send + Sync {
Ok(join_all(tasks).await)
}
async fn handle_batch_request(
&self,
batch_request: BatchRegionRequest,
) -> Result<RegionResponse, BoxedError> {
let join_tasks = batch_request
.into_requests_iter()
.map(|(region_id, req)| async move { self.handle_request(region_id, req).await });
let results = try_join_all(join_tasks).await?;
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extensions.extend(result.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
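As a usage-level illustration of the default `handle_batch_request` above, the sketch below isolates how the per-region responses are merged: affected row counts add up, and extension maps are unioned, with later entries winning on duplicate keys. `RegionResponse` here is a simplified stand-in, not the real type.

use std::collections::HashMap;

// Simplified stand-in for the engine's per-region response.
struct RegionResponse {
    affected_rows: usize,
    extensions: HashMap<String, Vec<u8>>,
}

// Merge the responses collected from each region into one aggregate response.
fn merge_responses(results: Vec<RegionResponse>) -> RegionResponse {
    let mut affected_rows = 0;
    let mut extensions = HashMap::new();
    for result in results {
        affected_rows += result.affected_rows;
        // HashMap::extend keeps the last value for a duplicated key.
        extensions.extend(result.extensions);
    }
    RegionResponse {
        affected_rows,
        extensions,
    }
}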
/// Handles non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,


@@ -19,9 +19,9 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::column_def::as_fulltext_option;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest,
CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, CloseRequest, CompactRequest,
CreateRequest, DeleteRequests, DropRequest, FlushRequest, InsertRequests, OpenRequest,
TruncateRequest,
};
use api::v1::{self, set_index, Analyzer, Option as PbOption, Rows, SemanticType, WriteHint};
pub use common_base::AffectedRows;
@@ -30,7 +30,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::FulltextOptions;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::IntoStaticStr;
use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
@@ -47,6 +47,132 @@ use crate::mito_engine_options::{
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
macro_rules! make_region_request {
($body_variant:ident, $make_fn:ident, $request_variant:ident, $data:expr) => {
$make_fn($data).map(|(region_id, req)| {
RegionRequestBundle::new_single(region_id, RegionRequest::$request_variant(req))
})
};
}
/// Convert [`region_request::Body`] to [`RegionRequest`] with region id or [`BatchRegionRequest`].
pub fn convert_body_to_requests(body: region_request::Body) -> Result<RegionRequestBundle> {
match body {
region_request::Body::Create(create) => {
make_region_request!(Create, make_region_create, Create, create)
}
region_request::Body::Alter(alter) => {
make_region_request!(Alter, make_region_alter, Alter, alter)
}
region_request::Body::Drop(drop) => {
make_region_request!(Drop, make_region_drop, Drop, drop)
}
region_request::Body::Open(open) => {
make_region_request!(Open, make_region_open, Open, open)
}
region_request::Body::Close(close) => {
make_region_request!(Close, make_region_close, Close, close)
}
region_request::Body::Flush(flush) => {
make_region_request!(Flush, make_region_flush, Flush, flush)
}
region_request::Body::Compact(compact) => {
make_region_request!(Compact, make_region_compact, Compact, compact)
}
region_request::Body::Truncate(truncate) => {
make_region_request!(Truncate, make_region_truncate, Truncate, truncate)
}
// Batch requests
region_request::Body::Inserts(inserts) => {
let requests = make_region_puts(inserts)?;
Ok(RegionRequestBundle::new_vector(requests))
}
region_request::Body::Deletes(deletes) => {
let requests = make_region_deletes(deletes)?;
Ok(RegionRequestBundle::new_vector(requests))
}
region_request::Body::Creates(creates) => {
let requests = make_region_creates(creates.requests)?;
Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Create(
requests,
)))
}
region_request::Body::Alters(alters) => {
let requests = make_region_alters(alters.requests)?;
Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Alter(
requests,
)))
}
region_request::Body::Drops(drops) => {
let requests = make_region_drops(drops.requests)?;
Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Drop(
requests,
)))
}
}
}
/// A bundle of region requests.
pub enum RegionRequestBundle {
Batch(BatchRegionRequest),
Vector(Vec<(RegionId, RegionRequest)>),
Single((RegionId, RegionRequest)),
}
impl RegionRequestBundle {
pub fn new_batch(requests: BatchRegionRequest) -> Self {
Self::Batch(requests)
}
pub fn new_single(region_id: RegionId, request: RegionRequest) -> Self {
Self::Single((region_id, request))
}
pub fn new_vector(requests: Vec<(RegionId, RegionRequest)>) -> Self {
Self::Vector(requests)
}
}
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionRequest {
Create(Vec<(RegionId, RegionCreateRequest)>),
Drop(Vec<(RegionId, RegionDropRequest)>),
Alter(Vec<(RegionId, RegionAlterRequest)>),
Put(Vec<(RegionId, RegionPutRequest)>),
}
impl BatchRegionRequest {
/// Returns an iterator over the requests.
pub fn into_requests_iter(self) -> Box<dyn Iterator<Item = (RegionId, RegionRequest)>> {
match self {
BatchRegionRequest::Create(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Create(req))),
),
BatchRegionRequest::Drop(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
),
BatchRegionRequest::Alter(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
),
BatchRegionRequest::Put(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
),
}
}
pub fn into_requests(self) -> Vec<(RegionId, RegionRequest)> {
self.into_requests_iter().collect()
}
}
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
Put(RegionPutRequest),
@@ -63,32 +189,84 @@ pub enum RegionRequest {
}
impl RegionRequest {
/// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
/// Inserts/Deletes request might become multiple requests. Others are one-to-one.
pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
match body {
region_request::Body::Inserts(inserts) => make_region_puts(inserts),
region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
region_request::Body::Create(create) => make_region_create(create),
region_request::Body::Drop(drop) => make_region_drop(drop),
region_request::Body::Open(open) => make_region_open(open),
region_request::Body::Close(close) => make_region_close(close),
region_request::Body::Alter(alter) => make_region_alter(alter),
region_request::Body::Flush(flush) => make_region_flush(flush),
region_request::Body::Compact(compact) => make_region_compact(compact),
region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
}
}
/// Returns the type name of the request.
pub fn request_type(&self) -> &'static str {
self.into()
}
}
fn make_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
let column_metadatas = create
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.path, region_id);
Ok((
region_id,
RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
options: create.options,
region_dir,
},
))
}
fn make_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
let region_id = drop.region_id.into();
Ok((region_id, RegionDropRequest {}))
}
fn make_region_open(open: OpenRequest) -> Result<(RegionId, RegionOpenRequest)> {
let region_id = open.region_id.into();
let region_dir = region_dir(&open.path, region_id);
Ok((
region_id,
RegionOpenRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
},
))
}
fn make_region_close(close: CloseRequest) -> Result<(RegionId, RegionCloseRequest)> {
let region_id = close.region_id.into();
Ok((region_id, RegionCloseRequest {}))
}
fn make_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
let region_id = alter.region_id.into();
Ok((region_id, RegionAlterRequest::try_from(alter)?))
}
fn make_region_flush(flush: FlushRequest) -> Result<(RegionId, RegionFlushRequest)> {
let region_id = flush.region_id.into();
Ok((
region_id,
RegionFlushRequest {
row_group_size: None,
},
))
}
fn make_region_compact(compact: CompactRequest) -> Result<(RegionId, RegionCompactRequest)> {
let region_id = compact.region_id.into();
let options = compact
.options
.unwrap_or(compact_request::Options::Regular(Default::default()));
Ok((region_id, RegionCompactRequest { options }))
}
fn make_region_truncate(truncate: TruncateRequest) -> Result<(RegionId, RegionTruncateRequest)> {
let region_id = truncate.region_id.into();
Ok((region_id, RegionTruncateRequest {}))
}
fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = inserts
.requests
@@ -123,113 +301,32 @@ fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionR
Ok(requests)
}
macro_rules! make_region_batch_requests {
($fn_single:ident, $fn_batch:ident, $req_type:ty, $output_type:ty) => {
fn $fn_batch(reqs: Vec<$req_type>) -> Result<Vec<(RegionId, $output_type)>> {
reqs.into_iter().map($fn_single).collect()
}
};
}
make_region_batch_requests!(
make_region_create,
make_region_creates,
CreateRequest,
RegionCreateRequest
);
make_region_batch_requests!(
make_region_drop,
make_region_drops,
DropRequest,
RegionDropRequest
);
make_region_batch_requests!(
make_region_alter,
make_region_alters,
AlterRequest,
RegionAlterRequest
);
fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let column_metadatas = create
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.path, region_id);
Ok(vec![(
region_id,
RegionRequest::Create(RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
options: create.options,
region_dir,
}),
)])
}
fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(creates.requests.len());
for create in creates.requests {
requests.extend(make_region_create(create)?);
}
Ok(requests)
}
fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = drop.region_id.into();
Ok(vec![(region_id, RegionRequest::Drop(RegionDropRequest {}))])
}
fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(drops.requests.len());
for drop in drops.requests {
requests.extend(make_region_drop(drop)?);
}
Ok(requests)
}
fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = open.region_id.into();
let region_dir = region_dir(&open.path, region_id);
Ok(vec![(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
}),
)])
}
fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = close.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Close(RegionCloseRequest {}),
)])
}
fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = alter.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Alter(RegionAlterRequest::try_from(alter)?),
)])
}
fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut requests = Vec::with_capacity(alters.requests.len());
for alter in alters.requests {
requests.extend(make_region_alter(alter)?);
}
Ok(requests)
}
fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = flush.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)])
}
fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = compact.region_id.into();
let options = compact
.options
.unwrap_or(compact_request::Options::Regular(Default::default()));
Ok(vec![(
region_id,
RegionRequest::Compact(RegionCompactRequest { options }),
)])
}
fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let region_id = truncate.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Truncate(RegionTruncateRequest {}),
)])
}
/// Request to put data into a region.
#[derive(Debug)]
@@ -401,7 +498,7 @@ impl TryFrom<AlterRequest> for RegionAlterRequest {
}
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
/// Add columns to the region.
AddColumns {
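To show how the three bundle shapes produced by `convert_body_to_requests` are meant to be consumed, here is a minimal, self-contained sketch of the dispatch. Every type in it is a simplified stand-in mirroring `RegionRequestBundle` and `BatchRegionRequest`; it is not the real region-server code.

// Sketch only: stand-in types mirroring the bundle shapes above.
type RegionId = u64;

enum RegionRequest {
    Put(String),
    Create(String),
}

enum BatchRegionRequest {
    Create(Vec<(RegionId, String)>),
}

enum RegionRequestBundle {
    Single((RegionId, RegionRequest)),
    Vector(Vec<(RegionId, RegionRequest)>),
    Batch(BatchRegionRequest),
}

fn describe(bundle: &RegionRequestBundle) -> String {
    match bundle {
        // DDL-style bodies (Create/Drop/Open/...) map to exactly one region.
        RegionRequestBundle::Single((region_id, _)) => {
            format!("one request for region {region_id}")
        }
        // Inserts/Deletes fan out into one request per target region.
        RegionRequestBundle::Vector(requests) => {
            format!("{} per-region requests", requests.len())
        }
        // Creates/Alters/Drops keep their grouping so the engine can batch them.
        RegionRequestBundle::Batch(BatchRegionRequest::Create(requests)) => {
            format!("batched create covering {} regions", requests.len())
        }
    }
}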