Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-362] Implement a Wake Transport using HTTP #1341

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
48ec577
[REEF-362] Implement a Wake Transport using HTTP
nhne Jul 4, 2017
56d0650
Added a test for RemoteManager with Http Transport
nhne Jul 25, 2017
4994107
separated NettyChannelFutureListener from NettyLink
nhne Aug 14, 2017
d3f7a46
Refactored Netty Codes for HTTP
nhne Aug 14, 2017
1ecda33
Removed HttpMessagingTransportFactory and Refactored Code
nhne Aug 17, 2017
9510d61
Removed package reef.wake.remote.transport.netty.http
nhne Aug 17, 2017
31e5efa
fixed errors regarding to review
nhne Aug 17, 2017
bc42c3d
fixed errors regarding to review
nhne Aug 17, 2017
5b3f1b5
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Aug 17, 2017
1943609
Fixed errors and typos
nhne Aug 21, 2017
dc36180
added a java doc and fixed format error
nhne Aug 22, 2017
abd316d
reverted some interface back
nhne Aug 22, 2017
7ce5b33
Fixed implementation to accept PROTOCOL_HTTPS
nhne Aug 22, 2017
14121e7
fixed TransportHttpTest
nhne Aug 23, 2017
476ebe1
Fixed String into ProtocolTypes
nhne Aug 23, 2017
2befc1a
Fixed reflexing reviews
nhne Aug 23, 2017
df5fea5
implemented NettyLinkFactory and its default class
nhne Aug 23, 2017
985ad1a
added NettyHttpLinkFactory
nhne Aug 24, 2017
8fd67f6
excluded HTTPS related codes
nhne Aug 24, 2017
37a9ada
removed ssl part
nhne Sep 1, 2017
40f335d
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 1, 2017
817fa0e
changed constant String into enum and altered NettyChannelInitializer
nhne Sep 1, 2017
3674ffa
Merge branch 'master' of https://github.com/apache/reef into REEF-362
nhne Sep 4, 2017
cdaadcc
Merge branch 'REEF-362' of https://github.com/nhne/reef into REEF-362
nhne Sep 4, 2017
4990972
changed TransportFactory to use protocol as constructor parameter
nhne Sep 6, 2017
9e9260a
refactored TransportTest and merged TransportHttpTest into TransportTest
nhne Sep 6, 2017
29344b6
merged RemoteMangerTestHttp into RemoteManagerTest
nhne Sep 14, 2017
017154e
changed implementation of ChannelInitializer, Http Event Listener
nhne Nov 23, 2017
f221dbb
fixed NettyChannelInitializer and Event Listener, NettyHttpLink
nhne Nov 24, 2017
455464a
changed implementation in Netty[HttpServer,Client]EventListener
nhne Dec 5, 2017
bd3b205
removed check on lenth of content
nhne Dec 6, 2017
2225679
copiedBuffer to wrappedBuffer
nhne Dec 7, 2017
b20df37
removed sync() in NettyHttpLink
nhne Jan 4, 2018
94fb7bc
removed `sync()` in NettyHttpLink
nhne Jan 4, 2018
046739d
check isLoggable
nhne Jan 5, 2018
645b898
Fixed bug on uri creation
nhne Feb 11, 2018
bd2030a
Added Exception e into throwing exception
nhne Feb 22, 2018
6541d3c
Add `final` on Argument statement
nhne Apr 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.Channel;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

import java.net.URI;

/**
* Factory that creates a NettyLink.
*/
public final class NettyDefaultLinkFactory<T> implements NettyLinkFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you create HttpLinkFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created NettyHttpLinkFactory. Thank you!


private final URI uri;

NettyDefaultLinkFactory() {
this(null);
}

NettyDefaultLinkFactory(final URI uri){
this.uri = uri;
}

@Override
public Link newInstance(final Channel channel, final Encoder encoder) {
return uri == null ?
new NettyLink<T>(channel, encoder) :
new NettyHttpLink<T>(channel, encoder, uri);
}

@Override
public Link newInstance(final Channel channel,
final Encoder encoder,
final LinkListener listener) {
return uri == null ?
new NettyLink<T>(channel, encoder, listener) :
new NettyHttpLink<T>(channel, encoder, listener, uri);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.*;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

import java.net.SocketAddress;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Link implementation with Netty.
*
* If you set a {@code LinkListener<T>}, it keeps message until writeAndFlush operation completes
* and notifies whether the sent message transferred successfully through the listener.
*/
public class NettyHttpLink<T> implements Link<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final


public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;

private static final Logger LOG = Logger.getLogger(NettyHttpLink.class.getName());

private final Channel channel;
private final Encoder<? super T> encoder;
private final LinkListener<? super T> listener;
private final URI uri;

/**
* Constructs a link.
*
* @param channel the channel
* @param encoder the encoder
* @param uri the URI
*/
public NettyHttpLink(final Channel channel,
final Encoder<? super T> encoder,
final URI uri) {
this(channel, encoder, null, uri);
}

/**
* Constructs a link.
*
* @param channel the channel
* @param encoder the encoder
* @param listener the link listener
* @param uri the URI
*/
public NettyHttpLink(
final Channel channel,
final Encoder<? super T> encoder,
final LinkListener<? super T> listener,
final URI uri) {
this.channel = channel;
this.encoder = encoder;
this.listener = listener;
this.uri = uri;
}
/**
* Writes the message to this link.
*
* @param message the message
*/
@Override
public void write(final T message) {
LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the following code?

if (LOG.isLoggable(Level.FINEST)) {

try {
final FullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath());
final ByteBuf buf = Unpooled.copiedBuffer(encoder.encode(message));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use wrappedBuffer? copiedBuffer performs additional byte copy.

request.headers()
.set(HttpHeaders.Names.HOST, uri.getHost())
.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
.set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
.set(HttpHeaders.Names.CONTENT_TYPE, "application/wake-transport")
.set(HttpHeaders.Names.CONTENT_LENGTH, buf.readableBytes());
request.content().clear().writeBytes(buf);
final ChannelFuture future = channel.writeAndFlush(request);
future.sync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sync here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to throw exception when the result of future is failure by sync(). but i found its order is weird because listener must be added before its sync!

if (listener != null) {
future.addListener(new NettyChannelFutureListener<>(message, listener));
}
} catch (final InterruptedException ex) {
LOG.log(Level.SEVERE, "Cannot send request to " + uri.getHost(), ex);
}
}

/**
* Gets a local address of the link.
*
* @return a local socket address
*/
@Override
public SocketAddress getLocalAddress() {
return channel.localAddress();
}

/**
* Gets a remote address of the link.
*
* @return a remote socket address
*/
@Override
public SocketAddress getRemoteAddress() {
return channel.remoteAddress();
}

@Override
public String toString() {
return "NettyLink: " + channel; // Channel has good .toString() implementation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a StringBuffer

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ final class NettyHttpServerEventListener extends AbstractNettyEventListener {
private HttpRequest httpRequest;
private final StringBuilder buf = new StringBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Do we have to define it as a member variable?

private final URI uri;
private final NettyLinkFactory linkFactory;

NettyHttpServerEventListener(
final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap,
final EStage<TransportEvent> stage,
final URI uri) {
super(addrToLinkRefMap, stage);
this.uri = uri;
this.linkFactory = new NettyDefaultLinkFactory<>(uri);
}


Expand All @@ -61,8 +63,8 @@ public void channelActive(final ChannelHandlerContext ctx) {
}

this.addrToLinkRefMap.putIfAbsent(
channel.remoteAddress(), new LinkReference(new NettyLink<>(
channel, new ByteCodec(), new LoggingLinkListener<byte[]>(), uri)));
channel.remoteAddress(), new LinkReference(linkFactory.newInstance(
channel, new ByteCodec(), new LoggingLinkListener<byte[]>())));

LOG.log(Level.FINER, "Add connected channel ref: {0}", this.addrToLinkRefMap.get(channel.remoteAddress()));

Expand Down Expand Up @@ -144,7 +146,7 @@ private static void appendDecoderResult(final StringBuilder buf, final HttpObjec

@Override
protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) {
return new TransportEvent(message, new NettyLink<>(channel, new ByteEncoder()));
return new TransportEvent(message, linkFactory.newInstance(channel, new ByteEncoder()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.*;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

import java.net.SocketAddress;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -47,7 +44,6 @@ public class NettyLink<T> implements Link<T> {
private final Channel channel;
private final Encoder<? super T> encoder;
private final LinkListener<? super T> listener;
private final URI uri;

/**
* Constructs a link.
Expand All @@ -67,27 +63,11 @@ public NettyLink(final Channel channel, final Encoder<? super T> encoder) {
* @param listener the link listener
*/
public NettyLink(final Channel channel, final Encoder<? super T> encoder, final LinkListener<? super T> listener) {
this(channel, encoder, listener, null);
}

/**
* Constructs a link.
*
* @param channel the channel
* @param encoder the encoder
* @param listener the link listener
* @param uri the URI
*/
public NettyLink(
final Channel channel,
final Encoder<? super T> encoder,
final LinkListener<? super T> listener,
final URI uri) {
this.channel = channel;
this.encoder = encoder;
this.listener = listener;
this.uri = uri;
}

/**
* Writes the message to this link.
*
Expand All @@ -96,32 +76,9 @@ public NettyLink(
@Override
public void write(final T message) {
LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message});

if (this.uri != null) {
try {
final FullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath());
final ByteBuf buf = Unpooled.copiedBuffer(encoder.encode(message));
request.headers()
.set(HttpHeaders.Names.HOST, uri.getHost())
.set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
.set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
.set(HttpHeaders.Names.CONTENT_TYPE, "application/wake-transport")
.set(HttpHeaders.Names.CONTENT_LENGTH, buf.readableBytes());
request.content().clear().writeBytes(buf);
final ChannelFuture future = channel.writeAndFlush(request);
future.sync();
if (listener != null) {
future.addListener(new NettyChannelFutureListener<>(message, listener));
}
} catch (final InterruptedException ex) {
LOG.log(Level.SEVERE, "Cannot send request to " + uri.getHost(), ex);
}
} else {
final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
if (listener != null) {
future.addListener(new NettyChannelFutureListener<>(message, listener));
}
final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message)));
if (listener != null) {
future.addListener(new NettyChannelFutureListener<>(message, listener));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.transport.netty;

import io.netty.channel.Channel;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;

/**
* Factory that creates a NettyLink.
*/
@DefaultImplementation(NettyDefaultLinkFactory.class)
public interface NettyLinkFactory<T> {

/**
* Creates a NettyLink.
* @param channel the channel
* @param encoder the encoder
*/
Link<T> newInstance(final Channel channel, final Encoder<? super T> encoder);

/**
* Creates a NettyLink.
* @param channel the channel
* @param encoder the encoder
* @param listener the listener
*/
Link<T> newInstance(final Channel channel,
final Encoder<? super T> encoder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pleas align the lines

final LinkListener<? super T> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ public <T> Link<T> open(final SocketAddress remoteAddr, final Encoder<? super T>
connectFuture = this.clientBootstrap.connect(remoteAddr);
connectFuture.syncUninterruptibly();

link = new NettyLink<>(connectFuture.channel(), encoder, listener, this.uri);
final NettyLinkFactory linkFactory = new NettyDefaultLinkFactory<>(uri);

link = linkFactory.newInstance(connectFuture.channel(), encoder, listener);
linkRef.setLink(link);

synchronized (flag) {
Expand Down
Loading