Skip to content

Commit

Permalink
[PRDP-220] Fix queue-cosmos desync (#99)
Browse files Browse the repository at this point in the history
* [PRDP-220] Added save receipt method to cosmos client

* [PRDP-220] Added new reason error code for cosmos

* [PRDP-220] Implemented cosmos client and refactored code

* [PRDP-220] Changed BizEventToReceipt function logic to fix queue-cosmos desynch

* [PRDP-220] Updated unit tests

* [PRDP-220] Fix RecoverFailedReceipt function save failed queue status

* [PRDP-220] Fixed Cosmos Client insert by generating custom id for receipt

* [PRDP-220] Update unit tests

* [PRDP-220] Added charset utf-8 to bizEvent stringify
  • Loading branch information
svariant authored Nov 20, 2023
1 parent 94833d4 commit f187da6
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.enumeration.ReceiptStatusType;
import it.gov.pagopa.receipt.pdf.datastore.service.BizEventToReceiptService;
import it.gov.pagopa.receipt.pdf.datastore.service.PDVTokenizerService;
import it.gov.pagopa.receipt.pdf.datastore.service.impl.BizEventToReceiptServiceImpl;
import it.gov.pagopa.receipt.pdf.datastore.service.impl.PDVTokenizerServiceImpl;
import it.gov.pagopa.receipt.pdf.datastore.utils.BizEventToReceiptUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,7 +69,8 @@ public void processBizEventToReceipt(
OutputBinding<List<Receipt>> documentdb,
final ExecutionContext context) {

List<Receipt> itemsDone = new ArrayList<>();
int itemsDone = 0;
List<Receipt> receiptFailed = new ArrayList<>();

logger.info("[{}] stat {} function - num events triggered {}",
context.getFunctionName(),
Expand All @@ -87,49 +86,50 @@ public void processBizEventToReceipt(
discarder++;
continue;
}
try {

Receipt receipt;
try {
receipt = BizEventToReceiptUtils.createReceipt(bizEvent, bizEventToReceiptService, logger);
if (ReceiptStatusType.FAILED.equals(receipt.getStatus())) {
itemsDone.add(receipt);
continue;
}
} catch (Exception e) {
logger.error("Error during receipt creation for bizEvent with Id {}", bizEvent.getId(), e);
continue;
}

logger.info("[{}] function called at {} for event with id {} and status {}",
context.getFunctionName(), LocalDateTime.now(), bizEvent.getId(), bizEvent.getEventStatus());

Receipt receipt = BizEventToReceiptUtils.createReceipt(bizEvent, bizEventToReceiptService, logger);

logger.info("[{}] function called at {} for event with id {} and status {}",
context.getFunctionName(), LocalDateTime.now(), bizEvent.getId(), bizEvent.getEventStatus());

if (isReceiptStatusValid(receipt)) {
// Add receipt to items to be saved on CosmosDB
bizEventToReceiptService.handleSaveReceipt(receipt);
}

if (isReceiptStatusValid(receipt)) {
// Send biz event as message to queue (to be processed from the other function)
bizEventToReceiptService.handleSendMessageToQueue(bizEvent, receipt);
}

// Add receipt to items to be saved on CosmosDB
itemsDone.add(receipt);
} catch (Exception e) {
discarder++;
// Error info
logger.error("[{}] Error to process event with id {}", context.getFunctionName(), bizEvent.getId(), e);
if (!isReceiptStatusValid(receipt)) {
receiptFailed.add(receipt);
}

itemsDone++;
}
// Discarder info
logger.debug("[{}] itemsDone stat {} function - {} number of events in discarder", context.getFunctionName(),
context.getInvocationId(), discarder);
// Call to queue info
logger.debug("[{}] itemsDone stat {} function - number of events in DONE sent to the receipt queue {}",
context.getFunctionName(), context.getInvocationId(), itemsDone.size());
context.getFunctionName(), context.getInvocationId(), itemsDone);
// Call to datastore info
logger.debug("[{}] stat {} function - number of receipts inserted on the datastore {}",
context.getFunctionName(),
context.getInvocationId(), itemsDone.size());

// Save receipts data to CosmosDB
if (!itemsDone.isEmpty()) {
documentdb.setValue(itemsDone);
context.getInvocationId(), itemsDone);

// Save failed receipts to CosmosDB
if (!receiptFailed.isEmpty()) {
// Call to datastore info
logger.debug("[{}] stat {} function - number of receipts failed inserted on the datastore {}",
context.getFunctionName(),
context.getInvocationId(), receiptFailed.size());
documentdb.setValue(receiptFailed);
}
}

private static boolean isReceiptStatusValid(Receipt receipt) {
return receipt.getStatus() != ReceiptStatusType.FAILED && receipt.getStatus() != ReceiptStatusType.NOT_QUEUE_SENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ private void getEvent(String eventId, ExecutionContext context,
BizEventToReceiptUtils.tokenizeReceipt(bizEventToReceiptService, bizEvent, receipt);
}
bizEventToReceiptService.handleSendMessageToQueue(bizEvent, receipt);
receipt.setStatus(ReceiptStatusType.INSERTED);
receipt.setReasonErr(null);
receipt.setReasonErrPayer(null);
if(receipt.getStatus() != ReceiptStatusType.NOT_QUEUE_SENT){
receipt.setStatus(ReceiptStatusType.INSERTED);
receipt.setInserted_at(System.currentTimeMillis());
receipt.setReasonErr(null);
receipt.setReasonErrPayer(null);
}
receiptList.add(receipt);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package it.gov.pagopa.receipt.pdf.datastore.client;

import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.FeedResponse;
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.datastore.exception.ReceiptNotFoundException;
Expand All @@ -9,4 +10,6 @@ public interface ReceiptCosmosClient {
Receipt getReceiptDocument(String receiptId) throws ReceiptNotFoundException;

Iterable<FeedResponse<Receipt>> getFailedReceiptDocuments(String continuationToken, Integer pageSize);

CosmosItemResponse<Receipt> saveReceipts(Receipt receipt);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.*;
import com.azure.cosmos.util.CosmosPagedIterable;
import it.gov.pagopa.receipt.pdf.datastore.client.ReceiptCosmosClient;
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.Receipt;
Expand Down Expand Up @@ -94,4 +93,19 @@ public Iterable<FeedResponse<Receipt>> getFailedReceiptDocuments(String continua

}

/**
* Save Receipts on CosmosDB database
*
* @param receipt Receipts to save
* @return receipt documents
*/
@Override
public CosmosItemResponse<Receipt> saveReceipts(Receipt receipt) {
CosmosDatabase cosmosDatabase = this.cosmosClient.getDatabase(databaseId);

CosmosContainer cosmosContainer = cosmosDatabase.getContainer(containerId);

return cosmosContainer.createItem(receipt);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public enum ReasonErrorCode {
ERROR_QUEUE(902),
ERROR_COSMOS(904),
ERROR_PDV_IO(800),
ERROR_PDV_UNEXPECTED(801),
ERROR_PDV_MAPPING(802);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.EventData;
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.Receipt;
import it.gov.pagopa.receipt.pdf.datastore.exception.PDVTokenizerException;
import it.gov.pagopa.receipt.pdf.datastore.client.ReceiptCosmosClient;

public interface BizEventToReceiptService {

Expand All @@ -16,6 +17,13 @@ public interface BizEventToReceiptService {
*/
void handleSendMessageToQueue(BizEvent bizEvent, Receipt receipt);

/**
* Saves receipts on CosmosDB using {@link ReceiptCosmosClient}
*
* @param receipt Receipt to save
*/
void handleSaveReceipt(Receipt receipt);

/**
* Retrieve conditionally the transaction creation date from biz-event
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package it.gov.pagopa.receipt.pdf.datastore.service.impl;

import com.azure.core.http.rest.Response;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.storage.queue.models.SendMessageResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.microsoft.azure.functions.HttpStatus;
import it.gov.pagopa.receipt.pdf.datastore.client.ReceiptCosmosClient;
import it.gov.pagopa.receipt.pdf.datastore.client.ReceiptQueueClient;
import it.gov.pagopa.receipt.pdf.datastore.client.impl.ReceiptCosmosClientImpl;
import it.gov.pagopa.receipt.pdf.datastore.client.impl.ReceiptQueueClientImpl;
import it.gov.pagopa.receipt.pdf.datastore.entity.event.BizEvent;
import it.gov.pagopa.receipt.pdf.datastore.entity.receipt.EventData;
Expand All @@ -18,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;

Expand All @@ -26,13 +31,19 @@ public class BizEventToReceiptServiceImpl implements BizEventToReceiptService {
private final Logger logger = LoggerFactory.getLogger(BizEventToReceiptServiceImpl.class);

private final PDVTokenizerServiceRetryWrapper pdvTokenizerService;
private final ReceiptCosmosClient receiptCosmosClient;
private final ReceiptQueueClient queueClient;

public BizEventToReceiptServiceImpl() {
this.pdvTokenizerService = new PDVTokenizerServiceRetryWrapperImpl();
this.receiptCosmosClient = ReceiptCosmosClientImpl.getInstance();
this.queueClient = ReceiptQueueClientImpl.getInstance();
}

public BizEventToReceiptServiceImpl(PDVTokenizerServiceRetryWrapper pdvTokenizerService) {
public BizEventToReceiptServiceImpl(PDVTokenizerServiceRetryWrapper pdvTokenizerService, ReceiptCosmosClient receiptCosmosClient, ReceiptQueueClient queueClient) {
this.pdvTokenizerService = pdvTokenizerService;
this.receiptCosmosClient = receiptCosmosClient;
this.queueClient = queueClient;
}

/**
Expand All @@ -41,37 +52,67 @@ public BizEventToReceiptServiceImpl(PDVTokenizerServiceRetryWrapper pdvTokenizer
@Override
public void handleSendMessageToQueue(BizEvent bizEvent, Receipt receipt) {
//Encode biz-event to base64 string
String messageText = Base64.getMimeEncoder().encodeToString(Objects.requireNonNull(ObjectMapperUtils.writeValueAsString(bizEvent)).getBytes());

ReceiptQueueClientImpl queueService = ReceiptQueueClientImpl.getInstance();
String messageText = Base64.getMimeEncoder().encodeToString(
Objects.requireNonNull(ObjectMapperUtils.writeValueAsString(bizEvent)).getBytes(StandardCharsets.UTF_8)
);

//Add message to the queue
int statusCode;
try {
Response<SendMessageResult> sendMessageResult = queueService.sendMessageToQueue(messageText);
Response<SendMessageResult> sendMessageResult = queueClient.sendMessageToQueue(messageText);

if (sendMessageResult.getStatusCode() == HttpStatus.CREATED.value()) {
receipt.setStatus(ReceiptStatusType.INSERTED);
receipt.setInserted_at(System.currentTimeMillis());
} else {
handleError(receipt);
}
statusCode = sendMessageResult.getStatusCode();
} catch (Exception e) {
handleError(receipt);
statusCode = ReasonErrorCode.ERROR_QUEUE.getCode();
logger.error(String.format("Sending BizEvent with id %s to queue failed", bizEvent.getId()), e);
}

if (statusCode != HttpStatus.CREATED.value()) {
String errorString = String.format(
"[BizEventToReceiptService] Error sending message to queue for receipt with eventId %s",
receipt.getEventId());
handleError(receipt, ReceiptStatusType.NOT_QUEUE_SENT, errorString, statusCode);
//Error info
logger.error("Error sending to queue biz-event message with id {}", bizEvent.getId(), e);
logger.error(errorString);
}
}

/**
* Handles errors for queue and updates receipt's status accordingly
* {@inheritDoc}
*/
@Override
public void handleSaveReceipt(Receipt receipt) {
int statusCode;

try {
receipt.setStatus(ReceiptStatusType.INSERTED);
receipt.setInserted_at(System.currentTimeMillis());
CosmosItemResponse<Receipt> response = receiptCosmosClient.saveReceipts(receipt);

statusCode = response.getStatusCode();
} catch (Exception e) {
statusCode = ReasonErrorCode.ERROR_COSMOS.getCode();
logger.error(String.format("Save receipt with eventId %s on cosmos failed", receipt.getEventId()), e);
}

if (statusCode != (HttpStatus.CREATED.value())) {
String errorString = String.format(
"[BizEventToReceiptService] Error saving receipt to cosmos for receipt with eventId %s, cosmos client responded with status %s",
receipt.getEventId(), statusCode);
handleError(receipt, ReceiptStatusType.FAILED, errorString, statusCode);
//Error info
logger.error(errorString);
}
}

/**
* Handles errors for queue and cosmos and updates receipt's status accordingly
*
* @param receipt Receipt to update
*/
private void handleError(Receipt receipt) {
receipt.setStatus(ReceiptStatusType.NOT_QUEUE_SENT);
ReasonError reasonError = new ReasonError(ReasonErrorCode.ERROR_QUEUE.getCode(),
String.format("[BizEventToReceiptService] Error sending message to queue" +
" for receipt with eventId %s", receipt.getEventId()));
private void handleError(Receipt receipt, ReceiptStatusType statusType, String errorMessage, int errorCode) {
receipt.setStatus(statusType);
ReasonError reasonError = new ReasonError(errorCode, errorMessage);
receipt.setReasonErr(reasonError);
}

Expand Down Expand Up @@ -109,7 +150,7 @@ public void tokenizeFiscalCodes(BizEvent bizEvent, Receipt receipt, EventData ev
} catch (PDVTokenizerException e) {
handleTokenizerException(receipt, e.getMessage(), e.getStatusCode());
throw e;
} catch (JsonProcessingException e){
} catch (JsonProcessingException e) {
handleTokenizerException(receipt, e.getMessage(), ReasonErrorCode.ERROR_PDV_MAPPING.getCode());
throw e;
}
Expand All @@ -118,9 +159,9 @@ public void tokenizeFiscalCodes(BizEvent bizEvent, Receipt receipt, EventData ev
/**
* Handles errors for PDV tokenizer and updates receipt's status accordingly
*
* @param receipt Receipt to update
* @param receipt Receipt to update
* @param errorMessage Message to save
* @param statusCode StatusCode to save
* @param statusCode StatusCode to save
*/
private void handleTokenizerException(Receipt receipt, String errorMessage, int statusCode) {
receipt.setStatus(ReceiptStatusType.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class BizEventToReceiptUtils {

Expand All @@ -28,6 +29,7 @@ public static Receipt createReceipt(BizEvent bizEvent, BizEventToReceiptService
Receipt receipt = new Receipt();

// Insert biz-event data into receipt
receipt.setId(bizEvent.getId()+UUID.randomUUID());
receipt.setEventId(bizEvent.getId());

EventData eventData = new EventData();
Expand Down Expand Up @@ -58,9 +60,9 @@ public static Receipt createReceipt(BizEvent bizEvent, BizEventToReceiptService
/**
* Checks if the instance of Biz Event is in status DONE and contsains all required informations to process
* in the receipt generation
* @param bizEvent
* @param context
* @param logger
* @param bizEvent BizEvent to validate
* @param context Function context
* @param logger Function logger
* @return boolean to determine if the proposed event is invalid
*/
public static boolean isBizEventInvalid(BizEvent bizEvent, ExecutionContext context, Logger logger) {
Expand Down
Loading

0 comments on commit f187da6

Please sign in to comment.