From a3006458f883d152bf0b07829891393d083bb8d3 Mon Sep 17 00:00:00 2001 From: Chengxun Lee <24319042+bclswl0827@users.noreply.github.com> Date: Tue, 17 Sep 2024 23:20:40 +0800 Subject: [PATCH] refactor: Insert timestamp before buffering legacy mode packets --- .github/workflows/release.yml | 11 +- CHANGELOG.md | 16 +++ VERSION | 2 +- drivers/explorer/impl.go | 205 ++++++++++++++++++++-------------- services/miniseed/write.go | 5 + startups/explorer/execute.go | 10 +- 6 files changed, 153 insertions(+), 96 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7bd5997e..54627ffb 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -211,15 +211,16 @@ jobs: "labels": [ "chore", "docs", - "perf", + "perf" + ] + }, + { + "title": "## Refactor", + "labels": [ "refactor", "revert", "style" ] - }, - { - "title":"## Changes", - "labels":[] } ], "label_extractor":[ diff --git a/CHANGELOG.md b/CHANGELOG.md index b659437d..8f1b4608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ Starting from v2.2.5, all notable changes to this project will be documented in this file. +## v3.3.1 + +### Bug Fixes + +- Fixed the timestamp accuracy issue of event data source of JMA. +- Fixed token refresh logic in Main component. + +### Refactor + +- Insert timestamp before buffering legacy mode packets. + +### Chore + +- Create a copy of the configuration file when executing make run. +- Updated retention default value to 120s in global configuration. + ## v3.3.0 ### New Features diff --git a/VERSION b/VERSION index b299be97..15ee400b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.3.0 +v3.3.1 diff --git a/drivers/explorer/impl.go b/drivers/explorer/impl.go index 6e6ed6f6..439810bd 100644 --- a/drivers/explorer/impl.go +++ b/drivers/explorer/impl.go @@ -108,6 +108,7 @@ func (g *mainlinePacket) decode(data []byte) error { } // Restore the header data, note that the byte order is little-endian + g.Timestamp = int64(binary.LittleEndian.Uint64(data[:unsafe.Sizeof(g.Timestamp)])) switch (g.Timestamp / time.Second.Milliseconds()) % 4 { case 0: g.VariableName = "device_id" @@ -120,7 +121,6 @@ func (g *mainlinePacket) decode(data []byte) error { } variableDataIndex := int(unsafe.Sizeof(g.Timestamp) + unsafe.Sizeof(g.VariableData)) copy(g.VariableData[:], data[unsafe.Sizeof(g.Timestamp):variableDataIndex]) - g.Timestamp = int64(binary.LittleEndian.Uint64(data[:unsafe.Sizeof(g.Timestamp)])) // Restore the channel data, note that the byte order is little-endian zAxisOffset := variableDataIndex + int(unsafe.Sizeof(g.Z_axis)) @@ -144,10 +144,12 @@ type ExplorerDriverImpl struct { } func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fifoBuffer *fifo.Buffer) { - recvSize := len(LEGACY_PACKET_FRAME_HEADER) + e.legacyPacket.length() + // Set device ID to 0xFFFFFFFF to indicate legacy mode + deps.Config.SetDeviceId(math.MaxUint32) // Read data from the transport continuously go func() { + recvSize := len(LEGACY_PACKET_FRAME_HEADER) + e.legacyPacket.length() buf := make([]byte, recvSize) for { select { @@ -161,21 +163,58 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fi continue } - fifoBuffer.Write(buf[:n]) + // Record the current time of the packet + timeBytes := make([]byte, 8) // 8 bytes for int64 + binary.BigEndian.PutUint64(timeBytes, uint64(deps.FallbackTime.Get().UnixMilli())) + + // Find header in the buffer to insert current time next to the header + headerIndex := bytes.Index(buf[:n], LEGACY_PACKET_FRAME_HEADER) + if headerIndex == -1 { + continue + } + + // Create a new buffer to store data with the timestamp inserted + modifiedBuf := make([]byte, 0, n+len(timeBytes)) + modifiedBuf = append(modifiedBuf, buf[:headerIndex+len(LEGACY_PACKET_FRAME_HEADER)]...) + modifiedBuf = append(modifiedBuf, timeBytes...) + modifiedBuf = append(modifiedBuf, buf[headerIndex+len(LEGACY_PACKET_FRAME_HEADER):n]...) + fifoBuffer.Write(modifiedBuf) } } }() - // Reference: https://stackoverflow.com/a/51424566 - // Calculate the duration to the next whole second to allivate the drift - calcDuration := func(currentTime time.Time, duration time.Duration) time.Duration { - return currentTime.Round(duration).Add(duration).Sub(currentTime) + findClosestSampleRate := func(currentSampleRate int) int { + targetSampleRates := []int{ + 5, 10, 25, 50, 75, 100, + 125, 150, 175, 200, 225, + 250, 275, 300, 325, 350, + 375, 400, 425, 450, 475, + 500, 525, 550, 575, 600, + 625, 650, 675, 700, 725, + 750, 775, 800, 825, 850, + 875, 900, 925, 950, 975, + 1000, + } + + closest := targetSampleRates[0] + minDiff := math.Abs(float64(currentSampleRate - closest)) + + for _, target := range targetSampleRates { + diff := math.Abs(float64(currentSampleRate - target)) + if diff < minDiff { + closest = target + minDiff = diff + } + } + + return closest } // Read data from the FIFO buffer continuously var ( - packetBuffer = []legacyPacket{} - ticker = time.NewTimer(calcDuration(time.Now(), time.Second)) + packetBuffer = []mainlinePacket{} // Legacy packet is converted to mainline packet internally + readSize = len(LEGACY_PACKET_FRAME_HEADER) + 8 + e.legacyPacket.length() // header + timestamp + legacyPacket + nextTick = int64(0) // Expected timestamp for the next published data on message bus timer = time.NewTimer(time.Millisecond) ) for { @@ -183,52 +222,81 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fi select { case <-deps.CancelToken.Done(): - ticker.Stop() return - case currentTick := <-ticker.C: - if len(packetBuffer) > 0 { - currentTime := deps.FallbackTime.Get() - deps.Health.SetUpdatedAt(currentTime) - deps.Health.SetReceived(deps.Health.GetReceived() + 1) + case <-timer.C: + dat, err := fifoBuffer.Peek(LEGACY_PACKET_FRAME_HEADER, readSize) + if err != nil { + continue + } + // Read and decode the legacy packet + err = e.legacyPacket.decode(dat[len(LEGACY_PACKET_FRAME_HEADER)+8:]) + if err != nil { + e.logger.Errorf("failed to decode legacy packet: %v", err) + deps.Health.SetErrors(deps.Health.GetErrors() + 1) + continue + } + + // Extract timestamp from the buffer + timestamp := int64(binary.BigEndian.Uint64( + dat[len(LEGACY_PACKET_FRAME_HEADER) : len(LEGACY_PACKET_FRAME_HEADER)+8], + )) + + // Append the packet to the buffer + if nextTick == 0 { + nextTick = timestamp + } else { + packetBuffer = append(packetBuffer, mainlinePacket{ + Timestamp: timestamp, + Z_axis: e.legacyPacket.Z_Axis, + E_axis: e.legacyPacket.E_Axis, + N_axis: e.legacyPacket.N_Axis, + }) + } + + // Calculate proper sample rate to avoid jitter + currentSampleRate := len(packetBuffer) * MAINLINE_PACKET_CHANNEL_SIZE + targetSampleRate := findClosestSampleRate(currentSampleRate) + + if math.Abs(float64(timestamp-nextTick)) <= EXPLORER_ALLOWED_JITTER_MS && currentSampleRate == targetSampleRate { + // Update the next tick even if the buffer is empty + nextTick = timestamp + time.Second.Milliseconds() + if len(packetBuffer) == 0 { + continue + } + + // Merge the packet buffer into a single packet var ( z_axis_count []int32 e_axis_count []int32 n_axis_count []int32 ) for _, packet := range packetBuffer { - z_axis_count = append(z_axis_count, packet.Z_Axis[:]...) - e_axis_count = append(e_axis_count, packet.E_Axis[:]...) - n_axis_count = append(n_axis_count, packet.N_Axis[:]...) + z_axis_count = append(z_axis_count, packet.Z_axis[:]...) + e_axis_count = append(e_axis_count, packet.E_axis[:]...) + n_axis_count = append(n_axis_count, packet.N_axis[:]...) } - sampleRate := len(packetBuffer) * LEGACY_PACKET_CHANNEL_SIZE - deps.Health.SetSampleRate(sampleRate) + // Publish the final packet finalPacket := ExplorerData{ - SampleRate: sampleRate, + SampleRate: targetSampleRate, Z_Axis: z_axis_count, E_Axis: e_axis_count, N_Axis: n_axis_count, - Timestamp: currentTime.UTC().Add(-time.Second).UnixMilli(), + Timestamp: timestamp - time.Second.Milliseconds(), } deps.messageBus.Publish("explorer", &finalPacket) - packetBuffer = []legacyPacket{} - ticker.Reset(calcDuration(currentTick, time.Second)) - } - case <-timer.C: - dat, err := fifoBuffer.Peek(LEGACY_PACKET_FRAME_HEADER, recvSize) - if err != nil { - continue - } + // Update the health status + deps.Health.SetSampleRate(targetSampleRate) + deps.Health.SetReceived(deps.Health.GetReceived() + 1) + deps.Health.SetUpdatedAt(time.UnixMilli(timestamp).UTC()) - // Read the packet data - err = e.legacyPacket.decode(dat[len(LEGACY_PACKET_FRAME_HEADER):]) - if err != nil { - e.logger.Errorf("failed to decode legacy packet: %v", err) - deps.Health.SetErrors(deps.Health.GetErrors() + 1) - } else { - packetBuffer = append(packetBuffer, e.legacyPacket) + packetBuffer = []mainlinePacket{} + } else if timestamp-nextTick > EXPLORER_ALLOWED_JITTER_MS { + // Update the next tick, clear the buffer if the jitter exceeds the threshold + nextTick = timestamp + time.Second.Milliseconds() + packetBuffer = []mainlinePacket{} } } } @@ -257,17 +325,10 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency, } }() - // When device ID was set to 19890604 (dummy value), the device is running without GNSS module. - // In this case, the latitude, longitude, elevation will not be updated. - // The timestamp will be replaced with the NTP time. - noGnssMode := false - if deps.Config.GetDeviceId() == 19890604 { - noGnssMode = true - } - // Read data from the FIFO buffer continuously var ( packetBuffer = []mainlinePacket{} + noGnssMode = false nextTick = int64(0) // Expected timestamp for the next published data on message bus timeDiff = int64(0) // For non-GNSS mode, we need time difference between the packet and NTP time timer = time.NewTimer(time.Millisecond) @@ -290,12 +351,23 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency, continue } - // Update the device ID, latitude, longitude, elevation - switch e.mainlinePacket.VariableName { - case "device_id": - if !noGnssMode { - deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + // If device ID is not initialized, get the device ID from the packet + if deps.Config.GetDeviceId() == 0 && e.mainlinePacket.VariableName == "device_id" { + deviceId := binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:]) + // When device ID was set to 19890604 (dummy value), the device is running without GNSS module. + // In this case, the latitude, longitude, elevation will not be updated. + // The timestamp will be replaced with the NTP time. + if deviceId == 19890604 { + noGnssMode = true } + deps.Config.SetDeviceId(deviceId) + e.logger.Infof("got current device ID: %08X", deviceId) + } else if deps.Config.GetDeviceId() == 0 { + continue + } + + // Update the latitude, longitude, elevation + switch e.mainlinePacket.VariableName { case "latitude": latitude := math.Float32frombits(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) if latitude >= -90 && latitude <= 90 && !noGnssMode { @@ -374,7 +446,7 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency, } func (e *ExplorerDriverImpl) readerDaemon(deps *ExplorerDependency) { - fifoBuffer := fifo.New(65536) + fifoBuffer := fifo.New(8192) if deps.Config.GetLegacyMode() { e.handleReadLegacyPacket(deps, &fifoBuffer) @@ -390,37 +462,6 @@ func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency, logger *logrus.Entry deps.Health.SetStartTime(currentTime) deps.subscribers = haxmap.New[string, ExplorerEventHandler]() deps.messageBus = messagebus.New(1024) - deps.Config.SetDeviceId(math.MaxUint32) - - // Get device ID in EEPROM - if !deps.Config.GetLegacyMode() { - var ( - startTime = time.Now() - readTimeout = 5 * time.Second - ) - for time.Since(startTime) <= readTimeout { - ok, _ := deps.Transport.Filter(MAINLINE_PACKET_FRAME_HEADER, time.Second) - if !ok { - continue - } - headerBuf := make([]byte, e.mainlinePacket.length()) - _, err := deps.Transport.Read(headerBuf, time.Second, false) - if err != nil { - continue - } - err = e.mainlinePacket.decode(headerBuf) - if err != nil { - continue - } - if e.mainlinePacket.VariableName == "device_id" { - deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) - break - } - } - if time.Since(startTime) > readTimeout { - return errors.New("failed to get device ID, please check the device") - } - } go e.readerDaemon(deps) return nil diff --git a/services/miniseed/write.go b/services/miniseed/write.go index 022246b8..1eaf5119 100644 --- a/services/miniseed/write.go +++ b/services/miniseed/write.go @@ -25,6 +25,11 @@ func (m *MiniSeedService) handleWrite() error { m.miniseedBuffer[i].Timestamp, ) } + + // Make sure sample rate is the same + if m.miniseedBuffer[i].SampleRate != startSampleRate { + return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate) + } } // Write data to file by channels diff --git a/startups/explorer/execute.go b/startups/explorer/execute.go index 3ac7a6ff..497b67e2 100644 --- a/startups/explorer/execute.go +++ b/startups/explorer/execute.go @@ -16,19 +16,13 @@ func (t *ExplorerStartupTask) Execute(depsContainer *dig.Container, options *sta if err != nil { return err } - explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{}) - logger.GetLogger(t.GetTaskName()).Infoln("device is being initialized, please wait") + explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{}) err = explorerDriver.Init(explorerDeps, logger.GetLogger("explorer_driver")) if err != nil { return err } - logger.GetLogger(t.GetTaskName()).Infoln("device has been initialized successfully") - if !explorerDeps.Config.GetLegacyMode() { - logger.GetLogger(t.GetTaskName()).Infof("handshake successful, device ID: %08X", explorerDeps.Config.GetDeviceId()) - } else { - logger.GetLogger(t.GetTaskName()).Warnln("device is in legacy mode, this is for backward compatibility only") - } + logger.GetLogger(t.GetTaskName()).Infoln("device has been initialized") return nil }