chore: remove useless storage apis (#2904)

* chore: remove metadata.rs

* chore: remove snapshot.rs

* chore: remove chunk.rs

* chore: remove engine.rs

* chore: remove MIN_OP_TYPE from types.rs

* chore: remove region.rs

* chore: remove almost all codes in requests.rs

* chore: remove WriteRequest from requests.rs

* chore: remove responses.rs

* chore: remove unused descriptors from descriptors.rs

* chore: remove unused consts from consts.rs

* chore: remove useless comments
This commit is contained in:
niebayes
2023-12-13 11:36:14 +08:00
committed by GitHub
parent 3c24ca1a7a
commit c42168d7c2
14 changed files with 4 additions and 1120 deletions

View File

@@ -1,56 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use crate::storage::ColumnDescriptorBuilderError;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Invalid raw region request: {err}"))]
InvalidRawRegionRequest { err: String, location: Location },
#[snafu(display("Invalid default constraint: {constraint}"))]
InvalidDefaultConstraint {
constraint: String,
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Failed to build column descriptor: "))]
BuildColumnDescriptor {
#[snafu(source)]
error: ColumnDescriptorBuilderError,
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InvalidRawRegionRequest { .. } => StatusCode::InvalidArguments,
Error::InvalidDefaultConstraint { source, .. } => source.status_code(),
Error::BuildColumnDescriptor { .. } => StatusCode::Internal,
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

View File

@@ -16,7 +16,6 @@
//! Storage related APIs
pub mod data_source;
mod error;
pub mod logstore;
pub mod manifest;
pub mod metadata;

View File

@@ -14,15 +14,9 @@
//! Storage APIs.
mod chunk;
pub mod consts;
mod descriptors;
mod engine;
mod metadata;
mod region;
mod requests;
mod responses;
mod snapshot;
mod types;
pub use datatypes::data_type::ConcreteDataType;
@@ -30,19 +24,6 @@ pub use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, Schema, SchemaBuilder, SchemaRef,
};
pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::*;
pub use self::engine::{
CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, StorageEngine,
TwcsOptions,
};
pub use self::metadata::RegionMeta;
pub use self::region::{
CloseContext, CompactContext, FlushContext, FlushReason, Region, RegionStat, WriteContext,
};
pub use self::requests::{
AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
};
pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
pub use self::snapshot::{ReadContext, Snapshot};
pub use self::types::{SequenceNumber, MIN_OP_TYPE};
pub use self::requests::ScanRequest;
pub use self::types::SequenceNumber;

View File

@@ -1,53 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_recordbatch::OrderOption;
use datatypes::vectors::VectorRef;
use crate::storage::SchemaRef;
/// A bunch of rows in columnar format.
#[derive(Debug)]
pub struct Chunk {
pub columns: Vec<VectorRef>,
// TODO(yingwen): Sequences.
}
impl Chunk {
pub fn new(columns: Vec<VectorRef>) -> Chunk {
Chunk { columns }
}
}
/// `ChunkReader` is similar to async iterator of [Chunk].
#[async_trait]
pub trait ChunkReader: Send {
type Error: ErrorExt + Send + Sync;
/// Schema of the chunks returned by this reader.
/// This schema does not contain internal columns.
fn user_schema(&self) -> &SchemaRef;
/// Fetch next chunk from the reader.
async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>;
// project the chunk according to required projection.
fn project_chunk(&self, chunk: Chunk) -> Chunk;
fn output_ordering(&self) -> Option<Vec<OrderOption>> {
None
}
}

View File

@@ -14,19 +14,7 @@
//! Constants.
use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
// ---------- Reserved column family ids ---------------------------------------
/// Column family Id for row key columns.
///
/// This is a virtual column family, actually row key columns are not
/// stored in any column family.
pub const KEY_CF_ID: ColumnFamilyId = 0;
/// Id for default column family.
pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
// -----------------------------------------------------------------------------
use crate::storage::descriptors::ColumnId;
// ---------- Reserved column ids ----------------------------------------------
@@ -93,9 +81,6 @@ impl ReservedColumnId {
// ---------- Names reserved for internal columns and engine -------------------
/// Names for default column family.
pub const DEFAULT_CF_NAME: &str = "default";
/// Name for reserved column: sequence
pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
@@ -118,14 +103,6 @@ pub fn is_internal_column(name: &str) -> bool {
// -----------------------------------------------------------------------------
// ---------- Default options --------------------------------------------------
pub const READ_BATCH_SIZE: usize = 256;
pub const WRITE_ROW_GROUP_SIZE: usize = 4096;
// -----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -17,12 +17,10 @@ use std::fmt;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use crate::storage::{consts, ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};
use crate::storage::{ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};
/// Id of column. Unique in each region.
pub type ColumnId = u32;
/// Id of column family. Unique in each region.
pub type ColumnFamilyId = u32;
/// Group number of one region. Unique in each region.
pub type RegionGroup = u8;
/// Sequence number of region inside one table. Unique in each table.
@@ -224,68 +222,6 @@ impl ColumnDescriptorBuilder {
}
}
/// A [RowKeyDescriptor] contains information about row key.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct RowKeyDescriptor {
#[builder(default, setter(each(name = "push_column")))]
pub columns: Vec<ColumnDescriptor>,
/// Timestamp key column.
pub timestamp: ColumnDescriptor,
}
/// A [ColumnFamilyDescriptor] contains information to create a column family.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct ColumnFamilyDescriptor {
#[builder(default = "consts::DEFAULT_CF_ID")]
pub cf_id: ColumnFamilyId,
#[builder(default = "consts::DEFAULT_CF_NAME.to_string()", setter(into))]
pub name: String,
/// Descriptors of columns in this column family.
#[builder(default, setter(each(name = "push_column")))]
pub columns: Vec<ColumnDescriptor>,
}
/// A [RegionDescriptor] contains information to create a region.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
#[builder(pattern = "owned")]
pub struct RegionDescriptor {
#[builder(setter(into))]
pub id: RegionId,
/// Region name.
#[builder(setter(into))]
pub name: String,
/// Row key descriptor of this region.
pub row_key: RowKeyDescriptor,
/// Default column family.
pub default_cf: ColumnFamilyDescriptor,
/// Extra column families defined by user.
#[builder(default, setter(each(name = "push_extra_column_family")))]
pub extra_cfs: Vec<ColumnFamilyDescriptor>,
}
impl RowKeyDescriptorBuilder {
pub fn new(timestamp: ColumnDescriptor) -> Self {
Self {
timestamp: Some(timestamp),
..Default::default()
}
}
pub fn columns_capacity(mut self, capacity: usize) -> Self {
self.columns = Some(Vec::with_capacity(capacity));
self
}
}
impl ColumnFamilyDescriptorBuilder {
pub fn columns_capacity(mut self, capacity: usize) -> Self {
self.columns = Some(Vec::with_capacity(capacity));
self
}
}
#[cfg(test)]
mod tests {
use datatypes::value::Value;
@@ -360,71 +296,6 @@ mod tests {
assert_eq!(expected, column_schema);
}
fn new_timestamp_desc() -> ColumnDescriptor {
ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype())
.is_time_index(true)
.build()
.unwrap()
}
#[test]
fn test_row_key_descriptor_builder() {
let timestamp = new_timestamp_desc();
let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
.build()
.unwrap();
assert!(desc.columns.is_empty());
let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
.columns_capacity(1)
.push_column(
ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype())
.build()
.unwrap(),
)
.push_column(
ColumnDescriptorBuilder::new(7, "c2", ConcreteDataType::int32_datatype())
.build()
.unwrap(),
)
.build()
.unwrap();
assert_eq!(2, desc.columns.len());
let desc = RowKeyDescriptorBuilder::new(timestamp).build().unwrap();
assert!(desc.columns.is_empty());
}
#[test]
fn test_cf_descriptor_builder() {
let desc = ColumnFamilyDescriptorBuilder::default().build().unwrap();
assert_eq!(consts::DEFAULT_CF_ID, desc.cf_id);
assert_eq!(consts::DEFAULT_CF_NAME, desc.name);
assert!(desc.columns.is_empty());
let desc = ColumnFamilyDescriptorBuilder::default()
.cf_id(32)
.name("cf1")
.build()
.unwrap();
assert_eq!(32, desc.cf_id);
assert_eq!("cf1", desc.name);
let desc = ColumnFamilyDescriptorBuilder::default()
.push_column(
ColumnDescriptorBuilder::default()
.id(6)
.name("c1")
.data_type(ConcreteDataType::int32_datatype())
.build()
.unwrap(),
)
.build()
.unwrap();
assert_eq!(1, desc.columns.len());
}
#[test]
fn test_region_id() {
assert_eq!(RegionId::new(0, 1), 1);

View File

@@ -1,194 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Storage Engine traits.
//!
//! [`StorageEngine`] is the abstraction over a multi-regions, schematized data storage system,
//! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
//! chunks of rows, support operations like PUT/DELETE/SCAN.
use std::collections::HashMap;
use std::time::Duration;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use crate::storage::descriptors::RegionDescriptor;
use crate::storage::region::Region;
const COMPACTION_STRATEGY_KEY: &str = "compaction";
const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS";
const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files";
const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds";
const TWCS_MAX_INACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_inactive_window_files";
/// Storage engine provides primitive operations to store and access data.
#[async_trait]
pub trait StorageEngine: Send + Sync + Clone + 'static {
type Error: ErrorExt + Send + Sync;
type Region: Region;
/// Opens an existing region. Returns `Ok(None)` if region does not exists.
async fn open_region(
&self,
ctx: &EngineContext,
name: &str,
opts: &OpenOptions,
) -> Result<Option<Self::Region>, Self::Error>;
/// Closes given region.
async fn close_region(
&self,
ctx: &EngineContext,
name: &str,
opts: &CloseOptions,
) -> Result<(), Self::Error>;
/// Creates and returns the created region.
///
/// Returns existing region if region with same name already exists. The region will
/// be opened before returning.
async fn create_region(
&self,
ctx: &EngineContext,
descriptor: RegionDescriptor,
opts: &CreateOptions,
) -> Result<Self::Region, Self::Error>;
/// Drops given region.
///
/// The region will be closed before dropping.
async fn drop_region(
&self,
ctx: &EngineContext,
region: Self::Region,
) -> Result<(), Self::Error>;
/// Returns the opened region with given name.
fn get_region(
&self,
ctx: &EngineContext,
name: &str,
) -> Result<Option<Self::Region>, Self::Error>;
/// Close the engine.
async fn close(&self, ctx: &EngineContext) -> Result<(), Self::Error>;
}
/// Storage engine context.
#[derive(Debug, Clone, Default)]
pub struct EngineContext {}
/// Options to create a region.
#[derive(Debug, Clone, Default)]
pub struct CreateOptions {
/// Region parent directory
pub parent_dir: String,
/// Region memtable max size in bytes
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
/// Compaction strategy
pub compaction_strategy: CompactionStrategy,
}
/// Options to open a region.
#[derive(Debug, Clone, Default)]
pub struct OpenOptions {
/// Region parent directory
pub parent_dir: String,
/// Region memtable max size in bytes
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
/// Compaction strategy
pub compaction_strategy: CompactionStrategy,
}
/// Options to close a region.
#[derive(Debug, Clone, Default)]
pub struct CloseOptions {
/// Flush region
pub flush: bool,
}
/// Options for compactions
#[derive(Debug, Clone)]
pub enum CompactionStrategy {
/// TWCS
Twcs(TwcsOptions),
}
impl Default for CompactionStrategy {
fn default() -> Self {
Self::Twcs(TwcsOptions::default())
}
}
/// TWCS compaction options.
#[derive(Debug, Clone)]
pub struct TwcsOptions {
/// Max num of files that can be kept in active writing time window.
pub max_active_window_files: usize,
/// Max num of files that can be kept in inactive time window.
pub max_inactive_window_files: usize,
/// Compaction time window defined when creating tables.
pub time_window_seconds: Option<i64>,
}
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_files: 4,
max_inactive_window_files: 1,
time_window_seconds: None,
}
}
}
impl From<&HashMap<String, String>> for CompactionStrategy {
fn from(opts: &HashMap<String, String>) -> Self {
let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else {
return CompactionStrategy::default();
};
if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
let mut twcs_opts = TwcsOptions::default();
if let Some(max_active_window_files) = opts
.get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)
.and_then(|num| num.parse::<usize>().ok())
{
twcs_opts.max_active_window_files = max_active_window_files;
}
if let Some(max_inactive_window_files) = opts
.get(TWCS_MAX_INACTIVE_WINDOW_FILES_KEY)
.and_then(|num| num.parse::<usize>().ok())
{
twcs_opts.max_inactive_window_files = max_inactive_window_files;
}
if let Some(time_window) = opts
.get(TWCS_TIME_WINDOW_SECONDS_KEY)
.and_then(|num| num.parse::<i64>().ok()) && time_window > 0
{
twcs_opts.time_window_seconds = Some(time_window);
}
CompactionStrategy::Twcs(twcs_opts)
} else {
// unrecognized compaction strategy
CompactionStrategy::default()
}
}
}

View File

@@ -1,24 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::storage::SchemaRef;
/// Metadata of a region.
pub trait RegionMeta: Send + Sync {
/// Returns the schema of the region.
fn schema(&self) -> &SchemaRef;
/// Returns the version of the region metadata.
fn version(&self) -> u32;
}

View File

@@ -1,177 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Region holds chunks of rows stored in the storage engine, but does not require that
//! rows must have continuous primary key range, which is implementation specific.
//!
//! Regions support operations like PUT/DELETE/SCAN that most key-value stores provide.
//! However, unlike key-value store, data stored in region has data model like:
//!
//! ```text
//! colk-1, ..., colk-m, timestamp, version -> colv-1, ..., colv-n
//! ```
//!
//! The data model require each row
//! - has 0 ~ m key column, parts of row key columns;
//! - **MUST** has a timestamp column, part of row key columns;
//! - has a version column, part of row key columns;
//! - has 0 ~ n value column.
//!
//! Each row is identified by (value of key columns, timestamp, version), which forms
//! a row key. Note that the implementation may allow multiple rows have same row
//! key (like ClickHouse), which is useful in analytic scenario.
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use crate::storage::engine::OpenOptions;
use crate::storage::metadata::RegionMeta;
use crate::storage::requests::{AlterRequest, WriteRequest};
use crate::storage::responses::WriteResponse;
use crate::storage::snapshot::{ReadContext, Snapshot};
use crate::storage::RegionId;
/// Chunks of rows in storage engine.
#[async_trait]
pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
type Error: ErrorExt + Send + Sync;
type Meta: RegionMeta;
type WriteRequest: WriteRequest;
type Snapshot: Snapshot;
fn id(&self) -> RegionId;
/// Returns name of the region.
fn name(&self) -> &str;
/// Returns the in memory metadata of this region.
fn in_memory_metadata(&self) -> Self::Meta;
/// Write updates to region.
async fn write(
&self,
ctx: &WriteContext,
request: Self::WriteRequest,
) -> Result<WriteResponse, Self::Error>;
/// Create a snapshot for read.
fn snapshot(&self, ctx: &ReadContext) -> Result<Self::Snapshot, Self::Error>;
/// Create write request
fn write_request(&self) -> Self::WriteRequest;
async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>;
async fn drop_region(&self) -> Result<(), Self::Error>;
fn disk_usage_bytes(&self) -> u64;
fn region_stat(&self) -> RegionStat {
RegionStat {
region_id: self.id().into(),
disk_usage_bytes: self.disk_usage_bytes(),
}
}
/// Flush memtable of the region to disk.
async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>;
async fn compact(&self, ctx: &CompactContext) -> Result<(), Self::Error>;
async fn truncate(&self) -> Result<(), Self::Error>;
}
#[derive(Default, Debug)]
pub struct RegionStat {
pub region_id: u64,
pub disk_usage_bytes: u64,
}
/// Context for write operations.
#[derive(Debug, Clone, Default)]
pub struct WriteContext {}
impl From<&OpenOptions> for WriteContext {
fn from(_opts: &OpenOptions) -> WriteContext {
WriteContext::default()
}
}
#[derive(Debug, Clone, Default)]
pub struct CloseContext {
/// If true, flush the closing region.
pub flush: bool,
}
/// Context for flush operations.
#[derive(Debug, Clone)]
pub struct FlushContext {
/// If true, the flush will wait until the flush is done.
/// Default: true
pub wait: bool,
/// Flush reason.
pub reason: FlushReason,
/// If true, allows to flush a closed region
pub force: bool,
}
impl Default for FlushContext {
fn default() -> FlushContext {
FlushContext {
wait: true,
reason: FlushReason::Others,
force: false,
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct CompactContext {
/// Whether to wait the compaction result.
pub wait: bool,
}
impl Default for CompactContext {
fn default() -> CompactContext {
CompactContext { wait: true }
}
}
/// Reason of flush operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
/// Other reasons.
Others,
/// Memtable is full.
MemtableFull,
/// Flush manually.
Manually,
/// Auto flush periodically.
Periodically,
/// Global write buffer is full.
GlobalBufferFull,
}
impl FlushReason {
/// Returns reason as `str`.
pub fn as_str(&self) -> &'static str {
match self {
FlushReason::Others => "others",
FlushReason::MemtableFull => "memtable_full",
FlushReason::Manually => "manually",
FlushReason::Periodically => "periodically",
FlushReason::GlobalBufferFull => "global_buffer_full",
}
}
}

View File

@@ -12,39 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
use api::v1::region::{alter_request, AddColumn as PbAddColumn};
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_query::logical_plan::Expr;
use common_recordbatch::OrderOption;
use datatypes::vectors::VectorRef;
use snafu::{OptionExt, ResultExt};
use crate::error::{
BuildColumnDescriptorSnafu, Error, InvalidDefaultConstraintSnafu, InvalidRawRegionRequestSnafu,
};
use crate::storage::{ColumnDescriptor, ColumnDescriptorBuilder, RegionDescriptor};
/// Write request holds a collection of updates to apply to a region.
///
/// The implementation of the write request should ensure all operations in
/// the request follows the same schema restriction.
pub trait WriteRequest: Send {
type Error: ErrorExt + Send + Sync;
/// Add put operation to the request.
///
/// `data` is the columnar format of the data to put.
fn put(&mut self, data: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
/// Delete rows by `keys`.
///
/// `keys` are the row keys, in columnar format, of the rows to delete.
fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
}
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
@@ -61,325 +30,3 @@ pub struct ScanRequest {
/// The data source should return *at least* this number of rows if available.
pub limit: Option<usize>,
}
#[derive(Debug)]
pub struct GetRequest {}
/// Operation to add a column.
#[derive(Debug, Clone)]
pub struct AddColumn {
/// Descriptor of the column to add.
pub desc: ColumnDescriptor,
/// Is the column a key column.
pub is_key: bool,
}
/// Operation to alter a region.
#[derive(Debug, Clone)]
pub enum AlterOperation {
/// Add columns to the region.
AddColumns {
/// Columns to add.
columns: Vec<AddColumn>,
},
/// Drop columns from the region, only value columns are allowed to drop.
DropColumns {
/// Name of columns to drop.
names: Vec<String>,
},
}
impl AlterOperation {
/// Apply the operation to the [RegionDescriptor].
pub fn apply(&self, descriptor: &mut RegionDescriptor) {
match self {
AlterOperation::AddColumns { columns } => {
Self::apply_add(columns, descriptor);
}
AlterOperation::DropColumns { names } => {
Self::apply_drop(names, descriptor);
}
}
}
/// Add `columns` to the [RegionDescriptor].
///
/// Value columns would be added to the default column family.
fn apply_add(columns: &[AddColumn], descriptor: &mut RegionDescriptor) {
for col in columns {
if col.is_key {
descriptor.row_key.columns.push(col.desc.clone());
} else {
descriptor.default_cf.columns.push(col.desc.clone());
}
}
}
/// Drop columns from the [RegionDescriptor] by their `names`.
///
/// Only value columns would be removed, non-value columns in `names` would be ignored.
fn apply_drop(names: &[String], descriptor: &mut RegionDescriptor) {
let name_set: HashSet<_> = names.iter().collect();
// Remove columns in the default cf.
descriptor
.default_cf
.columns
.retain(|col| !name_set.contains(&col.name));
// Remove columns in other cfs.
for cf in &mut descriptor.extra_cfs {
cf.columns.retain(|col| !name_set.contains(&col.name));
}
}
}
impl TryFrom<PbAddColumn> for AddColumn {
type Error = Error;
fn try_from(add_column: PbAddColumn) -> Result<Self, Self::Error> {
let column_def = add_column
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "'column_def' is absent",
})?;
let column_id = column_def.column_id;
let column_def = column_def
.column_def
.context(InvalidRawRegionRequestSnafu {
err: "'column_def' is absent",
})?;
let data_type = column_def.data_type;
let data_type_ext = column_def.datatype_extension.clone();
let data_type = ColumnDataTypeWrapper::try_new(data_type, data_type_ext)
.map_err(|_| {
InvalidRawRegionRequestSnafu {
err: format!("unknown raw column datatype: {data_type}"),
}
.build()
})?
.into();
let constraint = column_def.default_constraint.as_slice();
let constraint = if constraint.is_empty() {
None
} else {
Some(
constraint
.try_into()
.context(InvalidDefaultConstraintSnafu {
constraint: String::from_utf8_lossy(constraint),
})?,
)
};
let desc = ColumnDescriptorBuilder::new(column_id, column_def.name.clone(), data_type)
.is_nullable(column_def.is_nullable)
.is_time_index(column_def.semantic_type() == SemanticType::Timestamp)
.default_constraint(constraint)
.build()
.context(BuildColumnDescriptorSnafu)?;
Ok(AddColumn {
desc,
is_key: column_def.semantic_type() == SemanticType::Tag,
// TODO(ruihang & yingwen): support alter column's "location"
})
}
}
impl TryFrom<alter_request::Kind> for AlterOperation {
type Error = Error;
fn try_from(kind: alter_request::Kind) -> Result<Self, Self::Error> {
let operation = match kind {
alter_request::Kind::AddColumns(x) => {
let columns = x
.add_columns
.into_iter()
.map(|x| x.try_into())
.collect::<Result<Vec<_>, Self::Error>>()?;
AlterOperation::AddColumns { columns }
}
alter_request::Kind::DropColumns(x) => {
let names = x.drop_columns.into_iter().map(|x| x.name).collect();
AlterOperation::DropColumns { names }
}
};
Ok(operation)
}
}
/// Alter region request.
#[derive(Debug)]
pub struct AlterRequest {
/// Operation to do.
pub operation: AlterOperation,
/// The version of the schema before applying the alteration.
pub version: u32,
}
#[cfg(test)]
mod tests {
use api::v1::region::{
AddColumn as PbAddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef,
};
use api::v1::{ColumnDataType, ColumnDef};
use datatypes::prelude::*;
use datatypes::schema::ColumnDefaultConstraint;
use super::*;
use crate::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId, RegionDescriptorBuilder,
RowKeyDescriptorBuilder,
};
fn new_column_desc(id: ColumnId) -> ColumnDescriptor {
ColumnDescriptorBuilder::new(id, id.to_string(), ConcreteDataType::int64_datatype())
.is_nullable(false)
.build()
.unwrap()
}
fn new_region_descriptor() -> RegionDescriptor {
let row_key = RowKeyDescriptorBuilder::default()
.timestamp(new_column_desc(1))
.build()
.unwrap();
let default_cf = ColumnFamilyDescriptorBuilder::default()
.push_column(new_column_desc(2))
.build()
.unwrap();
RegionDescriptorBuilder::default()
.id(1)
.name("test")
.row_key(row_key)
.default_cf(default_cf)
.build()
.unwrap()
}
#[test]
fn test_alter_operation() {
let mut desc = new_region_descriptor();
let op = AlterOperation::AddColumns {
columns: vec![
AddColumn {
desc: new_column_desc(3),
is_key: true,
},
AddColumn {
desc: new_column_desc(4),
is_key: false,
},
],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!("3", desc.row_key.columns[0].name);
assert_eq!(2, desc.default_cf.columns.len());
assert_eq!("2", desc.default_cf.columns[0].name);
assert_eq!("4", desc.default_cf.columns[1].name);
let op = AlterOperation::DropColumns {
names: vec![String::from("2")],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!(1, desc.default_cf.columns.len());
assert_eq!("4", desc.default_cf.columns[0].name);
// Key columns are ignored.
let op = AlterOperation::DropColumns {
names: vec![String::from("1"), String::from("3")],
};
op.apply(&mut desc);
assert_eq!(1, desc.row_key.columns.len());
assert_eq!(1, desc.default_cf.columns.len());
}
#[test]
fn test_try_from_raw_alter_kind() {
let kind = alter_request::Kind::AddColumns(AddColumns {
add_columns: vec![
PbAddColumn {
column_def: Some(RegionColumnDef {
column_def: Some(ColumnDef {
name: "my_tag".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Tag as _,
comment: String::new(),
..Default::default()
}),
column_id: 1,
}),
location: None,
},
PbAddColumn {
column_def: Some(RegionColumnDef {
column_def: Some(ColumnDef {
name: "my_field".to_string(),
data_type: ColumnDataType::String as _,
is_nullable: true,
default_constraint: ColumnDefaultConstraint::Value("hello".into())
.try_into()
.unwrap(),
semantic_type: SemanticType::Field as _,
comment: String::new(),
..Default::default()
}),
column_id: 2,
}),
location: None,
},
],
});
let AlterOperation::AddColumns { columns } = AlterOperation::try_from(kind).unwrap() else {
unreachable!()
};
assert_eq!(2, columns.len());
let desc = &columns[0].desc;
assert_eq!(desc.id, 1);
assert_eq!(&desc.name, "my_tag");
assert_eq!(desc.data_type, ConcreteDataType::int32_datatype());
assert!(!desc.is_nullable());
assert!(!desc.is_time_index());
assert_eq!(desc.default_constraint(), None);
assert!(columns[0].is_key);
let desc = &columns[1].desc;
assert_eq!(desc.id, 2);
assert_eq!(&desc.name, "my_field");
assert_eq!(desc.data_type, ConcreteDataType::string_datatype());
assert!(desc.is_nullable());
assert!(!desc.is_time_index());
assert_eq!(
desc.default_constraint(),
Some(&ColumnDefaultConstraint::Value("hello".into()))
);
assert!(!columns[1].is_key);
let kind = alter_request::Kind::DropColumns(DropColumns {
drop_columns: vec![
DropColumn {
name: "c1".to_string(),
},
DropColumn {
name: "c2".to_string(),
},
],
});
let AlterOperation::DropColumns { names } = AlterOperation::try_from(kind).unwrap() else {
unreachable!()
};
assert_eq!(names, vec!["c1", "c2"]);
}
}

View File

@@ -1,25 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[derive(Debug)]
pub struct WriteResponse {}
#[derive(Debug)]
pub struct ScanResponse<R> {
/// Reader to read result chunks.
pub reader: R,
}
#[derive(Debug)]
pub struct GetResponse {}

View File

@@ -1,55 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use datatypes::schema::SchemaRef;
use crate::storage::chunk::ChunkReader;
use crate::storage::consts;
use crate::storage::requests::{GetRequest, ScanRequest};
use crate::storage::responses::{GetResponse, ScanResponse};
/// A consistent read-only view of region.
#[async_trait]
pub trait Snapshot: Send + Sync {
type Error: ErrorExt + Send + Sync;
type Reader: ChunkReader;
fn schema(&self) -> &SchemaRef;
async fn scan(
&self,
ctx: &ReadContext,
request: ScanRequest,
) -> Result<ScanResponse<Self::Reader>, Self::Error>;
async fn get(&self, ctx: &ReadContext, request: GetRequest)
-> Result<GetResponse, Self::Error>;
}
/// Context for read.
#[derive(Debug, Clone)]
pub struct ReadContext {
/// Suggested batch size of chunk.
pub batch_size: usize,
}
impl Default for ReadContext {
fn default() -> ReadContext {
ReadContext {
batch_size: consts::READ_BATCH_SIZE,
}
}
}

View File

@@ -14,11 +14,6 @@
//! Common types.
use api::v1::OpType;
/// Represents a sequence number of data in storage. The offset of logstore can be used
/// as a sequence number.
pub type SequenceNumber = u64;
// TODO(hl): We should implement a `min` method for OpType in greptime-proto crate.
pub const MIN_OP_TYPE: OpType = OpType::Delete;

View File

@@ -25,8 +25,6 @@ pub mod table;
pub mod test_util;
pub mod thin_table;
pub use store_api::storage::RegionStat;
pub use crate::error::{Error, Result};
pub use crate::stats::{ColumnStatistics, TableStatistics};
pub use crate::table::{Table, TableRef};