diff --git a/go.mod b/go.mod index 29c9b22..c7f6896 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/fatih/color v1.16.0 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0-rc5 + github.com/panjf2000/ants v1.3.0 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index e3f1c65..8e7925a 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/opencontainers/runc v1.1.10 h1:EaL5WeO9lv9wmS6SASjszOeQdSctvpbu0DdBQB github.com/opencontainers/runc v1.1.10/go.mod h1:+/R6+KmDlh+hOO8NkjmgkG9Qzvypzk0yXxAPYYR65+M= github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg= github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= +github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/pkg/client/client.go b/pkg/client/client.go index ecf35ee..0d95157 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -6,13 +6,14 @@ import ( "os" "time" - "github.com/AliyunContainerService/image-syncer/pkg/utils/types" + "github.com/fatih/color" + "github.com/panjf2000/ants" + "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" "github.com/AliyunContainerService/image-syncer/pkg/concurrent" "github.com/AliyunContainerService/image-syncer/pkg/task" - "github.com/fatih/color" - "github.com/sirupsen/logrus" + "github.com/AliyunContainerService/image-syncer/pkg/utils/types" ) // Client describes a synchronization client @@ -97,7 +98,48 @@ func (c *Client) Run() error { } } - c.openRoutinesHandleTaskAndWaitForFinish() + routinePool, _ := ants.NewPoolWithFunc(c.routineNum, func(i interface{}) { + tTask, ok := i.(task.Task) + if !ok { + c.logger.Errorf("invalid task %v", i) + return + } + + nextTasks, message, err := tTask.Run() + count, total := c.taskCounter.Increase() + finishedNumString := color.New(color.FgGreen).Sprintf("%d", count) + totalNumString := color.New(color.FgGreen).Sprintf("%d", total) + + if err != nil { + c.failedTaskList.PushBack(tTask) + c.failedTaskCounter.IncreaseTotal() + c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err, + finishedNumString, totalNumString) + } else { + if tTask.Type() == task.ManifestType { + // TODO: the ignored images will not be recorded in success images list + c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String()) + } + + if len(message) != 0 { + c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message, + finishedNumString, totalNumString) + } else { + c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(), + finishedNumString, totalNumString) + } + } + + for _, t := range nextTasks { + c.taskList.PushFront(t) + c.taskCounter.IncreaseTotal() + } + }) + defer routinePool.Release() + + if err = c.handleTasks(routinePool); err != nil { + return fmt.Errorf("failed to handle tasks: %v", err) + } for times := 0; times < c.retries; times++ { c.taskCounter, c.failedTaskCounter = c.failedTaskCounter, concurrent.NewCounter(0, 0) @@ -110,7 +152,9 @@ func (c *Client) Run() error { if c.taskList.Len() != 0 { // retry to handle task c.logger.Infof("Start to retry tasks, please wait ...") - c.openRoutinesHandleTaskAndWaitForFinish() + if err = c.handleTasks(routinePool); err != nil { + return fmt.Errorf("failed to handle tasks: %v", err) + } } } @@ -144,73 +188,21 @@ func (c *Client) Run() error { return nil } -func (c *Client) openRoutinesHandleTaskAndWaitForFinish() { - broadcastChan := concurrent.NewBroadcastChan(c.routineNum) - broadcastChan.Broadcast() - - go func() { - for { - // if all the worker routines is hung and taskList is empty, stop everything - <-broadcastChan.TotalHungChan() - if c.taskList.Len() == 0 { - broadcastChan.Close() +func (c *Client) handleTasks(routinePool *ants.PoolWithFunc) error { + for { + item := c.taskList.PopFront() + // no more tasks need to handle + if item == nil { + if routinePool.Running() == 0 { + break } + time.Sleep(1 * time.Second) + continue } - }() - - concurrent.CreateRoutinesAndWaitForFinish(c.routineNum, func() { - for { - closed := broadcastChan.Wait() - - // run out of exist tasks - for { - item := c.taskList.PopFront() - // no more tasks need to handle - if item == nil { - break - } - - tTask := item.(task.Task) - - c.logger.Infof("Executing %v...", tTask.String()) - nextTasks, message, err := tTask.Run() - - count, total := c.taskCounter.Increase() - finishedNumString := color.New(color.FgGreen).Sprintf("%d", count) - totalNumString := color.New(color.FgGreen).Sprintf("%d", total) - - if err != nil { - c.failedTaskList.PushBack(tTask) - c.failedTaskCounter.IncreaseTotal() - c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err, - finishedNumString, totalNumString) - } else { - if tTask.Type() == task.ManifestType { - // TODO: the ignored images will not be recorded in success images list - c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String()) - } - - if len(message) != 0 { - c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message, - finishedNumString, totalNumString) - } else { - c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(), - finishedNumString, totalNumString) - } - } - - if nextTasks != nil { - for _, t := range nextTasks { - c.taskList.PushFront(t) - c.taskCounter.IncreaseTotal() - } - broadcastChan.Broadcast() - } - } - if closed { - return - } + if err := routinePool.Invoke(item); err != nil { + return fmt.Errorf("failed to invoke routine: %v", err) } - }) + } + return nil } diff --git a/pkg/concurrent/broadcastChan.go b/pkg/concurrent/broadcastChan.go deleted file mode 100644 index 26fdb6e..0000000 --- a/pkg/concurrent/broadcastChan.go +++ /dev/null @@ -1,54 +0,0 @@ -package concurrent - -type BroadcastChan struct { - c chan struct{} - subscriberNum int - - hungCounter *Counter - totalHungChan chan struct{} -} - -func NewBroadcastChan(subscriberNum int) *BroadcastChan { - return &BroadcastChan{ - c: make(chan struct{}, subscriberNum), - subscriberNum: subscriberNum, - - hungCounter: NewCounter(0, subscriberNum), - totalHungChan: make(chan struct{}, 1), - } -} - -func (b *BroadcastChan) Close() { - close(b.c) -} - -func (b *BroadcastChan) Broadcast() { - for i := 0; i < b.subscriberNum; i++ { - select { - case b.c <- struct{}{}: - default: - continue - } - } -} - -func (b *BroadcastChan) Wait() bool { - value, _ := b.hungCounter.Increase() - if value == b.subscriberNum { - select { - case b.totalHungChan <- struct{}{}: - default: - } - } - - defer func() { - b.hungCounter.Decrease() - }() - - _, ok := <-b.c - return !ok -} - -func (b *BroadcastChan) TotalHungChan() <-chan struct{} { - return b.totalHungChan -} diff --git a/pkg/concurrent/counter.go b/pkg/concurrent/counter.go index 4df8527..c42c1ca 100644 --- a/pkg/concurrent/counter.go +++ b/pkg/concurrent/counter.go @@ -1,24 +1,23 @@ package concurrent +import "sync" + type Counter struct { - c chan struct{} + sync.Mutex count int total int } func NewCounter(count, total int) *Counter { return &Counter{ - c: make(chan struct{}, 1), count: count, total: total, } } func (c *Counter) Decrease() (int, int) { - c.c <- struct{}{} - defer func() { - <-c.c - }() + c.Lock() + defer c.Unlock() if c.count > 0 { c.count-- @@ -27,10 +26,8 @@ func (c *Counter) Decrease() (int, int) { } func (c *Counter) Increase() (int, int) { - c.c <- struct{}{} - defer func() { - <-c.c - }() + c.Lock() + defer c.Unlock() if c.count < c.total { c.count++ @@ -39,10 +36,8 @@ func (c *Counter) Increase() (int, int) { } func (c *Counter) IncreaseTotal() (int, int) { - c.c <- struct{}{} - defer func() { - <-c.c - }() + c.Lock() + defer c.Unlock() c.total++ return c.count, c.total @@ -50,10 +45,8 @@ func (c *Counter) IncreaseTotal() (int, int) { // Value return count and total func (c *Counter) Value() (int, int) { - c.c <- struct{}{} - defer func() { - <-c.c - }() + c.Lock() + defer c.Unlock() return c.count, c.total } diff --git a/pkg/concurrent/imageList.go b/pkg/concurrent/imageList.go index 271af66..5f816bf 100644 --- a/pkg/concurrent/imageList.go +++ b/pkg/concurrent/imageList.go @@ -1,62 +1,53 @@ package concurrent import ( + "sync" + "github.com/AliyunContainerService/image-syncer/pkg/utils/types" ) type ImageList struct { - c chan struct{} + sync.Mutex content types.ImageList } func NewImageList() *ImageList { return &ImageList{ - c: make(chan struct{}, 1), content: types.ImageList{}, } } func (i *ImageList) Add(src, dst string) { - i.c <- struct{}{} - defer func() { - <-i.c - }() + i.Lock() + defer i.Unlock() i.content.Add(src, dst) } func (i *ImageList) Query(src, dst string) bool { - i.c <- struct{}{} - defer func() { - <-i.c - }() + i.Lock() + defer i.Unlock() return i.content.Query(src, dst) } func (i *ImageList) Delete(key string) { - i.c <- struct{}{} - defer func() { - <-i.c - }() + i.Lock() + defer i.Unlock() delete(i.content, key) } func (i *ImageList) Rest() { - i.c <- struct{}{} - defer func() { - <-i.c - }() + i.Lock() + defer i.Unlock() i.content = types.ImageList{} } func (i *ImageList) Content() types.ImageList { - i.c <- struct{}{} - defer func() { - <-i.c - }() + i.Lock() + defer i.Unlock() return i.content } diff --git a/pkg/concurrent/list.go b/pkg/concurrent/list.go index 7a79bc5..6772790 100644 --- a/pkg/concurrent/list.go +++ b/pkg/concurrent/list.go @@ -2,25 +2,23 @@ package concurrent import ( "container/list" + "sync" ) type List struct { - c chan struct{} + sync.Mutex items *list.List } func NewList() *List { return &List{ - c: make(chan struct{}, 1), items: list.New(), } } func (l *List) PopFront() any { - l.c <- struct{}{} - defer func() { - <-l.c - }() + l.Lock() + defer l.Unlock() item := l.items.Front() if item != nil { @@ -32,47 +30,43 @@ func (l *List) PopFront() any { } func (l *List) PushBack(value any) { - l.c <- struct{}{} - defer func() { - <-l.c - }() + l.Lock() + defer l.Unlock() l.items.PushBack(value) } func (l *List) PushFront(value any) { - l.c <- struct{}{} - defer func() { - <-l.c - }() + l.Lock() + defer l.Unlock() l.items.PushFront(value) } func (l *List) PushBackList(other *List) { - l.c <- struct{}{} - defer func() { - <-l.c - }() + l.Lock() + defer l.Unlock() l.items.PushBackList(other.GetItems()) } func (l *List) GetItems() *list.List { - l.c <- struct{}{} - defer func() { - <-l.c - }() + l.Lock() + defer l.Unlock() return l.items } func (l *List) Reset() { - close(l.c) - l.c = make(chan struct{}, 1) + l.Lock() + defer l.Unlock() + l.items.Init() } func (l *List) Len() int { + l.Lock() + defer l.Unlock() + return l.items.Len() } diff --git a/pkg/task/rule.go b/pkg/task/rule.go index c9cdb3e..c0b721e 100644 --- a/pkg/task/rule.go +++ b/pkg/task/rule.go @@ -43,6 +43,12 @@ func NewRuleTask(source, destination string, } func (r *RuleTask) Run() ([]Task, string, error) { + //// random failure test + //rand.Seed(time.Now().UnixNano()) + //if rand.Intn(100)%2 == 1 { + // return nil, "", fmt.Errorf("random failure") + //} + // if source tag is not specific, get all tags of this source repo sourceURLs, err := utils.GenerateRepoURLs(r.source, r.listAllTags) if err != nil {