Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch Argo workflow logs #1332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions pkg/sql/executor_ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EndOfExecution struct {
}

// WorkflowJob indicates the Argo Workflow ID
// FIXME(tony): reuse workflow job definition in proto package
type WorkflowJob struct {
JobID string
}
Expand Down Expand Up @@ -163,6 +164,16 @@ func writeArgoFile(coulerFileName string) (string, error) {
return argoYaml.Name(), nil
}

func getWorkflowID(output string) (string, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated with workflow.go#getWorkflowID ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will move workflow related code to package workflow in the next PR.

reWorkflow := regexp.MustCompile(`.+/(.+) .+`)
wf := reWorkflow.FindStringSubmatch(string(output))
if len(wf) != 2 {
return "", fmt.Errorf("parse workflow ID error: %v", output)
}

return wf[1], nil
}

func submitWorkflow(wr *PipeWriter, sqlProgram string, modelDir string, session *pb.Session) error {
driverName, dataSourceName, err := SplitDataSource(session.DbConnStr)
if err != nil {
Expand Down Expand Up @@ -215,19 +226,17 @@ func submitWorkflow(wr *PipeWriter, sqlProgram string, modelDir string, session
}
defer os.RemoveAll(argoFileName)

// TODO(tony): move the following function to package workflow
// 3. submit Argo YAML and fetch the workflow ID.
cmd := exec.Command("kubectl", "create", "-f", argoFileName)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("submit Argo YAML error: %v, output: %s", err, string(output))
}
reWorkflow := regexp.MustCompile(`.+/(.+) .+`)
wf := reWorkflow.FindStringSubmatch(string(output))
var workflowID string
if len(wf) == 2 {
workflowID = wf[1]
} else {
return fmt.Errorf("parse workflow ID error: %v", err)

workflowID, err := getWorkflowID(string(output))
if err != nil {
return err
}

return wr.Write(WorkflowJob{
Expand Down
111 changes: 111 additions & 0 deletions pkg/workflow/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2019 The SQLFlow Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
"fmt"
"os/exec"
"regexp"
"time"

pb "sqlflow.org/sqlflow/pkg/proto"
)

// Reference: https://github.com/argoproj/argo/blob/723b3c15e55d2f8dceb86f1ac0a6dc7d1a58f10b/pkg/apis/workflow/v1alpha1/workflow_types.go#L30-L38

// NodePhase is a label for the condition of a node at the current time.
type NodePhase string

// Workflow and node statuses
const (
NodePending NodePhase = "Pending"
NodeRunning NodePhase = "Running"
NodeSucceeded NodePhase = "Succeeded"
NodeSkipped NodePhase = "Skipped"
NodeFailed NodePhase = "Failed"
NodeError NodePhase = "Error"
)

func isCompletedPhase(phase NodePhase) bool {
return phase == NodeSucceeded ||
phase == NodeFailed ||
phase == NodeError ||
phase == NodeSkipped
}

func getWorkflowID(output string) (string, error) {
reWorkflow := regexp.MustCompile(`.+/(.+) .+`)
wf := reWorkflow.FindStringSubmatch(string(output))
if len(wf) != 2 {
return "", fmt.Errorf("parse workflow ID error: %v", output)
}

return wf[1], nil
}

func getWorkflowStatusPhase(job pb.Job) (string, error) {
cmd := exec.Command("kubectl", "get", "wf", job.Id, "-o", "jsonpath={.status.phase}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use https://github.com/kubernetes/client-go instead of calling the command line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of using the client-go? I am not sure if the client-go supports parsing the Argo workflow descriptions, so I may need to spend some time to test it out.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created an issue: #1362 to discuss go-client or kubectl, maybe we can move to make some disccusion.

output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("getWorkflowStatusPhase error: %v\n%v", string(output), err)
}

return string(output), nil
}

func getWorkflowPodName(job pb.Job) (string, error) {
cmd := exec.Command("kubectl", "get", "pods",
fmt.Sprintf(`--selector=workflows.argoproj.io/workflow=%s`, job.Id),
"-o", "jsonpath={.items[0].metadata.name}")
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("getWorkflowPodName error: %v\n%v", string(output), err)
}

return string(output), nil
}

func getPodLogs(podName string) (string, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kubectl logs would fetch all the logs, please add a TODO comment here to implement fetch logs by an incremental way.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will add this in the next PR.

// NOTE(tony): A workflow pod usually contains two container: main and wait
// I believe wait is used for management by Argo, so we only need to care about main.
cmd := exec.Command("kubectl", "logs", podName, "main")
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("getPodLogs error: %v\n%v", string(output), err)
}
return string(output), nil
}

func fetchWorkflowLog(job pb.Job) (string, error) {
for {
statusPhase, err := getWorkflowStatusPhase(job)
if err != nil {
return "", err
}

// FIXME(tony): what if it is a long running job
if isCompletedPhase(NodePhase(statusPhase)) {
break
}
time.Sleep(time.Second)
}

// FIXME(tony): what if there are multiple pods
podName, err := getWorkflowPodName(job)
if err != nil {
return "", err
}

return getPodLogs(podName)
}
89 changes: 89 additions & 0 deletions pkg/workflow/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2019 The SQLFlow Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
"fmt"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"os/exec"
pb "sqlflow.org/sqlflow/pkg/proto"
"testing"
)

const (
argoYAML = `apiVersion: argoproj.io/v1alpha1
kind: Workflow # new type of k8s spec
metadata:
generateName: hello-world- # name of the workflow spec
spec:
entrypoint: whalesay # invoke the whalesay template
templates:
- name: whalesay # name of the template
container:
image: docker/whalesay
command: [echo]
args: ["hello world"]
resources: # limit the resources
limits:
memory: 32Mi
cpu: 100m
`
argoYAMLOutput = `hello world
`
)

func createAndWriteTempFile(content string) (string, error) {
tmpFile, err := ioutil.TempFile("/tmp", "sqlflow-")
if err != nil {
return "", nil
}
defer tmpFile.Close()

if _, err = tmpFile.Write([]byte(content)); err != nil {
return "", err
}

return tmpFile.Name(), nil
}

func kubectlCreateFromYAML(content string) (string, error) {
fileName, err := createAndWriteTempFile(content)
if err != nil {
return "", err
}
defer os.Remove(fileName)

cmd := exec.Command("kubectl", "create", "-f", fileName)
output, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("submitYAML error: %v\n%v", string(output), err)
}

return getWorkflowID(string(output))
}

func TestFetchWorkflowLog(t *testing.T) {
if os.Getenv("SQLFLOW_TEST") != "workflow" {
t.Skip("argo: skip workflow tests")
}
a := assert.New(t)

workflowID, err := kubectlCreateFromYAML(argoYAML)
a.NoError(err)
logs, err := fetchWorkflowLog(pb.Job{Id: workflowID})
a.NoError(err)
a.Equal(argoYAMLOutput, logs)
}
4 changes: 3 additions & 1 deletion scripts/test_workflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ function test_workflow() {
}

test_workflow
check_ret $? "Test SQLFLow workflow failed"
check_ret $? "Test SQLFLow workflow failed"

go test -v ./pkg/workflow/