Skip to content

Commit

Permalink
feat(ws-fetcher): retrieve missing values for detections from DB & tr…
Browse files Browse the repository at this point in the history
…y to fix simplePositioner cursedness with threading
  • Loading branch information
NuttyShrimp committed Apr 2, 2024
1 parent 91e6e20 commit 9c140bf
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 15 deletions.
7 changes: 4 additions & 3 deletions src/main/java/telraam/database/daos/DetectionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id

@SqlBatch("""
INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \
VALUES (:stationId, (SELECT id FROM baton WHERE :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion)
VALUES (:stationId, (SELECT id FROM baton WHERE mac = :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion)
""")
@GetGeneratedKeys({"id"})
int insertAllWithoutBaton(@BindBean List<Detection> detection, @Bind("batonMac") List<String> batonMac);
@GetGeneratedKeys({"id", "baton_id"})
@RegisterBeanMapper(Detection.class)
List<Detection> insertAllWithoutBaton(@BindBean List<Detection> detection, @Bind("batonMac") List<String> batonMac);

@SqlQuery("SELECT * FROM detection WHERE id = :id")
@RegisterBeanMapper(Detection.class)
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/telraam/database/models/Detection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import java.sql.Timestamp;

@Setter @Getter @NoArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Detection {
private Integer id;
private Integer batonId;
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/telraam/database/models/Team.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter @Setter @NoArgsConstructor
import java.util.Objects;

@Getter
@Setter
@NoArgsConstructor
public class Team {
private Integer id;
private String name;
Expand All @@ -18,4 +22,8 @@ public Team(String name, int batonId) {
this.name = name;
this.batonId = batonId;
}

public boolean equals(Team obj) {
return Objects.equals(id, obj.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SimplePositioner implements Positioner {
Expand All @@ -31,9 +32,9 @@ public class SimplePositioner implements Positioner {
private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName());
private final PositionSender positionSender;
private final Map<Integer, Team> batonIdToTeam;
private final Map<Team, CircularQueue<Detection>> teamDetections;
private final Map<Integer, CircularQueue<Detection>> teamDetections;
private final List<Integer> stations;
private final Map<Team, Position> teamPositions;
private final Map<Integer, Position> teamPositions;

public SimplePositioner(Jdbi jdbi) {
this.debounceScheduled = false;
Expand All @@ -46,8 +47,8 @@ public SimplePositioner(Jdbi jdbi) {
TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class);
List<Team> teams = teamDAO.getAll();
for (Team team: teams) {
teamDetections.put(team, new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team, new Position(team.getId()));
teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team.getId(), new Position(team.getId()));
}
List<BatonSwitchover> switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll();
switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp));
Expand All @@ -63,7 +64,7 @@ public SimplePositioner(Jdbi jdbi) {

public void calculatePositions() {
logger.info("SimplePositioner: Calculating positions...");
for (Map.Entry<Team, CircularQueue<Detection>> entry: teamDetections.entrySet()) {
for (Map.Entry<Integer, CircularQueue<Detection>> entry: teamDetections.entrySet()) {
List<Detection> detections = teamDetections.get(entry.getKey());
detections.sort(Comparator.comparing(Detection::getTimestamp));

Expand All @@ -86,15 +87,15 @@ public void calculatePositions() {

public void handle(Detection detection) {
Team team = batonIdToTeam.get(detection.getBatonId());
teamDetections.get(team).add(detection);
teamDetections.get(team.getId()).add(detection);

if (! debounceScheduled) {
debounceScheduled = true;
scheduler.schedule(() -> {
try {
calculatePositions();
} catch (Exception e) {
logger.severe(e.getMessage());
logger.log(Level.SEVERE, e.getMessage(), e);
}
debounceScheduled = false;
}, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS);
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/telraam/station/websocket/WebsocketFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,17 @@ public void fetch() {
detection_mac_addresses.add(detection.mac);
}
if (!new_detections.isEmpty()) {
detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses);
new_detections.forEach((detection) -> {
List<Detection> db_detections = detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses);
for(int i = 0; i < new_detections.size(); i++) {
Detection detection = new_detections.get(i);
Detection db_detection = db_detections.get(i);

detection.setBatonId(db_detection.getBatonId());
detection.setId(db_detection.getId());

lappers.forEach((lapper) -> lapper.handle(detection));
positioners.forEach(positioner -> positioner.handle(detection));
});
}
}

logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size());
Expand Down

0 comments on commit 9c140bf

Please sign in to comment.