From 9cd66c51dab84bb1af68eeae3030ccb3a23956df Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Tue, 24 Oct 2023 15:35:01 +0200 Subject: [PATCH 01/10] First version of progress reporting Signed-off-by: Andrea Waltlova --- src/runner.go | 90 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 17 deletions(-) diff --git a/src/runner.go b/src/runner.go index aadfcd6..23dafa8 100644 --- a/src/runner.go +++ b/src/runner.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "strings" + "time" "git.sr.ht/~spc/go-log" "gopkg.in/yaml.v3" @@ -85,6 +86,74 @@ func setEnvVariablesForCommand(cmd *exec.Cmd, variables map[string]string) { } } +// Runs given command and sends stdout to given channel. doneCh used to signal that execution ended. +func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) { + cmdOutput, err := cmd.StdoutPipe() + if err != nil { + outputCh <- fmt.Sprintf("Error: %v", err) + return + } + + if err := cmd.Start(); err != nil { + outputCh <- fmt.Sprintf("Error: %v", err) + return + } + + go func() { + defer close(outputCh) + + buf := make([]byte, 1024) + for { + n, err := cmdOutput.Read(buf) + if err != nil { + return + } + outputCh <- string(buf[:n]) + } + }() + + if err := cmd.Wait(); err != nil { + log.Errorln("Failed to execute script: ", err) + outputCh <- fmt.Sprintf("Error: %v", err) + } + + doneCh <- true // Signal that the command has finished +} + +// Executes command and reports status back to dispatcher +func executeCommandWithProgress(command string, interpreter string, variables map[string]string) string { + log.Infoln("Executing script...") + + cmd := exec.Command(interpreter, command) + setEnvVariablesForCommand(cmd, variables) + + outputCh := make(chan string) + doneCh := make(chan bool) + go runCommandWithOutput(cmd, outputCh, doneCh) + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + var bufferedOutput string + + for { + select { + case output := <-outputCh: + bufferedOutput += output + case <-ticker.C: + // NOTE: the output so far + // TODO: this has to be sent to dispatcher + fmt.Print(bufferedOutput) + + // bufferedOutput = "" // Clear the buffer after printing + // fmt.Println("Still running...") + case <-doneCh: + // Execution is done + return bufferedOutput + } + } +} + // Parses given yaml data. // If signature is valid then extracts the script to temporary file, // sets env variables if present and then runs the script. @@ -120,22 +189,9 @@ func processSignedScript(incomingContent []byte) string { []byte(yamlContent.Vars.Content), *config.TemporaryWorkerDirectory) defer os.Remove(scriptFileName) - log.Infoln("Processing script ...") - // Execute script - log.Infoln("Executing script...") - cmd := exec.Command(yamlContent.Vars.Interpreter, scriptFileName) //nolint:gosec - setEnvVariablesForCommand(cmd, yamlContent.Vars.ContentVars) - - out, err := cmd.CombinedOutput() - if err != nil { - log.Errorln("Failed to execute script: ", err) - if len(out) > 0 { - log.Errorln(string(out)) - } - return "" - } - - log.Infoln("Script executed successfully.") - return string(out) + log.Infoln("Processing script ...") + out := executeCommandWithProgress( + scriptFileName, yamlContent.Vars.Interpreter, yamlContent.Vars.ContentVars) + return out } From 6b7d715cdb546aa578571173c679a0691c4b9a6c Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Wed, 25 Oct 2023 12:59:51 +0200 Subject: [PATCH 02/10] Log buffered output in given interval Signed-off-by: Andrea Waltlova --- src/runner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/runner.go b/src/runner.go index 23dafa8..8361b9b 100644 --- a/src/runner.go +++ b/src/runner.go @@ -143,12 +143,12 @@ func executeCommandWithProgress(command string, interpreter string, variables ma case <-ticker.C: // NOTE: the output so far // TODO: this has to be sent to dispatcher - fmt.Print(bufferedOutput) - - // bufferedOutput = "" // Clear the buffer after printing - // fmt.Println("Still running...") + log.Infoln("Still running ...") + log.Infoln(bufferedOutput) case <-doneCh: // Execution is done + log.Infoln("Execution done ...") + log.Infoln(bufferedOutput) return bufferedOutput } } From d313d1675ec1e622551a83ca50e2bcebb533cdcb Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Wed, 25 Oct 2023 15:08:29 +0200 Subject: [PATCH 03/10] Add log messages and decrease buffer size Signed-off-by: Andrea Waltlova --- src/runner.go | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/src/runner.go b/src/runner.go index 8361b9b..c8d3d58 100644 --- a/src/runner.go +++ b/src/runner.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io" "os" "os/exec" "strings" @@ -91,27 +92,39 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) cmdOutput, err := cmd.StdoutPipe() if err != nil { outputCh <- fmt.Sprintf("Error: %v", err) - return - } - - if err := cmd.Start(); err != nil { - outputCh <- fmt.Sprintf("Error: %v", err) + doneCh <- true return } go func() { - defer close(outputCh) - - buf := make([]byte, 1024) + // NOTE: This could be set by config or by env variable in received yaml + var bufferSize = 512 + buf := make([]byte, bufferSize) for { + log.Infoln("Writing to buffer ...") n, err := cmdOutput.Read(buf) + log.Infoln("Buffer full ...") + if err != nil { + if err == io.EOF { + log.Infoln("Command stdout closed") + return + } + log.Infoln("Error after buffer closed") return } + log.Infoln("Writing data to output channel ...") outputCh <- string(buf[:n]) + buf = make([]byte, bufferSize) } }() + if err := cmd.Start(); err != nil { + outputCh <- fmt.Sprintf("Error: %v", err) + doneCh <- true + return + } + if err := cmd.Wait(); err != nil { log.Errorln("Failed to execute script: ", err) outputCh <- fmt.Sprintf("Error: %v", err) @@ -127,28 +140,32 @@ func executeCommandWithProgress(command string, interpreter string, variables ma cmd := exec.Command(interpreter, command) setEnvVariablesForCommand(cmd, variables) + var bufferedOutput string outputCh := make(chan string) + defer close(outputCh) doneCh := make(chan bool) + defer close(doneCh) + go runCommandWithOutput(cmd, outputCh, doneCh) ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - var bufferedOutput string - for { select { case output := <-outputCh: + // TODO: this has to be sent to dispatcher back to report to UI + // the idea is to send partial output if buffer with given size sent the output to channel + log.Info(output) + + // Append partial to all output bufferedOutput += output case <-ticker.C: - // NOTE: the output so far - // TODO: this has to be sent to dispatcher + // NOTE: If just message without output is also okay we could send just still running log.Infoln("Still running ...") - log.Infoln(bufferedOutput) case <-doneCh: // Execution is done log.Infoln("Execution done ...") - log.Infoln(bufferedOutput) return bufferedOutput } } From 0bce829c1ea887d21022af5381922b4d64ac686d Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Wed, 25 Oct 2023 15:29:44 +0200 Subject: [PATCH 04/10] Derease the byte slice for read to 256 and fix lint Signed-off-by: Andrea Waltlova --- src/runner.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/runner.go b/src/runner.go index c8d3d58..c60681a 100644 --- a/src/runner.go +++ b/src/runner.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "io" "os" @@ -98,7 +99,7 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) go func() { // NOTE: This could be set by config or by env variable in received yaml - var bufferSize = 512 + var bufferSize = 256 buf := make([]byte, bufferSize) for { log.Infoln("Writing to buffer ...") @@ -106,7 +107,7 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) log.Infoln("Buffer full ...") if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { log.Infoln("Command stdout closed") return } From f7bd023ce05660fa042acd331643b954d45aa177 Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Thu, 26 Oct 2023 08:27:55 +0200 Subject: [PATCH 05/10] change the logic little bit Signed-off-by: Andrea Waltlova --- src/runner.go | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/runner.go b/src/runner.go index c60681a..a260142 100644 --- a/src/runner.go +++ b/src/runner.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "errors" "fmt" "io" @@ -97,26 +98,28 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) return } + dataReadCh := make(chan bool) + defer close(dataReadCh) + go func() { - // NOTE: This could be set by config or by env variable in received yaml - var bufferSize = 256 - buf := make([]byte, bufferSize) + defer func() { + dataReadCh <- true + }() + + reader := bufio.NewReader(cmdOutput) + for { - log.Infoln("Writing to buffer ...") - n, err := cmdOutput.Read(buf) - log.Infoln("Buffer full ...") - - if err != nil { - if errors.Is(err, io.EOF) { - log.Infoln("Command stdout closed") - return - } - log.Infoln("Error after buffer closed") + line, err := reader.ReadString('\n') + if errors.Is(err, io.EOF) { + log.Infoln("Read ended with EOF") + break + } else if err != nil { + log.Infoln("Read ended with error", err) + outputCh <- fmt.Sprintf("Error reading from stdout: %v", err) return } - log.Infoln("Writing data to output channel ...") - outputCh <- string(buf[:n]) - buf = make([]byte, bufferSize) + log.Infoln("Read line: ", line) + outputCh <- line } }() @@ -126,6 +129,10 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) return } + // NOTE: need to block here before goroutine finishes so wait doesn't close the stdout pipe + log.Infoln("Waiting to collect all stdout from running command") + <-dataReadCh + if err := cmd.Wait(); err != nil { log.Errorln("Failed to execute script: ", err) outputCh <- fmt.Sprintf("Error: %v", err) From 482a2e86849a72f9f45d8f92b001fb773f64c9bb Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Fri, 27 Oct 2023 10:24:55 +0200 Subject: [PATCH 06/10] Try to use Peek for peeking progress if available Signed-off-by: Andrea Waltlova --- src/runner.go | 51 ++++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/runner.go b/src/runner.go index a260142..93639ac 100644 --- a/src/runner.go +++ b/src/runner.go @@ -90,10 +90,10 @@ func setEnvVariablesForCommand(cmd *exec.Cmd, variables map[string]string) { } // Runs given command and sends stdout to given channel. doneCh used to signal that execution ended. -func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) { +func runCommandWithOutput(cmd *exec.Cmd, outputCh chan []byte, doneCh chan bool) { cmdOutput, err := cmd.StdoutPipe() if err != nil { - outputCh <- fmt.Sprintf("Error: %v", err) + log.Errorln("Error: ", err) doneCh <- true return } @@ -102,29 +102,34 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) defer close(dataReadCh) go func() { - defer func() { - dataReadCh <- true - }() - reader := bufio.NewReader(cmdOutput) + readBuffer := 1024 for { - line, err := reader.ReadString('\n') - if errors.Is(err, io.EOF) { + data, err := reader.Peek(readBuffer) + switch { + case errors.Is(err, io.EOF): log.Infoln("Read ended with EOF") - break - } else if err != nil { - log.Infoln("Read ended with error", err) - outputCh <- fmt.Sprintf("Error reading from stdout: %v", err) + outputCh <- data + dataReadCh <- true return + case err == nil || errors.Is(err, io.ErrShortBuffer): + log.Infoln("Read n bytes", err) + if len(data) != 0 { + outputCh <- data + _, err := reader.Discard(len(data)) + if err != nil { + // TODO: what should I do if I want to move the reader + // If I do nothing it can only cause it to be read twice + log.Errorln("Discard failed", err) + } + } } - log.Infoln("Read line: ", line) - outputCh <- line } }() if err := cmd.Start(); err != nil { - outputCh <- fmt.Sprintf("Error: %v", err) + log.Errorln("Error: ", err) doneCh <- true return } @@ -135,7 +140,6 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan string, doneCh chan bool) if err := cmd.Wait(); err != nil { log.Errorln("Failed to execute script: ", err) - outputCh <- fmt.Sprintf("Error: %v", err) } doneCh <- true // Signal that the command has finished @@ -148,8 +152,8 @@ func executeCommandWithProgress(command string, interpreter string, variables ma cmd := exec.Command(interpreter, command) setEnvVariablesForCommand(cmd, variables) - var bufferedOutput string - outputCh := make(chan string) + var bufferedOutput []byte + outputCh := make(chan []byte) defer close(outputCh) doneCh := make(chan bool) defer close(doneCh) @@ -162,19 +166,16 @@ func executeCommandWithProgress(command string, interpreter string, variables ma for { select { case output := <-outputCh: - // TODO: this has to be sent to dispatcher back to report to UI - // the idea is to send partial output if buffer with given size sent the output to channel - log.Info(output) - - // Append partial to all output - bufferedOutput += output + bufferedOutput = append(bufferedOutput, output...) case <-ticker.C: // NOTE: If just message without output is also okay we could send just still running log.Infoln("Still running ...") + log.Infoln(string(bufferedOutput)) case <-doneCh: // Execution is done log.Infoln("Execution done ...") - return bufferedOutput + log.Infoln(string(bufferedOutput)) + return string(bufferedOutput) } } } From 3b41dfd3dca75bfa114facddfc3b69aae2207fc1 Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Mon, 30 Oct 2023 08:26:16 +0100 Subject: [PATCH 07/10] Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout flush python script stdout after every print to force 'progress' reporting Signed-off-by: Andrea Waltlova --- development/nginx/data/convert2rhel.yml | 127 +++++++++++++++++++++++- src/runner.go | 67 ++++++++----- 2 files changed, 168 insertions(+), 26 deletions(-) diff --git a/development/nginx/data/convert2rhel.yml b/development/nginx/data/convert2rhel.yml index 3ce3d89..614b409 100644 --- a/development/nginx/data/convert2rhel.yml +++ b/development/nginx/data/convert2rhel.yml @@ -11,7 +11,7 @@ import shutil import subprocess import copy - from time import gmtime, strftime + import sys from urllib2 import urlopen @@ -207,6 +207,7 @@ they come. """ print("Collecting and combining report status.") + sys.stdout.flush() action_level_combined = [] for value in actions.values(): action_level_combined.append(value["result"]["level"]) @@ -223,6 +224,7 @@ def gather_json_report(): """Collect the json report generated by convert2rhel.""" print("Collecting JSON report.") +<<<<<<< HEAD if not os.path.exists(C2R_REPORT_FILE): return {} @@ -236,6 +238,19 @@ except ValueError: # In case it is not a valid JSON content. return {} +======= + sys.stdout.flush() + with open(C2R_REPORT_FILE, "r") as handler: + data = json.load(handler) + + if not data: + print("The JSON report exists, but doesn't contain any data in it.") + sys.stdout.flush() + raise ProcessError( + message="The file '%s' doesn't contain any JSON data in it." + % C2R_REPORT_FILE + ) +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) return data @@ -280,11 +295,13 @@ def setup_convert2rhel(required_files): """Setup convert2rhel tool by downloading the required files.""" print("Downloading required files.") + sys.stdout.flush() for required_file in required_files: _create_or_restore_backup_file(required_file) response = urlopen(required_file.host) data = response.read() +<<<<<<< HEAD directory = os.path.dirname(required_file.path) if not os.path.exists(directory): print("Creating directory at '%s'" % directory) @@ -294,6 +311,34 @@ with open(required_file.path, mode="w") as handler: handler.write(data) os.chmod(required_file.path, 0o644) +======= + if os.path.exists(required_file.path): + print( + "File '%s' is already present on the system. Downloading a copy in order to check if they are the same." + % required_file.path + ) + sys.stdout.flush() + if ( + downloaded_file_sha512.hexdigest() + != required_file.sha512_on_system.hexdigest() + ): + raise ProcessError( + message="File '%s' present on the system does not match the one downloaded. Stopping the execution." + % required_file.path + ) + else: + directory = os.path.dirname(required_file.path) + if not os.path.exists(directory): + print("Creating directory at '%s'" % directory) + sys.stdout.flush() + os.makedirs(directory, mode=0o755) + + print("Writing file to destination: '%s'" % required_file.path) + sys.stdout.flush() + with open(required_file.path, mode="w") as handler: + handler.write(data) + os.chmod(required_file.path, 0o644) +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) # Code taken from @@ -318,6 +363,7 @@ if print_cmd: print("Calling command '%s'" % " ".join(cmd)) + sys.stdout.flush() process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, env=env @@ -334,17 +380,33 @@ return output, process.returncode +<<<<<<< HEAD def _get_last_yum_transaction_id(pkg_name): output, return_code = run_subprocess(["/usr/bin/yum", "history", "list", pkg_name]) if return_code: # NOTE: There is only print because list will exit with 1 when no such transaction exist +======= + def install_convert2rhel(): + """Install the convert2rhel tool to the system.""" + print("Installing & updating Convert2RHEL package.") + sys.stdout.flush() + output, returncode = run_subprocess( + ["yum", "install", "convert2rhel", "-y"], + ) + if returncode: +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) print( "Listing yum transaction history for '%s' failed with exit status '%s' and output '%s'" % (pkg_name, return_code, output), "\nThis may cause clean up function to not remove '%s' after Task run." % pkg_name, ) +<<<<<<< HEAD return None +======= + sys.stdout.flush() + raise ProcessError(message="Yum install exited with code '%s'." % returncode) +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) pattern = re.compile(r"^(\s+)?(\d+)", re.MULTILINE) matches = pattern.findall(output) @@ -370,6 +432,7 @@ output, returncode = run_subprocess( ["/usr/bin/yum", "install", c2r_pkg_name, "-y"], ) +<<<<<<< HEAD if returncode: raise ProcessError( message="Failed to install convert2rhel RPM.", @@ -388,13 +451,22 @@ ) # NOTE: If we would like to undo update we could use _get_last_yum_transaction_id(c2r_pkg_name) return False, None +======= + sys.stdout.flush() + raise ProcessError(message="Yum update exited with code '%s'." % returncode) +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) def run_convert2rhel(): """ Run the convert2rhel tool assigning the correct environment variables. """ +<<<<<<< HEAD print("Running Convert2RHEL Conversion") +======= + print("Running Convert2RHEL Analysis") + sys.stdout.flush() +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) env = {"PATH": os.environ["PATH"]} if "RHC_WORKER_CONVERT2RHEL_DISABLE_TELEMETRY" in os.environ: @@ -402,7 +474,20 @@ "RHC_WORKER_CONVERT2RHEL_DISABLE_TELEMETRY" ] +<<<<<<< HEAD return run_subprocess(["/usr/bin/convert2rhel", "-y"], env=env) +======= + output, returncode = run_subprocess( + ["/usr/bin/convert2rhel", "analyze", "-y"], env=env + ) + if returncode: + print( + "The process convert2rhel exited with code '%s' and output: %s\n" + % (returncode, output) + ) + sys.stdout.flush() + raise ProcessError(message="Convert2RHEL exited with code '%s'." % returncode) +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) def cleanup(required_files): @@ -421,6 +506,7 @@ "Removing the file '%s' as it was previously downloaded." % required_file.path ) + sys.stdout.flush() os.remove(required_file.path) _create_or_restore_backup_file(required_file) @@ -428,6 +514,7 @@ output, returncode = run_subprocess( ["/usr/bin/yum", "history", "undo", transaction_id], ) +<<<<<<< HEAD if returncode: print( "Undo of yum transaction with ID %s failed with exit status '%s' and output:\n%s" @@ -450,6 +537,26 @@ % (required_file.path, required_file.path + suffix) ) os.rename(required_file.path, required_file.path + ".backup") +======= + sys.stdout.flush() + + + def verify_required_files_are_present(required_files): + """Verify if the required files are already present on the system.""" + print("Checking if required files are present on the system.") + for required_file in required_files: + # Avoid race conditions + try: + print("Checking for file %s" % required_file.path) + sys.stdout.flush() + with open(required_file.path, mode="r") as handler: + required_file.sha512_on_system = hashlib.sha512(handler.read()) + required_file.is_file_present = True + except (IOError, OSError) as err: + print(err) + sys.stdout.flush() + required_file.is_file_present = False +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) def _generate_message_key(message, action_id): @@ -666,12 +773,30 @@ update_insights_inventory() +<<<<<<< HEAD +======= + # Generate report message and transform the raw data into entries for + # Insights. + output.message = generate_report_message(highest_level) + output.entries = transform_raw_data(data) + print("Pre-conversion assessment script finish successfully!") + sys.stdout.flush() + except ProcessError as exception: + output = OutputCollector(status="ERROR", report=exception.message) + except Exception as exception: + output = OutputCollector(status="ERROR", report=str(exception)) + finally: +>>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) print("Cleaning up modifications to the system.") + sys.stdout.flush() cleanup(required_files) print("### JSON START ###") + sys.stdout.flush() print(json.dumps(output.to_dict(), indent=4)) + sys.stdout.flush() print("### JSON END ###") + sys.stdout.flush() if __name__ == "__main__": diff --git a/src/runner.go b/src/runner.go index 93639ac..f3a29b8 100644 --- a/src/runner.go +++ b/src/runner.go @@ -2,9 +2,7 @@ package main import ( "bufio" - "errors" "fmt" - "io" "os" "os/exec" "strings" @@ -102,30 +100,50 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan []byte, doneCh chan bool) defer close(dataReadCh) go func() { - reader := bufio.NewReader(cmdOutput) - readBuffer := 1024 - - for { - data, err := reader.Peek(readBuffer) - switch { - case errors.Is(err, io.EOF): - log.Infoln("Read ended with EOF") - outputCh <- data - dataReadCh <- true - return - case err == nil || errors.Is(err, io.ErrShortBuffer): - log.Infoln("Read n bytes", err) - if len(data) != 0 { - outputCh <- data - _, err := reader.Discard(len(data)) - if err != nil { - // TODO: what should I do if I want to move the reader - // If I do nothing it can only cause it to be read twice - log.Errorln("Discard failed", err) - } - } + scanner := bufio.NewScanner(cmdOutput) + // TODO: not good for a buffer to have constant size ... + // Need to have some implementation of reseting the full byte slice + bufferSize := 1024 + bufferByteSlice := make([]byte, bufferSize) + scanner.Buffer(bufferByteSlice, bufferSize/4) + + for scanner.Scan() { + line := scanner.Text() + outputCh <- []byte(line + "\n") + + if cap(bufferByteSlice) == bufferSize { + bufferByteSlice = make([]byte, bufferSize) } } + dataReadCh <- true + + // NOTE: below code also works but it's dependent on default go buffer + // in our case it just means that the stdout is reported at the end + + // reader := bufio.NewReader(cmdOutput) + // readBuffer := 1024 + + // for { + // data, err := reader.Peek(readBuffer) + // switch { + // case errors.Is(err, io.EOF): + // log.Infoln("Read ended with EOF") + // outputCh <- data + // dataReadCh <- true + // return + // case err == nil || errors.Is(err, io.ErrShortBuffer): + // log.Infoln("Read n bytes", err) + // if len(data) != 0 { + // outputCh <- data + // _, err := reader.Discard(len(data)) + // if err != nil { + // // TODO: what should I do if I want to move the reader + // // If I do nothing it can only cause it to be read twice + // log.Errorln("Discard failed", err) + // } + // } + // } + // } }() if err := cmd.Start(); err != nil { @@ -174,7 +192,6 @@ func executeCommandWithProgress(command string, interpreter string, variables ma case <-doneCh: // Execution is done log.Infoln("Execution done ...") - log.Infoln(string(bufferedOutput)) return string(bufferedOutput) } } From 63d25e4b30dacb974bec6a9dccd96acf45ef4166 Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Mon, 30 Oct 2023 17:09:54 +0100 Subject: [PATCH 08/10] Maybe first working solution for capturing unbuffered output of script Signed-off-by: Andrea Waltlova --- src/runner.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/runner.go b/src/runner.go index f3a29b8..7a609f3 100644 --- a/src/runner.go +++ b/src/runner.go @@ -100,31 +100,40 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan []byte, doneCh chan bool) defer close(dataReadCh) go func() { - scanner := bufio.NewScanner(cmdOutput) // TODO: not good for a buffer to have constant size ... // Need to have some implementation of reseting the full byte slice + scanner := bufio.NewScanner(cmdOutput) bufferSize := 1024 - bufferByteSlice := make([]byte, bufferSize) - scanner.Buffer(bufferByteSlice, bufferSize/4) + scanner.Buffer(make([]byte, bufferSize), bufferSize/4) - for scanner.Scan() { - line := scanner.Text() - outputCh <- []byte(line + "\n") + for { + for scanner.Scan() { + line := scanner.Text() + outputCh <- []byte(line + "\n") + } - if cap(bufferByteSlice) == bufferSize { - bufferByteSlice = make([]byte, bufferSize) + if err := scanner.Err(); err == nil { + // Scanner reached EOF + break + } else { + // TODO: error bufio.Scanner: token too long but it looks like we are not loosing data this way + log.Infoln(err) + scanner = bufio.NewScanner(cmdOutput) + scanner.Buffer(make([]byte, bufferSize), bufferSize/4) } } dataReadCh <- true + ////////// + // NOTE: below code also works but it's dependent on default go buffer // in our case it just means that the stdout is reported at the end // reader := bufio.NewReader(cmdOutput) - // readBuffer := 1024 + // readNBufferBytes := 1024 // for { - // data, err := reader.Peek(readBuffer) + // data, err := reader.Peek(readNBufferBytes) // switch { // case errors.Is(err, io.EOF): // log.Infoln("Read ended with EOF") From 786cfc29047ae31798c846d4545c0fcd721c9757 Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Tue, 31 Oct 2023 08:07:59 +0100 Subject: [PATCH 09/10] Kepp only Scanner approach Signed-off-by: Andrea Waltlova --- src/runner.go | 45 +++++++-------------------------------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/src/runner.go b/src/runner.go index 7a609f3..cc2990b 100644 --- a/src/runner.go +++ b/src/runner.go @@ -100,9 +100,8 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan []byte, doneCh chan bool) defer close(dataReadCh) go func() { - // TODO: not good for a buffer to have constant size ... - // Need to have some implementation of reseting the full byte slice scanner := bufio.NewScanner(cmdOutput) + // NOTE: Not sure how to determine how big should the initial buffer be bufferSize := 1024 scanner.Buffer(make([]byte, bufferSize), bufferSize/4) @@ -113,46 +112,16 @@ func runCommandWithOutput(cmd *exec.Cmd, outputCh chan []byte, doneCh chan bool) } if err := scanner.Err(); err == nil { - // Scanner reached EOF + // Scanner reached EOF = end of stdout of script break - } else { - // TODO: error bufio.Scanner: token too long but it looks like we are not loosing data this way - log.Infoln(err) - scanner = bufio.NewScanner(cmdOutput) - scanner.Buffer(make([]byte, bufferSize), bufferSize/4) } + // TODO: test we are not loosing data + // error bufio.Scanner: token too long + // Create new scanner with empty buffer + scanner = bufio.NewScanner(cmdOutput) + scanner.Buffer(make([]byte, bufferSize), bufferSize/4) } dataReadCh <- true - - ////////// - - // NOTE: below code also works but it's dependent on default go buffer - // in our case it just means that the stdout is reported at the end - - // reader := bufio.NewReader(cmdOutput) - // readNBufferBytes := 1024 - - // for { - // data, err := reader.Peek(readNBufferBytes) - // switch { - // case errors.Is(err, io.EOF): - // log.Infoln("Read ended with EOF") - // outputCh <- data - // dataReadCh <- true - // return - // case err == nil || errors.Is(err, io.ErrShortBuffer): - // log.Infoln("Read n bytes", err) - // if len(data) != 0 { - // outputCh <- data - // _, err := reader.Discard(len(data)) - // if err != nil { - // // TODO: what should I do if I want to move the reader - // // If I do nothing it can only cause it to be read twice - // log.Errorln("Discard failed", err) - // } - // } - // } - // } }() if err := cmd.Start(); err != nil { From 864af760e923e66be934d1e91fa350fb7bec493f Mon Sep 17 00:00:00 2001 From: Andrea Waltlova Date: Thu, 8 Feb 2024 14:03:40 +0100 Subject: [PATCH 10/10] Remove playbook - edited just on local to flush stdout Signed-off-by: Andrea Waltlova --- development/nginx/data/convert2rhel.yml | 807 ------------------------ 1 file changed, 807 deletions(-) delete mode 100644 development/nginx/data/convert2rhel.yml diff --git a/development/nginx/data/convert2rhel.yml b/development/nginx/data/convert2rhel.yml deleted file mode 100644 index 614b409..0000000 --- a/development/nginx/data/convert2rhel.yml +++ /dev/null @@ -1,807 +0,0 @@ -- name: Convert2RHEL Analysis - vars: - insights_signature: | - ascii_armored gpg signature - insights_signature_exclude: "/vars/insights_signature,/vars/content_vars" - interpreter: /usr/bin/python - content: | - import json - import os - import re - import shutil - import subprocess - import copy - import sys - - from urllib2 import urlopen - - STATUS_CODE = { - "SUCCESS": 0, - "INFO": 25, - "WARNING": 51, - "SKIP": 101, - "OVERRIDABLE": 152, - "ERROR": 202, - } - # Revert the `STATUS_CODE` dictionary to map number: name instead of name: - # number as used originally. - STATUS_CODE_NAME = {number: name for name, number in STATUS_CODE.items()} - _C2R_LOG_FOLDER = "/var/log/convert2rhel" - # Path to the convert2rhel report json file. - C2R_REPORT_FILE = "%s/convert2rhel-pre-conversion.json" % _C2R_LOG_FOLDER - # Path to the convert2rhel report textual file. - C2R_REPORT_TXT_FILE = "%s/convert2rhel-pre-conversion.txt" % _C2R_LOG_FOLDER - # Path to the archive folder for convert2rhel. - C2R_ARCHIVE_DIR = "%s/archive" % _C2R_LOG_FOLDER - YUM_TRANSACTIONS_TO_UNDO = set() - - - class RequiredFile(object): - """Holds data about files needed to download convert2rhel""" - - def __init__(self, path="", host="", keep=False): - self.path = path - self.host = host - self.keep = keep - - - class ProcessError(Exception): - """Custom exception to report errors during setup and run of conver2rhel""" - - def __init__(self, message, report): - super(ProcessError, self).__init__(report) - self.message = message - self.report = report - - - class OutputCollector(object): - """Wrapper class for script expected stdout""" - - # pylint: disable=too-many-instance-attributes - # pylint: disable=too-many-arguments - # Eight and five is reasonable in this case. - - def __init__( - self, status="", message="", report="", entries=None, alert=False, error=False - ): - self.status = status - self.alert = alert # true if error true or if conversion inhibited - self.error = error # true if the script wasn't able to finish, otherwise false - self.message = message - self.report = report - self.tasks_format_version = "1.0" - self.tasks_format_id = "oamg-format" - self.entries = entries - self.report_json = None - - def to_dict(self): - # If we have entries, then we change report_json to be a dictionary - # with the needed values, otherwise, we leave it as `None` to be - # transformed to `null` in json. - if self.entries: - self.report_json = { - "tasks_format_version": self.tasks_format_version, - "tasks_format_id": self.tasks_format_id, - "entries": self.entries, - } - - return { - "status": self.status, - "alert": self.alert, - "error": self.error, - "message": self.message, - "report": self.report, - "report_json": self.report_json, - } - - - def _check_ini_file_modified(): - rpm_va_output, ini_file_not_modified = run_subprocess( - ["/usr/bin/rpm", "-Va", "convert2rhel"] - ) - - # No modifications at all - if not ini_file_not_modified: - return False - - lines = rpm_va_output.strip().split("\n") - for line in lines: - line = line.strip().split() - status = line[0].replace(".", "").replace("?", "") - path = line[-1] - - default_ini_modified = path == "/etc/convert2rhel.ini" - md5_hash_mismatch = "5" in status - - if default_ini_modified and md5_hash_mismatch: - return True - return False - - - def check_convert2rhel_inhibitors_before_run(): - """ - Conditions that must be True in order to run convert2rhel command. - """ - default_ini_path = "/etc/convert2rhel.ini" - custom_ini_path = os.path.expanduser("~/.convert2rhel.ini") - - if os.path.exists(custom_ini_path): - raise ProcessError( - message="Custom %s was found." % custom_ini_path, - report=( - "Remove the %s file by running " - "'rm -f %s' before running the Task again." - ) - % (custom_ini_path, custom_ini_path), - ) - - if _check_ini_file_modified(): - raise ProcessError( - message="According to 'rpm -Va' command %s was modified." - % default_ini_path, - report=( - "Either remove the %s file by running " - "'rm -f %s' or uninstall convert2rhel by running " - "'yum remove convert2rhel' before running the Task again." - ) - % (default_ini_path, default_ini_path), - ) - - - def get_system_distro_version(): - """Currently we execute the task only for RHEL 7 or 8""" - print("Checking OS distribution and version ID ...") - distribution_id = None - version_id = None - try: - with open("/etc/system-release", "r") as system_release_file: - data = system_release_file.readline() - match = re.search(r"(.+?)\s?(?:release\s?)?\d", data) - if match: - # Split and get the first position, which will contain the system - # name. - distribution_id = match.group(1).split()[0].lower() - - match = re.search(r".+?(\d+)\.(\d+)\D?", data) - if match: - version_id = "%s.%s" % (match.group(1), match.group(2)) - except IOError: - print("Couldn't read /etc/system-release") - - print("Detected distribution='%s' in version='%s'" % (distribution_id, version_id)) - return distribution_id, version_id - - - def is_eligible_releases(release): - eligible_releases = "7.9" - return release == eligible_releases if release else False - - - def archive_analysis_report(file): - """Archive previous json and textual report from convert2rhel""" - stat = os.stat(file) - # Get the last modified time in UTC - last_modified_at = gmtime(stat.st_mtime) - - # Format time to a human-readable format - formatted_time = strftime("%Y%m%dT%H%M%SZ", last_modified_at) - - # Create the directory if it don't exist - if not os.path.exists(C2R_ARCHIVE_DIR): - os.makedirs(C2R_ARCHIVE_DIR) - - file_name, suffix = tuple(os.path.basename(file).rsplit(".", 1)) - archive_log_file = "%s/%s-%s.%s" % ( - C2R_ARCHIVE_DIR, - file_name, - formatted_time, - suffix, - ) - shutil.move(file, archive_log_file) - - - def find_highest_report_level(actions): - """ - Gather status codes from messages and result. We are not seeking for - differences between them as we want all the results, no matter from where - they come. - """ - print("Collecting and combining report status.") - sys.stdout.flush() - action_level_combined = [] - for value in actions.values(): - action_level_combined.append(value["result"]["level"]) - for message in value["messages"]: - action_level_combined.append(message["level"]) - - valid_action_levels = [ - level for level in action_level_combined if level in STATUS_CODE - ] - valid_action_levels.sort(key=lambda status: STATUS_CODE[status], reverse=True) - return valid_action_levels[0] - - - def gather_json_report(): - """Collect the json report generated by convert2rhel.""" - print("Collecting JSON report.") -<<<<<<< HEAD - - if not os.path.exists(C2R_REPORT_FILE): - return {} - - try: - with open(C2R_REPORT_FILE, "r") as handler: - data = json.load(handler) - - if not data: - return {} - except ValueError: - # In case it is not a valid JSON content. - return {} -======= - sys.stdout.flush() - with open(C2R_REPORT_FILE, "r") as handler: - data = json.load(handler) - - if not data: - print("The JSON report exists, but doesn't contain any data in it.") - sys.stdout.flush() - raise ProcessError( - message="The file '%s' doesn't contain any JSON data in it." - % C2R_REPORT_FILE - ) ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - return data - - - def gather_textual_report(): - """ - Collect the textual report generated by convert2rhel. - - .. note:: - We are checking if file exists here as the textual report is not - that important as the JSON report for the script and for Insights. - It's fine if the textual report does not exist, but the JSON one is - required. - """ - print("Collecting TXT report.") - data = "" - if os.path.exists(C2R_REPORT_TXT_FILE): - with open(C2R_REPORT_TXT_FILE, mode="r") as handler: - data = handler.read() - return data - - - def generate_report_message(highest_status): - """Generate a report message based on the status severity.""" - message = "" - alert = False - - if STATUS_CODE[highest_status] <= STATUS_CODE["WARNING"]: - message = ( - "No problems found. The system was converted successfully. Please," - " reboot your system at your earliest convenience to make sure that" - " the system is using the RHEL Kernel." - ) - - if STATUS_CODE[highest_status] > STATUS_CODE["WARNING"]: - message = "The conversion cannot proceed. You must resolve existing issues to perform the conversion." - alert = True - - return message, alert - - - def setup_convert2rhel(required_files): - """Setup convert2rhel tool by downloading the required files.""" - print("Downloading required files.") - sys.stdout.flush() - for required_file in required_files: - _create_or_restore_backup_file(required_file) - response = urlopen(required_file.host) - data = response.read() - -<<<<<<< HEAD - directory = os.path.dirname(required_file.path) - if not os.path.exists(directory): - print("Creating directory at '%s'" % directory) - os.makedirs(directory, mode=0o755) - - print("Writing file to destination: '%s'" % required_file.path) - with open(required_file.path, mode="w") as handler: - handler.write(data) - os.chmod(required_file.path, 0o644) -======= - if os.path.exists(required_file.path): - print( - "File '%s' is already present on the system. Downloading a copy in order to check if they are the same." - % required_file.path - ) - sys.stdout.flush() - if ( - downloaded_file_sha512.hexdigest() - != required_file.sha512_on_system.hexdigest() - ): - raise ProcessError( - message="File '%s' present on the system does not match the one downloaded. Stopping the execution." - % required_file.path - ) - else: - directory = os.path.dirname(required_file.path) - if not os.path.exists(directory): - print("Creating directory at '%s'" % directory) - sys.stdout.flush() - os.makedirs(directory, mode=0o755) - - print("Writing file to destination: '%s'" % required_file.path) - sys.stdout.flush() - with open(required_file.path, mode="w") as handler: - handler.write(data) - os.chmod(required_file.path, 0o644) ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - - # Code taken from - # https://github.com/oamg/convert2rhel/blob/v1.4.1/convert2rhel/utils.py#L345 - # and modified to adapt the needs of the tools that are being executed in this - # script. - def run_subprocess(cmd, print_cmd=True, env=None): - """ - Call the passed command and optionally log the called command - (print_cmd=True) and environment variables in form of dictionary(env=None). - Switching off printing the command can be useful in case it contains a - password in plain text. - - The cmd is specified as a list starting with the command and followed by a - list of arguments. Example: ["/usr/bin/yum", "install", ""] - """ - # This check is here because we passed in strings in the past and changed - # to a list for security hardening. Remove this once everyone is - # comfortable with using a list instead. - if isinstance(cmd, str): - raise TypeError("cmd should be a list, not a str") - - if print_cmd: - print("Calling command '%s'" % " ".join(cmd)) - sys.stdout.flush() - - process = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, env=env - ) - output = "" - for line in iter(process.stdout.readline, b""): - line = line.decode("utf8") - output += line - - # Call wait() to wait for the process to terminate so that we can - # get the return code. - process.wait() - - return output, process.returncode - - -<<<<<<< HEAD - def _get_last_yum_transaction_id(pkg_name): - output, return_code = run_subprocess(["/usr/bin/yum", "history", "list", pkg_name]) - if return_code: - # NOTE: There is only print because list will exit with 1 when no such transaction exist -======= - def install_convert2rhel(): - """Install the convert2rhel tool to the system.""" - print("Installing & updating Convert2RHEL package.") - sys.stdout.flush() - output, returncode = run_subprocess( - ["yum", "install", "convert2rhel", "-y"], - ) - if returncode: ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - print( - "Listing yum transaction history for '%s' failed with exit status '%s' and output '%s'" - % (pkg_name, return_code, output), - "\nThis may cause clean up function to not remove '%s' after Task run." - % pkg_name, - ) -<<<<<<< HEAD - return None -======= - sys.stdout.flush() - raise ProcessError(message="Yum install exited with code '%s'." % returncode) ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - pattern = re.compile(r"^(\s+)?(\d+)", re.MULTILINE) - matches = pattern.findall(output) - return matches[-1][1] if matches else None - - - def _check_if_package_installed(pkg_name): - _, return_code = run_subprocess(["/usr/bin/rpm", "-q", pkg_name]) - return return_code == 0 - - - def install_convert2rhel(): - """ - Install the convert2rhel tool to the system. - Returns True and transaction ID if the c2r pkg was installed, otherwise False, None. - """ - print("Installing & updating Convert2RHEL package.") - - c2r_pkg_name = "convert2rhel" - c2r_installed = _check_if_package_installed(c2r_pkg_name) - - if not c2r_installed: - output, returncode = run_subprocess( - ["/usr/bin/yum", "install", c2r_pkg_name, "-y"], - ) -<<<<<<< HEAD - if returncode: - raise ProcessError( - message="Failed to install convert2rhel RPM.", - report="Installing convert2rhel with yum exited with code '%s' and output:\n%s" - % (returncode, output.rstrip("\n")), - ) - transaction_id = _get_last_yum_transaction_id(c2r_pkg_name) - return True, transaction_id - - output, returncode = run_subprocess(["/usr/bin/yum", "update", c2r_pkg_name, "-y"]) - if returncode: - raise ProcessError( - message="Failed to update convert2rhel RPM.", - report="Updating convert2rhel with yum exited with code '%s' and output:\n%s" - % (returncode, output.rstrip("\n")), - ) - # NOTE: If we would like to undo update we could use _get_last_yum_transaction_id(c2r_pkg_name) - return False, None -======= - sys.stdout.flush() - raise ProcessError(message="Yum update exited with code '%s'." % returncode) ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - - def run_convert2rhel(): - """ - Run the convert2rhel tool assigning the correct environment variables. - """ -<<<<<<< HEAD - print("Running Convert2RHEL Conversion") -======= - print("Running Convert2RHEL Analysis") - sys.stdout.flush() ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - env = {"PATH": os.environ["PATH"]} - - if "RHC_WORKER_CONVERT2RHEL_DISABLE_TELEMETRY" in os.environ: - env["CONVERT2RHEL_DISABLE_TELEMETRY"] = os.environ[ - "RHC_WORKER_CONVERT2RHEL_DISABLE_TELEMETRY" - ] - -<<<<<<< HEAD - return run_subprocess(["/usr/bin/convert2rhel", "-y"], env=env) -======= - output, returncode = run_subprocess( - ["/usr/bin/convert2rhel", "analyze", "-y"], env=env - ) - if returncode: - print( - "The process convert2rhel exited with code '%s' and output: %s\n" - % (returncode, output) - ) - sys.stdout.flush() - raise ProcessError(message="Convert2RHEL exited with code '%s'." % returncode) ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - - def cleanup(required_files): - """ - Cleanup the downloaded files downloaded in previous steps in this script. - - If any of the required files was already present on the system, the script - will not remove that file, as it understand that it is a system file and - not something that was downloaded by the script. - """ - for required_file in required_files: - if required_file.keep: - continue - if os.path.exists(required_file.path): - print( - "Removing the file '%s' as it was previously downloaded." - % required_file.path - ) - sys.stdout.flush() - os.remove(required_file.path) - _create_or_restore_backup_file(required_file) - - for transaction_id in YUM_TRANSACTIONS_TO_UNDO: - output, returncode = run_subprocess( - ["/usr/bin/yum", "history", "undo", transaction_id], - ) -<<<<<<< HEAD - if returncode: - print( - "Undo of yum transaction with ID %s failed with exit status '%s' and output:\n%s" - % (transaction_id, returncode, output) - ) - - - def _create_or_restore_backup_file(required_file): - """ - Either creates or restores backup files (rename in both cases). - """ - suffix = ".backup" - if os.path.exists(required_file.path + suffix): - print("Restoring backed up file %s." % (required_file.path)) - os.rename(required_file.path + suffix, required_file.path) - return - if os.path.exists(required_file.path): - print( - "File %s already present on system, backing up to %s." - % (required_file.path, required_file.path + suffix) - ) - os.rename(required_file.path, required_file.path + ".backup") -======= - sys.stdout.flush() - - - def verify_required_files_are_present(required_files): - """Verify if the required files are already present on the system.""" - print("Checking if required files are present on the system.") - for required_file in required_files: - # Avoid race conditions - try: - print("Checking for file %s" % required_file.path) - sys.stdout.flush() - with open(required_file.path, mode="r") as handler: - required_file.sha512_on_system = hashlib.sha512(handler.read()) - required_file.is_file_present = True - except (IOError, OSError) as err: - print(err) - sys.stdout.flush() - required_file.is_file_present = False ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - - - def _generate_message_key(message, action_id): - """ - Helper method to generate a key field in the message composed by action_id - and message_id. - Returns modified copy of original message. - """ - new_message = copy.deepcopy(message) - - new_message["key"] = "%s::%s" % (action_id, message["id"]) - del new_message["id"] - - return new_message - - - def _generate_detail_block(message): - """ - Helper method to generate the detail key that is composed by the - remediations and diagnosis fields. - Returns modified copy of original message. - """ - new_message = copy.deepcopy(message) - detail_block = { - "remediations": [], - "diagnosis": [], - } - - remediation_key = "remediations" if "remediations" in new_message else "remediation" - detail_block["remediations"].append( - {"context": new_message.pop(remediation_key, "")} - ) - detail_block["diagnosis"].append({"context": new_message.pop("diagnosis", "")}) - new_message["detail"] = detail_block - return new_message - - - def _rename_dictionary_key(message, new_key, old_key): - """Helper method to rename keys in a flatten dictionary.""" - new_message = copy.deepcopy(message) - new_message[new_key] = new_message.pop(old_key) - return new_message - - - def _filter_message_level(message, level): - """ - Filter for messages with specific level. If any of the message matches the - level, return None, otherwise, if it is different from what is expected, - return the message received to continue with the other transformations. - """ - if message["level"] != level: - return message - - return {} - - - def apply_message_transform(message, action_id): - """Apply the necessary data transformation to the given messages.""" - if not _filter_message_level(message, level="SUCCESS"): - return {} - - new_message = _generate_message_key(message, action_id) - new_message = _rename_dictionary_key(new_message, "severity", "level") - new_message = _rename_dictionary_key(new_message, "summary", "description") - new_message = _generate_detail_block(new_message) - - # Appending the `modifiers` key to the message here for now. Once we have - # this feature in the frontend, we can populate the data with it. - new_message["modifiers"] = [] - - return new_message - - - def transform_raw_data(raw_data): - """ - Method that will transform the raw data given and output in the expected - format. - - The expected format will be a flattened version of both results and - messages into a single - """ - new_data = [] - for action_id, result in raw_data["actions"].items(): - # Format the results as a single list - for message in result["messages"]: - new_data.append(apply_message_transform(message, action_id)) - - new_data.append(apply_message_transform(result["result"], action_id)) - - # Filter out None values before returning - return [data for data in new_data if data] - - - def update_insights_inventory() - """Call insights-client to update insights inventory.""" - print("Updating system status in Red Hat Insights.") - output, returncode = run_subprocess(cmd=["/usr/bin/insights-client"]) - - if returncode: - raise ProcessError( - message="Failed to update Insights Inventory by registering the system again.", - report="insights-client execution exited with code '%s' and output:\n%s" - % (returncode, output.rstrip("\n")), - ) - - print("System registered with insights-client successfully.") - - - def main(): - """Main entrypoint for the script.""" - if os.path.exists(C2R_REPORT_FILE): - archive_analysis_report(C2R_REPORT_FILE) - - if os.path.exists(C2R_REPORT_TXT_FILE): - archive_analysis_report(C2R_REPORT_TXT_FILE) - - output = OutputCollector() - gpg_key_file = RequiredFile( - path="/etc/pki/rpm-gpg/RPM-GPG-KEY-redhat-release", - host="https://www.redhat.com/security/data/fd431d51.txt", - ) - c2r_repo = RequiredFile( - path="/etc/yum.repos.d/convert2rhel.repo", - host="https://ftp.redhat.com/redhat/convert2rhel/7/convert2rhel.repo", - ) - required_files = [ - gpg_key_file, - c2r_repo, - ] - - try: - # Exit if not CentOS 7.9 - dist, version = get_system_distro_version() - if dist != "centos" or not is_eligible_releases(version): - raise ProcessError( - message="Conversion is only supported on CentOS 7.9 distributions.", - report='Exiting because distribution="%s" and version="%s"' - % (dist, version), - ) - - # Setup Convert2RHEL to be executed. - setup_convert2rhel(required_files) - check_convert2rhel_inhibitors_before_run() - installed, transaction_id = install_convert2rhel() - if installed: - YUM_TRANSACTIONS_TO_UNDO.add(transaction_id) - - stdout, returncode = run_convert2rhel() - - if returncode != 0: - output.message = ( - "An error occurred during the conversion execution. For details, refer to " - "the convert2rhel log file on the host at /var/log/convert2rhel/convert2rhel.log" - ) - output.report = ( - "convert2rhel execution exited with code %s and output: %s." - % (returncode, stdout.rstrip("\n")) - ) - return - - print("Conversion script finish successfully!") - except ProcessError as exception: - print(exception.report) - output = OutputCollector( - status="ERROR", - alert=True, - error=False, - message=exception.message, - report=exception.report, - ) - except Exception as exception: - print(str(exception)) - output = OutputCollector( - status="ERROR", - alert=True, - error=False, - message="An unexpected error occurred. Expand the row for more details.", - report=str(exception), - ) - finally: - # Gather JSON & Textual report - data = gather_json_report() - - if data: - highest_level = find_highest_report_level(actions=data["actions"]) - # Set the first position of the list as being the final status, - # that's needed because `find_highest_report_level` will sort out - # the list with the highest priority first. - output.status = highest_level - print(output) - - if not output.message: - # Generate report message and transform the raw data into - # entries for Insights. - output.message, output.alert = generate_report_message(highest_level) - - if "successfully" in output.message: - gpg_key_file.keep = True - - # NOTE: When c2r statistics on insights are not reliant on rpm being installed - # remove below line (=decide only based on install_convert2rhel() result) - YUM_TRANSACTIONS_TO_UNDO.remove(transaction_id) - # NOTE: Keep always because added/updated pkg is also kept - # (if repo existed, the .backup file will remain on system) - c2r_repo.keep = True - - if not output.report: - # Try to attach the textual report in the report if we have - # json report, otherwise, we would overwrite the report raised - # by the exception. - output.report = gather_textual_report() - - output.entries = transform_raw_data(data) - - update_insights_inventory() - -<<<<<<< HEAD -======= - # Generate report message and transform the raw data into entries for - # Insights. - output.message = generate_report_message(highest_level) - output.entries = transform_raw_data(data) - print("Pre-conversion assessment script finish successfully!") - sys.stdout.flush() - except ProcessError as exception: - output = OutputCollector(status="ERROR", report=exception.message) - except Exception as exception: - output = OutputCollector(status="ERROR", report=str(exception)) - finally: ->>>>>>> d5e5a80 (Change approach to use scanner.Buffer from bufio to set max allocation for that buffer for stdout) - print("Cleaning up modifications to the system.") - sys.stdout.flush() - cleanup(required_files) - - print("### JSON START ###") - sys.stdout.flush() - print(json.dumps(output.to_dict(), indent=4)) - sys.stdout.flush() - print("### JSON END ###") - sys.stdout.flush() - - - if __name__ == "__main__": - main() - content_vars: - # variables that will be handed to the script as environment vars - # will be prefixed with RHC_WORKER_* - CONVERT2RHEL_DISABLE_TELEMETRY: 1