From b6565eb9a5e29b59d87e2cfe0d1a36e7e0d344f9 Mon Sep 17 00:00:00 2001 From: Gilboab <97948000+GilboaAWS@users.noreply.github.com> Date: Tue, 7 Jan 2025 13:04:00 +0200 Subject: [PATCH 1/6] Enable refresh slots after reconnecting to initial nodes. (#2921) Does not throttle refresh slots after reconnecting to initial nodes, as this operation is kind of resetting the client, so it should let it discover the whole topology before moving on the handle requests Signed-off-by: GilboaAWS --- glide-core/redis-rs/redis/src/cluster_async/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 534fdd429e..17c983d551 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -1301,7 +1301,7 @@ where .extend_connection_map(connection_map); if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries( inner.clone(), - &RefreshPolicy::Throttable, + &RefreshPolicy::NotThrottable, ) .await { From 578e6ec83a144d2000a595d3398abb5097b48fa2 Mon Sep 17 00:00:00 2001 From: eifrah-aws Date: Tue, 7 Jan 2025 14:19:08 +0200 Subject: [PATCH 2/6] Initial work on OpenTelemetry (#2892) --- glide-core/redis-rs/redis/src/cmd.rs | 21 + glide-core/src/socket_listener.rs | 9 +- glide-core/telemetry/Cargo.toml | 6 + glide-core/telemetry/src/lib.rs | 5 + glide-core/telemetry/src/open_telemetry.rs | 359 ++++++++++++++++++ .../src/open_telemetry_exporter_file.rs | 194 ++++++++++ 6 files changed, 592 insertions(+), 2 deletions(-) create mode 100644 glide-core/telemetry/src/open_telemetry.rs create mode 100644 glide-core/telemetry/src/open_telemetry_exporter_file.rs diff --git a/glide-core/redis-rs/redis/src/cmd.rs b/glide-core/redis-rs/redis/src/cmd.rs index 92e8aea989..8ebe9cf9c7 100644 --- a/glide-core/redis-rs/redis/src/cmd.rs +++ b/glide-core/redis-rs/redis/src/cmd.rs @@ -11,6 +11,7 @@ use std::{fmt, io}; use crate::connection::ConnectionLike; use crate::pipeline::Pipeline; use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs}; +use telemetrylib::GlideSpan; /// An argument to a redis command #[derive(Clone)] @@ -30,6 +31,8 @@ pub struct Cmd { cursor: Option, // If it's true command's response won't be read from socket. Useful for Pub/Sub. no_response: bool, + /// The span associated with this command + span: Option, } /// Represents a redis iterator. @@ -321,6 +324,7 @@ impl Cmd { args: vec![], cursor: None, no_response: false, + span: None, } } @@ -331,6 +335,7 @@ impl Cmd { args: Vec::with_capacity(arg_count), cursor: None, no_response: false, + span: None, } } @@ -360,6 +365,16 @@ impl Cmd { self } + /// Associate a trackable span to the command. This allow tracking the lifetime + /// of the command. + /// + /// A span is used by an OpenTelemetry backend to track the lifetime of the command + #[inline] + pub fn with_span(&mut self, name: &str) -> &mut Cmd { + self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name)); + self + } + /// Works similar to `arg` but adds a cursor argument. This is always /// an integer and also flips the command implementation to support a /// different mode for the iterators where the iterator will ask for @@ -582,6 +597,12 @@ impl Cmd { pub fn is_no_response(&self) -> bool { self.no_response } + + /// Return this command span + #[inline] + pub fn span(&self) -> Option { + self.span.clone() + } } impl fmt::Debug for Cmd { diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 4896f83565..9d137d21bf 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -302,10 +302,15 @@ async fn send_command( mut client: Client, routing: Option, ) -> ClientUsageResult { - client + let child_span = cmd.span().map(|span| span.add_span("send_command")); + let res = client .send_command(&cmd, routing) .await - .map_err(|err| err.into()) + .map_err(|err| err.into()); + if let Some(child_span) = child_span { + child_span.end(); + } + res } // Parse the cluster scan command parameters from protobuf and send the command to redis-rs. diff --git a/glide-core/telemetry/Cargo.toml b/glide-core/telemetry/Cargo.toml index 73b9cb25ea..b6bd004274 100644 --- a/glide-core/telemetry/Cargo.toml +++ b/glide-core/telemetry/Cargo.toml @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"] lazy_static = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" +chrono = "0" +futures-util = "0" +tokio = { version = "1", features = ["macros", "time"] } + +opentelemetry = "0" +opentelemetry_sdk = { version = "0", features = ["rt-tokio"] } diff --git a/glide-core/telemetry/src/lib.rs b/glide-core/telemetry/src/lib.rs index 886e43a2c8..f0a938f5e8 100644 --- a/glide-core/telemetry/src/lib.rs +++ b/glide-core/telemetry/src/lib.rs @@ -1,6 +1,11 @@ use lazy_static::lazy_static; use serde::Serialize; use std::sync::RwLock as StdRwLock; +mod open_telemetry; +mod open_telemetry_exporter_file; + +pub use open_telemetry::{GlideOpenTelemetry, GlideSpan}; +pub use open_telemetry_exporter_file::SpanExporterFile; #[derive(Default, Serialize)] #[allow(dead_code)] diff --git a/glide-core/telemetry/src/open_telemetry.rs b/glide-core/telemetry/src/open_telemetry.rs new file mode 100644 index 0000000000..eb61247bd5 --- /dev/null +++ b/glide-core/telemetry/src/open_telemetry.rs @@ -0,0 +1,359 @@ +use opentelemetry::global::ObjectSafeSpan; +use opentelemetry::trace::SpanKind; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::{global, trace::Tracer}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::TracerProvider; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; + +const SPAN_WRITE_LOCK_ERR: &str = "Failed to get span write lock"; +const SPAN_READ_LOCK_ERR: &str = "Failed to get span read lock"; +const TRACE_SCOPE: &str = "valkey_glide"; + +pub enum GlideSpanStatus { + Ok, + Error(String), +} + +#[allow(dead_code)] +#[derive(Clone, Debug)] +/// Defines the method that exporter connects to the collector. It can be: +/// gRPC or HTTP. The third type (i.e. "File") defines an exporter that does not connect to a collector +/// instead, it writes the collected signals to files. +pub enum GlideOpenTelemetryTraceExporter { + /// Collector is listening on grpc + Grpc(String), + /// Collector is listening on http + Http(String), + /// No collector. Instead, write the traces collected to a file. The contained value "PathBuf" + /// points to the folder where the collected data should be placed. + File(PathBuf), +} + +#[derive(Clone, Debug)] +struct GlideSpanInner { + span: Arc>, +} + +impl GlideSpanInner { + /// Create new span with no parent. + pub fn new(name: &str) -> Self { + let tracer = global::tracer(TRACE_SCOPE); + let span = Arc::new(RwLock::new( + tracer + .span_builder(name.to_string()) + .with_kind(SpanKind::Client) + .start(&tracer), + )); + GlideSpanInner { span } + } + + /// Create new span as a child of `parent`. + pub fn new_with_parent(name: &str, parent: &GlideSpanInner) -> Self { + let parent_span_ctx = parent + .span + .read() + .expect(SPAN_READ_LOCK_ERR) + .span_context() + .clone(); + + let parent_context = + opentelemetry::Context::new().with_remote_span_context(parent_span_ctx); + + let tracer = global::tracer(TRACE_SCOPE); + let span = Arc::new(RwLock::new( + tracer + .span_builder(name.to_string()) + .with_kind(SpanKind::Client) + .start_with_context(&tracer, &parent_context), + )); + GlideSpanInner { span } + } + + /// Attach event with name and list of attributes to this span. + pub fn add_event(&self, name: &str, attributes: Option<&Vec<(&str, &str)>>) { + let attributes: Vec = if let Some(attributes) = attributes { + attributes + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())) + .collect() + } else { + Vec::::default() + }; + self.span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .add_event_with_timestamp( + name.to_string().into(), + std::time::SystemTime::now(), + attributes, + ); + } + + pub fn set_status(&self, status: GlideSpanStatus) { + match status { + GlideSpanStatus::Ok => self + .span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .set_status(opentelemetry::trace::Status::Ok), + GlideSpanStatus::Error(what) => { + self.span.write().expect(SPAN_WRITE_LOCK_ERR).set_status( + opentelemetry::trace::Status::Error { + description: what.into(), + }, + ) + } + } + } + + /// Create new span, add it as a child to this span and return it + pub fn add_span(&self, name: &str) -> GlideSpanInner { + let child = GlideSpanInner::new_with_parent(name, self); + { + let child_span = child.span.read().expect(SPAN_WRITE_LOCK_ERR); + self.span + .write() + .expect(SPAN_WRITE_LOCK_ERR) + .add_link(child_span.span_context().clone(), Vec::default()); + } + child + } + + /// Return the span ID + pub fn id(&self) -> String { + self.span + .read() + .expect(SPAN_READ_LOCK_ERR) + .span_context() + .span_id() + .to_string() + } + + /// Finishes the `Span`. + pub fn end(&self) { + self.span.write().expect(SPAN_READ_LOCK_ERR).end() + } +} + +#[derive(Clone, Debug)] +pub struct GlideSpan { + inner: GlideSpanInner, +} + +impl GlideSpan { + pub fn new(name: &str) -> Self { + GlideSpan { + inner: GlideSpanInner::new(name), + } + } + + /// Attach event with name to this span. + pub fn add_event(&self, name: &str) { + self.inner.add_event(name, None) + } + + /// Attach event with name and attributes to this span. + pub fn add_event_with_attributes(&self, name: &str, attributes: &Vec<(&str, &str)>) { + self.inner.add_event(name, Some(attributes)) + } + + pub fn set_status(&self, status: GlideSpanStatus) { + self.inner.set_status(status) + } + + /// Add child span to this span and return it + pub fn add_span(&self, name: &str) -> GlideSpan { + GlideSpan { + inner: self.inner.add_span(name), + } + } + + pub fn id(&self) -> String { + self.inner.id() + } + + /// Finishes the `Span`. + pub fn end(&self) { + self.inner.end() + } +} + +/// OpenTelemetry configuration object. Use `GlideOpenTelemetryConfigBuilder` to construct it: +/// +/// ```text +/// let config = GlideOpenTelemetryConfigBuilder::default() +/// .with_flush_interval(std::time::Duration::from_millis(100)) +/// .build(); +/// GlideOpenTelemetry::initialise(config); +/// ``` +pub struct GlideOpenTelemetryConfig { + /// Default delay interval between two consecutive exports. + span_flush_interval: std::time::Duration, + /// Determines the protocol between the collector and GLIDE + trace_exporter: GlideOpenTelemetryTraceExporter, +} + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct GlideOpenTelemetryConfigBuilder { + span_flush_interval: std::time::Duration, + trace_exporter: GlideOpenTelemetryTraceExporter, +} + +impl Default for GlideOpenTelemetryConfigBuilder { + fn default() -> Self { + GlideOpenTelemetryConfigBuilder { + span_flush_interval: std::time::Duration::from_millis(5_000), + trace_exporter: GlideOpenTelemetryTraceExporter::File(std::env::temp_dir()), + } + } +} + +#[allow(dead_code)] +impl GlideOpenTelemetryConfigBuilder { + pub fn with_flush_interval(mut self, duration: std::time::Duration) -> Self { + self.span_flush_interval = duration; + self + } + + pub fn with_trace_exporter(mut self, protocol: GlideOpenTelemetryTraceExporter) -> Self { + self.trace_exporter = protocol; + self + } + + pub fn build(self) -> GlideOpenTelemetryConfig { + GlideOpenTelemetryConfig { + span_flush_interval: self.span_flush_interval, + trace_exporter: self.trace_exporter, + } + } +} + +pub struct GlideOpenTelemetry {} + +/// Our interface to OpenTelemetry +impl GlideOpenTelemetry { + /// Initialise the open telemetry library with a file system exporter + /// + /// This method should be called once for the given **process** + pub fn initialise(config: GlideOpenTelemetryConfig) { + let trace_exporter = match config.trace_exporter { + GlideOpenTelemetryTraceExporter::File(p) => { + let exporter = crate::SpanExporterFile::new(p); + let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default() + .with_scheduled_delay(config.span_flush_interval) + .build(); + opentelemetry_sdk::trace::BatchSpanProcessor::builder( + exporter, + opentelemetry_sdk::runtime::Tokio, + ) + .with_batch_config(batch_config) + .build() + } + GlideOpenTelemetryTraceExporter::Http(_url) => { + todo!("HTTP protocol is not implemented yet!") + } + GlideOpenTelemetryTraceExporter::Grpc(_url) => { + todo!("GRPC protocol is not implemented yet!") + } + }; + + global::set_text_map_propagator(TraceContextPropagator::new()); + let provider = TracerProvider::builder() + .with_span_processor(trace_exporter) + .build(); + global::set_tracer_provider(provider); + } + + /// Create new span + pub fn new_span(name: &str) -> GlideSpan { + GlideSpan::new(name) + } + + /// Trigger a shutdown procedure flushing all remaining traces + pub fn shutdown() { + global::shutdown_tracer_provider(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + const SPANS_JSON: &str = "/tmp/spans.json"; + + fn string_property_to_u64(json: &serde_json::Value, prop: &str) -> u64 { + let s = json[prop].to_string().replace('"', ""); + s.parse::().unwrap() + } + + #[test] + fn test_span_json_exporter() { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(async { + let _ = std::fs::remove_file(SPANS_JSON); + let config = GlideOpenTelemetryConfigBuilder::default() + .with_flush_interval(std::time::Duration::from_millis(100)) + .with_trace_exporter(GlideOpenTelemetryTraceExporter::File(PathBuf::from("/tmp"))) + .build(); + GlideOpenTelemetry::initialise(config); + let span = GlideOpenTelemetry::new_span("Root_Span_1"); + span.add_event("Event1"); + span.set_status(GlideSpanStatus::Ok); + + let child1 = span.add_span("Network_Span"); + + // Simulate some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + child1.end(); + + // Simulate that the parent span is still doing some work + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + span.end(); + + let span = GlideOpenTelemetry::new_span("Root_Span_2"); + span.add_event("Event1"); + span.add_event("Event2"); + span.set_status(GlideSpanStatus::Ok); + drop(span); // writes the span + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Read the file content + let file_content = std::fs::read_to_string(SPANS_JSON).unwrap(); + let lines: Vec<&str> = file_content.split('\n').collect(); + assert_eq!(lines.len(), 4); + + let span_json: serde_json::Value = serde_json::from_str(lines[0]).unwrap(); + assert_eq!(span_json["name"], "Network_Span"); + let network_span_id = span_json["span_id"].to_string(); + let network_span_start_time = string_property_to_u64(&span_json, "start_time"); + let network_span_end_time = string_property_to_u64(&span_json, "end_time"); + + // Because of the sleep above, the network span should be at least 100ms (units are microseconds) + assert!(network_span_end_time - network_span_start_time >= 100_000); + + let span_json: serde_json::Value = serde_json::from_str(lines[1]).unwrap(); + assert_eq!(span_json["name"], "Root_Span_1"); + assert_eq!(span_json["links"].as_array().unwrap().len(), 1); // we expect 1 child + let root_1_span_start_time = string_property_to_u64(&span_json, "start_time"); + let root_1_span_end_time = string_property_to_u64(&span_json, "end_time"); + + // The network span started *after* its parent + assert!(network_span_start_time >= root_1_span_start_time); + + // The parent span ends *after* the child span (by at least 100ms) + assert!(root_1_span_end_time - network_span_end_time >= 100_000); + + let child_span_id = span_json["links"][0]["span_id"].to_string(); + assert_eq!(child_span_id, network_span_id); + + let span_json: serde_json::Value = serde_json::from_str(lines[2]).unwrap(); + assert_eq!(span_json["name"], "Root_Span_2"); + }); + } +} diff --git a/glide-core/telemetry/src/open_telemetry_exporter_file.rs b/glide-core/telemetry/src/open_telemetry_exporter_file.rs new file mode 100644 index 0000000000..71282cccda --- /dev/null +++ b/glide-core/telemetry/src/open_telemetry_exporter_file.rs @@ -0,0 +1,194 @@ +use chrono::{DateTime, Utc}; +use core::fmt; +use futures_util::future::BoxFuture; +use opentelemetry::trace::TraceError; +use opentelemetry_sdk::export::{self, trace::ExportResult}; +use serde_json::{Map, Value}; +use std::fs::OpenOptions; +use std::io::Write; +use std::path::PathBuf; +use std::sync::atomic; + +use opentelemetry_sdk::resource::Resource; + +/// An OpenTelemetry exporter that writes Spans to a file on export. +pub struct SpanExporterFile { + resource: Resource, + is_shutdown: atomic::AtomicBool, + path: PathBuf, +} + +impl fmt::Debug for SpanExporterFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("SpanExporterFile") + } +} + +impl SpanExporterFile { + pub fn new(mut path: PathBuf) -> Self { + path.push("spans.json"); + SpanExporterFile { + resource: Resource::default(), + is_shutdown: atomic::AtomicBool::new(false), + path, + } + } +} + +macro_rules! file_writeln { + ($file:expr, $content:expr) => {{ + if let Err(e) = $file.write(format!("{}\n", $content).as_bytes()) { + return Box::pin(std::future::ready(Err(TraceError::from(format!( + "File write error. {e}", + ))))); + } + }}; +} + +impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporterFile { + /// Write Spans to JSON file + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let Ok(mut data_file) = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path) + else { + return Box::pin(std::future::ready(Err(TraceError::from(format!( + "Unable to open exporter file: {} for append.", + self.path.display() + ))))); + }; + + let spans = to_jsons(batch); + for span in &spans { + if let Ok(s) = serde_json::to_string(&span) { + file_writeln!(data_file, s); + } + } + Box::pin(std::future::ready(Ok(()))) + } + + fn shutdown(&mut self) { + self.is_shutdown.store(true, atomic::Ordering::SeqCst); + } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } +} + +fn to_jsons(batch: Vec) -> Vec { + let mut spans = Vec::::new(); + for span in &batch { + let mut map = Map::new(); + map.insert( + "scope".to_string(), + Value::String(span.instrumentation_scope.name().to_string()), + ); + if let Some(version) = &span.instrumentation_scope.version() { + map.insert("version".to_string(), Value::String(version.to_string())); + } + if let Some(schema_url) = &span.instrumentation_scope.schema_url() { + map.insert( + "schema_url".to_string(), + Value::String(schema_url.to_string()), + ); + } + + let mut scope_attributes = Vec::::new(); + for kv in span.instrumentation_scope.attributes() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + scope_attributes.push(Value::Object(attr)); + } + map.insert( + "scope_attributes".to_string(), + Value::Array(scope_attributes), + ); + map.insert("name".to_string(), Value::String(span.name.to_string())); + map.insert( + "span_id".to_string(), + Value::String(span.span_context.span_id().to_string()), + ); + map.insert( + "parent_span_id".to_string(), + Value::String(span.parent_span_id.to_string()), + ); + map.insert( + "trace_id".to_string(), + Value::String(span.span_context.trace_id().to_string()), + ); + map.insert( + "kind".to_string(), + Value::String(format!("{:?}", span.span_kind)), + ); + + let datetime: DateTime = span.start_time.into(); + map.insert( + "start_time".to_string(), + Value::String(datetime.timestamp_micros().to_string()), + ); + + let datetime: DateTime = span.end_time.into(); + map.insert( + "end_time".to_string(), + Value::String(datetime.timestamp_micros().to_string()), + ); + + map.insert( + "status".to_string(), + Value::String(format!("{:?}", span.status)), + ); + + // Add the span attributes + let mut span_attributes = Vec::::new(); + for kv in span.attributes.iter() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + span_attributes.push(Value::Object(attr)); + } + map.insert("span_attributes".to_string(), Value::Array(span_attributes)); + + // Add span events + let mut events = Vec::::new(); + for event in span.events.iter() { + let mut evt = Map::new(); + evt.insert("name".to_string(), Value::String(event.name.to_string())); + let datetime: DateTime = event.timestamp.into(); + evt.insert( + "timestamp".to_string(), + Value::String(datetime.format("%Y-%m-%d %H:%M:%S%.6f").to_string()), + ); + + let mut event_attributes = Vec::::new(); + for kv in event.attributes.iter() { + let mut attr = Map::new(); + attr.insert(kv.key.to_string(), Value::String(kv.value.to_string())); + event_attributes.push(Value::Object(attr)); + } + evt.insert( + "event_attributes".to_string(), + Value::Array(event_attributes), + ); + events.push(Value::Object(evt)); + } + map.insert("events".to_string(), Value::Array(events)); + + let mut links = Vec::::new(); + for link in span.links.iter() { + let mut lk = Map::new(); + lk.insert( + "trace_id".to_string(), + Value::String(link.span_context.trace_id().to_string()), + ); + lk.insert( + "span_id".to_string(), + Value::String(link.span_context.span_id().to_string()), + ); + links.push(Value::Object(lk)); + } + map.insert("links".to_string(), Value::Array(links)); + spans.push(Value::Object(map)); + } + spans +} From de9526dddb8535024cdeb7a82311a8f5f1164446 Mon Sep 17 00:00:00 2001 From: BoazBD <50696333+BoazBD@users.noreply.github.com> Date: Tue, 7 Jan 2025 17:53:28 +0200 Subject: [PATCH 3/6] Update ORT approved list (#2922) update ort script to ignore approved liceneses and packages Signed-off-by: BoazBD --- utils/get_licenses_from_ort.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/utils/get_licenses_from_ort.py b/utils/get_licenses_from_ort.py index 6b4b6cb60e..0ba84559e7 100644 --- a/utils/get_licenses_from_ort.py +++ b/utils/get_licenses_from_ort.py @@ -34,6 +34,7 @@ "BSD-3-Clause OR Apache-2.0", "ISC", "MIT", + "MPL-2.0", "Zlib", "MIT OR Unlicense", "PSF-2.0", @@ -42,7 +43,9 @@ # Packages with non-pre-approved licenses that received manual approval. APPROVED_PACKAGES = [ "PyPI::pathspec:0.12.1", - "PyPI::certifi:2023.11.17" + "PyPI::certifi:2023.11.17", + "Crate::ring:0.17.8", + "Maven:org.json:json:20231013" ] SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__)) From 4341d66cfcd51a1f3b1e37bbf8ba46c8a50be71c Mon Sep 17 00:00:00 2001 From: Edric Cuartero Date: Wed, 8 Jan 2025 11:41:38 +0800 Subject: [PATCH 4/6] Go: Implement Persist Command (#2829) Implement Persist Command Signed-off-by: EdricCua --- go/api/base_client.go | 8 ++++++++ go/api/generic_commands.go | 19 +++++++++++++++++++ go/integTest/shared_commands_test.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/go/api/base_client.go b/go/api/base_client.go index 035ff774ba..b954ddabb3 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1475,3 +1475,11 @@ func (client *baseClient) BZPopMin(keys []string, timeoutSecs float64) (Result[K return handleKeyWithMemberAndScoreResponse(result) } + +func (client *baseClient) Persist(key string) (Result[bool], error) { + result, err := client.executeCommand(C.Persist, []string{key}) + if err != nil { + return CreateNilBoolResult(), err + } + return handleBooleanResponse(result) +} diff --git a/go/api/generic_commands.go b/go/api/generic_commands.go index c583dfe31b..e562c67c4c 100644 --- a/go/api/generic_commands.go +++ b/go/api/generic_commands.go @@ -428,4 +428,23 @@ type GenericBaseCommands interface { // // [valkey.io]: https://valkey.io/commands/renamenx/ Renamenx(key string, newKey string) (Result[bool], error) + + // Removes the existing timeout on key, turning the key from volatile + // (a key with an expire set) to persistent (a key that will never expire as no timeout is associated). + // + // Parameters: + // key - The key to remove the existing timeout on. + // + // Return value: + // false if key does not exist or does not have an associated timeout, true if the timeout has been removed. + // + // Example: + // result, err := client.Persist([]string{"key"}) + // if err != nil { + // // handle error + // } + // fmt.Println(result.Value()) // Output: true + // + // [valkey.io]: https://valkey.io/commands/persist/ + Persist(key string) (Result[bool], error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index a43967bf1f..df0568e3f4 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4456,3 +4456,31 @@ func (suite *GlideTestSuite) TestZRem() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } + +func (suite *GlideTestSuite) TestPersist() { + suite.runWithDefaultClients(func(client api.BaseClient) { + // Test 1: Check if persist command removes the expiration time of a key. + keyName := "{keyName}" + uuid.NewString() + t := suite.T() + suite.verifyOK(client.Set(keyName, initialValue)) + resultExpire, err := client.Expire(keyName, 300) + assert.Nil(t, err) + assert.True(t, resultExpire.Value()) + resultPersist, err := client.Persist(keyName) + assert.Nil(t, err) + assert.True(t, resultPersist.Value()) + + // Test 2: Check if persist command return false if key that doesnt have associated timeout. + keyNoExp := "{keyName}" + uuid.NewString() + suite.verifyOK(client.Set(keyNoExp, initialValue)) + resultPersistNoExp, err := client.Persist(keyNoExp) + assert.Nil(t, err) + assert.False(t, resultPersistNoExp.Value()) + + // Test 3: Check if persist command return false if key not exist. + keyInvalid := "{invalidkey_forPersistTest}" + uuid.NewString() + resultInvalidKey, err := client.Persist(keyInvalid) + assert.Nil(t, err) + assert.False(t, resultInvalidKey.Value()) + }) +} From ebec0f79bed3c93fcd2cec62ca483dd7997bdab8 Mon Sep 17 00:00:00 2001 From: ikolomi Date: Wed, 8 Jan 2025 11:52:55 +0200 Subject: [PATCH 5/6] Add NO-OP worflow for testing of Self Hosted Runners scaling Signed-off-by: ikolomi --- .github/workflows/scale-shr-test.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .github/workflows/scale-shr-test.yml diff --git a/.github/workflows/scale-shr-test.yml b/.github/workflows/scale-shr-test.yml new file mode 100644 index 0000000000..68f6f76cef --- /dev/null +++ b/.github/workflows/scale-shr-test.yml @@ -0,0 +1,11 @@ +name: Test workflow for scaling of Self Hosted Runners +on: + workflow_dispatch: + +jobs: + hello-world: + name: "say hello world" + runs-on: [self-hosted, linux, ARM64] + steps: + - name: print Hello World + run: echo "Hello World" From c11215575f54b924ddc546bd8d83dcbde9c6edc0 Mon Sep 17 00:00:00 2001 From: Shachar Langbeheim Date: Wed, 8 Jan 2025 13:04:24 +0200 Subject: [PATCH 6/6] Core: Some pointer shenanigans. (#2895) * Use C-string literals instead of allocating CStrings. * Use the `from_mut` function where relevant. Signed-off-by: Shachar Langbeheim --- csharp/lib/src/lib.rs | 2 +- .../benches/rotating_buffer_benchmark.rs | 8 +-- glide-core/src/rotating_buffer.rs | 8 +-- glide-core/src/socket_listener.rs | 9 ++-- go/api/response_handlers.go | 2 - go/src/lib.rs | 36 +++++-------- java/src/ffi_test.rs | 23 +++++---- node/rust-client/src/lib.rs | 51 ++++++++++--------- python/src/lib.rs | 5 +- 9 files changed, 70 insertions(+), 74 deletions(-) diff --git a/csharp/lib/src/lib.rs b/csharp/lib/src/lib.rs index c497410e31..88da043f03 100644 --- a/csharp/lib/src/lib.rs +++ b/csharp/lib/src/lib.rs @@ -51,7 +51,7 @@ fn create_client_internal( success_callback: unsafe extern "C" fn(usize, *const c_char) -> (), failure_callback: unsafe extern "C" fn(usize) -> (), ) -> RedisResult { - let host_cstring = unsafe { CStr::from_ptr(host as *mut c_char) }; + let host_cstring = unsafe { CStr::from_ptr(host) }; let host_string = host_cstring.to_str()?.to_string(); let request = create_connection_request(host_string, port, use_tls); let runtime = Builder::new_multi_thread() diff --git a/glide-core/benches/rotating_buffer_benchmark.rs b/glide-core/benches/rotating_buffer_benchmark.rs index 7f543a21d3..055702035c 100644 --- a/glide-core/benches/rotating_buffer_benchmark.rs +++ b/glide-core/benches/rotating_buffer_benchmark.rs @@ -1,6 +1,6 @@ // Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 -use std::io::Write; +use std::{io::Write, ptr::from_mut}; use bytes::BufMut; use criterion::{black_box, criterion_group, criterion_main, Criterion}; @@ -169,9 +169,9 @@ fn create_request(args: Vec, args_pointer: bool) -> CommandRequest let mut command = Command::new(); command.request_type = RequestType::CustomCommand.into(); if args_pointer { - command.args = Some(command::Args::ArgsVecPointer(Box::leak(Box::new(args)) - as *mut Vec - as u64)); + command.args = Some(command::Args::ArgsVecPointer( + from_mut(Box::leak(Box::new(args))) as u64, + )); } else { let mut args_array = command::ArgsArray::new(); args_array.args = args; diff --git a/glide-core/src/rotating_buffer.rs b/glide-core/src/rotating_buffer.rs index 1bebb33c65..d207f3419b 100644 --- a/glide-core/src/rotating_buffer.rs +++ b/glide-core/src/rotating_buffer.rs @@ -62,6 +62,8 @@ impl RotatingBuffer { #[cfg(test)] mod tests { + use std::ptr::from_mut; + use super::*; use crate::command_request::{command, command_request}; use crate::command_request::{Command, CommandRequest, RequestType}; @@ -87,9 +89,9 @@ mod tests { let mut command = Command::new(); command.request_type = request_type.into(); if args_pointer { - command.args = Some(command::Args::ArgsVecPointer(Box::leak(Box::new(args)) - as *mut Vec - as u64)); + command.args = Some(command::Args::ArgsVecPointer( + from_mut(Box::leak(Box::new(args))) as u64, + )); } else { let mut args_array = command::ArgsArray::new(); args_array.args.clone_from(&args); diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 9d137d21bf..0b034e48c3 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -22,6 +22,7 @@ use redis::cluster_routing::{ResponsePolicy, Routable}; use redis::{ClusterScanArgs, Cmd, PushInfo, RedisError, ScanStateRC, Value}; use std::cell::Cell; use std::collections::HashSet; +use std::ptr::from_mut; use std::rc::Rc; use std::sync::RwLock; use std::{env, str}; @@ -191,8 +192,8 @@ async fn write_result( if value != Value::Nil { // Since null values don't require any additional data, they can be sent without any extra effort. // Move the value to the heap and leak it. The wrapper should use `Box::from_raw` to recreate the box, use the value, and drop the allocation. - let pointer = Box::leak(Box::new(value)); - let raw_pointer = pointer as *mut redis::Value; + let reference = Box::leak(Box::new(value)); + let raw_pointer = from_mut(reference); Some(response::response::Value::RespPointer(raw_pointer as u64)) } else { None @@ -639,8 +640,8 @@ async fn push_manager_loop(mut push_rx: mpsc::UnboundedReceiver, write kind: (push_msg.kind), data: (push_msg.data), }; - let pointer = Box::leak(Box::new(push_val)); - let raw_pointer = pointer as *mut redis::Value; + let reference = Box::leak(Box::new(push_val)); + let raw_pointer = from_mut(reference); Some(response::response::Value::RespPointer(raw_pointer as u64)) }; diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index 4a5056c0c6..9f788f507d 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -14,7 +14,6 @@ import ( func checkResponseType(response *C.struct_CommandResponse, expectedType C.ResponseType, isNilable bool) error { expectedTypeInt := uint32(expectedType) expectedTypeStr := C.get_response_type_string(expectedTypeInt) - defer C.free_response_type_string(expectedTypeStr) if !isNilable && response == nil { return &RequestError{ @@ -34,7 +33,6 @@ func checkResponseType(response *C.struct_CommandResponse, expectedType C.Respon } actualTypeStr := C.get_response_type_string(response.response_type) - defer C.free_response_type_string(actualTypeStr) return &RequestError{ fmt.Sprintf( "Unexpected return type from Valkey: got %s, expected %s", diff --git a/go/src/lib.rs b/go/src/lib.rs index 376da58dfa..f1eb794d31 100644 --- a/go/src/lib.rs +++ b/go/src/lib.rs @@ -258,31 +258,21 @@ pub unsafe extern "C" fn free_connection_response( } /// Provides the string mapping for the ResponseType enum. -#[no_mangle] -pub extern "C" fn get_response_type_string(response_type: ResponseType) -> *mut c_char { - let s = match response_type { - ResponseType::Null => "Null", - ResponseType::Int => "Int", - ResponseType::Float => "Float", - ResponseType::Bool => "Bool", - ResponseType::String => "String", - ResponseType::Array => "Array", - ResponseType::Map => "Map", - ResponseType::Sets => "Sets", - }; - let c_str = CString::new(s).unwrap_or_default(); - c_str.into_raw() -} - -/// Deallocates a string generated via get_response_type_string. /// -/// # Safety -/// free_response_type_string can be called only once per response_string. +/// Important: the returned pointer is a pointer to a constant string and should not be freed. #[no_mangle] -pub extern "C" fn free_response_type_string(response_string: *mut c_char) { - if !response_string.is_null() { - drop(unsafe { CString::from_raw(response_string as *mut c_char) }); - } +pub extern "C" fn get_response_type_string(response_type: ResponseType) -> *const c_char { + let c_str = match response_type { + ResponseType::Null => c"Null", + ResponseType::Int => c"Int", + ResponseType::Float => c"Float", + ResponseType::Bool => c"Bool", + ResponseType::String => c"String", + ResponseType::Array => c"Array", + ResponseType::Map => c"Map", + ResponseType::Sets => c"Sets", + }; + c_str.as_ptr() } /// Deallocates a `CommandResponse`. diff --git a/java/src/ffi_test.rs b/java/src/ffi_test.rs index fb54fc3b5b..141d569fcb 100644 --- a/java/src/ffi_test.rs +++ b/java/src/ffi_test.rs @@ -7,6 +7,7 @@ use jni::{ JNIEnv, }; use redis::Value; +use std::ptr::from_mut; #[no_mangle] pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedNil<'local>( @@ -14,7 +15,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedNil<'local>( _class: JClass<'local>, ) -> jlong { let resp_value = Value::Nil; - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -25,7 +26,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedSimpleString<'local>( ) -> jlong { let value: String = env.get_string(&value).unwrap().into(); let resp_value = Value::SimpleString(value); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -34,7 +35,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedOkay<'local>( _class: JClass<'local>, ) -> jlong { let resp_value = Value::Okay; - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -44,7 +45,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedInt<'local>( value: jlong, ) -> jlong { let resp_value = Value::Int(value); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -56,7 +57,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedBulkString<'local>( let value = env.convert_byte_array(&value).unwrap(); let value = value.into_iter().collect::>(); let resp_value = Value::BulkString(value); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -67,7 +68,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedLongArray<'local>( ) -> jlong { let array = java_long_array_to_value(&mut env, &value); let resp_value = Value::Array(array); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -81,7 +82,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedMap<'local>( let values_vec = java_long_array_to_value(&mut env, &values); let map: Vec<(Value, Value)> = keys_vec.into_iter().zip(values_vec).collect(); let resp_value = Value::Map(map); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -91,7 +92,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedDouble<'local>( value: jdouble, ) -> jlong { let resp_value = Value::Double(value.into()); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -101,7 +102,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedBoolean<'local>( value: jboolean, ) -> jlong { let resp_value = Value::Boolean(value != 0); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -116,7 +117,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedVerbatimString<'local> format: VerbatimFormat::Text, text: value, }; - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } #[no_mangle] @@ -127,7 +128,7 @@ pub extern "system" fn Java_glide_ffi_FfiTest_createLeakedLongSet<'local>( ) -> jlong { let set = java_long_array_to_value(&mut env, &value); let resp_value = Value::Set(set); - Box::leak(Box::new(resp_value)) as *mut Value as jlong + from_mut(Box::leak(Box::new(resp_value))) as jlong } fn java_long_array_to_value<'local>( diff --git a/node/rust-client/src/lib.rs b/node/rust-client/src/lib.rs index ffa5b5c47f..584eab16de 100644 --- a/node/rust-client/src/lib.rs +++ b/node/rust-client/src/lib.rs @@ -24,6 +24,7 @@ use num_traits::sign::Signed; use redis::{aio::MultiplexedConnection, AsyncCommands, Value}; #[cfg(feature = "testing_utilities")] use std::collections::HashMap; +use std::ptr::from_mut; use std::str; use tokio::runtime::{Builder, Runtime}; #[napi] @@ -315,7 +316,7 @@ fn split_pointer(pointer: *mut T) -> [u32; 2] { #[cfg(feature = "testing_utilities")] pub fn create_leaked_string(message: String) -> [u32; 2] { let value = Value::SimpleString(message); - let pointer = Box::leak(Box::new(value)) as *mut Value; + let pointer = from_mut(Box::leak(Box::new(value))); split_pointer(pointer) } @@ -323,7 +324,7 @@ pub fn create_leaked_string(message: String) -> [u32; 2] { pub fn create_leaked_string_vec(message: Vec) -> [u32; 2] { // Convert the string vec -> Bytes vector let bytes_vec: Vec = message.iter().map(|v| Bytes::from(v.to_vec())).collect(); - let pointer = Box::leak(Box::new(bytes_vec)) as *mut Vec; + let pointer = from_mut(Box::leak(Box::new(bytes_vec))); split_pointer(pointer) } @@ -332,11 +333,11 @@ pub fn create_leaked_string_vec(message: Vec) -> [u32; 2] { /// Should NOT be used in production. #[cfg(feature = "testing_utilities")] pub fn create_leaked_map(map: HashMap) -> [u32; 2] { - let pointer = Box::leak(Box::new(Value::Map( + let pointer = from_mut(Box::leak(Box::new(Value::Map( map.into_iter() .map(|(key, value)| (Value::SimpleString(key), Value::SimpleString(value))) .collect(), - ))) as *mut Value; + )))); split_pointer(pointer) } @@ -345,9 +346,9 @@ pub fn create_leaked_map(map: HashMap) -> [u32; 2] { /// Should NOT be used in production. #[cfg(feature = "testing_utilities")] pub fn create_leaked_array(array: Vec) -> [u32; 2] { - let pointer = Box::leak(Box::new(Value::Array( + let pointer = from_mut(Box::leak(Box::new(Value::Array( array.into_iter().map(Value::SimpleString).collect(), - ))) as *mut Value; + )))); split_pointer(pointer) } @@ -356,13 +357,13 @@ pub fn create_leaked_array(array: Vec) -> [u32; 2] { /// Should NOT be used in production. #[cfg(feature = "testing_utilities")] pub fn create_leaked_attribute(message: String, attribute: HashMap) -> [u32; 2] { - let pointer = Box::leak(Box::new(Value::Attribute { + let pointer = from_mut(Box::leak(Box::new(Value::Attribute { data: Box::new(Value::SimpleString(message)), attributes: attribute .into_iter() .map(|(key, value)| (Value::SimpleString(key), Value::SimpleString(value))) .collect(), - })) as *mut Value; + }))); split_pointer(pointer) } @@ -371,21 +372,23 @@ pub fn create_leaked_attribute(message: String, attribute: HashMap [u32; 2] { - let pointer = Box::leak(Box::new(Value::BigNumber(num_bigint::BigInt::new( - if big_int.sign_bit { - num_bigint::Sign::Minus - } else { - num_bigint::Sign::Plus - }, - big_int - .words - .into_iter() - .flat_map(|word| { - let bytes = u64::to_le_bytes(word); - unsafe { std::mem::transmute::<[u8; 8], [u32; 2]>(bytes) } - }) - .collect(), - )))) as *mut Value; + let pointer = from_mut(Box::leak(Box::new(Value::BigNumber( + num_bigint::BigInt::new( + if big_int.sign_bit { + num_bigint::Sign::Minus + } else { + num_bigint::Sign::Plus + }, + big_int + .words + .into_iter() + .flat_map(|word| { + let bytes = u64::to_le_bytes(word); + unsafe { std::mem::transmute::<[u8; 8], [u32; 2]>(bytes) } + }) + .collect(), + ), + )))); split_pointer(pointer) } @@ -394,7 +397,7 @@ pub fn create_leaked_bigint(big_int: BigInt) -> [u32; 2] { /// Should NOT be used in production. #[cfg(feature = "testing_utilities")] pub fn create_leaked_double(float: f64) -> [u32; 2] { - let pointer = Box::leak(Box::new(Value::Double(float))) as *mut Value; + let pointer = from_mut(Box::leak(Box::new(Value::Double(float)))); split_pointer(pointer) } diff --git a/python/src/lib.rs b/python/src/lib.rs index 09914c2c59..5e33ab8bd3 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -11,6 +11,7 @@ use pyo3::types::{PyAny, PyBool, PyBytes, PyDict, PyFloat, PyList, PySet, PyStri use pyo3::Python; use redis::Value; use std::collections::HashMap; +use std::ptr::from_mut; use std::sync::Arc; pub const DEFAULT_TIMEOUT_IN_MILLISECONDS: u32 = @@ -263,7 +264,7 @@ fn glide(_py: Python, m: &Bound) -> PyResult<()> { /// Should NOT be used in production. pub fn create_leaked_value(message: String) -> usize { let value = Value::SimpleString(message); - Box::leak(Box::new(value)) as *mut Value as usize + from_mut(Box::leak(Box::new(value))) as usize } #[pyfunction] @@ -276,7 +277,7 @@ fn glide(_py: Python, m: &Bound) -> PyResult<()> { Bytes::from(bytes.to_vec()) }) .collect(); - Box::leak(Box::new(bytes_vec)) as *mut Vec as usize + from_mut(Box::leak(Box::new(bytes_vec))) as usize } Ok(()) }