diff --git a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java index 069ef24ac..85acc2eaa 100644 --- a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java +++ b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java @@ -21,6 +21,8 @@ import scouter.agent.Logger; import scouter.agent.util.Tuple; +import java.lang.reflect.Method; + /** * @author Gun Lee (gunlee01@gmail.com) on 2020/08/08 */ @@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy { public static boolean accessible = false; public static boolean first = true; - public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) { + public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34, + Method isCheckpoint) { try { if (!accessible && first) { try { @@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca } if (closeAssembly instanceof MonoOnAssembly) { FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace; - if (snapshot != null && snapshot.checkpointed) { + boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed; + if (snapshot != null && cp) { return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode()); } } else if (closeAssembly instanceof FluxOnAssembly) { FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack; - if (snapshot != null && snapshot.checkpointed) { + boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed; + if (snapshot != null && cp) { return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode()); } } diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java index 40388932d..76f1cec47 100644 --- a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java +++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java @@ -28,4 +28,6 @@ public interface IReactiveSupport { String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now); + boolean isReactor34(); + } diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java index 3f4ff885a..38bf7cc70 100644 --- a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java +++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java @@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) { return null; } + + @Override + public boolean isReactor34() { + return false; + } }; public static IReactiveSupport create(ClassLoader parent) { diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java index 19de11ee4..337bb7b05 100644 --- a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java @@ -42,6 +42,7 @@ import scouter.lang.step.ParameterizedMessageStep; import scouter.util.StringUtil; +import java.lang.reflect.Method; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -49,6 +50,25 @@ public class ReactiveSupport implements IReactiveSupport { static Configure configure = Configure.getInstance(); + private Method subscriberContextMethod; + private static Method isCheckpoint; + private static boolean isReactor34; + + public ReactiveSupport() { + isReactor34 = ReactiveSupportUtils.isSupportReactor34(); + try { + if (isReactor34) { + subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class); + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + isCheckpoint.setAccessible(true); + } else { + subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override public Object subscriptOnContext(Object mono0, final TraceContext traceContext) { @@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext) } Mono mono = (Mono) mono0; traceContext.isReactiveTxidMarked = true; - return mono.subscriberContext(new Function() { + + Mono monoChain; + Function func = new Function() { @Override public Context apply(Context context) { return context.put(TraceContext.class, traceContext); } - }).doOnSuccess(new Consumer() { + }; + + monoChain = (Mono) subscriberContextMethod.invoke(mono, func); + return monoChain.doOnSuccess(new Consumer() { @Override public void accept(Object o) { TraceMain.endHttpService(new TraceMain.Stat(traceContext), null); @@ -164,7 +189,7 @@ public TxidLifter(CoreSubscriber coreSubscriber, Scannable scannable, Publish this.traceContext = traceContext; Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy - .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth); + .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint); checkpointDesc = checkpointPair.aString; Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0); @@ -321,4 +346,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth); return builder.toString(); } + + @Override + public boolean isReactor34() { + return isReactor34; + } } diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java new file mode 100644 index 000000000..644c342f1 --- /dev/null +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java @@ -0,0 +1,34 @@ +package scouter.xtra.reactive; + +import reactor.core.publisher.Mono; +import scouter.agent.Logger; + +import java.util.function.Function; + +/** + * @author Gun Lee (gunlee01@gmail.com) on 2/21/24 + */ +public class ReactiveSupportUtils { + + public static boolean isSupportReactor34() { + try { + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + + Class monoClass = Mono.class; + Class[] parameterTypes = new Class[]{Function.class}; + monoClass.getMethod("contextWrite", parameterTypes); + + return true; + + } catch (ClassNotFoundException | NoSuchMethodException e) { + e.printStackTrace(); + Logger.println("R301", e.getMessage()); + return false; + } catch (Exception e) { + e.printStackTrace(); + Logger.println("R302", e.getMessage(), e); + return false; + } + } +} diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java index 5f305e3f7..b2a37f0c0 100644 --- a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java @@ -45,6 +45,7 @@ import scouter.lang.step.ParameterizedMessageStep; import scouter.util.StringUtil; +import java.lang.reflect.Method; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -52,6 +53,25 @@ public class ReactiveSupportWithCoroutine implements IReactiveSupport { static Configure configure = Configure.getInstance(); + private Method subscriberContextMethod; + private static Method isCheckpoint; + private static boolean isReactor34; + + public ReactiveSupportWithCoroutine() { + isReactor34 = ReactiveSupportUtils.isSupportReactor34(); + try { + if (isReactor34) { + subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class); + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + isCheckpoint.setAccessible(true); + } else { + subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override public Object subscriptOnContext(Object mono0, final TraceContext traceContext) { @@ -61,12 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext) } Mono mono = (Mono) mono0; traceContext.isReactiveTxidMarked = true; - return mono.subscriberContext(new Function() { + Mono monoChain; + Function func = new Function() { @Override public Context apply(Context context) { return context.put(TraceContext.class, traceContext); } - }).doOnSuccess(new Consumer() { + }; + + monoChain = (Mono) subscriberContextMethod.invoke(mono, func); + return monoChain.doOnSuccess(new Consumer() { @Override public void accept(Object o) { TraceMain.endHttpService(new TraceMain.Stat(traceContext), null); @@ -161,6 +185,7 @@ public static class TxidLifter implements SpanSubscription, Scannable { private final String checkpointDesc; private final Integer depth; private Subscription orgSubs; + private boolean isReactor34; private enum ReactorCheckPointType { ON_SUBSCRIBE, @@ -176,9 +201,10 @@ public TxidLifter(CoreSubscriber coreSubscriber, Scannable scannable, Publish this.scannable = scannable; this.publisher = publisher; this.traceContext = traceContext; + this.isReactor34 = isReactor34; Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy - .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth); + .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint); checkpointDesc = checkpointPair.aString; Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0); @@ -335,4 +361,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth); return builder.toString(); } + + @Override + public boolean isReactor34() { + return isReactor34; + } } diff --git a/scouter.server/src/main/java/scouter/server/CounterManager.java b/scouter.server/src/main/java/scouter/server/CounterManager.java index 3c8360891..185d40444 100644 --- a/scouter.server/src/main/java/scouter/server/CounterManager.java +++ b/scouter.server/src/main/java/scouter/server/CounterManager.java @@ -538,6 +538,9 @@ public String normalizeCountersXml(String xmlString) throws JAXBException { if (counters.familys == null) { counters.familys = new Familys0(new ArrayList<>()); } + if (counters.familys.family == null) { + counters.familys.family = new ArrayList<>(); + } List families = counters.familys.family.stream() .filter(f -> !f.name.contains("zws-metric.")) diff --git a/scouter.webapp/pom.xml b/scouter.webapp/pom.xml index e4474e9ec..7ce354075 100644 --- a/scouter.webapp/pom.xml +++ b/scouter.webapp/pom.xml @@ -73,7 +73,7 @@ com.fasterxml.jackson.core jackson-core - 2.8.11 + 2.12.6 com.fasterxml.jackson.core