diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java index c5be0066fc..3b8fc802d1 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java @@ -5,7 +5,17 @@ package org.opensearch.sql.legacy.cursor; +import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -16,8 +26,19 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.Setter; +import lombok.SneakyThrows; import org.json.JSONArray; import org.json.JSONObject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.search.SearchModule; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.format.Schema; /** @@ -40,6 +61,10 @@ public class DefaultCursor implements Cursor { private static final String SCROLL_ID = "s"; private static final String SCHEMA_COLUMNS = "c"; private static final String FIELD_ALIAS_MAP = "a"; + private static final String PIT_ID = "p"; + private static final String SEARCH_REQUEST = "r"; + private static final String SORT_FIELDS = "h"; + private static final ObjectMapper objectMapper = new ObjectMapper(); /** * To get mappings for index to check if type is date needed for @@ -70,11 +95,28 @@ public class DefaultCursor implements Cursor { /** To get next batch of result */ private String scrollId; + /** To get Point In Time */ + private String pitId; + + /** To get next batch of result with search after api */ + public SearchSourceBuilder searchSourceBuilder; + + /** To get last sort values * */ + private Object[] sortFields; + /** To reduce the number of rows left by fetchSize */ @NonNull private Integer fetchSize; private Integer limit; + /** + * {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder} + * from DSL query string. + */ + private static final NamedXContentRegistry xContentRegistry = + new NamedXContentRegistry( + new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents()); + @Override public CursorType getType() { return type; @@ -82,30 +124,76 @@ public CursorType getType() { @Override public String generateCursorId() { - if (rowsLeft <= 0 || Strings.isNullOrEmpty(scrollId)) { + boolean isCursorValid = + LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER) + ? Strings.isNullOrEmpty(pitId) + : Strings.isNullOrEmpty(scrollId); + if (rowsLeft <= 0 || isCursorValid) { return null; } JSONObject json = new JSONObject(); json.put(FETCH_SIZE, fetchSize); json.put(ROWS_LEFT, rowsLeft); json.put(INDEX_PATTERN, indexPattern); - json.put(SCROLL_ID, scrollId); json.put(SCHEMA_COLUMNS, getSchemaAsJson()); json.put(FIELD_ALIAS_MAP, fieldAliasMap); - return String.format("%s:%s", type.getId(), encodeCursor(json)); + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + json.put(PIT_ID, pitId); + String sortFieldValue = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + return objectMapper.writeValueAsString(sortFields); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + json.put(SORT_FIELDS, sortFieldValue); + } else { + json.put(SCROLL_ID, scrollId); + } + return String.format("%s:%s", type.getId(), encodeCursor(json, searchSourceBuilder)); } + @SneakyThrows public static DefaultCursor from(String cursorId) { /** * It is assumed that cursorId here is the second part of the original cursor passed by the * client after removing first part which identifies cursor type */ - JSONObject json = decodeCursor(cursorId); + String[] parts = cursorId.split(":::"); + JSONObject json = decodeCursor(parts[0]); DefaultCursor cursor = new DefaultCursor(); cursor.setFetchSize(json.getInt(FETCH_SIZE)); cursor.setRowsLeft(json.getLong(ROWS_LEFT)); cursor.setIndexPattern(json.getString(INDEX_PATTERN)); - cursor.setScrollId(json.getString(SCROLL_ID)); + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + cursor.setPitId(json.getString(PIT_ID)); + + Object[] sortFieldValue = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + return objectMapper.readValue(json.getString(SORT_FIELDS), Object[].class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + cursor.setSortFields(sortFieldValue); + + byte[] bytes = Base64.getDecoder().decode(parts[1]); + ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes); + XContentParser parser = + XContentType.JSON + .xContent() + .createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser); + cursor.searchSourceBuilder = sourceBuilder; + } else { + cursor.setScrollId(json.getString(SCROLL_ID)); + } cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS))); cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP))); @@ -132,8 +220,18 @@ private JSONObject schemaEntry(String name, String alias, String type) { return entry; } - private static String encodeCursor(JSONObject cursorJson) { - return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes()); + @SneakyThrows + private static String encodeCursor(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) { + String jsonBase64 = Base64.getEncoder().encodeToString(cursorJson.toString().getBytes()); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + XContentBuilder builder = XContentFactory.jsonBuilder(outputStream); + sourceBuilder.toXContent(builder, null); + builder.close(); + + String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray()); + + return jsonBase64 + ":::" + searchRequestBase64; } private static JSONObject decodeCursor(String cursorId) { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java index 7282eaed4c..cefc62e308 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java @@ -6,6 +6,7 @@ package org.opensearch.sql.legacy.executor.cursor; import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -18,8 +19,11 @@ import org.opensearch.rest.RestChannel; import org.opensearch.sql.legacy.cursor.CursorType; import org.opensearch.sql.legacy.cursor.DefaultCursor; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException; public class CursorCloseExecutor implements CursorRestExecutor { @@ -79,14 +83,25 @@ public String execute(Client client, Map params) throws Exceptio } private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) { - String scrollId = cursor.getScrollId(); - ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(scrollId).get(); - if (clearScrollResponse.isSucceeded()) { - return SUCCEEDED_TRUE; + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + String pitId = cursor.getPitId(); + PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); + if (pit.delete()) { + return SUCCEEDED_TRUE; + } else { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + return SUCCEEDED_FALSE; + } } else { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - return SUCCEEDED_FALSE; + String scrollId = cursor.getScrollId(); + ClearScrollResponse clearScrollResponse = + client.prepareClearScroll().addScrollId(scrollId).get(); + if (clearScrollResponse.isSucceeded()) { + return SUCCEEDED_TRUE; + } else { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + return SUCCEEDED_FALSE; + } } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java index 66c69f3430..a8f053d50b 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java @@ -6,6 +6,8 @@ package org.opensearch.sql.legacy.executor.cursor; import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.Arrays; import java.util.Map; @@ -14,6 +16,7 @@ import org.json.JSONException; import org.opensearch.OpenSearchException; import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.unit.TimeValue; @@ -21,7 +24,8 @@ import org.opensearch.rest.RestChannel; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.sql.common.setting.Settings; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.legacy.cursor.CursorType; import org.opensearch.sql.legacy.cursor.DefaultCursor; import org.opensearch.sql.legacy.esdomain.LocalClusterState; @@ -29,6 +33,8 @@ import org.opensearch.sql.legacy.executor.format.Protocol; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException; public class CursorResultExecutor implements CursorRestExecutor { @@ -91,14 +97,27 @@ public String execute(Client client, Map params) throws Exceptio } private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) { - String previousScrollId = cursor.getScrollId(); LocalClusterState clusterState = LocalClusterState.state(); - TimeValue scrollTimeout = clusterState.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); - SearchResponse scrollResponse = - client.prepareSearchScroll(previousScrollId).setScroll(scrollTimeout).get(); + TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE); + + SearchResponse scrollResponse = null; + if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + String pitId = cursor.getPitId(); + SearchSourceBuilder source = cursor.getSearchSourceBuilder(); + source.searchAfter(cursor.getSortFields()); + source.pointInTimeBuilder(new PointInTimeBuilder(pitId)); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(source); + scrollResponse = client.search(searchRequest).actionGet(); + } else { + String previousScrollId = cursor.getScrollId(); + scrollResponse = + client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get(); + } SearchHits searchHits = scrollResponse.getHits(); SearchHit[] searchHitArray = searchHits.getHits(); String newScrollId = scrollResponse.getScrollId(); + String newPitId = scrollResponse.pointInTimeId(); int rowsLeft = (int) cursor.getRowsLeft(); int fetch = cursor.getFetchSize(); @@ -124,16 +143,35 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) { if (rowsLeft <= 0) { /** Clear the scroll context on last page */ - ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(newScrollId).get(); - if (!clearScrollResponse.isSucceeded()) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.info("Error closing the cursor context {} ", newScrollId); + if (newScrollId != null) { + ClearScrollResponse clearScrollResponse = + client.prepareClearScroll().addScrollId(newScrollId).get(); + if (!clearScrollResponse.isSucceeded()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error closing the cursor context {} ", newScrollId); + } + } + if (newPitId != null) { + PointInTimeHandler pit = new PointInTimeHandlerImpl(client, newPitId); + if (!pit.delete()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error deleting point in time {} ", newPitId); + } } } cursor.setRowsLeft(rowsLeft); - cursor.setScrollId(newScrollId); + if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + cursor.setPitId(newPitId); + cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder()); + cursor.setSortFields( + scrollResponse + .getHits() + .getAt(scrollResponse.getHits().getHits().length - 1) + .getSortValues()); + } else { + cursor.setScrollId(newScrollId); + } Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor); return protocol.cursorFormat(); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 00feabf5d8..5f758e7d87 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -5,23 +5,31 @@ package org.opensearch.sql.legacy.executor.format; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.sql.legacy.cursor.Cursor; import org.opensearch.sql.legacy.cursor.DefaultCursor; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor; import org.opensearch.sql.legacy.executor.RestExecutor; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.DefaultQueryAction; import org.opensearch.sql.legacy.query.QueryAction; +import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; public class PrettyFormatRestExecutor implements RestExecutor { @@ -90,15 +98,32 @@ public String execute(Client client, Map params, QueryAction que private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction) throws SqlParseException { - SearchResponse response = (SearchResponse) queryAction.explain().get(); - String scrollId = response.getScrollId(); + PointInTimeHandler pit = null; + SearchResponse response; + SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain(); + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); + pit.create(); + SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); + searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + response = searchRequest.get(); + } else { + response = (SearchResponse) sqlOpenSearchRequestBuilder.get(); + } Protocol protocol; - if (!Strings.isNullOrEmpty(scrollId)) { + if (isDefaultCursor(response, queryAction)) { DefaultCursor defaultCursor = new DefaultCursor(); - defaultCursor.setScrollId(scrollId); defaultCursor.setLimit(queryAction.getSelect().getRowCount()); defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize()); + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + defaultCursor.setPitId(pit.getPitId()); + defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); + defaultCursor.setSortFields( + response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + } else { + defaultCursor.setScrollId(response.getScrollId()); + } protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); } else { protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); @@ -106,4 +131,16 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction return protocol; } + + private boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + if (searchResponse.getHits().getTotalHits().value < queryAction.getSqlRequest().fetchSize()) { + return false; + } else { + return true; + } + } else { + return !Strings.isNullOrEmpty(searchResponse.getScrollId()); + } + } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java index c60691cb7c..a941d99a60 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java @@ -40,6 +40,7 @@ import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation; import org.opensearch.search.aggregations.metrics.Percentile; import org.opensearch.search.aggregations.metrics.Percentiles; +import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.cursor.Cursor; import org.opensearch.sql.legacy.cursor.DefaultCursor; import org.opensearch.sql.legacy.domain.ColumnTypeProvider; @@ -49,11 +50,14 @@ import org.opensearch.sql.legacy.domain.Query; import org.opensearch.sql.legacy.domain.Select; import org.opensearch.sql.legacy.domain.TableOnJoinSelect; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.esdomain.mapping.FieldMapping; import org.opensearch.sql.legacy.exception.SqlFeatureNotImplementedException; import org.opensearch.sql.legacy.executor.Format; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.legacy.pit.PointInTimeHandler; +import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.utils.SQLFunctions; public class SelectResultSet extends ResultSet { @@ -564,12 +568,22 @@ private void populateDefaultCursor(DefaultCursor cursor) { long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit()); if (rowsLeft <= 0) { // close the cursor - String scrollId = cursor.getScrollId(); - ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(scrollId).get(); - if (!clearScrollResponse.isSucceeded()) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.error("Error closing the cursor context {} ", scrollId); + if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { + String pitId = cursor.getPitId(); + PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); + if (!pit.delete()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error deleting point in time {} ", pitId); + } + } else { + // close the cursor + String scrollId = cursor.getScrollId(); + ClearScrollResponse clearScrollResponse = + client.prepareClearScroll().addScrollId(scrollId).get(); + if (!clearScrollResponse.isSucceeded()) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.error("Error closing the cursor context {} ", scrollId); + } } return; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java index 66339cc70a..b094055d69 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandler.java @@ -7,11 +7,19 @@ /** Point In Time */ public interface PointInTimeHandler { - /** Create Point In Time */ - void create(); + /** + * Create Point In Time + * + * @return Point In Time creation status + */ + boolean create(); - /** Delete Point In Time */ - void delete(); + /** + * Delete Point In Time + * + * @return Point In Time deletion status + */ + boolean delete(); /** Get Point In Time Identifier */ String getPitId(); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java index 64535749e8..88371831c7 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/pit/PointInTimeHandlerImpl.java @@ -24,6 +24,8 @@ public class PointInTimeHandlerImpl implements PointInTimeHandler { private Client client; private String[] indices; @Getter @Setter private String pitId; + private Boolean deleteStatus = null; + private Boolean createStatus = null; private static final Logger LOG = LogManager.getLogger(); /** @@ -37,9 +39,24 @@ public PointInTimeHandlerImpl(Client client, String[] indices) { this.indices = indices; } - /** Create PIT for given indices */ + /** + * Constructor for class + * + * @param client OpenSearch client + * @param pitId Point In Time ID + */ + public PointInTimeHandlerImpl(Client client, String pitId) { + this.client = client; + this.pitId = pitId; + } + + /** + * Create PIT for given indices + * + * @return Point In Time creation status + */ @Override - public void create() { + public boolean create() { CreatePitRequest createPitRequest = new CreatePitRequest( LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices); @@ -49,25 +66,40 @@ public void create() { @Override public void onResponse(CreatePitResponse createPitResponse) { pitId = createPitResponse.getId(); + createStatus = true; LOG.info("Created Point In Time {} successfully.", pitId); } @Override public void onFailure(Exception e) { + createStatus = false; LOG.error("Error occurred while creating PIT", e); } }); + while (createStatus == null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.error("Error occurred while creating PIT", e); + } + } + return createStatus; } - /** Delete PIT */ + /** + * Delete PIT + * + * @return delete status + */ @Override - public void delete() { + public boolean delete() { DeletePitRequest deletePitRequest = new DeletePitRequest(pitId); client.deletePits( deletePitRequest, new ActionListener<>() { @Override public void onResponse(DeletePitResponse deletePitResponse) { + deleteStatus = true; LOG.info( "Delete Point In Time {} status: {}", pitId, @@ -76,8 +108,18 @@ public void onResponse(DeletePitResponse deletePitResponse) { @Override public void onFailure(Exception e) { + deleteStatus = false; LOG.error("Error occurred while deleting PIT", e); } }); + while (deleteStatus == null) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.error("Error occurred while deleting PIT", e); + } + } + + return deleteStatus; } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java index 18c9708df8..9877b17a8f 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.query; +import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; + import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; @@ -100,7 +102,19 @@ public void checkAndSetScroll() { .getNumericalMetric(MetricName.DEFAULT_CURSOR_REQUEST_COUNT_TOTAL) .increment(); Metrics.getInstance().getNumericalMetric(MetricName.DEFAULT_CURSOR_REQUEST_TOTAL).increment(); - request.setSize(fetchSize).setScroll(timeValue); + request.setSize(fetchSize); + // Set scroll or search after for pagination + if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { + // search after requires results to be in specific order + // set sort field for search_after + boolean ordered = select.isOrderdSelect(); + if (!ordered) { + request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); + } + // Request also requires PointInTime, but we should create pit while execution. + } else { + request.setScroll(timeValue); + } } else { request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); setLimit(select.getOffset(), rowCount != null ? rowCount : Select.DEFAULT_LIMIT);