Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-26 08:00:01 +00:00)

Compare commits: refactor/e... → feat/sst-c... (39 commits)
Commits (SHA1):
d5760a7348, bc9614e22c, 7dd9e98ff6, fb6b7f7801, 87d7c316df, c80a73bc20, dd9d13e7df, 79d249f5fa, 63bc544514, 30c29539a3, 359da62d9e, c9f4b36360, 85c346b16a, 738c23beb0, 8aadd1e59a, cbd58291da, e522e8959b, 7183a93e5a, 8c538622e2, 142dacb2c8, 371afc458f, 0751cd74c0, ec34e8739a, b650743785, 80a8b2e1bd, ec8a15cadd, f929d751a5, fad3621a7a, 87723effc7, 62a333ad09, 6ad186a13e, 77dee84a75, a57e263e5a, 8796ddaf31, 7fa3fbdfef, 457d2a620c, 9f14edbb28, cb3fad0c2d, 2d1e7c2441

Cargo.lock (generated, 100 changed lines)
@@ -1594,7 +1594,7 @@ dependencies = [
  "bitflags 1.3.2",
  "strsim 0.8.0",
  "textwrap 0.11.0",
- "unicode-width",
+ "unicode-width 0.1.14",
  "vec_map",
 ]
 
@@ -1876,7 +1876,7 @@ checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
 dependencies = [
  "strum 0.26.3",
  "strum_macros 0.26.4",
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -2469,6 +2469,7 @@ dependencies = [
  "encode_unicode",
  "lazy_static",
  "libc",
+ "unicode-width 0.1.14",
  "windows-sys 0.52.0",
 ]
 
@@ -4645,7 +4646,7 @@ version = "0.2.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5"
 dependencies = [
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -5599,6 +5600,19 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "indicatif"
+version = "0.17.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
+dependencies = [
+ "console",
+ "number_prefix",
+ "portable-atomic",
+ "unicode-width 0.2.0",
+ "web-time 1.1.0",
+]
+
 [[package]]
 name = "inferno"
 version = "0.11.21"
@@ -5628,6 +5642,25 @@ dependencies = [
  "snafu 0.7.5",
 ]
 
+[[package]]
+name = "ingester"
+version = "0.13.0"
+dependencies = [
+ "clap 4.5.19",
+ "common-telemetry",
+ "common-time",
+ "datanode",
+ "meta-client",
+ "mito2",
+ "object-store",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "sst-convert",
+ "tokio",
+ "toml 0.8.19",
+]
+
 [[package]]
 name = "inotify"
 version = "0.9.6"
@@ -7517,6 +7550,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
 [[package]]
 name = "objc"
 version = "0.2.7"
@@ -7973,7 +8012,7 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d2ad9b889f1b12e0b9ee24db044b5129150d5eada288edc800f789928dc8c0e3"
 dependencies = [
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -8069,6 +8108,19 @@ dependencies = [
  "zstd-sys",
 ]
 
+[[package]]
+name = "parquet_opendal"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4140ae96f37c170f8d684a544711fabdac1d94adcbd97e8b033329bd37f40446"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures",
+ "opendal",
+ "parquet",
+]
+
 [[package]]
 name = "parse-zoneinfo"
 version = "0.3.1"
@@ -10056,7 +10108,7 @@ dependencies = [
  "radix_trie",
  "scopeguard",
  "unicode-segmentation",
- "unicode-width",
+ "unicode-width 0.1.14",
  "utf8parse",
  "winapi",
 ]
@@ -11203,6 +11255,36 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "sst-convert"
+version = "0.13.0"
+dependencies = [
+ "api",
+ "arrow-array",
+ "async-trait",
+ "catalog",
+ "common-error",
+ "common-macro",
+ "common-meta",
+ "common-recordbatch",
+ "common-telemetry",
+ "datanode",
+ "datatypes",
+ "futures",
+ "futures-util",
+ "indicatif",
+ "meta-client",
+ "metric-engine",
+ "mito2",
+ "object-store",
+ "parquet",
+ "parquet_opendal",
+ "prost 0.13.3",
+ "snafu 0.8.5",
+ "store-api",
+ "table",
+]
+
 [[package]]
 name = "stable_deref_trait"
 version = "1.2.0"
@@ -11935,7 +12017,7 @@ version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
 dependencies = [
- "unicode-width",
+ "unicode-width 0.1.14",
 ]
 
 [[package]]
@@ -13038,6 +13120,12 @@ version = "0.1.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"
 
+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
 [[package]]
 name = "unicode-xid"
 version = "0.2.6"

Cargo.toml (workspace manifest)
@@ -41,6 +41,7 @@ members = [
     "src/flow",
     "src/frontend",
     "src/index",
+    "src/ingester",
     "src/log-query",
     "src/log-store",
     "src/meta-client",
@@ -58,6 +59,7 @@ members = [
     "src/servers",
     "src/session",
     "src/sql",
+    "src/sst-convert",
     "src/store-api",
     "src/table",
     "tests-fuzz",
@@ -271,6 +273,7 @@ query = { path = "src/query" }
 servers = { path = "src/servers" }
 session = { path = "src/session" }
 sql = { path = "src/sql" }
+sst-convert = { path = "src/sst-convert" }
 store-api = { path = "src/store-api" }
 substrait = { path = "src/common/substrait" }
 table = { path = "src/table" }

chore.md (new file, 76 lines)
@@ -0,0 +1,76 @@
# log

## first create table

```bash
mysql --host=127.0.0.1 --port=19195 --database=public
```

```sql
CREATE DATABASE IF NOT EXISTS `cluster1`;
USE `cluster1`;
CREATE TABLE IF NOT EXISTS `app1` (
    `greptime_timestamp` TimestampNanosecond NOT NULL TIME INDEX,
    `app` STRING NULL INVERTED INDEX,
    `cluster` STRING NULL INVERTED INDEX,
    `message` STRING NULL,
    `region` STRING NULL,
    `cloud-provider` STRING NULL,
    `environment` STRING NULL,
    `product` STRING NULL,
    `sub-product` STRING NULL,
    `service` STRING NULL
) WITH (
    append_mode = 'true',
    'compaction.type' = 'twcs',
    'compaction.twcs.max_output_file_size' = '500MB',
    'compaction.twcs.max_active_window_files' = '16',
    'compaction.twcs.max_active_window_runs' = '4',
    'compaction.twcs.max_inactive_window_files' = '4',
    'compaction.twcs.max_inactive_window_runs' = '2'
);

select count(*) from app1;

SELECT * FROM app1 ORDER BY greptime_timestamp DESC LIMIT 10\G
```

## then ingest

```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```

# metrics

```bash
mysql --host=127.0.0.1 --port=19195 --database=public < public.greptime_physical_table-create-tables.sql
```

## then ingest

```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"

# perf it
cargo build --release --bin=ingester
samply record target/release/ingester --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```

## check data

```sql
select count(*) from greptime_physical_table;
+----------+
| count(*) |
+----------+
|    36200 |
+----------+
1 row in set (0.06 sec)

select count(*) from storage_operation_errors_total;
+----------+
| count(*) |
+----------+
|       10 |
+----------+
1 row in set (0.03 sec)
```

# with oss

The same workflow; the only difference is the storage config in `ingester.toml`.
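For illustration only (this sketch is not part of the change itself), an OSS-backed `[storage]` section in `ingester.toml` could look roughly like the following; the bucket, root, endpoint, and credentials are placeholders, not values taken from the diff:

```toml
# Illustrative sketch of an OSS storage config for ingester.toml.
# Bucket, endpoint, and credentials below are placeholders.
[storage]
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "Oss"
bucket = "my-bucket"
root = "greptimedb-data"
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
access_key_id = "<access key id>"
access_key_secret = "<access key secret>"
```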

ingester.toml (new file, 35 lines)
@@ -0,0 +1,35 @@
## The metasrv client options.
[meta_client]
## The addresses of the metasrv.
metasrv_addrs = ["127.0.0.1:3002", "127.0.0.1:3003"]

## Operation timeout.
timeout = "3s"

## Heartbeat timeout.
heartbeat_timeout = "500ms"

## DDL timeout.
ddl_timeout = "10s"

## Connect server timeout.
connect_timeout = "1s"

## `TCP_NODELAY` option for accepted connections.
tcp_nodelay = true

## The configuration about the cache of the metadata.
metadata_cache_max_capacity = 100000

## TTL of the metadata cache.
metadata_cache_ttl = "10m"

## TTI of the metadata cache.
metadata_cache_tti = "5m"

## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "File"

[mito]
@@ -25,6 +25,6 @@ pub mod heartbeat;
 pub mod metrics;
 pub mod region_server;
 pub mod service;
-mod store;
+pub mod store;
 #[cfg(any(test, feature = "testing"))]
 pub mod tests;

@@ -15,7 +15,7 @@
 //! object storage utilities
 
 mod azblob;
-mod fs;
+pub mod fs;
 mod gcs;
 mod oss;
 mod s3;

@@ -24,7 +24,8 @@ use crate::config::FileConfig;
 use crate::error::{self, Result};
 use crate::store;
 
-pub(crate) async fn new_fs_object_store(
+/// A helper function to create a file system object store.
+pub async fn new_fs_object_store(
     data_home: &str,
     _file_config: &FileConfig,
 ) -> Result<ObjectStore> {

src/ingester/Cargo.toml (new file, 23 lines)
@@ -0,0 +1,23 @@
[package]
name = "ingester"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
clap.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datanode.workspace = true
meta-client.workspace = true
mito2.workspace = true
object-store.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sst-convert.workspace = true
tokio.workspace = true
toml.workspace = true

[lints]
workspace = true

src/ingester/src/main.rs (new file, 294 lines)
@@ -0,0 +1,294 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::sst::file::IndexType;
use mito2::sst::parquet::SstInfo;
use serde::{Deserialize, Serialize};
use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder};
use tokio::sync::oneshot;

#[derive(Parser, Debug)]
#[command(version, about = "Greptime Ingester", long_about = None)]
struct Args {
    /// Input directory
    #[arg(short, long)]
    input_dir: String,
    /// Directory of input parquet files, relative to input_dir
    #[arg(short, long)]
    parquet_dir: Option<String>,
    /// Directory of input json files, relative to input_dir
    #[arg(short, long)]
    remote_write_dir: Option<String>,
    /// Config file
    #[arg(short, long)]
    cfg: String,
    /// DB HTTP address
    #[arg(short, long)]
    db_http_addr: String,

    /// Output path for the converted SST files.
    /// If it is not None, the converted SST files will be written to the specified path
    /// in the `input_store`.
    /// This is for debugging purposes.
    #[arg(short, long)]
    sst_output_path: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct IngesterConfig {
    meta_client: MetaClientOptions,
    storage: StorageConfig,
    mito: MitoConfig,
}

pub const APP_NAME: &str = "greptime-ingester";

#[tokio::main]
async fn main() {
    let _guard = common_telemetry::init_global_logging(
        APP_NAME,
        &Default::default(),
        &Default::default(),
        None,
    );

    let args = Args::parse();

    let cfg_file = std::fs::read_to_string(&args.cfg).expect("Failed to read config file");
    let cfg: IngesterConfig = toml::from_str(&cfg_file).expect("Failed to parse config");

    let sst_builder = {
        let mut builder = SstConverterBuilder::new_fs(args.input_dir)
            .with_meta_options(cfg.meta_client)
            .with_storage_config(cfg.storage)
            .with_config(cfg.mito);

        if let Some(output_path) = args.sst_output_path {
            builder = builder.with_output_path(output_path);
        }

        builder
    };

    let sst_converter = sst_builder
        .clone()
        .build()
        .await
        .expect("Failed to build sst converter");

    let input_store = sst_converter.input_store.clone();

    if let Some(parquet_dir) = args.parquet_dir {
        // using opendal to read parquet files in given input object store
        let all_parquets = input_store
            .list(&parquet_dir)
            .await
            .expect("Failed to list parquet files");
        info!("Listed all files in parquet directory: {:?}", all_parquets);
        let all_parquets = all_parquets
            .iter()
            .filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
            .collect::<Vec<_>>();

        let input_files = all_parquets
            .iter()
            .map(|parquet| {
                let full_table_name = parquet.name().split("-").next().unwrap();
                let (catalog_name, schema_name, table_name) = extract_name(full_table_name);

                info!(
                    "catalog: {}, schema: {}, table: {}",
                    catalog_name, schema_name, table_name
                );

                InputFile {
                    catalog: catalog_name.to_string(),
                    schema: schema_name.to_string(),
                    table: table_name.to_string(),
                    path: parquet.path().to_string(),
                    file_type: InputFileType::Parquet,
                }
            })
            .collect::<Vec<_>>();

        convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
    }

    if let Some(remote_write_dir) = args.remote_write_dir {
        // using opendal to read parquet files in given input object store
        let all_parquets = input_store
            .list(&remote_write_dir)
            .await
            .expect("Failed to list parquet files");

        let all_parquets = all_parquets
            .iter()
            .filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
            .collect::<Vec<_>>();

        let input_files = all_parquets
            .iter()
            .map(|parquet| {
                let full_table_name = parquet.name().split("-").next().unwrap();
                let (catalog_name, schema_name, table_name) = extract_name(full_table_name);

                info!(
                    "catalog: {}, schema: {}, table: {}",
                    catalog_name, schema_name, table_name
                );
                InputFile {
                    catalog: catalog_name.to_string(),
                    schema: schema_name.to_string(),
                    table: table_name.to_string(),
                    path: parquet.path().to_string(),
                    file_type: InputFileType::RemoteWrite,
                }
            })
            .collect::<Vec<_>>();

        convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
    }
}

async fn convert_and_send(
    input_files: &[InputFile],
    sst_builder: SstConverterBuilder,
    db_http_addr: &str,
) {
    let table_names = input_files
        .iter()
        .map(|f| (f.schema.clone(), f.table.clone()))
        .collect::<Vec<_>>();
    let mut rxs = Vec::new();

    // Spawn a task for each input file
    info!("Spawning tasks for {} input files", input_files.len());
    for input_file in input_files.iter() {
        let (tx, rx) = oneshot::channel();
        let sst_builder = sst_builder.clone();
        let input_file = (*input_file).clone();
        tokio::task::spawn(async move {
            let mut sst_converter = sst_builder
                .build()
                .await
                .expect("Failed to build sst converter");
            let sst_info = sst_converter
                .convert_one(&input_file)
                .await
                .expect("Failed to convert parquet files");
            tx.send(sst_info).unwrap();
        });
        rxs.push(rx);
    }

    let mut sst_infos = Vec::new();
    for rx in rxs {
        sst_infos.push(rx.await.unwrap());
    }

    info!("Converted {} input files", sst_infos.len());

    let ingest_reqs = table_names
        .iter()
        .zip(sst_infos.iter())
        .flat_map(|(schema_name, sst_info)| {
            sst_info
                .ssts
                .iter()
                .map(|sst| to_ingest_sst_req(&schema_name.0, &schema_name.1, sst))
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

    // send ingest requests to DB
    send_ingest_requests(db_http_addr, ingest_reqs)
        .await
        .unwrap();
}

fn extract_name(full_table_name: &str) -> (String, String, String) {
    let mut names = full_table_name.split('.').rev();
    let table_name = names.next().unwrap();
    let schema_name = names.next().unwrap_or("public");
    let catalog_name = names.next().unwrap_or("greptime");
    (
        catalog_name.to_string(),
        schema_name.to_string(),
        table_name.to_string(),
    )
}

async fn send_ingest_requests(
    addr: &str,
    reqs: Vec<ClientIngestSstRequest>,
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    for req in reqs {
        info!("ingesting sst: {req:?}");
        let req = client.post(addr).json(&req);
        let resp = req.send().await?;
        info!("ingest response: {resp:?}");
    }
    Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ClientIngestSstRequest {
    schema: Option<String>,
    table: String,
    pub(crate) file_id: String,
    pub(crate) min_ts: i64,
    pub(crate) max_ts: i64,
    pub(crate) file_size: u64,
    pub(crate) rows: u32,
    pub(crate) row_groups: u32,
    /// Available indexes of the file.
    pub available_indexes: Vec<IndexType>,
    /// Size of the index file.
    pub index_file_size: u64,
    pub time_unit: u32,
}

fn to_ingest_sst_req(
    schema_name: &str,
    table_name: &str,
    sst_info: &SstInfo,
) -> ClientIngestSstRequest {
    let index_file_size = sst_info.index_metadata.file_size;
    let available_indexes = sst_info.index_metadata.build_available_indexes();
    ClientIngestSstRequest {
        schema: Some(schema_name.to_string()),
        table: table_name.to_string(),
        file_id: sst_info.file_id.to_string(),
        min_ts: sst_info.time_range.0.value(),
        max_ts: sst_info.time_range.1.value(),
        file_size: sst_info.file_size,
        rows: sst_info.num_rows as _,
        row_groups: sst_info.num_row_groups as _,
        available_indexes: available_indexes.to_vec(),
        index_file_size,
        time_unit: match sst_info.time_range.0.unit() {
            TimeUnit::Second => 0,
            TimeUnit::Millisecond => 3,
            TimeUnit::Microsecond => 6,
            TimeUnit::Nanosecond => 9,
        },
    }
}
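As a side note on the file-naming convention `main.rs` above relies on: everything before the first `-` in a file name is treated as a dot-separated table identifier, and missing parts fall back to the `greptime` catalog and `public` schema. A self-contained sketch of that mapping (the file name below is a made-up example, not one from the change):

```rust
/// Mirrors the mapping done by `extract_name` in src/ingester/src/main.rs above.
fn split_table_name(file_name: &str) -> (String, String, String) {
    // The part before the first '-' carries the table identifier.
    let full = file_name.split('-').next().unwrap_or(file_name);
    let mut parts = full.split('.').rev();
    let table = parts.next().unwrap_or_default().to_string();
    let schema = parts.next().unwrap_or("public").to_string();
    let catalog = parts.next().unwrap_or("greptime").to_string();
    (catalog, schema, table)
}

fn main() {
    // Hypothetical file name, used only to illustrate the convention.
    assert_eq!(
        split_table_name("greptime.public.app1-0001.parquet"),
        ("greptime".into(), "public".into(), "app1".into())
    );
}
```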
@@ -59,7 +59,7 @@ pub mod engine;
 pub mod error;
 mod metadata_region;
 mod metrics;
-mod row_modifier;
+pub mod row_modifier;
 #[cfg(test)]
 mod test_util;
 mod utils;

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
 ///
 /// - For [`PrimaryKeyEncoding::Dense`] encoding,
 ///   it adds two columns(`__table_id`, `__tsid`) to the row.
-pub struct RowModifier {
+pub(crate) struct RowModifier {
     codec: SparsePrimaryKeyCodec,
 }
 
@@ -52,7 +52,7 @@ impl RowModifier {
     }
 
     /// Modify rows with the given primary key encoding.
-    pub fn modify_rows(
+    pub(crate) fn modify_rows(
         &self,
         iter: RowsIter,
         table_id: TableId,
@@ -145,16 +145,14 @@ impl RowModifier {
 
     /// Fills internal columns of a row with table name and a hash of tag values.
     fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
-        let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
+        let mut hasher = TsidGenerator::default();
         for (name, value) in iter.primary_keys_with_name() {
             // The type is checked before. So only null is ignored.
             if let Some(ValueData::StringValue(string)) = &value.value_data {
-                name.hash(&mut hasher);
-                string.hash(&mut hasher);
+                hasher.write_label(name, string);
             }
         }
-        // TSID is 64 bits, simply truncate the 128 bits hash
-        let (hash, _) = hasher.finish128();
+        let hash = hasher.finish();
 
         (
             ValueData::U32Value(table_id).into(),
@@ -163,6 +161,34 @@ impl RowModifier {
     }
 }
 
+/// Tsid generator.
+pub struct TsidGenerator {
+    hasher: mur3::Hasher128,
+}
+
+impl Default for TsidGenerator {
+    fn default() -> Self {
+        Self {
+            hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
+        }
+    }
+}
+
+impl TsidGenerator {
+    /// Writes a label pair to the generator.
+    pub fn write_label(&mut self, name: &str, value: &str) {
+        name.hash(&mut self.hasher);
+        value.hash(&mut self.hasher);
+    }
+
+    /// Generates a new TSID.
+    pub fn finish(&mut self) -> u64 {
+        // TSID is 64 bits, simply truncate the 128 bits hash
+        let (hash, _) = self.hasher.finish128();
+        hash
+    }
+}
+
 /// Index of a value.
 #[derive(Debug, Clone, Copy)]
 struct ValueIndex {

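For context, a minimal sketch of how the newly extracted `TsidGenerator` can be driven from outside the metric engine now that the module is public; label pairs are hashed in order and the 128-bit hash is truncated to 64 bits, exactly as in the hunk above (the `metric_engine::row_modifier` import path is an assumption based on the module rename in this change):

```rust
use metric_engine::row_modifier::TsidGenerator;

/// Computes a TSID for a set of label pairs (sketch only).
fn tsid_for(labels: &[(&str, &str)]) -> u64 {
    let mut generator = TsidGenerator::default();
    for (name, value) in labels {
        generator.write_label(name, value);
    }
    generator.finish()
}

fn main() {
    let tsid = tsid_for(&[("app", "app1"), ("cluster", "cluster1")]);
    println!("tsid = {tsid}");
}
```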
@@ -121,7 +121,7 @@ impl AccessLayer {
     /// Writes a SST with specific `file_id` and `metadata` to the layer.
     ///
     /// Returns the info of the SST. If no data written, returns None.
-    pub(crate) async fn write_sst(
+    pub async fn write_sst(
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
@@ -191,26 +191,26 @@ impl AccessLayer {
 
 /// `OperationType` represents the origin of the `SstWriteRequest`.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub(crate) enum OperationType {
+pub enum OperationType {
     Flush,
     Compact,
 }
 
 /// Contents to build a SST.
-pub(crate) struct SstWriteRequest {
-    pub(crate) op_type: OperationType,
-    pub(crate) metadata: RegionMetadataRef,
-    pub(crate) source: Source,
-    pub(crate) cache_manager: CacheManagerRef,
+pub struct SstWriteRequest {
+    pub op_type: OperationType,
+    pub metadata: RegionMetadataRef,
+    pub source: Source,
+    pub cache_manager: CacheManagerRef,
     #[allow(dead_code)]
-    pub(crate) storage: Option<String>,
-    pub(crate) max_sequence: Option<SequenceNumber>,
+    pub storage: Option<String>,
+    pub max_sequence: Option<SequenceNumber>,
 
     /// Configs for index
-    pub(crate) index_options: IndexOptions,
-    pub(crate) inverted_index_config: InvertedIndexConfig,
-    pub(crate) fulltext_index_config: FulltextIndexConfig,
-    pub(crate) bloom_filter_index_config: BloomFilterConfig,
+    pub index_options: IndexOptions,
+    pub inverted_index_config: InvertedIndexConfig,
+    pub fulltext_index_config: FulltextIndexConfig,
+    pub bloom_filter_index_config: BloomFilterConfig,
 }
 
 pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
 pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
 
 /// Configuration for [MitoEngine](crate::engine::MitoEngine).
+/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
 pub struct MitoConfig {

@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display("External error, context: {}", context))]
+    External {
+        source: BoxedError,
+        context: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
     #[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
     EncodeSparsePrimaryKey {
         reason: String,
@@ -1085,7 +1092,7 @@ impl ErrorExt for Error {
             | PuffinPurgeStager { source, .. } => source.status_code(),
             CleanDir { .. } => StatusCode::Unexpected,
             InvalidConfig { .. } => StatusCode::InvalidArguments,
-            StaleLogEntry { .. } => StatusCode::Unexpected,
+            StaleLogEntry { .. } | External { .. } => StatusCode::Unexpected,
 
             FilterRecordBatch { source, .. } => source.status_code(),
 

@@ -23,8 +23,8 @@
 #[cfg_attr(feature = "test", allow(unused))]
 pub mod test_util;
 
-mod access_layer;
-mod cache;
+pub mod access_layer;
+pub mod cache;
 pub mod compaction;
 pub mod config;
 pub mod engine;

@@ -14,7 +14,7 @@
 
 //! Mito region.
 
-pub(crate) mod opener;
+pub mod opener;
 pub mod options;
 pub(crate) mod version;
 

@@ -15,7 +15,7 @@
 //! Region opener.
 
 use std::collections::HashMap;
-use std::sync::atomic::AtomicI64;
+use std::sync::atomic::{AtomicI64, AtomicU64};
 use std::sync::Arc;
 
 use common_telemetry::{debug, error, info, warn};
@@ -27,7 +27,9 @@ use object_store::util::{join_dir, normalize_dir};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
-use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::metadata::{
+    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
+};
 use store_api::region_engine::RegionRole;
 use store_api::storage::{ColumnId, RegionId};
 
@@ -38,6 +40,7 @@ use crate::error::{
     EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
     Result, StaleLogEntrySnafu,
 };
+use crate::manifest::action::RegionManifest;
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
 use crate::manifest::storage::manifest_compress_type;
 use crate::memtable::time_partition::TimePartitions;
@@ -203,11 +206,16 @@ impl RegionOpener {
         }
         // Safety: must be set before calling this method.
         let options = self.options.take().unwrap();
-        let object_store = self.object_store(&options.storage)?.clone();
+        let object_store = get_object_store(&options.storage, &self.object_store_manager)?.clone();
         let provider = self.provider(&options.wal_options);
         let metadata = Arc::new(metadata);
         // Create a manifest manager for this region and writes regions to the manifest file.
-        let region_manifest_options = self.manifest_options(config, &options)?;
+        let region_manifest_options = Self::manifest_options(
+            config,
+            &options,
+            &self.region_dir,
+            &self.object_store_manager,
+        )?;
         let manifest_manager = RegionManifestManager::new(
             metadata.clone(),
             region_manifest_options,
@@ -312,7 +320,12 @@
     ) -> Result<Option<MitoRegion>> {
         let region_options = self.options.as_ref().unwrap().clone();
 
-        let region_manifest_options = self.manifest_options(config, &region_options)?;
+        let region_manifest_options = Self::manifest_options(
+            config,
+            &region_options,
+            &self.region_dir,
+            &self.object_store_manager,
+        )?;
         let Some(manifest_manager) = RegionManifestManager::open(
             region_manifest_options,
             self.stats.total_manifest_size.clone(),
@@ -332,7 +345,7 @@
             .take()
             .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
         let on_region_opened = wal.on_region_opened();
-        let object_store = self.object_store(&region_options.storage)?.clone();
+        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
 
         debug!("Open region {} with options: {:?}", region_id, self.options);
 
@@ -422,13 +435,14 @@
 
     /// Returns a new manifest options.
     fn manifest_options(
-        &self,
         config: &MitoConfig,
         options: &RegionOptions,
+        region_dir: &str,
+        object_store_manager: &ObjectStoreManagerRef,
     ) -> Result<RegionManifestOptions> {
-        let object_store = self.object_store(&options.storage)?.clone();
+        let object_store = get_object_store(&options.storage, object_store_manager)?;
         Ok(RegionManifestOptions {
-            manifest_dir: new_manifest_dir(&self.region_dir),
+            manifest_dir: new_manifest_dir(region_dir),
             object_store,
             // We don't allow users to set the compression algorithm as we use it as a file suffix.
             // Currently, the manifest storage doesn't have good support for changing compression algorithms.
@@ -436,20 +450,72 @@
             checkpoint_distance: config.manifest_checkpoint_distance,
         })
     }
+}
 
-    /// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
-    fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
-        if let Some(name) = name {
-            Ok(self
-                .object_store_manager
-                .find(name)
-                .context(ObjectStoreNotFoundSnafu {
-                    object_store: name.to_string(),
-                })?)
-        } else {
-            Ok(self.object_store_manager.default_object_store())
-        }
-    }
-}
+/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
+pub fn get_object_store(
+    name: &Option<String>,
+    object_store_manager: &ObjectStoreManagerRef,
+) -> Result<object_store::ObjectStore> {
+    if let Some(name) = name {
+        Ok(object_store_manager
+            .find(name)
+            .context(ObjectStoreNotFoundSnafu {
+                object_store: name.to_string(),
+            })?
+            .clone())
+    } else {
+        Ok(object_store_manager.default_object_store().clone())
+    }
+}
+
+/// A loader for loading metadata from a region dir.
+pub struct RegionMetadataLoader {
+    config: Arc<MitoConfig>,
+    object_store_manager: ObjectStoreManagerRef,
+}
+
+impl RegionMetadataLoader {
+    /// Creates a new `RegionMetadataLoader`.
+    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
+        Self {
+            config,
+            object_store_manager,
+        }
+    }
+
+    /// Loads the metadata of the region from the region dir.
+    pub async fn load(
+        &self,
+        region_dir: &str,
+        region_options: &RegionOptions,
+    ) -> Result<Option<RegionMetadataRef>> {
+        let manifest = self.load_manifest(region_dir, region_options).await?;
+        Ok(manifest.map(|m| m.metadata.clone()))
+    }
+
+    /// Loads the manifest of the region from the region dir.
+    pub async fn load_manifest(
+        &self,
+        region_dir: &str,
+        region_options: &RegionOptions,
+    ) -> Result<Option<Arc<RegionManifest>>> {
+        let region_manifest_options = RegionOpener::manifest_options(
+            &self.config,
+            region_options,
+            region_dir,
+            &self.object_store_manager,
+        )?;
+        let Some(manifest_manager) =
+            RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
+                .await?
+        else {
+            return Ok(None);
+        };
+
+        let manifest = manifest_manager.manifest();
+        Ok(Some(manifest))
+    }
+}
 
 /// Checks whether the recovered region has the same schema as region to create.

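A minimal sketch of how a caller outside mito2 (for example the sst-convert crate) could use the new `RegionMetadataLoader` to recover a region's schema straight from its manifest; the module paths, and the way `config`, `object_store_manager`, `region_dir`, and `region_options` are obtained, are assumptions for illustration:

```rust
use std::sync::Arc;

use mito2::config::MitoConfig;
use mito2::region::opener::RegionMetadataLoader;
use mito2::region::options::RegionOptions;
use object_store::manager::ObjectStoreManagerRef;

/// Sketch only: loads region metadata from an existing region directory.
async fn load_region_metadata(
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
    region_dir: &str,
    region_options: &RegionOptions,
) -> mito2::error::Result<()> {
    let loader = RegionMetadataLoader::new(config, object_store_manager);
    if let Some(metadata) = loader.load(region_dir, region_options).await? {
        // The loaded metadata carries the region's column layout.
        println!("loaded {} columns", metadata.column_metadatas.len());
    }
    Ok(())
}
```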
@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
 use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
 
 /// A codec for sparse key of metrics.
+/// It requires the input primary key columns are sorted by the column name in lexicographical order.
+/// It encodes the column id of the physical region.
 #[derive(Clone, Debug)]
 pub struct SparsePrimaryKeyCodec {
     inner: Arc<SparsePrimaryKeyCodecInner>,

@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
 mod codec;
 pub(crate) mod fulltext_index;
 mod indexer;
-pub(crate) mod intermediate;
+pub mod intermediate;
 pub(crate) mod inverted_index;
-pub(crate) mod puffin_manager;
+pub mod puffin_manager;
 mod statistics;
 pub(crate) mod store;
 

@@ -49,6 +49,11 @@ impl IntermediateManager {
     /// Create a new `IntermediateManager` with the given root path.
     /// It will clean up all garbage intermediate files from previous runs.
     pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
+        common_telemetry::info!(
+            "Initializing intermediate manager, aux_path: {}",
+            aux_path.as_ref()
+        );
+
         let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
         let store = InstrumentedStore::new(store);
 

@@ -61,6 +61,7 @@ impl Default for WriteOptions {
 }
 
 /// Parquet SST info returned by the writer.
+#[derive(Debug)]
 pub struct SstInfo {
     /// SST file id.
     pub file_id: FileId,

src/sst-convert/Cargo.toml (new file, 34 lines)
@@ -0,0 +1,34 @@
[package]
name = "sst-convert"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
api.workspace = true
arrow-array.workspace = true
async-trait.workspace = true
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
futures-util.workspace = true
indicatif = "0.17.0"
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
object-store.workspace = true
parquet.workspace = true
parquet_opendal = "0.3.0"
prost.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true

[lints]
workspace = true

src/sst-convert/src/converter.rs (new file, 232 lines)
@@ -0,0 +1,232 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
//! SST converter.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use common_telemetry::info;
|
||||||
|
use datanode::config::StorageConfig;
|
||||||
|
use datanode::datanode::DatanodeBuilder;
|
||||||
|
use meta_client::MetaClientOptions;
|
||||||
|
use mito2::access_layer::SstInfoArray;
|
||||||
|
use mito2::config::MitoConfig;
|
||||||
|
use mito2::read::Source;
|
||||||
|
use mito2::sst::parquet::WriteOptions;
|
||||||
|
use object_store::manager::ObjectStoreManagerRef;
|
||||||
|
use object_store::services::Fs;
|
||||||
|
use object_store::ObjectStore;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
use crate::error::{DatanodeSnafu, MitoSnafu, ObjectStoreSnafu, Result};
|
||||||
|
use crate::reader::InputReaderBuilder;
|
||||||
|
use crate::table::TableMetadataHelper;
|
||||||
|
use crate::writer::RegionWriterBuilder;
|
||||||
|
|
||||||
|
/// Input file type.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum InputFileType {
|
||||||
|
/// File type is Parquet.
|
||||||
|
Parquet,
|
||||||
|
/// File type is remote write JSON.
|
||||||
|
RemoteWrite,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Description of a file to convert.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct InputFile {
|
||||||
|
/// Catalog of the table.
|
||||||
|
pub catalog: String,
|
||||||
|
/// Schema of the table.
|
||||||
|
pub schema: String,
|
||||||
|
/// Table to write.
|
||||||
|
/// For metric engine, it needs to be the physical table name.
|
||||||
|
pub table: String,
|
||||||
|
/// Path to the file.
|
||||||
|
pub path: String,
|
||||||
|
/// Type of the input file.
|
||||||
|
pub file_type: InputFileType,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Description of converted files for an input file.
|
||||||
|
/// A input file can be converted to multiple output files.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct OutputSst {
|
||||||
|
/// Meta of output SST files.
|
||||||
|
pub ssts: SstInfoArray,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// SST converter takes a list of source files and converts them to SST files.
|
||||||
|
pub struct SstConverter {
|
||||||
|
/// Object store for input files.
|
||||||
|
pub input_store: ObjectStore,
|
||||||
|
/// Output path for the converted SST files.
|
||||||
|
/// If it is not None, the converted SST files will be written to the specified path
|
||||||
|
/// in the `input_store`.
|
||||||
|
/// This is for debugging purposes.
|
||||||
|
output_path: Option<String>,
|
||||||
|
reader_builder: InputReaderBuilder,
|
||||||
|
writer_builder: RegionWriterBuilder,
|
||||||
|
write_opts: WriteOptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SstConverter {
|
||||||
|
/// Converts a list of input to a list of outputs.
|
||||||
|
pub async fn convert(&mut self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
|
||||||
|
common_telemetry::info!("Converting input {} files", input.len());
|
||||||
|
|
||||||
|
let mut outputs = Vec::with_capacity(input.len());
|
||||||
|
let bar = indicatif::ProgressBar::new(input.len() as u64);
|
||||||
|
for file in input {
|
||||||
|
let output = self.convert_one(file).await?;
|
||||||
|
outputs.push(output);
|
||||||
|
bar.inc(1);
|
||||||
|
}
|
||||||
|
bar.finish();
|
||||||
|
|
||||||
|
common_telemetry::info!("Converted {} files", outputs.len());
|
||||||
|
|
||||||
|
Ok(outputs)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts one input.
|
||||||
|
pub async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
|
||||||
|
common_telemetry::info!(
|
||||||
|
"Converting input file, input_path: {}, output_path: {}",
|
||||||
|
            input.path,
            self.output_path.as_deref().unwrap_or(&input.path)
        );

        let reader_info = self.reader_builder.read_input(input).await?;
        let source = Source::Reader(reader_info.reader);
        let output_dir = self
            .output_path
            .as_deref()
            .unwrap_or(&reader_info.region_dir);
        let writer = self
            .writer_builder
            .build(reader_info.metadata, output_dir, reader_info.region_options)
            .await
            .context(MitoSnafu)?;

        let ssts = writer
            .write_sst(source, &self.write_opts)
            .await
            .context(MitoSnafu)?;
        common_telemetry::info!("Converted input file, input_path: {}", input.path);
        Ok(OutputSst { ssts })
    }
}

/// Builder to build an SST converter.
#[derive(Clone)]
pub struct SstConverterBuilder {
    input_path: String,
    meta_options: MetaClientOptions,
    storage_config: StorageConfig,
    output_path: Option<String>,
    config: MitoConfig,
}

impl SstConverterBuilder {
    /// Creates a new builder with a file system path as input.
    pub fn new_fs(input_path: String) -> Self {
        Self {
            input_path,
            meta_options: MetaClientOptions::default(),
            storage_config: StorageConfig::default(),
            output_path: None,
            config: MitoConfig::default(),
        }
    }

    /// Attaches the meta client options.
    pub fn with_meta_options(mut self, meta_options: MetaClientOptions) -> Self {
        self.meta_options = meta_options;
        self
    }

    /// Attaches the storage config.
    pub fn with_storage_config(mut self, storage_config: StorageConfig) -> Self {
        self.storage_config = storage_config;
        self
    }

    /// Sets the output path for the converted SST files.
    /// This is for debugging purposes.
    pub fn with_output_path(mut self, output_path: String) -> Self {
        self.output_path = Some(output_path);
        self
    }

    /// Sets the config for the converted SST files.
    pub fn with_config(mut self, config: MitoConfig) -> Self {
        self.config = config;
        self
    }

    /// Builds an SST converter.
    pub async fn build(mut self) -> Result<SstConverter> {
        self.config
            .sanitize(&self.storage_config.data_home)
            .context(MitoSnafu)?;

        common_telemetry::info!(
            "Building SST converter, input_path: {}, storage_config: {:?}, config: {:?}",
            self.input_path,
            self.storage_config,
            self.config
        );

        let input_store = new_input_store(&self.input_path).await?;
        let output_store_manager = new_object_store_manager(&self.storage_config).await?;
        let table_helper = TableMetadataHelper::new(&self.meta_options).await?;
        let config = Arc::new(self.config);
        let reader_builder = InputReaderBuilder::new(
            input_store.clone(),
            table_helper,
            output_store_manager.clone(),
            config.clone(),
        );
        let writer_builder = RegionWriterBuilder::new(config, output_store_manager)
            .await
            .context(MitoSnafu)?;

        Ok(SstConverter {
            input_store,
            output_path: self.output_path,
            reader_builder,
            writer_builder,
            write_opts: WriteOptions::default(),
        })
    }
}

/// A helper function to create the object store manager.
pub async fn new_object_store_manager(config: &StorageConfig) -> Result<ObjectStoreManagerRef> {
    DatanodeBuilder::build_object_store_manager(config)
        .await
        .context(DatanodeSnafu)
}

/// Creates an input store from a path.
pub async fn new_input_store(path: &str) -> Result<ObjectStore> {
    let builder = Fs::default().root(path);
    info!("Creating input store, path: {}", path);
    let object_store = ObjectStore::new(builder)
        .context(ObjectStoreSnafu)?
        .finish();
    info!("Created input store: {:?}", object_store);
    Ok(object_store)
}
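The builder above is the public entry point of the converter. The following is a minimal, hypothetical usage sketch: it assumes the crate is importable as `sst_convert`, the paths are placeholders, and the final conversion call on `SstConverter` is left as a commented placeholder because the name of the method whose tail appears at the top of this file is not visible in the diff.

use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use sst_convert::converter::SstConverterBuilder;

async fn run_conversion() -> sst_convert::error::Result<()> {
    // Assemble a converter from defaults; a real tool would take these from CLI flags.
    let converter = SstConverterBuilder::new_fs("/data/input".to_string())
        .with_meta_options(MetaClientOptions::default())
        .with_config(MitoConfig::default())
        // Optional: write SSTs to a fixed directory instead of the region directory (debugging).
        .with_output_path("/tmp/sst-out".to_string())
        .build()
        .await?;

    // Placeholder: feed `InputFile`s to the converter here via the conversion method
    // shown above (it consumes an input and returns an `OutputSst`).
    let _ = converter;
    Ok(())
}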
111
src/sst-convert/src/error.rs
Normal file
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Errors for SST conversion.

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display("Object store error"))]
    ObjectStore {
        #[snafu(source)]
        error: object_store::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Missing __name__ label"))]
    MissingMetricName {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Table not found: {}", table_name))]
    MissingTable {
        table_name: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Column not found: {}", column_name))]
    MissingColumn {
        column_name: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Mito error"))]
    Mito {
        source: mito2::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Datanode error"))]
    Datanode {
        source: datanode::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Meta error"))]
    Meta {
        source: common_meta::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Meta client error"))]
    MetaClient {
        source: meta_client::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Decode error"))]
    Decode {
        #[snafu(source)]
        error: prost::DecodeError,
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T, E = Error> = std::result::Result<T, E>;

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        match self {
            Error::ObjectStore { .. } => StatusCode::StorageUnavailable,
            Error::MissingMetricName { .. } => StatusCode::InvalidArguments,
            Error::MissingTable { .. } => StatusCode::TableNotFound,
            Error::MissingColumn { .. } => StatusCode::TableColumnNotFound,
            Error::Mito { source, .. } => source.status_code(),
            Error::Datanode { source, .. } => source.status_code(),
            Error::Meta { source, .. } => source.status_code(),
            Error::MetaClient { source, .. } => source.status_code(),
            Error::Decode { .. } => StatusCode::InvalidArguments,
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
21
src/sst-convert/src/lib.rs
Normal file
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod converter;
pub mod error;
pub mod reader;
mod table;
pub mod writer;

pub use reader::parquet::{OpenDALParquetReader, RawParquetReader};
183
src/sst-convert/src/reader.rs
Normal file
@@ -0,0 +1,183 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Reader to read input data in different formats.

use std::collections::HashMap;
use std::sync::Arc;

use mito2::config::MitoConfig;
use mito2::read::BoxedBatchReader;
use mito2::region::opener::RegionMetadataLoader;
use mito2::region::options::RegionOptions;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::path_utils::region_dir;
use store_api::storage::{RegionId, SequenceNumber};
use table::metadata::TableId;

use crate::converter::{InputFile, InputFileType};
use crate::error::{MissingTableSnafu, MitoSnafu, Result};
use crate::reader::remote_write::RemoteWriteReader;
use crate::table::TableMetadataHelper;
use crate::OpenDALParquetReader;

pub(crate) mod parquet;
pub mod remote_write;

/// Reader and context.
pub struct ReaderInfo {
    pub reader: BoxedBatchReader,
    pub region_dir: String,
    pub region_options: RegionOptions,
    pub metadata: RegionMetadataRef,
}

/// Builder to build readers to read input files.
pub(crate) struct InputReaderBuilder {
    input_store: ObjectStore,
    table_helper: TableMetadataHelper,
    region_loader: RegionMetadataLoader,
    /// Cached region infos for tables.
    region_infos: HashMap<String, RegionInfo>,
}

impl InputReaderBuilder {
    pub(crate) fn new(
        input_store: ObjectStore,
        table_helper: TableMetadataHelper,
        object_store_manager: ObjectStoreManagerRef,
        config: Arc<MitoConfig>,
    ) -> Self {
        let region_loader = RegionMetadataLoader::new(config, object_store_manager);

        Self {
            input_store,
            table_helper,
            region_loader,
            region_infos: HashMap::new(),
        }
    }

    /// Builds a reader to read the input file.
    pub async fn read_input(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        match input.file_type {
            InputFileType::Parquet => self.read_parquet(input).await,
            InputFileType::RemoteWrite => self.read_remote_write(input).await,
        }
    }

    /// Builds a reader to read the parquet file.
    pub async fn read_parquet(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        let region_info = self.get_region_info(input).await?;
        let reader = OpenDALParquetReader::new(
            self.input_store.clone(),
            &input.path,
            region_info.metadata.clone(),
            Some(region_info.flushed_sequence),
        )
        .await?;

        Ok(ReaderInfo {
            reader: Box::new(reader),
            region_dir: region_info.region_dir,
            region_options: region_info.region_options,
            metadata: region_info.metadata,
        })
    }

    /// Builds a reader to read the remote write file.
    pub async fn read_remote_write(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        let region_info = self.get_region_info(input).await?;
        // TODO(yingwen): Should update the sequence.
        let reader = RemoteWriteReader::open(
            self.input_store.clone(),
            &input.path,
            &input.catalog,
            &input.schema,
            region_info.metadata.clone(),
            self.table_helper.clone(),
        )
        .await?
        .with_sequence(region_info.flushed_sequence);

        Ok(ReaderInfo {
            reader: Box::new(reader),
            region_dir: region_info.region_dir,
            region_options: region_info.region_options,
            metadata: region_info.metadata,
        })
    }

    async fn get_region_info(&mut self, input: &InputFile) -> Result<RegionInfo> {
        let cache_key = cache_key(&input.catalog, &input.schema, &input.table);
        if let Some(region_info) = self.region_infos.get(&cache_key) {
            return Ok(region_info.clone());
        }

        let table_info = self
            .table_helper
            .get_table(&input.catalog, &input.schema, &input.table)
            .await?
            .context(MissingTableSnafu {
                table_name: &input.table,
            })?;
        let region_id = to_region_id(table_info.table_info.ident.table_id);
        let opts = table_info.table_info.to_region_options();
        // TODO(yingwen): We ignore WAL options now. We should `prepare_wal_options()` in the future.
        let region_options = RegionOptions::try_from(&opts).context(MitoSnafu)?;
        let mut region_dir = region_dir(&table_info.region_storage_path(), region_id);
        if input.file_type == InputFileType::RemoteWrite {
            // metric engine has two internal regions.
            region_dir = join_dir(&region_dir, DATA_REGION_SUBDIR);
        }
        let manifest = self
            .region_loader
            .load_manifest(&region_dir, &region_options)
            .await
            .context(MitoSnafu)?
            .context(MissingTableSnafu {
                table_name: &table_info.table_info.name,
            })?;
        let region_info = RegionInfo {
            metadata: manifest.metadata.clone(),
            flushed_sequence: manifest.flushed_sequence,
            region_dir,
            region_options,
        };
        self.region_infos.insert(cache_key, region_info.clone());

        Ok(region_info)
    }
}

fn to_region_id(table_id: TableId) -> RegionId {
    RegionId::new(table_id, 0)
}

fn cache_key(catalog: &str, schema: &str, table: &str) -> String {
    format!("{}/{}/{}", catalog, schema, table)
}

#[derive(Clone)]
struct RegionInfo {
    metadata: RegionMetadataRef,
    flushed_sequence: SequenceNumber,
    region_dir: String,
    region_options: RegionOptions,
}
318
src/sst-convert/src/reader/parquet.rs
Normal file
@@ -0,0 +1,318 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet file format support.

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;

use api::v1::OpType;
use common_error::ext::BoxedError;
use common_error::snafu::{OptionExt, ResultExt};
use common_recordbatch::error::UnsupportedOperationSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::Schema;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
use futures_util::StreamExt;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchColumn, BatchReader};
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{MitoSnafu, ObjectStoreSnafu, Result};

pub struct OpenDALParquetReader {
    inner: RawParquetReader<AsyncReader>,
}

impl OpenDALParquetReader {
    pub async fn new(
        operator: ObjectStore,
        path: &str,
        metadata: RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let reader = operator.reader_with(path).await.context(ObjectStoreSnafu)?;

        let content_len = operator
            .stat(path)
            .await
            .context(ObjectStoreSnafu)?
            .content_length();

        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?
            .build()
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?;
        Ok(Self {
            inner: RawParquetReader::new(stream, metadata, override_sequence, path),
        })
    }
}

#[async_trait::async_trait]
impl BatchReader for OpenDALParquetReader {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        self.inner.next_batch().await
    }
}

pub struct RawParquetReader<T> {
    metadata: RegionMetadataRef,
    override_sequence: Option<SequenceNumber>,
    output_batch_queue: VecDeque<Batch>,
    stream: Pin<Box<ParquetRecordBatchStream<T>>>,
    path: String,
}

impl<T: AsyncFileReader + Unpin + Send + 'static> RawParquetReader<T> {
    pub fn new(
        stream: ParquetRecordBatchStream<T>,
        metadata: RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
        path: &str,
    ) -> Self {
        Self {
            stream: Box::pin(stream),
            metadata,
            override_sequence,
            output_batch_queue: VecDeque::new(),
            path: path.to_string(),
        }
    }

    pub async fn next_batch_inner(&mut self) -> mito2::error::Result<Option<Batch>> {
        if let Some(batch) = self.output_batch_queue.pop_front() {
            return Ok(Some(batch));
        }
        let Some(next_input_rb) = self.stream.next().await.transpose().with_context(|_| {
            mito2::error::ReadParquetSnafu {
                path: self.path.clone(),
            }
        })?
        else {
            return Ok(None);
        };

        let schema = Arc::new(
            Schema::try_from(next_input_rb.schema())
                .map_err(BoxedError::new)
                .with_context(|_| mito2::error::ExternalSnafu {
                    context: format!(
                        "Failed to convert Schema from DfSchema: {:?}",
                        next_input_rb.schema()
                    ),
                })?,
        );
        let rb = RecordBatch::try_from_df_record_batch(schema, next_input_rb)
            .map_err(BoxedError::new)
            .with_context(|_| mito2::error::ExternalSnafu {
                context: "Failed to convert RecordBatch from DfRecordBatch".to_string(),
            })?;
        let new_batches = extract_to_batches(&rb, &self.metadata, self.override_sequence)
            .map_err(BoxedError::new)
            .with_context(|_| mito2::error::ExternalSnafu {
                context: format!("Failed to extract batches from RecordBatch: {:?}", rb),
            })?;

        self.output_batch_queue.extend(new_batches);
        Ok(self.output_batch_queue.pop_front())
    }
}

#[async_trait::async_trait]
impl<T: AsyncFileReader + Unpin + Send + 'static> BatchReader for RawParquetReader<T> {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        self.next_batch_inner().await
    }
}

pub fn extract_to_batches(
    rb: &RecordBatch,
    metadata: &RegionMetadataRef,
    override_sequence: Option<SequenceNumber>,
) -> Result<Vec<Batch>, BoxedError> {
    let pk_codec: Box<dyn PrimaryKeyCodec> = match metadata.primary_key_encoding {
        PrimaryKeyEncoding::Dense => Box::new(DensePrimaryKeyCodec::new(metadata)),
        PrimaryKeyEncoding::Sparse => Box::new(SparsePrimaryKeyCodec::new(metadata)),
    };
    let pk_ids = metadata.primary_key.clone();
    let pk_names: Vec<_> = pk_ids
        .iter()
        .map(|id| {
            metadata
                .column_by_id(*id)
                .expect("Can't find column by id")
                .column_schema
                .name
                .clone()
        })
        .collect();
    let pk_pos_in_rb: Vec<_> = pk_names
        .into_iter()
        .map(|name| {
            rb.schema
                .column_index_by_name(&name)
                .context(UnsupportedOperationSnafu {
                    reason: format!("Can't find column {} in rb={:?}", name, rb),
                })
                .map_err(BoxedError::new)
        })
        .collect::<Result<_, _>>()?;

    let mut pk_to_batchs: HashMap<Vec<u8>, SSTBatchBuilder> = HashMap::new();
    let mut buffer = Vec::new();

    for row in rb.rows() {
        let pk_values: Vec<_> = pk_ids
            .iter()
            .zip(pk_pos_in_rb.iter())
            .map(|(id, pos)| (*id, row[*pos].clone()))
            .collect();
        pk_codec
            .encode_values(&pk_values, &mut buffer)
            .map_err(BoxedError::new)?;
        let cur_pk = &buffer;
        let builder = if let Some(builder) = pk_to_batchs.get_mut(cur_pk) {
            builder
        } else {
            let builder =
                SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?;
            pk_to_batchs.insert(cur_pk.clone(), builder);
            pk_to_batchs.get_mut(cur_pk).expect("Just inserted")
        };
        builder.push_row(&row).map_err(BoxedError::new)?;
    }

    // Sort batches by primary key.
    let mut batches = BTreeMap::new();
    for (pk, builder) in pk_to_batchs {
        batches.insert(pk.clone(), builder.finish(pk).map_err(BoxedError::new)?);
    }
    let batches = batches.into_values().collect();
    Ok(batches)
}

struct SSTBatchBuilder {
    /// Positions used to extract field columns from a record batch row.
    field_column_pos: Vec<usize>,
    field_ids: Vec<ColumnId>,
    field_builders: Vec<Box<dyn MutableVector>>,
    timestamp_pos: usize,
    timestamp_builder: Box<dyn MutableVector>,
    /// Override sequence number.
    override_sequence: Option<SequenceNumber>,
    sequence_builder: UInt64VectorBuilder,
    op_type_builder: UInt8VectorBuilder,
    cur_seq: SequenceNumber,
}

impl SSTBatchBuilder {
    fn finish(mut self, pk: Vec<u8>) -> Result<Batch, BoxedError> {
        let fields: Vec<_> = self
            .field_ids
            .iter()
            .zip(self.field_builders)
            .map(|(id, mut b)| BatchColumn {
                column_id: *id,
                data: b.to_vector(),
            })
            .collect();
        Batch::new(
            pk,
            self.timestamp_builder.to_vector(),
            Arc::new(self.sequence_builder.finish()),
            Arc::new(self.op_type_builder.finish()),
            fields,
        )
        .map_err(BoxedError::new)
    }

    fn push_row(&mut self, row: &[Value]) -> Result<(), BoxedError> {
        for (field_pos, field_builder) in self
            .field_column_pos
            .iter()
            .zip(self.field_builders.iter_mut())
        {
            field_builder.push_value_ref(row[*field_pos].as_value_ref());
        }
        self.timestamp_builder
            .push_value_ref(row[self.timestamp_pos].as_value_ref());
        self.sequence_builder
            .push(Some(self.override_sequence.unwrap_or(self.cur_seq)));
        self.op_type_builder.push(Some(OpType::Put as u8));
        self.cur_seq += 1;
        Ok(())
    }

    fn new(
        rb: &RecordBatch,
        metadata: &RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
    ) -> Result<Self, BoxedError> {
        let timeindex_name = &metadata.time_index_column().column_schema.name;
        Ok(Self {
            field_ids: metadata.field_columns().map(|c| c.column_id).collect(),
            field_column_pos: metadata
                .field_columns()
                .map(|c| &c.column_schema.name)
                .map(|name| {
                    rb.schema
                        .column_index_by_name(name)
                        .context(UnsupportedOperationSnafu {
                            reason: format!("Can't find column {} in rb={:?}", name, rb),
                        })
                        .map_err(BoxedError::new)
                })
                .collect::<Result<_, _>>()?,
            field_builders: metadata
                .field_columns()
                .map(|c| c.column_schema.data_type.create_mutable_vector(512))
                .collect(),

            timestamp_pos: rb
                .schema
                .column_index_by_name(timeindex_name)
                .context(UnsupportedOperationSnafu {
                    reason: format!("{} in rb={:?}", timeindex_name, rb),
                })
                .map_err(BoxedError::new)?,
            timestamp_builder: metadata
                .time_index_column()
                .column_schema
                .data_type
                .create_mutable_vector(512),

            override_sequence,
            sequence_builder: UInt64VectorBuilder::with_capacity(512),

            op_type_builder: UInt8VectorBuilder::with_capacity(512),
            cur_seq: override_sequence.unwrap_or_default(),
        })
    }
}
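Because lib.rs re-exports `OpenDALParquetReader`, the reader above can also be driven on its own. The sketch below is illustrative only: it assumes a filesystem-backed object store (the `Fs` import path is inferred from the converter's usage), placeholder root and file paths, and an already loaded `RegionMetadataRef` supplied by the caller; it simply counts the batches produced.

use mito2::read::BatchReader;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use sst_convert::OpenDALParquetReader;

async fn count_batches(metadata: RegionMetadataRef) -> sst_convert::error::Result<usize> {
    // Filesystem backend rooted at a placeholder input directory.
    let store = ObjectStore::new(Fs::default().root("/data/input"))
        .expect("create fs backend")
        .finish();
    // Override every row's sequence with 1; the converter passes the region's flushed sequence.
    let mut reader =
        OpenDALParquetReader::new(store, "public/metrics.parquet", metadata, Some(1)).await?;

    let mut batches = 0;
    while let Some(_batch) = reader.next_batch().await.expect("read next batch") {
        batches += 1;
    }
    Ok(batches)
}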
369
src/sst-convert/src/reader/remote_write.rs
Normal file
@@ -0,0 +1,369 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Prometheus remote write support.
//!
//! Prometheus remote write protocol in parquet format.
//! - Each row contains a protobuf binary representation of a single timeseries.
//! - Each series only occurs once in the file.
//! - Each timeseries must have the `__name__` label.

use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::{Label, TimeSeries};
use api::v1::OpType;
use arrow_array::{
    Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::value::ValueRef;
use futures::StreamExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchBuilder, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::metadata::TableId;

use crate::error::{
    DecodeSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
    ObjectStoreSnafu, Result,
};
use crate::table::TableMetadataHelper;

const METRIC_NAME_LABEL: &str = "__name__";
const GREPTIME_VALUE: &str = "greptime_value";

/// A reader that reads a remote write file, then sorts and outputs timeseries in primary key order.
pub struct RemoteWriteReader {
    /// Timeseries sorted by primary key in reverse order.
    /// So we can pop the series.
    series: Vec<(Vec<u8>, TimeSeries)>,
    /// Converter for converting timeseries to batches.
    converter: SeriesConverter,
}

impl RemoteWriteReader {
    /// Creates a new [`RemoteWriteReader`] from an object store.
    /// It reads and sorts timeseries.
    pub async fn open(
        operator: ObjectStore,
        path: &str,
        catalog: &str,
        schema: &str,
        metadata: RegionMetadataRef,
        table_helper: TableMetadataHelper,
    ) -> Result<Self> {
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let value_id = metadata
            .column_by_name(GREPTIME_VALUE)
            .context(MissingColumnSnafu {
                column_name: GREPTIME_VALUE,
            })?
            .column_id;
        let encoder = PrimaryKeyEncoder {
            catalog: catalog.to_string(),
            schema: schema.to_string(),
            metadata,
            table_helper,
            table_ids: HashMap::new(),
            codec,
        };
        let converter = SeriesConverter {
            value_id,
            override_sequence: None,
        };
        let mut sorter = TimeSeriesSorter::new(encoder);

        let mut reader = TimeSeriesParquetReader::new(operator, path).await?;
        while let Some(series) = reader.next_series().await? {
            sorter.push(series).await?;
        }
        let mut series = sorter.sort();
        series.reverse();

        Ok(Self { series, converter })
    }

    /// Sets the sequence number of the batch.
    /// Otherwise, the sequence number will be 0.
    pub fn with_sequence(mut self, sequence: SequenceNumber) -> Self {
        self.converter.override_sequence = Some(sequence);
        self
    }

    fn next_series(&mut self) -> Option<(Vec<u8>, TimeSeries)> {
        self.series.pop()
    }
}

#[async_trait::async_trait]
impl BatchReader for RemoteWriteReader {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        let Some((pk, series)) = self.next_series() else {
            return Ok(None);
        };

        self.converter.convert(pk, series).map(Some)
    }
}

struct SeriesConverter {
    /// Column id for the value field.
    value_id: ColumnId,
    /// Sequence number of the batch.
    override_sequence: Option<SequenceNumber>,
}

impl SeriesConverter {
    /// Builds a batch from a primary key and a time series.
    fn convert(&self, primary_key: Vec<u8>, series: TimeSeries) -> mito2::error::Result<Batch> {
        let num_rows = series.samples.len();
        let op_types = vec![OpType::Put as u8; num_rows];
        // TODO(yingwen): Should we use 0 or 1 as default?
        let sequences = vec![self.override_sequence.unwrap_or(0); num_rows];
        let mut timestamps = Vec::with_capacity(num_rows);
        let mut values = Vec::with_capacity(num_rows);
        for sample in series.samples {
            timestamps.push(sample.timestamp);
            values.push(sample.value);
        }

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from(timestamps)))?
            .sequences_array(Arc::new(UInt64Array::from(sequences)))?
            .op_types_array(Arc::new(UInt8Array::from(op_types)))?
            .push_field_array(self.value_id, Arc::new(Float64Array::from(values)))?;
        builder.build()
    }
}

/// Prometheus remote write parquet reader.
pub struct TimeSeriesParquetReader {
    path: String,
    stream: ParquetRecordBatchStream<AsyncReader>,
    /// Is the stream EOF.
    eof: bool,
    /// Current binary array.
    array: Option<BinaryArray>,
    /// Current row in the record batch.
    /// Only valid when the record batch is Some.
    row_index: usize,
}

impl TimeSeriesParquetReader {
    /// Creates a new instance of `TimeSeriesParquetReader`.
    pub async fn new(object_store: ObjectStore, path: &str) -> Result<Self> {
        let reader = object_store
            .reader_with(path)
            .await
            .context(ObjectStoreSnafu)?;
        let content_len = object_store
            .stat(path)
            .await
            .context(ObjectStoreSnafu)?
            .content_length();
        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?
            .build()
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?;

        Ok(Self {
            path: path.to_string(),
            stream,
            eof: false,
            array: None,
            row_index: 0,
        })
    }

    /// Reads the next timeseries from the reader.
    pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
        if self.eof {
            return Ok(None);
        }

        if let Some(array) = &self.array {
            if self.row_index >= array.len() {
                self.row_index = 0;
                self.array = None;
            }
        }

        if self.array.is_none() {
            let Some(record_batch) = self
                .stream
                .next()
                .await
                .transpose()
                .context(ReadParquetSnafu { path: &self.path })
                .context(MitoSnafu)?
            else {
                self.eof = true;
                return Ok(None);
            };
            let array = record_batch
                .column(0)
                .as_any()
                .downcast_ref::<BinaryArray>()
                .context(MissingColumnSnafu {
                    column_name: "remote write json column",
                })?
                .clone();
            assert!(!array.is_empty());
            self.array = Some(array);
        }

        let array = self.array.as_ref().unwrap();
        let value = array.value(self.row_index);
        let time_series = TimeSeries::decode(value).context(DecodeSnafu)?;
        self.row_index += 1;

        Ok(Some(time_series))
    }
}

/// Encoder to encode labels into primary key.
struct PrimaryKeyEncoder {
    /// Catalog name.
    catalog: String,
    /// Schema name.
    schema: String,
    /// The metadata of the physical region.
    metadata: RegionMetadataRef,
    /// Helper to get table metadata.
    table_helper: TableMetadataHelper,
    /// Cached table name to table id.
    table_ids: HashMap<String, TableId>,
    /// Primary key encoder.
    codec: SparsePrimaryKeyCodec,
}

impl PrimaryKeyEncoder {
    /// Encodes the primary key for the given labels.
    /// It'll sort the labels by name before encoding.
    async fn encode_primary_key(
        &mut self,
        labels: &mut Vec<Label>,
        key_buf: &mut Vec<u8>,
    ) -> Result<()> {
        if !labels.is_sorted_by(|left, right| left.name <= right.name) {
            labels.sort_unstable_by(|left, right| left.name.cmp(&right.name));
        }

        // Gets the table id from the label.
        let name_label = labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(MissingMetricNameSnafu)?;
        let table_id = match self.table_ids.get(&name_label.value) {
            Some(id) => *id,
            None => {
                let table_info = self
                    .table_helper
                    .get_table(&self.catalog, &self.schema, &name_label.value)
                    .await?
                    .context(MissingTableSnafu {
                        table_name: &name_label.value,
                    })?;
                let id = table_info.table_info.ident.table_id;
                self.table_ids.insert(name_label.value.clone(), id);

                id
            }
        };
        // Computes the tsid for the given labels.
        let mut generator = TsidGenerator::default();
        for label in &*labels {
            if label.name != METRIC_NAME_LABEL {
                generator.write_label(&label.name, &label.value);
            }
        }
        let tsid = generator.finish();

        key_buf.clear();
        let internal_columns = [
            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
            (ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
        ];
        self.codec
            .encode_to_vec(internal_columns.into_iter(), key_buf)
            .context(MitoSnafu)?;
        let label_iter = labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                return None;
            }

            let column_id = self.metadata.column_by_name(&label.name)?.column_id;
            Some((column_id, ValueRef::String(&label.value)))
        });
        self.codec
            .encode_to_vec(label_iter, key_buf)
            .context(MitoSnafu)?;

        Ok(())
    }
}

/// Prom timeseries sorter.
/// Sorts timeseries by the primary key.
struct TimeSeriesSorter {
    /// Timeseries to sort.
    series: Vec<(Vec<u8>, TimeSeries)>,
    /// Key encoder.
    encoder: PrimaryKeyEncoder,
}

impl TimeSeriesSorter {
    /// Creates a new sorter.
    fn new(encoder: PrimaryKeyEncoder) -> Self {
        Self {
            series: Vec::new(),
            encoder,
        }
    }

    /// Push a timeseries to the sorter.
    async fn push(&mut self, mut series: TimeSeries) -> Result<()> {
        let mut key_buf = Vec::new();
        self.encoder
            .encode_primary_key(&mut series.labels, &mut key_buf)
            .await?;
        self.series.push((key_buf, series));

        Ok(())
    }

    /// Sorts the requests.
    /// Returns the sorted timeseries and the primary key.
    fn sort(&mut self) -> Vec<(Vec<u8>, TimeSeries)> {
        self.series
            .sort_unstable_by(|left, right| left.0.cmp(&right.0));

        std::mem::take(&mut self.series)
    }
}
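To make the expected input layout concrete, here is a hypothetical generator for such a file: a single binary column in which every row is a prost-encoded `TimeSeries`. The schema, column name, and sample values are arbitrary, and the `TimeSeries`/`Label`/`Sample` field names follow the Prometheus remote-write protobuf, which the `api` crate's generated types are assumed to mirror.

use std::fs::File;
use std::sync::Arc;

use api::prom_store::remote::{Label, Sample, TimeSeries};
use arrow_array::{BinaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use prost::Message;

fn write_remote_write_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // One series with its `__name__` label and a single sample (placeholder values).
    let series = TimeSeries {
        labels: vec![
            Label { name: "__name__".to_string(), value: "cpu_usage".to_string() },
            Label { name: "host".to_string(), value: "h1".to_string() },
        ],
        samples: vec![Sample { value: 0.5, timestamp: 1_700_000_000_000 }],
        ..Default::default()
    };
    let encoded = series.encode_to_vec();

    // One binary column; the reader above only looks at column 0, so the name is arbitrary.
    let schema = Arc::new(Schema::new(vec![Field::new("series", DataType::Binary, false)]));
    let column = BinaryArray::from_vec(vec![encoded.as_slice()]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)])?;

    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}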
78
src/sst-convert/src/table.rs
Normal file
@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to get table metadata.

use std::sync::Arc;

use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::ResultExt;

use crate::error::{MetaClientSnafu, MetaSnafu, Result};

#[derive(Clone)]
pub struct TableMetadataHelper {
    table_metadata_manager: TableMetadataManagerRef,
}

impl TableMetadataHelper {
    pub async fn new(meta_options: &MetaClientOptions) -> Result<Self> {
        let backend = build_kv_backend(meta_options).await?;
        let table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(backend)));
        Ok(Self {
            table_metadata_manager,
        })
    }

    /// Get table info.
    pub async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableInfoValue>> {
        let table_name = TableNameKey::new(catalog, schema, table);
        let Some(table_id) = self
            .table_metadata_manager
            .table_name_manager()
            .get(table_name)
            .await
            .context(MetaSnafu)?
            .map(|v| v.table_id())
        else {
            return Ok(None);
        };

        let value = self
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await
            .context(MetaSnafu)?
            .map(|v| v.into_inner());
        Ok(value)
    }
}

async fn build_kv_backend(meta_options: &MetaClientOptions) -> Result<MetaKvBackend> {
    let meta_client = meta_client::create_meta_client(MetaClientType::Frontend, meta_options)
        .await
        .context(MetaClientSnafu)?;

    Ok(MetaKvBackend::new(meta_client))
}
129
src/sst-convert/src/writer.rs
Normal file
@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for writing SST files.

use std::sync::Arc;

use mito2::access_layer::{
    AccessLayer, AccessLayerRef, OperationType, SstInfoArray, SstWriteRequest,
};
use mito2::cache::CacheManager;
use mito2::config::MitoConfig;
use mito2::error::Result;
use mito2::read::Source;
use mito2::region::options::RegionOptions;
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use store_api::metadata::RegionMetadataRef;

/// A writer that can create multiple SST files for a region.
pub struct RegionWriter {
    /// Mito engine config.
    config: Arc<MitoConfig>,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Options of the region.
    region_options: RegionOptions,
    /// SST access layer.
    access_layer: AccessLayerRef,
}

impl RegionWriter {
    /// Writes data from a source to SST files.
    pub async fn write_sst(
        &self,
        source: Source,
        write_opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: self.metadata.clone(),
            source,
            cache_manager: Arc::new(CacheManager::default()),
            storage: None,
            max_sequence: None,
            index_options: self.region_options.index_options.clone(),
            inverted_index_config: self.config.inverted_index.clone(),
            fulltext_index_config: self.config.fulltext_index.clone(),
            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
        };

        self.access_layer.write_sst(request, write_opts).await
    }
}

/// Creator to create [`RegionWriter`] for different regions.
pub struct RegionWriterBuilder {
    /// Mito engine config.
    config: Arc<MitoConfig>,
    /// Object stores.
    object_store_manager: ObjectStoreManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
}

impl RegionWriterBuilder {
    /// Creates a new [`RegionWriterBuilder`].
    pub async fn new(
        config: Arc<MitoConfig>,
        object_store_manager: ObjectStoreManagerRef,
    ) -> Result<Self> {
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

        Ok(Self {
            config,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

    /// Builds a [`RegionWriter`] for the given region directory.
    pub async fn build(
        &self,
        metadata: RegionMetadataRef,
        region_dir: &str,
        region_options: RegionOptions,
    ) -> Result<RegionWriter> {
        let object_store = mito2::region::opener::get_object_store(
            &region_options.storage,
            &self.object_store_manager,
        )?;
        let access_layer = Arc::new(AccessLayer::new(
            region_dir,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));

        Ok(RegionWriter {
            config: self.config.clone(),
            metadata,
            region_options,
            access_layer,
        })
    }
}
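A minimal sketch of driving the writer path above end to end, assuming the caller already holds the region metadata, region options, an object store manager, and a `Source` (for example from the readers earlier in this diff). The region directory string is a placeholder, `SstInfoArray` is assumed to expose `len()`, and a real caller would sanitize the `MitoConfig` first, as the converter builder does.

use std::sync::Arc;

use mito2::config::MitoConfig;
use mito2::read::Source;
use mito2::region::options::RegionOptions;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use store_api::metadata::RegionMetadataRef;
use sst_convert::writer::RegionWriterBuilder;

async fn flush_one_region(
    manager: ObjectStoreManagerRef,
    metadata: RegionMetadataRef,
    region_options: RegionOptions,
    source: Source,
) -> mito2::error::Result<()> {
    // Default config for illustration; sanitize it against the data home in real use.
    let builder = RegionWriterBuilder::new(Arc::new(MitoConfig::default()), manager).await?;
    // Placeholder region directory; converters derive it from the table's storage path.
    let writer = builder
        .build(metadata, "data/greptime/public/1024/1024_0000000000/", region_options)
        .await?;
    let ssts = writer.write_sst(source, &WriteOptions::default()).await?;
    println!("wrote {} SST file(s)", ssts.len());
    Ok(())
}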