Skip to content

Commit

Permalink
Event num updates (#62)
Browse files Browse the repository at this point in the history
- Making sure event number reported in Sync is close to what is being
sent in the data plane (regardless of whether sequential, user-specified
or usec event numbers are used)
- Added ability to optionally use usec since UNIX Epoch as event numbers
in LB header and Sync packets (while still using user-specified or
sequential in RE header)
- Added entropy test for system clock to make sure last 8 lsb of
microsecond timestamps are sufficiently random
- Added ability to further randomize the timestamps if entropy not
sufficient
- Various fixes
  - snifgen listening filter fix
  • Loading branch information
ibaldin authored Oct 10, 2024
2 parents 175cdb5 + 01edc08 commit beb6aed
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PROJECT_NAME = "E2SAR"
# could be handy for archiving the generated documentation or if some version
# control system is used.

PROJECT_NUMBER = 0.1.2
PROJECT_NUMBER = 0.1.3

# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
Expand Down
12 changes: 11 additions & 1 deletion bin/e2sar_perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ int main(int argc, char **argv)
int sockBufSize;
int durationSec;
bool withCP;
bool zeroRate;
bool usecAsEventNum;
std::string sndrcvIP;
std::string iniFile;
u_int16_t recvStartPort;
Expand Down Expand Up @@ -323,6 +325,8 @@ int main(int argc, char **argv)
opts("ipv6,6", "force using IPv6 control plane address if URI specifies hostname (disables cert validation) [s,r]");
opts("ipv4,4", "force using IPv4 control plane address if URI specifies hostname (disables cert validation) [s,r]");
opts("novalidate,v", "don't validate server certificate");
opts("zerorate,z", po::bool_switch()->default_value(false),"report zero event number change rate in Sync messages [s]");
opts("usec", po::bool_switch()->default_value(false),"use usec clock samples as event numbers in Sync and LB messages [s]");

po::variables_map vm;

Expand Down Expand Up @@ -364,6 +368,8 @@ int main(int argc, char **argv)
}

withCP = vm["withcp"].as<bool>();
zeroRate = vm["zerorate"].as<bool>();
usecAsEventNum = vm["usec"].as<bool>();

bool preferV6 = false;
if (vm.count("ipv6"))
Expand Down Expand Up @@ -434,8 +440,12 @@ int main(int argc, char **argv)
sflags.mtu = mtu;
sflags.sndSocketBufSize = sockBufSize;
sflags.numSendSockets = numSockets;
sflags.zeroRate = zeroRate;
sflags.usecAsEventNum = usecAsEventNum;
}
std::cout << "Control plane will be " << (sflags.useCP ? "ON" : "OFF") << std::endl;
std::cout << "Control plane " << (sflags.useCP ? "ON" : "OFF") << std::endl;
std::cout << "Event rate reporting in Sync " << (sflags.zeroRate ? "OFF" : "ON") << std::endl;
std::cout << "Using usecs as event numbers " << (sflags.usecAsEventNum ? "ON" : "OFF") << std::endl;
std::cout << (sflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." :
"*** Make sure the URI reflects proper data address, other parts are ignored.") << std::endl;

Expand Down
2 changes: 1 addition & 1 deletion docs
Submodule docs updated 43 files
+1 −1 docs/annotated.html
+1 −1 docs/classdetail_1_1_e2_s_a_r_errorc__category.html
+1 −1 docs/classe2sar_1_1_e2_s_a_r_exception.html
+1 −1 docs/classe2sar_1_1_ejfat_u_r_i.html
+1 −1 docs/classe2sar_1_1_l_b_manager.html
+1 −1 docs/classe2sar_1_1_net_util.html
+1 −1 docs/classe2sar_1_1_reassembler.html
+1 −1 docs/classe2sar_1_1_segmenter.html
+1 −1 docs/classes.html
+1 −1 docs/dir_68267d1309a1af8e8297ef4c3efbcdba.html
+1 −1 docs/dir_d44c64559bbebec7f509842c48db8b23.html
+1 −1 docs/e2sar_8hpp_source.html
+4 −4 docs/e2sar_c_p_8hpp_source.html
+6 −6 docs/e2sar_d_p_reassembler_8hpp_source.html
+322 −281 docs/e2sar_d_p_segmenter_8hpp_source.html
+1 −1 docs/e2sar_error_8hpp_source.html
+1 −1 docs/e2sar_headers_8hpp_source.html
+1 −1 docs/e2sar_net_util_8hpp_source.html
+488 −459 docs/e2sar_util_8hpp_source.html
+1 −1 docs/files.html
+1 −1 docs/functions.html
+1 −1 docs/functions_func.html
+1 −1 docs/hierarchy.html
+1 −1 docs/index.html
+1 −1 docs/namespacee2sar.html
+1 −1 docs/namespacemembers.html
+1 −1 docs/namespacemembers_enum.html
+1 −1 docs/namespacemembers_func.html
+1 −1 docs/namespacemembers_type.html
+1 −1 docs/portable__endian_8h_source.html
+1 −1 docs/structe2sar_1_1_e2_s_a_r_error_info.html
+1 −1 docs/structe2sar_1_1_l_b_hdr.html
+1 −1 docs/structe2sar_1_1_l_b_r_e_hdr.html
+1 −1 docs/structe2sar_1_1_l_b_status.html
+1 −1 docs/structe2sar_1_1_l_b_worker_status.html
+1 −1 docs/structe2sar_1_1_overview_entry.html
+1 −1 docs/structe2sar_1_1_r_e_hdr.html
+1 −1 docs/structe2sar_1_1_reassembler_1_1_reassembler_flags.html
+9 −1 docs/structe2sar_1_1_segmenter_1_1_segmenter_flags.html
+1 −1 docs/structe2sar_1_1_sync_hdr.html
+1 −1 docs/structe2sar_1_1pair__equal.html
+1 −1 docs/structe2sar_1_1pair__hash.html
+1 −1 docs/structstd_1_1is__error__code__enum_3_01e2sar_1_1_e2_s_a_r_errorc_01_4.html
62 changes: 56 additions & 6 deletions include/e2sarDPSegmenter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,15 @@ namespace e2sar
boost::atomic<UnixTimeNano_t> currentSyncStartNano{0};
boost::atomic<EventNum_t> eventsInCurrentSync{0};

// current event number
boost::atomic<EventNum_t> eventNum{0};
// currently user-assigned or sequential event number at enqueuing and reported in RE header
boost::atomic<EventNum_t> userEventNum{0};
// current event number being sent and reported in LB header
boost::atomic<EventNum_t> lbEventNum{0};

// fast random number generator
boost::random::ranlux24_base ranlux;
// to get better entropy in usec clock samples (if needed)
boost::random::uniform_int_distribution<> lsbDist{0, 255};

//
// atomic struct for stat information for sync and send threads
Expand Down Expand Up @@ -143,6 +150,8 @@ namespace e2sar
result<int> _close();
result<int> _send(SyncHdr *hdr);
void _threadBody();


};
friend struct SyncThreadState;

Expand Down Expand Up @@ -190,14 +199,16 @@ namespace e2sar
boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits<u_int16_t>::max()};
// to get random port numbers we skip low numbered privileged ports
boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits<u_int16_t>::max()};
// to get better entropy in usec clock samples (if needed)
boost::random::uniform_int_distribution<> lsbDist{0, 255};

inline SendThreadState(Segmenter &s, bool v6, bool zc, u_int16_t mtu, bool cnct=true):
seg{s}, connectSocket{cnct}, useV6{v6}, useZerocopy{zc}, mtu{mtu},
maxPldLen{mtu - TOTAL_HDR_LEN}, socketFd4(s.numSendSockets),
socketFd6(s.numSendSockets), ranlux{static_cast<u_int32_t>(std::time(0))}
{
// this way every segmenter send thread has a unique PRNG sequence
auto nowT = boost::chrono::high_resolution_clock::now();
auto nowT = boost::chrono::system_clock::now();
ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
}

Expand All @@ -208,7 +219,7 @@ namespace e2sar
socketFd6(s.numSendSockets), ranlux{static_cast<u_int32_t>(std::time(0))}
{
// this way every segmenter send thread has a unique PRNG sequence
auto nowT = boost::chrono::high_resolution_clock::now();
auto nowT = boost::chrono::system_clock::now();
ranlux.seed(boost::chrono::duration_cast<boost::chrono::nanoseconds>(nowT.time_since_epoch()).count());
}

Expand All @@ -230,6 +241,12 @@ namespace e2sar
boost::condition_variable sendThreadCond;
// use control plane (can be disabled for debugging)
bool useCP;
// report zero event number change rate in Sync
bool zeroRate;
// use usec clock samples as event numbers in Sync and LB
bool usecAsEventNum;
// use additional entropy in the clock samples
bool addEntropy;

/**
* Check the sanity of constructor parameters
Expand Down Expand Up @@ -262,6 +279,8 @@ namespace e2sar
* - zeroCopy - use zeroCopy send optimization {false}
* - connectedSocket - use connected sockets {true}
* - useCP - enable control plane to send Sync packets {true}
* - zeroRate - don't provide event number change rate in Sync {false}
* - clockAsEventNum - use usec clock samples as event numbers in LB and Sync packets {false}
* - syncPeriodMs - sync thread period in milliseconds {1000}
* - syncPerods - number of sync periods to use for averaging reported send rate {2}
* - mtu - size of the MTU to attempt to fit the segmented data in (must accommodate
Expand All @@ -277,14 +296,17 @@ namespace e2sar
bool zeroCopy;
bool connectedSocket;
bool useCP;
bool zeroRate;
bool usecAsEventNum;
u_int16_t syncPeriodMs;
u_int16_t syncPeriods;
u_int16_t mtu;
size_t numSendSockets;
int sndSocketBufSize;

SegmenterFlags(): dpV6{false}, zeroCopy{false}, connectedSocket{true},
useCP{true}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
useCP{true}, zeroRate{false}, usecAsEventNum{false},
syncPeriodMs{1000}, syncPeriods{2}, mtu{1500},
numSendSockets{4}, sndSocketBufSize{1024*1024*3} {}
/**
* Initialize flags from an INI file
Expand Down Expand Up @@ -428,6 +450,9 @@ namespace e2sar

// Calculate the average event rate from circular buffer
// note that locking lives outside this function, as needed.
// NOTE: this is only useful to sync messages if sequential
// event IDs are used. When usec timestamp is used for LB event numbers
// the event is constant 1 MHz
inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos)
{
// no rate to report
Expand All @@ -453,7 +478,21 @@ namespace e2sar
// never changes past initialization
inline void fillSyncHdr(SyncHdr *hdr, EventRate_t eventRate, UnixTimeNano_t tnano)
{
hdr->set(eventSrcId, eventNum, eventRate, tnano);
EventRate_t reportedRate{0};
if (not zeroRate)
reportedRate = eventRate;
EventNum_t reportedEventNum{lbEventNum};
if (usecAsEventNum)
{
// figure out what event number would be at this moment, don't worry about its entropy
auto nowT = boost::chrono::system_clock::now();
// Convert the time point to microseconds since the epoch
reportedEventNum = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
if (not zeroRate)
// 1 MHz in this case
reportedRate = 1000000;
}
hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano);
}

// process backlog in return queue and free event queue item blocks on it
Expand All @@ -463,6 +502,17 @@ namespace e2sar
while (returnQueue.pop(item))
queueItemPool.free(item);
}
/**
* Add entropy to a clock sample by randomizing the least 8 bits. Runs in the
* context of send thread.
* @param clockSample - the sample value
* @param ranlux - random number generator
* @return
*/
inline int_least64_t addClockEntropy(int_least64_t clockSample)
{
return (clockSample & ~0xFF) | lsbDist(ranlux);
}
};
}
#endif
37 changes: 37 additions & 0 deletions include/e2sarUtil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <boost/url.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>

#include "e2sarError.hpp"

Expand Down Expand Up @@ -467,5 +469,40 @@ namespace e2sar
return lhs.first == rhs.first && lhs.second == rhs.second;
}
};

/**
* clock entropy test to validate that system clock produces sufficient randomness
* in the least 8 bits of the microsecond timestamp (required by LB). Normally runs
* for 10 seconds to collect 5k samples
* @param totalTests - number of samples to collect
* @param sleepMs - number of milliseconds to sleep between each sample
* @return entropy in bits
*/
static inline float clockEntropyTest(int totalTests = 5000, int sleepMs = 1)
{
std::vector<int_least64_t> points;
std::vector<int> bins(256, 0);

for (int i = 0; i < totalTests; i++)
{
auto now = boost::chrono::system_clock::now();
auto nowUsec = boost::chrono::duration_cast<boost::chrono::microseconds>(now.time_since_epoch()).count();
bins[nowUsec & 0xff]++;
auto until = now + boost::chrono::milliseconds(sleepMs);
boost::this_thread::sleep_until(until);
}

// compute the probabilities and entropy
float entropy{0.0};
for (size_t i = 0; i < bins.size(); i++)
{
float prob = static_cast<float>(bins[i])/(totalTests*1.0);
entropy += prob * std::log(prob);
}

// normalize into bits
entropy *= -1.0/std::log(2);
return entropy;
}
};
#endif
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
project('E2SAR', 'cpp',
version: '0.1.2', default_options : ['cpp_std=c++17'])
version: '0.1.3', default_options : ['cpp_std=c++17'])

compiler = meson.get_compiler('cpp')

Expand Down
19 changes: 7 additions & 12 deletions scripts/notebooks/EJFAT/E2SAR-live-lb-tester.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,8 @@
"open_ports = {\n",
"}\n",
"\n",
"# additional accounts and their public keys - they get sudo rights and docker,\n",
"# their public keys are expected to reside under ssh-keys/ in a file\n",
"# named after the account.\n",
"# The list has the form of ['user1', 'user2'] where user1 and user2 accounts\n",
"# will be created on the system. Under ssh-keys/ there should be two files\n",
"# named 'user1' and 'user2' each containing the SSH public key for that user. \n",
"accounts = []\n",
"\n",
"# url of e2sar deps. Find the appropriate version for the OS at https://github.com/JeffersonLab/E2SAR/releases\n",
"e2sar_branch = \"e2sar-perf-with-cp\"\n",
"e2sar_branch = \"main\"\n",
"static_release_url = 'https://github.com/JeffersonLab/E2SAR/releases/download/' # don't need to change this\n",
"e2sar_dep_artifcat = 'e2sar-deps_0.1.1_amd64.deb'\n",
"e2sar_release_ver = 'E2SAR-0.1.1'\n",
Expand Down Expand Up @@ -920,11 +912,11 @@
"# for e2sar_perf only the data= part of the query is meaningful. sync= must be present but is ignored\n",
"# same for gRPC token, address and port (and lb id)\n",
"e2sarPerfURI = f\"ejfat://[email protected]:1234/lb/1?data={recver_addr}&sync=192.168.77.7:1234\"\n",
"recverDuration = 20\n",
"recverDuration = 40\n",
"mtu = 9000\n",
"rate = 15 # Gbps\n",
"length = 1000000 # event length in bytes\n",
"numEvents = 10000 # number of events to send\n",
"numEvents = 20000 # number of events to send\n",
"bufSize = 300 * 1024 * 1024 # 100MB send and receive buffers\n",
"\n",
"recv_command = f\"cd E2SAR; PATH=$HOME/.local/bin:/usr/local/bin:$PATH LD_LIBRARY_PATH=/usr/local/lib/ ./build/bin/e2sar_perf -r -u '{e2sarPerfURI}' -d {recverDuration} -b {bufSize} --ip {recver_addr} --port 19522\"\n",
Expand Down Expand Up @@ -1143,7 +1135,10 @@
"# Given that in FABRIC ejfat-lb.es.net resolves to IP6 first and gRPC C++ library doesn't\n",
"# offer granular control over which resolved address is used, we use -4 option to tell the\n",
"# code to use the IPv4 address, but this also disables cert validation.\n",
"send_command = f\"{e2sar_perf} -s -u '{instance_uri}' --mtu {mtu} --rate {rate} --length {length} -n {numEvents} -b {bufSize} --ip {sender_addr} --sockets {numSocks} --withcp -4\"\n",
"# Sender options of interest:\n",
"# -z - send 0 event rate in Sync messages\n",
"# --usec - use usec-precision timestamp as event numbers in Sync and LB messages \n",
"send_command = f\"{e2sar_perf} -s -u '{instance_uri}' --mtu {mtu} --rate {rate} --length {length} -n {numEvents} -b {bufSize} --ip {sender_addr} --sockets {numSocks} --withcp -4 --usec\"\n",
"\n",
"for recver, recver_addr in zip(recvers, recver_addrs):\n",
" recv_command = f\"{e2sar_perf} -r -u '{instance_uri}' -d {recverDuration} -b {bufSize} --ip {recver_addr} --port {startPort} --withcp -4 --threads {recvThreads}\"\n",
Expand Down
8 changes: 6 additions & 2 deletions scripts/scapy/snifgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def packet_callback(packet):
operations = parser.add_mutually_exclusive_group(required=True)
operations.add_argument("-l", "--listen", action="store_true", help="listen for incoming packets and try to parse and validate them")
operations.add_argument("-g", "--generate", action="store_true", help="generate new packets of specific types")
operations.add_argument("-a", "--parse", action="store_true", help="parse a pcap file. The recommended way to capture is something like this 'sudo tcpdump -s 200 -tttt -i enp7s0 udp \\( dst port 19522 or dst port 19523 \\) -w e2sar.pcap'")
operations.add_argument("-a", "--parse", action="store_true", help="parse a pcap file. The recommended way to capture is something like this 'sudo tcpdump -s 200 -tttt -i enp7s0 udp and \\( dst port 19522 or dst port 19523 \\) -w e2sar.pcap'")
parser.add_argument("-p", "--port", action="store", default=19522, type=int, help="UDP data port (only port for --lbre, starting port for --re)")
parser.add_argument("-y", "--syncport", action="store", help="UDP sync port", default=19010, type=int)
parser.add_argument("-n", "--nports", action="store", type=int, default=1, help="number of ports starting with -p to listen on for --re")
Expand Down Expand Up @@ -275,7 +275,11 @@ def packet_callback(packet):
send(p)
elif args.listen:
# craft a filter
listeningPorts = [x + args.port for x in range(0, args.nports)]
listeningPorts = list()
if args.lbre or args.re or args.lbresync:
listeningPorts = [x + args.port for x in range(0, args.nports)]
if args.sync or args.lbresync:
listeningPorts.append(args.syncport)
portFilter = "or".join([f" dst port {port} " for port in listeningPorts])
if args.ip:
filter = f'udp and dst host {args.ip} and \\( {portFilter} \\)'
Expand Down
4 changes: 4 additions & 0 deletions segmenter_config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ useCP = true
syncPeriodMS = 1000
; number of sync periods to use for averaging reported send rate
syncPeriods = 2
; report zero event number change rate in sync packets
zeroRate = false
; use usec clock samples as event numbers in LB and Sync packets
usecAsEventNum = true

[data-plane]
; prefer V6 dataplane if the URI specifies both data=<ipv4>&data=<ipv6> addresses
Expand Down
5 changes: 3 additions & 2 deletions src/e2sarDPReassembler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ namespace e2sar
void Reassembler::SendStateThreadState::_threadBody()
{
// get the time
auto nowT = boost::chrono::high_resolution_clock::now();
auto nowT = boost::chrono::system_clock::now();
auto nowUsec = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
UnixTimeMicro_t currentTimeMicros = static_cast<UnixTimeMicro_t>(nowUsec);

Expand Down Expand Up @@ -436,7 +436,7 @@ namespace e2sar
// fillPercent is always reported as sampled in the current moment

// Get the current time point
auto nowT = boost::chrono::high_resolution_clock::now();
auto nowT = boost::chrono::system_clock::now();
auto nowUsec = boost::chrono::duration_cast<boost::chrono::microseconds>(nowT.time_since_epoch()).count();
UnixTimeMicro_t currentTimeMicros = static_cast<UnixTimeMicro_t>(nowUsec);

Expand Down Expand Up @@ -522,6 +522,7 @@ namespace e2sar

auto eventItem = dequeue();

// try to dequeue for a bit
while (eventItem == nullptr && !threadsStop && !overtime)
{
recvThreadCond.wait_for(condLock, boost::chrono::milliseconds(recvWaitTimeout_ms));
Expand Down
Loading

0 comments on commit beb6aed

Please sign in to comment.