From 84fd562991208171baf11c86f6d98e5be5ffb7a7 Mon Sep 17 00:00:00 2001 From: Krishnan Mahadevan Date: Tue, 19 Sep 2023 14:24:03 +0530 Subject: [PATCH] Support suite level thread pools for data provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #2980 We can now configure TestNG such that it uses a Suite level thread pool when running data driven Tests in parallel. This can be enabled via the configuration “-shareThreadPoolForDataProviders” with a value of “true” Alternatively one can also use the suite level attribute “share-thread-pool-for-data-providers” --- CHANGES.txt | 1 + .../main/java/org/testng/xml/XmlSuite.java | 14 ++++ .../main/java/org/testng/CommandLineArgs.java | 9 +++ .../src/main/java/org/testng/TestNG.java | 15 ++++ .../org/testng/internal/Configuration.java | 12 +++ .../org/testng/internal/IConfiguration.java | 4 + .../java/org/testng/internal/ObjectBag.java | 57 ++++++++++++++ .../java/org/testng/internal/PoolService.java | 22 +++++- .../internal/invokers/MethodRunner.java | 14 +++- .../org/testng/xml/TestNGContentHandler.java | 10 +++ testng-core/src/main/resources/testng-1.0.dtd | 5 +- .../test/dataprovider/DataProviderTest.java | 77 +++++++++++++++++++ .../issue2980/LoggingListener.java | 36 +++++++++ .../issue2980/TestClassSample.java | 42 ++++++++++ .../2980_with_shared_threadpool_disabled.xml | 9 +++ .../2980_with_shared_threadpool_enabled.xml | 9 +++ 16 files changed, 332 insertions(+), 4 deletions(-) create mode 100644 testng-core/src/main/java/org/testng/internal/ObjectBag.java create mode 100644 testng-core/src/test/java/test/dataprovider/issue2980/LoggingListener.java create mode 100644 testng-core/src/test/java/test/dataprovider/issue2980/TestClassSample.java create mode 100644 testng-core/src/test/resources/2980_with_shared_threadpool_disabled.xml create mode 100644 testng-core/src/test/resources/2980_with_shared_threadpool_enabled.xml diff --git a/CHANGES.txt b/CHANGES.txt index 068e3c082a..46e27cf4c0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ Current +Fixed: GITHUB-2980: Data Provider Threads configuration in the suite don't match the documentation (Krishnan Mahadevan) Fixed: GITHUB-3003: BeforeClass|AfterClass with inheritedGroups triggers cyclic dependencies (Krishnan Mahadevan) New: Added @Inherited to the Listeners annotation, allowing it to be used in forming meta-annotations. (Pavlo Glushchenko) Fixed: GITHUB-2991: Suite attributes map should be thread safe (Krishnan Mahadevan) diff --git a/testng-core-api/src/main/java/org/testng/xml/XmlSuite.java b/testng-core-api/src/main/java/org/testng/xml/XmlSuite.java index 34a606e686..5c9f8864ac 100644 --- a/testng-core-api/src/main/java/org/testng/xml/XmlSuite.java +++ b/testng-core-api/src/main/java/org/testng/xml/XmlSuite.java @@ -6,6 +6,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.UUID; import org.testng.ITestObjectFactory; import org.testng.collections.Lists; import org.testng.collections.Maps; @@ -123,6 +124,9 @@ public String toString() { private String m_parentModule = ""; private String m_guiceStage = ""; + /** Represents a unique id for this suite. Can be used for uniquely identifying the xml suite. */ + public final UUID SUITE_ID = UUID.randomUUID(); + /** Whether to SKIP or CONTINUE to re-attempt failed configuration methods. */ public static final FailurePolicy DEFAULT_CONFIG_FAILURE_POLICY = FailurePolicy.SKIP; @@ -139,6 +143,8 @@ public String toString() { public static final Boolean DEFAULT_SKIP_FAILED_INVOCATION_COUNTS = Boolean.FALSE; private Boolean m_skipFailedInvocationCounts = DEFAULT_SKIP_FAILED_INVOCATION_COUNTS; + private boolean shareThreadPoolForDataProviders = false; + /** The thread count. */ public static final Integer DEFAULT_THREAD_COUNT = 5; @@ -243,6 +249,14 @@ public Class getObjectFactoryClass() { return m_objectFactoryClass; } + public void setShareThreadPoolForDataProviders(boolean shareThreadPoolForDataProviders) { + this.shareThreadPoolForDataProviders = shareThreadPoolForDataProviders; + } + + public boolean isShareThreadPoolForDataProviders() { + return shareThreadPoolForDataProviders; + } + @Deprecated public void setObjectFactory(ITestObjectFactory objectFactory) { setObjectFactoryClass(objectFactory.getClass()); diff --git a/testng-core/src/main/java/org/testng/CommandLineArgs.java b/testng-core/src/main/java/org/testng/CommandLineArgs.java index bad8b7c0ac..1c1895a987 100644 --- a/testng-core/src/main/java/org/testng/CommandLineArgs.java +++ b/testng-core/src/main/java/org/testng/CommandLineArgs.java @@ -274,4 +274,13 @@ public class CommandLineArgs { names = GENERATE_RESULTS_PER_SUITE, description = "Should TestNG consider failures in Data Providers as test failures.") public Boolean generateResultsPerSuite = false; + + public static final String SHARE_THREAD_POOL_FOR_DATA_PROVIDERS = + "-shareThreadPoolForDataProviders"; + + @Parameter( + names = SHARE_THREAD_POOL_FOR_DATA_PROVIDERS, + description = + "Should TestNG use a global Shared ThreadPool (At suite level) for running data providers.") + public Boolean shareThreadPoolForDataProviders = false; } diff --git a/testng-core/src/main/java/org/testng/TestNG.java b/testng-core/src/main/java/org/testng/TestNG.java index fa0e81b642..5d0ba463fe 100644 --- a/testng-core/src/main/java/org/testng/TestNG.java +++ b/testng-core/src/main/java/org/testng/TestNG.java @@ -31,6 +31,7 @@ import org.testng.internal.ExitCode; import org.testng.internal.IConfiguration; import org.testng.internal.ListenerOrderDeterminer; +import org.testng.internal.ObjectBag; import org.testng.internal.OverrideProcessor; import org.testng.internal.ReporterConfig; import org.testng.internal.RuntimeBehavior; @@ -609,6 +610,14 @@ public boolean isPropagateDataProviderFailureAsTestFailure() { return this.m_configuration.isPropagateDataProviderFailureAsTestFailure(); } + public void shareThreadPoolForDataProviders(boolean flag) { + this.m_configuration.shareThreadPoolForDataProviders(flag); + } + + public boolean isShareThreadPoolForDataProviders() { + return this.m_configuration.isShareThreadPoolForDataProviders(); + } + /** * Set the suites file names to be run by this TestNG object. This method tries to load and parse * the specified TestNG suite xml files. If a file is missing, it is ignored. @@ -1082,6 +1091,7 @@ public void run() { m_end = System.currentTimeMillis(); if (null != suiteRunners) { + suiteRunners.forEach(ObjectBag::cleanup); generateReports(suiteRunners); } @@ -1186,6 +1196,9 @@ public List runSuitesLocally() { // First initialize the suite runners to ensure there are no configuration issues. // Create a map with XmlSuite as key and corresponding SuiteRunner as value for (XmlSuite xmlSuite : m_suites) { + if (m_configuration.isShareThreadPoolForDataProviders()) { + xmlSuite.setShareThreadPoolForDataProviders(true); + } createSuiteRunners(suiteRunnerMap, xmlSuite); } @@ -1454,6 +1467,8 @@ public static TestNG privateMain(String[] argv, ITestListener listener) { * @param cla The command line parameters */ protected void configure(CommandLineArgs cla) { + Optional.ofNullable(cla.shareThreadPoolForDataProviders) + .ifPresent(this::shareThreadPoolForDataProviders); Optional.ofNullable(cla.propagateDataProviderFailureAsTestFailure) .ifPresent(value -> propagateDataProviderFailureAsTestFailure()); setReportAllDataDrivenTestsAsSkipped(cla.includeAllDataDrivenTestsWhenSkipping); diff --git a/testng-core/src/main/java/org/testng/internal/Configuration.java b/testng-core/src/main/java/org/testng/internal/Configuration.java index db11ea482a..03b41bd2b9 100644 --- a/testng-core/src/main/java/org/testng/internal/Configuration.java +++ b/testng-core/src/main/java/org/testng/internal/Configuration.java @@ -23,6 +23,8 @@ public class Configuration implements IConfiguration { private ITestObjectFactory m_objectFactory; private IHookable m_hookable; private IConfigurable m_configurable; + + private boolean shareThreadPoolForDataProviders = false; private final Map, IExecutionListener> m_executionListeners = Maps.newLinkedHashMap(); private final Map, IConfigurationListener> @@ -168,4 +170,14 @@ public void propagateDataProviderFailureAsTestFailure() { public boolean isPropagateDataProviderFailureAsTestFailure() { return propagateDataProviderFailureAsTestFailure; } + + @Override + public boolean isShareThreadPoolForDataProviders() { + return this.shareThreadPoolForDataProviders; + } + + @Override + public void shareThreadPoolForDataProviders(boolean flag) { + this.shareThreadPoolForDataProviders = flag; + } } diff --git a/testng-core/src/main/java/org/testng/internal/IConfiguration.java b/testng-core/src/main/java/org/testng/internal/IConfiguration.java index cd49902b16..b081ff6c50 100644 --- a/testng-core/src/main/java/org/testng/internal/IConfiguration.java +++ b/testng-core/src/main/java/org/testng/internal/IConfiguration.java @@ -59,4 +59,8 @@ default boolean getReportAllDataDrivenTestsAsSkipped() { void propagateDataProviderFailureAsTestFailure(); boolean isPropagateDataProviderFailureAsTestFailure(); + + boolean isShareThreadPoolForDataProviders(); + + void shareThreadPoolForDataProviders(boolean flag); } diff --git a/testng-core/src/main/java/org/testng/internal/ObjectBag.java b/testng-core/src/main/java/org/testng/internal/ObjectBag.java new file mode 100644 index 0000000000..2276b293b8 --- /dev/null +++ b/testng-core/src/main/java/org/testng/internal/ObjectBag.java @@ -0,0 +1,57 @@ +package org.testng.internal; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import org.testng.ISuite; +import org.testng.log4testng.Logger; + +/** + * A simple bean bag that is intended to help share objects during the lifetime of TestNG without + * needing it to be a singleton. + */ +public final class ObjectBag { + + private static final Logger logger = Logger.getLogger(ObjectBag.class); + private final Map, Object> bag = new ConcurrentHashMap<>(); + + private static final Map instances = new ConcurrentHashMap<>(); + + public static ObjectBag getInstance(ISuite suite) { + return instances.computeIfAbsent(suite.getXmlSuite().SUITE_ID, k -> new ObjectBag()); + } + + public static void cleanup(ISuite suite) { + UUID uid = suite.getXmlSuite().SUITE_ID; + Optional.ofNullable(instances.get(uid)).ifPresent(ObjectBag::cleanup); + instances.remove(uid); + } + + /** + * @param type - The type of the object to be created + * @param supplier - A {@link Supplier} that should be used to produce a new instance + * @return - Either the newly produced instance or the existing instance. + */ + public Object createIfRequired(Class type, Supplier supplier) { + return bag.computeIfAbsent(type, t -> supplier.get()); + } + + public void cleanup() { + bag.values().stream() + .filter(it -> it instanceof Closeable) + .map(it -> (Closeable) it) + .forEach( + it -> { + try { + it.close(); + } catch (IOException e) { + logger.debug("Could not clean-up " + it, e); + } + }); + bag.clear(); + } +} diff --git a/testng-core/src/main/java/org/testng/internal/PoolService.java b/testng-core/src/main/java/org/testng/internal/PoolService.java index 660d169059..5ae8fc47ee 100644 --- a/testng-core/src/main/java/org/testng/internal/PoolService.java +++ b/testng-core/src/main/java/org/testng/internal/PoolService.java @@ -1,5 +1,7 @@ package org.testng.internal; +import java.io.Closeable; +import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -15,12 +17,18 @@ import org.testng.internal.thread.ThreadUtil; /** Simple wrapper for an ExecutorCompletionService. */ -public class PoolService { +public class PoolService implements Closeable { private final ExecutorCompletionService m_completionService; private final ExecutorService m_executor; + private final boolean shutdownAfterExecution; + public PoolService(int threadPoolSize) { + this(threadPoolSize, true); + } + + public PoolService(int threadPoolSize, boolean shutdownAfterExecution) { ThreadFactory threadFactory = new ThreadFactory() { @@ -35,6 +43,7 @@ public Thread newThread(@Nonnull Runnable r) { }; m_executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory); m_completionService = new ExecutorCompletionService<>(m_executor); + this.shutdownAfterExecution = shutdownAfterExecution; } public List submitTasksAndWait(List> tasks) { @@ -53,7 +62,16 @@ public List submitTasksAndWait(List> } } - m_executor.shutdown(); + if (shutdownAfterExecution) { + m_executor.shutdown(); + } return result; } + + @Override + public void close() throws IOException { + if (!shutdownAfterExecution) { + m_executor.shutdown(); + } + } } diff --git a/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java b/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java index 53f1dd687b..ddd8ec556b 100644 --- a/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java +++ b/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java @@ -7,6 +7,7 @@ import org.testng.ITestResult; import org.testng.collections.CollectionUtils; import org.testng.collections.Lists; +import org.testng.internal.ObjectBag; import org.testng.internal.Parameters; import org.testng.internal.PoolService; import org.testng.internal.invokers.ITestInvoker.FailureContext; @@ -129,7 +130,18 @@ public List runInParallel( // testng387: increment the param index in the bag. parametersIndex += 1; } - PoolService> ps = new PoolService<>(suite.getDataProviderThreadCount()); + + ObjectBag objectBag = ObjectBag.getInstance(context.getSuite()); + boolean sharedThreadPool = context.getSuite().getXmlSuite().isShareThreadPoolForDataProviders(); + + @SuppressWarnings("unchecked") + PoolService> ps = + sharedThreadPool + ? (PoolService>) + objectBag.createIfRequired( + PoolService.class, + () -> new PoolService<>(suite.getDataProviderThreadCount(), false)) + : new PoolService<>(suite.getDataProviderThreadCount()); List> r = ps.submitTasksAndWait(workers); for (List l2 : r) { result.addAll(l2); diff --git a/testng-core/src/main/java/org/testng/xml/TestNGContentHandler.java b/testng-core/src/main/java/org/testng/xml/TestNGContentHandler.java index 249fbf9e09..29044e053d 100644 --- a/testng-core/src/main/java/org/testng/xml/TestNGContentHandler.java +++ b/testng-core/src/main/java/org/testng/xml/TestNGContentHandler.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Stack; import org.testng.ITestObjectFactory; import org.testng.TestNGException; @@ -267,6 +268,15 @@ private void xmlSuite(boolean start, Attributes attributes) { if (null != dataProviderThreadCount) { m_currentSuite.setDataProviderThreadCount(Integer.parseInt(dataProviderThreadCount)); } + + String shareThreadPoolForDataProviders = + attributes.getValue("share-thread-pool-for-data-providers"); + Optional.ofNullable(shareThreadPoolForDataProviders) + .ifPresent( + it -> + m_currentSuite.setShareThreadPoolForDataProviders( + Boolean.parseBoolean(shareThreadPoolForDataProviders))); + String timeOut = attributes.getValue("time-out"); if (null != timeOut) { m_currentSuite.setTimeOut(timeOut); diff --git a/testng-core/src/main/resources/testng-1.0.dtd b/testng-core/src/main/resources/testng-1.0.dtd index 2527392c2a..16bc4a7974 100644 --- a/testng-core/src/main/resources/testng-1.0.dtd +++ b/testng-core/src/main/resources/testng-1.0.dtd @@ -56,11 +56,13 @@ Cedric Beust & Alexandru Popescu @attr skipfailedinvocationcounts Whether to skip failed invocations. @attr data-provider-thread-count An integer giving the size of the thread pool to use for parallel data providers. +@attr share-thread-pool-for-data-providers - Whether TestNG should use a common thread pool + for running parallel data providers. (Works only with TestNG versions 7.9.0 or higher) @attr object-factory A class that implements IObjectFactory that will be used to instantiate the test objects. @attr allow-return-values If true, tests that return a value will be run as well --> -, Integer> pair) { + LoggingListener listener = runDataProviderTest(flag); + assertThat(listener.getMethodNames()) + .withFailMessage("Ensuring that the method names along with parameters match.") + .containsAll(pair.first()); + assertThat(listener.getThreadIds()) + .withFailMessage("Ensuring that the thread ids are correct") + .hasSize(pair.second()); + } + + @Test(description = "GITHUB-2980", dataProvider = "getSuiteFileNames") + public void ensureWeCanShareThreadPoolForDataProvidersThroughSuiteFiles( + String fileName, Pair, Integer> pair) { + TestNG testng = create(); + testng.setTestSuites(Collections.singletonList(fileName)); + LoggingListener listener = new LoggingListener(); + testng.addListener(listener); + testng.run(); + assertThat(listener.getMethodNames()) + .withFailMessage("Ensuring that the method names along with parameters match.") + .containsAll(pair.first()); + assertThat(listener.getThreadIds()) + .withFailMessage("Ensuring that the thread ids are correct") + .hasSize(pair.second()); + } + + @DataProvider + public Object[][] getSuiteFileNames() { + return new Object[][] { + { + "src/test/resources/2980_with_shared_threadpool_enabled.xml", + new Pair<>(METHODS_ISSUE_2980, 5) + }, + { + "src/test/resources/2980_with_shared_threadpool_disabled.xml", + new Pair<>(METHODS_ISSUE_2980, 10) + } + }; + } + + private LoggingListener runDataProviderTest(boolean flag) { + TestNG testng = new TestNG(); + testng.setTestClasses(new Class[] {test.dataprovider.issue2980.TestClassSample.class}); + LoggingListener listener = new LoggingListener(); + testng.addListener(listener); + testng.setDataProviderThreadCount(5); + testng.shareThreadPoolForDataProviders(flag); + testng.run(); + return listener; + } + + private static final List METHODS_ISSUE_2980 = + Lists.newArrayList( + "testMethod_[1]", + "testMethod_[2]", + "testMethod_[3]", + "testMethod_[4]", + "testMethod_[5]", + "anotherTestMethod_[A]", + "anotherTestMethod_[B]", + "anotherTestMethod_[C]", + "anotherTestMethod_[D]", + "anotherTestMethod_[E]"); + + @DataProvider + public Object[][] dataProviderForIssue2980() { + return new Object[][] { + {true, new Pair<>(METHODS_ISSUE_2980, 5)}, + {false, new Pair<>(METHODS_ISSUE_2980, 10)} + }; + } + private static void runTest(boolean isParallel) { TestNG testng = create(TestCaseSample.class); CoreListener listener = new CoreListener(); diff --git a/testng-core/src/test/java/test/dataprovider/issue2980/LoggingListener.java b/testng-core/src/test/java/test/dataprovider/issue2980/LoggingListener.java new file mode 100644 index 0000000000..59c690e1bf --- /dev/null +++ b/testng-core/src/test/java/test/dataprovider/issue2980/LoggingListener.java @@ -0,0 +1,36 @@ +package test.dataprovider.issue2980; + +import java.util.List; +import java.util.stream.Collectors; +import org.testng.IReporter; +import org.testng.ISuite; +import org.testng.ISuiteResult; +import org.testng.internal.collections.Pair; +import org.testng.xml.XmlSuite; + +public class LoggingListener implements IReporter { + + private List> pairs; + + @Override + @SuppressWarnings("unchecked") + public void generateReport( + List xmlSuites, List suites, String outputDirectory) { + pairs = + suites.stream() + .flatMap(it -> it.getResults().values().stream()) + .map(ISuiteResult::getTestContext) + .flatMap(it -> it.getPassedTests().getAllResults().stream()) + .map(it -> it.getAttribute(TestClassSample.THREAD_ID)) + .map(it -> (Pair) it) + .collect(Collectors.toList()); + } + + public final List getThreadIds() { + return pairs.stream().map(Pair::second).distinct().collect(Collectors.toList()); + } + + public final List getMethodNames() { + return pairs.stream().map(Pair::first).distinct().collect(Collectors.toList()); + } +} diff --git a/testng-core/src/test/java/test/dataprovider/issue2980/TestClassSample.java b/testng-core/src/test/java/test/dataprovider/issue2980/TestClassSample.java new file mode 100644 index 0000000000..98abcb6b48 --- /dev/null +++ b/testng-core/src/test/java/test/dataprovider/issue2980/TestClassSample.java @@ -0,0 +1,42 @@ +package test.dataprovider.issue2980; + +import java.util.Arrays; +import org.testng.ITestResult; +import org.testng.Reporter; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.testng.internal.collections.Pair; + +public class TestClassSample { + + static final String THREAD_ID = "thread_id"; + + @Test(dataProvider = "dp") + public void testMethod(int ignored) { + recordThreadId(); + } + + @Test(dataProvider = "dp1") + public void anotherTestMethod(String ignored) { + recordThreadId(); + } + + private static void recordThreadId() { + ITestResult itr = Reporter.getCurrentTestResult(); + itr.setAttribute( + THREAD_ID, + new Pair<>( + itr.getMethod().getMethodName() + "_" + Arrays.toString(itr.getParameters()), + Thread.currentThread().getId())); + } + + @DataProvider(name = "dp", parallel = true) + public Object[][] getTestData() { + return new Object[][] {{1}, {2}, {3}, {4}, {5}}; + } + + @DataProvider(name = "dp1", parallel = true) + public Object[][] getTestData1() { + return new Object[][] {{"A"}, {"B"}, {"C"}, {"D"}, {"E"}}; + } +} diff --git a/testng-core/src/test/resources/2980_with_shared_threadpool_disabled.xml b/testng-core/src/test/resources/2980_with_shared_threadpool_disabled.xml new file mode 100644 index 0000000000..fd66a2c321 --- /dev/null +++ b/testng-core/src/test/resources/2980_with_shared_threadpool_disabled.xml @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/testng-core/src/test/resources/2980_with_shared_threadpool_enabled.xml b/testng-core/src/test/resources/2980_with_shared_threadpool_enabled.xml new file mode 100644 index 0000000000..767b47fbb1 --- /dev/null +++ b/testng-core/src/test/resources/2980_with_shared_threadpool_enabled.xml @@ -0,0 +1,9 @@ + + + + + + + + +