-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdal.go
126 lines (105 loc) · 4.16 KB
/
dal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package gta
import (
	"errors"
	"time"

	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)
// taskDAL is the data-access contract for task rows. Every method receives the
// *gorm.DB handle (a plain connection or an open transaction) to run against.
// The notes below describe what the taskDALImp implementation does.
type taskDAL interface {
	// Create inserts task as a new row.
	Create(tx *gorm.DB, task *Task) error
	// Get returns the task with the given id, or (nil, nil) when no row matches.
	Get(tx *gorm.DB, id uint64) (*Task, error)
	// GetForUpdate is Get with a row lock (locking strength "UPDATE") on the
	// selected row.
	GetForUpdate(tx *gorm.DB, id uint64) (*Task, error)
	// GetInitialized returns one task in TaskStatusInitialized. Tasks whose key
	// is in sensitiveKeys qualify only if updated within the last offset; tasks
	// whose key is in insensitiveKeys qualify regardless of age. Returns
	// (nil, nil) when nothing matches.
	GetInitialized(tx *gorm.DB, sensitiveKeys []TaskKey, offset time.Duration, insensitiveKeys []TaskKey) (*Task, error)
	// GetSliceByOffsetsAndStatus lists tasks in the given status whose
	// updated_at lies between now-startOffset and now-endOffset.
	GetSliceByOffsetsAndStatus(tx *gorm.DB, startOffset, endOffset time.Duration, status TaskStatus) ([]Task, error)
	// GetSliceExcludeSucceeded pages (limit/offset) through tasks that are not
	// TaskStatusSucceeded and whose key is not in excludeKeys.
	GetSliceExcludeSucceeded(tx *gorm.DB, excludeKeys []TaskKey, limit, offset int) ([]Task, error)
	// Update writes task back and returns the number of rows affected.
	Update(tx *gorm.DB, task *Task) (int64, error)
	// UpdateStatusByIDs moves the tasks listed in ids from oriStatus to
	// newStatus and returns the number of rows affected.
	UpdateStatusByIDs(tx *gorm.DB, ids []uint64, oriStatus TaskStatus, newStatus TaskStatus) (int64, error)
	// DeleteSucceededByOffset removes succeeded tasks last updated more than
	// offset ago, skipping keys in excludeKeys; returns the rows affected.
	DeleteSucceededByOffset(tx *gorm.DB, offset time.Duration, excludeKeys []TaskKey) (int64, error)
	// DeleteByIDAndStatus removes the task with the given id only if it is
	// currently in status; returns the rows affected.
	DeleteByIDAndStatus(tx *gorm.DB, id uint64, status TaskStatus) (int64, error)
}
// taskDALImp is the gorm-backed implementation of the taskDAL methods declared
// in this file. It embeds *options; the methods read the target table name via
// s.table (presumably a field promoted from options — confirm).
type taskDALImp struct {
	*options
}
// tabledDB returns a handle derived from tx that targets the table named by
// s.table. All DAL methods build their statements on top of it.
func (s *taskDALImp) tabledDB(tx *gorm.DB) *gorm.DB {
	scoped := tx.Table(s.table)
	return scoped
}
// Create inserts task into the configured table and returns any error gorm
// reports for the statement.
func (s *taskDALImp) Create(tx *gorm.DB, task *Task) error {
	// NOTE(review): &task is a **Task; passing task directly would likely be
	// enough for gorm — confirm before changing.
	result := s.tabledDB(tx).Create(&task)
	return result.Error
}
// Get loads the task whose primary key equals id.
//
// It returns (nil, nil) when no row matches, (nil, err) for any other database
// error, and a pointer to the loaded task on success.
func (s *taskDALImp) Get(tx *gorm.DB, id uint64) (*Task, error) {
	var task Task
	err := s.tabledDB(tx).Where("id = ?", id).Take(&task).Error
	// errors.Is also recognises the sentinel when it has been wrapped, which a
	// plain == comparison would miss.
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &task, nil
}
// GetForUpdate loads the task whose primary key equals id while requesting a
// row lock with locking strength "UPDATE".
//
// It returns (nil, nil) when no row matches, (nil, err) for any other database
// error, and a pointer to the loaded task on success.
// NOTE(review): the lock is presumably only meaningful when tx is an open
// transaction — confirm at call sites.
func (s *taskDALImp) GetForUpdate(tx *gorm.DB, id uint64) (*Task, error) {
	var task Task
	err := s.tabledDB(tx).
		Where("id = ?", id).
		Clauses(clause.Locking{Strength: "UPDATE"}).
		Take(&task).
		Error
	// errors.Is also recognises the sentinel when it has been wrapped, which a
	// plain == comparison would miss.
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &task, nil
}
// GetInitialized returns one task whose status is TaskStatusInitialized.
//
// Key filtering:
//   - keys in sensitiveKeys match only rows with updated_at >= now-offset;
//   - keys in insensitiveKeys match regardless of updated_at;
//   - when both lists are empty no key filter is applied at all.
//
// It returns (nil, nil) when no row matches and (nil, err) for any other
// database error. No ordering is requested, so which matching row is returned
// is up to the database.
func (s *taskDALImp) GetInitialized(tx *gorm.DB, sensitiveKeys []TaskKey, offset time.Duration,
	insensitiveKeys []TaskKey) (*Task, error) {
	query := s.tabledDB(tx).Where("task_status = ?", TaskStatusInitialized)

	// Oldest updated_at a time-sensitive task may have and still qualify.
	cutoff := time.Now().Add(-offset)
	hasSensitive, hasInsensitive := len(sensitiveKeys) > 0, len(insensitiveKeys) > 0
	switch {
	case hasSensitive && hasInsensitive:
		query = query.Where("(updated_at >= ? AND task_key IN (?)) OR (task_key IN (?))", cutoff,
			sensitiveKeys, insensitiveKeys)
	case hasSensitive:
		query = query.Where("updated_at >= ? AND task_key IN (?)", cutoff, sensitiveKeys)
	case hasInsensitive:
		query = query.Where("task_key IN (?)", insensitiveKeys)
	}

	var task Task
	err := query.Take(&task).Error
	// errors.Is also recognises the sentinel when it has been wrapped, which a
	// plain == comparison would miss.
	if errors.Is(err, gorm.ErrRecordNotFound) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &task, nil
}
// GetSliceByOffsetsAndStatus lists the tasks in the given status whose
// updated_at falls between now-startOffset and now-endOffset. Both bounds are
// derived from a single time.Now() reading.
//
// NOTE(review): SQL BETWEEN expects the lower bound first, so rows can only
// match when startOffset >= endOffset — confirm callers pass the larger offset
// as startOffset.
func (s *taskDALImp) GetSliceByOffsetsAndStatus(tx *gorm.DB, startOffset, endOffset time.Duration,
	status TaskStatus) ([]Task, error) {
	now := time.Now()
	from := now.Add(-startOffset)
	to := now.Add(-endOffset)

	var tasks []Task
	result := s.tabledDB(tx).
		Where("task_status = ? AND updated_at BETWEEN ? AND ?", status, from, to).
		Find(&tasks)
	return tasks, result.Error
}
// GetSliceExcludeSucceeded returns one page of tasks whose status is anything
// other than TaskStatusSucceeded. When excludeKeys is non-empty, tasks with
// those keys are filtered out as well. limit and offset are passed straight to
// the query.
//
// NOTE(review): no ORDER BY is applied, so page contents presumably depend on
// the database's default row order — confirm this is acceptable for callers.
func (s *taskDALImp) GetSliceExcludeSucceeded(tx *gorm.DB, excludeKeys []TaskKey, limit, offset int) ([]Task, error) {
	query := s.tabledDB(tx).Where("task_status <> ?", TaskStatusSucceeded)
	if len(excludeKeys) != 0 {
		query = query.Where("task_key NOT IN (?)", excludeKeys)
	}
	query = query.Limit(limit).Offset(offset)

	var tasks []Task
	result := query.Find(&tasks)
	return tasks, result.Error
}
// Update hands task to gorm's Updates on the configured table and returns the
// number of rows affected together with any statement error.
//
// NOTE(review): with a struct argument gorm's Updates presumably writes only
// non-zero fields and locates the row by task's primary key — confirm against
// the gorm documentation before relying on it to clear a field.
func (s *taskDALImp) Update(tx *gorm.DB, task *Task) (int64, error) {
	result := s.tabledDB(tx).Updates(task)
	affected, err := result.RowsAffected, result.Error
	return affected, err
}
// UpdateStatusByIDs sets the status to newStatus for every task whose id is in
// ids and whose current status equals oriStatus. The status condition means
// rows that have already moved on are left alone. It returns the number of
// rows affected and any statement error.
//
// NOTE(review): the new value is supplied through a Task struct; if newStatus
// is TaskStatus's zero value gorm presumably skips the field — confirm.
func (s *taskDALImp) UpdateStatusByIDs(tx *gorm.DB, ids []uint64, oriStatus TaskStatus, newStatus TaskStatus) (int64, error) {
	patch := &Task{TaskStatus: newStatus}
	result := s.tabledDB(tx).
		Where("id IN (?) AND task_status = ?", ids, oriStatus).
		Updates(patch)
	return result.RowsAffected, result.Error
}
// DeleteSucceededByOffset deletes tasks in TaskStatusSucceeded whose updated_at
// is older than now-offset. When excludeKeys is non-empty, tasks with those
// keys are kept. It returns the number of rows affected and any statement
// error.
func (s *taskDALImp) DeleteSucceededByOffset(tx *gorm.DB, offset time.Duration, excludeKeys []TaskKey) (int64,
	error) {
	query := s.tabledDB(tx).Where("task_status = ? AND updated_at < ?", TaskStatusSucceeded, time.Now().Add(-offset))
	if len(excludeKeys) > 0 {
		query = query.Where("task_key NOT IN (?)", excludeKeys)
	}
	// Read the outcome from the handle Delete returns, not from the builder
	// handle; this matches DeleteByIDAndStatus and does not depend on gorm
	// mutating the chained instance in place.
	result := query.Delete(&Task{})
	return result.RowsAffected, result.Error
}
// DeleteByIDAndStatus deletes the task with the given id, but only if its
// current status equals status. It returns the number of rows affected (0 when
// the id is absent or the status differs) and any statement error.
func (s *taskDALImp) DeleteByIDAndStatus(tx *gorm.DB, id uint64, status TaskStatus) (int64, error) {
	var model Task
	result := s.tabledDB(tx).
		Where("task_status = ? AND id = ?", status, id).
		Delete(&model)
	return result.RowsAffected, result.Error
}