Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix repl always return 0 #1286

Merged
merged 7 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions cmd/repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func header(head map[string]interface{}) ([]string, error) {
return cols, nil
}

func render(rsp interface{}, table *tablewriter.Table) bool {
func render(rsp interface{}, table *tablewriter.Table) (bool, error) {
isTable := false
switch s := rsp.(type) {
case map[string]interface{}: // table header
Expand All @@ -81,15 +81,13 @@ func render(rsp interface{}, table *tablewriter.Table) bool {
table.Append(row)
isTable = true
case error:
if os.Getenv("SQLFLOW_log_dir") != "" { // To avoid printing duplicated error message to console
log.New(os.Stderr, "", 0).Printf("ERROR: %v\n", s)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better keep this check somewhere to avoid printing error message twice when logging to stderr.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use log.Fatalf to exit if some errors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Fatalf will terminate the program, that's no good for the interactive mode of REPL.

}
return false, s
case sql.EndOfExecution:
return isTable
return isTable, nil
default:
fmt.Println(s)
}
return isTable
return isTable, nil
}

func flagPassed(name ...string) bool {
Expand All @@ -104,16 +102,21 @@ func flagPassed(name ...string) bool {
return found
}

func runStmt(stmt string, isTerminal bool, modelDir string, db *sql.DB, ds string) {
func runStmt(stmt string, isTerminal bool, modelDir string, db *sql.DB, ds string) error {
if !isTerminal {
fmt.Println("sqlflow>", stmt)
}
isTable, tableRendered := false, false
var err error
table := tablewriter.NewWriter(os.Stdout)
sess := makeSessionFromEnv()

stream := sql.RunSQLProgram(stmt, db, modelDir, &pb.Session{})
stream := sql.RunSQLProgram(stmt, db, modelDir, sess)
for rsp := range stream.ReadAll() {
isTable = render(rsp, table)
isTable, err = render(rsp, table)
if err != nil {
return err
}

// pagination. avoid exceed memory
if isTable && table.NumLines() == tablePageSize {
Expand All @@ -125,6 +128,7 @@ func runStmt(stmt string, isTerminal bool, modelDir string, db *sql.DB, ds strin
if table.NumLines() > 0 || !tableRendered {
table.Render()
}
return nil
}

func repl(scanner *bufio.Scanner, modelDir string, db *sql.DB, ds string) {
Expand All @@ -134,9 +138,23 @@ func repl(scanner *bufio.Scanner, modelDir string, db *sql.DB, ds string) {
if err == io.EOF && stmt == "" {
return
}
runStmt(stmt, false, modelDir, db, ds)
if err := runStmt(stmt, false, modelDir, db, ds); err != nil {
log.Fatalf("run SQL statment failed: %v", err)
}
}
}

func makeSessionFromEnv() *pb.Session {
return &pb.Session{
Token: os.Getenv("SQLFLOW_USER_TOKEN"),
DbConnStr: os.Getenv("SQLFLOW_DATASOURCE"),
ExitOnSubmit: strings.ToLower(os.Getenv("SQLFLOW_EXIT_ON_SUBMIT")) == "true",
UserId: os.Getenv("SQLFLOW_USER_ID"),
HiveLocation: os.Getenv("SQLFLOW_HIVE_LOCATION"),
HdfsNamenodeAddr: os.Getenv("SQLFLOW_HDFS_NAMENODE_ADDR"),
HdfsUser: os.Getenv("JUPYTER_HADOOP_USER"),
HdfsPass: os.Getenv("JUPYTER_HADOOP_PASS"),
}
}

func parseSQLFromStdin(stdin io.Reader) (string, error) {
Expand All @@ -152,17 +170,7 @@ func parseSQLFromStdin(stdin io.Reader) (string, error) {
if sqlflowDatasrouce == "" {
return "", fmt.Errorf("no SQLFLOW_DATASOURCE env provided")
}

sess := &pb.Session{
Token: os.Getenv("SQLFLOW_USER_TOKEN"),
DbConnStr: os.Getenv("SQLFLOW_DATASOURCE"),
ExitOnSubmit: strings.ToLower(os.Getenv("SQLFLOW_EXIT_ON_SUBMIT")) == "true",
UserId: os.Getenv("SQLFLOW_USER_ID"),
HiveLocation: os.Getenv("SQLFLOW_HIVE_LOCATION"),
HdfsNamenodeAddr: os.Getenv("SQLFLOW_HDFS_NAMENODE_ADDR"),
HdfsUser: os.Getenv("JUPYTER_HADOOP_USER"),
HdfsPass: os.Getenv("JUPYTER_HADOOP_PASS"),
}
sess := makeSessionFromEnv()
pbIRStr, err := sql.ParseSQLStatement(strings.Join(scanedInput, "\n"), sess)
if err != nil {
return "", err
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (m *model) saveDB(db *DB, table string, session *pb.Session) (e error) {
if e != nil {
return fmt.Errorf("cannot create sqlfs file %s: %v", table, e)
}
defer sqlf.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defering with the new Close code in a lambda?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe should check the error of hiveWriter.Close, I updated this PR follow https://www.joeshaw.org/dont-defer-close-on-writable-files/ that uses a safe way to close sqlfs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe if we call hiveWriter.Close twice?


// Use a bytes.Buffer as the gob message container to separate
// the message from the following tarball.
Expand All @@ -105,6 +104,10 @@ func (m *model) saveDB(db *DB, table string, session *pb.Session) (e error) {
if e := cmd.Run(); e != nil {
return fmt.Errorf("tar stderr: %v\ntar cmd %v", errBuf.String(), e)
}

if e := sqlf.Close(); e != nil {
return fmt.Errorf("close sqlfs error: %v", e)
}
return nil
}

Expand Down