forked from microsoft/hdfs-mount
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathFileHandleWriter.go
165 lines (148 loc) · 4.58 KB
/
FileHandleWriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"errors"
"golang.org/x/net/context"
"io"
"io/ioutil"
"os"
"time"
)
// FileHandleWriter encapsulates the state and routines for writing data
// through a file handle. Writes are buffered in a local staging file and
// uploaded to HDFS as a whole on flush.
type FileHandleWriter struct {
// Handle is the file handle this writer was opened for.
Handle *FileHandle
// stagingFile is the local temporary file that receives FUSE writes.
stagingFile *os.File
// BytesWritten counts bytes written to the staging file since the last
// flush; zero means there is nothing to upload.
BytesWritten uint64
}
// NewFileHandleWriter opens the file behind handle for writing.
//
// When newFile is true, any existing HDFS file at the path is removed and an
// empty file is created in its place. In every case a local staging file is
// created under /var/hdfs-mount and unlinked right away, so only the open
// descriptor keeps it reachable. When newFile is false and the HDFS file can
// be stat'ed, its current contents are copied into the staging file so that
// subsequent writes modify a full local copy.
//
// Returns the ready writer, or nil and the error that prevented setup.
func NewFileHandleWriter(handle *FileHandle, newFile bool) (*FileHandleWriter, error) {
	this := &FileHandleWriter{Handle: handle}
	Info.Println("newFile=", newFile)
	path := this.Handle.File.AbsolutePath()
	hdfsAccessor := this.Handle.File.FileSystem.HdfsAccessor
	if newFile {
		// Best effort: the result is deliberately ignored, the file may not exist yet.
		hdfsAccessor.Remove(path)
		w, err := hdfsAccessor.CreateFile(path, this.Handle.File.Attrs.Mode)
		if err != nil {
			Error.Println("Creating", path, ":", err)
			return nil, err
		}
		if err := w.Close(); err != nil {
			// Not fatal here: the real content is uploaded on flush.
			Warning.Println("[", path, "] Can't close newly created file:", err)
		}
	}

	stageDir := "/var/hdfs-mount" // TODO: make configurable
	if err := os.MkdirAll(stageDir, 0700); err != nil {
		Error.Println("Failed to create stageDir /var/hdfs-mount, Error:", err)
		return nil, err
	}
	stagingFile, err := ioutil.TempFile(stageDir, "stage")
	if err != nil {
		return nil, err
	}
	this.stagingFile = stagingFile
	// Unlink immediately; the staging data is accessed only through the open descriptor.
	if err := os.Remove(this.stagingFile.Name()); err != nil {
		Warning.Println("Can't remove staging file", this.stagingFile.Name(), ":", err)
	}

	if newFile {
		return this, nil
	}

	// Request to write to existing file
	if _, err := hdfsAccessor.Stat(path); err != nil {
		// Proceed with an empty staging file when the HDFS file can't be stat'ed.
		Warning.Println("[", path, "] Can't stat file:", err)
		return this, nil
	}
	Info.Println("Buffering contents of the file to the staging area ", this.stagingFile.Name())
	reader, err := hdfsAccessor.OpenRead(path)
	if err != nil {
		Warning.Println("HDFS/open failure:", err)
		this.stagingFile.Close()
		this.stagingFile = nil
		return nil, err
	}
	// Deferred so the reader is released on the copy-failure path too.
	defer reader.Close()
	nc, err := io.Copy(this.stagingFile, reader)
	if err != nil {
		Warning.Println("Copy failure:", err)
		this.stagingFile.Close()
		this.stagingFile = nil
		return nil, err
	}
	Info.Println("Copied", nc, "bytes")
	return this, nil
}
// Write responds to a FUSE Write request by storing req.Data in the staging
// file at req.Offset.
//
// Before writing, the HDFS usage is queried: if the request offset is at or
// beyond the reported remaining space the write is rejected. A failure to
// obtain the usage is only logged and does not block the write.
//
// resp.Size is set to the number of bytes written to the staging file, and
// BytesWritten grows by the same amount when the write succeeds.
func (this *FileHandleWriter) Write(handle *FileHandle, ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
	usage, statErr := this.Handle.File.FileSystem.HdfsAccessor.StatFs()
	switch {
	case statErr != nil:
		// Donot abort, continue writing
		Error.Println("Failed to get HDFS usage, ERROR:", statErr)
	case uint64(req.Offset) >= usage.remaining:
		Error.Println("[", this.Handle.File.AbsolutePath(), "] writes larger size (", req.Offset, ")than HDFS available size (", usage.remaining, ")")
		return errors.New("Too large file")
	}

	written, writeErr := this.stagingFile.WriteAt(req.Data, req.Offset)
	resp.Size = written
	if writeErr == nil {
		this.BytesWritten += uint64(written)
	}
	return writeErr
}
// Flush responds to a FUSE Flush/Fsync request by uploading the staging file
// to HDFS.
//
// Nothing is done when no bytes were written since the last flush. Otherwise
// FlushAttempt is called; it is retried only when it fails with io.EOF and the
// file system's retry policy allows it, after closing the HDFS accessor and
// waiting 30 seconds. The file's metadata cache is invalidated on return.
//
// Returns the error of the last attempt, or nil on success. On failure the
// pending byte count is restored so that a later Flush tries again instead of
// treating the handle as clean.
func (this *FileHandleWriter) Flush() error {
	Info.Println("[", this.Handle.File.AbsolutePath(), "] flush (", this.BytesWritten, "new bytes written)")
	if this.BytesWritten == 0 {
		// Nothing to do
		return nil
	}
	// Take ownership of the pending count; it is given back if the upload fails.
	pending := this.BytesWritten
	this.BytesWritten = 0
	defer this.Handle.File.InvalidateMetadataCache()
	op := this.Handle.File.FileSystem.RetryPolicy.StartOperation()
	for {
		err := this.FlushAttempt()
		if err != io.EOF || IsSuccessOrBenignError(err) || !op.ShouldRetry("Flush()", err) {
			if err != nil {
				// The staged data did not reach HDFS; keep the handle dirty.
				this.BytesWritten += pending
			}
			return err
		}
		// Restart a new connection, https://github.com/colinmarc/hdfs/issues/86
		this.Handle.File.FileSystem.HdfsAccessor.Close()
		Error.Println("[", this.Handle.File.AbsolutePath(), "] failed flushing. Retry")
		// Wait for 30 seconds before another retry to get another set of datanodes.
		// https://community.hortonworks.com/questions/2474/how-to-identify-stale-datanode.html
		time.Sleep(30 * time.Second)
	}
}
// FlushAttempt makes a single attempt to upload the staging file to HDFS.
//
// The staging file is rewound to its start, the HDFS file is removed and
// re-created with the mode from the file's attributes, and the staged bytes
// are streamed to it in chunks of up to 64 KiB.
//
// Returns the first error from seeking, creating, reading, writing or closing,
// or nil once the whole staging file has been written and the HDFS writer
// closed.
func (this *FileHandleWriter) FlushAttempt() error {
	path := this.Handle.File.AbsolutePath()

	// Rewind (offset 0 from the start) before touching HDFS, so an unusable
	// staging file does not cost us the remote copy.
	if _, err := this.stagingFile.Seek(0, 0); err != nil {
		Error.Println("Seeking staging file for", path, ":", err)
		return err
	}

	hdfsAccessor := this.Handle.File.FileSystem.HdfsAccessor
	// Best effort: the result is deliberately ignored, CreateFile reports real problems.
	hdfsAccessor.Remove(path)
	w, err := hdfsAccessor.CreateFile(path, this.Handle.File.Attrs.Mode)
	if err != nil {
		Error.Println("ERROR creating", path, ":", err)
		return err
	}

	buf := make([]byte, 65536)
	for {
		nr, readErr := this.stagingFile.Read(buf)
		if nr > 0 {
			// Slice per iteration; buf itself keeps its full length for the next read.
			if _, err := w.Write(buf[:nr]); err != nil {
				Error.Println("Writing", path, ":", err)
				w.Close()
				return err
			}
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			// A failed read must not be reported as a successful (truncated) upload.
			Error.Println("Reading staging file for", path, ":", readErr)
			w.Close()
			return readErr
		}
	}

	if err := w.Close(); err != nil {
		Error.Println("Closing", path, ":", err)
		return err
	}
	return nil
}
// Close closes the writer by closing its local staging file and returns the
// error from that close, if any.
func (this *FileHandleWriter) Close() error {
	err := this.stagingFile.Close()
	return err
}