Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WS station fetcher #135

Merged
merged 12 commits into from
Apr 22, 2024
Prev Previous commit
Next Next commit
feat: re-add http fetcher & move WS to own package
NuttyShrimp committed Apr 7, 2024
commit a75a0ce73cb7cb7705cb34986449f5e2620f2cb1
6 changes: 4 additions & 2 deletions src/main/java/telraam/App.java
Original file line number Diff line number Diff line change
@@ -24,7 +24,8 @@
import telraam.logic.lapper.robust.RobustLapper;
import telraam.logic.positioner.Positioner;
import telraam.logic.positioner.simple.SimplePositioner;
import telraam.station.Fetcher;
import telraam.station.FetcherFactory;
import telraam.station.websocket.WebsocketFetcher;
import telraam.util.AcceptedLapsUtil;
import telraam.websocket.WebSocketConnection;

@@ -142,9 +143,10 @@ public void run(AppConfiguration configuration, Environment environment) {
positioners.add(new SimplePositioner(this.database));

// Start fetch thread for each station
FetcherFactory fetcherFactory = new FetcherFactory(this.database, lappers, positioners);
StationDAO stationDAO = this.database.onDemand(StationDAO.class);
for (Station station : stationDAO.getAll()) {
new Thread(() -> new Fetcher(this.database, station, lappers, positioners).fetch()).start();
new Thread(() -> fetcherFactory.create(station).fetch()).start();
}
}

147 changes: 10 additions & 137 deletions src/main/java/telraam/station/Fetcher.java
Original file line number Diff line number Diff line change
@@ -1,141 +1,14 @@
package telraam.station;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.jdbi.v3.core.Jdbi;
import com.fasterxml.jackson.databind.ObjectMapper;
import telraam.database.daos.BatonDAO;
import telraam.database.daos.DetectionDAO;
import telraam.database.daos.StationDAO;
import telraam.database.models.Detection;
import telraam.database.models.Station;
import telraam.logic.lapper.Lapper;
import telraam.logic.positioner.Positioner;
import telraam.station.models.RonnyDetection;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.*;
import java.sql.Timestamp;
import java.util.*;
import java.util.logging.Logger;

public class Fetcher {
public interface Fetcher {
//Timeout to wait for before sending the next request after an error.
private final static int ERROR_TIMEOUT_MS = 2000;
private final Set<Lapper> lappers;
private final Set<Positioner> positioners;
private Station station;

private final BatonDAO batonDAO;
private final DetectionDAO detectionDAO;
private final StationDAO stationDAO;

private final HttpClient client = HttpClient.newHttpClient();
private final Logger logger = Logger.getLogger(Fetcher.class.getName());

public Fetcher(Jdbi database, Station station, Set<Lapper> lappers, Set<Positioner> positioners) {
this.batonDAO = database.onDemand(BatonDAO.class);
this.detectionDAO = database.onDemand(DetectionDAO.class);
this.stationDAO = database.onDemand(StationDAO.class);

this.lappers = lappers;
this.positioners = positioners;
this.station = station;
}

public void fetch() {
logger.info("Running Fetcher for station(" + this.station.getId() + ")");
ObjectMapper mapper = new ObjectMapper();

//Update the station to account for possible changes in the database
this.stationDAO.getById(station.getId()).ifPresentOrElse(
station -> this.station = station,
() -> this.logger.severe("Can't update station from database.")
);

//Get last detection id
int lastDetectionId = 0;
Optional<Detection> lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId());
if (lastDetection.isPresent()) {
lastDetectionId = lastDetection.get().getRemoteId();
}

InitWSMessage wsMessage = new InitWSMessage(lastDetectionId);
String wsMessageEncoded;
try {
wsMessageEncoded = mapper.writeValueAsString(wsMessage);
} catch (JsonProcessingException e) {
logger.severe(e.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException ex) {
logger.severe(ex.getMessage());
}
this.fetch();
return;
}

//Create URL
URI url;
try {
URI stationUrl = URI.create(station.getUrl());
url = new URI("ws", stationUrl.getHost(), "/detections", "");
} catch (URISyntaxException ex) {
this.logger.severe(ex.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
this.fetch();
return;
}

WebsocketClient websocketClient = new WebsocketClient(url);
websocketClient.addOnOpenHandler(() -> {
websocketClient.sendMessage(wsMessageEncoded);
});
websocketClient.addMessageHandler((String msg) -> {
//Insert detections
List<Detection> new_detections = new ArrayList<>();
List<String> detection_mac_addresses = new ArrayList<>();
logger.info("Received message on WS");

try {
List<RonnyDetection> detections = Arrays.asList(mapper.readValue(msg, RonnyDetection[].class));
for (RonnyDetection detection : detections) {
new_detections.add(new Detection(
0,
station.getId(),
detection.rssi,
detection.battery,
detection.uptimeMs,
detection.id,
new Timestamp((long) (detection.detectionTimestamp * 1000)),
new Timestamp(System.currentTimeMillis())
));
detection_mac_addresses.add(detection.mac);
}
if (!new_detections.isEmpty()) {
detectionDAO.insertAllWithoutBaton(new_detections, detection_mac_addresses);
new_detections.forEach((detection) -> {
lappers.forEach((lapper) -> lapper.handle(detection));
positioners.forEach(positioner -> positioner.handle(detection));
});
}

logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size());
} catch (JsonProcessingException e) {
logger.severe(e.getMessage());
}
});
}

private class InitWSMessage {
public int lastId;

public InitWSMessage(int lastId) {
this.lastId = lastId;
}
}
int ERROR_TIMEOUT_MS = 2000;
//Timeout for a request to a station.
int REQUEST_TIMEOUT_S = 10;
//Full batch size, if this number of detections is reached, more are probably available immediately.
int FULL_BATCH_SIZE = 1000;
//Timeout when result has less than a full batch of detections.
int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds

void fetch();
}
42 changes: 42 additions & 0 deletions src/main/java/telraam/station/FetcherFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package telraam.station;

import org.jdbi.v3.core.Jdbi;
import telraam.database.models.Station;
import telraam.logic.lapper.Lapper;
import telraam.logic.positioner.Positioner;
import telraam.station.http.HTTPFetcher;
import telraam.station.websocket.WebsocketFetcher;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;
import java.util.logging.Logger;

public class FetcherFactory {
NuttyShrimp marked this conversation as resolved.
Show resolved Hide resolved
private final Logger logger = Logger.getLogger(FetcherFactory.class.getName());
private final Jdbi database;
private final Set<Lapper> lappers;
private final Set<Positioner> positioners;
public FetcherFactory(Jdbi database, Set<Lapper> lappers, Set<Positioner> positioners) {
this.database = database;
this.lappers = lappers;
this.positioners = positioners;
}

public Fetcher create(Station station) {
try {
URI stationURI = new URI(station.getUrl());
return switch (stationURI.getScheme()) {
case "ws" -> new WebsocketFetcher(database, station, lappers, positioners);
case "http" -> new HTTPFetcher(database, station, lappers, positioners);
default -> {
logger.severe(String.format("%s is not a valid scheme for a station", stationURI.getScheme()));
yield null;
}
};
} catch (URISyntaxException e) {
logger.severe(String.format("Failed to parse station URI: %s", e.getMessage()));
}
return null;
}
}
174 changes: 174 additions & 0 deletions src/main/java/telraam/station/http/HTTPFetcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package telraam.station.http;

import org.jdbi.v3.core.Jdbi;
import telraam.database.daos.BatonDAO;
import telraam.database.daos.DetectionDAO;
import telraam.database.daos.StationDAO;
import telraam.database.models.Baton;
import telraam.database.models.Detection;
import telraam.database.models.Station;
import telraam.logic.lapper.Lapper;
import telraam.logic.positioner.Positioner;
import telraam.station.Fetcher;
import telraam.station.models.RonnyDetection;
import telraam.station.models.RonnyResponse;

import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class HTTPFetcher implements Fetcher {
private final Set<Lapper> lappers;
private final Set<Positioner> positioners;
private Station station;

private final BatonDAO batonDAO;
private final DetectionDAO detectionDAO;
private final StationDAO stationDAO;

private final HttpClient client = HttpClient.newHttpClient();
NuttyShrimp marked this conversation as resolved.
Show resolved Hide resolved
private final Logger logger = Logger.getLogger(Fetcher.class.getName());


public HTTPFetcher(Jdbi database, Station station, Set<Lapper> lappers, Set<Positioner> positioners) {
this.batonDAO = database.onDemand(BatonDAO.class);
this.detectionDAO = database.onDemand(DetectionDAO.class);
this.stationDAO = database.onDemand(StationDAO.class);

this.lappers = lappers;
this.positioners = positioners;
this.station = station;
}

public void fetch() {
logger.info("Running Fetcher for station(" + this.station.getId() + ")");
JsonBodyHandler<RonnyResponse> bodyHandler = new JsonBodyHandler<>(RonnyResponse.class);

while (true) {
//Update the station to account for possible changes in the database
this.stationDAO.getById(station.getId()).ifPresentOrElse(
station -> this.station = station,
() -> this.logger.severe("Can't update station from database.")
);

//Get last detection id
int lastDetectionId = 0;
Optional<Detection> lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId());
if (lastDetection.isPresent()) {
lastDetectionId = lastDetection.get().getRemoteId();
}

//Create URL
URI url;
try {
url = new URI(station.getUrl() + "/detections/" + lastDetectionId);
} catch (URISyntaxException ex) {
this.logger.severe(ex.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
continue;
}

//Create request
HttpRequest request;
try {
request = HttpRequest.newBuilder()
.uri(url)
.version(HttpClient.Version.HTTP_1_1)
.timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S))
.build();
} catch (IllegalArgumentException e) {
logger.severe(e.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException ex) {
logger.severe(ex.getMessage());
}
continue;
}

//Do request
HttpResponse<Supplier<RonnyResponse>> response;
try {
try {
response = this.client.send(request, bodyHandler);
} catch (ConnectException | HttpConnectTimeoutException ex) {
this.logger.severe("Could not connect to " + request.uri());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
} catch (IOException e) {
logger.severe(e.getMessage());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
}
} catch (InterruptedException e) {
logger.severe(e.getMessage());
continue;
}

//Check response state
if (response.statusCode() != 200) {
this.logger.warning(
"Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")"
);
continue;
}

//Fetch all batons and create a map by batonMAC
Map<String, Baton> baton_mac_map = batonDAO.getAll().stream()
.collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity()));

//Insert detections
List<Detection> new_detections = new ArrayList<>();
List<RonnyDetection> detections = response.body().get().detections;
for (RonnyDetection detection : detections) {
if (baton_mac_map.containsKey(detection.mac.toUpperCase())) {
var baton = baton_mac_map.get(detection.mac.toUpperCase());
new_detections.add(new Detection(
baton.getId(),
station.getId(),
detection.rssi,
detection.battery,
detection.uptimeMs,
detection.id,
new Timestamp((long) (detection.detectionTimestamp * 1000)),
new Timestamp(System.currentTimeMillis())
));
}
}
if (!new_detections.isEmpty()) {
detectionDAO.insertAll(new_detections);
new_detections.forEach((detection) -> {
lappers.forEach((lapper) -> lapper.handle(detection));
positioners.forEach((positioner) -> positioner.handle(detection));
});
}

this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size());

//If few detections are retrieved from the station, wait for some time.
if (detections.size() < Fetcher.FULL_BATCH_SIZE) {
try {
Thread.sleep(Fetcher.IDLE_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
}
}
}
}
Loading