Compare commits

...

5 Commits

Author SHA1 Message Date
Ruihang Xia
1bd53567b4 try to run on self-hosted runner
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-06-13 16:01:50 +08:00
Weny Xu
803940cfa4 feat: enable azblob tests (#1765)
* feat: enable azblob tests

* fix: add missing arg
2023-06-13 07:44:57 +00:00
Weny Xu
420ae054b3 chore: add debug log for heartbeat (#1770) 2023-06-13 07:43:26 +00:00
Lei, HUANG
0f1e061f24 fix: compile issue on develop and workaround to fix failing tests cau… (#1771)
* fix: compile issue on develop and workaround to fix failing tests caused by logstore file lock

* Apply suggestions from code review

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
2023-06-13 07:30:16 +00:00
Lei, HUANG
7961de25ad feat: persist compaction time window (#1757)
* feat: persist compaction time window

* refactor: remove useless compaction window fields

* chore: revert some useless change

* fix: some CR comments

* fix: comment out unstable sqlness test

* revert commented sqlness
2023-06-13 10:15:42 +08:00
30 changed files with 247 additions and 128 deletions

View File

@@ -39,22 +39,22 @@ jobs:
# The file format is greptime-<os>-<arch>
include:
- arch: aarch64-apple-darwin
os: macos-latest
os: self-hosted
file: greptime-darwin-arm64
continue-on-error: false
opts: "-F servers/dashboard"
- arch: x86_64-apple-darwin
os: macos-latest
os: self-hosted
file: greptime-darwin-amd64
continue-on-error: false
opts: "-F servers/dashboard"
- arch: aarch64-apple-darwin
os: macos-latest
os: self-hosted
file: greptime-darwin-arm64-pyo3
continue-on-error: false
opts: "-F pyo3_backend,servers/dashboard"
- arch: x86_64-apple-darwin
os: macos-latest
os: self-hosted
file: greptime-darwin-amd64-pyo3
continue-on-error: false
opts: "-F pyo3_backend,servers/dashboard"

View File

@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{error, info, trace, warn};
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
@@ -199,6 +199,7 @@ impl HeartbeatTask {
}
};
if let Some(req) = req {
debug!("Sending heartbeat request: {:?}", req);
if let Err(e) = tx.send(req).await {
error!("Failed to send heartbeat to metasrv, error: {:?}", e);
match Self::create_streams(
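
Note: this hunk (and the frontend/metasrv ones below) promotes heartbeat logging from trace! to debug!. A standalone sketch of the effect, using the plain log crate as a stand-in for common_telemetry's macros (an assumption, not the project's actual wiring): under a Debug max level, debug! records are emitted while trace! records are filtered out.

```rust
use log::{debug, trace, Level, LevelFilter, Metadata, Record};

struct StdoutLogger;

impl log::Log for StdoutLogger {
    fn enabled(&self, metadata: &Metadata) -> bool {
        metadata.level() <= Level::Debug
    }
    fn log(&self, record: &Record) {
        if self.enabled(record.metadata()) {
            println!("[{}] {}", record.level(), record.args());
        }
    }
    fn flush(&self) {}
}

static LOGGER: StdoutLogger = StdoutLogger;

fn main() {
    log::set_logger(&LOGGER).unwrap();
    log::set_max_level(LevelFilter::Debug);

    debug!("Sending heartbeat request: {:?}", 42); // emitted
    trace!("heartbeat internals: {:?}", 42); // filtered out at Debug level
}
```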

View File

@@ -20,8 +20,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::tracing::trace;
use common_telemetry::{error, info};
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
@@ -83,7 +82,7 @@ impl HeartbeatTask {
loop {
match resp_stream.message().await {
Ok(Some(resp)) => {
trace!("Received a heartbeat response: {:?}", resp);
debug!("Receiving heartbeat response: {:?}", resp);
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx) {
error!(e; "Error while handling heartbeat response");
@@ -92,7 +91,6 @@ impl HeartbeatTask {
Ok(None) => break,
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_secs(retry_interval))
.await;
@@ -148,7 +146,7 @@ impl HeartbeatTask {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
trace!("Send a heartbeat request to metasrv, content: {:?}", req);
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}
}

View File

@@ -20,7 +20,7 @@ use api::v1::meta::{
heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
Peer, RequestHeader, ResponseHeader, Role,
};
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
@@ -59,6 +59,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
break;
}
};
debug!("Receiving heartbeat request: {:?}", req);
if pusher_key.is_none() {
let node_id = get_node_id(header);
@@ -76,6 +77,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader());
debug!("Sending heartbeat response: {:?}", res);
tx.send(res).await.expect("working rx");
}
Err(err) => {

View File

@@ -452,7 +452,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_time_window: table_info.meta.options.compaction_time_window,
};
debug!(
@@ -532,7 +531,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
compaction_time_window: table_info.meta.options.compaction_time_window,
};
// TODO(weny): Returns an error earlier if the target region does not exist in the meta.

View File

@@ -228,18 +228,15 @@ impl<S: StorageEngine> TableCreator<S> {
let table_options = &self.data.request.table_options;
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
let ttl = table_options.ttl;
let compaction_time_window = table_options.compaction_time_window;
let open_opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_time_window,
};
let create_opts = CreateOptions {
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_time_window,
};
let primary_key_indices = &self.data.request.primary_key_indices;
@@ -285,7 +282,6 @@ impl<S: StorageEngine> TableCreator<S> {
.name(region_name.clone())
.row_key(row_key.clone())
.default_cf(default_cf.clone())
.compaction_time_window(compaction_time_window)
.build()
.context(BuildRegionDescriptorSnafu {
table_name: &self.data.request.table_name,

View File

@@ -71,10 +71,6 @@ fn create_sql_options(table_meta: &TableMeta) -> Vec<SqlOption> {
));
}
if let Some(w) = table_opts.compaction_time_window {
options.push(sql_option("compaction_time_window", number_value(w)));
}
for (k, v) in table_opts
.extra_options
.iter()

View File

@@ -62,7 +62,6 @@ impl RegionDescBuilder {
row_key: self.key_builder.build().unwrap(),
default_cf: self.default_cf_builder.build().unwrap(),
extra_cfs: Vec::new(),
compaction_time_window: None,
}
}

View File

@@ -120,6 +120,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
}
let ctx = &PickerContext::with(req.compaction_time_window);
for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);
@@ -130,8 +131,8 @@ impl<S: LogStore> Picker for SimplePicker<S> {
}
debug!(
"Found SST files to compact {:?} on level: {}",
outputs, level_num
"Found SST files to compact {:?} on level: {}, compaction window: {:?}",
outputs, level_num, compaction_time_window,
);
return Ok(Some(CompactionTaskImpl {
schema: req.schema(),

View File

@@ -47,19 +47,24 @@ impl Strategy for SimpleTimeWindowStrategy {
if files.is_empty() {
return (None, vec![]);
}
let time_bucket = ctx
.compaction_time_window()
.unwrap_or_else(|| infer_time_bucket(&files));
let buckets = calculate_time_buckets(time_bucket, &files);
debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
let time_window = ctx.compaction_time_window().unwrap_or_else(|| {
let inferred = infer_time_bucket(&files);
debug!(
"Compaction window is not present, inferring from files: {:?}",
inferred
);
inferred
});
let buckets = calculate_time_buckets(time_window, &files);
debug!("File bucket:{}, file groups: {:?}", time_window, buckets);
(
Some(time_bucket),
Some(time_window),
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
output_level: 1,
bucket_bound: bound,
bucket: time_bucket,
bucket: time_window,
inputs: files,
})
.collect(),
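
Note: the rewritten strategy prefers the window already carried by the PickerContext (i.e. a previously persisted value) and only infers one from the files when it is absent. A standalone sketch of that fallback order, with a hypothetical infer_time_bucket rather than the real implementation:

```rust
// Prefer an explicitly provided window; infer from the files only when absent.
fn resolve_window(ctx_window: Option<i64>, file_spans_secs: &[i64]) -> i64 {
    ctx_window.unwrap_or_else(|| infer_time_bucket(file_spans_secs))
}

// Hypothetical stand-in: pick the smallest predefined bucket covering the
// widest file span.
fn infer_time_bucket(file_spans_secs: &[i64]) -> i64 {
    const BUCKETS_SECS: [i64; 3] = [3_600, 7_200, 86_400];
    let max_span = file_spans_secs.iter().copied().max().unwrap_or(0);
    BUCKETS_SECS
        .into_iter()
        .find(|b| *b >= max_span)
        .unwrap_or(86_400)
}

fn main() {
    // A persisted window short-circuits inference entirely.
    assert_eq!(resolve_window(Some(3_600), &[7_000]), 3_600);
    // Without one, the window is inferred from the file spans.
    assert_eq!(resolve_window(None, &[7_000]), 7_200);
}
```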

View File

@@ -102,7 +102,6 @@ impl<S: LogStore> CompactionTaskImpl<S> {
}
/// Writes updated SST info into manifest.
// TODO(etolbakov): we are not persisting inferred compaction_time_window (#1083)[https://github.com/GreptimeTeam/greptimedb/pull/1083]
async fn write_manifest_and_apply(
&self,
output: HashSet<FileMeta>,
@@ -116,6 +115,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
flushed_sequence: None,
files_to_add: Vec::from_iter(output.into_iter()),
files_to_remove: Vec::from_iter(input.into_iter()),
compaction_time_window: self.compaction_time_window,
};
debug!(
"Compacted region: {}, region edit: {:?}",
@@ -151,7 +151,10 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
let input_ids = compacted.iter().map(|f| f.file_id).collect::<Vec<_>>();
let output_ids = output.iter().map(|f| f.file_id).collect::<Vec<_>>();
info!("Compacting SST files, input: {input_ids:?}, output: {output_ids:?}");
info!(
"Compacting SST files, input: {:?}, output: {:?}, window: {:?}",
input_ids, output_ids, self.compaction_time_window
);
self.write_manifest_and_apply(output, compacted)
.await
.map_err(|e| {

View File

@@ -89,7 +89,7 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
async fn drop_region(&self, _ctx: &EngineContext, region: Self::Region) -> Result<()> {
region.drop_region().await?;
self.inner.remove_reigon(region.name());
self.inner.remove_region(region.name());
Ok(())
}
@@ -395,7 +395,6 @@ impl<S: LogStore> EngineInner<S> {
name,
&self.config,
opts.ttl,
opts.compaction_time_window,
)
.await?;
@@ -441,7 +440,6 @@ impl<S: LogStore> EngineInner<S> {
&region_name,
&self.config,
opts.ttl,
opts.compaction_time_window,
)
.await?;
@@ -462,7 +460,7 @@ impl<S: LogStore> EngineInner<S> {
self.regions.get_region(name)
}
fn remove_reigon(&self, name: &str) {
fn remove_region(&self, name: &str) {
self.regions.remove(name)
}
@@ -473,7 +471,6 @@ impl<S: LogStore> EngineInner<S> {
region_name: &str,
config: &EngineConfig,
region_ttl: Option<Duration>,
compaction_time_window: Option<i64>,
) -> Result<StoreConfig<S>> {
let parent_dir = util::normalize_dir(parent_dir);
@@ -504,7 +501,6 @@ impl<S: LogStore> EngineInner<S> {
engine_config: self.config.clone(),
file_purger: self.file_purger.clone(),
ttl,
compaction_time_window,
write_buffer_size: write_buffer_size
.unwrap_or(self.config.region_write_buffer_size.as_bytes() as usize),
})

View File

@@ -312,6 +312,7 @@ impl<S: LogStore> FlushJob<S> {
flushed_sequence: Some(self.flush_sequence),
files_to_add: file_metas.to_vec(),
files_to_remove: Vec::default(),
compaction_time_window: None,
};
self.writer

View File

@@ -38,8 +38,6 @@ pub struct RawRegionMetadata {
pub columns: RawColumnsMetadata,
pub column_families: RawColumnFamiliesMetadata,
pub version: VersionNumber,
/// Time window for compaction
pub compaction_time_window: Option<i64>,
}
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
@@ -78,6 +76,7 @@ pub struct RegionEdit {
pub flushed_sequence: Option<SequenceNumber>,
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
}
/// The region version checkpoint
@@ -382,6 +381,7 @@ mod tests {
flushed_sequence: Some(99),
files_to_add: files.clone(),
files_to_remove: vec![],
compaction_time_window: None,
},
);
builder.apply_edit(
@@ -391,6 +391,7 @@ mod tests {
flushed_sequence: Some(100),
files_to_add: vec![],
files_to_remove: vec![files[0].clone()],
compaction_time_window: None,
},
);
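
Note: the window now travels in RegionEdit, which the manifest persists, instead of living in RawRegionMetadata. A trimmed round-trip sketch, assuming serde and serde_json as dependencies and a placeholder FileMeta rather than the real types:

```rust
use serde::{Deserialize, Serialize};

// Placeholder for the real FileMeta; only the shape matters here.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct FileMeta {
    file_id: u64,
}

// Field set trimmed to the pieces relevant to this diff.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct RegionEdit {
    flushed_sequence: Option<u64>,
    files_to_add: Vec<FileMeta>,
    files_to_remove: Vec<FileMeta>,
    compaction_time_window: Option<i64>,
}

fn main() {
    let edit = RegionEdit {
        flushed_sequence: Some(99),
        files_to_add: vec![FileMeta { file_id: 1 }],
        files_to_remove: vec![],
        compaction_time_window: Some(3_600),
    };
    // The window rides in the edit, so it survives manifest round-trips.
    let json = serde_json::to_string(&edit).unwrap();
    let restored: RegionEdit = serde_json::from_str(&json).unwrap();
    assert_eq!(edit, restored);
}
```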

View File

@@ -71,5 +71,6 @@ pub fn build_region_edit(
file_size: DEFAULT_TEST_FILE_SIZE,
})
.collect(),
compaction_time_window: None,
}
}

View File

@@ -194,8 +194,6 @@ pub struct RegionMetadata {
pub columns: ColumnsMetadataRef,
column_families: ColumnFamiliesMetadata,
version: VersionNumber,
/// Time window for compaction
compaction_time_window: Option<i64>,
}
impl RegionMetadata {
@@ -214,11 +212,6 @@ impl RegionMetadata {
&self.schema
}
#[inline]
pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
#[inline]
pub fn user_schema(&self) -> &SchemaRef {
self.schema.user_schema()
@@ -320,8 +313,7 @@ impl RegionMetadata {
let mut builder = RegionDescriptorBuilder::default()
.id(self.id)
.name(&self.name)
.row_key(row_key)
.compaction_time_window(self.compaction_time_window);
.row_key(row_key);
for (cf_id, cf) in &self.column_families.id_to_cfs {
let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
@@ -354,7 +346,6 @@ impl From<&RegionMetadata> for RawRegionMetadata {
columns: RawColumnsMetadata::from(&*data.columns),
column_families: RawColumnFamiliesMetadata::from(&data.column_families),
version: data.version,
compaction_time_window: data.compaction_time_window,
}
}
}
@@ -373,7 +364,6 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
columns,
column_families: raw.column_families.into(),
version: raw.version,
compaction_time_window: raw.compaction_time_window,
})
}
}
@@ -631,7 +621,6 @@ impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
.name(desc.name)
.id(desc.id)
.row_key(desc.row_key)?
.compaction_time_window(desc.compaction_time_window)
.add_column_family(desc.default_cf)?;
for cf in desc.extra_cfs {
builder = builder.add_column_family(cf)?;
@@ -778,7 +767,6 @@ struct RegionMetadataBuilder {
columns_meta_builder: ColumnsMetadataBuilder,
cfs_meta_builder: ColumnFamiliesMetadataBuilder,
version: VersionNumber,
compaction_time_window: Option<i64>,
}
impl Default for RegionMetadataBuilder {
@@ -795,7 +783,6 @@ impl RegionMetadataBuilder {
columns_meta_builder: ColumnsMetadataBuilder::default(),
cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
version: Schema::INITIAL_VERSION,
compaction_time_window: None,
}
}
@@ -820,11 +807,6 @@ impl RegionMetadataBuilder {
Ok(self)
}
fn compaction_time_window(mut self, compaction_time_window: Option<i64>) -> Self {
self.compaction_time_window = compaction_time_window;
self
}
fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
let column_index_start = self.columns_meta_builder.columns.len();
let column_index_end = column_index_start + cf.columns.len();
@@ -855,7 +837,6 @@ impl RegionMetadataBuilder {
columns,
column_families: self.cfs_meta_builder.build(),
version: self.version,
compaction_time_window: self.compaction_time_window,
})
}
}
@@ -1047,7 +1028,6 @@ mod tests {
.unwrap();
RegionMetadataBuilder::new()
.name(TEST_REGION)
.compaction_time_window(None)
.row_key(row_key)
.unwrap()
.add_column_family(cf)

View File

@@ -160,11 +160,10 @@ pub struct StoreConfig<S: LogStore> {
pub engine_config: Arc<EngineConfig>,
pub file_purger: FilePurgerRef,
pub ttl: Option<Duration>,
pub compaction_time_window: Option<i64>,
pub write_buffer_size: usize,
}
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
pub type RecoveredMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
pub type RecoveredMetadataMap = BTreeMap<SequenceNumber, (ManifestVersion, RawRegionMetadata)>;
#[derive(Debug)]
@@ -244,7 +243,6 @@ impl<S: LogStore> RegionImpl<S> {
store_config.memtable_builder,
store_config.engine_config.clone(),
store_config.ttl,
store_config.compaction_time_window,
store_config.write_buffer_size,
)),
wal,
@@ -264,7 +262,7 @@ impl<S: LogStore> RegionImpl<S> {
pub async fn open(
name: String,
store_config: StoreConfig<S>,
opts: &OpenOptions,
_opts: &OpenOptions,
) -> Result<Option<RegionImpl<S>>> {
// Load version meta data from manifest.
let (version, mut recovered_metadata) = match Self::recover_from_manifest(
@@ -328,14 +326,11 @@ impl<S: LogStore> RegionImpl<S> {
version_control,
last_flush_millis: AtomicI64::new(0),
});
let compaction_time_window = store_config
.compaction_time_window
.or(opts.compaction_time_window);
let writer = Arc::new(RegionWriter::new(
store_config.memtable_builder,
store_config.engine_config.clone(),
store_config.ttl,
compaction_time_window,
store_config.write_buffer_size,
));
let writer_ctx = WriterContext {
@@ -521,6 +516,7 @@ impl<S: LogStore> RegionImpl<S> {
flushed_sequence: e.flushed_sequence,
manifest_version,
max_memtable_id: None,
compaction_time_window: e.compaction_time_window,
};
version.map(|mut v| {
v.apply_edit(edit);

View File

@@ -556,7 +556,6 @@ async fn create_store_config(region_name: &str, root: &str) -> StoreConfig<NoopL
engine_config: Default::default(),
file_purger,
ttl: None,
compaction_time_window: None,
write_buffer_size: ReadableSize::mb(32).0 as usize,
}
}

View File

@@ -17,13 +17,14 @@
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::logging;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use store_api::storage::{FlushContext, FlushReason, Region, WriteResponse};
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use tokio::sync::Notify;
use crate::compaction::{CompactionHandler, SimplePicker};
@@ -151,6 +152,9 @@ struct CompactionTester {
base: Option<FileTesterBase>,
purge_handler: MockFilePurgeHandler,
object_store: ObjectStore,
store_dir: String,
engine_config: EngineConfig,
flush_strategy: FlushStrategyRef,
}
impl CompactionTester {
@@ -165,7 +169,7 @@ impl CompactionTester {
store_dir,
engine_config.clone(),
purge_handler.clone(),
flush_strategy,
flush_strategy.clone(),
s3_bucket,
)
.await;
@@ -174,6 +178,9 @@ impl CompactionTester {
base: Some(FileTesterBase::with_region(region)),
purge_handler,
object_store,
store_dir: store_dir.to_string(),
engine_config,
flush_strategy,
}
}
@@ -221,6 +228,48 @@ impl CompactionTester {
self.object_store.remove_all("/").await.unwrap();
}
async fn reopen(&mut self) -> Result<bool> {
// Close the old region.
if let Some(base) = self.base.take() {
base.close().await;
}
// Reopen the region.
let object_store = new_object_store(&self.store_dir, None);
let (mut store_config, _) = config_util::new_store_config_with_object_store(
REGION_NAME,
&self.store_dir,
object_store.clone(),
EngineConfig {
max_files_in_l0: usize::MAX,
..Default::default()
},
)
.await;
store_config.engine_config = Arc::new(self.engine_config.clone());
store_config.flush_strategy = self.flush_strategy.clone();
let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
store_config.file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig {
max_inflight_tasks: store_config.engine_config.max_purge_tasks,
},
MockFilePurgeHandler::default(),
));
// FIXME(hl): find out which component prevents logstore from being dropped.
tokio::time::sleep(Duration::from_millis(100)).await;
let Some(region) = RegionImpl::open(REGION_NAME.to_string(), store_config, &OpenOptions::default()).await? else {
return Ok(false);
};
self.base = Some(FileTesterBase::with_region(region));
Ok(true)
}
}
async fn compact_during_read(s3_bucket: Option<String>) {
@@ -290,3 +339,110 @@ async fn test_compact_during_read_on_s3() {
}
}
}
#[tokio::test]
async fn test_persist_region_compaction_time_window() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("put-delete-scan");
let store_dir = dir.path().to_str().unwrap();
let mut tester = CompactionTester::new(
store_dir,
EngineConfig {
max_files_in_l0: 100,
..Default::default()
},
// Disable auto-flush.
Arc::new(FlushSwitch::default()),
None,
)
.await;
// initially the time window is not present since no compaction ever happened.
assert_eq!(
None,
tester
.base
.as_ref()
.unwrap()
.region
.inner
.shared
.version_control
.current()
.ssts()
.compaction_time_window()
);
// write some data with one hour span
for idx in 0..10 {
tester
.put(&[(idx * 1000, Some(idx)), ((idx + 360) * 1000, Some(idx))])
.await;
tester.flush(Some(true)).await;
}
tester.compact().await;
// the inferred and persisted compaction time window should be 3600 seconds.
assert_eq!(
3600,
tester
.base
.as_ref()
.unwrap()
.region
.inner
.shared
.version_control
.current()
.ssts()
.compaction_time_window()
.unwrap()
);
// try write data with a larger time window
for idx in 0..10 {
tester
.put(&[
(idx * 1000, Some(idx)),
((idx + 2 * 60 * 60) * 1000, Some(idx)),
])
.await;
tester.flush(Some(true)).await;
}
tester.compact().await;
// but we won't change the persisted compaction window for now, so it remains unchanged.
assert_eq!(
3600,
tester
.base
.as_ref()
.unwrap()
.region
.inner
.shared
.version_control
.current()
.ssts()
.compaction_time_window()
.unwrap()
);
let reopened = tester.reopen().await.unwrap();
assert!(reopened);
assert_eq!(
3600,
tester
.base
.as_ref()
.unwrap()
.region
.inner
.shared
.version_control
.current()
.ssts()
.compaction_time_window()
.unwrap()
);
}

View File

@@ -102,6 +102,8 @@ impl FlushTester {
)
.await;
store_config.flush_strategy = self.flush_strategy.clone();
// FIXME(hl): find out which component prevents logstore from being dropped.
tokio::time::sleep(Duration::from_millis(100)).await;
let opts = OpenOptions::default();
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
.await

View File

@@ -42,7 +42,7 @@ use crate::metadata::RegionMetadataRef;
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED};
use crate::proto::wal::WalHeader;
use crate::region::{
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
CompactContext, RecoveredMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
@@ -72,7 +72,6 @@ impl RegionWriter {
memtable_builder: MemtableBuilderRef,
config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
write_buffer_size: usize,
) -> RegionWriter {
RegionWriter {
@@ -80,7 +79,6 @@ impl RegionWriter {
memtable_builder,
config,
ttl,
compaction_time_window,
write_buffer_size,
)),
version_mutex: Mutex::new(()),
@@ -141,7 +139,7 @@ impl RegionWriter {
let files_to_add = edit.files_to_add.clone();
let files_to_remove = edit.files_to_remove.clone();
let flushed_sequence = edit.flushed_sequence;
let compaction_time_window = edit.compaction_time_window;
// Persist the meta action.
let mut action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
action_list.set_prev_version(prev_version);
@@ -158,6 +156,7 @@ impl RegionWriter {
flushed_sequence,
manifest_version,
max_memtable_id,
compaction_time_window,
};
// We could tolerate failure during persisting manifest version to the WAL, since it won't
@@ -456,7 +455,6 @@ struct WriterInner {
closed: bool,
engine_config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
/// Size in bytes to freeze the mutable memtable.
write_buffer_size: usize,
}
@@ -466,7 +464,6 @@ impl WriterInner {
memtable_builder: MemtableBuilderRef,
engine_config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
write_buffer_size: usize,
) -> WriterInner {
WriterInner {
@@ -475,7 +472,6 @@ impl WriterInner {
engine_config,
closed: false,
ttl,
compaction_time_window,
write_buffer_size,
}
}
@@ -641,7 +637,7 @@ impl WriterInner {
&self,
writer_ctx: &WriterContext<'_, S>,
sequence: SequenceNumber,
mut metadata: Option<RecoverdMetadata>,
mut metadata: Option<RecoveredMetadata>,
version_control: &VersionControl,
) -> Result<()> {
// It's safe to unwrap here, it's checked outside.
@@ -776,7 +772,7 @@ impl WriterInner {
manifest: ctx.manifest.clone(),
engine_config: self.engine_config.clone(),
ttl: self.ttl,
compaction_time_window: self.compaction_time_window,
compaction_time_window: current_version.ssts().compaction_time_window(),
};
let flush_handle = ctx
@@ -798,6 +794,12 @@ impl WriterInner {
sst_write_buffer_size: ReadableSize,
) -> Result<()> {
let region_id = writer_ctx.shared.id();
let compaction_time_window = writer_ctx
.shared
.version_control
.current()
.ssts()
.compaction_time_window();
let mut compaction_request = CompactionRequestImpl {
region_id,
sst_layer: writer_ctx.sst_layer.clone(),
@@ -806,7 +808,7 @@ impl WriterInner {
manifest: writer_ctx.manifest.clone(),
wal: writer_ctx.wal.clone(),
ttl: self.ttl,
compaction_time_window: self.compaction_time_window,
compaction_time_window,
sender: None,
sst_write_buffer_size,
};
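
Note: with these changes the writer no longer caches its own compaction_time_window; flush and compaction requests read the window from the current SST version instead. A hedged sketch of that flow, with illustrative types rather than the real storage structs:

```rust
use std::sync::{Arc, RwLock};

// Stand-in for the SST level metadata that now owns the window.
#[derive(Clone, Default)]
struct LevelMetas {
    compaction_time_window: Option<i64>,
}

// Stand-in for version control over immutable SST versions.
#[derive(Default)]
struct VersionControl {
    current: RwLock<Arc<LevelMetas>>,
}

impl VersionControl {
    fn current(&self) -> Arc<LevelMetas> {
        self.current.read().unwrap().clone()
    }
    fn apply_edit(&self, edit_window: Option<i64>) {
        let mut cur = self.current.write().unwrap();
        let mut next = (**cur).clone();
        // First persisted window wins, mirroring LevelMetas::merge.
        next.compaction_time_window = next.compaction_time_window.or(edit_window);
        *cur = Arc::new(next);
    }
}

fn main() {
    let vc = VersionControl::default();
    // Before any compaction has persisted a window, requests see None.
    assert_eq!(vc.current().compaction_time_window, None);
    // Once an edit persists a window, later requests pick it up.
    vc.apply_edit(Some(3_600));
    assert_eq!(vc.current().compaction_time_window, Some(3_600));
}
```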

View File

@@ -65,12 +65,15 @@ pub struct LevelMetas {
levels: LevelMetaVec,
sst_layer: AccessLayerRef,
file_purger: FilePurgerRef,
/// Compaction time window in seconds
compaction_time_window: Option<i64>,
}
impl std::fmt::Debug for LevelMetas {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LevelMetas")
.field("levels", &self.levels)
.field("compaction_time_window", &self.compaction_time_window)
.finish()
}
}
@@ -82,6 +85,7 @@ impl LevelMetas {
levels: new_level_meta_vec(),
sst_layer,
file_purger,
compaction_time_window: Default::default(),
}
}
@@ -91,6 +95,10 @@ impl LevelMetas {
self.levels.len()
}
pub fn compaction_time_window(&self) -> Option<i64> {
self.compaction_time_window
}
#[inline]
pub fn level(&self, level: Level) -> &LevelMeta {
&self.levels[level as usize]
@@ -104,6 +112,7 @@ impl LevelMetas {
&self,
files_to_add: impl Iterator<Item = FileMeta>,
files_to_remove: impl Iterator<Item = FileMeta>,
compaction_time_window: Option<i64>,
) -> LevelMetas {
let mut merged = self.clone();
for file in files_to_add {
@@ -118,6 +127,11 @@ impl LevelMetas {
removed_file.mark_deleted();
}
}
// we only update region's compaction time window iff region's window is not set and VersionEdit's
// compaction time window is present.
if let Some(window) = compaction_time_window {
merged.compaction_time_window.get_or_insert(window);
}
merged
}
@@ -726,6 +740,7 @@ mod tests {
]
.into_iter(),
vec![].into_iter(),
None,
);
assert_eq!(
@@ -740,6 +755,7 @@ mod tests {
]
.into_iter(),
vec![].into_iter(),
None,
);
assert_eq!(
HashSet::from([file_ids[0], file_ids[1]]),
@@ -758,6 +774,7 @@ mod tests {
create_file_meta(file_ids[2], 0),
]
.into_iter(),
None,
);
assert_eq!(
HashSet::from([file_ids[1]]),
@@ -776,6 +793,7 @@ mod tests {
create_file_meta(file_ids[3], 1),
]
.into_iter(),
None,
);
assert_eq!(
HashSet::from([file_ids[1]]),
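
Note: per the comment in merge above, the region's compaction time window is updated only if it is not already set and the incoming edit carries one. A minimal sketch of that first-write-wins rule via Option::get_or_insert:

```rust
// Option::get_or_insert stores the value only while the slot is still None,
// so a region keeps the first compaction window that was ever persisted.
fn main() {
    let mut window: Option<i64> = None;

    // An edit without a window leaves the slot untouched.
    let edit_window: Option<i64> = None;
    if let Some(w) = edit_window {
        window.get_or_insert(w);
    }
    assert_eq!(window, None);

    // The first edit carrying a window sets it.
    if let Some(w) = Some(3_600) {
        window.get_or_insert(w);
    }
    assert_eq!(window, Some(3_600));

    // A later edit with a different window does not overwrite it.
    if let Some(w) = Some(7_200) {
        window.get_or_insert(w);
    }
    assert_eq!(window, Some(3_600));
}
```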

View File

@@ -95,6 +95,7 @@ pub async fn new_store_config_with_object_store(
..Default::default()
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
// We use an empty region map so actually the background worker of the picker is disabled.
let regions = Arc::new(RegionMap::new());
@@ -123,7 +124,6 @@ pub async fn new_store_config_with_object_store(
engine_config: Arc::new(engine_config),
file_purger,
ttl: None,
compaction_time_window: None,
write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
},
regions,

View File

@@ -89,7 +89,6 @@ impl RegionDescBuilder {
row_key: self.key_builder.build().unwrap(),
default_cf: self.default_cf_builder.build().unwrap(),
extra_cfs: Vec::new(),
compaction_time_window: None,
}
}

View File

@@ -134,6 +134,7 @@ pub struct VersionEdit {
pub flushed_sequence: Option<SequenceNumber>,
pub manifest_version: ManifestVersion,
pub max_memtable_id: Option<MemtableId>,
pub compaction_time_window: Option<i64>,
}
pub type VersionControlRef = Arc<VersionControl>;
@@ -235,7 +236,7 @@ impl Version {
) {
self.flushed_sequence = flushed_sequence.unwrap_or(self.flushed_sequence);
self.manifest_version = manifest_version;
let ssts = self.ssts.merge(files, std::iter::empty());
let ssts = self.ssts.merge(files, std::iter::empty(), None);
info!(
"After applying checkpoint, region: {}, id: {}, flushed_sequence: {}, manifest_version: {}",
self.metadata.name(),
@@ -264,15 +265,17 @@ impl Version {
}
let handles_to_add = edit.files_to_add.into_iter();
let merged_ssts = self
.ssts
.merge(handles_to_add, edit.files_to_remove.into_iter());
let merged_ssts = self.ssts.merge(
handles_to_add,
edit.files_to_remove.into_iter(),
edit.compaction_time_window,
);
debug!(
"After applying edit, region: {}, id: {}, SST files: {:?}",
self.metadata.name(),
self.metadata.id(),
merged_ssts
merged_ssts,
);
self.ssts = Arc::new(merged_ssts);
}

View File

@@ -144,8 +144,6 @@ pub struct RegionDescriptor {
/// Extra column families defined by user.
#[builder(default, setter(each(name = "push_extra_column_family")))]
pub extra_cfs: Vec<ColumnFamilyDescriptor>,
/// Time window for compaction
pub compaction_time_window: Option<i64>,
}
impl RowKeyDescriptorBuilder {

View File

@@ -92,7 +92,6 @@ pub struct CreateOptions {
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
pub compaction_time_window: Option<i64>,
}
/// Options to open a region.
@@ -104,7 +103,6 @@ pub struct OpenOptions {
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
pub compaction_time_window: Option<i64>,
}
/// Options to close a region.

View File

@@ -170,7 +170,6 @@ mod tests {
.name("test")
.row_key(row_key)
.default_cf(default_cf)
.compaction_time_window(Some(1677652502))
.build()
.unwrap()
}

View File

@@ -77,14 +77,11 @@ pub struct TableOptions {
pub ttl: Option<Duration>,
/// Extra options that may not applicable to all table engines.
pub extra_options: HashMap<String, String>,
/// Time window for compaction
pub compaction_time_window: Option<i64>,
}
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window";
impl TryFrom<&HashMap<String, String>> for TableOptions {
type Error = error::Error;
@@ -115,24 +112,8 @@ impl TryFrom<&HashMap<String, String>> for TableOptions {
.into();
options.ttl = Some(ttl_value);
}
if let Some(compaction_time_window) = value.get(COMPACTION_TIME_WINDOW_KEY) {
options.compaction_time_window = match compaction_time_window.parse::<i64>() {
Ok(t) => Some(t),
Err(_) => {
return ParseTableOptionSnafu {
key: COMPACTION_TIME_WINDOW_KEY,
value: compaction_time_window,
}
.fail()
}
};
}
options.extra_options = HashMap::from_iter(value.iter().filter_map(|(k, v)| {
if k != WRITE_BUFFER_SIZE_KEY
&& k != REGIONS_KEY
&& k != TTL_KEY
&& k != COMPACTION_TIME_WINDOW_KEY
{
if k != WRITE_BUFFER_SIZE_KEY && k != REGIONS_KEY && k != TTL_KEY {
Some((k.clone(), v.clone()))
} else {
None
@@ -155,12 +136,6 @@ impl From<&TableOptions> for HashMap<String, String> {
let ttl_str = humantime::format_duration(ttl).to_string();
res.insert(TTL_KEY.to_string(), ttl_str);
}
if let Some(compaction_time_window) = opts.compaction_time_window {
res.insert(
COMPACTION_TIME_WINDOW_KEY.to_string(),
compaction_time_window.to_string(),
);
}
res.extend(
opts.extra_options
.iter()
@@ -328,7 +303,6 @@ mod tests {
write_buffer_size: None,
ttl: Some(Duration::from_secs(1000)),
extra_options: HashMap::new(),
compaction_time_window: Some(1677652502),
};
let serialized = serde_json::to_string(&options).unwrap();
let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
@@ -341,7 +315,6 @@ mod tests {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
extra_options: HashMap::new(),
compaction_time_window: Some(1677652502),
};
let serialized_map = HashMap::from(&options);
let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -351,7 +324,6 @@ mod tests {
write_buffer_size: None,
ttl: None,
extra_options: HashMap::new(),
compaction_time_window: None,
};
let serialized_map = HashMap::from(&options);
let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -361,7 +333,6 @@ mod tests {
write_buffer_size: Some(ReadableSize::mb(128)),
ttl: Some(Duration::from_secs(1000)),
extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
compaction_time_window: Some(1677652502),
};
let serialized_map = HashMap::from(&options);
let serialized = TableOptions::try_from(&serialized_map).unwrap();
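
Note: with COMPACTION_TIME_WINDOW_KEY removed from the recognized table-level keys, a compaction_time_window entry now falls through into extra_options. A stand-in sketch of the filtering (key names match the diff; the function is illustrative, not the real TableOptions code):

```rust
use std::collections::HashMap;

const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
const TTL_KEY: &str = "ttl";
const REGIONS_KEY: &str = "regions";

// Anything that is not a recognized typed key is kept as an extra option.
fn extra_options(raw: &HashMap<String, String>) -> HashMap<String, String> {
    raw.iter()
        .filter(|(k, _)| {
            k.as_str() != WRITE_BUFFER_SIZE_KEY
                && k.as_str() != REGIONS_KEY
                && k.as_str() != TTL_KEY
        })
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

fn main() {
    let raw = HashMap::from([
        ("ttl".to_string(), "1000s".to_string()),
        ("compaction_time_window".to_string(), "3600".to_string()),
    ]);
    let extras = extra_options(&raw);
    // The window key is now treated like any other engine-specific extra...
    assert!(extras.contains_key("compaction_time_window"));
    // ...while recognized keys are still pulled out into typed fields.
    assert!(!extras.contains_key("ttl"));
}
```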

View File

@@ -21,7 +21,7 @@ mod sql;
#[macro_use]
mod region_failover;
grpc_tests!(File, S3, S3WithCache, Oss);
http_tests!(File, S3, S3WithCache, Oss);
region_failover_tests!(File, S3, S3WithCache, Oss);
grpc_tests!(File, S3, S3WithCache, Oss, Azblob);
http_tests!(File, S3, S3WithCache, Oss, Azblob);
region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);