Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer - Fix delayed artifacts mechanism #961

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions artifactory/commands/transferfiles/delayedartifactshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func consumeAllDelayFiles(base phaseBase) error {
delayFunctions := getDelayUploadComparisonFunctions(base.repoSummary.PackageType)
if len(filesToConsume) > 0 && len(delayFunctions) > 0 {
log.Info("Starting to handle delayed artifacts uploads...")
if err = handleDelayedArtifactsFiles(filesToConsume, base, delayFunctions[1:]); err == nil {
if err = handleDelayedArtifactsFiles(filesToConsume, base, delayFunctions); err == nil {
log.Info("Done handling delayed artifacts uploads.")
}
}
Expand Down Expand Up @@ -149,32 +149,38 @@ func handleDelayedArtifactsFiles(filesToConsume []string, base phaseBase, delayU
}
return consumeDelayedArtifactsFiles(pcWrapper, filesToConsume, uploadChunkChan, base, delayHelper, errorsChannelMng)
}
delayAction := func(pBase phaseBase, addedDelayFiles []string) error {
// We call this method as a recursion in order to have inner order base on the comparison function list.
// Remove the first delay comparison function one by one to no longer delay it until the list is empty.
if len(filesToConsume) > 0 && len(delayUploadComparisonFunctions) > 0 {
return handleDelayedArtifactsFiles(addedDelayFiles, pBase, delayUploadComparisonFunctions[1:])
}
return nil
}
return manager.doTransferWithProducerConsumer(action, delayAction)
return manager.doTransferWithProducerConsumer(action, nil)
}

func consumeDelayedArtifactsFiles(pcWrapper *producerConsumerWrapper, filesToConsume []string, uploadChunkChan chan UploadedChunk, base phaseBase, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng) error {
var allDelayedArtifacts []api.FileRepresentation

// Read all delayed artifacts files
for _, filePath := range filesToConsume {
log.Debug("Handling delayed artifacts file: '" + filePath + "'")
log.Debug("Reading delayed artifacts file: '" + filePath + "'")
delayedArtifactsFile, err := readDelayFile(filePath)
if err != nil {
return err
}

shouldStop, err := uploadByChunks(delayedArtifactsFile.DelayedArtifacts, uploadChunkChan, base, delayHelper, errorsChannelMng, pcWrapper)
allDelayedArtifacts = append(allDelayedArtifacts, delayedArtifactsFile.DelayedArtifacts...)
}

// Upload delayed artifacts, if any exist
if len(allDelayedArtifacts) > 0 {
log.Info(fmt.Sprintf("Uploading %d delayed artifacts...", len(allDelayedArtifacts)))
allDelayedArtifacts = sortDelayedArtifacts(&allDelayedArtifacts, delayHelper)
// Use an empty delayUploadHelper to prevent further delays of these files.
shouldStop, err := uploadByChunks(allDelayedArtifacts, uploadChunkChan, base, delayUploadHelper{}, errorsChannelMng, pcWrapper)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a delayed artifact fails here, for example, 'conanfile.py', then the following file at the ordered delayed list is for example 'conaninfo.txt' and no issue at upload, won't it be a problem later when we try to upload It again?

if err != nil || shouldStop {
return err
}
log.Info("Done uploading delayed artifacts")
}

// Remove the file, so it won't be consumed again.
if err = os.Remove(filePath); err != nil {
// Remove the delayed artifacts files, so they won't be consumed again.
for _, filePath := range filesToConsume {
if err := os.Remove(filePath); err != nil {
return errorutils.CheckError(err)
}

Expand All @@ -183,6 +189,22 @@ func consumeDelayedArtifactsFiles(pcWrapper *producerConsumerWrapper, filesToCon
return nil
}

// Sort the delayed artifacts according to the delay functions order.
// For example, in Docker, manifest.json will be uploaded before list.manifest.json.
// delayedArtifacts - The delayed artifacts to sort
// delayHelper - Includes the should delay functions
func sortDelayedArtifacts(delayedArtifacts *[]api.FileRepresentation, delayHelper delayUploadHelper) []api.FileRepresentation {
var sortedDelayedArtifacts = []api.FileRepresentation{}
for _, shouldDelayFunction := range delayHelper.shouldDelayFunctions {
for _, delayedArtifact := range *delayedArtifacts {
if shouldDelayFunction(delayedArtifact.Name) {
sortedDelayedArtifacts = append(sortedDelayedArtifacts, delayedArtifact)
}
}
}
return sortedDelayedArtifacts
}

// Reads a delay file from a given path, parses and populate a given DelayedArtifactsFile instance with the file information
func readDelayFile(path string) (DelayedArtifactsFile, error) {
// Stores the errors read from the errors file.
Expand Down Expand Up @@ -220,7 +242,7 @@ func getDelayUploadComparisonFunctions(packageType string) []shouldDelayUpload {
switch packageType {
case maven, gradle, ivy:
return []shouldDelayUpload{func(fileName string) bool {
return filepath.Ext(fileName) == ".pom"
return filepath.Ext(fileName) == ".pom" || fileName == "pom.xml"
}}
case docker:
return []shouldDelayUpload{func(fileName string) bool {
Expand All @@ -246,7 +268,6 @@ type delayUploadHelper struct {
}

// Decide whether to delay the deployment of a file by running over the shouldDelayUpload array.
// When there are multiple levels of requirements in the deployment order, the first comparison function in the array can be removed each time in order to no longer delay by that rule.
func (delayHelper delayUploadHelper) delayUploadIfNecessary(phase phaseBase, file api.FileRepresentation) (delayed, stopped bool) {
for _, shouldDelay := range delayHelper.shouldDelayFunctions {
if ShouldStop(&phase, &delayHelper, nil) {
Expand Down
29 changes: 24 additions & 5 deletions artifactory/commands/transferfiles/delayedartifactshandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package transferfiles

import (
"fmt"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
"github.com/jfrog/jfrog-cli-core/v2/utils/tests"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/stretchr/testify/assert"
"math"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/state"
"github.com/jfrog/jfrog-cli-core/v2/utils/tests"
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/stretchr/testify/assert"
)

var delayTestRepoKey = "delay-local-repo"
Expand Down Expand Up @@ -129,3 +130,21 @@ func validateDelayedArtifactsFilesContent(t *testing.T, path string) (entitiesNu
}
return len(delayedArtifacts.DelayedArtifacts)
}

var sortDelayedArtifactsCases = []struct {
shouldDelayFunctions []shouldDelayUpload
delayedArtifacts []api.FileRepresentation
sortedDelayedArtifacts []api.FileRepresentation
}{
{[]shouldDelayUpload{}, []api.FileRepresentation{}, []api.FileRepresentation{}},
{getDelayUploadComparisonFunctions(maven), []api.FileRepresentation{{Name: "pom.xml"}}, []api.FileRepresentation{{Name: "pom.xml"}}},
{getDelayUploadComparisonFunctions(docker), []api.FileRepresentation{{Name: "list.manifest.json"}, {Name: "manifest.json"}}, []api.FileRepresentation{{Name: "manifest.json"}, {Name: "list.manifest.json"}}},
{getDelayUploadComparisonFunctions(conan), []api.FileRepresentation{{Name: "conanfile.py"}, {Name: ".timestamp"}, {Name: "conaninfo.txt"}}, []api.FileRepresentation{{Name: "conanfile.py"}, {Name: "conaninfo.txt"}, {Name: ".timestamp"}}},
}

func TestSortDelayedArtifacts(t *testing.T) {
for i, testCase := range sortDelayedArtifactsCases {
delayHelper := delayUploadHelper{shouldDelayFunctions: testCase.shouldDelayFunctions}
assert.Equal(t, testCase.sortedDelayedArtifacts, sortDelayedArtifacts(&sortDelayedArtifactsCases[i].delayedArtifacts, delayHelper))
}
}
Loading