Skip to content

Commit

Permalink
Merge pull request scouter-project#982 from scouter-project/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
gunlee01 authored Feb 26, 2024
2 parents 3818904 + 474c456 commit 891c295
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import scouter.agent.Logger;
import scouter.agent.util.Tuple;

import java.lang.reflect.Method;

/**
* @author Gun Lee ([email protected]) on 2020/08/08
*/
Expand All @@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy {
public static boolean accessible = false;
public static boolean first = true;

public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) {
public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34,
Method isCheckpoint) {
try {
if (!accessible && first) {
try {
Expand All @@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca
}
if (closeAssembly instanceof MonoOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace;
if (snapshot != null && snapshot.checkpointed) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
} else if (closeAssembly instanceof FluxOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack;
if (snapshot != null && snapshot.checkpointed) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface IReactiveSupport {

String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now);

boolean isReactor34();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra
public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) {
return null;
}

@Override
public boolean isReactor34() {
return false;
}
};

public static IReactiveSupport create(ClassLoader parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupport implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupport() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.subscriberContext(new Function<Context, Context>() {

Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -164,7 +189,7 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.traceContext = traceContext;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -321,4 +346,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package scouter.xtra.reactive;

import reactor.core.publisher.Mono;
import scouter.agent.Logger;

import java.util.function.Function;

/**
* @author Gun Lee ([email protected]) on 2/21/24
*/
public class ReactiveSupportUtils {

public static boolean isSupportReactor34() {
try {
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
assemblySnapshotClass.getDeclaredMethod("isCheckpoint");

Class<Mono> monoClass = Mono.class;
Class<?>[] parameterTypes = new Class<?>[]{Function.class};
monoClass.getMethod("contextWrite", parameterTypes);

return true;

} catch (ClassNotFoundException | NoSuchMethodException e) {
e.printStackTrace();
Logger.println("R301", e.getMessage());
return false;
} catch (Exception e) {
e.printStackTrace();
Logger.println("R302", e.getMessage(), e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupportWithCoroutine implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupportWithCoroutine() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -61,12 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.subscriberContext(new Function<Context, Context>() {
Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -161,6 +185,7 @@ public static class TxidLifter<T> implements SpanSubscription<T>, Scannable {
private final String checkpointDesc;
private final Integer depth;
private Subscription orgSubs;
private boolean isReactor34;

private enum ReactorCheckPointType {
ON_SUBSCRIBE,
Expand All @@ -176,9 +201,10 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.scannable = scannable;
this.publisher = publisher;
this.traceContext = traceContext;
this.isReactor34 = isReactor34;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -335,4 +361,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ public String normalizeCountersXml(String xmlString) throws JAXBException {
if (counters.familys == null) {
counters.familys = new Familys0(new ArrayList<>());
}
if (counters.familys.family == null) {
counters.familys.family = new ArrayList<>();
}
List<Family0> families =
counters.familys.family.stream()
.filter(f -> !f.name.contains("zws-metric."))
Expand Down
2 changes: 1 addition & 1 deletion scouter.webapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.11</version>
<version>2.12.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down

0 comments on commit 891c295

Please sign in to comment.