From e579a01af6441ae8c007e1e5a1480ca2d907573f Mon Sep 17 00:00:00 2001
From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com>
Date: Wed, 18 May 2022 10:12:30 +0800
Subject: [PATCH] Persist information on whether an endpoint is in cluster
 mode or not (#146)

Signed-off-by: Khor Shu Heng
Co-authored-by: Khor Shu Heng
---
 .../stores/redis/RedisSinkRelation.scala | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
index af383841..81ddc1e2 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
@@ -54,6 +54,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
 
   val sparkConf: SparkConf = sqlContext.sparkContext.getConf
 
+  lazy val endpoint: RedisEndpoint = RedisEndpoint(
+    host = sparkConf.get("spark.redis.host"),
+    port = sparkConf.get("spark.redis.port").toInt,
+    password = sparkConf.get("spark.redis.password", "")
+  )
+
+  lazy val properties: RedisWriteProperties = RedisWriteProperties(
+    maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
+    pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
+  )
+
+  lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)
+
   def newJedisClient(endpoint: RedisEndpoint): Jedis = {
     val jedis = new Jedis(endpoint.host, endpoint.port)
     if (endpoint.password.nonEmpty) {
@@ -78,18 +91,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
           .localCheckpoint()
       else data
 
-    val endpoint = RedisEndpoint(
-      host = sparkConf.get("spark.redis.host"),
-      port = sparkConf.get("spark.redis.port").toInt,
-      password = sparkConf.get("spark.redis.password", "")
-    )
-    val properties = RedisWriteProperties(
-      maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
-      pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
-    )
-
-    val isClusterMode = checkIfInClusterMode(endpoint)
-
     dataToStore.foreachPartition { partition: Iterator[Row] =>
      java.security.Security.setProperty("networkaddress.cache.ttl", "3");
      java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
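
The net effect of the patch above is to hoist endpoint, properties, and isClusterMode out of the insert path and onto RedisSinkRelation as lazy vals, so the Redis connection settings and the cluster-mode check (checkIfInClusterMode, whose body is not shown in this diff) are resolved at most once per relation instance rather than recomputed on every insert call. Below is a minimal, standalone sketch of that lazy-val memoization pattern; LazyValSketch, SinkRelation, expensiveClusterCheck, and checkCount are hypothetical names invented for illustration and are not part of the Feast codebase.

object LazyValSketch {

  class SinkRelation {
    private var checks = 0

    // Hypothetical stand-in for checkIfInClusterMode(endpoint); assume it is
    // costly because it has to contact Redis to inspect the endpoint.
    private def expensiveClusterCheck(): Boolean = {
      checks += 1
      false
    }

    // Pre-patch shape: recomputed every time the caller asks for it.
    def isClusterModePerCall: Boolean = expensiveClusterCheck()

    // Post-patch shape: computed on first access, then cached for the
    // lifetime of the relation object.
    lazy val isClusterMode: Boolean = expensiveClusterCheck()

    def checkCount: Int = checks
  }

  def main(args: Array[String]): Unit = {
    val relation = new SinkRelation
    (1 to 3).foreach(_ => relation.isClusterMode)        // evaluated once
    (1 to 3).foreach(_ => relation.isClusterModePerCall) // evaluated three times
    println(s"cluster checks performed: ${relation.checkCount}") // prints 4
  }
}

Because a lazy val is evaluated on first access and then cached, repeated reads of isClusterMode add no further cost, which matches the commit title's intent of persisting whether the endpoint is in cluster mode.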