feat: accept influxdb request without timestamp even if table doesn't exist (#2041)

* feat: accept influxdb request without timestamp even if table doesn't exist

* refactor: InsertRequests::try_from

* feat: check row number
This commit is contained in:
Niwaka
2023-07-31 11:55:09 +09:00
committed by GitHub
parent fc6ebf58b4
commit 695398652c
3 changed files with 93 additions and 8 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use api::helper::values_with_capacity;
use api::v1::column::SemanticType;
@@ -246,6 +247,19 @@ pub enum Precision {
Hour,
}
impl Display for Precision {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Precision::Nanosecond => write!(f, "Precision::Nanosecond"),
Precision::Microsecond => write!(f, "Precision::Microsecond"),
Precision::Millisecond => write!(f, "Precision::Millisecond"),
Precision::Second => write!(f, "Precision::Second"),
Precision::Minute => write!(f, "Precision::Minute"),
Precision::Hour => write!(f, "Precision::Hour"),
}
}
}
#[cfg(test)]
mod tests {
use api::v1::column::SemanticType;

View File

@@ -16,10 +16,12 @@ use std::collections::HashMap;
use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests};
use common_grpc::writer::{LinesWriter, Precision};
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use influxdb_line_protocol::{parse_lines, FieldValue};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu};
use crate::error::{Error, InfluxdbLineProtocolSnafu, InfluxdbLinesWriteSnafu, TimePrecisionSnafu};
pub const INFLUXDB_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
@@ -90,16 +92,36 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests {
}
if let Some(timestamp) = line.timestamp {
let precision = if let Some(val) = &value.precision {
*val
} else {
DEFAULT_TIME_PRECISION
};
let precision = unwarp_or_default_precision(value.precision);
writer
.write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision))
.context(InfluxdbLinesWriteSnafu)?;
} else {
let precision = unwarp_or_default_precision(value.precision);
let timestamp = Timestamp::current_millis();
let unit = match precision {
Precision::Second => TimeUnit::Second,
Precision::Millisecond => TimeUnit::Millisecond,
Precision::Microsecond => TimeUnit::Microsecond,
Precision::Nanosecond => TimeUnit::Nanosecond,
_ => {
return Err(Error::NotSupported {
feat: format!("convert {precision} into TimeUnit"),
})
}
};
let timestamp = timestamp
.convert_to(unit)
.with_context(|| TimePrecisionSnafu {
name: precision.to_string(),
})?;
writer
.write_ts(
INFLUXDB_TIMESTAMP_COLUMN_NAME,
(timestamp.into(), precision),
)
.context(InfluxdbLinesWriteSnafu)?;
}
writer.commit();
}
@@ -119,6 +141,14 @@ impl TryFrom<&InfluxdbRequest> for InsertRequests {
}
}
fn unwarp_or_default_precision(precision: Option<Precision>) -> Precision {
if let Some(val) = precision {
val
} else {
DEFAULT_TIME_PRECISION
}
}
#[cfg(test)]
mod tests {
use api::v1::column::{SemanticType, Values};

View File

@@ -42,6 +42,47 @@ mod test {
test_put_influxdb_lines(&instance.frontend()).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_put_influxdb_lines_without_time_column() {
let standalone =
tests::create_standalone_instance("test_standalone_put_influxdb_lines").await;
test_put_influxdb_lines_without_time_column(&standalone.instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines_without_time_column() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
test_put_influxdb_lines_without_time_column(&instance.frontend()).await;
}
async fn test_put_influxdb_lines_without_time_column(instance: &Arc<Instance>) {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024
monitor1,host=host2 memory=1027";
let request = InfluxdbRequest {
precision: None,
lines: lines.to_string(),
};
assert!(instance.exec(&request, QueryContext::arc()).await.is_ok());
let mut output = instance
.do_query(
"SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts",
QueryContext::arc(),
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let recordbatches: Vec<_> = recordbatches.iter().collect();
let total = recordbatches
.into_iter()
.fold(0, |total, recordbatch| total + recordbatch.num_rows());
assert_eq!(total, 2);
}
async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
let lines = r"
monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100