Skip to content

Commit

Permalink
Merge pull request scouter-project#981 from scouter-project/feature/r…
Browse files Browse the repository at this point in the history
…eactor34

Feature/reactor34
  • Loading branch information
gunlee01 authored Feb 26, 2024
2 parents 2a595d7 + e67e88b commit 474c456
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 10 deletions.
2 changes: 1 addition & 1 deletion scouter.agent.java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.35</version>
<version>3.3.8.RELEASE</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import scouter.agent.Logger;
import scouter.agent.util.Tuple;

import java.lang.reflect.Method;

/**
* @author Gun Lee ([email protected]) on 2020/08/08
*/
Expand All @@ -31,7 +33,8 @@ public class ScouterOptimizableOperatorProxy {
public static boolean accessible = false;
public static boolean first = true;

public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth) {
public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxScanDepth, boolean isReactor34,
Method isCheckpoint) {
try {
if (!accessible && first) {
try {
Expand All @@ -54,12 +57,14 @@ public static Tuple.StringLongPair nameOnCheckpoint(Object candidate, int maxSca
}
if (closeAssembly instanceof MonoOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((MonoOnAssembly) closeAssembly).stacktrace;
if (snapshot != null && snapshot.isCheckpoint()) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
} else if (closeAssembly instanceof FluxOnAssembly) {
FluxOnAssembly.AssemblySnapshot snapshot = ((FluxOnAssembly) closeAssembly).snapshotStack;
if (snapshot != null && snapshot.isCheckpoint()) {
boolean cp = isReactor34 ? (Boolean) isCheckpoint.invoke(snapshot) : snapshot.checkpointed;
if (snapshot != null && cp) {
return new Tuple.StringLongPair(snapshot.cached, snapshot.hashCode());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface IReactiveSupport {

String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now);

boolean isReactor34();

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public Object monoCoroutineContextHook(Object coroutineContext, TraceContext tra
public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannable timedScannable, long now) {
return null;
}

@Override
public boolean isReactor34() {
return false;
}
};

public static IReactiveSupport create(ClassLoader parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupport implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupport() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -58,12 +78,17 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.contextWrite(new Function<Context, Context>() {

Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -164,7 +189,7 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.traceContext = traceContext;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -321,4 +346,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package scouter.xtra.reactive;

import reactor.core.publisher.Mono;
import scouter.agent.Logger;

import java.util.function.Function;

/**
* @author Gun Lee ([email protected]) on 2/21/24
*/
public class ReactiveSupportUtils {

public static boolean isSupportReactor34() {
try {
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
assemblySnapshotClass.getDeclaredMethod("isCheckpoint");

Class<Mono> monoClass = Mono.class;
Class<?>[] parameterTypes = new Class<?>[]{Function.class};
monoClass.getMethod("contextWrite", parameterTypes);

return true;

} catch (ClassNotFoundException | NoSuchMethodException e) {
e.printStackTrace();
Logger.println("R301", e.getMessage());
return false;
} catch (Exception e) {
e.printStackTrace();
Logger.println("R302", e.getMessage(), e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,33 @@
import scouter.lang.step.ParameterizedMessageStep;
import scouter.util.StringUtil;

import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class ReactiveSupportWithCoroutine implements IReactiveSupport {

static Configure configure = Configure.getInstance();
private Method subscriberContextMethod;
private static Method isCheckpoint;
private static boolean isReactor34;

public ReactiveSupportWithCoroutine() {
isReactor34 = ReactiveSupportUtils.isSupportReactor34();
try {
if (isReactor34) {
subscriberContextMethod = Mono.class.getMethod("contextWrite", Function.class);
Class<?> assemblySnapshotClass = Class.forName("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot");
isCheckpoint = assemblySnapshotClass.getDeclaredMethod("isCheckpoint");
isCheckpoint.setAccessible(true);
} else {
subscriberContextMethod = Mono.class.getMethod("subscriberContext", Function.class);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Object subscriptOnContext(Object mono0, final TraceContext traceContext) {
Expand All @@ -61,12 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
}
Mono<?> mono = (Mono<?>) mono0;
traceContext.isReactiveTxidMarked = true;
return mono.contextWrite(new Function<Context, Context>() {
Mono<?> monoChain;
Function<Context, Context> func = new Function<Context, Context>() {
@Override
public Context apply(Context context) {
return context.put(TraceContext.class, traceContext);
}
}).doOnSuccess(new Consumer<Object>() {
};

monoChain = (Mono<?>) subscriberContextMethod.invoke(mono, func);
return monoChain.doOnSuccess(new Consumer<Object>() {
@Override
public void accept(Object o) {
TraceMain.endHttpService(new TraceMain.Stat(traceContext), null);
Expand Down Expand Up @@ -161,6 +185,7 @@ public static class TxidLifter<T> implements SpanSubscription<T>, Scannable {
private final String checkpointDesc;
private final Integer depth;
private Subscription orgSubs;
private boolean isReactor34;

private enum ReactorCheckPointType {
ON_SUBSCRIBE,
Expand All @@ -176,9 +201,10 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
this.scannable = scannable;
this.publisher = publisher;
this.traceContext = traceContext;
this.isReactor34 = isReactor34;

Tuple.StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth);
.nameOnCheckpoint(scannable, configure.profile_reactor_checkpoint_search_depth, isReactor34, isCheckpoint);
checkpointDesc = checkpointPair.aString;

Integer parentDepth = context.getOrDefault(SubscribeDepth.class, 0);
Expand Down Expand Up @@ -335,4 +361,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
ScouterOptimizableOperatorProxy.appendSources4Dump(scannable, builder, configure.profile_reactor_checkpoint_search_depth);
return builder.toString();
}

@Override
public boolean isReactor34() {
return isReactor34;
}
}

0 comments on commit 474c456

Please sign in to comment.