Skip to content

Commit

Permalink
Merge pull request #28 from IndicoDataSolutions/meg/bytestream-uploads
Browse files Browse the repository at this point in the history
[CAT-81] support bytestream uploads
  • Loading branch information
goatrocks authored Jun 21, 2021
2 parents 51f42f4 + e2f1de0 commit f70be94
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 52 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ signing {

group = "com.indico"
archivesBaseName = "indico-client-java"
version = "4.11.0"
version = "4.11.3"

uploadArchives {
repositories {
Expand Down
79 changes: 47 additions & 32 deletions examples/Submission.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,51 @@
import com.indico.IndicoClient;
import com.indico.IndicoConfig;
import com.indico.entity.Submission;
import com.indico.jobs.Job;
import com.indico.mutation.WorkflowSubmission;
import com.indico.storage.Blob;
import com.indico.storage.RetrieveBlob;
import com.indico.type.JobStatus;
import com.indico.type.SubmissionFilter;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Hashtable;
import java.util.Set;
import java.util.*;

import com.indico.type.SubmissionStatus;
import org.apache.commons.io.FileUtils;
import com.indico.entity.Submission;
import org.json.JSONObject;

public class Submission {
public class SubmissionExample {

public static void main(String args[]) throws IOException {
IndicoConfig config = new IndicoConfig.Builder()
.host("app.indico.io")
.tokenPath("__TOKEN_PATH__")
.build();

int workflowId = 5;

try (IndicoClient client = new IndicoClient(config)) {
/*
* Create a new submission
* Generate a submission result as soon as the submission is done processing
* Then mark the submission has having been retrieved
*/
* Create a new submission using one of two methods, file path or bytestream.
* Generate a submission result as soon as the submission is done processing
* Then mark the submission has having been retrieved
*/

WorkflowSubmission workflowSubmission = client.workflowSubmission();

//Method 1: Add the submission file as a path to a file.
ArrayList<String> files = new ArrayList<>();
files.add("./path_to_file.pdf");
List<Integer> submissionIds = workflowSubmission.files(files).workflowId(workflowId).execute();
int submissionId = submissionIds.get(0);
Job job = client.submissionResult().submission(submissionId).execute();

while (job.status() == JobStatus.PENDING) {
try {
Thread.sleep(1000);
Job job = client.submissionResult().submission(submissionId).execute();
}catch(CompletionException ex){
this.indicoClient = new IndicoClient(config);
}
Thread.sleep(1000);
job = client.submissionResult().submission(submissionId).execute();
System.out.println("Job Status: " + job.status());
}

JSONObject obj = job.result();
Expand All @@ -56,12 +56,28 @@ public static void main(String args[]) throws IOException {
blob.close();
System.out.println(blob.asString());
client.updateSubmission().submissionId(submissionId).retrieved(true).execute();


//Method 2: Add the file(s) as byte streams.
WorkflowSubmission byteWorkflowSubmission = client.workflowSubmission();
String fileName = "./workflow-sample.pdf";
byte[] bytes = FileUtils.readFileToByteArray(new File(fileName));
Map<String, byte[]> maps = new HashMap<String,byte[]>();
maps.put(fileName,bytes);
submissionIds = byteWorkflowSubmission.byteStreams(maps).workflowId(workflowId).execute();
int streamSubmissionId = submissionIds.get(0);
Job streamJob = client.submissionResult().submission(streamSubmissionId).execute();

while (streamJob.status() == JobStatus.PENDING) {
Thread.sleep(1000);
streamJob = client.submissionResult().submission(submissionId).execute();
System.out.println("Job Status: " + streamJob.status());
}

/*
* List all submissions that are COMPLETE or FAILED
* Generate submission results for these
* Delay gathering the results until required
*/
* List all submissions that are COMPLETE or FAILED
* Generate submission results for these
* Delay gathering the results until required
*/
List<SubmissionFilter> filters = new ArrayList<>();
filters.add(SubmissionFilter.builder().status(SubmissionStatus.COMPLETE).build());
filters.add(SubmissionFilter.builder().status(SubmissionStatus.FAILED).build());
Expand All @@ -73,22 +89,21 @@ public static void main(String args[]) throws IOException {
Job j = client.generateSubmissionResult().submission(s).execute();
resultFiles.put(s, j);
}

// Do other fun things

Set<Submission> keySet = resultFiles.keySet();
for(Submission s : keySet) {
Job job = resultFiles.get(s);
job = resultFiles.get(s);

while (job.status() == JobStatus.PENDING) {
Thread.sleep(1000);
}

JSONObject obj = job.result();
String url = obj.getString("url");
RetrieveBlob retrieveBlob = client.retrieveBlob();
Blob blob = retrieveBlob.url(url).execute();
System.out.println("Submission " + submission.id + " has result " + blob.asString());
obj = job.result();
url = obj.getString("url");
retrieveBlob = client.retrieveBlob();
blob = retrieveBlob.url(url).execute();
System.out.println("Submission " + s.id + " has result " + blob.asString());
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
19 changes: 12 additions & 7 deletions src/main/java/com/indico/RetryInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@ public RetryInterceptor(IndicoConfig indicoConfig) {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();

Response response = chain.proceed(request);
boolean success = response.isSuccessful();
Response response = null;
boolean success = false;
int tryCount = 0;
while (!success && tryCount < indicoConfig.maxRetries) {
tryCount++;
try {
response = chain.proceed(request);
success = response.isSuccessful();

} catch(IOException ex){
} catch(IOException | RuntimeException ex){
success = false;
logger.trace("Failed to complete the request for" + request.url() + "retrying: " + ex.getMessage());
if(response != null){
response.close();
}
logger.trace("Failed to complete the request for " + request.url() + "retrying: " + ex.getMessage());
}
if(!success){
logger.trace("attempt " + tryCount + " failed for " + request.url() );
if(response != null && (tryCount + 1) < indicoConfig.maxRetries)
{
response.close();
logger.debug("Failed due to status code: " + response.code());

}
}
}
logger.trace("Completed in " + tryCount + " extra attempts. Successfully? " + success);
if(response == null){
throw new RuntimeException("Unable to complete the request, enable trace logging for more information");
}
return response;
}

}
55 changes: 43 additions & 12 deletions src/main/java/com/indico/mutation/WorkflowSubmission.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.indico.Mutation;
import com.indico.WorkflowSubmissionGraphQLMutation;
import com.indico.storage.UploadFile;
import com.indico.storage.UploadStream;
import com.indico.type.FileInput;


Expand All @@ -17,9 +18,7 @@
import org.json.JSONObject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

Expand All @@ -30,6 +29,8 @@ public class WorkflowSubmission implements Mutation<List<Integer>> {
private List<String> files;
private int id;
private UUID duplicationId;
private byte[] stream;
private Map<String, byte[]> streams;
private final Logger logger = LogManager.getLogger(WorkflowSubmission.class);

public WorkflowSubmission(IndicoClient client) {
Expand All @@ -46,6 +47,16 @@ public WorkflowSubmission files(List<String> files) {
return this;
}

/**
*
* @param streams Dictionary of String identifiers and byte streams to upload.
* @return
*/
public WorkflowSubmission byteStreams(Map<String, byte[]> streams){
this.streams = streams;
return this;
}

/**
* Id of workflow to submit files to
* @param id Workflow Id
Expand Down Expand Up @@ -76,15 +87,30 @@ public List<Integer> execute() {
JSONArray fileMetadata;
List<FileInput> files = new ArrayList<FileInput>();
try {
fileMetadata = this.upload(this.files);
for (Object f : fileMetadata) {
JSONObject uploadMeta = (JSONObject) f;
JSONObject meta = new JSONObject();
meta.put("name", uploadMeta.getString("name"));
meta.put("path", uploadMeta.getString("path"));
meta.put("upload_type", uploadMeta.getString("upload_type"));
FileInput input = FileInput.builder().filename(((JSONObject) f).getString("name")).filemeta(meta).build();
files.add(input);
if(this.files != null)
{
fileMetadata = this.upload(this.files);
for (Object f : fileMetadata) {
JSONObject uploadMeta = (JSONObject) f;
JSONObject meta = new JSONObject();
meta.put("name", uploadMeta.getString("name"));
meta.put("path", uploadMeta.getString("path"));
meta.put("upload_type", uploadMeta.getString("upload_type"));
FileInput input = FileInput.builder().filename(((JSONObject) f).getString("name")).filemeta(meta).build();
files.add(input);
}
}
if(this.streams != null){
fileMetadata = this.uploadBytes(this.streams);
for (Object f : fileMetadata) {
JSONObject uploadMeta = (JSONObject) f;
JSONObject meta = new JSONObject();
meta.put("name", uploadMeta.getString("name"));
meta.put("path", uploadMeta.getString("path"));
meta.put("upload_type", uploadMeta.getString("upload_type"));
FileInput input = FileInput.builder().filename(((JSONObject) f).getString("name")).filemeta(meta).build();
files.add(input);
}
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e.fillInStackTrace());
Expand Down Expand Up @@ -124,4 +150,9 @@ private JSONArray upload(List<String> filePaths) throws IOException {
UploadFile uploadRequest = new UploadFile(this.client);
return uploadRequest.filePaths(filePaths).call();
}

private JSONArray uploadBytes(Map<String, byte[]> stream) throws IOException {
UploadStream uploadRequest = new UploadStream(this.client);
return uploadRequest.byteStream(stream).call();
}
}
69 changes: 69 additions & 0 deletions src/main/java/com/indico/storage/UploadStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.indico.storage;

import com.indico.IndicoClient;
import com.indico.JSON;
import com.indico.RestRequest;
import okhttp3.*;
import org.json.JSONArray;

import java.io.File;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Map;
import java.util.UUID;

public class UploadStream implements RestRequest<JSONArray> {

private IndicoClient client;
private Map<String,byte[]> bytes;
private String fileName;

public UploadStream(IndicoClient client) {
this.client = client;
}

/**
* A stream to upload.
*
* @param stream Dictionary of string identifiers (ideally file names) to byte[] streams
* @return UploadFile
*/
public UploadStream byteStream(Map<String,byte[]> stream) {
if(stream != null){
this.bytes = stream;
}
return this;
}

/**
* Upload files and return metadata
*
* @return JSONArray
* @throws IOException
*/
@Override
public JSONArray call() throws IOException {
String uploadUrl = this.client.config.getAppBaseUrl() + "/storage/files/store";

MultipartBody.Builder multipartBody = new MultipartBody.Builder().setType(MultipartBody.FORM);

if(this.fileName == null || this.fileName.isEmpty()){
this.fileName = UUID.randomUUID().toString();
}
for (String key : this.bytes.keySet()) {
multipartBody.addFormDataPart(key, key,
RequestBody.create(MediaType.parse("application/octet-stream"), this.bytes.get(key)));
}
MultipartBody requestBody = multipartBody.build();
Request request = new Request.Builder()
.url(uploadUrl)
.post(requestBody)
.build();

Response result = client.okHttpClient.newCall(request).execute();
String body = result.body().string();
JSONArray fileMeta = new JSON(body).asJSONArray();
result.close();
return (JSONArray) fileMeta;
}
}

0 comments on commit f70be94

Please sign in to comment.