Skip to content

Commit

Permalink
Indexing judgments as a collection to have a single judgments ID.
Browse files Browse the repository at this point in the history
Signed-off-by: jzonthemtn <[email protected]>
  • Loading branch information
jzonthemtn committed Dec 3, 2024
1 parent c47127d commit 28f166d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/bash -e

echo "Deleting existing judgments index..."
curl -s -X DELETE http://localhost:9200/judgments

echo "Creating judgments..."
curl -s -X POST "http://localhost:9200/_plugins/search_quality_eval/judgments?click_model=coec&max_rank=20"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash -e

curl -s "http://localhost:9200/judgments/_search" | jq
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
final long startTime = System.currentTimeMillis();
final String clickModel = request.param("click_model", "coec");
final int maxRank = Integer.parseInt(request.param("max_rank", "20"));
final long judgments;
final long judgmentCount;

if (CoecClickModel.CLICK_MODEL_NAME.equalsIgnoreCase(clickModel)) {

Expand All @@ -210,7 +210,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

// TODO: Run this in a separate thread.
try {
judgments = coecClickModel.calculateJudgments();
judgmentCount = coecClickModel.calculateJudgments();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -222,15 +222,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
job.put("click_model", clickModel);
job.put("started", startTime);
job.put("duration", elapsedTime);
job.put("judgments", judgments);
job.put("judgment_count", judgmentCount);
job.put("invocation", "on_demand");
job.put("max_rank", maxRank);

final String judgmentsId = UUID.randomUUID().toString();
final String jobId = UUID.randomUUID().toString();

final IndexRequest indexRequest = new IndexRequest()
.index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(judgmentsId)
.id(jobId)
.source(job)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

Expand All @@ -239,19 +239,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
client.index(indexRequest, new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
LOGGER.debug("Judgments indexed: {}", judgmentsId);
LOGGER.debug("Job completed successfully: {}", jobId);
success.set(true);
}

@Override
public void onFailure(final Exception ex) {
LOGGER.error("Unable to index judgment with ID {}", judgmentsId, ex);
LOGGER.error("Unable to run job with ID {}", jobId, ex);
success.set(false);
}
});

if(success.get()) {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "{\"judgments_id\": \"" + judgmentsId + "\"}"));
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "{\"judgments_id\": \"" + jobId + "\"}"));
} else {
return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,"Unable to index judgments."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.eval.judgments.util.MathUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* A judgment of a search result's quality for a given query.
Expand Down Expand Up @@ -46,6 +48,18 @@ public String toJudgmentString() {
return queryId + ", " + query + ", " + document + ", " + MathUtils.round(judgment);
}

public Map<String, Object> getJudgmentAsMap() {

final Map<String, Object> judgmentMap = new HashMap<>();
judgmentMap.put("query_id", queryId);
judgmentMap.put("query", query);
judgmentMap.put("document", document);
judgmentMap.put("judgment", judgment);

return judgmentMap;

}

/**
* A convenience function to output the judgments.
* @param judgments A collection of {@link Judgment}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.eval.judgments.model.ClickthroughRate;
import org.opensearch.eval.judgments.model.Judgment;
import org.opensearch.eval.judgments.model.ubi.query.UbiQuery;
Expand Down Expand Up @@ -312,32 +314,41 @@ public void indexClickthroughRates(final Map<String, Set<ClickthroughRate>> clic
* Index the judgments.
* @param judgments A collection of {@link Judgment judgments}.
* @throws IOException Thrown when there is a problem accessing OpenSearch.
* @return The ID of the indexed judgments.
*/
public void indexJudgments(final Collection<Judgment> judgments) throws Exception {
public String indexJudgments(final Collection<Judgment> judgments) throws Exception {

if(!judgments.isEmpty()) {
final String judgmentsId = UUID.randomUUID().toString();

// TODO: Split this into multiple bulk insert requests.
final Collection<Map<String, Object>> j = new ArrayList<>();

final BulkRequest request = new BulkRequest();
for (final Judgment judgment : judgments) {
j.add(judgment.getJudgmentAsMap());
}

for (final Judgment judgment : judgments) {
final Map<String, Object> judgmentsSource = new HashMap<>();
judgmentsSource.put("judgments", j);

final Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("query_id", judgment.getQueryId());
jsonMap.put("query", judgment.getQuery());
jsonMap.put("document", judgment.getDocument());
jsonMap.put("judgment", judgment.getJudgment());
final IndexRequest indexRequest = new IndexRequest(INDEX_JUDGMENTS)
.id(judgmentsId)
.source(judgmentsSource);

final IndexRequest indexRequest = new IndexRequest(INDEX_JUDGMENTS).id(UUID.randomUUID().toString()).source(jsonMap);

request.add(indexRequest);
final BulkRequest request = new BulkRequest();
request.add(indexRequest);

client.bulk(request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
LOGGER.info("Judgments indexed: {}", judgmentsId);
}

client.bulk(request).get();
@Override
public void onFailure(Exception ex) {
throw new RuntimeException("Unable to insert judgments.", ex);
}
});

}
return judgmentsId;

}

Expand Down

0 comments on commit 28f166d

Please sign in to comment.