storage: wip
adamstruck committed Nov 21, 2017
1 parent 6bf9b8a commit 073b5e6
Showing 12 changed files with 308 additions and 221 deletions.
50 changes: 25 additions & 25 deletions storage/amazon_s3.go
@@ -19,8 +19,8 @@ import (

var endpointRegExp = regexp.MustCompile("^(http[s]?://)?(.[^/]+)(/+)?$")

// S3Protocol defines the expected URL prefix for S3, "s3://"
const S3Protocol = "s3://"
// s3Protocol defines the s3 URL protocol
const s3Protocol = "s3://"

// AmazonS3Backend provides access to an S3 object store.
type AmazonS3Backend struct {
@@ -41,14 +41,14 @@ func NewAmazonS3Backend(conf config.AmazonS3Storage) (*AmazonS3Backend, error) {
}

// Get copies an object from S3 to the host path.
func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string, class tes.FileType) error {
func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {

bucket, key := s3b.processUrl(url)
url := s3b.parse(rawurl)

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, bucket, "us-east-1")
region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "NotFound" {
return fmt.Errorf("unable to find bucket %s's region not found: %v", bucket, aerr)
return fmt.Errorf("unable to find bucket %s's region not found: %v", url.bucket, aerr)
}
return err
}
@@ -67,8 +67,8 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
}

_, err = manager.DownloadWithContext(ctx, hf, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Bucket: aws.String(url.bucket),
Key: aws.String(url.path),
})
if err != nil {
return err
@@ -82,12 +82,12 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
case Directory:
err = client.ListObjectsV2PagesWithContext(
ctx,
&s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &key},
&s3.ListObjectsV2Input{Bucket: aws.String(url.bucket), Prefix: aws.String(url.path)},
func(page *s3.ListObjectsV2Output, more bool) bool {
for _, obj := range page.Contents {
if *obj.Key != key+"/" {
if *obj.Key != url.path+"/" {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, key+"/"))
file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
panic(err)
}
@@ -100,7 +100,7 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string

// Download the file using the AWS SDK
_, err = manager.DownloadWithContext(ctx, hf, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Bucket: aws.String(url.bucket),
Key: obj.Key,
})
if err != nil {
@@ -128,14 +128,14 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, url string, hostPath string
}

// PutFile copies an object (file) from the host path to S3.
func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath string) error {
func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {

bucket, key := s3b.processUrl(url)
url := s3b.parse(rawurl)

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, bucket, "us-east-1")
region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == "NotFound" {
return fmt.Errorf("unable to find bucket %s's region not found", bucket)
return fmt.Errorf("unable to find bucket %s's region not found", url.bucket)
}
return err
}
@@ -149,8 +149,8 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath st
return fmt.Errorf("failed to open file %q, %v", hostPath, err)
}
_, err = manager.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Bucket: aws.String(url.bucket),
Key: aws.String(url.path),
Body: fh,
})
if err != nil {
@@ -162,22 +162,22 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, url string, hostPath st

// Supports indicates whether this backend supports the given storage request.
// For S3, the url must start with "s3://".
func (s3b *AmazonS3Backend) Supports(url string, hostPath string, class tes.FileType) bool {
if !strings.HasPrefix(url, S3Protocol) {
func (s3b *AmazonS3Backend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
if !strings.HasPrefix(rawurl, s3Protocol) {
return false
}

bucket, _ := s3b.processUrl(url)
_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, bucket, "us-east-1")
url := s3b.parse(rawurl)
_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
if err != nil {
return false
}

return true
}

func (s3b *AmazonS3Backend) processUrl(url string) (string, string) {
path := strings.TrimPrefix(url, S3Protocol)
func (s3b *AmazonS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
if s3b.endpoint != "" {
path = strings.TrimPrefix(path, s3b.endpoint)
} else {
@@ -189,5 +189,5 @@ func (s3b *AmazonS3Backend) processUrl(url string) (string, string) {
bucket := split[0]
key := split[1]

return bucket, key
return &urlparts{bucket, key}
}
34 changes: 16 additions & 18 deletions storage/generic_s3.go
@@ -32,12 +32,12 @@ func NewGenericS3Backend(conf config.S3Storage) (*GenericS3Backend, error) {
}

// Get copies an object from S3 to the host path.
func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string, class tes.FileType) error {
bucket, key := s3.processUrl(url)
func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
url := s3.parse(rawurl)

switch class {
case File:
err := s3.client.FGetObjectWithContext(ctx, bucket, key, hostPath, minio.GetObjectOptions{})
err := s3.client.FGetObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.GetObjectOptions{})
if err != nil {
return err
}
@@ -48,13 +48,13 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string
defer close(doneCh)
// Recursively list all objects in 'mytestbucket'
recursive := true
for obj := range s3.client.ListObjectsV2(bucket, key, recursive, doneCh) {
for obj := range s3.client.ListObjectsV2(url.bucket, url.path, recursive, doneCh) {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, key+"/"))
file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
return err
}
err := s3.client.FGetObjectWithContext(ctx, bucket, obj.Key, file, minio.GetObjectOptions{})
err := s3.client.FGetObjectWithContext(ctx, url.bucket, obj.Key, file, minio.GetObjectOptions{})
if err != nil {
return err
}
@@ -68,34 +68,32 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, url string, hostPath string
}

// Put copies an object (file) from the host path to S3.
func (s3 *GenericS3Backend) PutFile(ctx context.Context, url string, hostPath string) error {
bucket, key := s3.processUrl(url)

_, err := s3.client.FPutObjectWithContext(ctx, bucket, key, hostPath, minio.PutObjectOptions{})
func (s3 *GenericS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := s3.parse(rawurl)
_, err := s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})

return err
}

// Supports indicates whether this backend supports the given storage request.
// For S3, the url must start with "s3://".
func (s3 *GenericS3Backend) Supports(url string, hostPath string, class tes.FileType) bool {
if !strings.HasPrefix(url, S3Protocol) {
func (s3 *GenericS3Backend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
if !strings.HasPrefix(rawurl, s3Protocol) {
return false
}

bucket, _ := s3.processUrl(url)
found, _ := s3.client.BucketExists(bucket)
url := s3.parse(rawurl)
found, _ := s3.client.BucketExists(url.bucket)

return found
}

func (s3 *GenericS3Backend) processUrl(url string) (string, string) {
path := strings.TrimPrefix(url, S3Protocol)
func (s3 *GenericS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
path = strings.TrimPrefix(path, s3.endpoint)

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]

return bucket, key
return &urlparts{bucket, key}
}
42 changes: 10 additions & 32 deletions storage/gs.go
@@ -13,14 +13,13 @@ import (
"io"
"io/ioutil"
"net/http"
urllib "net/url"
"os"
"path"
"strings"
)

// The gs url protocol
const gsscheme = "gs"
const gsProtocol = "gs://"

// GSBackend provides access to an GS object store.
type GSBackend struct {
@@ -65,10 +64,7 @@ func NewGSBackend(conf config.GSStorage) (*GSBackend, error) {

// Get copies an object from GS to the host path.
func (gs *GSBackend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
url, perr := parse(rawurl)
if perr != nil {
return perr
}
url := gs.parse(rawurl)

if class == tes.FileType_FILE {
call := gs.svc.Objects.Get(url.bucket, url.path)
@@ -114,10 +110,7 @@ func download(call *storage.ObjectsGetCall, hostPath string) error {

// PutFile copies an object (file) from the host path to GS.
func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url, perr := parse(rawurl)
if perr != nil {
return perr
}
url := gs.parse(rawurl)

reader, oerr := os.Open(hostPath)
if oerr != nil {
@@ -135,28 +128,13 @@ func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string
// Supports returns true if this backend supports the given storage request.
// The Google Storage backend supports URLs which have a "gs://" scheme.
func (gs *GSBackend) Supports(rawurl string, hostPath string, class tes.FileType) bool {
_, err := parse(rawurl)
if err != nil {
return false
}
return true
}

type urlparts struct {
bucket string
path string
return strings.HasPrefix(rawurl, gsProtocol)
}

func parse(rawurl string) (*urlparts, error) {
url, err := urllib.Parse(rawurl)
if err != nil {
return nil, err
}
if url.Scheme != gsscheme {
return nil, fmt.Errorf("Invalid URL scheme '%s' for Google Storage backend in url: %s", url.Scheme, rawurl)
}

bucket := url.Host
path := strings.TrimLeft(url.EscapedPath(), "/")
return &urlparts{bucket, path}, nil
func (gs *GSBackend) parse(rawurl string) *urlparts {
path := strings.TrimPrefix(rawurl, gsProtocol)
split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]
return &urlparts{bucket, key}
}
61 changes: 0 additions & 61 deletions storage/gs_test.go

This file was deleted.

5 changes: 5 additions & 0 deletions storage/storage.go
@@ -205,3 +205,8 @@ func fileSize(path string) int64 {
}
return st.Size()
}

type urlparts struct {
bucket string
path string
}
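
For reference, the pattern introduced across these diffs is that each backend now parses its raw URL into the shared urlparts struct above instead of returning separate (bucket, key) values. Below is a minimal standalone sketch of that parsing logic, assuming a hypothetical helper named parseObjectURL; the commit itself defines per-backend parse methods, not this function.

package main

import (
	"fmt"
	"strings"
)

// urlparts mirrors the struct added to storage/storage.go in this commit.
type urlparts struct {
	bucket string
	path   string
}

// parseObjectURL is a hypothetical helper that follows the same logic as the
// per-backend parse methods in this commit: strip the protocol prefix, then
// split the remainder into bucket and object path. Like those methods, it
// assumes the URL contains an object path after the bucket name.
func parseObjectURL(rawurl, protocol string) *urlparts {
	path := strings.TrimPrefix(rawurl, protocol)
	split := strings.SplitN(path, "/", 2)
	return &urlparts{bucket: split[0], path: split[1]}
}

func main() {
	u := parseObjectURL("s3://my-bucket/dir/file.txt", "s3://")
	fmt.Println(u.bucket, u.path) // prints: my-bucket dir/file.txt
}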