feat(mito): Write wal and memtable (#2135)

* feat: hold wal entry in RegionWriteCtx

* feat: entry id and committed sequence

* feat: write to wal

* feat: write memtable

* feat: fill missing columns

* feat: validate write request

* feat: more validation to write request

* chore: fix typos

* feat: remove init and validate rows in new()

* style: fix clippy
Author: Yingwen
Date: 2023-08-12 16:44:44 +09:00
Committed by: GitHub
Parent: b62e643e92
Commit: e6090a8d5b
11 changed files with 678 additions and 91 deletions


@@ -20,12 +20,12 @@ mod tests;
use std::sync::Arc;
use object_store::ObjectStore;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
@@ -90,12 +90,17 @@ impl MitoEngine {
}
/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;
pub async fn write_region(&self, mut write_request: WriteRequest) -> Result<()> {
let region = self
.inner
.workers
.get_region(write_request.region_id)
.context(RegionNotFoundSnafu {
region_id: write_request.region_id,
})?;
let metadata = region.metadata();
// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.
write_request.fill_missing_columns(&metadata)?;
self.inner
.handle_request_body(RequestBody::Write(write_request))
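Taken together, the engine change above means a caller builds a validated WriteRequest and hands it to write_region, which now fills default values before routing the request to a worker. A minimal usage sketch under assumptions not in this diff: it lives inside the mito2 crate, `engine` is an already-opened MitoEngine, and region (1, 1) exists with a matching schema.
use greptime_proto::v1::Rows;
use store_api::storage::{OpType, RegionId};
use crate::engine::MitoEngine; // assumed module path
use crate::error::Result;
use crate::request::WriteRequest;
// Hypothetical helper: `engine` and `rows` are provided by the caller.
async fn put_rows(engine: &MitoEngine, rows: Rows) -> Result<()> {
    // WriteRequest::new validates the rows up front (duplicate columns,
    // row widths, per-value types) instead of a separate validate() call.
    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows)?;
    // write_region looks up the region, fills missing columns with their
    // defaults, then forwards the request body to the region worker.
    engine.write_region(request).await
}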


@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
@@ -185,15 +186,10 @@ pub enum Error {
/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
#[snafu(display("Need to fill default value for region {}", region_id))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
// The error is for internal use so we don't need a location.
},
#[snafu(display(
@@ -260,10 +256,21 @@ pub enum Error {
location: Location,
source: BoxedError,
},
// Shared error for each writer in the write group.
#[snafu(display("Failed to write region, source: {}", source))]
WriteGroup { source: Arc<Error> },
}
pub type Result<T> = std::result::Result<T, Error>;
impl Error {
/// Returns true if we need to fill default value for a region.
pub(crate) fn is_fill_default(&self) -> bool {
matches!(self, Error::FillDefault { .. })
}
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
@@ -296,6 +303,7 @@ impl ErrorExt for Error {
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
WriteGroup { source, .. } => source.status_code(),
}
}
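The reworked FillDefault variant and the new is_fill_default helper let a writer retry with default values when a request was built against an older schema. A short sketch of that pattern, mirroring the maybe_fill_missing_columns function added later in this diff (`request: &mut WriteRequest` and `metadata: &RegionMetadata` are assumed bindings):
// Retry with default filling when check_schema reports FillDefault.
if let Err(e) = request.check_schema(metadata) {
    if e.is_fill_default() {
        // The request may predate a schema change; fill defaults and go on.
        request.fill_missing_columns(metadata)?;
    } else {
        return Err(e);
    }
}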


@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use crate::error::Result;
use crate::memtable::key_values::KeyValues;
pub use crate::memtable::key_values::KeyValues;
use crate::metadata::RegionMetadataRef;
/// Id for memtables.


@@ -190,11 +190,12 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use greptime_proto::v1;
use greptime_proto::v1::{value, ColumnDataType, Value};
use greptime_proto::v1::ColumnDataType;
use store_api::storage::RegionId;
use super::*;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder};
use crate::proto_util::i64_value;
const TS_NAME: &str = "ts";
const START_SEQ: SequenceNumber = 100;
@@ -290,12 +291,6 @@ mod tests {
}
}
fn i64_value(data: i64) -> Value {
Value {
value: Some(value::Value::I64Value(data)),
}
}
fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;


@@ -37,4 +37,9 @@ impl MemtableVersion {
immutables: vec![],
}
}
/// Returns the mutable memtable.
pub(crate) fn mutable(&self) -> &MemtableRef {
&self.mutable
}
}


@@ -120,6 +120,56 @@ pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
Some(proto_value)
}
/// Returns the [ColumnDataType] of the value.
///
/// If value is null, returns `None`.
pub(crate) fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
let value_data = value.value.as_ref()?;
let value_type = match value_data {
v1::value::Value::I8Value(_) => ColumnDataType::Int8,
v1::value::Value::I16Value(_) => ColumnDataType::Int16,
v1::value::Value::I32Value(_) => ColumnDataType::Int32,
v1::value::Value::I64Value(_) => ColumnDataType::Int64,
v1::value::Value::U8Value(_) => ColumnDataType::Uint8,
v1::value::Value::U16Value(_) => ColumnDataType::Uint16,
v1::value::Value::U32Value(_) => ColumnDataType::Uint32,
v1::value::Value::U64Value(_) => ColumnDataType::Uint64,
v1::value::Value::F32Value(_) => ColumnDataType::Float32,
v1::value::Value::F64Value(_) => ColumnDataType::Float64,
v1::value::Value::BoolValue(_) => ColumnDataType::Boolean,
v1::value::Value::BinaryValue(_) => ColumnDataType::Binary,
v1::value::Value::StringValue(_) => ColumnDataType::String,
v1::value::Value::DateValue(_) => ColumnDataType::Date,
v1::value::Value::DatetimeValue(_) => ColumnDataType::Datetime,
v1::value::Value::TsSecondValue(_) => ColumnDataType::TimestampSecond,
v1::value::Value::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond,
v1::value::Value::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond,
v1::value::Value::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond,
v1::value::Value::TimeSecondValue(_) => ColumnDataType::TimeSecond,
v1::value::Value::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
v1::value::Value::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
v1::value::Value::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
};
Some(value_type)
}
// TODO(yingwen): Support conversion in greptime-proto.
/// Creates value for i64.
#[cfg(test)]
pub(crate) fn i64_value(data: i64) -> v1::Value {
v1::Value {
value: Some(v1::value::Value::I64Value(data)),
}
}
/// Creates value for timestamp millis.
#[cfg(test)]
pub(crate) fn ts_ms_value(data: i64) -> v1::Value {
v1::Value {
value: Some(v1::value::Value::TsMillisecondValue(data)),
}
}
/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
@@ -186,3 +236,5 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType
false
}
}
// TODO(yingwen): Tests.
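One way the TODO above could be addressed, as a sketch using only the helpers added in this hunk (proto_value_type, i64_value, ts_ms_value); the test itself is an assumption, not part of the commit:
#[test]
fn test_proto_value_type() {
    // Non-null values map to their ColumnDataType.
    assert_eq!(Some(ColumnDataType::Int64), proto_value_type(&i64_value(42)));
    assert_eq!(
        Some(ColumnDataType::TimestampMillisecond),
        proto_value_type(&ts_ms_value(1))
    );
    // A null value carries no data, so its type cannot be inferred.
    assert_eq!(None, proto_value_type(&v1::Value { value: None }));
}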


@@ -25,7 +25,8 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::metadata::RegionMetadataRef;
use crate::region::version::VersionControlRef;
/// Type to store region version.
pub type VersionNumber = u32;
@@ -40,7 +41,7 @@ pub(crate) struct MitoRegion {
pub(crate) region_id: RegionId,
/// Version controller for this region.
version_control: VersionControlRef,
pub(crate) version_control: VersionControlRef,
/// Manager to maintain manifest for this region.
manifest_manager: RegionManifestManager,
}
@@ -57,9 +58,10 @@ impl MitoRegion {
Ok(())
}
/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
/// Returns current metadata of the region.
pub(crate) fn metadata(&self) -> RegionMetadataRef {
let version_data = self.version_control.current();
version_data.version.metadata.clone()
}
}


@@ -23,39 +23,56 @@
//! Reason: data may be flushed/compacted and some data with old sequence may be removed
//! and became invisible between step 1 and 2, so need to acquire version at first.
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use arc_swap::ArcSwap;
use store_api::storage::SequenceNumber;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::MemtableRef;
use crate::metadata::RegionMetadataRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;
/// Controls version of in memory metadata for a region.
/// Controls metadata and sequence numbers for a region.
///
/// It manages metadata in a copy-on-write fashion. Any modification to a region's metadata
/// will generate a new [Version].
#[derive(Debug)]
pub(crate) struct VersionControl {
/// Latest version.
version: ArcSwap<Version>,
data: RwLock<VersionControlData>,
}
impl VersionControl {
/// Returns a new [VersionControl] with specific `version`.
pub(crate) fn new(version: Version) -> VersionControl {
VersionControl {
version: ArcSwap::new(Arc::new(version)),
data: RwLock::new(VersionControlData {
version: Arc::new(version),
committed_sequence: 0,
last_entry_id: 0,
}),
}
}
/// Returns current [Version].
pub(crate) fn current(&self) -> VersionRef {
self.version.load_full()
/// Returns current copy of data.
pub(crate) fn current(&self) -> VersionControlData {
self.data.read().unwrap().clone()
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
/// Data of [VersionControl].
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
/// Latest version.
pub(crate) version: VersionRef,
/// Sequence number of last committed data.
pub(crate) committed_sequence: SequenceNumber,
/// Last WAL entry Id.
pub(crate) last_entry_id: EntryId,
}
/// Static metadata of a region.
#[derive(Clone, Debug)]
pub(crate) struct Version {
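Since VersionControl now guards a VersionControlData behind a RwLock, current() hands out a point-in-time clone rather than a live reference. A short sketch of how a writer consumes that snapshot, using only fields shown in this diff (`region` is an assumed MitoRegionRef):
// The clone is taken under the read lock, so version, committed_sequence
// and last_entry_id are mutually consistent even if another thread
// advances the region right afterwards.
let data = region.version_control.current();
let next_sequence = data.committed_sequence + 1;
let next_entry_id = data.last_entry_id + 1;
let metadata = data.version.metadata.clone();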


@@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows};
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, Value};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId};
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -27,8 +27,8 @@ use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result};
use crate::metadata::{ColumnMetadata, RegionMetadata};
use crate::proto_util::{
is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type,
to_proto_value,
is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type,
to_proto_semantic_type, to_proto_value,
};
/// Options that affect the entire region.
@@ -100,32 +100,58 @@ pub struct WriteRequest {
pub rows: Rows,
/// Map column name to column index in `rows`.
name_to_index: HashMap<String, usize>,
/// Whether each column has null.
has_null: Vec<bool>,
}
impl WriteRequest {
/// Returns a new request.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest {
let name_to_index = rows
.schema
.iter()
.enumerate()
.map(|(index, column)| (column.column_name.clone(), index))
.collect();
WriteRequest {
/// Creates a new request.
///
/// Returns `Err` if `rows` are invalid.
pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> Result<WriteRequest> {
let mut name_to_index = HashMap::with_capacity(rows.schema.len());
for (index, column) in rows.schema.iter().enumerate() {
ensure!(
name_to_index
.insert(column.column_name.clone(), index)
.is_none(),
InvalidRequestSnafu {
region_id,
reason: format!("duplicate column {}", column.column_name),
}
);
}
let mut has_null = vec![false; rows.schema.len()];
for row in &rows.rows {
ensure!(
row.values.len() == rows.schema.len(),
InvalidRequestSnafu {
region_id,
reason: format!(
"row has {} columns but schema has {}",
row.values.len(),
rows.schema.len()
),
}
);
for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
validate_proto_value(region_id, value, column_schema)?;
if value.value.is_none() {
has_null[i] = true;
}
}
}
Ok(WriteRequest {
region_id,
op_type,
rows,
name_to_index,
}
}
/// Validate the request.
pub(crate) fn validate(&self) -> Result<()> {
// - checks whether the request is too large.
// - checks whether each row in rows has the same schema.
// - checks whether each column match the schema in Rows.
// - checks rows don't have duplicate columns.
unimplemented!()
has_null,
})
}
/// Get column index by name.
@@ -133,7 +159,7 @@ impl WriteRequest {
self.name_to_index.get(name).copied()
}
/// Checks schema of rows.
/// Checks schema of rows is compatible with schema of the region.
///
/// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
/// error.
@@ -156,10 +182,12 @@ impl WriteRequest {
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} expect type {:?}, given: {:?}({})",
"column {} expect type {:?}, given: {}({})",
column.column_schema.name,
column.column_schema.data_type,
ColumnDataType::from_i32(input_col.datatype),
ColumnDataType::from_i32(input_col.datatype)
.map(|v| v.as_str_name())
.unwrap_or("Unknown"),
input_col.datatype,
)
}
@@ -171,14 +199,27 @@ impl WriteRequest {
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} has semantic type {:?}, given: {:?}({})",
"column {} has semantic type {:?}, given: {}({})",
column.column_schema.name,
column.semantic_type,
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type),
greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type)
.map(|v| v.as_str_name())
.unwrap_or("Unknown"),
input_col.semantic_type
),
}
);
// Check nullable.
// Safety: `rows_columns` ensures this column exists.
let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
ensure!(
!has_null || column.column_schema.is_nullable(),
InvalidRequestSnafu {
region_id,
reason: format!("column {} is not null", column.column_schema.name),
}
);
} else {
// For columns not in rows, checks whether they have default value.
ensure!(
@@ -190,11 +231,7 @@ impl WriteRequest {
}
);
return FillDefaultSnafu {
region_id,
column: &column.column_schema.name,
}
.fail();
return FillDefaultSnafu { region_id }.fail();
}
}
@@ -278,6 +315,30 @@ impl WriteRequest {
}
}
/// Validate proto value schema.
pub(crate) fn validate_proto_value(
region_id: RegionId,
value: &Value,
column_schema: &ColumnSchema,
) -> Result<()> {
if let Some(value_type) = proto_value_type(value) {
ensure!(
value_type as i32 == column_schema.datatype,
InvalidRequestSnafu {
region_id,
reason: format!(
"column {} has type {:?}, but schema has type {:?}",
column_schema.column_name,
value_type,
ColumnDataType::from_i32(column_schema.datatype)
),
}
);
}
Ok(())
}
/// Sender and write request.
pub(crate) struct SenderWriteRequest {
/// Result sender.
@@ -362,3 +423,291 @@ impl RequestBody {
}
}
}
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use greptime_proto::v1::{Row, SemanticType};
use super::*;
use crate::error::Error;
use crate::metadata::RegionMetadataBuilder;
use crate::proto_util::{i64_value, ts_ms_value};
fn new_column_schema(
name: &str,
data_type: ColumnDataType,
semantic_type: SemanticType,
) -> ColumnSchema {
ColumnSchema {
column_name: name.to_string(),
datatype: data_type as i32,
semantic_type: semantic_type as i32,
}
}
fn check_invalid_request(err: &Error, expect: &str) {
if let Error::InvalidRequest {
region_id: _,
reason,
location: _,
} = err
{
assert_eq!(reason, expect);
} else {
panic!("Unexpected error {err}")
}
}
#[test]
fn test_write_request_duplicate_column() {
let rows = Rows {
schema: vec![
new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![],
};
let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err();
check_invalid_request(&err, "duplicate column c0");
}
#[test]
fn test_valid_write_request() {
let rows = Rows {
schema: vec![
new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![i64_value(1), i64_value(2)],
}],
};
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
assert_eq!(0, request.column_index_by_name("c0").unwrap());
assert_eq!(1, request.column_index_by_name("c1").unwrap());
assert_eq!(None, request.column_index_by_name("c2"));
}
#[test]
fn test_write_request_column_num() {
let rows = Rows {
schema: vec![
new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![i64_value(1), i64_value(2), i64_value(3)],
}],
};
let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap_err();
check_invalid_request(&err, "row has 3 columns but schema has 2");
}
fn new_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1), 1);
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: crate::metadata::SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"k0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: crate::metadata::SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder.build().unwrap()
}
#[test]
fn test_check_schema() {
let rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![ts_ms_value(1), i64_value(2)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
request.check_schema(&metadata).unwrap();
}
#[test]
fn test_column_type() {
let rows = Rows {
schema: vec![
new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![i64_value(1), i64_value(2)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
}
#[test]
fn test_semantic_type() {
let rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Tag,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![ts_ms_value(1), i64_value(2)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
}
#[test]
fn test_column_nullable() {
let rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![Value { value: None }, i64_value(2)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "column ts is not null");
}
#[test]
fn test_column_default() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, "missing column ts");
}
#[test]
fn test_unknown_column() {
let rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
}],
};
let metadata = new_region_metadata();
let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
#[test]
fn test_fill_missing_columns() {
let rows = Rows {
schema: vec![new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)],
rows: vec![Row {
values: vec![ts_ms_value(1)],
}],
};
let metadata = new_region_metadata();
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
request.fill_missing_columns(&metadata).unwrap();
let expect_rows = Rows {
schema: vec![
new_column_schema(
"ts",
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
),
new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
],
rows: vec![Row {
values: vec![ts_ms_value(1), Value { value: None }],
}],
};
assert_eq!(expect_rows, request.rows);
}
#[test]
fn test_no_default() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = new_region_metadata();
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.fill_missing_columns(&metadata).unwrap_err();
check_invalid_request(&err, "column ts does not have default value");
}
}


@@ -37,7 +37,7 @@ use tokio::sync::{mpsc, Mutex};
use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::region::{RegionMap, RegionMapRef};
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest};
use crate::wal::Wal;
@@ -133,6 +133,13 @@ impl WorkerGroup {
self.worker(region_id).is_region_exists(region_id)
}
/// Returns region of specific `region_id`.
///
/// This method should not be public.
pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
self.worker(region_id).get_region(region_id)
}
/// Get worker for specific `region_id`.
fn worker(&self, region_id: RegionId) -> &RegionWorker {
let mut hasher = DefaultHasher::new();
@@ -252,6 +259,11 @@ impl RegionWorker {
fn is_region_exists(&self, region_id: RegionId) -> bool {
self.regions.is_region_exists(region_id)
}
/// Returns region of specific `region_id`.
fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
self.regions.get_region(region_id)
}
}
impl Drop for RegionWorker {
@@ -285,7 +297,7 @@ struct RegionWorkerLoop<S> {
memtable_builder: MemtableBuilderRef,
}
impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
/// Starts the worker loop.
async fn run(&mut self) {
info!("Start region worker thread {}", self.id);
@@ -353,7 +365,9 @@ impl<S> RegionWorkerLoop<S> {
self.handle_ddl_requests(ddl_requests).await;
}
}
impl<S> RegionWorkerLoop<S> {
/// Takes and handles all ddl requests.
async fn handle_ddl_requests(&mut self, ddl_requests: Vec<RegionRequest>) {
if ddl_requests.is_empty() {


@@ -15,26 +15,64 @@
//! Handling write requests.
use std::collections::{hash_map, HashMap};
use std::mem;
use std::sync::Arc;
use greptime_proto::v1::mito::Mutation;
use greptime_proto::v1::mito::{Mutation, WalEntry};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::oneshot::Sender;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::error::{Error, RegionNotFoundSnafu, Result, WriteGroupSnafu};
use crate::memtable::KeyValues;
use crate::metadata::RegionMetadata;
use crate::proto_util::to_proto_op_type;
use crate::region::version::VersionRef;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::request::SenderWriteRequest;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::wal::{EntryId, WalWriter};
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec<SenderWriteRequest>) {
if write_requests.is_empty() {
return;
}
let mut region_ctxs = self.prepare_region_write_ctx(write_requests);
// Write to WAL.
let mut wal_writer = self.wal.writer();
for region_ctx in region_ctxs.values_mut() {
if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
region_ctx.set_error(e);
}
}
if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
return;
}
// Write to memtables.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.write_memtable();
}
}
}
impl<S> RegionWorkerLoop<S> {
/// Validates and groups requests by region.
fn prepare_region_write_ctx(
&self,
write_requests: Vec<SenderWriteRequest>,
) -> HashMap<RegionId, RegionWriteCtx> {
let mut region_ctxs = HashMap::new();
for sender_req in write_requests {
for mut sender_req in write_requests {
let region_id = sender_req.request.region_id;
// Checks whether the region exists.
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
@@ -53,9 +91,8 @@ impl<S> RegionWorkerLoop<S> {
let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
// Checks whether request schema is compatible with region schema.
if let Err(e) = sender_req
.request
.check_schema(&region_ctx.version.metadata)
if let Err(e) =
maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version.metadata)
{
send_result(sender_req.sender, Err(e));
@@ -66,10 +103,26 @@ impl<S> RegionWorkerLoop<S> {
region_ctx.push_sender_request(sender_req);
}
todo!()
region_ctxs
}
}
/// Checks the schema and fills missing columns.
fn maybe_fill_missing_columns(request: &mut WriteRequest, metadata: &RegionMetadata) -> Result<()> {
if let Err(e) = request.check_schema(metadata) {
if e.is_fill_default() {
// TODO(yingwen): Add metrics for this case.
// We need to fill default value again. The write request may be a request
// sent before changing the schema.
request.fill_missing_columns(metadata)?;
} else {
return Err(e);
}
}
Ok(())
}
/// Send result to the request.
fn send_result(sender: Option<Sender<Result<()>>>, res: Result<()>) {
if let Some(sender) = sender {
@@ -78,39 +131,126 @@ fn send_result(sender: Option<Sender<Result<()>>>, res: Result<()>) {
}
}
/// Notifier to notify write result on drop.
struct WriteNotify {
/// Error to send to the waiter.
err: Option<Arc<Error>>,
/// Sender to send write result to the waiter for this mutation.
sender: Option<Sender<Result<()>>>,
}
impl WriteNotify {
/// Creates a new notify from the `sender`.
fn new(sender: Option<Sender<Result<()>>>) -> WriteNotify {
WriteNotify { err: None, sender }
}
/// Send result to the waiter.
fn notify_result(&mut self) {
let Some(sender) = self.sender.take() else {
return;
};
if let Some(err) = &self.err {
// Try to send the error to waiters.
let _ = sender.send(Err(err.clone()).context(WriteGroupSnafu));
} else {
// Send success result.
let _ = sender.send(Ok(()));
}
}
}
impl Drop for WriteNotify {
fn drop(&mut self) {
self.notify_result();
}
}
/// Context to keep region metadata and buffer write requests.
struct RegionWriteCtx {
/// Region to write.
region: MitoRegionRef,
/// Version of the region while creating the context.
version: VersionRef,
/// Valid mutations.
mutations: Vec<Mutation>,
/// Result senders.
/// Next sequence number to write.
///
/// The sender is 1:1 map to the mutation in `mutations`.
senders: Vec<Option<Sender<Result<()>>>>,
/// The context assigns a unique sequence number for each row.
next_sequence: SequenceNumber,
/// Next entry id of WAL to write.
next_entry_id: EntryId,
/// Valid WAL entry to write.
///
/// We keep [WalEntry] instead of mutations to avoid taking mutations
/// out of the context to construct the wal entry when we write to the wal.
wal_entry: WalEntry,
/// Notifiers to send write results to waiters.
///
/// The i-th notify is for i-th mutation.
notifiers: Vec<WriteNotify>,
}
impl RegionWriteCtx {
/// Returns an empty context.
fn new(region: MitoRegionRef) -> RegionWriteCtx {
let version = region.version();
let VersionControlData {
version,
committed_sequence,
last_entry_id,
} = region.version_control.current();
RegionWriteCtx {
region,
version,
mutations: Vec::new(),
senders: Vec::new(),
next_sequence: committed_sequence + 1,
next_entry_id: last_entry_id + 1,
wal_entry: WalEntry::default(),
notifiers: Vec::new(),
}
}
/// Push [SenderWriteRequest] to the context.
fn push_sender_request(&mut self, sender_req: SenderWriteRequest) {
self.mutations.push(Mutation {
let num_rows = sender_req.request.rows.rows.len() as u64;
self.wal_entry.mutations.push(Mutation {
op_type: to_proto_op_type(sender_req.request.op_type) as i32,
sequence: 0, // TODO(yingwen): Set sequence.
sequence: self.next_sequence,
rows: Some(sender_req.request.rows),
});
self.senders.push(sender_req.sender);
// Notifiers are 1:1 map to mutations.
self.notifiers.push(WriteNotify::new(sender_req.sender));
// Increase sequence number.
self.next_sequence += num_rows;
}
/// Encode and add WAL entry to the writer.
fn add_wal_entry<S: LogStore>(&self, wal_writer: &mut WalWriter<S>) -> Result<()> {
wal_writer.add_entry(self.region.region_id, self.next_entry_id, &self.wal_entry)
}
/// Sets the error and marks all write operations as failed.
fn set_error(&mut self, err: Arc<Error>) {
// Set error for all notifiers
for notify in &mut self.notifiers {
notify.err = Some(err.clone());
}
}
/// Consumes mutations and writes them into mutable memtable.
fn write_memtable(&mut self) {
debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
let mutable = self.version.memtables.mutable();
// Takes mutations from the wal entry.
let mutations = mem::take(&mut self.wal_entry.mutations);
for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
continue;
};
if let Err(e) = mutable.write(&kvs) {
notify.err = Some(Arc::new(e));
}
}
}
}
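The Drop implementation on WriteNotify is what guarantees every waiter hears back exactly once, whether its mutation succeeded, failed at the WAL, or failed at the memtable. A standalone illustration of that drop-notify pattern with simplified types (this is not the crate's own code):
use tokio::sync::oneshot;
struct Notify {
    err: Option<String>,
    sender: Option<oneshot::Sender<Result<(), String>>>,
}
impl Drop for Notify {
    fn drop(&mut self) {
        // The owner only has to record an error (or nothing at all);
        // dropping the notifier always delivers exactly one result.
        if let Some(sender) = self.sender.take() {
            let _ = match self.err.take() {
                Some(e) => sender.send(Err(e)),
                None => sender.send(Ok(())),
            };
        }
    }
}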