diff --git a/README.md b/README.md index d3c2577..48f3d27 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ The [bin directory](/bin) contains a set of scripts to help run this test on a cluster. These scripts make the following assumpitions. * `FLUO_HOME` environment variable is set. If not set, then set it in `conf/env.sh`. + * The [setup.sh](/bin/setup.sh) script assumes `fluo-conn.properties` and `fluo-app.properties` in `$FLUO_HOME/conf` have correct Zookeeper and Accumulo settings. * Hadoop `yarn` command is on path. * Hadoop `hadoop` command is on path. * Accumulo `accumulo` command is on path. @@ -86,22 +87,20 @@ cluster. These scripts make the following assumpitions. Before running any of the scipts, copy [conf/env.sh.example](/conf/env.sh.example) to `conf/env.sh`, then inspect and modify the file. -Next, execute the [run-test.sh](/bin/run-test.sh) script. This script will create a -new Apache Fluo app called `stresso` (which can be changed by `FLUO_APP_NAME` in your env.sh). -It will modify the application's fluo.properties, copy the stresso jar to the `lib/` -directory of the app and set the following in fluo.properties: + 1. Execute the [setup.sh](/bin/setup.sh) script. This will build Stresso, initialize the Fluo application, and then optimize the Accumulo table. + 1. Start the Fluo oracle and workers processes. TODO link to docs after 1.2.0 is released. The commands below will start these processes locally. + ``` + fluo oracle -a stresso > oracle.log & + fluo worker -a stresso > worker.log & + ``` + 1. Execute the [run-test.sh](/bin/run-test.sh) script. + 1. Terminate worker and oracle processes. -``` -fluo.observer.0=stresso.trie.NodeObserver -fluo.app.trie.nodeSize=X -fluo.app.trie.stopLevel=Y -``` - -The `run-test.sh` script will then initialize and start the Stresso application. -It will load a lot of data directly into Accumulo without transactions and then +The `run-test.sh` loads a lot of data directly into Accumulo without transactions and then incrementally load smaller amounts of data using transactions. After incrementally loading some data, it computes the expected number of unique integers using map reduce. -It then prints the number of unique integers computed by Apache Fluo. +It then checks that the number of unique integers compute by Fluo and Map Reduce are the +same. ## Additional Scripts diff --git a/bin/load-env.sh b/bin/load-env.sh index 5400fc2..8ac5ac5 100644 --- a/bin/load-env.sh +++ b/bin/load-env.sh @@ -1,4 +1,4 @@ -if [ ! -f $BIN_DIR/../conf/env.sh ] +if [ ! -f $BIN_DIR/../conf/env.sh ] then . $BIN_DIR/../conf/env.sh.example else @@ -12,13 +12,13 @@ if [ ! -d "$FLUO_HOME" ]; then fi FLUO_CMD=$FLUO_HOME/bin/fluo if [ -z "$FLUO_APP_NAME" ]; then - echo "FLUO_APP_NAME is not set!" + echo "FLUO_APP_NAME is not set!" exit 1 fi -FLUO_APP_LIB=$FLUO_HOME/apps/$FLUO_APP_NAME/lib -FLUO_PROPS=$FLUO_HOME/apps/$FLUO_APP_NAME/conf/fluo.properties + +FLUO_PROPS=$BIN_DIR/../conf/fluo-conn.properties if [ ! -f "$FLUO_PROPS" ] && [ -z "$SKIP_FLUO_PROPS_CHECK" ]; then - echo "Fluo properties file not found : $FLUO_PROPS" + echo "Fluo properties file not found : $FLUO_PROPS" exit 1 fi @@ -26,11 +26,11 @@ STRESSO_VERSION=0.0.1-SNAPSHOT STRESSO_JAR=$BIN_DIR/../target/stresso-$STRESSO_VERSION.jar STRESSO_SHADED_JAR=$BIN_DIR/../target/stresso-$STRESSO_VERSION-shaded.jar if [ ! -f "$STRESSO_JAR" ] && [ -z "$SKIP_JAR_CHECKS" ]; then - echo "Stresso jar not found : $STRESSO_JAR" + echo "Stresso jar not found : $STRESSO_JAR" exit 1; fi if [ ! -f "$STRESSO_SHADED_JAR" ] && [ -z "$SKIP_JAR_CHECKS" ]; then - echo "Stresso shaded jar not found : $STRESSO_SHADED_JAR" + echo "Stresso shaded jar not found : $STRESSO_SHADED_JAR" exit 1; fi diff --git a/bin/run-test.sh b/bin/run-test.sh index a58dd6f..db058e6 100755 --- a/bin/run-test.sh +++ b/bin/run-test.sh @@ -1,83 +1,27 @@ #!/bin/bash BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) -SKIP_JAR_CHECKS="true" -SKIP_FLUO_PROPS_CHECK="true" . $BIN_DIR/load-env.sh -unset SKIP_JAR_CHECKS -unset SKIP_FLUO_PROPS_CHECK # stop if any command fails set -e -if [ ! -d $FLUO_HOME/apps/$FLUO_APP_NAME ]; then - $FLUO_CMD new $FLUO_APP_NAME -else - echo "Restarting '$FLUO_APP_NAME' application. Errors may be printed if it's not running..." - $FLUO_CMD stop $FLUO_APP_NAME || true - rm -rf $FLUO_HOME/apps/$FLUO_APP_NAME - $FLUO_CMD new $FLUO_APP_NAME -fi - -# build stresso -(cd $BIN_DIR/..;mvn package -Dfluo.version=$FLUO_VERSION -Daccumulo.version=$ACCUMULO_VERSION -DskipTests) - -if [[ $(accumulo version) == *1.6* ]]; then - # build stress balancer - (cd $BIN_DIR/..; mkdir -p git; cd git;git clone https://github.com/keith-turner/stress-balancer.git; cd stress-balancer; ./config-fluo.sh $FLUO_PROPS) -fi - -if [ ! -f "$STRESSO_JAR" ]; then - echo "Stresso jar not found : $STRESSO_JAR" - exit 1 -fi -if [ ! -d $FLUO_APP_LIB ]; then - echo "Fluo app lib $FLUO_APP_LIB does not exist" +# TODO check if table is empty +if [[ $($FLUO_CMD scan -a $FLUO_APP_NAME | head) = *[![:space:]]* ]]; then + echo "ERROR Table is not empty." exit 1 fi -cp $STRESSO_JAR $FLUO_APP_LIB -mvn dependency:copy-dependencies -DincludeArtifactIds=fluo-recipes-core -DoutputDirectory=$FLUO_APP_LIB - -# determine a good stop level -if (("$MAX" <= $((10**9)))); then - STOP=6 -elif (("$MAX" <= $((10**12)))); then - STOP=5 -else - STOP=4 -fi - -# delete existing config in fluo.properties if it exist -$SED '/fluo.observer/d' $FLUO_PROPS -$SED '/fluo.app.trie/d' $FLUO_PROPS - -# append stresso specific config -echo "fluo.observer.0=stresso.trie.NodeObserver" >> $FLUO_PROPS -echo "fluo.app.trie.nodeSize=8" >> $FLUO_PROPS -echo "fluo.app.trie.stopLevel=$STOP" >> $FLUO_PROPS - -$FLUO_CMD init $FLUO_APP_NAME -f -$FLUO_CMD start $FLUO_APP_NAME echo "Removing any previous logs in $LOG_DIR" mkdir -p $LOG_DIR rm -f $LOG_DIR/* -# configure balancer for fluo table -if [[ $(accumulo version) == *1.6* ]]; then - (cd $BIN_DIR/../git/stress-balancer; ./config-accumulo.sh $FLUO_PROPS) -fi # TODO setup RegexGroupBalancer built into Accumulo 1.7.0... may be easier to do from java - hadoop fs -rm -r -f /stresso/ set -e -# add splits to Fluo table -echo "*****Presplitting table*****" -$BIN_DIR/split.sh $SPLITS >$LOG_DIR/split.out 2>$LOG_DIR/split.err - if (( GEN_INIT > 0 )); then # generate and load intial data using map reduce writing directly to table echo "*****Generating and loading initial data set*****" @@ -87,17 +31,21 @@ if (( GEN_INIT > 0 )); then fi # load data incrementally -for i in $(seq 1 $ITERATIONS); do - echo "*****Generating and loading incremental data set $i*****" +START_TIME=`date +%s` +i=1 +while [ $(( $(date +%s) - $LOAD_TIME )) -lt $START_TIME ]; do + echo "*****Generating and loading incremental data set $i*****" $BIN_DIR/generate.sh $MAPS $((GEN_INCR / MAPS)) $MAX /stresso/$i >$LOG_DIR/generate_$i.out 2>$LOG_DIR/generate_$i.err $BIN_DIR/load.sh /stresso/$i >$LOG_DIR/load_$i.out 2>$LOG_DIR/load_$i.err # TODO could reload the same dataset sometimes, maybe when i%5 == 0 or something $BIN_DIR/compact-ll.sh $MAX $COMPACT_CUTOFF >$LOG_DIR/compact-ll_$i.out 2>$LOG_DIR/compact-ll_$i.err if ! ((i % WAIT_PERIOD)); then - $FLUO_CMD wait $FLUO_APP_NAME >$LOG_DIR/wait_$i.out 2>$LOG_DIR/wait_$i.err + $FLUO_CMD wait -a $FLUO_APP_NAME >$LOG_DIR/wait_$i.out 2>$LOG_DIR/wait_$i.err else sleep $SLEEP fi + + i=$((i+1)) done # print unique counts @@ -106,7 +54,7 @@ $BIN_DIR/unique.sh $REDUCES /stresso/* >$LOG_DIR/unique.out 2>$LOG_DIR/unique.er grep UNIQUE $LOG_DIR/unique.err echo "*****Wait for Fluo to finish processing*****" -$FLUO_CMD wait $FLUO_APP_NAME +$FLUO_CMD wait -a $FLUO_APP_NAME echo "*****Printing # of unique integers calculated by Fluo*****" $BIN_DIR/print.sh >$LOG_DIR/print.out 2>$LOG_DIR/print.err diff --git a/bin/setup.sh b/bin/setup.sh new file mode 100755 index 0000000..b34b934 --- /dev/null +++ b/bin/setup.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) + +#TODO maybe have a single skip checks var +SKIP_JAR_CHECKS=1 +SKIP_FLUO_PROPS_CHECK=1 +. $BIN_DIR/load-env.sh + +cd $BIN_DIR/.. + +# stop if any command fails +set -e + +# Build jar and shaded jar +mvn clean package dependency:copy-dependencies \ + -DincludeArtifactIds=fluo-recipes-core \ + -Dfluo.version=$FLUO_VERSION \ + -Daccumulo.version=$ACCUMULO_VERSION +mkdir target/lib +cp target/stresso-0.0.1-SNAPSHOT.jar target/dependency/*.jar target/lib + +# Create config file used for fluo initialization +cp $FLUO_HOME/conf/fluo-app.properties ./conf/fluo-app.properties +$SED '/fluo.worker.num.threads.*/d' ./conf/fluo-app.properties +cat << EOF >> ./conf/fluo-app.properties +fluo.observer.init.dir=$(pwd)/target/lib +fluo.observer.0=stresso.trie.NodeObserver +fluo.worker.num.threads=128 +fluo.loader.num.threads=128 +fluo.loader.queue.size=128 +fluo.app.trie.nodeSize=8 +fluo.app.trie.stopLevel=$STOP +EOF + +# Initialize Stresso +fluo init -a $FLUO_APP_NAME -p conf/fluo-app.properties -f + +# Optimize Accumulo table used by Fluo Stresso Application +accumulo shell -u root -p secret < 1.7.2 2.6.3 - 1.0.0-incubating + 1.2.0 1.0.0-incubating 1.7.12 diff --git a/src/main/java/stresso/trie/CompactLL.java b/src/main/java/stresso/trie/CompactLL.java index 1e0e421..1d03c33 100644 --- a/src/main/java/stresso/trie/CompactLL.java +++ b/src/main/java/stresso/trie/CompactLL.java @@ -3,7 +3,7 @@ import java.io.File; import org.apache.accumulo.core.client.Connector; -import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoAdmin; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.core.util.AccumuloUtil; @@ -20,23 +20,24 @@ public static void main(String[] args) throws Exception { if (args.length != 3) { System.err - .println("Usage: " + Split.class.getSimpleName() + " "); + .println("Usage: " + Split.class.getSimpleName() + " "); System.exit(-1); } FluoConfiguration config = new FluoConfiguration(new File(args[0])); + try(FluoAdmin admin = FluoFactory.newAdmin(config)) { + // Get the application config stored in zookeeper which has Accumulo connection properties + config = new FluoConfiguration(config.orElse(admin.getApplicationConfig())); + } + long max = Long.parseLong(args[1]); //compact levels that can contain less nodes than this int cutoff = Integer.parseInt(args[2]); - int nodeSize; - int stopLevel; - try (FluoClient client = FluoFactory.newClient(config)) { - nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP); - stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP); - } + int nodeSize = config.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP); + int stopLevel = config.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP); int level = 64 / nodeSize; diff --git a/src/main/java/stresso/trie/Diff.java b/src/main/java/stresso/trie/Diff.java index f74521d..d4ce3c5 100644 --- a/src/main/java/stresso/trie/Diff.java +++ b/src/main/java/stresso/trie/Diff.java @@ -53,7 +53,7 @@ public static Map getRootCount(FluoClient client, Snapshot snap, i public static void main(String[] args) throws Exception { if (args.length != 1) { - System.err.println("Usage: " + Diff.class.getSimpleName() + " "); + System.err.println("Usage: " + Diff.class.getSimpleName() + " "); System.exit(-1); } diff --git a/src/main/java/stresso/trie/Init.java b/src/main/java/stresso/trie/Init.java index d0f847f..02ed238 100644 --- a/src/main/java/stresso/trie/Init.java +++ b/src/main/java/stresso/trie/Init.java @@ -29,7 +29,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.commons.codec.binary.Base64; -import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.FluoAdmin; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.core.util.AccumuloUtil; @@ -127,20 +127,22 @@ protected void reduce(Text key, Iterable values, Context context) @Override public int run(String[] args) throws Exception { if (args.length != 3) { - System.err.println("Usage: " + this.getClass().getSimpleName() + " "); + System.err.println("Usage: " + this.getClass().getSimpleName() + " "); System.exit(-1); } FluoConfiguration props = new FluoConfiguration(new File(args[0])); + try(FluoAdmin admin = FluoFactory.newAdmin(props)) { + // Get the application config stored in zookeeper which has Accumulo connection properties + props = new FluoConfiguration(props.orElse(admin.getApplicationConfig())); + props.print(); + } + Path input = new Path(args[1]); Path tmp = new Path(args[2]); - int stopLevel; - int nodeSize; - try (FluoClient client = FluoFactory.newClient(props)) { - nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP); - stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP); - } + int nodeSize = props.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP); + int stopLevel = props.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP); int ret = unique(input, new Path(tmp, "nums")); if (ret != 0) diff --git a/src/main/java/stresso/trie/Load.java b/src/main/java/stresso/trie/Load.java index 8e1ebfb..84cd5b5 100644 --- a/src/main/java/stresso/trie/Load.java +++ b/src/main/java/stresso/trie/Load.java @@ -49,7 +49,7 @@ protected void map(LongWritable key, NullWritable val, Context context) public int run(String[] args) throws Exception { if (args.length != 2) { - log.error("Usage: " + this.getClass().getSimpleName() + " "); + log.error("Usage: " + this.getClass().getSimpleName() + " "); System.exit(-1); } diff --git a/src/main/java/stresso/trie/Print.java b/src/main/java/stresso/trie/Print.java index 62c39a8..7163709 100644 --- a/src/main/java/stresso/trie/Print.java +++ b/src/main/java/stresso/trie/Print.java @@ -109,7 +109,7 @@ public static Stats getStats(SimpleConfiguration config) throws Exception { public static void main(String[] args) throws Exception { if (args.length != 1) { - System.err.println("Usage: " + Print.class.getSimpleName() + " "); + System.err.println("Usage: " + Print.class.getSimpleName() + " "); System.exit(-1); } diff --git a/src/main/java/stresso/trie/Split.java b/src/main/java/stresso/trie/Split.java index df8e28f..f2b5ddf 100644 --- a/src/main/java/stresso/trie/Split.java +++ b/src/main/java/stresso/trie/Split.java @@ -14,17 +14,11 @@ package stresso.trie; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; import java.util.TreeSet; +import javax.inject.Inject; + import com.google.common.base.Strings; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; @@ -34,22 +28,17 @@ public class Split { - private static final String RGB_CLASS = - "org.apache.accumulo.server.master.balancer.RegexGroupBalancer"; - private static final String RGB_PATTERN_PROP = "table.custom.balancer.group.regex.pattern"; - private static final String RGB_DEFAULT_PROP = "table.custom.balancer.group.regex.default"; - private static final String TABLE_BALANCER_PROP = "table.balancer"; + @Inject + private static FluoConfiguration config; public static void main(String[] args) throws Exception { - if (args.length != 3) { + if (args.length != 1) { System.err.println("Usage: " + Split.class.getSimpleName() - + " "); + + " "); System.exit(-1); } - FluoConfiguration config = new FluoConfiguration(new File(args[0])); - - int maxTablets = Integer.parseInt(args[2]); + int maxTablets = Integer.parseInt(args[0]); int nodeSize; int stopLevel; @@ -58,8 +47,6 @@ public static void main(String[] args) throws Exception { stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP); } - setupBalancer(config); - int level = 64 / nodeSize; while (level >= stopLevel) { @@ -73,45 +60,6 @@ public static void main(String[] args) throws Exception { level--; } - - optimizeAccumulo(config, args[1]); - } - - private static void optimizeAccumulo(FluoConfiguration config, String tableProps) - throws Exception { - Connector conn = AccumuloUtil.getConnector(config); - - Properties tprops = new Properties(); - tprops.load(new ByteArrayInputStream(tableProps.getBytes(StandardCharsets.UTF_8))); - - Set> es = tprops.entrySet(); - for (Entry e : es) { - conn.tableOperations().setProperty(config.getAccumuloTable(), e.getKey().toString(), - e.getValue().toString()); - } - try { - conn.instanceOperations().setProperty("table.durability", "flush"); - conn.tableOperations().removeProperty("accumulo.metadata", "table.durability"); - conn.tableOperations().removeProperty("accumulo.root", "table.durability"); - } catch (AccumuloException e) { - System.err.println( - "Unable to set durability settings (error expected in Accumulo 1.6) : " + e.getMessage()); - } - } - - private static void setupBalancer(FluoConfiguration config) throws AccumuloSecurityException { - Connector conn = AccumuloUtil.getConnector(config); - - try { - // setting this prop first intentionally because it should fail in 1.6 - conn.tableOperations().setProperty(config.getAccumuloTable(), RGB_PATTERN_PROP, "(\\d\\d).*"); - conn.tableOperations().setProperty(config.getAccumuloTable(), RGB_DEFAULT_PROP, "none"); - conn.tableOperations().setProperty(config.getAccumuloTable(), TABLE_BALANCER_PROP, RGB_CLASS); - System.out.println("Setup tablet group balancer"); - } catch (AccumuloException e) { - System.err.println( - "Unable to setup tablet balancer (error expected in Accumulo 1.6) : " + e.getMessage()); - } } private static TreeSet genSplits(int level, int numTablets) {