Skip to content

Commit

Permalink
104 daemon threads (#105)
Browse files Browse the repository at this point in the history
* Revert "Update LICENSE"

* Added new option that instructs Tjahzi to use daemon threads (and thus potentially loose data when idle).

* Re #104: Added documentation of useDaemonThreads option to log4j2 and logback. Always using daemon threads for netty event loop group.

* Re #104: Fixed test.

* Re #104: Fixed test.

* Re #104: Added test verifying we start threads as daemons.

* Re #104: Corrected a typo in the docs.

* Re #104: Rewrite of the readme section about useDaemonThreads option.
  • Loading branch information
tkowalcz authored Feb 25, 2023
1 parent 082fe7e commit 32ee5d0
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 34 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,5 @@ Check out [this wiki article](https://github.com/tkowalcz/tjahzi/wiki/Monitoring

# LICENSE

This work is released under MIT license with the provision that any company registered or conducting operations mainly in Russian Federation IS PROHIBITED from using this software.
This work is released under MIT license. Feel free to use, copy and modify this work as long as you credit original authors.
Pull and feature requests are welcome.
13 changes: 12 additions & 1 deletion core/src/main/java/pl/tkowalcz/tjahzi/LoggingSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ public class LoggingSystem {
private final AgentRunner runner;

private final MonitoringModule monitoringModule;
private final boolean useDaemonThreads;
private final Closeable[] resourcesToCleanup;

public LoggingSystem(
ManyToOneRingBuffer logBuffer,
AgentRunner runner,
MonitoringModule monitoringModule,
boolean useDaemonThreads,
Closeable... resourcesToCleanup
) {
this.logBuffer = logBuffer;
this.runner = runner;

this.monitoringModule = monitoringModule;
this.useDaemonThreads = useDaemonThreads;
this.resourcesToCleanup = resourcesToCleanup;
}

Expand All @@ -33,7 +36,15 @@ public TjahziLogger createLogger() {
}

public void start() {
AgentRunner.startOnThread(runner);
AgentRunner.startOnThread(
runner,
runnable -> {
Thread result = new Thread(runnable);
result.setDaemon(useDaemonThreads);

return result;
}
);
}

public void close(
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/pl/tkowalcz/tjahzi/TjahziInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public LoggingSystem createLoggingSystem(
int bufferSizeBytes,
long logShipperWakeupIntervalMillis,
long shutdownTimeoutMillis,
boolean offHeap
boolean offHeap,
boolean useDaemonThreads
) {
bufferSizeBytes = findNearestPowerOfTwo(bufferSizeBytes);
ByteBuffer javaBuffer = allocateJavaBuffer(bufferSizeBytes, offHeap);
Expand Down Expand Up @@ -64,6 +65,7 @@ public LoggingSystem createLoggingSystem(
logBuffer,
runner,
monitoringModule,
useDaemonThreads,
httpClient
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.util.concurrent.DefaultThreadFactory;
import pl.tkowalcz.tjahzi.stats.MonitoringModule;

import java.io.Closeable;
Expand All @@ -20,9 +21,7 @@ public HttpConnection(ClientConfiguration clientConfiguration, MonitoringModule
this.clientConfiguration = clientConfiguration;
this.monitoringModule = monitoringModule;

ThreadGroup threadGroup = new ThreadGroup("Tjahzi Loki client");
ThreadFactory threadFactory = r -> new Thread(threadGroup, r, "tjahzi-worker");

ThreadFactory threadFactory = new DefaultThreadFactory("tjahzi-worker", true);
this.group = new NioEventLoopGroup(1, threadFactory);

EventLoopGroupRetry retry = new EventLoopGroupRetry(
Expand Down
3 changes: 3 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/HeadersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void shouldIncludeAdditionalHeaders() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand Down Expand Up @@ -145,6 +146,7 @@ void shouldHandleCaseWithNoAdditionalHeaders() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand Down Expand Up @@ -201,6 +203,7 @@ void shouldNotOverrideCrucialHeaders() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void setUp() {
1024 * 1024,
250,
10_000,
false,
false
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void setUp() {
0,
250,
10_000,
false,
false
);

Expand Down
2 changes: 2 additions & 0 deletions core/src/test/java/pl/tkowalcz/tjahzi/ReconnectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void shouldEventuallyReconnectIfLokiWasDownWhenStarting() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand Down Expand Up @@ -138,6 +139,7 @@ void shouldReconnectIfLokiFailed() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
Expand All @@ -26,6 +28,7 @@ class ResourcesCleanupOnCloseTest {
private WireMockServer wireMockServer;
private TjahziInitializer initializer;
private StandardMonitoringModule monitoringModule;
private LoggingSystem loggingSystem;

@BeforeEach
void setUp() {
Expand All @@ -43,6 +46,13 @@ void setUp() {
@AfterEach
void tearDown() {
wireMockServer.stop();

if (loggingSystem != null) {
loggingSystem.close(
(int) TimeUnit.SECONDS.toMillis(10),
System.out::println
);
}
}

@Test
Expand Down Expand Up @@ -72,6 +82,7 @@ void shouldStopThreads() {
1024 * 1024,
250,
10_000,
false,
false
);
loggingSystem.start();
Expand All @@ -83,10 +94,8 @@ void shouldStopThreads() {

assertThat(threadInfos)
.extracting(ThreadInfo::getThreadName)
.contains(
"LogShipper",
"tjahzi-worker"
);
.anyMatch(threadName -> threadName.contains("LogShipper"))
.anyMatch(threadName -> threadName.contains("tjahzi-worker"));
});

//When
Expand All @@ -108,4 +117,58 @@ void shouldStopThreads() {
);
});
}

@Test
void shouldStartThreadsAsDaemon() {
// Given
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.withConnectionTimeoutMillis(10_000)
.withHost("localhost")
.withPort(wireMockServer.port())
.withMaxRetries(1)
.build();

NettyHttpClient httpClient = HttpClientFactory.defaultFactory()
.getHttpClient(
clientConfiguration,
monitoringModule,
"X-Scope-OrgID", "Circus",
"C", "Control"
);

// When
loggingSystem = initializer.createLoggingSystem(
httpClient,
monitoringModule,
Map.of(),
0,
0,
1024 * 1024,
250,
10_000,
false,
true
);
loggingSystem.start();

// Then
Awaitility.await().untilAsserted(() -> {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);

Optional<ThreadInfo> maybeLogShipper = Arrays.stream(threadInfos)
.filter(thread -> thread.getThreadName().contains("LogShipper"))
.findAny();

Optional<ThreadInfo> maybeTjahziWorker = Arrays.stream(threadInfos)
.filter(thread -> thread.getThreadName().contains("tjahzi-worker"))
.findAny();

assertThat(maybeLogShipper)
.hasValueSatisfying(ThreadInfo::isDaemon);

assertThat(maybeTjahziWorker)
.hasValueSatisfying(ThreadInfo::isDaemon);
});
}
}
8 changes: 8 additions & 0 deletions log4j2-appender/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,11 @@ controls how often it wakes up to perform its duties. Other properties control h

On logging system shutdown (or config reload) Tjahzi will flush its internal buffers so that no logs are lost. This
property sets limit on how long to wait for this to complete before proceeding with shutdown.

#### useDaemonThreads (optional, default = false)

If set to true Tjahzi will run all it's threads as daemon threads.

Use this option if you do not want to explicitly close the logging system and still want to make sure Tjahzi internal
threads will not prevent JVM from closing down. Note that this can result in unflushed logs not being delivered when the
JVM is closed.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class LokiAppenderBuilder<B extends LokiAppenderBuilder<B>> extends Abstr
@PluginBuilderAttribute
private boolean useSSL;

@PluginBuilderAttribute
private boolean useDaemonThreads;

@PluginBuilderAttribute
private String username;

Expand Down Expand Up @@ -155,7 +158,8 @@ public LokiAppender build() {
bufferSizeBytes,
logShipperWakeupIntervalMillis,
TimeUnit.SECONDS.toMillis(shutdownTimeoutSeconds),
isUseOffHeapBuffer()
isUseOffHeapBuffer(),
useDaemonThreads
);

int maxLogLineSizeBytes = Math.toIntExact(getMaxLogLineSizeKilobytes() * BYTES_IN_KILOBYTE);
Expand Down Expand Up @@ -201,6 +205,14 @@ public void setUseSSL(boolean useSSL) {
this.useSSL = useSSL;
}

public boolean isUseDaemonThreads() {
return useDaemonThreads;
}

public void setUseDaemonThreads(boolean useDaemonThreads) {
this.useDaemonThreads = useDaemonThreads;
}

public String getUsername() {
return username;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.*;
Expand All @@ -12,6 +13,7 @@
class GrafanaCloudAppenderTest {

@Test
@Disabled
void shouldSendData() {
// Given
System.setProperty("loki.host", "logs-prod-us-central1.grafana.net");
Expand Down
Loading

0 comments on commit 32ee5d0

Please sign in to comment.