feat: reopen corresponding regions on starting datanode (#2399)

* separate config and datanode impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* partial implement of fetching region id list

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reopen all regions on starting region server

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness & assign default datanode id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* set writable on lease

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply cr suggs.

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/datanode/src/datanode.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-15 08:30:20 -05:00
committed by GitHub
parent 0bd6b9bb39
commit c149c123c3
22 changed files with 685 additions and 564 deletions

View File

@@ -16,8 +16,8 @@ use std::time::Duration;
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
@@ -163,7 +163,7 @@ impl StartCommand {
logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
let datanode = DatanodeBuilder::new(opts, Default::default())
let datanode = DatanodeBuilder::new(opts, None, Default::default())
.build()
.await
.context(StartDatanodeSnafu)?;
@@ -179,9 +179,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{
CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig,
};
use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::Mode;
use super::*;

View File

@@ -15,7 +15,7 @@
use common_config::KvStoreConfig;
use common_telemetry::logging::LoggingOptions;
use config::{Config, Environment, File, FileFormat};
use datanode::datanode::{DatanodeOptions, ProcedureConfig};
use datanode::config::{DatanodeOptions, ProcedureConfig};
use frontend::frontend::FrontendOptions;
use meta_srv::metasrv::MetaSrvOptions;
use serde::{Deserialize, Serialize};
@@ -26,6 +26,7 @@ use crate::error::{LoadLayeredConfigSnafu, Result};
pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
/// Options mixed up from datanode, frontend and metasrv.
pub struct MixOptions {
pub data_home: String,
pub procedure_cfg: ProcedureConfig,
@@ -119,7 +120,7 @@ mod tests {
use std::time::Duration;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use super::*;

View File

@@ -23,8 +23,8 @@ use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use datanode::datanode::builder::DatanodeBuilder;
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::config::{DatanodeOptions, ProcedureConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -138,6 +138,7 @@ impl StandaloneOptions {
fn datanode_options(self) -> DatanodeOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
wal: self.wal,
storage: self.storage,
@@ -306,10 +307,11 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let datanode =
DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), plugins.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
let region_server = datanode.region_server();
let catalog_manager = KvBackendCatalogManager::new(
@@ -468,7 +470,7 @@ mod tests {
assert!(fe_opts.influxdb_options.enable);
match &dn_opts.storage.store {
datanode::datanode::ObjectStoreConfig::S3(s3_config) => {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
format!("{:?}", s3_config.access_key_id)

View File

@@ -42,10 +42,14 @@ const MAX_CLOSE_RETRY_TIMES: usize = 10;
///
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When deadline is reached,
/// the region will be closed.
///
/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its
/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
///
/// On each lease extension, [RegionAliveKeeper] will reset the deadline to the corresponding time, and
/// set region's status to "writable".
pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
@@ -313,6 +317,7 @@ impl CountdownTask {
"Reset deadline of region {region_id} to approximately {} seconds later",
(deadline - Instant::now()).as_secs_f32(),
);
let _ = self.region_server.set_writable(self.region_id, true);
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be either:

405
src/datanode/src/config.rs Normal file
View File

@@ -0,0 +1,405 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Datanode configurations
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_config::WalConfig;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use storage::config::{
EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
};
use storage::scheduler::SchedulerConfig;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct StorageConfig {
/// Retention period for all tables.
///
/// Default value is `None`, which means no TTL.
///
/// The precedence order is: ttl in table options > global ttl.
#[serde(with = "humantime_serde")]
pub global_ttl: Option<Duration>,
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub manifest: RegionManifestConfig,
pub flush: FlushConfig,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
manifest: RegionManifestConfig::default(),
flush: FlushConfig::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub container: String,
pub root: String,
#[serde(skip_serializing)]
pub account_name: SecretString,
#[serde(skip_serializing)]
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
impl Default for S3Config {
fn default() -> Self {
Self {
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
endpoint: Option::default(),
region: Option::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for OssConfig {
fn default() -> Self {
Self {
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for AzblobConfig {
fn default() -> Self {
Self {
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
sas_token: Option::default(),
}
}
}
impl Default for GcsConfig {
fn default() -> Self {
Self {
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {})
}
}
/// Options for region manifest
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct RegionManifestConfig {
/// Region manifest checkpoint actions margin.
/// Manifest service create a checkpoint every [checkpoint_margin] actions.
pub checkpoint_margin: Option<u16>,
/// Region manifest logs and checkpoints gc task execution duration.
#[serde(with = "humantime_serde")]
pub gc_duration: Option<Duration>,
/// Whether to compress manifest and checkpoint file by gzip
pub compress: bool,
}
impl Default for RegionManifestConfig {
fn default() -> Self {
Self {
checkpoint_margin: Some(10u16),
gc_duration: Some(Duration::from_secs(600)),
compress: false,
}
}
}
/// Options for table compaction
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct CompactionConfig {
/// Max task number that can concurrently run.
pub max_inflight_tasks: usize,
/// Max files in level 0 to trigger compaction.
pub max_files_in_level0: usize,
/// Max task number for SST purge task after compaction.
pub max_purge_tasks: usize,
/// Buffer threshold while writing SST files
pub sst_write_buffer_size: ReadableSize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FlushConfig {
/// Max inflight flush tasks.
pub max_flush_tasks: usize,
/// Default write buffer size for a region.
pub region_write_buffer_size: ReadableSize,
/// Interval to schedule auto flush picker to find region to flush.
#[serde(with = "humantime_serde")]
pub picker_schedule_interval: Duration,
/// Interval to auto flush a region if it has not flushed yet.
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
/// Global write buffer size for all regions.
pub global_write_buffer_size: Option<ReadableSize>,
}
impl Default for FlushConfig {
fn default() -> Self {
Self {
max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
picker_schedule_interval: Duration::from_millis(
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
global_write_buffer_size: None,
}
}
}
impl From<&DatanodeOptions> for SchedulerConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_inflight_tasks: value.storage.compaction.max_inflight_tasks,
}
}
}
impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
compress_manifest: value.storage.manifest.compress,
manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin,
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
max_flush_tasks: value.storage.flush.max_flush_tasks,
region_write_buffer_size: value.storage.flush.region_write_buffer_size,
picker_schedule_interval: value.storage.flush.picker_schedule_interval,
auto_flush_interval: value.storage.flush.auto_flush_interval,
global_write_buffer_size: value.storage.flush.global_write_buffer_size,
global_ttl: value.storage.global_ttl,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
pub heartbeat: HeartbeatOptions,
pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
pub enable_telemetry: bool,
}
impl Default for DatanodeOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
node_id: None,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
http_opts: HttpOptions::default(),
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
}
}
}
impl DatanodeOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client_options.metasrv_addrs"])
}
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Mito(MitoConfig),
}
#[cfg(test)]
mod tests {
use secrecy::ExposeSecret;
use super::*;
#[test]
fn test_toml() {
let opts = DatanodeOptions::default();
let toml_string = toml::to_string(&opts).unwrap();
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}
#[test]
fn test_secstr() {
let toml_str = r#"
[storage]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
format!("{:?}", cfg.access_key_id)
);
assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
}
_ => unreachable!(),
}
}
}

View File

@@ -12,390 +12,52 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Datanode configurations
pub mod builder;
//! Datanode implementation.
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::MetaKvBackend;
use catalog::memory::MemoryCatalogManager;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info};
use futures_util::StreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use meta_client::client::MetaClient;
use mito2::engine::MitoEngine;
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use snafu::ResultExt;
use storage::config::{
EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
};
use storage::scheduler::SchedulerConfig;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::RegionEngineRef;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::fs;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingMetaClientSnafu,
MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
use crate::region_server::RegionServer;
use crate::server::Services;
use crate::store;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct StorageConfig {
/// Retention period for all tables.
///
/// Default value is `None`, which means no TTL.
///
/// The precedence order is: ttl in table options > global ttl.
#[serde(with = "humantime_serde")]
pub global_ttl: Option<Duration>,
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub manifest: RegionManifestConfig,
pub flush: FlushConfig,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
manifest: RegionManifestConfig::default(),
flush: FlushConfig::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub container: String,
pub root: String,
#[serde(skip_serializing)]
pub account_name: SecretString,
#[serde(skip_serializing)]
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
impl Default for S3Config {
fn default() -> Self {
Self {
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
endpoint: Option::default(),
region: Option::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for OssConfig {
fn default() -> Self {
Self {
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for AzblobConfig {
fn default() -> Self {
Self {
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
sas_token: Option::default(),
}
}
}
impl Default for GcsConfig {
fn default() -> Self {
Self {
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
}
}
}
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {})
}
}
/// Options for region manifest
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct RegionManifestConfig {
/// Region manifest checkpoint actions margin.
/// Manifest service create a checkpoint every [checkpoint_margin] actions.
pub checkpoint_margin: Option<u16>,
/// Region manifest logs and checkpoints gc task execution duration.
#[serde(with = "humantime_serde")]
pub gc_duration: Option<Duration>,
/// Whether to compress manifest and checkpoint file by gzip
pub compress: bool,
}
impl Default for RegionManifestConfig {
fn default() -> Self {
Self {
checkpoint_margin: Some(10u16),
gc_duration: Some(Duration::from_secs(600)),
compress: false,
}
}
}
/// Options for table compaction
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct CompactionConfig {
/// Max task number that can concurrently run.
pub max_inflight_tasks: usize,
/// Max files in level 0 to trigger compaction.
pub max_files_in_level0: usize,
/// Max task number for SST purge task after compaction.
pub max_purge_tasks: usize,
/// Buffer threshold while writing SST files
pub sst_write_buffer_size: ReadableSize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FlushConfig {
/// Max inflight flush tasks.
pub max_flush_tasks: usize,
/// Default write buffer size for a region.
pub region_write_buffer_size: ReadableSize,
/// Interval to schedule auto flush picker to find region to flush.
#[serde(with = "humantime_serde")]
pub picker_schedule_interval: Duration,
/// Interval to auto flush a region if it has not flushed yet.
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
/// Global write buffer size for all regions.
pub global_write_buffer_size: Option<ReadableSize>,
}
impl Default for FlushConfig {
fn default() -> Self {
Self {
max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
picker_schedule_interval: Duration::from_millis(
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
global_write_buffer_size: None,
}
}
}
impl From<&DatanodeOptions> for SchedulerConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
max_inflight_tasks: value.storage.compaction.max_inflight_tasks,
}
}
}
impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
compress_manifest: value.storage.manifest.compress,
manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin,
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
max_flush_tasks: value.storage.flush.max_flush_tasks,
region_write_buffer_size: value.storage.flush.region_write_buffer_size,
picker_schedule_interval: value.storage.flush.picker_schedule_interval,
auto_flush_interval: value.storage.flush.auto_flush_interval,
global_write_buffer_size: value.storage.flush.global_write_buffer_size,
global_ttl: value.storage.global_ttl,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
pub heartbeat: HeartbeatOptions,
pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
pub enable_telemetry: bool,
}
impl Default for DatanodeOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
node_id: None,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
http_opts: HttpOptions::default(),
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
}
}
}
impl DatanodeOptions {
pub fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client_options.metasrv_addrs"])
}
pub fn to_toml_string(&self) -> String {
toml::to_string(&self).unwrap()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Mito(MitoConfig),
}
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
@@ -406,38 +68,6 @@ pub struct Datanode {
}
impl Datanode {
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
) -> Result<RegionServer> {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
None,
false,
plugins,
);
let query_engine = query_engine_factory.query_engine();
let runtime = Arc::new(
Runtime::builder()
.worker_threads(opts.rpc_runtime_size)
.thread_name("io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let log_store = Self::build_log_store(opts).await?;
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
for engine in engines {
region_server.register_engine(engine);
}
Ok(region_server)
}
pub async fn start(&mut self) -> Result<()> {
info!("Starting datanode instance...");
@@ -489,8 +119,189 @@ impl Datanode {
pub fn region_server(&self) -> RegionServer {
self.region_server.clone()
}
}
// internal utils
pub struct DatanodeBuilder {
opts: DatanodeOptions,
plugins: Arc<Plugins>,
meta_client: Option<MetaClient>,
kv_backend: Option<KvBackendRef>,
}
impl DatanodeBuilder {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(
opts: DatanodeOptions,
kv_backend: Option<KvBackendRef>,
plugins: Arc<Plugins>,
) -> Self {
Self {
opts,
plugins,
meta_client: None,
kv_backend,
}
}
pub fn with_meta_client(self, meta_client: MetaClient) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
// build meta client
let meta_client = match mode {
Mode::Distributed => {
let meta_client = if let Some(meta_client) = self.meta_client.take() {
meta_client
} else {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_config = self
.opts
.meta_client_options
.as_ref()
.context(MissingMetasrvOptsSnafu)?;
new_metasrv_client(node_id, meta_config).await?
};
Some(meta_client)
}
Mode::Standalone => None,
};
// build kv-backend
let kv_backend = match mode {
Mode::Distributed => Arc::new(MetaKvBackend {
client: Arc::new(meta_client.clone().context(MissingMetaClientSnafu)?),
}),
Mode::Standalone => self.kv_backend.clone().context(MissingKvBackendSnafu)?,
};
// build and initialize region server
let log_store = Self::build_log_store(&self.opts).await?;
let region_server =
Self::new_region_server(&self.opts, self.plugins.clone(), log_store).await?;
self.initialize_region_server(&region_server, kv_backend, matches!(mode, Mode::Standalone))
.await?;
let heartbeat_task = match mode {
Mode::Distributed => {
let meta_client = meta_client.context(MissingMetaClientSnafu)?;
let heartbeat_task =
HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?;
Some(heartbeat_task)
}
Mode::Standalone => None,
};
let services = match mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?),
Mode::Standalone => None,
};
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
mode,
self.opts.enable_telemetry,
)
.await;
Ok(Datanode {
opts: self.opts,
services,
heartbeat_task,
region_server,
greptimedb_telemetry_task,
})
}
/// Open all regions belong to this datanode.
async fn initialize_region_server(
&self,
region_server: &RegionServer,
kv_backend: KvBackendRef,
open_with_writable: bool,
) -> Result<()> {
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let mut regions = vec![];
let mut table_values = datanode_table_manager.tables(node_id);
while let Some(table_value) = table_values.next().await {
let table_value = table_value.context(GetMetadataSnafu)?;
for region_number in table_value.regions {
regions.push((
RegionId::new(table_value.table_id, region_number),
table_value.engine.clone(),
table_value.region_storage_path.clone(),
));
}
}
info!("going to open {} regions", regions.len());
for (region_id, engine, region_dir) in regions {
region_server
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
region_dir,
options: HashMap::new(),
}),
)
.await?;
if open_with_writable {
if let Err(e) = region_server.set_writable(region_id, true) {
error!(
e; "failed to set writable for region {region_id}"
);
}
}
}
info!("region server is initialized");
Ok(())
}
async fn new_region_server(
opts: &DatanodeOptions,
plugins: Arc<Plugins>,
log_store: Arc<RaftEngineLogStore>,
) -> Result<RegionServer> {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
None,
false,
plugins,
);
let query_engine = query_engine_factory.query_engine();
let runtime = Arc::new(
Runtime::builder()
.worker_threads(opts.rpc_runtime_size)
.thread_name("io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone());
let object_store = store::new_object_store(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store).await?;
for engine in engines {
region_server.register_engine(engine);
}
Ok(region_server)
}
/// Build [RaftEngineLogStore]
async fn build_log_store(opts: &DatanodeOptions) -> Result<Arc<RaftEngineLogStore>> {
@@ -535,38 +346,3 @@ impl Datanode {
Ok(engines)
}
}
#[cfg(test)]
mod tests {
use secrecy::ExposeSecret;
use super::*;
#[test]
fn test_toml() {
let opts = DatanodeOptions::default();
let toml_string = toml::to_string(&opts).unwrap();
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}
#[test]
fn test_secstr() {
let toml_str = r#"
[storage]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
format!("{:?}", cfg.access_key_id)
);
assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
}
_ => unreachable!(),
}
}
}

View File

@@ -1,98 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_base::Plugins;
use meta_client::client::MetaClient;
use servers::Mode;
use snafu::OptionExt;
use crate::datanode::{Datanode, DatanodeOptions};
use crate::error::{MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::{new_metasrv_client, HeartbeatTask};
use crate::server::Services;
pub struct DatanodeBuilder {
opts: DatanodeOptions,
plugins: Arc<Plugins>,
meta_client: Option<MetaClient>,
}
impl DatanodeBuilder {
pub fn new(opts: DatanodeOptions, plugins: Arc<Plugins>) -> Self {
Self {
opts,
plugins,
meta_client: None,
}
}
pub fn with_meta_client(self, meta_client: MetaClient) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}
pub async fn build(mut self) -> Result<Datanode> {
let region_server = Datanode::new_region_server(&self.opts, self.plugins.clone()).await?;
let mode = &self.opts.mode;
let heartbeat_task = match mode {
Mode::Distributed => {
let meta_client = if let Some(meta_client) = self.meta_client.take() {
meta_client
} else {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_config = self
.opts
.meta_client_options
.as_ref()
.context(MissingMetasrvOptsSnafu)?;
new_metasrv_client(node_id, meta_config).await?
};
let heartbeat_task =
HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?;
Some(heartbeat_task)
}
Mode::Standalone => None,
};
let services = match mode {
Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?),
Mode::Standalone => None,
};
let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
mode,
self.opts.enable_telemetry,
)
.await;
Ok(Datanode {
opts: self.opts,
services,
heartbeat_task,
region_server,
greptimedb_telemetry_task,
})
}
}

View File

@@ -94,6 +94,12 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to get info from meta server, source: {}", source))]
GetMetadata {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to execute sql, source: {}", source))]
ExecuteSql {
location: Location,
@@ -266,6 +272,12 @@ pub enum Error {
source: common_runtime::error::Error,
},
#[snafu(display("Expect KvBackend but not found"))]
MissingKvBackend { location: Location },
#[snafu(display("Expect MetaClient but not found, location: {}", location))]
MissingMetaClient { location: Location },
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -366,10 +378,10 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Missing node id option in distributed mode"))]
#[snafu(display("Missing node id in Datanode config, location: {}", location))]
MissingNodeId { location: Location },
#[snafu(display("Missing node id option in distributed mode"))]
#[snafu(display("Missing node id option in distributed mode, location: {}", location))]
MissingMetasrvOpts { location: Location },
#[snafu(display("Missing required field: {}", name))]
@@ -587,7 +599,9 @@ impl ErrorExt for Error {
| ExecuteLogicalPlan { source, .. } => source.status_code(),
BuildRegionRequests { source, .. } => source.status_code(),
HandleHeartbeatResponse { source, .. } => source.status_code(),
HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => {
source.status_code()
}
DecodeLogicalPlan { source, .. } => source.status_code(),
RegisterSchema { source, .. } => source.status_code(),
@@ -629,7 +643,9 @@ impl ErrorExt for Error {
| MissingWalDirConfig { .. }
| PrepareImmutableTable { .. }
| InvalidInsertRowLen { .. }
| ColumnDataType { .. } => StatusCode::InvalidArguments,
| ColumnDataType { .. }
| MissingKvBackend { .. }
| MissingMetaClient { .. } => StatusCode::InvalidArguments,
EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => {
StatusCode::Unexpected

View File

@@ -33,7 +33,7 @@ use tokio::time::Instant;
use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::datanode::DatanodeOptions;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::region_server::RegionServer;

View File

@@ -18,6 +18,7 @@
use query::query_engine::SqlStatementExecutor;
pub mod alive_keeper;
pub mod config;
pub mod datanode;
pub mod error;
mod greptimedb_telemetry;

View File

@@ -96,6 +96,17 @@ impl RegionServer {
.collect()
}
pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
let engine = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine
.set_writable(region_id, writable)
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
pub fn runtime(&self) -> Arc<Runtime> {
self.inner.runtime.clone()
}

View File

@@ -22,7 +22,7 @@ use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
use crate::datanode::DatanodeOptions;
use crate::config::DatanodeOptions;
use crate::error::{
ParseAddrSnafu, Result, ShutdownServerSnafu, StartServerSnafu, WaitForGrpcServingSnafu,
};

View File

@@ -32,7 +32,7 @@ use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {

View File

@@ -18,7 +18,7 @@ use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::AzblobConfig;
use crate::config::AzblobConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;

View File

@@ -19,7 +19,7 @@ use object_store::services::Fs as FsBuilder;
use object_store::ObjectStore;
use snafu::prelude::*;
use crate::datanode::FileConfig;
use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;

View File

@@ -18,7 +18,7 @@ use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::GcsConfig;
use crate::config::GcsConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;

View File

@@ -18,7 +18,7 @@ use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::OssConfig;
use crate::config::OssConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;

View File

@@ -18,7 +18,7 @@ use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
use crate::datanode::S3Config;
use crate::config::S3Config;
use crate::error::{self, Result};
use crate::store::build_http_client;

View File

@@ -12,6 +12,7 @@ Affected Rows: 3
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
TQL ANALYZE (0, 10, '5s') test;
+-+-+
@@ -22,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test;
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] }, REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -8,6 +8,7 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
TQL ANALYZE (0, 10, '5s') test;
DROP TABLE test;

View File

@@ -9,23 +9,24 @@ Affected Rows: 3
-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN (0, 10, '5s') test;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
DROP TABLE test;

View File

@@ -5,6 +5,7 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN (0, 10, '5s') test;
DROP TABLE test;