From a612cd7db5ca24acbaaa9ddf298658f7e790abeb Mon Sep 17 00:00:00 2001 From: Gun Lee Date: Thu, 22 Feb 2024 13:59:16 +0900 Subject: [PATCH] [agent.java] Fixed an issue where some method signatures changed when the reactor-core library version went up and webflux was not tracking requests because of this --- .../ScouterOptimizableOperatorProxy.java | 11 +++-- .../scouter/agent/proxy/IReactiveSupport.java | 2 + .../agent/proxy/ReactiveSupportFactory.java | 5 +++ .../xtra/reactive/ReactiveSupport.java | 36 +++++++++++++++-- .../xtra/reactive/ReactiveSupportUtils.java | 34 ++++++++++++++++ .../ReactiveSupportWithCoroutine.java | 40 ++++++++++++++++++- 6 files changed, 120 insertions(+), 8 deletions(-) create mode 100644 scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java diff --git a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java index 069ef24ac..85acc2eaa 100644 --- a/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java +++ b/scouter.agent.java/src/main/java/reactor/core/publisher/ScouterOptimizableOperatorProxy.java @@ -21,6 +21,8 @@ import scouter.agent.Logger; import scouter.agent.util.Tuple; +import java.lang.reflect.Method; + /** * @author Gun Lee (gunlee01@gmail.com) on 2020/08/08 */ @@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy { public static boolean accessible = false; public static boolean first = true; - public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) { + public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34, + Method isCheckpoint) { try { if (!accessible && first) { try { @@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca } if (closeAssembly instanceof MonoOnAssembly) { FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace; - if (snapshot != null && snapshot.checkpointed) { + boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed; + if (snapshot != null && cp) { return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode()); } } else if (closeAssembly instanceof FluxOnAssembly) { FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack; - if (snapshot != null && snapshot.checkpointed) { + boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed; + if (snapshot != null && cp) { return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode()); } } diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java index 40388932d..76f1cec47 100644 --- a/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java +++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/IReactiveSupport.java @@ -28,4 +28,6 @@ public interface IReactiveSupport { String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now); + boolean isReactor34(); + } diff --git a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java index 3f4ff885a..38bf7cc70 100644 --- a/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java +++ b/scouter.agent.java/src/main/java/scouter/agent/proxy/ReactiveSupportFactory.java @@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) { return null; } + + @Override + public boolean isReactor34() { + return false; + } }; public static IReactiveSupport create(ClassLoader parent) { diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java index 19de11ee4..337bb7b05 100644 --- a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupport.java @@ -42,6 +42,7 @@ import scouter.lang.step.ParameterizedMessageStep; import scouter.util.StringUtil; +import java.lang.reflect.Method; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -49,6 +50,25 @@ public class ReactiveSupport implements IReactiveSupport { static Configure configure = Configure.getInstance(); + private Method subscriberContextMethod; + private static Method isCheckpoint; + private static boolean isReactor34; + + public ReactiveSupport() { + isReactor34 = ReactiveSupportUtils.isSupportReactor34(); + try { + if (isReactor34) { + subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class); + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + isCheckpoint.setAccessible(true); + } else { + subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override public Object subscriptOnContext(Object mono0, final TraceContext traceContext) { @@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext) } Mono mono = (Mono) mono0; traceContext.isReactiveTxidMarked = true; - return mono.subscriberContext(new Function() { + + Mono monoChain; + Function func = new Function() { @Override public Context apply(Context context) { return context.put(TraceContext.class, traceContext); } - }).doOnSuccess(new Consumer() { + }; + + monoChain = (Mono) subscriberContextMethod.invoke(mono, func); + return monoChain.doOnSuccess(new Consumer() { @Override public void accept(Object o) { TraceMain.endHttpService(new TraceMain.Stat(traceContext), null); @@ -164,7 +189,7 @@ public TxidLifter(CoreSubscriber coreSubscriber, Scannable scannable, Publish this.traceContext = traceContext; Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy - .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth); + .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint); checkpointDesc = checkpointPair.aString; Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0); @@ -321,4 +346,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth); return builder.toString(); } + + @Override + public boolean isReactor34() { + return isReactor34; + } } diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java new file mode 100644 index 000000000..644c342f1 --- /dev/null +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportUtils.java @@ -0,0 +1,34 @@ +package scouter.xtra.reactive; + +import reactor.core.publisher.Mono; +import scouter.agent.Logger; + +import java.util.function.Function; + +/** + * @author Gun Lee (gunlee01@gmail.com) on 2/21/24 + */ +public class ReactiveSupportUtils { + + public static boolean isSupportReactor34() { + try { + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + + Class monoClass = Mono.class; + Class[] parameterTypes = new Class[]{Function.class}; + monoClass.getMethod("contextWrite", parameterTypes); + + return true; + + } catch (ClassNotFoundException | NoSuchMethodException e) { + e.printStackTrace(); + Logger.println("R301", e.getMessage()); + return false; + } catch (Exception e) { + e.printStackTrace(); + Logger.println("R302", e.getMessage(), e); + return false; + } + } +} diff --git a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java index 5f305e3f7..273d69cea 100644 --- a/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java +++ b/scouter.agent.java/src/main/java/scouter/xtra/reactive/ReactiveSupportWithCoroutine.java @@ -45,6 +45,7 @@ import scouter.lang.step.ParameterizedMessageStep; import scouter.util.StringUtil; +import java.lang.reflect.Method; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -52,6 +53,25 @@ public class ReactiveSupportWithCoroutine implements IReactiveSupport { static Configure configure = Configure.getInstance(); + private Method subscriberContextMethod; + private static Method isCheckpoint; + private static boolean isReactor34; + + public ReactiveSupportWithCoroutine() { + isReactor34 = ReactiveSupportUtils.isSupportReactor34(); + try { + if (isReactor34) { + subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class); + Class assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot"); + isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint"); + isCheckpoint.setAccessible(true); + } else { + subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } @Override public Object subscriptOnContext(Object mono0, final TraceContext traceContext) { @@ -61,7 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext) } Mono mono = (Mono) mono0; traceContext.isReactiveTxidMarked = true; - return mono.subscriberContext(new Function() { + Mono monoChain; + Function func = new Function() { + @Override + public Context apply(Context context) { + return context.put(TraceContext.class, traceContext); + } + }; + + monoChain = (Mono) subscriberContextMethod.invoke(mono, func); + return monoChain.subscriberContext(new Function() { @Override public Context apply(Context context) { return context.put(TraceContext.class, traceContext); @@ -161,6 +190,7 @@ public static class TxidLifter implements SpanSubscription, Scannable { private final String checkpointDesc; private final Integer depth; private Subscription orgSubs; + private boolean isReactor34; private enum ReactorCheckPointType { ON_SUBSCRIBE, @@ -176,9 +206,10 @@ public TxidLifter(CoreSubscriber coreSubscriber, Scannable scannable, Publish this.scannable = scannable; this.publisher = publisher; this.traceContext = traceContext; + this.isReactor34 = isReactor34; Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy - .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth); + .nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint); checkpointDesc = checkpointPair.aString; Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0); @@ -335,4 +366,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth); return builder.toString(); } + + @Override + public boolean isReactor34() { + return isReactor34; + } }