Skip to content

Commit

Permalink
add unit tests for transactionMetrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
GenerousMan committed Oct 30, 2023
1 parent ca74b6b commit eba8ac8
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
public class TransactionMetrics extends ConfigManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private final ConcurrentMap<String, Metric> transactionCounts =
private ConcurrentMap<String, Metric> transactionCounts =
new ConcurrentHashMap<>(1024);

private final DataVersion dataVersion = new DataVersion();
private DataVersion dataVersion = new DataVersion();

private final String configPath;

Expand Down Expand Up @@ -86,10 +86,13 @@ public long getTransactionCount(String topic) {
public Map<String, Metric> getTransactionCounts() {
return transactionCounts;
}
public void setTransactionCounts(ConcurrentMap<String, Metric> transactionCounts) {
this.transactionCounts = transactionCounts;
}

protected void write0(Writer writer) {
TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper();
wrapper.setTimingCount(transactionCounts);
wrapper.setTransactionCount(transactionCounts);
wrapper.setDataVersion(dataVersion);
JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible);
}
Expand All @@ -110,7 +113,7 @@ public void decode(String jsonString) {
TransactionMetricsSerializeWrapper transactionMetricsSerializeWrapper =
TransactionMetricsSerializeWrapper.fromJson(jsonString, TransactionMetricsSerializeWrapper.class);
if (transactionMetricsSerializeWrapper != null) {
this.transactionCounts.putAll(transactionMetricsSerializeWrapper.getTimingCount());
this.transactionCounts.putAll(transactionMetricsSerializeWrapper.getTransactionCount());
this.dataVersion.assignNewOne(transactionMetricsSerializeWrapper.getDataVersion());
}
}
Expand All @@ -120,14 +123,18 @@ public void decode(String jsonString) {
public String encode(boolean prettyFormat) {
TransactionMetricsSerializeWrapper metricsSerializeWrapper = new TransactionMetricsSerializeWrapper();
metricsSerializeWrapper.setDataVersion(this.dataVersion);
metricsSerializeWrapper.setTimingCount(this.transactionCounts);
metricsSerializeWrapper.setTransactionCount(this.transactionCounts);
return metricsSerializeWrapper.toJson(prettyFormat);
}

public DataVersion getDataVersion() {
return dataVersion;
}

public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}

public void cleanMetrics(Set<String> topics) {
if (topics == null || topics.isEmpty()) {
return;
Expand All @@ -139,12 +146,11 @@ public void cleanMetrics(Set<String> topics) {
if (topic.startsWith(TopicValidator.SYSTEM_TOPIC_PREFIX)) {
continue;
}
if (topics.contains(topic)) {
if (!topics.contains(topic)) {
continue;
}

// in the input topics set, then remove it.
iterator.remove();
log.info("clean timer metrics, because not in topic config, {}", topic);
}
}

Expand All @@ -153,11 +159,11 @@ public static class TransactionMetricsSerializeWrapper extends RemotingSerializa
new ConcurrentHashMap<>(1024);
private DataVersion dataVersion = new DataVersion();

public ConcurrentMap<String, Metric> getTimingCount() {
public ConcurrentMap<String, Metric> getTransactionCount() {
return transactionCount;
}

public void setTimingCount(
public void setTransactionCount(
ConcurrentMap<String, Metric> transactionCount) {
this.transactionCount = transactionCount;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.apache.rocketmq.broker.transaction.queue;

import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;


@RunWith(MockitoJUnitRunner.class)
public class TransactionMetricsTest {
private TransactionMetrics transactionMetrics;
private ConcurrentMap<String, Metric> transactionCounts = new ConcurrentHashMap<>();
private String configPath;

@Before
public void setUp() throws Exception {
configPath = "configPath";
transactionMetrics = new TransactionMetrics(configPath);
}

/**
* 测试addAndGet方法
*/
@Test
public void testAddAndGet() {
String topic = "testAddAndGet";
int value = 10;
long result = transactionMetrics.addAndGet(topic, value);

assert result == value;
}

/**
* 测试获取主题对应的度量值
*/
@Test
public void testGetTopicPair() {
String topic = "getTopicPair";
Metric result = transactionMetrics.getTopicPair(topic);
assert result != null;
}

/**
* 测试获取事务计数
*/
@Test
public void testGetTransactionCount() {
String topicExist = "topicExist";
String topicNotExist = "topicNotExist";

transactionMetrics.addAndGet(topicExist, 10);

assert transactionMetrics.getTransactionCount(topicExist) == 10;
assert transactionMetrics.getTransactionCount(topicNotExist) == 0;
}


/**
* 测试清除指标
*/
@Test
public void testCleanMetrics() {
String topic = "testCleanMetrics";
int value = 10;
assert transactionMetrics.addAndGet(topic, value) == value;
transactionMetrics.cleanMetrics(Collections.singleton(topic));
assert transactionMetrics.getTransactionCount(topic) == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@
package org.apache.rocketmq.store.timer;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import io.opentelemetry.api.common.Attributes;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
Expand Down Expand Up @@ -48,30 +71,6 @@
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.util.PerfCounter;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class TimerMessageStore {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
Expand Down

0 comments on commit eba8ac8

Please sign in to comment.