-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.go
164 lines (149 loc) · 6.24 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package gta
import (
"context"
"sync"
"time"
"github.com/panjf2000/ants/v2"
"gorm.io/gorm"
)
// TaskManager is the overall processor of task, which includes scheduler, scanner and other components
type TaskManager struct {
*options
tr taskRegister
tass taskAssembler
tsch taskScheduler
tdal taskDAL
tmon taskMonitor
tscn taskScanner
startOnce sync.Once
stopOnce sync.Once
}
// Start starts the TaskManager. This function should be called before any other functions in a TaskManager is called.
func (s *TaskManager) Start() {
s.startOnce.Do(func() {
if s.dryRun {
// don't start scan and monitor process in dry run mode
return
}
s.registerBuiltinTasks()
s.tscn.GoScanAndSchedule()
s.tmon.GoMonitorBuiltinTasks()
time.Sleep(time.Second)
})
}
// Register binds a task definition to a certain task key. Tasks of same type usually have the same task key.
//
// Task key is a unique ID for a set of tasks with same definition. Task handler should be idempotent because a task may
// be scheduled more than once in some cases.
//
// Handler must be provided in the task definition. It would be better to provide the argument type additionally, unless
// you want to use the default argument type(i.e. map[string]interface{} for struct) inside the handler.
func (s *TaskManager) Register(key TaskKey, definition TaskDefinition) {
if err := s.tr.Register(key, definition); err != nil {
panic(err)
}
}
// Run provides the ability to asynchronously run a registered task reliably. It's an alternative to using 'go func(
// ){}' when you need to care about the ultimate success of a task.
//
// An error is returned when the task creating process failed, otherwise, the task will be scheduled asynchronously
// later. If error or panic occurs in the running process, it will be rescheduled according to the 'RetryTimes' value.
// If the retry times exceeds the maximum config value, the task is marked 'failed' in the database with error logs
// recorded. In these cases, maybe a manual operation is essential.
//
// The context passed in should be consistent with the 'ctxMarshaler' value defined in the overall configuration or the
// task definition.
func (s *TaskManager) Run(ctx context.Context, key TaskKey, arg interface{}) error {
return s.Transaction(func(tx *gorm.DB) error { return s.RunWithTx(tx, ctx, key, arg) })
}
// RunWithTx makes it possible to create a task along with other database operations in the same transaction. The task
// will be scheduled if the transaction is committed successfully, or canceled if the transaction is rolled backs. Thus,
// this is a simple implement for BASE that can be used in distributed transaction situations.
//
// The task will be scheduled immediately after the transaction is committed if you use the builtin 'Transaction'
// function below. Otherwise, it will be scheduled later in the scan process.
//
// You can create more than one task in a single transaction, like this:
//
// _ = Transaction(func(tx *gorm.DB) error {
// if err:= doSomething(); err != nil{ // do something
// return err
// }
//
// if err := RunWithTx(); err != nil { // task1
// return err
// }
//
// if err := RunWithTx(); err != nil { // task2
// return err
// }
// return nil
// })
//
func (s *TaskManager) RunWithTx(tx *gorm.DB, ctx context.Context, key TaskKey, arg interface{}) error {
return s.tsch.CreateTask(tx, ctx, key, arg)
}
// Transaction wraps the 'Transaction' function of *gorm.DB, providing the ability to schedule the tasks created inside
// once the transaction is committed successfully.
func (s *TaskManager) Transaction(fc func(tx *gorm.DB) error) (err error) {
return s.tsch.Transaction(fc)
}
// Stop provides the ability to gracefully stop current running tasks. If you cannot tolerate task failure or loss in
// cases when a termination signal is received or the pod is migrated, it would be better to explicitly call this
// function before the main process exits. Otherwise, these tasks are easily to be killed and will be reported by
// abnormal task check process later.
//
// The wait parameter determines whether to wait for all running tasks to complete.
func (s *TaskManager) Stop(wait bool) {
s.stopOnce.Do(func() {
if !s.dryRun {
// send global cancel signal
s.cancel()
}
s.tsch.Stop(wait)
})
}
// // Wait blocks the current goroutine and waits for a termination signal. Stop() will be called after the termination
// // signal is received. Maybe this function is useless, because the main function is always blocked by others, like a
// // http server.
// func (s *TaskManager) Wait() {
// ch := make(chan os.Signal, 1)
// signal.Notify(ch, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)
// select {
// case <-ch:
// s.Stop(false)
// }
// }
// ForceRerunTasks changes specific tasks to 'initialized'.
func (s *TaskManager) ForceRerunTasks(taskIDs []uint64, status TaskStatus) (int64, error) {
return s.tdal.UpdateStatusByIDs(s.getDB(), taskIDs, status, TaskStatusInitialized)
}
// QueryUnsuccessfulTasks checks initialized, running or failed tasks.
func (s *TaskManager) QueryUnsuccessfulTasks(limit, offset int) ([]Task, error) {
return s.tdal.GetSliceExcludeSucceeded(s.getDB(), s.tr.GetBuiltInKeys(), limit, offset)
}
func (s *TaskManager) registerBuiltinTasks() {
registerCleanUpTask(s)
registerCheckAbnormalTask(s)
}
// NewTaskManager generates a new instance of TaskManager.
//
// The database and task table must be provided because this tool relies heavily on the database. For more information
// about the table schema, please refer to 'model.sql'.
func NewTaskManager(db *gorm.DB, table string, options ...Option) *TaskManager {
opts, err := newOptions(db, table, options...)
if err != nil {
panic(err)
}
tr := opts.taskRegister
tdal := &taskDALImp{options: opts}
tass := &taskAssemblerImp{options: opts}
pool, err := ants.NewPool(opts.poolSize, ants.WithLogger(opts.logger()), ants.WithNonblocking(true))
if err != nil {
panic(err)
}
tsch := &taskSchedulerImp{options: opts, register: tr, dal: tdal, assembler: tass, pool: pool}
tmon := &taskMonitorImp{options: opts, register: tr, dal: tdal, assembler: tass}
tscn := &taskScannerImp{options: opts, register: tr, dal: tdal, scheduler: tsch}
return &TaskManager{options: opts, tr: tr, tass: tass, tsch: tsch, tdal: tdal, tmon: tmon, tscn: tscn}
}