diff --git a/Cargo.lock b/Cargo.lock index 23eb67428b..97aed491ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3526,6 +3526,7 @@ dependencies = [ "futures", "object-store", "serde", + "serde_json", "snafu", "tokio", ] diff --git a/src/common/telemetry/src/logging.rs b/src/common/telemetry/src/logging.rs index 9bf0603313..d1b3e10ac0 100644 --- a/src/common/telemetry/src/logging.rs +++ b/src/common/telemetry/src/logging.rs @@ -73,6 +73,7 @@ pub fn init_global_logging( .with_target("tower", Level::WARN) .with_target("datafusion", Level::WARN) .with_target("reqwest", Level::WARN) + .with_target("sqlparser", Level::WARN) .with_default( directives .parse::() diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 0356bcd322..df4f77ab7e 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -5,20 +5,23 @@ use async_trait::async_trait; use common_telemetry::logging::info; use object_store::{backend::fs::Backend, util, ObjectStore}; use snafu::ResultExt; +use store_api::manifest::action::ProtocolAction; use store_api::{ logstore::LogStore, manifest::Manifest, storage::{EngineContext, RegionDescriptor, StorageEngine}, }; +use crate::background::JobPoolImpl; use crate::config::{EngineConfig, ObjectStoreConfig}; use crate::error::{self, Error, Result}; +use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; use crate::manifest::action::*; use crate::manifest::region::RegionManifest; +use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::metadata::RegionMetadata; -use crate::region::RegionImpl; +use crate::region::{RegionImpl, StoreConfig}; use crate::sst::FsAccessLayer; -use crate::wal::Wal; /// [StorageEngine] implementation. pub struct EngineImpl { @@ -99,16 +102,16 @@ impl SharedData { object_store, }) } +} - #[inline] - fn region_sst_dir(&self, region_name: &str) -> String { - format!("{}/", region_name) - } +#[inline] +pub fn region_sst_dir(region_name: &str) -> String { + format!("{}/", region_name) +} - #[inline] - fn region_manifest_dir(&self, region_name: &str) -> String { - format!("{}/manifest/", region_name) - } +#[inline] +pub fn region_manifest_dir(region_name: &str) -> String { + format!("{}/manifest/", region_name) } type RegionMap = HashMap>; @@ -117,14 +120,23 @@ struct EngineInner { log_store: Arc, regions: RwLock>, shared: SharedData, + memtable_builder: MemtableBuilderRef, + flush_scheduler: FlushSchedulerRef, + flush_strategy: FlushStrategyRef, } impl EngineInner { pub async fn new(config: EngineConfig, log_store: Arc) -> Result { + let job_pool = Arc::new(JobPoolImpl {}); + let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); + Ok(Self { log_store, regions: RwLock::new(Default::default()), shared: SharedData::new(config).await?, + memtable_builder: Arc::new(DefaultMemtableBuilder {}), + flush_scheduler, + flush_strategy: Arc::new(SizeBasedStrategy::default()), }) } @@ -144,29 +156,38 @@ impl EngineInner { .context(error::InvalidRegionDescSnafu { region: ®ion_name, })?; - let wal = Wal::new(region_id, region_name.clone(), self.log_store.clone()); - let sst_dir = &self.shared.region_sst_dir(®ion_name); + let sst_dir = ®ion_sst_dir(®ion_name); let sst_layer = Arc::new(FsAccessLayer::new( sst_dir, self.shared.object_store.clone(), )); - let manifest_dir = self.shared.region_manifest_dir(®ion_name); + let manifest_dir = region_manifest_dir(®ion_name); let manifest = RegionManifest::new(region_id, &manifest_dir, self.shared.object_store.clone()); + let store_config = 
StoreConfig { + log_store: self.log_store.clone(), + sst_layer, + manifest: manifest.clone(), + memtable_builder: self.memtable_builder.clone(), + flush_scheduler: self.flush_scheduler.clone(), + flush_strategy: self.flush_strategy.clone(), + }; + let region = RegionImpl::new( region_id, region_name.clone(), metadata.clone(), - wal, - sst_layer, - manifest.clone(), + store_config, ); // Persist region metadata manifest - .update(RegionMetaAction::Change(RegionChange { - metadata: Arc::new(metadata), - })) + .update(RegionMetaActionList::new(vec![ + RegionMetaAction::Protocol(ProtocolAction::new()), + RegionMetaAction::Change(RegionChange { + metadata: Arc::new(metadata), + }), + ])) .await?; { diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 937a545580..207a70a767 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -5,6 +5,7 @@ use std::str::Utf8Error; use common_error::prelude::*; use datatypes::arrow; use serde_json::error::Error as JsonError; +use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; use crate::metadata::Error as MetadataError; @@ -142,6 +143,34 @@ pub enum Error { #[snafu(display("Task already cancelled"))] Cancelled { backtrace: Backtrace }, + + #[snafu(display( + "Manifest protocol forbid to read, min_version: {}, supported_version: {}", + min_version, + supported_version + ))] + ManifestProtocolForbidRead { + min_version: ProtocolVersion, + supported_version: ProtocolVersion, + backtrace: Backtrace, + }, + + #[snafu(display( + "Manifest protocol forbid to write, min_version: {}, supported_version: {}", + min_version, + supported_version + ))] + ManifestProtocolForbidWrite { + min_version: ProtocolVersion, + supported_version: ProtocolVersion, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to decode region action list, {}", msg))] + DecodeRegionMetaActionList { msg: String, backtrace: Backtrace }, + + #[snafu(display("Failed to read line, err: {}", source))] + Readline { source: IoError }, } pub type Result = std::result::Result; @@ -162,7 +191,9 @@ impl ErrorExt for Error { | EncodeJson { .. } | DecodeJson { .. } | JoinTask { .. } - | Cancelled { .. } => StatusCode::Unexpected, + | Cancelled { .. } + | DecodeRegionMetaActionList { .. } + | Readline { .. } => StatusCode::Unexpected, FlushIo { .. } | InitBackend { .. } @@ -173,7 +204,9 @@ impl ErrorExt for Error { | DeleteObject { .. } | WriteWal { .. } | DecodeWalHeader { .. } - | EncodeWalHeader { .. } => StatusCode::StorageUnavailable, + | EncodeWalHeader { .. } + | ManifestProtocolForbidRead { .. } + | ManifestProtocolForbidWrite { .. 
} => StatusCode::StorageUnavailable, } } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 4d0bb5575d..0cc51fa2bf 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -205,7 +205,11 @@ impl FlushJob { files_to_remove: Vec::default(), }; logging::debug!("Write region edit: {:?} to manifest.", edit); - self.manifest.update(RegionMetaAction::Edit(edit)).await + self.manifest + .update(RegionMetaActionList::with_action(RegionMetaAction::Edit( + edit, + ))) + .await } /// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$` diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index e3a7fb8ef3..27ba91a6c1 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -7,14 +7,14 @@ pub mod config; mod engine; pub mod error; mod flush; -pub mod manifest; +mod manifest; pub mod memtable; pub mod metadata; mod proto; mod region; mod snapshot; mod sst; -pub mod sync; +mod sync; #[cfg(test)] mod test_util; mod version; diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs index 2826231c22..9a237002a2 100644 --- a/src/storage/src/manifest/action.rs +++ b/src/storage/src/manifest/action.rs @@ -1,26 +1,35 @@ +use std::io::{BufRead, BufReader, Write}; + use serde::{Deserialize, Serialize}; use serde_json as json; -use snafu::ResultExt; +use serde_json::ser::to_writer; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::manifest::action::ProtocolAction; +use store_api::manifest::action::ProtocolVersion; +use store_api::manifest::ManifestVersion; use store_api::manifest::MetaAction; use store_api::manifest::Metadata; use store_api::storage::RegionId; use store_api::storage::SequenceNumber; -use crate::error::{DecodeJsonSnafu, EncodeJsonSnafu, Result, Utf8Snafu}; +use crate::error::{ + DecodeJsonSnafu, DecodeRegionMetaActionListSnafu, EncodeJsonSnafu, + ManifestProtocolForbidReadSnafu, ReadlineSnafu, Result, +}; use crate::metadata::{RegionMetadataRef, VersionNumber}; use crate::sst::FileMeta; -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionChange { pub metadata: RegionMetadataRef, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionRemove { pub region_id: RegionId, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionEdit { pub region_id: RegionId, pub region_version: VersionNumber, @@ -29,39 +38,186 @@ pub struct RegionEdit { pub files_to_remove: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct RegionManifestData { pub region_meta: RegionMetadataRef, // TODO(dennis): version metadata } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct RegionMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum RegionMetaAction { + Protocol(ProtocolAction), Change(RegionChange), Remove(RegionRemove), Edit(RegionEdit), } -impl RegionMetaAction { - pub(crate) fn encode(&self) -> Result> { - Ok(json::to_string(self).context(EncodeJsonSnafu)?.into_bytes()) +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +struct VersionHeader { + prev_version: ManifestVersion, +} + +const NEWLINE: &[u8] = b"\n"; + +impl RegionMetaActionList 
{ + pub fn with_action(action: RegionMetaAction) -> Self { + Self { + actions: vec![action], + prev_version: 0, + } } - pub(crate) fn decode(bs: &[u8]) -> Result { - json::from_str(std::str::from_utf8(bs).context(Utf8Snafu)?).context(DecodeJsonSnafu) + pub fn new(actions: Vec) -> Self { + Self { + actions, + prev_version: 0, + } + } + + /// Encode self into json in the form of string lines, starts with prev_version and then action json list. + pub(crate) fn encode(&self) -> Result> { + let mut bytes = Vec::default(); + + { + // Encode prev_version + let v = VersionHeader { + prev_version: self.prev_version, + }; + + to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?; + // unwrap is fine here, because we write into a buffer. + bytes.write_all(NEWLINE).unwrap(); + } + + for action in &self.actions { + to_writer(&mut bytes, action).context(EncodeJsonSnafu)?; + bytes.write_all(NEWLINE).unwrap(); + } + + Ok(bytes) + } + + pub(crate) fn decode( + bs: &[u8], + reader_version: ProtocolVersion, + ) -> Result<(Self, Option)> { + let mut lines = BufReader::new(bs).lines(); + + let mut action_list = RegionMetaActionList { + actions: Vec::default(), + prev_version: 0, + }; + + { + let first_line = lines + .next() + .with_context(|| DecodeRegionMetaActionListSnafu { + msg: format!( + "Invalid content in manifest: {}", + std::str::from_utf8(bs).unwrap_or("**invalid bytes**") + ), + })? + .context(ReadlineSnafu)?; + + // Decode prev_version + let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?; + action_list.prev_version = v.prev_version; + } + + // Decode actions + let mut protocol_action = None; + let mut actions = Vec::default(); + for line in lines { + let line = &line.context(ReadlineSnafu)?; + let action: RegionMetaAction = json::from_str(line).context(DecodeJsonSnafu)?; + + if let RegionMetaAction::Protocol(p) = &action { + ensure!( + p.is_readable(reader_version), + ManifestProtocolForbidReadSnafu { + min_version: p.min_reader_version, + supported_version: reader_version, + } + ); + protocol_action = Some(p.clone()); + } + + actions.push(action); + } + action_list.actions = actions; + + Ok((action_list, protocol_action)) } } impl Metadata for RegionManifestData {} -impl MetaAction for RegionMetaAction { - type MetadataId = RegionId; - - fn metadata_id(&self) -> RegionId { - match self { - RegionMetaAction::Change(c) => c.metadata.id, - RegionMetaAction::Remove(r) => r.region_id, - RegionMetaAction::Edit(e) => e.region_id, - } +impl MetaAction for RegionMetaActionList { + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; + } +} + +#[cfg(test)] +mod tests { + use common_telemetry::logging; + + use super::*; + + #[test] + fn test_encode_decode_action_list() { + common_telemetry::init_default_ut_logging(); + let mut protocol = ProtocolAction::new(); + protocol.min_reader_version = 1; + let mut action_list = RegionMetaActionList::new(vec![ + RegionMetaAction::Protocol(protocol.clone()), + RegionMetaAction::Edit(RegionEdit { + region_id: 1, + region_version: 10, + flush_sequence: 99, + files_to_add: vec![ + FileMeta { + file_path: "test1".to_string(), + level: 1, + }, + FileMeta { + file_path: "test2".to_string(), + level: 2, + }, + ], + files_to_remove: vec![FileMeta { + file_path: "test0".to_string(), + level: 0, + }], + }), + ]); + action_list.set_prev_version(3); + + let bs = action_list.encode().unwrap(); + // {"prev_version":3} + // {"Protocol":{"min_reader_version":1,"min_writer_version":0}} + // 
{"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_path":"test1","level":1},{"file_path":"test2","level":2}],"files_to_remove":[{"file_path":"test0","level":0}]}} + + logging::debug!( + "Encoded action list: \r\n{}", + String::from_utf8(bs.clone()).unwrap() + ); + + let e = RegionMetaActionList::decode(&bs, 0); + assert!(e.is_err()); + assert_eq!( + "Manifest protocol forbid to read, min_version: 1, supported_version: 0", + format!("{}", e.err().unwrap()) + ); + + let (decode_list, p) = RegionMetaActionList::decode(&bs, 1).unwrap(); + assert_eq!(decode_list, action_list); + assert_eq!(p.unwrap(), protocol); } } diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 1d266326ce..3011a2cb9d 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -4,13 +4,16 @@ use std::sync::{ Arc, }; +use arc_swap::ArcSwap; use async_trait::async_trait; use common_telemetry::logging; use object_store::ObjectStore; +use snafu::ensure; +use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion}; use store_api::manifest::*; use store_api::storage::RegionId; -use crate::error::{Error, Result}; +use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result}; use crate::manifest::action::*; use crate::manifest::storage::ManifestObjectStore; use crate::manifest::storage::ObjectStoreLogIterator; @@ -23,7 +26,7 @@ pub struct RegionManifest { #[async_trait] impl Manifest for RegionManifest { type Error = Error; - type MetaAction = RegionMetaAction; + type MetaAction = RegionMetaActionList; type MetadataId = RegionId; type Metadata = RegionManifestData; @@ -33,8 +36,8 @@ impl Manifest for RegionManifest { } } - async fn update(&self, action: RegionMetaAction) -> Result { - self.inner.save(&action).await + async fn update(&self, action_list: RegionMetaActionList) -> Result { + self.inner.save(action_list).await } async fn load(&self) -> Result> { @@ -49,13 +52,17 @@ impl Manifest for RegionManifest { let mut iter = self.inner.scan(start_bound, MAX_VERSION).await?; - match iter.next_action().await? { - Some((_v, RegionMetaAction::Change(c))) => Ok(Some(RegionManifestData { - region_meta: c.metadata, - })), - Some(_) => todo!(), - None => Ok(None), + while let Some((_v, action_list)) = iter.next_action().await? { + for action in action_list.actions { + if let RegionMetaAction::Change(c) = action { + return Ok(Some(RegionManifestData { + region_meta: c.metadata, + })); + } + } } + + Ok(None) } async fn checkpoint(&self) -> Result { @@ -71,18 +78,26 @@ struct RegionManifestInner { region_id: RegionId, store: Arc, version: AtomicU64, + /// Current using protocol + protocol: ArcSwap, + /// Current node supported protocols (reader_version, writer_version) + supported_reader_version: ProtocolVersion, + supported_writer_version: ProtocolVersion, } -struct RegionMetaActionIterator { +struct RegionMetaActionListIterator { log_iter: ObjectStoreLogIterator, + reader_version: ProtocolVersion, } -impl RegionMetaActionIterator { - async fn next_action(&mut self) -> Result> { +impl RegionMetaActionListIterator { + async fn next_action(&mut self) -> Result> { match self.log_iter.next_log().await? 
{ Some((v, bytes)) => { - let action: RegionMetaAction = RegionMetaAction::decode(&bytes)?; - Ok(Some((v, action))) + //TODO(dennis): save protocol into inner's protocol when recovering + let (action_list, _protocol) = + RegionMetaActionList::decode(&bytes, self.reader_version)?; + Ok(Some((v, action_list))) } None => Ok(None), } @@ -91,11 +106,16 @@ impl RegionMetaActionIterator { impl RegionManifestInner { fn new(region_id: RegionId, manifest_dir: &str, object_store: ObjectStore) -> Self { + let (reader_version, writer_version) = action::supported_protocol_version(); + Self { region_id, store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), // TODO(dennis): recover the last version from history version: AtomicU64::new(0), + protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), + supported_reader_version: reader_version, + supported_writer_version: writer_version, } } @@ -109,16 +129,26 @@ impl RegionManifestInner { self.version.load(Ordering::Relaxed) } - async fn save(&self, action: &RegionMetaAction) -> Result { + async fn save(&self, action_list: RegionMetaActionList) -> Result { + let protocol = self.protocol.load(); + + ensure!( + protocol.is_writable(self.supported_writer_version), + ManifestProtocolForbidWriteSnafu { + min_version: protocol.min_writer_version, + supported_version: self.supported_writer_version, + } + ); + let version = self.inc_version(); logging::debug!( "Save region metadata action: {:?}, version: {}", - action, + action_list, version ); - self.store.save(version, &action.encode()?).await?; + self.store.save(version, &action_list.encode()?).await?; Ok(version) } @@ -127,9 +157,10 @@ impl RegionManifestInner { &self, start: ManifestVersion, end: ManifestVersion, - ) -> Result { - Ok(RegionMetaActionIterator { + ) -> Result { + Ok(RegionMetaActionListIterator { log_iter: self.store.scan(start, end).await?, + reader_version: self.supported_reader_version, }) } } @@ -172,9 +203,11 @@ mod tests { assert!(manifest.load().await.unwrap().is_none()); manifest - .update(RegionMetaAction::Change(RegionChange { - metadata: region_meta.clone(), - })) + .update(RegionMetaActionList::with_action(RegionMetaAction::Change( + RegionChange { + metadata: region_meta.clone(), + }, + ))) .await .unwrap(); @@ -193,9 +226,11 @@ mod tests { let metadata: RegionMetadata = desc.try_into().unwrap(); let region_meta = Arc::new(metadata); manifest - .update(RegionMetaAction::Change(RegionChange { - metadata: region_meta.clone(), - })) + .update(RegionMetaActionList::with_action(RegionMetaAction::Change( + RegionChange { + metadata: region_meta.clone(), + }, + ))) .await .unwrap(); diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index 55c1ff5da2..4303b273a0 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -9,11 +9,10 @@ use snafu::ensure; use store_api::logstore::LogStore; use store_api::storage::{ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse}; -use crate::background::JobPoolImpl; use crate::error::{self, Error, Result}; -use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy}; +use crate::flush::{FlushSchedulerRef, FlushStrategyRef}; use crate::manifest::region::RegionManifest; -use crate::memtable::{DefaultMemtableBuilder, MemtableVersion}; +use crate::memtable::{MemtableBuilderRef, MemtableVersion}; use crate::metadata::{RegionMetaImpl, RegionMetadata}; pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext}; use crate::snapshot::SnapshotImpl; 
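// --------------------------------------------------------------------------
// Illustrative sketch (not part of this diff): the hunks above replace single
// `RegionMetaAction` updates with `RegionMetaActionList`, persisted as
// newline-delimited JSON where the first line is a version header and every
// following line is one action. A minimal, self-contained model of that
// layout, using hypothetical stand-in types (`Header`, `Action`) and assuming
// `serde`/`serde_json`, which are already dependencies of this repo:
use serde::{Deserialize, Serialize};
use std::io::{BufRead, BufReader, Write};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Header {
    prev_version: u64,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum Action {
    Change { name: String },
    Edit { files_to_add: Vec<String> },
}

/// Encode: one JSON document per line, the header first, then each action.
fn encode(prev_version: u64, actions: &[Action]) -> Vec<u8> {
    let mut bytes = Vec::new();
    serde_json::to_writer(&mut bytes, &Header { prev_version }).unwrap();
    bytes.write_all(b"\n").unwrap();
    for action in actions {
        serde_json::to_writer(&mut bytes, action).unwrap();
        bytes.write_all(b"\n").unwrap();
    }
    bytes
}

/// Decode: parse the header line, then parse the remaining lines as actions.
fn decode(bs: &[u8]) -> (u64, Vec<Action>) {
    let mut lines = BufReader::new(bs).lines();
    let header: Header = serde_json::from_str(&lines.next().unwrap().unwrap()).unwrap();
    let actions = lines
        .map(|line| serde_json::from_str(&line.unwrap()).unwrap())
        .collect();
    (header.prev_version, actions)
}

fn main() {
    let actions = vec![
        Action::Change { name: "demo".into() },
        Action::Edit { files_to_add: vec!["a.parquet".into()] },
    ];
    let bs = encode(3, &actions);
    // Round-trips: the version header and the action list are both recovered.
    assert_eq!(decode(&bs), (3, actions));
}
// --- end sketch -----------------------------------------------------------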
@@ -59,34 +58,42 @@ impl Region for RegionImpl { } } +/// Storage related config for region. +/// +/// Contains all necessary storage related components needed by the region, such as logstore, +/// manifest, memtable builder. +pub struct StoreConfig { + pub log_store: Arc, + pub sst_layer: AccessLayerRef, + pub manifest: RegionManifest, + pub memtable_builder: MemtableBuilderRef, + pub flush_scheduler: FlushSchedulerRef, + pub flush_strategy: FlushStrategyRef, +} + impl RegionImpl { pub fn new( id: RegionId, name: String, metadata: RegionMetadata, - wal: Wal, - sst_layer: AccessLayerRef, - manifest: RegionManifest, + store_config: StoreConfig, ) -> RegionImpl { - let memtable_builder = Arc::new(DefaultMemtableBuilder {}); let memtable_version = MemtableVersion::new(); - // TODO(yingwen): Pass flush scheduler to `RegionImpl::new`. - let job_pool = Arc::new(JobPoolImpl {}); - let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); - let version_control = VersionControl::new(metadata, memtable_version); + let wal = Wal::new(id, name.clone(), store_config.log_store); + let inner = Arc::new(RegionInner { shared: Arc::new(SharedData { id, name, version_control: Arc::new(version_control), }), - writer: Arc::new(RegionWriter::new(memtable_builder)), + writer: Arc::new(RegionWriter::new(store_config.memtable_builder)), wal, - flush_strategy: Arc::new(SizeBasedStrategy::default()), - flush_scheduler, - sst_layer, - manifest, + flush_strategy: store_config.flush_strategy, + flush_scheduler: store_config.flush_scheduler, + sst_layer: store_config.sst_layer, + manifest: store_config.manifest, }); RegionImpl { inner } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 9f91ce1bfd..a072176a89 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -1,18 +1,14 @@ //! Region tests. 
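// --------------------------------------------------------------------------
// Illustrative sketch (not part of this diff): `StoreConfig` above turns
// region construction into plain dependency injection, so a test can swap a
// single component such as the flush strategy; the flush test added below
// does exactly that with a toggleable switch. Trimmed stand-ins only: the
// real `FlushStrategy::should_flush` takes more parameters and the real
// `StoreConfig` has more fields.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

trait FlushStrategy: Send + Sync {
    fn should_flush(&self, bytes_mutable: usize) -> bool;
}

/// Production-style default: flush once the mutable memtables grow too large.
struct SizeBasedStrategy {
    max_bytes: usize,
}

impl FlushStrategy for SizeBasedStrategy {
    fn should_flush(&self, bytes_mutable: usize) -> bool {
        bytes_mutable >= self.max_bytes
    }
}

/// Test-only switch: the test decides exactly when a flush is triggered.
#[derive(Default)]
struct FlushSwitch {
    should_flush: AtomicBool,
}

impl FlushStrategy for FlushSwitch {
    fn should_flush(&self, _bytes_mutable: usize) -> bool {
        self.should_flush.load(Ordering::Relaxed)
    }
}

/// Trimmed stand-in for the `StoreConfig` above, keeping only the strategy.
struct StoreConfig {
    flush_strategy: Arc<dyn FlushStrategy>,
}

fn main() {
    // Default wiring uses the size-based strategy ...
    let _prod = StoreConfig {
        flush_strategy: Arc::new(SizeBasedStrategy { max_bytes: 64 << 20 }),
    };
    // ... while a test injects the switch and flips it when it wants a flush.
    let switch = Arc::new(FlushSwitch::default());
    let test = StoreConfig { flush_strategy: switch.clone() };
    assert!(!test.flush_strategy.should_flush(0));
    switch.should_flush.store(true, Ordering::Relaxed);
    assert!(test.flush_strategy.should_flush(0));
}
// --- end sketch -----------------------------------------------------------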
+mod flush; mod read_write; use datatypes::type_id::LogicalTypeId; -use log_store::fs::noop::NoopLogStore; -use object_store::{backend::fs::Backend, ObjectStore}; -use store_api::manifest::Manifest; use store_api::storage::consts; use tempdir::TempDir; use super::*; -use crate::manifest::region::RegionManifest; -use crate::sst::FsAccessLayer; -use crate::test_util::{self, descriptor_util::RegionDescBuilder, schema_util}; +use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, schema_util}; #[tokio::test] async fn test_new_region() { @@ -25,26 +21,15 @@ async fn test_new_region() { .build(); let metadata = desc.try_into().unwrap(); - let wal = Wal::new(region_id, region_name, Arc::new(NoopLogStore::default())); let store_dir = TempDir::new("test_new_region") .unwrap() .path() .to_string_lossy() .to_string(); - let accessor = Backend::build().root(&store_dir).finish().await.unwrap(); - let object_store = ObjectStore::new(accessor); - let sst_layer = Arc::new(FsAccessLayer::new("/", object_store.clone())); - let manifest = RegionManifest::new(region_id, "/manifest/", object_store); + let store_config = config_util::new_store_config(&store_dir, region_id, region_name).await; - let region = RegionImpl::new( - region_id, - region_name.to_string(), - metadata, - wal, - sst_layer, - manifest, - ); + let region = RegionImpl::new(region_id, region_name.to_string(), metadata, store_config); let expect_schema = schema_util::new_schema_ref( &[ diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs new file mode 100644 index 0000000000..d2623f69e8 --- /dev/null +++ b/src/storage/src/region/tests/flush.rs @@ -0,0 +1,108 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use log_store::fs::noop::NoopLogStore; +use store_api::storage::WriteResponse; +use tempdir::TempDir; + +use crate::engine; +use crate::flush::{FlushStrategy, FlushStrategyRef}; +use crate::region::tests::read_write::{self, Tester}; +use crate::region::{RegionImpl, SharedDataRef}; +use crate::test_util::config_util; + +const REGION_NAME: &str = "region-flush-0"; + +/// Create a new region for flush test +async fn new_region_for_flush( + store_dir: &str, + enable_version_column: bool, + flush_strategy: FlushStrategyRef, +) -> RegionImpl { + let region_id = 0; + + let metadata = read_write::new_metadata(REGION_NAME, enable_version_column); + + let mut store_config = config_util::new_store_config(store_dir, region_id, REGION_NAME).await; + store_config.flush_strategy = flush_strategy; + + RegionImpl::new(region_id, REGION_NAME.to_string(), metadata, store_config) +} + +struct FlushTester { + tester: Tester, +} + +impl FlushTester { + async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> FlushTester { + let region = new_region_for_flush(store_dir, false, flush_strategy).await; + + FlushTester { + tester: Tester::with_region(region), + } + } + + async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { + self.tester.put(data).await + } +} + +#[derive(Default)] +struct FlushSwitch { + should_flush: AtomicBool, +} + +impl FlushSwitch { + fn set_should_flush(&self, should_flush: bool) { + self.should_flush.store(should_flush, Ordering::Relaxed); + } +} + +impl FlushStrategy for FlushSwitch { + fn should_flush( + &self, + _shared: &SharedDataRef, + _bytes_mutable: usize, + _bytes_total: usize, + ) -> bool { + self.should_flush.load(Ordering::Relaxed) + } +} + +#[tokio::test] +async fn test_flush() { + common_telemetry::init_default_ut_logging(); + 
+ let dir = TempDir::new("flush").unwrap(); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + // Always trigger flush before write. + let tester = FlushTester::new(store_dir, flush_switch.clone()).await; + + let data = [(1000, Some(100))]; + // Put one element so we have content to flush. + tester.put(&data).await; + + // Now set should flush to true to trigger flush. + flush_switch.set_should_flush(true); + // Put element to trigger flush. + tester.put(&data).await; + + // Now put another data to trigger write stall and wait until last flush done to + // ensure at least one parquet file is generated. + tester.put(&data).await; + + // Check parquet files. + let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir(REGION_NAME)); + let mut has_parquet_file = false; + for entry in std::fs::read_dir(sst_dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if !path.is_dir() { + assert_eq!("parquet", path.extension().unwrap()); + has_parquet_file = true; + } + } + assert!(has_parquet_file); +} diff --git a/src/storage/src/region/tests/read_write.rs b/src/storage/src/region/tests/read_write.rs index 7a06c3c520..7628b2ee4f 100644 --- a/src/storage/src/region/tests/read_write.rs +++ b/src/storage/src/region/tests/read_write.rs @@ -6,21 +6,24 @@ use datatypes::prelude::*; use datatypes::type_id::LogicalTypeId; use datatypes::vectors::Int64Vector; use log_store::fs::noop::NoopLogStore; -use object_store::{backend::fs::Backend, ObjectStore}; -use store_api::manifest::Manifest; use store_api::storage::{ consts, Chunk, ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteContext, WriteRequest, WriteResponse, }; use tempdir::TempDir; -use crate::manifest::region::RegionManifest; -use crate::region::RegionImpl; -use crate::sst::FsAccessLayer; -use crate::test_util::{self, descriptor_util::RegionDescBuilder, write_batch_util}; -use crate::wal::Wal; +use crate::region::{RegionImpl, RegionMetadata}; +use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, write_batch_util}; use crate::write_batch::{PutData, WriteBatch}; +pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata { + let desc = RegionDescBuilder::new(region_name) + .enable_version_column(enable_version_column) + .push_value_column(("v1", LogicalTypeId::Int64, true)) + .build(); + desc.try_into().unwrap() +} + /// Create a new region for read/write test async fn new_region_for_rw( store_dir: &str, @@ -28,28 +31,12 @@ async fn new_region_for_rw( ) -> RegionImpl { let region_id = 0; let region_name = "region-rw-0"; - let sst_dir = format!("{}/{}/", store_dir, region_name); - let manifest_dir = format!("{}/{}/maniffest/", store_dir, region_name); - let desc = RegionDescBuilder::new(region_name) - .enable_version_column(enable_version_column) - .push_value_column(("v1", LogicalTypeId::Int64, true)) - .build(); - let metadata = desc.try_into().unwrap(); - let wal = Wal::new(region_id, region_name, Arc::new(NoopLogStore::default())); - let accessor = Backend::build().root(store_dir).finish().await.unwrap(); - let object_store = ObjectStore::new(accessor); - let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); - let manifest = RegionManifest::new(region_id, &manifest_dir, object_store); + let metadata = new_metadata(region_name, enable_version_column); - RegionImpl::new( - region_id, - region_name.to_string(), - metadata, - wal, - sst_layer, 
- manifest, - ) + let store_config = config_util::new_store_config(store_dir, region_id, region_name).await; + + RegionImpl::new(region_id, region_name.to_string(), metadata, store_config) } fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch { @@ -104,7 +91,7 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option)>) { } /// Test region without considering version column. -struct Tester { +pub struct Tester { region: RegionImpl, write_ctx: WriteContext, read_ctx: ReadContext, @@ -114,6 +101,10 @@ impl Tester { async fn new(store_dir: &str) -> Tester { let region = new_region_for_rw(store_dir, false).await; + Tester::with_region(region) + } + + pub fn with_region(region: RegionImpl) -> Tester { Tester { region, write_ctx: WriteContext::default(), @@ -124,7 +115,7 @@ impl Tester { /// Put without version specified. /// /// Format of data: (timestamp, v1), timestamp is key, v1 is value. - async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { + pub async fn put(&self, data: &[(i64, Option)]) -> WriteResponse { // Build a batch without version. let mut batch = new_write_batch_for_test(false); let put_data = new_put_data(data); diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index a8f579478f..30d20c288f 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -21,17 +21,28 @@ use crate::write_batch::WriteBatch; pub type RegionWriterRef = Arc; +// TODO(yingwen): Add benches for write and support group commit to improve write throughput. + +/// Region writer manages all write operations to the region. pub struct RegionWriter { + /// Inner writer guarded by write lock, the write lock is used to ensure + /// all write operations are serialized. inner: Mutex, + /// Version lock, protects read-write-update to region `Version`. + /// + /// Increasing committed sequence should be guarded by this lock. + version_mutex: Mutex<()>, } impl RegionWriter { pub fn new(memtable_builder: MemtableBuilderRef) -> RegionWriter { RegionWriter { inner: Mutex::new(WriterInner::new(memtable_builder)), + version_mutex: Mutex::new(()), } } + /// Write to region in the write lock. pub async fn write( &self, ctx: &WriteContext, @@ -39,17 +50,48 @@ impl RegionWriter { writer_ctx: WriterContext<'_, S>, ) -> Result { let mut inner = self.inner.lock().await; - inner.write(ctx, request, writer_ctx).await + inner + .write(&self.version_mutex, ctx, request, writer_ctx) + .await } + /// Apply version edit. pub async fn apply_version_edit( &self, wal: &Wal, edit: VersionEdit, shared: &SharedDataRef, ) -> Result<()> { - let mut inner = self.inner.lock().await; - inner.apply_version_edit(wal, edit, shared).await + // HACK: We won't acquire the write lock here because write stall would hold + // write lock thus we have no chance to get the lock and apply the version edit. + // So we add a version lock to ensure modification to `VersionControl` is + // serialized. 
+ let version_control = &shared.version_control; + + let _lock = self.version_mutex.lock().await; + let next_sequence = version_control.committed_sequence() + 1; + + self.persist_manifest_version(wal, next_sequence, &edit) + .await?; + + version_control.apply_edit(edit); + + version_control.set_committed_sequence(next_sequence); + + Ok(()) + } + + async fn persist_manifest_version( + &self, + wal: &Wal, + seq: SequenceNumber, + edit: &VersionEdit, + ) -> Result<()> { + let header = WalHeader::with_last_manifest_version(edit.manifest_version); + + wal.write_to_wal(seq, header, Payload::None).await?; + + Ok(()) } } @@ -85,13 +127,13 @@ impl WriterInner { } } - // TODO(yingwen): Support group commit so we can avoid taking mutable reference. /// Write `WriteBatch` to region, now the schema of batch needs to be validated outside. /// /// Mutable reference of writer ensure no other reference of this writer can modify the /// version control (write is exclusive). async fn write( &mut self, + version_mutex: &Mutex<()>, _ctx: &WriteContext, request: WriteBatch, writer_ctx: WriterContext<'_, S>, @@ -102,6 +144,7 @@ impl WriterInner { let version_control = writer_ctx.version_control(); let version = version_control.current(); + let _lock = version_mutex.lock().await; let committed_sequence = version_control.committed_sequence(); // Sequence for current write batch. let next_sequence = committed_sequence + 1; @@ -214,6 +257,10 @@ impl WriterInner { // However the last flush job may fail, in which case, we just return error // and abort current write request. The flush handle is left empty, so the next // time we still have chance to trigger a new flush. + logging::info!("Write stall, region: {}", shared.name); + + // TODO(yingwen): We should release the write lock during waiting flush done, which + // needs something like async condvar. flush_handle.join().await.map_err(|e| { logging::error!( "Previous flush job failed, region: {}, err: {}", @@ -250,39 +297,6 @@ impl WriterInner { Ok(()) } - async fn apply_version_edit( - &mut self, - wal: &Wal, - edit: VersionEdit, - shared: &SharedDataRef, - ) -> Result<()> { - let version_control = &shared.version_control; - - let next_sequence = version_control.committed_sequence() + 1; - - self.persist_manifest_version(wal, next_sequence, &edit) - .await?; - - version_control.apply_edit(edit); - - version_control.set_committed_sequence(next_sequence); - - Ok(()) - } - - async fn persist_manifest_version( - &self, - wal: &Wal, - seq: SequenceNumber, - edit: &VersionEdit, - ) -> Result<()> { - let header = WalHeader::with_last_manifest_version(edit.manifest_version); - - wal.write_to_wal(seq, header, Payload::None).await?; - - Ok(()) - } - #[inline] fn alloc_memtable_id(&mut self) -> MemtableId { self.last_memtable_id += 1; diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 35fb190e99..c8e6ea05b1 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -107,7 +107,7 @@ impl FileHandleInner { } /// Immutable metadata of a sst file. -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct FileMeta { pub file_path: String, /// SST level of the file. 
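// --------------------------------------------------------------------------
// Illustrative sketch (not part of this diff): the writer above adds a
// dedicated `version_mutex` so a finished flush can commit its `VersionEdit`
// and bump the committed sequence without taking the main write lock, which
// may be held by a stalled writer. A minimal stand-in of that commit step,
// with hypothetical simplified types (`VersionControl`, `Writer`) and std
// primitives instead of the crate's async ones:
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

struct VersionControl {
    committed_sequence: AtomicU64,
}

impl VersionControl {
    fn committed_sequence(&self) -> u64 {
        self.committed_sequence.load(Ordering::Relaxed)
    }
    fn set_committed_sequence(&self, seq: u64) {
        self.committed_sequence.store(seq, Ordering::Relaxed);
    }
}

struct Writer {
    /// Guards read-modify-write of the committed sequence; independent of the
    /// (omitted) write lock that serializes user writes.
    version_mutex: Mutex<()>,
}

impl Writer {
    fn apply_version_edit(&self, version_control: &VersionControl) {
        // All sequence bumps (writes and flush commits) go through this lock.
        let _guard = self.version_mutex.lock().unwrap();
        let next_sequence = version_control.committed_sequence() + 1;
        // The real code persists the manifest version to the WAL and applies
        // the edit to the in-memory version here, before bumping the sequence.
        version_control.set_committed_sequence(next_sequence);
    }
}

fn main() {
    let version_control = VersionControl { committed_sequence: AtomicU64::new(0) };
    let writer = Writer { version_mutex: Mutex::new(()) };
    writer.apply_version_edit(&version_control);
    assert_eq!(version_control.committed_sequence(), 1);
}
// --- end sketch -----------------------------------------------------------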
diff --git a/src/storage/src/test_util.rs b/src/storage/src/test_util.rs index 92828d8d59..c18e61b19e 100644 --- a/src/storage/src/test_util.rs +++ b/src/storage/src/test_util.rs @@ -1,3 +1,4 @@ +pub mod config_util; pub mod descriptor_util; pub mod schema_util; pub mod write_batch_util; diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs new file mode 100644 index 0000000000..392d055338 --- /dev/null +++ b/src/storage/src/test_util/config_util.rs @@ -0,0 +1,40 @@ +use std::sync::Arc; + +use log_store::fs::noop::NoopLogStore; +use object_store::{backend::fs::Backend, ObjectStore}; +use store_api::manifest::Manifest; +use store_api::storage::RegionId; + +use crate::background::JobPoolImpl; +use crate::engine; +use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy}; +use crate::manifest::region::RegionManifest; +use crate::memtable::DefaultMemtableBuilder; +use crate::region::StoreConfig; +use crate::sst::FsAccessLayer; + +/// Create a new StoreConfig for test. +pub async fn new_store_config( + store_dir: &str, + region_id: RegionId, + region_name: &str, +) -> StoreConfig { + let sst_dir = engine::region_sst_dir(region_name); + let manifest_dir = engine::region_manifest_dir(region_name); + + let accessor = Backend::build().root(store_dir).finish().await.unwrap(); + let object_store = ObjectStore::new(accessor); + let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); + let manifest = RegionManifest::new(region_id, &manifest_dir, object_store); + let job_pool = Arc::new(JobPoolImpl {}); + let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool)); + + StoreConfig { + log_store: Arc::new(NoopLogStore::default()), + sst_layer, + manifest, + memtable_builder: Arc::new(DefaultMemtableBuilder {}), + flush_scheduler, + flush_strategy: Arc::new(SizeBasedStrategy::default()), + } +} diff --git a/src/storage/src/version.rs b/src/storage/src/version.rs index 6b5abb4f5e..40bb398b8b 100644 --- a/src/storage/src/version.rs +++ b/src/storage/src/version.rs @@ -97,16 +97,7 @@ impl VersionControl { pub fn apply_edit(&self, edit: VersionEdit) { let mut version_to_update = self.version.lock(); - - if let Some(max_memtable_id) = edit.max_memtable_id { - // Remove flushed memtables - let memtable_version = version_to_update.memtables(); - let removed = memtable_version.remove_immutables(max_memtable_id); - version_to_update.memtables = Arc::new(removed); - } - version_to_update.apply_edit(edit); - version_to_update.commit(); } } @@ -189,6 +180,14 @@ impl Version { if self.manifest_version < edit.manifest_version { self.manifest_version = edit.manifest_version; } + + if let Some(max_memtable_id) = edit.max_memtable_id { + // Remove flushed memtables + let memtable_version = self.memtables(); + let removed = memtable_version.remove_immutables(max_memtable_id); + self.memtables = Arc::new(removed); + } + let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new); let merged_ssts = self.ssts.merge(handles_to_add); diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 4994b4e1dd..d3d9de4462 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -129,6 +129,7 @@ impl Wal { pub enum Payload<'a> { None, // only header WriteBatchArrow(&'a WriteBatch), + #[allow(dead_code)] WriteBatchProto(&'a WriteBatch), } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index e2bb64282a..6b6493686a 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -19,4 +19,5 @@ snafu = { version = 
"0.7", features = ["backtraces"] } [dev-dependencies] async-stream = "0.3" +serde_json = "1.0" tokio = { version = "1.0", features = ["full"] } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index b154f38cee..95eecccc33 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -1,4 +1,5 @@ //! metadata service +pub mod action; mod storage; use async_trait::async_trait; @@ -15,12 +16,8 @@ pub trait Metadata: Clone {} pub trait MetadataId: Clone + Copy {} -/// The action to apply on metadata pub trait MetaAction: Serialize + DeserializeOwned { - type MetadataId: MetadataId; - - /// Returns the metadata id of the action - fn metadata_id(&self) -> Self::MetadataId; + fn set_prev_version(&mut self, version: ManifestVersion); } /// Manifest service diff --git a/src/store-api/src/manifest/action.rs b/src/store-api/src/manifest/action.rs new file mode 100644 index 0000000000..89070d02ae --- /dev/null +++ b/src/store-api/src/manifest/action.rs @@ -0,0 +1,82 @@ +///! Common actions for manifest +use serde::{Deserialize, Serialize}; + +pub type ProtocolVersion = u16; + +/// Current reader and writer versions +/// TODO(dennis): configurable +const READER_VERSION: ProtocolVersion = 0; +const WRITER_VERSION: ProtocolVersion = 0; + +/// The maximum protocol version we are currently allowed to use, +/// TODO(dennis): reading from configuration. +pub fn supported_protocol_version() -> (ProtocolVersion, ProtocolVersion) { + (READER_VERSION, WRITER_VERSION) +} + +/// Protocol action that used to block older clients from reading or writing the log when backwards +/// incompatible changes are made to the protocol. clients should be tolerant of messages and +/// fields that they do not understand. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ProtocolAction { + pub min_reader_version: ProtocolVersion, + pub min_writer_version: ProtocolVersion, +} + +impl Default for ProtocolAction { + fn default() -> Self { + let (min_reader_version, min_writer_version) = supported_protocol_version(); + Self { + min_reader_version, + min_writer_version, + } + } +} + +impl ProtocolAction { + pub fn new() -> Self { + Self::default() + } + + pub fn is_readable(&self, reader_version: ProtocolVersion) -> bool { + reader_version >= self.min_reader_version + } + + pub fn is_writable(&self, writer_version: ProtocolVersion) -> bool { + writer_version >= self.min_writer_version + } +} + +#[cfg(test)] +mod tests { + use serde_json as json; + + use super::*; + + #[test] + fn test_protocol_action() { + let mut action = ProtocolAction::new(); + + assert!(action.is_readable(0)); + assert!(action.is_writable(0)); + action.min_reader_version = 2; + action.min_writer_version = 3; + assert!(!action.is_readable(0)); + assert!(!action.is_writable(0)); + assert!(action.is_readable(2)); + assert!(action.is_writable(3)); + assert!(action.is_readable(3)); + assert!(action.is_writable(4)); + + let s = json::to_string(&action).unwrap(); + assert_eq!("{\"min_reader_version\":2,\"min_writer_version\":3}", s); + + let action_decoded: ProtocolAction = json::from_str(&s).unwrap(); + assert!(!action_decoded.is_readable(0)); + assert!(!action_decoded.is_writable(0)); + assert!(action_decoded.is_readable(2)); + assert!(action_decoded.is_writable(3)); + assert!(action_decoded.is_readable(3)); + assert!(action_decoded.is_writable(4)); + } +}