Compare commits


3 Commits

Author SHA1 Message Date
shuiyisong
3b701d8f5e test: more on processors (#4493)
* test: add date test

* test: add epoch test

* test: add letter test and complete some others

* test: add urlencoding test

* chore: typo
2024-08-04 08:29:31 +00:00
Weny Xu
cb4cffe636 chore: bump opendal version to 0.48 (#4499) 2024-08-04 00:46:04 +00:00
Ruihang Xia
cc7f33c90c fix(tql): avoid unwrap on parsing tql query (#4502)
* fix(tql): avoid unwrap on parsing tql query

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-08-03 20:58:53 +00:00
39 changed files with 1079 additions and 161 deletions

Cargo.lock (generated, 26 changes)
View File

@@ -6967,9 +6967,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.47.2"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff159a2da374ef2d64848a6547943cf1af7d2ceada5ae77be175e1389aa07ae3"
checksum = "615d41187deea0ea7fab5b48e9afef6ae8fc742fdcfa248846ee3d92ff71e986"
dependencies = [
"anyhow",
"async-trait",
@@ -6986,7 +6986,7 @@ dependencies = [
"md-5",
"once_cell",
"percent-encoding",
"quick-xml 0.31.0",
"quick-xml 0.36.1",
"reqsign",
"reqwest",
"serde",
@@ -8605,9 +8605,19 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.31.0"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quick-xml"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
dependencies = [
"memchr",
"serde",
@@ -8883,9 +8893,9 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.15.2"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70fe66d4cd0b5ed9b1abbfe639bf6baeaaf509f7da2d51b31111ba945be59286"
checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
dependencies = [
"anyhow",
"async-trait",
@@ -8901,7 +8911,7 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
"quick-xml 0.31.0",
"quick-xml 0.35.0",
"rand",
"reqwest",
"rsa 0.9.6",

View File

@@ -19,9 +19,8 @@ use snafu::ResultExt;
use crate::error::{BuildBackendSnafu, Result};
pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
-let mut builder = Fs::default();
-let _ = builder.root(root);
-let object_store = ObjectStore::new(builder)
+let builder = Fs::default();
+let object_store = ObjectStore::new(builder.root(root))
.context(BuildBackendSnafu)?
.layer(
object_store::layers::LoggingLayer::default()
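Most of the remaining hunks are the same mechanical migration: opendal 0.48 changed service-builder setters from `&mut self` to consuming `self`, so the old mutate-then-ignore style (`let _ = builder.root(..)`) becomes a single chained expression. A minimal sketch of the before/after against opendal's `Fs` service, treating the crate's `ObjectStore` as a re-export of opendal's `Operator` (an assumption for illustration):

use opendal::services::Fs;
use opendal::{Operator, Result};

// opendal 0.47 style (setters took `&mut self`; return values were discarded):
//     let mut builder = Fs::default();
//     let _ = builder.root(root);
//     let object_store = Operator::new(builder)?.finish();
//
// opendal 0.48 style (setters consume and return the builder):
fn build_fs_backend(root: &str) -> Result<Operator> {
    Ok(Operator::new(Fs::default().root(root))?.finish())
}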

View File

@@ -44,28 +44,26 @@ pub fn build_s3_backend(
path: &str,
connection: &HashMap<String, String>,
) -> Result<ObjectStore> {
-let mut builder = S3::default();
-let _ = builder.root(path).bucket(host);
+let mut builder = S3::default().root(path).bucket(host);
if let Some(endpoint) = connection.get(ENDPOINT) {
-let _ = builder.endpoint(endpoint);
+builder = builder.endpoint(endpoint);
}
if let Some(region) = connection.get(REGION) {
-let _ = builder.region(region);
+builder = builder.region(region);
}
if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
-let _ = builder.access_key_id(key_id);
+builder = builder.access_key_id(key_id);
}
if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
-let _ = builder.secret_access_key(key);
+builder = builder.secret_access_key(key);
}
if let Some(session_token) = connection.get(SESSION_TOKEN) {
-let _ = builder.security_token(session_token);
+builder = builder.session_token(session_token);
}
if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
@@ -79,7 +77,7 @@ pub fn build_s3_backend(
.build()
})?;
if enable {
-let _ = builder.enable_virtual_host_style();
+builder = builder.enable_virtual_host_style();
}
}
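Where settings are optional, the consuming setters force the rebinding pattern seen above: `builder = builder.endpoint(..)` instead of `let _ = builder.endpoint(..)`. A reduced sketch of that pattern (the string keys here stand in for the crate's `ENDPOINT`/`REGION` constants):

use std::collections::HashMap;

use opendal::services::S3;

fn configure_s3(host: &str, path: &str, connection: &HashMap<String, String>) -> S3 {
    // Each optional setting rebinds the builder rather than mutating it in place.
    let mut builder = S3::default().root(path).bucket(host);
    if let Some(endpoint) = connection.get("endpoint") {
        builder = builder.endpoint(endpoint);
    }
    if let Some(region) = connection.get("region") {
        builder = builder.region(region);
    }
    builder
}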

View File

@@ -47,19 +47,15 @@ pub fn format_schema(schema: Schema) -> Vec<String> {
}
pub fn test_store(root: &str) -> ObjectStore {
-let mut builder = Fs::default();
-let _ = builder.root(root);
-ObjectStore::new(builder).unwrap().finish()
+let builder = Fs::default();
+ObjectStore::new(builder.root(root)).unwrap().finish()
}
pub fn test_tmp_store(root: &str) -> (ObjectStore, TempDir) {
let dir = create_temp_dir(root);
-let mut builder = Fs::default();
-let _ = builder.root("/");
-(ObjectStore::new(builder).unwrap().finish(), dir)
+let builder = Fs::default();
+(ObjectStore::new(builder.root("/")).unwrap().finish(), dir)
}
pub fn test_basic_schema() -> SchemaRef {

View File

@@ -680,9 +680,8 @@ pub(crate) mod test_util {
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
-let mut builder = Builder::default();
-let _ = builder.root(store_dir);
-ObjectStore::new(builder).unwrap().finish()
+let builder = Builder::default();
+ObjectStore::new(builder.root(store_dir)).unwrap().finish()
}
}

View File

@@ -361,8 +361,7 @@ mod tests {
fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
-let mut builder = Builder::default();
-let _ = builder.root(store_dir);
+let builder = Builder::default().root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
ProcedureStore::from_object_store(object_store)

View File

@@ -220,8 +220,7 @@ mod tests {
async fn test_object_state_store() {
let dir = create_temp_dir("state_store");
let store_dir = dir.path().to_str().unwrap();
-let mut builder = Builder::default();
-let _ = builder.root(store_dir);
+let builder = Builder::default().root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let state_store = ObjectStateStore::new(object_store);
@@ -291,8 +290,7 @@ mod tests {
async fn test_object_state_store_delete() {
let dir = create_temp_dir("state_store_list");
let store_dir = dir.path().to_str().unwrap();
-let mut builder = Builder::default();
-let _ = builder.root(store_dir);
+let builder = Builder::default().root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let state_store = ObjectStateStore::new(object_store);

View File

@@ -112,11 +112,11 @@ async fn create_object_store_with_cache(
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
-let cache_store = {
-let mut builder = Fs::default();
-builder.root(path).atomic_write_dir(&atomic_temp_dir);
-builder.build().context(error::InitBackendSnafu)?
-};
+let cache_store = Fs::default()
+.root(path)
+.atomic_write_dir(&atomic_temp_dir)
+.build()
+.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await

View File

@@ -30,8 +30,7 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
-let mut builder = Azblob::default();
-let _ = builder
+let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
@@ -40,8 +39,8 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
.http_client(build_http_client()?);
if let Some(token) = &azblob_config.sas_token {
-let _ = builder.sas_token(token);
-}
+builder = builder.sas_token(token);
+};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -35,8 +35,9 @@ pub(crate) async fn new_fs_object_store(
let atomic_write_dir = join_dir(data_home, ".tmp/");
store::clean_temp_dir(&atomic_write_dir)?;
-let mut builder = Fs::default();
-let _ = builder.root(data_home).atomic_write_dir(&atomic_write_dir);
+let builder = Fs::default()
+.root(data_home)
+.atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -29,8 +29,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);
-let mut builder = Gcs::default();
-builder
+let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)

View File

@@ -29,8 +29,7 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);
-let mut builder = Oss::default();
-let _ = builder
+let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)

View File

@@ -30,8 +30,7 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);
-let mut builder = S3::default();
-let _ = builder
+let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
@@ -39,11 +38,11 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
.http_client(build_http_client()?);
if s3_config.endpoint.is_some() {
-let _ = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-}
+builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
+};
if s3_config.region.is_some() {
-let _ = builder.region(s3_config.region.as_ref().unwrap());
-}
+builder = builder.region(s3_config.region.as_ref().unwrap());
+};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -26,8 +26,7 @@ use store_api::metadata::ColumnMetadata;
pub fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
let dir = create_temp_dir(prefix);
let store_dir = dir.path().to_string_lossy();
-let mut builder = Fs::default();
-let _ = builder.root(&store_dir);
+let builder = Fs::default().root(&store_dir);
(dir, ObjectStore::new(builder).unwrap().finish())
}

View File

@@ -307,8 +307,7 @@ mod test {
env.init_metric_region().await;
let region_id = to_metadata_region_id(env.default_physical_region_id());
-let mut builder = Fs::default();
-builder.root(&env.data_home());
+let builder = Fs::default().root(&env.data_home());
let object_store = ObjectStore::new(builder).unwrap().finish();
let region_dir = "test_metric_region";

View File

@@ -212,8 +212,7 @@ pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
let atomic_write_dir = join_dir(root, ".tmp/");
clean_dir(&atomic_write_dir).await?;
-let mut builder = Fs::default();
-builder.root(root).atomic_write_dir(&atomic_write_dir);
+let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
Ok(with_instrument_layers(store, false))

View File

@@ -382,8 +382,7 @@ mod tests {
use super::*;
fn new_fs_store(path: &str) -> ObjectStore {
-let mut builder = Fs::default();
-builder.root(path);
+let builder = Fs::default().root(path);
ObjectStore::new(builder).unwrap().finish()
}

View File

@@ -46,7 +46,6 @@ fn parquet_file_data() -> Vec<u8> {
}
pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
-let mut builder = Fs::default();
-builder.root(path);
-ObjectStore::new(builder).unwrap().finish()
+let builder = Fs::default();
+ObjectStore::new(builder.root(path)).unwrap().finish()
}

View File

@@ -642,8 +642,7 @@ mod tests {
fn new_test_manifest_store() -> ManifestObjectStore {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_manifest_log_store");
-let mut builder = Fs::default();
-let _ = builder.root(&tmp_dir.path().to_string_lossy());
+let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
ManifestObjectStore::new(
"/",

View File

@@ -114,8 +114,7 @@ mod tests {
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
-let mut builder = Fs::default();
-builder.root(&dir_path);
+let builder = Fs::default().root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = location::sst_file_path(sst_dir, sst_file_id);
@@ -171,8 +170,7 @@ mod tests {
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
-let mut builder = Fs::default();
-builder.root(&dir_path);
+let builder = Fs::default().root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";

View File

@@ -365,8 +365,7 @@ impl TestEnv {
.display()
.to_string();
-let mut builder = Fs::default();
-builder.root(&data_path);
-let object_store = ObjectStore::new(builder).unwrap().finish();
+let builder = Fs::default();
+let object_store = ObjectStore::new(builder.root(&data_path)).unwrap().finish();
object_store_manager.add(storage_name, object_store);
}
let object_store_manager = Arc::new(object_store_manager);
@@ -553,8 +552,7 @@ impl TestEnv {
fn create_object_store_manager(&self) -> ObjectStoreManager {
let data_home = self.data_home.path();
let data_path = data_home.join("data").as_path().display().to_string();
-let mut builder = Fs::default();
-builder.root(&data_path);
+let builder = Fs::default().root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
ObjectStoreManager::new("default", object_store)
}
@@ -570,9 +568,10 @@ impl TestEnv {
let data_home = self.data_home.path();
let manifest_dir = data_home.join("manifest").as_path().display().to_string();
-let mut builder = Fs::default();
-builder.root(&manifest_dir);
-let object_store = ObjectStore::new(builder).unwrap().finish();
+let builder = Fs::default();
+let object_store = ObjectStore::new(builder.root(&manifest_dir))
+.unwrap()
+.finish();
// The "manifest_dir" here should be the relative path from the `object_store`'s root.
// Otherwise the OpenDal's list operation would fail with "StripPrefixError". This is

View File

@@ -52,8 +52,7 @@ impl SchedulerEnv {
pub(crate) async fn new() -> SchedulerEnv {
let path = create_temp_dir("");
let path_str = path.path().display().to_string();
-let mut builder = Fs::default();
-builder.root(&path_str);
+let builder = Fs::default().root(&path_str);
let index_aux_path = path.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)

View File

@@ -17,7 +17,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.47", features = [
opendal = { version = "0.48", features = [
"layers-tracing",
"services-azblob",
"services-fs",

View File

@@ -25,12 +25,19 @@ use common_telemetry::info;
use read_cache::ReadCache;
/// An opendal layer with local LRU file cache supporting.
-#[derive(Clone)]
pub struct LruCacheLayer<C: Access> {
// The read cache
read_cache: ReadCache<C>,
}
+
+impl<C: Access> Clone for LruCacheLayer<C> {
+fn clone(&self) -> Self {
+Self {
+read_cache: self.read_cache.clone(),
+}
+}
+}
impl<C: Access> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
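The `#[derive(Clone)]` is replaced with a manual impl because the derive would put a `C: Clone` bound on the layer even though `ReadCache<C>` is clonable for any `C`. A reduced illustration of that derive limitation (the `Arc` inside `ReadCache` is an assumption about its layout, not taken from the diff):

use std::sync::Arc;

// Stand-in for the crate's ReadCache: assume it shares state behind an Arc,
// so it is clonable for any C.
struct ReadCache<C> {
    inner: Arc<C>,
}

impl<C> Clone for ReadCache<C> {
    fn clone(&self) -> Self {
        Self { inner: self.inner.clone() }
    }
}

struct LruCacheLayer<C> {
    read_cache: ReadCache<C>,
}

// `#[derive(Clone)]` on LruCacheLayer would emit `impl<C: Clone> Clone ...`,
// demanding C: Clone even though no C value is ever cloned directly.
// The manual impl only requires the field itself to be Clone.
impl<C> Clone for LruCacheLayer<C> {
    fn clone(&self) -> Self {
        Self { read_cache: self.read_cache.clone() }
    }
}

fn main() {
    struct NotClone; // C need not be Clone
    let layer = LruCacheLayer { read_cache: ReadCache { inner: Arc::new(NotClone) } };
    let _copy = layer.clone();
}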

View File

@@ -552,11 +552,12 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let bytes = bs.len();
match self.inner.write(bs).await {
Ok(n) => {
self.bytes += n as u64;
Ok(n)
Ok(_) => {
self.bytes += bytes as u64;
Ok(())
}
Err(err) => {
increment_errors_total(self.op, err.kind());
@@ -581,12 +582,12 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let bytes = bs.len();
self.inner
.write(bs)
.map(|n| {
self.bytes += n as u64;
n
.map(|_| {
self.bytes += bytes as u64;
})
.map_err(|err| {
increment_errors_total(self.op, err.kind());
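This hunk tracks another opendal 0.48 signature change: `oio::Write::write` now returns `Result<()>` rather than the number of bytes written, so the metrics wrapper must read `bs.len()` before handing the buffer to the inner writer. A standalone sketch of the counting pattern, with a simplified trait standing in for opendal's:

use std::io;

// Simplified stand-in for opendal's oio::Write after 0.48: write returns
// Result<()> rather than the number of bytes written.
trait Write {
    fn write(&mut self, bs: &[u8]) -> io::Result<()>;
}

struct CountingWriter<W> {
    inner: W,
    bytes: u64,
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, bs: &[u8]) -> io::Result<()> {
        // The length must be captured before the call, since the callee
        // no longer reports how much it wrote.
        let len = bs.len() as u64;
        self.inner.write(bs)?;
        self.bytes += len;
        Ok(())
    }
}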

View File

@@ -61,8 +61,7 @@ mod tests {
fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
-let mut builder = Builder::default();
-let _ = builder.root(store_dir);
+let builder = Builder::default().root(store_dir);
ObjectStore::new(builder).unwrap().finish()
}

View File

@@ -95,8 +95,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
async fn test_fs_backend() -> Result<()> {
let data_dir = create_temp_dir("test_fs_backend");
let tmp_dir = create_temp_dir("test_fs_backend");
-let mut builder = Fs::default();
-let _ = builder
+let builder = Fs::default()
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());
@@ -117,8 +116,7 @@ async fn test_s3_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
-let mut builder = S3::default();
-let _ = builder
+let builder = S3::default()
.root(&root)
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?)
.secret_access_key(&env::var("GT_S3_ACCESS_KEY")?)
@@ -146,8 +144,7 @@ async fn test_oss_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
-let mut builder = Oss::default();
-let _ = builder
+let builder = Oss::default()
.root(&root)
.access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?)
.access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?)
@@ -174,8 +171,7 @@ async fn test_azblob_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
-let mut builder = Azblob::default();
-let _ = builder
+let builder = Azblob::default()
.root(&root)
.account_name(&env::var("GT_AZBLOB_ACCOUNT_NAME")?)
.account_key(&env::var("GT_AZBLOB_ACCOUNT_KEY")?)
@@ -199,8 +195,7 @@ async fn test_gcs_backend() -> Result<()> {
if !container.is_empty() {
info!("Running azblob test.");
-let mut builder = Gcs::default();
-builder
+let builder = Gcs::default()
.root(&uuid::Uuid::new_v4().to_string())
.bucket(&env::var("GT_GCS_BUCKET").unwrap())
.scope(&env::var("GT_GCS_SCOPE").unwrap())
@@ -224,8 +219,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
let data_dir = create_temp_dir("test_file_backend_with_lru_cache");
let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache");
-let mut builder = Fs::default();
-let _ = builder
+let builder = Fs::default()
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());
@@ -233,8 +227,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
let cache_dir = create_temp_dir("test_file_backend_with_lru_cache");
let cache_layer = {
-let mut builder = Fs::default();
-let _ = builder
+let builder = Fs::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());
@@ -307,8 +300,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
// create file cache layer
let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp");
-let mut builder = Fs::default();
-let _ = builder
+let builder = Fs::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());

View File

@@ -244,8 +244,7 @@ mod tests {
async fn test_list_files_and_parse_table_name() {
let dir = common_test_util::temp_dir::create_temp_dir("test_list_files_to_copy");
let store_dir = normalize_dir(dir.path().to_str().unwrap());
-let mut builder = Fs::default();
-let _ = builder.root(&store_dir);
+let builder = Fs::default().root(&store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
object_store.write("a.parquet", "").await.unwrap();
object_store.write("b.parquet", "").await.unwrap();

src/pipeline/tests/date.rs (new file, 138 additions)
View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
const TEST_INPUT: &str = r#"
{
"input_str": "2024-06-27T06:13:36.991Z"
}"#;
const TEST_VALUE: Option<ValueData> =
Some(ValueData::TimestampNanosecondValue(1719468816991000000));
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_parse_date() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_multi_formats() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, TEST_VALUE);
}
#[test]
fn test_ignore_missing() {
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}
#[test]
fn test_timezone() {
let pipeline_yaml = r#"
processors:
- date:
fields:
- input_str
formats:
- "%Y-%m-%dT%H:%M:%S"
- "%Y-%m-%dT%H:%M:%S%.3fZ"
ignore_missing: true
timezone: 'Asia/Shanghai'
transform:
- fields:
- input_str, ts
type: time
"#;
let output = common::parse_and_exec(TEST_INPUT, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampNanosecondValue(1719440016991000000))
);
}
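A note on the two expected values in this file: they differ by exactly eight hours. With no `timezone` option the naive part of `2024-06-27T06:13:36.991Z` is taken as UTC, giving 1719468816991000000 ns; with `timezone: 'Asia/Shanghai'` the same wall-clock time is read as UTC+8, i.e. an instant 28800 s earlier in UTC: 1719468816991000000 − 28800 × 10⁹ = 1719440016991000000 ns.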

View File

@@ -17,6 +17,10 @@ mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
+fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema {
+common::make_column_schema(name, ColumnDataType::String, SemanticType::Field)
+}
#[test]
fn test_dissect_pattern() {
let input_value_str = r#"
@@ -43,8 +47,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
@@ -91,8 +95,8 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
@@ -111,3 +115,141 @@ transform:
Some(StringValue("456".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let empty_str = r#"{}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{a} %{b}"
ignore_missing: true
transform:
- fields:
- a
- b
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("a".to_string()),
make_string_column_schema("b".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
assert_eq!(output.rows[0].values[1].value_data, None);
}
#[test]
fn test_modifier() {
let empty_str = r#"
{
"str": "key1 key2 key3 key4 key5 key6 key7 key8"
}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6} %{*key_7} %{&key_7}"
transform:
- fields:
- key1
- key2
- key3
- key5
- key7
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("key1".to_string()),
make_string_column_schema("key2".to_string()),
make_string_column_schema("key3".to_string()),
make_string_column_schema("key5".to_string()),
make_string_column_schema("key7".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("key1".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(StringValue("key2".to_string()))
);
assert_eq!(
output.rows[0].values[2].value_data,
Some(StringValue("key3 key4".to_string()))
);
assert_eq!(
output.rows[0].values[3].value_data,
Some(StringValue("key5".to_string()))
);
assert_eq!(
output.rows[0].values[4].value_data,
Some(StringValue("key8".to_string()))
);
}
#[test]
fn test_append_separator() {
let empty_str = r#"
{
"str": "key1 key2"
}"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{+key1} %{+key1}"
append_separator: "_"
transform:
- fields:
- key1
type: string
"#;
let output = common::parse_and_exec(empty_str, pipeline_yaml);
let expected_schema = vec![
make_string_column_schema("key1".to_string()),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("key1_key2".to_string()))
);
}

src/pipeline/tests/epoch.rs (new file, 255 additions)
View File

@@ -0,0 +1,255 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test_parse_epoch() {
let test_input = r#"
{
"input_s": "1722580862",
"input_sec": "1722580862",
"input_second": "1722580862",
"input_ms": "1722580887794",
"input_millisecond": "1722580887794",
"input_milli": "1722580887794",
"input_default": "1722580887794",
"input_us": "1722580905423969",
"input_microsecond": "1722580905423969",
"input_micro": "1722580905423969",
"input_ns": "1722580929863842048",
"input_nanosecond": "1722580929863842048",
"input_nano": "1722580929863842048"
}"#;
let pipeline_yaml = r#"
processors:
- epoch:
field: input_s
resolution: s
- epoch:
field: input_sec
resolution: sec
- epoch:
field: input_second
resolution: second
- epoch:
field: input_ms
resolution: ms
- epoch:
field: input_millisecond
resolution: millisecond
- epoch:
field: input_milli
resolution: milli
- epoch:
field: input_default
- epoch:
field: input_us
resolution: us
- epoch:
field: input_microsecond
resolution: microsecond
- epoch:
field: input_micro
resolution: micro
- epoch:
field: input_ns
resolution: ns
- epoch:
field: input_nanosecond
resolution: nanosecond
- epoch:
field: input_nano
resolution: nano
transform:
- field: input_s
type: epoch, s
- field: input_sec
type: epoch, sec
- field: input_second
type: epoch, second
- field: input_ms
type: epoch, ms
- field: input_millisecond
type: epoch, millisecond
- field: input_milli
type: epoch, milli
- field: input_default
type: epoch, milli
- field: input_us
type: epoch, us
- field: input_microsecond
type: epoch, microsecond
- field: input_micro
type: epoch, micro
- field: input_ns
type: epoch, ns
- field: input_nanosecond
type: epoch, nanosecond
- field: input_nano
type: epoch, nano
"#;
fn make_time_field(name: &str, datatype: ColumnDataType) -> ColumnSchema {
common::make_column_schema(name.to_string(), datatype, SemanticType::Field)
}
let expected_schema = vec![
make_time_field("input_s", ColumnDataType::TimestampSecond),
make_time_field("input_sec", ColumnDataType::TimestampSecond),
make_time_field("input_second", ColumnDataType::TimestampSecond),
make_time_field("input_ms", ColumnDataType::TimestampMillisecond),
make_time_field("input_millisecond", ColumnDataType::TimestampMillisecond),
make_time_field("input_milli", ColumnDataType::TimestampMillisecond),
make_time_field("input_default", ColumnDataType::TimestampMillisecond),
make_time_field("input_us", ColumnDataType::TimestampMicrosecond),
make_time_field("input_microsecond", ColumnDataType::TimestampMicrosecond),
make_time_field("input_micro", ColumnDataType::TimestampMicrosecond),
make_time_field("input_ns", ColumnDataType::TimestampNanosecond),
make_time_field("input_nanosecond", ColumnDataType::TimestampNanosecond),
make_time_field("input_nano", ColumnDataType::TimestampNanosecond),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
for i in 0..2 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampSecondValue(1722580862))
);
}
for i in 3..6 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMillisecondValue(1722580887794))
);
}
for i in 7..9 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampMicrosecondValue(1722580905423969))
);
}
for i in 10..12 {
assert_eq!(
output.rows[0].values[i].value_data,
Some(ValueData::TimestampNanosecondValue(1722580929863842048))
);
}
}
#[test]
fn test_ignore_missing() {
let empty_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- epoch:
field: input_s
resolution: s
ignore_missing: true
transform:
- fields:
- input_s, ts
type: epoch, s
"#;
let expected_schema = vec![
common::make_column_schema(
"ts".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(empty_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}
#[test]
fn test_default_wrong_resolution() {
let test_input = r#"
{
"input_s": "1722580862",
"input_nano": "1722583122284583936"
}"#;
let pipeline_yaml = r#"
processors:
- epoch:
fields:
- input_s
- input_nano
transform:
- fields:
- input_s
type: epoch, s
- fields:
- input_nano
type: epoch, nano
"#;
let expected_schema = vec![
common::make_column_schema(
"input_s".to_string(),
ColumnDataType::TimestampSecond,
SemanticType::Field,
),
common::make_column_schema(
"input_nano".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
// this is actually wrong
// TODO(shuiyisong): add check for type when converting epoch
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1722580862))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::TimestampMillisecondValue(1722583122284583936))
);
}
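The `// this is actually wrong` TODO above is visible in the numbers themselves: both fields come back as `TimestampMillisecondValue` even though the transforms declare `epoch, s` and `epoch, nano`, and 1722583122284583936 interpreted as milliseconds is about 1.7 × 10¹⁵ seconds, i.e. tens of millions of years past the epoch, so the processor's resolution is evidently not yet carried through the transform.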

View File

@@ -61,3 +61,37 @@ transform:
Some(TimestampMillisecondValue(1573840000000))
);
}
#[test]
fn test_ignore_missing() {
let empty_string = r#"{}"#;
let pipeline_yaml = r#"
processors:
- gsub:
field: reqTimeSec
pattern: "\\."
replacement: ""
ignore_missing: true
- epoch:
field: reqTimeSec
resolution: millisecond
ignore_missing: true
transform:
- field: reqTimeSec
type: epoch, millisecond
index: timestamp
"#;
let output = common::parse_and_exec(empty_string, pipeline_yaml);
let expected_schema = vec![common::make_column_schema(
"reqTimeSec".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)];
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -117,3 +117,41 @@ fn test_float() {
Some(StringValue("1.1-1.2-1.3".to_string()))
);
}
#[test]
fn test_mix_type() {
let input_value_str = r#"
[
{
"join_test": [1, true, "a", 1.1]
}
]
"#;
let output = common::parse_and_exec(input_value_str, PIPELINE_YAML);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("1-true-a-1.1".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let empty_string = r#"{}"#;
let pipeline_yaml = r#"
processors:
- join:
field: join_test
separator: "-"
ignore_missing: true
transform:
- field: join_test
type: string
"#;
let output = common::parse_and_exec(empty_string, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
use lazy_static::lazy_static;
lazy_static! {
static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
common::make_column_schema(
"input_str".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}
#[test]
fn test_upper() {
let test_input = r#"
{
"input_str": "aaa"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- input_str
method: upper
transform:
- fields:
- input_str
type: string
"#;
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("AAA".to_string()))
);
}
#[test]
fn test_lower() {
let test_input = r#"
{
"input_str": "AAA"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- input_str
method: lower
transform:
- fields:
- input_str
type: string
"#;
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("aaa".to_string()))
);
}
#[test]
fn test_capital() {
let test_input = r#"
{
"upper": "AAA",
"lower": "aaa"
}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- upper
- lower
method: capital
transform:
- fields:
- upper
- lower
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"upper".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"lower".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue("AAA".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::StringValue("Aaa".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let test_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- letter:
fields:
- upper
- lower
method: capital
ignore_missing: true
transform:
- fields:
- upper
- lower
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"upper".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"lower".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
assert_eq!(output.rows[0].values[1].value_data, None);
}

View File

@@ -14,8 +14,25 @@
mod common;
use api::v1::ColumnSchema;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
+use lazy_static::lazy_static;
+lazy_static! {
+static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
+common::make_column_schema(
+"str_id".to_string(),
+ColumnDataType::String,
+SemanticType::Field,
+),
+common::make_column_schema(
+"greptime_timestamp".to_string(),
+ColumnDataType::TimestampNanosecond,
+SemanticType::Timestamp,
+),
+];
+}
#[test]
fn test_regex_pattern() {
@@ -41,20 +58,7 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
-let expected_schema = vec![
-common::make_column_schema(
-"str_id".to_string(),
-ColumnDataType::String,
-SemanticType::Field,
-),
-common::make_column_schema(
-"greptime_timestamp".to_string(),
-ColumnDataType::TimestampNanosecond,
-SemanticType::Timestamp,
-),
-];
-assert_eq!(output.schema, expected_schema);
+assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
@@ -87,23 +91,34 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
-let expected_schema = vec![
-common::make_column_schema(
-"str_id".to_string(),
-ColumnDataType::String,
-SemanticType::Field,
-),
-common::make_column_schema(
-"greptime_timestamp".to_string(),
-ColumnDataType::TimestampNanosecond,
-SemanticType::Timestamp,
-),
-];
-assert_eq!(output.schema, expected_schema);
+assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
}
#[test]
fn test_ignore_missing() {
let input_value_str = r#"{}"#;
let pipeline_yaml = r#"
processors:
- regex:
fields:
- str
pattern: "(?<id>\\d+)"
ignore_missing: true
transform:
- field: str_id
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
assert_eq!(output.schema, *EXPECTED_SCHEMA);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -0,0 +1,112 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test() {
let test_input = r#"
{
"encoding": "2024-06-27T06:13:36.991Z",
"decoding": "2024-06-27T06%3A13%3A36.991Z"
}"#;
let pipeline_yaml = r#"
processors:
- urlencoding:
field: encoding
method: encode
- urlencoding:
field: decoding
method: decode
transform:
- fields:
- encoding
- decoding
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"encoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"decoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(ValueData::StringValue(
"2024-06-27T06%3A13%3A36.991Z".to_string()
))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(ValueData::StringValue(
"2024-06-27T06:13:36.991Z".to_string()
))
);
}
#[test]
fn test_ignore_missing() {
let test_input = r#"{}"#;
let pipeline_yaml = r#"
processors:
- urlencoding:
field: encoding
method: encode
ignore_missing: true
transform:
- fields:
- encoding
type: string
"#;
let expected_schema = vec![
common::make_column_schema(
"encoding".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
let output = common::parse_and_exec(test_input, pipeline_yaml);
assert_eq!(output.schema, expected_schema);
assert_eq!(output.rows[0].values[0].value_data, None);
}

View File

@@ -31,4 +31,12 @@ pub enum TQLError {
#[snafu(display("Failed to evaluate TQL expression: {}", msg))]
Evaluation { msg: String },
#[snafu(display("Failed to convert TQL expression to logical expression"))]
ConvertToLogicalExpression {
#[snafu(source)]
error: Box<crate::error::Error>,
#[snafu(implicit)]
location: Location,
},
}
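The new variant follows snafu's source/context pattern: the inner error is boxed to keep the enum variant small, and the `Location` is captured implicitly when the context selector is applied. A reduced, self-contained sketch of how such a variant gets filled in (the inner error type is a stand-in; attributes assume snafu 0.7+):

use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("bad literal"))]
struct InnerError;

#[derive(Debug, Snafu)]
enum TqlError {
    #[snafu(display("Failed to convert TQL expression to logical expression"))]
    ConvertToLogicalExpression {
        #[snafu(source)]
        error: Box<InnerError>,
        #[snafu(implicit)]
        location: Location,
    },
}

fn parse_seconds() -> Result<i64, TqlError> {
    let lit: Result<i64, InnerError> = Err(InnerError);
    // Box the source error first, then attach the outer context selector,
    // mirroring `.map_err(Box::new).context(ConvertToLogicalExpressionSnafu)`
    // in the parser change below.
    lit.map_err(Box::new).context(ConvertToLogicalExpressionSnafu)
}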

View File

@@ -31,6 +31,7 @@ const VERBOSE: &str = "VERBOSE";
use sqlparser::parser::Parser;
+use super::error::ConvertToLogicalExpressionSnafu;
use crate::dialect::GreptimeDbDialect;
use crate::parsers::error::{EvaluationSnafu, ParserSnafu, TQLError};
@@ -182,7 +183,9 @@ impl<'a> ParserContext<'a> {
fn parse_tokens(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
let parser_expr = Self::parse_to_expr(tokens)?;
-let lit = utils::parser_expr_to_scalar_value(parser_expr).unwrap();
+let lit = utils::parser_expr_to_scalar_value(parser_expr)
+.map_err(Box::new)
+.context(ConvertToLogicalExpressionSnafu)?;
let second = match lit {
ScalarValue::TimestampNanosecond(ts_nanos, _)
@@ -270,6 +273,11 @@ mod tests {
}
_ => unreachable!(),
}
let sql = "TQL EVAL (now(), now()-'5m', '30s') http_requests_total";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result.is_err());
}
#[test]

View File

@@ -164,8 +164,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
..Default::default()
};
-let mut builder = Gcs::default();
-builder
+let builder = Gcs::default()
.root(&gcs_config.root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
@@ -186,8 +185,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
..Default::default()
};
-let mut builder = Azblob::default();
-let _ = builder
+let mut builder = Azblob::default()
.root(&azblob_config.root)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
@@ -195,8 +193,8 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
.container(&azblob_config.container);
if let Ok(sas_token) = env::var("GT_AZBLOB_SAS_TOKEN") {
let _ = builder.sas_token(&sas_token);
}
builder = builder.sas_token(&sas_token);
};
let config = ObjectStoreConfig::Azblob(azblob_config);
@@ -214,8 +212,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
..Default::default()
};
-let mut builder = Oss::default();
-let _ = builder
+let builder = Oss::default()
.root(&oss_config.root)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
@@ -235,19 +232,18 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
}
-let mut builder = S3::default();
-let _ = builder
+let mut builder = S3::default()
.root(&s3_config.root)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.bucket(&s3_config.bucket);
if s3_config.endpoint.is_some() {
-let _ = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
-}
+builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
+};
if s3_config.region.is_some() {
-let _ = builder.region(s3_config.region.as_ref().unwrap());
-}
+builder = builder.region(s3_config.region.as_ref().unwrap());
+};
let config = ObjectStoreConfig::S3(s3_config);