Skip to content

Commit

Permalink
#5 #10 #11 updated implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
Thorsten Marx committed Sep 15, 2023
1 parent ae51475 commit a09b682
Show file tree
Hide file tree
Showing 13 changed files with 476 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@ public ElasticsearchIndexAdapter(final ElasticsearchIndexConfiguration configura

@Override
public void indexDocument(String database, String collection, Document document) throws IOException {
InsertMessage message = new InsertMessage(document.getObjectId("_id").toString(), database, collection, document);
index(message);
}

@Override
public void clearCollection(String database, String collection) throws IOException {
DropCollectionMessage message = new DropCollectionMessage(database, collection);
dropCollection(message);
}

public void commit() {
Expand Down Expand Up @@ -105,7 +110,7 @@ public void update() {
} else if (message instanceof DeleteMessage deleteCommand) {
delete(deleteCommand);
} else if (message instanceof UpdateMessage updateCommand) {
ElasticsearchIndexAdapter.this.update(updateCommand);
updateDocument(updateCommand);
} else if (message instanceof DropCollectionMessage dropCollectionCommand) {
dropCollection(dropCollectionCommand);
}
Expand All @@ -118,20 +123,30 @@ public void update() {
queueWorkerThread.start();
}

private Map<String, Object> createDocument(final String database, final String collection, final Document document) {
Map<String, Object> indexDocument = new HashMap<>();
indexDocument.put("_database", database);
indexDocument.put("_collection", collection);
if (configuration.hasFieldConfigurations(collection)) {
var fieldConfigs = configuration.getFieldConfigurations(collection);
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), document);
indexDocument.put(fc.getIndexFieldName(), value);
});
}

if (configuration.getDocumentExtender() != null) {
configuration.getDocumentExtender().accept(document, indexDocument);
}

return indexDocument;
}

private void index(InsertMessage command) {
try {

Map<String, Object> document = new HashMap<>();
document.put("_database", command.database());
document.put("_collection", command.collection());
if (configuration.hasFieldConfigurations(command.collection())) {
var fieldConfigs = configuration.getFieldConfigurations(command.collection());
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), command.document());
document.put(fc.getIndexFieldName(), value);
});
}

Map<String, Object> document = createDocument(command.database(), command.collection(), command.document());

IndexResponse response = esClient.index(i -> i
.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.id(command.uid())
Expand All @@ -141,19 +156,10 @@ private void index(InsertMessage command) {
}
}

private void update(UpdateMessage command) {
private void updateDocument(UpdateMessage command) {
try {

Map<String, Object> document = new HashMap<>();
document.put("_database", command.database());
document.put("_collection", command.collection());
if (configuration.hasFieldConfigurations(command.collection())) {
var fieldConfigs = configuration.getFieldConfigurations(command.collection());
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), command.document());
document.put(fc.getIndexFieldName(), value);
});
}
Map<String, Object> document = createDocument(command.database(), command.collection(), command.document());

esClient.update(
u -> u.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
Expand All @@ -178,23 +184,23 @@ private void delete(DeleteMessage command) {

/**
* In elasticsearch the index is not dropped, we just delete all documents
*
*
* @param command
*/
private void dropCollection(DropCollectionMessage command) {
try {

boolean exists = esClient.indices().exists(fn -> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))).value();

if (exists) {
//esClient.indices().delete(fn -> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection())));
esClient.deleteByQuery(fn ->
fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.query(qb ->
qb.match(t ->
t.field("_collection").query(command.collection())
)
)
esClient.deleteByQuery(fn
-> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.query(qb
-> qb.match(t
-> t.field("_collection").query(command.collection())
)
)
);
}
} catch (IOException | ElasticsearchException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,13 @@

import com.github.thmarx.mongo.search.index.configuration.FieldConfiguration;
import com.github.thmarx.mongo.search.index.configuration.IndexConfiguration;
import com.github.thmarx.mongo.search.index.utils.MultiMap;
import java.util.Collection;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
import org.bson.Document;

/**
*
* @author t.marx
*/
public class ElasticsearchIndexConfiguration extends IndexConfiguration {
final MultiMap<String, FieldConfiguration> fieldConfigurations = new MultiMap<>();
public class ElasticsearchIndexConfiguration extends IndexConfiguration<Document, Map<String, Object>, FieldConfiguration> {

@Getter
@Setter
private BiFunction<String, String, String> indexNameMapper = (database, collection) -> collection;

public ElasticsearchIndexConfiguration addFieldConfiguration (final String collection, final FieldConfiguration fieldConfig) {
fieldConfigurations.put(collection, fieldConfig);
return this;
}

public Collection<FieldConfiguration> getFieldConfigurations (final String collection) {
return fieldConfigurations.get(collection);
}
public boolean hasFieldConfigurations (final String collection) {
return fieldConfigurations.containsKey(collection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

@SuperBuilder
@Getter
public class LuceneFieldConfiguration extends FieldConfiguration<Document, org.apache.lucene.document.Document> {
public class LuceneFieldConfiguration extends FieldConfiguration {

/**
* Field value is stored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*
* @author t.marx
*/
public class LuceneIndexConfiguration extends IndexConfiguration<org.bson.Document, Document> {
public class LuceneIndexConfiguration extends IndexConfiguration<org.bson.Document, Document, LuceneFieldConfiguration> {
Storage storage;

Analyzer analyzer;
Expand All @@ -47,24 +47,12 @@ public class LuceneIndexConfiguration extends IndexConfiguration<org.bson.Docume
@Setter
FacetsConfig facetsConfig;

final MultiMap<String, LuceneFieldConfiguration> fieldConfigurations = new MultiMap<>();


@Getter
@Setter
DateTimeFormatter defaultDateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;

public LuceneIndexConfiguration addFieldConfiguration (final String collection, final LuceneFieldConfiguration fieldConfig) {
fieldConfigurations.put(collection, fieldConfig);
return this;
}

public Collection<LuceneFieldConfiguration> getFieldConfigurations (final String collection) {
return fieldConfigurations.get(collection);
}
public boolean hasFieldConfigurations (final String collection) {
return fieldConfigurations.containsKey(collection);
}

public LuceneIndexConfiguration setStorage (final Storage storage) {
this.storage = storage;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ public OpensearchIndexAdapter(final OpensearchIndexConfiguration configuration)

@Override
public void indexDocument(String database, String collection, Document document) throws IOException {

InsertMessage message = new InsertMessage(document.getObjectId("_id").toString(), database, collection, document);
index(message);
}

@Override
public void clearCollection(String database, String collection) throws IOException {

DropCollectionMessage message = new DropCollectionMessage(database, collection);
dropCollection(message);
}

public void commit() {
Expand Down Expand Up @@ -105,7 +107,7 @@ public void update() {
} else if (command instanceof DeleteMessage deleteCommand) {
delete(deleteCommand);
} else if (command instanceof UpdateMessage updateCommand) {
OpensearchIndexAdapter.this.update(updateCommand);
updateDocument(updateCommand);
} else if (command instanceof DropCollectionMessage dropCollectionCommand) {
dropCollection(dropCollectionCommand);
}
Expand All @@ -118,19 +120,29 @@ public void update() {
queueWorkerThread.start();
}

private Map<String, Object> createDocument(final String database, final String collection, final Document document) {
Map<String, Object> indexDocument = new HashMap<>();
indexDocument.put("_database", database);
indexDocument.put("_collection", collection);
if (configuration.hasFieldConfigurations(collection)) {
var fieldConfigs = configuration.getFieldConfigurations(collection);
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), document);
indexDocument.put(fc.getIndexFieldName(), value);
});
}

if (configuration.getDocumentExtender() != null) {
configuration.getDocumentExtender().accept(document, indexDocument);
}

return indexDocument;
}

private void index(InsertMessage command) {
try {

Map<String, Object> document = new HashMap<>();
document.put("_database", command.database());
document.put("_collection", command.collection());
if (configuration.hasFieldConfigurations(command.collection())) {
var fieldConfigs = configuration.getFieldConfigurations(command.collection());
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), command.document());
document.put(fc.getIndexFieldName(), value);
});
}
Map<String, Object> document = createDocument(command.database(), command.collection(), command.document());

IndexResponse response = osClient.index(i -> i
.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
Expand All @@ -141,26 +153,16 @@ private void index(InsertMessage command) {
Logger.getLogger(OpensearchIndexAdapter.class.getName()).log(Level.SEVERE, null, ex);
}
}

private void update(UpdateMessage command) {
try {

Map<String, Object> document = new HashMap<>();
document.put("_database", command.database());
document.put("_collection", command.collection());
if (configuration.hasFieldConfigurations(command.collection())) {
var fieldConfigs = configuration.getFieldConfigurations(command.collection());
fieldConfigs.forEach((fc) -> {
var value = fc.getMapper().getFieldValue(fc.getFieldName(), command.document());
document.put(fc.getIndexFieldName(), value);
});
}
private void updateDocument(UpdateMessage command) {
try {
Map<String, Object> document = createDocument(command.database(), command.collection(), command.document());

osClient.update(u ->
u.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.id(command.uid())
.doc(document)
, Map.class
osClient.update(u
-> u.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.id(command.uid())
.doc(document),
Map.class
);
} catch (IOException ex) {
Logger.getLogger(OpensearchIndexAdapter.class.getName()).log(Level.SEVERE, null, ex);
Expand All @@ -178,25 +180,26 @@ private void delete(DeleteMessage command) {
Logger.getLogger(OpensearchIndexAdapter.class.getName()).log(Level.SEVERE, null, ex);
}
}

/**
* For OpenSearch the collection will not be dropped!
*
* @param command
*/
private void dropCollection(DropCollectionMessage command) {
try {

boolean exists = osClient.indices().exists(fn -> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))).value();

if (exists) {
//osClient.indices().delete(fn -> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection())));
osClient.deleteByQuery(fn ->
fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.query(qb ->
qb.match(t ->
t.field("_collection").query(FieldValue.of(command.collection()))
)
)
osClient.deleteByQuery(fn
-> fn.index(configuration.getIndexNameMapper().apply(command.database(), command.collection()))
.query(qb
-> qb.match(t
-> t.field("_collection").query(FieldValue.of(command.collection()))
)
)
);
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,13 @@

import com.github.thmarx.mongo.search.index.configuration.FieldConfiguration;
import com.github.thmarx.mongo.search.index.configuration.IndexConfiguration;
import com.github.thmarx.mongo.search.index.utils.MultiMap;
import java.util.Collection;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
import org.bson.Document;

/**
*
* @author t.marx
*/
public class OpensearchIndexConfiguration extends IndexConfiguration {
final MultiMap<String, FieldConfiguration> fieldConfigurations = new MultiMap<>();
public class OpensearchIndexConfiguration extends IndexConfiguration<Document, Map<String, Object>, FieldConfiguration> {

@Getter
@Setter
private BiFunction<String, String, String> indexNameMapper = (database, collection) -> collection;

public OpensearchIndexConfiguration addFieldConfiguration (final String collection, final FieldConfiguration fieldConfig) {
fieldConfigurations.put(collection, fieldConfig);
return this;
}

public Collection<FieldConfiguration> getFieldConfigurations (final String collection) {
return fieldConfigurations.get(collection);
}
public boolean hasFieldConfigurations (final String collection) {
return fieldConfigurations.containsKey(collection);
}
}
2 changes: 1 addition & 1 deletion adapters/solr/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<artifactId>mongo-search-adapters</artifactId>
<version>0.1</version>
</parent>
<artifactId>monog-search-adapters-solr</artifactId>
<artifactId>mongo-search-adapters-solr</artifactId>
<packaging>jar</packaging>
<properties>
<solr.version>9.3.0</solr.version>
Expand Down
Loading

0 comments on commit a09b682

Please sign in to comment.