diff --git a/storage/amazon_s3.go b/storage/amazon_s3.go
index 3f389548a..7e43e1687 100644
--- a/storage/amazon_s3.go
+++ b/storage/amazon_s3.go
@@ -19,8 +19,8 @@ import (
 
 var endpointRegExp = regexp.MustCompile("^(http[s]?://)?(.[^/]+)(/+)?$")
 
-// S3Protocol defines the expected URL prefix for S3, "s3://"
-const S3Protocol = "s3://"
+// s3Protocol defines the s3 URL protocol
+const s3Protocol = "s3://"
 
 // AmazonS3Backend provides access to an S3 object store.
 type AmazonS3Backend struct {
@@ -41,14 +41,14 @@ func NewAmazonS3Backend(conf config.AmazonS3Storage) (*AmazonS3Backend, error) {
 }
 
 // Get copies an object from S3 to the host path.
-func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string, class tes.FileType) error {
+func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
 
-	bucket, key := s3b.processUrl(url)
+	url := s3b.parse(rawurl)
 
-	region, err := s3manager.GetBucketRegion(ctx, s3b.sess, bucket, "us-east-1")
+	region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
 	if err != nil {
 		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "NotFound" {
-			return fmt.Errorf("unable to find bucket %s's region not found: %v", bucket, aerr)
+			return fmt.Errorf("unable to find bucket %s's region: %v", url.bucket, aerr)
 		}
 		return err
 	}
@@ -67,8 +67,8 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
 		}
 
 		_, err = manager.DownloadWithContext(ctx, hf, &s3.GetObjectInput{
-			Bucket: aws.String(bucket),
-			Key:    aws.String(key),
+			Bucket: aws.String(url.bucket),
+			Key:    aws.String(url.path),
 		})
 		if err != nil {
 			return err
@@ -82,12 +82,12 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
 	case Directory:
 		err = client.ListObjectsV2PagesWithContext(
 			ctx,
-			&s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &key},
+			&s3.ListObjectsV2Input{Bucket: aws.String(url.bucket), Prefix: aws.String(url.path)},
 			func(page *s3.ListObjectsV2Output, more bool) bool {
 				for _, obj := range page.Contents {
-					if *obj.Key != key+"/" {
+					if *obj.Key != url.path+"/" {
 						// Create the directories in the path
-						file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, key+"/"))
+						file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, url.path+"/"))
 						if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
 							panic(err)
 						}
@@ -100,7 +100,7 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
 
 						// Download the file using the AWS SDK
 						_, err = manager.DownloadWithContext(ctx, hf, &s3.GetObjectInput{
-							Bucket: aws.String(bucket),
+							Bucket: aws.String(url.bucket),
 							Key:    obj.Key,
 						})
 						if err != nil {
@@ -128,14 +128,14 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
 }
 
 // PutFile copies an object (file) from the host path to S3.
-func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath string) error {
+func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
 
-	bucket, key := s3b.processUrl(url)
+	url := s3b.parse(rawurl)
 
-	region, err := s3manager.GetBucketRegion(ctx, s3b.sess, bucket, "us-east-1")
+	region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
 	if err != nil {
 		if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "NotFound" {
-			return fmt.Errorf("unable to find bucket %s's region not found", bucket)
+			return fmt.Errorf("unable to find bucket %s's region", url.bucket)
 		}
 		return err
 	}
@@ -149,8 +149,8 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath st
 		return fmt.Errorf("failed to open file %q, %v", hostPath, err)
 	}
 	_, err = manager.UploadWithContext(ctx, &s3manager.UploadInput{
-		Bucket: aws.String(bucket),
-		Key:    aws.String(key),
+		Bucket: aws.String(url.bucket),
+		Key:    aws.String(url.path),
 		Body:   fh,
 	})
 	if err != nil {
@@ -162,13 +162,13 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath st
 
 // Supports indicates whether this backend supports the given storage request.
 // For S3, the url must start with "s3://".
-func (s3b *AmazonS3Backend) Supports(url string, hostPath string, class tes.FileType) bool {
-	if !strings.HasPrefix(url, S3Protocol) {
+func (s3b *AmazonS3Backend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
+	if !strings.HasPrefix(rawurl, s3Protocol) {
 		return false
 	}
 
-	bucket, _ := s3b.processUrl(url)
-	_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, bucket, "us-east-1")
+	url := s3b.parse(rawurl)
+	_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
 	if err != nil {
 		return false
 	}
@@ -176,8 +176,8 @@ func (s3b *AmazonS3Backend) Supports(url string, hostPath string, class tes.File
 	return true
 }
 
-func (s3b *AmazonS3Backend) processUrl(url string) (string, string) {
-	path := strings.TrimPrefix(url, S3Protocol)
+func (s3b *AmazonS3Backend) parse(url string) *urlparts {
+	path := strings.TrimPrefix(url, s3Protocol)
 	if s3b.endpoint != "" {
 		path = strings.TrimPrefix(path, s3b.endpoint)
 	} else {
@@ -189,5 +189,5 @@ func (s3b *AmazonS3Backend) processUrl(url string) (string, string) {
 	bucket := split[0]
 	key := split[1]
 
-	return bucket, key
+	return &urlparts{bucket, key}
 }
diff --git a/storage/generic_s3.go b/storage/generic_s3.go
index 2bcb1b83a..6d389031b 100644
--- a/storage/generic_s3.go
+++ b/storage/generic_s3.go
@@ -32,12 +32,12 @@ func NewGenericS3Backend(conf config.S3Storage) (*GenericS3Backend, error) {
 }
 
 // Get copies an object from S3 to the host path.
-func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string, class tes.FileType) error {
-	bucket, key := s3.processUrl(url)
+func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
+	url := s3.parse(rawurl)
 
 	switch class {
 	case File:
-		err := s3.client.FGetObjectWithContext(ctx, bucket, key, hostPath, minio.GetObjectOptions{})
+		err := s3.client.FGetObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.GetObjectOptions{})
 		if err != nil {
 			return err
 		}
@@ -48,13 +48,13 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string
 		defer close(doneCh)
 		// Recursively list all objects in 'mytestbucket'
 		recursive := true
-		for obj := range s3.client.ListObjectsV2(bucket, key, recursive, doneCh) {
+		for obj := range s3.client.ListObjectsV2(url.bucket, url.path, recursive, doneCh) {
 			// Create the directories in the path
-			file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, key+"/"))
+			file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, url.path+"/"))
 			if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
 				return err
 			}
-			err := s3.client.FGetObjectWithContext(ctx, bucket, obj.Key, file, minio.GetObjectOptions{})
+			err := s3.client.FGetObjectWithContext(ctx, url.bucket, obj.Key, file, minio.GetObjectOptions{})
 			if err != nil {
 				return err
 			}
@@ -68,34 +68,32 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string
 }
 
 // Put copies an object (file) from the host path to S3.
-func (s3 *GenericS3Backend) PutFile(ctx context.Context, url string, hostPath string) error {
-	bucket, key := s3.processUrl(url)
-
-	_, err := s3.client.FPutObjectWithContext(ctx, bucket, key, hostPath, minio.PutObjectOptions{})
+func (s3 *GenericS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
+	url := s3.parse(rawurl)
+	_, err := s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})
 	return err
 }
 
 // Supports indicates whether this backend supports the given storage request.
 // For S3, the url must start with "s3://".
-func (s3 *GenericS3Backend) Supports(url string, hostPath string, class tes.FileType) bool {
-	if !strings.HasPrefix(url, S3Protocol) {
+func (s3 *GenericS3Backend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
+	if !strings.HasPrefix(rawurl, s3Protocol) {
 		return false
 	}
 
-	bucket, _ := s3.processUrl(url)
-	found, _ := s3.client.BucketExists(bucket)
+	url := s3.parse(rawurl)
+	found, _ := s3.client.BucketExists(url.bucket)
 	return found
 }
 
-func (s3 *GenericS3Backend) processUrl(url string) (string, string) {
-	path := strings.TrimPrefix(url, S3Protocol)
+func (s3 *GenericS3Backend) parse(url string) *urlparts {
+	path := strings.TrimPrefix(url, s3Protocol)
 	path = strings.TrimPrefix(path, s3.endpoint)
 	split := strings.SplitN(path, "/", 2)
 	bucket := split[0]
 	key := split[1]
-
-	return bucket, key
+	return &urlparts{bucket, key}
 }
diff --git a/storage/gs.go b/storage/gs.go
index 7e683905e..55fa31999 100644
--- a/storage/gs.go
+++ b/storage/gs.go
@@ -13,14 +13,13 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	urllib "net/url"
 	"os"
 	"path"
 	"strings"
 )
 
 // The gs url protocol
-const gsscheme = "gs"
+const gsProtocol = "gs://"
 
 // GSBackend provides access to an GS object store.
 type GSBackend struct {
@@ -65,10 +64,7 @@ func NewGSBackend(conf config.GSStorage) (*GSBackend, error) {
 
 // Get copies an object from GS to the host path.
 func (gs *GSBackend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
-	url, perr := parse(rawurl)
-	if perr != nil {
-		return perr
-	}
+	url := gs.parse(rawurl)
 
 	if class == tes.FileType_FILE {
 		call := gs.svc.Objects.Get(url.bucket, url.path)
@@ -114,10 +110,7 @@ func download(call *storage.ObjectsGetCall, hostPath string) error {
 
 // PutFile copies an object (file) from the host path to GS.
 func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
-	url, perr := parse(rawurl)
-	if perr != nil {
-		return perr
-	}
+	url := gs.parse(rawurl)
 
 	reader, oerr := os.Open(hostPath)
 	if oerr != nil {
@@ -135,28 +128,13 @@ func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string
 // Supports returns true if this backend supports the given storage request.
 // The Google Storage backend supports URLs which have a "gs://" scheme.
 func (gs *GSBackend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
-	_, err := parse(rawurl)
-	if err != nil {
-		return false
-	}
-	return true
-}
-
-type urlparts struct {
-	bucket string
-	path   string
+	return strings.HasPrefix(rawurl, gsProtocol)
 }
 
-func parse(rawurl string) (*urlparts, error) {
-	url, err := urllib.Parse(rawurl)
-	if err != nil {
-		return nil, err
-	}
-	if url.Scheme != gsscheme {
-		return nil, fmt.Errorf("Invalid URL scheme '%s' for Google Storage backend in url: %s", url.Scheme, rawurl)
-	}
-
-	bucket := url.Host
-	path := strings.TrimLeft(url.EscapedPath(), "/")
-	return &urlparts{bucket, path}, nil
+func (gs *GSBackend) parse(rawurl string) *urlparts {
+	path := strings.TrimPrefix(rawurl, gsProtocol)
+	split := strings.SplitN(path, "/", 2)
+	bucket := split[0]
+	key := split[1]
+	return &urlparts{bucket, key}
 }
diff --git a/storage/gs_test.go b/storage/gs_test.go
deleted file mode 100644
index 061ff2f8d..000000000
--- a/storage/gs_test.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package storage
-
-import (
-	"context"
-	"github.com/ohsu-comp-bio/funnel/config"
-	"github.com/ohsu-comp-bio/funnel/proto/tes"
-	"os"
-	"testing"
-)
-
-func authed(t *testing.T) *GSBackend {
-	accountFile := os.Getenv("TES_TEST_GS_ACCOUNT_FILE")
-
-	if accountFile == "" {
Set TES_TEST_GS_ACCOUNT_FILE") - } - conf := config.GSStorage{ - AccountFile: accountFile, - } - - var err error - var gs *GSBackend - gs, err = NewGSBackend(conf) - if err != nil { - t.Errorf("Can't get authed backend: %s", err) - } - return gs -} - -func TestGet(t *testing.T) { - ctx := context.Background() - g := authed(t) - gs := Storage{}.WithBackend(g) - - gerr := gs.Get(ctx, "gs://uspto-pair/applications/05900016.zip", "_test_download/downloaded", tes.FileType_FILE) - if gerr != nil { - t.Error(gerr) - } -} - -func TestPut(t *testing.T) { - ctx := context.Background() - g := authed(t) - gs := Storage{}.WithBackend(g) - - _, gerr := gs.Put(ctx, "gs://ohsu-cromwell-testing.appspot.com/go_test_put", "_test_files/for_put", tes.FileType_FILE) - if gerr != nil { - t.Error(gerr) - } -} - -func TestTrimSlashes(t *testing.T) { - ctx := context.Background() - g := authed(t) - gs := Storage{}.WithBackend(g) - - _, gerr := gs.Put(ctx, "gs://ohsu-cromwell-testing.appspot.com///go_test_put", "_test_files/for_put", tes.FileType_FILE) - if gerr != nil { - t.Error(gerr) - } -} diff --git a/storage/storage.go b/storage/storage.go index 155c94d9d..814004d31 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -205,3 +205,8 @@ func fileSize(path string) int64 { } return st.Size() } + +type urlparts struct { + bucket string + path string +} diff --git a/storage/storage_test.go b/storage/storage_test.go index 85a97e55d..57c023a05 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -73,27 +73,27 @@ func TestS3UrlProcessing(t *testing.T) { expectedBucket := "1000genomes" expectedKey := "README.analysis_history" - bucket, key := b.processUrl("s3://s3.amazonaws.com/1000genomes/README.analysis_history") - if bucket != expectedBucket { + url := b.parse("s3://s3.amazonaws.com/1000genomes/README.analysis_history") + if url.bucket != expectedBucket { t.Log("expected:", expectedBucket) - t.Log("actual:", bucket) + t.Log("actual:", url.bucket) t.Error("wrong bucket") } - if key != expectedKey { + if url.path != expectedKey { t.Log("expected:", expectedKey) - t.Log("actual:", key) + t.Log("actual:", url.path) t.Error("wrong key") } - bucket, key = b.processUrl("s3://1000genomes/README.analysis_history") - if bucket != expectedBucket { + url = b.parse("s3://1000genomes/README.analysis_history") + if url.bucket != expectedBucket { t.Log("expected:", expectedBucket) - t.Log("actual:", bucket) + t.Log("actual:", url.bucket) t.Error("wrong bucket") } - if key != expectedKey { + if url.path != expectedKey { t.Log("expected:", expectedKey) - t.Log("actual:", key) + t.Log("actual:", url.path) t.Error("wrong key") } @@ -102,39 +102,39 @@ func TestS3UrlProcessing(t *testing.T) { t.Fatal("Error creating generic S3 backend:", err) } - bucket, key = ab.processUrl("s3://s3.amazonaws.com/1000genomes/README.analysis_history") - if bucket != expectedBucket { + url = ab.parse("s3://s3.amazonaws.com/1000genomes/README.analysis_history") + if url.bucket != expectedBucket { t.Log("expected:", expectedBucket) - t.Log("actual:", bucket) + t.Log("actual:", url.bucket) t.Error("wrong bucket") } - if key != expectedKey { + if url.path != expectedKey { t.Log("expected:", expectedKey) - t.Log("actual:", key) + t.Log("actual:", url.path) t.Error("wrong key") } - bucket, key = ab.processUrl("s3://s3.us-west-2.amazonaws.com/1000genomes/README.analysis_history") - if bucket != expectedBucket { + url = ab.parse("s3://s3.us-west-2.amazonaws.com/1000genomes/README.analysis_history") + if url.bucket != expectedBucket { 
t.Log("expected:", expectedBucket) - t.Log("actual:", bucket) + t.Log("actual:", url.bucket) t.Error("wrong bucket") } - if key != expectedKey { + if url.path != expectedKey { t.Log("expected:", expectedKey) - t.Log("actual:", key) + t.Log("actual:", url.path) t.Error("wrong key") } - bucket, key = ab.processUrl("s3://1000genomes/README.analysis_history") - if bucket != expectedBucket { + url = ab.parse("s3://1000genomes/README.analysis_history") + if url.bucket != expectedBucket { t.Log("expected:", expectedBucket) - t.Log("actual:", bucket) + t.Log("actual:", url.bucket) t.Error("wrong bucket") } - if key != expectedKey { + if url.path != expectedKey { t.Log("expected:", expectedKey) - t.Log("actual:", key) + t.Log("actual:", url.path) t.Error("wrong key") } } diff --git a/storage/swift.go b/storage/swift.go index adbd97674..af87b7617 100644 --- a/storage/swift.go +++ b/storage/swift.go @@ -8,13 +8,12 @@ import ( "github.com/ohsu-comp-bio/funnel/proto/tes" "github.com/ohsu-comp-bio/funnel/util" "io" - urllib "net/url" "os" "path" "strings" ) -const swiftScheme = "swift" +const swiftProtocol = "swift://" // SwiftBackend provides access to an sw object store. type SwiftBackend struct { @@ -42,22 +41,20 @@ func NewSwiftBackend(conf config.SwiftStorage) (*SwiftBackend, error) { return nil, err } - // Authenticate - err = c.Authenticate() - if err != nil { - return nil, err - } return &SwiftBackend{&c}, nil } // Get copies an object from storage to the host path. func (sw *SwiftBackend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error { - - url, perr := sw.parse(rawurl) - if perr != nil { - return perr + if !sw.conn.Authenticated() { + err := sw.conn.Authenticate() + if err != nil { + return fmt.Errorf("error connecting to Swift server: %v", err) + } } + url := sw.parse(rawurl) + switch class { case tes.FileType_FILE: @@ -122,11 +119,15 @@ func (sw *SwiftBackend) get(src io.Reader, hostPath string) error { // PutFile copies an object (file) from the host path to storage. func (sw *SwiftBackend) PutFile(ctx context.Context, rawurl string, hostPath string) error { - url, perr := sw.parse(rawurl) - if perr != nil { - return perr + if !sw.conn.Authenticated() { + err := sw.conn.Authenticate() + if err != nil { + return fmt.Errorf("error connecting to Swift server: %v", err) + } } + url := sw.parse(rawurl) + reader, oerr := os.Open(hostPath) if oerr != nil { return oerr @@ -145,26 +146,16 @@ func (sw *SwiftBackend) PutFile(ctx context.Context, rawurl string, hostPath str return writer.Close() } -func (sw *SwiftBackend) parse(rawurl string) (*urlparts, error) { - url, err := urllib.Parse(rawurl) - if err != nil { - return nil, err - } - if url.Scheme != swiftScheme { - return nil, fmt.Errorf("Invalid URL scheme '%s' for Swift Storage backend in url: %s", url.Scheme, rawurl) - } - - bucket := url.Host - path := strings.TrimLeft(url.EscapedPath(), "/") - return &urlparts{bucket, path}, nil -} - // Supports indicates whether this backend supports the given storage request. -// For sw, the url must start with "sw://". +// For swift, the url must start with "swift://". 
 func (sw *SwiftBackend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
-	_, err := sw.parse(rawurl)
-	if err != nil {
-		return false
-	}
-	return true
+	return strings.HasPrefix(rawurl, swiftProtocol)
+}
+
+func (sw *SwiftBackend) parse(rawurl string) *urlparts {
+	path := strings.TrimPrefix(rawurl, swiftProtocol)
+	split := strings.SplitN(path, "/", 2)
+	bucket := split[0]
+	key := split[1]
+	return &urlparts{bucket, key}
 }
diff --git a/tests/config_utils.go b/tests/config_utils.go
index 427675ca7..0dcf9aec6 100644
--- a/tests/config_utils.go
+++ b/tests/config_utils.go
@@ -32,6 +32,7 @@ func init() {
 func DefaultConfig() config.Config {
 	conf := config.DefaultConfig()
 	conf.Worker.Storage.AmazonS3.Disabled = true
+	conf.Worker.Storage.GS.Disabled = true
 	conf.Worker.Storage.Swift.Disabled = true
 
 	// Get config from test command line flag, if present.
diff --git a/tests/gs.config.yml b/tests/gs.config.yml
new file mode 100644
index 000000000..b361932ef
--- /dev/null
+++ b/tests/gs.config.yml
@@ -0,0 +1,4 @@
+Worker:
+  Storage:
+    GS:
+      Disabled: false
diff --git a/tests/storage/generic_s3_test.go b/tests/storage/generic_s3_test.go
index e0ec35620..6f060f828 100644
--- a/tests/storage/generic_s3_test.go
+++ b/tests/storage/generic_s3_test.go
@@ -22,12 +22,6 @@ func TestGenericS3Storage(t *testing.T) {
 		t.Skipf("Skipping generic s3 e2e tests...")
 	}
 
-	store := storage.Storage{}
-	store, err := store.WithConfig(conf.Worker.Storage)
-	if err != nil {
-		t.Fatal("error configuring storage:", err)
-	}
-
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 
 	sconf := conf.Worker.Storage.S3[0]
@@ -36,33 +30,38 @@ func TestGenericS3Storage(t *testing.T) {
 	if err != nil {
 		t.Fatal("error creating s3 client:", err)
 	}
-
 	err = client.MakeBucket(testBucket, "")
 	if err != nil {
 		t.Fatal("error creating test s3 bucket:", err)
 	}
-
 	defer func() {
 		minioEmptyBucket(client, testBucket)
 		client.RemoveBucket(testBucket)
 	}()
 
+	protocol := "s3://"
+
+	store, err := storage.Storage{}.WithConfig(conf.Worker.Storage)
+	if err != nil {
+		t.Fatal("error configuring storage:", err)
+	}
+
 	fPath := "testdata/test_in"
-	inFileURL := "gs3://" + testBucket + "/" + fPath
+	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = store.Put(context.Background(), inFileURL, fPath, tes.FileType_FILE)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
 
 	dPath := "testdata/test_dir"
-	inDirURL := "gs3://" + testBucket + "/" + dPath
+	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = store.Put(context.Background(), inDirURL, dPath, tes.FileType_DIRECTORY)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
 
-	outFileURL := "gs3://" + testBucket + "/" + "test-output-file.txt"
-	outDirURL := "gs3://" + testBucket + "/" + "test-output-directory"
+	outFileURL := protocol + testBucket + "/" + "test-output-file.txt"
+	outDirURL := protocol + testBucket + "/" + "test-output-directory"
 
 	task := &tes.Task{
 		Name: "s3 e2e",
@@ -153,7 +152,6 @@ func TestGenericS3Storage(t *testing.T) {
 	tests.SetLogOutput(log, t)
 }
 
-// minioEmptyBucket empties the S3 bucket
 func minioEmptyBucket(client *minio.Client, bucket string) error {
 	log.Info("Removing objects from S3 bucket : ", bucket)
 	doneCh := make(chan struct{})
diff --git a/tests/storage/gs_test.go b/tests/storage/gs_test.go
new file mode 100644
index 000000000..8b784dfb3
--- /dev/null
+++ b/tests/storage/gs_test.go
@@ -0,0 +1,175 @@
+package storage
+
+import (
+	gs "cloud.google.com/go/storage"
+	"flag"
"github.com/ohsu-comp-bio/funnel/proto/tes" + "github.com/ohsu-comp-bio/funnel/storage" + "github.com/ohsu-comp-bio/funnel/tests" + "golang.org/x/net/context" + "io/ioutil" + "testing" +) + +var projectID string + +func init() { + flag.StringVar(&projectID, "projectID", projectID, "Google project ID") + flag.Parse() +} + +func TestGoogleStorage(t *testing.T) { + tests.SetLogOutput(log, t) + + if !conf.Worker.Storage.GS.Valid() { + t.Skipf("Skipping google storage e2e tests...") + } + + testBucket := "funnel-e2e-tests-" + tests.RandomString(6) + + cli, err := newGsTest() + if err != nil { + t.Fatal(err) + } + err = cli.createBucket(projectID, testBucket) + if err != nil { + t.Fatal(err) + } + defer func() { + cli.deleteBucket(testBucket) + }() + + protocol := "gs://" + + store, err := storage.Storage{}.WithConfig(conf.Worker.Storage) + if err != nil { + t.Fatal("error configuring storage:", err) + } + + fPath := "testdata/test_in" + inFileURL := protocol + testBucket + "/" + fPath + _, err = store.Put(context.Background(), inFileURL, fPath, tes.FileType_FILE) + if err != nil { + t.Fatal("error uploading test file:", err) + } + + dPath := "testdata/test_dir" + inDirURL := protocol + testBucket + "/" + dPath + _, err = store.Put(context.Background(), inDirURL, dPath, tes.FileType_DIRECTORY) + if err != nil { + t.Fatal("error uploading test directory:", err) + } + + outFileURL := protocol + testBucket + "/" + "test-output-file.txt" + outDirURL := protocol + testBucket + "/" + "test-output-directory" + + task := &tes.Task{ + Name: "gs e2e", + Inputs: []*tes.Input{ + { + Url: inFileURL, + Path: "/opt/inputs/test-file.txt", + Type: tes.FileType_FILE, + }, + { + Url: inDirURL, + Path: "/opt/inputs/test-directory", + Type: tes.FileType_DIRECTORY, + }, + }, + Outputs: []*tes.Output{ + { + Path: "/opt/workdir/test-output-file.txt", + Url: outFileURL, + Type: tes.FileType_FILE, + }, + { + Path: "/opt/workdir/test-output-directory", + Url: outDirURL, + Type: tes.FileType_DIRECTORY, + }, + }, + Executors: []*tes.Executor{ + { + Image: "alpine:latest", + Command: []string{ + "sh", + "-c", + "cat $(find /opt/inputs -type f) > test-output-file.txt; mkdir test-output-directory; cp *.txt test-output-directory/", + }, + Workdir: "/opt/workdir", + }, + }, + } + + resp, err := fun.RPC.CreateTask(context.Background(), task) + if err != nil { + t.Fatal(err) + } + + taskFinal := fun.Wait(resp.Id) + + if taskFinal.State != tes.State_COMPLETE { + t.Fatal("Unexpected task failure") + } + + expected := "file1 content\nfile2 content\nhello\n" + + err = store.Get(context.Background(), outFileURL, "./test_tmp/test-gs-file.txt", tes.FileType_FILE) + if err != nil { + t.Fatal("Failed to download file:", err) + } + + b, err := ioutil.ReadFile("./test_tmp/test-gs-file.txt") + if err != nil { + t.Fatal("Failed to read downloaded file:", err) + } + actual := string(b) + + if actual != expected { + t.Log("expected:", expected) + t.Log("actual: ", actual) + t.Fatal("unexpected content") + } + + err = store.Get(context.Background(), outDirURL, "./test_tmp/test-gs-directory", tes.FileType_DIRECTORY) + if err != nil { + t.Fatal("Failed to download directory:", err) + } + + b, err = ioutil.ReadFile("./test_tmp/test-gs-directory/test-output-file.txt") + if err != nil { + t.Fatal("Failed to read file in downloaded directory", err) + } + actual = string(b) + + if actual != expected { + t.Log("expected:", expected) + t.Log("actual: ", actual) + t.Fatal("unexpected content") + } +} + +type gsTest struct { + client *gs.Client +} + +func 
+func newGsTest() (*gsTest, error) {
+	client, err := gs.NewClient(context.Background())
+	return &gsTest{client}, err
+}
+
+func (g *gsTest) createBucket(projectID, bucket string) error {
+	cli := g.client.Bucket(bucket)
+	return cli.Create(context.Background(), projectID, nil)
+}
+
+func (g *gsTest) deleteBucket(bucket string) error {
+	// Empty the bucket first; deleting a non-empty bucket fails.
+	ctx := context.Background()
+	it := g.client.Bucket(bucket).Objects(ctx, nil)
+	// Stop at the first error, including the end-of-iteration sentinel.
+	for attrs, err := it.Next(); err == nil; attrs, err = it.Next() {
+		g.client.Bucket(bucket).Object(attrs.Name).Delete(ctx)
+	}
+	return g.client.Bucket(bucket).Delete(ctx)
+}
diff --git a/tests/storage/s3_test.go b/tests/storage/s3_test.go
index 8b5315186..14f4d8fe4 100644
--- a/tests/storage/s3_test.go
+++ b/tests/storage/s3_test.go
@@ -19,12 +19,6 @@ func TestAmazonS3Storage(t *testing.T) {
 		t.Skipf("Skipping amazon s3 e2e tests...")
 	}
 
-	store := storage.Storage{}
-	store, err := store.WithConfig(conf.Worker.Storage)
-	if err != nil {
-		t.Fatal("error configuring storage:", err)
-	}
-
 	testBucket := "funnel-e2e-tests-" + tests.RandomString(6)
 
 	sess, err := util.NewAWSSession(conf.Worker.Storage.AmazonS3.AWS)
@@ -40,26 +34,33 @@ func TestAmazonS3Storage(t *testing.T) {
 		t.Fatal("error creating test s3 bucket:", err)
 	}
 	defer func() {
-		emptyBucket(client, testBucket)
+		amazonEmptyBucket(client, testBucket)
 		client.DeleteBucket(&s3.DeleteBucketInput{Bucket: aws.String(testBucket)})
 	}()
 
+	protocol := "s3://"
+
+	store, err := storage.Storage{}.WithConfig(conf.Worker.Storage)
+	if err != nil {
+		t.Fatal("error configuring storage:", err)
+	}
+
 	fPath := "testdata/test_in"
-	inFileURL := "s3://" + testBucket + "/" + fPath
+	inFileURL := protocol + testBucket + "/" + fPath
 	_, err = store.Put(context.Background(), inFileURL, fPath, tes.FileType_FILE)
 	if err != nil {
 		t.Fatal("error uploading test file:", err)
 	}
 
 	dPath := "testdata/test_dir"
-	inDirURL := "s3://" + testBucket + "/" + dPath
+	inDirURL := protocol + testBucket + "/" + dPath
 	_, err = store.Put(context.Background(), inDirURL, dPath, tes.FileType_DIRECTORY)
 	if err != nil {
 		t.Fatal("error uploading test directory:", err)
 	}
 
-	outFileURL := "s3://" + testBucket + "/" + "test-output-file.txt"
-	outDirURL := "s3://" + testBucket + "/" + "test-output-directory"
+	outFileURL := protocol + testBucket + "/" + "test-output-file.txt"
+	outDirURL := protocol + testBucket + "/" + "test-output-directory"
 
 	task := &tes.Task{
 		Name: "s3 e2e",
@@ -148,8 +149,7 @@ func TestAmazonS3Storage(t *testing.T) {
 	}
 }
 
-// emptyBucket empties the Amazon S3 bucket
-func emptyBucket(client *s3.S3, bucket string) error {
+func amazonEmptyBucket(client *s3.S3, bucket string) error {
 	log.Info("Removing objects from S3 bucket : ", bucket)
 	params := &s3.ListObjectsInput{
 		Bucket: aws.String(bucket),