From 9a91387242bc7d9d0364229443d41502a2527aac Mon Sep 17 00:00:00 2001 From: Murat Levent Date: Mon, 25 Mar 2024 03:05:41 +0300 Subject: [PATCH 1/5] implemented personal files --- calculate_average_muratlevent.sh | 19 ++++ .../onebrc/CalculateAverage_muratlevent.java | 92 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100755 calculate_average_muratlevent.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java diff --git a/calculate_average_muratlevent.sh b/calculate_average_muratlevent.sh new file mode 100755 index 000000000..90ecc0293 --- /dev/null +++ b/calculate_average_muratlevent.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_muratlevent diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java new file mode 100644 index 000000000..08d7bb2e6 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collector; + +import static java.util.stream.Collectors.groupingBy; + +public class CalculateAverage_muratlevent { + + private static final String FILE = "./measurements.txt"; + + private static record Measurement(String station, double value) { + private Measurement(String[] parts) { + this(parts[0], Double.parseDouble(parts[1])); + } + } + + private static record ResultRow(double min, double mean, double max) { + + public String toString() { + return round(min) + "/" + round(mean) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + }; + + private static class MeasurementAggregator { + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private double sum; + private long count; + } + + public static void main(String[] args) throws IOException { + // Map measurements1 = Files.lines(Paths.get(FILE)) + // .map(l -> l.split(";")) + // .collect(groupingBy(m -> m[0], averagingDouble(m -> Double.parseDouble(m[1])))); + // + // measurements1 = new TreeMap<>(measurements1.entrySet() + // .stream() + // .collect(toMap(e -> e.getKey(), e -> Math.round(e.getValue() * 10.0) / 10.0))); + // System.out.println(measurements1); + + Collector collector = Collector.of( + MeasurementAggregator::new, + (a, m) -> { + a.min = Math.min(a.min, m.value); + a.max = Math.max(a.max, m.value); + a.sum += m.value; + a.count++; + }, + (agg1, agg2) -> { + var res = new MeasurementAggregator(); + res.min = Math.min(agg1.min, agg2.min); + res.max = Math.max(agg1.max, agg2.max); + res.sum = agg1.sum + agg2.sum; + res.count = agg1.count + agg2.count; + + return res; + }, + agg -> { + return new ResultRow(agg.min, (Math.round(agg.sum * 10.0) / 10.0) / agg.count, agg.max); + }); + + Map measurements = new TreeMap<>(Files.lines(Paths.get(FILE)) + .map(l -> new Measurement(l.split(";"))) + .collect(groupingBy(Measurement::station, collector))); + + System.out.println(measurements); + } +} From ca55b23ad1741228a6df84cf75f4ece95c2b81ef Mon Sep 17 00:00:00 2001 From: Murat Levent Date: Mon, 25 Mar 2024 23:40:08 +0300 Subject: [PATCH 2/5] used parallel stream --- .../java/dev/morling/onebrc/CalculateAverage_muratlevent.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java index 08d7bb2e6..06eab217b 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java @@ -84,6 +84,7 @@ public static void main(String[] args) throws IOException { }); Map measurements = new TreeMap<>(Files.lines(Paths.get(FILE)) + .parallel() .map(l -> new Measurement(l.split(";"))) .collect(groupingBy(Measurement::station, collector))); From c21f1eba228c7e344d2f85fa69055830d79b1746 Mon Sep 17 00:00:00 2001 From: Murat Levent Date: Wed, 27 Mar 2024 23:54:56 +0300 Subject: [PATCH 3/5] optimized code. implemented thread usage --- calculate_average_muratlevent.sh | 2 +- .../onebrc/CalculateAverage_muratlevent.java | 99 ++++++++++--------- 2 files changed, 56 insertions(+), 45 deletions(-) diff --git a/calculate_average_muratlevent.sh b/calculate_average_muratlevent.sh index 90ecc0293..2162f7dda 100755 --- a/calculate_average_muratlevent.sh +++ b/calculate_average_muratlevent.sh @@ -15,5 +15,5 @@ # limitations under the License. # -JAVA_OPTS="" +JAVA_OPTS="--enable-preview -Xmx8g -XX:+AlwaysPreTouch -XX:+UseParallelGC -XX:-OmitStackTraceInFastThrow" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_muratlevent diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java index 06eab217b..dd51bacea 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java @@ -18,76 +18,87 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Comparator; import java.util.Map; -import java.util.TreeMap; -import java.util.stream.Collector; - -import static java.util.stream.Collectors.groupingBy; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Function; +import java.util.stream.Collectors; public class CalculateAverage_muratlevent { private static final String FILE = "./measurements.txt"; private static record Measurement(String station, double value) { - private Measurement(String[] parts) { - this(parts[0], Double.parseDouble(parts[1])); - } } private static record ResultRow(double min, double mean, double max) { - + @Override public String toString() { return round(min) + "/" + round(mean) + "/" + round(max); } - private double round(double value) { + private static double round(double value) { return Math.round(value * 10.0) / 10.0; } - }; + } private static class MeasurementAggregator { private double min = Double.POSITIVE_INFINITY; private double max = Double.NEGATIVE_INFINITY; private double sum; private long count; - } - public static void main(String[] args) throws IOException { - // Map measurements1 = Files.lines(Paths.get(FILE)) - // .map(l -> l.split(";")) - // .collect(groupingBy(m -> m[0], averagingDouble(m -> Double.parseDouble(m[1])))); - // - // measurements1 = new TreeMap<>(measurements1.entrySet() - // .stream() - // .collect(toMap(e -> e.getKey(), e -> Math.round(e.getValue() * 10.0) / 10.0))); - // System.out.println(measurements1); + public void add(Measurement measurement) { + double value = measurement.value(); + min = Math.min(min, value); + max = Math.max(max, value); + sum += value; + count++; + } - Collector collector = Collector.of( - MeasurementAggregator::new, - (a, m) -> { - a.min = Math.min(a.min, m.value); - a.max = Math.max(a.max, m.value); - a.sum += m.value; - a.count++; - }, - (agg1, agg2) -> { - var res = new MeasurementAggregator(); - res.min = Math.min(agg1.min, agg2.min); - res.max = Math.max(agg1.max, agg2.max); - res.sum = agg1.sum + agg2.sum; - res.count = agg1.count + agg2.count; + public MeasurementAggregator combine(MeasurementAggregator other) { + min = Math.min(min, other.min); + max = Math.max(max, other.max); + sum += other.sum; + count += other.count; + return this; + } - return res; - }, - agg -> { - return new ResultRow(agg.min, (Math.round(agg.sum * 10.0) / 10.0) / agg.count, agg.max); - }); + public ResultRow finish() { + return new ResultRow(min, sum / count, max); + } + } + + public static void main(String[] args) { + ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + try { + Map stationMeasurements = new ConcurrentHashMap<>(); + customThreadPool.submit(() -> { + try (var lines = Files.lines(Paths.get(FILE))) { + lines.parallel() + .map(line -> line.split(";")) + .map(parts -> new Measurement(parts[0], Double.parseDouble(parts[1]))) + .forEach(measurement -> stationMeasurements.computeIfAbsent( + measurement.station(), + k -> new MeasurementAggregator()).add(measurement)); + } + catch (IOException e) { + throw new RuntimeException("Error reading file", e); + } + }).get(); - Map measurements = new TreeMap<>(Files.lines(Paths.get(FILE)) - .parallel() - .map(l -> new Measurement(l.split(";"))) - .collect(groupingBy(Measurement::station, collector))); + stationMeasurements.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(entry -> entry.getKey() + "=" + entry.getValue().finish()) + .forEach(System.out::println); - System.out.println(measurements); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + customThreadPool.shutdown(); + } } } From fd784936bcd61edfdcb12c300f2845c24f382e6d Mon Sep 17 00:00:00 2001 From: Murat Levent Date: Thu, 28 Mar 2024 23:27:38 +0300 Subject: [PATCH 4/5] changed maximum Java heap size --- calculate_average_muratlevent.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/calculate_average_muratlevent.sh b/calculate_average_muratlevent.sh index 2162f7dda..716d4d269 100755 --- a/calculate_average_muratlevent.sh +++ b/calculate_average_muratlevent.sh @@ -15,5 +15,5 @@ # limitations under the License. # -JAVA_OPTS="--enable-preview -Xmx8g -XX:+AlwaysPreTouch -XX:+UseParallelGC -XX:-OmitStackTraceInFastThrow" +JAVA_OPTS="--enable-preview -Xmx10g -XX:+AlwaysPreTouch -XX:+UseParallelGC -XX:-OmitStackTraceInFastThrow" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_muratlevent From 265479dc4490024d66ffbcafd7058a19cf9b542c Mon Sep 17 00:00:00 2001 From: Murat Levent Date: Fri, 29 Mar 2024 02:29:33 +0300 Subject: [PATCH 5/5] implemented an efficient temperature data processing using DoubleSummaryStatistics for streamlined computation of min, mean, and max values, and leveraged parallel streams --- calculate_average_muratlevent.sh | 2 +- .../onebrc/CalculateAverage_muratlevent.java | 80 +++---------------- 2 files changed, 14 insertions(+), 68 deletions(-) diff --git a/calculate_average_muratlevent.sh b/calculate_average_muratlevent.sh index 716d4d269..616c82c83 100755 --- a/calculate_average_muratlevent.sh +++ b/calculate_average_muratlevent.sh @@ -15,5 +15,5 @@ # limitations under the License. # -JAVA_OPTS="--enable-preview -Xmx10g -XX:+AlwaysPreTouch -XX:+UseParallelGC -XX:-OmitStackTraceInFastThrow" +JAVA_OPTS="--enable-preview -Xms16g -Xmx32g -XX:+AlwaysPreTouch -XX:+UseParallelGC" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_muratlevent diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java index dd51bacea..dc51a1279 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_muratlevent.java @@ -18,87 +18,33 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Comparator; +import java.util.DoubleSummaryStatistics; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; -import java.util.function.Function; import java.util.stream.Collectors; public class CalculateAverage_muratlevent { private static final String FILE = "./measurements.txt"; - private static record Measurement(String station, double value) { - } - - private static record ResultRow(double min, double mean, double max) { - @Override - public String toString() { - return round(min) + "/" + round(mean) + "/" + round(max); - } - - private static double round(double value) { - return Math.round(value * 10.0) / 10.0; - } - } - - private static class MeasurementAggregator { - private double min = Double.POSITIVE_INFINITY; - private double max = Double.NEGATIVE_INFINITY; - private double sum; - private long count; - - public void add(Measurement measurement) { - double value = measurement.value(); - min = Math.min(min, value); - max = Math.max(max, value); - sum += value; - count++; - } - - public MeasurementAggregator combine(MeasurementAggregator other) { - min = Math.min(min, other.min); - max = Math.max(max, other.max); - sum += other.sum; - count += other.count; - return this; - } - - public ResultRow finish() { - return new ResultRow(min, sum / count, max); - } - } - public static void main(String[] args) { - ForkJoinPool customThreadPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); try { - Map stationMeasurements = new ConcurrentHashMap<>(); - customThreadPool.submit(() -> { - try (var lines = Files.lines(Paths.get(FILE))) { - lines.parallel() - .map(line -> line.split(";")) - .map(parts -> new Measurement(parts[0], Double.parseDouble(parts[1]))) - .forEach(measurement -> stationMeasurements.computeIfAbsent( - measurement.station(), - k -> new MeasurementAggregator()).add(measurement)); - } - catch (IOException e) { - throw new RuntimeException("Error reading file", e); - } - }).get(); + Map stationMeasurements = Files.lines(Paths.get(FILE)) + .parallel() + .map(line -> line.split(";")) + .collect(Collectors.groupingBy( + parts -> parts[0], + Collectors.summarizingDouble(parts -> Double.parseDouble(parts[1])))); stationMeasurements.entrySet().stream() .sorted(Map.Entry.comparingByKey()) - .map(entry -> entry.getKey() + "=" + entry.getValue().finish()) - .forEach(System.out::println); - + .forEach(entry -> System.out.println(entry.getKey() + "=" + format(entry.getValue()))); } - catch (Exception e) { + catch (IOException e) { e.printStackTrace(); } - finally { - customThreadPool.shutdown(); - } + } + + private static String format(DoubleSummaryStatistics stats) { + return String.format("%.1f/%.1f/%.1f", stats.getMin(), stats.getAverage(), stats.getMax()); } }