diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java index 824fc0fee3e..963c5046f24 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java @@ -38,7 +38,7 @@ public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - protected RocksDBConfigManager rocksDBConfigManager; + protected transient RocksDBConfigManager rocksDBConfigManager; public RocksDBConsumerOffsetManager(BrokerController brokerController) { super(brokerController); @@ -100,7 +100,7 @@ protected void removeConsumerOffset(String topicAtGroup) { byte[] keyBytes = topicAtGroup.getBytes(DataConverter.CHARSET_UTF8); this.rocksDBConfigManager.delete(keyBytes); } catch (Exception e) { - LOG.error("kv remove consumerOffset Failed, {}", topicAtGroup); + log.error("kv remove consumerOffset Failed, {}", topicAtGroup); } } @@ -109,7 +109,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) { RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class); this.offsetTable.put(topicAtGroup, wrapper.getOffsetTable()); - LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); + log.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); } public String rocksdbConfigFilePath() { @@ -132,12 +132,17 @@ public synchronized void persist() { this.rocksDBConfigManager.batchPutWithWal(writeBatch); this.rocksDBConfigManager.flushWAL(); } catch (Exception e) { - LOG.error("consumer offset persist Failed", e); + log.error("consumer offset persist Failed", e); } finally { writeBatch.close(); } } + public synchronized void exportToJson() { + log.info("RocksDBConsumerOffsetManager export consumer offset to json file"); + super.persist(); + } + private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupName, final ConcurrentMap offsetMap) throws Exception { byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8); RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index 8fc7a4d6edb..ff471525691 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -37,7 +37,7 @@ public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { - protected RocksDBConfigManager rocksDBConfigManager; + protected transient RocksDBConfigManager rocksDBConfigManager; public RocksDBSubscriptionGroupManager(BrokerController brokerController) { super(brokerController, false); @@ -184,6 +184,11 @@ public synchronized void persist() { } } + public synchronized void exportToJson() { + log.info("RocksDBSubscriptionGroupManager export subscription group to json file"); + super.persist(); + } + public String rocksdbConfigFilePath() { return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "subscriptionGroups" + File.separator; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java index 18e633d348b..d64f808067c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java @@ -32,7 +32,7 @@ public class RocksDBTopicConfigManager extends TopicConfigManager { - protected RocksDBConfigManager rocksDBConfigManager; + protected transient RocksDBConfigManager rocksDBConfigManager; public RocksDBTopicConfigManager(BrokerController brokerController) { super(brokerController, false); @@ -139,6 +139,11 @@ public synchronized void persist() { } } + public synchronized void exportToJson() { + log.info("RocksDBTopicConfigManager export topic config to json file"); + super.persist(); + } + public String rocksdbConfigFilePath() { return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "topics" + File.separator; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 6fb7584aa9b..a9b913192fa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -35,6 +35,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -60,6 +61,9 @@ import org.apache.rocketmq.broker.auth.converter.UserConverter; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; +import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.controller.ReplicasManager; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; @@ -159,6 +163,7 @@ import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader; @@ -239,7 +244,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); protected final BrokerController brokerController; protected Set configBlackList = new HashSet<>(); - private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); public AdminBrokerProcessor(final BrokerController brokerController) { this.brokerController = brokerController; @@ -356,6 +361,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return queryConsumeQueue(ctx, request); case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS: return this.checkRocksdbCqWriteProgress(ctx, request); + case RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON: + return this.exportRocksDBConfigToJson(ctx, request); case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN: return this.updateAndGetGroupForbidden(ctx, request); case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG: @@ -495,6 +502,51 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R return response; } + private RemotingCommand exportRocksDBConfigToJson(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + ExportRocksDBConfigToJsonRequestHeader requestHeader = request.decodeCommandCustomHeader(ExportRocksDBConfigToJsonRequestHeader.class); + List configTypes = requestHeader.fetchConfigType(); + List> futureList = new ArrayList<>(configTypes.size()); + for (ExportRocksDBConfigToJsonRequestHeader.ConfigType type : configTypes) { + switch (type) { + case TOPICS: + if (this.brokerController.getTopicConfigManager() instanceof RocksDBTopicConfigManager) { + RocksDBTopicConfigManager rocksDBTopicConfigManager = (RocksDBTopicConfigManager) this.brokerController.getTopicConfigManager(); + futureList.add(CompletableFuture.runAsync(rocksDBTopicConfigManager::exportToJson, asyncExecuteWorker)); + } + break; + case SUBSCRIPTION_GROUPS: + if (this.brokerController.getSubscriptionGroupManager() instanceof RocksDBSubscriptionGroupManager) { + RocksDBSubscriptionGroupManager rocksDBSubscriptionGroupManager = (RocksDBSubscriptionGroupManager) this.brokerController.getSubscriptionGroupManager(); + futureList.add(CompletableFuture.runAsync(rocksDBSubscriptionGroupManager::exportToJson, asyncExecuteWorker)); + } + break; + case CONSUMER_OFFSETS: + if (this.brokerController.getConsumerOffsetManager() instanceof RocksDBConsumerOffsetManager) { + RocksDBConsumerOffsetManager rocksDBConsumerOffsetManager = (RocksDBConsumerOffsetManager) this.brokerController.getConsumerOffsetManager(); + futureList.add(CompletableFuture.runAsync(rocksDBConsumerOffsetManager::exportToJson, asyncExecuteWorker)); + } + break; + default: + break; + } + } + + try { + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); + } catch (CompletionException e) { + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.valueOf(e)); + return response; + } + + RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + response.setRemark("export done."); + return response; + } + @Override public boolean rejectRequest() { return false; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 7d4b51cfc5f..114093e3502 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -164,6 +164,7 @@ import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader; @@ -3036,6 +3037,21 @@ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(final String broker throw new MQClientException(response.getCode(), response.getRemark()); } + public void exportRocksDBConfigToJson(final String brokerAddr, + final List configType, + final long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + ExportRocksDBConfigToJsonRequestHeader header = new ExportRocksDBConfigToJsonRequestHeader(); + header.updateConfigType(configType); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, header); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + + if (ResponseCode.SUCCESS != response.getCode()) { + throw new MQClientException(response.getCode(), response.getRemark()); + } + } + public void checkClientInBroker(final String brokerAddr, final String consumerGroup, final String clientId, final SubscriptionData subscriptionData, final long timeoutMillis) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index 623f5748d5a..e3b180a5379 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -219,6 +219,7 @@ public class RequestCode { public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352; public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; + public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355; public static final int LITE_PULL_MESSAGE = 361; public static final int RECALL_MESSAGE = 370; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java new file mode 100644 index 00000000000..7b1f9470e1e --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; + +@RocketMQAction(value = RequestCode.EXPORT_ROCKSDB_CONFIG_TO_JSON, action = Action.GET) +public class ExportRocksDBConfigToJsonRequestHeader implements CommandCustomHeader { + private static final String CONFIG_TYPE_SEPARATOR = ";"; + + public enum ConfigType { + TOPICS("topics"), + SUBSCRIPTION_GROUPS("subscriptionGroups"), + CONSUMER_OFFSETS("consumerOffsets"); + + private final String typeName; + + ConfigType(String typeName) { + this.typeName = typeName; + } + + public static ConfigType getConfigTypeByName(String typeName) { + for (ConfigType configType : ConfigType.values()) { + if (configType.getTypeName().equalsIgnoreCase(typeName.trim())) { + return configType; + } + } + throw new IllegalArgumentException("Unknown config type: " + typeName); + } + + public static List fromString(String ordinal) { + String[] configTypeNames = StringUtils.split(ordinal, CONFIG_TYPE_SEPARATOR); + List configTypes = new ArrayList<>(); + for (String configTypeName : configTypeNames) { + if (StringUtils.isNotEmpty(configTypeName)) { + configTypes.add(getConfigTypeByName(configTypeName)); + } + } + return configTypes; + } + + public static String toString(List configTypes) { + StringBuilder sb = new StringBuilder(); + for (ConfigType configType : configTypes) { + sb.append(configType.getTypeName()).append(CONFIG_TYPE_SEPARATOR); + } + return sb.toString(); + } + + public String getTypeName() { + return typeName; + } + } + + @CFNotNull + private String configType; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public List fetchConfigType() { + return ConfigType.fromString(configType); + } + + public void updateConfigType(List configType) { + this.configType = ConfigType.toString(configType); + } + + public String getConfigType() { + return configType; + } + + public void setConfigType(String configType) { + this.configType = configType; + } +} \ No newline at end of file diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java new file mode 100644 index 00000000000..bbe625a42af --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/header/ExportRocksDBConfigToJsonRequestHeaderTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Test; + +public class ExportRocksDBConfigToJsonRequestHeaderTest { + @Test + public void configTypeTest() { + List configTypes = new ArrayList<>(); + configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS); + configTypes.add(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS); + + String string = ExportRocksDBConfigToJsonRequestHeader.ConfigType.toString(configTypes); + + List newConfigTypes = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(string); + assert newConfigTypes.size() == 2; + assert configTypes.equals(newConfigTypes); + + List topics = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics"); + assert topics.size() == 1; + assert topics.get(0).equals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS); + + List mix = ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("toPics; subScriptiongroups"); + assert mix.size() == 2; + assert mix.get(0).equals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS); + assert mix.get(1).equals(ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString("topics; subscription"); + }); + + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 4b97e14866a..f224f749cbc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -778,6 +779,13 @@ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime); } + @Override + public void exportRocksDBConfigToJson(String brokerAddr, + List configType) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + this.defaultMQAdminExtImpl.exportRocksDBConfigToJson(brokerAddr, configType); + } + @Override public boolean resumeCheckHalfMessage(String topic, String msgId) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 2523013af0d..5be99606dc8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -103,6 +103,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; @@ -1824,6 +1825,13 @@ public CheckRocksdbCqWriteResult checkRocksdbCqWriteProgress(String brokerAddr, return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime, timeoutMillis); } + @Override + public void exportRocksDBConfigToJson(String brokerAddr, + List configType) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().exportRocksDBConfigToJson(brokerAddr, configType, timeoutMillis); + } + @Override public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 69a08218646..2f01b6cba81 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -392,6 +393,10 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + void exportRocksDBConfigToJson(String brokerAddr, + List configType) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java index d5726985e3c..438d17d6689 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -18,6 +18,10 @@ package org.apache.rocketmq.tools.command.export; import com.alibaba.fastjson.JSONObject; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -30,11 +34,6 @@ import org.apache.rocketmq.tools.command.SubCommandException; import org.rocksdb.RocksIterator; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; - public class ExportMetadataInRocksDBCommand implements SubCommand { private static final String TOPICS_JSON_CONFIG = "topics"; private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; @@ -46,7 +45,7 @@ public String commandName() { @Override public String commandDesc() { - return "export RocksDB kv config (topics/subscriptionGroups)"; + return "export RocksDB kv config (topics/subscriptionGroups). Recommend to use [mqadmin rocksDBConfigToJson]"; } @Override diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java index f2803b0cbb3..48bc163678b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/metadata/RocksDBConfigToJsonCommand.java @@ -18,27 +18,38 @@ package org.apache.rocketmq.tools.command.metadata; import com.alibaba.fastjson.JSONObject; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; import org.rocksdb.RocksIterator; -import java.io.File; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - public class RocksDBConfigToJsonCommand implements SubCommand { - private static final String TOPICS_JSON_CONFIG = "topics"; - private static final String SUBSCRIPTION_GROUP_JSON_CONFIG = "subscriptionGroups"; - private static final String CONSUMER_OFFSETS_JSON_CONFIG = "consumerOffsets"; @Override public String commandName() { @@ -47,41 +58,140 @@ public String commandName() { @Override public String commandDesc() { - return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json"; + return "Convert RocksDB kv config (topics/subscriptionGroups/consumerOffsets) to json. " + + "[rpc mode] Use [-n, -c, -b, -t] to send Request to broker ( version >= 5.3.2 ) or [local mode] use [-p, -t, -j, -e] to load RocksDB. " + + "If -e is provided, tools will export json file instead of std print"; } @Override public Options buildCommandlineOptions(Options options) { + Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + + "topics/subscriptionGroups/consumerOffsets. Required in local mode and default all in rpc mode."); + options.addOption(configTypeOption); + + // [local mode] options Option pathOption = new Option("p", "configPath", true, - "Absolute path to the metadata config directory"); - pathOption.setRequired(true); + "[local mode] Absolute path to the metadata config directory"); options.addOption(pathOption); - Option configTypeOption = new Option("t", "configType", true, "Name of kv config, e.g. " + - "topics/subscriptionGroups/consumerOffsets"); - configTypeOption.setRequired(true); - options.addOption(configTypeOption); + Option exportPathOption = new Option("e", "exportFile", true, + "[local mode] Absolute file path for exporting, auto backup existing file, not directory. If exportFile is provided, will export Json file and ignore [-j]."); + options.addOption(exportPathOption); + + Option jsonEnableOption = new Option("j", "jsonEnable", true, + "[local mode] Json format enable, Default: true. If exportFile is provided, will export Json file and ignore [-j]."); + options.addOption(jsonEnableOption); + + // [rpc mode] options + Option nameserverOption = new Option("n", "nameserverAddr", true, + "[rpc mode] nameserverAddr. If nameserverAddr and clusterName are provided, will ignore [-p, -e, -j, -b] args"); + options.addOption(nameserverOption); + + Option clusterOption = new Option("c", "cluster", true, + "[rpc mode] Cluster name. If nameserverAddr and clusterName are provided, will ignore [-p, -e, -j, -b] args"); + options.addOption(clusterOption); + + Option brokerAddrOption = new Option("b", "brokerAddr", true, + "[rpc mode] Broker address. If brokerAddr is provided, will ignore [-p, -e, -j] args"); + options.addOption(brokerAddrOption); return options; } @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + List typeList = getConfigTypeList(commandLine); + + if (commandLine.hasOption("nameserverAddr")) { + // [rpc mode] call all brokers in cluster to export to json file + System.out.print("Use [rpc mode] call all brokers in cluster to export to json file \n"); + checkRequiredArgsProvided(commandLine, "rpc mode", "cluster"); + handleRpcMode(commandLine, rpcHook, typeList); + } else if (commandLine.hasOption("brokerAddr")) { + // [rpc mode] call broker to export to json file + System.out.print("Use [rpc mode] call broker to export to json file \n"); + handleRpcMode(commandLine, rpcHook, typeList); + } else if (commandLine.hasOption("configPath")) { + // [local mode] load rocksdb to print or export file + System.out.print("Use [local mode] load rocksdb to print or export file \n"); + checkRequiredArgsProvided(commandLine, "local mode", "configType"); + handleLocalMode(commandLine); + } else { + System.out.print(commandDesc() + "\n"); + } + } + + private void handleLocalMode(CommandLine commandLine) { + ExportRocksDBConfigToJsonRequestHeader.ConfigType type = Objects.requireNonNull(getConfigTypeList(commandLine)).get(0); String path = commandLine.getOptionValue("configPath").trim(); if (StringUtils.isEmpty(path) || !new File(path).exists()) { System.out.print("Rocksdb path is invalid.\n"); return; } + path = Paths.get(path, type.toString()).toString(); + String exportFile = commandLine.hasOption("exportFile") ? commandLine.getOptionValue("exportFile").trim() : null; + Map configMap = getConfigMapFromRocksDB(path, type); + if (configMap != null) { + if (exportFile == null) { + if (commandLine.hasOption("jsonEnable") && "false".equalsIgnoreCase(commandLine.getOptionValue("jsonEnable").trim())) { + printConfigMapJsonDisable(configMap); + } else { + System.out.print(JSONObject.toJSONString(configMap, true) + "\n"); + } + } else { + String jsonString = JSONObject.toJSONString(configMap, true); + try { + MixAll.string2File(jsonString, exportFile); + } catch (IOException e) { + System.out.print("persist file " + exportFile + " exception" + e); + } + } + } + } - String configType = commandLine.getOptionValue("configType").trim(); - if (!path.endsWith("/")) { - path += "/"; + private void checkRequiredArgsProvided(CommandLine commandLine, String mode, + String... args) throws SubCommandException { + for (String arg : args) { + if (!commandLine.hasOption(arg)) { + System.out.printf("%s Invalid args, please input %s\n", mode, String.join(",", args)); + throw new SubCommandException("Invalid args"); + } } - path += configType; - if (CONSUMER_OFFSETS_JSON_CONFIG.equalsIgnoreCase(configType)) { - printConsumerOffsets(path); - return; + } + + private List getConfigTypeList(CommandLine commandLine) { + List typeList = new ArrayList<>(); + if (commandLine.hasOption("configType")) { + String configType = commandLine.getOptionValue("configType").trim(); + try { + typeList.addAll(ExportRocksDBConfigToJsonRequestHeader.ConfigType.fromString(configType)); + } catch (IllegalArgumentException e) { + System.out.print("Invalid configType: " + configType + " please input topics/subscriptionGroups/consumerOffsets \n"); + return null; + } + } else { + typeList.addAll(Arrays.asList(ExportRocksDBConfigToJsonRequestHeader.ConfigType.values())); } + return typeList; + } + + private static void printConfigMapJsonDisable(Map configMap) { + AtomicLong count = new AtomicLong(0); + for (Map.Entry entry : configMap.entrySet()) { + String configKey = entry.getKey(); + System.out.printf("type: %s", configKey); + JSONObject jsonObject = entry.getValue(); + jsonObject.forEach((k, v) -> System.out.printf("%d, Key: %s, Value: %s%n", count.incrementAndGet(), k, v)); + } + } + + private static Map getConfigMapFromRocksDB(String path, + ExportRocksDBConfigToJsonRequestHeader.ConfigType configType) { + + if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.CONSUMER_OFFSETS.equals(configType)) { + return loadConsumerOffsets(path); + } + ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true); configRocksDBStorage.start(); RocksIterator iterator = configRocksDBStorage.iterator(); @@ -101,24 +211,79 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t byte[] kvDataVersion = configRocksDBStorage.getKvDataVersion(); if (kvDataVersion != null) { configMap.put("dataVersion", - JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8))); + JSONObject.parseObject(new String(kvDataVersion, DataConverter.CHARSET_UTF8))); } - if (TOPICS_JSON_CONFIG.equalsIgnoreCase(configType)) { + if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.TOPICS.equals(configType)) { configMap.put("topicConfigTable", configTable); } - if (SUBSCRIPTION_GROUP_JSON_CONFIG.equalsIgnoreCase(configType)) { + if (ExportRocksDBConfigToJsonRequestHeader.ConfigType.SUBSCRIPTION_GROUPS.equals(configType)) { configMap.put("subscriptionGroupTable", configTable); } - System.out.print(JSONObject.toJSONString(configMap, true) + "\n"); + return configMap; } catch (Exception e) { System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=" + configType + ", " + e.getMessage() + "\n"); } finally { configRocksDBStorage.shutdown(); } + return null; + } + + private void handleRpcMode(CommandLine commandLine, RPCHook rpcHook, + List type) { + String nameserverAddr = commandLine.hasOption('n') ? commandLine.getOptionValue("nameserverAddr").trim() : null; + String inputBrokerAddr = commandLine.hasOption('b') ? commandLine.getOptionValue('b').trim() : null; + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook, 30 * 1000); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.setNamesrvAddr(nameserverAddr); + + List> futureList = new ArrayList<>(); + + try { + defaultMQAdminExt.start(); + if (clusterName != null) { + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); + if (clusterAddrTable.get(clusterName) == null) { + System.out.print("clusterAddrTable is empty"); + return; + } + for (Map.Entry entry : brokerAddrTable.entrySet()) { + String brokerName = entry.getKey(); + BrokerData brokerData = entry.getValue(); + String brokerAddr = brokerData.getBrokerAddrs().get(0L); + futureList.add(sendRequest(type, defaultMQAdminExt, brokerAddr, brokerName)); + } + } else if (inputBrokerAddr != null) { + futureList.add(sendRequest(type, defaultMQAdminExt, inputBrokerAddr, null)); + } + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete( + (v, t) -> System.out.print("broker export done.") + ).join(); + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private CompletableFuture sendRequest(List type, + DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, String brokerName) { + return CompletableFuture.supplyAsync(() -> { + try { + defaultMQAdminExt.exportRocksDBConfigToJson(brokerAddr, type); + } catch (Throwable t) { + System.out.print((brokerName != null) ? brokerName : brokerAddr + " export error"); + throw new CompletionException(this.getClass().getSimpleName() + " command failed", t); + } + return null; + }); } - private void printConsumerOffsets(String path) { + private static Map loadConsumerOffsets(String path) { ConfigRocksDBStorage configRocksDBStorage = new ConfigRocksDBStorage(path, true); configRocksDBStorage.start(); RocksIterator iterator = configRocksDBStorage.iterator(); @@ -136,12 +301,13 @@ private void printConsumerOffsets(String path) { iterator.next(); } configMap.put("offsetTable", configTable); - System.out.print(JSONObject.toJSONString(configMap, true) + "\n"); + return configMap; } catch (Exception e) { System.out.print("Error occurred while converting RocksDB kv config to json, " + "configType=consumerOffsets, " + e.getMessage() + "\n"); } finally { configRocksDBStorage.shutdown(); } + return null; } static class RocksDBOffsetSerializeWrapper {