Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00.

Compare commits (39 commits): basic_with ... feat/sst-c
Commits in this range (SHA1):

d5760a7348, bc9614e22c, 7dd9e98ff6, fb6b7f7801, 87d7c316df, c80a73bc20, dd9d13e7df,
79d249f5fa, 63bc544514, 30c29539a3, 359da62d9e, c9f4b36360, 85c346b16a, 738c23beb0,
8aadd1e59a, cbd58291da, e522e8959b, 7183a93e5a, 8c538622e2, 142dacb2c8, 371afc458f,
0751cd74c0, ec34e8739a, b650743785, 80a8b2e1bd, ec8a15cadd, f929d751a5, fad3621a7a,
87723effc7, 62a333ad09, 6ad186a13e, 77dee84a75, a57e263e5a, 8796ddaf31, 7fa3fbdfef,
457d2a620c, 9f14edbb28, cb3fad0c2d, 2d1e7c2441
Cargo.lock (generated, 100 changed lines)
@@ -1594,7 +1594,7 @@ dependencies = [
"bitflags 1.3.2",
"strsim 0.8.0",
"textwrap 0.11.0",
-"unicode-width",
+"unicode-width 0.1.14",
"vec_map",
]

@@ -1876,7 +1876,7 @@ checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
dependencies = [
"strum 0.26.3",
"strum_macros 0.26.4",
-"unicode-width",
+"unicode-width 0.1.14",
]

[[package]]
@@ -2469,6 +2469,7 @@ dependencies = [
"encode_unicode",
"lazy_static",
"libc",
+"unicode-width 0.1.14",
"windows-sys 0.52.0",
]

@@ -4645,7 +4646,7 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5"
dependencies = [
-"unicode-width",
+"unicode-width 0.1.14",
]

[[package]]
@@ -5599,6 +5600,19 @@ dependencies = [
"serde",
]

+[[package]]
+name = "indicatif"
+version = "0.17.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
+dependencies = [
+"console",
+"number_prefix",
+"portable-atomic",
+"unicode-width 0.2.0",
+"web-time 1.1.0",
+]
+
[[package]]
name = "inferno"
version = "0.11.21"
@@ -5628,6 +5642,25 @@ dependencies = [
"snafu 0.7.5",
]

+[[package]]
+name = "ingester"
+version = "0.13.0"
+dependencies = [
+"clap 4.5.19",
+"common-telemetry",
+"common-time",
+"datanode",
+"meta-client",
+"mito2",
+"object-store",
+"reqwest",
+"serde",
+"serde_json",
+"sst-convert",
+"tokio",
+"toml 0.8.19",
+]
+
[[package]]
name = "inotify"
version = "0.9.6"
@@ -7517,6 +7550,12 @@ dependencies = [
"libc",
]

+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
[[package]]
name = "objc"
version = "0.2.7"
@@ -7973,7 +8012,7 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ad9b889f1b12e0b9ee24db044b5129150d5eada288edc800f789928dc8c0e3"
dependencies = [
-"unicode-width",
+"unicode-width 0.1.14",
]

[[package]]
@@ -8069,6 +8108,19 @@ dependencies = [
"zstd-sys",
]

+[[package]]
+name = "parquet_opendal"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4140ae96f37c170f8d684a544711fabdac1d94adcbd97e8b033329bd37f40446"
+dependencies = [
+"async-trait",
+"bytes",
+"futures",
+"opendal",
+"parquet",
+]
+
[[package]]
name = "parse-zoneinfo"
version = "0.3.1"
@@ -10056,7 +10108,7 @@ dependencies = [
"radix_trie",
"scopeguard",
"unicode-segmentation",
-"unicode-width",
+"unicode-width 0.1.14",
"utf8parse",
"winapi",
]
@@ -11203,6 +11255,36 @@ dependencies = [
"url",
]

+[[package]]
+name = "sst-convert"
+version = "0.13.0"
+dependencies = [
+"api",
+"arrow-array",
+"async-trait",
+"catalog",
+"common-error",
+"common-macro",
+"common-meta",
+"common-recordbatch",
+"common-telemetry",
+"datanode",
+"datatypes",
+"futures",
+"futures-util",
+"indicatif",
+"meta-client",
+"metric-engine",
+"mito2",
+"object-store",
+"parquet",
+"parquet_opendal",
+"prost 0.13.3",
+"snafu 0.8.5",
+"store-api",
+"table",
+]
+
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@@ -11935,7 +12017,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
-"unicode-width",
+"unicode-width 0.1.14",
]

[[package]]
@@ -13038,6 +13120,12 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af"

+[[package]]
+name = "unicode-width"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
+
[[package]]
name = "unicode-xid"
version = "0.2.6"
@@ -41,6 +41,7 @@ members = [
"src/flow",
"src/frontend",
"src/index",
+"src/ingester",
"src/log-query",
"src/log-store",
"src/meta-client",
@@ -58,6 +59,7 @@ members = [
"src/servers",
"src/session",
"src/sql",
+"src/sst-convert",
"src/store-api",
"src/table",
"tests-fuzz",
@@ -271,6 +273,7 @@ query = { path = "src/query" }
servers = { path = "src/servers" }
session = { path = "src/session" }
sql = { path = "src/sql" }
+sst-convert = { path = "src/sst-convert" }
store-api = { path = "src/store-api" }
substrait = { path = "src/common/substrait" }
table = { path = "src/table" }
chore.md (new file, 76 lines)

@@ -0,0 +1,76 @@
# Log ingestion

## First, create the table

```bash
mysql --host=127.0.0.1 --port=19195 --database=public
```

```sql
CREATE DATABASE IF NOT EXISTS `cluster1`;
USE `cluster1`;
CREATE TABLE IF NOT EXISTS `app1` (
  `greptime_timestamp` TimestampNanosecond NOT NULL TIME INDEX,
  `app` STRING NULL INVERTED INDEX,
  `cluster` STRING NULL INVERTED INDEX,
  `message` STRING NULL,
  `region` STRING NULL,
  `cloud-provider` STRING NULL,
  `environment` STRING NULL,
  `product` STRING NULL,
  `sub-product` STRING NULL,
  `service` STRING NULL
) WITH (
  append_mode = 'true',
  'compaction.type' = 'twcs',
  'compaction.twcs.max_output_file_size' = '500MB',
  'compaction.twcs.max_active_window_files' = '16',
  'compaction.twcs.max_active_window_runs' = '4',
  'compaction.twcs.max_inactive_window_files' = '4',
  'compaction.twcs.max_inactive_window_runs' = '2'
);

select count(*) from app1;

SELECT * FROM app1 ORDER BY greptime_timestamp DESC LIMIT 10\G
```

## Then ingest

```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --parquet-dir="parquet_store/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```
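
Note: the ingester derives the target table from each input file's name. Everything before the first `-` is taken as the table name, optionally qualified as `catalog.schema.table`, with `greptime` and `public` as the default catalog and schema. A hypothetical file name that maps to table `app1` in schema `cluster1` would therefore look like:

```bash
# hypothetical example; real file names under parquet_store_bk/ may differ
cluster1.app1-0000000001.parquet
```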

# Metrics ingestion

```bash
mysql --host=127.0.0.1 --port=19195 --database=public < public.greptime_physical_table-create-tables.sql
```

## Then ingest

```bash
RUST_LOG="debug" cargo run --bin=ingester -- --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"

# perf it
cargo build --release --bin=ingester
samply record target/release/ingester --input-dir="/home/discord9/greptimedb/parquet_store_bk/" --remote-write-dir="metrics_parquet/" --cfg="ingester.toml" --db-http-addr="http://127.0.0.1:4000/v1/sst/ingest_json"
```

## Check the data

```sql
select count(*) from greptime_physical_table;
+----------+
| count(*) |
+----------+
|    36200 |
+----------+
1 row in set (0.06 sec)

select count(*) from storage_operation_errors_total;
+----------+
| count(*) |
+----------+
|       10 |
+----------+
1 row in set (0.03 sec)
```

# With OSS

The steps are the same; the only difference is the storage config in `ingester.toml`.
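For reference, a minimal sketch of what that `[storage]` section could look like for OSS. The key names follow GreptimeDB's datanode storage options; the bucket, root, credentials, and endpoint below are placeholders:

```toml
[storage]
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "Oss"
bucket = "my-bucket"            # placeholder
root = "greptimedb-data"        # placeholder
access_key_id = "<key id>"      # placeholder
access_key_secret = "<secret>"  # placeholder
endpoint = "<oss endpoint>"     # placeholder
```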
ingester.toml (new file, 35 lines)

@@ -0,0 +1,35 @@
## The metasrv client options.
[meta_client]
## The addresses of the metasrv.
metasrv_addrs = ["127.0.0.1:3002", "127.0.0.1:3003"]

## Operation timeout.
timeout = "3s"

## Heartbeat timeout.
heartbeat_timeout = "500ms"

## DDL timeout.
ddl_timeout = "10s"

## Connect server timeout.
connect_timeout = "1s"

## `TCP_NODELAY` option for accepted connections.
tcp_nodelay = true

## Metadata cache configuration.
metadata_cache_max_capacity = 100000

## TTL of the metadata cache.
metadata_cache_ttl = "10m"

## TTI of the metadata cache.
metadata_cache_tti = "5m"

## The data storage options.
[storage]
## The working home directory.
data_home = "/tmp/greptimedb-cluster/datanode0"
type = "File"
[mito]
@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
-mod store;
+pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;

@@ -15,7 +15,7 @@
//! object storage utilities

mod azblob;
-mod fs;
+pub mod fs;
mod gcs;
mod oss;
mod s3;

@@ -24,7 +24,8 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;

-pub(crate) async fn new_fs_object_store(
+/// A helper function to create a file system object store.
+pub async fn new_fs_object_store(
    data_home: &str,
    _file_config: &FileConfig,
) -> Result<ObjectStore> {
src/ingester/Cargo.toml (new file, 23 lines)
@@ -0,0 +1,23 @@
[package]
name = "ingester"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
clap.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datanode.workspace = true
meta-client.workspace = true
mito2.workspace = true
object-store.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
sst-convert.workspace = true
tokio.workspace = true
toml.workspace = true

[lints]
workspace = true
src/ingester/src/main.rs (new file, 294 lines)
@@ -0,0 +1,294 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use clap::Parser;
|
||||
use common_telemetry::info;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datanode::config::StorageConfig;
|
||||
use meta_client::MetaClientOptions;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::sst::file::IndexType;
|
||||
use mito2::sst::parquet::SstInfo;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sst_convert::converter::{InputFile, InputFileType, SstConverterBuilder};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about = "Greptime Ingester", long_about = None)]
|
||||
struct Args {
|
||||
/// Input directory
|
||||
#[arg(short, long)]
|
||||
input_dir: String,
|
||||
/// Directory of input parquet files, relative to input_dir
|
||||
#[arg(short, long)]
|
||||
parquet_dir: Option<String>,
|
||||
/// Directory of input remote-write parquet files, relative to input_dir
|
||||
#[arg(short, long)]
|
||||
remote_write_dir: Option<String>,
|
||||
/// Config file
|
||||
#[arg(short, long)]
|
||||
cfg: String,
|
||||
/// DB HTTP address
|
||||
#[arg(short, long)]
|
||||
db_http_addr: String,
|
||||
|
||||
/// Output path for the converted SST files.
|
||||
/// If it is not None, the converted SST files will be written to the specified path
|
||||
/// in the `input_store`.
|
||||
/// This is for debugging purposes.
|
||||
#[arg(short, long)]
|
||||
sst_output_path: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
struct IngesterConfig {
|
||||
meta_client: MetaClientOptions,
|
||||
storage: StorageConfig,
|
||||
mito: MitoConfig,
|
||||
}
|
||||
|
||||
pub const APP_NAME: &str = "greptime-ingester";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let _guard = common_telemetry::init_global_logging(
|
||||
APP_NAME,
|
||||
&Default::default(),
|
||||
&Default::default(),
|
||||
None,
|
||||
);
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
let cfg_file = std::fs::read_to_string(&args.cfg).expect("Failed to read config file");
|
||||
let cfg: IngesterConfig = toml::from_str(&cfg_file).expect("Failed to parse config");
|
||||
|
||||
let sst_builder = {
|
||||
let mut builder = SstConverterBuilder::new_fs(args.input_dir)
|
||||
.with_meta_options(cfg.meta_client)
|
||||
.with_storage_config(cfg.storage)
|
||||
.with_config(cfg.mito);
|
||||
|
||||
if let Some(output_path) = args.sst_output_path {
|
||||
builder = builder.with_output_path(output_path);
|
||||
}
|
||||
|
||||
builder
|
||||
};
|
||||
|
||||
let sst_converter = sst_builder
|
||||
.clone()
|
||||
.build()
|
||||
.await
|
||||
.expect("Failed to build sst converter");
|
||||
|
||||
let input_store = sst_converter.input_store.clone();
|
||||
|
||||
if let Some(parquet_dir) = args.parquet_dir {
|
||||
// use opendal to read parquet files in the given input object store
|
||||
let all_parquets = input_store
|
||||
.list(&parquet_dir)
|
||||
.await
|
||||
.expect("Failed to list parquet files");
|
||||
info!("Listed all files in parquet directory: {:?}", all_parquets);
|
||||
let all_parquets = all_parquets
|
||||
.iter()
|
||||
.filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let input_files = all_parquets
|
||||
.iter()
|
||||
.map(|parquet| {
|
||||
let full_table_name = parquet.name().split("-").next().unwrap();
|
||||
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
|
||||
|
||||
info!(
|
||||
"catalog: {}, schema: {}, table: {}",
|
||||
catalog_name, schema_name, table_name
|
||||
);
|
||||
|
||||
InputFile {
|
||||
catalog: catalog_name.to_string(),
|
||||
schema: schema_name.to_string(),
|
||||
table: table_name.to_string(),
|
||||
path: parquet.path().to_string(),
|
||||
file_type: InputFileType::Parquet,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
|
||||
}
|
||||
|
||||
if let Some(remote_write_dir) = args.remote_write_dir {
|
||||
// use opendal to read parquet files in the given input object store
|
||||
let all_parquets = input_store
|
||||
.list(&remote_write_dir)
|
||||
.await
|
||||
.expect("Failed to list parquet files");
|
||||
|
||||
let all_parquets = all_parquets
|
||||
.iter()
|
||||
.filter(|parquet| parquet.name().ends_with(".parquet") && parquet.metadata().is_file())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let input_files = all_parquets
|
||||
.iter()
|
||||
.map(|parquet| {
|
||||
let full_table_name = parquet.name().split("-").next().unwrap();
|
||||
let (catalog_name, schema_name, table_name) = extract_name(full_table_name);
|
||||
|
||||
info!(
|
||||
"catalog: {}, schema: {}, table: {}",
|
||||
catalog_name, schema_name, table_name
|
||||
);
|
||||
InputFile {
|
||||
catalog: catalog_name.to_string(),
|
||||
schema: schema_name.to_string(),
|
||||
table: table_name.to_string(),
|
||||
path: parquet.path().to_string(),
|
||||
file_type: InputFileType::RemoteWrite,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
convert_and_send(&input_files, sst_builder.clone(), &args.db_http_addr).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn convert_and_send(
|
||||
input_files: &[InputFile],
|
||||
sst_builder: SstConverterBuilder,
|
||||
db_http_addr: &str,
|
||||
) {
|
||||
let table_names = input_files
|
||||
.iter()
|
||||
.map(|f| (f.schema.clone(), f.table.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
let mut rxs = Vec::new();
|
||||
|
||||
// Spawn a task for each input file
|
||||
info!("Spawning tasks for {} input files", input_files.len());
|
||||
for input_file in input_files.iter() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let sst_builder = sst_builder.clone();
|
||||
let input_file = (*input_file).clone();
|
||||
tokio::task::spawn(async move {
|
||||
let mut sst_converter = sst_builder
|
||||
.build()
|
||||
.await
|
||||
.expect("Failed to build sst converter");
|
||||
let sst_info = sst_converter
|
||||
.convert_one(&input_file)
|
||||
.await
|
||||
.expect("Failed to convert parquet files");
|
||||
tx.send(sst_info).unwrap();
|
||||
});
|
||||
rxs.push(rx);
|
||||
}
|
||||
|
||||
let mut sst_infos = Vec::new();
|
||||
for rx in rxs {
|
||||
sst_infos.push(rx.await.unwrap());
|
||||
}
|
||||
|
||||
info!("Converted {} input files", sst_infos.len());
|
||||
|
||||
let ingest_reqs = table_names
|
||||
.iter()
|
||||
.zip(sst_infos.iter())
|
||||
.flat_map(|(schema_name, sst_info)| {
|
||||
sst_info
|
||||
.ssts
|
||||
.iter()
|
||||
.map(|sst| to_ingest_sst_req(&schema_name.0, &schema_name.1, sst))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// send ingest requests to DB
|
||||
send_ingest_requests(db_http_addr, ingest_reqs)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn extract_name(full_table_name: &str) -> (String, String, String) {
|
||||
let mut names = full_table_name.split('.').rev();
|
||||
let table_name = names.next().unwrap();
|
||||
let schema_name = names.next().unwrap_or("public");
|
||||
let catalog_name = names.next().unwrap_or("greptime");
|
||||
(
|
||||
catalog_name.to_string(),
|
||||
schema_name.to_string(),
|
||||
table_name.to_string(),
|
||||
)
|
||||
}
|
||||
|
||||
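// Illustrative note (not part of the original patch): `extract_name` above splits the
// qualified name on '.' from the right, so:
//   "greptime.public.app1" -> ("greptime", "public", "app1")
//   "cluster1.app1"        -> ("greptime", "cluster1", "app1")
//   "app1"                 -> ("greptime", "public", "app1")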
async fn send_ingest_requests(
|
||||
addr: &str,
|
||||
reqs: Vec<ClientIngestSstRequest>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
for req in reqs {
|
||||
info!("ingesting sst: {req:?}");
|
||||
let req = client.post(addr).json(&req);
|
||||
let resp = req.send().await?;
|
||||
info!("ingest response: {resp:?}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct ClientIngestSstRequest {
|
||||
schema: Option<String>,
|
||||
table: String,
|
||||
pub(crate) file_id: String,
|
||||
pub(crate) min_ts: i64,
|
||||
pub(crate) max_ts: i64,
|
||||
pub(crate) file_size: u64,
|
||||
pub(crate) rows: u32,
|
||||
pub(crate) row_groups: u32,
|
||||
/// Available indexes of the file.
|
||||
pub available_indexes: Vec<IndexType>,
|
||||
/// Size of the index file.
|
||||
pub index_file_size: u64,
|
||||
pub time_unit: u32,
|
||||
}
|
||||
|
||||
fn to_ingest_sst_req(
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
sst_info: &SstInfo,
|
||||
) -> ClientIngestSstRequest {
|
||||
let index_file_size = sst_info.index_metadata.file_size;
|
||||
let available_indexes = sst_info.index_metadata.build_available_indexes();
|
||||
ClientIngestSstRequest {
|
||||
schema: Some(schema_name.to_string()),
|
||||
table: table_name.to_string(),
|
||||
file_id: sst_info.file_id.to_string(),
|
||||
min_ts: sst_info.time_range.0.value(),
|
||||
max_ts: sst_info.time_range.1.value(),
|
||||
file_size: sst_info.file_size,
|
||||
rows: sst_info.num_rows as _,
|
||||
row_groups: sst_info.num_row_groups as _,
|
||||
available_indexes: available_indexes.to_vec(),
|
||||
index_file_size,
|
||||
time_unit: match sst_info.time_range.0.unit() {
|
||||
TimeUnit::Second => 0,
|
||||
TimeUnit::Millisecond => 3,
|
||||
TimeUnit::Microsecond => 6,
|
||||
TimeUnit::Nanosecond => 9,
|
||||
},
|
||||
}
|
||||
}
|
||||
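For reference, a hedged sketch of the request this tool ends up POSTing to the `/v1/sst/ingest_json` endpoint, based on the `ClientIngestSstRequest` fields above. All values below are made up for illustration; the real file id, timestamps, and sizes come from the converted SST, and the encoding of `available_indexes` depends on how `IndexType` serializes:

```bash
curl -X POST "http://127.0.0.1:4000/v1/sst/ingest_json" \
  -H "Content-Type: application/json" \
  -d '{
    "schema": "cluster1",
    "table": "app1",
    "file_id": "00000000-0000-0000-0000-000000000000",
    "min_ts": 1700000000000000000,
    "max_ts": 1700000360000000000,
    "file_size": 104857600,
    "rows": 1000000,
    "row_groups": 10,
    "available_indexes": ["InvertedIndex"],
    "index_file_size": 1048576,
    "time_unit": 9
  }'
```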
@@ -59,7 +59,7 @@ pub mod engine;
|
||||
pub mod error;
|
||||
mod metadata_region;
|
||||
mod metrics;
|
||||
mod row_modifier;
|
||||
pub mod row_modifier;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
mod utils;
|
||||
|
||||
@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
|
||||
///
|
||||
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
|
||||
/// it adds two columns(`__table_id`, `__tsid`) to the row.
|
||||
pub struct RowModifier {
|
||||
pub(crate) struct RowModifier {
|
||||
codec: SparsePrimaryKeyCodec,
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ impl RowModifier {
|
||||
}
|
||||
|
||||
/// Modify rows with the given primary key encoding.
|
||||
pub fn modify_rows(
|
||||
pub(crate) fn modify_rows(
|
||||
&self,
|
||||
iter: RowsIter,
|
||||
table_id: TableId,
|
||||
@@ -145,16 +145,14 @@ impl RowModifier {
|
||||
|
||||
/// Fills internal columns of a row with table name and a hash of tag values.
|
||||
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
|
||||
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
|
||||
let mut hasher = TsidGenerator::default();
|
||||
for (name, value) in iter.primary_keys_with_name() {
|
||||
// The type is checked before. So only null is ignored.
|
||||
if let Some(ValueData::StringValue(string)) = &value.value_data {
|
||||
name.hash(&mut hasher);
|
||||
string.hash(&mut hasher);
|
||||
hasher.write_label(name, string);
|
||||
}
|
||||
}
|
||||
// TSID is 64 bits, simply truncate the 128 bits hash
|
||||
let (hash, _) = hasher.finish128();
|
||||
let hash = hasher.finish();
|
||||
|
||||
(
|
||||
ValueData::U32Value(table_id).into(),
|
||||
@@ -163,6 +161,34 @@ impl RowModifier {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tsid generator.
|
||||
pub struct TsidGenerator {
|
||||
hasher: mur3::Hasher128,
|
||||
}
|
||||
|
||||
impl Default for TsidGenerator {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TsidGenerator {
|
||||
/// Writes a label pair to the generator.
|
||||
pub fn write_label(&mut self, name: &str, value: &str) {
|
||||
name.hash(&mut self.hasher);
|
||||
value.hash(&mut self.hasher);
|
||||
}
|
||||
|
||||
/// Generates a new TSID.
|
||||
pub fn finish(&mut self) -> u64 {
|
||||
// TSID is 64 bits, simply truncate the 128 bits hash
|
||||
let (hash, _) = self.hasher.finish128();
|
||||
hash
|
||||
}
|
||||
}
|
||||
|
||||
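// Illustrative usage of `TsidGenerator` (not part of the patch): feed label pairs in a
// stable (sorted) order, since the resulting TSID depends on the hashing order.
//
//     let mut generator = TsidGenerator::default();
//     generator.write_label("app", "app1");
//     generator.write_label("cluster", "cluster1");
//     let tsid: u64 = generator.finish();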
/// Index of a value.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct ValueIndex {
|
||||
|
||||
@@ -121,7 +121,7 @@ impl AccessLayer {
|
||||
/// Writes a SST with specific `file_id` and `metadata` to the layer.
|
||||
///
|
||||
/// Returns the info of the SST. If no data written, returns None.
|
||||
pub(crate) async fn write_sst(
|
||||
pub async fn write_sst(
|
||||
&self,
|
||||
request: SstWriteRequest,
|
||||
write_opts: &WriteOptions,
|
||||
@@ -191,26 +191,26 @@ impl AccessLayer {
|
||||
|
||||
/// `OperationType` represents the origin of the `SstWriteRequest`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub(crate) enum OperationType {
|
||||
pub enum OperationType {
|
||||
Flush,
|
||||
Compact,
|
||||
}
|
||||
|
||||
/// Contents to build a SST.
|
||||
pub(crate) struct SstWriteRequest {
|
||||
pub(crate) op_type: OperationType,
|
||||
pub(crate) metadata: RegionMetadataRef,
|
||||
pub(crate) source: Source,
|
||||
pub(crate) cache_manager: CacheManagerRef,
|
||||
pub struct SstWriteRequest {
|
||||
pub op_type: OperationType,
|
||||
pub metadata: RegionMetadataRef,
|
||||
pub source: Source,
|
||||
pub cache_manager: CacheManagerRef,
|
||||
#[allow(dead_code)]
|
||||
pub(crate) storage: Option<String>,
|
||||
pub(crate) max_sequence: Option<SequenceNumber>,
|
||||
pub storage: Option<String>,
|
||||
pub max_sequence: Option<SequenceNumber>,
|
||||
|
||||
/// Configs for index
|
||||
pub(crate) index_options: IndexOptions,
|
||||
pub(crate) inverted_index_config: InvertedIndexConfig,
|
||||
pub(crate) fulltext_index_config: FulltextIndexConfig,
|
||||
pub(crate) bloom_filter_index_config: BloomFilterConfig,
|
||||
pub index_options: IndexOptions,
|
||||
pub inverted_index_config: InvertedIndexConfig,
|
||||
pub fulltext_index_config: FulltextIndexConfig,
|
||||
pub bloom_filter_index_config: BloomFilterConfig,
|
||||
}
|
||||
|
||||
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
|
||||
|
||||
@@ -46,6 +46,7 @@ const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
|
||||
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
|
||||
|
||||
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
|
||||
/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct MitoConfig {
|
||||
|
||||
@@ -42,6 +42,13 @@ use crate::worker::WorkerId;
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("External error, context: {}", context))]
|
||||
External {
|
||||
source: BoxedError,
|
||||
context: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Failed to encode sparse primary key, reason: {}", reason))]
|
||||
EncodeSparsePrimaryKey {
|
||||
reason: String,
|
||||
@@ -1085,7 +1092,7 @@ impl ErrorExt for Error {
|
||||
| PuffinPurgeStager { source, .. } => source.status_code(),
|
||||
CleanDir { .. } => StatusCode::Unexpected,
|
||||
InvalidConfig { .. } => StatusCode::InvalidArguments,
|
||||
StaleLogEntry { .. } => StatusCode::Unexpected,
|
||||
StaleLogEntry { .. } | External { .. } => StatusCode::Unexpected,
|
||||
|
||||
FilterRecordBatch { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -23,8 +23,8 @@
|
||||
#[cfg_attr(feature = "test", allow(unused))]
|
||||
pub mod test_util;
|
||||
|
||||
mod access_layer;
|
||||
mod cache;
|
||||
pub mod access_layer;
|
||||
pub mod cache;
|
||||
pub mod compaction;
|
||||
pub mod config;
|
||||
pub mod engine;
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Mito region.
|
||||
|
||||
pub(crate) mod opener;
|
||||
pub mod opener;
|
||||
pub mod options;
|
||||
pub(crate) mod version;
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Region opener.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::{AtomicI64, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
@@ -27,7 +27,9 @@ use object_store::util::{join_dir, normalize_dir};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::logstore::provider::Provider;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::metadata::{
|
||||
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
|
||||
};
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
@@ -38,6 +40,7 @@ use crate::error::{
|
||||
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
|
||||
Result, StaleLogEntrySnafu,
|
||||
};
|
||||
use crate::manifest::action::RegionManifest;
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::manifest::storage::manifest_compress_type;
|
||||
use crate::memtable::time_partition::TimePartitions;
|
||||
@@ -203,11 +206,16 @@ impl RegionOpener {
|
||||
}
|
||||
// Safety: must be set before calling this method.
|
||||
let options = self.options.take().unwrap();
|
||||
let object_store = self.object_store(&options.storage)?.clone();
|
||||
let object_store = get_object_store(&options.storage, &self.object_store_manager)?.clone();
|
||||
let provider = self.provider(&options.wal_options);
|
||||
let metadata = Arc::new(metadata);
|
||||
// Create a manifest manager for this region and writes regions to the manifest file.
|
||||
let region_manifest_options = self.manifest_options(config, &options)?;
|
||||
let region_manifest_options = Self::manifest_options(
|
||||
config,
|
||||
&options,
|
||||
&self.region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let manifest_manager = RegionManifestManager::new(
|
||||
metadata.clone(),
|
||||
region_manifest_options,
|
||||
@@ -312,7 +320,12 @@ impl RegionOpener {
|
||||
) -> Result<Option<MitoRegion>> {
|
||||
let region_options = self.options.as_ref().unwrap().clone();
|
||||
|
||||
let region_manifest_options = self.manifest_options(config, ®ion_options)?;
|
||||
let region_manifest_options = Self::manifest_options(
|
||||
config,
|
||||
®ion_options,
|
||||
&self.region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let Some(manifest_manager) = RegionManifestManager::open(
|
||||
region_manifest_options,
|
||||
self.stats.total_manifest_size.clone(),
|
||||
@@ -332,7 +345,7 @@ impl RegionOpener {
|
||||
.take()
|
||||
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
|
||||
let on_region_opened = wal.on_region_opened();
|
||||
let object_store = self.object_store(®ion_options.storage)?.clone();
|
||||
let object_store = get_object_store(®ion_options.storage, &self.object_store_manager)?;
|
||||
|
||||
debug!("Open region {} with options: {:?}", region_id, self.options);
|
||||
|
||||
@@ -422,13 +435,14 @@ impl RegionOpener {
|
||||
|
||||
/// Returns a new manifest options.
|
||||
fn manifest_options(
|
||||
&self,
|
||||
config: &MitoConfig,
|
||||
options: &RegionOptions,
|
||||
region_dir: &str,
|
||||
object_store_manager: &ObjectStoreManagerRef,
|
||||
) -> Result<RegionManifestOptions> {
|
||||
let object_store = self.object_store(&options.storage)?.clone();
|
||||
let object_store = get_object_store(&options.storage, object_store_manager)?;
|
||||
Ok(RegionManifestOptions {
|
||||
manifest_dir: new_manifest_dir(&self.region_dir),
|
||||
manifest_dir: new_manifest_dir(region_dir),
|
||||
object_store,
|
||||
// We don't allow users to set the compression algorithm as we use it as a file suffix.
|
||||
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
|
||||
@@ -436,20 +450,72 @@ impl RegionOpener {
|
||||
checkpoint_distance: config.manifest_checkpoint_distance,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
|
||||
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
|
||||
if let Some(name) = name {
|
||||
Ok(self
|
||||
.object_store_manager
|
||||
.find(name)
|
||||
.context(ObjectStoreNotFoundSnafu {
|
||||
object_store: name.to_string(),
|
||||
})?)
|
||||
} else {
|
||||
Ok(self.object_store_manager.default_object_store())
|
||||
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
|
||||
pub fn get_object_store(
|
||||
name: &Option<String>,
|
||||
object_store_manager: &ObjectStoreManagerRef,
|
||||
) -> Result<object_store::ObjectStore> {
|
||||
if let Some(name) = name {
|
||||
Ok(object_store_manager
|
||||
.find(name)
|
||||
.context(ObjectStoreNotFoundSnafu {
|
||||
object_store: name.to_string(),
|
||||
})?
|
||||
.clone())
|
||||
} else {
|
||||
Ok(object_store_manager.default_object_store().clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// A loader for loading metadata from a region dir.
|
||||
pub struct RegionMetadataLoader {
|
||||
config: Arc<MitoConfig>,
|
||||
object_store_manager: ObjectStoreManagerRef,
|
||||
}
|
||||
|
||||
impl RegionMetadataLoader {
|
||||
/// Creates a new `RegionMetadataLoader`.
|
||||
pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
|
||||
Self {
|
||||
config,
|
||||
object_store_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the metadata of the region from the region dir.
|
||||
pub async fn load(
|
||||
&self,
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<RegionMetadataRef>> {
|
||||
let manifest = self.load_manifest(region_dir, region_options).await?;
|
||||
Ok(manifest.map(|m| m.metadata.clone()))
|
||||
}
|
||||
|
||||
/// Loads the manifest of the region from the region dir.
|
||||
pub async fn load_manifest(
|
||||
&self,
|
||||
region_dir: &str,
|
||||
region_options: &RegionOptions,
|
||||
) -> Result<Option<Arc<RegionManifest>>> {
|
||||
let region_manifest_options = RegionOpener::manifest_options(
|
||||
&self.config,
|
||||
region_options,
|
||||
region_dir,
|
||||
&self.object_store_manager,
|
||||
)?;
|
||||
let Some(manifest_manager) =
|
||||
RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0)))
|
||||
.await?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let manifest = manifest_manager.manifest();
|
||||
Ok(Some(manifest))
|
||||
}
|
||||
}
|
||||
|
||||
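// Illustrative usage of `RegionMetadataLoader` (not part of the patch); assumes a `MitoConfig`
// and an `ObjectStoreManagerRef` have already been constructed elsewhere:
//
//     let loader = RegionMetadataLoader::new(config, object_store_manager);
//     let metadata = loader.load(&region_dir, &region_options).await?;
//     // `metadata` is `Some(RegionMetadataRef)` recovered from the region's manifest,
//     // or `None` if the region directory has no manifest yet.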
/// Checks whether the recovered region has the same schema as region to create.
|
||||
|
||||
@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
|
||||
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
|
||||
|
||||
/// A codec for sparse key of metrics.
|
||||
/// It requires the input primary key columns are sorted by the column name in lexicographical order.
|
||||
/// It encodes the column id of the physical region.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SparsePrimaryKeyCodec {
|
||||
inner: Arc<SparsePrimaryKeyCodecInner>,
|
||||
|
||||
@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
|
||||
mod codec;
|
||||
pub(crate) mod fulltext_index;
|
||||
mod indexer;
|
||||
pub(crate) mod intermediate;
|
||||
pub mod intermediate;
|
||||
pub(crate) mod inverted_index;
|
||||
pub(crate) mod puffin_manager;
|
||||
pub mod puffin_manager;
|
||||
mod statistics;
|
||||
pub(crate) mod store;
|
||||
|
||||
|
||||
@@ -49,6 +49,11 @@ impl IntermediateManager {
|
||||
/// Create a new `IntermediateManager` with the given root path.
|
||||
/// It will clean up all garbage intermediate files from previous runs.
|
||||
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
|
||||
common_telemetry::info!(
|
||||
"Initializing intermediate manager, aux_path: {}",
|
||||
aux_path.as_ref()
|
||||
);
|
||||
|
||||
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
|
||||
let store = InstrumentedStore::new(store);
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ impl Default for WriteOptions {
|
||||
}
|
||||
|
||||
/// Parquet SST info returned by the writer.
|
||||
#[derive(Debug)]
|
||||
pub struct SstInfo {
|
||||
/// SST file id.
|
||||
pub file_id: FileId,
|
||||
|
||||
src/sst-convert/Cargo.toml (new file, 34 lines)
@@ -0,0 +1,34 @@
[package]
name = "sst-convert"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
api.workspace = true
arrow-array.workspace = true
async-trait.workspace = true
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
futures-util.workspace = true
indicatif = "0.17.0"
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
object-store.workspace = true
parquet.workspace = true
parquet_opendal = "0.3.0"
prost.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true

[lints]
workspace = true
src/sst-convert/src/converter.rs (new file, 232 lines)
@@ -0,0 +1,232 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! SST converter.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use datanode::config::StorageConfig;
|
||||
use datanode::datanode::DatanodeBuilder;
|
||||
use meta_client::MetaClientOptions;
|
||||
use mito2::access_layer::SstInfoArray;
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::read::Source;
|
||||
use mito2::sst::parquet::WriteOptions;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
use object_store::services::Fs;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{DatanodeSnafu, MitoSnafu, ObjectStoreSnafu, Result};
|
||||
use crate::reader::InputReaderBuilder;
|
||||
use crate::table::TableMetadataHelper;
|
||||
use crate::writer::RegionWriterBuilder;
|
||||
|
||||
/// Input file type.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum InputFileType {
|
||||
/// File type is Parquet.
|
||||
Parquet,
|
||||
/// File type is remote write JSON.
|
||||
RemoteWrite,
|
||||
}
|
||||
|
||||
/// Description of a file to convert.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InputFile {
|
||||
/// Catalog of the table.
|
||||
pub catalog: String,
|
||||
/// Schema of the table.
|
||||
pub schema: String,
|
||||
/// Table to write.
|
||||
/// For metric engine, it needs to be the physical table name.
|
||||
pub table: String,
|
||||
/// Path to the file.
|
||||
pub path: String,
|
||||
/// Type of the input file.
|
||||
pub file_type: InputFileType,
|
||||
}
|
||||
|
||||
/// Description of converted files for an input file.
|
||||
/// An input file can be converted to multiple output files.
|
||||
#[derive(Debug)]
|
||||
pub struct OutputSst {
|
||||
/// Meta of output SST files.
|
||||
pub ssts: SstInfoArray,
|
||||
}
|
||||
|
||||
/// SST converter takes a list of source files and converts them to SST files.
|
||||
pub struct SstConverter {
|
||||
/// Object store for input files.
|
||||
pub input_store: ObjectStore,
|
||||
/// Output path for the converted SST files.
|
||||
/// If it is not None, the converted SST files will be written to the specified path
|
||||
/// in the `input_store`.
|
||||
/// This is for debugging purposes.
|
||||
output_path: Option<String>,
|
||||
reader_builder: InputReaderBuilder,
|
||||
writer_builder: RegionWriterBuilder,
|
||||
write_opts: WriteOptions,
|
||||
}
|
||||
|
||||
impl SstConverter {
|
||||
/// Converts a list of input to a list of outputs.
|
||||
pub async fn convert(&mut self, input: &[InputFile]) -> Result<Vec<OutputSst>> {
|
||||
common_telemetry::info!("Converting input {} files", input.len());
|
||||
|
||||
let mut outputs = Vec::with_capacity(input.len());
|
||||
let bar = indicatif::ProgressBar::new(input.len() as u64);
|
||||
for file in input {
|
||||
let output = self.convert_one(file).await?;
|
||||
outputs.push(output);
|
||||
bar.inc(1);
|
||||
}
|
||||
bar.finish();
|
||||
|
||||
common_telemetry::info!("Converted {} files", outputs.len());
|
||||
|
||||
Ok(outputs)
|
||||
}
|
||||
|
||||
/// Converts one input.
|
||||
pub async fn convert_one(&mut self, input: &InputFile) -> Result<OutputSst> {
|
||||
common_telemetry::info!(
|
||||
"Converting input file, input_path: {}, output_path: {}",
|
||||
input.path,
|
||||
self.output_path.as_deref().unwrap_or(&input.path)
|
||||
);
|
||||
|
||||
let reader_info = self.reader_builder.read_input(input).await?;
|
||||
let source = Source::Reader(reader_info.reader);
|
||||
let output_dir = self
|
||||
.output_path
|
||||
.as_deref()
|
||||
.unwrap_or(&reader_info.region_dir);
|
||||
let writer = self
|
||||
.writer_builder
|
||||
.build(reader_info.metadata, output_dir, reader_info.region_options)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
|
||||
let ssts = writer
|
||||
.write_sst(source, &self.write_opts)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
common_telemetry::info!("Converted input file, input_path: {}", input.path);
|
||||
Ok(OutputSst { ssts })
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder to build a SST converter.
|
||||
#[derive(Clone)]
|
||||
pub struct SstConverterBuilder {
|
||||
input_path: String,
|
||||
meta_options: MetaClientOptions,
|
||||
storage_config: StorageConfig,
|
||||
output_path: Option<String>,
|
||||
config: MitoConfig,
|
||||
}
|
||||
|
||||
impl SstConverterBuilder {
|
||||
/// Creates a new builder with a file system path as input.
|
||||
pub fn new_fs(input_path: String) -> Self {
|
||||
Self {
|
||||
input_path,
|
||||
meta_options: MetaClientOptions::default(),
|
||||
storage_config: StorageConfig::default(),
|
||||
output_path: None,
|
||||
config: MitoConfig::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attaches the meta client options.
|
||||
pub fn with_meta_options(mut self, meta_options: MetaClientOptions) -> Self {
|
||||
self.meta_options = meta_options;
|
||||
self
|
||||
}
|
||||
|
||||
/// Attaches the storage config.
|
||||
pub fn with_storage_config(mut self, storage_config: StorageConfig) -> Self {
|
||||
self.storage_config = storage_config;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the output path for the converted SST files.
|
||||
/// This is for debugging purposes.
|
||||
pub fn with_output_path(mut self, output_path: String) -> Self {
|
||||
self.output_path = Some(output_path);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the config for the converted SST files.
|
||||
pub fn with_config(mut self, config: MitoConfig) -> Self {
|
||||
self.config = config;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a SST converter.
|
||||
pub async fn build(mut self) -> Result<SstConverter> {
|
||||
self.config
|
||||
.sanitize(&self.storage_config.data_home)
|
||||
.context(MitoSnafu)?;
|
||||
|
||||
common_telemetry::info!(
|
||||
"Building SST converter, input_path: {}, storage_config: {:?}, config: {:?}",
|
||||
self.input_path,
|
||||
self.storage_config,
|
||||
self.config
|
||||
);
|
||||
|
||||
let input_store = new_input_store(&self.input_path).await?;
|
||||
let output_store_manager = new_object_store_manager(&self.storage_config).await?;
|
||||
let table_helper = TableMetadataHelper::new(&self.meta_options).await?;
|
||||
let config = Arc::new(self.config);
|
||||
let reader_builder = InputReaderBuilder::new(
|
||||
input_store.clone(),
|
||||
table_helper,
|
||||
output_store_manager.clone(),
|
||||
config.clone(),
|
||||
);
|
||||
let writer_builder = RegionWriterBuilder::new(config, output_store_manager)
|
||||
.await
|
||||
.context(MitoSnafu)?;
|
||||
|
||||
Ok(SstConverter {
|
||||
input_store,
|
||||
output_path: self.output_path,
|
||||
reader_builder,
|
||||
writer_builder,
|
||||
write_opts: WriteOptions::default(),
|
||||
})
|
||||
}
|
||||
}
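// Illustrative builder usage (not part of the patch); mirrors how the `ingester` binary drives
// the converter, with all option values coming from its TOML config:
//
//     let mut converter = SstConverterBuilder::new_fs(input_dir)
//         .with_meta_options(meta_client_options)
//         .with_storage_config(storage_config)
//         .with_config(mito_config)
//         .build()
//         .await?;
//     let outputs = converter.convert(&input_files).await?;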
|
||||
|
||||
/// A helper function to create the object store manager.
|
||||
pub async fn new_object_store_manager(config: &StorageConfig) -> Result<ObjectStoreManagerRef> {
|
||||
DatanodeBuilder::build_object_store_manager(config)
|
||||
.await
|
||||
.context(DatanodeSnafu)
|
||||
}
|
||||
|
||||
/// Creates an input store from a path.
|
||||
pub async fn new_input_store(path: &str) -> Result<ObjectStore> {
|
||||
let builder = Fs::default().root(path);
|
||||
info!("Creating input store, path: {}", path);
|
||||
let object_store = ObjectStore::new(builder)
|
||||
.context(ObjectStoreSnafu)?
|
||||
.finish();
|
||||
info!("Created input store: {:?}", object_store);
|
||||
Ok(object_store)
|
||||
}
|
||||
src/sst-convert/src/error.rs (new file, 111 lines)
@@ -0,0 +1,111 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Errors for SST conversion.
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::{Location, Snafu};
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display("Object store error"))]
|
||||
ObjectStore {
|
||||
#[snafu(source)]
|
||||
error: object_store::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing __name__ label"))]
|
||||
MissingMetricName {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table not found: {}", table_name))]
|
||||
MissingTable {
|
||||
table_name: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Column not found: {}", column_name))]
|
||||
MissingColumn {
|
||||
column_name: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Mito error"))]
|
||||
Mito {
|
||||
source: mito2::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Datanode error"))]
|
||||
Datanode {
|
||||
source: datanode::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Meta error"))]
|
||||
Meta {
|
||||
source: common_meta::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Meta client error"))]
|
||||
MetaClient {
|
||||
source: meta_client::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Decode error"))]
|
||||
Decode {
|
||||
#[snafu(source)]
|
||||
error: prost::DecodeError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
Error::ObjectStore { .. } => StatusCode::StorageUnavailable,
|
||||
Error::MissingMetricName { .. } => StatusCode::InvalidArguments,
|
||||
Error::MissingTable { .. } => StatusCode::TableNotFound,
|
||||
Error::MissingColumn { .. } => StatusCode::TableColumnNotFound,
|
||||
Error::Mito { source, .. } => source.status_code(),
|
||||
Error::Datanode { source, .. } => source.status_code(),
|
||||
Error::Meta { source, .. } => source.status_code(),
|
||||
Error::MetaClient { source, .. } => source.status_code(),
|
||||
Error::Decode { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
src/sst-convert/src/lib.rs (new file, 21 lines)
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod converter;
pub mod error;
pub mod reader;
mod table;
pub mod writer;

pub use reader::parquet::{OpenDALParquetReader, RawParquetReader};
src/sst-convert/src/reader.rs (new file, 183 lines)
@@ -0,0 +1,183 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Reader to read input data in different formats.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::read::BoxedBatchReader;
|
||||
use mito2::region::opener::RegionMetadataLoader;
|
||||
use mito2::region::options::RegionOptions;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
use object_store::util::join_dir;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
|
||||
use store_api::path_utils::region_dir;
|
||||
use store_api::storage::{RegionId, SequenceNumber};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::converter::{InputFile, InputFileType};
|
||||
use crate::error::{MissingTableSnafu, MitoSnafu, Result};
|
||||
use crate::reader::remote_write::RemoteWriteReader;
|
||||
use crate::table::TableMetadataHelper;
|
||||
use crate::OpenDALParquetReader;
|
||||
|
||||
pub(crate) mod parquet;
|
||||
pub mod remote_write;
|
||||
|
||||
/// Reader and context.
|
||||
pub struct ReaderInfo {
|
||||
pub reader: BoxedBatchReader,
|
||||
pub region_dir: String,
|
||||
pub region_options: RegionOptions,
|
||||
pub metadata: RegionMetadataRef,
|
||||
}
|
||||
|
||||
/// Builder to build readers to read input files.
|
||||
pub(crate) struct InputReaderBuilder {
|
||||
input_store: ObjectStore,
|
||||
table_helper: TableMetadataHelper,
|
||||
region_loader: RegionMetadataLoader,
|
||||
    /// Cached region infos for tables.
    region_infos: HashMap<String, RegionInfo>,
}

impl InputReaderBuilder {
    pub(crate) fn new(
        input_store: ObjectStore,
        table_helper: TableMetadataHelper,
        object_store_manager: ObjectStoreManagerRef,
        config: Arc<MitoConfig>,
    ) -> Self {
        let region_loader = RegionMetadataLoader::new(config, object_store_manager);

        Self {
            input_store,
            table_helper,
            region_loader,
            region_infos: HashMap::new(),
        }
    }

    /// Builds a reader to read the input file.
    pub async fn read_input(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        match input.file_type {
            InputFileType::Parquet => self.read_parquet(input).await,
            InputFileType::RemoteWrite => self.read_remote_write(input).await,
        }
    }

    /// Builds a reader to read the parquet file.
    pub async fn read_parquet(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        let region_info = self.get_region_info(input).await?;
        let reader = OpenDALParquetReader::new(
            self.input_store.clone(),
            &input.path,
            region_info.metadata.clone(),
            Some(region_info.flushed_sequence),
        )
        .await?;

        Ok(ReaderInfo {
            reader: Box::new(reader),
            region_dir: region_info.region_dir,
            region_options: region_info.region_options,
            metadata: region_info.metadata,
        })
    }

    /// Builds a reader to read the remote write file.
    pub async fn read_remote_write(&mut self, input: &InputFile) -> Result<ReaderInfo> {
        let region_info = self.get_region_info(input).await?;
        // TODO(yingwen): Should update the sequence.
        let reader = RemoteWriteReader::open(
            self.input_store.clone(),
            &input.path,
            &input.catalog,
            &input.schema,
            region_info.metadata.clone(),
            self.table_helper.clone(),
        )
        .await?
        .with_sequence(region_info.flushed_sequence);

        Ok(ReaderInfo {
            reader: Box::new(reader),
            region_dir: region_info.region_dir,
            region_options: region_info.region_options,
            metadata: region_info.metadata,
        })
    }

    async fn get_region_info(&mut self, input: &InputFile) -> Result<RegionInfo> {
        let cache_key = cache_key(&input.catalog, &input.schema, &input.table);
        if let Some(region_info) = self.region_infos.get(&cache_key) {
            return Ok(region_info.clone());
        }

        let table_info = self
            .table_helper
            .get_table(&input.catalog, &input.schema, &input.table)
            .await?
            .context(MissingTableSnafu {
                table_name: &input.table,
            })?;
        let region_id = to_region_id(table_info.table_info.ident.table_id);
        let opts = table_info.table_info.to_region_options();
        // TODO(yingwen): We ignore WAL options now. We should `prepare_wal_options()` in the future.
        let region_options = RegionOptions::try_from(&opts).context(MitoSnafu)?;
        let mut region_dir = region_dir(&table_info.region_storage_path(), region_id);
        if input.file_type == InputFileType::RemoteWrite {
            // metric engine has two internal regions.
            region_dir = join_dir(&region_dir, DATA_REGION_SUBDIR);
        }
        let manifest = self
            .region_loader
            .load_manifest(&region_dir, &region_options)
            .await
            .context(MitoSnafu)?
            .context(MissingTableSnafu {
                table_name: &table_info.table_info.name,
            })?;
        let region_info = RegionInfo {
            metadata: manifest.metadata.clone(),
            flushed_sequence: manifest.flushed_sequence,
            region_dir,
            region_options,
        };
        self.region_infos.insert(cache_key, region_info.clone());

        Ok(region_info)
    }
}

fn to_region_id(table_id: TableId) -> RegionId {
    RegionId::new(table_id, 0)
}

fn cache_key(catalog: &str, schema: &str, table: &str) -> String {
    format!("{}/{}/{}", catalog, schema, table)
}

#[derive(Clone)]
struct RegionInfo {
    metadata: RegionMetadataRef,
    flushed_sequence: SequenceNumber,
    region_dir: String,
    region_options: RegionOptions,
}
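For orientation, here is a minimal sketch (not part of this patch) of how the pieces in this change could fit together: an `InputReaderBuilder` resolves region info and produces a `ReaderInfo`, a `RegionWriterBuilder` (from `src/sst-convert/src/writer.rs` below) builds a `RegionWriter` for that region directory, and the batches are written out as SSTs. It assumes `ReaderInfo::reader` is a boxed `BatchReader` accepted by `mito2::read::Source::Reader` and that `WriteOptions::default()` is acceptable; the helper function name is hypothetical and `InputReaderBuilder`, `InputFile`, and `ReaderInfo` are assumed to be in scope.

use mito2::read::Source;
use mito2::sst::parquet::WriteOptions;
use snafu::ResultExt;

use crate::error::{MitoSnafu, Result};
use crate::writer::RegionWriterBuilder;

/// Hypothetical helper: converts one input file into SSTs for its target region.
async fn convert_input(
    reader_builder: &mut InputReaderBuilder,
    writer_builder: &RegionWriterBuilder,
    input: &InputFile,
) -> Result<()> {
    // Resolve region metadata/options and build a reader for the input format.
    let info = reader_builder.read_input(input).await?;
    // Create a writer bound to the region directory of the target table.
    let writer = writer_builder
        .build(info.metadata.clone(), &info.region_dir, info.region_options.clone())
        .await
        .context(MitoSnafu)?;
    // Feed the batches into the SST writer with default write options.
    let _ssts = writer
        .write_sst(Source::Reader(info.reader), &WriteOptions::default())
        .await
        .context(MitoSnafu)?;
    Ok(())
}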
318
src/sst-convert/src/reader/parquet.rs
Normal file
@@ -0,0 +1,318 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet file format support.

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;

use api::v1::OpType;
use common_error::ext::BoxedError;
use common_error::snafu::{OptionExt, ResultExt};
use common_recordbatch::error::UnsupportedOperationSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::Schema;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
use futures_util::StreamExt;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchColumn, BatchReader};
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{MitoSnafu, ObjectStoreSnafu, Result};

pub struct OpenDALParquetReader {
    inner: RawParquetReader<AsyncReader>,
}

impl OpenDALParquetReader {
    pub async fn new(
        operator: ObjectStore,
        path: &str,
        metadata: RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
    ) -> Result<Self> {
        let reader = operator.reader_with(path).await.context(ObjectStoreSnafu)?;

        let content_len = operator
            .stat(path)
            .await
            .context(ObjectStoreSnafu)?
            .content_length();

        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?
            .build()
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?;
        Ok(Self {
            inner: RawParquetReader::new(stream, metadata, override_sequence, path),
        })
    }
}

#[async_trait::async_trait]
impl BatchReader for OpenDALParquetReader {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        self.inner.next_batch().await
    }
}

pub struct RawParquetReader<T> {
    metadata: RegionMetadataRef,
    override_sequence: Option<SequenceNumber>,
    output_batch_queue: VecDeque<Batch>,
    stream: Pin<Box<ParquetRecordBatchStream<T>>>,
    path: String,
}

impl<T: AsyncFileReader + Unpin + Send + 'static> RawParquetReader<T> {
    pub fn new(
        stream: ParquetRecordBatchStream<T>,
        metadata: RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
        path: &str,
    ) -> Self {
        Self {
            stream: Box::pin(stream),
            metadata,
            override_sequence,
            output_batch_queue: VecDeque::new(),
            path: path.to_string(),
        }
    }

    pub async fn next_batch_inner(&mut self) -> mito2::error::Result<Option<Batch>> {
        if let Some(batch) = self.output_batch_queue.pop_front() {
            return Ok(Some(batch));
        }
        let Some(next_input_rb) = self.stream.next().await.transpose().with_context(|_| {
            mito2::error::ReadParquetSnafu {
                path: self.path.clone(),
            }
        })?
        else {
            return Ok(None);
        };

        let schema = Arc::new(
            Schema::try_from(next_input_rb.schema())
                .map_err(BoxedError::new)
                .with_context(|_| mito2::error::ExternalSnafu {
                    context: format!(
                        "Failed to convert Schema from DfSchema: {:?}",
                        next_input_rb.schema()
                    ),
                })?,
        );
        let rb = RecordBatch::try_from_df_record_batch(schema, next_input_rb)
            .map_err(BoxedError::new)
            .with_context(|_| mito2::error::ExternalSnafu {
                context: "Failed to convert RecordBatch from DfRecordBatch".to_string(),
            })?;
        let new_batches = extract_to_batches(&rb, &self.metadata, self.override_sequence)
            .map_err(BoxedError::new)
            .with_context(|_| mito2::error::ExternalSnafu {
                context: format!("Failed to extract batches from RecordBatch: {:?}", rb),
            })?;

        self.output_batch_queue.extend(new_batches);
        Ok(self.output_batch_queue.pop_front())
    }
}

#[async_trait::async_trait]
impl<T: AsyncFileReader + Unpin + Send + 'static> BatchReader for RawParquetReader<T> {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        self.next_batch_inner().await
    }
}

pub fn extract_to_batches(
    rb: &RecordBatch,
    metadata: &RegionMetadataRef,
    override_sequence: Option<SequenceNumber>,
) -> Result<Vec<Batch>, BoxedError> {
    let pk_codec: Box<dyn PrimaryKeyCodec> = match metadata.primary_key_encoding {
        PrimaryKeyEncoding::Dense => Box::new(DensePrimaryKeyCodec::new(metadata)),
        PrimaryKeyEncoding::Sparse => Box::new(SparsePrimaryKeyCodec::new(metadata)),
    };
    let pk_ids = metadata.primary_key.clone();
    let pk_names: Vec<_> = pk_ids
        .iter()
        .map(|id| {
            metadata
                .column_by_id(*id)
                .expect("Can't find column by id")
                .column_schema
                .name
                .clone()
        })
        .collect();
    let pk_pos_in_rb: Vec<_> = pk_names
        .into_iter()
        .map(|name| {
            rb.schema
                .column_index_by_name(&name)
                .context(UnsupportedOperationSnafu {
                    reason: format!("Can't find column {} in rb={:?}", name, rb),
                })
                .map_err(BoxedError::new)
        })
        .collect::<Result<_, _>>()?;

    let mut pk_to_batches: HashMap<Vec<u8>, SSTBatchBuilder> = HashMap::new();
    let mut buffer = Vec::new();

    for row in rb.rows() {
        let pk_values: Vec<_> = pk_ids
            .iter()
            .zip(pk_pos_in_rb.iter())
            .map(|(id, pos)| (*id, row[*pos].clone()))
            .collect();
        pk_codec
            .encode_values(&pk_values, &mut buffer)
            .map_err(BoxedError::new)?;
        let cur_pk = &buffer;
        let builder = if let Some(builder) = pk_to_batches.get_mut(cur_pk) {
            builder
        } else {
            let builder =
                SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?;
            pk_to_batches.insert(cur_pk.clone(), builder);
            pk_to_batches.get_mut(cur_pk).expect("Just inserted")
        };
        builder.push_row(&row).map_err(BoxedError::new)?;
    }

    // Sort batches by primary key.
    let mut batches = BTreeMap::new();
    for (pk, builder) in pk_to_batches {
        batches.insert(pk.clone(), builder.finish(pk).map_err(BoxedError::new)?);
    }
    let batches = batches.into_values().collect();
    Ok(batches)
}

struct SSTBatchBuilder {
    /// Positions of the field columns in the record batch's rows.
    field_column_pos: Vec<usize>,
    field_ids: Vec<ColumnId>,
    field_builders: Vec<Box<dyn MutableVector>>,
    timestamp_pos: usize,
    timestamp_builder: Box<dyn MutableVector>,
    /// Override sequence number.
    override_sequence: Option<SequenceNumber>,
    sequence_builder: UInt64VectorBuilder,
    op_type_builder: UInt8VectorBuilder,
    cur_seq: SequenceNumber,
}

impl SSTBatchBuilder {
    fn finish(mut self, pk: Vec<u8>) -> Result<Batch, BoxedError> {
        let fields: Vec<_> = self
            .field_ids
            .iter()
            .zip(self.field_builders)
            .map(|(id, mut b)| BatchColumn {
                column_id: *id,
                data: b.to_vector(),
            })
            .collect();
        Batch::new(
            pk,
            self.timestamp_builder.to_vector(),
            Arc::new(self.sequence_builder.finish()),
            Arc::new(self.op_type_builder.finish()),
            fields,
        )
        .map_err(BoxedError::new)
    }

    fn push_row(&mut self, row: &[Value]) -> Result<(), BoxedError> {
        for (field_pos, field_builder) in self
            .field_column_pos
            .iter()
            .zip(self.field_builders.iter_mut())
        {
            field_builder.push_value_ref(row[*field_pos].as_value_ref());
        }
        self.timestamp_builder
            .push_value_ref(row[self.timestamp_pos].as_value_ref());
        self.sequence_builder
            .push(Some(self.override_sequence.unwrap_or(self.cur_seq)));
        self.op_type_builder.push(Some(OpType::Put as u8));
        self.cur_seq += 1;
        Ok(())
    }

    fn new(
        rb: &RecordBatch,
        metadata: &RegionMetadataRef,
        override_sequence: Option<SequenceNumber>,
    ) -> Result<Self, BoxedError> {
        let timeindex_name = &metadata.time_index_column().column_schema.name;
        Ok(Self {
            field_ids: metadata.field_columns().map(|c| c.column_id).collect(),
            field_column_pos: metadata
                .field_columns()
                .map(|c| &c.column_schema.name)
                .map(|name| {
                    rb.schema
                        .column_index_by_name(name)
                        .context(UnsupportedOperationSnafu {
                            reason: format!("Can't find column {} in rb={:?}", name, rb),
                        })
                        .map_err(BoxedError::new)
                })
                .collect::<Result<_, _>>()?,
            field_builders: metadata
                .field_columns()
                .map(|c| c.column_schema.data_type.create_mutable_vector(512))
                .collect(),

            timestamp_pos: rb
                .schema
                .column_index_by_name(timeindex_name)
                .context(UnsupportedOperationSnafu {
                    reason: format!("Can't find column {} in rb={:?}", timeindex_name, rb),
                })
                .map_err(BoxedError::new)?,
            timestamp_builder: metadata
                .time_index_column()
                .column_schema
                .data_type
                .create_mutable_vector(512),

            override_sequence,
            sequence_builder: UInt64VectorBuilder::with_capacity(512),

            op_type_builder: UInt8VectorBuilder::with_capacity(512),
            cur_seq: override_sequence.unwrap_or_default(),
        })
    }
}
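A small usage sketch (not part of this patch) for the reader above: open it over an object store path and drain it batch by batch. The helper name and the row-counting purpose are illustrative only; each returned batch holds rows for a single primary key.

use mito2::read::BatchReader;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;

use crate::error::{MitoSnafu, Result};

/// Hypothetical helper: counts the rows yielded by an `OpenDALParquetReader`.
async fn count_rows(
    store: ObjectStore,
    path: &str,
    metadata: RegionMetadataRef,
    flushed_sequence: SequenceNumber,
) -> Result<usize> {
    let mut reader =
        OpenDALParquetReader::new(store, path, metadata, Some(flushed_sequence)).await?;
    let mut rows = 0;
    // Drain the reader; each batch groups the rows of one primary key.
    while let Some(batch) = reader.next_batch().await.context(MitoSnafu)? {
        rows += batch.num_rows();
    }
    Ok(rows)
}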
369
src/sst-convert/src/reader/remote_write.rs
Normal file
@@ -0,0 +1,369 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Prometheus remote write support.
//!
//! Prometheus remote write protocol in parquet format:
//! - Each row contains a protobuf binary representation of a single timeseries.
//! - Each series only occurs once in the file.
//! - Each timeseries must have the `__name__` label.
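To make the file layout described above concrete, here is a sketch (not part of this patch) of how such a file could be produced for testing: a single binary column where each row holds a prost-encoded TimeSeries. The column name, the label/sample values, and the availability of `arrow_array`, `arrow_schema`, and `parquet` as direct dependencies are assumptions; the reader below only looks at column index 0.

use std::fs::File;
use std::sync::Arc;

use api::prom_store::remote::{Label, Sample, TimeSeries};
use arrow_array::{BinaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use prost::Message;

/// Hypothetical helper: writes one timeseries into a remote-write parquet file.
fn write_sample_file(path: &str) -> std::result::Result<(), Box<dyn std::error::Error>> {
    let series = TimeSeries {
        labels: vec![
            Label { name: "__name__".to_string(), value: "http_requests_total".to_string() },
            Label { name: "job".to_string(), value: "api".to_string() },
        ],
        samples: vec![Sample { value: 1.0, timestamp: 1_700_000_000_000 }],
        ..Default::default()
    };
    // One row per timeseries, protobuf-encoded into a single binary column.
    let schema = Arc::new(Schema::new(vec![Field::new("timeseries", DataType::Binary, false)]));
    let encoded = series.encode_to_vec();
    let column = BinaryArray::from(vec![encoded.as_slice()]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column)])?;
    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}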
use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::{Label, TimeSeries};
use api::v1::OpType;
use arrow_array::{
    Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::value::ValueRef;
use futures::StreamExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchBuilder, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::metadata::TableId;

use crate::error::{
    DecodeSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
    ObjectStoreSnafu, Result,
};
use crate::table::TableMetadataHelper;

const METRIC_NAME_LABEL: &str = "__name__";
const GREPTIME_VALUE: &str = "greptime_value";

/// A reader that reads a remote write file, sorts the timeseries, and outputs them in primary key order.
pub struct RemoteWriteReader {
    /// Timeseries sorted by primary key in reverse order.
    /// So we can pop the series.
    series: Vec<(Vec<u8>, TimeSeries)>,
    /// Converter for converting timeseries to batches.
    converter: SeriesConverter,
}

impl RemoteWriteReader {
    /// Creates a new [`RemoteWriteReader`] from an object store.
    /// It reads and sorts timeseries.
    pub async fn open(
        operator: ObjectStore,
        path: &str,
        catalog: &str,
        schema: &str,
        metadata: RegionMetadataRef,
        table_helper: TableMetadataHelper,
    ) -> Result<Self> {
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let value_id = metadata
            .column_by_name(GREPTIME_VALUE)
            .context(MissingColumnSnafu {
                column_name: GREPTIME_VALUE,
            })?
            .column_id;
        let encoder = PrimaryKeyEncoder {
            catalog: catalog.to_string(),
            schema: schema.to_string(),
            metadata,
            table_helper,
            table_ids: HashMap::new(),
            codec,
        };
        let converter = SeriesConverter {
            value_id,
            override_sequence: None,
        };
        let mut sorter = TimeSeriesSorter::new(encoder);

        let mut reader = TimeSeriesParquetReader::new(operator, path).await?;
        while let Some(series) = reader.next_series().await? {
            sorter.push(series).await?;
        }
        let mut series = sorter.sort();
        series.reverse();

        Ok(Self { series, converter })
    }

    /// Sets the sequence number for output batches.
    /// Otherwise, the sequence number will be 0.
    pub fn with_sequence(mut self, sequence: SequenceNumber) -> Self {
        self.converter.override_sequence = Some(sequence);
        self
    }

    fn next_series(&mut self) -> Option<(Vec<u8>, TimeSeries)> {
        self.series.pop()
    }
}

#[async_trait::async_trait]
impl BatchReader for RemoteWriteReader {
    async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
        let Some((pk, series)) = self.next_series() else {
            return Ok(None);
        };

        self.converter.convert(pk, series).map(Some)
    }
}

struct SeriesConverter {
    /// Column id for the value field.
    value_id: ColumnId,
    /// Sequence number of the batch.
    override_sequence: Option<SequenceNumber>,
}

impl SeriesConverter {
    /// Builds a batch from a primary key and a time series.
    fn convert(&self, primary_key: Vec<u8>, series: TimeSeries) -> mito2::error::Result<Batch> {
        let num_rows = series.samples.len();
        let op_types = vec![OpType::Put as u8; num_rows];
        // TODO(yingwen): Should we use 0 or 1 as default?
        let sequences = vec![self.override_sequence.unwrap_or(0); num_rows];
        let mut timestamps = Vec::with_capacity(num_rows);
        let mut values = Vec::with_capacity(num_rows);
        for sample in series.samples {
            timestamps.push(sample.timestamp);
            values.push(sample.value);
        }

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from(timestamps)))?
            .sequences_array(Arc::new(UInt64Array::from(sequences)))?
            .op_types_array(Arc::new(UInt8Array::from(op_types)))?
            .push_field_array(self.value_id, Arc::new(Float64Array::from(values)))?;
        builder.build()
    }
}

/// Prometheus remote write parquet reader.
pub struct TimeSeriesParquetReader {
    path: String,
    stream: ParquetRecordBatchStream<AsyncReader>,
    /// Is the stream EOF.
    eof: bool,
    /// Current binary array.
    array: Option<BinaryArray>,
    /// Current row in the record batch.
    /// Only valid when the record batch is Some.
    row_index: usize,
}

impl TimeSeriesParquetReader {
    /// Creates a new instance of `TimeSeriesParquetReader`.
    pub async fn new(object_store: ObjectStore, path: &str) -> Result<Self> {
        let reader = object_store
            .reader_with(path)
            .await
            .context(ObjectStoreSnafu)?;
        let content_len = object_store
            .stat(path)
            .await
            .context(ObjectStoreSnafu)?
            .content_length();
        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?
            .build()
            .context(ReadParquetSnafu { path })
            .context(MitoSnafu)?;

        Ok(Self {
            path: path.to_string(),
            stream,
            eof: false,
            array: None,
            row_index: 0,
        })
    }

    /// Reads the next timeseries from the reader.
    pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
        if self.eof {
            return Ok(None);
        }

        if let Some(array) = &self.array {
            if self.row_index >= array.len() {
                self.row_index = 0;
                self.array = None;
            }
        }

        if self.array.is_none() {
            let Some(record_batch) = self
                .stream
                .next()
                .await
                .transpose()
                .context(ReadParquetSnafu { path: &self.path })
                .context(MitoSnafu)?
            else {
                self.eof = true;
                return Ok(None);
            };
            let array = record_batch
                .column(0)
                .as_any()
                .downcast_ref::<BinaryArray>()
                .context(MissingColumnSnafu {
                    column_name: "remote write timeseries column",
                })?
                .clone();
            assert!(!array.is_empty());
            self.array = Some(array);
        }

        let array = self.array.as_ref().unwrap();
        let value = array.value(self.row_index);
        let time_series = TimeSeries::decode(value).context(DecodeSnafu)?;
        self.row_index += 1;

        Ok(Some(time_series))
    }
}

/// Encoder to encode labels into primary key.
struct PrimaryKeyEncoder {
    /// Catalog name.
    catalog: String,
    /// Schema name.
    schema: String,
    /// The metadata of the physical region.
    metadata: RegionMetadataRef,
    /// Helper to get table metadata.
    table_helper: TableMetadataHelper,
    /// Cached table name to table id.
    table_ids: HashMap<String, TableId>,
    /// Primary key encoder.
    codec: SparsePrimaryKeyCodec,
}

impl PrimaryKeyEncoder {
    /// Encodes the primary key for the given labels.
    /// It'll sort the labels by name before encoding.
    async fn encode_primary_key(
        &mut self,
        labels: &mut Vec<Label>,
        key_buf: &mut Vec<u8>,
    ) -> Result<()> {
        if !labels.is_sorted_by(|left, right| left.name <= right.name) {
            labels.sort_unstable_by(|left, right| left.name.cmp(&right.name));
        }

        // Gets the table id from the label.
        let name_label = labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(MissingMetricNameSnafu)?;
        let table_id = match self.table_ids.get(&name_label.value) {
            Some(id) => *id,
            None => {
                let table_info = self
                    .table_helper
                    .get_table(&self.catalog, &self.schema, &name_label.value)
                    .await?
                    .context(MissingTableSnafu {
                        table_name: &name_label.value,
                    })?;
                let id = table_info.table_info.ident.table_id;
                self.table_ids.insert(name_label.value.clone(), id);

                id
            }
        };
        // Computes the tsid for the given labels.
        let mut generator = TsidGenerator::default();
        for label in &*labels {
            if label.name != METRIC_NAME_LABEL {
                generator.write_label(&label.name, &label.value);
            }
        }
        let tsid = generator.finish();

        key_buf.clear();
        let internal_columns = [
            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
            (ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
        ];
        self.codec
            .encode_to_vec(internal_columns.into_iter(), key_buf)
            .context(MitoSnafu)?;
        let label_iter = labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                return None;
            }

            let column_id = self.metadata.column_by_name(&label.name)?.column_id;
            Some((column_id, ValueRef::String(&label.value)))
        });
        self.codec
            .encode_to_vec(label_iter, key_buf)
            .context(MitoSnafu)?;

        Ok(())
    }
}

/// Prom timeseries sorter.
/// Sorts timeseries by the primary key.
struct TimeSeriesSorter {
    /// Timeseries to sort.
    series: Vec<(Vec<u8>, TimeSeries)>,
    /// Key encoder.
    encoder: PrimaryKeyEncoder,
}

impl TimeSeriesSorter {
    /// Creates a new sorter.
    fn new(encoder: PrimaryKeyEncoder) -> Self {
        Self {
            series: Vec::new(),
            encoder,
        }
    }

    /// Push a timeseries to the sorter.
    async fn push(&mut self, mut series: TimeSeries) -> Result<()> {
        let mut key_buf = Vec::new();
        self.encoder
            .encode_primary_key(&mut series.labels, &mut key_buf)
            .await?;
        self.series.push((key_buf, series));

        Ok(())
    }

    /// Sorts the timeseries.
    /// Returns the sorted timeseries with their primary keys.
    fn sort(&mut self) -> Vec<(Vec<u8>, TimeSeries)> {
        self.series
            .sort_unstable_by(|left, right| left.0.cmp(&right.0));

        std::mem::take(&mut self.series)
    }
}
78
src/sst-convert/src/table.rs
Normal file
@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to get table metadata.

use std::sync::Arc;

use catalog::kvbackend::MetaKvBackend;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::ResultExt;

use crate::error::{MetaClientSnafu, MetaSnafu, Result};

#[derive(Clone)]
pub struct TableMetadataHelper {
    table_metadata_manager: TableMetadataManagerRef,
}

impl TableMetadataHelper {
    pub async fn new(meta_options: &MetaClientOptions) -> Result<Self> {
        let backend = build_kv_backend(meta_options).await?;
        let table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(backend)));
        Ok(Self {
            table_metadata_manager,
        })
    }

    /// Get table info.
    pub async fn get_table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableInfoValue>> {
        let table_name = TableNameKey::new(catalog, schema, table);
        let Some(table_id) = self
            .table_metadata_manager
            .table_name_manager()
            .get(table_name)
            .await
            .context(MetaSnafu)?
            .map(|v| v.table_id())
        else {
            return Ok(None);
        };

        let value = self
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await
            .context(MetaSnafu)?
            .map(|v| v.into_inner());
        Ok(value)
    }
}

async fn build_kv_backend(meta_options: &MetaClientOptions) -> Result<MetaKvBackend> {
    let meta_client = meta_client::create_meta_client(MetaClientType::Frontend, meta_options)
        .await
        .context(MetaClientSnafu)?;

    Ok(MetaKvBackend::new(meta_client))
}
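A brief usage sketch (not part of this patch): the helper resolves a table name to its `TableInfoValue` through metasrv. The catalog, schema, and table names below are placeholders, `meta_options` must point at a reachable metasrv, and the helper function name is hypothetical.

use meta_client::MetaClientOptions;
use table::metadata::TableId;

use crate::error::Result;
use crate::table::TableMetadataHelper;

/// Hypothetical helper: looks up a table id by name.
async fn lookup_table_id(meta_options: &MetaClientOptions) -> Result<Option<TableId>> {
    let helper = TableMetadataHelper::new(meta_options).await?;
    let table = helper.get_table("greptime", "public", "http_requests_total").await?;
    Ok(table.map(|info| info.table_info.ident.table_id))
}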
129
src/sst-convert/src/writer.rs
Normal file
@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for writing SST files.

use std::sync::Arc;

use mito2::access_layer::{
    AccessLayer, AccessLayerRef, OperationType, SstInfoArray, SstWriteRequest,
};
use mito2::cache::CacheManager;
use mito2::config::MitoConfig;
use mito2::error::Result;
use mito2::read::Source;
use mito2::region::options::RegionOptions;
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use store_api::metadata::RegionMetadataRef;

/// A writer that can create multiple SST files for a region.
pub struct RegionWriter {
    /// Mito engine config.
    config: Arc<MitoConfig>,
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Options of the region.
    region_options: RegionOptions,
    /// SST access layer.
    access_layer: AccessLayerRef,
}

impl RegionWriter {
    /// Writes data from a source to SST files.
    pub async fn write_sst(
        &self,
        source: Source,
        write_opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: self.metadata.clone(),
            source,
            cache_manager: Arc::new(CacheManager::default()),
            storage: None,
            max_sequence: None,
            index_options: self.region_options.index_options.clone(),
            inverted_index_config: self.config.inverted_index.clone(),
            fulltext_index_config: self.config.fulltext_index.clone(),
            bloom_filter_index_config: self.config.bloom_filter_index.clone(),
        };

        self.access_layer.write_sst(request, write_opts).await
    }
}

/// Builder that creates a [`RegionWriter`] for different regions.
pub struct RegionWriterBuilder {
    /// Mito engine config.
    config: Arc<MitoConfig>,
    /// Object stores.
    object_store_manager: ObjectStoreManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
}

impl RegionWriterBuilder {
    /// Creates a new [`RegionWriterBuilder`].
    pub async fn new(
        config: Arc<MitoConfig>,
        object_store_manager: ObjectStoreManagerRef,
    ) -> Result<Self> {
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

        Ok(Self {
            config,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

    /// Builds a [`RegionWriter`] for the given region directory.
    pub async fn build(
        &self,
        metadata: RegionMetadataRef,
        region_dir: &str,
        region_options: RegionOptions,
    ) -> Result<RegionWriter> {
        let object_store = mito2::region::opener::get_object_store(
            &region_options.storage,
            &self.object_store_manager,
        )?;
        let access_layer = Arc::new(AccessLayer::new(
            region_dir,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));

        Ok(RegionWriter {
            config: self.config.clone(),
            metadata,
            region_options,
            access_layer,
        })
    }
}
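Finally, a short sketch (not part of this patch) of wiring the builder up. It assumes the same `MitoConfig` and object store manager the datanode uses are available, that `RegionWriter` and `RegionWriterBuilder` above are in scope, and that the region directory and options come from `InputReaderBuilder::get_region_info`; the helper name is hypothetical.

use std::sync::Arc;

use mito2::config::MitoConfig;
use mito2::error::Result;
use mito2::region::options::RegionOptions;
use object_store::manager::ObjectStoreManagerRef;
use store_api::metadata::RegionMetadataRef;

/// Hypothetical helper: builds a `RegionWriter` for one region.
async fn make_region_writer(
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
    metadata: RegionMetadataRef,
    region_dir: &str,
    region_options: RegionOptions,
) -> Result<RegionWriter> {
    // The builder prepares the puffin and intermediate managers once and can be reused per region.
    let builder = RegionWriterBuilder::new(config, object_store_manager).await?;
    builder.build(metadata, region_dir, region_options).await
}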