diff --git a/data/var/run/data-manager/task/task.json b/data/var/run/data-manager/task/task.json deleted file mode 100644 index c9c513b..0000000 --- a/data/var/run/data-manager/task/task.json +++ /dev/null @@ -1,3 +0,0 @@ -[ - -] \ No newline at end of file diff --git a/data/var/run/data-manager/task/task_err.json b/data/var/run/data-manager/task/task_err.json new file mode 100644 index 0000000..e69de29 diff --git a/go.mod b/go.mod index 38a6ce6..cf219a8 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,8 @@ require ( ) require ( + github.com/go-co-op/gocron v1.37.0 + github.com/rs/zerolog v1.33.0 github.com/sirupsen/logrus v1.9.3 github.com/swaggo/swag v1.16.3 ) @@ -51,7 +53,7 @@ require ( github.com/labstack/gommon v0.4.2 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/rs/zerolog v1.33.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/swaggo/files/v2 v2.0.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect @@ -60,6 +62,7 @@ require ( go.opentelemetry.io/otel v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.uber.org/atomic v1.9.0 // indirect golang.org/x/tools v0.24.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 52d3779..254a2aa 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -84,8 +85,12 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0= +github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -135,6 +140,7 @@ github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1 github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM= github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= @@ -151,8 +157,13 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= @@ -169,10 +180,15 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -190,10 +206,12 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/swaggo/echo-swagger v1.4.1 h1:Yf0uPaJWp1uRtDloZALyLnvdBeoEL5Kc7DtnjzO/TUk= @@ -231,6 +249,8 @@ go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBq go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -330,8 +350,10 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/auth/nrdb.go b/internal/auth/nrdb.go index d502ba1..dfda194 100644 --- a/internal/auth/nrdb.go +++ b/internal/auth/nrdb.go @@ -89,10 +89,6 @@ func ExportNRDMFunc(params *models.CommandTask) error { logrus.Errorf("NRDBController error importing into nrdbms : %v", err) return err } - if err != nil { - logrus.Errorf("NRDBController error exporting into rdbms : %v", err) - return err - } tableList, err := NRDBC.ListTables() if err != nil { diff --git a/internal/execfunc/dummycreate.go b/internal/execfunc/dummycreate.go index 7319023..1729a63 100644 --- a/internal/execfunc/dummycreate.go +++ b/internal/execfunc/dummycreate.go @@ -34,6 +34,15 @@ func DummyCreate(params models.CommandTask) error { } logrus.Infof("successfully generated sql : %s", params.DummyPath) } + logrus.Info("start Serversql generation Boolean? :", (cast.ToInt(params.SizeServerSQL) != 0)) + if cast.ToInt(params.SizeServerSQL) != 0 { + logrus.Info("start Serversql generation") + if err := structured.GenerateRandomSQLWithServer(params.DummyPath, cast.ToInt(params.SizeServerSQL)); err != nil { + logrus.Error("failed to generate sql") + return err + } + logrus.Infof("successfully generated sql : %s", params.DummyPath) + } if cast.ToInt(params.SizeCSV) != 0 { logrus.Info("start csv generation") diff --git a/internal/zerolog/logger.go b/internal/zerolog/logger.go index 3940e0b..fb02f13 100644 --- a/internal/zerolog/logger.go +++ b/internal/zerolog/logger.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package log +package zlog import ( "fmt" diff --git a/main.go b/main.go index 87edc5f..86db9cb 100644 --- a/main.go +++ b/main.go @@ -17,12 +17,8 @@ package main import ( "github.com/cloud-barista/mc-data-manager/cmd" - log "github.com/cloud-barista/mc-data-manager/internal/zerolog" ) func main() { - log.Info("Data Manager", "Startup", "Is starting") cmd.Execute() - log.Info("Data Manager", "Shutdown", "Is shutting down") - } diff --git a/models/basicTask.go b/models/basicTask.go index d3bb4cb..4be2655 100644 --- a/models/basicTask.go +++ b/models/basicTask.go @@ -3,7 +3,9 @@ package models type OperationParams struct { OperationId string `json:"operationId" form:"operationId"` } - +type TagParams struct { + Tag []string `json:"tag,omitempty"` +} type TaskMeta struct { ServiceType CloudServiceType `json:"serviceType"` TaskType TaskType `json:"taskType" ` @@ -12,27 +14,43 @@ type TaskMeta struct { Description string `json:"description,omitempty"` } +type BasicTask struct { + TaskMeta `json:"meta,omitempty" swaggerignore:"true"` + Status `json:"status,omitempty" swaggerignore:"true"` +} + type Task struct { OperationParams - TaskMeta `json:"meta,omitempty" swaggerignore:"true"` - Status `json:"status,omitempty" swaggerignore:"true"` // active, inactive, etc. + TagParams + BasicTask +} + +type BasicFlow struct { + FlowID string `json:"FlowID,omitempty"` + FlowName string `json:"FlowName"` + Tasks []DataTask `json:"tasks"` + Status Status `json:"status"` } type Flow struct { OperationParams - FlowID string `json:"flowId,omitempty"` - FlowName string `json:"flowName"` - Tasks []interface{} `json:"tasks"` // List of tasks in the flow - Status Status `json:"status"` // active, inactive, etc. + BasicFlow +} + +type BasicSchedule struct { + ScheduleID string `json:"ScheduleID,omitempty"` + ScheduleName string `json:"ScheduleName"` + Tasks []DataTask `json:"tasks"` + Cron string `json:"cron"` + TimeZone string `json:"tz"` + + Status Status `json:"status"` } type Schedule struct { OperationParams - ScheduleID string `json:"scheduleId,omitempty"` - FlowID string `json:"flowId,omitempty"` // Optional, if scheduling a flow - TaskID string `json:"taskId,omitempty"` // Optional, if scheduling a task - Cron string `json:"cron"` // Cron expression for scheduling - Status Status `json:"status"` // active, inactive, etc. + TagParams + BasicSchedule } type GenarateTask struct { @@ -53,16 +71,25 @@ type GenTaskTarget struct { ProviderConfig GenFileParams } - -type DataTask struct { - Task +type BasicDataTask struct { + BasicTask + Directory string `json:"Directory,omitempty" swaggerignore:"true"` SourcePoint ProviderConfig `json:"sourcePoint,omitempty"` TargetPoint ProviderConfig `json:"targetPoint,omitempty"` } +type DataTask struct { + OperationParams + BasicDataTask +} type MigrateTask struct { DataTask } +type BasicBackupTask struct { + BasicTask + SourcePoint ProviderConfig `json:"sourcePoint,omitempty"` + TargetPoint ProviderConfig `json:"targetPoint,omitempty"` +} type BackupTask struct { DataTask } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 7e78b68..2107e82 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -16,7 +16,9 @@ limitations under the License. package utils import ( + "fmt" "os" + "time" "github.com/cloud-barista/mc-data-manager/models" ) @@ -48,6 +50,7 @@ func FileExists(filePath string) bool { return false } +// Enum Validation func IsValidStatus(s models.Status) bool { switch s { case models.StatusActive, models.StatusInactive, models.StatusPending, models.StatusFailed, models.StatusCompleted: @@ -71,3 +74,16 @@ func IsValidTaskType(s models.TaskType) bool { } return false } + +// GEN ID +func GenerateTaskID(opId string, index int) string { + return fmt.Sprintf("%s-task-%d-%s", opId, index, time.Now().Format("20060102-150405")) +} + +func GenerateFlowID(opId string) string { + return fmt.Sprintf("%s-flow-%s", opId, time.Now().Format("20060102-150405")) +} + +func GenerateScheduleID(opId string) string { + return fmt.Sprintf("%s-schedule-%s", opId, time.Now().Format("20060102-150405")) +} diff --git a/service/task/task.go b/service/task/task.go index 96928cd..52c9269 100644 --- a/service/task/task.go +++ b/service/task/task.go @@ -1,127 +1,794 @@ -package models +package task import ( + "encoding/json" "errors" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" "sync" + "time" + "github.com/cloud-barista/mc-data-manager/internal/auth" + "github.com/cloud-barista/mc-data-manager/internal/execfunc" "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/pkg/utils" + "github.com/cloud-barista/mc-data-manager/service/nrdbc" + "github.com/cloud-barista/mc-data-manager/service/osc" + "github.com/cloud-barista/mc-data-manager/service/rdbc" + "github.com/go-co-op/gocron" + "github.com/sirupsen/logrus" ) -type Task = models.Task -type Flow = models.Flow -type Schedule = models.Schedule +var ( + managerInstance *FileScheduleManager + once sync.Once +) -type TaskService struct { - tasks map[string]Task - flows map[string]Flow - schedules map[string]Schedule +// FileScheduleManager manages task schedules, flows, and tasks. +type FileScheduleManager struct { + tasks []models.DataTask + flows []models.Flow + schedules []models.Schedule mu sync.Mutex + filename string + scheduler *gocron.Scheduler } -func NewTaskService() *TaskService { - return &TaskService{ - tasks: make(map[string]Task), - flows: make(map[string]Flow), - schedules: make(map[string]Schedule), +// InitFileScheduleManager initializes the singleton instance of FileScheduleManager. +func InitFileScheduleManager() *FileScheduleManager { + once.Do(func() { + filename := "./data/var/run/data-manager/task/task.json" + + managerInstance = &FileScheduleManager{ + tasks: make([]models.DataTask, 0), + flows: make([]models.Flow, 0), + schedules: make([]models.Schedule, 0), + filename: filename, + scheduler: gocron.NewScheduler(time.UTC), + } + + if err := managerInstance.loadFromFile(); err != nil { + logrus.Errorf("Failed to load tasks from file: %v", err) + managerInstance = nil + return + } + + managerInstance.StartScheduler() + }) + + if managerInstance == nil { + logrus.Error("FileScheduleManager initialization failed") } + return managerInstance } -func (s *TaskService) CreateTask(task Task) { - s.mu.Lock() - defer s.mu.Unlock() - s.tasks[task.TaskID] = task +// StartScheduler starts the gocron scheduler. +func (m *FileScheduleManager) StartScheduler() { + m.scheduler.StartAsync() } -func (s *TaskService) GetTask(taskID string) (Task, error) { - s.mu.Lock() - defer s.mu.Unlock() - task, exists := s.tasks[taskID] - if !exists { - return Task{}, errors.New("task not found") - } - return task, nil +// StopScheduler stops the gocron scheduler. +func (m *FileScheduleManager) StopScheduler() { + m.scheduler.Stop() } -func (s *TaskService) GetTaskList() []Task { - s.mu.Lock() - defer s.mu.Unlock() - tasksList := make([]Task, 0, len(s.tasks)) - for _, task := range s.tasks { - tasksList = append(tasksList, task) +// loadFromFile loads the schedules from the specified file. +func (m *FileScheduleManager) loadFromFile() error { + m.mu.Lock() + defer m.mu.Unlock() + + file, err := os.Open(m.filename) + if err != nil { + if os.IsNotExist(err) { + logrus.Warnf("Task file %s does not exist, skipping load", m.filename) + return nil + } + return fmt.Errorf("failed to open task file %s: %w", m.filename, err) } - return tasksList -} + defer file.Close() + + decoder := json.NewDecoder(file) + data := struct { + Tasks []models.DataTask `json:"tasks"` + Flows []models.Flow `json:"flows"` + Schedules []models.Schedule `json:"schedules"` + }{} + + err = decoder.Decode(&data) + if err != nil { + logrus.Errorf("Failed to decode task file %s: %v. Saving corrupted file as task_err.json and skipping load.", m.filename, err) + + // Create a backup of the corrupted file as task_err.json + err = backupAndRemoveCorruptedFile(m.filename) + if err != nil { + return fmt.Errorf("failed to backup and remove corrupted file: %w", err) + } -func (s *TaskService) UpdateTask(task Task) error { - s.mu.Lock() - defer s.mu.Unlock() - _, exists := s.tasks[task.TaskID] - if !exists { - return errors.New("task not found") + return nil } - s.tasks[task.TaskID] = task + + m.tasks = data.Tasks + m.flows = data.Flows + m.schedules = data.Schedules + + for _, schedule := range m.schedules { + _, err := m.scheduler.Cron(schedule.Cron).Tag(schedule.ScheduleID).Do(m.RunTasks, schedule.Tasks) + if err != nil { + return fmt.Errorf("failed to schedule tasks for schedule %s: %w", schedule.ScheduleID, err) + } + } + + logrus.Infof("Successfully loaded and scheduled %d tasks from %s", len(m.schedules), m.filename) return nil } -func (s *TaskService) DeleteTask(taskID string) error { - s.mu.Lock() - defer s.mu.Unlock() - _, exists := s.tasks[taskID] - if !exists { - return errors.New("task not found") +func backupAndRemoveCorruptedFile(srcFilename string) error { + // Define the backup filename + backupFilename := filepath.Join(filepath.Dir(srcFilename), "task_err.json") + + // Open the source file + srcFile, err := os.Open(srcFilename) + if err != nil { + return fmt.Errorf("failed to open source file %s: %w", srcFilename, err) } - delete(s.tasks, taskID) + defer srcFile.Close() + + // Create the destination backup file + destFile, err := os.Create(backupFilename) + if err != nil { + return fmt.Errorf("failed to create destination file %s: %w", backupFilename, err) + } + defer destFile.Close() + + // Copy the contents from the source file to the destination file + _, err = io.Copy(destFile, srcFile) + if err != nil { + return fmt.Errorf("failed to copy data from %s to %s: %w", srcFilename, backupFilename, err) + } + + // Close files before removing the original file + srcFile.Close() + destFile.Close() + + // Remove the original corrupted file + err = os.Remove(srcFilename) + if err != nil { + return fmt.Errorf("failed to remove the original file %s: %w", srcFilename, err) + } + return nil } -func (s *TaskService) CreateFlow(flow Flow) { - s.mu.Lock() - defer s.mu.Unlock() - s.flows[flow.FlowID] = flow +// saveToFile saves the schedules to the specified file. +func (m *FileScheduleManager) saveToFile() error { + + data := struct { + Tasks []models.DataTask `json:"tasks"` + Flows []models.Flow `json:"flows"` + Schedules []models.Schedule `json:"schedules"` + }{ + Tasks: m.tasks, + Flows: m.flows, + Schedules: m.schedules, + } + // Ensure the directory exists + dir := filepath.Dir(m.filename) + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return fmt.Errorf("failed to create directories %s: %w", dir, err) + } + + file, err := os.Create(m.filename) + if err != nil { + return err + } + defer file.Close() + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + return encoder.Encode(&data) +} + +// CreateSchedule creates a new schedule, saves it to the file, and registers it with the scheduler. +func (m *FileScheduleManager) CreateSchedule(schedule models.Schedule) error { + m.mu.Lock() + defer m.mu.Unlock() + + if schedule.OperationId == "" { + return errors.New("OperationId is required") + } + + schedule.ScheduleID = utils.GenerateScheduleID(schedule.OperationId) + + for i, task := range schedule.Tasks { + task.TaskMeta.TaskID = utils.GenerateTaskID(schedule.OperationId, i) + m.tasks = append(m.tasks, task) + } + + m.schedules = append(m.schedules, schedule) + + // Register the schedule with gocron using the Cron expression + if schedule.TimeZone != "" { + loc, err := time.LoadLocation(schedule.TimeZone) + if err != nil { + return fmt.Errorf("invalid time zone: %v", err) + } + m.scheduler.ChangeLocation(loc) + } else { + m.scheduler.ChangeLocation(time.UTC) // Default to UTC if no time zone is specified + } + + _, err := m.scheduler.Cron(schedule.Cron).Tag(schedule.ScheduleID).Do(m.RunTasks, schedule.Tasks) + if err != nil { + return fmt.Errorf("failed to schedule tasks: %v", err) + } + + return m.saveToFile() +} + +// GetSchedule retrieves a schedule by its ID or OperationID. +func (m *FileScheduleManager) GetSchedule(id string) (models.Schedule, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Try to find by ScheduleID + for _, schedule := range m.schedules { + if schedule.ScheduleID == id || schedule.OperationId == id { + return schedule, nil + } + } + + return models.Schedule{}, errors.New("schedule not found") +} + +// GetScheduleList retrieves a list of all schedules. +func (m *FileScheduleManager) GetScheduleList() ([]models.Schedule, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.schedules, nil +} + +// UpdateSchedule updates an existing schedule by ScheduleID or OperationID. +func (m *FileScheduleManager) UpdateSchedule(id string, updatedSchedule models.Schedule) error { + m.mu.Lock() + defer m.mu.Unlock() + + for i, schedule := range m.schedules { + if schedule.ScheduleID == id || schedule.OperationId == id { + // Remove the existing schedule from gocron + m.scheduler.RemoveByTag(schedule.ScheduleID) + + // Update the schedule details + updatedSchedule.ScheduleID = schedule.ScheduleID + m.schedules[i] = updatedSchedule + + // Clear the existing tasks associated with this schedule + m.tasks = []models.DataTask{} + + // Iterate over the tasks and unmarshal them into DataTask objects + for j, task := range updatedSchedule.Tasks { + // Assign a new TaskID to each task + task.TaskMeta.TaskID = utils.GenerateTaskID(schedule.ScheduleID, j) + m.tasks = append(m.tasks, task) + } + + // Re-register the updated schedule with gocron + _, err := m.scheduler.Cron(updatedSchedule.Cron).Tag(updatedSchedule.ScheduleID).Do(m.RunTasks, updatedSchedule.Tasks) + if err != nil { + return fmt.Errorf("failed to schedule tasks: %v", err) + } + + return m.saveToFile() + } + } + + return errors.New("schedule not found") +} + +// DeleteSchedule deletes a schedule by ScheduleID or OperationID. +func (m *FileScheduleManager) DeleteSchedule(id string) error { + m.mu.Lock() + defer m.mu.Unlock() + + for i, schedule := range m.schedules { + if schedule.ScheduleID == id || schedule.OperationId == id { + // Remove the schedule from gocron + m.scheduler.RemoveByTag(schedule.ScheduleID) + + // Delete the schedule from the internal lists + m.schedules = append(m.schedules[:i], m.schedules[i+1:]...) + + for j, flow := range m.flows { + if flow.OperationId == schedule.OperationId { + m.flows = append(m.flows[:j], m.flows[j+1:]...) + break + } + } + + for j, task := range m.tasks { + if task.TaskMeta.TaskID == schedule.ScheduleID { + m.tasks = append(m.tasks[:j], m.tasks[j+1:]...) + break + } + } + + return m.saveToFile() + } + } + + return errors.New("schedule not found") +} + +// runTasks executes the tasks associated with a schedule. +func (m *FileScheduleManager) RunTasks(tasks []models.DataTask) { + for _, task := range tasks { + // Call the handleTask function to process the task + task.Status = handleTask(task.ServiceType, task.TaskType, task) + m.updateTaskStatus(task) + + } + err := m.saveToFile() + if err != nil { + fmt.Printf("Error saving tasks to file: %v\n", err) + } +} + +// handleTask is a function that processes a task based on its ServiceType and TaskType. +func handleTask(serviceType models.CloudServiceType, taskType models.TaskType, params models.DataTask) models.Status { + + var taskStatus models.Status + + switch serviceType { + + case "objectStorage": + switch taskType { + case "generate": + taskStatus = handleGenTest(params) + case "migrate": + taskStatus = handleObjectStorageMigrateTask(params) + case "backup": + taskStatus = handleObjectStorageBackupTask(params) + case "restore": + taskStatus = handleObjectStorageRestoreTask(params) + default: + fmt.Printf("Error: Unknown TaskType: %s for ServiceType: %s\n", taskType, serviceType) + taskStatus = models.StatusFailed + } + case "rdbms": + switch taskType { + case "generate": + taskStatus = handleGenTest(params) + case "migrate": + taskStatus = handleRDBMSMigrateTask(params) + case "backup": + taskStatus = handleRDBMSBackupTask(params) + case "restore": + taskStatus = handleRDBMSRestoreTask(params) + default: + fmt.Printf("Error: Unknown TaskType: %s for ServiceType: %s\n", taskType, serviceType) + taskStatus = models.StatusFailed + + } + case "nrdbms": + switch taskType { + case "generate": + taskStatus = handleGenTest(params) + case "migrate": + taskStatus = handleNRDBMSMigrateTask(params) + case "backup": + taskStatus = handleNRDBMSBackupTask(params) + case "restore": + taskStatus = handleNRDBMSRestoreTask(params) + default: + fmt.Printf("Error: Unknown TaskType: %s for ServiceType: %s\n", taskType, serviceType) + taskStatus = models.StatusFailed + + } + default: + fmt.Printf("Error: Unknown ServiceType: %s\n", serviceType) + taskStatus = models.StatusFailed + + } + + return taskStatus +} + +func handleGenTest(params models.DataTask) models.Status { + logrus.Infof("Handling object storage Gen task") + _ = params + var cParams models.CommandTask + cParams.SizeServerSQL = "1" + cParams.DummyPath = "./tmp/Schedule/dummy" + execfunc.DummyCreate(cParams) + return models.StatusCompleted } -func (s *TaskService) GetFlow(flowID string) (Flow, error) { - s.mu.Lock() - defer s.mu.Unlock() - flow, exists := s.flows[flowID] - if !exists { - return Flow{}, errors.New("flow not found") +func handleObjectStorageMigrateTask(params models.DataTask) models.Status { + fmt.Println("Handling object storage migrate task") + + var src *osc.OSController + var srcErr error + var dst *osc.OSController + var dstErr error + + logrus.Infof("Source Information") + src, srcErr = auth.GetOS(¶ms.SourcePoint) + if srcErr != nil { + logrus.Errorf("OSController error migration into objectstorage : %v", srcErr) + return models.StatusFailed + } + logrus.Infof("Target Information") + dst, dstErr = auth.GetOS(¶ms.TargetPoint) + if dstErr != nil { + logrus.Errorf("OSController error migration into objectstorage : %v", dstErr) + return models.StatusFailed + } + + logrus.Info("Launch OSController Copy") + if err := src.Copy(dst); err != nil { + logrus.Errorf("Copy error copying into objectstorage : %v", err) + return models.StatusFailed + } + logrus.Info("successfully migrationed") + return models.StatusCompleted +} + +func handleObjectStorageBackupTask(params models.DataTask) models.Status { + fmt.Println("Handling object storage backup task") + var OSC *osc.OSController + var err error + logrus.Infof("User Information") + OSC, err = auth.GetOS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("OSController error importing into objectstorage : %v", err) + return models.StatusFailed + } + + logrus.Info("Launch OSController MGet") + if err := OSC.MGet(params.Directory); err != nil { + logrus.Errorf("MGet error exporting into objectstorage : %v", err) + return models.StatusFailed + } + logrus.Infof("successfully backup : %s", params.Directory) + return models.StatusCompleted +} + +func handleObjectStorageRestoreTask(params models.DataTask) models.Status { + fmt.Println("Handling object storage restore task") + var OSC *osc.OSController + var err error + logrus.Infof("User Information") + OSC, err = auth.GetOS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("OSController error importing into objectstorage : %v", err) + return models.StatusFailed + } + + logrus.Info("Launch OSController MGet") + if err := OSC.MPut(params.SourcePoint.Path); err != nil { + logrus.Errorf("MPut error importing into objectstorage : %v", err) + return models.StatusFailed + } + logrus.Infof("successfully restore : %s", params.Directory) + return models.StatusCompleted +} + +func handleRDBMSMigrateTask(params models.DataTask) models.Status { + fmt.Println("Handling RDBMS migrate task") + var srcRDBC *rdbc.RDBController + var srcErr error + var dstRDBC *rdbc.RDBController + var dstErr error + logrus.Infof("Source Information") + srcRDBC, srcErr = auth.GetRDMS(¶ms.SourcePoint) + if srcErr != nil { + logrus.Errorf("RDBController error migration into rdbms : %v", srcErr) + return models.StatusFailed + } + logrus.Infof("Target Information") + dstRDBC, dstErr = auth.GetRDMS(¶ms.TargetPoint) + if dstErr != nil { + logrus.Errorf("RDBController error migration into rdbms : %v", dstErr) + return models.StatusFailed + } + + logrus.Info("Launch RDBController Copy") + if err := srcRDBC.Copy(dstRDBC); err != nil { + logrus.Errorf("Copy error copying into rdbms : %v", err) + return models.StatusFailed + } + logrus.Info("successfully migrationed") + return models.StatusCompleted + +} + +func handleRDBMSBackupTask(params models.DataTask) models.Status { + fmt.Println("Handling RDBMS backup task") + var RDBC *rdbc.RDBController + var err error + logrus.Infof("User Information") + RDBC, err = auth.GetRDMS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("RDBController error importing into rdbms : %v", err) + return models.StatusFailed + } + + err = os.MkdirAll(params.Directory, 0755) + if err != nil { + logrus.Errorf("MkdirAll error : %v", err) + return models.StatusFailed + } + + dbList := []string{} + if err := RDBC.ListDB(&dbList); err != nil { + logrus.Errorf("ListDB error : %v", err) + return models.StatusFailed + } + + var sqlData string + for _, db := range dbList { + sqlData = "" + logrus.Infof("Export start: %s", db) + if err := RDBC.Get(db, &sqlData); err != nil { + logrus.Errorf("Get error : %v", err) + return models.StatusFailed + } + + file, err := os.Create(filepath.Join(params.Directory, fmt.Sprintf("%s.sql", db))) + if err != nil { + logrus.Errorf("File create error : %v", err) + return models.StatusFailed + } + defer file.Close() + + _, err = file.WriteString(sqlData) + if err != nil { + logrus.Errorf("File write error : %v", err) + return models.StatusFailed + } + logrus.Infof("successfully exported : %s", file.Name()) + file.Close() + } + logrus.Infof("successfully backup : %s", params.Directory) + return models.StatusCompleted + +} + +func handleRDBMSRestoreTask(params models.DataTask) models.Status { + fmt.Println("Handling RDBMS restore task") + var RDBC *rdbc.RDBController + var err error + logrus.Infof("User Information") + RDBC, err = auth.GetRDMS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("RDBController error importing into rdbms : %v", err) + return models.StatusFailed + } + + sqlList := []string{} + err = filepath.Walk(params.Directory, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == ".sql" { + sqlList = append(sqlList, path) + } + return nil + }) + if err != nil { + logrus.Errorf("Walk error : %v", err) + return models.StatusFailed + } + + for _, sqlPath := range sqlList { + data, err := os.ReadFile(sqlPath) + if err != nil { + logrus.Errorf("ReadFile error : %v", err) + return models.StatusFailed + } + logrus.Infof("Import start: %s", sqlPath) + if err := RDBC.Put(string(data)); err != nil { + logrus.Error("Put error importing into rdbms") + return models.StatusFailed + } + logrus.Infof("Import success: %s", sqlPath) + } + logrus.Infof("successfully restore : %s", params.Directory) + return models.StatusCompleted + +} + +func handleNRDBMSMigrateTask(params models.DataTask) models.Status { + fmt.Println("Handling NRDBMS migrate task") + var srcNRDBC *nrdbc.NRDBController + var srcErr error + var dstNRDBC *nrdbc.NRDBController + var dstErr error + logrus.Infof("Source Information") + srcNRDBC, srcErr = auth.GetNRDMS(¶ms.SourcePoint) + if srcErr != nil { + logrus.Errorf("NRDBController error migration into nrdbms : %v", srcErr) + return models.StatusFailed + } + logrus.Infof("Target Information") + dstNRDBC, dstErr = auth.GetNRDMS(¶ms.TargetPoint) + if dstErr != nil { + logrus.Errorf("NRDBController error migration into nrdbms : %v", dstErr) + return models.StatusFailed } - return flow, nil + + logrus.Info("Launch NRDBController Copy") + if err := srcNRDBC.Copy(dstNRDBC); err != nil { + logrus.Errorf("Copy error copying into nrdbms : %v", err) + return models.StatusFailed + } + logrus.Info("successfully migrationed") + return models.StatusCompleted + } -func (s *TaskService) GetFlowList() []Flow { - s.mu.Lock() - defer s.mu.Unlock() - flowList := make([]Flow, 0, len(s.flows)) - for _, flow := range s.flows { - flowList = append(flowList, flow) +func handleNRDBMSBackupTask(params models.DataTask) models.Status { + fmt.Println("Handling NRDBMS backup task") + var NRDBC *nrdbc.NRDBController + var err error + NRDBC, err = auth.GetNRDMS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("NRDBController error importing into nrdbms : %v", err) + return models.StatusFailed + } + + tableList, err := NRDBC.ListTables() + if err != nil { + logrus.Infof("ListTables error : %v", err) + return models.StatusFailed + } + + if !utils.FileExists(params.Directory) { + logrus.Infof("directory does not exist") + logrus.Infof("Make Directory") + err = os.MkdirAll(params.Directory, 0755) + if err != nil { + logrus.Infof("Make Failed 0755 : %s", params.Directory) + return models.StatusFailed + } + } + + var dstData []map[string]interface{} + for _, table := range tableList { + logrus.Infof("Export start: %s", table) + dstData = []map[string]interface{}{} + + if err := NRDBC.Get(table, &dstData); err != nil { + logrus.Errorf("Get error : %v", err) + return models.StatusFailed + } + + file, err := os.Create(filepath.Join(params.Directory, fmt.Sprintf("%s.json", table))) + if err != nil { + logrus.Errorf("File create error : %v", err) + return models.StatusFailed + } + defer file.Close() + + encoder := json.NewEncoder(file) + encoder.SetIndent("", " ") + if err := encoder.Encode(dstData); err != nil { + logrus.Errorf("data encoding error : %v", err) + return models.StatusFailed + } + logrus.Infof("successfully create File : %s", file.Name()) } - return flowList + logrus.Infof("successfully backup to : %s", params.Directory) + return models.StatusCompleted + } -func (s *TaskService) CreateSchedule(schedule Schedule) { - s.mu.Lock() - defer s.mu.Unlock() - s.schedules[schedule.ScheduleID] = schedule +func handleNRDBMSRestoreTask(params models.DataTask) models.Status { + fmt.Println("Handling NRDBMS restore task") + var NRDBC *nrdbc.NRDBController + var err error + NRDBC, err = auth.GetNRDMS(¶ms.TargetPoint) + if err != nil { + logrus.Errorf("NRDBController error importing into nrdbms : %v", err) + return models.StatusFailed + } + + jsonList := []string{} + err = filepath.Walk(params.Directory, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == ".json" { + jsonList = append(jsonList, path) + } + return nil + }) + + if err != nil { + logrus.Errorf("Walk error : %v", err) + return models.StatusFailed + } + + var srcData []map[string]interface{} + for _, jsonFile := range jsonList { + srcData = []map[string]interface{}{} + + file, err := os.Open(jsonFile) + if err != nil { + logrus.Errorf("file open error : %v", err) + return models.StatusFailed + } + defer file.Close() + + if err := json.NewDecoder(file).Decode(&srcData); err != nil { + logrus.Errorf("file decoding error : %v", err) + return models.StatusFailed + } + + fileName := filepath.Base(jsonFile) + tableName := fileName[:len(fileName)-len(filepath.Ext(fileName))] + + logrus.Infof("Import start: %s", fileName) + if err := NRDBC.Put(tableName, &srcData); err != nil { + logrus.Error("Put error importing into nrdbms") + return models.StatusFailed + } + logrus.Infof("successfully Restore : %s", params.Directory) + } + return models.StatusCompleted + } -func (s *TaskService) GetSchedule(scheduleID string) (Schedule, error) { - s.mu.Lock() - defer s.mu.Unlock() - schedule, exists := s.schedules[scheduleID] - if !exists { - return Schedule{}, errors.New("schedule not found") +// Facade function to create a new schedule and manage it. +func (m *FileScheduleManager) CreateAndStartSchedule(schedule models.Schedule) error { + m.StopScheduler() + defer m.StartScheduler() + + if err := m.CreateSchedule(schedule); err != nil { + return err } - return schedule, nil + + return nil } -func (s *TaskService) GetScheduleList() []Schedule { - s.mu.Lock() - defer s.mu.Unlock() - scheduleList := make([]Schedule, 0, len(s.schedules)) - for _, item := range s.schedules { - scheduleList = append(scheduleList, item) +// Facade function to update a schedule. +func (m *FileScheduleManager) UpdateAndRestartSchedule(scheduleID string, updatedSchedule models.Schedule) error { + m.StopScheduler() + defer m.StartScheduler() + + if err := m.UpdateSchedule(scheduleID, updatedSchedule); err != nil { + return err + } + + return nil +} + +// Facade function to delete a schedule. +func (m *FileScheduleManager) DeleteAndRestartScheduler(scheduleID string) error { + m.StopScheduler() + defer m.StartScheduler() + + if err := m.DeleteSchedule(scheduleID); err != nil { + return err + } + + return nil +} + +// updateTaskStatus updates the status of the task in the internal data structure +func (m *FileScheduleManager) updateTaskStatus(task models.DataTask) { + m.mu.Lock() + defer m.mu.Unlock() + + // Iterate over the slice to find the task by TaskID + for i, existingTask := range m.tasks { + if existingTask.TaskMeta.TaskID == task.TaskMeta.TaskID { + // Update the status of the task + m.tasks[i].Status = task.Status + return + } } - return scheduleList } diff --git a/task_err.json b/task_err.json new file mode 100644 index 0000000..d1ae9fb --- /dev/null +++ b/task_err.json @@ -0,0 +1,157 @@ +{ + "tasks": [ + { + "operationId": "operation-12345", + "meta": { + "serviceType": "rdbms", + "taskType": "backup", + "taskId": "operation-1234121235-schedule-20240910-204804-task-0-20240910-211114", + "taskName": "Database Backup Task", + "description": "Backup the main database" + }, + "Directory": "/backup/dir", + "sourcePoint": { + "provider": "", + "region": "", + "profileName": "", + "path": "/source/db", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "", + "region": "", + "profileName": "", + "path": "/backup/db", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + }, + { + "operationId": "operation-12345", + "meta": { + "serviceType": "rdbms", + "taskType": "backup", + "taskId": "operation-1234121235-task-0-20240910-211214", + "taskName": "Database Backup Task", + "description": "Backup the main database" + }, + "Directory": "/backup/dir", + "sourcePoint": { + "provider": "", + "region": "", + "profileName": "", + "path": "/source/db", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + }, + "targetPoint": { + "provider": "", + "region": "", + "profileName": "", + "path": "/backup/db", + "bucket": "", + "endpoint": "", + "host": "", + "port": "", + "username": "", + "password": "", + "databaseName": "", + "databaseId": "", + "projectId": "" + } + } + ], + "flows": [], + "schedules": [ + { + "operationId": "operation-12345", + "tag": [ + "backup", + "weekly" + ], + "ScheduleID": "operation-12345-schedule-20240910-204402", + "ScheduleName": "", + "tasks": [ + { + "Directory": "/backup/dir", + "meta": { + "description": "Backup the main database", + "serviceType": "rdbms", + "taskId": "task-1", + "taskName": "Database Backup Task", + "taskType": "backup" + }, + "operationId": "operation-12345", + "sourcePoint": { + "Path": "/source/db" + }, + "targetPoint": { + "Path": "/backup/db" + } + } + ], + "cron": "0 0 * * 0", + "tz": "UTC", + "status": "active" + }, + { + "operationId": "operation-12345", + "tag": [ + "backup", + "weekly" + ], + "ScheduleID": "operation-12345-schedule-20240910-204756", + "ScheduleName": "", + "tasks": [ + { + "Directory": "/backup/dir", + "meta": { + "description": "Backup the main database", + "serviceType": "rdbms", + "taskId": "task-1", + "taskName": "Database Backup Task", + "taskType": "backup" + }, + "operationId": "operation-12345", + "sourcePoint": { + "Path": "/source/db" + }, + "targetPoint": { + "Path": "/backup/db" + } + } + ], + "cron": "0 0 * * 0", + "tz": "UTC", + "status": "active" + }, + { + "operationId": "operation-1234121235", + "tag": [ + "backup", + "weekly" + ], + "ScheduleID": "operation-1234121235-schedule-20240910-211214", + \ No newline at end of file diff --git a/websrc/controllers/taskHandlers.go b/websrc/controllers/taskHandlers.go new file mode 100644 index 0000000..3c09647 --- /dev/null +++ b/websrc/controllers/taskHandlers.go @@ -0,0 +1,180 @@ +/* +Copyright 2023 The Cloud-Barista Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package controllers + +import ( + "net/http" + "time" + + "github.com/cloud-barista/mc-data-manager/models" + "github.com/cloud-barista/mc-data-manager/service/task" + "github.com/labstack/echo/v4" + "github.com/sirupsen/logrus" +) + +// TaskController is a struct that holds a reference to the TaskService +type TaskController struct { + TaskService *task.FileScheduleManager +} + +// GetAllTasksHandler godoc +// +// @Summary Get all Tasks +// @Description Retrieve a list of all Tasks in the system. +// @Tags [Task] +// @Produce json +// @Success 200 {array} models.Task "Successfully retrieved all Tasks" +// @Failure 500 {object} models.BasicResponse "Internal Server Error" +// @Router /task [get] +func (tc *TaskController) GetAllTasksHandler(ctx echo.Context) error { + tasks, err := tc.TaskService.GetScheduleList() + if err != nil { + errStr := err.Error() + return ctx.JSON(http.StatusInternalServerError, models.BasicResponse{ + Result: "Failed to retrieve tasks", + Error: &errStr, + }) + } + + return ctx.JSON(http.StatusOK, tasks) +} + +// CreateTaskHandler godoc +// +// @Summary Create a new Task +// @Description Create a new Task and store it in the system. +// @Tags [Task] +// @Accept json +// @Produce json +// @Param RequestBody body models.Schedule true "Parameters required for creating a Task" +// @Success 200 {object} models.BasicResponse "Successfully created a Task" +// @Failure 500 {object} models.BasicResponse "Internal Server Error" +// @Router /task [post] +func (tc *TaskController) CreateTaskHandler(ctx echo.Context) error { + start := time.Now() + logger, logstrings := pageLogInit("Create-task", "Creating a new task", start) + params := models.Schedule{} + if !getDataWithReBind(logger, start, ctx, ¶ms) { + errStr := "Invalid request data" + return ctx.JSON(http.StatusBadRequest, models.BasicResponse{ + Result: logstrings.String(), + Error: &errStr, + }) + } + logrus.Infof("parasm : %+v", params) + if err := tc.TaskService.CreateSchedule(params); err != nil { + errStr := err.Error() + return ctx.JSON(http.StatusInternalServerError, models.BasicResponse{ + Result: logstrings.String(), + Error: &errStr, + }) + } + + return ctx.JSON(http.StatusOK, models.BasicResponse{ + Result: "Task created successfully", + Error: nil, + }) +} + +// GetTaskHandler godoc +// +// @Summary Get a Task by ID +// @Description Get the details of a Task using its ID. +// @Tags [Task] +// @Produce json +// @Param id path string true "Task ID" +// @Success 200 {object} models.Task "Successfully retrieved a Task" +// @Failure 404 {object} models.BasicResponse "Task not found" +// @Router /task/{id} [get] +func (tc *TaskController) GetTaskHandler(ctx echo.Context) error { + id := ctx.Param("id") + task, err := tc.TaskService.GetSchedule(id) + if err != nil { + errStr := err.Error() + return ctx.JSON(http.StatusNotFound, models.BasicResponse{ + Result: "Task not found", + Error: &errStr, + }) + } + + return ctx.JSON(http.StatusOK, task) +} + +// UpdateTaskHandler godoc +// +// @Summary Update an existing Task +// @Description Update the details of an existing Task using its ID. +// @Tags [Task] +// @Accept json +// @Produce json +// @Param id path string true "Task ID" +// @Param RequestBody body models.Schedule true "Parameters required for updating a Task" +// @Success 200 {object} models.BasicResponse "Successfully updated the Task" +// @Failure 404 {object} models.BasicResponse "Task not found" +// @Failure 500 {object} models.BasicResponse "Internal Server Error" +// @Router /task/{id} [put] +func (tc *TaskController) UpdateTaskHandler(ctx echo.Context) error { + start := time.Now() + logger, logstrings := pageLogInit("Update-task", "Updating an existing task", start) + id := ctx.Param("id") + params := models.Schedule{} + if !getDataWithReBind(logger, start, ctx, ¶ms) { + errStr := "Invalid request data" + return ctx.JSON(http.StatusBadRequest, models.BasicResponse{ + Result: logstrings.String(), + Error: &errStr, + }) + } + + if err := tc.TaskService.UpdateSchedule(id, params); err != nil { + errStr := err.Error() + return ctx.JSON(http.StatusInternalServerError, models.BasicResponse{ + Result: logstrings.String(), + Error: &errStr, + }) + } + + return ctx.JSON(http.StatusOK, models.BasicResponse{ + Result: "Task updated successfully", + Error: nil, + }) +} + +// DeleteTaskHandler godoc +// +// @Summary Delete a Task +// @Description Delete an existing Task using its ID. +// @Tags [Task] +// @Produce json +// @Param id path string true "Task ID" +// @Success 200 {object} models.BasicResponse "Successfully deleted the Task" +// @Failure 404 {object} models.BasicResponse "Task not found" +// @Router /task/{id} [delete] +func (tc *TaskController) DeleteTaskHandler(ctx echo.Context) error { + id := ctx.Param("id") + if err := tc.TaskService.DeleteSchedule(id); err != nil { + errStr := "Task not found" + return ctx.JSON(http.StatusNotFound, models.BasicResponse{ + Result: "Task not found", + Error: &errStr, + }) + } + + return ctx.JSON(http.StatusOK, models.BasicResponse{ + Result: "Task deleted successfully", + Error: nil, + }) +} diff --git a/websrc/docs/docs.go b/websrc/docs/docs.go index f70ce02..c334046 100644 --- a/websrc/docs/docs.go +++ b/websrc/docs/docs.go @@ -1774,6 +1774,192 @@ const docTemplate = `{ } } } + }, + "/task": { + "get": { + "description": "Retrieve a list of all Tasks in the system.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Get all Tasks", + "responses": { + "200": { + "description": "Successfully retrieved all Tasks", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/models.Task" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "post": { + "description": "Create a new Task and store it in the system.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Create a new Task", + "parameters": [ + { + "description": "Parameters required for creating a Task", + "name": "RequestBody", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/models.Schedule" + } + } + ], + "responses": { + "200": { + "description": "Successfully created a Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + } + }, + "/task/{id}": { + "get": { + "description": "Get the details of a Task using its ID.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Get a Task by ID", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully retrieved a Task", + "schema": { + "$ref": "#/definitions/models.Task" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "put": { + "description": "Update the details of an existing Task using its ID.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Update an existing Task", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Parameters required for updating a Task", + "name": "RequestBody", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/models.Schedule" + } + } + ], + "responses": { + "200": { + "description": "Successfully updated the Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "delete": { + "description": "Delete an existing Task using its ID.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Delete a Task", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully deleted the Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + } } }, "definitions": { @@ -1783,6 +1969,12 @@ const docTemplate = `{ "operationId": { "type": "string" }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + }, "targetPoint": { "$ref": "#/definitions/models.GenTaskTarget" } @@ -1827,6 +2019,20 @@ const docTemplate = `{ } } }, + "models.DataTask": { + "type": "object", + "properties": { + "operationId": { + "type": "string" + }, + "sourcePoint": { + "$ref": "#/definitions/models.ProviderConfig" + }, + "targetPoint": { + "$ref": "#/definitions/models.ProviderConfig" + } + } + }, "models.GenTaskTarget": { "type": "object", "properties": { @@ -1988,6 +2194,72 @@ const docTemplate = `{ "$ref": "#/definitions/models.ProviderConfig" } } + }, + "models.Schedule": { + "type": "object", + "properties": { + "ScheduleID": { + "type": "string" + }, + "ScheduleName": { + "type": "string" + }, + "cron": { + "type": "string" + }, + "operationId": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/models.Status" + }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + }, + "tasks": { + "type": "array", + "items": { + "$ref": "#/definitions/models.DataTask" + } + }, + "tz": { + "type": "string" + } + } + }, + "models.Status": { + "type": "string", + "enum": [ + "active", + "inactive", + "pending", + "completed", + "failed" + ], + "x-enum-varnames": [ + "StatusActive", + "StatusInactive", + "StatusPending", + "StatusCompleted", + "StatusFailed" + ] + }, + "models.Task": { + "type": "object", + "properties": { + "operationId": { + "type": "string" + }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + } + } } } }` diff --git a/websrc/docs/swagger.json b/websrc/docs/swagger.json index 40282b3..eb82245 100644 --- a/websrc/docs/swagger.json +++ b/websrc/docs/swagger.json @@ -1767,6 +1767,192 @@ } } } + }, + "/task": { + "get": { + "description": "Retrieve a list of all Tasks in the system.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Get all Tasks", + "responses": { + "200": { + "description": "Successfully retrieved all Tasks", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/models.Task" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "post": { + "description": "Create a new Task and store it in the system.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Create a new Task", + "parameters": [ + { + "description": "Parameters required for creating a Task", + "name": "RequestBody", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/models.Schedule" + } + } + ], + "responses": { + "200": { + "description": "Successfully created a Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + } + }, + "/task/{id}": { + "get": { + "description": "Get the details of a Task using its ID.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Get a Task by ID", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully retrieved a Task", + "schema": { + "$ref": "#/definitions/models.Task" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "put": { + "description": "Update the details of an existing Task using its ID.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Update an existing Task", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Parameters required for updating a Task", + "name": "RequestBody", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/models.Schedule" + } + } + ], + "responses": { + "200": { + "description": "Successfully updated the Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + }, + "delete": { + "description": "Delete an existing Task using its ID.", + "produces": [ + "application/json" + ], + "tags": [ + "[Task]" + ], + "summary": "Delete a Task", + "parameters": [ + { + "type": "string", + "description": "Task ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully deleted the Task", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + }, + "404": { + "description": "Task not found", + "schema": { + "$ref": "#/definitions/models.BasicResponse" + } + } + } + } } }, "definitions": { @@ -1776,6 +1962,12 @@ "operationId": { "type": "string" }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + }, "targetPoint": { "$ref": "#/definitions/models.GenTaskTarget" } @@ -1820,6 +2012,20 @@ } } }, + "models.DataTask": { + "type": "object", + "properties": { + "operationId": { + "type": "string" + }, + "sourcePoint": { + "$ref": "#/definitions/models.ProviderConfig" + }, + "targetPoint": { + "$ref": "#/definitions/models.ProviderConfig" + } + } + }, "models.GenTaskTarget": { "type": "object", "properties": { @@ -1981,6 +2187,72 @@ "$ref": "#/definitions/models.ProviderConfig" } } + }, + "models.Schedule": { + "type": "object", + "properties": { + "ScheduleID": { + "type": "string" + }, + "ScheduleName": { + "type": "string" + }, + "cron": { + "type": "string" + }, + "operationId": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/models.Status" + }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + }, + "tasks": { + "type": "array", + "items": { + "$ref": "#/definitions/models.DataTask" + } + }, + "tz": { + "type": "string" + } + } + }, + "models.Status": { + "type": "string", + "enum": [ + "active", + "inactive", + "pending", + "completed", + "failed" + ], + "x-enum-varnames": [ + "StatusActive", + "StatusInactive", + "StatusPending", + "StatusCompleted", + "StatusFailed" + ] + }, + "models.Task": { + "type": "object", + "properties": { + "operationId": { + "type": "string" + }, + "tag": { + "type": "array", + "items": { + "type": "string" + } + } + } } } } \ No newline at end of file diff --git a/websrc/docs/swagger.yaml b/websrc/docs/swagger.yaml index 452146d..87543fa 100644 --- a/websrc/docs/swagger.yaml +++ b/websrc/docs/swagger.yaml @@ -4,6 +4,10 @@ definitions: properties: operationId: type: string + tag: + items: + type: string + type: array targetPoint: $ref: '#/definitions/models.GenTaskTarget' type: object @@ -32,6 +36,15 @@ definitions: Result: type: string type: object + models.DataTask: + properties: + operationId: + type: string + sourcePoint: + $ref: '#/definitions/models.ProviderConfig' + targetPoint: + $ref: '#/definitions/models.ProviderConfig' + type: object models.GenTaskTarget: properties: bucket: @@ -139,6 +152,52 @@ definitions: targetPoint: $ref: '#/definitions/models.ProviderConfig' type: object + models.Schedule: + properties: + ScheduleID: + type: string + ScheduleName: + type: string + cron: + type: string + operationId: + type: string + status: + $ref: '#/definitions/models.Status' + tag: + items: + type: string + type: array + tasks: + items: + $ref: '#/definitions/models.DataTask' + type: array + tz: + type: string + type: object + models.Status: + enum: + - active + - inactive + - pending + - completed + - failed + type: string + x-enum-varnames: + - StatusActive + - StatusInactive + - StatusPending + - StatusCompleted + - StatusFailed + models.Task: + properties: + operationId: + type: string + tag: + items: + type: string + type: array + type: object info: contact: email: contact-to-cloud-barista@googlegroups.com @@ -1301,4 +1360,127 @@ paths: tags: - '[Data Import]' - '[RDBMS]' + /task: + get: + description: Retrieve a list of all Tasks in the system. + produces: + - application/json + responses: + "200": + description: Successfully retrieved all Tasks + schema: + items: + $ref: '#/definitions/models.Task' + type: array + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/models.BasicResponse' + summary: Get all Tasks + tags: + - '[Task]' + post: + consumes: + - application/json + description: Create a new Task and store it in the system. + parameters: + - description: Parameters required for creating a Task + in: body + name: RequestBody + required: true + schema: + $ref: '#/definitions/models.Schedule' + produces: + - application/json + responses: + "200": + description: Successfully created a Task + schema: + $ref: '#/definitions/models.BasicResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/models.BasicResponse' + summary: Create a new Task + tags: + - '[Task]' + /task/{id}: + delete: + description: Delete an existing Task using its ID. + parameters: + - description: Task ID + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully deleted the Task + schema: + $ref: '#/definitions/models.BasicResponse' + "404": + description: Task not found + schema: + $ref: '#/definitions/models.BasicResponse' + summary: Delete a Task + tags: + - '[Task]' + get: + description: Get the details of a Task using its ID. + parameters: + - description: Task ID + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully retrieved a Task + schema: + $ref: '#/definitions/models.Task' + "404": + description: Task not found + schema: + $ref: '#/definitions/models.BasicResponse' + summary: Get a Task by ID + tags: + - '[Task]' + put: + consumes: + - application/json + description: Update the details of an existing Task using its ID. + parameters: + - description: Task ID + in: path + name: id + required: true + type: string + - description: Parameters required for updating a Task + in: body + name: RequestBody + required: true + schema: + $ref: '#/definitions/models.Schedule' + produces: + - application/json + responses: + "200": + description: Successfully updated the Task + schema: + $ref: '#/definitions/models.BasicResponse' + "404": + description: Task not found + schema: + $ref: '#/definitions/models.BasicResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/models.BasicResponse' + summary: Update an existing Task + tags: + - '[Task]' swagger: "2.0" diff --git a/websrc/routes/backupRoutes.go b/websrc/routes/backupRoutes.go index 5dc53f7..38d2c60 100644 --- a/websrc/routes/backupRoutes.go +++ b/websrc/routes/backupRoutes.go @@ -23,8 +23,6 @@ import ( func BackupRoutes(g *echo.Group) { // Backup URL BackupRoot(g) - // Backup From On-premise (Linux, Windows) to Object Storage - BackupFromOnpremiseToObjectStorage(g) // Backup OBJ storage to linux BackupObjectStorage(g) @@ -32,39 +30,12 @@ func BackupRoutes(g *echo.Group) { BackupRDB(g) BackupNRDB(g) - // Backup From Object Storage to Other Object Storage - BackupFromS3Routes(g) - BackupFromGCPRoutes(g) - BackupFromNCPRoutes(g) - - // Backup No-SQL to the other No-SQL - BackupNoSQLRoutes(g) } func BackupRoot(g *echo.Group) { g.GET("", controllers.BackupHandler) } -func BackupFromOnpremiseToObjectStorage(g *echo.Group) { - g.GET("/linux/aws", controllers.MigrationLinuxToS3GetHandler) - g.POST("/linux/aws", controllers.MigrationLinuxToS3PostHandler) - - g.GET("/linux/gcp", controllers.MigrationLinuxToGCPGetHandler) - g.POST("/linux/gcp", controllers.MigrationLinuxToGCPPostHandler) - - g.GET("/linux/ncp", controllers.MigrationLinuxToNCPGetHandler) - g.POST("/linux/ncp", controllers.MigrationLinuxToNCPPostHandler) - - g.GET("/windows/aws", controllers.MigrationWindowsToS3GetHandler) - g.POST("/windows/aws", controllers.MigrationWindowsToS3PostHandler) - - g.GET("/windows/gcp", controllers.MigrationWindowsToGCPGetHandler) - g.POST("/windows/gcp", controllers.MigrationWindowsToGCPPostHandler) - - g.GET("/windows/ncp", controllers.MigrationWindowsToNCPGetHandler) - g.POST("/windows/ncp", controllers.MigrationWindowsToNCPPostHandler) -} - func BackupObjectStorage(g *echo.Group) { // g.GET("/objectstorage", controllers.BackupOSGetHandler) g.POST("/objectstorage", controllers.BackupOSPostHandler) @@ -78,65 +49,3 @@ func BackupNRDB(g *echo.Group) { // g.GET("/nrdb", controllers.BackupNRDBGetHandler) g.POST("/nrdb", controllers.BackupNRDBPostHandler) } - -func BackupFromS3Routes(g *echo.Group) { - g.GET("/aws/linux", controllers.MigrationS3ToLinuxGetHandler) - g.POST("/aws/linux", controllers.MigrationS3ToLinuxPostHandler) - - g.GET("/aws/windows", controllers.MigrationS3ToWindowsGetHandler) - g.POST("/aws/windows", controllers.MigrationS3ToWindowsPostHandler) - - g.GET("/aws/gcp", controllers.MigrationS3ToGCPGetHandler) - g.POST("/aws/gcp", controllers.MigrationS3ToGCPPostHandler) - - g.GET("/aws/ncp", controllers.MigrationS3ToNCPGetHandler) - g.POST("/aws/ncp", controllers.MigrationS3ToNCPPostHandler) -} - -func BackupFromGCPRoutes(g *echo.Group) { - g.GET("/gcp/linux", controllers.MigrationGCPToLinuxGetHandler) - g.POST("/gcp/linux", controllers.MigrationGCPToLinuxPostHandler) - - g.GET("/gcp/windows", controllers.MigrationGCPToWindowsGetHandler) - g.POST("/gcp/windows", controllers.MigrationGCPToWindowsPostHandler) - - g.GET("/gcp/aws", controllers.MigrationGCPToS3GetHandler) - g.POST("/gcp/aws", controllers.MigrationGCPToS3PostHandler) - - g.GET("/gcp/ncp", controllers.MigrationGCPToNCPGetHandler) - g.POST("/gcp/ncp", controllers.MigrationGCPToNCPPostHandler) -} - -func BackupFromNCPRoutes(g *echo.Group) { - g.GET("/ncp/linux", controllers.MigrationNCPToLinuxGetHandler) - g.POST("/ncp/linux", controllers.MigrationNCPToLinuxPostHandler) - - g.GET("/ncp/windows", controllers.MigrationNCPToWindowsGetHandler) - g.POST("/ncp/windows", controllers.MigrationNCPToWindowsPostHandler) - - g.GET("/ncp/aws", controllers.MigrationNCPToS3GetHandler) - g.POST("/ncp/aws", controllers.MigrationNCPToS3PostHandler) - - g.GET("/ncp/gcp", controllers.MigrationNCPToGCPGetHandler) - g.POST("/ncp/gcp", controllers.MigrationNCPToGCPPostHandler) -} - -func BackupNoSQLRoutes(g *echo.Group) { - g.GET("/dynamodb/firestore", controllers.MigrationDynamoDBToFirestoreGetHandler) - g.POST("/dynamodb/firestore", controllers.MigrationDynamoDBToFirestorePostHandler) - - g.GET("/dynamodb/mongodb", controllers.MigrationDynamoDBToMongoDBGetHandler) - g.POST("/dynamodb/mongodb", controllers.MigrationDynamoDBToMongoDBPostHandler) - - g.GET("/firestore/dynamodb", controllers.MigrationFirestoreToDynamoDBGetHandler) - g.POST("/firestore/dynamodb", controllers.MigrationFirestoreToDynamoDBPostHandler) - - g.GET("/firestore/mongodb", controllers.MigrationFirestoreToMongoDBGetHandler) - g.POST("/firestore/mongodb", controllers.MigrationFirestoreToMongoDBPostHandler) - - g.GET("/mongodb/dynamodb", controllers.MigrationMongoDBToDynamoDBGetHandler) - g.POST("/mongodb/dynamodb", controllers.MigrationMongoDBToDynamoDBPostHandler) - - g.GET("/mongodb/firestore", controllers.MigrationMongoDBToFirestoreGetHandler) - g.POST("/mongodb/firestore", controllers.MigrationMongoDBToFirestorePostHandler) -} diff --git a/websrc/routes/restoreRoutes.go b/websrc/routes/restoreRoutes.go index 63f8295..1bc2b84 100644 --- a/websrc/routes/restoreRoutes.go +++ b/websrc/routes/restoreRoutes.go @@ -24,47 +24,18 @@ func RestoreRoutes(g *echo.Group) { // RestoreURL RestoreRoot(g) // RestoreFrom On-premise (Linux, Windows) to Object Storage - RestoreFromOnpremiseToObjectStorage(g) // RestoreOBJ storage to linux RestoreObjectStorage(g) // RestoreMySQL to linux RestoreRDB(g) RestoreNRDB(g) - - // RestoreFrom Object Storage to Other Object Storage - RestoreFromS3Routes(g) - RestoreFromGCPRoutes(g) - RestoreFromNCPRoutes(g) - - // RestoreNo-SQL to the other No-SQL - RestoreNoSQLRoutes(g) } func RestoreRoot(g *echo.Group) { // g.GET("", controllers.RestoreHandler) } -func RestoreFromOnpremiseToObjectStorage(g *echo.Group) { - g.GET("/linux/aws", controllers.MigrationLinuxToS3GetHandler) - g.POST("/linux/aws", controllers.MigrationLinuxToS3PostHandler) - - g.GET("/linux/gcp", controllers.MigrationLinuxToGCPGetHandler) - g.POST("/linux/gcp", controllers.MigrationLinuxToGCPPostHandler) - - g.GET("/linux/ncp", controllers.MigrationLinuxToNCPGetHandler) - g.POST("/linux/ncp", controllers.MigrationLinuxToNCPPostHandler) - - g.GET("/windows/aws", controllers.MigrationWindowsToS3GetHandler) - g.POST("/windows/aws", controllers.MigrationWindowsToS3PostHandler) - - g.GET("/windows/gcp", controllers.MigrationWindowsToGCPGetHandler) - g.POST("/windows/gcp", controllers.MigrationWindowsToGCPPostHandler) - - g.GET("/windows/ncp", controllers.MigrationWindowsToNCPGetHandler) - g.POST("/windows/ncp", controllers.MigrationWindowsToNCPPostHandler) -} - func RestoreObjectStorage(g *echo.Group) { // g.GET("/objectstorage", controllers.RestoreOSGetHandler) g.POST("/objectstorage", controllers.RestoreOSPostHandler) @@ -78,65 +49,3 @@ func RestoreNRDB(g *echo.Group) { // g.GET("/nrdb", controllers.RestoreNRDBGetHandler) g.POST("/nrdb", controllers.RestoreNRDBPostHandler) } - -func RestoreFromS3Routes(g *echo.Group) { - g.GET("/aws/linux", controllers.MigrationS3ToLinuxGetHandler) - g.POST("/aws/linux", controllers.MigrationS3ToLinuxPostHandler) - - g.GET("/aws/windows", controllers.MigrationS3ToWindowsGetHandler) - g.POST("/aws/windows", controllers.MigrationS3ToWindowsPostHandler) - - g.GET("/aws/gcp", controllers.MigrationS3ToGCPGetHandler) - g.POST("/aws/gcp", controllers.MigrationS3ToGCPPostHandler) - - g.GET("/aws/ncp", controllers.MigrationS3ToNCPGetHandler) - g.POST("/aws/ncp", controllers.MigrationS3ToNCPPostHandler) -} - -func RestoreFromGCPRoutes(g *echo.Group) { - g.GET("/gcp/linux", controllers.MigrationGCPToLinuxGetHandler) - g.POST("/gcp/linux", controllers.MigrationGCPToLinuxPostHandler) - - g.GET("/gcp/windows", controllers.MigrationGCPToWindowsGetHandler) - g.POST("/gcp/windows", controllers.MigrationGCPToWindowsPostHandler) - - g.GET("/gcp/aws", controllers.MigrationGCPToS3GetHandler) - g.POST("/gcp/aws", controllers.MigrationGCPToS3PostHandler) - - g.GET("/gcp/ncp", controllers.MigrationGCPToNCPGetHandler) - g.POST("/gcp/ncp", controllers.MigrationGCPToNCPPostHandler) -} - -func RestoreFromNCPRoutes(g *echo.Group) { - g.GET("/ncp/linux", controllers.MigrationNCPToLinuxGetHandler) - g.POST("/ncp/linux", controllers.MigrationNCPToLinuxPostHandler) - - g.GET("/ncp/windows", controllers.MigrationNCPToWindowsGetHandler) - g.POST("/ncp/windows", controllers.MigrationNCPToWindowsPostHandler) - - g.GET("/ncp/aws", controllers.MigrationNCPToS3GetHandler) - g.POST("/ncp/aws", controllers.MigrationNCPToS3PostHandler) - - g.GET("/ncp/gcp", controllers.MigrationNCPToGCPGetHandler) - g.POST("/ncp/gcp", controllers.MigrationNCPToGCPPostHandler) -} - -func RestoreNoSQLRoutes(g *echo.Group) { - g.GET("/dynamodb/firestore", controllers.MigrationDynamoDBToFirestoreGetHandler) - g.POST("/dynamodb/firestore", controllers.MigrationDynamoDBToFirestorePostHandler) - - g.GET("/dynamodb/mongodb", controllers.MigrationDynamoDBToMongoDBGetHandler) - g.POST("/dynamodb/mongodb", controllers.MigrationDynamoDBToMongoDBPostHandler) - - g.GET("/firestore/dynamodb", controllers.MigrationFirestoreToDynamoDBGetHandler) - g.POST("/firestore/dynamodb", controllers.MigrationFirestoreToDynamoDBPostHandler) - - g.GET("/firestore/mongodb", controllers.MigrationFirestoreToMongoDBGetHandler) - g.POST("/firestore/mongodb", controllers.MigrationFirestoreToMongoDBPostHandler) - - g.GET("/mongodb/dynamodb", controllers.MigrationMongoDBToDynamoDBGetHandler) - g.POST("/mongodb/dynamodb", controllers.MigrationMongoDBToDynamoDBPostHandler) - - g.GET("/mongodb/firestore", controllers.MigrationMongoDBToFirestoreGetHandler) - g.POST("/mongodb/firestore", controllers.MigrationMongoDBToFirestorePostHandler) -} diff --git a/websrc/routes/taskRoutes.go b/websrc/routes/taskRoutes.go new file mode 100644 index 0000000..6e986ca --- /dev/null +++ b/websrc/routes/taskRoutes.go @@ -0,0 +1,40 @@ +/* +Copyright 2023 The Cloud-Barista Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package routes + +import ( + "github.com/cloud-barista/mc-data-manager/service/task" + "github.com/cloud-barista/mc-data-manager/websrc/controllers" + "github.com/labstack/echo/v4" +) + +// TaskRoutes initializes the routes for the Task entity. +func TaskRoutes(g *echo.Group, scheduleManager *task.FileScheduleManager) { + TaskRoot(g, scheduleManager) +} + +// TaskRoot defines the root routes for Task related operations. +func TaskRoot(g *echo.Group, scheduleManager *task.FileScheduleManager) { + taskController := controllers.TaskController{ + TaskService: scheduleManager, + } + + g.GET("", taskController.GetAllTasksHandler) // Retrieve all tasks + g.GET("/:id", taskController.GetTaskHandler) // Retrieve a single task by ID or OperationID + g.POST("", taskController.CreateTaskHandler) // Create a new task + g.PUT("/:id", taskController.UpdateTaskHandler) // Update an existing task by ID or OperationID + g.DELETE("/:id", taskController.DeleteTaskHandler) // Delete a task by ID or OperationID +} diff --git a/websrc/serve/serve.go b/websrc/serve/serve.go index 3772069..af9bb21 100644 --- a/websrc/serve/serve.go +++ b/websrc/serve/serve.go @@ -23,6 +23,7 @@ import ( "net/http" "strings" + "github.com/cloud-barista/mc-data-manager/service/task" "github.com/cloud-barista/mc-data-manager/websrc/controllers" "github.com/cloud-barista/mc-data-manager/websrc/routes" @@ -124,6 +125,9 @@ func InitServer(port string, addIP ...string) *echo.Echo { } e.Renderer = renderer + // go cron + scheduleManager := task.InitFileScheduleManager() + // Route for system management e.GET("/swagger/*", echoSwagger.WrapHandler) @@ -141,7 +145,9 @@ func InitServer(port string, addIP ...string) *echo.Echo { restoreGroup := e.Group("/restore") routes.RestoreRoutes(restoreGroup) - // selfEndpoint := os.Getenv("SELF_ENDPOINT") + taskGroup := e.Group("/task") + routes.TaskRoutes(taskGroup, scheduleManager) + selfEndpoint := "localhost" + ":" + port website := " http://" + selfEndpoint apidashboard := " http://" + selfEndpoint + "/swagger/index.html"