add workflow e2e test #1318

Merged
10 changes: 7 additions & 3 deletions .travis.yml
@@ -54,13 +54,17 @@ jobs:
script:
- docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
- bash scripts/setup_k8s_env.sh
- docker run --rm --net=host -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_ipython.sh
- docker run -e MAXCOMPUTE_AK=$MAXCOMPUTE_AK -e MAXCOMPUTE_SK=$MAXCOMPUTE_SK --rm --net=host -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_maxcompute.sh
- env: SQLFLOW_TEST=java
script:
- docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
- docker run --rm -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_java.sh
- env: SQLFLOW_TEST=workflow
script:
- bash scripts/setup_k8s_env.sh
- eval $(sudo minikube docker-env) && docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile . && docker tag sqlflow:latest sqlflow:submitter
- docker run --rm --net=host -e SQLFLOW_WORKFLOW_STEP_IMAGE=sqlflow:submitter -v /var/run/docker.sock:/var/run/docker.sock -v $HOME/.kube:/root/.kube -v /home/$USER/.minikube/:/home/$USER/.minikube/ -v $GOPATH:/root/go -w /root/go/src/sqlflow.org/sqlflow sqlflow:latest bash scripts/test_workflow.sh
- stage: deploy
script:
- docker pull sqlflow/sqlflow:latest && docker build --cache-from sqlflow/sqlflow:latest -t sqlflow:latest -f Dockerfile .
90 changes: 90 additions & 0 deletions cmd/sqlflowserver/main_test.go
@@ -23,6 +23,7 @@ import (
"os"
"os/exec"
"path"
"strings"
"testing"
"time"

@@ -44,6 +45,7 @@ var caseDB = "iris"
var caseTrainTable = "train"
var caseTestTable = "test"
var casePredictTable = "predict"
var testDatasource = os.Getenv("SQLFLOW_TEST_DATASOURCE")

// caseInto used by CaseTrainSQL
var caseInto = "sqlflow_models.my_dnn_model"
@@ -483,6 +485,94 @@ func TestEnd2EndMaxComputeElasticDL(t *testing.T) {
t.Run("CaseTrainElasticDL", CaseTrainElasticDL)
}

func TestEnd2EndMySQLWorkflow(t *testing.T) {
a := assert.New(t)
if os.Getenv("SQLFLOW_TEST_DATASOURCE") == "" || strings.ToLower(os.Getenv("SQLFLOW_TEST")) != "workflow" {
t.Skip("Skipping workflow test.")
}
driverName, _, err := sql.SplitDataSource(testDatasource)
a.NoError(err)

if driverName != "mysql" {
t.Skip("Skipping workflow test.")
}
modelDir := ""
tmpDir, caCrt, caKey, err := generateTempCA()
defer os.RemoveAll(tmpDir)
if err != nil {
t.Fatalf("failed to generate CA pair %v", err)
}

go start(modelDir, caCrt, caKey, unitestPort, true)
waitPortReady(fmt.Sprintf("localhost:%d", unitestPort), 0)

t.Run("CaseSubmitSQLProgram", CaseSubmitSQLProgram)
}

func CaseSubmitSQLProgram(t *testing.T) {
a := assert.New(t)
sqlProgram := fmt.Sprintf(`
SELECT *
FROM %s.%s
TO TRAIN DNNClassifier
WITH
model.n_classes = 3,
model.hidden_units = [10, 20]
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class
INTO sqlflow_models.my_dnn_model;

SELECT *
FROM %s.%s
TO PREDICT %s.%s.class
USING sqlflow_models.my_dnn_model;

SELECT *
FROM %s.%s LIMIT 5;
`, caseDB, caseTrainTable,
caseDB, caseTrainTable, caseDB, casePredictTable, caseDB, casePredictTable)

conn, err := createRPCConn()
if err != nil {
a.Fail("Create gRPC client error: %v", err)
}
defer conn.Close()
cli := pb.NewSQLFlowClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()
stream, err := cli.Run(ctx, &pb.Request{Sql: sqlProgram, Session: &pb.Session{DbConnStr: testDatasource}})
a.NoError(err)
var workflowID string
for {
iter, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("stream read err: %v", err)
}
workflowID = iter.GetJob().GetId()
}
a.True(strings.HasPrefix(workflowID, "sqlflow-couler"))
// check the workflow status within 180 seconds
// TODO(yancey1989): use the Fetch gRPC interface to check the workflow status
for i := 0; i < 60; i++ {
cmd := exec.Command("kubectl", "get", "wf", workflowID, "-o", "jsonpath='{.status.phase}'")
out, err := cmd.CombinedOutput()
if err != nil {
log.Fatalf("get workflow status error: %v", err)
}
if string(out) == "'Succeeded'" {
return
}
time.Sleep(3 * time.Second)
}
// workflow timed out
log.Fatalf("workflow %s timed out", workflowID)
}

func CaseShowDatabases(t *testing.T) {
a := assert.New(t)
cmd := "show databases;"
24 changes: 19 additions & 5 deletions pkg/sql/codegen/couler/codegen.go
@@ -15,17 +15,24 @@ package couler

import (
"bytes"
"fmt"
"os"

pb "sqlflow.org/sqlflow/pkg/proto"
"sqlflow.org/sqlflow/pkg/sql/ir"
)

var defaultDockerImage = "sqlflow/sqlflow"

// Run generates Couler program
func Run(programIR ir.SQLProgram) (string, error) {
func Run(programIR ir.SQLProgram, session *pb.Session) (string, error) {
// TODO(yancey1989): fill session as env
r := &coulerFiller{
DataSource: session.DbConnStr,
}
for _, sqlIR := range programIR {
ss := &sqlStatment{}
switch i := sqlIR.(type) {
case *ir.StandardSQL:
ss.IsExtendedSQL = false
ss.OriginalSQL = string(*i)
@@ -38,9 +45,16 @@ func Run(programIR ir.SQLProgram) (string, error) {
case *ir.AnalyzeStmt:
ss.IsExtendedSQL = true
ss.OriginalSQL = i.OriginalSQL
default:
return "", fmt.Errorf("uncognized IR type: %v", i)
}
// NOTE(yancey1989): does not use ModelImage here since the Predict statement
// does not contain the ModelImage field in SQL Program IR.
if os.Getenv("SQLFLOW_WORKFLOW_STEP_IMAGE") != "" {
ss.DockerImage = os.Getenv("SQLFLOW_WORKFLOW_STEP_IMAGE")
} else {
ss.DockerImage = defaultDockerImage
}
r.SQLStatements = append(r.SQLStatements, ss)
}
var program bytes.Buffer
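
For illustration only (not part of the diff): a minimal sketch of driving the new codegen with the step-image override. couler.Run, pb.Session, and SQLFLOW_WORKFLOW_STEP_IMAGE come from the change above; the IR literal and the datasource string are made up, and the snippet assumes ir.SQLProgram is a slice of IR statements, as the range loop above suggests.

package main

import (
	"fmt"
	"os"

	pb "sqlflow.org/sqlflow/pkg/proto"
	"sqlflow.org/sqlflow/pkg/sql/codegen/couler"
	"sqlflow.org/sqlflow/pkg/sql/ir"
)

func main() {
	// When set, every generated couler.run_container step uses this image
	// instead of defaultDockerImage ("sqlflow/sqlflow").
	os.Setenv("SQLFLOW_WORKFLOW_STEP_IMAGE", "sqlflow:submitter")

	// Illustrative single-statement SQL program.
	stmt := ir.StandardSQL("SELECT * FROM iris.train LIMIT 5;")
	program := ir.SQLProgram{&stmt}

	code, err := couler.Run(program, &pb.Session{DbConnStr: "mysql://root:root@tcp(127.0.0.1:3306)/"})
	if err != nil {
		panic(err)
	}
	fmt.Println(code) // the generated Couler program, one step per SQL statement
}
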
3 changes: 2 additions & 1 deletion pkg/sql/codegen/couler/codegen_test.go
@@ -20,13 +20,14 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
pb "sqlflow.org/sqlflow/pkg/proto"
"sqlflow.org/sqlflow/pkg/sql/ir"
)

func TestCodegen(t *testing.T) {
a := assert.New(t)
sqlIR := mockSQLProgramIR()
code, err := Run(sqlIR, &pb.Session{})
a.NoError(err)

r, _ := regexp.Compile(`repl -e "(.*);"`)
7 changes: 4 additions & 3 deletions pkg/sql/codegen/couler/template.go
@@ -21,20 +21,21 @@ type sqlStatment struct {
DockerImage string
}
type coulerFiller struct {
DataSource string
SQLStatements []*sqlStatment
}

const coulerTemplateText = `
import couler.argo as couler
datasource = "{{ .DataSource }}"
{{ range $ss := .SQLStatements }}
{{if $ss.IsExtendedSQL }}
couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}" --datasource="%s"''' % datasource, image="{{ $ss.DockerImage }}")
{{else}}
# TODO(yancey1989):
# using "repl -parse" to output IR and
# feed to "sqlflow_submitter.{submitter}.train" to submite the job
couler.run_container(command='''repl -e "{{ $ss.OriginalSQL }}" --datasource="%s"''' % datasource, image="{{ $ss.DockerImage }}")
{{end}}
{{end}}
`
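
For a concrete sense of the output, here is a sketch of rendering the template from inside the couler package. This is not part of the diff: renderExample is a hypothetical helper, it assumes coulerTemplateText is parsed with text/template as elsewhere in this package, and the filler values are illustrative.

package couler

import (
	"bytes"
	"text/template"
)

// renderExample renders coulerTemplateText with illustrative values.
func renderExample() (string, error) {
	tmpl := template.Must(template.New("couler").Parse(coulerTemplateText))
	filler := &coulerFiller{
		DataSource: "mysql://root:root@tcp(127.0.0.1:3306)/",
		SQLStatements: []*sqlStatment{
			{OriginalSQL: "SELECT * FROM iris.train LIMIT 5;", DockerImage: "sqlflow/sqlflow"},
		},
	}
	var program bytes.Buffer
	if err := tmpl.Execute(&program, filler); err != nil {
		return "", err
	}
	// For a standard (non-extended) statement the result reads roughly:
	//   import couler.argo as couler
	//   datasource = "mysql://root:root@tcp(127.0.0.1:3306)/"
	//   couler.run_container(command='''repl -e "SELECT * FROM iris.train LIMIT 5;" --datasource="%s"''' % datasource, image="sqlflow/sqlflow")
	return program.String(), nil
}
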
3 changes: 3 additions & 0 deletions pkg/sql/database.go
@@ -65,6 +65,9 @@ func openDB(db *DB) error {

// SplitDataSource splits the datasource into drivername and datasource name
func SplitDataSource(datasource string) (string, string, error) {
if datasource == "" {
return "", "", fmt.Errorf("datasource should not be an empty string")
}
dses := strings.Split(datasource, "://")
if len(dses) != 2 {
return "", "", fmt.Errorf("Expecting but cannot find :// in datasource %v", datasource)
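
A usage sketch of the stricter SplitDataSource (again not part of the diff; the datasource string mirrors the one the workflow test script passes):

package main

import (
	"fmt"

	"sqlflow.org/sqlflow/pkg/sql"
)

func main() {
	driver, name, err := sql.SplitDataSource("mysql://root:root@tcp(127.0.0.1:3306)/?maxAllowedPacket=0")
	if err != nil {
		panic(err) // an empty datasource, or one without "://", now returns an error
	}
	fmt.Println(driver) // mysql
	fmt.Println(name)   // root:root@tcp(127.0.0.1:3306)/?maxAllowedPacket=0
}
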
14 changes: 8 additions & 6 deletions pkg/sql/executor_ir.go
@@ -48,7 +48,7 @@ func RunSQLProgram(sqlProgram string, modelDir string, session *pb.Session) *PipeReader {
var db *DB
var err error
if db, err = NewDB(session.DbConnStr); err != nil {
wr.Write(fmt.Errorf("create DB failed: %v", err))
log.Errorf("create DB failed: %v", err)
}
defer wr.Close()
@@ -128,8 +128,8 @@ func SubmitWorkflow(sqlProgram string, modelDir string, session *pb.Session) *PipeReader {
return rd
}

func writeCoulerFile(spIRs ir.SQLProgram, session *pb.Session) (string, error) {
program, err := couler.Run(spIRs, session)
if err != nil {
return "", fmt.Errorf("generate couler program error: %v", err)
}
@@ -202,19 +202,21 @@ func submitWorkflow(wr *PipeWriter, sqlProgram string, modelDir string, session *pb.Session) error {
}

// 1. call codegen_couler.go to generate Couler program.
coulerFileName, err := writeCoulerFile(spIRs, session)
if err != nil {
return err
}
defer os.RemoveAll(coulerFileName)

// 2. compile Couler program into Argo YAML.
argoFileName, err := writeArgoFile(coulerFileName)
if err != nil {
return err
}
defer os.RemoveAll(argoFileName)

// 3. submit Argo YAML and fetch the workflow ID.
cmd := exec.Command("kubectl", "create", "-f", argoFile)
cmd := exec.Command("kubectl", "create", "-f", argoFileName)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("submit Argo YAML error: %v, output: %s", err, string(output))
23 changes: 0 additions & 23 deletions pkg/sql/executor_ir_test.go
@@ -17,7 +17,6 @@ import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
@@ -279,25 +278,3 @@ func TestLogChanWriter_Write(t *testing.T) {
_, more := <-c
a.False(more)
}

func TestSubmitWorkflow(t *testing.T) {
if os.Getenv("SQLFLOW_ARGO_MODE") != "True" {
t.Skip("argo: skip Argo tests")
}
a := assert.New(t)
modelDir := ""
a.NotPanics(func() {
rd := SubmitWorkflow(testXGBoostTrainSelectIris, modelDir, getDefaultSession())
for r := range rd.ReadAll() {
switch r.(type) {
case WorkflowJob:
job := r.(WorkflowJob)
a.True(strings.HasPrefix(job.JobID, "sqlflow-couler"))
// TODO(tony): wait to check if job succeeded.
// The workflow is currently failed since we haven't configure the data source.
default:
a.Fail("SubmitWorkflow should return JobID")
}
}
})
}
53 changes: 38 additions & 15 deletions scripts/test_couler.sh → scripts/test_workflow.sh
@@ -14,6 +14,7 @@

set -e

export SQLFLOW_TEST=workflow
############# Run Couler unit tests #############
pip install -r python/couler/requirements.txt

@@ -29,12 +30,9 @@ function test_couler() {
import couler.argo as couler
couler.run_container(image="docker/whalesay", command='echo "SQLFlow bridges AI and SQL engine."')
EOF

couler run --mode argo --file /tmp/sqlflow_couler.py > /tmp/sqlflow_argo.yaml

MESSAGE=$(kubectl create -f /tmp/sqlflow_argo.yaml)

WORKFLOW_NAME=$(echo ${MESSAGE} | cut -d ' ' -f 1 | cut -d '/' -f 2)

echo WORKFLOW_NAME ${WORKFLOW_NAME}
@@ -55,18 +53,43 @@ EOF
return 1
}

function check_ret() {
ret=$1
message=$2
echo $ret $message
if [[ "$ret" != "0" ]]; then
echo $message
exit 1
fi
}

if [[ "$ret" != "0" ]]; then
echo "Argo job timed out."
rm -rf /tmp/sqlflow*
exit 1
fi
test_couler
check_ret $? "Test Couler failed"

############# Run SQLFlow test with Argo Mode #############
function test_workflow() {
# start a SQLFlow MySQL Pod with testdata
kubectl run mysql --port 3306 --env="SQLFLOW_MYSQL_HOST=0.0.0.0" --env="SQLFLOW_MYSQL_PORT=3306" --image=sqlflow/sqlflow --command -- bash /start.sh mysql
MYSQL_POD_NAME=$(kubectl get pod -l run=mysql -o jsonpath="{.items[0].metadata.name}")

for i in {1..30}
do
MYSQL_POD_STATUS=$(kubectl get pod ${MYSQL_POD_NAME} -o jsonpath='{.status.phase}')
echo ${MYSQL_POD_STATUS}
if [[ "${MYSQL_POD_STATUS}" == "Running" ]]; then
echo "SQLFlow MySQL Pod running."
MYSQL_POD_IP=$(kubectl get pod ${MYSQL_POD_NAME} -o jsonpath='{.status.podIP}')
go generate ./...
SQLFLOW_TEST_DATASOURCE="mysql://root:root@tcp(${MYSQL_POD_IP}:3306)/?maxAllowedPacket=0" go test ./cmd/... -run TestEnd2EndMySQLWorkflow -v
return 0
else
echo "Wait SQLFlow MySQL Pod ${MYSQL_POD_NAME}"
sleep ${CHECK_INTERVAL_SECS}
fi
done
echo "Launch SQLFlow MySQL Pod times out"
return 1
}

test_workflow
check_ret $? "Test SQLFLow workflow failed"