diff --git a/.travis.yml b/.travis.yml
index 37e8467a47..08852baf60 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -54,13 +54,17 @@ jobs:
     script:
     - docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
    - bash scripts/setup_k8s_env.sh
-    - docker run --rm --net=host -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_couler.sh
     - docker run --rm --net=host -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_ipython.sh
     - docker run -e MAXCOMPUTE_AK=$MAXCOMPUTE_AK -e MAXCOMPUTE_SK=$MAXCOMPUTE_SK --rm --net=host -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_maxcompute.sh
   - env: SQLFLOW_TEST=java
     script:
-    - docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
-    - docker run --rm -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_java.sh
+    - docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
+    - docker run --rm -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_java.sh
+  - env: SQLFLOW_TEST=workflow
+    script:
+    - bash scripts/setup_k8s_env.sh
+    - eval $(sudo minikube docker-env) && docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile . && docker tag sqlflow:latest sqlflow:submitter
+    - docker run --rm --net=host -e SQLFLOW_WORKFLOW_STEP_IMAGE=sqlflow:submitter -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_workflow.sh
   - stage: deploy
     script:
     - docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
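The new SQLFLOW_TEST=workflow job builds the image inside minikube's Docker daemon (the eval $(sudo minikube docker-env) step) so that Argo workflow Pods can run the locally built image without pushing it to a registry, tags it as sqlflow:submitter, and injects that tag into the generated workflow through SQLFLOW_WORKFLOW_STEP_IMAGE. A rough local equivalent of the job, sketched under the assumption of a running minikube cluster and the repository checked out at $GOPATH/src/sqlflow.org/sqlflow:

# Build inside minikube's Docker daemon so workflow Pods can use the image
# without a registry, then tag it as the workflow step (submitter) image.
eval $(sudo minikube docker-env)
docker pull sqlflow/sqlflow:latest
docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
docker tag sqlflow:latest sqlflow:submitter

# Run the workflow test suite with the step image injected via the env var.
docker run --rm --net=host -e SQLFLOW_WORKFLOW_STEP_IMAGE=sqlflow:submitter \
    -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube \
    -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go \
    -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_workflow.sh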
diff --git a/cmd/sqlflowserver/main_test.go b/cmd/sqlflowserver/main_test.go
index 2cf26be489..5a9c7140dd 100644
--- a/cmd/sqlflowserver/main_test.go
+++ b/cmd/sqlflowserver/main_test.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"strings"
 	"testing"
 	"time"

@@ -44,6 +45,7 @@ var caseDB = "iris"
 var caseTrainTable = "train"
 var caseTestTable = "test"
 var casePredictTable = "predict"
+var testDatasource = os.Getenv("SQLFLOW_TEST_DATASOURCE")

 // caseInto used by CaseTrainSQL
 var caseInto = "sqlflow_models.my_dnn_model"
@@ -483,6 +485,94 @@ func TestEnd2EndMaxComputeElasticDL(t *testing.T) {
 	t.Run("CaseTrainElasticDL", CaseTrainElasticDL)
 }

+func TestEnd2EndMySQLWorkflow(t *testing.T) {
+	a := assert.New(t)
+	if os.Getenv("SQLFLOW_TEST_DATASOURCE") == "" || strings.ToLower(os.Getenv("SQLFLOW_TEST")) != "workflow" {
+		t.Skip("Skipping workflow test.")
+	}
+	driverName, _, err := sql.SplitDataSource(testDatasource)
+	a.NoError(err)
+
+	if driverName != "mysql" {
+		t.Skip("Skipping workflow test.")
+	}
+	modelDir := ""
+	tmpDir, caCrt, caKey, err := generateTempCA()
+	defer os.RemoveAll(tmpDir)
+	if err != nil {
+		t.Fatalf("failed to generate CA pair %v", err)
+	}
+
+	go start(modelDir, caCrt, caKey, unitestPort, true)
+	waitPortReady(fmt.Sprintf("localhost:%d", unitestPort), 0)
+	if err != nil {
+		t.Fatalf("prepare test dataset failed: %v", err)
+	}
+
+	t.Run("CaseSubmitSQLProgram", CaseSubmitSQLProgram)
+}
+
+func CaseSubmitSQLProgram(t *testing.T) {
+	a := assert.New(t)
+	sqlProgram := fmt.Sprintf(`
+SELECT *
+FROM %s.%s
+TO TRAIN DNNClassifier
+WITH
+	model.n_classes = 3,
+	model.hidden_units = [10, 20]
+COLUMN sepal_length, sepal_width, petal_length, petal_width
+LABEL class
+INTO sqlflow_models.my_dnn_model;
+
+SELECT *
+FROM %s.%s
+TO PREDICT %s.%s.class
+USING sqlflow_models.my_dnn_model;
+
+SELECT *
+FROM %s.%s LIMIT 5;
+	`, caseDB, caseTrainTable,
+		caseDB, caseTrainTable, caseDB, casePredictTable, caseDB, casePredictTable)
+
+	conn, err := createRPCConn()
+	if err != nil {
+		a.Fail("Create gRPC client error: %v", err)
+	}
+	defer conn.Close()
+	cli := pb.NewSQLFlowClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
+	defer cancel()
+	stream, err := cli.Run(ctx, &pb.Request{Sql: sqlProgram, Session: &pb.Session{DbConnStr: testDatasource}})
+	var workflowID string
+	for {
+		iter, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			log.Fatalf("stream read err: %v", err)
+		}
+		workflowID = iter.GetJob().GetId()
+	}
+	a.True(strings.HasPrefix(workflowID, "sqlflow-couler"))
+	// check the workflow status within 180 seconds
+	// TODO(yancey1989): using the Fetch gRPC interface to check the workflow status
+	for i := 0; i < 60; i++ {
+		cmd := exec.Command("kubectl", "get", "wf", workflowID, "-o", "jsonpath='{.status.phase}'")
+		out, err := cmd.CombinedOutput()
+		if err != nil {
+			log.Fatalf("get workflow status error: %v", err)
+		}
+		if string(out) == "'Succeeded'" {
+			return
+		}
+		time.Sleep(3 * time.Second)
+	}
+	// the workflow timed out
+	log.Fatalf("workflow: %s timed out", workflowID)
+}
+
 func CaseShowDatabases(t *testing.T) {
 	a := assert.New(t)
 	cmd := "show databases;"
diff --git a/pkg/sql/codegen/couler/codegen.go b/pkg/sql/codegen/couler/codegen.go
index 916129bdfa..492badb48d 100644
--- a/pkg/sql/codegen/couler/codegen.go
+++ b/pkg/sql/codegen/couler/codegen.go
@@ -15,17 +15,24 @@ package couler

 import (
 	"bytes"
+	"fmt"
+	"os"
+
+	pb "sqlflow.org/sqlflow/pkg/proto"
 	"sqlflow.org/sqlflow/pkg/sql/ir"
 )

+var defaultDockerImage = "sqlflow/sqlflow"
"sqlflow/sqlflow" + // Run generates Couler program -func Run(programIR ir.SQLProgram) (string, error) { +func Run(programIR ir.SQLProgram, session *pb.Session) (string, error) { // TODO(yancey1989): fill session as env - r := &coulerFiller{} + r := &coulerFiller{ + DataSource: session.DbConnStr, + } for _, sqlIR := range programIR { ss := &sqlStatment{} - switch sqlIR.(type) { + switch i := sqlIR.(type) { case *ir.StandardSQL: ss.IsExtendedSQL = false ss.OriginalSQL = string(*sqlIR.(*ir.StandardSQL)) @@ -38,9 +45,16 @@ func Run(programIR ir.SQLProgram) (string, error) { case *ir.AnalyzeStmt: ss.IsExtendedSQL = true ss.OriginalSQL = sqlIR.(*ir.AnalyzeStmt).OriginalSQL + default: + return "", fmt.Errorf("uncognized IR type: %v", i) + } + // NOTE(yancey1989): does not use ModelImage here since the Predict statment + // does not contain the ModelImage field in SQL Program IR. + if os.Getenv("SQLFLOW_WORKFLOW_STEP_IMAGE") != "" { + ss.DockerImage = os.Getenv("SQLFLOW_WORKFLOW_STEP_IMAGE") + } else { + ss.DockerImage = defaultDockerImage } - // TODO(yancey1989): using the custom Docker image in model zoo - ss.DockerImage = "sqlflow/sqlflow" r.SQLStatements = append(r.SQLStatements, ss) } var program bytes.Buffer diff --git a/pkg/sql/codegen/couler/codegen_test.go b/pkg/sql/codegen/couler/codegen_test.go index 26ffc0c8f0..9a91186b6a 100644 --- a/pkg/sql/codegen/couler/codegen_test.go +++ b/pkg/sql/codegen/couler/codegen_test.go @@ -20,13 +20,14 @@ import ( "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" + pb "sqlflow.org/sqlflow/pkg/proto" "sqlflow.org/sqlflow/pkg/sql/ir" ) func TestCodegen(t *testing.T) { a := assert.New(t) sqlIR := mockSQLProgramIR() - code, err := Run(sqlIR) + code, err := Run(sqlIR, &pb.Session{}) a.NoError(err) r, _ := regexp.Compile(`repl -e "(.*);"`) diff --git a/pkg/sql/codegen/couler/template.go b/pkg/sql/codegen/couler/template.go index 64b1ff3342..0969bc19ec 100644 --- a/pkg/sql/codegen/couler/template.go +++ b/pkg/sql/codegen/couler/template.go @@ -21,20 +21,21 @@ type sqlStatment struct { DockerImage string } type coulerFiller struct { - dataSource string + DataSource string SQLStatements []*sqlStatment } const coulerTemplateText = ` import couler.argo as couler +datasource = "{{ .DataSource }}" {{ range $ss := .SQLStatements }} {{if $ss.IsExtendedSQL }} -couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}"''', image="{{ $ss.DockerImage }}") +couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}" --datasource="%s"''' % datasource, image="{{ $ss.DockerImage }}") {{else}} # TODO(yancey1989): # using "repl -parse" to output IR and # feed to "sqlflow_submitter.{submitter}.train" to submite the job -couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}"''', image="{{ $ss.DockerImage }}") +couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}" --datasource="%s"''' % datasource, image="{{ $ss.DockerImage }}") {{end}} {{end}} ` diff --git a/pkg/sql/database.go b/pkg/sql/database.go index 2dce78039d..b424d93bab 100644 --- a/pkg/sql/database.go +++ b/pkg/sql/database.go @@ -65,6 +65,9 @@ func openDB(db *DB) error { // SplitDataSource splits the datasource into drivername and datasource name func SplitDataSource(datasource string) (string, string, error) { + if datasource == "" { + return "", "", fmt.Errorf("datasource should not be an empty string") + } dses := strings.Split(datasource, "://") if len(dses) != 2 { return "", "", fmt.Errorf("Expecting but cannot find :// in datasource %v", datasource) diff --git 
diff --git a/pkg/sql/executor_ir.go b/pkg/sql/executor_ir.go
index fecfa753bd..d8aa24443c 100644
--- a/pkg/sql/executor_ir.go
+++ b/pkg/sql/executor_ir.go
@@ -48,7 +48,7 @@ func RunSQLProgram(sqlProgram string, modelDir string, session *pb.Session) *PipeReader {
 	var db *DB
 	var err error
 	if db, err = NewDB(session.DbConnStr); err != nil {
-		wr.Write(fmt.Sprintf("create DB failed: %v", err))
+		wr.Write(fmt.Errorf("create DB failed: %v", err))
 		log.Errorf("create DB failed: %v", err)
 	}
 	defer wr.Close()
@@ -128,8 +128,8 @@ func SubmitWorkflow(sqlProgram string, modelDir string, session *pb.Session) *PipeReader {
 	return rd
 }

-func writeCoulerFile(spIRs ir.SQLProgram) (string, error) {
-	program, err := couler.Run(spIRs)
+func writeCoulerFile(spIRs ir.SQLProgram, session *pb.Session) (string, error) {
+	program, err := couler.Run(spIRs, session)
 	if err != nil {
 		return "", fmt.Errorf("generate couler program error: %v", err)
 	}
@@ -202,19 +202,21 @@ func submitWorkflow(wr *PipeWriter, sqlProgram string, modelDir string, session *pb.Session) error {
 	}

 	// 1. call codegen_couler.go to genearte Couler program.
-	coulerFileName, err := writeCoulerFile(spIRs)
+	coulerFileName, err := writeCoulerFile(spIRs, session)
 	if err != nil {
 		return err
 	}
+	defer os.RemoveAll(coulerFileName)

 	// 2. compile Couler program into Argo YAML.
-	argoFile, err := writeArgoFile(coulerFileName)
+	argoFileName, err := writeArgoFile(coulerFileName)
 	if err != nil {
 		return err
 	}
+	defer os.RemoveAll(argoFileName)

 	// 3. submit Argo YAML and fetch the workflow ID.
-	cmd := exec.Command("kubectl", "create", "-f", argoFile)
+	cmd := exec.Command("kubectl", "create", "-f", argoFileName)
 	output, err := cmd.CombinedOutput()
 	if err != nil {
 		return fmt.Errorf("submit Argo YAML error: %v, output: %s", err, string(output))
diff --git a/pkg/sql/executor_ir_test.go b/pkg/sql/executor_ir_test.go
index b32a1226d8..00edc88c16 100644
--- a/pkg/sql/executor_ir_test.go
+++ b/pkg/sql/executor_ir_test.go
@@ -17,7 +17,6 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
-	"strings"
 	"testing"

 	"github.com/stretchr/testify/assert"
@@ -279,25 +278,3 @@ func TestLogChanWriter_Write(t *testing.T) {
 	_, more := <-c
 	a.False(more)
 }
-
-func TestSubmitWorkflow(t *testing.T) {
-	if os.Getenv("SQLFLOW_ARGO_MODE") != "True" {
-		t.Skip("argo: skip Argo tests")
-	}
-	a := assert.New(t)
-	modelDir := ""
-	a.NotPanics(func() {
-		rd := SubmitWorkflow(testXGBoostTrainSelectIris, modelDir, getDefaultSession())
-		for r := range rd.ReadAll() {
-			switch r.(type) {
-			case WorkflowJob:
-				job := r.(WorkflowJob)
-				a.True(strings.HasPrefix(job.JobID, "sqlflow-couler"))
-				// TODO(tony): wait to check if job succeeded.
-				// The workflow is currently failed since we haven't configure the data source.
-			default:
-				a.Fail("SubmitWorkflow should return JobID")
-			}
-		}
-	})
-}
diff --git a/scripts/test_couler.sh b/scripts/test_workflow.sh
similarity index 57%
rename from scripts/test_couler.sh
rename to scripts/test_workflow.sh
index ac447ac025..8117f07fbc 100644
--- a/scripts/test_couler.sh
+++ b/scripts/test_workflow.sh
@@ -14,6 +14,7 @@

 set -e

+export SQLFLOW_TEST=workflow
 ############# Run Couler unit tests #############

 pip install -r python/couler/requirements.txt
@@ -29,12 +30,9 @@ function test_couler() {
 import couler.argo as couler
 couler.run_container(image="docker/whalesay", command='echo "SQLFlow bridges AI and SQL engine."')
 EOF
-
-
+
 couler run --mode argo --file /tmp/sqlflow_couler.py > /tmp/sqlflow_argo.yaml
-
 MESSAGE=$(kubectl create -f /tmp/sqlflow_argo.yaml)
-
 WORKFLOW_NAME=$(echo ${MESSAGE} | cut -d ' ' -f 1 | cut -d '/' -f 2)
 echo WORKFLOW_NAME ${WORKFLOW_NAME}
@@ -55,18 +53,43 @@ EOF
 	return 1
 }

-test_couler
-ret=$?
+function check_ret() {
+	ret=$1
+	message=$2
+	echo $ret $message
+	if [[ "$ret" != "0" ]]; then
+		echo $message
+		exit 1
+	fi
+}

-if [[ "$ret" != "0" ]]; then
-	echo "Argo job timed out."
-	rm -rf /tmp/sqlflow*
-	exit 1
-fi
+test_couler
+check_ret $? "Test Couler failed"

 ############# Run SQLFLow test with Argo Mode #############
-service mysql start
-go generate ./...
-go install ./...
-SQLFLOW_ARGO_MODE=True go test ./pkg/sql/. -run TestSubmitWorkflow -v
+function test_workflow() {
+	# start a SQLFlow MySQL Pod with testdata
+	kubectl run mysql --port 3306 --env="SQLFLOW_MYSQL_HOST=0.0.0.0" --env="SQLFLOW_MYSQL_PORT=3306" --image=sqlflow/sqlflow --command -- bash /start.sh mysql
+	MYSQL_POD_NAME=$(kubectl get pod -l run=mysql -o jsonpath="{.items[0].metadata.name}")
+
+	for i in {1..30}
+	do
+		MYSQL_POD_STATUS=$(kubectl get pod ${MYSQL_POD_NAME} -o jsonpath='{.status.phase}')
+		echo ${MYSQL_POD_STATUS}
+		if [[ "${MYSQL_POD_STATUS}" == "Running" ]]; then
+			echo "SQLFlow MySQL Pod running."
+			MYSQL_POD_IP=$(kubectl get pod ${MYSQL_POD_NAME} -o jsonpath='{.status.podIP}')
+			go generate ./...
+			SQLFLOW_TEST_DATASOURCE="mysql://root:root@tcp(${MYSQL_POD_IP}:3306)/?maxAllowedPacket=0" go test ./cmd/... -run TestEnd2EndMySQLWorkflow -v
+			return 0
+		else
+			echo "Waiting for SQLFlow MySQL Pod ${MYSQL_POD_NAME}"
+			sleep ${CHECK_INTERVAL_SECS}
+		fi
+	done
+	echo "Launching SQLFlow MySQL Pod timed out"
+	return 1
+}
+test_workflow
+check_ret $? "Test SQLFlow workflow failed"
\ No newline at end of file
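Taken together, the flow these changes exercise is: couler.Run renders coulerTemplateText into a Couler program with the session's datasource baked into each step, couler run --mode argo compiles that program to Argo YAML, and kubectl create submits the workflow (whose name the end-to-end test expects to start with sqlflow-couler). A minimal sketch of the generated program and its submission for one standard-SQL statement; the datasource value here is hypothetical, since the real one comes from the gRPC session's DbConnStr:

# The datasource below is a hypothetical example; the server fills in the
# real value from the session when rendering the Couler template.
cat <<EOF > /tmp/sqlflow_couler.py
import couler.argo as couler

datasource = "mysql://root:root@tcp(127.0.0.1:3306)/?maxAllowedPacket=0"

couler.run_container(command='''repl -e "SELECT * FROM iris.train LIMIT 5;" --datasource="%s"''' % datasource, image="sqlflow/sqlflow")
EOF

# Compile the Couler program to Argo YAML and submit it, mirroring
# writeArgoFile and the kubectl create call in submitWorkflow.
couler run --mode argo --file /tmp/sqlflow_couler.py > /tmp/sqlflow_argo.yaml
kubectl create -f /tmp/sqlflow_argo.yaml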