diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
index af38384..81ddc1e 100644
--- a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
@@ -54,6 +54,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
 
   val sparkConf: SparkConf = sqlContext.sparkContext.getConf
 
+  lazy val endpoint: RedisEndpoint = RedisEndpoint(
+    host = sparkConf.get("spark.redis.host"),
+    port = sparkConf.get("spark.redis.port").toInt,
+    password = sparkConf.get("spark.redis.password", "")
+  )
+
+  lazy val properties: RedisWriteProperties = RedisWriteProperties(
+    maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
+    pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
+  )
+
+  lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)
+
   def newJedisClient(endpoint: RedisEndpoint): Jedis = {
     val jedis = new Jedis(endpoint.host, endpoint.port)
     if (endpoint.password.nonEmpty) {
@@ -78,18 +91,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
           .localCheckpoint()
       else data
 
-    val endpoint = RedisEndpoint(
-      host = sparkConf.get("spark.redis.host"),
-      port = sparkConf.get("spark.redis.port").toInt,
-      password = sparkConf.get("spark.redis.password", "")
-    )
-    val properties = RedisWriteProperties(
-      maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
-      pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
-    )
-
-    val isClusterMode = checkIfInClusterMode(endpoint)
-
     dataToStore.foreachPartition { partition: Iterator[Row] =>
       java.security.Security.setProperty("networkaddress.cache.ttl", "3");
       java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
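
For reference, the lazy vals introduced above are populated from the job's SparkConf. Below is a minimal sketch of how the keys this relation reads might be supplied when building the Spark session; the application name and every value shown are illustrative assumptions, not part of this patch.

import org.apache.spark.sql.SparkSession

// Illustrative only: sets the SparkConf keys that RedisSinkRelation reads lazily.
// All values below are example placeholders, not defaults from this patch.
val spark = SparkSession
  .builder()
  .appName("feast-redis-ingestion-example")             // hypothetical app name
  .config("spark.redis.host", "localhost")              // host read by RedisEndpoint
  .config("spark.redis.port", "6379")                   // parsed with .toInt
  .config("spark.redis.password", "")                   // empty string => no AUTH sent
  .config("spark.redis.properties.maxJitter", "3600")   // read as maxJitterSeconds by RedisWriteProperties
  .config("spark.redis.properties.pipelineSize", "250") // read as pipelineSize by RedisWriteProperties
  .getOrCreate()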