feat: Cherry-pick lost commits of flush (#111)

* fix: Fix write stall blocking flush from applying version edit

refactor: Use store config to help constructing Region

chore: Address CR comments

* feat: Add manifest protocol support and refactor region metadata protocol

feat: ignore sqlparser log

refactor: PREV_VERSION_KEY constant

refactor: minor changes to readable/writable checks

fix: address CR problems

refactor: use binary literal

Co-authored-by: Dennis Zhuang <killme2008@gmail.com>
Authored by evenyag on 2022-07-26 15:52:39 +08:00, committed by GitHub
commit c9db093af7, parent bf5975ca3e
21 changed files with 664 additions and 187 deletions

Cargo.lock (generated)

@@ -3526,6 +3526,7 @@ dependencies = [
"futures",
"object-store",
"serde",
"serde_json",
"snafu",
"tokio",
]


@@ -73,6 +73,7 @@ pub fn init_global_logging(
.with_target("tower", Level::WARN)
.with_target("datafusion", Level::WARN)
.with_target("reqwest", Level::WARN)
.with_target("sqlparser", Level::WARN)
.with_default(
directives
.parse::<filter::LevelFilter>()


@@ -5,20 +5,23 @@ use async_trait::async_trait;
use common_telemetry::logging::info;
use object_store::{backend::fs::Backend, util, ObjectStore};
use snafu::ResultExt;
use store_api::manifest::action::ProtocolAction;
use store_api::{
logstore::LogStore,
manifest::Manifest,
storage::{EngineContext, RegionDescriptor, StorageEngine},
};
use crate::background::JobPoolImpl;
use crate::config::{EngineConfig, ObjectStoreConfig};
use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::metadata::RegionMetadata;
use crate::region::RegionImpl;
use crate::region::{RegionImpl, StoreConfig};
use crate::sst::FsAccessLayer;
use crate::wal::Wal;
/// [StorageEngine] implementation.
pub struct EngineImpl<S: LogStore> {
@@ -99,16 +102,16 @@ impl SharedData {
object_store,
})
}
}
#[inline]
fn region_sst_dir(&self, region_name: &str) -> String {
format!("{}/", region_name)
}
#[inline]
pub fn region_sst_dir(region_name: &str) -> String {
format!("{}/", region_name)
}
#[inline]
fn region_manifest_dir(&self, region_name: &str) -> String {
format!("{}/manifest/", region_name)
}
#[inline]
pub fn region_manifest_dir(region_name: &str) -> String {
format!("{}/manifest/", region_name)
}
type RegionMap<S> = HashMap<String, RegionImpl<S>>;
@@ -117,14 +120,23 @@ struct EngineInner<S: LogStore> {
log_store: Arc<S>,
regions: RwLock<RegionMap<S>>,
shared: SharedData,
memtable_builder: MemtableBuilderRef,
flush_scheduler: FlushSchedulerRef,
flush_strategy: FlushStrategyRef,
}
impl<S: LogStore> EngineInner<S> {
pub async fn new(config: EngineConfig, log_store: Arc<S>) -> Result<Self> {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
Ok(Self {
log_store,
regions: RwLock::new(Default::default()),
shared: SharedData::new(config).await?,
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
})
}
@@ -144,29 +156,38 @@ impl<S: LogStore> EngineInner<S> {
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let wal = Wal::new(region_id, region_name.clone(), self.log_store.clone());
let sst_dir = &self.shared.region_sst_dir(&region_name);
let sst_dir = &region_sst_dir(&region_name);
let sst_layer = Arc::new(FsAccessLayer::new(
sst_dir,
self.shared.object_store.clone(),
));
let manifest_dir = self.shared.region_manifest_dir(&region_name);
let manifest_dir = region_manifest_dir(&region_name);
let manifest =
RegionManifest::new(region_id, &manifest_dir, self.shared.object_store.clone());
let store_config = StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest: manifest.clone(),
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
};
let region = RegionImpl::new(
region_id,
region_name.clone(),
metadata.clone(),
wal,
sst_layer,
manifest.clone(),
store_config,
);
// Persist region metadata
manifest
.update(RegionMetaAction::Change(RegionChange {
metadata: Arc::new(metadata),
}))
.update(RegionMetaActionList::new(vec![
RegionMetaAction::Protocol(ProtocolAction::new()),
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(metadata),
}),
]))
.await?;
{


@@ -5,6 +5,7 @@ use std::str::Utf8Error;
use common_error::prelude::*;
use datatypes::arrow;
use serde_json::error::Error as JsonError;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use crate::metadata::Error as MetadataError;
@@ -142,6 +143,34 @@ pub enum Error {
#[snafu(display("Task already cancelled"))]
Cancelled { backtrace: Backtrace },
#[snafu(display(
"Manifest protocol forbid to read, min_version: {}, supported_version: {}",
min_version,
supported_version
))]
ManifestProtocolForbidRead {
min_version: ProtocolVersion,
supported_version: ProtocolVersion,
backtrace: Backtrace,
},
#[snafu(display(
"Manifest protocol forbid to write, min_version: {}, supported_version: {}",
min_version,
supported_version
))]
ManifestProtocolForbidWrite {
min_version: ProtocolVersion,
supported_version: ProtocolVersion,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode region action list, {}", msg))]
DecodeRegionMetaActionList { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to read line, err: {}", source))]
Readline { source: IoError },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -162,7 +191,9 @@ impl ErrorExt for Error {
| EncodeJson { .. }
| DecodeJson { .. }
| JoinTask { .. }
| Cancelled { .. } => StatusCode::Unexpected,
| Cancelled { .. }
| DecodeRegionMetaActionList { .. }
| Readline { .. } => StatusCode::Unexpected,
FlushIo { .. }
| InitBackend { .. }
@@ -173,7 +204,9 @@ impl ErrorExt for Error {
| DeleteObject { .. }
| WriteWal { .. }
| DecodeWalHeader { .. }
| EncodeWalHeader { .. } => StatusCode::StorageUnavailable,
| EncodeWalHeader { .. }
| ManifestProtocolForbidRead { .. }
| ManifestProtocolForbidWrite { .. } => StatusCode::StorageUnavailable,
}
}
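As an aside, the two new error variants follow snafu's usual pattern: an ensure! guard paired with the generated *Snafu context selector yields exactly the message declared in display. Below is a condensed, self-contained sketch of that mechanism, not the crate's actual module; it assumes snafu 0.7 and omits the backtrace field for brevity:

use snafu::{ensure, Snafu};

type ProtocolVersion = u16;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Manifest protocol forbid to read, min_version: {}, supported_version: {}",
        min_version,
        supported_version
    ))]
    ManifestProtocolForbidRead {
        min_version: ProtocolVersion,
        supported_version: ProtocolVersion,
    },
}

// Hypothetical helper mirroring the ensure! guards added in this commit.
fn check_readable(
    min_version: ProtocolVersion,
    supported_version: ProtocolVersion,
) -> Result<(), Error> {
    ensure!(
        supported_version >= min_version,
        ManifestProtocolForbidReadSnafu {
            min_version,
            supported_version,
        }
    );
    Ok(())
}

fn main() {
    let err = check_readable(1, 0).unwrap_err();
    // The display string matches the one asserted in the action list test below.
    assert_eq!(
        "Manifest protocol forbid to read, min_version: 1, supported_version: 0",
        err.to_string()
    );
}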


@@ -205,7 +205,11 @@ impl<S: LogStore> FlushJob<S> {
files_to_remove: Vec::default(),
};
logging::debug!("Write region edit: {:?} to manifest.", edit);
self.manifest.update(RegionMetaAction::Edit(edit)).await
self.manifest
.update(RegionMetaActionList::with_action(RegionMetaAction::Edit(
edit,
)))
.await
}
/// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`


@@ -7,14 +7,14 @@ pub mod config;
mod engine;
pub mod error;
mod flush;
pub mod manifest;
mod manifest;
pub mod memtable;
pub mod metadata;
mod proto;
mod region;
mod snapshot;
mod sst;
pub mod sync;
mod sync;
#[cfg(test)]
mod test_util;
mod version;


@@ -1,26 +1,35 @@
use std::io::{BufRead, BufReader, Write};
use serde::{Deserialize, Serialize};
use serde_json as json;
use snafu::ResultExt;
use serde_json::ser::to_writer;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use store_api::manifest::MetaAction;
use store_api::manifest::Metadata;
use store_api::storage::RegionId;
use store_api::storage::SequenceNumber;
use crate::error::{DecodeJsonSnafu, EncodeJsonSnafu, Result, Utf8Snafu};
use crate::error::{
DecodeJsonSnafu, DecodeRegionMetaActionListSnafu, EncodeJsonSnafu,
ManifestProtocolForbidReadSnafu, ReadlineSnafu, Result,
};
use crate::metadata::{RegionMetadataRef, VersionNumber};
use crate::sst::FileMeta;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionChange {
pub metadata: RegionMetadataRef,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionRemove {
pub region_id: RegionId,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionEdit {
pub region_id: RegionId,
pub region_version: VersionNumber,
@@ -29,39 +38,186 @@ pub struct RegionEdit {
pub files_to_remove: Vec<FileMeta>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionManifestData {
pub region_meta: RegionMetadataRef,
// TODO(dennis): version metadata
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct RegionMetaActionList {
pub actions: Vec<RegionMetaAction>,
pub prev_version: ManifestVersion,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum RegionMetaAction {
Protocol(ProtocolAction),
Change(RegionChange),
Remove(RegionRemove),
Edit(RegionEdit),
}
impl RegionMetaAction {
pub(crate) fn encode(&self) -> Result<Vec<u8>> {
Ok(json::to_string(self).context(EncodeJsonSnafu)?.into_bytes())
}
pub(crate) fn decode(bs: &[u8]) -> Result<Self> {
json::from_str(std::str::from_utf8(bs).context(Utf8Snafu)?).context(DecodeJsonSnafu)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct VersionHeader {
prev_version: ManifestVersion,
}
const NEWLINE: &[u8] = b"\n";
impl RegionMetaActionList {
pub fn with_action(action: RegionMetaAction) -> Self {
Self {
actions: vec![action],
prev_version: 0,
}
}
pub fn new(actions: Vec<RegionMetaAction>) -> Self {
Self {
actions,
prev_version: 0,
}
}
/// Encode self into JSON as a sequence of string lines: a `prev_version` header line first, then one action per line.
pub(crate) fn encode(&self) -> Result<Vec<u8>> {
let mut bytes = Vec::default();
{
// Encode prev_version
let v = VersionHeader {
prev_version: self.prev_version,
};
to_writer(&mut bytes, &v).context(EncodeJsonSnafu)?;
// unwrap is fine here, because we write into a buffer.
bytes.write_all(NEWLINE).unwrap();
}
for action in &self.actions {
to_writer(&mut bytes, action).context(EncodeJsonSnafu)?;
bytes.write_all(NEWLINE).unwrap();
}
Ok(bytes)
}
pub(crate) fn decode(
bs: &[u8],
reader_version: ProtocolVersion,
) -> Result<(Self, Option<ProtocolAction>)> {
let mut lines = BufReader::new(bs).lines();
let mut action_list = RegionMetaActionList {
actions: Vec::default(),
prev_version: 0,
};
{
let first_line = lines
.next()
.with_context(|| DecodeRegionMetaActionListSnafu {
msg: format!(
"Invalid content in manifest: {}",
std::str::from_utf8(bs).unwrap_or("**invalid bytes**")
),
})?
.context(ReadlineSnafu)?;
// Decode prev_version
let v: VersionHeader = json::from_str(&first_line).context(DecodeJsonSnafu)?;
action_list.prev_version = v.prev_version;
}
// Decode actions
let mut protocol_action = None;
let mut actions = Vec::default();
for line in lines {
let line = &line.context(ReadlineSnafu)?;
let action: RegionMetaAction = json::from_str(line).context(DecodeJsonSnafu)?;
if let RegionMetaAction::Protocol(p) = &action {
ensure!(
p.is_readable(reader_version),
ManifestProtocolForbidReadSnafu {
min_version: p.min_reader_version,
supported_version: reader_version,
}
);
protocol_action = Some(p.clone());
}
actions.push(action);
}
action_list.actions = actions;
Ok((action_list, protocol_action))
}
}
impl Metadata for RegionManifestData {}
impl MetaAction for RegionMetaAction {
type MetadataId = RegionId;
fn metadata_id(&self) -> RegionId {
match self {
RegionMetaAction::Change(c) => c.metadata.id,
RegionMetaAction::Remove(r) => r.region_id,
RegionMetaAction::Edit(e) => e.region_id,
}
}
}
impl MetaAction for RegionMetaActionList {
fn set_prev_version(&mut self, version: ManifestVersion) {
self.prev_version = version;
}
}
#[cfg(test)]
mod tests {
use common_telemetry::logging;
use super::*;
#[test]
fn test_encode_decode_action_list() {
common_telemetry::init_default_ut_logging();
let mut protocol = ProtocolAction::new();
protocol.min_reader_version = 1;
let mut action_list = RegionMetaActionList::new(vec![
RegionMetaAction::Protocol(protocol.clone()),
RegionMetaAction::Edit(RegionEdit {
region_id: 1,
region_version: 10,
flush_sequence: 99,
files_to_add: vec![
FileMeta {
file_path: "test1".to_string(),
level: 1,
},
FileMeta {
file_path: "test2".to_string(),
level: 2,
},
],
files_to_remove: vec![FileMeta {
file_path: "test0".to_string(),
level: 0,
}],
}),
]);
action_list.set_prev_version(3);
let bs = action_list.encode().unwrap();
// {"prev_version":3}
// {"Protocol":{"min_reader_version":1,"min_writer_version":0}}
// {"Edit":{"region_id":1,"region_version":10,"flush_sequence":99,"files_to_add":[{"file_path":"test1","level":1},{"file_path":"test2","level":2}],"files_to_remove":[{"file_path":"test0","level":0}]}}
logging::debug!(
"Encoded action list: \r\n{}",
String::from_utf8(bs.clone()).unwrap()
);
let e = RegionMetaActionList::decode(&bs, 0);
assert!(e.is_err());
assert_eq!(
"Manifest protocol forbid to read, min_version: 1, supported_version: 0",
format!("{}", e.err().unwrap())
);
let (decode_list, p) = RegionMetaActionList::decode(&bs, 1).unwrap();
assert_eq!(decode_list, action_list);
assert_eq!(p.unwrap(), protocol);
}
}
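For reference, here is a minimal, self-contained sketch of the line-oriented layout that encode and decode agree on above: a JSON VersionHeader on the first line, then one JSON-encoded action per line. The main function and the ad-hoc json! action are illustrative stand-ins, not part of this commit; it assumes serde with the derive feature plus serde_json:

use serde::{Deserialize, Serialize};
use serde_json::json;
use std::io::{BufRead, BufReader, Write};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct VersionHeader {
    prev_version: u64,
}

fn main() {
    // Encode: one JSON header line, then one JSON action per line.
    let mut bytes = Vec::new();
    serde_json::to_writer(&mut bytes, &VersionHeader { prev_version: 3 }).unwrap();
    bytes.write_all(b"\n").unwrap();
    serde_json::to_writer(&mut bytes, &json!({ "Edit": { "region_id": 1 } })).unwrap();
    bytes.write_all(b"\n").unwrap();

    // Decode: the first line is the version header, the rest are actions.
    let mut lines = BufReader::new(bytes.as_slice()).lines();
    let header: VersionHeader =
        serde_json::from_str(&lines.next().unwrap().unwrap()).unwrap();
    assert_eq!(VersionHeader { prev_version: 3 }, header);
    for line in lines {
        let action: serde_json::Value = serde_json::from_str(&line.unwrap()).unwrap();
        println!("action: {action}");
    }
}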


@@ -4,13 +4,16 @@ use std::sync::{
Arc,
};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
use store_api::manifest::action::{self, ProtocolAction, ProtocolVersion};
use store_api::manifest::*;
use store_api::storage::RegionId;
use crate::error::{Error, Result};
use crate::error::{Error, ManifestProtocolForbidWriteSnafu, Result};
use crate::manifest::action::*;
use crate::manifest::storage::ManifestObjectStore;
use crate::manifest::storage::ObjectStoreLogIterator;
@@ -23,7 +26,7 @@ pub struct RegionManifest {
#[async_trait]
impl Manifest for RegionManifest {
type Error = Error;
type MetaAction = RegionMetaAction;
type MetaAction = RegionMetaActionList;
type MetadataId = RegionId;
type Metadata = RegionManifestData;
@@ -33,8 +36,8 @@ impl Manifest for RegionManifest {
}
}
async fn update(&self, action: RegionMetaAction) -> Result<ManifestVersion> {
self.inner.save(&action).await
async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
self.inner.save(action_list).await
}
async fn load(&self) -> Result<Option<RegionManifestData>> {
@@ -49,13 +52,17 @@ impl Manifest for RegionManifest {
let mut iter = self.inner.scan(start_bound, MAX_VERSION).await?;
match iter.next_action().await? {
Some((_v, RegionMetaAction::Change(c))) => Ok(Some(RegionManifestData {
region_meta: c.metadata,
})),
Some(_) => todo!(),
None => Ok(None),
}
while let Some((_v, action_list)) = iter.next_action().await? {
for action in action_list.actions {
if let RegionMetaAction::Change(c) = action {
return Ok(Some(RegionManifestData {
region_meta: c.metadata,
}));
}
}
}
Ok(None)
}
async fn checkpoint(&self) -> Result<ManifestVersion> {
@@ -71,18 +78,26 @@ struct RegionManifestInner {
region_id: RegionId,
store: Arc<ManifestObjectStore>,
version: AtomicU64,
/// Protocol currently in use
protocol: ArcSwap<ProtocolAction>,
/// Protocol versions supported by this node (reader_version, writer_version)
supported_reader_version: ProtocolVersion,
supported_writer_version: ProtocolVersion,
}
struct RegionMetaActionIterator {
struct RegionMetaActionListIterator {
log_iter: ObjectStoreLogIterator,
reader_version: ProtocolVersion,
}
impl RegionMetaActionIterator {
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaAction)>> {
impl RegionMetaActionListIterator {
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>> {
match self.log_iter.next_log().await? {
Some((v, bytes)) => {
let action: RegionMetaAction = RegionMetaAction::decode(&bytes)?;
Ok(Some((v, action)))
//TODO(dennis): save protocol into inner's protocol when recovering
let (action_list, _protocol) =
RegionMetaActionList::decode(&bytes, self.reader_version)?;
Ok(Some((v, action_list)))
}
None => Ok(None),
}
@@ -91,11 +106,16 @@ impl RegionMetaActionIterator {
impl RegionManifestInner {
fn new(region_id: RegionId, manifest_dir: &str, object_store: ObjectStore) -> Self {
let (reader_version, writer_version) = action::supported_protocol_version();
Self {
region_id,
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
// TODO(dennis): recover the last version from history
version: AtomicU64::new(0),
protocol: ArcSwap::new(Arc::new(ProtocolAction::new())),
supported_reader_version: reader_version,
supported_writer_version: writer_version,
}
}
@@ -109,16 +129,26 @@ impl RegionManifestInner {
self.version.load(Ordering::Relaxed)
}
async fn save(&self, action: &RegionMetaAction) -> Result<ManifestVersion> {
async fn save(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let protocol = self.protocol.load();
ensure!(
protocol.is_writable(self.supported_writer_version),
ManifestProtocolForbidWriteSnafu {
min_version: protocol.min_writer_version,
supported_version: self.supported_writer_version,
}
);
let version = self.inc_version();
logging::debug!(
"Save region metadata action: {:?}, version: {}",
action,
action_list,
version
);
self.store.save(version, &action.encode()?).await?;
self.store.save(version, &action_list.encode()?).await?;
Ok(version)
}
@@ -127,9 +157,10 @@ impl RegionManifestInner {
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<RegionMetaActionIterator> {
Ok(RegionMetaActionIterator {
) -> Result<RegionMetaActionListIterator> {
Ok(RegionMetaActionListIterator {
log_iter: self.store.scan(start, end).await?,
reader_version: self.supported_reader_version,
})
}
}
@@ -172,9 +203,11 @@ mod tests {
assert!(manifest.load().await.unwrap().is_none());
manifest
.update(RegionMetaAction::Change(RegionChange {
metadata: region_meta.clone(),
}))
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: region_meta.clone(),
},
)))
.await
.unwrap();
@@ -193,9 +226,11 @@ mod tests {
let metadata: RegionMetadata = desc.try_into().unwrap();
let region_meta = Arc::new(metadata);
manifest
.update(RegionMetaAction::Change(RegionChange {
metadata: region_meta.clone(),
}))
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
RegionChange {
metadata: region_meta.clone(),
},
)))
.await
.unwrap();


@@ -9,11 +9,10 @@ use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::storage::{ReadContext, Region, RegionId, RegionMeta, WriteContext, WriteResponse};
use crate::background::JobPoolImpl;
use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::region::RegionManifest;
use crate::memtable::{DefaultMemtableBuilder, MemtableVersion};
use crate::memtable::{MemtableBuilderRef, MemtableVersion};
use crate::metadata::{RegionMetaImpl, RegionMetadata};
pub use crate::region::writer::{RegionWriter, RegionWriterRef, WriterContext};
use crate::snapshot::SnapshotImpl;
@@ -59,34 +58,42 @@ impl<S: LogStore> Region for RegionImpl<S> {
}
}
/// Storage-related config for a region.
///
/// Contains all storage-related components the region needs, such as the log store,
/// manifest, and memtable builder.
pub struct StoreConfig<S> {
pub log_store: Arc<S>,
pub sst_layer: AccessLayerRef,
pub manifest: RegionManifest,
pub memtable_builder: MemtableBuilderRef,
pub flush_scheduler: FlushSchedulerRef,
pub flush_strategy: FlushStrategyRef,
}
impl<S: LogStore> RegionImpl<S> {
pub fn new(
id: RegionId,
name: String,
metadata: RegionMetadata,
wal: Wal<S>,
sst_layer: AccessLayerRef,
manifest: RegionManifest,
store_config: StoreConfig<S>,
) -> RegionImpl<S> {
let memtable_builder = Arc::new(DefaultMemtableBuilder {});
let memtable_version = MemtableVersion::new();
// TODO(yingwen): Pass flush scheduler to `RegionImpl::new`.
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
let version_control = VersionControl::new(metadata, memtable_version);
let wal = Wal::new(id, name.clone(), store_config.log_store);
let inner = Arc::new(RegionInner {
shared: Arc::new(SharedData {
id,
name,
version_control: Arc::new(version_control),
}),
writer: Arc::new(RegionWriter::new(memtable_builder)),
writer: Arc::new(RegionWriter::new(store_config.memtable_builder)),
wal,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
flush_scheduler,
sst_layer,
manifest,
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
RegionImpl { inner }


@@ -1,18 +1,14 @@
//! Region tests.
mod flush;
mod read_write;
use datatypes::type_id::LogicalTypeId;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use store_api::manifest::Manifest;
use store_api::storage::consts;
use tempdir::TempDir;
use super::*;
use crate::manifest::region::RegionManifest;
use crate::sst::FsAccessLayer;
use crate::test_util::{self, descriptor_util::RegionDescBuilder, schema_util};
use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, schema_util};
#[tokio::test]
async fn test_new_region() {
@@ -25,26 +21,15 @@ async fn test_new_region() {
.build();
let metadata = desc.try_into().unwrap();
let wal = Wal::new(region_id, region_name, Arc::new(NoopLogStore::default()));
let store_dir = TempDir::new("test_new_region")
.unwrap()
.path()
.to_string_lossy()
.to_string();
let accessor = Backend::build().root(&store_dir).finish().await.unwrap();
let object_store = ObjectStore::new(accessor);
let sst_layer = Arc::new(FsAccessLayer::new("/", object_store.clone()));
let manifest = RegionManifest::new(region_id, "/manifest/", object_store);
let store_config = config_util::new_store_config(&store_dir, region_id, region_name).await;
let region = RegionImpl::new(
region_id,
region_name.to_string(),
metadata,
wal,
sst_layer,
manifest,
);
let region = RegionImpl::new(region_id, region_name.to_string(), metadata, store_config);
let expect_schema = schema_util::new_schema_ref(
&[


@@ -0,0 +1,108 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use log_store::fs::noop::NoopLogStore;
use store_api::storage::WriteResponse;
use tempdir::TempDir;
use crate::engine;
use crate::flush::{FlushStrategy, FlushStrategyRef};
use crate::region::tests::read_write::{self, Tester};
use crate::region::{RegionImpl, SharedDataRef};
use crate::test_util::config_util;
const REGION_NAME: &str = "region-flush-0";
/// Create a new region for flush test
async fn new_region_for_flush(
store_dir: &str,
enable_version_column: bool,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<NoopLogStore> {
let region_id = 0;
let metadata = read_write::new_metadata(REGION_NAME, enable_version_column);
let mut store_config = config_util::new_store_config(store_dir, region_id, REGION_NAME).await;
store_config.flush_strategy = flush_strategy;
RegionImpl::new(region_id, REGION_NAME.to_string(), metadata, store_config)
}
struct FlushTester {
tester: Tester,
}
impl FlushTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> FlushTester {
let region = new_region_for_flush(store_dir, false, flush_strategy).await;
FlushTester {
tester: Tester::with_region(region),
}
}
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
self.tester.put(data).await
}
}
#[derive(Default)]
struct FlushSwitch {
should_flush: AtomicBool,
}
impl FlushSwitch {
fn set_should_flush(&self, should_flush: bool) {
self.should_flush.store(should_flush, Ordering::Relaxed);
}
}
impl FlushStrategy for FlushSwitch {
fn should_flush(
&self,
_shared: &SharedDataRef,
_bytes_mutable: usize,
_bytes_total: usize,
) -> bool {
self.should_flush.load(Ordering::Relaxed)
}
}
#[tokio::test]
async fn test_flush() {
common_telemetry::init_default_ut_logging();
let dir = TempDir::new("flush").unwrap();
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
// Always trigger flush before write.
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
let data = [(1000, Some(100))];
// Put one element so we have content to flush.
tester.put(&data).await;
// Now set should flush to true to trigger flush.
flush_switch.set_should_flush(true);
// Put element to trigger flush.
tester.put(&data).await;
// Now put more data to trigger a write stall and wait until the last flush is done,
// ensuring at least one parquet file is generated.
tester.put(&data).await;
// Check parquet files.
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir(REGION_NAME));
let mut has_parquet_file = false;
for entry in std::fs::read_dir(sst_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if !path.is_dir() {
assert_eq!("parquet", path.extension().unwrap());
has_parquet_file = true;
}
}
assert!(has_parquet_file);
}


@@ -6,21 +6,24 @@ use datatypes::prelude::*;
use datatypes::type_id::LogicalTypeId;
use datatypes::vectors::Int64Vector;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use store_api::manifest::Manifest;
use store_api::storage::{
consts, Chunk, ChunkReader, PutOperation, ReadContext, Region, RegionMeta, ScanRequest,
SequenceNumber, Snapshot, WriteContext, WriteRequest, WriteResponse,
};
use tempdir::TempDir;
use crate::manifest::region::RegionManifest;
use crate::region::RegionImpl;
use crate::sst::FsAccessLayer;
use crate::test_util::{self, descriptor_util::RegionDescBuilder, write_batch_util};
use crate::wal::Wal;
use crate::region::{RegionImpl, RegionMetadata};
use crate::test_util::{self, config_util, descriptor_util::RegionDescBuilder, write_batch_util};
use crate::write_batch::{PutData, WriteBatch};
pub fn new_metadata(region_name: &str, enable_version_column: bool) -> RegionMetadata {
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(enable_version_column)
.push_value_column(("v1", LogicalTypeId::Int64, true))
.build();
desc.try_into().unwrap()
}
/// Create a new region for read/write test
async fn new_region_for_rw(
store_dir: &str,
@@ -28,28 +31,12 @@ async fn new_region_for_rw(
) -> RegionImpl<NoopLogStore> {
let region_id = 0;
let region_name = "region-rw-0";
let sst_dir = format!("{}/{}/", store_dir, region_name);
let manifest_dir = format!("{}/{}/maniffest/", store_dir, region_name);
let desc = RegionDescBuilder::new(region_name)
.enable_version_column(enable_version_column)
.push_value_column(("v1", LogicalTypeId::Int64, true))
.build();
let metadata = desc.try_into().unwrap();
let wal = Wal::new(region_id, region_name, Arc::new(NoopLogStore::default()));
let accessor = Backend::build().root(store_dir).finish().await.unwrap();
let object_store = ObjectStore::new(accessor);
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(region_id, &manifest_dir, object_store);
let metadata = new_metadata(region_name, enable_version_column);
RegionImpl::new(
region_id,
region_name.to_string(),
metadata,
wal,
sst_layer,
manifest,
)
let store_config = config_util::new_store_config(store_dir, region_id, region_name).await;
RegionImpl::new(region_id, region_name.to_string(), metadata, store_config)
}
fn new_write_batch_for_test(enable_version_column: bool) -> WriteBatch {
@@ -104,7 +91,7 @@ fn append_chunk_to(chunk: &Chunk, dst: &mut Vec<(i64, Option<i64>)>) {
}
/// Test region without considering version column.
struct Tester {
pub struct Tester {
region: RegionImpl<NoopLogStore>,
write_ctx: WriteContext,
read_ctx: ReadContext,
@@ -114,6 +101,10 @@ impl Tester {
async fn new(store_dir: &str) -> Tester {
let region = new_region_for_rw(store_dir, false).await;
Tester::with_region(region)
}
pub fn with_region(region: RegionImpl<NoopLogStore>) -> Tester {
Tester {
region,
write_ctx: WriteContext::default(),
@@ -124,7 +115,7 @@ impl Tester {
/// Put without version specified.
///
/// Format of data: (timestamp, v1), timestamp is key, v1 is value.
async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
pub async fn put(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
// Build a batch without version.
let mut batch = new_write_batch_for_test(false);
let put_data = new_put_data(data);


@@ -21,17 +21,28 @@ use crate::write_batch::WriteBatch;
pub type RegionWriterRef = Arc<RegionWriter>;
// TODO(yingwen): Add benches for write and support group commit to improve write throughput.
/// Region writer manages all write operations to the region.
pub struct RegionWriter {
/// Inner writer guarded by write lock, the write lock is used to ensure
/// all write operations are serialized.
inner: Mutex<WriterInner>,
/// Version lock, protects read-write-update to region `Version`.
///
/// Increasing committed sequence should be guarded by this lock.
version_mutex: Mutex<()>,
}
impl RegionWriter {
pub fn new(memtable_builder: MemtableBuilderRef) -> RegionWriter {
RegionWriter {
inner: Mutex::new(WriterInner::new(memtable_builder)),
version_mutex: Mutex::new(()),
}
}
/// Write to region in the write lock.
pub async fn write<S: LogStore>(
&self,
ctx: &WriteContext,
@@ -39,17 +50,48 @@ impl RegionWriter {
writer_ctx: WriterContext<'_, S>,
) -> Result<WriteResponse> {
let mut inner = self.inner.lock().await;
inner.write(ctx, request, writer_ctx).await
inner
.write(&self.version_mutex, ctx, request, writer_ctx)
.await
}
/// Apply version edit.
pub async fn apply_version_edit<S: LogStore>(
&self,
wal: &Wal<S>,
edit: VersionEdit,
shared: &SharedDataRef,
) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.apply_version_edit(wal, edit, shared).await
// HACK: We don't acquire the write lock here, because a write stall would be
// holding it, leaving us no chance to get the lock and apply the version edit.
// So we add a version lock to ensure modifications to `VersionControl` are
// serialized.
let version_control = &shared.version_control;
let _lock = self.version_mutex.lock().await;
let next_sequence = version_control.committed_sequence() + 1;
self.persist_manifest_version(wal, next_sequence, &edit)
.await?;
version_control.apply_edit(edit);
version_control.set_committed_sequence(next_sequence);
Ok(())
}
async fn persist_manifest_version<S: LogStore>(
&self,
wal: &Wal<S>,
seq: SequenceNumber,
edit: &VersionEdit,
) -> Result<()> {
let header = WalHeader::with_last_manifest_version(edit.manifest_version);
wal.write_to_wal(seq, header, Payload::None).await?;
Ok(())
}
}
@@ -85,13 +127,13 @@ impl WriterInner {
}
}
// TODO(yingwen): Support group commit so we can avoid taking mutable reference.
/// Write a `WriteBatch` to the region; for now, the batch's schema needs to be validated by the caller.
///
/// Mutable reference of writer ensure no other reference of this writer can modify the
/// version control (write is exclusive).
async fn write<S: LogStore>(
&mut self,
version_mutex: &Mutex<()>,
_ctx: &WriteContext,
request: WriteBatch,
writer_ctx: WriterContext<'_, S>,
@@ -102,6 +144,7 @@ impl WriterInner {
let version_control = writer_ctx.version_control();
let version = version_control.current();
let _lock = version_mutex.lock().await;
let committed_sequence = version_control.committed_sequence();
// Sequence for current write batch.
let next_sequence = committed_sequence + 1;
@@ -214,6 +257,10 @@ impl WriterInner {
// However the last flush job may fail, in which case, we just return error
// and abort current write request. The flush handle is left empty, so the next
// time we still have chance to trigger a new flush.
logging::info!("Write stall, region: {}", shared.name);
// TODO(yingwen): We should release the write lock during waiting flush done, which
// needs something like async condvar.
flush_handle.join().await.map_err(|e| {
logging::error!(
"Previous flush job failed, region: {}, err: {}",
@@ -250,39 +297,6 @@ impl WriterInner {
Ok(())
}
async fn apply_version_edit<S: LogStore>(
&mut self,
wal: &Wal<S>,
edit: VersionEdit,
shared: &SharedDataRef,
) -> Result<()> {
let version_control = &shared.version_control;
let next_sequence = version_control.committed_sequence() + 1;
self.persist_manifest_version(wal, next_sequence, &edit)
.await?;
version_control.apply_edit(edit);
version_control.set_committed_sequence(next_sequence);
Ok(())
}
async fn persist_manifest_version<S: LogStore>(
&self,
wal: &Wal<S>,
seq: SequenceNumber,
edit: &VersionEdit,
) -> Result<()> {
let header = WalHeader::with_last_manifest_version(edit.manifest_version);
wal.write_to_wal(seq, header, Payload::None).await?;
Ok(())
}
#[inline]
fn alloc_memtable_id(&mut self) -> MemtableId {
self.last_memtable_id += 1;

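One note on the locking change above: write holds the writer's inner lock for its entire duration, including any wait on a stalled flush, so apply_version_edit (called from the flush job) must not depend on that lock, or the flush could never complete. The new version_mutex serializes only the VersionControl updates. Below is a minimal sketch of the pattern, with hypothetical names and an atomic counter standing in for VersionControl; it assumes tokio with the rt and macros features:

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;

struct Writer {
    // Outer lock: serializes whole write operations; may be held while a
    // stalled write waits for a flush to finish.
    inner: Mutex<()>,
    // Inner lock: serializes read-modify-update of the committed sequence,
    // so a flush job can apply a version edit without the outer lock.
    version_mutex: Mutex<()>,
    committed_sequence: AtomicU64,
}

impl Writer {
    async fn write(&self) {
        let _write_guard = self.inner.lock().await;
        // ... a write stall could park here, waiting on the flush handle ...
        let _version_guard = self.version_mutex.lock().await;
        self.committed_sequence.fetch_add(1, Ordering::Relaxed);
    }

    // Called from the flush job. Taking `inner` here would deadlock with a
    // stalled writer, so only `version_mutex` is acquired.
    async fn apply_version_edit(&self) {
        let _version_guard = self.version_mutex.lock().await;
        self.committed_sequence.fetch_add(1, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let writer = Writer {
        inner: Mutex::new(()),
        version_mutex: Mutex::new(()),
        committed_sequence: AtomicU64::new(0),
    };
    writer.write().await;
    writer.apply_version_edit().await;
    assert_eq!(2, writer.committed_sequence.load(Ordering::Relaxed));
}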

@@ -107,7 +107,7 @@ impl FileHandleInner {
}
/// Immutable metadata of an SST file.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct FileMeta {
pub file_path: String,
/// SST level of the file.


@@ -1,3 +1,4 @@
pub mod config_util;
pub mod descriptor_util;
pub mod schema_util;
pub mod write_batch_util;


@@ -0,0 +1,40 @@
use std::sync::Arc;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
use store_api::manifest::Manifest;
use store_api::storage::RegionId;
use crate::background::JobPoolImpl;
use crate::engine;
use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::DefaultMemtableBuilder;
use crate::region::StoreConfig;
use crate::sst::FsAccessLayer;
/// Create a new StoreConfig for test.
pub async fn new_store_config(
store_dir: &str,
region_id: RegionId,
region_name: &str,
) -> StoreConfig<NoopLogStore> {
let sst_dir = engine::region_sst_dir(region_name);
let manifest_dir = engine::region_manifest_dir(region_name);
let accessor = Backend::build().root(store_dir).finish().await.unwrap();
let object_store = ObjectStore::new(accessor);
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(region_id, &manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
StoreConfig {
log_store: Arc::new(NoopLogStore::default()),
sst_layer,
manifest,
memtable_builder: Arc::new(DefaultMemtableBuilder {}),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
}
}


@@ -97,16 +97,7 @@ impl VersionControl {
pub fn apply_edit(&self, edit: VersionEdit) {
let mut version_to_update = self.version.lock();
if let Some(max_memtable_id) = edit.max_memtable_id {
// Remove flushed memtables
let memtable_version = version_to_update.memtables();
let removed = memtable_version.remove_immutables(max_memtable_id);
version_to_update.memtables = Arc::new(removed);
}
version_to_update.apply_edit(edit);
version_to_update.commit();
}
}
@@ -189,6 +180,14 @@ impl Version {
if self.manifest_version < edit.manifest_version {
self.manifest_version = edit.manifest_version;
}
if let Some(max_memtable_id) = edit.max_memtable_id {
// Remove flushed memtables
let memtable_version = self.memtables();
let removed = memtable_version.remove_immutables(max_memtable_id);
self.memtables = Arc::new(removed);
}
let handles_to_add = edit.files_to_add.into_iter().map(FileHandle::new);
let merged_ssts = self.ssts.merge(handles_to_add);


@@ -129,6 +129,7 @@ impl<S: LogStore> Wal<S> {
pub enum Payload<'a> {
None, // only header
WriteBatchArrow(&'a WriteBatch),
#[allow(dead_code)]
WriteBatchProto(&'a WriteBatch),
}


@@ -19,4 +19,5 @@ snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies]
async-stream = "0.3"
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }


@@ -1,4 +1,5 @@
//! metadata service
pub mod action;
mod storage;
use async_trait::async_trait;
@@ -15,12 +16,8 @@ pub trait Metadata: Clone {}
pub trait MetadataId: Clone + Copy {}
/// The action to apply on metadata
pub trait MetaAction: Serialize + DeserializeOwned {
type MetadataId: MetadataId;
/// Returns the metadata id of the action
fn metadata_id(&self) -> Self::MetadataId;
fn set_prev_version(&mut self, version: ManifestVersion);
}
/// Manifest service


@@ -0,0 +1,82 @@
//! Common actions for manifest
use serde::{Deserialize, Serialize};
pub type ProtocolVersion = u16;
/// Current reader and writer versions
/// TODO(dennis): configurable
const READER_VERSION: ProtocolVersion = 0;
const WRITER_VERSION: ProtocolVersion = 0;
/// The maximum protocol version we are currently allowed to use.
/// TODO(dennis): read from configuration.
pub fn supported_protocol_version() -> (ProtocolVersion, ProtocolVersion) {
(READER_VERSION, WRITER_VERSION)
}
/// Protocol action used to block older clients from reading or writing the log when
/// backwards-incompatible changes are made to the protocol. Clients should be tolerant of
/// messages and fields that they do not understand.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ProtocolAction {
pub min_reader_version: ProtocolVersion,
pub min_writer_version: ProtocolVersion,
}
impl Default for ProtocolAction {
fn default() -> Self {
let (min_reader_version, min_writer_version) = supported_protocol_version();
Self {
min_reader_version,
min_writer_version,
}
}
}
impl ProtocolAction {
pub fn new() -> Self {
Self::default()
}
pub fn is_readable(&self, reader_version: ProtocolVersion) -> bool {
reader_version >= self.min_reader_version
}
pub fn is_writable(&self, writer_version: ProtocolVersion) -> bool {
writer_version >= self.min_writer_version
}
}
#[cfg(test)]
mod tests {
use serde_json as json;
use super::*;
#[test]
fn test_protocol_action() {
let mut action = ProtocolAction::new();
assert!(action.is_readable(0));
assert!(action.is_writable(0));
action.min_reader_version = 2;
action.min_writer_version = 3;
assert!(!action.is_readable(0));
assert!(!action.is_writable(0));
assert!(action.is_readable(2));
assert!(action.is_writable(3));
assert!(action.is_readable(3));
assert!(action.is_writable(4));
let s = json::to_string(&action).unwrap();
assert_eq!("{\"min_reader_version\":2,\"min_writer_version\":3}", s);
let action_decoded: ProtocolAction = json::from_str(&s).unwrap();
assert!(!action_decoded.is_readable(0));
assert!(!action_decoded.is_writable(0));
assert!(action_decoded.is_readable(2));
assert!(action_decoded.is_writable(3));
assert!(action_decoded.is_readable(3));
assert!(action_decoded.is_writable(4));
}
}
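Finally, a small usage sketch of the gating semantics, using a local stand-in type (not the store-api one): once a writer bumps min_reader_version past what a node supports, that node must refuse to read, while writing is gated independently:

type ProtocolVersion = u16;

// Local stand-in mirroring the ProtocolAction above (illustration only).
struct ProtocolAction {
    min_reader_version: ProtocolVersion,
    min_writer_version: ProtocolVersion,
}

impl ProtocolAction {
    fn is_readable(&self, reader_version: ProtocolVersion) -> bool {
        reader_version >= self.min_reader_version
    }
    fn is_writable(&self, writer_version: ProtocolVersion) -> bool {
        writer_version >= self.min_writer_version
    }
}

fn main() {
    // A manifest written by a newer node demands reader protocol >= 1.
    let from_log = ProtocolAction {
        min_reader_version: 1,
        min_writer_version: 0,
    };
    // This node still supports (reader, writer) = (0, 0): it must refuse
    // to read the manifest but may still write its own actions.
    assert!(!from_log.is_readable(0));
    assert!(from_log.is_writable(0));
}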