Skip to content

Commit

Permalink
Merge branch 'main' into niharika-sortcmds
Browse files Browse the repository at this point in the history
  • Loading branch information
niharikabhavaraju committed Jan 8, 2025
2 parents 2e2fbbb + c112155 commit a1b5f2e
Show file tree
Hide file tree
Showing 20 changed files with 733 additions and 78 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/scale-shr-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: Test workflow for scaling of Self Hosted Runners
on:
workflow_dispatch:

jobs:
hello-world:
name: "say hello world"
runs-on: [self-hosted, linux, ARM64]
steps:
- name: print Hello World
run: echo "Hello World"
2 changes: 1 addition & 1 deletion csharp/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn create_client_internal(
success_callback: unsafe extern "C" fn(usize, *const c_char) -> (),
failure_callback: unsafe extern "C" fn(usize) -> (),
) -> RedisResult<Client> {
let host_cstring = unsafe { CStr::from_ptr(host as *mut c_char) };
let host_cstring = unsafe { CStr::from_ptr(host) };
let host_string = host_cstring.to_str()?.to_string();
let request = create_connection_request(host_string, port, use_tls);
let runtime = Builder::new_multi_thread()
Expand Down
8 changes: 4 additions & 4 deletions glide-core/benches/rotating_buffer_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

use std::io::Write;
use std::{io::Write, ptr::from_mut};

use bytes::BufMut;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand Down Expand Up @@ -169,9 +169,9 @@ fn create_request(args: Vec<bytes::Bytes>, args_pointer: bool) -> CommandRequest
let mut command = Command::new();
command.request_type = RequestType::CustomCommand.into();
if args_pointer {
command.args = Some(command::Args::ArgsVecPointer(Box::leak(Box::new(args))
as *mut Vec<bytes::Bytes>
as u64));
command.args = Some(command::Args::ArgsVecPointer(
from_mut(Box::leak(Box::new(args))) as u64,
));
} else {
let mut args_array = command::ArgsArray::new();
args_array.args = args;
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ where
.extend_connection_map(connection_map);
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
&RefreshPolicy::Throttable,
&RefreshPolicy::NotThrottable,
)
.await
{
Expand Down
21 changes: 21 additions & 0 deletions glide-core/redis-rs/redis/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{fmt, io};
use crate::connection::ConnectionLike;
use crate::pipeline::Pipeline;
use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
use telemetrylib::GlideSpan;

/// An argument to a redis command
#[derive(Clone)]
Expand All @@ -30,6 +31,8 @@ pub struct Cmd {
cursor: Option<u64>,
// If it's true command's response won't be read from socket. Useful for Pub/Sub.
no_response: bool,
/// The span associated with this command
span: Option<GlideSpan>,
}

/// Represents a redis iterator.
Expand Down Expand Up @@ -321,6 +324,7 @@ impl Cmd {
args: vec![],
cursor: None,
no_response: false,
span: None,
}
}

Expand All @@ -331,6 +335,7 @@ impl Cmd {
args: Vec::with_capacity(arg_count),
cursor: None,
no_response: false,
span: None,
}
}

Expand Down Expand Up @@ -360,6 +365,16 @@ impl Cmd {
self
}

/// Associate a trackable span to the command. This allow tracking the lifetime
/// of the command.
///
/// A span is used by an OpenTelemetry backend to track the lifetime of the command
#[inline]
pub fn with_span(&mut self, name: &str) -> &mut Cmd {
self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name));
self
}

/// Works similar to `arg` but adds a cursor argument. This is always
/// an integer and also flips the command implementation to support a
/// different mode for the iterators where the iterator will ask for
Expand Down Expand Up @@ -582,6 +597,12 @@ impl Cmd {
pub fn is_no_response(&self) -> bool {
self.no_response
}

/// Return this command span
#[inline]
pub fn span(&self) -> Option<GlideSpan> {
self.span.clone()
}
}

impl fmt::Debug for Cmd {
Expand Down
8 changes: 5 additions & 3 deletions glide-core/src/rotating_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ impl RotatingBuffer {

#[cfg(test)]
mod tests {
use std::ptr::from_mut;

use super::*;
use crate::command_request::{command, command_request};
use crate::command_request::{Command, CommandRequest, RequestType};
Expand All @@ -87,9 +89,9 @@ mod tests {
let mut command = Command::new();
command.request_type = request_type.into();
if args_pointer {
command.args = Some(command::Args::ArgsVecPointer(Box::leak(Box::new(args))
as *mut Vec<Bytes>
as u64));
command.args = Some(command::Args::ArgsVecPointer(
from_mut(Box::leak(Box::new(args))) as u64,
));
} else {
let mut args_array = command::ArgsArray::new();
args_array.args.clone_from(&args);
Expand Down
18 changes: 12 additions & 6 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use redis::cluster_routing::{ResponsePolicy, Routable};
use redis::{ClusterScanArgs, Cmd, PushInfo, RedisError, ScanStateRC, Value};
use std::cell::Cell;
use std::collections::HashSet;
use std::ptr::from_mut;
use std::rc::Rc;
use std::sync::RwLock;
use std::{env, str};
Expand Down Expand Up @@ -191,8 +192,8 @@ async fn write_result(
if value != Value::Nil {
// Since null values don't require any additional data, they can be sent without any extra effort.
// Move the value to the heap and leak it. The wrapper should use `Box::from_raw` to recreate the box, use the value, and drop the allocation.
let pointer = Box::leak(Box::new(value));
let raw_pointer = pointer as *mut redis::Value;
let reference = Box::leak(Box::new(value));
let raw_pointer = from_mut(reference);
Some(response::response::Value::RespPointer(raw_pointer as u64))
} else {
None
Expand Down Expand Up @@ -302,10 +303,15 @@ async fn send_command(
mut client: Client,
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
client
let child_span = cmd.span().map(|span| span.add_span("send_command"));
let res = client
.send_command(&cmd, routing)
.await
.map_err(|err| err.into())
.map_err(|err| err.into());
if let Some(child_span) = child_span {
child_span.end();
}
res
}

// Parse the cluster scan command parameters from protobuf and send the command to redis-rs.
Expand Down Expand Up @@ -634,8 +640,8 @@ async fn push_manager_loop(mut push_rx: mpsc::UnboundedReceiver<PushInfo>, write
kind: (push_msg.kind),
data: (push_msg.data),
};
let pointer = Box::leak(Box::new(push_val));
let raw_pointer = pointer as *mut redis::Value;
let reference = Box::leak(Box::new(push_val));
let raw_pointer = from_mut(reference);
Some(response::response::Value::RespPointer(raw_pointer as u64))
};

Expand Down
6 changes: 6 additions & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"]
lazy_static = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
5 changes: 5 additions & 0 deletions glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use lazy_static::lazy_static;
use serde::Serialize;
use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
#[allow(dead_code)]
Expand Down
Loading

0 comments on commit a1b5f2e

Please sign in to comment.