Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize NucleusLib async callbacks using lambdas #1539

Merged
merged 15 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions Sources/Plasma/NucleusLib/pnAsyncCore/Private/pnAcIo.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ You can contact Cyan Worlds, Inc. by email [email protected]
#define PLASMA20_SOURCES_PLASMA_NUCLEUSLIB_PNASYNCCORE_PRIVATE_PNACIO_H

#include <functional>
#include <optional>

#include "pnNetCommon/plNetAddress.h"
#include "pnUUID/pnUUID.h"
Expand All @@ -74,13 +75,6 @@ constexpr unsigned kAsyncSocketBufferSize = 1460;
*
***/

enum EAsyncNotifySocket {
kNotifySocketConnectFailed,
kNotifySocketConnectSuccess,
kNotifySocketDisconnect,
kNotifySocketRead,
};

struct AsyncNotifySocket {};

struct AsyncNotifySocketConnect : AsyncNotifySocket {
Expand All @@ -103,12 +97,14 @@ struct AsyncNotifySocketWrite : AsyncNotifySocketRead {
AsyncNotifySocketWrite() : AsyncNotifySocketRead(), bytesCommitted() { }
};

/*! \brief return false to disconnect
\param sock
\param code
\param notify
*/
using FAsyncNotifySocketProc = std::function<bool(AsyncSocket, EAsyncNotifySocket, AsyncNotifySocket*)>;
class AsyncNotifySocketCallbacks
{
public:
virtual void AsyncNotifySocketConnectFailed(plNetAddress remoteAddr) = 0;
virtual bool AsyncNotifySocketConnectSuccess(AsyncSocket sock, plNetAddress localAddr, plNetAddress remoteAddr) = 0;
dgelessus marked this conversation as resolved.
Show resolved Hide resolved
virtual void AsyncNotifySocketDisconnect(AsyncSocket sock) = 0;
virtual std::optional<size_t> AsyncNotifySocketRead(AsyncSocket sock, uint8_t* buffer, size_t bytes) = 0;
};


/****************************************************************************
Expand All @@ -120,7 +116,7 @@ using FAsyncNotifySocketProc = std::function<bool(AsyncSocket, EAsyncNotifySocke
void AsyncSocketConnect (
AsyncCancelId * cancelId,
const plNetAddress& netAddr,
FAsyncNotifySocketProc notifyProc,
AsyncNotifySocketCallbacks* callbacks,
const void * sendData = nullptr,
unsigned sendBytes = 0
);
Expand Down
57 changes: 28 additions & 29 deletions Sources/Plasma/NucleusLib/pnAsyncCoreExe/pnAceSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ struct ConnectOperation
tcp::socket fSock;
plNetAddress fRemoteAddr;
AsyncCancelId fCancelId;
FAsyncNotifySocketProc fNotifyProc;
AsyncNotifySocketCallbacks* fCallbacks;
std::vector<uint8_t> fConnectBuffer;
unsigned int fConnectionType;

ConnectOperation(asio::io_context& context)
: fSock(context), fCancelId(), fConnectionType()
: fSock(context), fCancelId(), fCallbacks(), fConnectionType()
{ }
ConnectOperation(const ConnectOperation&) = delete;
};
Expand All @@ -130,7 +130,7 @@ struct AsyncSocketStruct
{
std::recursive_mutex fCritsect;
tcp::socket fSock;
FAsyncNotifySocketProc fNotifyProc;
AsyncNotifySocketCallbacks* fCallbacks;
unsigned int fConnectionType;
uint8_t fBuffer[kAsyncSocketBufferSize];
size_t fBytesLeft;
Expand All @@ -139,7 +139,7 @@ struct AsyncSocketStruct
unsigned closeTimeMs;

AsyncSocketStruct(ConnectOperation& op)
: fSock(std::move(op.fSock)), fNotifyProc(op.fNotifyProc),
: fSock(std::move(op.fSock)), fCallbacks(op.fCallbacks),
fConnectionType(op.fConnectionType),
fBuffer(), fBytesLeft(), initTimeMs(), closeTimeMs()
{ }
Expand Down Expand Up @@ -190,14 +190,14 @@ static void SocketStartAsyncRead(AsyncSocket sock)
bool isEOFError = (err.category() == asio::error::get_misc_category() && err.value() == asio::error::eof);
bool isAbortedError = (err.category() == asio::error::get_system_category() && err.value() == asio::error::operation_aborted);
if (isEOFError || isAbortedError) {
if (sock->fNotifyProc) {
if (sock->fCallbacks) {
// We have to be extremely careful from this point because
// sockets can be deleted during the notification callback.
// After this call, the application becomes responsible for
// calling AsyncSocketDelete at some later point in time.
FAsyncNotifySocketProc notifyProc = sock->fNotifyProc;
sock->fNotifyProc = nullptr;
notifyProc((AsyncSocket)sock, kNotifySocketDisconnect, nullptr);
auto callbacks = sock->fCallbacks;
sock->fCallbacks = nullptr;
callbacks->AsyncNotifySocketDisconnect(sock);
}
} else {
LogMsg(kLogError, "Failed to read from socket: {}", err.message());
Expand All @@ -210,28 +210,29 @@ static void SocketStartAsyncRead(AsyncSocket sock)

sock->fBytesLeft += bytes;

AsyncNotifySocketRead notifyRead;
notifyRead.buffer = sock->fBuffer;
notifyRead.bytes = sock->fBytesLeft;

if (!sock->fNotifyProc || !sock->fNotifyProc(sock, kNotifySocketRead, &notifyRead)) {
size_t bytesNotified = sock->fBytesLeft;
std::optional<size_t> res;
if (sock->fCallbacks) {
res = sock->fCallbacks->AsyncNotifySocketRead(sock, sock->fBuffer, bytesNotified);
}
if (!res) {
// No callback, or the callback told us to stop reading
return;
}

// if only some of the bytes were used, then shift
// remaining bytes down. Otherwise, clear the buffer.
sock->fBytesLeft -= notifyRead.bytesProcessed;
size_t bytesProcessed = *res;
sock->fBytesLeft -= bytesProcessed;
if (sock->fBytesLeft != 0) {
if ((sock->fBytesLeft > sizeof(sock->fBuffer)) || ((notifyRead.bytesProcessed + sock->fBytesLeft) > sizeof(sock->fBuffer))) {
if ((sock->fBytesLeft > sizeof(sock->fBuffer)) || ((bytesProcessed + sock->fBytesLeft) > sizeof(sock->fBuffer))) {
LogMsg(kLogError, "SocketDispatchRead error: {} {} {}",
sock->fBytesLeft, notifyRead.bytes, notifyRead.bytesProcessed);
sock->fBytesLeft, bytesNotified, bytesProcessed);
return;
}

if (notifyRead.bytesProcessed) {
memmove(sock->fBuffer, sock->fBuffer + notifyRead.bytesProcessed,
sock->fBytesLeft);
if (bytesProcessed != 0) {
memmove(sock->fBuffer, sock->fBuffer + bytesProcessed, sock->fBytesLeft);
}
}

Expand Down Expand Up @@ -288,9 +289,10 @@ static bool SocketInitConnect(ConnectOperation& op)
}

// perform callback notification
AsyncNotifySocketConnect notify;
SocketGetAddresses(sock.get(), &notify.localAddr, &notify.remoteAddr);
if (!sock->fNotifyProc(sock.get(), kNotifySocketConnectSuccess, &notify)) {
plNetAddress localAddr;
plNetAddress remoteAddr;
SocketGetAddresses(sock.get(), &localAddr, &remoteAddr);
if (!sock->fCallbacks->AsyncNotifySocketConnectSuccess(sock.get(), localAddr, remoteAddr)) {
return false;
}

Expand All @@ -300,10 +302,10 @@ static bool SocketInitConnect(ConnectOperation& op)
}

void AsyncSocketConnect(AsyncCancelId* cancelId, const plNetAddress& netAddr,
FAsyncNotifySocketProc notifyProc,
AsyncNotifySocketCallbacks* callbacks,
const void* sendData, unsigned sendBytes)
{
ASSERT(notifyProc);
ASSERT(callbacks);
ASSERT(s_ioPool);

PerfAddCounter(kAsyncPerfSocketConnAttemptsOutCurr, 1);
Expand All @@ -314,7 +316,7 @@ void AsyncSocketConnect(AsyncCancelId* cancelId, const plNetAddress& netAddr,
hsLockGuard(s_connectCrit);
auto op = s_connectList.emplace(s_connectList.end(), s_ioPool->fContext);
op->fRemoteAddr = netAddr;
op->fNotifyProc = std::move(notifyProc);
op->fCallbacks = callbacks;
if (sendBytes) {
auto sendBuffer = reinterpret_cast<const uint8_t*>(sendData);
op->fConnectBuffer.assign(sendBuffer, sendBuffer + sendBytes);
Expand Down Expand Up @@ -345,10 +347,7 @@ void AsyncSocketConnect(AsyncCancelId* cancelId, const plNetAddress& netAddr,
}

if (!success) {
AsyncNotifySocketConnect failed;
failed.remoteAddr = op->fRemoteAddr;
failed.localAddr.Clear();
op->fNotifyProc(nullptr, kNotifySocketConnectFailed, &failed);
op->fCallbacks->AsyncNotifySocketConnectFailed(op->fRemoteAddr);
}

{
Expand Down
82 changes: 27 additions & 55 deletions Sources/Plasma/PubUtilLib/plNetGameLib/Private/plNglAuth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace Ngl { namespace Auth {
*
***/

struct CliAuConn : hsRefCnt {
struct CliAuConn : hsRefCnt, AsyncNotifySocketCallbacks {
CliAuConn ();
~CliAuConn ();

Expand All @@ -74,11 +74,10 @@ struct CliAuConn : hsRefCnt {
unsigned lastHeardTimeMs;

// Callbacks
void NotifyConnSocketConnect();
void NotifyConnSocketConnectFailed();
void NotifyConnSocketDisconnect();
bool NotifyConnSocketRead(AsyncNotifySocketRead* read);
bool SocketNotifyCallback(AsyncSocket sock, EAsyncNotifySocket code, AsyncNotifySocket* notify);
void AsyncNotifySocketConnectFailed(plNetAddress remoteAddr) override;
bool AsyncNotifySocketConnectSuccess(AsyncSocket sock, plNetAddress localAddr, plNetAddress remoteAddr) override;
void AsyncNotifySocketDisconnect(AsyncSocket sock) override;
std::optional<size_t> AsyncNotifySocketRead(AsyncSocket sock, uint8_t* buffer, size_t bytes) override;

// This function should be called during object construction
// to initiate connection attempts to the remote host whenever
Expand Down Expand Up @@ -1380,18 +1379,31 @@ static bool ConnEncrypt (ENetError error, void * param) {
}

//============================================================================
void CliAuConn::NotifyConnSocketConnect()
bool CliAuConn::AsyncNotifySocketConnectSuccess(AsyncSocket sock, plNetAddress localAddr, plNetAddress remoteAddr)
{
bool wasAbandoned;
{
hsLockGuard(s_critsect);
socket = sock;
cancelId = nullptr;
wasAbandoned = abandoned;
}
if (wasAbandoned) {
AsyncSocketDisconnect(sock, true);
return true;
}

TransferRef("Connecting", "Connected");
cli = NetCliConnectAccept(
socket,
sock,
kNetProtocolCli2Auth,
false,
ConnEncrypt,
0,
nullptr,
this
);
return true;
}

//============================================================================
Expand Down Expand Up @@ -1426,7 +1438,7 @@ static void CheckedReconnect (CliAuConn * conn, ENetError error) {
}

//============================================================================
void CliAuConn::NotifyConnSocketConnectFailed()
void CliAuConn::AsyncNotifySocketConnectFailed(plNetAddress remoteAddr)
{
{
hsLockGuard(s_critsect);
Expand All @@ -1446,7 +1458,7 @@ void CliAuConn::NotifyConnSocketConnectFailed()
}

//============================================================================
void CliAuConn::NotifyConnSocketDisconnect()
void CliAuConn::AsyncNotifySocketDisconnect(AsyncSocket sock)
{
StopAutoPing();

Expand All @@ -1468,52 +1480,14 @@ void CliAuConn::NotifyConnSocketDisconnect()
}

//============================================================================
bool CliAuConn::NotifyConnSocketRead(AsyncNotifySocketRead* read)
std::optional<size_t> CliAuConn::AsyncNotifySocketRead(AsyncSocket sock, uint8_t* buffer, size_t bytes)
{
// TODO: Only dispatch messages from the active auth server
lastHeardTimeMs = GetNonZeroTimeMs();
bool result = NetCliDispatch(cli, read->buffer, read->bytes, this);
read->bytesProcessed += read->bytes;
return result;
}

//============================================================================
bool CliAuConn::SocketNotifyCallback(
AsyncSocket sock,
EAsyncNotifySocket code,
AsyncNotifySocket* notify
) {
bool result = true;
switch (code) {
case kNotifySocketConnectSuccess:
bool wasAbandoned;
{
hsLockGuard(s_critsect);
socket = sock;
cancelId = nullptr;
wasAbandoned = abandoned;
}
if (wasAbandoned) {
AsyncSocketDisconnect(sock, true);
} else {
NotifyConnSocketConnect();
}
break;

case kNotifySocketConnectFailed:
NotifyConnSocketConnectFailed();
break;

case kNotifySocketDisconnect:
NotifyConnSocketDisconnect();
break;

case kNotifySocketRead:
result = NotifyConnSocketRead((AsyncNotifySocketRead*)notify);
break;
if (!NetCliDispatch(cli, buffer, bytes, this)) {
return {};
}

return result;
return bytes;
}

//============================================================================
Expand Down Expand Up @@ -1548,9 +1522,7 @@ static void Connect (
AsyncSocketConnect(
&conn->cancelId,
conn->addr,
[conn](auto sock, auto code, auto notify) {
return conn->SocketNotifyCallback(sock, code, notify);
},
conn,
&connect,
sizeof(connect)
);
Expand Down
Loading