[SPARK-47597][STREAMING] Streaming: Migrate logInfo with variables to structured logging framework

### What changes were proposed in this pull request?

Migrate logInfo calls with variables in the streaming module to the structured logging framework. This changes logInfo call sites from the API
```
def logInfo(msg: => String): Unit
```
to
```
def logInfo(entry: LogEntry): Unit
```
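
For illustration, here is a minimal sketch of the call-site pattern this migration applies, mirroring the CheckpointFileManager change further down in the diff (the wrapping object and method are hypothetical, added only to make the snippet self-contained):

```
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{FINAL_PATH, TEMP_PATH}

// Hypothetical call site; the real change lives in CheckpointFileManager below.
object MigrationSketch extends Logging {
  def report(finalPath: String, tempPath: String): Unit = {
    // Before: plain string interpolation, so variable values are not exposed
    // to log parsers as structured fields.
    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")

    // After: the log"..." interpolator plus MDC tags each variable with a
    // LogKey, emitting it as a Mapped Diagnostic Context (MDC) field.
    logInfo(log"Writing atomically to ${MDC(FINAL_PATH, finalPath)} using temp file " +
      log"${MDC(TEMP_PATH, tempPath)}")
  }
}
```

The log"..." interpolator produces the LogEntry form consumed by the new logInfo signature above, so the rendered message stays the same while the tagged values become queryable fields.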

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

Yes, Spark logs will contain additional MDC (Mapped Diagnostic Context) entries.

### How was this patch tested?

Compilation and Scala style checks, as well as code review.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46192 from dtenedor/streaming-log-info.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
dtenedor authored and gengliangwang committed Apr 25, 2024
1 parent 994775a commit d540786
Showing 31 changed files with 286 additions and 155 deletions.

@@ -34,6 +34,7 @@ object LogKey extends Enumeration {
val ARGS = Value
val BACKUP_FILE = Value
val BATCH_ID = Value
val BATCH_TIMESTAMP = Value
val BATCH_WRITE = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
@@ -48,8 +49,12 @@ object LogKey extends Enumeration {
val CATALOG_NAME = Value
val CATEGORICAL_FEATURES = Value
val CHECKPOINT_FILE = Value
val CHECKPOINT_LOCATION = Value
val CHECKPOINT_PATH = Value
val CHECKPOINT_ROOT = Value
val CHECKPOINT_TIME = Value
val CHECKSUM_FILE_NUM = Value
val CHOSEN_WATERMARK = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val CLUSTER_CENTROIDS = Value
@@ -66,6 +71,8 @@ object LogKey extends Enumeration {
val COLUMN_NAME = Value
val COMMAND = Value
val COMMAND_OUTPUT = Value
val COMMITTED_VERSION = Value
val COMPACT_INTERVAL = Value
val COMPONENT = Value
val CONFIG = Value
val CONFIG2 = Value
@@ -86,6 +93,7 @@ object LogKey extends Enumeration {
val CSV_SCHEMA_FIELD_NAME = Value
val CSV_SCHEMA_FIELD_NAMES = Value
val CSV_SOURCE = Value
val CURRENT_BATCH_ID = Value
val CURRENT_PATH = Value
val DATA = Value
val DATABASE_NAME = Value
@@ -95,13 +103,15 @@ object LogKey extends Enumeration {
val DATA_SOURCE = Value
val DATA_SOURCES = Value
val DATA_SOURCE_PROVIDER = Value
val DEFAULT_COMPACT_INTERVAL = Value
val DEFAULT_ISOLATION_LEVEL = Value
val DEFAULT_VALUE = Value
val DELAY = Value
val DELEGATE = Value
val DELTA = Value
val DESCRIPTION = Value
val DESIRED_PARTITIONS_SIZE = Value
val DFS_FILE = Value
val DIFF_DELTA = Value
val DIVISIBLE_CLUSTER_INDICES_SIZE = Value
val DRIVER_ID = Value
@@ -112,12 +122,13 @@ object LogKey extends Enumeration {
val ENCODING = Value
val END_INDEX = Value
val END_POINT = Value
val END_VERSION = Value
val ENGINE = Value
val EPOCH = Value
val ERROR = Value
val ESTIMATOR_PARAMETER_MAP = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXCEPTION = Value
val EXECUTE_INFO = Value
val EXECUTE_KEY = Value
val EXECUTION_PLAN_LEAVES = Value
@@ -131,6 +142,7 @@ object LogKey extends Enumeration {
val EXECUTOR_RESOURCES = Value
val EXECUTOR_STATE = Value
val EXECUTOR_TARGET_COUNT = Value
val EXISTING_FILE = Value
val EXIT_CODE = Value
val EXPECTED_NUM_FILES = Value
val EXPECTED_PARTITION_COLUMN = Value
@@ -144,15 +156,20 @@ object LogKey extends Enumeration {
val FEATURE_DIMENSION = Value
val FIELD_NAME = Value
val FILE_ABSOLUTE_PATH = Value
val FILE_END_OFFSET = Value
val FILE_FORMAT = Value
val FILE_FORMAT2 = Value
val FILE_MODIFICATION_TIME = Value
val FILE_NAME = Value
val FILE_START_OFFSET = Value
val FILE_VERSION = Value
val FINAL_PATH = Value
val FINISH_TRIGGER_DURATION = Value
val FROM_OFFSET = Value
val FROM_TIME = Value
val FUNCTION_NAME = Value
val FUNCTION_PARAMETER = Value
val GLOBAL_WATERMARK = Value
val GROUP_ID = Value
val HADOOP_VERSION = Value
val HASH_JOIN_KEYS = Value
@@ -194,10 +211,13 @@ object LogKey extends Enumeration {
val LINE = Value
val LINE_NUM = Value
val LISTENER = Value
val LOADED_VERSION = Value
val LOAD_FACTOR = Value
val LOAD_TIME = Value
val LOGICAL_PLAN_COLUMNS = Value
val LOGICAL_PLAN_LEAVES = Value
val LOG_ID = Value
val LOG_OFFSET = Value
val LOG_TYPE = Value
val LOWER_BOUND = Value
val MALFORMATTED_STIRNG = Value
@@ -216,10 +236,15 @@ object LogKey extends Enumeration {
val MEMORY_SIZE = Value
val MERGE_DIR_NAME = Value
val MESSAGE = Value
val METADATA_DIRECTORY = Value
val METADATA_JSON = Value
val METHOD_NAME = Value
val METRICS_JSON = Value
val MIN_COMPACTION_BATCH_ID = Value
val MIN_FREQUENT_PATTERN_COUNT = Value
val MIN_POINT_PER_CLUSTER = Value
val MIN_SIZE = Value
val MIN_VERSION_NUMBER = Value
val MODEL_WEIGHTS = Value
val NAMESPACE = Value
val NEW_FEATURE_COLUMN_NAME = Value
@@ -230,11 +255,15 @@ object LogKey extends Enumeration {
val NODE_LOCATION = Value
val NORM = Value
val NUM_BIN = Value
val NUM_BYTES = Value
val NUM_CLASSES = Value
val NUM_COLUMNS = Value
val NUM_EXAMPLES = Value
val NUM_FEATURES = Value
val NUM_FILES = Value
val NUM_FILES_COPIED = Value
val NUM_FILES_FAILED_TO_DELETE = Value
val NUM_FILES_REUSED = Value
val NUM_FREQUENT_ITEMS = Value
val NUM_ITERATIONS = Value
val NUM_LOCAL_FREQUENT_PATTERN = Value
@@ -245,6 +274,7 @@ object LogKey extends Enumeration {
val OBJECT_ID = Value
val OFFSET = Value
val OFFSETS = Value
val OFFSET_SEQUENCE_METADATA = Value
val OLD_BLOCK_MANAGER_ID = Value
val OLD_VALUE = Value
val OPTIMIZED_PLAN_COLUMNS = Value
@@ -272,6 +302,7 @@ object LogKey extends Enumeration {
val POD_TARGET_COUNT = Value
val POLICY = Value
val PORT = Value
val PRETTY_ID_STRING = Value
val PRINCIPAL = Value
val PROCESSING_TIME = Value
val PRODUCER_ID = Value
@@ -309,6 +340,8 @@ object LogKey extends Enumeration {
val RETRY_INTERVAL = Value
val RIGHT_EXPR = Value
val RMSE = Value
val ROCKS_DB_LOG_LEVEL = Value
val ROCKS_DB_LOG_MESSAGE = Value
val RPC_ENDPOINT_REF = Value
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
@@ -329,6 +362,7 @@ object LogKey extends Enumeration {
val SLEEP_TIME = Value
val SLIDE_DURATION = Value
val SMALLEST_CLUSTER_INDEX = Value
val SNAPSHOT_VERSION = Value
val SPARK_DATA_STREAM = Value
val SPARK_PLAN_ID = Value
val SQL_TEXT = Value
@@ -337,13 +371,23 @@ object LogKey extends Enumeration {
val STAGE_ID = Value
val START_INDEX = Value
val STATEMENT_ID = Value
val STATE_STORE_ID = Value
val STATE_STORE_PROVIDER = Value
val STATE_STORE_VERSION = Value
val STATUS = Value
val STDERR = Value
val STORAGE_LEVEL = Value
val STORAGE_LEVEL_DESERIALIZED = Value
val STORAGE_LEVEL_REPLICATION = Value
val STORE_ID = Value
val STREAMING_DATA_SOURCE_DESCRIPTION = Value
val STREAMING_DATA_SOURCE_NAME = Value
val STREAMING_OFFSETS_END = Value
val STREAMING_OFFSETS_START = Value
val STREAMING_QUERY_PROGRESS = Value
val STREAMING_SOURCE = Value
val STREAMING_TABLE = Value
val STREAMING_WRITE = Value
val STREAM_ID = Value
val STREAM_NAME = Value
val SUBMISSION_ID = Value
@@ -398,6 +442,7 @@ object LogKey extends Enumeration {
val USER_ID = Value
val USER_NAME = Value
val VALUE = Value
val VERSION_NUMBER = Value
val VIRTUAL_CORES = Value
val VOCAB_SIZE = Value
val WAIT_RESULT_TIME = Value
@@ -745,7 +745,7 @@ case class AdaptiveSparkPlanExec(
Some((finalPlan, optimized))
} catch {
case e: InvalidAQEPlanException[_] =>
logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n" +
logOnLevel(log"Re-optimize - ${MDC(ERROR, e.getMessage())}:\n" +
log"${MDC(QUERY_PLAN, e.plan)}")
None
}
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.internal.LogKey.PRETTY_ID_STRING
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.streaming.WriteToStream
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -221,7 +223,8 @@ class AsyncProgressTrackingMicroBatchExecution(
super.cleanup()

ThreadUtils.shutdown(asyncWritesExecutorService)
logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
logInfo(log"Async progress tracking executor pool for query " +
log"${MDC(PRETTY_ID_STRING, prettyIdString)} has been shutdown")
}

// used for testing
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{PATH, TEMP_PATH}
import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
import org.apache.spark.sql.internal.SQLConf
@@ -144,7 +144,8 @@ object CheckpointFileManager extends Logging {
this(fm, path, generateTempPath(path), overwrite)
}

logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
logInfo(log"Writing atomically to ${MDC(FINAL_PATH, finalPath)} using temp file " +
log"${MDC(TEMP_PATH, tempPath)}")
@volatile private var terminated = false

override def close(): Unit = synchronized {
@@ -166,7 +167,8 @@ object CheckpointFileManager extends Logging {
s"But $finalPath does not exist.")
}

logInfo(s"Renamed temp file $tempPath to $finalPath")
logInfo(log"Renamed temp file ${MDC(TEMP_PATH, tempPath)} to " +
log"${MDC(FINAL_PATH, finalPath)}")
} finally {
terminated = true
}
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

import org.apache.spark.internal.LogKey.{BATCH_ID, ELAPSED_TIME}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -105,8 +105,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
defaultCompactInterval, compactibleBatchIds(0).toInt)
}
assert(interval > 0, s"intervalValue = $interval not positive value.")
logInfo(s"Set the compact interval to $interval " +
s"[defaultCompactInterval: $defaultCompactInterval]")
logInfo(log"Set the compact interval to ${MDC(COMPACT_INTERVAL, interval)} " +
log"[defaultCompactInterval: ${MDC(DEFAULT_COMPACT_INTERVAL, defaultCompactInterval)}]")
interval
}

@@ -309,8 +309,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
assert(isCompactionBatch(minCompactionBatchId, compactInterval),
s"$minCompactionBatchId is not a compaction batch")

logInfo(s"Current compact batch id = $currentBatchId " +
s"min compaction batch id to delete = $minCompactionBatchId")
logInfo(log"Current compact batch id = ${MDC(CURRENT_BATCH_ID, currentBatchId)} " +
log"min compaction batch id to delete = " +
log"${MDC(MIN_COMPACTION_BATCH_ID, minCompactionBatchId)}")

val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
fileManager.list(metadataPath, (path: Path) => {
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{ERROR, PATH}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
@@ -146,7 +146,7 @@ class FileStreamSink(

override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
logInfo(log"Skipping already committed batch ${MDC(BATCH_ID, batchId)}")
} else {
val committer = FileCommitProtocol.instantiate(
className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
@@ -101,7 +103,7 @@ class FileStreamSinkLog(

val retentionMs: Long = _retentionMs match {
case Some(retention) =>
logInfo(s"Retention is set to $retention ms")
logInfo(log"Retention is set to ${MDC(TIME_UNITS, retention)} ms")
retention

case _ => Long.MaxValue
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CURRENT_PATH, ELAPSED_TIME, NEW_PATH, NUM_FILES}
import org.apache.spark.internal.LogKey._
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -126,8 +126,9 @@ class FileStreamSource(
}
seenFiles.purge()

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
logInfo(log"maxFilesPerBatch = ${MDC(NUM_FILES, maxFilesPerBatch)}, " +
log"maxBytesPerBatch = ${MDC(NUM_BYTES, maxBytesPerBatch)}, " +
log"maxFileAgeMs = ${MDC(TIME_UNITS, maxFileAgeMs)}")

private var unreadFiles: Seq[NewFileEntry] = _

@@ -251,7 +252,8 @@ class FileStreamSource(
FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset)
}.toArray
if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
logInfo(log"Log offset set to ${MDC(LOG_OFFSET, metadataLogCurrentOffset)} " +
log"with ${MDC(NUM_FILES, batchFiles.size)} new files")
} else {
throw new IllegalStateException("Concurrent update to the log. Multiple streaming jobs " +
s"detected for $metadataLogCurrentOffset")
@@ -291,7 +293,8 @@ class FileStreamSource(

assert(startOffset <= endOffset)
val files = metadataLog.get(Some(startOffset + 1), Some(endOffset)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startOffset + 1}:$endOffset")
logInfo(log"Processing ${MDC(NUM_FILES, files.length)} " +
log"files from ${MDC(FILE_START_OFFSET, startOffset + 1)}:${MDC(FILE_END_OFFSET, endOffset)}")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newDataSource =
DataSource(