Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry Metrics #214

Merged
merged 21 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f8463b8
First pass at adding telemetry
MichaelGHSeg Feb 28, 2024
5ab0c18
Fixing metric name
MichaelGHSeg Feb 28, 2024
8f13b6d
More tags, singleton, other improvements
MichaelGHSeg Mar 4, 2024
c5170b3
Fixing missing values
MichaelGHSeg Mar 5, 2024
1adbbba
Many features added and bugs fixed. Currently does not send log data.
MichaelGHSeg Mar 22, 2024
ffde914
A few improvements around enable
MichaelGHSeg Mar 22, 2024
f1c1d73
Adding empty header to missing parameter of HTTPExceptions in tests
MichaelGHSeg Mar 22, 2024
2902722
Addressing a bunch of smaller PR comments
MichaelGHSeg Mar 23, 2024
21398d6
Custom scope & dispatcher
MichaelGHSeg Mar 25, 2024
daa39b5
Tests and some fixes
MichaelGHSeg Apr 12, 2024
9ceb010
A couple more tests
MichaelGHSeg Apr 12, 2024
c2e51cc
Fixing queue async issue
MichaelGHSeg Apr 23, 2024
4961f89
Merge branch 'main' of ssh://github.com/segmentio/analytics-kotlin in…
MichaelGHSeg Apr 23, 2024
93e4d6e
Fixing unit test
MichaelGHSeg Apr 26, 2024
c119e29
Merge branch 'MichaelGHSeg/Telemetry' of ssh://github.com/segmentio/a…
MichaelGHSeg May 1, 2024
b0809c9
Several PR fixes for concurrency and structure
MichaelGHSeg May 1, 2024
03a356f
Attempt to combine repetative error code, and metric const rename
MichaelGHSeg May 1, 2024
c14bc56
Changing tag building to a trailing lambda
MichaelGHSeg May 2, 2024
8189a58
Last few PR fixes plus a few extras
MichaelGHSeg May 2, 2024
c8d29d2
Setting inital enabled state to default, fixing even type value
MichaelGHSeg May 3, 2024
edeb8c8
Adjusting default behavior for limited release and fixing some associ…
MichaelGHSeg May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
.cxx
local.properties
.idea
/core/bin
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ open class Analytics protected constructor(

init {
require(configuration.isValid()) { "invalid configuration" }
Telemetry.increment(Telemetry.INVOKE,
mapOf("message" to "configured", "apihost" to configuration.apiHost, "cdnhost" to configuration.cdnHost,
"flush" to "at:${configuration.flushAt} int:${configuration.flushInterval} pol:${configuration.flushPolicies.count()}"))
build()
}

Expand All @@ -96,10 +93,8 @@ open class Analytics protected constructor(
object : CoroutineConfiguration {
override val store = Store()
val exceptionHandler = CoroutineExceptionHandler { _, t ->
Analytics.segmentLog(
"Caught Exception in Analytics Scope: ${t}"
)
Telemetry.error(Telemetry.INVOKE_ERROR,
reportErrorWithMetrics(null, t,
"Caught Exception in Analytics Scope", Telemetry.INVOKE_ERROR_METRIC,
mapOf("error" to t.toString(), "message" to "Exception in Analytics Scope"),
t.stackTraceToString()
)
Expand All @@ -121,6 +116,10 @@ open class Analytics protected constructor(
add(ContextPlugin())
add(UserInfoPlugin())

Telemetry.increment(Telemetry.INVOKE_METRIC,
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
mapOf("message" to "configured", "apihost" to configuration.apiHost, "cdnhost" to configuration.cdnHost,
"flush" to "at:${configuration.flushAt} int:${configuration.flushInterval} pol:${configuration.flushPolicies.count()}"))

// Setup store
analyticsScope.launch(analyticsDispatcher) {
store.also {
Expand All @@ -136,6 +135,8 @@ open class Analytics protected constructor(
add(SegmentDestination())
}

Telemetry.subscribe(store)
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved

checkSettings()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ fun Analytics.reportInternalError(error: Throwable) {
Analytics.reportInternalError(error)
}

fun reportErrorWithMetrics(analytics: Analytics?, error: Throwable, message: String, metric:String, tags: Map<String, String>, log: String) {
analytics?.configuration?.errorHandler?.invoke(error)
var fullMessage = message
error.message?.let { fullMessage += ": $it"}
Analytics.segmentLog(fullMessage)
Telemetry.error(metric, tags, log)
}

fun Analytics.Companion.reportInternalError(error: Throwable) {
error.message?.let {
Analytics.segmentLog(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ class HTTPClient(
URL(url)
} catch (e: MalformedURLException) {
val error = IOException("Attempted to use malformed url: $url", e)
Analytics.reportInternalError(error)
Telemetry.error(Telemetry.INVOKE_ERROR,
reportErrorWithMetrics(null, e,
"Attempted to use malformed url: $url", Telemetry.INVOKE_ERROR_METRIC,
mapOf("error" to e.toString(), "writekey" to writeKey, "message" to "Malformed url"),
e.stackTraceToString()
)
Expand Down
17 changes: 2 additions & 15 deletions core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.double
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.serializer
import java.io.BufferedReader

Expand Down Expand Up @@ -100,13 +98,6 @@ suspend fun Analytics.checkSettings() {
update(settingsObj)
}

settingsObj?.metrics?.get("sampleRate")?.jsonPrimitive?.double.also {
if (it != null) {
Telemetry.sampleRate = it
Telemetry.start()
}
}

// we're good to go back to a running state.
store.dispatch(System.ToggleRunningAction(running = true), System::class)
}
Expand All @@ -123,12 +114,8 @@ internal fun Analytics.fetchSettings(
log("Fetched Settings: $settingsString")
LenientJson.decodeFromString(settingsString)
} catch (ex: Exception) {
reportInternalError(ex)
Analytics.segmentLog(
"${ex.message}: failed to fetch settings",
kind = LogKind.ERROR
)
Telemetry.error(Telemetry.INVOKE_ERROR,
reportErrorWithMetrics(this, ex, "Failed to fetch settings",
Telemetry.INVOKE_ERROR_METRIC,
mapOf("error" to ex.toString(), "writekey" to writeKey, "message" to "Error retrieving settings"),
ex.stackTraceToString()
)
Expand Down
97 changes: 62 additions & 35 deletions core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant
import kotlinx.serialization.*
import kotlinx.serialization.json.Json
import kotlinx.coroutines.*
import sovran.kotlin.Store
import sovran.kotlin.Subscriber
import java.net.HttpURLConnection
import java.lang.System
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import kotlin.math.min
import kotlin.math.roundToInt
Expand Down Expand Up @@ -36,15 +38,16 @@ fun logError(err: Throwable) {
Analytics.reportInternalError(err)
}

object Telemetry {
object Telemetry: Subscriber {
private const val METRICS_BASE_TAG = "analytics_mobile"
// Metric class for Analytics SDK
const val INVOKE = "analytics_mobile.invoke"
const val INVOKE_METRIC = "$METRICS_BASE_TAG.invoke"
// Metric class for Analytics SDK errors
const val INVOKE_ERROR = "analytics_mobile.invoke.error"
const val INVOKE_ERROR_METRIC = "$METRICS_BASE_TAG.invoke.error"
// Metric class for Analytics SDK plugins
const val INTEGRATION = "analytics_mobile.integration.invoke"
const val INTEGRATION_METRIC = "$METRICS_BASE_TAG.integration.invoke"
// Metric class for Analytics SDK plugin errors
const val INTEGRATION_ERROR = "analytics_mobile.integration.invoke.error"
const val INTEGRATION_ERROR_METRIC = "$METRICS_BASE_TAG.integration.invoke.error"

var enable: Boolean = true
var host: String = Constants.DEFAULT_API_HOST
Expand All @@ -64,39 +67,33 @@ object Telemetry {
field = min(value, MAX_QUEUE_BYTES)
}

private val queue = mutableListOf<RemoteMetric>()
private val queue = ConcurrentLinkedQueue<RemoteMetric>()
private var queueBytes = 0
private var queueSizeExceeded = false
private val seenErrors = mutableMapOf<String, Int>()
private var started = false
private var rateLimitEndTime: Long = 0
private var telemetryScope: CoroutineScope? = null
private var telemetryDispatcher: ExecutorCoroutineDispatcher? = null
private val exceptionHandler = CoroutineExceptionHandler { _, t ->
errorHandler?.let {
it( Exception(
"Caught Exception in Telemetry Scope: ${t.message}",
t
))
}
}
private var telemetryScope: CoroutineScope = CoroutineScope(SupervisorJob() + exceptionHandler)
private var telemetryDispatcher: ExecutorCoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private var telemetryJob: Job? = null
fun start() {
if (started || sampleRate == 0.0) return
started = true

// Assume sampleRate is now set and everything in the queue hasn't had it applied
if (Math.random() > sampleRate) {
resetQueue()
} else {
queue.forEach {
it.value = (it.value / sampleRate).roundToInt()
}
}

val exceptionHandler = CoroutineExceptionHandler { _, t ->
errorHandler?.let {
it( Exception(
"Caught Exception in Telemetry Scope: ${t.message}",
t
))
}
}
telemetryScope = CoroutineScope(SupervisorJob() + exceptionHandler)
telemetryDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()

telemetryScope?.launch(telemetryDispatcher!!) {
telemetryJob = telemetryScope.launch(telemetryDispatcher) {
while (isActive) {
if (!enable) return@launch
try {
Expand All @@ -116,28 +113,27 @@ object Telemetry {

fun reset()
{
telemetryDispatcher?.close()
telemetryScope = null
telemetryDispatcher = null
telemetryJob?.cancel()
resetQueue()
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
seenErrors.clear()
started = false
rateLimitEndTime = 0
}

fun increment(metric: String, tags: Map<String, String>) {
wenxi-zeng marked this conversation as resolved.
Show resolved Hide resolved
start()
if (!enable || sampleRate == 0.0) return
if (!metric.startsWith("analytics_mobile.")) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (Math.random() > sampleRate) return
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
if (queue.size >= maxQueueSize) return

addRemoteMetric(metric, tags, value = (1.0 / sampleRate).roundToInt())
addRemoteMetric(metric, tags)
}

fun error(metric:String, tags: Map<String, String>, log: String) {
if (!enable || sampleRate == 0.0) return
if (!metric.startsWith("analytics_mobile.")) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (queue.size >= maxQueueSize) return

Expand All @@ -157,8 +153,9 @@ object Telemetry {
if (seenErrors.containsKey(errorKey)) {
seenErrors[errorKey] = seenErrors[errorKey]!! + 1
if (Math.random() > sampleRate) return
// Send how many we've seen after the first since we know for sure.
addRemoteMetric(metric, filteredTags, log=logData, value = seenErrors[errorKey]!!)
// Adjust how many we've seen after the first since we know for sure.
addRemoteMetric(metric, filteredTags, log=logData,
value = (seenErrors[errorKey]!! * sampleRate).toInt())
seenErrors[errorKey] = 0
} else {
addRemoteMetric(metric, filteredTags, log=logData)
Expand Down Expand Up @@ -189,18 +186,32 @@ object Telemetry {
}

private fun send() {
if (sampleRate == 0.0) return
var queueCount = queue.size
// Reset queue data size counter since all current queue items will be removed
queueBytes = 0
queueSizeExceeded = false
val sendQueue = mutableListOf<RemoteMetric>()
while (queueCount-- > 0 && !queue.isEmpty()) {
val m = queue.poll()
if(m != null) {
m.value = (m.value / sampleRate).roundToInt()
sendQueue.add(m)
}
}
try {
// Json.encodeToString by default does not include default values
// We're using this to leave off the 'log' parameter if unset.
val payload = Json.encodeToString(mapOf("series" to queue))
resetQueue()
val payload = Json.encodeToString(mapOf("series" to sendQueue))

val connection = httpClient.upload(host)
connection.outputStream?.use { outputStream ->
// Write the JSON string to the outputStream.
outputStream.write(payload.toByteArray(Charsets.UTF_8))
outputStream.flush() // Ensure all data is written
}
connection.inputStream?.close()
connection.outputStream?.close()
connection.close()
} catch (e: HTTPException) {
errorHandler?.invoke(e)
Expand Down Expand Up @@ -263,6 +274,22 @@ object Telemetry {
}
}

internal suspend fun subscribe(store: Store) {
store.subscribe(
this,
com.segment.analytics.kotlin.core.System::class,
initialState = true,
handler = ::systemUpdate,
queue = telemetryDispatcher
)
}
private suspend fun systemUpdate(system: com.segment.analytics.kotlin.core.System) {
system.settings?: let {
Telemetry.sampleRate = it.sampleRate
Telemetry.start()
}
}

private fun resetQueue() {
queue.clear()
queueBytes = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.segment.analytics.kotlin.core.BaseEvent
import com.segment.analytics.kotlin.core.Telemetry
import com.segment.analytics.kotlin.core.platform.plugins.logger.LogKind
import com.segment.analytics.kotlin.core.platform.plugins.logger.segmentLog
import com.segment.analytics.kotlin.core.reportErrorWithMetrics
import com.segment.analytics.kotlin.core.reportInternalError
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.reflect.KClass
Expand Down Expand Up @@ -34,7 +35,7 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
result?.let {
val copy = it.copy<BaseEvent>()
try {
Telemetry.increment(Telemetry.INTEGRATION,
Telemetry.increment(Telemetry.INTEGRATION_METRIC,
mapOf("message" to "event-${it.type}", "plugin" to "${plugin.type}-${plugin.javaClass}"))
when (plugin) {
is DestinationPlugin -> {
Expand All @@ -45,9 +46,9 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
}
}
} catch (t: Throwable) {
Analytics.reportInternalError("Caught Exception in plugin: $t")
Analytics.segmentLog("Skipping plugin due to Exception: $plugin", kind = LogKind.WARNING)
Telemetry.error(Telemetry.INTEGRATION_ERROR,
reportErrorWithMetrics(null, t,"Caught Exception in plugin",
Telemetry.INTEGRATION_ERROR_METRIC,
mapOf("error" to t.toString(), "plugin" to "${plugin.type}-${plugin.javaClass}",
"writekey" to plugin.analytics.configuration.writeKey, "message" to "Exception executing plugin"),
t.stackTraceToString()
Expand All @@ -64,9 +65,8 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
try {
closure(it)
} catch (t: Throwable) {
Analytics.reportInternalError(t)
Analytics.segmentLog("Caught Exception applying closure to plugin: $it: $t")
Telemetry.error(Telemetry.INTEGRATION_ERROR,
reportErrorWithMetrics(null, t,
"Caught Exception applying closure to plugin: $it", Telemetry.INTEGRATION_ERROR_METRIC,
mapOf("error" to t.toString(), "plugin" to "${it.type}-${it.javaClass}",
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
"writekey" to it.analytics.configuration.writeKey, "message" to "Exception executing plugin"),
t.stackTraceToString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.segment.analytics.kotlin.core.BaseEvent
import com.segment.analytics.kotlin.core.System
import com.segment.analytics.kotlin.core.Telemetry
import com.segment.analytics.kotlin.core.platform.plugins.logger.segmentLog
import com.segment.analytics.kotlin.core.reportErrorWithMetrics
import com.segment.analytics.kotlin.core.reportInternalError
import kotlinx.coroutines.launch
import kotlin.reflect.KClass
Expand Down Expand Up @@ -65,16 +66,15 @@ internal class Timeline {
try {
plugin.setup(analytics)
} catch (t: Throwable) {
analytics.reportInternalError(t)
Analytics.segmentLog("Caught Exception while setting up plugin $plugin: $t")
Telemetry.error(Telemetry.INTEGRATION_ERROR,
reportErrorWithMetrics(analytics, t,
"Caught Exception while setting up plugin $plugin", Telemetry.INTEGRATION_ERROR_METRIC,
mapOf("plugin" to "${plugin.type.toString()}-${plugin.javaClass.toString()}",
"error" to t.toString(), "writekey" to analytics.configuration.writeKey,
"message" to "Exception setting up plugin"),
t.stackTraceToString()
)
}
Telemetry.increment(Telemetry.INTEGRATION,
Telemetry.increment(Telemetry.INTEGRATION_METRIC,
mapOf("message" to "added",
"plugin" to "${plugin.type.toString()}-${plugin.javaClass.toString()}"))

Expand Down
Loading
Loading