From 01cec7af38120aef19a661dea796111ce863d9cd Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Tue, 8 Oct 2024 14:53:05 -0400 Subject: [PATCH 01/19] More careful handling of event numbers (sequential) to avoid gaps in what is reported in Sync messages and what is seen in LB headers --- include/e2sarDPSegmenter.hpp | 10 +++++++--- src/e2sarDPSegmenter.cpp | 11 +++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index 0e55532..8267fcb 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -94,8 +94,10 @@ namespace e2sar boost::atomic currentSyncStartNano{0}; boost::atomic eventsInCurrentSync{0}; - // current event number - boost::atomic eventNum{0}; + // currently assigned event number at enqueuing + boost::atomic assignedEventNum{0}; + // current event number being sent + boost::atomic sentEventNum{0}; // // atomic struct for stat information for sync and send threads @@ -453,7 +455,9 @@ namespace e2sar // never changes past initialization inline void fillSyncHdr(SyncHdr *hdr, EventRate_t eventRate, UnixTimeNano_t tnano) { - hdr->set(eventSrcId, eventNum, eventRate, tnano); + // we use sentEventNum which is updated by the send thread to be + // more accurate about event numbers being seen by LB + hdr->set(eventSrcId, sentEventNum, eventRate, tnano); } // process backlog in return queue and free event queue item blocks on it diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index 5814c2a..49e822e 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -490,6 +490,9 @@ namespace e2sar u_int8_t *eventEnd = event + bytes; size_t curLen = (bytes <= maxPldLen ? bytes : maxPldLen); + // update the event number being reported in Sync packets + seg.sentEventNum = eventNum; + // break up event into a series of datagrams prepended with LB+RE header while (curOffset < eventEnd) { @@ -564,12 +567,12 @@ namespace e2sar freeEventItemBacklog(); // reset local event number to override if (_eventNum != 0) - eventNum.exchange(_eventNum); + assignedEventNum.exchange(_eventNum); // use specified event number and dataId return sendThreadState._send(event, bytes, // continue incrementing - eventNum++, + assignedEventNum++, (_dataId == 0 ? dataId : _dataId), entropy); } @@ -583,7 +586,7 @@ namespace e2sar freeEventItemBacklog(); // reset local event number to override if (_eventNum != 0) - eventNum.exchange(_eventNum); + assignedEventNum.exchange(_eventNum); EventQueueItem *item = queueItemPool.construct(); item->bytes = bytes; item->event = event; @@ -591,7 +594,7 @@ namespace e2sar item->callback = callback; item->cbArg = cbArg; // continue incrementing - item->eventNum = eventNum++; + item->eventNum = assignedEventNum++; item->dataId = (_dataId == 0 ? dataId : _dataId); eventQueue.push(item); // wake up send thread (no need to hold the lock as queue is lock_free) From 144fdf3ebb4f8bed372399e93ea2a3e1eae5b06a Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Tue, 8 Oct 2024 15:52:50 -0400 Subject: [PATCH 02/19] Added segmenter flags for reporting zero event number change rate and using usec clock samples as event numbers in sync and lb packets (the latter not yet in use) --- include/e2sarDPSegmenter.hpp | 19 +++++++++++++++++-- segmenter_config.ini | 4 ++++ src/e2sarDPSegmenter.cpp | 8 +++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index 8267fcb..e535972 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -232,6 +232,10 @@ namespace e2sar boost::condition_variable sendThreadCond; // use control plane (can be disabled for debugging) bool useCP; + // report zero event number change rate in Sync + bool zeroRate; + // use usec clock samples as event numbers in Sync and LB + bool clockAsEventNum; /** * Check the sanity of constructor parameters @@ -264,6 +268,8 @@ namespace e2sar * - zeroCopy - use zeroCopy send optimization {false} * - connectedSocket - use connected sockets {true} * - useCP - enable control plane to send Sync packets {true} + * - zeroRate - don't provide event number change rate in Sync {false} + * - clockAsEventNum - use usec clock samples as event numbers in LB and Sync packets {true} * - syncPeriodMs - sync thread period in milliseconds {1000} * - syncPerods - number of sync periods to use for averaging reported send rate {2} * - mtu - size of the MTU to attempt to fit the segmented data in (must accommodate @@ -279,6 +285,8 @@ namespace e2sar bool zeroCopy; bool connectedSocket; bool useCP; + bool zeroRate; + bool clockAsEventNum; u_int16_t syncPeriodMs; u_int16_t syncPeriods; u_int16_t mtu; @@ -286,7 +294,8 @@ namespace e2sar int sndSocketBufSize; SegmenterFlags(): dpV6{false}, zeroCopy{false}, connectedSocket{true}, - useCP{true}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500}, + useCP{true}, zeroRate{false}, clockAsEventNum{true}, + syncPeriodMs{1000}, syncPeriods{2}, mtu{1500}, numSendSockets{4}, sndSocketBufSize{1024*1024*3} {} /** * Initialize flags from an INI file @@ -430,6 +439,9 @@ namespace e2sar // Calculate the average event rate from circular buffer // note that locking lives outside this function, as needed. + // NOTE: this is only useful to sync messages if sequential + // event IDs are used. When usec timestamp is used for LB event numbers + // the event is constant 1 MHz inline EventRate_t eventRate(UnixTimeNano_t currentTimeNanos) { // no rate to report @@ -457,7 +469,10 @@ namespace e2sar { // we use sentEventNum which is updated by the send thread to be // more accurate about event numbers being seen by LB - hdr->set(eventSrcId, sentEventNum, eventRate, tnano); + if (zeroRate) + hdr->set(eventSrcId, sentEventNum, 0, tnano); + else + hdr->set(eventSrcId, sentEventNum, eventRate, tnano); } // process backlog in return queue and free event queue item blocks on it diff --git a/segmenter_config.ini b/segmenter_config.ini index cf5b297..69355f5 100644 --- a/segmenter_config.ini +++ b/segmenter_config.ini @@ -7,6 +7,10 @@ useCP = true syncPeriodMS = 1000 ; number of sync periods to use for averaging reported send rate syncPeriods = 2 +; report zero event number change rate in sync packets +zeroRate = false +; use usec clock samples as event numbers in LB and Sync packets +clockAsEventNum = true [data-plane] ; prefer V6 dataplane if the URI specifies both data=&data= addresses diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index 49e822e..9794473 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -26,7 +26,9 @@ namespace e2sar eventStatsBuffer{sflags.syncPeriods}, syncThreadState(*this, sflags.syncPeriodMs, sflags.connectedSocket), sendThreadState(*this, sflags.dpV6, sflags.zeroCopy, sflags.mtu, sflags.connectedSocket), - useCP{sflags.useCP} + useCP{sflags.useCP}, + zeroRate{sflags.zeroRate}, + clockAsEventNum{sflags.clockAsEventNum} { sanityChecks(); } @@ -622,6 +624,10 @@ namespace e2sar sFlags.syncPeriods); sFlags.syncPeriodMs = paramTree.get("control-plane.syncPeriodMS", sFlags.syncPeriodMs); + sFlags.zeroRate = paramTree.get("control-plane.zeroRate", + sFlags.zeroRate); + sFlags.clockAsEventNum = paramTree.get("control-plane.clockAsEventNum", + sFlags.clockAsEventNum); // data plane sFlags.dpV6 = paramTree.get("data-plane.dpV6", sFlags.dpV6); From cde032a319d5fdf0b50399326d20ebb6d64dbc84 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Tue, 8 Oct 2024 16:07:29 -0400 Subject: [PATCH 03/19] Added zerorate and usecaseventnum flags to e2sar_perf --- bin/e2sar_perf.cpp | 10 ++++++++++ include/e2sarDPSegmenter.hpp | 8 ++++---- segmenter_config.ini | 2 +- src/e2sarDPSegmenter.cpp | 6 +++--- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/bin/e2sar_perf.cpp b/bin/e2sar_perf.cpp index df0f802..6e3d312 100644 --- a/bin/e2sar_perf.cpp +++ b/bin/e2sar_perf.cpp @@ -294,6 +294,8 @@ int main(int argc, char **argv) int sockBufSize; int durationSec; bool withCP; + bool zeroRate; + bool usecAsEventNum; std::string sndrcvIP; std::string iniFile; u_int16_t recvStartPort; @@ -323,6 +325,8 @@ int main(int argc, char **argv) opts("ipv6,6", "force using IPv6 control plane address if URI specifies hostname (disables cert validation) [s,r]"); opts("ipv4,4", "force using IPv4 control plane address if URI specifies hostname (disables cert validation) [s,r]"); opts("novalidate,v", "don't validate server certificate"); + opts("zerorate,z", po::bool_switch()->default_value(false),"report zero event number change rate in Sync messages [s]"); + opts("usec", po::bool_switch()->default_value(false),"use usec clock samples as event numbers in Sync and LB messages [s]"); po::variables_map vm; @@ -364,6 +368,8 @@ int main(int argc, char **argv) } withCP = vm["withcp"].as(); + zeroRate = vm["zerorate"].as(); + usecAsEventNum = vm["usec"].as(); bool preferV6 = false; if (vm.count("ipv6")) @@ -434,8 +440,12 @@ int main(int argc, char **argv) sflags.mtu = mtu; sflags.sndSocketBufSize = sockBufSize; sflags.numSendSockets = numSockets; + sflags.zeroRate = zeroRate; + sflags.usecAsEventNum = usecAsEventNum; } std::cout << "Control plane will be " << (sflags.useCP ? "ON" : "OFF") << std::endl; + std::cout << "\tEvent rate reporting " << (sflags.zeroRate ? "OFF" : "ON") << std::endl; + std::cout << "\tUsing usecs as event numbers " << (sflags.usecAsEventNum ? "ON" : "OFF") << std::endl; std::cout << (sflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." : "*** Make sure the URI reflects proper data address, other parts are ignored.") << std::endl; diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index e535972..b1ecc34 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -235,7 +235,7 @@ namespace e2sar // report zero event number change rate in Sync bool zeroRate; // use usec clock samples as event numbers in Sync and LB - bool clockAsEventNum; + bool usecAsEventNum; /** * Check the sanity of constructor parameters @@ -269,7 +269,7 @@ namespace e2sar * - connectedSocket - use connected sockets {true} * - useCP - enable control plane to send Sync packets {true} * - zeroRate - don't provide event number change rate in Sync {false} - * - clockAsEventNum - use usec clock samples as event numbers in LB and Sync packets {true} + * - clockAsEventNum - use usec clock samples as event numbers in LB and Sync packets {false} * - syncPeriodMs - sync thread period in milliseconds {1000} * - syncPerods - number of sync periods to use for averaging reported send rate {2} * - mtu - size of the MTU to attempt to fit the segmented data in (must accommodate @@ -286,7 +286,7 @@ namespace e2sar bool connectedSocket; bool useCP; bool zeroRate; - bool clockAsEventNum; + bool usecAsEventNum; u_int16_t syncPeriodMs; u_int16_t syncPeriods; u_int16_t mtu; @@ -294,7 +294,7 @@ namespace e2sar int sndSocketBufSize; SegmenterFlags(): dpV6{false}, zeroCopy{false}, connectedSocket{true}, - useCP{true}, zeroRate{false}, clockAsEventNum{true}, + useCP{true}, zeroRate{false}, usecAsEventNum{false}, syncPeriodMs{1000}, syncPeriods{2}, mtu{1500}, numSendSockets{4}, sndSocketBufSize{1024*1024*3} {} /** diff --git a/segmenter_config.ini b/segmenter_config.ini index 69355f5..35722ae 100644 --- a/segmenter_config.ini +++ b/segmenter_config.ini @@ -10,7 +10,7 @@ syncPeriods = 2 ; report zero event number change rate in sync packets zeroRate = false ; use usec clock samples as event numbers in LB and Sync packets -clockAsEventNum = true +usecAsEventNum = true [data-plane] ; prefer V6 dataplane if the URI specifies both data=&data= addresses diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index 9794473..14dbe65 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -28,7 +28,7 @@ namespace e2sar sendThreadState(*this, sflags.dpV6, sflags.zeroCopy, sflags.mtu, sflags.connectedSocket), useCP{sflags.useCP}, zeroRate{sflags.zeroRate}, - clockAsEventNum{sflags.clockAsEventNum} + usecAsEventNum{sflags.usecAsEventNum} { sanityChecks(); } @@ -626,8 +626,8 @@ namespace e2sar sFlags.syncPeriodMs); sFlags.zeroRate = paramTree.get("control-plane.zeroRate", sFlags.zeroRate); - sFlags.clockAsEventNum = paramTree.get("control-plane.clockAsEventNum", - sFlags.clockAsEventNum); + sFlags.usecAsEventNum = paramTree.get("control-plane.usecAsEventNum", + sFlags.usecAsEventNum); // data plane sFlags.dpV6 = paramTree.get("data-plane.dpV6", sFlags.dpV6); From 391cd8528139a5de24ef70f6fc44f4a1339d17d6 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Tue, 8 Oct 2024 17:06:55 -0400 Subject: [PATCH 04/19] Correcting filter generation for listening --- scripts/scapy/snifgen.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scripts/scapy/snifgen.py b/scripts/scapy/snifgen.py index a61e7fd..c91d118 100755 --- a/scripts/scapy/snifgen.py +++ b/scripts/scapy/snifgen.py @@ -275,7 +275,11 @@ def packet_callback(packet): send(p) elif args.listen: # craft a filter - listeningPorts = [x + args.port for x in range(0, args.nports)] + listeningPorts = list() + if args.lbre or args.re or args.lbresync: + listeningPorts = [x + args.port for x in range(0, args.nports)] + if args.sync or args.lbresync: + listeningPorts.append(args.syncport) portFilter = "or".join([f" dst port {port} " for port in listeningPorts]) if args.ip: filter = f'udp and dst host {args.ip} and \\( {portFilter} \\)' From a6dd3727fb5bafcf39617885981400322c2c07b5 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Tue, 8 Oct 2024 20:47:32 -0400 Subject: [PATCH 05/19] Adjusting clock types to make sure we use system_clock when time since UNIX epoch is needed --- bin/e2sar_perf.cpp | 4 ++-- include/e2sarDPSegmenter.hpp | 4 ++-- src/e2sarDPReassembler.cpp | 5 +++-- src/e2sarDPSegmenter.cpp | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/bin/e2sar_perf.cpp b/bin/e2sar_perf.cpp index 6e3d312..a87c939 100644 --- a/bin/e2sar_perf.cpp +++ b/bin/e2sar_perf.cpp @@ -444,8 +444,8 @@ int main(int argc, char **argv) sflags.usecAsEventNum = usecAsEventNum; } std::cout << "Control plane will be " << (sflags.useCP ? "ON" : "OFF") << std::endl; - std::cout << "\tEvent rate reporting " << (sflags.zeroRate ? "OFF" : "ON") << std::endl; - std::cout << "\tUsing usecs as event numbers " << (sflags.usecAsEventNum ? "ON" : "OFF") << std::endl; + std::cout << "Event rate reporting in Sync " << (sflags.zeroRate ? "OFF" : "ON") << std::endl; + std::cout << "Using usecs as event numbers " << (sflags.usecAsEventNum ? "ON" : "OFF") << std::endl; std::cout << (sflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." : "*** Make sure the URI reflects proper data address, other parts are ignored.") << std::endl; diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index b1ecc34..0464e34 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -199,7 +199,7 @@ namespace e2sar socketFd6(s.numSendSockets), ranlux{static_cast(std::time(0))} { // this way every segmenter send thread has a unique PRNG sequence - auto nowT = boost::chrono::high_resolution_clock::now(); + auto nowT = boost::chrono::system_clock::now(); ranlux.seed(boost::chrono::duration_cast(nowT.time_since_epoch()).count()); } @@ -210,7 +210,7 @@ namespace e2sar socketFd6(s.numSendSockets), ranlux{static_cast(std::time(0))} { // this way every segmenter send thread has a unique PRNG sequence - auto nowT = boost::chrono::high_resolution_clock::now(); + auto nowT = boost::chrono::system_clock::now(); ranlux.seed(boost::chrono::duration_cast(nowT.time_since_epoch()).count()); } diff --git a/src/e2sarDPReassembler.cpp b/src/e2sarDPReassembler.cpp index 39537b0..35a8c4c 100644 --- a/src/e2sarDPReassembler.cpp +++ b/src/e2sarDPReassembler.cpp @@ -402,7 +402,7 @@ namespace e2sar void Reassembler::SendStateThreadState::_threadBody() { // get the time - auto nowT = boost::chrono::high_resolution_clock::now(); + auto nowT = boost::chrono::system_clock::now(); auto nowUsec = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); UnixTimeMicro_t currentTimeMicros = static_cast(nowUsec); @@ -436,7 +436,7 @@ namespace e2sar // fillPercent is always reported as sampled in the current moment // Get the current time point - auto nowT = boost::chrono::high_resolution_clock::now(); + auto nowT = boost::chrono::system_clock::now(); auto nowUsec = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); UnixTimeMicro_t currentTimeMicros = static_cast(nowUsec); @@ -522,6 +522,7 @@ namespace e2sar auto eventItem = dequeue(); + // try to dequeue for a bit while (eventItem == nullptr && !threadsStop && !overtime) { recvThreadCond.wait_for(condLock, boost::chrono::milliseconds(recvWaitTimeout_ms)); diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index 14dbe65..ccbaa28 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -89,7 +89,7 @@ namespace e2sar while(!seg.threadsStop) { // Get the current time point - auto nowT = boost::chrono::high_resolution_clock::now(); + auto nowT = boost::chrono::system_clock::now(); // Convert the time point to nanoseconds since the epoch auto now = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); UnixTimeNano_t currentTimeNanos = static_cast(now); From c2ac43358e04b5eaf5506d79c58cf138601c3973 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 14:19:36 -0400 Subject: [PATCH 06/19] Added the option to use usecs as LB and Sync event numbers; Added entropy test for the clock and entropy enhancements in case they are not sufficiently random --- include/e2sarDPSegmenter.hpp | 34 ++++++++++++++++++++----- include/e2sarUtil.hpp | 37 +++++++++++++++++++++++++++ src/e2sarDPSegmenter.cpp | 34 ++++++++++++++++++------- test/boost_test.cpp | 49 ++++++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 15 deletions(-) diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index 0464e34..d29ceb5 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -94,10 +94,15 @@ namespace e2sar boost::atomic currentSyncStartNano{0}; boost::atomic eventsInCurrentSync{0}; - // currently assigned event number at enqueuing - boost::atomic assignedEventNum{0}; - // current event number being sent - boost::atomic sentEventNum{0}; + // currently user-assigned or sequential event number at enqueuing and reported in RE header + boost::atomic userEventNum{0}; + // current event number being sent and reported in LB header + boost::atomic lbEventNum{0}; + + // fast random number generator + boost::random::ranlux24_base ranlux; + // to get better entropy in usec clock samples (if needed) + boost::random::uniform_int_distribution<> lsbDist{0, 255}; // // atomic struct for stat information for sync and send threads @@ -145,6 +150,8 @@ namespace e2sar result _close(); result _send(SyncHdr *hdr); void _threadBody(); + + }; friend struct SyncThreadState; @@ -192,6 +199,8 @@ namespace e2sar boost::random::uniform_int_distribution<> randDist{0, std::numeric_limits::max()}; // to get random port numbers we skip low numbered privileged ports boost::random::uniform_int_distribution<> portDist{10000, std::numeric_limits::max()}; + // to get better entropy in usec clock samples (if needed) + boost::random::uniform_int_distribution<> lsbDist{0, 255}; inline SendThreadState(Segmenter &s, bool v6, bool zc, u_int16_t mtu, bool cnct=true): seg{s}, connectSocket{cnct}, useV6{v6}, useZerocopy{zc}, mtu{mtu}, @@ -236,6 +245,8 @@ namespace e2sar bool zeroRate; // use usec clock samples as event numbers in Sync and LB bool usecAsEventNum; + // use additional entropy in the clock samples + bool addEntropy; /** * Check the sanity of constructor parameters @@ -470,9 +481,9 @@ namespace e2sar // we use sentEventNum which is updated by the send thread to be // more accurate about event numbers being seen by LB if (zeroRate) - hdr->set(eventSrcId, sentEventNum, 0, tnano); + hdr->set(eventSrcId, lbEventNum, 0, tnano); else - hdr->set(eventSrcId, sentEventNum, eventRate, tnano); + hdr->set(eventSrcId, lbEventNum, eventRate, tnano); } // process backlog in return queue and free event queue item blocks on it @@ -482,6 +493,17 @@ namespace e2sar while (returnQueue.pop(item)) queueItemPool.free(item); } + /** + * Add entropy to a clock sample by randomizing the least 8 bits. Runs in the + * context of send thread. + * @param clockSample - the sample value + * @param ranlux - random number generator + * @return + */ + inline int_least64_t addClockEntropy(int_least64_t clockSample) + { + return (clockSample & ~0xFF) | lsbDist(ranlux); + } }; } #endif \ No newline at end of file diff --git a/include/e2sarUtil.hpp b/include/e2sarUtil.hpp index e33556b..e2d4a22 100644 --- a/include/e2sarUtil.hpp +++ b/include/e2sarUtil.hpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "e2sarError.hpp" @@ -467,5 +469,40 @@ namespace e2sar return lhs.first == rhs.first && lhs.second == rhs.second; } }; + + /** + * clock entropy test to validate that system clock produces sufficient randomness + * in the least 8 bits of the microsecond timestamp (required by LB). Normally runs + * for 10 seconds to collect 5k samples + * @param totalTests - number of samples to collect + * @param sleepMs - number of milliseconds to sleep between each sample + * @return entropy in bits + */ + static inline float clockEntropyTest(int totalTests = 5000, int sleepMs = 1) + { + std::vector points; + std::vector bins(256, 0); + + for (int i = 0; i < totalTests; i++) + { + auto now = boost::chrono::system_clock::now(); + auto nowUsec = boost::chrono::duration_cast(now.time_since_epoch()).count(); + bins[nowUsec & 0xff]++; + auto until = now + boost::chrono::milliseconds(sleepMs); + boost::this_thread::sleep_until(until); + } + + // compute the probabilities and entropy + float entropy{0.0}; + for (int i = 0; i < bins.size(); i++) + { + float prob = static_cast(bins[i])/(totalTests*1.0); + entropy += prob * std::log(prob); + } + + // normalize into bits + entropy *= -1.0/std::log(2); + return entropy; + } }; #endif \ No newline at end of file diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index ccbaa28..5d8b00e 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -28,7 +28,8 @@ namespace e2sar sendThreadState(*this, sflags.dpV6, sflags.zeroCopy, sflags.mtu, sflags.connectedSocket), useCP{sflags.useCP}, zeroRate{sflags.zeroRate}, - usecAsEventNum{sflags.usecAsEventNum} + usecAsEventNum{sflags.usecAsEventNum}, + addEntropy{(clockEntropyTest() > 6 ? false : true)} { sanityChecks(); } @@ -44,7 +45,10 @@ namespace e2sar eventStatsBuffer{sflags.syncPeriods}, syncThreadState(*this, sflags.syncPeriodMs, sflags.connectedSocket), sendThreadState(*this, sflags.dpV6, sflags.zeroCopy, sflags.mtu, sflags.connectedSocket), - useCP{sflags.useCP} + useCP{sflags.useCP}, + zeroRate{sflags.zeroRate}, + usecAsEventNum{sflags.usecAsEventNum}, + addEntropy{(clockEntropyTest() > 6 ? false : true)} { // FIXME: get the MTU from interface and attempt to set as outgoing (on Linux). @@ -492,8 +496,20 @@ namespace e2sar u_int8_t *eventEnd = event + bytes; size_t curLen = (bytes <= maxPldLen ? bytes : maxPldLen); - // update the event number being reported in Sync packets - seg.sentEventNum = eventNum; + // update the event number being reported in Sync and LB packets + if (seg.usecAsEventNum) + { + // use microseconds since the UNIX Epoch, but make sure the entropy + // of the 8lsb is sufficient (for LB) + // Get the current time point + auto nowT = boost::chrono::system_clock::now(); + // Convert the time point to microseconds since the epoch + auto now = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); + if (seg.addEntropy) + seg.lbEventNum = seg.addClockEntropy(now); + } else + // use current user-specified or sequential event number + seg.lbEventNum = eventNum; // break up event into a series of datagrams prepended with LB+RE header while (curOffset < eventEnd) @@ -503,7 +519,7 @@ namespace e2sar // note that buffer length is in fact event length, hence 3rd parameter is 'bytes' hdr->re.set(dataId, curOffset - event, bytes, eventNum); - hdr->lb.set(entropy, eventNum); + hdr->lb.set(entropy, seg.lbEventNum); // fill in iov and attach to msghdr // LB+RE header @@ -569,12 +585,12 @@ namespace e2sar freeEventItemBacklog(); // reset local event number to override if (_eventNum != 0) - assignedEventNum.exchange(_eventNum); + userEventNum.exchange(_eventNum); // use specified event number and dataId return sendThreadState._send(event, bytes, // continue incrementing - assignedEventNum++, + userEventNum++, (_dataId == 0 ? dataId : _dataId), entropy); } @@ -588,7 +604,7 @@ namespace e2sar freeEventItemBacklog(); // reset local event number to override if (_eventNum != 0) - assignedEventNum.exchange(_eventNum); + userEventNum.exchange(_eventNum); EventQueueItem *item = queueItemPool.construct(); item->bytes = bytes; item->event = event; @@ -596,7 +612,7 @@ namespace e2sar item->callback = callback; item->cbArg = cbArg; // continue incrementing - item->eventNum = assignedEventNum++; + item->eventNum = userEventNum++; item->dataId = (_dataId == 0 ? dataId : _dataId); eventQueue.push(item); // wake up send thread (no need to hold the lock as queue is lock_free) diff --git a/test/boost_test.cpp b/test/boost_test.cpp index 3bfe761..05a3579 100644 --- a/test/boost_test.cpp +++ b/test/boost_test.cpp @@ -14,6 +14,9 @@ #include #include #include +#include +#include +#include #include @@ -348,5 +351,51 @@ int main() std::cout << "Retrieved " << ret.first << ":" << ret.second << std::endl; delete res; } + + std::cout << "Clock tests" << std::endl; + + // we want to know if system clock usecs produces enough randomness in the bottom 9 + // bits and if not add randomness in them. + + std::vector points; + int total{10000}; + std::vector bins(256, 0); + + for (int i = 0; i < total; i++) + { + auto now1 = boost::chrono::system_clock::now(); + auto now1Usec = boost::chrono::duration_cast(now1.time_since_epoch()).count(); + bins[now1Usec & 0xff]++; + auto until = now1 + boost::chrono::milliseconds(1); + boost::this_thread::sleep_until(until); + } + + // compute the probabilities and entropy + float prob{0.0}, probsum{0.0}; + float entropy{0.0}; + for (int i = 0; i < bins.size(); i++) + { + prob = static_cast(bins[i])/(total*1.0); + entropy += prob * std::log(prob); + probsum += prob; + } + + entropy *= -1.0/std::log(2); + + std::cout << "Probability sum " << probsum << std::endl; + std::cout << "Entropy is " << entropy << " bits" << std::endl; + + // add entropy to a clock sample + boost::random::ranlux24_base ranlux; + boost::random::uniform_int_distribution<> randDist{0, 255}; + + auto now1 = boost::chrono::system_clock::now(); + auto now1Usec = boost::chrono::duration_cast(now1.time_since_epoch()).count(); + + // replace lower 8 bits + auto addedEntropy = (now1Usec & ~0xFF) | randDist(ranlux); + std::cout << "Original timestamp " << now1Usec << " with added entropy " << addedEntropy << std::endl; + std::cout << "Original & 0xFF " << (now1Usec & 0xFF) << " with added entropy & 0xFF " << (addedEntropy & 0xFF) << std::endl; + } From 65bc51189da80ab06ba7e6559218a95e7de44cd9 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 14:23:47 -0400 Subject: [PATCH 07/19] Clarifying help message --- scripts/scapy/snifgen.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/scapy/snifgen.py b/scripts/scapy/snifgen.py index c91d118..fd35b4a 100755 --- a/scripts/scapy/snifgen.py +++ b/scripts/scapy/snifgen.py @@ -204,7 +204,7 @@ def packet_callback(packet): operations = parser.add_mutually_exclusive_group(required=True) operations.add_argument("-l", "--listen", action="store_true", help="listen for incoming packets and try to parse and validate them") operations.add_argument("-g", "--generate", action="store_true", help="generate new packets of specific types") - operations.add_argument("-a", "--parse", action="store_true", help="parse a pcap file. The recommended way to capture is something like this 'sudo tcpdump -s 200 -tttt -i enp7s0 udp \\( dst port 19522 or dst port 19523 \\) -w e2sar.pcap'") + operations.add_argument("-a", "--parse", action="store_true", help="parse a pcap file. The recommended way to capture is something like this 'sudo tcpdump -s 200 -tttt -i enp7s0 udp and \\( dst port 19522 or dst port 19523 \\) -w e2sar.pcap'") parser.add_argument("-p", "--port", action="store", default=19522, type=int, help="UDP data port (only port for --lbre, starting port for --re)") parser.add_argument("-y", "--syncport", action="store", help="UDP sync port", default=19010, type=int) parser.add_argument("-n", "--nports", action="store", type=int, default=1, help="number of ports starting with -p to listen on for --re") From bad6e7c11e6520a547979aac07fc624d2e3baf0c Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 14:29:37 -0400 Subject: [PATCH 08/19] Getting rid of compile warning --- include/e2sarUtil.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/e2sarUtil.hpp b/include/e2sarUtil.hpp index e2d4a22..d6a30a1 100644 --- a/include/e2sarUtil.hpp +++ b/include/e2sarUtil.hpp @@ -494,7 +494,7 @@ namespace e2sar // compute the probabilities and entropy float entropy{0.0}; - for (int i = 0; i < bins.size(); i++) + for (size_t i = 0; i < bins.size(); i++) { float prob = static_cast(bins[i])/(totalTests*1.0); entropy += prob * std::log(prob); From 62d39f860c740a322a5eb4e8cdb202a2f6a019f0 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 14:37:27 -0400 Subject: [PATCH 09/19] Another compile warning --- test/boost_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/boost_test.cpp b/test/boost_test.cpp index 05a3579..6f9cf8f 100644 --- a/test/boost_test.cpp +++ b/test/boost_test.cpp @@ -373,7 +373,7 @@ int main() // compute the probabilities and entropy float prob{0.0}, probsum{0.0}; float entropy{0.0}; - for (int i = 0; i < bins.size(); i++) + for (size_t i = 0; i < bins.size(); i++) { prob = static_cast(bins[i])/(total*1.0); entropy += prob * std::log(prob); From 2988c0ec2ebc78ca562fc3ed7064cfad9e4e62a1 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 15:11:07 -0400 Subject: [PATCH 10/19] Fixing a bug --- src/e2sarDPSegmenter.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/e2sarDPSegmenter.cpp b/src/e2sarDPSegmenter.cpp index 5d8b00e..ed6ab59 100644 --- a/src/e2sarDPSegmenter.cpp +++ b/src/e2sarDPSegmenter.cpp @@ -507,6 +507,8 @@ namespace e2sar auto now = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); if (seg.addEntropy) seg.lbEventNum = seg.addClockEntropy(now); + else + seg.lbEventNum = now; } else // use current user-specified or sequential event number seg.lbEventNum = eventNum; From c8804b570e5cd02f647ef5f434a36127412813a3 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 15:41:59 -0400 Subject: [PATCH 11/19] Deriving sync event number from most recent timestamp --- include/e2sarDPSegmenter.hpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index d29ceb5..701b4c6 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -478,12 +478,18 @@ namespace e2sar // never changes past initialization inline void fillSyncHdr(SyncHdr *hdr, EventRate_t eventRate, UnixTimeNano_t tnano) { - // we use sentEventNum which is updated by the send thread to be - // more accurate about event numbers being seen by LB - if (zeroRate) - hdr->set(eventSrcId, lbEventNum, 0, tnano); - else - hdr->set(eventSrcId, lbEventNum, eventRate, tnano); + EventRate_t reportedRate{0}; + if (not zeroRate) + reportedRate = eventRate; + EventNum_t reportedEventNum{lbEventNum}; + if (usecAsEventNum) + { + // figure out what event number would be at this moment, don't worry about its entropy + auto nowT = boost::chrono::system_clock::now(); + // Convert the time point to microseconds since the epoch + reportedEventNum = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); + } + hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano); } // process backlog in return queue and free event queue item blocks on it From d78a459b6cc2780429e64533bcb06cdda2f768b2 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 16:13:16 -0400 Subject: [PATCH 12/19] Adding 1MHz rate reporting for usec as event numbers case --- include/e2sarDPSegmenter.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/e2sarDPSegmenter.hpp b/include/e2sarDPSegmenter.hpp index 701b4c6..89aade7 100644 --- a/include/e2sarDPSegmenter.hpp +++ b/include/e2sarDPSegmenter.hpp @@ -488,6 +488,9 @@ namespace e2sar auto nowT = boost::chrono::system_clock::now(); // Convert the time point to microseconds since the epoch reportedEventNum = boost::chrono::duration_cast(nowT.time_since_epoch()).count(); + if (not zeroRate) + // 1 MHz in this case + reportedRate = 1000000; } hdr->set(eventSrcId, reportedEventNum, reportedRate, tnano); } From 1d6fa3e7e7196b1b3011da4b33a218a6e4dcac6f Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 22:22:18 -0400 Subject: [PATCH 13/19] Updated docs --- docs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs b/docs index cc336b8..b3ce7a6 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit cc336b8346707de63fb93017ecfd43abb500789b +Subproject commit b3ce7a610924fb29f453b6c945b5c22cb4d378ce From d2673365f247192044af4b92d506d81fa4158663 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 22:27:16 -0400 Subject: [PATCH 14/19] Prettied up e2sar_perf outputs; Updated SegmenterFlags in pybind; updated wiki --- bin/e2sar_perf.cpp | 2 +- src/pybind/py_e2sarDP.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/e2sar_perf.cpp b/bin/e2sar_perf.cpp index a87c939..122fafe 100644 --- a/bin/e2sar_perf.cpp +++ b/bin/e2sar_perf.cpp @@ -443,7 +443,7 @@ int main(int argc, char **argv) sflags.zeroRate = zeroRate; sflags.usecAsEventNum = usecAsEventNum; } - std::cout << "Control plane will be " << (sflags.useCP ? "ON" : "OFF") << std::endl; + std::cout << "Control plane " << (sflags.useCP ? "ON" : "OFF") << std::endl; std::cout << "Event rate reporting in Sync " << (sflags.zeroRate ? "OFF" : "ON") << std::endl; std::cout << "Using usecs as event numbers " << (sflags.usecAsEventNum ? "ON" : "OFF") << std::endl; std::cout << (sflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." : diff --git a/src/pybind/py_e2sarDP.cpp b/src/pybind/py_e2sarDP.cpp index ab33028..a29eb8d 100644 --- a/src/pybind/py_e2sarDP.cpp +++ b/src/pybind/py_e2sarDP.cpp @@ -74,6 +74,8 @@ void init_e2sarDP_segmenter(py::module_ &m) .def_readwrite("zeroCopy", &Segmenter::SegmenterFlags::zeroCopy) .def_readwrite("connectedSocket", &Segmenter::SegmenterFlags::connectedSocket) .def_readwrite("useCP", &Segmenter::SegmenterFlags::useCP) + .def_readwrite("zeroRate", &Segmenter::SegmenterFlags::zeroRate) + .def_readwrite("usecAsEventNum", &Segmenter::SegmenterFlags::usecAsEventNum) .def_readwrite("syncPeriodMs", &Segmenter::SegmenterFlags::syncPeriodMs) .def_readwrite("syncPeriods", &Segmenter::SegmenterFlags::syncPeriods) .def_readwrite("mtu", &Segmenter::SegmenterFlags::mtu) From 80338a59c386a7fc806fcc2ebcfa30f1cff01d81 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 22:32:01 -0400 Subject: [PATCH 15/19] Updated wiki --- wiki | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wiki b/wiki index 4d5ea16..ea08085 160000 --- a/wiki +++ b/wiki @@ -1 +1 @@ -Subproject commit 4d5ea167af16258a4d4fbd897ce1592ae46b0c34 +Subproject commit ea080854957858ad60a368de3d0808477721fab9 From cfe99f80aca99f1c1562d002ba828cb8f1ee5c31 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 22:33:37 -0400 Subject: [PATCH 16/19] Update version --- meson.build | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meson.build b/meson.build index a817d36..6691bcd 100644 --- a/meson.build +++ b/meson.build @@ -1,5 +1,5 @@ project('E2SAR', 'cpp', - version: '0.1.2', default_options : ['cpp_std=c++17']) + version: '0.1.3', default_options : ['cpp_std=c++17']) compiler = meson.get_compiler('cpp') From 1479796addc9fec033394dbddace7fdff215bc72 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Wed, 9 Oct 2024 22:34:54 -0400 Subject: [PATCH 17/19] Updated version of docs --- docs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs b/docs index b3ce7a6..199ee4b 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit b3ce7a610924fb29f453b6c945b5c22cb4d378ce +Subproject commit 199ee4b58c383a660afdbd1f6ec10f24e61d2666 From 671e2a0af8c36cbe8f288767ec33e760b5f00bb5 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Thu, 10 Oct 2024 11:24:07 -0400 Subject: [PATCH 18/19] Updated to version 0.1.3 --- Doxyfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doxyfile b/Doxyfile index 9f0d377..8d8559c 100644 --- a/Doxyfile +++ b/Doxyfile @@ -48,7 +48,7 @@ PROJECT_NAME = "E2SAR" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.1.2 +PROJECT_NUMBER = 0.1.3 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a From 01edc08847934ccdeab0515eb3cfb70ff89dd4a1 Mon Sep 17 00:00:00 2001 From: Ilya Baldin Date: Thu, 10 Oct 2024 11:30:37 -0400 Subject: [PATCH 19/19] Minor updates to the notebook for new e2sar_perf options --- .../EJFAT/E2SAR-live-lb-tester.ipynb | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/scripts/notebooks/EJFAT/E2SAR-live-lb-tester.ipynb b/scripts/notebooks/EJFAT/E2SAR-live-lb-tester.ipynb index 1987d81..aacc5cf 100644 --- a/scripts/notebooks/EJFAT/E2SAR-live-lb-tester.ipynb +++ b/scripts/notebooks/EJFAT/E2SAR-live-lb-tester.ipynb @@ -111,16 +111,8 @@ "open_ports = {\n", "}\n", "\n", - "# additional accounts and their public keys - they get sudo rights and docker,\n", - "# their public keys are expected to reside under ssh-keys/ in a file\n", - "# named after the account.\n", - "# The list has the form of ['user1', 'user2'] where user1 and user2 accounts\n", - "# will be created on the system. Under ssh-keys/ there should be two files\n", - "# named 'user1' and 'user2' each containing the SSH public key for that user. \n", - "accounts = []\n", - "\n", "# url of e2sar deps. Find the appropriate version for the OS at https://github.com/JeffersonLab/E2SAR/releases\n", - "e2sar_branch = \"e2sar-perf-with-cp\"\n", + "e2sar_branch = \"main\"\n", "static_release_url = 'https://github.com/JeffersonLab/E2SAR/releases/download/' # don't need to change this\n", "e2sar_dep_artifcat = 'e2sar-deps_0.1.1_amd64.deb'\n", "e2sar_release_ver = 'E2SAR-0.1.1'\n", @@ -920,11 +912,11 @@ "# for e2sar_perf only the data= part of the query is meaningful. sync= must be present but is ignored\n", "# same for gRPC token, address and port (and lb id)\n", "e2sarPerfURI = f\"ejfat://useless@10.10.10.10:1234/lb/1?data={recver_addr}&sync=192.168.77.7:1234\"\n", - "recverDuration = 20\n", + "recverDuration = 40\n", "mtu = 9000\n", "rate = 15 # Gbps\n", "length = 1000000 # event length in bytes\n", - "numEvents = 10000 # number of events to send\n", + "numEvents = 20000 # number of events to send\n", "bufSize = 300 * 1024 * 1024 # 100MB send and receive buffers\n", "\n", "recv_command = f\"cd E2SAR; PATH=$HOME/.local/bin:/usr/local/bin:$PATH LD_LIBRARY_PATH=/usr/local/lib/ ./build/bin/e2sar_perf -r -u '{e2sarPerfURI}' -d {recverDuration} -b {bufSize} --ip {recver_addr} --port 19522\"\n", @@ -1143,7 +1135,10 @@ "# Given that in FABRIC ejfat-lb.es.net resolves to IP6 first and gRPC C++ library doesn't\n", "# offer granular control over which resolved address is used, we use -4 option to tell the\n", "# code to use the IPv4 address, but this also disables cert validation.\n", - "send_command = f\"{e2sar_perf} -s -u '{instance_uri}' --mtu {mtu} --rate {rate} --length {length} -n {numEvents} -b {bufSize} --ip {sender_addr} --sockets {numSocks} --withcp -4\"\n", + "# Sender options of interest:\n", + "# -z - send 0 event rate in Sync messages\n", + "# --usec - use usec-precision timestamp as event numbers in Sync and LB messages \n", + "send_command = f\"{e2sar_perf} -s -u '{instance_uri}' --mtu {mtu} --rate {rate} --length {length} -n {numEvents} -b {bufSize} --ip {sender_addr} --sockets {numSocks} --withcp -4 --usec\"\n", "\n", "for recver, recver_addr in zip(recvers, recver_addrs):\n", " recv_command = f\"{e2sar_perf} -r -u '{instance_uri}' -d {recverDuration} -b {bufSize} --ip {recver_addr} --port {startPort} --withcp -4 --threads {recvThreads}\"\n",