From 5bbcf71074466932c7651f71070133fa13e8f7ce Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 4 Dec 2019 10:00:44 +0000 Subject: [PATCH 1/5] [wip] Fetch Argo workflow logs --- pkg/sql/workflow.go | 64 ++++++++++++++++++++++++++++++++++++++++ pkg/sql/workflow_test.go | 46 +++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 pkg/sql/workflow.go create mode 100644 pkg/sql/workflow_test.go diff --git a/pkg/sql/workflow.go b/pkg/sql/workflow.go new file mode 100644 index 0000000000..55ab83e3a5 --- /dev/null +++ b/pkg/sql/workflow.go @@ -0,0 +1,64 @@ +// Copyright 2019 The SQLFlow Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "fmt" + "os/exec" + "time" +) + +// Reference: https://github.com/argoproj/argo/blob/723b3c15e55d2f8dceb86f1ac0a6dc7d1a58f10b/pkg/apis/workflow/v1alpha1/workflow_types.go#L30-L38 + +// NodePhase is a label for the condition of a node at the current time. 
+type NodePhase string + +// Workflow and node statuses +const ( + NodePending NodePhase = "Pending" + NodeRunning NodePhase = "Running" + NodeSucceeded NodePhase = "Succeeded" + NodeSkipped NodePhase = "Skipped" + NodeFailed NodePhase = "Failed" + NodeError NodePhase = "Error" +) + +func isCompletedPhase(phase NodePhase) bool { + return phase == NodeSucceeded || + phase == NodeFailed || + phase == NodeError || + phase == NodeSkipped +} + +func fetchWorkflowLog(job WorkflowJob) error { + fmt.Println(job.JobID) + + for i := 0; i < 10; i++ { + cmd := exec.Command("kubectl", "get", "wf", job.JobID, "-o", "jsonpath='{.status.phase}'") + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("submit Argo YAML error: %v\n%v", string(output), err) + } + + fmt.Println(i, string(output)) + time.Sleep(time.Second) + // Get Pod names + _ = `kubectl get pods --selector=workflows.argoproj.io/workflow=sqlflow-couler898061205-xppzp -o jsonpath="{.items[0].metadata.name}"` + // Get container logs + _ = `kubectl logs sqlflow-couler898061205-xppzp-246701932 main` + _ = `kubectl logs sqlflow-couler898061205-xppzp-246701932 wait` + } + + return nil +} diff --git a/pkg/sql/workflow_test.go b/pkg/sql/workflow_test.go new file mode 100644 index 0000000000..1d3068b0d6 --- /dev/null +++ b/pkg/sql/workflow_test.go @@ -0,0 +1,46 @@ +// Copyright 2019 The SQLFlow Authors. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sql + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestFetchWorkflowLog(t *testing.T) { + if os.Getenv("SQLFLOW_ARGO_MODE") != "True" { + t.Skip("argo: skip Argo tests") + } + a := assert.New(t) + modelDir := "" + a.NotPanics(func() { + rd := SubmitWorkflow(`select 1; select 1;`, testDB, modelDir, getDefaultSession()) + for r := range rd.ReadAll() { + switch r.(type) { + case WorkflowJob: + job := r.(WorkflowJob) + a.True(strings.HasPrefix(job.JobID, "sqlflow-couler")) + // TODO(tony): wait to check if job succeeded. + // The workflow is currently failed since we haven't configure the data source. + + a.NoError(fetchWorkflowLog(job)) + + default: + a.Fail("SubmitWorkflow should return JobID") + } + } + }) +} From a0979105e9b0fd37db0416490ccf030a0d27e113 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 6 Dec 2019 09:50:59 +0000 Subject: [PATCH 2/5] Add fetch workflow logs unit test --- pkg/sql/executor_ir.go | 21 +++++++---- pkg/sql/workflow.go | 60 +++++++++++++++++++++++------- pkg/sql/workflow_test.go | 80 ++++++++++++++++++++++++++++++---------- 3 files changed, 122 insertions(+), 39 deletions(-) diff --git a/pkg/sql/executor_ir.go b/pkg/sql/executor_ir.go index a63a53d1f9..afd083d598 100644 --- a/pkg/sql/executor_ir.go +++ b/pkg/sql/executor_ir.go @@ -153,6 +153,16 @@ func writeArgoFile(coulerFileName string) (string, error) { return argoYaml.Name(), nil } +func getWorkflowID(output string) (string, error) { + reWorkflow := regexp.MustCompile(`.+/(.+) .+`) + wf := reWorkflow.FindStringSubmatch(string(output)) + if len(wf) != 2 { + return "", fmt.Errorf("parse workflow ID error: %v", output) + } + + return wf[1], nil +} + func submitWorkflow(wr *PipeWriter, sqlProgram string, db *DB, modelDir string, session *pb.Session) error { sqls, err := parse(db.driverName, sqlProgram) if err != nil { @@ -205,13 +215,10 @@ func submitWorkflow(wr *PipeWriter, sqlProgram string, db *DB, modelDir string, if err != nil 
{ return fmt.Errorf("submit Argo YAML error: %v", err) } - reWorkflow := regexp.MustCompile(`.+/(.+) .+`) - wf := reWorkflow.FindStringSubmatch(string(output)) - var workflowID string - if len(wf) == 2 { - workflowID = wf[1] - } else { - return fmt.Errorf("parse workflow ID error: %v", err) + + workflowID, err := getWorkflowID(string(output)) + if err != nil { + return err } return wr.Write(WorkflowJob{ diff --git a/pkg/sql/workflow.go b/pkg/sql/workflow.go index 55ab83e3a5..7b8753b744 100644 --- a/pkg/sql/workflow.go +++ b/pkg/sql/workflow.go @@ -41,24 +41,58 @@ func isCompletedPhase(phase NodePhase) bool { phase == NodeSkipped } -func fetchWorkflowLog(job WorkflowJob) error { - fmt.Println(job.JobID) +func getWorkflowStatusPhase(job WorkflowJob) (string, error) { + cmd := exec.Command("kubectl", "get", "wf", job.JobID, "-o", "jsonpath={.status.phase}") + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("getWorkflowStatusPhase error: %v\n%v", string(output), err) + } + + return string(output), nil +} + +func getWorkflowPodName(job WorkflowJob) (string, error) { + cmd := exec.Command("kubectl", "get", "pods", + fmt.Sprintf(`--selector=workflows.argoproj.io/workflow=%s`, job.JobID), + "-o", "jsonpath={.items[0].metadata.name}") + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("getWorkflowPodName error: %v\n%v", string(output), err) + } + + return string(output), nil +} - for i := 0; i < 10; i++ { - cmd := exec.Command("kubectl", "get", "wf", job.JobID, "-o", "jsonpath='{.status.phase}'") - output, err := cmd.CombinedOutput() +func getPodLogs(podName string) (string, error) { + // NOTE(tony): A workflow pod usually contains two containers: main and wait + // I believe wait is used for management by Argo, so we only need to care about main. 
+ cmd := exec.Command("kubectl", "logs", podName, "main") + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("getPodLogs error: %v\n%v", string(output), err) + } + return string(output), nil +} + +func fetchWorkflowLog(job WorkflowJob) (string, error) { + for { + statusPhase, err := getWorkflowStatusPhase(job) if err != nil { - return fmt.Errorf("submit Argo YAML error: %v\n%v", string(output), err) + return "", err } - fmt.Println(i, string(output)) + // FIXME(tony): what if it is a long running job + if isCompletedPhase(NodePhase(statusPhase)) { + break + } time.Sleep(time.Second) - // Get Pod names - _ = `kubectl get pods --selector=workflows.argoproj.io/workflow=sqlflow-couler898061205-xppzp -o jsonpath="{.items[0].metadata.name}"` - // Get container logs - _ = `kubectl logs sqlflow-couler898061205-xppzp-246701932 main` - _ = `kubectl logs sqlflow-couler898061205-xppzp-246701932 wait` } - return nil + // FIXME(tony): what if there are multiple pods + podName, err := getWorkflowPodName(job) + if err != nil { + return "", err + } + + return getPodLogs(podName) } diff --git a/pkg/sql/workflow_test.go b/pkg/sql/workflow_test.go index 1d3068b0d6..1f2d8ba298 100644 --- a/pkg/sql/workflow_test.go +++ b/pkg/sql/workflow_test.go @@ -14,33 +14,75 @@ package sql import ( + "fmt" "github.com/stretchr/testify/assert" + "io/ioutil" "os" - "strings" + "os/exec" "testing" ) +const ( + argoYAML = `apiVersion: argoproj.io/v1alpha1 +kind: Workflow # new type of k8s spec +metadata: + generateName: hello-world- # name of the workflow spec +spec: + entrypoint: whalesay # invoke the whalesay template + templates: + - name: whalesay # name of the template + container: + image: docker/whalesay + command: [echo] + args: ["hello world"] + resources: # limit the resources + limits: + memory: 32Mi + cpu: 100m +` + argoYAMLOutput = `hello world +` +) + +func createAndWriteTempFile(content string) (string, error) { + tmpFile, err := ioutil.TempFile("/tmp", 
"sqlflow-") + if err != nil { + return "", nil + } + defer tmpFile.Close() + + if _, err = tmpFile.Write([]byte(content)); err != nil { + return "", err + } + + return tmpFile.Name(), nil +} + +func kubectlCreateFromYAML(content string) (string, error) { + fileName, err := createAndWriteTempFile(content) + if err != nil { + return "", err + } + defer os.Remove(fileName) + + cmd := exec.Command("kubectl", "create", "-f", fileName) + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("submitYAML error: %v\n%v", string(output), err) + } + + return getWorkflowID(string(output)) +} + func TestFetchWorkflowLog(t *testing.T) { if os.Getenv("SQLFLOW_ARGO_MODE") != "True" { t.Skip("argo: skip Argo tests") } a := assert.New(t) - modelDir := "" - a.NotPanics(func() { - rd := SubmitWorkflow(`select 1; select 1;`, testDB, modelDir, getDefaultSession()) - for r := range rd.ReadAll() { - switch r.(type) { - case WorkflowJob: - job := r.(WorkflowJob) - a.True(strings.HasPrefix(job.JobID, "sqlflow-couler")) - // TODO(tony): wait to check if job succeeded. - // The workflow is currently failed since we haven't configure the data source. 
- - a.NoError(fetchWorkflowLog(job)) - - default: - a.Fail("SubmitWorkflow should return JobID") - } - } - }) + + workflowID, err := kubectlCreateFromYAML(argoYAML) + a.NoError(err) + logs, err := fetchWorkflowLog(WorkflowJob{JobID: workflowID}) + a.NoError(err) + a.Equal(argoYAMLOutput, logs) } From 2309cc567293a698754de01d60c57e919ff562fb Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 6 Dec 2019 10:24:20 +0000 Subject: [PATCH 3/5] make workflow as a separate package --- pkg/sql/executor_ir.go | 2 ++ pkg/{sql => workflow}/workflow.go | 23 ++++++++++++++++++----- pkg/{sql => workflow}/workflow_test.go | 7 ++++--- 3 files changed, 24 insertions(+), 8 deletions(-) rename pkg/{sql => workflow}/workflow.go (82%) rename pkg/{sql => workflow}/workflow_test.go (92%) diff --git a/pkg/sql/executor_ir.go b/pkg/sql/executor_ir.go index 10e9602aaa..9cf5b98315 100644 --- a/pkg/sql/executor_ir.go +++ b/pkg/sql/executor_ir.go @@ -37,6 +37,7 @@ type EndOfExecution struct { } // WorkflowJob indicates the Argo Workflow ID +// FIXME(tony): reuse workflow job definition is proto pakcage type WorkflowJob struct { JobID string } @@ -225,6 +226,7 @@ func submitWorkflow(wr *PipeWriter, sqlProgram string, modelDir string, session } defer os.RemoveAll(argoFileName) + // TODO(tony): move the following function to package workflow // 3. submit Argo YAML and fetch the workflow ID. 
cmd := exec.Command("kubectl", "create", "-f", argoFileName) output, err := cmd.CombinedOutput() diff --git a/pkg/sql/workflow.go b/pkg/workflow/workflow.go similarity index 82% rename from pkg/sql/workflow.go rename to pkg/workflow/workflow.go index 7b8753b744..211e3688b2 100644 --- a/pkg/sql/workflow.go +++ b/pkg/workflow/workflow.go @@ -16,7 +16,10 @@ package sql import ( "fmt" "os/exec" + "regexp" "time" + + pb "sqlflow.org/sqlflow/pkg/proto" ) // Reference: https://github.com/argoproj/argo/blob/723b3c15e55d2f8dceb86f1ac0a6dc7d1a58f10b/pkg/apis/workflow/v1alpha1/workflow_types.go#L30-L38 @@ -41,8 +44,18 @@ func isCompletedPhase(phase NodePhase) bool { phase == NodeSkipped } -func getWorkflowStatusPhase(job WorkflowJob) (string, error) { - cmd := exec.Command("kubectl", "get", "wf", job.JobID, "-o", "jsonpath={.status.phase}") +func getWorkflowID(output string) (string, error) { + reWorkflow := regexp.MustCompile(`.+/(.+) .+`) + wf := reWorkflow.FindStringSubmatch(string(output)) + if len(wf) != 2 { + return "", fmt.Errorf("parse workflow ID error: %v", output) + } + + return wf[1], nil +} + +func getWorkflowStatusPhase(job pb.Job) (string, error) { + cmd := exec.Command("kubectl", "get", "wf", job.Id, "-o", "jsonpath={.status.phase}") output, err := cmd.CombinedOutput() if err != nil { return "", fmt.Errorf("getWorkflowStatusPhase error: %v\n%v", string(output), err) @@ -51,9 +64,9 @@ func getWorkflowStatusPhase(job WorkflowJob) (string, error) { return string(output), nil } -func getWorkflowPodName(job WorkflowJob) (string, error) { +func getWorkflowPodName(job pb.Job) (string, error) { cmd := exec.Command("kubectl", "get", "pods", - fmt.Sprintf(`--selector=workflows.argoproj.io/workflow=%s`, job.JobID), + fmt.Sprintf(`--selector=workflows.argoproj.io/workflow=%s`, job.Id), "-o", "jsonpath={.items[0].metadata.name}") output, err := cmd.CombinedOutput() if err != nil { @@ -74,7 +87,7 @@ func getPodLogs(podName string) (string, error) { return string(output), 
nil } -func fetchWorkflowLog(job WorkflowJob) (string, error) { +func fetchWorkflowLog(job pb.Job) (string, error) { for { statusPhase, err := getWorkflowStatusPhase(job) if err != nil { diff --git a/pkg/sql/workflow_test.go b/pkg/workflow/workflow_test.go similarity index 92% rename from pkg/sql/workflow_test.go rename to pkg/workflow/workflow_test.go index 1f2d8ba298..046af02ff6 100644 --- a/pkg/sql/workflow_test.go +++ b/pkg/workflow/workflow_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "os/exec" + pb "sqlflow.org/sqlflow/pkg/proto" "testing" ) @@ -75,14 +76,14 @@ func kubectlCreateFromYAML(content string) (string, error) { } func TestFetchWorkflowLog(t *testing.T) { - if os.Getenv("SQLFLOW_ARGO_MODE") != "True" { - t.Skip("argo: skip Argo tests") + if os.Getenv("SQLFLOW_TEST") != "workflow" { + t.Skip("argo: skip workflow tests") } a := assert.New(t) workflowID, err := kubectlCreateFromYAML(argoYAML) a.NoError(err) - logs, err := fetchWorkflowLog(WorkflowJob{JobID: workflowID}) + logs, err := fetchWorkflowLog(pb.Job{Id: workflowID}) a.NoError(err) a.Equal(argoYAMLOutput, logs) } From e04209333a2fecd9b8ae5528416444d18353b8df Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 6 Dec 2019 10:26:34 +0000 Subject: [PATCH 4/5] enable tests in CI --- scripts/test_workflow.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/test_workflow.sh b/scripts/test_workflow.sh index 8117f07fbc..fb15e7e49d 100644 --- a/scripts/test_workflow.sh +++ b/scripts/test_workflow.sh @@ -92,4 +92,6 @@ function test_workflow() { } test_workflow -check_ret $? "Test SQLFLow workflow failed" \ No newline at end of file +check_ret $? 
"Test SQLFLow workflow failed" + +go test -v ./pkg/workflow/ From 117bc8b756f02ed108b6c4c120febb221b2eaebf Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Fri, 6 Dec 2019 10:30:34 +0000 Subject: [PATCH 5/5] fix typo --- pkg/sql/executor_ir.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/executor_ir.go b/pkg/sql/executor_ir.go index 9cf5b98315..b8fa272ffe 100644 --- a/pkg/sql/executor_ir.go +++ b/pkg/sql/executor_ir.go @@ -37,7 +37,7 @@ type EndOfExecution struct { } // WorkflowJob indicates the Argo Workflow ID -// FIXME(tony): reuse workflow job definition is proto pakcage +// FIXME(tony): reuse workflow job definition in proto package type WorkflowJob struct { JobID string }