Skip to content

Commit

Permalink
[Streaming Indexing] Enhance RestAction with request / response strea…
Browse files Browse the repository at this point in the history
…ming support (opensearch-project#13772) (opensearch-project#14149)

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit c71060e)
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
reta authored and kkewwei committed Jul 24, 2024
1 parent c550d74 commit 11cc4fa
Show file tree
Hide file tree
Showing 29 changed files with 1,513 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772))
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13995](https://github.com/opensearch-project/OpenSearch/pull/13995))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2227,11 +2227,11 @@ protected final <Resp> Resp parseEntity(final HttpEntity entity, final CheckedFu
if (entity.getContentType() == null) {
throw new IllegalStateException("OpenSearch didn't return the [Content-Type] header, unable to parse response body");
}
MediaType medaiType = MediaType.fromMediaType(entity.getContentType().getValue());
if (medaiType == null) {
MediaType mediaType = MediaType.fromMediaType(entity.getContentType().getValue());
if (mediaType == null) {
throw new IllegalStateException("Unsupported Content-Type: " + entity.getContentType().getValue());
}
try (XContentParser parser = medaiType.xContent().createParser(registry, DEPRECATION_HANDLER, entity.getContent())) {
try (XContentParser parser = mediaType.xContent().createParser(registry, DEPRECATION_HANDLER, entity.getContent())) {
return entityParser.apply(parser);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.rest.action.admin.indices.RestRefreshAction;
import org.opensearch.rest.action.admin.indices.RestUpdateSettingsAction;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.rest.action.document.RestBulkStreamingAction;
import org.opensearch.rest.action.document.RestDeleteAction;
import org.opensearch.rest.action.document.RestGetAction;
import org.opensearch.rest.action.document.RestIndexAction;
Expand Down Expand Up @@ -127,6 +128,7 @@ public List<RestHandler> getRestHandlers(
new OpenSearchDashboardsWrappedRestHandler(new RestMultiGetAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction()),
new OpenSearchDashboardsWrappedRestHandler(new RestBulkAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestBulkStreamingAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestDeleteAction()),
new OpenSearchDashboardsWrappedRestHandler(new RestDeleteByQueryAction()),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.buffer.ByteBuf;

class ReactorNetty4HttpChunk implements HttpChunk {
private final AtomicBoolean released;
private final boolean pooled;
private final ByteBuf content;
private final boolean last;

ReactorNetty4HttpChunk(ByteBuf content, boolean last) {
this.content = content;
this.pooled = true;
this.released = new AtomicBoolean(false);
this.last = last;
}

@Override
public BytesReference content() {
assert released.get() == false;
return Netty4Utils.toBytesReference(content);
}

@Override
public void close() {
if (pooled && released.compareAndSet(false, true)) {
content.release();
}
}

@Override
public boolean isLast() {
return last;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class ReactorNetty4HttpRequest implements HttpRequest {
private final Exception inboundException;
private final boolean pooled;

ReactorNetty4HttpRequest(HttpServerRequest request) {
this(request, new HttpHeadersMap(request.requestHeaders()), new AtomicBoolean(false), false, Unpooled.EMPTY_BUFFER);
}

ReactorNetty4HttpRequest(HttpServerRequest request, ByteBuf content) {
this(request, new HttpHeadersMap(request.requestHeaders()), new AtomicBoolean(false), true, content);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest.Method;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
Expand All @@ -40,6 +42,7 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -351,24 +354,45 @@ public List<String> protocols() {
* @return response publisher
*/
protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerResponse response) {
final NonStreamingRequestConsumer<HttpContent> consumer = new NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
final Method method = HttpConversionUtil.convertMethod(request.method());
final Optional<RestHandler> dispatchHandlerOpt = dispatcher.dispatchHandler(
request.uri(),
request.fullPath(),
method,
request.params()
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
request,
response
);

request.receiveContent()
.switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT))
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));

incomingStream(new ReactorNetty4HttpRequest(request), consumer.httpChannel());
return response.sendObject(consumer);
} else {
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
this,
request,
response,
maxCompositeBufferComponents
);

request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer);

return Mono.from(consumer).flatMap(hc -> {
final FullHttpResponse r = (FullHttpResponse) hc;
response.status(r.status());
response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue())));
response.chunkedTransfer(false);
response.compression(true);
r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue()));
return Mono.from(response.sendObject(r.content()));
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingHttpChannel implements HttpChannel {
class ReactorNetty4NonStreamingHttpChannel implements HttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final FluxSink<HttpContent> emitter;

NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink<HttpContent> emitter) {
this.request = request;
this.response = response;
this.emitter = emitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
class ReactorNetty4NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent>, Disposable {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompositeByteBuf content;
Expand All @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer<T extends HttpContent> implements Consumer<T>,
private final AtomicBoolean disposed = new AtomicBoolean(false);
private volatile FluxSink<HttpContent> emitter;

NonStreamingRequestConsumer(
ReactorNetty4NonStreamingRequestConsumer(
AbstractHttpServerTransport transport,
HttpServerRequest request,
HttpServerResponse response,
Expand Down Expand Up @@ -64,12 +64,12 @@ public void accept(T message) {
}
}

public void process(HttpContent in, FluxSink<HttpContent> emitter) {
void process(HttpContent in, FluxSink<HttpContent> emitter) {
// Consume request body in full before dispatching it
content.addComponent(true, in.content().retain());

if (in instanceof LastHttpContent) {
final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter);
final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter);
final HttpRequest r = createRequest(request, content);

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import org.opensearch.common.concurrent.CompletableContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpResponse;
import org.opensearch.http.StreamingHttpChannel;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel {
private final HttpServerRequest request;
private final HttpServerResponse response;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private final Publisher<HttpChunk> receiver;
private final StreamingHttpContentSender sender;
private volatile FluxSink<HttpChunk> producer;
private volatile boolean lastChunkReceived = false;

ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, StreamingHttpContentSender sender) {
this.request = request;
this.response = response;
this.sender = sender;
this.receiver = Flux.create(producer -> this.producer = producer);
this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext));
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() {
request.withConnection(connection -> connection.channel().close());
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
public void sendChunk(HttpChunk chunk, ActionListener<Void> listener) {
sender.send(createContent(chunk), listener, chunk.isLast());
}

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
sender.send(createContent(response), listener, true);
}

@Override
public void prepareResponse(int status, Map<String, List<String>> headers) {
this.response.status(status);
headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v)));
}

@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) response.remoteAddress();
}

@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) response.hostAddress();
}

@Override
public void receiveChunk(HttpChunk message) {
try {
if (lastChunkReceived) {
return;
}

producer.next(message);
if (message.isLast()) {
lastChunkReceived = true;
producer.complete();
}
} finally {
message.close();
}
}

@Override
public boolean isReadable() {
return producer != null;
}

@Override
public boolean isWritable() {
return sender.isReady();
}

@Override
public void subscribe(Subscriber<? super HttpChunk> subscriber) {
receiver.subscribe(subscriber);
}

private static HttpContent createContent(HttpResponse response) {
final FullHttpResponse fullHttpResponse = (FullHttpResponse) response;
return new DefaultHttpContent(fullHttpResponse.content());
}

private static HttpContent createContent(HttpChunk chunk) {
return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toByteBuffers(chunk.content())));
}
}
Loading

0 comments on commit 11cc4fa

Please sign in to comment.