Compare commits

..

10 Commits

Author SHA1 Message Date
Ruihang Xia
4d478658b5 fix: pass datanode config file in distributed mode sqlness (#2631)
* fix: pass datanode config file in distributed mode sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-10-20 10:57:23 +00:00
localhost
89ebe47cd9 feat: RepeatedTask adds execute-first-wait-later behavior. (#2625)
* feat: RepeatedTask adds execute-first-wait-later behavior.

* feat: add interval generator for repeated task component

* feat: impl debug for dyn IntervalGenerator trait

* chore: change some words

* chore: instead of complicated way, we add an initial_delay to control task interval

* chore: some improve by pr comment
2023-10-20 09:43:45 +00:00
Ruihang Xia
212ea2c25c feat: implement HistogramFold plan for prometheus histogram type (#2626)
* basic impl of fold plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add schema test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fill plan attributes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unify variable names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-10-20 07:42:10 +00:00
Ruihang Xia
1658d088ab ci: add size labeler (#2628)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-10-20 06:39:13 +00:00
Baasit
346b57cf10 feat: row protocol support for opentsdb (#2623)
* feat: opentsdb row protocol

* fix: added comments for num of rows and failure if output is not of affected rows

* fix: added extra 1 to number of columns

* fix: avoided cloning datapoints, took ownership instead

* fix: avoided cloning datapoints, took ownership instead

* fix: changed vector slice to vector

* fix: remove clone

* fix: combined datapoints and requests with zip instead of enumerating

---------

Co-authored-by: Ubuntu <ubuntu@ip-172-31-43-183.us-east-2.compute.internal>
2023-10-20 06:25:59 +00:00
Weny Xu
e1dcf83326 fix: correct the range behavior in MemoryKvBackend & RaftEngineBackend (#2615)
* fix: correct the range behavior in MemoryKvBackend & RaftEngineBackend

* refactor: migrate tests from MemoryKvBackend

* chore: apply suggestions from CR

* fix: fix license header

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* fix: fix range bugs
2023-10-20 02:30:47 +00:00
Ning Sun
b5d9d635eb ci: add slack notification for nightly ci failure (#2617) 2023-10-19 15:47:15 +00:00
zyy17
88dd78a69c ci: remove the old version python (#2624)
ci: remove old version python
2023-10-19 15:46:15 +00:00
zyy17
6439b929b3 ci: the 'publish-github-release' and 'release-cn-artifacts' have to wait for all the artifacts are built (#2622) 2023-10-19 21:05:44 +08:00
Wei
ba15c14103 feat: get internal value size of ValueRef (#2613)
* feat: impl byte_size

* chore: clippy

* chore: cr comment
2023-10-19 11:59:37 +08:00
30 changed files with 1668 additions and 363 deletions

View File

@@ -34,6 +34,14 @@ jobs:
uses: Swatinem/rust-cache@v2
- name: Run sqlness
run: cargo sqlness
- name: Notify slack if failed
if: failure()
uses: slackapi/slack-github-action@v1.23.0
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
with:
payload: |
{"text": "Nightly CI failed for sqlness tests"}
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
@@ -80,3 +88,11 @@ jobs:
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
GT_S3_REGION: ${{ secrets.S3_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Notify slack if failed
if: failure()
uses: slackapi/slack-github-action@v1.23.0
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL_DEVELOP_CHANNEL }}
with:
payload: |
{"text": "Nightly CI failed for cargo test"}

View File

@@ -302,8 +302,12 @@ jobs:
release-cn-artifacts:
name: Release artifacts to CN region
if: ${{ inputs.release_images || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
needs: [ # The job have to wait for all the artifacts are built.
allocate-runners,
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
build-macos-artifacts,
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-20.04
@@ -338,11 +342,12 @@ jobs:
publish-github-release:
name: Create GitHub release and upload artifacts
if: ${{ inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
needs: [ # The job have to wait for all the artifacts are built.
allocate-runners,
build-linux-amd64-artifacts,
build-linux-arm64-artifacts,
build-macos-artifacts,
build-windows-artifacts,
release-images-to-dockerhub,
]
runs-on: ubuntu-20.04

26
.github/workflows/size-label.yml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: size-labeler
on: [pull_request]
jobs:
labeler:
runs-on: ubuntu-latest
name: Label the PR size
steps:
- uses: codelytv/pr-size-labeler@v1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
s_label: 'Size: S'
s_max_size: '100'
m_label: 'Size: M'
m_max_size: '500'
l_label: 'Size: L'
l_max_size: '1000'
xl_label: 'Size: XL'
fail_if_xl: 'false'
message_if_xl: >
This PR exceeds the recommended size of 1000 lines.
Please make sure you are NOT addressing multiple issues with one PR.
Note this PR might be rejected due to its size.
github_api_url: 'api.github.com'
files_to_ignore: 'Cargo.lock'

1
Cargo.lock generated
View File

@@ -7034,6 +7034,7 @@ dependencies = [
"common-catalog",
"common-error",
"common-macro",
"common-recordbatch",
"common-telemetry",
"datafusion",
"datatypes",

View File

@@ -19,8 +19,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
build-essential \
pkg-config \
python3.10 \
python3.10-dev \
python3-pip
python3.10-dev
# Remove Python 3.8 and install pip.
RUN apt-get -y purge python3.8 && \
apt-get -y autoremove && \
ln -s /usr/bin/python3.10 /usr/bin/python3 && \
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10
RUN git config --global --add safe.directory /greptimedb

View File

@@ -57,7 +57,10 @@ impl GreptimeDBTelemetryTask {
task_fn: BoxedTaskFunction<Error>,
should_report: Arc<AtomicBool>,
) -> Self {
GreptimeDBTelemetryTask::Enable((RepeatedTask::new(interval, task_fn), should_report))
GreptimeDBTelemetryTask::Enable((
RepeatedTask::new(interval, task_fn).with_initial_delay(Some(Duration::ZERO)),
should_report,
))
}
pub fn disable() -> Self {

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod memory;
pub mod test;
pub mod txn;
use std::any::Any;

View File

@@ -17,7 +17,6 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::ops::Range;
use std::sync::RwLock;
use async_trait::async_trait;
@@ -85,21 +84,25 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
let range = req.range();
let RangeRequest {
key,
range_end,
limit,
keys_only,
limit, keys_only, ..
} = req;
let kvs = self.kvs.read().unwrap();
let values = kvs.range(range);
let iter: Box<dyn Iterator<Item = (&Vec<u8>, &Vec<u8>)>> = if range_end.is_empty() {
Box::new(kvs.get_key_value(&key).into_iter())
} else {
Box::new(kvs.range(key..range_end))
};
let mut kvs = iter
let mut more = false;
let mut iter: i64 = 0;
let kvs = values
.take_while(|_| {
let take = limit == 0 || iter != limit;
iter += 1;
more = limit > 0 && iter > limit;
take
})
.map(|(k, v)| {
let key = k.clone();
let value = if keys_only { vec![] } else { v.clone() };
@@ -107,13 +110,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
})
.collect::<Vec<_>>();
let more = if limit > 0 && kvs.len() > limit as usize {
kvs.truncate(limit as usize);
true
} else {
false
};
Ok(RangeResponse { kvs, more })
}
@@ -215,36 +211,32 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
let DeleteRangeRequest {
key,
range_end,
prev_kv,
} = req;
let range = req.range();
let DeleteRangeRequest { prev_kv, .. } = req;
let mut kvs = self.kvs.write().unwrap();
let prev_kvs = if range_end.is_empty() {
kvs.remove(&key)
.into_iter()
.map(|value| KeyValue {
key: key.clone(),
value,
})
.collect::<Vec<_>>()
} else {
let range = Range {
start: key,
end: range_end,
};
kvs.extract_if(|key, _| range.contains(key))
.map(Into::into)
.collect::<Vec<_>>()
};
let keys = kvs
.range(range)
.map(|(key, _)| key.clone())
.collect::<Vec<_>>();
Ok(DeleteRangeResponse {
deleted: prev_kvs.len() as i64,
prev_kvs: if prev_kv { prev_kvs } else { vec![] },
})
let mut prev_kvs = if prev_kv {
Vec::with_capacity(keys.len())
} else {
vec![]
};
let deleted = keys.len() as i64;
for key in keys {
if let Some(value) = kvs.remove(&key) {
if prev_kv {
prev_kvs.push((key.clone(), value).into())
}
}
}
Ok(DeleteRangeResponse { deleted, prev_kvs })
}
async fn batch_delete(
@@ -358,254 +350,63 @@ impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::*;
use crate::error::Error;
use crate::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use crate::kv_backend::KvBackend;
use crate::rpc::store::{BatchGetRequest, BatchPutRequest};
use crate::rpc::KeyValue;
use crate::util;
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
let kv_store = MemoryKvBackend::<Error>::new();
let kvs = mock_kvs();
assert!(kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.is_ok());
assert!(kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.is_ok());
prepare_kv(&kv_store).await;
kv_store
}
fn mock_kvs() -> Vec<KeyValue> {
vec![
KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec(),
},
KeyValue {
key: b"key2".to_vec(),
value: b"val2".to_vec(),
},
KeyValue {
key: b"key3".to_vec(),
value: b"val3".to_vec(),
},
]
}
#[tokio::test]
async fn test_put() {
let kv_store = mock_mem_store_with_data().await;
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val12".to_vec(),
prev_kv: false,
})
.await
.unwrap();
assert!(resp.prev_kv.is_none());
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val13".to_vec(),
prev_kv: true,
})
.await
.unwrap();
let prev_kv = resp.prev_kv.unwrap();
assert_eq!(b"key11", prev_kv.key());
assert_eq!(b"val12", prev_kv.value());
test_kv_put(kv_store).await;
}
#[tokio::test]
async fn test_range() {
let kv_store = mock_mem_store_with_data().await;
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
test_kv_range(kv_store).await;
}
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: false,
})
.await
.unwrap();
#[tokio::test]
async fn test_range_2() {
let kv = MemoryKvBackend::<Error>::new();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"val11", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: true,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
let resp = kv_store
.range(RangeRequest {
key,
range_end,
limit: 1,
keys_only: false,
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
test_kv_range_2(kv).await;
}
#[tokio::test]
async fn test_batch_get() {
let kv_store = mock_mem_store_with_data().await;
let keys = vec![];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key10".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key3", resp.kvs[1].key());
assert_eq!(b"val3", resp.kvs[1].value());
test_kv_batch_get(kv_store).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
test_kv_compare_and_put(kv_store).await;
}
#[tokio::test]
async fn test_delete_range() {
let kv_store = mock_mem_store_with_data().await;
let req = DeleteRangeRequest {
key: b"key3".to_vec(),
range_end: vec![],
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(b"key3", resp.prev_kvs[0].key());
assert_eq!(b"val3", resp.prev_kvs[0].value());
let resp = kv_store.get(b"key3").await.unwrap();
assert!(resp.is_none());
let req = DeleteRangeRequest {
key: b"key2".to_vec(),
range_end: vec![],
prev_kv: false,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
let resp = kv_store.get(b"key2").await.unwrap();
assert!(resp.is_none());
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let req = DeleteRangeRequest {
key: key.clone(),
range_end: range_end.clone(),
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(2, resp.prev_kvs.len());
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let resp = kv_store.range(req).await.unwrap();
assert!(resp.kvs.is_empty());
test_kv_delete_range(kv_store).await;
}
#[tokio::test]
@@ -636,35 +437,6 @@ mod tests {
async fn test_batch_delete() {
let kv_store = mock_mem_store_with_data().await;
assert!(kv_store.get(b"key1").await.unwrap().is_some());
assert!(kv_store.get(b"key100").await.unwrap().is_none());
let req = BatchDeleteRequest {
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
prev_kv: true,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(
vec![KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec()
}],
resp.prev_kvs
);
assert!(kv_store.get(b"key1").await.unwrap().is_none());
assert!(kv_store.get(b"key2").await.unwrap().is_some());
assert!(kv_store.get(b"key3").await.unwrap().is_some());
let req = BatchDeleteRequest {
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
prev_kv: false,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
assert!(kv_store.get(b"key2").await.unwrap().is_none());
assert!(kv_store.get(b"key3").await.unwrap().is_none());
test_kv_batch_delete(kv_store).await;
}
}

View File

@@ -0,0 +1,352 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::{KvBackend, *};
use crate::error::Error;
use crate::rpc::store::{BatchGetRequest, PutRequest};
use crate::rpc::KeyValue;
use crate::util;
pub fn mock_kvs() -> Vec<KeyValue> {
vec![
KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec(),
},
KeyValue {
key: b"key2".to_vec(),
value: b"val2".to_vec(),
},
KeyValue {
key: b"key3".to_vec(),
value: b"val3".to_vec(),
},
]
}
pub async fn prepare_kv(kv_store: &impl KvBackend) {
let kvs = mock_kvs();
assert!(kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.is_ok());
assert!(kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.is_ok());
}
pub async fn test_kv_put(kv_store: impl KvBackend) {
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val12".to_vec(),
prev_kv: false,
})
.await
.unwrap();
assert!(resp.prev_kv.is_none());
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val13".to_vec(),
prev_kv: true,
})
.await
.unwrap();
let prev_kv = resp.prev_kv.unwrap();
assert_eq!(b"key11", prev_kv.key());
assert_eq!(b"val12", prev_kv.value());
}
pub async fn test_kv_range(kv_store: impl KvBackend) {
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: false,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"val11", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: true,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
let resp = kv_store
.range(RangeRequest {
key,
range_end,
limit: 1,
keys_only: false,
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
}
pub async fn test_kv_range_2(kv_store: impl KvBackend) {
kv_store
.put(PutRequest::new().with_key("atest").with_value("value"))
.await
.unwrap();
kv_store
.put(PutRequest::new().with_key("test").with_value("value"))
.await
.unwrap();
// If both key and range_end are \0, then range represents all keys.
let result = kv_store
.range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
// If range_end is \0, the range is all keys greater than or equal to the key argument.
let result = kv_store
.range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
let result = kv_store
.range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 1);
assert_eq!(result.kvs[0].key, b"test");
// Fetches the keys >= "a", set limit to 1, the `more` should be true.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(1),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 1);
assert!(result.more);
// Fetches the keys >= "a", set limit to 2, the `more` should be false.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(2),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
// Fetches the keys >= "a", set limit to 3, the `more` should be false.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(3),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
}
pub async fn test_kv_batch_get(kv_store: impl KvBackend) {
let keys = vec![];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key10".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key3", resp.kvs[1].key());
assert_eq!(b"val3", resp.kvs[1].value());
}
pub async fn test_kv_compare_and_put(kv_store: Arc<dyn KvBackend<Error = Error>>) {
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
}
pub async fn test_kv_delete_range(kv_store: impl KvBackend) {
let req = DeleteRangeRequest {
key: b"key3".to_vec(),
range_end: vec![],
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(1, resp.deleted);
assert_eq!(b"key3", resp.prev_kvs[0].key());
assert_eq!(b"val3", resp.prev_kvs[0].value());
let resp = kv_store.get(b"key3").await.unwrap();
assert!(resp.is_none());
let req = DeleteRangeRequest {
key: b"key2".to_vec(),
range_end: vec![],
prev_kv: false,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.deleted);
assert!(resp.prev_kvs.is_empty());
let resp = kv_store.get(b"key2").await.unwrap();
assert!(resp.is_none());
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let req = DeleteRangeRequest {
key: key.clone(),
range_end: range_end.clone(),
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(2, resp.prev_kvs.len());
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let resp = kv_store.range(req).await.unwrap();
assert!(resp.kvs.is_empty());
}
pub async fn test_kv_batch_delete(kv_store: impl KvBackend) {
assert!(kv_store.get(b"key1").await.unwrap().is_some());
assert!(kv_store.get(b"key100").await.unwrap().is_none());
let req = BatchDeleteRequest {
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
prev_kv: true,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(
vec![KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec()
}],
resp.prev_kvs
);
assert!(kv_store.get(b"key1").await.unwrap().is_none());
assert!(kv_store.get(b"key2").await.unwrap().is_some());
assert!(kv_store.get(b"key3").await.unwrap().is_some());
let req = BatchDeleteRequest {
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
prev_kv: false,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
assert!(kv_store.get(b"key2").await.unwrap().is_none());
assert!(kv_store.get(b"key3").await.unwrap().is_none());
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::ops::Bound;
use api::v1::meta::{
BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse,
@@ -30,6 +31,17 @@ use crate::error;
use crate::error::Result;
use crate::rpc::{util, KeyValue};
pub fn to_range(key: Vec<u8>, range_end: Vec<u8>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
match (&key[..], &range_end[..]) {
(_, []) => (Bound::Included(key.clone()), Bound::Included(key)),
// If both key and range_end are \0, then range represents all keys.
([0], [0]) => (Bound::Unbounded, Bound::Unbounded),
// If range_end is \0, the range is all keys greater than or equal to the key argument.
(_, [0]) => (Bound::Included(key), Bound::Unbounded),
(_, _) => (Bound::Included(key), Bound::Excluded(range_end)),
}
}
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
/// key is the first key for the range, If range_end is not given, the
@@ -96,6 +108,11 @@ impl RangeRequest {
}
}
/// Returns the `RangeBounds`.
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
to_range(self.key.clone(), self.range_end.clone())
}
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
#[inline]
@@ -690,6 +707,11 @@ impl DeleteRangeRequest {
}
}
/// Returns the `RangeBounds`.
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
to_range(self.key.clone(), self.range_end.clone())
}
/// key is the first key to delete in the range. If range_end is not given,
/// the range is defined to contain only the key argument.
#[inline]

View File

@@ -40,6 +40,7 @@ pub type BoxedTaskFunction<E> = Box<dyn TaskFunction<E> + Send + Sync + 'static>
struct TaskInner<E> {
/// The repeated task handle. This handle is Some if the task is started.
task_handle: Option<JoinHandle<()>>,
/// The task_fn to run. This is Some if the task is not started.
task_fn: Option<BoxedTaskFunction<E>>,
}
@@ -50,6 +51,7 @@ pub struct RepeatedTask<E> {
inner: Mutex<TaskInner<E>>,
started: AtomicBool,
interval: Duration,
initial_delay: Option<Duration>,
}
impl<E> std::fmt::Display for RepeatedTask<E> {
@@ -75,6 +77,9 @@ impl<E> Drop for RepeatedTask<E> {
}
impl<E: ErrorExt + 'static> RepeatedTask<E> {
/// Creates a new repeated task. The `initial_delay` is the delay before the first execution.
/// `initial_delay` default is None, the initial interval uses the `interval`.
/// You can use `with_initial_delay` to set the `initial_delay`.
pub fn new(interval: Duration, task_fn: BoxedTaskFunction<E>) -> Self {
Self {
name: task_fn.name().to_string(),
@@ -85,9 +90,15 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
}),
started: AtomicBool::new(false),
interval,
initial_delay: None,
}
}
pub fn with_initial_delay(mut self, initial_delay: Option<Duration>) -> Self {
self.initial_delay = initial_delay;
self
}
pub fn started(&self) -> bool {
self.started.load(Ordering::Relaxed)
}
@@ -99,17 +110,21 @@ impl<E: ErrorExt + 'static> RepeatedTask<E> {
IllegalStateSnafu { name: &self.name }
);
let interval = self.interval;
let child = self.cancel_token.child_token();
// Safety: The task is not started.
let mut task_fn = inner.task_fn.take().unwrap();
let interval = self.interval;
let mut initial_delay = self.initial_delay;
// TODO(hl): Maybe spawn to a blocking runtime.
let handle = runtime.spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = child.cancelled() => {
return;
let sleep_time = initial_delay.take().unwrap_or(interval);
if sleep_time > Duration::ZERO {
tokio::select! {
_ = tokio::time::sleep(sleep_time) => {}
_ = child.cancelled() => {
return;
}
}
}
if let Err(e) = task_fn.call().await {
@@ -192,4 +207,21 @@ mod tests {
assert_eq!(n.load(Ordering::Relaxed), 5);
}
#[tokio::test]
async fn test_repeated_task_prior_exec() {
common_telemetry::init_default_ut_logging();
let n = Arc::new(AtomicI32::new(0));
let task_fn = TickTask { n: n.clone() };
let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
.with_initial_delay(Some(Duration::ZERO));
task.start(crate::bg_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
task.stop().await.unwrap();
assert_eq!(n.load(Ordering::Relaxed), 6);
}
}

View File

@@ -654,6 +654,17 @@ impl ListValue {
Arc::new(new_item_field(output_type.item_type().as_arrow_type())),
))
}
/// use 'the first item size' * 'length of items' to estimate the size.
/// it could be inaccurate.
fn estimated_size(&self) -> usize {
if let Some(items) = &self.items {
if let Some(item) = items.first() {
return item.as_value_ref().data_size() * items.len();
}
}
0
}
}
impl Default for ListValue {
@@ -1090,12 +1101,46 @@ impl<'a> PartialOrd for ListValueRef<'a> {
}
}
impl<'a> ValueRef<'a> {
/// Returns the size of the underlying data in bytes,
/// The size is estimated and only considers the data size.
pub fn data_size(&self) -> usize {
match *self {
ValueRef::Null => 0,
ValueRef::Boolean(_) => 1,
ValueRef::UInt8(_) => 1,
ValueRef::UInt16(_) => 2,
ValueRef::UInt32(_) => 4,
ValueRef::UInt64(_) => 8,
ValueRef::Int8(_) => 1,
ValueRef::Int16(_) => 2,
ValueRef::Int32(_) => 4,
ValueRef::Int64(_) => 8,
ValueRef::Float32(_) => 4,
ValueRef::Float64(_) => 8,
ValueRef::String(v) => std::mem::size_of_val(v),
ValueRef::Binary(v) => std::mem::size_of_val(v),
ValueRef::Date(_) => 4,
ValueRef::DateTime(_) => 8,
ValueRef::Timestamp(_) => 16,
ValueRef::Time(_) => 16,
ValueRef::Duration(_) => 16,
ValueRef::Interval(_) => 24,
ValueRef::List(v) => match v {
ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(),
ListValueRef::Ref { val } => val.estimated_size(),
},
}
}
}
#[cfg(test)]
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
use num_traits::Float;
use super::*;
use crate::vectors::ListVectorBuilder;
#[test]
fn test_try_from_scalar_value() {
@@ -2158,4 +2203,90 @@ mod tests {
duration_to_scalar_value(TimeUnit::Nanosecond, Some(1))
);
}
fn check_value_ref_size_eq(value_ref: &ValueRef, size: usize) {
assert_eq!(value_ref.data_size(), size);
}
#[test]
fn test_value_ref_estimated_size() {
    // Keep `ValueRef` itself at 24 bytes so it stays cheap to copy around.
    assert_eq!(std::mem::size_of::<ValueRef>(), 24);

    // Fixed-width variants: estimated size equals the width of the wrapped primitive.
    check_value_ref_size_eq(&ValueRef::Boolean(true), 1);
    check_value_ref_size_eq(&ValueRef::UInt8(1), 1);
    check_value_ref_size_eq(&ValueRef::UInt16(1), 2);
    check_value_ref_size_eq(&ValueRef::UInt32(1), 4);
    check_value_ref_size_eq(&ValueRef::UInt64(1), 8);
    check_value_ref_size_eq(&ValueRef::Int8(1), 1);
    check_value_ref_size_eq(&ValueRef::Int16(1), 2);
    check_value_ref_size_eq(&ValueRef::Int32(1), 4);
    check_value_ref_size_eq(&ValueRef::Int64(1), 8);
    check_value_ref_size_eq(&ValueRef::Float32(1.0.into()), 4);
    check_value_ref_size_eq(&ValueRef::Float64(1.0.into()), 8);

    // Variable-length variants: estimated size is the byte length of the
    // borrowed payload ("greptimedb" is 10 bytes).
    check_value_ref_size_eq(&ValueRef::String("greptimedb"), 10);
    check_value_ref_size_eq(&ValueRef::Binary(b"greptimedb"), 10);

    // Temporal variants report the size of their inner representation.
    check_value_ref_size_eq(&ValueRef::Date(Date::new(1)), 4);
    check_value_ref_size_eq(&ValueRef::DateTime(DateTime::new(1)), 8);
    check_value_ref_size_eq(&ValueRef::Timestamp(Timestamp::new_millisecond(1)), 16);
    check_value_ref_size_eq(&ValueRef::Time(Time::new_millisecond(1)), 16);
    check_value_ref_size_eq(
        &ValueRef::Interval(Interval::from_month_day_nano(1, 2, 3)),
        24,
    );
    check_value_ref_size_eq(&ValueRef::Duration(Duration::new_millisecond(1)), 16);

    // A list ref borrowing a `ListValue` is estimated from its string items.
    check_value_ref_size_eq(
        &ValueRef::List(ListValueRef::Ref {
            val: &ListValue {
                items: Some(Box::new(vec![
                    Value::String("hello world".into()),
                    Value::String("greptimedb".into()),
                ])),
                datatype: ConcreteDataType::string_datatype(),
            },
        }),
        22,
    );

    // Build a list vector with three entries (one of them null) to exercise
    // the `Indexed` variant of `ListValueRef`.
    let data = vec![
        Some(vec![Some(1), Some(2), Some(3)]),
        None,
        Some(vec![Some(4), None, Some(6)]),
    ];
    let mut builder =
        ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 8);
    for vec_opt in &data {
        if let Some(vec) = vec_opt {
            let values = vec.iter().map(|v| Value::from(*v)).collect();
            let values = Some(Box::new(values));
            let list_value = ListValue::new(values, ConcreteDataType::int32_datatype());
            builder.push(Some(ListValueRef::Ref { val: &list_value }));
        } else {
            builder.push(None);
        }
    }
    let vector = builder.finish();

    // Every indexed ref shares the same underlying vector, so each index
    // reports the same estimated size.
    check_value_ref_size_eq(
        &ValueRef::List(ListValueRef::Indexed {
            vector: &vector,
            idx: 0,
        }),
        85,
    );
    check_value_ref_size_eq(
        &ValueRef::List(ListValueRef::Indexed {
            vector: &vector,
            idx: 1,
        }),
        85,
    );
    check_value_ref_size_eq(
        &ValueRef::List(ListValueRef::Indexed {
            vector: &vector,
            idx: 2,
        }),
        85,
    )
}
}

View File

@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::InsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -27,23 +27,27 @@ use crate::instance::Instance;
#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
async fn exec(
&self,
data_points: Vec<DataPoint>,
ctx: QueryContextRef,
) -> server_error::Result<usize> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Opentsdb)
.context(AuthSnafu)?;
let requests = InsertRequests {
inserts: vec![data_point.as_grpc_insert()],
};
let _ = self
.handle_inserts(requests, ctx)
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let output = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{data_point:?}"),
})?;
Ok(())
.context(servers::error::ExecuteGrpcQuerySnafu)?;
Ok(match output {
common_query::Output::AffectedRows(rows) => rows,
_ => unreachable!(),
})
}
}

View File

@@ -19,7 +19,7 @@ common-base = { workspace = true }
common-config = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
common-meta = { workspace = true }
common-meta = { workspace = true, features = ["testing"] }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
futures-util.workspace = true

View File

@@ -15,6 +15,7 @@
//! [KvBackend] implementation based on [raft_engine::Engine].
use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::RwLock;
use common_error::ext::BoxedError;
@@ -28,6 +29,7 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
use raft_engine::{Config, Engine, LogBatch};
use snafu::ResultExt;
@@ -137,29 +139,48 @@ impl KvBackend for RaftEngineBackend {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
let mut res = vec![];
let (start, end) = req.range();
let RangeRequest {
keys_only, limit, ..
} = req;
let (start_key, end_key) = match (start, end) {
(Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))),
(Unbounded, Unbounded) => (None, None),
(Included(start), Excluded(end)) => (Some(start), Some(end)),
(Included(start), Unbounded) => (Some(start), None),
_ => unreachable!(),
};
let mut more = false;
let mut iter = 0;
self.engine
.read()
.unwrap()
.scan_raw_messages(
SYSTEM_NAMESPACE,
Some(&req.key),
Some(&req.range_end),
start_key.as_deref(),
end_key.as_deref(),
false,
|key, value| {
res.push(KeyValue {
key: key.to_vec(),
value: value.to_vec(),
});
true
let take = limit == 0 || iter != limit;
iter += 1;
more = limit > 0 && iter > limit;
if take {
res.push(KeyValue {
key: key.to_vec(),
value: if keys_only { vec![] } else { value.to_vec() },
});
}
take
},
)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(RangeResponse {
kvs: res,
more: false,
})
Ok(RangeResponse { kvs: res, more })
}
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
@@ -275,7 +296,7 @@ impl KvBackend for RaftEngineBackend {
key,
range_end,
limit: 0,
keys_only: true,
keys_only: false,
};
let range_resp = self.range(range).await?;
@@ -383,7 +404,12 @@ fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use common_meta::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};
@@ -615,4 +641,66 @@ mod tests {
keys
);
}
#[tokio::test]
async fn test_range() {
    // Run the shared KvBackend range-test suite against a raft-engine store.
    let dir = create_temp_dir("range");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_range(backend).await;
}

#[tokio::test]
async fn test_range_2() {
    // Second range suite; seeds its own data, so no `prepare_kv` here.
    let dir = create_temp_dir("range2");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    test_kv_range_2(backend).await;
}

#[tokio::test]
async fn test_put() {
    // Shared put-semantics suite (overwrite, prev_kv, etc.).
    let dir = create_temp_dir("put");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_put(backend).await;
}

#[tokio::test]
async fn test_batch_get() {
    // Shared batch-get suite.
    let dir = create_temp_dir("batch_get");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_batch_get(backend).await;
}

#[tokio::test]
async fn test_batch_delete() {
    // Shared batch-delete suite.
    let dir = create_temp_dir("batch_delete");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_batch_delete(backend).await;
}

#[tokio::test]
async fn test_delete_range() {
    // Shared delete-range suite.
    let dir = create_temp_dir("delete_range");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_delete_range(backend).await;
}

// Multi-threaded flavor: compare-and-put is exercised concurrently.
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put_2() {
    let dir = create_temp_dir("compare_and_put");
    let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
    prepare_kv(&backend).await;
    test_kv_compare_and_put(Arc::new(backend)).await;
}
}

View File

@@ -12,6 +12,7 @@ catalog = { workspace = true }
common-catalog = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
common-recordbatch = { workspace = true }
common-telemetry = { workspace = true }
datafusion.workspace = true
datatypes = { workspace = true }

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod empty_metric;
mod histogram_fold;
mod instant_manipulate;
mod normalize;
mod planner;

View File

@@ -0,0 +1,798 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use common_recordbatch::RecordBatch as GtRecordBatch;
use common_telemetry::warn;
use datafusion::arrow::array::AsArray;
use datafusion::arrow::compute::{self, concat_batches, SortOptions};
use datafusion::arrow::datatypes::{DataType, Field, Float64Type, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::TaskContext;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as PhyColumn;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::schema::Schema as GtSchema;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::MutableVector;
use futures::{ready, Stream, StreamExt};
/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
/// computing. Specifically, it will transform the `le` and `field` column into a complex
/// type, and samples on other tag columns:
/// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed
/// - `field` will become a [ListArray] of [f64]
/// - other columns will be sampled every `bucket_num` element, but their types won't change.
///
/// Due to the folding or sampling, the output rows number will become `input_rows` / `bucket_num`.
///
/// # Requirement
/// - Input should be sorted on `<tag list>, le ASC, ts`.
/// - The value set of `le` should be same. I.e., buckets of every series should be same.
///
/// [1]: https://prometheus.io/docs/concepts/metric_types/#histogram
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct HistogramFold {
    /// Name of the `le` column. It's a special column in prometheus
    /// for implementing conventional histogram. It's a string column
    /// with "literal" float value, like "+Inf", "0.001" etc.
    le_column: String,
    /// Name of the timestamp column; used for the input ordering requirement.
    ts_column: String,
    /// Input logical plan whose rows will be folded.
    input: LogicalPlan,
    /// Name of the value column folded alongside `le`.
    field_column: String,
    /// Pre-computed schema after folding (`le`/field become list columns).
    output_schema: DFSchemaRef,
}
impl UserDefinedLogicalNodeCore for HistogramFold {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.output_schema
    }

    // Folding is configured by column names, not by expressions.
    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "HistogramFold: le={}, field={}",
            self.le_column, self.field_column
        )
    }

    fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
        Self {
            le_column: self.le_column.clone(),
            ts_column: self.ts_column.clone(),
            input: inputs[0].clone(),
            field_column: self.field_column.clone(),
            // This method cannot return error. Otherwise we should re-calculate
            // the output schema
            output_schema: self.output_schema.clone(),
        }
    }
}
impl HistogramFold {
    /// Create a [HistogramFold] from column names and an input plan.
    ///
    /// Validates that `le_column`, `field_column` and `ts_column` all exist in
    /// the input schema, then pre-computes the folded output schema.
    #[allow(dead_code)]
    pub fn new(
        le_column: String,
        field_column: String,
        ts_column: String,
        input: LogicalPlan,
    ) -> DataFusionResult<Self> {
        let input_schema = input.schema();
        Self::check_schema(input_schema, &le_column, &field_column, &ts_column)?;
        let output_schema = Self::convert_schema(input_schema, &le_column, &field_column)?;
        Ok(Self {
            le_column,
            ts_column,
            input,
            field_column,
            output_schema,
        })
    }

    pub const fn name() -> &'static str {
        "HistogramFold"
    }

    /// Ensure every column required by the fold exists in `input_schema`.
    ///
    /// Returns a `FieldNotFound` schema error (listing the valid fields) for
    /// the first missing column.
    fn check_schema(
        input_schema: &DFSchemaRef,
        le_column: &str,
        field_column: &str,
        ts_column: &str,
    ) -> DataFusionResult<()> {
        let check_column = |col| {
            if !input_schema.has_column_with_unqualified_name(col) {
                return Err(DataFusionError::SchemaError(
                    datafusion::common::SchemaError::FieldNotFound {
                        field: Box::new(Column::new(None::<String>, col)),
                        valid_fields: input_schema
                            .fields()
                            .iter()
                            .map(|f| f.qualified_column())
                            .collect(),
                    },
                ));
            } else {
                Ok(())
            }
        };
        check_column(le_column)?;
        check_column(ts_column)?;
        check_column(field_column)
    }

    /// Build the physical [HistogramFoldExec] for this logical node, on top of
    /// the already-planned `exec_input`.
    #[allow(dead_code)]
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let input_schema = self.input.schema();
        // safety: those fields are checked in `check_schema()`
        let le_column_index = input_schema
            .index_of_column_by_name(None, &self.le_column)
            .unwrap()
            .unwrap();
        let field_column_index = input_schema
            .index_of_column_by_name(None, &self.field_column)
            .unwrap()
            .unwrap();
        let ts_column_index = input_schema
            .index_of_column_by_name(None, &self.ts_column)
            .unwrap()
            .unwrap();
        Arc::new(HistogramFoldExec {
            le_column_index,
            field_column_index,
            ts_column_index,
            input: exec_input,
            output_schema: Arc::new(self.output_schema.as_ref().into()),
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    /// Transform the schema
    ///
    /// - `le` will become a [ListArray] of [f64]. With each bucket bound parsed
    /// - `field` will become a [ListArray] of [f64]
    ///
    /// Both folded columns are marked non-nullable.
    fn convert_schema(
        input_schema: &DFSchemaRef,
        le_column: &str,
        field_column: &str,
    ) -> DataFusionResult<DFSchemaRef> {
        let mut fields = input_schema.fields().clone();
        // safety: those fields are checked in `check_schema()`
        let le_column_idx = input_schema
            .index_of_column_by_name(None, le_column)?
            .unwrap();
        let field_column_idx = input_schema
            .index_of_column_by_name(None, field_column)?
            .unwrap();

        // transform `le`: string column becomes a list of parsed f64 bounds
        let le_field: Field = fields[le_column_idx].field().as_ref().clone();
        let le_field = le_field.with_data_type(DataType::Float64);
        let folded_le_datatype = DataType::List(Arc::new(le_field));
        let folded_le = DFField::new(
            fields[le_column_idx].qualifier().cloned(),
            fields[le_column_idx].name(),
            folded_le_datatype,
            false,
        );

        // transform `field`
        // to avoid ambiguity, that field will be referenced as `the_field` below.
        let the_field: Field = fields[field_column_idx].field().as_ref().clone();
        let folded_field_datatype = DataType::List(Arc::new(the_field));
        let folded_field = DFField::new(
            fields[field_column_idx].qualifier().cloned(),
            fields[field_column_idx].name(),
            folded_field_datatype,
            false,
        );

        fields[le_column_idx] = folded_le;
        fields[field_column_idx] = folded_field;

        Ok(Arc::new(DFSchema::new_with_metadata(
            fields,
            HashMap::new(),
        )?))
    }
}
/// Physical plan node for [HistogramFold].
#[derive(Debug)]
pub struct HistogramFoldExec {
    /// Index for `le` column in the schema of input.
    le_column_index: usize,
    /// Child plan producing the rows to fold.
    input: Arc<dyn ExecutionPlan>,
    /// Schema after folding (`le`/field as list columns).
    output_schema: SchemaRef,
    /// Index for field column in the schema of input.
    field_column_index: usize,
    /// Index for the timestamp column in the schema of input.
    ts_column_index: usize,
    /// Execution metrics shared with the produced streams.
    metric: ExecutionPlanMetricsSet,
}
impl ExecutionPlan for HistogramFoldExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    // Folding does not repartition; inherit the child's partitioning.
    fn output_partitioning(&self) -> Partitioning {
        self.input.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.input.output_ordering()
    }

    // Require input sorted on `<tag list>..., le ASC, ts` so that all buckets
    // of one series arrive contiguously with `+Inf` last (see the requirement
    // on [HistogramFold]).
    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
        let mut cols = self
            .tag_col_exprs()
            .into_iter()
            .map(|expr| PhysicalSortRequirement {
                expr,
                options: None,
            })
            .collect::<Vec<PhysicalSortRequirement>>();
        // add le ASC
        cols.push(PhysicalSortRequirement {
            expr: Arc::new(PhyColumn::new(
                self.output_schema.field(self.le_column_index).name(),
                self.le_column_index,
            )),
            options: Some(SortOptions {
                descending: false,  // +INF in the last
                nulls_first: false, // not nullable
            }),
        });
        // add ts
        cols.push(PhysicalSortRequirement {
            expr: Arc::new(PhyColumn::new(
                self.output_schema.field(self.ts_column_index).name(),
                self.ts_column_index,
            )),
            options: None,
        });
        vec![Some(cols)]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        // partition on all tag columns, i.e., non-le, non-ts and non-field columns
        vec![Distribution::HashPartitioned(self.tag_col_exprs())]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    // cannot change schema with this method
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            input: children[0].clone(),
            metric: self.metric.clone(),
            le_column_index: self.le_column_index,
            ts_column_index: self.ts_column_index,
            output_schema: self.output_schema.clone(),
            field_column_index: self.field_column_index,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let batch_size = context.session_config().batch_size();

        let input = self.input.execute(partition, context)?;
        let output_schema = self.output_schema.clone();

        // All columns except `le` and the field pass through by sampling
        // ("normal" columns in the stream).
        let mut normal_indices = (0..output_schema.fields().len()).collect::<HashSet<_>>();
        normal_indices.remove(&self.le_column_index);
        normal_indices.remove(&self.field_column_index);
        Ok(Box::pin(HistogramFoldStream {
            le_column_index: self.le_column_index,
            field_column_index: self.field_column_index,
            normal_indices: normal_indices.into_iter().collect(),
            bucket_size: None,
            input_buffer: vec![],
            input,
            output_schema,
            metric: baseline_metric,
            batch_size,
            input_buffered_rows: 0,
            output_buffer: HistogramFoldStream::empty_output_buffer(&self.output_schema)?,
            output_buffered_rows: 0,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    // Folding changes the row count, so no statistics are propagated.
    fn statistics(&self) -> Statistics {
        Statistics {
            num_rows: None,
            total_byte_size: None,
            column_statistics: None,
            is_exact: false,
        }
    }
}
impl HistogramFoldExec {
    /// Return all the [PhysicalExpr] of tag columns in order.
    ///
    /// Tag columns are all columns except `le`, `field` and `ts` columns.
    pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        let special_indices = [
            self.le_column_index,
            self.field_column_index,
            self.ts_column_index,
        ];
        self.input
            .schema()
            .fields()
            .iter()
            .enumerate()
            .filter(|(idx, _)| !special_indices.contains(idx))
            .map(|(idx, field)| Arc::new(PhyColumn::new(field.name(), idx)) as _)
            .collect()
    }
}
impl DisplayAs for HistogramFoldExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // The default and verbose explain formats share one representation.
        let le = self.le_column_index;
        let field = self.field_column_index;
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "HistogramFoldExec: le=@{}, field=@{}", le, field)
            }
        }
    }
}
/// Stream produced by [HistogramFoldExec]; folds input batches as they arrive.
pub struct HistogramFoldStream {
    // internal states
    /// Index of the `le` column in both input and output schemas.
    le_column_index: usize,
    /// Index of the value column in both input and output schemas.
    field_column_index: usize,
    /// Columns need not folding
    normal_indices: Vec<usize>,
    /// Number of rows per bucket group, if known.
    bucket_size: Option<usize>,
    /// Expected output batch size
    batch_size: usize,
    output_schema: SchemaRef,

    // buffers
    /// Input batches not yet folded into complete bucket groups.
    input_buffer: Vec<RecordBatch>,
    /// Total row count currently held in `input_buffer`.
    input_buffered_rows: usize,
    /// One mutable builder per output column.
    output_buffer: Vec<Box<dyn MutableVector>>,
    /// Number of folded rows accumulated in `output_buffer`.
    output_buffered_rows: usize,

    // runtime things
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
}
impl RecordBatchStream for HistogramFoldStream {
fn schema(&self) -> SchemaRef {
self.output_schema.clone()
}
}
impl Stream for HistogramFoldStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(batch) => {
                    let batch = batch?;
                    let timer = Instant::now();
                    // `fold_input` returns `None` until a full output batch
                    // has been accumulated; keep pulling from the input.
                    let Some(result) = self.fold_input(batch)? else {
                        self.metric.elapsed_compute().add_elapsed(timer);
                        continue;
                    };
                    self.metric.elapsed_compute().add_elapsed(timer);
                    break Poll::Ready(Some(result));
                }
                // Input exhausted: flush whatever remains in the output buffer.
                None => break Poll::Ready(self.take_output_buf()?.map(Ok)),
            }
        };
        self.metric.record_poll(poll)
    }
}
impl HistogramFoldStream {
    /// Fold one input [RecordBatch].
    ///
    /// Returns `Ok(None)` when more input is needed, or `Ok(Some(..))` once at
    /// least `batch_size` folded rows are ready. The innermost `Result` is for
    /// `poll_next()`.
    pub fn fold_input(
        &mut self,
        input: RecordBatch,
    ) -> DataFusionResult<Option<DataFusionResult<RecordBatch>>> {
        let Some(bucket_num) = self.calculate_bucket_num(&input)? else {
            return Ok(None);
        };

        if self.input_buffered_rows + input.num_rows() < bucket_num {
            // not enough rows to fold
            self.push_input_buf(input);
            return Ok(None);
        }

        self.fold_buf(bucket_num, input)?;
        if self.output_buffered_rows >= self.batch_size {
            return Ok(self.take_output_buf()?.map(Ok));
        }

        Ok(None)
    }

    /// Create one empty [MutableVector] builder per column of `schema`.
    pub fn empty_output_buffer(
        schema: &SchemaRef,
    ) -> DataFusionResult<Vec<Box<dyn MutableVector>>> {
        let mut builders = Vec::with_capacity(schema.fields().len());
        for field in schema.fields() {
            let concrete_datatype = ConcreteDataType::try_from(field.data_type()).unwrap();
            let mutable_vector = concrete_datatype.create_mutable_vector(0);
            builders.push(mutable_vector);
        }

        Ok(builders)
    }

    /// Determine the number of rows in one bucket group.
    ///
    /// The size is the distance (inclusive) from the first buffered row to the
    /// first `+Inf` in the `le` column. Once found, the size is cached in
    /// `self.bucket_size` so later batches skip the scan. Returns `Ok(None)`
    /// when no `+Inf` has been seen yet (the batch is buffered for later).
    fn calculate_bucket_num(&mut self, batch: &RecordBatch) -> DataFusionResult<Option<usize>> {
        if let Some(size) = self.bucket_size {
            return Ok(Some(size));
        }

        let inf_pos = self.find_positive_inf(batch)?;
        if inf_pos == batch.num_rows() {
            // no positive inf found, append to buffer and wait for next batch
            self.push_input_buf(batch.clone());
            return Ok(None);
        }

        // else we found the positive inf.
        // calculate the bucket size
        let bucket_size = inf_pos + self.input_buffered_rows + 1;
        // Cache the result. Without this the size was recomputed for every
        // batch and the fuse in `find_positive_inf` never triggered.
        self.bucket_size = Some(bucket_size);
        Ok(Some(bucket_size))
    }

    /// Fold record batches from input buffer and put to output buffer
    fn fold_buf(&mut self, bucket_num: usize, input: RecordBatch) -> DataFusionResult<()> {
        self.push_input_buf(input);
        // TODO(ruihang): this concat is avoidable.
        let batch = concat_batches(&self.input.schema(), self.input_buffer.drain(..).as_ref())?;
        let mut remaining_rows = self.input_buffered_rows;
        let mut cursor = 0;

        let gt_schema = GtSchema::try_from(self.input.schema()).unwrap();
        let batch = GtRecordBatch::try_from_df_record_batch(Arc::new(gt_schema), batch).unwrap();

        while remaining_rows >= bucket_num {
            // "sample" normal columns: take the value at the group's first row
            for normal_index in &self.normal_indices {
                let val = batch.column(*normal_index).get(cursor);
                self.output_buffer[*normal_index].push_value_ref(val.as_value_ref());
            }
            // "fold" `le` and field columns into one list value per group
            let le_array = batch.column(self.le_column_index);
            let field_array = batch.column(self.field_column_index);
            let mut le_item = vec![];
            let mut field_item = vec![];
            for bias in 0..bucket_num {
                let le_str_val = le_array.get(cursor + bias);
                let le_str_val_ref = le_str_val.as_value_ref();
                let le_str = le_str_val_ref
                    .as_string()
                    .unwrap()
                    .expect("le column should not be nullable");
                let le = le_str.parse::<f64>().unwrap();
                let le_val = Value::from(le);
                le_item.push(le_val);

                let field = field_array.get(cursor + bias);
                field_item.push(field);
            }
            let le_list_val = Value::List(ListValue::new(
                Some(Box::new(le_item)),
                ConcreteDataType::float64_datatype(),
            ));
            let field_list_val = Value::List(ListValue::new(
                Some(Box::new(field_item)),
                ConcreteDataType::float64_datatype(),
            ));
            self.output_buffer[self.le_column_index].push_value_ref(le_list_val.as_value_ref());
            self.output_buffer[self.field_column_index]
                .push_value_ref(field_list_val.as_value_ref());

            cursor += bucket_num;
            remaining_rows -= bucket_num;
            self.output_buffered_rows += 1;
        }

        // keep the incomplete tail group buffered for the next batch
        let remaining_input_batch = batch.into_df_record_batch().slice(cursor, remaining_rows);
        self.input_buffered_rows = remaining_input_batch.num_rows();
        self.input_buffer.push(remaining_input_batch);

        Ok(())
    }

    fn push_input_buf(&mut self, batch: RecordBatch) {
        self.input_buffered_rows += batch.num_rows();
        self.input_buffer.push(batch);
    }

    /// Drain the output buffer into a [RecordBatch].
    ///
    /// Returns `Ok(None)` when nothing has been folded yet; warns if input
    /// rows are left over (an incomplete bucket group at end of stream).
    fn take_output_buf(&mut self) -> DataFusionResult<Option<RecordBatch>> {
        if self.output_buffered_rows == 0 {
            if self.input_buffered_rows != 0 {
                warn!(
                    "input buffer is not empty, {} rows remaining",
                    self.input_buffered_rows
                );
            }
            return Ok(None);
        }

        let mut output_buf = Self::empty_output_buffer(&self.output_schema)?;
        std::mem::swap(&mut self.output_buffer, &mut output_buf);
        let mut columns = Vec::with_capacity(output_buf.len());
        for builder in output_buf.iter_mut() {
            columns.push(builder.to_vector().to_arrow_array());
        }

        // overwrite default list datatype to change field name
        columns[self.le_column_index] = compute::cast(
            &columns[self.le_column_index],
            self.output_schema.field(self.le_column_index).data_type(),
        )?;
        columns[self.field_column_index] = compute::cast(
            &columns[self.field_column_index],
            self.output_schema
                .field(self.field_column_index)
                .data_type(),
        )?;

        self.output_buffered_rows = 0;
        RecordBatch::try_new(self.output_schema.clone(), columns)
            .map(Some)
            .map_err(DataFusionError::ArrowError)
    }

    /// Find the first `+Inf` which indicates the end of the bucket group
    ///
    /// If the return value equals the batch's num_rows it means `+Inf` was
    /// not found in this batch
    fn find_positive_inf(&self, batch: &RecordBatch) -> DataFusionResult<usize> {
        // fuse this function. It should not be called when the
        // bucket size is already known.
        if let Some(bucket_size) = self.bucket_size {
            return Ok(bucket_size);
        }
        let string_le_array = batch.column(self.le_column_index);
        let float_le_array = compute::cast(&string_le_array, &DataType::Float64).map_err(|e| {
            DataFusionError::Execution(format!(
                "cannot cast {} array to float64 array: {:?}",
                string_le_array.data_type(),
                e
            ))
        })?;
        let le_as_f64_array = float_le_array
            .as_primitive_opt::<Float64Type>()
            .ok_or_else(|| {
                DataFusionError::Execution(format!(
                    "expect a float64 array, but found {}",
                    float_le_array.data_type()
                ))
            })?;
        for (i, v) in le_as_f64_array.iter().enumerate() {
            if let Some(v) = v && v == f64::INFINITY {
                return Ok(i);
            }
        }

        Ok(batch.num_rows())
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::Schema;
    use datafusion::common::ToDFSchema;
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow_array::StringArray;

    use super::*;

    /// Build a three-batch input for two series (`host_1`, `host_2`) with
    /// 5-bucket groups. Bucket groups deliberately span batch boundaries, and
    /// `+Inf` is spelled with mixed casing to exercise the float parser.
    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ]));

        // 12 items
        let host_column_1 = Arc::new(StringArray::from(vec![
            "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1", "host_1",
            "host_1", "host_1", "host_1", "host_1",
        ])) as _;
        let le_column_1 = Arc::new(StringArray::from(vec![
            "0.001", "0.1", "10", "1000", "+Inf", "0.001", "0.1", "10", "1000", "+inf", "0.001",
            "0.1",
        ])) as _;
        let val_column_1 = Arc::new(Float64Array::from(vec![
            0_0.0, 1.0, 1.0, 5.0, 5.0, 0_0.0, 20.0, 60.0, 70.0, 100.0, 0_1.0, 1.0,
        ])) as _;

        // 2 items
        let host_column_2 = Arc::new(StringArray::from(vec!["host_1", "host_1"])) as _;
        let le_column_2 = Arc::new(StringArray::from(vec!["10", "1000"])) as _;
        let val_column_2 = Arc::new(Float64Array::from(vec![1.0, 1.0])) as _;

        // 11 items
        let host_column_3 = Arc::new(StringArray::from(vec![
            "host_1", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2", "host_2",
            "host_2", "host_2", "host_2",
        ])) as _;
        let le_column_3 = Arc::new(StringArray::from(vec![
            "+INF", "0.001", "0.1", "10", "1000", "+iNf", "0.001", "0.1", "10", "1000", "+Inf",
        ])) as _;
        let val_column_3 = Arc::new(Float64Array::from(vec![
            1.0, 0_0.0, 0.0, 0.0, 0.0, 0.0, 0_0.0, 1.0, 2.0, 3.0, 4.0,
        ])) as _;

        let data_1 = RecordBatch::try_new(
            schema.clone(),
            vec![host_column_1, le_column_1, val_column_1],
        )
        .unwrap();
        let data_2 = RecordBatch::try_new(
            schema.clone(),
            vec![host_column_2, le_column_2, val_column_2],
        )
        .unwrap();
        let data_3 = RecordBatch::try_new(
            schema.clone(),
            vec![host_column_3, le_column_3, val_column_3],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
    }

    /// End-to-end check: 25 input rows with 5-bucket groups fold into 5 rows,
    /// with `le`/`val` turned into float lists.
    #[tokio::test]
    async fn fold_overall() {
        let memory_exec = Arc::new(prepare_test_data());
        let output_schema = Arc::new(
            (*HistogramFold::convert_schema(
                &Arc::new(memory_exec.schema().to_dfschema().unwrap()),
                "le",
                "val",
            )
            .unwrap()
            .as_ref())
            .clone()
            .into(),
        );
        let fold_exec = Arc::new(HistogramFoldExec {
            le_column_index: 1,
            field_column_index: 2,
            ts_column_index: 9999, // not exist but doesn't matter
            input: memory_exec,
            output_schema,
            metric: ExecutionPlanMetricsSet::new(),
        });

        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+--------+---------------------------------+--------------------------------+
| host   | le                              | val                            |
+--------+---------------------------------+--------------------------------+
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 1.0, 5.0, 5.0]      |
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 20.0, 60.0, 70.0, 100.0] |
| host_1 | [0.001, 0.1, 10.0, 1000.0, inf] | [1.0, 1.0, 1.0, 1.0, 1.0]      |
| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 0.0, 0.0, 0.0, 0.0]      |
| host_2 | [0.001, 0.1, 10.0, 1000.0, inf] | [0.0, 1.0, 2.0, 3.0, 4.0]      |
+--------+---------------------------------+--------------------------------+",
        );
        assert_eq!(result_literal, expected);
    }

    /// `convert_schema` should turn `le`/`val` into non-nullable list columns
    /// of f64 while leaving tag columns untouched.
    #[test]
    fn confirm_schema() {
        let input_schema = Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new("le", DataType::Utf8, true),
            Field::new("val", DataType::Float64, true),
        ])
        .to_dfschema_ref()
        .unwrap();
        let expected_output_schema = Schema::new(vec![
            Field::new("host", DataType::Utf8, true),
            Field::new(
                "le",
                DataType::List(Arc::new(Field::new("le", DataType::Float64, true))),
                false,
            ),
            Field::new(
                "val",
                DataType::List(Arc::new(Field::new("val", DataType::Float64, true))),
                false,
            ),
        ])
        .to_dfschema_ref()
        .unwrap();

        let actual = HistogramFold::convert_schema(&input_schema, "le", "val").unwrap();
        assert_eq!(actual, expected_output_schema)
    }
}

View File

@@ -61,6 +61,8 @@ use crate::functions::{
/// `time()` function in PromQL.
const SPECIAL_TIME_FUNCTION: &str = "time";
/// `histogram_quantile` function in PromQL
const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
const DEFAULT_TIME_INDEX_COLUMN: &str = "time";
@@ -440,6 +442,10 @@ impl PromPlanner {
}));
}
if func.name == SPECIAL_HISTOGRAM_QUANTILE {
todo!()
}
let args = self.create_function_args(&args.args)?;
let input = self
.prom_expr_to_plan(args.input.with_context(|| ExpectExprSnafu {

View File

@@ -84,17 +84,19 @@ pub async fn put(
let summary = params.contains_key("summary");
let details = params.contains_key("details");
let data_points = parse_data_points(body).await?;
let data_point_requests = parse_data_points(body).await?;
let data_points = data_point_requests
.iter()
.map(|point| point.clone().into())
.collect::<Vec<_>>();
let response = if !summary && !details {
for data_point in data_points.into_iter() {
if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
// Not debugging purpose, failed fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
// Not for debugging purposes; fail fast on the first error.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
}
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
} else {
@@ -108,15 +110,11 @@ pub async fn put(
},
};
for data_point in data_points.into_iter() {
let result = opentsdb_handler
.exec(&data_point.clone().into(), ctx.clone())
.await;
for (data_point, request) in data_points.into_iter().zip(data_point_requests) {
let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
match result {
Ok(()) => response.on_success(),
Err(e) => {
response.on_failed(data_point, e);
}
Ok(affected_rows) => response.on_success(affected_rows),
Err(e) => response.on_failed(request, e),
}
}
(
@@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse {
}
impl OpentsdbDebuggingResponse {
fn on_success(&mut self) {
self.success += 1;
fn on_success(&mut self, affected_rows: usize) {
self.success += affected_rows as i32;
}
fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {

View File

@@ -20,16 +20,20 @@ use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use futures::StreamExt;
use tokio::sync::broadcast;
use self::codec::DataPoint;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::row_writer::{self, MultiTableData};
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;
@@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
OPENTSDB_SERVER
}
}
/// Convert OpenTSDB [DataPoint]s into gRPC [RowInsertRequests], one table per
/// metric name, with a tag column per tag plus the value and timestamp columns.
///
/// The second element of the returned tuple comes from
/// `MultiTableData::into_row_insert_requests` — presumably the total row
/// count; confirm against that helper.
pub fn data_point_to_grpc_row_insert_requests(
    data_points: Vec<DataPoint>,
) -> Result<(RowInsertRequests, usize)> {
    let mut multi_table_data = MultiTableData::new();

    for mut data_point in data_points {
        // take the tags out instead of cloning them
        let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
        let table_name = data_point.metric();
        let value = data_point.value();
        let timestamp = data_point.ts_millis();
        // length of tags + 2 extra columns for greptime_timestamp and the value
        let num_columns = tags.len() + 2;

        let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
        let mut one_row = table_data.alloc_one_row();

        // tags
        row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;
        // value
        row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
        // timestamp
        row_writer::write_ts_millis(
            table_data,
            TIMESTAMP_COLUMN_NAME,
            Some(timestamp),
            &mut one_row,
        )?;

        table_data.add_row(one_row);
    }

    Ok(multi_table_data.into_row_insert_requests())
}

View File

@@ -19,7 +19,7 @@ use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataPoint {
metric: String,
ts_millis: i64,
@@ -115,6 +115,10 @@ impl DataPoint {
&self.tags
}
pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.tags
}
pub fn ts_millis(&self) -> i64 {
self.ts_millis
}

View File

@@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.output_msg()).await?;
}
@@ -128,8 +128,8 @@ mod tests {
#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
@@ -137,7 +137,7 @@ mod tests {
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(())
Ok(data_points.len())
}
}

View File

@@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
}
pub struct PromStoreResponse {

View File

@@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance {
#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let data_point = data_points.first().unwrap();
if data_point.metric() == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
@@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance {
.fail();
}
let _ = self.tx.send(data_point.metric().to_string()).await;
Ok(())
Ok(data_points.len())
}
}
@@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
);
assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
}
#[tokio::test]

View File

@@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {
#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
@@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(())
Ok(data_points.len())
}
}

View File

@@ -46,6 +46,8 @@ mod tests {
async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();
// should create new table "my_metric_1" directly
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
@@ -55,9 +57,8 @@ mod tests {
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
instance.exec(&data_point1, ctx.clone()).await.unwrap();
// should create new column "tagk3" directly
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
@@ -67,12 +68,12 @@ mod tests {
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
instance.exec(&data_point2, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
instance.exec(&data_point3, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
let data_points = vec![data_point1, data_point2, data_point3];
instance.exec(data_points, ctx.clone()).await.unwrap();
let output = instance
.do_query(
@@ -87,13 +88,13 @@ mod tests {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
"+-------+-------+----------------+---------------------+-------+",
"| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |",
"+-------+-------+----------------+---------------------+-------+",
"| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |",
"| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |",
"| | | 3.0 | 1970-01-01T00:00:03 | |",
"+-------+-------+----------------+---------------------+-------+",
]
.into_iter()
.join("\n");

View File

@@ -4,12 +4,11 @@ require_lease_before_startup = true
rpc_addr = '127.0.0.1:4100'
rpc_hostname = '127.0.0.1'
rpc_runtime_size = 8
require_lease_before_startup = true
[wal]
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '50GB'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false

View File

@@ -5,7 +5,7 @@ require_lease_before_startup = true
[wal]
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '50GB'
purge_threshold = '10GB'
read_batch_size = 128
sync_write = false

View File

@@ -235,6 +235,8 @@ impl Env {
args.push(format!("--http-addr=127.0.0.1:430{id}"));
args.push(format!("--data-home={}", data_home.display()));
args.push(format!("--node-id={id}"));
args.push("-c".to_string());
args.push(self.generate_config_file(subcommand, db_ctx));
args.push("--metasrv-addr=127.0.0.1:3002".to_string());
(args, format!("127.0.0.1:410{id}"))
}