Skip to content
This repository has been archived by the owner on May 4, 2019. It is now read-only.

Commit

Permalink
Merge branch 'release/0.9.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
rdegnan committed Oct 18, 2018
2 parents b756cd1 + cd1b9d5 commit 1556287
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 42 deletions.
6 changes: 0 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,3 @@ cache:
- "$HOME/.gradle/caches/"
- "$HOME/.gradle/wrapper/"
- "$HOME/.m2"
env:
global:
- secure: crwXv5Q/iCtIIC81QWgWYNfuGJxSQQjVn83vvRoRMXmfSZ02NnQfDos+p1GxxbIXLUQ8NsaLc8cg0o2PQgsK/l0d7nDExix5Rt+AwzU4UOrUcZh46MgEeZDow4q6ySToOHaeKsaRnbTIpAwU28o9fnZ9ViaQ7s2uZWQZXYigGf0PN1ca9E8MJ6VEMvp34e2wJok6woaWG9ALCL2b5viofTv0zdHXWfGV4rdz9Rq0fThmFn4BlmHdcTwGt2ZLdWcEbzv0qNYNAiVvNJd4uWHOnjRceslAZ/JZxgIzSHK0ctNiZdc9rRNo8BHGQqKvmZqmbzUAU07uS0+iLBnm89KKJyQCXKulDv7NM0ShiFexMQ17TUOdlJtH65ypzUIJ+sbSOj2M+NhThUD8OQlebvjKF6t8M9f1NIXkkH/Xw2XJ146ANBcGyjALVFDIqlKIyCtIoY3/teQWccfbAzBglC24QBizpfgmdBEGOJXs2qR0NxzcIRNc1o9XPyT9BJxa+9JnHyHbF+LG07j1LfaaJSQBIYL7xjDBwEEIOEmP3UrISwjvs1fOQA3oYINguXtl7qi3wNylv35e5xw5acijcTTYHRSY6A5rPdokbK0fCQHCrHldBYDc8Bvy+GnjnYQHynjdWGbTnkvMgvYe2aHD+mH5pO5V7DRYHhnJsuMuscRIELc=
- secure: ZMu9rhzTT1F6pyqqrd0TGhAeUGigAvGZT5rJv2ifZJEB8+FOQ+/ylJ/Q3Gq9Wx8quWVR/ocTWF8yEuA1BlZF1Z+tfOcCXOaYPrgMFNhgmETK9X1AQN1gahkTzLNWEZUzKwte5IwaDYDwMe54VcJpY9/f7R6ZyjKVEJOLCjnJ+YLR2rjttsug9ZurH3iLzdgRGMHyI55+dz83MkdumkkATFAeLnQepdL2n4ijD0NvuzvlvaU+Knz9AZCX2kSNDM+hn+K0fFDM4CBFduBi8gEYmRlnSXslHtfwuZRViZQaS4voRMAFmWPNCH67ya3f1voi41BWXgaGTW5He8XDjQtFcIphvykmMOsS4ka9u4K0LEbReoSBRwBUAhigGMFXB1lMup66+WvjvSvwS9rMVVGaXpVGjgpM5shWpiIEoWICWU+Pu5ihk9BWv61/tYNNw6u3QlXt1ZfjM50F7eoJ1ctfYqClrlD3iwrBJ47aIu4kH1HdCp9uxkxq+Cz75n2UdV6Py7eEW1N141fnZ/Iax8nzbNu4SxccsJn1yUv22LJCteW3//GqvRVa7hets587PFpfkyaraBLNBvh90/wouXK/yvYBZN4WK7VwnuCUmxTjTglT9mQGUgPOYNMDU/m7rDp2zDc+8aMEfQjWGktH93M0g1l9yhidc8AMQbyAaxkk73M=
- secure: hxmPq8HtqUN04I639LiDA6euoWW8ziWs70USv7nwRLSE0gV/oDrwFrDX5QJNP4rP04hD/UCll0jKZPHClVjXfW7s9U2wqrFlyJifFoefQms7HZIselUuRXAjkK7KVA8ur1kz+6Ox3Ji7gxUnkF9cGR+rGAHyllSmuD+UIw2vCUHcqAAlhYzN9NfenjqEBNTQ5QTR0p1I37fDLsxORxTjuUxQO0yaYv1sy0eUUbDSR0XN0VrN/sNFMkV6+m5+A+7/4LYqxmZxyRqSC4Qk726k8Eab0K/QCnxE3TO/QoAPk9QZ2BEtHbrycNhm9yxG9+CGNbZCpQaWH1BEeo5F9B6u1prbikqzd9cTSuLGqR1r4wBDuIDFB8RloOMvoB0Z5+CWeP8X9u7q13vFFDS26crMWbqCCoo9Vcel8Js1H8/awnYrxtYhBk4BRGDzgUfRzaru+GpjE3s1kppkjj01qUo3fN9P+Px/zISPD9OdMgVvdN0O3rGPYHniFS9lQJZaJSJUU8hlApqsTQfK+5j8vIKmPe0ur39XriVeC8Kam8SWMymjLrLaNGOe0XvST9MLKKel7Aj9mknNyZ0dQEr24BLlN/vGHQsHTrpnReLHP3szuF5FXZQsqYQ2uN4Q5000O6IJfbD9FcBJHzOCtyIlNkJ12qK6GiLChwqcpv9muQZcaH8=
- secure: fyh1tUQJBijuGd5jO6ZHl2yn8u5uKgz+9tKgrGNBg2NNlxXvq1TJqBj1/fkZonH3rxPAH4AnmxsR4ktWKhmTci16D3ATflpxVOSQYHdMqbsxpNt4e+um/lnFLp6WORLkcnFAsA0rUrH3eRJXPfZ6YgcY3L4tddiT0sqLZ1ECo5gA915CPN8ANioHzooWJgVBcbr34z2HcJudgKws4C+MoiTKv3C9Ijamg+30D/wgiyRJXPD/azb2KoVx+Bk5yIQuBOl3G/a/LMpK+kv/2jZQjUT/gnKLEvRu9H5xJIL41mFTRgmZxD9E0Rb0VeAaYICfo+4K2g2smXeA/Eb9lmwMQYiEKdG2IH2chjPggBM0hsB42pGc+mFDKaNCtQSVkbQkrg2CmDMwvBaL7IBFNUmlSbZfG1PeQluSNs9uECR0MFM/zyNECLGkcrf3kZXHMuQ+1jYPWf37CuiNbEnN8sQm65t9wqX3TqRB8rksGaThVvYizIon7zuuIrKINynGDEc3ygVUOeOBLT+U9AaF1jatkxuwiZ/VJaReYuJvH76wCUb6k91RQA9udK4n8h8aMomOmTyIWRSnEFObDs1RW6Lixh5PRShTKG8ffoY0vM5IO5i17dPFOmIRo99yqKiWPcfQLK8IP0SiCwc6w0pqAns6H/r8qBgk6BoXGnBT5kyzKr8=
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
group=io.netifi.proteus
version=0.9.4
version=0.9.5
11 changes: 5 additions & 6 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ task ciVersion {
build.dependsOn ciVersion

ext {
rsocketRpcVersion = '0.2.2'
rsocketRpcVersion = '0.2.4'
protobufVersion = '3.6.1'
}

repositories {
jcenter()
mavenLocal()
maven {
url = 'https://dl.bintray.com/netifi/netifi-oss'
url = 'https://dl.bintray.com/rsocket-admin/RSocket'
}
}

Expand Down Expand Up @@ -88,14 +87,14 @@ dependencies {
testCompile 'junit:junit:4.12'

testCompile 'javax.inject:javax.inject:1'
testCompile 'io.projectreactor:reactor-test:3.1.9.RELEASE'
testCompile 'io.projectreactor:reactor-test:3.2.1.RELEASE'
testCompile "com.google.protobuf:protobuf-java:$protobufVersion"
testCompile 'org.hdrhistogram:HdrHistogram:2.1.10'
testCompile 'org.apache.logging.log4j:log4j-api:2.9.0'
testCompile 'org.apache.logging.log4j:log4j-core:2.9.0'
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0'
testCompile 'io.rsocket:rsocket-transport-netty:0.11.7'
testCompile 'io.rsocket:rsocket-transport-local:0.11.7'
testCompile 'io.rsocket:rsocket-transport-netty:0.11.9'
testCompile 'io.rsocket:rsocket-transport-local:0.11.9'
testCompile 'org.mockito:mockito-all:1.10.19'
}

Expand Down
7 changes: 3 additions & 4 deletions proteus-client/src/main/java/io/netifi/proteus/Proteus.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpClient;

/** This is where the magic happens */
public class Proteus implements Closeable {
Expand Down Expand Up @@ -256,7 +256,7 @@ public Proteus build() {
if (sslDisabled) {
clientTransportFactory =
address -> {
TcpClient client = TcpClient.create(opts -> opts.connectAddress(() -> address));
TcpClient client = TcpClient.create().addressSupplier(() -> address);
return TcpClientTransport.create(client);
};
} else {
Expand All @@ -277,8 +277,7 @@ public Proteus build() {
clientTransportFactory =
address -> {
TcpClient client =
TcpClient.create(
opts -> opts.connectAddress(() -> address).sslContext(sslContext));
TcpClient.create().addressSupplier(() -> address).secure(sslContext);
return TcpClientTransport.create(client);
};
} catch (Exception sslException) {
Expand Down
2 changes: 1 addition & 1 deletion proteus-frames/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dependencies {
compile 'io.rsocket:rsocket-core:0.11.7'
compile 'io.rsocket:rsocket-core:0.11.9'
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.server.HttpServer;
import reactor.ipc.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

@Named("ProteusPrometheusBridge")
public class ProteusPrometheusBridge implements MetricsSnapshotHandler {
Expand Down Expand Up @@ -99,8 +99,11 @@ public static void main(String... args) {
}

private void init() {
HttpServer.create(bindAddress, bindPort)
.newRouter(routes -> routes.post(metricsUrl, this::handle).get(metricsUrl, this::handle))
HttpServer.create()
.host(bindAddress)
.port(bindPort)
.route(routes -> routes.post(metricsUrl, this::handle).get(metricsUrl, this::handle))
.bind()
.subscribe();
}

Expand Down
2 changes: 1 addition & 1 deletion proteus-tracing-openzipkin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
protobuf project (':proteus-tracing-idl')
compile project (':proteus-client')

compile 'io.projectreactor.addons:reactor-adapter:3.1.7.RELEASE'
compile 'io.projectreactor.addons:reactor-adapter:3.2.0.RELEASE'
compile "com.google.protobuf:protobuf-java-util:$protobufVersion"

compile 'io.opentracing:opentracing-api:0.31.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.netifi.proteus.tracing;

import brave.Tracing;
import brave.opentracing.BraveTracer;
import io.netifi.proteus.Proteus;
import io.netifi.proteus.rsocket.ProteusSocket;
import io.opentracing.Tracer;
import java.util.Optional;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.inject.Named;

@Named("ProteusTracerSupplier")
public class ProteusTracerSupplier implements Supplier<Tracer> {
private final Tracer tracer;

@Inject
public ProteusTracerSupplier(Proteus proteus, Optional<String> tracingGroup) {
ProteusSocket proteusSocket = proteus.group(tracingGroup.orElse("com.netifi.proteus.tracing"));

ProteusTracingServiceClient client = new ProteusTracingServiceClient(proteusSocket);
ProteusReporter reporter =
new ProteusReporter(client, proteus.getGroupName(), proteus.getDestination());

Tracing tracing = Tracing.newBuilder().spanReporter(reporter).build();

tracer = BraveTracer.create(tracing);
}

@Override
public Tracer get() {
return tracer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.PoolResources;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import zipkin2.proto3.Span;

public class ProteusZipkinHttpBridge implements ProteusTracingService {
Expand Down Expand Up @@ -82,17 +83,15 @@ public static void main(String... args) {
private synchronized HttpClient getClient() {
if (httpClient == null) {
this.httpClient =
HttpClient.builder()
.options(
builder ->
builder
.compression(true)
.poolResources(PoolResources.fixed("proteusZipkinBridge"))
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000)
HttpClient.create(ConnectionProvider.fixed("proteusZipkinBridge"))
.compress(true)
.port(port)
.tcpConfiguration(
tcpClient ->
tcpClient
.host(host)
.port(port))
.build();
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000));
}
return httpClient;
}
Expand Down Expand Up @@ -127,12 +126,11 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
.concatMap(
spans ->
getClient()
.post(
zipkinSpansUrl,
request -> {
request.addHeader("Content-Type", "application/json");
return request.sendString(Mono.just(spans));
})
.headers(hh -> hh.add("Content-Type", "application/json"))
.post()
.uri(zipkinSpansUrl)
.send(ByteBufFlux.fromString(Mono.just(spans)))
.response()
.timeout(Duration.ofSeconds(30))
.doOnError(throwable -> resetHttpClient()),
8)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.netifi.proteus.tracing;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleDeserializers;
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
import io.netty.handler.codec.json.JsonObjectDecoder;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import zipkin2.proto3.Span;

public class TracesStreamer {

private final ObjectMapper objectMapper = protoMapper();
private Function<Integer, Publisher<InputStream>> inputSource;

public TracesStreamer(String zipkinUrl, Mono<HttpClient> client) {
this(zipkinServerStream(zipkinUrl, client));
}

public TracesStreamer(Publisher<InputStream> tracesSource) {
this(v -> tracesSource);
}

private TracesStreamer(Function<Integer, Publisher<InputStream>> inputSource) {
this.inputSource = inputSource;
}

public Flux<Trace> streamTraces(int lookbackSeconds) {
return streamTraces(inputSource.apply(lookbackSeconds));
}

Flux<Trace> streamTraces(Publisher<InputStream> input) {
return Flux.from(input)
.filter(
is -> {
try {
return is.available() > 0;
} catch (IOException e) {
throw Exceptions.propagate(e);
}
})
.map(
is -> {
try {
return objectMapper.readValue(is, new TypeReference<Trace>() {});
} catch (IOException e) {
throw Exceptions.propagate(e);
}
});
}

private static Function<Integer, Publisher<InputStream>> zipkinServerStream(
String zipkinUrl, Mono<HttpClient> client) {
return lookbackSeconds ->
client.flatMapMany(
c ->
c.doOnRequest(
(__, connection) -> connection.addHandler(new JsonObjectDecoder(true)))
.get()
.uri(zipkinQuery(zipkinUrl, lookbackSeconds))
.responseContent()
.asInputStream());
}

private static String zipkinQuery(String zipkinUrl, int lookbackSeconds) {
long lookbackMillis = TimeUnit.SECONDS.toMillis(lookbackSeconds);
return zipkinUrl + "?lookback=" + lookbackMillis + "&limit=100000";
}

private ObjectMapper protoMapper() {
ObjectMapper mapper = new ObjectMapper();
ProtobufModule module = new CustomProtoModule();
mapper.registerModule(module);
return mapper;
}

public static class CustomProtoModule extends ProtobufModule {
@Override
public void setupModule(SetupContext context) {
super.setupModule(context);
SimpleDeserializers deser = new SimpleDeserializers();
deser.addDeserializer(Trace.class, new TracersDeserializer());
context.addDeserializers(deser);
}
}

public static class TracersDeserializer extends StdDeserializer<Trace> {

public TracersDeserializer() {
this(null);
}

protected TracersDeserializer(Class<?> vc) {
super(vc);
}

@Override
public Trace deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
Trace.Builder traceBuilder = Trace.newBuilder();
while (p.nextToken() != JsonToken.END_ARRAY) {
traceBuilder.addSpans(ctx.readValue(p, Span.class));
}
return traceBuilder.build();
}
}
}
Loading

0 comments on commit 1556287

Please sign in to comment.