diff --git a/zboxcore/sdk/blockdownloadworker.go b/zboxcore/sdk/blockdownloadworker.go index 462b08675..ad9012a99 100644 --- a/zboxcore/sdk/blockdownloadworker.go +++ b/zboxcore/sdk/blockdownloadworker.go @@ -189,6 +189,33 @@ func (req *BlockDownloadRequest) downloadBlobberBlock(fastClient *fasthttp.Clien return errors.New("response_error", string(respBuf)) } + //kafkaObj := map[string]interface{}{ + // "op": "download", + // "time": time.Since(now).Milliseconds(), + // "size": len(respBuf), + // "alloc": req.allocationID, + //} + //kafkaObjStr, err := json.Marshal(kafkaObj) + //if err != nil { + // log.Println("Error publishing to kafka: ", err) + //} + // + //var ( + // BlobberMonitoringKafkaTopic = "blobber_monitoring2" + // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) + //) + // + //timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) + //defer cancelFunc() + // + //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, req.blobber.ID, string(kafkaObjStr)) + //select { + //case <-res: + // break + //case <-timeout.Done(): + // log.Panic("Timeout to publish event to kafka") + //} + dR := downloadResponse{} if req.shouldVerify { err = json.Unmarshal(respBuf, &dR) diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 6f09acf0c..7ca0a6232 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -145,23 +145,24 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } kafkaObj := map[string]interface{}{ - "op": "upload", - "upload_time": time.Since(now).Milliseconds(), - "size": len(dataBuffers[ind].Bytes()), - "alloc": su.allocationObj.ID, + "op": "upload", + "time": time.Since(now).Milliseconds(), + "size": len(dataBuffers[ind].Bytes()), + "alloc": su.allocationObj.ID, } kafkaObjStr, err := json.Marshal(kafkaObj) if err != nil { logger.Logger.Error("Error publishing to kafka: ", err) } + fmt.Println(kafkaObjStr) - // + //var ( - // BlobberMonitoringKafkaTopic = "blobber_monitoring" + // BlobberMonitoringKafkaTopic = "blobber_monitoring2" // BlobberMonitoringKafka = kafka.NewKafkaProvider("91.107.200.12:9092", "admin", "zus-operator", 1*time.Minute) //) - + // //res := BlobberMonitoringKafka.PublishToKafka(BlobberMonitoringKafkaTopic, sb.blobber.ID, string(kafkaObjStr)) //results = append(results, res) @@ -174,7 +175,6 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( } consensus.Done() - //wait for all responses timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Second) defer cancelFunc() sent := 0