feat!: reorganize the storage layout (#1609)

* feat: adds data_home to DataOptions

* refactor: split out object store stuff from datanode instance

* feat: move data_home into FileConfig

* refactor: object storage layers

* feat: adds datanode path to procedure paths

* feat: temp commit

* refactor: clean code

* fix: forgot files

* fix: forgot files

* Update src/common/test-util/src/ports.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update tests/runner/src/env.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: compile error

* chore: cr comments

* fix: dependencies order in cargo

* fix: data path in test

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Author: dennis zhuang
Date: 2023-05-23 13:58:26 +08:00
Committed by: GitHub
Parent: 5b304fa692
Commit: 7c55783e53
40 changed files with 614 additions and 340 deletions
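In short: every on-disk artifact now hangs off a single data_home (or the backend root for S3/OSS). For the default local file storage, the resulting layout looks roughly like this (a sketch assembled from the path constants and defaults introduced below, not an exhaustive listing):

/tmp/greptimedb/                  # data_home (DEFAULT_DATA_HOME)
├── wal/                          # WAL_DIR, used when [wal].dir is unset
├── data/                         # DATA_DIR, table engine data
└── cluster/
    └── dn-<node_id>/
        └── procedure/            # per-datanode procedure state (CLUSTER_DIR)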

Cargo.lock (generated, 2 changes)

@@ -1872,6 +1872,8 @@ dependencies = [
name = "common-test-util"
version = "0.2.0"
dependencies = [
"once_cell",
"rand",
"tempfile",
]


@@ -24,7 +24,8 @@ tcp_nodelay = true
# WAL options, see `standalone.example.toml`.
[wal]
dir = "/tmp/greptimedb/wal"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -34,7 +35,7 @@ sync_write = false
# Storage options, see `standalone.example.toml`.
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
# Compaction options, see `standalone.example.toml`.
[storage.compaction]
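For operators, the visible breaking change is the rename just shown: data_dir under [storage] becomes data_home and points at the storage root rather than the data subdirectory. A before/after sketch of the relevant TOML fragment:

# Before this commit
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"

# After: a single home directory; wal/, data/ and cluster/ are created beneath it
[storage]
type = "File"
data_home = "/tmp/greptimedb/"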


@@ -78,8 +78,8 @@ addr = "127.0.0.1:4004"
# WAL options.
[wal]
# WAL data directory.
dir = "/tmp/greptimedb/wal"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
file_size = "1GB"
# WAL purge threshold in bytes.
@@ -96,7 +96,7 @@ sync_write = false
# Storage type.
type = "File"
# Data directory, "/tmp/greptimedb/data" by default.
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
# Compaction options.
[storage.compaction]


@@ -91,7 +91,7 @@ struct StartCommand {
#[clap(short, long)]
config_file: Option<String>,
#[clap(long)]
data_dir: Option<String>,
data_home: Option<String>,
#[clap(long)]
wal_dir: Option<String>,
#[clap(long)]
@@ -147,14 +147,14 @@ impl StartCommand {
.fail();
}
if let Some(data_dir) = &self.data_dir {
if let Some(data_home) = &self.data_home {
opts.storage.store = ObjectStoreConfig::File(FileConfig {
data_dir: data_dir.clone(),
data_home: data_home.clone(),
});
}
if let Some(wal_dir) = &self.wal_dir {
opts.wal.dir = wal_dir.clone();
opts.wal.dir = Some(wal_dir.clone());
}
if let Some(http_addr) = &self.http_addr {
@@ -214,7 +214,7 @@ mod tests {
tcp_nodelay = true
[wal]
dir = "/tmp/greptimedb/wal"
dir = "/other/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -223,7 +223,7 @@ mod tests {
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
[storage.compaction]
max_inflight_tasks = 3
@@ -255,6 +255,7 @@ mod tests {
assert_eq!(2, options.mysql_runtime_size);
assert_eq!(Some(42), options.node_id);
assert_eq!("/other/wal", options.wal.dir.unwrap());
assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
@@ -273,8 +274,8 @@ mod tests {
assert!(tcp_nodelay);
match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_dir, .. }) => {
assert_eq!("/tmp/greptimedb/data/", data_dir)
ObjectStoreConfig::File(FileConfig { data_home, .. }) => {
assert_eq!("/tmp/greptimedb/", data_home)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
@@ -374,7 +375,6 @@ mod tests {
tcp_nodelay = true
[wal]
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
@@ -383,7 +383,7 @@ mod tests {
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
[storage.compaction]
max_inflight_tasks = 3
@@ -464,7 +464,7 @@ mod tests {
assert_eq!(opts.storage.compaction.max_purge_tasks, 32);
// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.wal.dir, "/other/wal/dir");
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");
// Should be default value.
assert_eq!(


@@ -264,7 +264,7 @@ mod tests {
);
// Should be the values from config file, not environment variables.
assert_eq!(opts.wal.dir, "/tmp/greptimedb/wal".to_string());
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");
// Should be default values.
assert_eq!(opts.node_id, None);


@@ -449,7 +449,7 @@ mod tests {
);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir);
assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
match &dn_opts.storage.store {
datanode::datanode::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(


@@ -51,7 +51,7 @@ mod tests {
#[ignore]
#[test]
fn test_repl() {
let data_dir = create_temp_dir("data");
let data_home = create_temp_dir("data");
let wal_dir = create_temp_dir("wal");
let mut bin_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -65,7 +65,7 @@ mod tests {
"start",
"--rpc-addr=0.0.0.0:4321",
"--node-id=1",
&format!("--data-dir={}", data_dir.path().display()),
&format!("--data-home={}", data_home.path().display()),
&format!("--wal-dir={}", wal_dir.path().display()),
])
.stdout(Stdio::null())


@@ -15,6 +15,7 @@
pub mod bit_vec;
pub mod buffer;
pub mod bytes;
pub mod paths;
#[allow(clippy::all)]
pub mod readable_size;


@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Path constants for table engines, cluster states and WAL
/// All paths are relative to `data_home` (file storage) or the root path (S3, OSS, etc.).
/// WAL dir for local file storage
pub const WAL_DIR: &str = "wal/";
/// Data dir for table engines
pub const DATA_DIR: &str = "data/";
/// Cluster state dir
pub const CLUSTER_DIR: &str = "cluster/";
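A minimal sketch of how these constants compose with data_home; the under_home helper is hypothetical (real call sites in the datanode crate use object_store::util::normalize_dir and format! directly):

use common_base::paths::{CLUSTER_DIR, DATA_DIR, WAL_DIR};

// Hypothetical helper; data_home is assumed to already carry a trailing '/'.
fn under_home(data_home: &str, dir: &str) -> String {
    format!("{data_home}{dir}")
}

fn main() {
    let home = "/tmp/greptimedb/";
    assert_eq!(under_home(home, WAL_DIR), "/tmp/greptimedb/wal/");
    assert_eq!(under_home(home, DATA_DIR), "/tmp/greptimedb/data/");
    assert_eq!(under_home(home, CLUSTER_DIR), "/tmp/greptimedb/cluster/");
}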


@@ -343,6 +343,7 @@ impl ManagerContext {
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
pub parent_path: String,
pub max_retry_times: usize,
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
@@ -352,6 +353,7 @@ pub struct ManagerConfig {
impl Default for ManagerConfig {
fn default() -> Self {
Self {
parent_path: "".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
@@ -363,7 +365,7 @@ impl Default for ManagerConfig {
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
procedure_store: Arc<ProcedureStore>,
max_retry_times: usize,
retry_delay: Duration,
remove_outdated_meta_task: RepeatedTask<Error>,
@@ -382,7 +384,7 @@ impl LocalManager {
);
LocalManager {
manager_ctx,
state_store,
procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
remove_outdated_meta_task,
@@ -405,7 +407,7 @@ impl LocalManager {
exponential_builder: ExponentialBuilder::default()
.with_min_delay(self.retry_delay)
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
store: self.procedure_store.clone(),
rolling_back: false,
};
@@ -466,8 +468,7 @@ impl ProcedureManager for LocalManager {
logging::info!("LocalManager start to recover");
let recover_start = Instant::now();
let procedure_store = ProcedureStore::new(self.state_store.clone());
let (messages, finished_ids) = procedure_store.load_messages().await?;
let (messages, finished_ids) = self.procedure_store.load_messages().await?;
for (procedure_id, message) in &messages {
if message.parent_id.is_none() {
@@ -502,7 +503,7 @@ impl ProcedureManager for LocalManager {
);
for procedure_id in finished_ids {
if let Err(e) = procedure_store.delete_procedure(procedure_id).await {
if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
}
}
@@ -571,7 +572,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::store::ObjectStateStore;
use crate::store::state_store::ObjectStateStore;
use crate::{Context, Procedure, Status};
#[test]
@@ -680,6 +681,7 @@ mod tests {
fn test_register_loader() {
let dir = create_temp_dir("register");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
@@ -702,6 +704,7 @@ mod tests {
let dir = create_temp_dir("recover");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
@@ -714,7 +717,7 @@ mod tests {
.unwrap();
// Prepare data
let procedure_store = ProcedureStore::from(object_store.clone());
let procedure_store = ProcedureStore::from_object_store(object_store.clone());
let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
let root_id = ProcedureId::random();
// Prepare data for the root procedure.
@@ -749,6 +752,7 @@ mod tests {
async fn test_submit_procedure() {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
@@ -798,6 +802,7 @@ mod tests {
async fn test_state_changed_on_err() {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
@@ -860,6 +865,7 @@ mod tests {
let dir = create_temp_dir("remove_outdated_meta_task");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
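Callers now populate the new parent_path field instead of relying on a global prefix. A sketch mirroring how the datanode derives it from the node id (the module path common_procedure::local is an assumption):

use std::time::Duration;
use common_procedure::local::ManagerConfig;

fn manager_config_for(datanode_id: u64) -> ManagerConfig {
    ManagerConfig {
        // Procedure state ends up under "cluster/dn-<id>/procedure/".
        parent_path: format!("cluster/dn-{datanode_id}/"),
        max_retry_times: 3,
        retry_delay: Duration::from_millis(500),
        ..Default::default()
    }
}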


@@ -109,7 +109,7 @@ pub(crate) struct Runner {
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: ProcedureStore,
pub(crate) store: Arc<ProcedureStore>,
pub(crate) rolling_back: bool,
}
@@ -471,7 +471,7 @@ mod tests {
fn new_runner(
meta: ProcedureMetaRef,
procedure: BoxedProcedure,
store: ProcedureStore,
store: Arc<ProcedureStore>,
) -> Runner {
Runner {
meta,
@@ -484,8 +484,13 @@ mod tests {
}
}
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = proc_path!("{procedure_id}/");
async fn check_files(
object_store: &ObjectStore,
procedure_store: &ProcedureStore,
procedure_id: ProcedureId,
files: &[&str],
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
@@ -578,16 +583,28 @@ mod tests {
let meta = normal.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta, Box::new(normal), procedure_store);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
check_files(&object_store, ctx.procedure_id, first_files).await;
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
first_files,
)
.await;
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
check_files(&object_store, ctx.procedure_id, second_files).await;
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
second_files,
)
.await;
}
#[tokio::test]
@@ -626,7 +643,7 @@ mod tests {
let meta = suspend.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
let res = runner.execute_once(&ctx).await;
@@ -726,8 +743,8 @@ mod tests {
let procedure_id = meta.id;
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
let manager_ctx = Arc::new(ManagerContext::new());
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
@@ -744,7 +761,7 @@ mod tests {
let state = manager_ctx.state(procedure_id).unwrap();
assert!(state.is_done(), "{state:?}");
// Files are removed.
check_files(&object_store, procedure_id, &[]).await;
check_files(&object_store, &procedure_store, procedure_id, &[]).await;
tokio::time::sleep(Duration::from_millis(5)).await;
// Clean outdated meta.
@@ -770,13 +787,19 @@ mod tests {
let meta = fail.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
let res = runner.execute_once(&ctx).await;
assert!(res.is_failed(), "{res:?}");
assert!(meta.state().is_failed());
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.rollback"],
)
.await;
}
#[tokio::test]
@@ -805,8 +828,8 @@ mod tests {
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
@@ -815,7 +838,13 @@ mod tests {
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
assert!(meta.state().is_done());
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.commit"],
)
.await;
}
#[tokio::test]
@@ -832,7 +861,7 @@ mod tests {
let dir = create_temp_dir("exceed_max_retry_later");
let meta = exceed_max_retry_later.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(
meta.clone(),
Box::new(exceed_max_retry_later),
@@ -906,7 +935,7 @@ mod tests {
let meta = parent.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let manager_ctx = Arc::new(ManagerContext::new());


@@ -14,27 +14,25 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use common_telemetry::logging;
use futures::TryStreamExt;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{Result, ToJsonSnafu};
pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef};
pub(crate) use crate::store::state_store::StateStoreRef;
use crate::{BoxedProcedure, ProcedureId};
pub mod state_store;
/// Key prefix of procedure store.
pub(crate) const PROC_PATH: &str = "procedure/";
const PROC_PATH: &str = "procedure/";
/// Constructs a path for procedure store.
macro_rules! proc_path {
($fmt:expr) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt)) };
($fmt:expr, $($args:tt)*) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt, $($args)*)) };
($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) };
($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
}
pub(crate) use proc_path;
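With the store as its first argument, the macro resolves the prefix per store instead of from the crate-wide PROC_PATH. A self-contained sketch of the expansion, with FakeStore standing in for the crate-private ProcedureStore:

struct FakeStore {
    proc_path: String,
}

impl FakeStore {
    fn proc_path(&self) -> &str {
        &self.proc_path
    }
}

fn main() {
    let store = FakeStore {
        proc_path: "cluster/dn-0/procedure/".to_string(),
    };
    let id = "some-procedure-id";
    // proc_path!(store, "{id}/") expands to roughly this:
    let path = format!("{}{}", store.proc_path(), format_args!("{id}/"));
    assert_eq!(path, "cluster/dn-0/procedure/some-procedure-id/");
}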
@@ -54,13 +52,22 @@ pub struct ProcedureMessage {
}
/// Procedure storage layer.
#[derive(Clone)]
pub(crate) struct ProcedureStore(StateStoreRef);
pub(crate) struct ProcedureStore {
proc_path: String,
store: StateStoreRef,
}
impl ProcedureStore {
/// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(state_store: StateStoreRef) -> ProcedureStore {
ProcedureStore(state_store)
pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
let proc_path = format!("{}{PROC_PATH}", parent_path);
logging::info!("The procedure state store path is: {}", &proc_path);
ProcedureStore { proc_path, store }
}
#[inline]
pub(crate) fn proc_path(&self) -> &str {
&self.proc_path
}
/// Dump the `procedure` to the storage.
@@ -81,6 +88,7 @@ impl ProcedureStore {
step,
};
let key = ParsedKey {
prefix: &self.proc_path,
procedure_id,
step,
key_type: KeyType::Step,
@@ -88,7 +96,7 @@ impl ProcedureStore {
.to_string();
let value = serde_json::to_string(&message).context(ToJsonSnafu)?;
self.0.put(&key, value.into_bytes()).await?;
self.store.put(&key, value.into_bytes()).await?;
Ok(())
}
@@ -100,12 +108,13 @@ impl ProcedureStore {
step: u32,
) -> Result<()> {
let key = ParsedKey {
prefix: &self.proc_path,
procedure_id,
step,
key_type: KeyType::Commit,
}
.to_string();
self.0.put(&key, Vec::new()).await?;
self.store.put(&key, Vec::new()).await?;
Ok(())
}
@@ -117,26 +126,27 @@ impl ProcedureStore {
step: u32,
) -> Result<()> {
let key = ParsedKey {
prefix: &self.proc_path,
procedure_id,
step,
key_type: KeyType::Rollback,
}
.to_string();
self.0.put(&key, Vec::new()).await?;
self.store.put(&key, Vec::new()).await?;
Ok(())
}
/// Delete states of procedure from the storage.
pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> {
let path = proc_path!("{procedure_id}/");
let path = proc_path!(self, "{procedure_id}/");
// TODO(yingwen): We can optimize this to avoid reading the value.
let mut key_values = self.0.walk_top_down(&path).await?;
let mut key_values = self.store.walk_top_down(&path).await?;
// 8 should be enough for most procedures.
let mut step_keys = Vec::with_capacity(8);
let mut finish_keys = Vec::new();
while let Some((key, _)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&key) else {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
@@ -155,11 +165,11 @@ impl ProcedureStore {
finish_keys
);
// We delete all step keys first.
self.0.batch_delete(step_keys.as_slice()).await?;
self.store.batch_delete(step_keys.as_slice()).await?;
// Then we delete the finish keys, to ensure the procedure still looks finished if deletion is interrupted.
self.0.batch_delete(finish_keys.as_slice()).await?;
self.store.batch_delete(finish_keys.as_slice()).await?;
// Finally we remove the directory itself.
self.0.delete(&path).await?;
self.store.delete(&path).await?;
// Maybe we could use procedure_id.commit/rollback as the file name so we could
// use remove_all to remove the directory and then remove the commit/rollback file.
@@ -175,9 +185,9 @@ impl ProcedureStore {
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
// Scan all procedures.
let mut key_values = self.0.walk_top_down(PROC_PATH).await?;
let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
while let Some((key, value)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&key) else {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
continue;
};
@@ -221,14 +231,6 @@ impl ProcedureStore {
}
}
impl From<ObjectStore> for ProcedureStore {
fn from(store: ObjectStore) -> ProcedureStore {
let state_store = ObjectStateStore::new(store);
ProcedureStore::new(Arc::new(state_store))
}
}
/// Suffix type of the key.
#[derive(Debug, PartialEq, Eq)]
enum KeyType {
@@ -258,18 +260,19 @@ impl KeyType {
/// Key to refer the procedure in the [ProcedureStore].
#[derive(Debug, PartialEq, Eq)]
struct ParsedKey {
struct ParsedKey<'a> {
prefix: &'a str,
procedure_id: ProcedureId,
step: u32,
key_type: KeyType,
}
impl fmt::Display for ParsedKey {
impl<'a> fmt::Display for ParsedKey<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}{}/{:010}.{}",
PROC_PATH,
self.prefix,
self.procedure_id,
self.step,
self.key_type.as_str(),
@@ -277,10 +280,10 @@ impl fmt::Display for ParsedKey {
}
}
impl ParsedKey {
impl<'a> ParsedKey<'a> {
/// Try to parse the key from specific `input`.
fn parse_str(input: &str) -> Option<ParsedKey> {
let input = input.strip_prefix(PROC_PATH)?;
fn parse_str(prefix: &'a str, input: &str) -> Option<ParsedKey<'a>> {
let input = input.strip_prefix(prefix)?;
let mut iter = input.rsplit('/');
let name = iter.next()?;
let id_str = iter.next()?;
@@ -294,6 +297,7 @@ impl ParsedKey {
let step = step_str.parse().ok()?;
Some(ParsedKey {
prefix,
procedure_id,
step,
key_type,
@@ -303,6 +307,20 @@ impl ParsedKey {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use object_store::ObjectStore;
use crate::store::state_store::ObjectStateStore;
impl ProcedureStore {
pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore {
let state_store = ObjectStateStore::new(store);
ProcedureStore::new("data/", Arc::new(state_store))
}
}
use async_trait::async_trait;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs as Builder;
@@ -316,73 +334,91 @@ mod tests {
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
ProcedureStore::from(object_store)
ProcedureStore::from_object_store(object_store)
}
#[test]
fn test_parsed_key() {
let dir = create_temp_dir("store_procedure");
let store = procedure_store_for_test(&dir);
let procedure_id = ProcedureId::random();
let key = ParsedKey {
prefix: &store.proc_path,
procedure_id,
step: 2,
key_type: KeyType::Step,
};
assert_eq!(
proc_path!("{procedure_id}/0000000002.step"),
proc_path!(store, "{procedure_id}/0000000002.step"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
assert_eq!(
key,
ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
);
let key = ParsedKey {
prefix: &store.proc_path,
procedure_id,
step: 2,
key_type: KeyType::Commit,
};
assert_eq!(
proc_path!("{procedure_id}/0000000002.commit"),
proc_path!(store, "{procedure_id}/0000000002.commit"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
assert_eq!(
key,
ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
);
let key = ParsedKey {
prefix: &store.proc_path,
procedure_id,
step: 2,
key_type: KeyType::Rollback,
};
assert_eq!(
proc_path!("{procedure_id}/0000000002.rollback"),
proc_path!(store, "{procedure_id}/0000000002.rollback"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
assert_eq!(
key,
ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap()
);
}
#[test]
fn test_parse_invalid_key() {
assert!(ParsedKey::parse_str("").is_none());
assert!(ParsedKey::parse_str("invalidprefix").is_none());
assert!(ParsedKey::parse_str("procedu/0000000003.step").is_none());
assert!(ParsedKey::parse_str("procedure-0000000003.step").is_none());
let dir = create_temp_dir("store_procedure");
let store = procedure_store_for_test(&dir);
assert!(ParsedKey::parse_str(&store.proc_path, "").is_none());
assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none());
assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none());
assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none());
let procedure_id = ProcedureId::random();
let input = proc_path!("{procedure_id}");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
let input = proc_path!("{procedure_id}/");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}/");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
let input = proc_path!("{procedure_id}/0000000003");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}/0000000003");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
let input = proc_path!("{procedure_id}/0000000003.");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}/0000000003.");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
let input = proc_path!("{procedure_id}/0000000003.other");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}/0000000003.other");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
assert!(ParsedKey::parse_str("12345/0000000003.step").is_none());
assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none());
let input = proc_path!("{procedure_id}-0000000003.commit");
assert!(ParsedKey::parse_str(&input).is_none());
let input = proc_path!(store, "{procedure_id}-0000000003.commit");
assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none());
}
#[test]


@@ -5,4 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
once_cell = "1.16"
rand.workspace = true
tempfile.workspace = true


@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod ports;
pub mod temp_dir;


@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicUsize, Ordering};
use once_cell::sync::OnceCell;
use rand::Rng;
static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
/// Returns a unique port (allocated at runtime) for tests
pub fn get_port() -> usize {
PORTS
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3000..3800)))
.fetch_add(1, Ordering::Relaxed)
}
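Server tests then draw their listen addresses from this shared counter instead of per-crate statics, for example:

use common_test_util::ports;

fn test_http_addr() -> String {
    // Each call yields a fresh port, so tests can bind listeners concurrently.
    format!("127.0.0.1:{}", ports::get_port())
}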


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Datanode configurations
use std::sync::Arc;
use std::time::Duration;
@@ -35,6 +37,9 @@ use crate::server::Services;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
@@ -58,7 +63,7 @@ pub struct StorageConfig {
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
#[serde(default)]
pub struct FileConfig {
pub data_dir: String,
pub data_home: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -122,7 +127,7 @@ impl Default for OssConfig {
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {
data_dir: "/tmp/greptimedb/data/".to_string(),
data_home: DEFAULT_DATA_HOME.to_string(),
})
}
}
@@ -131,7 +136,7 @@ impl Default for ObjectStoreConfig {
#[serde(default)]
pub struct WalConfig {
// wal directory
pub dir: String,
pub dir: Option<String>,
// wal file size in bytes
pub file_size: ReadableSize,
// wal purge threshold in bytes
@@ -148,7 +153,7 @@ pub struct WalConfig {
impl Default for WalConfig {
fn default() -> Self {
Self {
dir: "/tmp/greptimedb/wal".to_string(),
dir: None,
file_size: ReadableSize::gb(1), // log file size 1G
purge_threshold: ReadableSize::gb(50), // purge threshold 50G
purge_interval: Duration::from_secs(600),
@@ -342,7 +347,7 @@ pub struct Datanode {
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let instance = Arc::new(Instance::with_opts(&opts).await?);
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,


@@ -429,6 +429,9 @@ pub enum Error {
StopProcedureManager {
source: common_procedure::error::Error,
},
#[snafu(display("Missing WAL dir config"))]
MissingWalDirConfig { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -483,6 +486,7 @@ impl ErrorExt for Error {
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. }
| MissingWalDirConfig { .. }
| PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => {


@@ -20,7 +20,7 @@ use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_base::paths::{CLUSTER_DIR, WAL_DIR};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -35,12 +35,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::cache_policy::LruCacheLayer;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder};
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use secrecy::ExposeSecret;
use servers::Mode;
use session::context::QueryContext;
use snafu::prelude::*;
@@ -56,9 +52,7 @@ use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
use crate::datanode::{
DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
@@ -69,6 +63,7 @@ use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler
use crate::heartbeat::handler::HandlerGroupExecutor;
use crate::heartbeat::HeartbeatTask;
use crate::sql::{SqlHandler, SqlRequest};
use crate::store;
mod grpc;
pub mod sql;
@@ -88,7 +83,7 @@ pub struct Instance {
pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
pub async fn with_opts(opts: &DatanodeOptions) -> Result<Self> {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
@@ -105,16 +100,16 @@ impl Instance {
let compaction_scheduler = create_compaction_scheduler(opts);
Self::new_with(opts, meta_client, compaction_scheduler).await
Self::new(opts, meta_client, compaction_scheduler).await
}
pub(crate) async fn new_with(
pub(crate) async fn new(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
) -> Result<Self> {
let object_store = new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.wal).await?);
let object_store = store::new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig {
@@ -222,7 +217,9 @@ impl Instance {
)),
};
let procedure_manager = create_procedure_manager(&opts.procedure, object_store).await?;
let procedure_manager =
create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store)
.await?;
// Register all procedures.
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&*procedure_manager);
@@ -353,168 +350,6 @@ fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> Compactio
Arc::new(scheduler)
}
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let object_store = match store_config {
ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await,
ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await,
ObjectStoreConfig::Oss { .. } => new_oss_object_store(store_config).await,
};
// Don't enable retry layer when using local file backend.
let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) {
object_store.map(|object_store| object_store.layer(RetryLayer::new().with_jitter()))
} else {
object_store
};
object_store.map(|object_store| {
object_store
.layer(MetricsLayer)
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some(log::Level::Debug)),
)
.layer(TracingLayer)
})
}
pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let oss_config = match store_config {
ObjectStoreConfig::Oss(config) => config,
_ => unreachable!(),
};
let root = util::normalize_dir(&oss_config.root);
info!(
"The oss storage bucket is: {}, root is: {}",
oss_config.bucket, &root
);
let mut builder = OSSBuilder::default();
builder
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret());
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
create_object_store_with_cache(object_store, store_config).await
}
async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache_path.as_ref();
let capacity = s3_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache_path.as_ref();
let capacity = oss_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
_ => (None, ReadableSize(0)),
};
if let Some(path) = cache_path {
let atomic_temp_dir = format!("{path}/.tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let cache_store = FsBuilder::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)
}
}
pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let s3_config = match store_config {
ObjectStoreConfig::S3(config) => config,
_ => unreachable!(),
};
let root = util::normalize_dir(&s3_config.root);
info!(
"The s3 storage bucket is: {}, root is: {}",
s3_config.bucket, &root
);
let mut builder = S3Builder::default();
builder
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret());
if s3_config.endpoint.is_some() {
builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder.region(s3_config.region.as_ref().unwrap());
}
create_object_store_with_cache(
ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish(),
store_config,
)
.await
}
fn clean_temp_dir(dir: &str) -> Result<()> {
if path::Path::new(&dir).exists() {
info!("Begin to clean temp storage directory: {}", dir);
fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
info!("Cleaned temp storage directory: {}", dir);
}
Ok(())
}
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let file_config = match store_config {
ObjectStoreConfig::File(config) => config,
_ => unreachable!(),
};
let data_dir = util::normalize_dir(&file_config.data_dir);
fs::create_dir_all(path::Path::new(&data_dir))
.context(error::CreateDirSnafu { dir: &data_dir })?;
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{data_dir}/.tmp/");
clean_temp_dir(&atomic_write_dir)?;
let mut builder = FsBuilder::default();
builder.root(&data_dir).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)
}
/// Create metasrv client instance and spawn heartbeat loop.
async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Result<MetaClient> {
let cluster_id = 0; // TODO(hl): read from config
@@ -547,15 +382,28 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re
Ok(meta_client)
}
pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngineLogStore> {
pub(crate) async fn create_log_store(
store_config: &ObjectStoreConfig,
wal_config: &WalConfig,
) -> Result<RaftEngineLogStore> {
let wal_dir = match (&wal_config.dir, store_config) {
(Some(dir), _) => dir.to_string(),
(None, ObjectStoreConfig::File(file_config)) => {
format!("{}{WAL_DIR}", util::normalize_dir(&file_config.data_home))
}
_ => return error::MissingWalDirConfigSnafu {}.fail(),
};
// create WAL directory
fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu {
dir: &wal_config.dir,
})?;
info!("Creating logstore with config: {:?}", wal_config);
fs::create_dir_all(path::Path::new(&wal_dir))
.context(error::CreateDirSnafu { dir: &wal_dir })?;
info!(
"Creating logstore with config: {:?} and storage path: {}",
wal_config, &wal_dir
);
let log_config = LogConfig {
file_size: wal_config.file_size.0,
log_file_dir: wal_config.dir.clone(),
log_file_dir: wal_dir,
purge_interval: wal_config.purge_interval,
purge_threshold: wal_config.purge_threshold.0,
read_batch_size: wal_config.read_batch_size,
@@ -569,6 +417,7 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngin
}
pub(crate) async fn create_procedure_manager(
datanode_id: u64,
procedure_config: &ProcedureConfig,
object_store: ObjectStore,
) -> Result<ProcedureManagerRef> {
@@ -579,7 +428,12 @@ pub(crate) async fn create_procedure_manager(
let state_store = Arc::new(ObjectStateStore::new(object_store));
let dn_store_path = format!("{CLUSTER_DIR}dn-{datanode_id}/");
info!("The datanode internal storage path is: {}", dn_store_path);
let manager_config = ManagerConfig {
parent_path: dn_store_path,
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
..Default::default()


@@ -23,5 +23,6 @@ pub mod metrics;
mod mock;
pub mod server;
pub mod sql;
mod store;
#[cfg(test)]
mod tests;


@@ -32,7 +32,7 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
Instance::new_with(opts, Some(meta_client), compaction_scheduler).await
Instance::new(opts, Some(meta_client), compaction_scheduler).await
}
}

src/datanode/src/store.rs (new file, 108 lines)

@@ -0,0 +1,108 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Object storage utilities
mod fs;
mod oss;
mod s3;
use std::path;
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_telemetry::logging::info;
use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::Fs as FsBuilder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
let object_store = match store_config {
ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await,
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
}?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, store_config).await?;
object_store.layer(RetryLayer::new().with_jitter())
} else {
object_store
};
Ok(object_store
.layer(MetricsLayer)
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some(log::Level::Debug)),
)
.layer(TracingLayer))
}
async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache_path.as_ref();
let capacity = s3_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache_path.as_ref();
let capacity = oss_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
_ => (None, ReadableSize(0)),
};
if let Some(path) = cache_path {
let atomic_temp_dir = format!("{path}/.tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let cache_store = FsBuilder::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)
}
}
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
if path::Path::new(&dir).exists() {
info!("Begin to clean temp storage directory: {}", dir);
std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
info!("Cleaned temp storage directory: {}", dir);
}
Ok(())
}
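For reference, the layer stack that new_object_store assembles, read off the code above (innermost backend first):

backend (Fs | S3 | Oss)
  -> LruCacheLayer   (remote backends only, when cache_path is set)
  -> RetryLayer      (remote backends only, with jitter)
  -> MetricsLayer
  -> LoggingLayer    (expected errors logged at DEBUG level)
  -> TracingLayer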


@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs, path};
use common_telemetry::logging::info;
use object_store::services::Fs as FsBuilder;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::datanode::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(file_config: &FileConfig) -> Result<ObjectStore> {
let data_home = util::normalize_dir(&file_config.data_home);
fs::create_dir_all(path::Path::new(&data_home))
.context(error::CreateDirSnafu { dir: &data_home })?;
info!("The file storage home is: {}", &data_home);
let atomic_write_dir = format!("{data_home}/.tmp/");
store::clean_temp_dir(&atomic_write_dir)?;
let mut builder = FsBuilder::default();
builder.root(&data_home).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)
}


@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::Oss as OSSBuilder;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::OssConfig;
use crate::error::{self, Result};
pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&oss_config.root);
info!(
"The oss storage bucket is: {}, root is: {}",
oss_config.bucket, &root
);
let mut builder = OSSBuilder::default();
builder
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret());
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}


@@ -0,0 +1,49 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::S3 as S3Builder;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::S3Config;
use crate::error::{self, Result};
pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
let root = util::normalize_dir(&s3_config.root);
info!(
"The s3 storage bucket is: {}, root is: {}",
s3_config.bucket, &root
);
let mut builder = S3Builder::default();
builder
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret());
if s3_config.endpoint.is_some() {
builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder.region(s3_config.region.as_ref().unwrap());
}
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}


@@ -59,12 +59,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
dir: Some(wal_tmp_dir.path().to_str().unwrap().to_string()),
..Default::default()
},
storage: StorageConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
..Default::default()
},


@@ -181,11 +181,11 @@ fn test_region_name() {
#[test]
fn test_table_dir() {
assert_eq!(
"greptime/public/1024/",
"data/greptime/public/1024/",
table_dir("greptime", "public", 1024)
);
assert_eq!(
"0x4354a1/prometheus/1024/",
"data/0x4354a1/prometheus/1024/",
table_dir("0x4354a1", "prometheus", 1024)
);
}


@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod lru_cache;
pub use lru_cache::*;
pub use opendal::layers::*;


@@ -73,7 +73,7 @@ impl<I: Accessor, C: Accessor> Layer<I> for LruCacheLayer<C> {
fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
inner: Arc::new(inner),
inner,
cache: self.cache.clone(),
lru_cache: self.lru_cache.clone(),
}
@@ -82,7 +82,7 @@ impl<I: Accessor, C: Accessor> Layer<I> for LruCacheLayer<C> {
#[derive(Debug)]
pub struct LruCacheAccessor<I, C> {
inner: Arc<I>,
inner: I,
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}


@@ -15,11 +15,11 @@
pub use opendal::raw::normalize_path as raw_normalize_path;
pub use opendal::raw::oio::Pager;
pub use opendal::{
layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
Operator as ObjectStore, Reader, Result, Writer,
};
pub mod cache_policy;
pub mod layers;
mod metrics;
pub mod test_util;
pub mod util;


@@ -18,7 +18,7 @@ use std::sync::Arc;
use anyhow::Result;
use common_telemetry::{logging, metric};
use common_test_util::temp_dir::create_temp_dir;
use object_store::cache_policy::LruCacheLayer;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};


@@ -14,13 +14,19 @@
use axum::Router;
use axum_test_helper::TestClient;
use common_test_util::ports;
use servers::http::{HttpOptions, HttpServerBuilder};
use table::test_util::MemTable;
use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler};
fn make_test_app() -> Router {
let server = HttpServerBuilder::new(HttpOptions::default())
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(create_testing_sql_query_handler(
MemTable::default_numbers_table(),
))


@@ -20,6 +20,7 @@ use async_trait::async_trait;
use axum::{http, Router};
use axum_test_helper::TestClient;
use common_query::Output;
use common_test_util::ports;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{Error, Result};
@@ -93,8 +94,13 @@ impl SqlQueryHandler for DummyInstance {
}
fn make_test_app(tx: Arc<mpsc::Sender<(String, String)>>, db_name: Option<&str>) -> Router {
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let instance = Arc::new(DummyInstance { tx });
let mut server_builder = HttpServerBuilder::new(HttpOptions::default());
let mut server_builder = HttpServerBuilder::new(http_opts);
server_builder.with_sql_handler(instance.clone());
server_builder.with_grpc_handler(instance.clone());
let mut user_provider = MockUserProvider::default();


@@ -19,6 +19,7 @@ use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use common_test_util::ports;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{self, Result};
@@ -91,8 +92,13 @@ impl SqlQueryHandler for DummyInstance {
}
fn make_test_app(tx: mpsc::Sender<String>) -> Router {
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let instance = Arc::new(DummyInstance { tx });
let server = HttpServerBuilder::new(HttpOptions::default())
let server = HttpServerBuilder::new(http_opts)
.with_grpc_handler(instance.clone())
.with_sql_handler(instance.clone())
.with_opentsdb_handler(instance)


@@ -22,6 +22,7 @@ use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use common_test_util::ports;
use datatypes::schema::Schema;
use prost::Message;
use query::parser::PromQuery;
@@ -116,8 +117,13 @@ impl SqlQueryHandler for DummyInstance {
}
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let instance = Arc::new(DummyInstance { tx });
let server = HttpServerBuilder::new(HttpOptions::default())
let server = HttpServerBuilder::new(http_opts)
.with_grpc_handler(instance.clone())
.with_sql_handler(instance.clone())
.with_prom_handler(instance)


@@ -15,6 +15,7 @@
use std::fmt::{self, Display};
use std::sync::Arc;
use common_base::paths::DATA_DIR;
use common_procedure::BoxedProcedure;
use store_api::storage::{RegionId, RegionNumber};
@@ -182,7 +183,7 @@ pub fn region_id(table_id: TableId, n: u32) -> RegionId {
#[inline]
pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String {
format!("{catalog_name}/{schema_name}/{table_id}/")
format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/")
}
#[cfg(test)]


@@ -14,7 +14,6 @@
use std::env;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -24,6 +23,7 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::ports;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datanode::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config,
@@ -38,8 +38,6 @@ use frontend::instance::Instance as FeInstance;
use object_store::services::{Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use rand::Rng;
use secrecy::ExposeSecret;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServerBuilder};
@@ -53,14 +51,6 @@ use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
fn get_port() -> usize {
PORTS
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3500..3900)))
.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Eq, PartialEq)]
pub enum StorageType {
S3,
@@ -167,7 +157,7 @@ pub fn get_test_store_config(
(
ObjectStoreConfig::File(FileConfig {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
}),
TempDirGuard::File(data_tmp_dir),
)
@@ -220,7 +210,7 @@ pub fn create_tmp_dir_and_datanode_opts(
pub fn create_datanode_opts(store: ObjectStoreConfig, wal_dir: String) -> DatanodeOptions {
DatanodeOptions {
wal: WalConfig {
dir: wal_dir,
dir: Some(wal_dir),
..Default::default()
},
storage: StorageConfig {
@@ -300,7 +290,12 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
.await
.unwrap();
instance.start().await.unwrap();
let http_server = HttpServerBuilder::new(HttpOptions::default())
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new(
frontend_instance,
)))
@@ -327,8 +322,14 @@ pub async fn setup_test_http_app_with_frontend(
)
.await
.unwrap();
let http_opts = HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
let frontend_ref = Arc::new(frontend);
let http_server = HttpServerBuilder::new(HttpOptions::default())
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref)
@@ -376,7 +377,7 @@ pub async fn setup_grpc_server(
.unwrap(),
);
let fe_grpc_addr = format!("127.0.0.1:{}", get_port());
let fe_grpc_addr = format!("127.0.0.1:{}", ports::get_port());
let fe_instance = FeInstance::try_new_standalone(instance.clone())
.await


@@ -63,9 +63,10 @@ impl MockStandaloneInstance {
}
}
pub async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
let dn_instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap());
let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap());
let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await
.unwrap();


@@ -5,7 +5,6 @@ rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
[wal]
dir = '{wal_dir}'
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '50GB'
@@ -14,7 +13,7 @@ sync_write = false
[storage]
type = 'File'
data_dir = '{data_dir}'
data_home = '{data_home}'
[meta_client_options]
metasrv_addrs = ['127.0.0.1:3002']
@@ -22,8 +21,6 @@ timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
[procedure.store]
type = "File"
data_dir = "{procedure_dir}"
[procedure]
max_retry_times = 3
retry_delay = "500ms"


@@ -2,7 +2,6 @@ mode = 'standalone'
enable_memory_catalog = false
[wal]
dir = '{wal_dir}'
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '50GB'
@@ -11,14 +10,12 @@ sync_write = false
[storage]
type = 'File'
data_dir = '{data_dir}'
data_home = '{data_home}'
[grpc_options]
addr = '127.0.0.1:4001'
runtime_size = 8
[procedure.store]
type = "File"
data_dir = "{procedure_dir}"
[procedure]
max_retry_times = 3
retry_delay = "500ms"


@@ -211,14 +211,14 @@ impl Env {
#[derive(Serialize)]
struct Context {
wal_dir: String,
data_dir: String,
data_home: String,
procedure_dir: String,
}
let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time);
let ctx = Context {
wal_dir: format!("{greptimedb_dir}/wal/"),
data_dir: format!("{greptimedb_dir}/data/"),
data_home: format!("{greptimedb_dir}/"),
procedure_dir: format!("{greptimedb_dir}/procedure/"),
};
let rendered = tt.render(subcommand, &ctx).unwrap();