This repository has been archived by the owner on Jul 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathFileSystem.go
145 lines (131 loc) · 4.14 KB
/
FileSystem.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
"golang.org/x/net/context"
"io"
"log"
"os"
"os/exec"
"strings"
"sync"
)
type FileSystem struct {
MountPoint string // Path to the mount point on a local file system
HdfsAccessor HdfsAccessor // Interface to access HDFS
AllowedPrefixes []string // List of allowed path prefixes (only those prefixes are exposed via mountpoint)
ExpandZips bool // Indicates whether ZIP expansion feature is enabled
ReadOnly bool // Indicates whether mount filesystem with readonly
Mounted bool // True if filesystem is mounted
RetryPolicy *RetryPolicy // Retry policy
Clock Clock // interface to get wall clock time
FsInfo FsInfo // Usage of HDFS, including capacity, remaining, used sizes.
closeOnUnmount []io.Closer // list of opened files (zip archives) to be closed on unmount
closeOnUnmountLock sync.Mutex // mutex to protet closeOnUnmount
}
// Verify that *FileSystem implements necesary FUSE interfaces
var _ fs.FS = (*FileSystem)(nil)
var _ fs.FSStatfser = (*FileSystem)(nil)
// Creates an instance of mountable file system
func NewFileSystem(hdfsAccessor HdfsAccessor, mountPoint string, allowedPrefixes []string, expandZips bool, readOnly bool, retryPolicy *RetryPolicy, clock Clock) (*FileSystem, error) {
return &FileSystem{
HdfsAccessor: hdfsAccessor,
MountPoint: mountPoint,
Mounted: false,
AllowedPrefixes: allowedPrefixes,
ExpandZips: expandZips,
ReadOnly: readOnly,
RetryPolicy: retryPolicy,
Clock: clock}, nil
}
// Mounts the filesystem
func (this *FileSystem) Mount() (*fuse.Conn, error) {
var conn *fuse.Conn
var err error
if this.ReadOnly {
conn, err = fuse.Mount(
this.MountPoint,
fuse.FSName("hdfs"),
fuse.Subtype("hdfs"),
fuse.VolumeName("HDFS filesystem"),
fuse.AllowOther(),
fuse.WritebackCache(),
fuse.MaxReadahead(1024*64), //TODO: make configurable
fuse.ReadOnly())
} else {
conn, err = fuse.Mount(
this.MountPoint,
fuse.FSName("hdfs"),
fuse.Subtype("hdfs"),
fuse.VolumeName("HDFS filesystem"),
fuse.AllowOther(),
fuse.WritebackCache(),
fuse.MaxReadahead(1024*64)) //TODO: make configurable
}
if err != nil {
return nil, err
}
this.Mounted = true
return conn, nil
}
// Unmounts the filesysten (invokes fusermount tool)
func (this *FileSystem) Unmount() {
if !this.Mounted {
return
}
this.Mounted = false
log.Print("Unmounting...")
cmd := exec.Command("fusermount", "-zu", this.MountPoint)
err := cmd.Run()
// Closing all the files
this.closeOnUnmountLock.Lock()
defer this.closeOnUnmountLock.Unlock()
for _, f := range this.closeOnUnmount {
f.Close()
}
if err != nil {
log.Fatal(err)
}
}
// Returns root directory of the filesystem
func (this *FileSystem) Root() (fs.Node, error) {
return &Dir{FileSystem: this, Attrs: Attrs{Inode: 1, Name: "", Mode: 0755 | os.ModeDir}}, nil
}
// Returns if given absoute path allowed by any of the prefixes
func (this *FileSystem) IsPathAllowed(path string) bool {
if path == "/" {
return true
}
for _, prefix := range this.AllowedPrefixes {
if prefix == "*" {
return true
}
p := "/" + prefix
if p == path || strings.HasPrefix(path, p+"/") {
return true
}
}
return false
}
// Register a file to be closed on Unmount()
func (this *FileSystem) CloseOnUnmount(file io.Closer) {
this.closeOnUnmountLock.Lock()
defer this.closeOnUnmountLock.Unlock()
this.closeOnUnmount = append(this.closeOnUnmount, file)
}
// Statfs is called to obtain file system metadata.
// It should write that data to resp.
func (this *FileSystem) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
fsInfo, err := this.HdfsAccessor.StatFs()
if err != nil {
Warning.Println("Failed to get HDFS info,", err)
return err
}
resp.Bsize = 1024
resp.Bfree = fsInfo.remaining / uint64(resp.Bsize)
resp.Bavail = resp.Bfree
resp.Blocks = fsInfo.capacity / uint64(resp.Bsize)
return nil
}