Skip to content

Commit

Permalink
Add for download
Browse files Browse the repository at this point in the history
  • Loading branch information
Jayashsatolia403 committed Dec 27, 2024
1 parent f2fbc8a commit 99eba47
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 8 deletions.
27 changes: 27 additions & 0 deletions zboxcore/sdk/blockdownloadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,33 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien
return errors.New("response_error", string(respBuf))
}

//kafkaObj := map[string]interface{}{
// "op": "download",
// "time": time.Since(now).Milliseconds(),
// "size": len(respBuf),
// "alloc": req.allocationID,
//}
//kafkaObjStr, err := json.Marshal(kafkaObj)
//if err != nil {
// log.Println("Error publishing to kafka: ", err)
//}
//
//var (
// BlobberMonitoringKafkaTopic = "blobber_monitoring2"
// BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute)
//)
//
//timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second)
//defer cancelFunc()
//
//res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr))
//select {
//case <-res:
// break
//case <-timeout.Done():
// log.Panic("Timeout to publish event to kafka")
//}

dR := downloadResponse{}
if req.shouldVerify {
err = json.Unmarshal(respBuf, &dR)
Expand Down
16 changes: 8 additions & 8 deletions zboxcore/sdk/chunked_upload_blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,24 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
}

kafkaObj := map[string]interface{}{
"op": "upload",
"upload_time": time.Since(now).Milliseconds(),
"size": len(dataBuffers[ind].Bytes()),
"alloc": su.allocationObj.ID,
"op": "upload",
"time": time.Since(now).Milliseconds(),
"size": len(dataBuffers[ind].Bytes()),
"alloc": su.allocationObj.ID,
}

kafkaObjStr, err := json.Marshal(kafkaObj)
if err != nil {
logger.Logger.Error("Error publishing to kafka: ", err)
}

fmt.Println(kafkaObjStr)
//

//var (
// BlobberMonitoringKafkaTopic = "blobber_monitoring"
// BlobberMonitoringKafkaTopic = "blobber_monitoring2"
// BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute)
//)

//
//res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr))
//results = append(results, res)

Expand All @@ -174,7 +175,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
}
consensus.Done()

//wait for all responses
timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second)
defer cancelFunc()
sent := 0
Expand Down

0 comments on commit 99eba47

Please sign in to comment.