mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
Compare commits
10 Commits
v0.4.1
...
v0.5.0-nig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d478658b5 | ||
|
|
89ebe47cd9 | ||
|
|
212ea2c25c | ||
|
|
1658d088ab | ||
|
|
346b57cf10 | ||
|
|
e1dcf83326 | ||
|
|
b5d9d635eb | ||
|
|
88dd78a69c | ||
|
|
6439b929b3 | ||
|
|
ba15c14103 |
16
.github/workflows/nightly-ci.yml
vendored
16
.github/workflows/nightly-ci.yml
vendored
@@ -34,6 +34,14 @@ jobs:
|
||||
uses: Swatinem/rust-cache@v2
|
||||
- name: Run sqlness
|
||||
run: cargo sqlness
|
||||
- name: Notify slack if failed
|
||||
if: failure()
|
||||
uses: slackapi/slack-github-action@v1.23.0
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
|
||||
with:
|
||||
payload: |
|
||||
{"text": "Nightly CI failed for sqlness tests"}
|
||||
- name: Upload sqlness logs
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v3
|
||||
@@ -80,3 +88,11 @@ jobs:
|
||||
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
|
||||
GT_S3_REGION: ${{ secrets.S3_REGION }}
|
||||
UNITTEST_LOG_DIR: "__unittest_logs"
|
||||
- name: Notify slack if failed
|
||||
if: failure()
|
||||
uses: slackapi/slack-github-action@v1.23.0
|
||||
env:
|
||||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
|
||||
with:
|
||||
payload: |
|
||||
{"text": "Nightly CI failed for cargo test"}
|
||||
|
||||
9
.github/workflows/release.yml
vendored
9
.github/workflows/release.yml
vendored
@@ -302,8 +302,12 @@ jobs:
|
||||
release-cn-artifacts:
|
||||
name: Release artifacts to CN region
|
||||
if: ${{ inputs.release_images || github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [
|
||||
needs: [ # The job has to wait until all the artifacts are built.
|
||||
allocate-runners,
|
||||
build-linux-amd64-artifacts,
|
||||
build-linux-arm64-artifacts,
|
||||
build-macos-artifacts,
|
||||
build-windows-artifacts,
|
||||
release-images-to-dockerhub,
|
||||
]
|
||||
runs-on: ubuntu-20.04
|
||||
@@ -338,11 +342,12 @@ jobs:
|
||||
publish-github-release:
|
||||
name: Create GitHub release and upload artifacts
|
||||
if: ${{ inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule' }}
|
||||
needs: [
|
||||
needs: [ # The job has to wait until all the artifacts are built.
|
||||
allocate-runners,
|
||||
build-linux-amd64-artifacts,
|
||||
build-linux-arm64-artifacts,
|
||||
build-macos-artifacts,
|
||||
build-windows-artifacts,
|
||||
release-images-to-dockerhub,
|
||||
]
|
||||
runs-on: ubuntu-20.04
|
||||
|
||||
26
.github/workflows/size-label.yml
vendored
Normal file
26
.github/workflows/size-label.yml
vendored
Normal file
@@ -0,0 +1,26 @@
|
||||
# Runs codelytv/pr-size-labeler on every pull_request event.
name: size-labeler

on: [pull_request]

jobs:
  labeler:
    runs-on: ubuntu-latest
    name: Label the PR size
    steps:
      - uses: codelytv/pr-size-labeler@v1
        with:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          s_label: 'Size: S'
          s_max_size: '100'
          m_label: 'Size: M'
          m_max_size: '500'
          l_label: 'Size: L'
          l_max_size: '1000'
          xl_label: 'Size: XL'
          fail_if_xl: 'false'
          message_if_xl: >
            This PR exceeds the recommended size of 1000 lines.
            Please make sure you are NOT addressing multiple issues with one PR.
            Note this PR might be rejected due to its size.
          github_api_url: 'api.github.com'
          files_to_ignore: 'Cargo.lock'
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7034,6 +7034,7 @@ dependencies = [
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-recordbatch",
|
||||
"common-telemetry",
|
||||
"datafusion",
|
||||
"datatypes",
|
||||
|
||||
@@ -19,8 +19,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
|
||||
build-essential \
|
||||
pkg-config \
|
||||
python3.10 \
|
||||
python3.10-dev \
|
||||
python3-pip
|
||||
python3.10-dev
|
||||
|
||||
# Remove Python 3.8 and install pip.
|
||||
RUN apt-get -y purge python3.8 && \
|
||||
apt-get -y autoremove && \
|
||||
ln -s /usr/bin/python3.10 /usr/bin/python3 && \
|
||||
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10
|
||||
|
||||
RUN git config --global --add safe.directory /greptimedb
|
||||
|
||||
|
||||
@@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask {
|
||||
task_fn: BoxedTaskFunction<Error>,
|
||||
should_report: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
|
||||
GreptimeDBTelemetryTask::Enable((
|
||||
RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
|
||||
should_report,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn disable() -> Self {
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod memory;
|
||||
pub mod test;
|
||||
pub mod txn;
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::collections::btree_map::Entry;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Range;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -85,21 +84,25 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
|
||||
}
|
||||
|
||||
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
|
||||
let range = req.range();
|
||||
let RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
limit,
|
||||
keys_only,
|
||||
limit, keys_only, ..
|
||||
} = req;
|
||||
|
||||
let kvs = self.kvs.read().unwrap();
|
||||
let values = kvs.range(range);
|
||||
|
||||
let iter: Box<dyn Iterator<Item = (&Vec<u8>, &Vec<u8>)>> = if range_end.is_empty() {
|
||||
Box::new(kvs.get_key_value(&key).into_iter())
|
||||
} else {
|
||||
Box::new(kvs.range(key..range_end))
|
||||
};
|
||||
let mut kvs = iter
|
||||
let mut more = false;
|
||||
let mut iter: i64 = 0;
|
||||
|
||||
let kvs = values
|
||||
.take_while(|_| {
|
||||
let take = limit == 0 || iter != limit;
|
||||
iter += 1;
|
||||
more = limit > 0 && iter > limit;
|
||||
|
||||
take
|
||||
})
|
||||
.map(|(k, v)| {
|
||||
let key = k.clone();
|
||||
let value = if keys_only { vec![] } else { v.clone() };
|
||||
@@ -107,13 +110,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let more = if limit > 0 && kvs.len() > limit as usize {
|
||||
kvs.truncate(limit as usize);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
Ok(RangeResponse { kvs, more })
|
||||
}
|
||||
|
||||
@@ -215,36 +211,32 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
|
||||
&self,
|
||||
req: DeleteRangeRequest,
|
||||
) -> Result<DeleteRangeResponse, Self::Error> {
|
||||
let DeleteRangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
prev_kv,
|
||||
} = req;
|
||||
let range = req.range();
|
||||
let DeleteRangeRequest { prev_kv, .. } = req;
|
||||
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
|
||||
let prev_kvs = if range_end.is_empty() {
|
||||
kvs.remove(&key)
|
||||
.into_iter()
|
||||
.map(|value| KeyValue {
|
||||
key: key.clone(),
|
||||
value,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
let range = Range {
|
||||
start: key,
|
||||
end: range_end,
|
||||
};
|
||||
kvs.extract_if(|key, _| range.contains(key))
|
||||
.map(Into::into)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
let keys = kvs
|
||||
.range(range)
|
||||
.map(|(key, _)| key.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(DeleteRangeResponse {
|
||||
deleted: prev_kvs.len() as i64,
|
||||
prev_kvs: if prev_kv { prev_kvs } else { vec![] },
|
||||
})
|
||||
let mut prev_kvs = if prev_kv {
|
||||
Vec::with_capacity(keys.len())
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
let deleted = keys.len() as i64;
|
||||
|
||||
for key in keys {
|
||||
if let Some(value) = kvs.remove(&key) {
|
||||
if prev_kv {
|
||||
prev_kvs.push((key.clone(), value).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DeleteRangeResponse { deleted, prev_kvs })
|
||||
}
|
||||
|
||||
async fn batch_delete(
|
||||
@@ -358,254 +350,63 @@ impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::test::{
|
||||
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
|
||||
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
|
||||
};
|
||||
use crate::kv_backend::KvBackend;
|
||||
use crate::rpc::store::{BatchGetRequest, BatchPutRequest};
|
||||
use crate::rpc::KeyValue;
|
||||
use crate::util;
|
||||
|
||||
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
|
||||
let kv_store = MemoryKvBackend::<Error>::new();
|
||||
let kvs = mock_kvs();
|
||||
|
||||
assert!(kv_store
|
||||
.batch_put(BatchPutRequest {
|
||||
kvs,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
assert!(kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val11".to_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.is_ok());
|
||||
prepare_kv(&kv_store).await;
|
||||
|
||||
kv_store
|
||||
}
|
||||
|
||||
fn mock_kvs() -> Vec<KeyValue> {
|
||||
vec![
|
||||
KeyValue {
|
||||
key: b"key1".to_vec(),
|
||||
value: b"val1".to_vec(),
|
||||
},
|
||||
KeyValue {
|
||||
key: b"key2".to_vec(),
|
||||
value: b"val2".to_vec(),
|
||||
},
|
||||
KeyValue {
|
||||
key: b"key3".to_vec(),
|
||||
value: b"val3".to_vec(),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let resp = kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val12".to_vec(),
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(resp.prev_kv.is_none());
|
||||
|
||||
let resp = kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val13".to_vec(),
|
||||
prev_kv: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let prev_kv = resp.prev_kv.unwrap();
|
||||
assert_eq!(b"key11", prev_kv.key());
|
||||
assert_eq!(b"val12", prev_kv.value());
|
||||
test_kv_put(kv_store).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_range() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let key = b"key1".to_vec();
|
||||
let range_end = util::get_prefix_end_key(b"key1");
|
||||
test_kv_range(kv_store).await;
|
||||
}
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
limit: 0,
|
||||
keys_only: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
#[tokio::test]
|
||||
async fn test_range_2() {
|
||||
let kv = MemoryKvBackend::<Error>::new();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
assert_eq!(b"key11", resp.kvs[1].key());
|
||||
assert_eq!(b"val11", resp.kvs[1].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
limit: 0,
|
||||
keys_only: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"", resp.kvs[0].value());
|
||||
assert_eq!(b"key11", resp.kvs[1].key());
|
||||
assert_eq!(b"", resp.kvs[1].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
limit: 0,
|
||||
keys_only: false,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
limit: 1,
|
||||
keys_only: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
test_kv_range_2(kv).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch_get() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let keys = vec![];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert!(resp.kvs.is_empty());
|
||||
|
||||
let keys = vec![b"key10".to_vec()];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert!(resp.kvs.is_empty());
|
||||
|
||||
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
assert_eq!(b"key3", resp.kvs[1].key());
|
||||
assert_eq!(b"val3", resp.kvs[1].value());
|
||||
test_kv_batch_get(kv_store).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_compare_and_put() {
|
||||
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
|
||||
let success = Arc::new(AtomicU8::new(0));
|
||||
|
||||
let mut joins = vec![];
|
||||
for _ in 0..20 {
|
||||
let kv_store_clone = kv_store.clone();
|
||||
let success_clone = success.clone();
|
||||
let join = tokio::spawn(async move {
|
||||
let req = CompareAndPutRequest {
|
||||
key: b"key".to_vec(),
|
||||
expect: vec![],
|
||||
value: b"val_new".to_vec(),
|
||||
};
|
||||
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
|
||||
if resp.success {
|
||||
success_clone.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
});
|
||||
joins.push(join);
|
||||
}
|
||||
|
||||
for join in joins {
|
||||
join.await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(1, success.load(Ordering::SeqCst));
|
||||
test_kv_compare_and_put(kv_store).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_range() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let req = DeleteRangeRequest {
|
||||
key: b"key3".to_vec(),
|
||||
range_end: vec![],
|
||||
prev_kv: true,
|
||||
};
|
||||
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert_eq!(1, resp.prev_kvs.len());
|
||||
assert_eq!(b"key3", resp.prev_kvs[0].key());
|
||||
assert_eq!(b"val3", resp.prev_kvs[0].value());
|
||||
|
||||
let resp = kv_store.get(b"key3").await.unwrap();
|
||||
assert!(resp.is_none());
|
||||
|
||||
let req = DeleteRangeRequest {
|
||||
key: b"key2".to_vec(),
|
||||
range_end: vec![],
|
||||
prev_kv: false,
|
||||
};
|
||||
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert!(resp.prev_kvs.is_empty());
|
||||
|
||||
let resp = kv_store.get(b"key2").await.unwrap();
|
||||
assert!(resp.is_none());
|
||||
|
||||
let key = b"key1".to_vec();
|
||||
let range_end = util::get_prefix_end_key(b"key1");
|
||||
|
||||
let req = DeleteRangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
prev_kv: true,
|
||||
};
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert_eq!(2, resp.prev_kvs.len());
|
||||
|
||||
let req = RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
..Default::default()
|
||||
};
|
||||
let resp = kv_store.range(req).await.unwrap();
|
||||
assert!(resp.kvs.is_empty());
|
||||
test_kv_delete_range(kv_store).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -636,35 +437,6 @@ mod tests {
|
||||
async fn test_batch_delete() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
assert!(kv_store.get(b"key1").await.unwrap().is_some());
|
||||
assert!(kv_store.get(b"key100").await.unwrap().is_none());
|
||||
|
||||
let req = BatchDeleteRequest {
|
||||
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
|
||||
prev_kv: true,
|
||||
};
|
||||
let resp = kv_store.batch_delete(req).await.unwrap();
|
||||
assert_eq!(1, resp.prev_kvs.len());
|
||||
assert_eq!(
|
||||
vec![KeyValue {
|
||||
key: b"key1".to_vec(),
|
||||
value: b"val1".to_vec()
|
||||
}],
|
||||
resp.prev_kvs
|
||||
);
|
||||
assert!(kv_store.get(b"key1").await.unwrap().is_none());
|
||||
|
||||
assert!(kv_store.get(b"key2").await.unwrap().is_some());
|
||||
assert!(kv_store.get(b"key3").await.unwrap().is_some());
|
||||
|
||||
let req = BatchDeleteRequest {
|
||||
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
|
||||
prev_kv: false,
|
||||
};
|
||||
let resp = kv_store.batch_delete(req).await.unwrap();
|
||||
assert!(resp.prev_kvs.is_empty());
|
||||
|
||||
assert!(kv_store.get(b"key2").await.unwrap().is_none());
|
||||
assert!(kv_store.get(b"key3").await.unwrap().is_none());
|
||||
test_kv_batch_delete(kv_store).await;
|
||||
}
|
||||
}
|
||||
|
||||
352
src/common/meta/src/kv_backend/test.rs
Normal file
352
src/common/meta/src/kv_backend/test.rs
Normal file
@@ -0,0 +1,352 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{KvBackend, *};
|
||||
use crate::error::Error;
|
||||
use crate::rpc::store::{BatchGetRequest, PutRequest};
|
||||
use crate::rpc::KeyValue;
|
||||
use crate::util;
|
||||
|
||||
pub fn mock_kvs() -> Vec<KeyValue> {
|
||||
vec![
|
||||
KeyValue {
|
||||
key: b"key1".to_vec(),
|
||||
value: b"val1".to_vec(),
|
||||
},
|
||||
KeyValue {
|
||||
key: b"key2".to_vec(),
|
||||
value: b"val2".to_vec(),
|
||||
},
|
||||
KeyValue {
|
||||
key: b"key3".to_vec(),
|
||||
value: b"val3".to_vec(),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
pub async fn prepare_kv(kv_store: &impl KvBackend) {
|
||||
let kvs = mock_kvs();
|
||||
assert!(kv_store
|
||||
.batch_put(BatchPutRequest {
|
||||
kvs,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
assert!(kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val11".to_vec(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
pub async fn test_kv_put(kv_store: impl KvBackend) {
|
||||
let resp = kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val12".to_vec(),
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(resp.prev_kv.is_none());
|
||||
|
||||
let resp = kv_store
|
||||
.put(PutRequest {
|
||||
key: b"key11".to_vec(),
|
||||
value: b"val13".to_vec(),
|
||||
prev_kv: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let prev_kv = resp.prev_kv.unwrap();
|
||||
assert_eq!(b"key11", prev_kv.key());
|
||||
assert_eq!(b"val12", prev_kv.value());
|
||||
}
|
||||
|
||||
pub async fn test_kv_range(kv_store: impl KvBackend) {
|
||||
let key = b"key1".to_vec();
|
||||
let range_end = util::get_prefix_end_key(b"key1");
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
limit: 0,
|
||||
keys_only: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
assert_eq!(b"key11", resp.kvs[1].key());
|
||||
assert_eq!(b"val11", resp.kvs[1].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
limit: 0,
|
||||
keys_only: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"", resp.kvs[0].value());
|
||||
assert_eq!(b"key11", resp.kvs[1].key());
|
||||
assert_eq!(b"", resp.kvs[1].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key: key.clone(),
|
||||
limit: 0,
|
||||
keys_only: false,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
|
||||
let resp = kv_store
|
||||
.range(RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
limit: 1,
|
||||
keys_only: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
}
|
||||
|
||||
pub async fn test_kv_range_2(kv_store: impl KvBackend) {
|
||||
kv_store
|
||||
.put(PutRequest::new().with_key("atest").with_value("value"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
kv_store
|
||||
.put(PutRequest::new().with_key("test").with_value("value"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// If both key and range_end are ‘\0’, then range represents all keys.
|
||||
let result = kv_store
|
||||
.range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.kvs.len(), 2);
|
||||
assert!(!result.more);
|
||||
|
||||
// If range_end is ‘\0’, the range is all keys greater than or equal to the key argument.
|
||||
let result = kv_store
|
||||
.range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.kvs.len(), 2);
|
||||
|
||||
let result = kv_store
|
||||
.range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.kvs.len(), 1);
|
||||
assert_eq!(result.kvs[0].key, b"test");
|
||||
|
||||
// Fetches the keys >= "a", set limit to 1, the `more` should be true.
|
||||
let result = kv_store
|
||||
.range(
|
||||
RangeRequest::new()
|
||||
.with_range(b"a".to_vec(), b"\0".to_vec())
|
||||
.with_limit(1),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.kvs.len(), 1);
|
||||
assert!(result.more);
|
||||
|
||||
// Fetches the keys >= "a", set limit to 2, the `more` should be false.
|
||||
let result = kv_store
|
||||
.range(
|
||||
RangeRequest::new()
|
||||
.with_range(b"a".to_vec(), b"\0".to_vec())
|
||||
.with_limit(2),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.kvs.len(), 2);
|
||||
assert!(!result.more);
|
||||
|
||||
// Fetches the keys >= "a", set limit to 3, the `more` should be false.
|
||||
let result = kv_store
|
||||
.range(
|
||||
RangeRequest::new()
|
||||
.with_range(b"a".to_vec(), b"\0".to_vec())
|
||||
.with_limit(3),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.kvs.len(), 2);
|
||||
assert!(!result.more);
|
||||
}
|
||||
|
||||
pub async fn test_kv_batch_get(kv_store: impl KvBackend) {
|
||||
let keys = vec![];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert!(resp.kvs.is_empty());
|
||||
|
||||
let keys = vec![b"key10".to_vec()];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert!(resp.kvs.is_empty());
|
||||
|
||||
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
|
||||
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
|
||||
|
||||
assert_eq!(2, resp.kvs.len());
|
||||
assert_eq!(b"key1", resp.kvs[0].key());
|
||||
assert_eq!(b"val1", resp.kvs[0].value());
|
||||
assert_eq!(b"key3", resp.kvs[1].key());
|
||||
assert_eq!(b"val3", resp.kvs[1].value());
|
||||
}
|
||||
|
||||
pub async fn test_kv_compare_and_put(kv_store: Arc<dyn KvBackend<Error = Error>>) {
|
||||
let success = Arc::new(AtomicU8::new(0));
|
||||
|
||||
let mut joins = vec![];
|
||||
for _ in 0..20 {
|
||||
let kv_store_clone = kv_store.clone();
|
||||
let success_clone = success.clone();
|
||||
let join = tokio::spawn(async move {
|
||||
let req = CompareAndPutRequest {
|
||||
key: b"key".to_vec(),
|
||||
expect: vec![],
|
||||
value: b"val_new".to_vec(),
|
||||
};
|
||||
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
|
||||
if resp.success {
|
||||
success_clone.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
});
|
||||
joins.push(join);
|
||||
}
|
||||
|
||||
for join in joins {
|
||||
join.await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(1, success.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
pub async fn test_kv_delete_range(kv_store: impl KvBackend) {
|
||||
let req = DeleteRangeRequest {
|
||||
key: b"key3".to_vec(),
|
||||
range_end: vec![],
|
||||
prev_kv: true,
|
||||
};
|
||||
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert_eq!(1, resp.prev_kvs.len());
|
||||
assert_eq!(1, resp.deleted);
|
||||
assert_eq!(b"key3", resp.prev_kvs[0].key());
|
||||
assert_eq!(b"val3", resp.prev_kvs[0].value());
|
||||
|
||||
let resp = kv_store.get(b"key3").await.unwrap();
|
||||
assert!(resp.is_none());
|
||||
|
||||
let req = DeleteRangeRequest {
|
||||
key: b"key2".to_vec(),
|
||||
range_end: vec![],
|
||||
prev_kv: false,
|
||||
};
|
||||
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert_eq!(1, resp.deleted);
|
||||
assert!(resp.prev_kvs.is_empty());
|
||||
|
||||
let resp = kv_store.get(b"key2").await.unwrap();
|
||||
assert!(resp.is_none());
|
||||
|
||||
let key = b"key1".to_vec();
|
||||
let range_end = util::get_prefix_end_key(b"key1");
|
||||
|
||||
let req = DeleteRangeRequest {
|
||||
key: key.clone(),
|
||||
range_end: range_end.clone(),
|
||||
prev_kv: true,
|
||||
};
|
||||
let resp = kv_store.delete_range(req).await.unwrap();
|
||||
assert_eq!(2, resp.prev_kvs.len());
|
||||
|
||||
let req = RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
..Default::default()
|
||||
};
|
||||
let resp = kv_store.range(req).await.unwrap();
|
||||
assert!(resp.kvs.is_empty());
|
||||
}
|
||||
|
||||
pub async fn test_kv_batch_delete(kv_store: impl KvBackend) {
|
||||
assert!(kv_store.get(b"key1").await.unwrap().is_some());
|
||||
assert!(kv_store.get(b"key100").await.unwrap().is_none());
|
||||
|
||||
let req = BatchDeleteRequest {
|
||||
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
|
||||
prev_kv: true,
|
||||
};
|
||||
let resp = kv_store.batch_delete(req).await.unwrap();
|
||||
assert_eq!(1, resp.prev_kvs.len());
|
||||
assert_eq!(
|
||||
vec![KeyValue {
|
||||
key: b"key1".to_vec(),
|
||||
value: b"val1".to_vec()
|
||||
}],
|
||||
resp.prev_kvs
|
||||
);
|
||||
assert!(kv_store.get(b"key1").await.unwrap().is_none());
|
||||
|
||||
assert!(kv_store.get(b"key2").await.unwrap().is_some());
|
||||
assert!(kv_store.get(b"key3").await.unwrap().is_some());
|
||||
|
||||
let req = BatchDeleteRequest {
|
||||
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
|
||||
prev_kv: false,
|
||||
};
|
||||
let resp = kv_store.batch_delete(req).await.unwrap();
|
||||
assert!(resp.prev_kvs.is_empty());
|
||||
|
||||
assert!(kv_store.get(b"key2").await.unwrap().is_none());
|
||||
assert!(kv_store.get(b"key3").await.unwrap().is_none());
|
||||
}
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::ops::Bound;
|
||||
|
||||
use api::v1::meta::{
|
||||
BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse,
|
||||
@@ -30,6 +31,17 @@ use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::rpc::{util, KeyValue};
|
||||
|
||||
/// Converts a `key` / `range_end` pair into a pair of bounds.
///
/// The mapping is:
/// - empty `range_end`: only `key` itself (`Included(key)..=Included(key)`);
/// - `key` and `range_end` both `\0`: all keys (unbounded on both sides);
/// - `range_end` is `\0`: all keys greater than or equal to `key`;
/// - otherwise: the half-open range `[key, range_end)`.
pub fn to_range(key: Vec<u8>, range_end: Vec<u8>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
    // No range end given: the request addresses exactly one key.
    if range_end.is_empty() {
        let upper = Bound::Included(key.clone());
        return (Bound::Included(key), upper);
    }

    // A range end of `\0` means "no upper limit".
    if matches!(range_end.as_slice(), [0]) {
        // A key of `\0` additionally means "no lower limit".
        if matches!(key.as_slice(), [0]) {
            return (Bound::Unbounded, Bound::Unbounded);
        }
        return (Bound::Included(key), Bound::Unbounded);
    }

    (Bound::Included(key), Bound::Excluded(range_end))
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RangeRequest {
|
||||
/// key is the first key for the range, If range_end is not given, the
|
||||
@@ -96,6 +108,11 @@ impl RangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `RangeBounds`.
|
||||
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
|
||||
to_range(self.key.clone(), self.range_end.clone())
|
||||
}
|
||||
|
||||
/// key is the first key for the range, If range_end is not given, the
|
||||
/// request only looks up key.
|
||||
#[inline]
|
||||
@@ -690,6 +707,11 @@ impl DeleteRangeRequest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `RangeBounds`.
|
||||
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
|
||||
to_range(self.key.clone(), self.range_end.clone())
|
||||
}
|
||||
|
||||
/// key is the first key to delete in the range. If range_end is not given,
|
||||
/// the range is defined to contain only the key argument.
|
||||
#[inline]
|
||||
|
||||
@@ -40,6 +40,7 @@ pub type BoxedTaskFunction<E> = Box<dyn TaskFunction<E> + Send + Sync + 'static>
|
||||
struct TaskInner<E> {
|
||||
/// The repeated task handle. This handle is Some if the task is started.
|
||||
task_handle: Option<JoinHandle<()>>,
|
||||
|
||||
/// The task_fn to run. This is Some if the task is not started.
|
||||
task_fn: Option<BoxedTaskFunction<E>>,
|
||||
}
|
||||
@@ -50,6 +51,7 @@ pub struct RepeatedTask<E> {
|
||||
inner: Mutex<TaskInner<E>>,
|
||||
started: AtomicBool,
|
||||
interval: Duration,
|
||||
initial_delay: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<E> std::fmt::Display for RepeatedTask<E> {
|
||||
@@ -75,6 +77,9 @@ impl<E> Drop for RepeatedTask<E> {
|
||||
}
|
||||
|
||||
impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
/// Creates a new repeated task. The `initial_delay` is the delay before the first execution.
|
||||
/// `initial_delay` default is None, the initial interval uses the `interval`.
|
||||
/// You can use `with_initial_delay` to set the `initial_delay`.
|
||||
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
|
||||
Self {
|
||||
name: task_fn.name().to_string(),
|
||||
@@ -85,9 +90,15 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
}),
|
||||
started: AtomicBool::new(false),
|
||||
interval,
|
||||
initial_delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the delay before the first execution and returns the task.
///
/// Passing `None` stores no initial delay.
pub fn with_initial_delay(mut self, initial_delay: Option<Duration>) -> Self {
    self.initial_delay = initial_delay;
    self
}
|
||||
|
||||
pub fn started(&self) -> bool {
|
||||
self.started.load(Ordering::Relaxed)
|
||||
}
|
||||
@@ -99,17 +110,21 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
|
||||
IllegalStateSnafu { name: &self.name }
|
||||
);
|
||||
|
||||
let interval = self.interval;
|
||||
let child = self.cancel_token.child_token();
|
||||
// Safety: The task is not started.
|
||||
let mut task_fn = inner.task_fn.take().unwrap();
|
||||
let interval = self.interval;
|
||||
let mut initial_delay = self.initial_delay;
|
||||
// TODO(hl): Maybe spawn to a blocking runtime.
|
||||
let handle = runtime.spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(interval) => {}
|
||||
_ = child.cancelled() => {
|
||||
return;
|
||||
let sleep_time = initial_delay.take().unwrap_or(interval);
|
||||
if sleep_time > Duration::ZERO {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(sleep_time) => {}
|
||||
_ = child.cancelled() => {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Err(e) = task_fn.call().await {
|
||||
@@ -192,4 +207,21 @@ mod tests {
|
||||
|
||||
assert_eq!(n.load(Ordering::Relaxed), 5);
|
||||
}
|
||||
|
||||
/// Checks that a task with a zero initial delay runs once right away and then
/// once per interval.
#[tokio::test]
|
||||
async fn test_repeated_task_prior_exec() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let n = Arc::new(AtomicI32::new(0));
|
||||
// NOTE(review): `TickTask` is defined outside this view; presumably it increments `n`
// on every call — confirm.
let task_fn = TickTask { n: n.clone() };
|
||||
|
||||
// 100ms interval, but no wait before the first execution.
let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
|
||||
.with_initial_delay(Some(Duration::ZERO));
|
||||
|
||||
task.start(crate::bg_runtime()).unwrap();
|
||||
// 550ms leaves room for runs at roughly 0, 100, 200, 300, 400 and 500ms.
tokio::time::sleep(Duration::from_millis(550)).await;
|
||||
task.stop().await.unwrap();
|
||||
|
||||
// One immediate run plus five interval-driven runs.
assert_eq!(n.load(Ordering::Relaxed), 6);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -654,6 +654,17 @@ impl ListValue {
|
||||
Arc::new(new_item_field(output_type.item_type().as_arrow_type())),
|
||||
))
|
||||
}
|
||||
|
||||
/// use 'the first item size' * 'length of items' to estimate the size.
|
||||
/// it could be inaccurate.
|
||||
fn estimated_size(&self) -> usize {
|
||||
if let Some(items) = &self.items {
|
||||
if let Some(item) = items.first() {
|
||||
return item.as_value_ref().data_size() * items.len();
|
||||
}
|
||||
}
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ListValue {
|
||||
@@ -1090,12 +1101,46 @@ impl<'a> PartialOrd for ListValueRef<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ValueRef<'a> {
|
||||
/// Returns the size of the underlying data in bytes,
|
||||
/// The size is estimated and only considers the data size.
|
||||
pub fn data_size(&self) -> usize {
|
||||
match *self {
|
||||
ValueRef::Null => 0,
|
||||
ValueRef::Boolean(_) => 1,
|
||||
ValueRef::UInt8(_) => 1,
|
||||
ValueRef::UInt16(_) => 2,
|
||||
ValueRef::UInt32(_) => 4,
|
||||
ValueRef::UInt64(_) => 8,
|
||||
ValueRef::Int8(_) => 1,
|
||||
ValueRef::Int16(_) => 2,
|
||||
ValueRef::Int32(_) => 4,
|
||||
ValueRef::Int64(_) => 8,
|
||||
ValueRef::Float32(_) => 4,
|
||||
ValueRef::Float64(_) => 8,
|
||||
ValueRef::String(v) => std::mem::size_of_val(v),
|
||||
ValueRef::Binary(v) => std::mem::size_of_val(v),
|
||||
ValueRef::Date(_) => 4,
|
||||
ValueRef::DateTime(_) => 8,
|
||||
ValueRef::Timestamp(_) => 16,
|
||||
ValueRef::Time(_) => 16,
|
||||
ValueRef::Duration(_) => 16,
|
||||
ValueRef::Interval(_) => 24,
|
||||
ValueRef::List(v) => match v {
|
||||
ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(),
|
||||
ListValueRef::Ref { val } => val.estimated_size(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use num_traits::Float;
|
||||
|
||||
use super::*;
|
||||
use crate::vectors::ListVectorBuilder;
|
||||
|
||||
#[test]
|
||||
fn test_try_from_scalar_value() {
|
||||
@@ -2158,4 +2203,90 @@ mod tests {
|
||||
duration_to_scalar_value(TimeUnit::Nanosecond, Some(1))
|
||||
);
|
||||
}
|
||||
|
||||
/// Asserts that `value_ref.data_size()` equals `size`.
fn check_value_ref_size_eq(value_ref: &ValueRef, size: usize) {
|
||||
assert_eq!(value_ref.data_size(), size);
|
||||
}
|
||||
|
||||
#[test]
fn test_value_ref_estimated_size() {
    // Size of the enum itself, independent of the data it refers to.
    assert_eq!(std::mem::size_of::<ValueRef>(), 24);

    // Scalar values paired with their expected data size in bytes.
    let scalar_cases = [
        (ValueRef::Boolean(true), 1),
        (ValueRef::UInt8(1), 1),
        (ValueRef::UInt16(1), 2),
        (ValueRef::UInt32(1), 4),
        (ValueRef::UInt64(1), 8),
        (ValueRef::Int8(1), 1),
        (ValueRef::Int16(1), 2),
        (ValueRef::Int32(1), 4),
        (ValueRef::Int64(1), 8),
        (ValueRef::Float32(1.0.into()), 4),
        (ValueRef::Float64(1.0.into()), 8),
        (ValueRef::String("greptimedb"), 10),
        (ValueRef::Binary(b"greptimedb"), 10),
        (ValueRef::Date(Date::new(1)), 4),
        (ValueRef::DateTime(DateTime::new(1)), 8),
        (ValueRef::Timestamp(Timestamp::new_millisecond(1)), 16),
        (ValueRef::Time(Time::new_millisecond(1)), 16),
        (
            ValueRef::Interval(Interval::from_month_day_nano(1, 2, 3)),
            24,
        ),
        (ValueRef::Duration(Duration::new_millisecond(1)), 16),
    ];
    for (value_ref, expected) in &scalar_cases {
        check_value_ref_size_eq(value_ref, *expected);
    }

    // A referenced list is estimated as `first item size * item count`.
    let string_list = ListValue {
        items: Some(Box::new(vec![
            Value::String("hello world".into()),
            Value::String("greptimedb".into()),
        ])),
        datatype: ConcreteDataType::string_datatype(),
    };
    check_value_ref_size_eq(&ValueRef::List(ListValueRef::Ref { val: &string_list }), 22);

    // Build a list vector with two lists and one null entry in between.
    let data = vec![
        Some(vec![Some(1), Some(2), Some(3)]),
        None,
        Some(vec![Some(4), None, Some(6)]),
    ];
    let mut builder =
        ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 8);
    for vec_opt in &data {
        match vec_opt {
            Some(vec) => {
                let values = vec.iter().map(|v| Value::from(*v)).collect();
                let list_value =
                    ListValue::new(Some(Box::new(values)), ConcreteDataType::int32_datatype());

                builder.push(Some(ListValueRef::Ref { val: &list_value }));
            }
            None => builder.push(None),
        }
    }
    let vector = builder.finish();

    // Every index yields the same per-element average of the whole vector.
    for idx in 0..3 {
        check_value_ref_size_eq(
            &ValueRef::List(ListValueRef::Indexed {
                vector: &vector,
                idx,
            }),
            85,
        );
    }
}
|
||||
}
|
||||
|
||||
@@ -12,13 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::InsertRequests;
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use common_error::ext::BoxedError;
|
||||
use servers::error as server_error;
|
||||
use servers::error::AuthSnafu;
|
||||
use servers::opentsdb::codec::DataPoint;
|
||||
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
|
||||
use servers::query_handler::OpentsdbProtocolHandler;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::prelude::*;
|
||||
@@ -27,23 +27,27 @@ use crate::instance::Instance;
|
||||
|
||||
#[async_trait]
|
||||
impl OpentsdbProtocolHandler for Instance {
|
||||
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
|
||||
async fn exec(
|
||||
&self,
|
||||
data_points: Vec<DataPoint>,
|
||||
ctx: QueryContextRef,
|
||||
) -> server_error::Result<usize> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
.check_permission(ctx.current_user(), PermissionReq::Opentsdb)
|
||||
.context(AuthSnafu)?;
|
||||
|
||||
let requests = InsertRequests {
|
||||
inserts: vec![data_point.as_grpc_insert()],
|
||||
};
|
||||
let _ = self
|
||||
.handle_inserts(requests, ctx)
|
||||
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
|
||||
let output = self
|
||||
.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| server_error::ExecuteQuerySnafu {
|
||||
query: format!("{data_point:?}"),
|
||||
})?;
|
||||
Ok(())
|
||||
.context(servers::error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
Ok(match output {
|
||||
common_query::Output::AffectedRows(rows) => rows,
|
||||
_ => unreachable!(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ common-base = { workspace = true }
|
||||
common-config = { workspace = true }
|
||||
common-error = { workspace = true }
|
||||
common-macro = { workspace = true }
|
||||
common-meta = { workspace = true }
|
||||
common-meta = { workspace = true, features = ["testing"] }
|
||||
common-runtime = { workspace = true }
|
||||
common-telemetry = { workspace = true }
|
||||
futures-util.workspace = true
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! [KvBackend] implementation based on [raft_engine::Engine].
|
||||
|
||||
use std::any::Any;
|
||||
use std::ops::Bound::{Excluded, Included, Unbounded};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -28,6 +29,7 @@ use common_meta::rpc::store::{
|
||||
RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_meta::util::get_next_prefix_key;
|
||||
use raft_engine::{Config, Engine, LogBatch};
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -137,29 +139,48 @@ impl KvBackend for RaftEngineBackend {
|
||||
|
||||
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
|
||||
let mut res = vec![];
|
||||
let (start, end) = req.range();
|
||||
let RangeRequest {
|
||||
keys_only, limit, ..
|
||||
} = req;
|
||||
|
||||
let (start_key, end_key) = match (start, end) {
|
||||
(Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))),
|
||||
(Unbounded, Unbounded) => (None, None),
|
||||
(Included(start), Excluded(end)) => (Some(start), Some(end)),
|
||||
(Included(start), Unbounded) => (Some(start), None),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let mut more = false;
|
||||
let mut iter = 0;
|
||||
|
||||
self.engine
|
||||
.read()
|
||||
.unwrap()
|
||||
.scan_raw_messages(
|
||||
SYSTEM_NAMESPACE,
|
||||
Some(&req.key),
|
||||
Some(&req.range_end),
|
||||
start_key.as_deref(),
|
||||
end_key.as_deref(),
|
||||
false,
|
||||
|key, value| {
|
||||
res.push(KeyValue {
|
||||
key: key.to_vec(),
|
||||
value: value.to_vec(),
|
||||
});
|
||||
true
|
||||
let take = limit == 0 || iter != limit;
|
||||
iter += 1;
|
||||
more = limit > 0 && iter > limit;
|
||||
|
||||
if take {
|
||||
res.push(KeyValue {
|
||||
key: key.to_vec(),
|
||||
value: if keys_only { vec![] } else { value.to_vec() },
|
||||
});
|
||||
}
|
||||
|
||||
take
|
||||
},
|
||||
)
|
||||
.context(RaftEngineSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)?;
|
||||
Ok(RangeResponse {
|
||||
kvs: res,
|
||||
more: false,
|
||||
})
|
||||
Ok(RangeResponse { kvs: res, more })
|
||||
}
|
||||
|
||||
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
|
||||
@@ -275,7 +296,7 @@ impl KvBackend for RaftEngineBackend {
|
||||
key,
|
||||
range_end,
|
||||
limit: 0,
|
||||
keys_only: true,
|
||||
keys_only: false,
|
||||
};
|
||||
let range_resp = self.range(range).await?;
|
||||
|
||||
@@ -383,7 +404,12 @@ fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::kv_backend::test::{
|
||||
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
|
||||
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
|
||||
};
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use raft_engine::{Config, ReadableSize, RecoveryMode};
|
||||
|
||||
@@ -615,4 +641,66 @@ mod tests {
|
||||
keys
|
||||
);
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_range` suite against a raft-engine backend created in a
/// temporary directory, after `prepare_kv` has been applied to it.
#[tokio::test]
|
||||
async fn test_range() {
|
||||
let dir = create_temp_dir("range");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_range(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_range_2` suite against a raft-engine backend created in a
/// temporary directory. Unlike the sibling tests, `prepare_kv` is not called first.
#[tokio::test]
|
||||
async fn test_range_2() {
|
||||
let dir = create_temp_dir("range2");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
|
||||
test_kv_range_2(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_put` suite against a raft-engine backend created in a
/// temporary directory, after `prepare_kv` has been applied to it.
#[tokio::test]
|
||||
async fn test_put() {
|
||||
let dir = create_temp_dir("put");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_put(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_batch_get` suite against a raft-engine backend created in a
/// temporary directory, after `prepare_kv` has been applied to it.
#[tokio::test]
|
||||
async fn test_batch_get() {
|
||||
let dir = create_temp_dir("batch_get");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_batch_get(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_batch_delete` suite against a raft-engine backend created in
/// a temporary directory, after `prepare_kv` has been applied to it.
#[tokio::test]
|
||||
async fn test_batch_delete() {
|
||||
let dir = create_temp_dir("batch_delete");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_batch_delete(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_delete_range` suite against a raft-engine backend created in
/// a temporary directory, after `prepare_kv` has been applied to it.
#[tokio::test]
|
||||
async fn test_delete_range() {
|
||||
let dir = create_temp_dir("delete_range");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_delete_range(backend).await;
|
||||
}
|
||||
|
||||
/// Runs the shared `test_kv_compare_and_put` suite on a multi-threaded runtime against a
/// raft-engine backend created in a temporary directory. The backend is wrapped in an
/// `Arc` before being handed to the suite.
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_compare_and_put_2() {
|
||||
let dir = create_temp_dir("compare_and_put");
|
||||
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
|
||||
prepare_kv(&backend).await;
|
||||
|
||||
test_kv_compare_and_put(Arc::new(backend)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ catalog = { workspace = true }
|
||||
common-catalog = { workspace = true }
|
||||
common-error = { workspace = true }
|
||||
common-macro = { workspace = true }
|
||||
common-recordbatch = { workspace = true }
|
||||
common-telemetry = { workspace = true }
|
||||
datafusion.workspace = true
|
||||
datatypes = { workspace = true }
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod empty_metric;
|
||||
mod histogram_fold;
|
||||
mod instant_manipulate;
|
||||
mod normalize;
|
||||
mod planner;
|
||||
|
||||
798
src/promql/src/extension_plan/histogram_fold.rs
Normal file
798
src/promql/src/extension_plan/histogram_fold.rs
Normal file
@@ -0,0 +1,798 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_recordbatch::RecordBatch as GtRecordBatch;
|
||||
use common_telemetry::warn;
|
||||
use datafusion::arrow::array::AsArray;
|
||||
use datafusion::arrow::compute::{self, concat_batches, SortOptions};
|
||||
use datafusion::arrow::datatypes::{DataType, Field, Float64Type, SchemaRef};
|
||||
use datafusion::arrow::record_batch::RecordBatch;
|
||||
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
|
||||
use datafusion::error::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion::execution::TaskContext;
|
||||
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
|
||||
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
|
||||
use datafusion::physical_plan::expressions::Column as PhyColumn;
|
||||
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
|
||||
use datafusion::physical_plan::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
|
||||
RecordBatchStream, SendableRecordBatchStream, Statistics,
|
||||
};
|
||||
use datafusion::prelude::{Column, Expr};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
|
||||
use datatypes::schema::Schema as GtSchema;
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::MutableVector;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
|
||||
/// `HistogramFold` folds the conventional (non-native) histogram ([1]) for later
|
||||
/// computation. Specifically, it transforms the `le` and `field` columns into a complex
|
||||
/// type, and samples the other tag columns:
|
||||
/// - `le` will become a [ListArray] of [f64], with each bucket bound parsed
|
||||
/// - `field` will become a [ListArray] of [f64]
|
||||
/// - other columns will be sampled every `bucket_num` elements, but their types won't change.
|
||||
///
|
||||
/// Due to the folding and sampling, the number of output rows becomes `input_rows` / `bucket_num`.
|
||||
///
|
||||
/// # Requirement
|
||||
/// - Input should be sorted on `<tag list>, le ASC, ts`.
|
||||
/// - The value set of `le` should be the same, i.e., the buckets of every series should be the same.
|
||||
///
|
||||
/// [1]: https://prometheus.io/docs/concepts/metric_types/#histogram
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub struct HistogramFold {
|
||||
/// Name of the `le` column. It's a special column in prometheus
|
||||
/// for implementing conventional histogram. It's a string column
|
||||
/// with "literal" float value, like "+Inf", "0.001" etc.
|
||||
le_column: String,
|
||||
/// Name of the timestamp (`ts`) column.
ts_column: String,
|
||||
/// The input logical plan.
input: LogicalPlan,
|
||||
/// Name of the field column to fold.
field_column: String,
|
||||
/// Schema after folding, computed by `convert_schema()` in `new()`.
output_schema: DFSchemaRef,
|
||||
}
|
||||
|
||||
impl UserDefinedLogicalNodeCore for HistogramFold {
|
||||
fn name(&self) -> &str {
|
||||
Self::name()
|
||||
}
|
||||
|
||||
fn inputs(&self) -> Vec<&LogicalPlan> {
|
||||
vec![&self.input]
|
||||
}
|
||||
|
||||
fn schema(&self) -> &DFSchemaRef {
|
||||
&self.output_schema
|
||||
}
|
||||
|
||||
fn expressions(&self) -> Vec<Expr> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"HistogramFold: le={}, field={}",
|
||||
self.le_column, self.field_column
|
||||
)
|
||||
}
|
||||
|
||||
fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
|
||||
Self {
|
||||
le_column: self.le_column.clone(),
|
||||
ts_column: self.ts_column.clone(),
|
||||
input: inputs[0].clone(),
|
||||
field_column: self.field_column.clone(),
|
||||
// This method cannot return error. Otherwise we should re-calculate
|
||||
// the output schema
|
||||
output_schema: self.output_schema.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HistogramFold {
|
||||
#[allow(dead_code)]
|
||||
pub fn new(
|
||||
le_column: String,
|
||||
field_column: String,
|
||||
ts_column: String,
|
||||
input: LogicalPlan,
|
||||
) -> DataFusionResult<Self> {
|
||||
let input_schema = input.schema();
|
||||
Self::check_schema(input_schema, &le_column, &field_column, &ts_column)?;
|
||||
let output_schema = Self::convert_schema(input_schema, &le_column, &field_column)?;
|
||||
Ok(Self {
|
||||
le_column,
|
||||
ts_column,
|
||||
input,
|
||||
field_column,
|
||||
output_schema,
|
||||
})
|
||||
}
|
||||
|
||||
pub const fn name() -> &'static str {
|
||||
"HistogramFold"
|
||||
}
|
||||
|
||||
fn check_schema(
|
||||
input_schema: &DFSchemaRef,
|
||||
le_column: &str,
|
||||
field_column: &str,
|
||||
ts_column: &str,
|
||||
) -> DataFusionResult<()> {
|
||||
let check_column = |col| {
|
||||
if !input_schema.has_column_with_unqualified_name(col) {
|
||||
return Err(DataFusionError::SchemaError(
|
||||
datafusion::common::SchemaError::FieldNotFound {
|
||||
field: Box::new(Column::new(None::<String>, col)),
|
||||
valid_fields: input_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|f| f.qualified_column())
|
||||
.collect(),
|
||||
},
|
||||
));
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
check_column(le_column)?;
|
||||
check_column(ts_column)?;
|
||||
check_column(field_column)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
|
||||
let input_schema = self.input.schema();
|
||||
// safety: those fields are checked in `check_schema()`
|
||||
let le_column_index = input_schema
|
||||
.index_of_column_by_name(None, &self.le_column)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let field_column_index = input_schema
|
||||
.index_of_column_by_name(None, &self.field_column)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let ts_column_index = input_schema
|
||||
.index_of_column_by_name(None, &self.ts_column)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
Arc::new(HistogramFoldExec {
|
||||
le_column_index,
|
||||
field_column_index,
|
||||
ts_column_index,
|
||||
input: exec_input,
|
||||
output_schema: Arc::new(self.output_schema.as_ref().into()),
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Transform the schema
|
||||
///
|
||||
/// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed
|
||||
/// - `field` will become a [ListArray] of [f64]
|
||||
fn convert_schema(
|
||||
input_schema: &DFSchemaRef,
|
||||
le_column: &str,
|
||||
field_column: &str,
|
||||
) -> DataFusionResult<DFSchemaRef> {
|
||||
let mut fields = input_schema.fields().clone();
|
||||
// safety: those fields are checked in `check_schema()`
|
||||
let le_column_idx = input_schema
|
||||
.index_of_column_by_name(None, le_column)?
|
||||
.unwrap();
|
||||
let field_column_idx = input_schema
|
||||
.index_of_column_by_name(None, field_column)?
|
||||
.unwrap();
|
||||
|
||||
// transform `le`
|
||||
let le_field: Field = fields[le_column_idx].field().as_ref().clone();
|
||||
let le_field = le_field.with_data_type(DataType::Float64);
|
||||
let folded_le_datatype = DataType::List(Arc::new(le_field));
|
||||
let folded_le = DFField::new(
|
||||
fields[le_column_idx].qualifier().cloned(),
|
||||
fields[le_column_idx].name(),
|
||||
folded_le_datatype,
|
||||
false,
|
||||
);
|
||||
|
||||
// transform `field`
|
||||
// to avoid ambiguity, that field will be referenced as `the_field` below.
|
||||
let the_field: Field = fields[field_column_idx].field().as_ref().clone();
|
||||
let folded_field_datatype = DataType::List(Arc::new(the_field));
|
||||
let folded_field = DFField::new(
|
||||
fields[field_column_idx].qualifier().cloned(),
|
||||
fields[field_column_idx].name(),
|
||||
folded_field_datatype,
|
||||
false,
|
||||
);
|
||||
|
||||
fields[le_column_idx] = folded_le;
|
||||
fields[field_column_idx] = folded_field;
|
||||
|
||||
Ok(Arc::new(DFSchema::new_with_metadata(
|
||||
fields,
|
||||
HashMap::new(),
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
/// Physical plan node created by `HistogramFold::to_execution_plan()`.
#[derive(Debug)]
|
||||
pub struct HistogramFoldExec {
|
||||
/// Index for `le` column in the schema of input.
|
||||
le_column_index: usize,
|
||||
/// The child plan that produces the rows to fold.
input: Arc<dyn ExecutionPlan>,
|
||||
/// Schema of the folded output.
output_schema: SchemaRef,
|
||||
/// Index for field column in the schema of input.
|
||||
field_column_index: usize,
|
||||
/// Index for the `ts` column in the schema of input.
ts_column_index: usize,
|
||||
/// Metric set from which per-partition baseline metrics are created in `execute()`.
metric: ExecutionPlanMetricsSet,
|
||||
}
|
||||
|
||||
impl ExecutionPlan for HistogramFoldExec {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
fn output_partitioning(&self) -> Partitioning {
|
||||
self.input.output_partitioning()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
|
||||
self.input.output_ordering()
|
||||
}
|
||||
|
||||
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
|
||||
let mut cols = self
|
||||
.tag_col_exprs()
|
||||
.into_iter()
|
||||
.map(|expr| PhysicalSortRequirement {
|
||||
expr,
|
||||
options: None,
|
||||
})
|
||||
.collect::<Vec<PhysicalSortRequirement>>();
|
||||
// add le ASC
|
||||
cols.push(PhysicalSortRequirement {
|
||||
expr: Arc::new(PhyColumn::new(
|
||||
self.output_schema.field(self.le_column_index).name(),
|
||||
self.le_column_index,
|
||||
)),
|
||||
options: Some(SortOptions {
|
||||
descending: false, // +INF in the last
|
||||
nulls_first: false, // not nullable
|
||||
}),
|
||||
});
|
||||
// add ts
|
||||
cols.push(PhysicalSortRequirement {
|
||||
expr: Arc::new(PhyColumn::new(
|
||||
self.output_schema.field(self.ts_column_index).name(),
|
||||
self.ts_column_index,
|
||||
)),
|
||||
options: None,
|
||||
});
|
||||
|
||||
vec![Some(cols)]
|
||||
}
|
||||
|
||||
fn required_input_distribution(&self) -> Vec<Distribution> {
|
||||
// partition on all tag columns, i.e., non-le, non-ts and non-field columns
|
||||
vec![Distribution::HashPartitioned(self.tag_col_exprs())]
|
||||
}
|
||||
|
||||
fn maintains_input_order(&self) -> Vec<bool> {
|
||||
vec![true; self.children().len()]
|
||||
}
|
||||
|
||||
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
|
||||
vec![self.input.clone()]
|
||||
}
|
||||
|
||||
// cannot change schema with this method
|
||||
fn with_new_children(
|
||||
self: Arc<Self>,
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
|
||||
assert!(!children.is_empty());
|
||||
Ok(Arc::new(Self {
|
||||
input: children[0].clone(),
|
||||
metric: self.metric.clone(),
|
||||
le_column_index: self.le_column_index,
|
||||
ts_column_index: self.ts_column_index,
|
||||
output_schema: self.output_schema.clone(),
|
||||
field_column_index: self.field_column_index,
|
||||
}))
|
||||
}
|
||||
|
||||
fn execute(
|
||||
&self,
|
||||
partition: usize,
|
||||
context: Arc<TaskContext>,
|
||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
|
||||
|
||||
let batch_size = context.session_config().batch_size();
|
||||
let input = self.input.execute(partition, context)?;
|
||||
let output_schema = self.output_schema.clone();
|
||||
|
||||
let mut normal_indices = (0..output_schema.fields().len()).collect::<HashSet<_>>();
|
||||
normal_indices.remove(&self.le_column_index);
|
||||
normal_indices.remove(&self.field_column_index);
|
||||
Ok(Box::pin(HistogramFoldStream {
|
||||
le_column_index: self.le_column_index,
|
||||
field_column_index: self.field_column_index,
|
||||
normal_indices: normal_indices.into_iter().collect(),
|
||||
bucket_size: None,
|
||||
input_buffer: vec![],
|
||||
input,
|
||||
output_schema,
|
||||
metric: baseline_metric,
|
||||
batch_size,
|
||||
input_buffered_rows: 0,
|
||||
output_buffer: HistogramFoldStream::empty_output_buffer(&self.output_schema)?,
|
||||
output_buffered_rows: 0,
|
||||
}))
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<MetricsSet> {
|
||||
Some(self.metric.clone_inner())
|
||||
}
|
||||
|
||||
fn statistics(&self) -> Statistics {
|
||||
Statistics {
|
||||
num_rows: None,
|
||||
total_byte_size: None,
|
||||
column_statistics: None,
|
||||
is_exact: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HistogramFoldExec {
|
||||
/// Return all the [PhysicalExpr] of tag columns in order.
|
||||
///
|
||||
/// Tag columns are all columns except `le`, `field` and `ts` columns.
|
||||
pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
|
||||
self.input
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, field)| {
|
||||
if idx == self.le_column_index
|
||||
|| idx == self.field_column_index
|
||||
|| idx == self.ts_column_index
|
||||
{
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for HistogramFoldExec {
|
||||
/// Writes a one-line description with the input indices of the `le` and field columns.
/// The default and the verbose formats produce the same text.
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match t {
|
||||
DisplayFormatType::Default | DisplayFormatType::Verbose => {
|
||||
write!(
|
||||
f,
|
||||
"HistogramFoldExec: le=@{}, field=@{}",
|
||||
self.le_column_index, self.field_column_index
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream created by `HistogramFoldExec::execute()` that folds batches from `input`.
pub struct HistogramFoldStream {
|
||||
// internal states
|
||||
/// Index of the `le` column.
le_column_index: usize,
|
||||
/// Index of the field column.
field_column_index: usize,
|
||||
/// Indices of the columns that need no folding
|
||||
normal_indices: Vec<usize>,
|
||||
/// Number of rows per bucket group; `None` until it is known.
bucket_size: Option<usize>,
|
||||
/// Expected output batch size
|
||||
batch_size: usize,
|
||||
/// Schema of the record batches produced by this stream.
output_schema: SchemaRef,
|
||||
|
||||
// buffers
|
||||
/// Input batches that have not been folded yet.
input_buffer: Vec<RecordBatch>,
|
||||
/// Row counter for `input_buffer`.
input_buffered_rows: usize,
|
||||
/// One mutable vector per output column.
output_buffer: Vec<Box<dyn MutableVector>>,
|
||||
/// Row counter for `output_buffer`.
output_buffered_rows: usize,
|
||||
|
||||
// runtime things
|
||||
/// The upstream record batch stream.
input: SendableRecordBatchStream,
|
||||
/// Baseline metrics updated while polling.
metric: BaselineMetrics,
|
||||
}
|
||||
|
||||
impl RecordBatchStream for HistogramFoldStream {
|
||||
/// Returns the schema of the folded output.
fn schema(&self) -> SchemaRef {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for HistogramFoldStream {
|
||||
type Item = DataFusionResult<RecordBatch>;
|
||||
|
||||
/// Polls the input until `fold_input()` yields an output batch or the input ends.
///
/// When the input is exhausted, whatever `take_output_buf()` returns is emitted.
/// Errors from the input or from folding are returned through `?`.
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let poll = loop {
|
||||
// `ready!` returns `Poll::Pending` to the caller if the input is not ready.
match ready!(self.input.poll_next_unpin(cx)) {
|
||||
Some(batch) => {
|
||||
let batch = batch?;
|
||||
let timer = Instant::now();
|
||||
// No output yet: record the compute time and poll the input again.
let Some(result) = self.fold_input(batch)? else {
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
continue;
|
||||
};
|
||||
self.metric.elapsed_compute().add_elapsed(timer);
|
||||
break Poll::Ready(Some(result));
|
||||
}
|
||||
// Input finished: emit what is left in the output buffer, if anything.
None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
|
||||
}
|
||||
};
|
||||
self.metric.record_poll(poll)
|
||||
}
|
||||
}
|
||||
|
||||
impl HistogramFoldStream {
|
||||
/// The inner most `Result` is for `poll_next()`
|
||||
pub fn fold_input(
|
||||
&mut self,
|
||||
input: RecordBatch,
|
||||
) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
|
||||
let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if self.input_buffered_rows + input.num_rows() < bucket_num {
|
||||
// not enough rows to fold
|
||||
self.push_input_buf(input);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.fold_buf(bucket_num, input)?;
|
||||
if self.output_buffered_rows >= self.batch_size {
|
||||
return Ok(self.take_output_buf()?.map(Ok));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn empty_output_buffer(
|
||||
schema: &SchemaRef,
|
||||
) -> DataFusionResult<Vec<Box<dyn MutableVector>>> {
|
||||
let mut builders = Vec::with_capacity(schema.fields().len());
|
||||
for field in schema.fields() {
|
||||
let concrete_datatype = ConcreteDataType::try_from(field.data_type()).unwrap();
|
||||
let mutable_vector = concrete_datatype.create_mutable_vector(0);
|
||||
builders.push(mutable_vector);
|
||||
}
|
||||
|
||||
Ok(builders)
|
||||
}
|
||||
|
||||
/// Determines the number of rows that make up one bucket group.
///
/// The number is derived from the position of the first positive infinity in the
/// `le` column plus the rows already buffered, and is then remembered in
/// `self.bucket_size` so later batches are not scanned again.
///
/// Returns `Ok(None)` if `batch` has no positive infinity; in that case the batch
/// is appended to the input buffer to wait for the next one.
fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
    if let Some(size) = self.bucket_size {
        return Ok(Some(size));
    }

    let inf_pos = self.find_positive_inf(batch)?;
    if inf_pos == batch.num_rows() {
        // no positive inf found, append to buffer and wait for next batch
        self.push_input_buf(batch.clone());
        return Ok(None);
    }

    // else we found the positive inf.
    // calculate the bucket size
    let bucket_size = inf_pos + self.input_buffered_rows + 1;
    // Buckets of every series are required to be the same, so the size is cached.
    self.bucket_size = Some(bucket_size);
    Ok(Some(bucket_size))
}
|
||||
|
||||
/// Fold record batches from input buffer and put to output buffer
|
||||
fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
|
||||
self.push_input_buf(input);
|
||||
// TODO(ruihang): this concat is avoidable.
|
||||
let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
|
||||
let mut remaining_rows = self.input_buffered_rows;
|
||||
let mut cursor = 0;
|
||||
|
||||
let gt_schema = GtSchema::try_from(self.input.schema()).unwrap();
|
||||
let batch = GtRecordBatch::try_from_df_record_batch(Arc::new(gt_schema), batch).unwrap();
|
||||
|
||||
while remaining_rows >= bucket_num {
|
||||
// "sample" normal columns
|
||||
for normal_index in &self.normal_indices {
|
||||
let val = batch.column(*normal_index).get(cursor);
|
||||
self.output_buffer[*normal_index].push_value_ref(val.as_value_ref());
|
||||
}
|
||||
// "fold" `le` and field columns
|
||||
let le_array = batch.column(self.le_column_index);
|
||||
let field_array = batch.column(self.field_column_index);
|
||||
let mut le_item = vec![];
|
||||
let mut field_item = vec![];
|
||||
for bias in 0..bucket_num {
|
||||
let le_str_val = le_array.get(cursor + bias);
|
||||
let le_str_val_ref = le_str_val.as_value_ref();
|
||||
let le_str = le_str_val_ref
|
||||
.as_string()
|
||||
.unwrap()
|
||||
.expect("le column should not be nullable");
|
||||
let le = le_str.parse::<f64>().unwrap();
|
||||
let le_val = Value::from(le);
|
||||
le_item.push(le_val);
|
||||
|
||||
let field = field_array.get(cursor + bias);
|
||||
field_item.push(field);
|
||||
}
|
||||
let le_list_val = Value::List(ListValue::new(
|
||||
Some(Box::new(le_item)),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
));
|
||||
let field_list_val = Value::List(ListValue::new(
|
||||
Some(Box::new(field_item)),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
));
|
||||
self.output_buffer[self.le_column_index].push_value_ref(le_list_val.as_value_ref());
|
||||
self.output_buffer[self.field_column_index]
|
||||
.push_value_ref(field_list_val.as_value_ref());
|
||||
|
||||
cursor += bucket_num;
|
||||
remaining_rows -= bucket_num;
|
||||
self.output_buffered_rows += 1;
|
||||
}
|
||||
|
||||
let remaining_input_batch = batch.into_df_record_batch().slice(cursor, remaining_rows);
|
||||
self.input_buffered_rows = remaining_input_batch.num_rows();
|
||||
self.input_buffer.push(remaining_input_batch);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends `batch` to the input buffer and adds its row count to
/// `input_buffered_rows`.
fn push_input_buf(&mut self, batch: RecordBatch) {
    let incoming_rows = batch.num_rows();
    self.input_buffer.push(batch);
    self.input_buffered_rows += incoming_rows;
}
|
||||
|
||||
fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
|
||||
if self.output_buffered_rows == 0 {
|
||||
if self.input_buffered_rows != 0 {
|
||||
warn!(
|
||||
"input buffer is not empty, {} rows remaining",
|
||||
self.input_buffered_rows
|
||||
);
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut output_buf = Self::empty_output_buffer(&self.output_schema)?;
|
||||
std::mem::swap(&mut self.output_buffer, &mut output_buf);
|
||||
let mut columns = Vec::with_capacity(output_buf.len());
|
||||
for builder in output_buf.iter_mut() {
|
||||
columns.push(builder.to_vector().to_arrow_array());
|
||||
}
|
||||
|
||||
// overwrite default list datatype to change field name
|
||||
columns[self.le_column_index] = compute::cast(
|
||||
&columns[self.le_column_index],
|
||||
self.output_schema.field(self.le_column_index).data_type(),
|
||||
)?;
|
||||
columns[self.field_column_index] = compute::cast(
|
||||
&columns[self.field_column_index],
|
||||
self.output_schema
|
||||
.field(self.field_column_index)
|
||||
.data_type(),
|
||||
)?;
|
||||
|
||||
self.output_buffered_rows = 0;
|
||||
RecordBatch::try_new(self.output_schema.clone(), columns)
|
||||
.map(Some)
|
||||
.map_err(DataFusionError::ArrowError)
|
||||
}
|
||||
|
||||
/// Find the first `+Inf` which indicates the end of the bucket group
|
||||
///
|
||||
/// If the return value equals to batch's num_rows means the it's not found
|
||||
/// in this batch
|
||||
fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
|
||||
// fuse this function. It should not be called when the
|
||||
// bucket size is already know.
|
||||
if let Some(bucket_size) = self.bucket_size {
|
||||
return Ok(bucket_size);
|
||||
}
|
||||
let string_le_array = batch.column(self.le_column_index);
|
||||
let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
|
||||
DataFusionError::Execution(format!(
|
||||
"cannot cast {} array to float64 array: {:?}",
|
||||
string_le_array.data_type(),
|
||||
e
|
||||
))
|
||||
})?;
|
||||
let le_as_f64_array = float_le_array
|
||||
.as_primitive_opt::<Float64Type>()
|
||||
.ok_or_else(|| {
|
||||
DataFusionError::Execution(format!(
|
||||
"expect a float64 array, but found {}",
|
||||
float_le_array.data_type()
|
||||
))
|
||||
})?;
|
||||
for (i, v) in le_as_f64_array.iter().enumerate() {
|
||||
if let Some(v) = v && v == f64::INFINITY {
|
||||
return Ok(i);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(batch.num_rows())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::arrow::array::Float64Array;
|
||||
use datafusion::arrow::datatypes::Schema;
|
||||
use datafusion::common::ToDFSchema;
|
||||
use datafusion::physical_plan::memory::MemoryExec;
|
||||
use datafusion::prelude::SessionContext;
|
||||
use datatypes::arrow_array::StringArray;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn prepare_test_data() -> MemoryExec {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("le", DataType::Utf8, true),
|
||||
Field::new("val", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
// 12 items
|
||||
let host_column_1 = Arc::new(StringArray::from(vec![
|
||||
"host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1",
|
||||
"host_1", "host_1", "host_1", "host_1",
|
||||
])) as _;
|
||||
let le_column_1 = Arc::new(StringArray::from(vec![
|
||||
"0.001", "0.1", "10", "1000", "+Inf", "0.001", "0.1", "10", "1000", "+inf", "0.001",
|
||||
"0.1",
|
||||
])) as _;
|
||||
let val_column_1 = Arc::new(Float64Array::from(vec![
|
||||
0_0.0, 1.0, 1.0, 5.0, 5.0, 0_0.0, 20.0, 60.0, 70.0, 100.0, 0_1.0, 1.0,
|
||||
])) as _;
|
||||
|
||||
// 2 items
|
||||
let host_column_2 = Arc::new(StringArray::from(vec!["host_1", "host_1"])) as _;
|
||||
let le_column_2 = Arc::new(StringArray::from(vec!["10", "1000"])) as _;
|
||||
let val_column_2 = Arc::new(Float64Array::from(vec![1.0, 1.0])) as _;
|
||||
|
||||
// 11 items
|
||||
let host_column_3 = Arc::new(StringArray::from(vec![
|
||||
"host_1", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2",
|
||||
"host_2", "host_2", "host_2",
|
||||
])) as _;
|
||||
let le_column_3 = Arc::new(StringArray::from(vec![
|
||||
"+INF", "0.001", "0.1", "10", "1000", "+iNf", "0.001", "0.1", "10", "1000", "+Inf",
|
||||
])) as _;
|
||||
let val_column_3 = Arc::new(Float64Array::from(vec![
|
||||
1.0, 0_0.0, 0.0, 0.0, 0.0, 0.0, 0_0.0, 1.0, 2.0, 3.0, 4.0,
|
||||
])) as _;
|
||||
|
||||
let data_1 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![host_column_1, le_column_1, val_column_1],
|
||||
)
|
||||
.unwrap();
|
||||
let data_2 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![host_column_2, le_column_2, val_column_2],
|
||||
)
|
||||
.unwrap();
|
||||
let data_3 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![host_column_3, le_column_3, val_column_3],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
|
||||
}
|
||||
|
||||
/// Runs `HistogramFoldExec` over the batches from `prepare_test_data()` and
/// compares the pretty-printed result with the expected folded table.
#[tokio::test]
|
||||
async fn fold_overall() {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
// Derive the output schema from the input schema, folding the `le` and `val` columns.
let output_schema = Arc::new(
|
||||
(*HistogramFold::convert_schema(
|
||||
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
|
||||
"le",
|
||||
"val",
|
||||
)
|
||||
.unwrap()
|
||||
.as_ref())
|
||||
.clone()
|
||||
.into(),
|
||||
);
|
||||
// Column indices follow the test schema: `le` is column 1 and `val` is column 2.
let fold_exec = Arc::new(HistogramFoldExec {
|
||||
le_column_index: 1,
|
||||
field_column_index: 2,
|
||||
ts_column_index: 9999, // not exist but doesn't matter
|
||||
input: memory_exec,
|
||||
output_schema,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
});
|
||||
|
||||
// Execute the plan and render all result batches as one text table.
let session_context = SessionContext::default();
|
||||
let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
|
||||
.await
|
||||
.unwrap();
|
||||
let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
// Expected output: one row per bucket group, with `le` and `val` folded into lists.
let expected = String::from(
|
||||
"+--------+---------------------------------+--------------------------------+
|
||||
| host | le | val |
|
||||
+--------+---------------------------------+--------------------------------+
|
||||
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 1.0, 5.0, 5.0] |
|
||||
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 20.0, 60.0, 70.0, 100.0] |
|
||||
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [1.0, 1.0, 1.0, 1.0, 1.0] |
|
||||
| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 0.0, 0.0, 0.0, 0.0] |
|
||||
| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 2.0, 3.0, 4.0] |
|
||||
+--------+---------------------------------+--------------------------------+",
|
||||
);
|
||||
assert_eq!(result_literal, expected);
|
||||
}
|
||||
|
||||
/// Checks that `HistogramFold::convert_schema` turns the `le` and `val`
/// columns into non-nullable lists of nullable `Float64` items and leaves the
/// `host` column unchanged.
#[test]
fn confirm_schema() {
    // A folded column: a non-nullable list whose item field reuses the column name.
    let folded = |name: &str| {
        Field::new(
            name,
            DataType::List(Arc::new(Field::new(name, DataType::Float64, true))),
            false,
        )
    };

    let input_schema = Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new("le", DataType::Utf8, true),
        Field::new("val", DataType::Float64, true),
    ])
    .to_dfschema_ref()
    .unwrap();

    let expected_output_schema = Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        folded("le"),
        folded("val"),
    ])
    .to_dfschema_ref()
    .unwrap();

    let actual = HistogramFold::convert_schema(&input_schema, "le", "val").unwrap();
    assert_eq!(actual, expected_output_schema)
}
|
||||
}
|
||||
@@ -61,6 +61,8 @@ use crate::functions::{
|
||||
|
||||
/// `time()` function in PromQL.
|
||||
const SPECIAL_TIME_FUNCTION: &str = "time";
|
||||
/// `histogram_quantile` function in PromQL
|
||||
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
|
||||
|
||||
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
|
||||
|
||||
@@ -440,6 +442,10 @@ impl PromPlanner {
|
||||
}));
|
||||
}
|
||||
|
||||
if func.name == SPECIAL_HISTOGRAM_QUANTILE {
|
||||
todo!()
|
||||
}
|
||||
|
||||
let args = self.create_function_args(&args.args)?;
|
||||
let input = self
|
||||
.prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu {
|
||||
|
||||
@@ -84,17 +84,19 @@ pub async fn put(
|
||||
let summary = params.contains_key("summary");
|
||||
let details = params.contains_key("details");
|
||||
|
||||
let data_points = parse_data_points(body).await?;
|
||||
let data_point_requests = parse_data_points(body).await?;
|
||||
let data_points = data_point_requests
|
||||
.iter()
|
||||
.map(|point| point.clone().into())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let response = if !summary && !details {
|
||||
for data_point in data_points.into_iter() {
|
||||
if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
|
||||
// Not debugging purpose, failed fast.
|
||||
return error::InternalSnafu {
|
||||
err_msg: e.to_string(),
|
||||
}
|
||||
.fail();
|
||||
if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
|
||||
// Not debugging purpose, failed fast.
|
||||
return error::InternalSnafu {
|
||||
err_msg: e.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
|
||||
} else {
|
||||
@@ -108,15 +110,11 @@ pub async fn put(
|
||||
},
|
||||
};
|
||||
|
||||
for data_point in data_points.into_iter() {
|
||||
let result = opentsdb_handler
|
||||
.exec(&data_point.clone().into(), ctx.clone())
|
||||
.await;
|
||||
for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
|
||||
let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
|
||||
match result {
|
||||
Ok(()) => response.on_success(),
|
||||
Err(e) => {
|
||||
response.on_failed(data_point, e);
|
||||
}
|
||||
Ok(affected_rows) => response.on_success(affected_rows),
|
||||
Err(e) => response.on_failed(request, e),
|
||||
}
|
||||
}
|
||||
(
|
||||
@@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse {
|
||||
}
|
||||
|
||||
impl OpentsdbDebuggingResponse {
|
||||
fn on_success(&mut self) {
|
||||
self.success += 1;
|
||||
fn on_success(&mut self, affected_rows: usize) {
|
||||
self.success += affected_rows as i32;
|
||||
}
|
||||
|
||||
fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
|
||||
|
||||
@@ -20,16 +20,20 @@ use std::future::Future;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::RowInsertRequests;
|
||||
use async_trait::async_trait;
|
||||
use common_runtime::Runtime;
|
||||
use common_telemetry::logging::error;
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use self::codec::DataPoint;
|
||||
use crate::error::Result;
|
||||
use crate::opentsdb::connection::Connection;
|
||||
use crate::opentsdb::handler::Handler;
|
||||
use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
|
||||
use crate::query_handler::OpentsdbProtocolHandlerRef;
|
||||
use crate::row_writer::{self, MultiTableData};
|
||||
use crate::server::{AbortableStream, BaseTcpServer, Server};
|
||||
use crate::shutdown::Shutdown;
|
||||
|
||||
@@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
|
||||
OPENTSDB_SERVER
|
||||
}
|
||||
}
|
||||
|
||||
pub fn data_point_to_grpc_row_insert_requests(
|
||||
data_points: Vec<DataPoint>,
|
||||
) -> Result<(RowInsertRequests, usize)> {
|
||||
let mut multi_table_data = MultiTableData::new();
|
||||
|
||||
for mut data_point in data_points {
|
||||
let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
|
||||
let table_name = data_point.metric();
|
||||
let value = data_point.value();
|
||||
let timestamp = data_point.ts_millis();
|
||||
// length of tags + 2 extra columns for greptime_timestamp and the value
|
||||
let num_columns = tags.len() + 2;
|
||||
|
||||
let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
|
||||
let mut one_row = table_data.alloc_one_row();
|
||||
|
||||
// tags
|
||||
row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;
|
||||
|
||||
// value
|
||||
row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
|
||||
// timestamp
|
||||
row_writer::write_ts_millis(
|
||||
table_data,
|
||||
TIMESTAMP_COLUMN_NAME,
|
||||
Some(timestamp),
|
||||
&mut one_row,
|
||||
)?;
|
||||
|
||||
table_data.add_row(one_row);
|
||||
}
|
||||
|
||||
Ok(multi_table_data.into_row_insert_requests())
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::error::{self, Result};
|
||||
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
|
||||
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DataPoint {
|
||||
metric: String,
|
||||
ts_millis: i64,
|
||||
@@ -115,6 +115,10 @@ impl DataPoint {
|
||||
&self.tags
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to this data point's `(key, value)` tag pairs.
pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
|
||||
&mut self.tags
|
||||
}
|
||||
|
||||
/// Returns the value of the `ts_millis` field.
pub fn ts_millis(&self) -> i64 {
|
||||
self.ts_millis
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
|
||||
match DataPoint::try_create(&line) {
|
||||
Ok(data_point) => {
|
||||
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
|
||||
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
|
||||
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
|
||||
if let Err(e) = result {
|
||||
self.connection.write_line(e.output_msg()).await?;
|
||||
}
|
||||
@@ -128,8 +128,8 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl OpentsdbProtocolHandler for DummyQueryHandler {
|
||||
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
|
||||
let metric = data_point.metric();
|
||||
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
|
||||
let metric = data_points.first().unwrap().metric();
|
||||
if metric == "should_failed" {
|
||||
return error::InternalSnafu {
|
||||
err_msg: "expected",
|
||||
@@ -137,7 +137,7 @@ mod tests {
|
||||
.fail();
|
||||
}
|
||||
self.tx.send(metric.to_string()).await.unwrap();
|
||||
Ok(())
|
||||
Ok(data_points.len())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
|
||||
pub trait OpentsdbProtocolHandler {
|
||||
/// A successful request will not return a response.
|
||||
/// Only on error will the socket return a line of data.
|
||||
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
|
||||
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
|
||||
}
|
||||
|
||||
pub struct PromStoreResponse {
|
||||
|
||||
@@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl OpentsdbProtocolHandler for DummyInstance {
|
||||
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
|
||||
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
|
||||
let data_point = data_points.first().unwrap();
|
||||
if data_point.metric() == "should_failed" {
|
||||
return error::InternalSnafu {
|
||||
err_msg: "expected",
|
||||
@@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance {
|
||||
.fail();
|
||||
}
|
||||
let _ = self.tx.send(data_point.metric().to_string()).await;
|
||||
Ok(())
|
||||
Ok(data_points.len())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
|
||||
while let Ok(s) = rx.try_recv() {
|
||||
metrics.push(s);
|
||||
}
|
||||
assert_eq!(
|
||||
metrics,
|
||||
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
|
||||
);
|
||||
assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
|
||||
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
|
||||
let metric = data_point.metric();
|
||||
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
|
||||
let metric = data_points.first().unwrap().metric();
|
||||
if metric == "should_failed" {
|
||||
return server_error::InternalSnafu {
|
||||
err_msg: "expected",
|
||||
@@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
|
||||
}
|
||||
let i = metric.parse::<i32>().unwrap();
|
||||
let _ = self.tx.send(i * i).await;
|
||||
Ok(())
|
||||
Ok(data_points.len())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,8 @@ mod tests {
|
||||
|
||||
async fn test_exec(instance: &Arc<Instance>) {
|
||||
let ctx = QueryContext::arc();
|
||||
|
||||
// should create new table "my_metric_1" directly
|
||||
let data_point1 = DataPoint::new(
|
||||
"my_metric_1".to_string(),
|
||||
1000,
|
||||
@@ -55,9 +57,8 @@ mod tests {
|
||||
("tagk2".to_string(), "tagv2".to_string()),
|
||||
],
|
||||
);
|
||||
// should create new table "my_metric_1" directly
|
||||
instance.exec(&data_point1, ctx.clone()).await.unwrap();
|
||||
|
||||
// should create new column "tagk3" directly
|
||||
let data_point2 = DataPoint::new(
|
||||
"my_metric_1".to_string(),
|
||||
2000,
|
||||
@@ -67,12 +68,12 @@ mod tests {
|
||||
("tagk3".to_string(), "tagv3".to_string()),
|
||||
],
|
||||
);
|
||||
// should create new column "tagk3" directly
|
||||
instance.exec(&data_point2, ctx.clone()).await.unwrap();
|
||||
|
||||
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
|
||||
// should handle null tags properly
|
||||
instance.exec(&data_point3, ctx.clone()).await.unwrap();
|
||||
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
|
||||
|
||||
let data_points = vec![data_point1, data_point2, data_point3];
|
||||
instance.exec(data_points, ctx.clone()).await.unwrap();
|
||||
|
||||
let output = instance
|
||||
.do_query(
|
||||
@@ -87,13 +88,13 @@ mod tests {
|
||||
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let pretty_print = recordbatches.pretty_print().unwrap();
|
||||
let expected = vec![
|
||||
"+---------------------+----------------+-------+-------+-------+",
|
||||
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
|
||||
"+---------------------+----------------+-------+-------+-------+",
|
||||
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
|
||||
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
|
||||
"| 1970-01-01T00:00:03 | 3.0 | | | |",
|
||||
"+---------------------+----------------+-------+-------+-------+",
|
||||
"+-------+-------+----------------+---------------------+-------+",
|
||||
"| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |",
|
||||
"+-------+-------+----------------+---------------------+-------+",
|
||||
"| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |",
|
||||
"| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |",
|
||||
"| | | 3.0 | 1970-01-01T00:00:03 | |",
|
||||
"+-------+-------+----------------+---------------------+-------+",
|
||||
]
|
||||
.into_iter()
|
||||
.join("\n");
|
||||
|
||||
@@ -4,12 +4,11 @@ require_lease_before_startup = true
|
||||
rpc_addr = '127.0.0.1:4100'
|
||||
rpc_hostname = '127.0.0.1'
|
||||
rpc_runtime_size = 8
|
||||
require_lease_before_startup = true
|
||||
|
||||
[wal]
|
||||
file_size = '1GB'
|
||||
purge_interval = '10m'
|
||||
purge_threshold = '50GB'
|
||||
purge_threshold = '10GB'
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ require_lease_before_startup = true
|
||||
[wal]
|
||||
file_size = '1GB'
|
||||
purge_interval = '10m'
|
||||
purge_threshold = '50GB'
|
||||
purge_threshold = '10GB'
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
|
||||
@@ -235,6 +235,8 @@ impl Env {
|
||||
args.push(format!("--http-addr=127.0.0.1:430{id}"));
|
||||
args.push(format!("--data-home={}", data_home.display()));
|
||||
args.push(format!("--node-id={id}"));
|
||||
args.push("-c".to_string());
|
||||
args.push(self.generate_config_file(subcommand, db_ctx));
|
||||
args.push("--metasrv-addr=127.0.0.1:3002".to_string());
|
||||
(args, format!("127.0.0.1:410{id}"))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user