Skip to content

Commit

Permalink
Persist information on whether an endpoint is in cluster mode or not (#…
Browse files Browse the repository at this point in the history
…146)

Signed-off-by: Khor Shu Heng <[email protected]>

Co-authored-by: Khor Shu Heng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored May 18, 2022
1 parent fef7c57 commit e579a01
Showing 1 changed file with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC

val sparkConf: SparkConf = sqlContext.sparkContext.getConf

lazy val endpoint: RedisEndpoint = RedisEndpoint(
host = sparkConf.get("spark.redis.host"),
port = sparkConf.get("spark.redis.port").toInt,
password = sparkConf.get("spark.redis.password", "")
)

lazy val properties: RedisWriteProperties = RedisWriteProperties(
maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
)

lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)

def newJedisClient(endpoint: RedisEndpoint): Jedis = {
val jedis = new Jedis(endpoint.host, endpoint.port)
if (endpoint.password.nonEmpty) {
Expand All @@ -78,18 +91,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
.localCheckpoint()
else data

val endpoint = RedisEndpoint(
host = sparkConf.get("spark.redis.host"),
port = sparkConf.get("spark.redis.port").toInt,
password = sparkConf.get("spark.redis.password", "")
)
val properties = RedisWriteProperties(
maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
)

val isClusterMode = checkIfInClusterMode(endpoint)

dataToStore.foreachPartition { partition: Iterator[Row] =>
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
Expand Down

0 comments on commit e579a01

Please sign in to comment.