Skip to content

Commit

Permalink
refactor: Insert timestamp before buffering legacy mode packets
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Sep 17, 2024
1 parent 3c31ff7 commit a300645
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 96 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,16 @@ jobs:
"labels": [
"chore",
"docs",
"perf",
"perf"
]
},
{
"title": "## Refactor",
"labels": [
"refactor",
"revert",
"style"
]
},
{
"title":"## Changes",
"labels":[]
}
],
"label_extractor":[
Expand Down
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v3.3.1

### Bug Fixes

- Fixed the timestamp accuracy issue of event data source of JMA.
- Fixed token refresh logic in Main component.

### Refactor

- Insert timestamp before buffering legacy mode packets.

### Chore

- Create a copy of the configuration file when executing make run.
- Updated retention default value to 120s in global configuration.

## v3.3.0

### New Features
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.3.0
v3.3.1
205 changes: 123 additions & 82 deletions drivers/explorer/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (g *mainlinePacket) decode(data []byte) error {
}

// Restore the header data, note that the byte order is little-endian
g.Timestamp = int64(binary.LittleEndian.Uint64(data[:unsafe.Sizeof(g.Timestamp)]))
switch (g.Timestamp / time.Second.Milliseconds()) % 4 {
case 0:
g.VariableName = "device_id"
Expand All @@ -120,7 +121,6 @@ func (g *mainlinePacket) decode(data []byte) error {
}
variableDataIndex := int(unsafe.Sizeof(g.Timestamp) + unsafe.Sizeof(g.VariableData))
copy(g.VariableData[:], data[unsafe.Sizeof(g.Timestamp):variableDataIndex])
g.Timestamp = int64(binary.LittleEndian.Uint64(data[:unsafe.Sizeof(g.Timestamp)]))

// Restore the channel data, note that the byte order is little-endian
zAxisOffset := variableDataIndex + int(unsafe.Sizeof(g.Z_axis))
Expand All @@ -144,10 +144,12 @@ type ExplorerDriverImpl struct {
}

func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fifoBuffer *fifo.Buffer) {
recvSize := len(LEGACY_PACKET_FRAME_HEADER) + e.legacyPacket.length()
// Set device ID to 0xFFFFFFFF to indicate legacy mode
deps.Config.SetDeviceId(math.MaxUint32)

// Read data from the transport continuously
go func() {
recvSize := len(LEGACY_PACKET_FRAME_HEADER) + e.legacyPacket.length()
buf := make([]byte, recvSize)
for {
select {
Expand All @@ -161,74 +163,140 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fi
continue
}

fifoBuffer.Write(buf[:n])
// Record the current time of the packet
timeBytes := make([]byte, 8) // 8 bytes for int64
binary.BigEndian.PutUint64(timeBytes, uint64(deps.FallbackTime.Get().UnixMilli()))

// Find header in the buffer to insert current time next to the header
headerIndex := bytes.Index(buf[:n], LEGACY_PACKET_FRAME_HEADER)
if headerIndex == -1 {
continue
}

// Create a new buffer to store data with the timestamp inserted
modifiedBuf := make([]byte, 0, n+len(timeBytes))
modifiedBuf = append(modifiedBuf, buf[:headerIndex+len(LEGACY_PACKET_FRAME_HEADER)]...)
modifiedBuf = append(modifiedBuf, timeBytes...)
modifiedBuf = append(modifiedBuf, buf[headerIndex+len(LEGACY_PACKET_FRAME_HEADER):n]...)
fifoBuffer.Write(modifiedBuf)
}
}
}()

// Reference: https://stackoverflow.com/a/51424566
// Calculate the duration to the next whole second to allivate the drift
calcDuration := func(currentTime time.Time, duration time.Duration) time.Duration {
return currentTime.Round(duration).Add(duration).Sub(currentTime)
findClosestSampleRate := func(currentSampleRate int) int {
targetSampleRates := []int{
5, 10, 25, 50, 75, 100,
125, 150, 175, 200, 225,
250, 275, 300, 325, 350,
375, 400, 425, 450, 475,
500, 525, 550, 575, 600,
625, 650, 675, 700, 725,
750, 775, 800, 825, 850,
875, 900, 925, 950, 975,
1000,
}

closest := targetSampleRates[0]
minDiff := math.Abs(float64(currentSampleRate - closest))

for _, target := range targetSampleRates {
diff := math.Abs(float64(currentSampleRate - target))
if diff < minDiff {
closest = target
minDiff = diff
}
}

return closest
}

// Read data from the FIFO buffer continuously
var (
packetBuffer = []legacyPacket{}
ticker = time.NewTimer(calcDuration(time.Now(), time.Second))
packetBuffer = []mainlinePacket{} // Legacy packet is converted to mainline packet internally
readSize = len(LEGACY_PACKET_FRAME_HEADER) + 8 + e.legacyPacket.length() // header + timestamp + legacyPacket
nextTick = int64(0) // Expected timestamp for the next published data on message bus
timer = time.NewTimer(time.Millisecond)
)
for {
timer.Reset(time.Millisecond)

select {
case <-deps.CancelToken.Done():
ticker.Stop()
return
case currentTick := <-ticker.C:
if len(packetBuffer) > 0 {
currentTime := deps.FallbackTime.Get()
deps.Health.SetUpdatedAt(currentTime)
deps.Health.SetReceived(deps.Health.GetReceived() + 1)
case <-timer.C:
dat, err := fifoBuffer.Peek(LEGACY_PACKET_FRAME_HEADER, readSize)
if err != nil {
continue
}

// Read and decode the legacy packet
err = e.legacyPacket.decode(dat[len(LEGACY_PACKET_FRAME_HEADER)+8:])
if err != nil {
e.logger.Errorf("failed to decode legacy packet: %v", err)
deps.Health.SetErrors(deps.Health.GetErrors() + 1)
continue
}

// Extract timestamp from the buffer
timestamp := int64(binary.BigEndian.Uint64(
dat[len(LEGACY_PACKET_FRAME_HEADER) : len(LEGACY_PACKET_FRAME_HEADER)+8],
))

// Append the packet to the buffer
if nextTick == 0 {
nextTick = timestamp
} else {
packetBuffer = append(packetBuffer, mainlinePacket{
Timestamp: timestamp,
Z_axis: e.legacyPacket.Z_Axis,
E_axis: e.legacyPacket.E_Axis,
N_axis: e.legacyPacket.N_Axis,
})
}

// Calculate proper sample rate to avoid jitter
currentSampleRate := len(packetBuffer) * MAINLINE_PACKET_CHANNEL_SIZE
targetSampleRate := findClosestSampleRate(currentSampleRate)

if math.Abs(float64(timestamp-nextTick)) <= EXPLORER_ALLOWED_JITTER_MS && currentSampleRate == targetSampleRate {
// Update the next tick even if the buffer is empty
nextTick = timestamp + time.Second.Milliseconds()
if len(packetBuffer) == 0 {
continue
}

// Merge the packet buffer into a single packet
var (
z_axis_count []int32
e_axis_count []int32
n_axis_count []int32
)
for _, packet := range packetBuffer {
z_axis_count = append(z_axis_count, packet.Z_Axis[:]...)
e_axis_count = append(e_axis_count, packet.E_Axis[:]...)
n_axis_count = append(n_axis_count, packet.N_Axis[:]...)
z_axis_count = append(z_axis_count, packet.Z_axis[:]...)
e_axis_count = append(e_axis_count, packet.E_axis[:]...)
n_axis_count = append(n_axis_count, packet.N_axis[:]...)
}

sampleRate := len(packetBuffer) * LEGACY_PACKET_CHANNEL_SIZE
deps.Health.SetSampleRate(sampleRate)
// Publish the final packet
finalPacket := ExplorerData{
SampleRate: sampleRate,
SampleRate: targetSampleRate,
Z_Axis: z_axis_count,
E_Axis: e_axis_count,
N_Axis: n_axis_count,
Timestamp: currentTime.UTC().Add(-time.Second).UnixMilli(),
Timestamp: timestamp - time.Second.Milliseconds(),
}
deps.messageBus.Publish("explorer", &finalPacket)
packetBuffer = []legacyPacket{}

ticker.Reset(calcDuration(currentTick, time.Second))
}
case <-timer.C:
dat, err := fifoBuffer.Peek(LEGACY_PACKET_FRAME_HEADER, recvSize)
if err != nil {
continue
}
// Update the health status
deps.Health.SetSampleRate(targetSampleRate)
deps.Health.SetReceived(deps.Health.GetReceived() + 1)
deps.Health.SetUpdatedAt(time.UnixMilli(timestamp).UTC())

// Read the packet data
err = e.legacyPacket.decode(dat[len(LEGACY_PACKET_FRAME_HEADER):])
if err != nil {
e.logger.Errorf("failed to decode legacy packet: %v", err)
deps.Health.SetErrors(deps.Health.GetErrors() + 1)
} else {
packetBuffer = append(packetBuffer, e.legacyPacket)
packetBuffer = []mainlinePacket{}
} else if timestamp-nextTick > EXPLORER_ALLOWED_JITTER_MS {
// Update the next tick, clear the buffer if the jitter exceeds the threshold
nextTick = timestamp + time.Second.Milliseconds()
packetBuffer = []mainlinePacket{}
}
}
}
Expand Down Expand Up @@ -257,17 +325,10 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency,
}
}()

// When device ID was set to 19890604 (dummy value), the device is running without GNSS module.
// In this case, the latitude, longitude, elevation will not be updated.
// The timestamp will be replaced with the NTP time.
noGnssMode := false
if deps.Config.GetDeviceId() == 19890604 {
noGnssMode = true
}

// Read data from the FIFO buffer continuously
var (
packetBuffer = []mainlinePacket{}
noGnssMode = false
nextTick = int64(0) // Expected timestamp for the next published data on message bus
timeDiff = int64(0) // For non-GNSS mode, we need time difference between the packet and NTP time
timer = time.NewTimer(time.Millisecond)
Expand All @@ -290,12 +351,23 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency,
continue
}

// Update the device ID, latitude, longitude, elevation
switch e.mainlinePacket.VariableName {
case "device_id":
if !noGnssMode {
deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:]))
// If device ID is not initialized, get the device ID from the packet
if deps.Config.GetDeviceId() == 0 && e.mainlinePacket.VariableName == "device_id" {
deviceId := binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])
// When device ID was set to 19890604 (dummy value), the device is running without GNSS module.
// In this case, the latitude, longitude, elevation will not be updated.
// The timestamp will be replaced with the NTP time.
if deviceId == 19890604 {
noGnssMode = true
}
deps.Config.SetDeviceId(deviceId)
e.logger.Infof("got current device ID: %08X", deviceId)
} else if deps.Config.GetDeviceId() == 0 {
continue
}

// Update the latitude, longitude, elevation
switch e.mainlinePacket.VariableName {
case "latitude":
latitude := math.Float32frombits(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:]))
if latitude >= -90 && latitude <= 90 && !noGnssMode {
Expand Down Expand Up @@ -374,7 +446,7 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency,
}

func (e *ExplorerDriverImpl) readerDaemon(deps *ExplorerDependency) {
fifoBuffer := fifo.New(65536)
fifoBuffer := fifo.New(8192)

if deps.Config.GetLegacyMode() {
e.handleReadLegacyPacket(deps, &fifoBuffer)
Expand All @@ -390,37 +462,6 @@ func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency, logger *logrus.Entry
deps.Health.SetStartTime(currentTime)
deps.subscribers = haxmap.New[string, ExplorerEventHandler]()
deps.messageBus = messagebus.New(1024)
deps.Config.SetDeviceId(math.MaxUint32)

// Get device ID in EEPROM
if !deps.Config.GetLegacyMode() {
var (
startTime = time.Now()
readTimeout = 5 * time.Second
)
for time.Since(startTime) <= readTimeout {
ok, _ := deps.Transport.Filter(MAINLINE_PACKET_FRAME_HEADER, time.Second)
if !ok {
continue
}
headerBuf := make([]byte, e.mainlinePacket.length())
_, err := deps.Transport.Read(headerBuf, time.Second, false)
if err != nil {
continue
}
err = e.mainlinePacket.decode(headerBuf)
if err != nil {
continue
}
if e.mainlinePacket.VariableName == "device_id" {
deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:]))
break
}
}
if time.Since(startTime) > readTimeout {
return errors.New("failed to get device ID, please check the device")
}
}

go e.readerDaemon(deps)
return nil
Expand Down
5 changes: 5 additions & 0 deletions services/miniseed/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func (m *MiniSeedService) handleWrite() error {
m.miniseedBuffer[i].Timestamp,
)
}

// Make sure sample rate is the same
if m.miniseedBuffer[i].SampleRate != startSampleRate {
return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate)
}
}

// Write data to file by channels
Expand Down
10 changes: 2 additions & 8 deletions startups/explorer/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@ func (t *ExplorerStartupTask) Execute(depsContainer *dig.Container, options *sta
if err != nil {
return err
}
explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{})

logger.GetLogger(t.GetTaskName()).Infoln("device is being initialized, please wait")
explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{})
err = explorerDriver.Init(explorerDeps, logger.GetLogger("explorer_driver"))
if err != nil {
return err
}

logger.GetLogger(t.GetTaskName()).Infoln("device has been initialized successfully")
if !explorerDeps.Config.GetLegacyMode() {
logger.GetLogger(t.GetTaskName()).Infof("handshake successful, device ID: %08X", explorerDeps.Config.GetDeviceId())
} else {
logger.GetLogger(t.GetTaskName()).Warnln("device is in legacy mode, this is for backward compatibility only")
}
logger.GetLogger(t.GetTaskName()).Infoln("device has been initialized")
return nil
}

0 comments on commit a300645

Please sign in to comment.