diff --git a/scouter.agent.java/pom.xml b/scouter.agent.java/pom.xml
index 617829601..37cfc9c41 100644
--- a/scouter.agent.java/pom.xml
+++ b/scouter.agent.java/pom.xml
@@ -332,7 +332,7 @@
io.projectreactor
reactor-core
- 3.4.35
+ 3.3.8.RELEASE
provided
diff --git a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java
index d2555146c..85acc2eaa 100644
--- a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java
+++ b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java
@@ -21,6 +21,8 @@
import scouter.agent.Logger;
import scouter.agent.util.Tuple;
+import java.lang.reflect.Method;
+
/**
* @author Gun Lee (gunlee01@gmail.com) on 2020/08/08
*/
@@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy {
public static boolean accessible = false;
public static boolean first = true;
- public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) {
+ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34,
+ Method isCheckpoint) {
try {
if (!accessible && first) {
try {
@@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca
}
if (closeAssembly instanceof MonoOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace;
- if (snapshot != null && snapshot.isCheckpoint()) {
+ boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
+ if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
} else if (closeAssembly instanceof FluxOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack;
- if (snapshot != null && snapshot.isCheckpoint()) {
+ boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
+ if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
}
diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java
index 40388932d..76f1cec47 100644
--- a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java
+++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java
@@ -28,4 +28,6 @@ public interface IReactiveSupport {
String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now);
+ boolean isReactor34();
+
}
diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java
index 3f4ff885a..38bf7cc70 100644
--- a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java
+++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java
@@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra
public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) {
return null;
}
+
+ @Override
+ public boolean isReactor34() {
+ return false;
+ }
};
public static IReactiveSupport create(ClassLoader parent) {
diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java
index cdafa1a86..337bb7b05 100644
--- a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java
+++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java
@@ -42,6 +42,7 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;
+import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -49,6 +50,25 @@
public class ReactiveSupport implements IReactiveSupport {
static Configure configure = Configure.getInstance();
+ private Method subscriberContextMethod;
+ private static Method isCheckpoint;
+ private static boolean isReactor34;
+
+ public ReactiveSupport() {
+ isReactor34 = ReactiveSupportUtils.isSupportReactor34();
+ try {
+ if (isReactor34) {
+ subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
+ Class> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
+ isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
+ isCheckpoint.setAccessible(true);
+ } else {
+ subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
@@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono> mono = (Mono>) mono0;
traceContext.isReactiveTxidMarked = true;
- return mono.contextWrite(new Function() {
+
+ Mono> monoChain;
+ Function func = new Function() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
- }).doOnSuccess(new Consumer