Skip to content

Commit

Permalink
[#1395] Implement equals for temporal graphs (#1397)
Browse files Browse the repository at this point in the history
fixes #1395
  • Loading branch information
timo95 authored and Kevin Gómez committed Oct 17, 2019
1 parent abfd206 commit 16115b3
Show file tree
Hide file tree
Showing 27 changed files with 1,130 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.gradoop.flink.model.impl.operators.aggregation.Aggregation;
import org.gradoop.flink.model.impl.operators.cloning.Cloning;
import org.gradoop.flink.model.impl.operators.combination.Combination;
import org.gradoop.flink.model.impl.operators.equality.GraphEquality;
import org.gradoop.flink.model.impl.operators.exclusion.Exclusion;
import org.gradoop.flink.model.impl.operators.grouping.Grouping;
import org.gradoop.flink.model.impl.operators.grouping.GroupingStrategy;
Expand All @@ -43,6 +44,12 @@
import org.gradoop.flink.model.impl.operators.neighborhood.ReduceVertexNeighborhood;
import org.gradoop.flink.model.impl.operators.overlap.Overlap;
import org.gradoop.flink.model.impl.operators.subgraph.Subgraph;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToIdString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToEmptyString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToIdString;
import org.gradoop.flink.model.impl.operators.transformation.Transformation;
import org.gradoop.flink.model.impl.operators.verify.Verify;
import org.gradoop.flink.model.impl.operators.verify.VerifyGraphContainment;
Expand Down Expand Up @@ -492,6 +499,47 @@ default LG exclude(LG otherGraph) {
return callForGraph(new Exclusion<>(), otherGraph);
}

/**
* Checks, if another graph contains exactly the same vertices and edges (by id) as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, iff equal by element ids
*/
default DataSet<Boolean> equalsByElementIds(LG other) {
return callForValue(new GraphEquality<>(
new GraphHeadToEmptyString<>(),
new VertexToIdString<>(),
new EdgeToIdString<>(), true), other);
}

/**
* Checks, if another graph contains vertices and edges with the same
* attached data (i.e. label and properties) as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, iff equal by element data
*/
default DataSet<Boolean> equalsByElementData(LG other) {
return callForValue(new GraphEquality<>(
new GraphHeadToEmptyString<>(),
new VertexToDataString<>(),
new EdgeToDataString<>(), true), other);
}

/**
* Checks, if another graph has the same attached data and contains
* vertices and edges with the same attached data as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, iff equal by element data
*/
default DataSet<Boolean> equalsByData(LG other) {
return callForValue(new GraphEquality<>(
new GraphHeadToDataString<>(),
new VertexToDataString<>(),
new EdgeToDataString<>(), true), other);
}

//----------------------------------------------------------------------------
// Utility methods
//----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.gradoop.flink.model.api.epgm;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
Expand Down Expand Up @@ -77,32 +76,6 @@ public interface LogicalGraphOperators
*/
LogicalGraph sample(SamplingAlgorithm algorithm);

/**
* Checks, if another logical graph contains exactly the same vertices and edges (by id) as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, if equal by element ids
*/
DataSet<Boolean> equalsByElementIds(LogicalGraph other);

/**
* Checks, if another logical graph contains vertices and edges with the same
* attached data (i.e. label and properties) as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, iff equal by element data
*/
DataSet<Boolean> equalsByElementData(LogicalGraph other);

/**
* Checks, if another logical graph has the same attached data and contains
* vertices and edges with the same attached data as this graph.
*
* @param other other graph
* @return 1-element dataset containing true, iff equal by element data
*/
DataSet<Boolean> equalsByData(LogicalGraph other);

/**
* Generates all combinations of the supplied vertex grouping keys according to the definition of
* the rollUp operation in SQL and uses them together with all edge grouping keys for separate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,10 @@
import org.gradoop.flink.model.impl.functions.epgm.PropertyGetter;
import org.gradoop.flink.model.impl.operators.cypher.capf.query.CAPFQuery;
import org.gradoop.flink.model.impl.operators.cypher.capf.result.CAPFQueryResult;
import org.gradoop.flink.model.impl.operators.equality.GraphEquality;
import org.gradoop.flink.model.impl.operators.rollup.EdgeRollUp;
import org.gradoop.flink.model.impl.operators.rollup.VertexRollUp;
import org.gradoop.flink.model.impl.operators.sampling.SamplingAlgorithm;
import org.gradoop.flink.model.impl.operators.split.Split;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.EdgeToIdString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.GraphHeadToEmptyString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToDataString;
import org.gradoop.flink.model.impl.operators.tostring.functions.VertexToIdString;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.io.IOException;
Expand Down Expand Up @@ -196,34 +189,6 @@ public GraphCollection groupEdgesByRollUp(
edgeGroupingKeys, edgeAggregateFunctions));
}

//----------------------------------------------------------------------------
// Binary Operators
//----------------------------------------------------------------------------

@Override
public DataSet<Boolean> equalsByElementIds(LogicalGraph other) {
return new GraphEquality(
new GraphHeadToEmptyString(),
new VertexToIdString(),
new EdgeToIdString(), true).execute(this, other);
}

@Override
public DataSet<Boolean> equalsByElementData(LogicalGraph other) {
return new GraphEquality(
new GraphHeadToEmptyString(),
new VertexToDataString(),
new EdgeToDataString(), true).execute(this, other);
}

@Override
public DataSet<Boolean> equalsByData(LogicalGraph other) {
return new GraphEquality(
new GraphHeadToDataString(),
new VertexToDataString(),
new EdgeToDataString(), true).execute(this, other);
}

//----------------------------------------------------------------------------
// Auxiliary Operators
//----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright © 2014 - 2019 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.functions.epgm;

import org.gradoop.common.model.api.entities.Element;
import org.gradoop.flink.model.api.functions.TransformationFunction;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* A serializable function that is applied on an element (i.e. graph head, vertex and edge)
* to remove some of its properties.
*
* @param <T> the {@link Element} which is target of change.
*/
public class RemoveProperties<T extends Element> implements TransformationFunction<T> {

/**
* Keys of properties to be deleted.
*/
private final String[] propertyKeys;

/**
* Constructs {@link RemoveProperties} instance.
*
* @param propertyKeys keys of properties to be removed
*/
public RemoveProperties(String... propertyKeys) {
checkNotNull(propertyKeys);
for (String propertyKey : propertyKeys) {
checkNotNull(propertyKey);
}
this.propertyKeys = propertyKeys;
}

@Override
public T apply(T current, T transformed) {
for (String propertyKey : propertyKeys) {
current.removeProperty(propertyKey);
}
return current;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,40 @@
package org.gradoop.flink.model.impl.operators.equality;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.api.operators.BinaryBaseGraphToValueOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.operators.tostring.api.EdgeToString;
import org.gradoop.flink.model.impl.operators.tostring.api.GraphHeadToString;
import org.gradoop.flink.model.impl.operators.tostring.api.VertexToString;

/**
* Operator to determine if two graph are equal according to given string
* representations of graph heads, vertices and edges.
*
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The type of the graph.
* @param <GC> The type of the graph collection.
*/
public class GraphEquality
implements BinaryBaseGraphToValueOperator<LogicalGraph, DataSet<Boolean>> {
public class GraphEquality<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements BinaryBaseGraphToValueOperator<LG, DataSet<Boolean>> {

/**
* collection equality operator, wrapped by graph equality
* (graph are considered to be a 1-graph collection)
*/
private final CollectionEquality collectionEquality;
private final CollectionEquality<G, V, E, LG, GC> collectionEquality;
/**
* sets mode for directed or undirected graphs
*/
Expand All @@ -51,19 +62,18 @@ public class GraphEquality
* @param edgeToString string representation of edges
* @param directed sets mode for directed or undirected graphs
*/
public GraphEquality(GraphHeadToString<EPGMGraphHead> graphHeadToString,
VertexToString<EPGMVertex> vertexToString, EdgeToString<EPGMEdge> edgeToString,
public GraphEquality(GraphHeadToString<G> graphHeadToString,
VertexToString<V> vertexToString, EdgeToString<E> edgeToString,
boolean directed) {
this.directed = directed;

this.collectionEquality =
new CollectionEquality(graphHeadToString, vertexToString, edgeToString, this.directed);
new CollectionEquality<>(graphHeadToString, vertexToString, edgeToString, this.directed);
}

@Override
public DataSet<Boolean> execute(LogicalGraph firstGraph, LogicalGraph secondGraph) {
BaseGraphCollectionFactory<EPGMGraphHead, EPGMVertex, EPGMEdge, LogicalGraph, GraphCollection>
collectionFactory = firstGraph.getCollectionFactory();
public DataSet<Boolean> execute(LG firstGraph, LG secondGraph) {
BaseGraphCollectionFactory<G, V, E, LG, GC> collectionFactory = firstGraph.getCollectionFactory();
return collectionEquality
.execute(collectionFactory.fromGraph(firstGraph), collectionFactory.fromGraph(secondGraph));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void flatMap(E edge, Collector<EdgeString> collector)

GradoopId sourceId = edge.getSourceId();
GradoopId targetId = edge.getTargetId();
String edgeLabel = "[" + label(edge) + "]";
String edgeLabel = "[" + labelWithProperties(edge) + "]";

for (GradoopId graphId : edge.getGraphIds()) {
collector.collect(new EdgeString(graphId, sourceId, targetId, edgeLabel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class ElementToDataString<EL extends Element> {
* @param el graph head, vertex or edge
* @return string representation
*/
protected String label(EL el) {
protected String labelWithProperties(EL el) {

String label = el.getLabel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public class GraphHeadToDataString<G extends GraphHead> extends ElementToDataStr

@Override
public GraphHeadString map(G graph) throws Exception {
return new GraphHeadString(graph.getId(), "|" + label(graph) + "|");
return new GraphHeadString(graph.getId(), "|" + labelWithProperties(graph) + "|");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class VertexToDataString<V extends Vertex>
@Override
public void flatMap(V vertex, Collector<VertexString> collector) throws Exception {
GradoopId vertexId = vertex.getId();
String vertexLabel = "(" + label(vertex) + ")";
String vertexLabel = "(" + labelWithProperties(vertex) + ")";

for (GradoopId graphId : vertex.getGraphIds()) {
collector.collect(new VertexString(graphId, vertexId, vertexLabel));
Expand Down
Loading

0 comments on commit 16115b3

Please sign in to comment.