Compare commits


4 Commits

Author       SHA1        Message                                   Date
Ruihang Xia  038bc4fe6e  revert toml format                        2024-03-14 00:46:00 +08:00
Ruihang Xia  6d07c422d8  Merge branch 'main' into fix-proto-clear  2024-03-14 00:36:28 +08:00
Ruihang Xia  6c14ece23f  accomplish test assertion                 2024-03-14 00:32:49 +08:00
Ruihang Xia  89c51d9b87  reset Sample                              2024-03-13 23:32:22 +08:00

All four commits are signed off by Ruihang Xia <waynestxia@gmail.com>.
25 changed files with 263 additions and 1058 deletions

View File

@@ -70,7 +70,7 @@ runs:
- name: Build greptime binary
shell: pwsh
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }} --bin greptime
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }}
- name: Upload artifacts
uses: ./.github/actions/upload-artifacts

View File

@@ -117,7 +117,7 @@ jobs:
artifacts-dir: bins
version: current
fuzztest:
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
@@ -148,7 +148,7 @@ jobs:
- name: Unzip binaries
run: tar -xvf ./bins.tar.gz
- name: Run GreptimeDB
run: |
./bins/greptime standalone start&
- name: Fuzz Test
uses: ./.github/actions/fuzz-test
@@ -279,10 +279,6 @@ jobs:
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Docker Cache
uses: ScribeMD/docker-cache@0.3.7
with:
key: docker-${{ runner.os }}-coverage
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov

Cargo.lock (generated), 149 changed lines
View File

@@ -207,7 +207,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "api"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"common-base",
"common-decimal",
@@ -570,6 +570,7 @@ version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a"
dependencies = [
"brotli",
"bzip2",
"flate2",
"futures-core",
@@ -588,7 +589,6 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
dependencies = [
"brotli",
"bzip2",
"flate2",
"futures-core",
@@ -695,7 +695,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -881,7 +881,7 @@ dependencies = [
[[package]]
name = "benchmarks"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arrow",
"chrono",
@@ -1248,7 +1248,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -1558,7 +1558,7 @@ checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]]
name = "client"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -1594,7 +1594,7 @@ dependencies = [
"session",
"snafu",
"substrait 0.17.1",
"substrait 0.7.1",
"substrait 0.7.0",
"tokio",
"tokio-stream",
"tonic 0.10.2",
@@ -1624,7 +1624,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anymap",
"async-trait",
@@ -1677,7 +1677,7 @@ dependencies = [
"session",
"snafu",
"store-api",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"temp-env",
"tikv-jemallocator",
@@ -1720,7 +1720,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anymap",
"bitvec",
@@ -1735,7 +1735,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"chrono",
"common-error",
@@ -1746,7 +1746,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"common-base",
"humantime-serde",
@@ -1757,7 +1757,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1789,7 +1789,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arrow",
"bigdecimal",
@@ -1803,7 +1803,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"snafu",
"strum 0.25.0",
@@ -1811,7 +1811,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -1846,7 +1846,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"common-error",
@@ -1865,7 +1865,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
@@ -1895,7 +1895,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -1914,7 +1914,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arc-swap",
"common-query",
@@ -1929,7 +1929,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"common-error",
"common-macro",
@@ -1942,7 +1942,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-recursion",
@@ -1992,11 +1992,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.7.1"
version = "0.7.0"
[[package]]
name = "common-procedure"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2020,7 +2020,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2028,7 +2028,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -2051,7 +2051,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arc-swap",
"common-base",
@@ -2071,7 +2071,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"common-error",
@@ -2091,7 +2091,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"atty",
"backtrace",
@@ -2119,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"client",
"common-query",
@@ -2131,7 +2131,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arrow",
"chrono",
@@ -2147,14 +2147,14 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"build-data",
]
[[package]]
name = "common-wal"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"common-base",
"common-error",
@@ -2802,7 +2802,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
@@ -2860,7 +2860,7 @@ dependencies = [
"snafu",
"sql",
"store-api",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"tokio",
"tokio-stream",
@@ -2874,7 +2874,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arrow",
"arrow-array",
@@ -3361,7 +3361,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -3462,7 +3462,7 @@ dependencies = [
[[package]]
name = "flow"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"bimap",
@@ -3519,7 +3519,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -3583,7 +3583,7 @@ dependencies = [
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"strfmt",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"tokio",
"toml 0.8.8",
@@ -4352,7 +4352,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -4526,12 +4526,11 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "iri-string"
version = "0.7.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21859b667d66a4c1dacd9df0863b3efb65785474255face87f5bca39dd8407c0"
checksum = "8f0f7638c1e223529f1bfdc48c8b133b9e0b434094d1d28473161ee48b235f78"
dependencies = [
"memchr",
"serde",
"nom",
]
[[package]]
@@ -4931,7 +4930,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "log-store"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-stream",
"async-trait",
@@ -5220,7 +5219,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -5250,7 +5249,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anymap",
"api",
@@ -5330,7 +5329,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"aquamarine",
@@ -5402,7 +5401,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anymap",
"api",
@@ -6016,7 +6015,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anyhow",
"async-trait",
@@ -6259,7 +6258,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -6306,7 +6305,7 @@ dependencies = [
"sql",
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"tokio",
"tonic 0.10.2",
@@ -6537,7 +6536,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -6897,7 +6896,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"auth",
"common-base",
@@ -7164,7 +7163,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"ahash 0.8.6",
"async-recursion",
@@ -7375,7 +7374,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"bitflags 2.4.1",
@@ -7496,7 +7495,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"ahash 0.8.6",
"api",
@@ -7557,7 +7556,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"tokio",
"tokio-stream",
@@ -8874,7 +8873,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -9158,7 +9157,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"aide",
"api",
@@ -9264,7 +9263,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -9534,7 +9533,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"common-base",
@@ -9586,7 +9585,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-trait",
"clap 4.4.11",
@@ -9793,7 +9792,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"aquamarine",
@@ -9933,7 +9932,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -10106,7 +10105,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"anymap",
"async-trait",
@@ -10218,7 +10217,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -10247,7 +10246,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.7.1"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
@@ -10304,7 +10303,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.7.1",
"substrait 0.7.0",
"table",
"tempfile",
"time",
@@ -10863,13 +10862,13 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.4.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
dependencies = [
"async-compression 0.4.5",
"base64 0.21.5",
"bitflags 2.4.1",
"async-compression 0.3.15",
"base64 0.13.1",
"bitflags 1.3.2",
"bytes",
"futures-core",
"futures-util",

View File

@@ -62,7 +62,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.7.1"
version = "0.7.0"
edition = "2021"
license = "Apache-2.0"

View File

@@ -1,50 +0,0 @@
# TSBS benchmark - v0.7.0
## Environment
### Local
| | |
| ------ | ---------------------------------- |
| CPU | AMD Ryzen 7 7735HS (8 core 3.2GHz) |
| Memory | 32GB |
| Disk | SOLIDIGM SSDPFKNU010TZ |
| OS | Ubuntu 22.04.2 LTS |
### Amazon EC2
| | |
| ------- | -------------- |
| Machine | c5d.2xlarge |
| CPU | 8 core |
| Memory | 16GB |
| Disk | 50GB (GP3) |
| OS | Ubuntu 22.04.1 |
## Write performance
| Environment | Ingest rate (rows/s) |
| ------------------ | --------------------- |
| Local | 3695814.64 |
| EC2 c5d.2xlarge | 2987166.64 |
## Query performance
| Query type | Local (ms) | EC2 c5d.2xlarge (ms) |
| --------------------- | ---------- | ---------------------- |
| cpu-max-all-1 | 30.56 | 54.74 |
| cpu-max-all-8 | 52.69 | 70.50 |
| double-groupby-1 | 664.30 | 1366.63 |
| double-groupby-5 | 1391.26 | 2141.71 |
| double-groupby-all | 2828.94 | 3389.59 |
| groupby-orderby-limit | 718.92 | 1213.90 |
| high-cpu-1 | 29.21 | 52.98 |
| high-cpu-all | 5514.12 | 7194.91 |
| lastpoint | 7571.40 | 9423.41 |
| single-groupby-1-1-1 | 19.09 | 7.77 |
| single-groupby-1-1-12 | 27.28 | 51.64 |
| single-groupby-1-8-1 | 31.85 | 11.64 |
| single-groupby-5-1-1 | 16.14 | 9.67 |
| single-groupby-5-1-12 | 27.21 | 53.62 |
| single-groupby-5-8-1 | 39.62 | 14.96 |

View File

@@ -34,14 +34,10 @@ pub struct SequenceBuilder {
max: u64,
}
fn seq_name(name: impl AsRef<str>) -> String {
format!("{}-{}", SEQ_PREFIX, name.as_ref())
}
impl SequenceBuilder {
pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
Self {
name: seq_name(name),
name: format!("{}-{}", SEQ_PREFIX, name.as_ref()),
initial: 0,
step: 1,
generator,
@@ -142,14 +138,13 @@ impl Inner {
pub async fn next_range(&self) -> Result<Range<u64>> {
let key = self.name.as_bytes();
let mut start = self.next;
let mut expect = if start == self.initial {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
};
for _ in 0..self.force_quit {
let expect = if start == self.initial {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
};
let step = self.step.min(self.max - start);
ensure!(
@@ -172,24 +167,15 @@ impl Inner {
if !res.success {
if let Some(kv) = res.prev_kv {
expect = kv.value.clone();
let v: [u8; 8] = match kv.value.try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
let value = kv.value;
ensure!(
value.len() == std::mem::size_of::<u64>(),
error::UnexpectedSequenceValueSnafu {
err_msg: format!("key={}, unexpected value={:?}", self.name, value)
}
};
let v = u64::from_le_bytes(v);
// If the existing value is smaller than the initial, we should start from the initial.
start = v.max(self.initial);
);
start = u64::from_le_bytes(value.try_into().unwrap());
} else {
expect = vec![];
start = self.initial;
}
continue;
@@ -211,12 +197,8 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;
use itertools::{Itertools, MinMaxResult};
use tokio::sync::mpsc;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -227,76 +209,6 @@ mod tests {
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
#[tokio::test]
async fn test_sequence_with_existed_value() {
async fn test(exist: u64, expected: Vec<u64>) {
let kv_backend = Arc::new(MemoryKvBackend::default());
let exist = u64::to_le_bytes(exist);
kv_backend
.put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
.await
.unwrap();
let initial = 100;
let seq = SequenceBuilder::new("s", kv_backend)
.initial(initial)
.build();
let mut actual = Vec::with_capacity(expected.len());
for _ in 0..expected.len() {
actual.push(seq.next().await.unwrap());
}
assert_eq!(actual, expected);
}
// put a value not greater than the "initial", the sequence should start from "initial"
test(1, vec![100, 101, 102]).await;
test(100, vec![100, 101, 102]).await;
// put a value greater than the "initial", the sequence should start from the put value
test(200, vec![200, 201, 202]).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sequence_with_contention() {
let seq = Arc::new(
SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
.initial(1024)
.build(),
);
let (tx, mut rx) = mpsc::unbounded_channel();
// Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
for _ in 0..10 {
tokio::spawn({
let seq = seq.clone();
let tx = tx.clone();
async move {
for _ in 0..100 {
tx.send(seq.next().await.unwrap()).unwrap()
}
}
});
}
// Test that we get 1000 unique sequences, and start from 1024 to 2023.
let mut nums = HashSet::new();
let mut c = 0;
while c < 1000
&& let Some(x) = rx.recv().await
{
nums.insert(x);
c += 1;
}
assert_eq!(nums.len(), 1000);
let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
unreachable!("nums has more than one elements");
};
assert_eq!(*min, 1024);
assert_eq!(*max, 2023);
}
#[tokio::test]
async fn test_sequence() {
let kv_backend = Arc::new(MemoryKvBackend::default());
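Editor's note: the sequence allocator above hands out ranges of u64 ids by compare-and-swapping the stored counter in the KV backend, retrying up to `force_quit` times when a concurrent writer wins the race. Below is a minimal, self-contained sketch of that retry loop against an in-memory CAS map; all names are stand-ins (the real code goes through `KvBackendRef` and snafu errors), and `start = prev.max(initial)` mirrors the diff's rule that an existing value below `initial` must not pull the sequence under its configured floor.

```rust
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Mutex;

/// In-memory stand-in for the KV backend's compare-and-swap primitive
/// (hypothetical; the real sequence talks to a `KvBackendRef`).
struct CasKv(Mutex<HashMap<Vec<u8>, Vec<u8>>>);

impl CasKv {
    /// Replace `key`'s value with `new` only if the current value equals
    /// `expect`; an empty `expect` means "key must be absent". On failure,
    /// return the value that won so the caller can retry from it.
    fn compare_and_put(&self, key: &[u8], expect: &[u8], new: Vec<u8>) -> Result<(), Vec<u8>> {
        let mut map = self.0.lock().unwrap();
        let current = map.get(key).cloned().unwrap_or_default();
        if current == expect {
            map.insert(key.to_vec(), new);
            Ok(())
        } else {
            Err(current)
        }
    }
}

/// Reserve `[start, start + step)` the way `next_range` does: bounded retries,
/// each round re-reading the value observed from the racing writer.
fn next_range(kv: &CasKv, key: &[u8], initial: u64, step: u64, retries: usize) -> Option<Range<u64>> {
    let mut expect: Vec<u8> = vec![]; // absent key => start from `initial`
    let mut start = initial;
    for _ in 0..retries {
        let end = start + step;
        match kv.compare_and_put(key, &expect, u64::to_le_bytes(end).to_vec()) {
            Ok(()) => return Some(start..end),
            Err(prev) => {
                let bytes: [u8; 8] = prev.clone().try_into().ok()?;
                // Never fall below `initial`, even if the stored value does.
                start = u64::from_le_bytes(bytes).max(initial);
                expect = prev;
            }
        }
    }
    None
}

fn main() {
    let kv = CasKv(Mutex::new(HashMap::new()));
    assert_eq!(next_range(&kv, b"seq-s", 1024, 100, 3), Some(1024..1124));
    assert_eq!(next_range(&kv, b"seq-s", 1024, 100, 3), Some(1124..1224));
}
```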

View File

@@ -73,7 +73,7 @@ tokio-stream = { workspace = true, features = ["net"] }
toml.workspace = true
tonic.workspace = true
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
url = "2.3.1"
uuid.workspace = true

View File

@@ -26,7 +26,6 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -34,7 +33,6 @@ use crate::read::Batch;
pub mod key_values;
pub mod merge_tree;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
@@ -84,12 +82,9 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
fn id(&self) -> MemtableId;
/// Writes key values into the memtable.
/// Write key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Writes one key value pair into the memtable.
fn write_one(&self, key_value: KeyValue) -> Result<()>;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
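Editor's note: for orientation, here is a self-contained sketch of the trait surface on the side of this diff that carries the per-row entry point, where `write_one` sits next to the batch `write` so callers that split a batch across time partitions can route rows individually. Stub types stand in for the real `KeyValues`, `KeyValue`, and error type.

```rust
// Stubs so the sketch compiles on its own; the real types live in
// `mito2::memtable::key_values` and `store_api::storage`.
type MemtableId = u32;
struct KeyValues;
struct KeyValue<'a>(std::marker::PhantomData<&'a ()>);
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

trait Memtable: Send + Sync {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes a batch of key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes one key value pair; the slow path needs this when rows of a
    /// single batch land in different time partitions.
    fn write_one(&self, key_value: KeyValue<'_>) -> Result<()>;
}
```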

View File

@@ -71,7 +71,7 @@ impl KeyValues {
/// Primary key columns have the same order as region's primary key. Field
/// columns are ordered by their position in the region schema (The same order
/// as users defined while creating the region).
#[derive(Debug, Clone, Copy)]
#[derive(Debug)]
pub struct KeyValue<'a> {
row: &'a Row,
schema: &'a Vec<ColumnSchema>,
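Editor's note: one side of this hunk derives `Clone, Copy` for `KeyValue` while the other derives only `Debug`. Because `KeyValue` is just two borrows, `Copy` duplicates two pointers and never the row data, so a row can be handed to several partitions freely. A tiny sketch; stub types replace `api::v1::Row` and `datatypes::schema::ColumnSchema`.

```rust
#[derive(Debug)]
struct Row;
#[derive(Debug)]
struct ColumnSchema;

// Two shared references: copying a `KeyValue` copies two pointers, never the
// underlying row data, so deriving `Copy` is free.
#[derive(Debug, Clone, Copy)]
struct KeyValue<'a> {
    row: &'a Row,
    schema: &'a Vec<ColumnSchema>,
}

// No clones and no move-out errors when fanning one row out to two partitions.
fn fan_out(kv: KeyValue<'_>) -> (KeyValue<'_>, KeyValue<'_>) {
    (kv, kv)
}

fn main() {
    let (row, schema) = (Row, Vec::new());
    let kv = KeyValue { row: &row, schema: &schema };
    let _ = fan_out(kv);
    println!("{kv:?} is still usable after being copied");
}
```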

View File

@@ -36,7 +36,6 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::tree::MergeTree;
use crate::memtable::{
@@ -128,17 +127,6 @@ impl Memtable for MergeTreeMemtable {
res
}
fn write_one(&self, key_value: KeyValue) -> Result<()> {
let mut metrics = WriteMetrics::default();
let mut pk_buffer = Vec::new();
// Ensures the memtable always updates stats.
let res = self.tree.write_one(key_value, &mut pk_buffer, &mut metrics);
self.update_stats(&metrics);
res
}
fn iter(
&self,
projection: Option<&[ColumnId]>,
@@ -302,14 +290,16 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::Int64Vector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use super::*;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
use crate::test_util::memtable_util;
#[test]
fn test_memtable_sorted_input() {
@@ -332,10 +322,23 @@ mod tests {
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<Vec<_>>();
.collect::<BTreeSet<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = collect_iter_timestamps(iter);
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
assert_eq!(expected_ts, read);
let stats = memtable.stats();
@@ -383,7 +386,20 @@ mod tests {
memtable.write(&kvs).unwrap();
let iter = memtable.iter(None, None).unwrap();
let read = collect_iter_timestamps(iter);
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
let iter = memtable.iter(None, None).unwrap();
@@ -498,7 +514,20 @@ mod tests {
let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = collect_iter_timestamps(iter);
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(expect, read);
}
@@ -535,7 +564,20 @@ mod tests {
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr.into()])))
.unwrap();
let read = collect_iter_timestamps(iter);
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(timestamps, read);
}
}

View File

@@ -148,54 +148,6 @@ impl MergeTree {
Ok(())
}
/// Write one key value pair into the tree.
///
/// # Panics
/// Panics if the tree is immutable (frozen).
pub fn write_one(
&self,
kv: KeyValue,
pk_buffer: &mut Vec<u8>,
metrics: &mut WriteMetrics,
) -> Result<()> {
let has_pk = !self.metadata.primary_key.is_empty();
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
metrics.max_ts = metrics.max_ts.max(ts);
metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();
if !has_pk {
// No primary key.
return self.write_no_key(kv);
}
// Encode primary key.
pk_buffer.clear();
if self.is_partitioned {
// Use sparse encoder for metric engine.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
} else {
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
}
// Write rows with the encoded primary key.
self.write_with_key(pk_buffer, kv, metrics)?;
metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
Ok(())
}
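Editor's note on the `pk_buffer: &mut Vec<u8>` parameter above: the caller owns one scratch buffer and `write_one` clears and refills it per row, so a loop over many rows amortizes the allocation. A minimal sketch of that pattern; the separator byte and names are illustrative only, the real encoding is the memtable's row codec.

```rust
/// Encode `parts` into `buf`, reusing its capacity across calls.
fn encode_to_vec(parts: &[&str], buf: &mut Vec<u8>) {
    buf.clear(); // drop old contents, keep the allocation
    for p in parts {
        buf.extend_from_slice(p.as_bytes());
        buf.push(0); // illustrative separator only
    }
}

fn main() {
    let rows = [vec!["host-1", "dc-1"], vec!["host-2", "dc-1"]];
    let mut pk_buffer = Vec::new(); // allocated once, reused per row
    for row in &rows {
        encode_to_vec(row, &mut pk_buffer);
        println!("encoded {} bytes", pk_buffer.len());
    }
}
```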
/// Scans the tree.
pub fn read(
&self,

View File

@@ -1,551 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Partitions memtables by time.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_telemetry::debug;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use snafu::OptionExt;
use store_api::metadata::RegionMetadataRef;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
/// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)]
pub struct TimePartition {
/// Memtable of the partition.
memtable: MemtableRef,
/// Time range of the partition. `None` means there is no time range. The time
/// range is `None` if and only if the [TimePartitions::part_duration] is `None`.
time_range: Option<PartTimeRange>,
}
impl TimePartition {
/// Returns whether the `ts` belongs to the partition.
fn contains_timestamp(&self, ts: Timestamp) -> bool {
let Some(range) = self.time_range else {
return true;
};
range.contains_timestamp(ts)
}
/// Write rows to the part.
fn write(&self, kvs: &KeyValues) -> Result<()> {
self.memtable.write(kvs)
}
}
type PartitionVec = SmallVec<[TimePartition; 2]>;
/// Partitions.
#[derive(Debug)]
pub struct TimePartitions {
/// Mutable data of partitions.
inner: Mutex<PartitionsInner>,
/// Duration of a partition.
///
/// `None` means there is only one partition and the [TimePartition::time_range] is
/// also `None`.
part_duration: Option<Duration>,
/// Metadata of the region.
metadata: RegionMetadataRef,
/// Builder of memtables.
builder: MemtableBuilderRef,
}
pub type TimePartitionsRef = Arc<TimePartitions>;
impl TimePartitions {
/// Returns a new empty partition list with optional duration.
pub fn new(
metadata: RegionMetadataRef,
builder: MemtableBuilderRef,
next_memtable_id: MemtableId,
part_duration: Option<Duration>,
) -> Self {
let mut inner = PartitionsInner::new(next_memtable_id);
if part_duration.is_none() {
// If `part_duration` is None, then we create a partition with `None` time
// range so we will write all rows to that partition.
let memtable = builder.build(inner.alloc_memtable_id(), &metadata);
debug!(
"Creates a time partition for all timestamps, region: {}, memtable_id: {}",
metadata.region_id,
memtable.id(),
);
let part = TimePartition {
memtable,
time_range: None,
};
inner.parts.push(part);
}
Self {
inner: Mutex::new(inner),
part_duration,
metadata,
builder,
}
}
/// Write key values to memtables.
///
/// It creates new partitions if necessary.
pub fn write(&self, kvs: &KeyValues) -> Result<()> {
// Get all parts.
let parts = self.list_partitions();
// Checks whether all rows belong to a single part. Checks in reverse order as we usually
// put to the latest part.
for part in parts.iter().rev() {
let mut all_in_partition = true;
for kv in kvs.iter() {
// Safety: We checked the schema in the write request.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
if !part.contains_timestamp(ts) {
all_in_partition = false;
break;
}
}
if !all_in_partition {
continue;
}
// We can write all rows to this part.
return part.write(kvs);
}
// Slow path: We have to split kvs by partitions.
self.write_multi_parts(kvs, &parts)
}
/// Append memtables in partitions to `memtables`.
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
let inner = self.inner.lock().unwrap();
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
}
/// Returns the number of partitions.
pub fn num_partitions(&self) -> usize {
let inner = self.inner.lock().unwrap();
inner.parts.len()
}
/// Returns true if all memtables are empty.
pub fn is_empty(&self) -> bool {
let inner = self.inner.lock().unwrap();
inner.parts.iter().all(|part| part.memtable.is_empty())
}
/// Freezes all memtables.
pub fn freeze(&self) -> Result<()> {
let inner = self.inner.lock().unwrap();
for part in &*inner.parts {
part.memtable.freeze()?;
}
Ok(())
}
/// Forks latest partition.
pub fn fork(&self, metadata: &RegionMetadataRef) -> Self {
let mut inner = self.inner.lock().unwrap();
let latest_part = inner
.parts
.iter()
.max_by_key(|part| part.time_range.map(|range| range.min_timestamp))
.cloned();
let Some(old_part) = latest_part else {
return Self::new(
metadata.clone(),
self.builder.clone(),
inner.next_memtable_id,
self.part_duration,
);
};
let memtable = old_part.memtable.fork(inner.alloc_memtable_id(), metadata);
let new_part = TimePartition {
memtable,
time_range: old_part.time_range,
};
Self {
inner: Mutex::new(PartitionsInner::with_partition(
new_part,
inner.next_memtable_id,
)),
part_duration: self.part_duration,
metadata: metadata.clone(),
builder: self.builder.clone(),
}
}
/// Returns partition duration.
pub(crate) fn part_duration(&self) -> Option<Duration> {
self.part_duration
}
/// Returns memory usage.
pub(crate) fn memory_usage(&self) -> usize {
let inner = self.inner.lock().unwrap();
inner
.parts
.iter()
.map(|part| part.memtable.stats().estimated_bytes)
.sum()
}
/// Append memtables in partitions to small vec.
pub(crate) fn list_memtables_to_small_vec(&self, memtables: &mut SmallMemtableVec) {
let inner = self.inner.lock().unwrap();
memtables.extend(inner.parts.iter().map(|part| part.memtable.clone()));
}
/// Returns the next memtable id.
pub(crate) fn next_memtable_id(&self) -> MemtableId {
let inner = self.inner.lock().unwrap();
inner.next_memtable_id
}
/// Returns all partitions.
fn list_partitions(&self) -> PartitionVec {
let inner = self.inner.lock().unwrap();
inner.parts.clone()
}
/// Write to multiple partitions.
fn write_multi_parts(&self, kvs: &KeyValues, parts: &PartitionVec) -> Result<()> {
// If part duration is `None` then there is always one partition and all rows
// will be put in that partition before invoking this method.
debug_assert!(self.part_duration.is_some());
let mut parts_to_write = HashMap::new();
let mut missing_parts = HashMap::new();
for kv in kvs.iter() {
let mut part_found = false;
// Safety: We used the timestamp before.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap();
for part in parts {
if part.contains_timestamp(ts) {
// Safety: Since part duration is `Some` so all time range should be `Some`.
parts_to_write
.entry(part.time_range.unwrap().min_timestamp)
.or_insert_with(|| PartitionToWrite {
partition: part.clone(),
key_values: Vec::new(),
})
.key_values
.push(kv);
part_found = true;
break;
}
}
if !part_found {
// We need to write it to a new part.
// Safety: `new()` ensures duration is always Some if we get to this method.
let part_duration = self.part_duration.unwrap();
let part_start =
partition_start_timestamp(ts, part_duration).with_context(|| {
InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"timestamp {ts:?} and bucket {part_duration:?} are out of range"
),
}
})?;
missing_parts
.entry(part_start)
.or_insert_with(Vec::new)
.push(kv);
}
}
// Writes rows to existing parts.
for part_to_write in parts_to_write.into_values() {
for kv in part_to_write.key_values {
part_to_write.partition.memtable.write_one(kv)?;
}
}
let part_duration = self.part_duration.unwrap();
// Creates new parts and writes to them. Acquires the lock to avoid others create
// the same partition.
let mut inner = self.inner.lock().unwrap();
for (part_start, key_values) in missing_parts {
let part_pos = match inner
.parts
.iter()
.position(|part| part.time_range.unwrap().min_timestamp == part_start)
{
Some(pos) => pos,
None => {
let range = PartTimeRange::from_start_duration(part_start, part_duration)
.with_context(|| InvalidRequestSnafu {
region_id: self.metadata.region_id,
reason: format!(
"Partition time range for {part_start:?} is out of bound, bucket size: {part_duration:?}",
),
})?;
let memtable = self
.builder
.build(inner.alloc_memtable_id(), &self.metadata);
debug!(
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
range,
self.metadata.region_id,
part_duration,
memtable.id(),
inner.parts.len() + 1
);
let pos = inner.parts.len();
inner.parts.push(TimePartition {
memtable,
time_range: Some(range),
});
pos
}
};
let memtable = &inner.parts[part_pos].memtable;
for kv in key_values {
memtable.write_one(kv)?;
}
}
Ok(())
}
}
/// Computes the start timestamp of the partition for `ts`.
///
/// It always uses the bucket size in seconds, which fits all timestamp resolutions.
fn partition_start_timestamp(ts: Timestamp, bucket: Duration) -> Option<Timestamp> {
// Safety: We convert it to seconds so it never returns `None`.
let ts_sec = ts.convert_to(TimeUnit::Second).unwrap();
let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
let start_sec = ts_sec.align_by_bucket(bucket_sec)?;
start_sec.convert_to(ts.unit())
}
#[derive(Debug)]
struct PartitionsInner {
/// All partitions.
parts: PartitionVec,
/// Next memtable id.
next_memtable_id: MemtableId,
}
impl PartitionsInner {
fn new(next_memtable_id: MemtableId) -> Self {
Self {
parts: Default::default(),
next_memtable_id,
}
}
fn with_partition(part: TimePartition, next_memtable_id: MemtableId) -> Self {
Self {
parts: smallvec![part],
next_memtable_id,
}
}
fn alloc_memtable_id(&mut self) -> MemtableId {
let id = self.next_memtable_id;
self.next_memtable_id += 1;
id
}
}
/// Time range of a partition.
#[derive(Debug, Clone, Copy)]
struct PartTimeRange {
/// Inclusive min timestamp of rows in the partition.
min_timestamp: Timestamp,
/// Exclusive max timestamp of rows in the partition.
max_timestamp: Timestamp,
}
impl PartTimeRange {
fn from_start_duration(start: Timestamp, duration: Duration) -> Option<Self> {
let start_sec = start.convert_to(TimeUnit::Second)?;
let end_sec = start_sec.add_duration(duration).ok()?;
let min_timestamp = start_sec.convert_to(start.unit())?;
let max_timestamp = end_sec.convert_to(start.unit())?;
Some(Self {
min_timestamp,
max_timestamp,
})
}
/// Returns whether the `ts` belongs to the partition.
fn contains_timestamp(&self, ts: Timestamp) -> bool {
self.min_timestamp <= ts && ts < self.max_timestamp
}
}
struct PartitionToWrite<'a> {
partition: TimePartition,
key_values: Vec<KeyValue<'a>>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
#[test]
fn test_no_duration() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
assert_eq!(1, partitions.num_partitions());
assert!(partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[1000, 3000, 7000, 5000, 6000],
0, // sequence 0, 1, 2, 3, 4
);
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[1000, 3000, 5000, 6000, 7000], &timestamps[..]);
}
#[test]
fn test_write_single_part() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
assert_eq!(0, partitions.num_partitions());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[5000, 2000, 0],
0, // sequence 0, 1, 2
);
// It should create a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000],
3, // sequence 3, 4, 5
);
// Still writes to the same partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
let mut memtables = Vec::new();
partitions.list_memtables(&mut memtables);
let iter = memtables[0].iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], &timestamps[..]);
let parts = partitions.list_partitions();
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(10000),
parts[0].time_range.unwrap().max_timestamp
);
}
#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
assert_eq!(0, partitions.num_partitions());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[2000, 0],
0, // sequence 0, 1
);
// It should create a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(1, partitions.num_partitions());
assert!(!partitions.is_empty());
let kvs = memtable_util::build_key_values(
&metadata,
"hello".to_string(),
0,
&[3000, 7000, 4000, 5000],
2, // sequence 2, 3, 4, 5
);
// Writes 2 rows to the old partition and 2 rows to a new partition.
partitions.write(&kvs).unwrap();
assert_eq!(2, partitions.num_partitions());
let parts = partitions.list_partitions();
let iter = parts[0].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(
Timestamp::new_millisecond(0),
parts[0].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(5000),
parts[0].time_range.unwrap().max_timestamp
);
assert_eq!(&[0, 2000, 3000, 4000], &timestamps[..]);
let iter = parts[1].memtable.iter(None, None).unwrap();
let timestamps = collect_iter_timestamps(iter);
assert_eq!(&[5000, 7000], &timestamps[..]);
assert_eq!(
Timestamp::new_millisecond(5000),
parts[1].time_range.unwrap().min_timestamp
);
assert_eq!(
Timestamp::new_millisecond(10000),
parts[1].time_range.unwrap().max_timestamp
);
}
}
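Editor's note: the partition math in `partition_start_timestamp` reduces to flooring the timestamp, in seconds, to a multiple of the bucket. A worked, milliseconds-only sketch; the real function handles every `TimeUnit` via `Timestamp::convert_to` and `align_by_bucket`.

```rust
use std::time::Duration;

/// Floor `ts_ms` to the start of its `bucket`-sized partition, in milliseconds.
fn partition_start_millis(ts_ms: i64, bucket: Duration) -> Option<i64> {
    let ts_sec = ts_ms.div_euclid(1000); // convert_to(TimeUnit::Second)
    let bucket_sec: i64 = bucket.as_secs().try_into().ok()?;
    let start_sec = ts_sec.div_euclid(bucket_sec).checked_mul(bucket_sec)?; // align_by_bucket
    start_sec.checked_mul(1000) // convert back to the original unit
}

fn main() {
    let bucket = Duration::from_secs(10);
    // Rows at 5000 and 7000 ms fall in [0, 10000), matching
    // `test_write_single_part` above.
    assert_eq!(partition_start_millis(7_000, bucket), Some(0));
    assert_eq!(partition_start_millis(12_345, bucket), Some(10_000));
    // Euclidean division floors toward negative infinity, so negative
    // timestamps land in [-10000, 0), not [0, 10000).
    assert_eq!(partition_start_millis(-1, bucket), Some(-10_000));
}
```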

View File

@@ -38,7 +38,6 @@ use table::predicate::Predicate;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
@@ -111,75 +110,49 @@ impl TimeSeriesMemtable {
}
/// Updates memtable stats.
fn update_stats(&self, stats: LocalStats) {
self.alloc_tracker.on_allocation(stats.allocated);
fn update_stats(&self, request_size: usize, min: i64, max: i64) {
self.alloc_tracker.on_allocation(request_size);
loop {
let current_min = self.min_timestamp.load(Ordering::Relaxed);
if stats.min_ts >= current_min {
if min >= current_min {
break;
}
let Err(updated) = self.min_timestamp.compare_exchange(
current_min,
stats.min_ts,
min,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == stats.min_ts {
if updated == min {
break;
}
}
loop {
let current_max = self.max_timestamp.load(Ordering::Relaxed);
if stats.max_ts <= current_max {
if max <= current_max {
break;
}
let Err(updated) = self.max_timestamp.compare_exchange(
current_max,
stats.max_ts,
max,
Ordering::Relaxed,
Ordering::Relaxed,
) else {
break;
};
if updated == stats.max_ts {
if updated == max {
break;
}
}
}
fn write_key_value(&self, kv: KeyValue, stats: &mut LocalStats) -> Result<()> {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();
stats.allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
stats.allocated += series_allocated;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
stats.min_ts = stats.min_ts.min(ts);
stats.max_ts = stats.max_ts.max(ts);
let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
Ok(())
}
}
impl Debug for TimeSeriesMemtable {
@@ -194,30 +167,43 @@ impl Memtable for TimeSeriesMemtable {
}
fn write(&self, kvs: &KeyValues) -> Result<()> {
let mut local_stats = LocalStats::default();
let mut allocated = 0;
let mut min_ts = i64::MAX;
let mut max_ts = i64::MIN;
for kv in kvs.iter() {
self.write_key_value(kv, &mut local_stats)?;
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();
allocated += fields.iter().map(|v| v.data_size()).sum::<usize>();
let (series, series_allocated) = self.series_set.get_or_add_series(primary_key_encoded);
allocated += series_allocated;
// safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
min_ts = min_ts.min(ts);
max_ts = max_ts.max(ts);
let mut guard = series.write().unwrap();
guard.push(kv.timestamp(), kv.sequence(), kv.op_type(), fields);
}
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
local_stats.allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
allocated += kvs.num_rows() * std::mem::size_of::<Timestamp>();
allocated += kvs.num_rows() * std::mem::size_of::<OpType>();
// TODO(hl): this may be inaccurate since the for-iteration may return early.
// We may lift the primary key length check out of Memtable::write
// so that we can ensure writing to memtable will succeed.
self.update_stats(local_stats);
self.update_stats(allocated, min_ts, max_ts);
Ok(())
}
fn write_one(&self, key_value: KeyValue) -> Result<()> {
let mut local_stats = LocalStats::default();
let res = self.write_key_value(key_value, &mut local_stats);
local_stats.allocated += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
self.update_stats(local_stats);
res
}
fn iter(
&self,
projection: Option<&[ColumnId]>,
@@ -281,22 +267,6 @@ impl Memtable for TimeSeriesMemtable {
}
}
struct LocalStats {
allocated: usize,
min_ts: i64,
max_ts: i64,
}
impl Default for LocalStats {
fn default() -> Self {
LocalStats {
allocated: 0,
min_ts: i64::MAX,
max_ts: i64::MIN,
}
}
}
type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
struct SeriesSet {
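Editor's note: the compare-exchange loops in `update_stats` above maintain a lock-free running min/max of written timestamps. Since Rust 1.45, `AtomicI64::fetch_min` / `fetch_max` express the same loop in a single call; a minimal equivalent sketch with the same `i64::MAX` / `i64::MIN` sentinels as `LocalStats`:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Memtable-wide timestamp bounds, updated without locks.
struct TsBounds {
    min_timestamp: AtomicI64,
    max_timestamp: AtomicI64,
}

impl TsBounds {
    fn new() -> Self {
        Self {
            min_timestamp: AtomicI64::new(i64::MAX),
            max_timestamp: AtomicI64::new(i64::MIN),
        }
    }

    /// Fold one batch's [min, max] into the running bounds; `fetch_min` /
    /// `fetch_max` perform the same CAS retry loop internally.
    fn update(&self, min: i64, max: i64) {
        self.min_timestamp.fetch_min(min, Ordering::Relaxed);
        self.max_timestamp.fetch_max(max, Ordering::Relaxed);
    }
}

fn main() {
    let bounds = TsBounds::new();
    bounds.update(2000, 7000);
    bounds.update(0, 5000);
    assert_eq!(bounds.min_timestamp.load(Ordering::Relaxed), 0);
    assert_eq!(bounds.max_timestamp.load(Ordering::Relaxed), 7000);
}
```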

View File

@@ -20,29 +20,26 @@ use smallvec::SmallVec;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::time_partition::TimePartitionsRef;
use crate::memtable::{MemtableId, MemtableRef};
pub(crate) type SmallMemtableVec = SmallVec<[MemtableRef; 2]>;
/// A version of current memtables in a region.
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: TimePartitionsRef,
pub(crate) mutable: MemtableRef,
/// Immutable memtables.
///
/// We only allow one flush job per region but if a flush job failed, then we
/// might need to store more than one immutable memtable on the next time we
/// flush the region.
immutables: SmallMemtableVec,
immutables: SmallVec<[MemtableRef; 2]>,
}
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
impl MemtableVersion {
/// Returns a new [MemtableVersion] with specific mutable memtable.
pub(crate) fn new(mutable: TimePartitionsRef) -> MemtableVersion {
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
MemtableVersion {
mutable,
immutables: SmallVec::new(),
@@ -56,8 +53,8 @@ impl MemtableVersion {
/// Lists mutable and immutable memtables.
pub(crate) fn list_memtables(&self) -> Vec<MemtableRef> {
let mut mems = Vec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables(&mut mems);
let mut mems = Vec::with_capacity(self.immutables.len() + 1);
mems.push(self.mutable.clone());
mems.extend_from_slice(&self.immutables);
mems
}
@@ -79,13 +76,15 @@ impl MemtableVersion {
// soft limit.
self.mutable.freeze()?;
// Fork the memtable.
let mutable = Arc::new(self.mutable.fork(metadata));
let mutable = self.mutable.fork(self.next_memtable_id(), metadata);
// Pushes the mutable memtable to immutable list.
let mut immutables =
SmallVec::with_capacity(self.immutables.len() + self.mutable.num_partitions());
self.mutable.list_memtables_to_small_vec(&mut immutables);
immutables.extend(self.immutables.iter().cloned());
let immutables = self
.immutables
.iter()
.cloned()
.chain([self.mutable.clone()])
.collect();
Ok(Some(MemtableVersion {
mutable,
immutables,
@@ -104,7 +103,7 @@ impl MemtableVersion {
/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_usage(&self) -> usize {
self.mutable.memory_usage()
self.mutable.stats().estimated_bytes
}
/// Returns the memory usage of the immutable memtables.
@@ -122,4 +121,9 @@ impl MemtableVersion {
pub(crate) fn is_empty(&self) -> bool {
self.mutable.is_empty() && self.immutables.is_empty()
}
/// Returns the next memtable id.
pub(crate) fn next_memtable_id(&self) -> MemtableId {
self.mutable.id() + 1
}
}
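Editor's note: `MemtableVersion` is copy-on-write. `freeze_mutable` never edits the current version in place; it returns a new version whose mutable slot is a fresh fork and whose immutable list gains the old mutable. A simplified sketch of that swap, with strings standing in for `MemtableRef`; the time-partition side of this diff additionally flattens every partition into the immutable list.

```rust
use std::sync::Arc;

struct MemtableVersion {
    mutable: Arc<String>,         // stand-in for MemtableRef
    immutables: Vec<Arc<String>>, // stand-in for SmallVec<[MemtableRef; 2]>
}

impl MemtableVersion {
    /// Build the next version: `fresh` becomes mutable, the old mutable joins
    /// the immutables awaiting flush. Readers holding the old version keep a
    /// consistent snapshot throughout.
    fn freeze_mutable(&self, fresh: Arc<String>) -> MemtableVersion {
        let mut immutables = self.immutables.clone();
        immutables.push(self.mutable.clone());
        MemtableVersion { mutable: fresh, immutables }
    }
}

fn main() {
    let v1 = MemtableVersion { mutable: Arc::new("m0".into()), immutables: vec![] };
    let v2 = v1.freeze_mutable(Arc::new("m1".into()));
    assert_eq!(v2.immutables.len(), 1);
    assert_eq!(*v2.mutable, "m1");
}
```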

View File

@@ -37,7 +37,6 @@ use crate::error::{
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderRef;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
@@ -170,13 +169,7 @@ impl RegionOpener {
RegionManifestManager::new(metadata.clone(), region_manifest_options).await?;
// Initial memtable id is 0.
let part_duration = options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder,
0,
part_duration,
));
let mutable = self.memtable_builder.build(0, &metadata);
debug!("Create region {} with options: {:?}", region_id, options);
@@ -272,13 +265,7 @@ impl RegionOpener {
self.cache_manager.clone(),
));
// Initial memtable id is 0.
let part_duration = region_options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder.clone(),
0,
part_duration,
));
let mutable = self.memtable_builder.build(0, &metadata);
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)

View File

@@ -94,14 +94,6 @@ pub enum CompactionOptions {
Twcs(TwcsOptions),
}
impl CompactionOptions {
pub(crate) fn time_window(&self) -> Option<Duration> {
match self {
CompactionOptions::Twcs(opts) => opts.time_window,
}
}
}
impl Default for CompactionOptions {
fn default() -> Self {
Self::Twcs(TwcsOptions::default())

View File

@@ -31,9 +31,8 @@ use store_api::storage::SequenceNumber;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId};
use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef};
use crate::region::options::RegionOptions;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
@@ -123,14 +122,8 @@ impl VersionControl {
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let mut data = self.data.write().unwrap();
data.is_dropped = true;
@@ -147,14 +140,7 @@ impl VersionControl {
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
builder.clone(),
next_memtable_id,
part_duration,
));
let new_mutable = builder.build(version.memtables.next_memtable_id(), &metadata);
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
@@ -177,14 +163,8 @@ impl VersionControl {
) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let new_mutable =
memtable_builder.build(version.memtables.next_memtable_id(), &version.metadata);
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
@@ -262,7 +242,7 @@ pub(crate) struct VersionBuilder {
impl VersionBuilder {
/// Returns a new builder.
pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> Self {
VersionBuilder {
metadata,
memtables: Arc::new(MemtableVersion::new(mutable)),

View File

@@ -21,9 +21,7 @@ use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use datatypes::arrow::array::UInt64Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use table::predicate::Predicate;
@@ -60,10 +58,6 @@ impl Memtable for EmptyMemtable {
Ok(())
}
fn write_one(&self, _key_value: KeyValue) -> Result<()> {
Ok(())
}
fn iter(
&self,
_projection: Option<&[ColumnId]>,
@@ -309,20 +303,3 @@ pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
]);
row_codec.encode(key_value.primary_keys()).unwrap()
}
/// Collects timestamps from the batch iter.
pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
iter.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect()
}

View File

@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}
use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilder;
use crate::region::version::{Version, VersionBuilder, VersionControl};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
@@ -101,12 +101,7 @@ impl VersionControlBuilder {
pub(crate) fn build_version(&self) -> Version {
let metadata = Arc::new(self.metadata.clone());
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
self.memtable_builder.clone(),
0,
None,
));
let mutable = self.memtable_builder.build(0, &metadata);
VersionBuilder::new(metadata, mutable)
.add_files(self.file_purger.clone(), self.files.values().cloned())
.build()

View File

@@ -103,7 +103,7 @@ tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
tonic-reflection = "0.10"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
urlencoding = "2.1"
[target.'cfg(not(windows))'.dependencies]

View File

@@ -271,25 +271,18 @@ pub enum Error {
#[snafu(display("Not found influx http authorization info"))]
NotFoundInfluxAuth {},
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
UnsupportedAuthScheme { name: String },
#[snafu(display("Invalid visibility ASCII chars"))]
InvalidAuthHeaderInvisibleASCII {
InvisibleASCII {
#[snafu(source)]
error: hyper::header::ToStrError,
location: Location,
},
#[snafu(display("Invalid utf-8 value"))]
InvalidAuthHeaderInvalidUtf8Value {
#[snafu(source)]
error: FromUtf8Error,
location: Location,
},
#[snafu(display("Unsupported http auth scheme, name: {}", name))]
UnsupportedAuthScheme { name: String },
#[snafu(display("Invalid http authorization header"))]
InvalidAuthHeader { location: Location },
InvalidAuthorizationHeader { location: Location },
#[snafu(display("Invalid base64 value"))]
InvalidBase64Value {
@@ -527,17 +520,16 @@ impl ErrorExt for Error {
DescribeStatement { source } => source.status_code(),
NotFoundAuthHeader { .. } | NotFoundInfluxAuth { .. } => StatusCode::AuthHeaderNotFound,
InvalidAuthHeaderInvisibleASCII { .. }
InvisibleASCII { .. }
| UnsupportedAuthScheme { .. }
| InvalidAuthHeader { .. }
| InvalidAuthorizationHeader { .. }
| InvalidBase64Value { .. }
| InvalidAuthHeaderInvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
| InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.status_code(),
InvalidUtf8Value { .. } | InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
ReplacePreparedStmtParams { source, .. }
| GetPreparedStmtParams { source, .. }
@@ -613,7 +605,9 @@ macro_rules! define_into_tonic_status {
fn from(err: $Error) -> Self {
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use $crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use $crate::http::header::constants::{
GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG,
};
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
@@ -626,6 +620,10 @@ macro_rules! define_into_tonic_status {
);
let root_error = err.output_msg();
if let Ok(err_msg) = HeaderValue::from_bytes(root_error.as_bytes()) {
let _ = headers.insert(GREPTIME_DB_HEADER_ERROR_MSG, err_msg);
}
let metadata = MetadataMap::from_headers(headers);
tonic::Status::with_metadata(
$crate::error::status_to_tonic_code(status_code),
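Editor's note: with this change the server attaches the root error message to gRPC responses as a metadata entry alongside the error code. A hedged client-side sketch of reading it back from a `tonic::Status`; the real header name is the value of `common_error::GREPTIME_DB_HEADER_ERROR_MSG`, and the string below is only a placeholder.

```rust
/// Extract the server-provided root error message, if present and ASCII-safe.
fn root_error_msg(status: &tonic::Status) -> Option<String> {
    status
        .metadata()
        .get("x-greptime-err-msg") // placeholder for GREPTIME_DB_HEADER_ERROR_MSG
        .and_then(|v| v.to_str().ok())
        .map(str::to_string)
}

fn main() {
    let status = tonic::Status::internal("boom");
    assert_eq!(root_error_msg(&status), None); // no metadata attached here
}
```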

View File

@@ -45,7 +45,6 @@ use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tower::timeout::TimeoutLayer;
use tower::ServiceBuilder;
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;
use self::authorize::AuthState;
@@ -699,11 +698,6 @@ impl HttpServer {
Router::new()
.route("/write", routing::post(influxdb_write_v1))
.route("/api/v2/write", routing::post(influxdb_write_v2))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
.layer(RequestDecompressionLayer::new()),
)
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)

View File

@@ -35,7 +35,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use super::header::{GreptimeDbName, GREPTIME_TIMEZONE_HEADER_NAME};
use super::{ResponseFormat, PUBLIC_APIS};
use crate::error::{
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
self, InvalidAuthorizationHeaderSnafu, InvalidParameterSnafu, InvisibleASCIISnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::error_result::ErrorResponse;
@@ -174,13 +174,15 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
// try header
let (auth_scheme, credential) = header
.to_str()
.context(InvalidAuthHeaderInvisibleASCIISnafu)?
.context(InvisibleASCIISnafu)?
.split_once(' ')
.context(InvalidAuthHeaderSnafu)?;
.context(InvalidAuthorizationHeaderSnafu)?;
let (username, password) = match auth_scheme.to_lowercase().as_str() {
"token" => {
let (u, p) = credential.split_once(':').context(InvalidAuthHeaderSnafu)?;
let (u, p) = credential
.split_once(':')
.context(InvalidAuthorizationHeaderSnafu)?;
(u.to_string(), p.to_string().into())
}
"basic" => decode_basic(credential)?,
@@ -235,10 +237,13 @@ impl TryFrom<&str> for AuthScheme {
type Error = error::Error;
fn try_from(value: &str) -> Result<Self> {
let (scheme, encoded_credentials) =
value.split_once(' ').context(InvalidAuthHeaderSnafu)?;
ensure!(!encoded_credentials.contains(' '), InvalidAuthHeaderSnafu);
let (scheme, encoded_credentials) = value
.split_once(' ')
.context(InvalidAuthorizationHeaderSnafu)?;
ensure!(
!encoded_credentials.contains(' '),
InvalidAuthorizationHeaderSnafu
);
match scheme.to_lowercase().as_str() {
"basic" => decode_basic(encoded_credentials)
@@ -256,7 +261,7 @@ fn auth_header<B>(req: &Request<B>) -> Result<AuthScheme> {
.get(http::header::AUTHORIZATION)
.context(error::NotFoundAuthHeaderSnafu)?
.to_str()
.context(InvalidAuthHeaderInvisibleASCIISnafu)?;
.context(InvisibleASCIISnafu)?;
auth_header.try_into()
}
@@ -265,14 +270,13 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
let decoded = BASE64_STANDARD
.decode(credential)
.context(error::InvalidBase64ValueSnafu)?;
let as_utf8 =
String::from_utf8(decoded).context(error::InvalidAuthHeaderInvalidUtf8ValueSnafu)?;
let as_utf8 = String::from_utf8(decoded).context(error::InvalidUtf8ValueSnafu)?;
if let Some((user_id, password)) = as_utf8.split_once(':') {
return Ok((user_id.to_string(), password.to_string().into()));
}
InvalidAuthHeaderSnafu {}.fail()
InvalidAuthorizationHeaderSnafu {}.fail()
}
fn need_auth<B>(req: &Request<B>) -> bool {
@@ -391,7 +395,10 @@ mod tests {
let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap();
let res = auth_header(&wrong_req);
assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. }));
assert_matches!(
res.err(),
Some(error::Error::InvalidAuthorizationHeader { .. })
);
let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
let res = auth_header(&wrong_req);
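Editor's note: the Basic branch that both sides of this diff share boils down to base64-decode, require UTF-8, split on the first ':'. A standalone sketch using the same `BASE64_STANDARD` engine from the `base64` crate; error handling is collapsed to `Option` here, whereas the real code maps each failure to a distinct snafu variant.

```rust
use base64::prelude::*; // provides BASE64_STANDARD and the Engine trait

fn decode_basic(credential: &str) -> Option<(String, String)> {
    let decoded = BASE64_STANDARD.decode(credential).ok()?; // invalid base64
    let as_utf8 = String::from_utf8(decoded).ok()?; // invalid UTF-8
    let (user, password) = as_utf8.split_once(':')?; // malformed header
    Some((user.to_string(), password.to_string()))
}

fn main() {
    // The test fixture above: "dXNlcm5hbWU6cGFzc3dvcmQ=" is "username:password".
    assert_eq!(
        decode_basic("dXNlcm5hbWU6cGFzc3dvcmQ="),
        Some(("username".to_string(), "password".to_string()))
    );
}
```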

View File

@@ -21,7 +21,7 @@ use common_telemetry::logging::{debug, error};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::http::header::constants::GREPTIME_DB_HEADER_ERROR_CODE;
use crate::http::header::constants::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::ResponseFormat;
@@ -78,10 +78,15 @@ impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let ty = self.ty.as_str();
let code = self.code;
let msg = self.error.clone();
let execution_time = self.execution_time_ms;
let mut resp = Json(self).into_response();
resp.headers_mut()
.insert(GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(code));
resp.headers_mut().insert(
GREPTIME_DB_HEADER_ERROR_MSG,
HeaderValue::from_str(&msg).expect("malformed error msg"),
);
resp.headers_mut()
.insert(&GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static(ty));
resp.headers_mut().insert(

View File

@@ -36,6 +36,7 @@ pub mod constants {
pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = common_error::GREPTIME_DB_HEADER_ERROR_MSG;
}
pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =