Compare commits


5 Commits

Author SHA1 Message Date
Zhenchi
21bcd0fc81 chore: bump version to v0.10.2
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-26 21:04:16 +08:00
Lei, HUANG
49db50af81 feat: use cache kv manager for SchemaMetadataManager (#5053)
* feat: add cache for schema options

* fix/use-cache-kv-manager: Add cache invalidation handling to Datanode's heartbeat task

 • Implement InvalidateSchemaCacheHandler in heartbeat.rs to handle cache invalidation instructions.
 • Update HeartbeatTask constructor to accept cached_kv_backend and pass it to InvalidateSchemaCacheHandler.
 • Modify DatanodeBuilder to clone cached_kv_backend when creating schema_metadata_manager.
 • Refactor MetasrvCacheInvalidator in cache_invalidator.rs to reuse MailboxMessage for broadcasting to different channels.

* fix: only remove schema related cache entries

* chore: add more tests

* fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a separate module

 • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs
 • Removed async_trait and CacheInvalidator related code from heartbeat.rs
 • Added cache_invalidator module declaration in handler.rs

* fix: unit tests

* fix/use-cache-kv-manager: Standardize TODO comment format in CachedKvBackend txn method

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2024-11-26 21:04:16 +08:00
Yingwen
8c804f6eeb fix: pass series row selector to file range reader (#5054)
2024-11-26 21:04:16 +08:00
Lei, HUANG
e060280ddc fix(metric-engine): set ttl also on opening metadata regions (#5051)
* fix/metric-metadata-region-options: Remove APPEND_MODE_KEY and refactor TTL option handling in MetricEngineInner

* fix/metric-metadata-region-options: Refactor metadata region options into a shared function

 • Extract metadata region options into region_options_for_metadata_region function
 • Replace inline options map with a call to the new shared function in both create.rs and open.rs files

* fix: exclude typos

* fix/metric-metadata-region-options: Refactor metadata region options to accept original options and remove APPEND_MODE_KEY
2024-11-26 21:04:16 +08:00
LFC
0787c5da66 refactor: expose configs for http clients used in object store (#5041)
2024-11-26 21:04:16 +08:00
27 changed files with 478 additions and 154 deletions

Cargo.lock (generated)

@@ -208,7 +208,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-base",
"common-decimal",
@@ -769,7 +769,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -1379,7 +1379,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"catalog",
"common-error",
@@ -1387,7 +1387,7 @@ dependencies = [
"common-meta",
"moka",
"snafu 0.8.5",
"substrait 0.10.1",
"substrait 0.10.2",
]
[[package]]
@@ -1414,7 +1414,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arrow",
@@ -1753,7 +1753,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "client"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -1782,7 +1782,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.10.1",
"substrait 0.10.2",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1823,7 +1823,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"auth",
@@ -1882,7 +1882,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"temp-env",
"tempfile",
@@ -1928,7 +1928,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"anymap2",
"async-trait",
@@ -1949,7 +1949,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"chrono",
"common-error",
@@ -1960,7 +1960,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-base",
"common-error",
@@ -1983,7 +1983,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -2020,7 +2020,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2033,7 +2033,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"snafu 0.8.5",
"strum 0.25.0",
@@ -2042,7 +2042,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -2057,7 +2057,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"approx 0.5.1",
@@ -2102,7 +2102,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2119,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -2145,7 +2145,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"common-base",
@@ -2164,7 +2164,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arc-swap",
"common-query",
@@ -2178,7 +2178,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-error",
"common-macro",
@@ -2191,7 +2191,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"anymap2",
"api",
@@ -2248,7 +2248,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2257,11 +2257,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.10.1"
version = "0.10.2"
[[package]]
name = "common-pprof"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-error",
"common-macro",
@@ -2273,7 +2273,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-stream",
"async-trait",
@@ -2300,7 +2300,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2308,7 +2308,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -2334,7 +2334,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arc-swap",
"common-error",
@@ -2353,7 +2353,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2383,7 +2383,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"atty",
"backtrace",
@@ -2411,7 +2411,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"client",
"common-query",
@@ -2423,7 +2423,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arrow",
"chrono",
@@ -2439,7 +2439,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"build-data",
"const_format",
@@ -2450,7 +2450,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"common-base",
"common-error",
@@ -3259,7 +3259,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -3309,7 +3309,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"tokio",
"toml 0.8.19",
@@ -3318,7 +3318,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arrow",
"arrow-array",
@@ -3936,7 +3936,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -4053,7 +4053,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arrow",
@@ -4110,7 +4110,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"tokio",
"tonic 0.11.0",
@@ -4172,7 +4172,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -5312,7 +5312,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6156,7 +6156,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-store"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-stream",
"async-trait",
@@ -6486,7 +6486,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -6513,7 +6513,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -6592,7 +6592,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -6695,7 +6695,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -7459,7 +7459,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"anyhow",
"bytes",
@@ -7750,7 +7750,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-stream",
@@ -7797,7 +7797,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"tokio",
"tokio-util",
@@ -8047,7 +8047,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -8348,7 +8348,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8511,7 +8511,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"auth",
"common-base",
@@ -8785,7 +8785,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9023,7 +9023,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9147,7 +9147,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9214,7 +9214,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"tokio",
"tokio-stream",
@@ -10677,7 +10677,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -10971,7 +10971,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"aide",
@@ -11086,7 +11086,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -11432,7 +11432,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"chrono",
@@ -11495,7 +11495,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11715,7 +11715,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -11886,7 +11886,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"async-trait",
"bytes",
@@ -12085,7 +12085,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -12351,7 +12351,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -12393,7 +12393,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.10.1"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -12457,7 +12457,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.10.1",
"substrait 0.10.2",
"table",
"tempfile",
"time",


@@ -66,7 +66,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.10.1"
version = "0.10.2"
edition = "2021"
license = "Apache-2.0"


@@ -109,6 +109,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -432,6 +437,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |


@@ -375,6 +375,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"


@@ -413,6 +413,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod client;
mod manager;


@@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
pub struct CachedMetaKvBackendBuilder {
pub struct CachedKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
inner: KvBackendRef,
}
impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
impl CachedKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
inner,
}
}
@@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
self
}
pub fn build(self) -> CachedMetaKvBackend {
pub fn build(self) -> CachedKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
@@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let kv_backend = self.inner;
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);
CachedMetaKvBackend {
CachedKvBackend {
kv_backend,
cache,
name,
@@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
pub struct CachedKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
impl TxnService for CachedMetaKvBackend {
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
type Error = Error;
async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
}
#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
impl KvBackend for CachedKvBackend {
fn name(&self) -> &str {
&self.name
}
@@ -305,7 +313,7 @@ impl KvBackend for CachedMetaKvBackend {
}
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
impl KvCacheInvalidator for CachedKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
@@ -313,7 +321,7 @@ impl KvCacheInvalidator for CachedMetaKvBackend {
}
}
impl CachedMetaKvBackend {
impl CachedKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
@@ -466,7 +474,7 @@ mod tests {
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedMetaKvBackend;
use super::CachedKvBackend;
#[derive(Default)]
pub struct SimpleKvBackend {
@@ -540,7 +548,7 @@ mod tests {
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
let cached_kv = CachedKvBackend::wrap(simple_kv);
add_some_vals(&cached_kv).await;
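The rename above turns the meta-client-specific cache into a wrapper over any KvBackendRef, which is what lets the datanode reuse it in front of its own metadata backend later in this diff. As orientation, a minimal self-contained sketch of the read-through pattern CachedKvBackend implements — local stand-in types, not the real common_meta traits:

use std::cell::Cell;
use std::collections::HashMap;
use std::sync::Mutex;

// Stand-in for a KvBackendRef: anything we can read keys from.
trait Kv {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

// Inner store that counts how often it is actually hit.
struct SlowStore {
    data: HashMap<Vec<u8>, Vec<u8>>,
    reads: Cell<usize>,
}

impl Kv for SlowStore {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.reads.set(self.reads.get() + 1);
        self.data.get(key).cloned()
    }
}

// Read-through cache: serve repeated reads locally, fall back to the
// inner store on a miss, and let heartbeat handlers evict entries.
struct CachedKv<B: Kv> {
    inner: B,
    cache: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}

impl<B: Kv> CachedKv<B> {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        if let Some(v) = self.cache.lock().unwrap().get(key) {
            return Some(v.clone()); // hit: inner store untouched
        }
        let v = self.inner.get(key)?; // miss: read through and fill
        self.cache.lock().unwrap().insert(key.to_vec(), v.clone());
        Some(v)
    }

    fn invalidate_key(&self, key: &[u8]) {
        self.cache.lock().unwrap().remove(key);
    }
}

fn main() {
    let mut data = HashMap::new();
    data.insert(b"schema".to_vec(), b"ttl=1s".to_vec());
    let cached = CachedKv {
        inner: SlowStore { data, reads: Cell::new(0) },
        cache: Mutex::new(HashMap::new()),
    };

    let _ = cached.get(b"schema"); // miss: first inner read
    let _ = cached.get(b"schema"); // hit: still one inner read
    assert_eq!(cached.inner.reads.get(), 1);

    cached.invalidate_key(b"schema"); // what InvalidateSchemaCacheHandler triggers
    let _ = cached.get(b"schema"); // miss again: second inner read
    assert_eq!(cached.inner.reads.get(), 2);
}

The real wrapper additionally tracks an AtomicUsize version so that a cache fill racing an invalidation cannot reinsert a stale entry, and, per the TODO in the hunk above, its txn currently passes straight through to the inner backend without touching the cache.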


@@ -21,13 +21,14 @@ use cache::{
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
@@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())


@@ -15,7 +15,7 @@
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -246,11 +246,12 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry


@@ -17,7 +17,7 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -293,11 +293,12 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry


@@ -14,6 +14,8 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
@@ -112,6 +114,38 @@ pub struct ObjectStorageCacheConfig {
pub cache_capacity: Option<ReadableSize>,
}
/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum idle connection per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of a http client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
@@ -126,6 +160,7 @@ pub struct S3Config {
pub region: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
@@ -138,6 +173,7 @@ impl PartialEq for S3Config {
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -154,6 +190,7 @@ pub struct OssConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
@@ -165,6 +202,7 @@ impl PartialEq for OssConfig {
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -182,6 +220,7 @@ pub struct AzblobConfig {
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
@@ -194,6 +233,7 @@ impl PartialEq for AzblobConfig {
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -211,6 +251,7 @@ pub struct GcsConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
@@ -223,6 +264,7 @@ impl PartialEq for GcsConfig {
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -237,6 +279,7 @@ impl Default for S3Config {
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -251,6 +294,7 @@ impl Default for OssConfig {
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -266,6 +310,7 @@ impl Default for AzblobConfig {
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -281,6 +326,7 @@ impl Default for GcsConfig {
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
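Every duration field on HttpClientConfig carries #[serde(with = "humantime_serde")], so values such as "30s" or "1m 30s" in the TOML deserialize straight into Duration, and #[serde(default)] fills unspecified fields from the Default impl. A self-contained sketch of that round-trip, with the struct re-declared locally for illustration (assumes the serde, toml, and humantime-serde crates):

use std::time::Duration;

use serde::Deserialize;

// Local re-declaration for illustration; mirrors the fields added in the diff.
#[derive(Debug, Deserialize)]
#[serde(default)]
struct HttpClientConfig {
    pool_max_idle_per_host: u32,
    #[serde(with = "humantime_serde")]
    connect_timeout: Duration,
    #[serde(with = "humantime_serde")]
    timeout: Duration,
    #[serde(with = "humantime_serde")]
    pool_idle_timeout: Duration,
}

impl Default for HttpClientConfig {
    fn default() -> Self {
        Self {
            pool_max_idle_per_host: 1024,
            connect_timeout: Duration::from_secs(30),
            timeout: Duration::from_secs(30),
            pool_idle_timeout: Duration::from_secs(90),
        }
    }
}

fn main() {
    // Override one field; #[serde(default)] fills the rest from Default.
    let cfg: HttpClientConfig = toml::from_str(r#"pool_idle_timeout = "1m 30s""#).unwrap();
    assert_eq!(cfg.pool_idle_timeout, Duration::from_secs(90));
    assert_eq!(cfg.connect_timeout, Duration::from_secs(30));
    assert_eq!(cfg.timeout, Duration::from_secs(30));
    assert_eq!(cfg.pool_max_idle_per_host, 1024);
}

Serialization runs the same way in reverse, which is why the /config endpoint test later in this diff prints the 90-second default as "1m 30s".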


@@ -18,6 +18,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::CachedKvBackendBuilder;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -208,7 +209,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());
let schema_metadata_manager =
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
@@ -239,7 +243,15 @@ impl DatanodeBuilder {
}
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cached_kv_backend,
)
.await?,
)
} else {
None
};


@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use catalog::kvbackend::CachedKvBackend;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
@@ -70,6 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_kv_backend: Arc<CachedKvBackend>,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
@@ -79,6 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
]));
Ok(Self {


@@ -24,6 +24,7 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;
pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;
@@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
}
});
Ok(HandleControl::Done)
Ok(HandleControl::Continue)
}
}
@@ -285,7 +286,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -340,7 +341,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -373,7 +374,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -420,7 +421,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -442,7 +443,7 @@ mod tests {
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
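The Done-to-Continue switch matters because heartbeat handlers run as a chain: returning Done short-circuits every handler after the region handler, which would have starved the newly added InvalidateSchemaCacheHandler. A toy model of that dispatch contract (simplified stand-ins, not the real HandlerGroupExecutor):

#[derive(PartialEq)]
enum HandleControl {
    Continue,
    Done,
}

trait Handler {
    fn name(&self) -> &'static str;
    fn handle(&self) -> HandleControl;
}

struct RegionHandler; // stands in for RegionHeartbeatResponseHandler
struct CacheHandler; // stands in for InvalidateSchemaCacheHandler

impl Handler for RegionHandler {
    fn name(&self) -> &'static str { "region" }
    // After the fix this returns Continue, so later handlers still run.
    fn handle(&self) -> HandleControl { HandleControl::Continue }
}

impl Handler for CacheHandler {
    fn name(&self) -> &'static str { "cache-invalidator" }
    fn handle(&self) -> HandleControl { HandleControl::Done }
}

// The group executor walks the chain until a handler reports Done.
fn run(handlers: &[&dyn Handler]) -> Vec<&'static str> {
    let mut ran = Vec::new();
    for h in handlers {
        ran.push(h.name());
        if h.handle() == HandleControl::Done {
            break;
        }
    }
    ran
}

fn main() {
    let handlers: [&dyn Handler; 2] = [&RegionHandler, &CacheHandler];
    // With Continue, the cache handler is reached; with the old Done it was not.
    assert_eq!(run(&handlers), vec!["region", "cache-invalidator"]);
}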


@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Schema cache invalidator handler
use std::sync::Arc;
use async_trait::async_trait;
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::MetadataKey;
use common_telemetry::debug;
#[derive(Clone)]
pub(crate) struct InvalidateSchemaCacheHandler {
cached_kv_backend: Arc<CachedKvBackend>,
}
#[async_trait]
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateCaches(_)))
)
}
async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
};
debug!(
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
caches
);
for cache in caches {
let CacheIdent::SchemaName(schema_name) = cache else {
continue;
};
let key: SchemaNameKey = (&schema_name).into();
let key_bytes = key.to_bytes();
// invalidate cache
self.cached_kv_backend.invalidate_key(&key_bytes).await;
}
Ok(HandleControl::Done)
}
}
impl InvalidateSchemaCacheHandler {
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
Self { cached_kv_backend }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::CachedKvBackendBuilder;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use common_meta::key::{MetadataKey, SchemaMetadataManager};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::PutRequest;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
#[tokio::test]
async fn test_invalidate_schema_cache_handler() {
let inner_kv = Arc::new(MemoryKvBackend::default());
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());
let schema_name = "test_schema";
let catalog_name = "test_catalog";
schema_metadata_manager
.register_region_table_info(
1,
"test_table",
schema_name,
catalog_name,
Some(SchemaNameValue {
ttl: Some(Duration::from_secs(1)),
}),
)
.await;
schema_metadata_manager
.get_schema_options_by_table_id(1)
.await
.unwrap();
let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(3)),
}
.try_as_raw_value()
.unwrap();
inner_kv
.put(PutRequest {
key: schema_key.clone(),
value: new_schema_value,
prev_kv: false,
})
.await
.unwrap();
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateSchemaCacheHandler::new(cached_kv),
)]));
let (tx, _) = tokio::sync::mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
// invalidate a cached schema key that is still valid in the inner store
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((
MessageMeta::new_test(1, "hi", "foo", "bar"),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
})]),
));
executor.handle(ctx).await.unwrap();
assert_eq!(
Some(Duration::from_secs(3)),
SchemaNameValue::try_from_raw_value(
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
)
.unwrap()
.unwrap()
.ttl
);
}
}


@@ -32,7 +32,7 @@ use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_raw_object_store(
@@ -177,7 +177,7 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}
pub(crate) fn build_http_client() -> Result<HttpClient> {
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();
@@ -186,25 +186,28 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_max_idle_per_host as usize);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_CONNECT_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.connect_timeout);
builder = builder.connect_timeout(connect_timeout);
// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_idle_timeout);
builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
builder = builder.pool_idle_timeout(idle_timeout);
builder
builder.timeout(config.timeout)
};
HttpClient::build(http_builder).context(error::InitBackendSnafu)
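The resulting precedence: the legacy _GREPTIMEDB_HTTP_* environment variables still win when present (now logging a deprecation warning), and the new config fields replace the previously hard-coded fallbacks. A minimal sketch of that fallback chain, with the environment lookup passed in as a parameter to keep the example deterministic:

use std::time::Duration;

// Mirrors the chain in build_http_client: the legacy env var (e.g.
// _GREPTIMEDB_HTTP_CONNECT_TIMEOUT, in whole seconds) wins when it parses,
// otherwise the value from HttpClientConfig applies.
fn effective_timeout(env_value: Option<&str>, config_value: Duration) -> Duration {
    env_value
        .and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
        .unwrap_or(config_value)
}

fn main() {
    let from_config = Duration::from_secs(30);
    // No env override: the config value is used.
    assert_eq!(effective_timeout(None, from_config), from_config);
    // Env var set: it takes precedence over the config file.
    assert_eq!(effective_timeout(Some("5"), from_config), Duration::from_secs(5));
    // Unparsable env values fall back to the config silently.
    assert_eq!(effective_timeout(Some("5s"), from_config), from_config);
}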


@@ -30,13 +30,15 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);


@@ -29,6 +29,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
@@ -36,7 +38,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(build_http_client()?);
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?


@@ -29,13 +29,15 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?


@@ -30,12 +30,14 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());


@@ -44,7 +44,7 @@ impl MetasrvCacheInvalidator {
.clone()
.unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
let msg = &MailboxMessage::json_message(
let mut msg = MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Frontend broadcast",
@@ -54,22 +54,21 @@ impl MetasrvCacheInvalidator {
.with_context(|_| meta_error::SerdeJsonSnafu)?;
self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.broadcast(&BroadcastChannel::Frontend, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let msg = &MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Flownode broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| meta_error::SerdeJsonSnafu)?;
msg.to = "Datanode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, msg)
.broadcast(&BroadcastChannel::Datanode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
msg.to = "Flownode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)


@@ -456,11 +456,7 @@ impl MetricEngineInner {
// concat region dir
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
// remove TTL and APPEND_MODE option
let mut options = request.options.clone();
options.insert(TTL_KEY.to_string(), "10000 years".to_string());
options.remove(APPEND_MODE_KEY);
let options = region_options_for_metadata_region(request.options.clone());
RegionCreateRequest {
engine: MITO_ENGINE_NAME.to_string(),
column_metadatas: vec![
@@ -539,6 +535,15 @@ impl MetricEngineInner {
}
}
/// Creates the region options for metadata region in metric engine.
pub(crate) fn region_options_for_metadata_region(
mut original: HashMap<String, String>,
) -> HashMap<String, String> {
original.remove(APPEND_MODE_KEY);
original.insert(TTL_KEY.to_string(), "10000 years".to_string());
original
}
#[cfg(test)]
mod test {
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
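Before this change, open.rs passed request.options through unchanged, so a user-supplied TTL could expire rows in the metadata region after it was reopened; create.rs and open.rs now share the single derivation above. A self-contained sketch of the transformation, with the option keys declared locally as stand-ins for the store-api constants:

use std::collections::HashMap;

// Local stand-ins for the store-api option keys referenced in the diff.
const TTL_KEY: &str = "ttl";
const APPEND_MODE_KEY: &str = "append_mode";

// Derive metadata-region options from the user's region options:
// drop append mode and pin an effectively infinite TTL.
fn region_options_for_metadata_region(
    mut original: HashMap<String, String>,
) -> HashMap<String, String> {
    original.remove(APPEND_MODE_KEY);
    original.insert(TTL_KEY.to_string(), "10000 years".to_string());
    original
}

fn main() {
    let mut user_options = HashMap::new();
    user_options.insert(TTL_KEY.to_string(), "7d".to_string()); // user TTL on data
    user_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());

    let meta = region_options_for_metadata_region(user_options);
    // The metadata region must never inherit the user's TTL...
    assert_eq!(meta.get(TTL_KEY).map(String::as_str), Some("10000 years"));
    // ...and never runs in append mode.
    assert!(!meta.contains_key(APPEND_MODE_KEY));
}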


@@ -24,6 +24,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::set_data_region_options;
use crate::error::{OpenMitoRegionSnafu, Result};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
@@ -68,9 +69,10 @@ impl MetricEngineInner {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
let metadata_region_options = region_options_for_metadata_region(request.options.clone());
let open_metadata_region_request = RegionOpenRequest {
region_dir: metadata_region_dir,
options: request.options.clone(),
options: metadata_region_options,
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
};


@@ -170,7 +170,7 @@ pub(crate) fn scan_file_ranges(
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(None).await?;
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
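The one-line fix threads the scan's series_row_selector into each file-range reader instead of discarding it with None, so selector-based queries (for example, last row per time series) are honored when reading file ranges. A toy model of what a last-row selector does over rows sorted by (series, timestamp) — simplified types, not the mito2 reader:

// Toy rows: (series_id, timestamp), sorted by series then time. A last-row
// selector keeps only the newest row of each series — previously the
// file-range reader received None and returned every row instead.
fn select_last_per_series(rows: &[(u32, i64)]) -> Vec<(u32, i64)> {
    let mut out: Vec<(u32, i64)> = Vec::new();
    for &row in rows {
        match out.last_mut() {
            Some(last) if last.0 == row.0 => *last = row, // same series: keep newer row
            _ => out.push(row),                           // a new series starts
        }
    }
    out
}

fn main() {
    let rows = [(1, 10), (1, 20), (2, 5), (2, 7), (2, 9)];
    assert_eq!(select_last_per_series(&rows), vec![(1, 20), (2, 9)]);
}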


@@ -20,7 +20,7 @@ use std::time::Duration;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use client::client_manager::NodeClients;
use client::Client;
use cmd::DistributedInformationExtension;
@@ -351,8 +351,9 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()


@@ -799,6 +799,28 @@ pub async fn test_config_api(store_type: StorageType) {
let res_get = client.get("/config").send().await;
assert_eq!(res_get.status(), StatusCode::OK);
let storage = if store_type != StorageType::File {
format!(
r#"[storage]
type = "{}"
providers = []
[storage.http_client]
pool_max_idle_per_host = 1024
connect_timeout = "30s"
timeout = "30s"
pool_idle_timeout = "1m 30s""#,
store_type
)
} else {
format!(
r#"[storage]
type = "{}"
providers = []"#,
store_type
)
};
let expected_toml_str = format!(
r#"
mode = "standalone"
@@ -867,9 +889,7 @@ sync_write = false
enable_log_recycle = true
prefill_log_files = false
[storage]
type = "{}"
providers = []
{storage}
[metadata_store]
file_size = "256MiB"
@@ -933,7 +953,6 @@ enable = false
write_interval = "30s"
[tracing]"#,
store_type
)
.trim()
.to_string();


@@ -11,5 +11,6 @@ extend-exclude = [
"tests-fuzz/src/data/lorem_words",
"*.sql",
"*.result",
"src/pipeline/benches/data.log"
"src/pipeline/benches/data.log",
"cyborg/pnpm-lock.yaml"
]