diff --git a/cmd/fs/cat.go b/cmd/fs/cat.go index 8227cd7814..be18665383 100644 --- a/cmd/fs/cat.go +++ b/cmd/fs/cat.go @@ -9,8 +9,8 @@ import ( func newCatCommand() *cobra.Command { cmd := &cobra.Command{ Use: "cat FILE_PATH", - Short: "Show file content", - Long: `Show the contents of a file.`, + Short: "Show file content.", + Long: `Show the contents of a file in DBFS or a UC Volume.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 97fceb93cb..f0f480fec6 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -129,10 +129,10 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error { func newCpCommand() *cobra.Command { cmd := &cobra.Command{ Use: "cp SOURCE_PATH TARGET_PATH", - Short: "Copy files and directories to and from DBFS.", - Long: `Copy files to and from DBFS. + Short: "Copy files and directories.", + Long: `Copy files and directories to and from any paths on DBFS, UC Volumes or your local filesystem. - For paths in DBFS it is required that you specify the "dbfs" scheme. + For paths in DBFS and UC Volumes, it is required that you specify the "dbfs" scheme. For example: dbfs:/foo/bar. 
Recursively copying a directory will copy all files inside directory @@ -152,9 +152,6 @@ func newCpCommand() *cobra.Command { cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - // TODO: Error if a user uses '\' as path separator on windows when "file" - // scheme is specified (https://github.com/databricks/cli/issues/485) - // Get source filer and source path without scheme fullSourcePath := args[0] sourceFiler, sourcePath, err := filerForPath(ctx, fullSourcePath) diff --git a/cmd/fs/fs.go b/cmd/fs/fs.go index 01d8a745be..1f36696a65 100644 --- a/cmd/fs/fs.go +++ b/cmd/fs/fs.go @@ -8,7 +8,7 @@ func New() *cobra.Command { cmd := &cobra.Command{ Use: "fs", Short: "Filesystem related commands", - Long: `Commands to do DBFS operations.`, + Long: `Commands to do file system operations on DBFS and UC Volumes.`, GroupID: "workspace", } diff --git a/cmd/fs/ls.go b/cmd/fs/ls.go index 7ae55e1f46..be52b9289a 100644 --- a/cmd/fs/ls.go +++ b/cmd/fs/ls.go @@ -40,8 +40,8 @@ func toJsonDirEntry(f fs.DirEntry, baseDir string, isAbsolute bool) (*jsonDirEnt func newLsCommand() *cobra.Command { cmd := &cobra.Command{ Use: "ls DIR_PATH", - Short: "Lists files", - Long: `Lists files`, + Short: "Lists files.", + Long: `Lists files in DBFS and UC Volumes.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/cmd/fs/mkdir.go b/cmd/fs/mkdir.go index c6a5e607c6..dc054d8a7c 100644 --- a/cmd/fs/mkdir.go +++ b/cmd/fs/mkdir.go @@ -11,8 +11,8 @@ func newMkdirCommand() *cobra.Command { // Alias `mkdirs` for this command exists for legacy purposes. This command // is called databricks fs mkdirs in our legacy CLI: https://github.com/databricks/databricks-cli Aliases: []string{"mkdirs"}, - Short: "Make directories", - Long: `Mkdir will create directories along the path to the argument directory.`, + Short: "Make directories.", + Long: `Make directories in DBFS and UC Volumes. 
Mkdir will create directories along the path to the argument directory.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/cmd/fs/rm.go b/cmd/fs/rm.go index 3ce8d3b93d..8a7b6571db 100644 --- a/cmd/fs/rm.go +++ b/cmd/fs/rm.go @@ -9,8 +9,8 @@ import ( func newRmCommand() *cobra.Command { cmd := &cobra.Command{ Use: "rm PATH", - Short: "Remove files and directories from dbfs.", - Long: `Remove files and directories from dbfs.`, + Short: "Remove files and directories.", + Long: `Remove files and directories from DBFS and UC Volumes.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/internal/filer_test.go b/internal/filer_test.go index b1af6886c6..d333a1b70e 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -6,14 +6,11 @@ import ( "errors" "io" "io/fs" - "net/http" "regexp" "strings" "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -40,15 +37,87 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str assert.Equal(f, contents, body.String()) } -func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { +func commonFilerRecursiveDeleteTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error - // Write should fail because the root path doesn't yet exist. 
+ err = f.Write(ctx, "dir/file1", strings.NewReader("content1"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/file1", `content1`) + + err = f.Write(ctx, "dir/file2", strings.NewReader("content2"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/file2", `content2`) + + err = f.Write(ctx, "dir/subdir1/file3", strings.NewReader("content3"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir1/file3", `content3`) + + err = f.Write(ctx, "dir/subdir1/file4", strings.NewReader("content4"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir1/file4", `content4`) + + err = f.Write(ctx, "dir/subdir2/file5", strings.NewReader("content5"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir2/file5", `content5`) + + err = f.Write(ctx, "dir/subdir2/file6", strings.NewReader("content6"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir2/file6", `content6`) + + entriesBeforeDelete, err := f.ReadDir(ctx, "dir") + require.NoError(t, err) + assert.Len(t, entriesBeforeDelete, 4) + + names := []string{} + for _, e := range entriesBeforeDelete { + names = append(names, e.Name()) + } + assert.Equal(t, names, []string{"file1", "file2", "subdir1", "subdir2"}) + + err = f.Delete(ctx, "dir") + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) + + err = f.Delete(ctx, "dir", filer.DeleteRecursively) + assert.NoError(t, err) + _, err = f.ReadDir(ctx, "dir") + assert.ErrorAs(t, err, &filer.NoSuchDirectoryError{}) +} + +func TestAccFilerRecursiveDelete(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", 
setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + // Common tests we run across all filers to ensure consistent behavior. + commonFilerRecursiveDeleteTest(t, ctx, f) + }) + } +} + +// Common tests we run across all filers to ensure consistent behavior. +func commonFilerReadWriteTests(t *testing.T, ctx context.Context, f filer.Filer) { + var err error + + // Write should fail because the intermediate directory doesn't exist. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) - // Read should fail because the root path doesn't yet exist. + // Read should fail because the intermediate directory doesn't yet exist. _, err = f.Read(ctx, "/foo/bar") assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) @@ -96,12 +165,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { // Delete should fail if the file doesn't exist. err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.ErrorAs(t, err, &filer.FileDoesNotExistError{}) assert.True(t, errors.Is(err, fs.ErrNotExist)) // Stat should fail if the file doesn't exist. _, err = f.Stat(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.ErrorAs(t, err, &filer.FileDoesNotExistError{}) assert.True(t, errors.Is(err, fs.ErrNotExist)) // Delete should succeed for file that does exist. @@ -110,7 +179,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { // Delete should fail for a non-empty directory. 
err = f.Delete(ctx, "/foo") - assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) assert.True(t, errors.Is(err, fs.ErrInvalid)) // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. @@ -124,7 +193,33 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.True(t, errors.Is(err, fs.ErrInvalid)) } -func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { +func TestAccFilerReadWrite(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + // Common tests we run across all filers to ensure consistent behavior. + commonFilerReadWriteTests(t, ctx, f) + }) + } +} + +// Common tests we run across all filers to ensure consistent behavior. +func commonFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error var info fs.FileInfo @@ -206,54 +301,28 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.False(t, entries[0].IsDir()) } -func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryWorkspaceDir(t, w) - f, err := filer.NewWorkspaceFilesClient(w, tmpdir) - require.NoError(t, err) - - // Check if we can use this API here, skip test if we cannot. 
- _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { - t.Skip(aerr.Message) +func TestAccFilerReadDir(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + commonFilerReadDirTest(t, ctx, f) + }) } - - return ctx, f -} - -func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) - runFilerReadDirTest(t, ctx, f) -} - -func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryDbfsDir(t, w) - f, err := filer.NewDbfsClient(w, tmpdir) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerDbfsReadWrite(t *testing.T) { - ctx, f := setupFilerDbfsTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerDbfsReadDir(t *testing.T) { - ctx, f := setupFilerDbfsTest(t) - runFilerReadDirTest(t, ctx, f) } var jupyterNotebookContent1 = ` @@ -305,7 +374,8 @@ var jupyterNotebookContent2 = ` ` func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) + f, _ := setupWsfsFiler(t) + ctx := context.Background() var err error // Upload the notebooks @@ -350,7 +420,8 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { } func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) + f, _ 
:= setupWsfsFiler(t) + ctx := context.Background() var err error // Upload notebooks @@ -391,140 +462,3 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"second upload\"))") filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")") } - -func setupFilerLocalTest(t *testing.T) (context.Context, filer.Filer) { - ctx := context.Background() - f, err := filer.NewLocalClient(t.TempDir()) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerLocalReadWrite(t *testing.T) { - ctx, f := setupFilerLocalTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerLocalReadDir(t *testing.T) { - ctx, f := setupFilerLocalTest(t) - runFilerReadDirTest(t, ctx, f) -} - -func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string { - // Assume this test is run against the internal testing workspace. - path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-") - - // The Files API doesn't include support for creating and removing directories yet. - // Directories are created implicitly by writing a file to a path that doesn't exist. - // We therefore assume we can use the specified path without creating it first. - t.Logf("using dbfs:%s", path) - - return path -} - -func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) { - t.SkipNow() // until available on prod - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := temporaryVolumeDir(t, w) - f, err := filer.NewFilesClient(w, tmpdir) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerFilesApiReadWrite(t *testing.T) { - ctx, f := setupFilerFilesApiTest(t) - - // The Files API doesn't know about directories yet. 
- // Below is a copy of [runFilerReadWriteTest] with - // assertions that don't work commented out. - - var err error - - // Write should fail because the root path doesn't yet exist. - // err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) - // assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) - // assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Read should fail because the root path doesn't yet exist. - _, err = f.Read(ctx, "/foo/bar") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Read should fail because the path points to a directory - // err = f.Mkdir(ctx, "/dir") - // require.NoError(t, err) - // _, err = f.Read(ctx, "/dir") - // assert.ErrorIs(t, err, fs.ErrInvalid) - - // Write with CreateParentDirectories flag should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) - - // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) - assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) - assert.True(t, errors.Is(err, fs.ErrExist)) - - // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) - - // Write should succeed if there is no existing file at the specified path. - err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`)) - assert.NoError(t, err) - - // Stat on a directory should succeed. - // Note: size and modification time behave differently between backends. 
- info, err := f.Stat(ctx, "/foo") - require.NoError(t, err) - assert.Equal(t, "foo", info.Name()) - assert.True(t, info.Mode().IsDir()) - assert.Equal(t, true, info.IsDir()) - - // Stat on a file should succeed. - // Note: size and modification time behave differently between backends. - info, err = f.Stat(ctx, "/foo/bar") - require.NoError(t, err) - assert.Equal(t, "bar", info.Name()) - assert.True(t, info.Mode().IsRegular()) - assert.Equal(t, false, info.IsDir()) - - // Delete should fail if the file doesn't exist. - err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Stat should fail if the file doesn't exist. - _, err = f.Stat(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Delete should succeed for file that does exist. - err = f.Delete(ctx, "/foo/bar") - assert.NoError(t, err) - - // Delete should fail for a non-empty directory. - err = f.Delete(ctx, "/foo") - assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) - assert.True(t, errors.Is(err, fs.ErrInvalid)) - - // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. - // err = f.Delete(ctx, "/foo", filer.DeleteRecursively) - // assert.NoError(t, err) - - // Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail. - // It is not in the filer's purview to delete its root directory. 
- err = f.Delete(ctx, "/") - assert.True(t, errors.As(err, &filer.CannotDeleteRootError{})) - assert.True(t, errors.Is(err, fs.ErrInvalid)) -} - -func TestAccFilerFilesApiReadDir(t *testing.T) { - t.Skipf("no support for ReadDir yet") - ctx, f := setupFilerFilesApiTest(t) - runFilerReadDirTest(t, ctx, f) -} diff --git a/internal/fs_cat_test.go b/internal/fs_cat_test.go index 2c979ea735..6292aef189 100644 --- a/internal/fs_cat_test.go +++ b/internal/fs_cat_test.go @@ -13,31 +13,60 @@ import ( "github.com/stretchr/testify/require" ) -func TestAccFsCatForDbfs(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsCat(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) + err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", path.Join(tmpDir, "hello.txt")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "abcd", stdout.String()) + }) + } +} - stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "a", "hello.txt")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "abc", stdout.String()) +func TestAccFsCatOnADir(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "dir1") + require.NoError(t, err) + + _, _, err = RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "dir1")) + 
assert.ErrorAs(t, err, &filer.NotAFile{}) + }) + } } -func TestAccFsCatForDbfsOnNonExistentFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsCatOnNonExistentFile(t *testing.T) { + t.Parallel() - _, _, err := RequireErrorRun(t, "fs", "cat", "dbfs:/non-existent-file") - assert.ErrorIs(t, err, fs.ErrNotExist) + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "non-existent-file")) + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } func TestAccFsCatForDbfsInvalidScheme(t *testing.T) { @@ -65,6 +94,3 @@ func TestAccFsCatDoesNotSupportOutputModeJson(t *testing.T) { _, _, err = RequireErrorRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "hello.txt"), "--output=json") assert.ErrorContains(t, err, "json output not supported") } - -// TODO: Add test asserting an error when cat is called on an directory. Need this to be -// fixed in the SDK first (https://github.com/databricks/databricks-sdk-go/issues/414) diff --git a/internal/fs_cp_test.go b/internal/fs_cp_test.go index ab177a36f9..b69735bc00 100644 --- a/internal/fs_cp_test.go +++ b/internal/fs_cp_test.go @@ -2,16 +2,15 @@ package internal import ( "context" - "fmt" "io" "path" "path/filepath" + "regexp" "runtime" "strings" "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -60,84 +59,124 @@ func assertTargetDir(t *testing.T, ctx context.Context, f filer.Filer) { assertFileContent(t, ctx, f, "a/b/c/hello.txt", "hello, world\n") } -func setupLocalFiler(t *testing.T) (filer.Filer, string) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - tmp := t.TempDir() - f, err := filer.NewLocalClient(tmp) - require.NoError(t, err) - - return f, path.Join(filepath.ToSlash(tmp)) -} - -func setupDbfsFiler(t *testing.T) (filer.Filer, 
string) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - - tmpDir := TemporaryDbfsDir(t, w) - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - - return f, path.Join("dbfs:/", tmpDir) -} - type cpTest struct { + name string setupSource func(*testing.T) (filer.Filer, string) setupTarget func(*testing.T) (filer.Filer, string) } -func setupTable() []cpTest { +func copyTests() []cpTest { return []cpTest{ - {setupSource: setupLocalFiler, setupTarget: setupLocalFiler}, - {setupSource: setupLocalFiler, setupTarget: setupDbfsFiler}, - {setupSource: setupDbfsFiler, setupTarget: setupLocalFiler}, - {setupSource: setupDbfsFiler, setupTarget: setupDbfsFiler}, + // source: local file system + { + name: "local to local", + setupSource: setupLocalFiler, + setupTarget: setupLocalFiler, + }, + { + name: "local to dbfs", + setupSource: setupLocalFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "local to uc-volumes", + setupSource: setupLocalFiler, + setupTarget: setupUcVolumesFiler, + }, + + // source: dbfs + { + name: "dbfs to local", + setupSource: setupDbfsFiler, + setupTarget: setupLocalFiler, + }, + { + name: "dbfs to dbfs", + setupSource: setupDbfsFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "dbfs to uc-volumes", + setupSource: setupDbfsFiler, + setupTarget: setupUcVolumesFiler, + }, + + // source: uc-volumes + { + name: "uc-volumes to local", + setupSource: setupUcVolumesFiler, + setupTarget: setupLocalFiler, + }, + { + name: "uc-volumes to dbfs", + setupSource: setupUcVolumesFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "uc-volumes to uc-volumes", + setupSource: setupUcVolumesFiler, + setupTarget: setupUcVolumesFiler, + }, } } func TestAccFsCpDir(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) 
- targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", "-r", sourceDir, targetDir) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - assertTargetDir(t, ctx, targetFiler) + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") + + assertTargetDir(t, context.Background(), targetFiler) + }) } } func TestAccFsCpFileToFile(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceFile(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt")) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceFile(t, context.Background(), sourceFiler) - assertTargetFile(t, ctx, targetFiler, "bar.txt") + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt")) + + assertTargetFile(t, context.Background(), targetFiler, "bar.txt") + }) } } func TestAccFsCpFileToDir(t *testing.T) { - ctx := context.Background() - table := setupTable() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceFile(t, ctx, sourceFiler) + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - assertTargetFile(t, ctx, targetFiler, "foo.txt") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := 
tc.setupTarget(t) + setupSourceFile(t, context.Background(), sourceFiler) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir) + + assertTargetFile(t, context.Background(), targetFiler, "foo.txt") + }) } } @@ -158,125 +197,161 @@ func TestAccFsCpFileToDirForWindowsPaths(t *testing.T) { } func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") - assertFileContent(t, ctx, targetFiler, "query.sql", "SELECT 1") - assertFileContent(t, ctx, targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + assertFileContent(t, context.Background(), targetFiler, "query.sql", "SELECT 1") + assertFileContent(t, context.Background(), targetFiler, "pyNb.py", "# Databricks notebook 
source\nprint(123)") + }) } } func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + }) } } func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), 
filer.CreateParentDirectories) - require.NoError(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--recursive") - assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "this should not be overwritten") + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/dontoverwrite.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/dontoverwrite.txt")) + assertFileContent(t, context.Background(), targetFiler, "a/b/c/dontoverwrite.txt", "this should not be overwritten") + }) } } func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") - assertTargetDir(t, ctx, targetFiler) + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, 
err) + + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") + assertTargetDir(t, context.Background(), targetFiler) + }) } } func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), filer.CreateParentDirectories) - require.NoError(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--overwrite") - assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "hello, world\n") + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/overwritten.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/overwritten.txt"), "--overwrite") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/overwritten.txt", "hello, world\n") + }) } } func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() + + for _, testCase := range copyTests() { + tc := testCase - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + t.Run(tc.name, func(t *testing.T) { + 
t.Parallel() - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), filer.CreateParentDirectories) - require.NoError(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--recursive", "--overwrite") - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "hello, world\n") + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--overwrite") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "hello, world\n") + }) } } func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - _, _, err = RequireErrorRun(t, "fs", "cp", "dbfs:"+tmpDir, "dbfs:/tmp") - assert.Equal(t, fmt.Sprintf("source path %s is a directory. Please specify the --recursive flag", tmpDir), err.Error()) + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "cp", path.Join(tmpDir), path.Join(tmpDir, "foobar")) + r := regexp.MustCompile("source path .* is a directory. 
Please specify the --recursive flag") + assert.Regexp(t, r, err.Error()) + }) + } } func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) { @@ -287,20 +362,24 @@ func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) { } func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - _, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive", "--overwrite") - assert.Error(t, err) - } + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) + require.NoError(t, err) + _, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive") + assert.Error(t, err) + }) + } } diff --git a/internal/fs_ls_test.go b/internal/fs_ls_test.go index 9e02b09cc6..994a4a4255 100644 --- a/internal/fs_ls_test.go +++ b/internal/fs_ls_test.go @@ -11,131 +11,163 @@ import ( _ "github.com/databricks/cli/cmd/fs" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestAccFsLsForDbfs(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +type fsTest struct { + name string + 
setupFiler func(t *testing.T) (filer.Filer, string) +} - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() +var fsTests = []fsTest{ + { + name: "dbfs", + setupFiler: setupDbfsFiler, + }, + { + name: "uc-volumes", + setupFiler: setupUcVolumesFiler, + }, +} + +func setupLsFiles(t *testing.T, f filer.Filer) { + err := f.Write(context.Background(), "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + require.NoError(t, err) + err = f.Write(context.Background(), "bye.txt", strings.NewReader("def")) require.NoError(t, err) +} - tmpDir := TemporaryDbfsDir(t, w) +func TestAccFsLs(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) - // assert on ls output - assert.Len(t, parsedStdout, 2) - assert.Equal(t, "a", parsedStdout[0]["name"]) - assert.Equal(t, true, parsedStdout[0]["is_directory"]) - assert.Equal(t, float64(0), parsedStdout[0]["size"]) - assert.Equal(t, "bye.txt", parsedStdout[1]["name"]) - assert.Equal(t, false, parsedStdout[1]["is_directory"]) - assert.Equal(t, float64(3), parsedStdout[1]["size"]) -} + stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json") + assert.Equal(t, "", stderr.String()) -func TestAccFsLsForDbfsWithAbsolutePaths(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + var 
parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + // assert on ls output + assert.Len(t, parsedStdout, 2) - tmpDir := TemporaryDbfsDir(t, w) + assert.Equal(t, "a", parsedStdout[0]["name"]) + assert.Equal(t, true, parsedStdout[0]["is_directory"]) + assert.Equal(t, float64(0), parsedStdout[0]["size"]) - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + assert.Equal(t, "bye.txt", parsedStdout[1]["name"]) + assert.Equal(t, false, parsedStdout[1]["is_directory"]) + assert.Equal(t, float64(3), parsedStdout[1]["size"]) + }) + } +} - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) - require.NoError(t, err) +func TestAccFsLsWithAbsolutePaths(t *testing.T) { + t.Parallel() - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json", "--absolute") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - // assert on ls output - assert.Len(t, parsedStdout, 2) - assert.Equal(t, path.Join("dbfs:", tmpDir, "a"), parsedStdout[0]["name"]) - assert.Equal(t, true, parsedStdout[0]["is_directory"]) - assert.Equal(t, float64(0), parsedStdout[0]["size"]) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - assert.Equal(t, path.Join("dbfs:", tmpDir, "bye.txt"), parsedStdout[1]["name"]) - assert.Equal(t, false, parsedStdout[1]["is_directory"]) - assert.Equal(t, float64(3), parsedStdout[1]["size"]) -} + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) -func TestAccFsLsForDbfsOnFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + stdout, 
stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json", "--absolute") + assert.Equal(t, "", stderr.String()) - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + var parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) - tmpDir := TemporaryDbfsDir(t, w) + // assert on ls output + assert.Len(t, parsedStdout, 2) - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + assert.Equal(t, path.Join(tmpDir, "a"), parsedStdout[0]["name"]) + assert.Equal(t, true, parsedStdout[0]["is_directory"]) + assert.Equal(t, float64(0), parsedStdout[0]["size"]) - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + assert.Equal(t, path.Join(tmpDir, "bye.txt"), parsedStdout[1]["name"]) + assert.Equal(t, false, parsedStdout[1]["is_directory"]) + assert.Equal(t, float64(3), parsedStdout[1]["size"]) + }) + } +} + +func TestAccFsLsOnFile(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) - _, _, err = RequireErrorRun(t, "fs", "ls", "dbfs:"+path.Join(tmpDir, "a", "hello.txt"), "--output=json") - assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error()) + _, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "a", "hello.txt"), "--output=json") + assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error()) + assert.ErrorAs(t, err, &filer.NotADirectory{}) + }) + } } -func TestAccFsLsForDbfsOnEmptyDir(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsLsOnEmptyDir(t *testing.T) { + t.Parallel() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir 
:= TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) + _, tmpDir := tc.setupFiler(t) - // assert on ls output - assert.Equal(t, 0, len(parsedStdout)) + stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json") + assert.Equal(t, "", stderr.String()) + var parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) + + // assert on ls output + assert.Equal(t, 0, len(parsedStdout)) + }) + } } -func TestAccFsLsForDbfsForNonexistingDir(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsLsForNonexistingDir(t *testing.T) { + t.Parallel() - _, _, err := RequireErrorRun(t, "fs", "ls", "dbfs:/john-cena", "--output=json") - assert.ErrorIs(t, err, fs.ErrNotExist) + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "nonexistent"), "--output=json") + assert.ErrorIs(t, err, fs.ErrNotExist) + assert.Regexp(t, regexp.MustCompile("no such directory: .*/nonexistent"), err.Error()) + }) + } } func TestAccFsLsWithoutScheme(t *testing.T) { + t.Parallel() + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - _, _, err := RequireErrorRun(t, "fs", "ls", "/ray-mysterio", "--output=json") + _, _, err := RequireErrorRun(t, "fs", "ls", "/path-without-a-dbfs-scheme", "--output=json") assert.ErrorIs(t, err, fs.ErrNotExist) } diff --git a/internal/fs_mkdir_test.go b/internal/fs_mkdir_test.go index af0e9d1870..dd75c7c327 100644 --- a/internal/fs_mkdir_test.go +++ b/internal/fs_mkdir_test.go @@ -8,110 +8,127 @@ import ( "testing" "github.com/databricks/cli/libs/filer" - 
"github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestAccFsMkdirCreatesDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsMkdir(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) - // create directory "a" - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // create directory "a" + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // assert directory "a" is created - info, err := f.Stat(ctx, "a") - require.NoError(t, err) - assert.Equal(t, "a", info.Name()) - assert.Equal(t, true, info.IsDir()) + // assert directory "a" is created + info, err := f.Stat(context.Background(), "a") + require.NoError(t, err) + assert.Equal(t, "a", info.Name()) + assert.Equal(t, true, info.IsDir()) + }) + } } -func TestAccFsMkdirCreatesMultipleDirectories(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) +func TestAccFsMkdirCreatesIntermediateDirectories(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f, tmpDir := tc.setupFiler(t) + + // create directory "a/b/c" + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a", "b", "c")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) + + 
// assert directory "a" is created + infoA, err := f.Stat(context.Background(), "a") + require.NoError(t, err) + assert.Equal(t, "a", infoA.Name()) + assert.Equal(t, true, infoA.IsDir()) + + // assert directory "b" is created + infoB, err := f.Stat(context.Background(), "a/b") + require.NoError(t, err) + assert.Equal(t, "b", infoB.Name()) + assert.Equal(t, true, infoB.IsDir()) + + // assert directory "c" is created + infoC, err := f.Stat(context.Background(), "a/b/c") + require.NoError(t, err) + assert.Equal(t, "c", infoC.Name()) + assert.Equal(t, true, infoC.IsDir()) + }) + } +} - tmpDir := TemporaryDbfsDir(t, w) +func TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - // create directory /a/b/c - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a", "b", "c")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // assert directory "a" is created - infoA, err := f.Stat(ctx, "a") - require.NoError(t, err) - assert.Equal(t, "a", infoA.Name()) - assert.Equal(t, true, infoA.IsDir()) + f, tmpDir := tc.setupFiler(t) - // assert directory "b" is created - infoB, err := f.Stat(ctx, "a/b") - require.NoError(t, err) - assert.Equal(t, "b", infoB.Name()) - assert.Equal(t, true, infoB.IsDir()) + // create directory "a" + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // assert directory "c" is created - infoC, err := f.Stat(ctx, "a/b/c") - require.NoError(t, err) - assert.Equal(t, "c", infoC.Name()) - assert.Equal(t, true, infoC.IsDir()) + // assert run is successful without any errors + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) + }) + } } -func 
TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + t.Run("dbfs", func(t *testing.T) { + t.Parallel() - tmpDir := TemporaryDbfsDir(t, w) + f, tmpDir := setupDbfsFiler(t) - // create directory "a" - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - err = f.Mkdir(ctx, "a") - require.NoError(t, err) + // create file "hello" + err := f.Write(context.Background(), "hello", strings.NewReader("abc")) + require.NoError(t, err) - // assert run is successful without any errors - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) -} + // assert mkdir fails + _, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello")) -func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + // Different cloud providers return different errors. 
+ regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`) + assert.Regexp(t, regex, err.Error()) + }) + + t.Run("uc-volumes", func(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + f, tmpDir := setupUcVolumesFiler(t) - tmpDir := TemporaryDbfsDir(t, w) + // create file "hello" + err := f.Write(context.Background(), "hello", strings.NewReader("abc")) + require.NoError(t, err) - // create file hello - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - err = f.Write(ctx, "hello", strings.NewReader("abc")) - require.NoError(t, err) + // assert mkdir fails + _, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello")) - // assert run fails - _, _, err = RequireErrorRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "hello")) - // Different cloud providers return different errors. 
- regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`) - assert.Regexp(t, regex, err.Error()) + assert.ErrorAs(t, err, &filer.FileAlreadyExistsError{}) + }) } diff --git a/internal/fs_rm_test.go b/internal/fs_rm_test.go index d70827d1a2..e86f5713bb 100644 --- a/internal/fs_rm_test.go +++ b/internal/fs_rm_test.go @@ -8,139 +8,150 @@ import ( "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestAccFsRmForFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmFile(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a file + f, tmpDir := tc.setupFiler(t) + err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // create file to delete - err = f.Write(ctx, "hello.txt", strings.NewReader("abc")) - require.NoError(t, err) + // Check file was created + _, err = f.Stat(context.Background(), "hello.txt") + assert.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "hello.txt") - require.NoError(t, err) - require.Equal(t, "hello.txt", info.Name()) - require.Equal(t, info.IsDir(), false) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "hello.txt")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "hello.txt")) - assert.Equal(t, 
"", stderr.String()) - assert.Equal(t, "", stdout.String()) - - // assert file was deleted - _, err = f.Stat(ctx, "hello.txt") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Assert file was deleted + _, err = f.Stat(context.Background(), "hello.txt") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } -func TestAccFsRmForEmptyDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) +func TestAccFsRmEmptyDir(t *testing.T) { + t.Parallel() - tmpDir := TemporaryDbfsDir(t, w) + for _, testCase := range fsTests { + tc := testCase - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // create directory to delete - err = f.Mkdir(ctx, "avacado") - require.NoError(t, err) + // Create a directory + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // check directory was created - info, err := f.Stat(ctx, "avacado") - require.NoError(t, err) - require.Equal(t, "avacado", info.Name()) - require.Equal(t, info.IsDir(), true) + // Check directory was created + _, err = f.Stat(context.Background(), "a") + assert.NoError(t, err) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // assert directory was deleted - _, err = f.Stat(ctx, "avacado") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Assert directory was deleted + _, err = f.Stat(context.Background(), "a") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } -func TestAccFsRmForNonEmptyDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func 
TestAccFsRmNonEmptyDirectory(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a directory + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + // Create a file in the directory + err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "avacado/guacamole") - require.NoError(t, err) - require.Equal(t, "guacamole", info.Name()) - require.Equal(t, info.IsDir(), false) + // Check file was created + _, err = f.Stat(context.Background(), "a/hello.txt") + assert.NoError(t, err) - // Run rm command - _, _, err = RequireErrorRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) - assert.ErrorIs(t, err, fs.ErrInvalid) - assert.ErrorContains(t, err, "directory not empty") + // Run rm command + _, _, err = RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "a")) + assert.ErrorIs(t, err, fs.ErrInvalid) + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) + }) + } } func TestAccFsRmForNonExistentFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + // Expect error if file does not exist + _, _, err := RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "does-not-exist")) + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } - // Expect error if file does not exist - _, _, err := 
RequireErrorRun(t, "fs", "rm", "dbfs:/does-not-exist") - assert.ErrorIs(t, err, fs.ErrNotExist) } -func TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmDirRecursively(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - tmpDir := TemporaryDbfsDir(t, w) + f, tmpDir := tc.setupFiler(t) - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a directory + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + // Create a file in the directory + err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "avacado/guacamole") - require.NoError(t, err) - require.Equal(t, "guacamole", info.Name()) - require.Equal(t, info.IsDir(), false) + // Check file was created + _, err = f.Stat(context.Background(), "a/hello.txt") + assert.NoError(t, err) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado"), "--recursive") - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a"), "--recursive") + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // assert directory was deleted - _, err = f.Stat(ctx, "avacado") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Assert directory was deleted + _, err = f.Stat(context.Background(), "a") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } diff --git 
a/internal/helpers.go b/internal/helpers.go index 6377ae07e7..ca5aa25e4a 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,10 +5,13 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "math/rand" + "net/http" "os" + "path" "path/filepath" "reflect" "strings" @@ -19,8 +22,10 @@ import ( "github.com/databricks/cli/cmd" _ "github.com/databricks/cli/cmd/version" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/files" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -452,6 +457,40 @@ func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { return path } +// Create a new UC volume in a catalog called "main" in the workspace. +func temporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + + // Create a schema + schema, err := w.Schemas.Create(ctx, catalog.CreateSchema{ + CatalogName: "main", + Name: RandomName("test-schema-"), + }) + require.NoError(t, err) + t.Cleanup(func() { + w.Schemas.Delete(ctx, catalog.DeleteSchemaRequest{ + FullName: schema.FullName, + }) + }) + + // Create a volume + volume, err := w.Volumes.Create(ctx, catalog.CreateVolumeRequestContent{ + CatalogName: "main", + SchemaName: schema.Name, + Name: "my-volume", + VolumeType: catalog.VolumeTypeManaged, + }) + require.NoError(t, err) + t.Cleanup(func() { + w.Volumes.Delete(ctx, catalog.DeleteVolumeRequest{ + Name: volume.FullName, + }) + }) + + return path.Join("/Volumes", "main", schema.Name, volume.Name) + +} + func TemporaryRepo(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() me, err := w.CurrentUser.Me(ctx) @@ -489,3 +528,62 @@ func GetNodeTypeId(env string) string { } return 
"Standard_DS4_v2" } + +func setupLocalFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + tmp := t.TempDir() + f, err := filer.NewLocalClient(tmp) + require.NoError(t, err) + + return f, path.Join(filepath.ToSlash(tmp)) +} + +func setupWsfsFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := TemporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + + // Check if we can use this API here, skip test if we cannot. + _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { + t.Skip(aerr.Message) + } + + return f, tmpdir +} + +func setupDbfsFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + tmpDir := TemporaryDbfsDir(t, w) + f, err := filer.NewDbfsClient(w, tmpDir) + require.NoError(t, err) + + return f, path.Join("dbfs:/", tmpDir) +} + +func setupUcVolumesFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + if os.Getenv("TEST_METASTORE_ID") == "" { + t.Skip("Skipping tests that require a UC Volume when metastore id is not set.") + } + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + tmpDir := temporaryUcVolume(t, w) + f, err := filer.NewFilesClient(w, tmpDir) + require.NoError(t, err) + + return f, path.Join("dbfs:/", tmpDir) +} diff --git a/libs/filer/files_client.go b/libs/filer/files_client.go index 17884d573a..9fc68bd562 100644 --- a/libs/filer/files_client.go +++ b/libs/filer/files_client.go @@ -11,18 +11,30 @@ import ( "net/url" "path" "slices" + "sort" "strings" "time" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" 
"github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/listing" + "github.com/databricks/databricks-sdk-go/service/files" + "golang.org/x/sync/errgroup" ) +// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent +// requests and 100 QPS. We limit the number of concurrent requests to 5 to +// avoid hitting the rate limit. +const maxFilesRequestsInFlight = 5 + // Type that implements fs.FileInfo for the Files API. +// This is required for the filer.Stat() method. type filesApiFileInfo struct { - absPath string - isDir bool + absPath string + isDir bool + fileSize int64 + lastModified int64 } func (info filesApiFileInfo) Name() string { @@ -30,8 +42,7 @@ func (info filesApiFileInfo) Name() string { } func (info filesApiFileInfo) Size() int64 { - // No way to get the file size in the Files API. - return 0 + return info.fileSize } func (info filesApiFileInfo) Mode() fs.FileMode { @@ -43,7 +54,7 @@ func (info filesApiFileInfo) Mode() fs.FileMode { } func (info filesApiFileInfo) ModTime() time.Time { - return time.Time{} + return time.UnixMilli(info.lastModified) } func (info filesApiFileInfo) IsDir() bool { @@ -54,6 +65,28 @@ func (info filesApiFileInfo) Sys() any { return nil } +// Type that implements fs.DirEntry for the Files API. +// This is required for the filer.ReadDir() method. +type filesApiDirEntry struct { + i filesApiFileInfo +} + +func (e filesApiDirEntry) Name() string { + return e.i.Name() +} + +func (e filesApiDirEntry) IsDir() bool { + return e.i.IsDir() +} + +func (e filesApiDirEntry) Type() fs.FileMode { + return e.i.Mode() +} + +func (e filesApiDirEntry) Info() (fs.FileInfo, error) { + return e.i, nil +} + // FilesClient implements the [Filer] interface for the Files API backend. 
type FilesClient struct { workspaceClient *databricks.WorkspaceClient @@ -63,10 +96,6 @@ type FilesClient struct { root WorkspaceRootPath } -func filesNotImplementedError(fn string) error { - return fmt.Errorf("filer.%s is not implemented for the Files API", fn) -} - func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { apiClient, err := client.New(w.Config) if err != nil { @@ -102,6 +131,24 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, return err } + // Check that target path exists if CreateParentDirectories mode is not set + if !slices.Contains(mode, CreateParentDirectories) { + err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath)) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the directory doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + return NoSuchDirectoryError{path.Dir(absPath)} + } + + return err + } + } + overwrite := slices.Contains(mode, OverwriteIfExists) urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite) headers := map[string]string{"Content-Type": "application/octet-stream"} @@ -119,7 +166,7 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, } // This API returns 409 if the file already exists, when the object type is file - if aerr.StatusCode == http.StatusConflict { + if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" { return FileAlreadyExistsError{absPath} } @@ -148,14 +195,20 @@ func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, err // This API returns a 404 if the specified path does not exist. if aerr.StatusCode == http.StatusNotFound { + // Check if the path is a directory. If so, return not a file error. + if _, err := w.statDir(ctx, name); err == nil { + return nil, NotAFile{absPath} + } + + // No file or directory exists at the specified path.
Return no such file error. return nil, FileDoesNotExistError{absPath} } return nil, err } -func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { - absPath, urlPath, err := w.urlPath(name) +func (w *FilesClient) deleteFile(ctx context.Context, name string) error { + absPath, err := w.root.Join(name) if err != nil { return err } @@ -165,53 +218,232 @@ func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMod return CannotDeleteRootError{} } - err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil, nil) + err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath) // Return early on success. if err == nil { return nil } - // Special handling of this error only if it is an API error. var aerr *apierr.APIError + // Special handling of this error only if it is an API error. if !errors.As(err, &aerr) { return err } - // This API returns a 404 if the specified path does not exist. + // This files delete API returns a 404 if the specified path does not exist. if aerr.StatusCode == http.StatusNotFound { return FileDoesNotExistError{absPath} } - // This API returns 409 if the underlying path is a directory. - if aerr.StatusCode == http.StatusConflict { + return err +} + +func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + // Illegal to delete the root path. + if absPath == w.root.rootPath { + return CannotDeleteRootError{} + } + + err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath) + + var aerr *apierr.APIError + // Special handling of this error only if it is an API error. 
+ if !errors.As(err, &aerr) { + return err + } + + // The directory delete API returns a 400 if the directory is not empty + if aerr.StatusCode == http.StatusBadRequest { + reasons := []string{} + for _, detail := range aerr.Details { + reasons = append(reasons, detail.Reason) + } + // Error code 400 is generic and can be returned for other reasons. Make + // sure one of the reasons for the error is that the directory is not empty. + if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") { + return err + } return DirectoryNotEmptyError{absPath} } return err } +func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error { + filerFS := NewFS(ctx, w) + dirsToDelete := make([]string, 0) + filesToDelete := make([]string, 0) + callback := func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + // Files API does not allow deleting non-empty directories. We instead + // collect the directories to delete and delete them once all the files have + // been deleted. + if d.IsDir() { + dirsToDelete = append(dirsToDelete, path) + return nil + } + + filesToDelete = append(filesToDelete, path) + return nil + } + + // Walk the directory and accumulate the files and directories to delete. + err := fs.WalkDir(filerFS, name, callback) + if err != nil { + return err + } + + // Delete the files in parallel. + group, groupCtx := errgroup.WithContext(ctx) + group.SetLimit(maxFilesRequestsInFlight) + + for _, file := range filesToDelete { + file := file + + // Skip the file if the context has already been cancelled. + select { + case <-groupCtx.Done(): + continue + default: + // Proceed. + } + + group.Go(func() error { + return w.deleteFile(groupCtx, file) + }) + } + + // Wait for the files to be deleted and return the first non-nil error. + err = group.Wait() + if err != nil { + return err + } + + // Delete the directories in reverse order to ensure that the parent + // directories are deleted after the children.
This is possible because + // fs.WalkDir walks the directories in lexicographical order. + for i := len(dirsToDelete) - 1; i >= 0; i-- { + err := w.deleteDirectory(ctx, dirsToDelete[i]) + if err != nil { + return err + } + } + return nil +} + +func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { + if slices.Contains(mode, DeleteRecursively) { + return w.recursiveDelete(ctx, name) + } + + // Issue a stat call to determine if the path is a file or directory. + info, err := w.Stat(ctx, name) + if err != nil { + return err + } + + // Issue the delete call for a directory + if info.IsDir() { + return w.deleteDirectory(ctx, name) + } + + return w.deleteFile(ctx, name) +} + func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { - return nil, filesNotImplementedError("ReadDir") + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{ + DirectoryPath: absPath, + }) + + files, err := listing.ToSlice(ctx, iter) + + // Return early on success. + if err == nil { + entries := make([]fs.DirEntry, len(files)) + for i, file := range files { + entries[i] = filesApiDirEntry{ + i: filesApiFileInfo{ + absPath: file.Path, + isDir: file.IsDirectory, + fileSize: file.FileSize, + lastModified: file.LastModified, + }, + } + } + + // Sort by name for parity with os.ReadDir. + sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() }) + return entries, nil + } + + // Special handling of this error only if it is an API error. + var apierr *apierr.APIError + if !errors.As(err, &apierr) { + return nil, err + } + + // This API returns a 404 if the specified path does not exist. + if apierr.StatusCode == http.StatusNotFound { + // Check if the path is a file. If so, return not a directory error. 
+ if _, err := w.statFile(ctx, name); err == nil { + return nil, NotADirectory{absPath} + } + + // No file or directory exists at the specified path. Return no such directory error. + return nil, NoSuchDirectoryError{absPath} + } + return nil, err } func (w *FilesClient) Mkdir(ctx context.Context, name string) error { - // Directories are created implicitly. - // No need to do anything. - return nil + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{ + DirectoryPath: absPath, + }) + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusConflict { + return FileAlreadyExistsError{absPath} + } + + return err } -func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { - absPath, urlPath, err := w.urlPath(name) +// Get file metadata for a file using the Files API. +func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) { + absPath, err := w.root.Join(name) if err != nil { return nil, err } - err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil, nil) + fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath) // If the HEAD requests succeeds, the file exists. if err == nil { - return filesApiFileInfo{absPath: absPath, isDir: false}, nil + return filesApiFileInfo{ + absPath: absPath, + isDir: false, + fileSize: fileInfo.ContentLength, + }, nil } // Special handling of this error only if it is an API error. @@ -225,10 +457,51 @@ func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error return nil, FileDoesNotExistError{absPath} } - // This API returns 409 if the underlying path is a directory. - if aerr.StatusCode == http.StatusConflict { + return nil, err +} + +// Get file metadata for a directory using the Files API. 
+func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath) + + // If the HEAD request succeeds, the directory exists. + if err == nil { return filesApiFileInfo{absPath: absPath, isDir: true}, nil } + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // The directory metadata API returns a 404 if the specified path does not exist. + if aerr.StatusCode == http.StatusNotFound { + return nil, NoSuchDirectoryError{absPath} + } + return nil, err } + +func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { + // Assume that the path is a directory and issue a stat call. + dirInfo, err := w.statDir(ctx, name) + + // If the directory exists, return early. + if err == nil { + return dirInfo, nil + } + + // Return early if the error is not a NoSuchDirectoryError. + if !errors.As(err, &NoSuchDirectoryError{}) { + return nil, err + } + + // Since the path is not a directory, assume that it is a file and issue a stat call. + return w.statFile(ctx, name) +}