Skip to content
This repository has been archived by the owner on Dec 14, 2019. It is now read-only.

Commit

Permalink
GEARPUMP: rules update notification through kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
jelyoussefi authored and wagmarcel committed Jan 16, 2018
1 parent 0eee3cc commit f4ceebb
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 38 deletions.
3 changes: 2 additions & 1 deletion deployer/src/cloudfoundry_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def build_config(self, local=False):
"token": self._vcap_services.token,
"dashboard_url": self._vcap_services.dashboard_url,
"kafka_servers": self._vcap_services.kafka_servers,
"kafka_topic": self._vcap_services.topic_name,
"kafka_observations_topic": self._vcap_services.observations_topic_name,
"kafka_rule_engine_topic": self._vcap_services.rule_engine_topic_name,
"kafka_zookeeper_quorum": kafka_zookeeper_quorum,
"application_name": "rule_engine_" + self._vcap_services.dashboard_url_normalized_for_gearpump,
"dashboard_strict_ssl": self._vcap_services.dashboard_strict_ssl,
Expand Down
7 changes: 4 additions & 3 deletions deployer/src/vcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self):

self.__parse_dashboard_url(ups)
self.__gather_rule_engine_token(ups)
self.__gather_topic_name(ups)
self.__gather_topics_names(ups)
self.__parse_gearpump_credentials()
self.__parse_kerberos_hbase_properties(ups)

Expand All @@ -68,9 +68,10 @@ def __parse_dashboard_url(self, ups):
.replace("-", "_") \
.replace(".", "_")

def __gather_topic_name(self, ups):
def __gather_topics_names(self, ups):
kafka_ups = self.__get_ups_by_name(ups, 'kafka-ups')
self.topic_name = kafka_ups['credentials']['topic']
self.observations_topic_name = kafka_ups['credentials']['topics']['observations']
self.rule_engine_topic_name = kafka_ups['credentials']['topics']['rule_engine']

def __parse_gearpump_credentials(self):
self.gearpump_credentials = self.json['gearpump'][0]['credentials']
Expand Down
22 changes: 16 additions & 6 deletions src/main/java/com/intel/ruleengine/gearpump/graph/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class Config {
private Boolean dashboard_strict_ssl;
private String kafka_servers;
private String kafka_zookeeper_quorum;
private String kafka_topic;
private String kafka_observations_topic;
private String kafka_rule_engine_topic;
private String application_name;
private String hadoop_security_authentication;
private String krb_kdc;
Expand All @@ -38,12 +39,20 @@ class Config {
private String krb_regionserver_principal;


public String getKafka_topic() {
return kafka_topic;
public String getKafka_observations_topic() {
return kafka_observations_topic;
}

public void setKafka_topic(String kafka_topic) {
this.kafka_topic = kafka_topic;
public void setKafka_observations_topic(String kafka_topic) {
this.kafka_observations_topic = kafka_topic;
}

public String getKafka_rule_engine_topic() {
return kafka_rule_engine_topic;
}

public void setKafka_rule_engine_topic(String kafka_topic) {
this.kafka_rule_engine_topic = kafka_topic;
}

public String getApplication_name() {
Expand Down Expand Up @@ -170,7 +179,8 @@ public void setDashboard_strict_ssl(Boolean dashboard_strict_ssl) {
public String toString() {
String sep = ", ";
StringBuilder builder = new StringBuilder()
.append("kafka_topic: ").append(getKafka_topic()).append(sep)
.append("kafka_observations_topic: ").append(getKafka_observations_topic()).append(sep)
.append("kafka_rule_engine_topic: ").append(getKafka_rule_engine_topic()).append(sep)
.append("application_name: ").append(getApplication_name()).append(sep)
.append("zookeeper_hbase_quorum: ").append(getZookeeper_hbase_quorum()).append(sep)
.append("hbase_table_prefix: ").append(getHbase_table_prefix()).append(sep)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import com.intel.ruleengine.gearpump.data.HbaseProperties;
import com.intel.ruleengine.gearpump.data.KerberosProperties;
import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor;
import com.intel.ruleengine.gearpump.tasks.KafkaSourceObservationsProcessor;
import com.intel.ruleengine.gearpump.tasks.KafkaSourceRulesUpdateProcessor;

import com.intel.ruleengine.gearpump.util.LogHelper;
import io.gearpump.cluster.UserConfig;
import org.slf4j.Logger;
Expand Down Expand Up @@ -41,7 +44,8 @@ public GraphConfig(String... args) {
.withString(DashboardConfig.DASHBOARD_TOKEN_PROPERTY, externalConfig.getToken())
.withString(DashboardConfig.DASHBOARD_URL_PROPERTY, externalConfig.getDashboard_url())
.withBoolean(DashboardConfig.DASHBOARD_STRICT_SSL_VERIFICATION, externalConfig.getDashboard_strict_ssl())
.withString(KafkaSourceProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_topic())
.withString(KafkaSourceObservationsProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_observations_topic())
.withString(KafkaSourceRulesUpdateProcessor.KAFKA_TOPIC_PROPERTY, externalConfig.getKafka_rule_engine_topic())
.withString(KafkaSourceProcessor.KAFKA_URI_PROPERTY, externalConfig.getKafka_servers())
.withString(KafkaSourceProcessor.KAFKA_ZOOKEEPER_PROPERTY, externalConfig.getKafka_zookeeper_quorum())
.withString(HbaseProperties.AUTHENTICATION_METHOD, externalConfig.getHadoop_security_authentication())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class GraphDefinition {

private final Map<Processor, List<Processor>> definition;

private final Processor kafkaSourceProcessor;
private final Processor kafkaSourceObservationsProcessor;
private final Processor kafkaSourceRulesUpdateProcessor;

private final Processor checkObservationInRules;
private final Processor sendAlerts;
Expand All @@ -43,7 +44,8 @@ class GraphDefinition {
persistComponentAlerts = processorsBuilder.getPersistComponentAlertsProccesor();
checkRules = processorsBuilder.getCheckRulesProcessor();
getRulesForComponent = processorsBuilder.getRulesForComponentProcessor();
kafkaSourceProcessor = processorsBuilder.getKafkaSource();
kafkaSourceObservationsProcessor = processorsBuilder.getKafkaSourceObservations();
kafkaSourceRulesUpdateProcessor = processorsBuilder.getKafkaSourceRulesUpdate();
persistObservation = processorsBuilder.getPersistObservationProcessor();
this.definition = new HashMap<>();
buildDefinition();
Expand All @@ -54,16 +56,16 @@ public Map<Processor, List<Processor>> getDefinition() {
}

private void buildDefinition() {
definition.put(kafkaSourceProcessor, Arrays.asList(getRulesForComponent));
definition.put(kafkaSourceObservationsProcessor, Arrays.asList(getRulesForComponent));
definition.put(getRulesForComponent, Arrays.asList(persistObservation));
definition.put(persistObservation, Arrays.asList(checkObservationInRules));

definition.put(checkObservationInRules, Arrays.asList(persistComponentAlerts));
definition.put(persistComponentAlerts, Arrays.asList(checkRules));

definition.put(checkRules, Arrays.asList(sendAlerts));
definition.put(sendAlerts, new ArrayList<>());

definition.put(kafkaSourceRulesUpdateProcessor, Arrays.asList(downloadRulesTask));
definition.put(downloadRulesTask, Arrays.asList(persistRulesTask));
definition.put(persistRulesTask, new ArrayList<>());
definition.put(sendAlerts, new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.intel.ruleengine.gearpump.graph;

import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor;
import com.intel.ruleengine.gearpump.tasks.KafkaSourceObservationsProcessor;
import com.intel.ruleengine.gearpump.tasks.KafkaSourceRulesUpdateProcessor;
import com.intel.ruleengine.gearpump.tasks.processors.*;
import io.gearpump.cluster.UserConfig;
import io.gearpump.streaming.javaapi.Processor;
Expand Down Expand Up @@ -32,8 +33,12 @@ class ProcessorsBuilder {
this.parallelismDefinition = parallelismDefinition;
}

public Processor getKafkaSource() {
return new KafkaSourceProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber());
public Processor getKafkaSourceRulesUpdate() {
return new KafkaSourceRulesUpdateProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber());
}

public Processor getKafkaSourceObservations() {
return new KafkaSourceObservationsProcessor(userConfig).getKafkaSourceProcessor(parallelismDefinition.getKafkaSourceProcessorsNumber());
}

public Processor getSendAlertsProcessor() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.intel.ruleengine.gearpump.tasks;

import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor;
import io.gearpump.cluster.UserConfig;


public class KafkaSourceObservationsProcessor extends KafkaSourceProcessor {

public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_OBSERVATIONS_TOPIC";

private static final String NAME = "KafkaSourceObservations";

public KafkaSourceObservationsProcessor(UserConfig userConfig) {
super(userConfig, NAME, userConfig.getString(KAFKA_TOPIC_PROPERTY).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public class KafkaSourceProcessor {
public static final String KAFKA_URI_PROPERTY = "KAFKA_URI";
public static final String KAFKA_ZOOKEEPER_PROPERTY = "KAFKA_URI_ZOOKEEPER";

private static final String NAME = "KafkaSource";
private static String name;

private final KafkaSource kafkaSource;
private final ClientContext context;

public KafkaSourceProcessor(UserConfig userConfig) {
String topic = userConfig.getString(KAFKA_TOPIC_PROPERTY).get();
public KafkaSourceProcessor(UserConfig userConfig, String name, String topic) {
this.name = name;

String zookeeperQuorum = userConfig.getString(KAFKA_ZOOKEEPER_PROPERTY).get();
String serverUri = userConfig.getString(KAFKA_URI_PROPERTY).get();

Expand All @@ -61,6 +62,6 @@ public KafkaSourceProcessor(UserConfig userConfig) {
}

public Processor getKafkaSourceProcessor(int parallelProcessorNumber) {
return Processor.source(kafkaSource, parallelProcessorNumber, NAME, UserConfig.empty(), context.system());
return Processor.source(kafkaSource, parallelProcessorNumber, name, UserConfig.empty(), context.system());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2016 Intel Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.intel.ruleengine.gearpump.tasks;

import com.intel.ruleengine.gearpump.tasks.KafkaSourceProcessor;
import io.gearpump.cluster.UserConfig;

public class KafkaSourceRulesUpdateProcessor extends KafkaSourceProcessor {

public static final String KAFKA_TOPIC_PROPERTY = "KAFKA_RULES_UPDATE_TOPIC";

private static final String NAME = "KafkaSourceRulesUpdate";

public KafkaSourceRulesUpdateProcessor(UserConfig userConfig) {
super(userConfig, NAME, userConfig.getString(KAFKA_TOPIC_PROPERTY).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class DownloadRulesTask extends RuleEngineTask {
private static final String START_MSG = "start";
private static final String CONTINUE_MSG = "continue";
private final RulesApi rulesApi;
private static final int TRIGER_INTERVAL = 10; //in seconds
private Map<String, List<Rule>> componentsRules;

public DownloadRulesTask(TaskContext context, UserConfig userConfig) {
Expand Down Expand Up @@ -75,14 +74,12 @@ public void onNext(Message message) {
} catch (Exception e) {
getLogger().error("Unknown error during rules downloading.", e);
}
getContext().scheduleOnce(FiniteDuration.create(TRIGER_INTERVAL, TimeUnit.SECONDS), new SelfTrigger());
}

private Map<String, List<Rule>> getComponentsRules() throws InvalidDashboardResponseException {
List<ComponentRulesResponse> componentsRules = rulesApi.getActiveComponentsRules();
RuleParser ruleParser = new RuleParser(componentsRules);
Map<String, List<Rule>> result = ruleParser.getComponentRules();

return result;
}

Expand All @@ -94,12 +91,4 @@ public static Processor getProcessor(UserConfig config, int parallelProcessorNum
return createProcessor(DownloadRulesTask.class, config, parallelProcessorNumber, TASK_NAME);
}

private class SelfTrigger extends scala.runtime.AbstractFunction0 {
@Override
public Object apply() {
self().tell(new Message(CONTINUE_MSG, now()), self());
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public GetComponentRulesTask(TaskContext context, UserConfig userConfig, RulesRe
public void onNext(Message message) {
try {
getLogger().info("GetRulesTask started");
getLogger().warn("GetRulesTask started");

observations = getInputMessage(message);
sendObservations();
} catch (InvalidMessageTypeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,19 @@ public void onNextConditionsShouldBeFulfilled() throws InvalidDashboardResponseE
downloadRulesTask.onNext(message);

verify(taskContext, times(1)).output(expectedOutput);
verify(taskContext, times(1)).scheduleOnce(any(), any());
}

@Test
public void onNextShouldCatchInvalidDashboardResponseException() throws InvalidDashboardResponseException {
when(rulesApi.getActiveComponentsRules()).thenThrow(InvalidDashboardResponseException.class);
downloadRulesTask.onNext(message);
verify(taskContext, never()).output(any());
verify(taskContext, times(1)).scheduleOnce(any(), any());
}

@Test
public void onNextShouldCatchException() throws InvalidDashboardResponseException {
when(rulesApi.getActiveComponentsRules()).thenThrow(Exception.class);
downloadRulesTask.onNext(message);
verify(taskContext, never()).output(any());
verify(taskContext, times(1)).scheduleOnce(any(), any());
}
}

0 comments on commit f4ceebb

Please sign in to comment.