diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java
index df4c0071c..8aab1dd8a 100644
--- a/src/main/java/net/spy/memcached/MemcachedConnection.java
+++ b/src/main/java/net/spy/memcached/MemcachedConnection.java
@@ -50,6 +50,8 @@
 import net.spy.memcached.compat.SpyObject;
 import net.spy.memcached.compat.log.LoggerFactory;
 import net.spy.memcached.internal.ReconnDelay;
+import net.spy.memcached.metrics.OpLatencyMonitor;
+import net.spy.memcached.metrics.OpThroughputMonitor;
 import net.spy.memcached.ops.KeyedOperation;
 import net.spy.memcached.ops.MultiOperationCallback;
 import net.spy.memcached.ops.Operation;
@@ -990,6 +992,8 @@ private void handleReads(MemcachedNode qa)
           throw new IllegalStateException("No read operation.");
         }
         currentOp.readFromBuffer(rbuf);
         if (currentOp.getState() == OperationState.COMPLETE) {
+          // Record one sample per operation, once its response is complete, not per partial read.
+          OpLatencyMonitor.getInstance().recordLatency(currentOp.getStartTime());
           getLogger().debug("Completed read op: %s and giving the next %d bytes",
                   currentOp, rbuf.remaining());
@@ -1519,6 +1523,7 @@ public String toString() {
    * @param op
    */
   public static void opTimedOut(Operation op) {
+    OpThroughputMonitor.getInstance().addTimeOutedOpCount(1);
     MemcachedConnection.setTimeout(op, true);
   }
 
@@ -1528,6 +1533,7 @@ public static void opTimedOut(Operation op) {
    * @param ops
    */
   public static void opsTimedOut(Collection<Operation> ops) {
+    OpThroughputMonitor.getInstance().addTimeOutedOpCount(ops.size());
     Collection<MemcachedNode> timedOutNodes = new HashSet<>();
     for (Operation op : ops) {
       try {
diff --git a/src/main/java/net/spy/memcached/metrics/LatencyMetricsSnapShot.java b/src/main/java/net/spy/memcached/metrics/LatencyMetricsSnapShot.java
new file mode 100644
index 000000000..9e2212911
--- /dev/null
+++ b/src/main/java/net/spy/memcached/metrics/LatencyMetricsSnapShot.java
@@ -0,0 +1,58 @@
+package net.spy.memcached.metrics;
+
+/**
+ * Immutable latency statistics, time-stamped when the snapshot is constructed.
+ */
+public class LatencyMetricsSnapShot {
+  private final long avgLatency;
+  private final long minLatency;
+  private final long maxLatency;
+  private final long p25Latency;
+  private final long p50Latency;
+  private final long p75Latency;
+  private final long timestamp; // wall-clock time (ms) at which this snapshot was created
+
+  private static final LatencyMetricsSnapShot EMPTY = new LatencyMetricsSnapShot(0, 0, 0, 0, 0, 0);
+
+  LatencyMetricsSnapShot(long avg, long min, long max, long p25, long p50, long p75) {
+    this.avgLatency = avg;
+    this.minLatency = min;
+    this.maxLatency = max;
+    this.p25Latency = p25;
+    this.p50Latency = p50;
+    this.p75Latency = p75;
+    this.timestamp = System.currentTimeMillis();
+  }
+
+  public static LatencyMetricsSnapShot empty() {
+    return EMPTY;
+  }
+
+  public long getAvgLatency() {
+    return avgLatency;
+  }
+
+  public long getMinLatency() {
+    return minLatency;
+  }
+
+  public long getMaxLatency() {
+    return maxLatency;
+  }
+
+  public long getP25Latency() {
+    return p25Latency;
+  }
+
+  public long getP50Latency() {
+    return p50Latency;
+  }
+
+  public long getP75Latency() {
+    return p75Latency;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}
diff --git a/src/main/java/net/spy/memcached/metrics/OpLatencyMonitor.java b/src/main/java/net/spy/memcached/metrics/OpLatencyMonitor.java
new file mode 100644
index 000000000..ce485615d
--- /dev/null
+++ b/src/main/java/net/spy/memcached/metrics/OpLatencyMonitor.java
@@ -0,0 +1,159 @@
+package net.spy.memcached.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import net.spy.memcached.ArcusMBeanServer;
+
+/**
+ * Keeps the most recent operation latencies in a fixed-size ring buffer and
+ * exposes summary statistics over them through {@link OpLatencyMonitorMBean}.
+ */
+public final class OpLatencyMonitor implements OpLatencyMonitorMBean {
+
+  private static final OpLatencyMonitor INSTANCE = new OpLatencyMonitor();
+  private static final long CACHE_DURATION = 2000; // reuse a computed snapshot for 2 seconds
+  private static final int WINDOW_SIZE = 10_000;
+
+  private final AtomicReferenceArray<Long> latencies = new AtomicReferenceArray<>(WINDOW_SIZE);
+  private final AtomicInteger currentIndex = new AtomicInteger(0);
+  private final AtomicInteger count = new AtomicInteger(0);
+  private final AtomicReference<LatencyMetricsSnapShot> cachedMetrics
+          = new AtomicReference<>(LatencyMetricsSnapShot.empty());
+  private final boolean enabled;
+
+  private OpLatencyMonitor() {
+    // Disabled unless system property "arcus.mbean" is set to something other than "false".
+    if ("false".equalsIgnoreCase(System.getProperty("arcus.mbean", "false"))) {
+      enabled = false;
+      return;
+    }
+    enabled = true;
+    for (int i = 0; i < WINDOW_SIZE; i++) {
+      latencies.set(i, 0L);
+    }
+
+    try {
+      ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
+      mbs.registMBean(this, this.getClass().getPackage().getName()
+              + ":type=" + this.getClass().getSimpleName());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to register MBean", e);
+    }
+  }
+
+  public static OpLatencyMonitor getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Records the time elapsed since {@code startNanos} as one latency sample.
+   *
+   * @param startNanos an earlier {@link System#nanoTime()} reading
+   */
+  public void recordLatency(long startNanos) {
+    if (!enabled) {
+      return;
+    }
+    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
+    int index = currentIndex.getAndUpdate(i -> (i + 1) % WINDOW_SIZE);
+    latencies.lazySet(index, latencyMicros);
+
+    // Saturate at WINDOW_SIZE; a bare check-then-increment can overshoot under contention.
+    if (count.get() < WINDOW_SIZE) {
+      count.updateAndGet(c -> Math.min(c + 1, WINDOW_SIZE));
+    }
+  }
+
+  // Computes all metrics in one go from a sorted copy of the current samples.
+  private LatencyMetricsSnapShot computeMetrics() {
+    int currentCount = count.get();
+    if (currentCount == 0) {
+      return LatencyMetricsSnapShot.empty();
+    }
+
+    // currentIndex is the NEXT slot to be written, so the newest sample sits one behind it.
+    int newestIndex = currentIndex.get() - 1;
+    List<Long> sortedLatencies = new ArrayList<>(currentCount);
+
+    for (int i = 0; i < currentCount; i++) {
+      int idx = (newestIndex - i + WINDOW_SIZE) % WINDOW_SIZE;
+      long value = latencies.get(idx);
+      // Slots are pre-filled with 0; skip any that do not hold a positive sample.
+      if (value > 0) {
+        sortedLatencies.add(value);
+      }
+    }
+
+    if (sortedLatencies.isEmpty()) {
+      return LatencyMetricsSnapShot.empty();
+    }
+
+    sortedLatencies.sort(Long::compareTo);
+
+    int size = sortedLatencies.size();
+    long avg = sortedLatencies.stream().mapToLong(Long::longValue).sum() / size;
+    long min = sortedLatencies.get(0);
+    long max = sortedLatencies.get(size - 1);
+    long p25 = sortedLatencies.get((int) Math.ceil((size * 25.0) / 100.0) - 1);
+    long p50 = sortedLatencies.get((int) Math.ceil((size * 50.0) / 100.0) - 1);
+    long p75 = sortedLatencies.get((int) Math.ceil((size * 75.0) / 100.0) - 1);
+
+    return new LatencyMetricsSnapShot(avg, min, max, p25, p50, p75);
+  }
+
+  // Returns the cached snapshot while it is fresh; otherwise recomputes and caches a new one.
+  private LatencyMetricsSnapShot getMetricsSnapshot() {
+    LatencyMetricsSnapShot current = cachedMetrics.get();
+    long now = System.currentTimeMillis();
+
+    if (now - current.getTimestamp() < CACHE_DURATION) {
+      return current;
+    }
+
+    LatencyMetricsSnapShot newMetrics = computeMetrics();
+    cachedMetrics.set(newMetrics);
+    return newMetrics;
+  }
+
+  @Override
+  public long getAverageLatencyMicros() {
+    return getMetricsSnapshot().getAvgLatency();
+  }
+
+  @Override
+  public long getMinLatencyMicros() {
+    return getMetricsSnapshot().getMinLatency();
+  }
+
+  @Override
+  public long getMaxLatencyMicros() {
+    return getMetricsSnapshot().getMaxLatency();
+  }
+
+  @Override
+  public long get25thPercentileLatencyMicros() {
+    return getMetricsSnapshot().getP25Latency();
+  }
+
+  @Override
+  public long get50thPercentileLatencyMicros() {
+    return getMetricsSnapshot().getP50Latency();
+  }
+
+  @Override
+  public long get75thPercentileLatencyMicros() {
+    return getMetricsSnapshot().getP75Latency();
+  }
+
+  @Override
+  public void resetStatistics() {
+    count.set(0);
+    currentIndex.set(0);
+    cachedMetrics.set(LatencyMetricsSnapShot.empty());
+  }
+}
diff --git a/src/main/java/net/spy/memcached/metrics/OpLatencyMonitorMBean.java b/src/main/java/net/spy/memcached/metrics/OpLatencyMonitorMBean.java
new file mode 100644
index 000000000..3c05c45ac
--- /dev/null
+++ b/src/main/java/net/spy/memcached/metrics/OpLatencyMonitorMBean.java
@@ -0,0 +1,20 @@
+package net.spy.memcached.metrics;
+
+/**
+ * Management interface exposing operation latency statistics, in microseconds.
+ */
+public interface OpLatencyMonitorMBean {
+  long getAverageLatencyMicros();
+
+  long getMaxLatencyMicros();
+
+  long getMinLatencyMicros();
+
+  long get25thPercentileLatencyMicros();
+
+  long get50thPercentileLatencyMicros();
+
+  long get75thPercentileLatencyMicros();
+
+  void resetStatistics();
+}
diff --git a/src/main/java/net/spy/memcached/metrics/OpThroughputMonitor.java b/src/main/java/net/spy/memcached/metrics/OpThroughputMonitor.java
new file mode 100644
index 000000000..577222c5c
--- /dev/null
+++ b/src/main/java/net/spy/memcached/metrics/OpThroughputMonitor.java
@@ -0,0 +1,97 @@
+package net.spy.memcached.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+import net.spy.memcached.ArcusMBeanServer;
+
+/**
+ * Counts completed, canceled and timed-out operations and reports each as an
+ * average per-second rate since the last reset.
+ */
+public final class OpThroughputMonitor implements OpThroughputMonitorMBean {
+  private static final OpThroughputMonitor INSTANCE = new OpThroughputMonitor();
+
+  private final LongAdder completeOps = new LongAdder();
+  private final LongAdder cancelOps = new LongAdder();
+  private final LongAdder timeOutOps = new LongAdder();
+  private final AtomicLong lastResetTime = new AtomicLong(System.currentTimeMillis());
+  private final boolean enabled;
+
+  private OpThroughputMonitor() {
+    // Disabled unless system property "arcus.mbean" is set to something other than "false".
+    if ("false".equalsIgnoreCase(System.getProperty("arcus.mbean", "false"))) {
+      enabled = false;
+      return;
+    }
+    enabled = true;
+    try {
+      ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
+      mbs.registMBean(this, this.getClass().getPackage().getName()
+              + ":type=" + this.getClass().getSimpleName());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to register Throughput MBean", e);
+    }
+  }
+
+  public static OpThroughputMonitor getInstance() {
+    return INSTANCE;
+  }
+
+  public void addCompletedOpCount() {
+    if (!enabled) {
+      return;
+    }
+    completeOps.increment();
+  }
+
+  public void addCanceledOpCount() {
+    if (!enabled) {
+      return;
+    }
+    cancelOps.increment();
+  }
+
+  public void addTimeOutedOpCount(int count) {
+    if (!enabled) {
+      return;
+    }
+    timeOutOps.add(count);
+  }
+
+  @Override
+  public long getCompletedOps() {
+    return getThroughput(completeOps);
+  }
+
+  @Override
+  public long getCanceledOps() {
+    return getThroughput(cancelOps);
+  }
+
+  @Override
+  public long getTimeoutOps() {
+    return getThroughput(timeOutOps);
+  }
+
+  @Override
+  public void resetStatistics() {
+    completeOps.reset();
+    cancelOps.reset();
+    timeOutOps.reset();
+    lastResetTime.set(System.currentTimeMillis());
+  }
+
+  // Average number of operations per second since the last reset.
+  private long getThroughput(LongAdder ops) {
+    long currentTime = System.currentTimeMillis();
+    long lastTime = lastResetTime.get();
+    long countValue = ops.sum();
+
+    // Whole seconds elapsed, floored at 1: during the first second after a reset (or if the
+    // wall clock steps backwards) the divisor would otherwise be 0 or negative.
+    long elapsedSeconds = Math.max(1L, (currentTime - lastTime) / 1000);
+
+    return countValue / elapsedSeconds;
+  }
+}
diff --git a/src/main/java/net/spy/memcached/metrics/OpThroughputMonitorMBean.java b/src/main/java/net/spy/memcached/metrics/OpThroughputMonitorMBean.java
new file mode 100644
index 000000000..dafd70c43
--- /dev/null
+++ b/src/main/java/net/spy/memcached/metrics/OpThroughputMonitorMBean.java
@@ -0,0 +1,14 @@
+package net.spy.memcached.metrics;
+
+/**
+ * Management interface exposing per-second operation throughput figures.
+ */
+public interface OpThroughputMonitorMBean {
+  long getCompletedOps();
+
+  long getCanceledOps();
+
+  long getTimeoutOps();
+
+  void resetStatistics();
+}
diff --git a/src/main/java/net/spy/memcached/ops/Operation.java b/src/main/java/net/spy/memcached/ops/Operation.java
index 777906e79..49f413ef9 100644
--- a/src/main/java/net/spy/memcached/ops/Operation.java
+++ b/src/main/java/net/spy/memcached/ops/Operation.java
@@ -134,4 +134,14 @@ public interface Operation {
   /* ENABLE_MIGRATION end */
 
   APIType getAPIType();
+
+  /**
+   * Sets the start time, a {@link System#nanoTime()} reading, used to measure latency.
+   */
+  void setStartTime(long startTime);
+
+  /**
+   * Returns the value last passed to {@link #setStartTime(long)}.
+   */
+  long getStartTime();
 }
diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
index e4b6aa229..3936be458 100644
--- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
+++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java
@@ -26,6 +26,7 @@
 import net.spy.memcached.MemcachedReplicaGroup;
 import net.spy.memcached.RedirectHandler;
 import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.OpThroughputMonitor;
 import net.spy.memcached.ops.APIType;
 import net.spy.memcached.ops.CancelledOperationStatus;
 import net.spy.memcached.ops.OperationCallback;
@@ -58,6 +59,16 @@ public abstract class BaseOperationImpl extends SpyObject {
   private OperationType opType = OperationType.UNDEFINED;
   private APIType apiType = APIType.UNDEFINED;
 
+  private long startTime; // System.nanoTime() reading set just before the op starts writing
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
   /* ENABLE_MIGRATION if */
   private RedirectHandler redirectHandler = null;
   /* ENABLE_MIGRATION end */
@@ -95,6 +106,7 @@ public final OperationException getException() {
   public final boolean cancel(String cause) {
     if (callbacked.compareAndSet(false, true)) {
       cancelled = true;
+      OpThroughputMonitor.getInstance().addCanceledOpCount();
       if (handlingNode != null) {
         cause += " @ " + handlingNode.getNodeName();
       }
@@ -222,6 +234,7 @@ protected final void transitionState(OperationState newState) {
     }
     if (state == OperationState.COMPLETE &&
         callbacked.compareAndSet(false, true)) {
+      OpThroughputMonitor.getInstance().addCompletedOpCount();
       callback.complete();
     }
   }
diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
index b8e4bc83e..7c48456dc 100644
--- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
+++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -316,6 +316,7 @@ private Operation getNextWritableOp() {
         Operation cancelledOp = removeCurrentWriteOp();
         assert o == cancelledOp;
       } else {
+        o.setStartTime(System.nanoTime());
         o.writing();
         readQ.add(o);
         return o;