feat: compaction integration (#997)

* feat: trigger compaction on flush

* chore: rebase develop

* feat: add config item max_file_in_level0 and remove compaction_after_flush

* fix: cr comments

* chore: add unit test to cover TimestampRange::new_inclusive

* fix: workaround to fix future is not Sync

* fix: future is not sync

* fix: some cr comments
This commit is contained in:
Lei, HUANG
2023-02-15 14:14:07 +08:00
committed by GitHub
parent e2904b99ac
commit 75b8afe043
30 changed files with 515 additions and 196 deletions

View File

@@ -24,3 +24,7 @@ metasrv_addrs = ['127.0.0.1:3002']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
[compaction]
max_inflight_task = 4
max_file_in_level0 = 16

View File

@@ -399,6 +399,7 @@ mod tests {
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableType;
@@ -485,12 +486,14 @@ mod tests {
.build()
.unwrap();
let object_store = ObjectStore::new(accessor);
let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
noop_compaction_scheduler,
),
object_store,
));

View File

@@ -143,7 +143,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use datanode::datanode::ObjectStoreConfig;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig};
use servers::Mode;
use super::*;
@@ -181,6 +181,14 @@ mod tests {
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
};
assert_eq!(
CompactionConfig {
max_inflight_task: 4,
max_file_in_level0: 16,
},
options.compaction
);
}
#[test]

View File

@@ -205,7 +205,7 @@ impl TimestampRange {
pub fn new_inclusive(start: Option<Timestamp>, end: Option<Timestamp>) -> Self {
// check for emptiness
if let (Some(start_ts), Some(end_ts)) = (start, end) {
if start_ts >= end_ts {
if start_ts > end_ts {
return Self::empty();
}
}
@@ -462,4 +462,29 @@ mod tests {
assert!(!full.intersects(&empty));
}
#[test]
fn test_new_inclusive() {
    // start < end: the range is non-empty and both bounds are contained.
    let wide = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(1)),
        Some(Timestamp::new_millisecond(3)),
    );
    assert!(!wide.is_empty());
    assert!(wide.contains(&Timestamp::new_millisecond(1)));
    assert!(wide.contains(&Timestamp::new_millisecond(3)));

    // start == end: a single-point range is still non-empty.
    let point = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(1)),
        Some(Timestamp::new_millisecond(1)),
    );
    assert!(!point.is_empty());
    assert_eq!(1, point.start.unwrap().value());
    assert!(point.contains(&Timestamp::new_millisecond(1)));

    // start > end: the bounds are inverted, so the range is empty.
    let inverted = TimestampRange::new_inclusive(
        Some(Timestamp::new_millisecond(2)),
        Some(Timestamp::new_millisecond(1)),
    );
    assert!(inverted.is_empty());
}
}

View File

@@ -20,6 +20,8 @@ use common_telemetry::info;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;
use storage::compaction::CompactionSchedulerConfig;
use storage::config::EngineConfig as StorageEngineConfig;
use crate::error::Result;
use crate::instance::{Instance, InstanceRef};
@@ -104,6 +106,40 @@ impl Default for WalConfig {
}
}
/// Options for table compaction, exposed as `DatanodeOptions::compaction`.
// NOTE(review): unlike `DatanodeOptions`, this struct carries no `#[serde(default)]`
// of its own, so presumably a `[compaction]` section has to spell out both fields.
// Confirm whether partially specified sections should be accepted.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CompactionConfig {
/// Max task number that can concurrently run.
/// Copied into `CompactionSchedulerConfig::max_inflight_task`.
pub max_inflight_task: usize,
/// Max files in level 0 to trigger compaction.
/// Copied into the storage `EngineConfig::max_files_in_l0`.
pub max_file_in_level0: usize,
}
impl Default for CompactionConfig {
/// Returns the built-in defaults: 4 in-flight tasks and 8 level-0 files.
fn default() -> Self {
Self {
max_inflight_task: 4,
max_file_in_level0: 8,
}
}
}
/// Builds the compaction scheduler config from the datanode options.
impl From<&DatanodeOptions> for CompactionSchedulerConfig {
    /// Takes `max_inflight_task` from the `compaction` section of `opts`.
    fn from(opts: &DatanodeOptions) -> Self {
        let max_inflight_task = opts.compaction.max_inflight_task;
        Self { max_inflight_task }
    }
}
/// Builds the storage engine config from the datanode options.
impl From<&DatanodeOptions> for StorageEngineConfig {
    /// Maps `compaction.max_file_in_level0` onto the engine's `max_files_in_l0`.
    fn from(opts: &DatanodeOptions) -> Self {
        let max_files_in_l0 = opts.compaction.max_file_in_level0;
        Self { max_files_in_l0 }
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
@@ -117,6 +153,7 @@ pub struct DatanodeOptions {
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub compaction: CompactionConfig,
pub mode: Mode,
}
@@ -133,6 +170,7 @@ impl Default for DatanodeOptions {
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
compaction: CompactionConfig::default(),
mode: Mode::Standalone,
}
}

View File

@@ -38,8 +38,12 @@ use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use snafu::prelude::*;
use storage::compaction::{
CompactionSchedulerConfig, CompactionSchedulerRef, LocalCompactionScheduler, SimplePicker,
};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
@@ -92,12 +96,15 @@ impl Instance {
}
};
let compaction_scheduler = create_compaction_scheduler(opts);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
StorageEngineConfig::from(opts),
logstore.clone(),
object_store.clone(),
compaction_scheduler,
),
object_store,
));
@@ -204,6 +211,13 @@ impl Instance {
}
}
/// Creates the compaction scheduler used by the datanode's storage engine.
///
/// The scheduler is a `LocalCompactionScheduler` driven by a default `SimplePicker`
/// and configured from `opts` through `CompactionSchedulerConfig::from`.
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
    let picker = SimplePicker::default();
    Arc::new(LocalCompactionScheduler::new(
        CompactionSchedulerConfig::from(opts),
        picker,
    ))
}
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let object_store = match store_config {
ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await,

View File

@@ -24,6 +24,7 @@ use mito::config::EngineConfig as TableEngineConfig;
use query::QueryEngineFactory;
use servers::Mode;
use snafu::ResultExt;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::metadata::TableId;
@@ -46,12 +47,14 @@ impl Instance {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
logstore.clone(),
object_store.clone(),
compaction_scheduler,
),
object_store,
));

View File

@@ -150,6 +150,7 @@ mod tests {
use query::parser::{QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use sql::statements::statement::Statement;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::TableReference;
@@ -209,7 +210,7 @@ mod tests {
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let sql = r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
@@ -221,6 +222,7 @@ mod tests {
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
),
object_store,
));

View File

@@ -605,6 +605,7 @@ mod tests {
Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
};
use log_store::NoopLogStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::region::RegionImpl;
use storage::EngineImpl;
@@ -643,13 +644,14 @@ mod tests {
let (dir, object_store) =
test_util::new_test_object_store("test_insert_with_column_default_constraint").await;
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = MitoEngine::new(
EngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
),
object_store,
);

View File

@@ -23,6 +23,7 @@ use datatypes::vectors::VectorRef;
use log_store::NoopLogStore;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
@@ -127,11 +128,12 @@ pub struct TestEngineComponents {
pub async fn setup_test_engine_and_table() -> TestEngineComponents {
let (dir, object_store) = new_test_object_store("setup_test_engine_and_table").await;
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
let table_engine = MitoEngine::new(
EngineConfig::default(),

View File

@@ -118,6 +118,7 @@ mod tests {
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use mito::engine::MitoEngine;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use tempdir::TempDir;
@@ -135,12 +136,14 @@ mod tests {
};
let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let mock_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
EngineImpl::new(
StorageEngineConfig::default(),
Arc::new(log_store),
object_store.clone(),
compaction_scheduler,
),
object_store,
));

View File

@@ -962,7 +962,7 @@ pub(crate) mod greptime_builtin {
Ok(obj) => match py_vec_obj_to_array(&obj, vm, 1){
Ok(v) => if v.len()==1{
Ok(v)
}else{
} else {
Err(vm.new_runtime_error(format!("Expect return's length to be at most one, found to be length of {}.", v.len())))
},
Err(err) => Err(vm

View File

@@ -13,9 +13,22 @@
// limitations under the License.
mod dedup_deque;
pub mod noop;
mod picker;
mod rate_limit;
mod scheduler;
mod strategy;
mod task;
mod writer;
use std::sync::Arc;
pub use picker::{Picker, PickerContext, SimplePicker};
pub use scheduler::{
CompactionRequest, CompactionRequestImpl, CompactionScheduler, CompactionSchedulerConfig,
LocalCompactionScheduler,
};
pub use task::{CompactionTask, CompactionTaskImpl};
/// Shared handle to a compaction scheduler that accepts `CompactionRequestImpl<S>`
/// requests; the trait object is required to be `Send + Sync`.
pub type CompactionSchedulerRef<S> =
Arc<dyn CompactionScheduler<CompactionRequestImpl<S>> + Send + Sync>;

View File

@@ -0,0 +1,79 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use store_api::storage::RegionId;
use crate::compaction::{
CompactionRequest, CompactionScheduler, CompactionTask, Picker, PickerContext,
};
/// A compaction scheduler placeholder that holds no state.
///
/// The request type `R` is only recorded through `PhantomData`; no request is
/// ever stored in the scheduler.
pub struct NoopCompactionScheduler<R> {
    _phantom_data: PhantomData<R>,
}

impl<R> Default for NoopCompactionScheduler<R> {
    // Written by hand rather than derived so that no `R: Default` bound is needed.
    fn default() -> Self {
        Self {
            _phantom_data: PhantomData,
        }
    }
}

impl<R> Debug for NoopCompactionScheduler<R> {
    /// Prints a fixed name; the request type parameter is elided as `<...>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("NoopCompactionScheduler<...>")
    }
}
/// Compaction request that carries no data.
#[derive(Default, Debug)]
pub struct NoopCompactionRequest;
/// Picker that never selects anything to compact.
#[derive(Default, Debug)]
pub struct NoopCompactionPicker;
impl<R, T: CompactionTask> Picker<R, T> for NoopCompactionPicker {
/// Ignores both the context and the request and always returns `Ok(None)`,
/// i.e. no compaction task is built.
fn pick(&self, _ctx: &PickerContext, _req: &R) -> crate::error::Result<Option<T>> {
Ok(None)
}
}
/// Compaction task that performs no work.
#[derive(Debug)]
pub struct NoopCompactionTask;
#[async_trait::async_trait]
impl CompactionTask for NoopCompactionTask {
/// Consumes the task and immediately returns `Ok(())`.
async fn run(self) -> crate::error::Result<()> {
Ok(())
}
}
impl CompactionRequest for NoopCompactionRequest {
/// Always reports region id `0`; the request is not tied to a real region.
fn region_id(&self) -> RegionId {
0
}
}
#[async_trait::async_trait]
impl<R: CompactionRequest> CompactionScheduler<R> for NoopCompactionScheduler<R> {
/// Drops the request without doing anything and returns `Ok(true)`.
async fn schedule(&self, _request: R) -> crate::error::Result<bool> {
Ok(true)
}
/// Nothing to stop; returns `Ok(())` immediately.
async fn stop(&self) -> crate::error::Result<()> {
Ok(())
}
}

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use common_telemetry::debug;
use store_api::logstore::LogStore;
use crate::compaction::scheduler::CompactionRequestImpl;
use crate::compaction::strategy::StrategyRef;
use crate::compaction::strategy::{SimpleTimeWindowStrategy, StrategyRef};
use crate::compaction::task::{CompactionTask, CompactionTaskImpl};
/// Picker picks input SST files and builds the compaction task.
@@ -30,12 +31,17 @@ pub trait Picker<R, T: CompactionTask>: Send + 'static {
pub struct PickerContext {}
/// L0 -> L1 compaction based on time windows.
pub(crate) struct SimplePicker<S> {
pub struct SimplePicker<S> {
strategy: StrategyRef,
_phantom_data: PhantomData<S>,
}
#[allow(unused)]
impl<S> Default for SimplePicker<S> {
fn default() -> Self {
Self::new(Arc::new(SimpleTimeWindowStrategy {}))
}
}
impl<S> SimplePicker<S> {
pub fn new(strategy: StrategyRef) -> Self {
Self {
@@ -51,7 +57,7 @@ impl<S: LogStore> Picker<CompactionRequestImpl<S>, CompactionTaskImpl<S>> for Si
ctx: &PickerContext,
req: &CompactionRequestImpl<S>,
) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
let levels = &req.levels;
let levels = &req.levels();
for level_num in 0..levels.level_num() {
let level = levels.level(level_num as u8);
@@ -67,7 +73,7 @@ impl<S: LogStore> Picker<CompactionRequestImpl<S>, CompactionTaskImpl<S>> for Si
outputs, level_num
);
return Ok(Some(CompactionTaskImpl {
schema: req.schema.clone(),
schema: req.schema(),
sst_layer: req.sst_layer.clone(),
outputs,
writer: req.writer.clone(),

View File

@@ -50,7 +50,6 @@ pub struct MaxInflightTaskLimiter<R> {
_phantom_data: PhantomData<R>,
}
#[allow(unused)]
impl<R> MaxInflightTaskLimiter<R> {
pub fn new(max_inflight_task: usize) -> Self {
Self {

View File

@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use table::metadata::TableId;
use store_api::storage::RegionId;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
@@ -38,11 +39,9 @@ use crate::sst::AccessLayerRef;
use crate::version::LevelMetasRef;
use crate::wal::Wal;
/// Table compaction request.
/// Region compaction request.
pub struct CompactionRequestImpl<S: LogStore> {
table_id: TableId,
pub levels: LevelMetasRef,
pub schema: RegionSchemaRef,
pub region_id: RegionId,
pub sst_layer: AccessLayerRef,
pub writer: RegionWriterRef,
pub shared: SharedDataRef,
@@ -50,36 +49,48 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub wal: Wal<S>,
}
impl<S: LogStore> CompactionRequestImpl<S> {
    /// Returns the schema of the region's current version.
    ///
    /// The version is looked up through `shared.version_control` on every call.
    #[inline]
    pub(crate) fn schema(&self) -> RegionSchemaRef {
        let version = self.shared.version_control.current();
        version.schema().clone()
    }

    /// Returns the SST level metadata of the region's current version.
    ///
    /// The version is looked up through `shared.version_control` on every call.
    #[inline]
    pub(crate) fn levels(&self) -> LevelMetasRef {
        let version = self.shared.version_control.current();
        version.ssts().clone()
    }
}
impl<S: LogStore> CompactionRequest for CompactionRequestImpl<S> {
#[inline]
fn table_id(&self) -> TableId {
self.table_id
fn region_id(&self) -> RegionId {
self.region_id
}
}
pub trait CompactionRequest: Send + Sync + 'static {
fn table_id(&self) -> TableId;
fn region_id(&self) -> RegionId;
}
#[derive(Debug)]
pub struct CompactionSchedulerConfig {
max_inflight_task: usize,
pub max_inflight_task: usize,
}
impl Default for CompactionSchedulerConfig {
fn default() -> Self {
Self {
max_inflight_task: 16,
max_inflight_task: 4,
}
}
}
/// CompactionScheduler defines a set of API to schedule compaction tasks.
#[async_trait]
pub trait CompactionScheduler<R> {
pub trait CompactionScheduler<R>: Debug {
/// Schedules a compaction request.
/// Returns true if request is scheduled. Returns false if task queue already
/// contains the request with same table id.
/// contains the request with same region id.
async fn schedule(&self, request: R) -> Result<bool>;
/// Stops compaction scheduler.
@@ -87,14 +98,22 @@ pub trait CompactionScheduler<R> {
}
/// Compaction task scheduler based on local state.
#[allow(unused)]
pub struct LocalCompactionScheduler<R: CompactionRequest> {
request_queue: Arc<RwLock<DedupDeque<TableId, R>>>,
request_queue: Arc<RwLock<DedupDeque<RegionId, R>>>,
cancel_token: CancellationToken,
task_notifier: Arc<Notify>,
join_handle: Mutex<Option<JoinHandle<()>>>,
}
impl<R> Debug for LocalCompactionScheduler<R>
where
    R: CompactionRequest + Send + Sync,
{
    /// Prints a fixed name only; none of the scheduler's fields are included.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("LocalCompactionScheduler<...>")
    }
}
#[async_trait]
impl<R> CompactionScheduler<R> for LocalCompactionScheduler<R>
where
@@ -103,11 +122,11 @@ where
async fn schedule(&self, request: R) -> Result<bool> {
debug!(
"Schedule request: {}, queue size: {}",
request.table_id(),
request.region_id(),
self.remaining_requests().await
);
let mut queue = self.request_queue.write().unwrap();
let res = queue.push_back(request.table_id(), request);
let res = queue.push_back(request.region_id(), request);
self.task_notifier.notify_one();
Ok(res)
}
@@ -122,7 +141,6 @@ where
}
}
#[allow(unused)]
impl<R> LocalCompactionScheduler<R>
where
R: CompactionRequest,
@@ -132,7 +150,7 @@ where
T: CompactionTask,
P: Picker<R, T> + Send + Sync,
{
let request_queue: Arc<RwLock<DedupDeque<TableId, R>>> =
let request_queue: Arc<RwLock<DedupDeque<RegionId, R>>> =
Arc::new(RwLock::new(DedupDeque::default()));
let cancel_token = CancellationToken::new();
let task_notifier = Arc::new(Notify::new());
@@ -164,9 +182,8 @@ where
}
}
#[allow(unused)]
struct CompactionHandler<R, T: CompactionTask, P: Picker<R, T>> {
req_queue: Arc<RwLock<DedupDeque<TableId, R>>>,
req_queue: Arc<RwLock<DedupDeque<RegionId, R>>>,
cancel_token: CancellationToken,
task_notifier: Arc<Notify>,
limiter: Arc<CascadeRateLimiter<R>>,
@@ -174,9 +191,8 @@ struct CompactionHandler<R, T: CompactionTask, P: Picker<R, T>> {
_phantom_data: PhantomData<T>,
}
#[allow(unused)]
impl<R: CompactionRequest, T: CompactionTask, P: Picker<R, T>> CompactionHandler<R, T, P> {
/// Runs table compaction requests dispatch loop.
/// Runs region compaction requests dispatch loop.
pub async fn run(&self) {
let task_notifier = self.task_notifier.clone();
let limiter = self.limiter.clone();
@@ -186,15 +202,19 @@ impl<R: CompactionRequest, T: CompactionTask, P: Picker<R, T>> CompactionHandler
// poll requests as many as possible until rate limited, and then wait for
// notification (some task's finished).
debug!("Notified, queue size: {:?}", self.req_queue.read().unwrap().len());
while let Some((table_id, req)) = self.poll_task().await {
while let Some((region_id, req)) = self.poll_task().await{
if let Ok(token) = limiter.acquire_token(&req) {
debug!("Executing compaction request: {}", table_id);
self.handle_compaction_request(req, token).await;
debug!("Executing compaction request: {}", region_id);
if let Err(e) = self.handle_compaction_request(req, token).await {
error!(e; "Failed to submit compaction task for region: {}", region_id);
} else {
info!("Submitted region compaction task: {}", region_id);
}
} else {
// compaction rate limited, put back to req queue to wait for next
// schedule
debug!("Put back request {}, queue size: {}", table_id, self.req_queue.read().unwrap().len());
self.put_back_req(table_id, req).await;
debug!("Put back request {}, queue size: {}", region_id, self.req_queue.read().unwrap().len());
self.put_back_req(region_id, req).await;
break;
}
}
@@ -208,35 +228,36 @@ impl<R: CompactionRequest, T: CompactionTask, P: Picker<R, T>> CompactionHandler
}
#[inline]
async fn poll_task(&self) -> Option<(TableId, R)> {
async fn poll_task(&self) -> Option<(RegionId, R)> {
let mut queue = self.req_queue.write().unwrap();
queue.pop_front()
}
/// Puts request back to the front of request queue.
#[inline]
async fn put_back_req(&self, table_id: TableId, req: R) {
async fn put_back_req(&self, region_id: RegionId, req: R) {
let mut queue = self.req_queue.write().unwrap();
queue.push_front(table_id, req);
queue.push_front(region_id, req);
}
// Handles compaction request, submit task to bg runtime.
async fn handle_compaction_request(
&self,
mut req: R,
token: BoxedRateLimitToken,
) -> Result<()> {
async fn handle_compaction_request(&self, req: R, token: BoxedRateLimitToken) -> Result<()> {
let cloned_notify = self.task_notifier.clone();
let table_id = req.table_id();
let region_id = req.region_id();
let Some(task) = self.build_compaction_task(req).await? else {
info!("No file needs compaction in table: {}", table_id);
info!("No file needs compaction in region: {}", region_id);
return Ok(());
};
debug!("Compaction task, region: {}, task: {:?}", region_id, task);
// TODO(hl): we need to keep a track of task handle here to allow task cancellation.
common_runtime::spawn_bg(async move {
task.run().await; // TODO(hl): handle errors
if let Err(e) = task.run().await {
// TODO(hl): maybe resubmit compaction task on failure?
error!(e; "Failed to compact region: {}", region_id);
} else {
info!("Successfully compacted region: {}", region_id);
}
// releases rate limit token
token.try_release();
// notify scheduler to schedule next task when current task finishes.
@@ -246,7 +267,6 @@ impl<R: CompactionRequest, T: CompactionTask, P: Picker<R, T>> CompactionHandler
Ok(())
}
// TODO(hl): generate compaction task(find SSTs to compact along with the output of compaction)
async fn build_compaction_task(&self, req: R) -> crate::error::Result<Option<T>> {
let ctx = PickerContext {};
self.picker.pick(&ctx, &req)
@@ -333,12 +353,12 @@ mod tests {
#[derive(Default, Debug)]
struct MockRequest {
table_id: TableId,
region_id: RegionId,
}
impl CompactionRequest for MockRequest {
fn table_id(&self) -> TableId {
self.table_id
fn region_id(&self) -> RegionId {
self.region_id
}
}
@@ -356,12 +376,12 @@ mod tests {
);
scheduler
.schedule(MockRequest { table_id: 1 })
.schedule(MockRequest { region_id: 1 })
.await
.unwrap();
scheduler
.schedule(MockRequest { table_id: 2 })
.schedule(MockRequest { region_id: 2 })
.await
.unwrap();
@@ -390,7 +410,7 @@ mod tests {
for i in 0..task_size {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
region_id: i as RegionId,
})
.await
.unwrap();
@@ -420,7 +440,7 @@ mod tests {
for i in 0..task_size / 2 {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
region_id: i as RegionId,
})
.await
.unwrap();
@@ -430,7 +450,7 @@ mod tests {
for i in task_size / 2..task_size {
scheduler
.schedule(MockRequest {
table_id: i as TableId,
region_id: i as RegionId,
})
.await
.unwrap();
@@ -453,7 +473,7 @@ mod tests {
let mut scheduled_task = 0;
for _ in 0..10 {
if scheduler
.schedule(MockRequest { table_id: 1 })
.schedule(MockRequest { region_id: 1 })
.await
.unwrap()
{

View File

@@ -43,13 +43,14 @@ impl Strategy for SimpleTimeWindowStrategy {
return vec![];
}
let files = find_compactable_files(level);
debug!("Compactable files found: {:?}", files);
if files.is_empty() {
return vec![];
}
let time_bucket = infer_time_bucket(&files);
let buckets = calculate_time_buckets(time_bucket, &files);
debug!("File buckets: {:?}", buckets);
debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
buckets
.into_iter()
.map(|(bound, files)| CompactionOutput {
@@ -89,12 +90,7 @@ fn calculate_time_buckets(bucket_sec: i64, files: &[FileHandle]) -> HashMap<i64,
.push(file.clone());
}
} else {
// Files without timestamp range is assign to a special bucket `i64::MAX`,
// so that they can be compacted together.
buckets
.entry(i64::MAX)
.or_insert_with(Vec::new)
.push(file.clone());
warn!("Found corrupted SST without timestamp bounds: {:?}", file);
}
}
buckets
@@ -303,19 +299,7 @@ mod tests {
&[(0, &["a"]), (10, &["a"])],
);
// files without timestamp are align to a special bucket: i64::MAX
check_bucket_calculation(
10,
vec![FileHandle::new(FileMeta {
file_name: "a".to_string(),
time_range: None,
level: 0,
})],
&[(i64::MAX, &["a"])],
);
// file with an large time range
let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0]))
.into_iter()
.map(|b| (b * TIME_BUCKETS[0], &["a"] as _))

View File

@@ -13,9 +13,9 @@
// limitations under the License.
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use common_telemetry::{error, info};
use object_store::ObjectStore;
use store_api::logstore::LogStore;
use uuid::Uuid;
@@ -25,17 +25,15 @@ use crate::manifest::action::RegionEdit;
use crate::manifest::region::RegionManifest;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::schema::RegionSchemaRef;
use crate::sst::parquet::{ParquetWriter, Source};
use crate::sst::{AccessLayerRef, FileHandle, FileMeta, Level, SstInfo, WriteOptions};
use crate::sst::{AccessLayerRef, FileHandle, FileMeta, Level, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
#[async_trait::async_trait]
pub trait CompactionTask: Send + Sync + 'static {
pub trait CompactionTask: Debug + Send + Sync + 'static {
async fn run(self) -> Result<()>;
}
#[allow(unused)]
pub(crate) struct CompactionTaskImpl<S: LogStore> {
pub struct CompactionTaskImpl<S: LogStore> {
pub schema: RegionSchemaRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
@@ -45,6 +43,14 @@ pub(crate) struct CompactionTaskImpl<S: LogStore> {
pub manifest: RegionManifest,
}
impl<S: LogStore> Debug for CompactionTaskImpl<S> {
    /// Formats the task as `CompactionTaskImpl { region_name: .. }`; only the
    /// region name taken from `shared_data` is shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CompactionTaskImpl");
        out.field("region_name", &self.shared_data.name());
        out.finish()
    }
}
impl<S: LogStore> Drop for CompactionTaskImpl<S> {
fn drop(&mut self) {
self.mark_files_compacting(false);
@@ -60,7 +66,6 @@ impl<S: LogStore> CompactionTaskImpl<S> {
for output in self.outputs.drain(..) {
let schema = self.schema.clone();
let sst_layer = self.sst_layer.clone();
let object_store = self.sst_layer.object_store();
compacted_inputs.extend(output.inputs.iter().map(|f| FileMeta {
file_name: f.file_name().to_string(),
time_range: *f.time_range(),
@@ -69,7 +74,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
futs.push(async move {
match output.build(schema, sst_layer, object_store).await {
match output.build(schema, sst_layer).await {
Ok(meta) => Ok(meta),
Err(e) => Err(e),
}
@@ -137,17 +142,9 @@ impl<S: LogStore> CompactionTask for CompactionTaskImpl<S> {
}
}
#[allow(unused)]
pub(crate) struct CompactionInput {
input_level: u8,
output_level: u8,
file: FileHandle,
}
/// Many-to-many compaction can be decomposed to a many-to-one compaction from level n to level n+1
/// and a many-to-one compaction from level n+1 to level n+1.
#[derive(Debug)]
#[allow(unused)]
pub struct CompactionOutput {
/// Compaction output file level.
pub(crate) output_level: Level,
@@ -160,15 +157,10 @@ pub struct CompactionOutput {
}
impl CompactionOutput {
async fn build(
&self,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
object_store: ObjectStore,
) -> Result<FileMeta> {
async fn build(&self, schema: RegionSchemaRef, sst_layer: AccessLayerRef) -> Result<FileMeta> {
let reader = build_sst_reader(
schema,
sst_layer,
sst_layer.clone(),
&self.inputs,
self.bucket_bound,
self.bucket_bound + self.bucket,
@@ -176,10 +168,10 @@ impl CompactionOutput {
.await?;
let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated());
let opts = WriteOptions {};
let SstInfo { time_range } =
ParquetWriter::new(&output_file_name, Source::Reader(reader), object_store)
.write_sst(&opts)
.await?;
let SstInfo { time_range } = sst_layer
.write_sst(&output_file_name, Source::Reader(reader), &opts)
.await?;
Ok(FileMeta {
file_name: output_file_name,
@@ -197,10 +189,18 @@ pub mod tests {
use crate::compaction::task::CompactionTask;
pub type CallbackRef = Arc<dyn Fn() + Send + Sync>;
pub struct NoopCompactionTask {
pub cbs: Vec<CallbackRef>,
}
impl Debug for NoopCompactionTask {
    /// Prints the fully qualified type name; the callbacks are not shown.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("storage::compaction::task::tests::NoopCompactionTask")
    }
}
impl NoopCompactionTask {
pub fn new(cbs: Vec<CallbackRef>) -> Self {
Self { cbs }

View File

@@ -102,8 +102,8 @@ mod tests {
};
use crate::metadata::RegionMetadata;
use crate::sst;
use crate::sst::parquet::{ParquetWriter, Source};
use crate::sst::{FileMeta, FsAccessLayer, SstInfo, WriteOptions};
use crate::sst::parquet::ParquetWriter;
use crate::sst::{FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions};
use crate::test_util::descriptor_util::RegionDescBuilder;
fn schema_for_test() -> RegionSchemaRef {

View File

@@ -14,5 +14,13 @@
//! storage engine config
#[derive(Debug, Default, Clone)]
pub struct EngineConfig {}
/// Storage engine configuration.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Number of level-0 files used as the compaction threshold.
    pub max_files_in_l0: usize,
}

impl Default for EngineConfig {
    /// Defaults to 8 level-0 files.
    fn default() -> Self {
        let max_files_in_l0 = 8;
        Self { max_files_in_l0 }
    }
}

View File

@@ -25,6 +25,7 @@ use store_api::storage::{
};
use crate::background::JobPoolImpl;
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerImpl, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
@@ -84,9 +85,19 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
}
impl<S: LogStore> EngineImpl<S> {
pub fn new(config: EngineConfig, log_store: Arc<S>, object_store: ObjectStore) -> Self {
pub fn new(
config: EngineConfig,
log_store: Arc<S>,
object_store: ObjectStore,
compaction_scheduler: CompactionSchedulerRef<S>,
) -> Self {
Self {
inner: Arc::new(EngineInner::new(config, log_store, object_store)),
inner: Arc::new(EngineInner::new(
config,
log_store,
object_store,
compaction_scheduler,
)),
}
}
}
@@ -210,13 +221,19 @@ struct EngineInner<S: LogStore> {
memtable_builder: MemtableBuilderRef,
flush_scheduler: FlushSchedulerRef,
flush_strategy: FlushStrategyRef,
compaction_scheduler: CompactionSchedulerRef<S>,
config: Arc<EngineConfig>,
}
impl<S: LogStore> EngineInner<S> {
pub fn new(_config: EngineConfig, log_store: Arc<S>, object_store: ObjectStore) -> Self {
pub fn new(
config: EngineConfig,
log_store: Arc<S>,
object_store: ObjectStore,
compaction_scheduler: CompactionSchedulerRef<S>,
) -> Self {
let job_pool = Arc::new(JobPoolImpl {});
let flush_scheduler = Arc::new(FlushSchedulerImpl::new(job_pool));
Self {
object_store,
log_store,
@@ -224,6 +241,8 @@ impl<S: LogStore> EngineInner<S> {
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
config: Arc::new(config),
}
}
@@ -320,6 +339,8 @@ impl<S: LogStore> EngineInner<S> {
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
compaction_scheduler: self.compaction_scheduler.clone(),
engine_config: self.config.clone(),
}
}
}
@@ -333,6 +354,7 @@ mod tests {
use tempdir::TempDir;
use super::*;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::test_util::descriptor_util::RegionDescBuilder;
#[tokio::test]
@@ -347,7 +369,14 @@ mod tests {
let config = EngineConfig::default();
let engine = EngineImpl::new(config, Arc::new(log_store), object_store);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let engine = EngineImpl::new(
config,
Arc::new(log_store),
object_store,
compaction_scheduler,
);
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
@@ -27,7 +29,7 @@ use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileMeta, SstInfo, WriteOptions};
use crate::sst::{AccessLayerRef, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
/// Default write buffer size (32M).
@@ -142,6 +144,8 @@ impl FlushScheduler for FlushSchedulerImpl {
pub type FlushSchedulerRef = Arc<dyn FlushScheduler>;
pub type FlushCallback = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
pub struct FlushJob<S: LogStore> {
/// Max memtable id in these memtables,
/// used to remove immutable memtables in current version.
@@ -160,10 +164,12 @@ pub struct FlushJob<S: LogStore> {
pub wal: Wal<S>,
/// Region manifest service, used to persist metadata.
pub manifest: RegionManifest,
/// Callback that is awaited once after the flush succeeds.
pub on_success: Option<FlushCallback>,
}
impl<S: LogStore> FlushJob<S> {
async fn write_memtables_to_layer(&self, ctx: &Context) -> Result<Vec<FileMeta>> {
async fn write_memtables_to_layer(&mut self, ctx: &Context) -> Result<Vec<FileMeta>> {
if ctx.is_cancelled() {
return CancelledSnafu {}.fail();
}
@@ -184,10 +190,11 @@ impl<S: LogStore> FlushJob<S> {
let file_name = Self::generate_sst_file_name();
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
let sst_layer = self.sst_layer.clone();
futures.push(async move {
let SstInfo { time_range } = self
.sst_layer
.write_sst(&file_name, iter, &WriteOptions::default())
let SstInfo { time_range } = sst_layer
.write_sst(&file_name, Source::Iter(iter), &WriteOptions::default())
.await?;
Ok(FileMeta {
@@ -209,7 +216,7 @@ impl<S: LogStore> FlushJob<S> {
Ok(metas)
}
async fn write_manifest_and_apply(&self, file_metas: &[FileMeta]) -> Result<()> {
async fn write_manifest_and_apply(&mut self, file_metas: &[FileMeta]) -> Result<()> {
let edit = RegionEdit {
region_version: self.shared.version_control.metadata().version(),
flushed_sequence: Some(self.flush_sequence),
@@ -241,6 +248,10 @@ impl<S: LogStore> Job for FlushJob<S> {
/// Runs the flush job.
///
/// First writes the memtables through `write_memtables_to_layer`, then hands the
/// resulting file metadata to `write_manifest_and_apply`. Either step failing returns
/// early through `?`, so the `on_success` callback is awaited only after both steps
/// have succeeded.
async fn run(&mut self, ctx: &Context) -> Result<()> {
let file_metas = self.write_memtables_to_layer(ctx).await?;
self.write_manifest_and_apply(&file_metas).await?;
// `take()` moves the callback out of the job and leaves `None` behind, so a
// repeated call to `run` cannot await the same callback a second time.
if let Some(cb) = self.on_success.take() {
cb.await;
}
Ok(())
}
}

View File

@@ -17,7 +17,7 @@
mod background;
mod chunk;
pub mod codec;
mod compaction;
pub mod compaction;
pub mod config;
mod engine;
pub mod error;

View File

@@ -28,6 +28,8 @@ use store_api::storage::{
WriteResponse,
};
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::flush::{FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
@@ -107,13 +109,15 @@ impl<S: LogStore> Region for RegionImpl<S> {
///
/// Contains all necessary storage related components needed by the region, such as logstore,
/// manifest, memtable builder.
pub struct StoreConfig<S> {
pub struct StoreConfig<S: LogStore> {
pub log_store: Arc<S>,
pub sst_layer: AccessLayerRef,
pub manifest: RegionManifest,
pub memtable_builder: MemtableBuilderRef,
pub flush_scheduler: FlushSchedulerRef,
pub flush_strategy: FlushStrategyRef,
/// Compaction scheduler for the region; it is stored in the region and lent to
/// the writer context.
pub compaction_scheduler: CompactionSchedulerRef<S>,
/// Engine configuration shared through an `Arc`; a clone is given to the region's
/// `RegionWriter`.
pub engine_config: Arc<EngineConfig>,
}
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
@@ -163,10 +167,14 @@ impl<S: LogStore> RegionImpl<S> {
name,
version_control: Arc::new(version_control),
}),
writer: Arc::new(RegionWriter::new(store_config.memtable_builder)),
writer: Arc::new(RegionWriter::new(
store_config.memtable_builder,
store_config.engine_config.clone(),
)),
wal,
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
compaction_scheduler: store_config.compaction_scheduler,
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
@@ -236,11 +244,15 @@ impl<S: LogStore> RegionImpl<S> {
version_control,
});
let writer = Arc::new(RegionWriter::new(store_config.memtable_builder));
let writer = Arc::new(RegionWriter::new(
store_config.memtable_builder,
store_config.engine_config.clone(),
));
let writer_ctx = WriterContext {
shared: &shared,
flush_strategy: &store_config.flush_strategy,
flush_scheduler: &store_config.flush_scheduler,
compaction_scheduler: &store_config.compaction_scheduler,
sst_layer: &store_config.sst_layer,
wal: &wal,
writer: &writer,
@@ -257,6 +269,7 @@ impl<S: LogStore> RegionImpl<S> {
wal,
flush_strategy: store_config.flush_strategy,
flush_scheduler: store_config.flush_scheduler,
compaction_scheduler: store_config.compaction_scheduler,
sst_layer: store_config.sst_layer,
manifest: store_config.manifest,
});
@@ -387,6 +400,7 @@ impl<S: LogStore> RegionImpl<S> {
shared: &inner.shared,
flush_strategy: &inner.flush_strategy,
flush_scheduler: &inner.flush_scheduler,
compaction_scheduler: &inner.compaction_scheduler,
sst_layer: &inner.sst_layer,
wal: &inner.wal,
writer: &inner.writer,
@@ -429,6 +443,7 @@ struct RegionInner<S: LogStore> {
wal: Wal<S>,
flush_strategy: FlushStrategyRef,
flush_scheduler: FlushSchedulerRef,
compaction_scheduler: CompactionSchedulerRef<S>,
sst_layer: AccessLayerRef,
manifest: RegionManifest,
}
@@ -467,6 +482,7 @@ impl<S: LogStore> RegionInner<S> {
shared: &self.shared,
flush_strategy: &self.flush_strategy,
flush_scheduler: &self.flush_scheduler,
compaction_scheduler: &self.compaction_scheduler,
sst_layer: &self.sst_layer,
wal: &self.wal,
writer: &self.writer,

View File

@@ -14,7 +14,8 @@
use std::sync::Arc;
use common_telemetry::logging;
use common_telemetry::tracing::log::info;
use common_telemetry::{error, logging};
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::logstore::LogStore;
@@ -23,8 +24,10 @@ use store_api::storage::{AlterRequest, SequenceNumber, WriteContext, WriteRespon
use tokio::sync::Mutex;
use crate::background::JobHandle;
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{self, Result};
use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef};
use crate::flush::{FlushCallback, FlushJob, FlushSchedulerRef, FlushStrategyRef};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
@@ -34,7 +37,7 @@ use crate::proto::wal::WalHeader;
use crate::region::{RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef};
use crate::schema::compat::CompatWrite;
use crate::sst::AccessLayerRef;
use crate::version::{VersionControl, VersionControlRef, VersionEdit};
use crate::version::{VersionControl, VersionControlRef, VersionEdit, VersionRef};
use crate::wal::Wal;
use crate::write_batch::WriteBatch;
@@ -56,9 +59,9 @@ pub struct RegionWriter {
}
impl RegionWriter {
pub fn new(memtable_builder: MemtableBuilderRef) -> RegionWriter {
pub fn new(memtable_builder: MemtableBuilderRef, config: Arc<EngineConfig>) -> RegionWriter {
RegionWriter {
inner: Mutex::new(WriterInner::new(memtable_builder)),
inner: Mutex::new(WriterInner::new(memtable_builder, config)),
version_mutex: Mutex::new(()),
}
}
@@ -241,6 +244,7 @@ pub struct WriterContext<'a, S: LogStore> {
pub shared: &'a SharedDataRef,
pub flush_strategy: &'a FlushStrategyRef,
pub flush_scheduler: &'a FlushSchedulerRef,
pub compaction_scheduler: &'a CompactionSchedulerRef<S>,
pub sst_layer: &'a AccessLayerRef,
pub wal: &'a Wal<S>,
pub writer: &'a RegionWriterRef,
@@ -271,13 +275,15 @@ impl<'a, S: LogStore> AlterContext<'a, S> {
/// Writer state that `RegionWriter` keeps behind its `inner` mutex.
struct WriterInner {
memtable_builder: MemtableBuilderRef,
flush_handle: Option<JobHandle>,
/// Engine configuration; `max_files_in_l0` is read from it when the flush callback
/// is built.
engine_config: Arc<EngineConfig>,
}
impl WriterInner {
fn new(memtable_builder: MemtableBuilderRef) -> WriterInner {
fn new(memtable_builder: MemtableBuilderRef, engine_config: Arc<EngineConfig>) -> WriterInner {
WriterInner {
memtable_builder,
flush_handle: None,
engine_config,
}
}
@@ -541,6 +547,8 @@ impl WriterInner {
return Ok(());
}
let cb = Self::build_flush_callback(&current_version, ctx, &self.engine_config);
let flush_req = FlushJob {
max_memtable_id: max_memtable_id.unwrap(),
memtables: mem_to_flush,
@@ -551,6 +559,7 @@ impl WriterInner {
writer: ctx.writer.clone(),
wal: ctx.wal.clone(),
manifest: ctx.manifest.clone(),
on_success: cb,
};
let flush_handle = ctx
@@ -561,4 +570,51 @@ impl WriterInner {
Ok(())
}
/// Builds the callback that the flush job awaits after a successful flush.
///
/// Everything the callback needs is cloned out of `ctx` up front, because the returned
/// `FlushCallback` is a `'static` future and cannot borrow from the writer context.
/// When awaited, the callback reads the current number of level 0 SST files and, only
/// if that number is strictly greater than `config.max_files_in_l0`, submits a
/// compaction request for the region. A scheduling failure is logged, not propagated.
///
/// This function always returns `Some`.
fn build_flush_callback<S: LogStore>(
    version: &VersionRef,
    ctx: &WriterContext<S>,
    config: &Arc<EngineConfig>,
) -> Option<FlushCallback> {
    let region_id = version.metadata().id();
    let l0_threshold = config.max_files_in_l0;

    let request = CompactionRequestImpl {
        region_id,
        sst_layer: ctx.sst_layer.clone(),
        writer: ctx.writer.clone(),
        shared: ctx.shared.clone(),
        manifest: ctx.manifest.clone(),
        wal: ctx.wal.clone(),
    };
    let scheduler = ctx.compaction_scheduler.clone();
    let shared = ctx.shared.clone();

    let callback: FlushCallback = Box::pin(async move {
        // The file count is taken when the callback runs, not when it is built.
        let l0_files = shared
            .version_control
            .current()
            .ssts()
            .level(0)
            .file_num();

        if l0_files > l0_threshold {
            match scheduler.schedule(request).await {
                Ok(scheduled) => {
                    info!(
                        "Schedule region {} compaction request result: {}",
                        region_id, scheduled
                    )
                }
                Err(e) => {
                    error!(e;"Failed to schedule region compaction request {}", region_id);
                }
            }
        } else {
            info!(
                "No enough SST files in level 0 (threshold: {}), skip compaction",
                l0_threshold
            );
        }
    });

    Some(callback)
}
}

View File

@@ -23,13 +23,15 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use store_api::storage::ChunkReader;
use table::predicate::Predicate;
use crate::chunk::ChunkReaderImpl;
use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::BoxedBatchReader;
use crate::read::{Batch, BoxedBatchReader};
use crate::schema::ProjectedSchemaRef;
use crate::sst::parquet::{ParquetReader, ParquetWriter, Source};
use crate::sst::parquet::{ParquetReader, ParquetWriter};
/// Maximum level of SSTs.
pub const MAX_LEVEL: u8 = 2;
@@ -111,7 +113,7 @@ pub struct LevelMeta {
}
impl LevelMeta {
pub fn new_empty(level: Level) -> Self {
pub fn new(level: Level) -> Self {
Self {
level,
files: HashMap::new(),
@@ -132,6 +134,12 @@ impl LevelMeta {
self.level
}
/// Returns the number of SST files currently held in this level.
///
/// The flush callback compares this count for level 0 against the configured
/// threshold to decide whether to schedule a compaction.
#[inline]
pub fn file_num(&self) -> usize {
self.files.len()
}
/// Returns an iterator over the handles of all files in this level.
///
/// The handles are the values of the `files` hash map, so no particular order should
/// be relied on.
pub fn files(&self) -> impl Iterator<Item = &FileHandle> {
self.files.values()
}
@@ -140,7 +148,7 @@ impl LevelMeta {
fn new_level_meta_vec() -> LevelMetaVec {
(0u8..MAX_LEVEL)
.into_iter()
.map(LevelMeta::new_empty)
.map(LevelMeta::new)
.collect::<Vec<_>>()
.try_into()
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
@@ -243,7 +251,7 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
async fn write_sst(
&self,
file_name: &str,
iter: BoxedBatchIterator,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo>;
@@ -256,6 +264,33 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
pub type AccessLayerRef = Arc<dyn AccessLayer>;
/// Data source consumed by the parquet writer when writing an SST.
pub enum Source {
/// Batches produced by a boxed batch iterator, e.g. a memtable iterator.
Iter(BoxedBatchIterator),
/// Chunks pulled from a `ChunkReaderImpl` (possibly backed by a set of SSTs).
Reader(ChunkReaderImpl),
}
impl Source {
    /// Fetches the next batch from the underlying source.
    ///
    /// Yields `Ok(None)` when the iterator has nothing further to return or the reader
    /// returns no chunk; errors from either source are passed through unchanged.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Self::Iter(iter) => iter.next().transpose(),
            Self::Reader(reader) => {
                let chunk = reader.next_chunk().await?;
                Ok(chunk.map(|c| Batch::new(c.columns)))
            }
        }
    }

    /// Returns the projected schema reported by the underlying iterator or reader.
    fn projected_schema(&self) -> ProjectedSchemaRef {
        match self {
            Self::Iter(iter) => iter.schema(),
            Self::Reader(reader) => reader.projected_schema().clone(),
        }
    }
}
/// Sst access layer based on local file system.
#[derive(Debug)]
pub struct FsAccessLayer {
@@ -282,13 +317,13 @@ impl AccessLayer for FsAccessLayer {
async fn write_sst(
&self,
file_name: &str,
iter: BoxedBatchIterator,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo> {
// For now only the parquet format is supported. We may allow callers to specify the SST
// format in WriteOptions in the future.
let file_path = self.sst_file_path(file_name);
let writer = ParquetWriter::new(&file_path, Source::Iter(iter), self.object_store.clone());
let writer = ParquetWriter::new(&file_path, source, self.object_store.clone());
writer.write_sst(opts).await
}

View File

@@ -45,21 +45,18 @@ use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use parquet::schema::types::SchemaDescriptor;
use snafu::{OptionExt, ResultExt};
use store_api::storage::ChunkReader;
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::chunk::ChunkReaderImpl;
use crate::error::{
self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
Result, WriteObjectSnafu, WriteParquetSnafu,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::{Batch, BatchReader};
use crate::schema::compat::ReadAdapter;
use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
use crate::sst;
use crate::sst::SstInfo;
use crate::sst::{Source, SstInfo};
/// Parquet sst writer.
pub struct ParquetWriter<'a> {
file_path: &'a str,
@@ -321,7 +318,6 @@ impl<'a> ParquetReader<'a> {
// checks if converting time range unit into ts col unit will result into rounding error.
if time_unit_lossy(&self.time_range, ts_col_unit) {
let filter = RowFilter::new(vec![Box::new(PlainTimestampRowFilter::new(
ts_col_idx,
self.time_range,
projection,
))]);
@@ -343,15 +339,9 @@ impl<'a> ParquetReader<'a> {
.and_then(|s| s.convert_to(ts_col_unit))
.map(|t| t.value()),
) {
Box::new(FastTimestampRowFilter::new(
ts_col_idx, projection, lower, upper,
)) as _
Box::new(FastTimestampRowFilter::new(projection, lower, upper)) as _
} else {
Box::new(PlainTimestampRowFilter::new(
ts_col_idx,
self.time_range,
projection,
)) as _
Box::new(PlainTimestampRowFilter::new(self.time_range, projection)) as _
};
let filter = RowFilter::new(vec![row_filter]);
Some(filter)
@@ -372,21 +362,14 @@ fn time_unit_lossy(range: &TimestampRange, ts_col_unit: TimeUnit) -> bool {
/// `FastTimestampRowFilter` is used to filter rows within given timestamp range when reading
/// row groups from parquet files, while avoids fetching all columns from SSTs file.
struct FastTimestampRowFilter {
timestamp_index: usize,
lower_bound: i64,
upper_bound: i64,
projection: ProjectionMask,
}
impl FastTimestampRowFilter {
fn new(
ts_col_idx: usize,
projection: ProjectionMask,
lower_bound: i64,
upper_bound: i64,
) -> Self {
fn new(projection: ProjectionMask, lower_bound: i64, upper_bound: i64) -> Self {
Self {
timestamp_index: ts_col_idx,
lower_bound,
upper_bound,
projection,
@@ -401,7 +384,8 @@ impl ArrowPredicate for FastTimestampRowFilter {
/// Selects the rows matching given time range.
fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result<BooleanArray, ArrowError> {
let ts_col = batch.column(self.timestamp_index);
// the projection has only timestamp column, so we can safely take the first column in batch.
let ts_col = batch.column(0);
macro_rules! downcast_and_compute {
($typ: ty) => {
@@ -443,15 +427,13 @@ impl ArrowPredicate for FastTimestampRowFilter {
/// [PlainTimestampRowFilter] iterates each element in timestamp column, build a [Timestamp] struct
/// and checks if given time range contains the timestamp.
struct PlainTimestampRowFilter {
timestamp_index: usize,
time_range: TimestampRange,
projection: ProjectionMask,
}
impl PlainTimestampRowFilter {
fn new(timestamp_index: usize, time_range: TimestampRange, projection: ProjectionMask) -> Self {
fn new(time_range: TimestampRange, projection: ProjectionMask) -> Self {
Self {
timestamp_index,
time_range,
projection,
}
@@ -464,7 +446,8 @@ impl ArrowPredicate for PlainTimestampRowFilter {
}
fn evaluate(&mut self, batch: RecordBatch) -> std::result::Result<BooleanArray, ArrowError> {
let ts_col = batch.column(self.timestamp_index);
// the projection has only timestamp column, so we can safely take the first column in batch.
let ts_col = batch.column(0);
macro_rules! downcast_and_compute {
($array_ty: ty, $unit: ident) => {{
@@ -532,33 +515,6 @@ impl BatchReader for ChunkStream {
}
}
/// Data source consumed by the parquet writer when writing an SST.
pub enum Source {
/// Batches produced by a boxed batch iterator, e.g. a memtable iterator.
Iter(BoxedBatchIterator),
/// Chunks pulled from a `ChunkReaderImpl` (possibly backed by a set of SSTs).
Reader(ChunkReaderImpl),
}
impl Source {
    /// Fetches the next batch from the underlying source.
    ///
    /// Yields `Ok(None)` when the iterator has nothing further to return or the reader
    /// returns no chunk; errors from either source are passed through unchanged.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Self::Iter(iter) => iter.next().transpose(),
            Self::Reader(reader) => {
                let chunk = reader.next_chunk().await?;
                Ok(chunk.map(|c| Batch::new(c.columns)))
            }
        }
    }

    /// Returns the projected schema reported by the underlying iterator or reader.
    fn projected_schema(&self) -> ProjectedSchemaRef {
        match self {
            Self::Iter(iter) => iter.schema(),
            Self::Reader(reader) => reader.projected_schema().clone(),
        }
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -20,6 +20,7 @@ use object_store::backend::fs::Builder;
use object_store::ObjectStore;
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::engine;
use crate::flush::{FlushSchedulerImpl, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
@@ -51,7 +52,7 @@ pub async fn new_store_config(
..Default::default()
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
StoreConfig {
log_store,
sst_layer,
@@ -59,5 +60,7 @@ pub async fn new_store_config(
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
engine_config: Default::default(),
}
}

View File

@@ -243,7 +243,7 @@ impl Version {
);
info!(
"After region compaction, region: {}, SST files: {:?}",
"After apply edit, region: {}, SST files: {:?}",
self.metadata.id(),
merged_ssts
);