diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 8e7a569..b327f79 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -291,14 +291,13 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } -// FlushAggregatedRecords must be called after // Returns FLB_OK, FLB_RETRY, FLB_ERROR func (outputPlugin *OutputPlugin) FlushAggregatedRecords(records *[]*kinesis.PutRecordsRequestEntry) int { aggRecord, err := outputPlugin.aggregator.AggregateRecords() if err != nil { logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err) - return fluentbit.FLB_ERROR + return fluentbit.FLB_RETRY } if aggRecord != nil { @@ -384,7 +383,7 @@ func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis switch retCode { case output.FLB_ERROR: - logrus.Errorf("[kinesis %d] Failed to send (%d) records with error", outputPlugin.PluginID, len(records)) + logrus.Errorf("[kinesis %d] Failed to send (%d) records, unretryable error", outputPlugin.PluginID, len(records)) case output.FLB_RETRY: logrus.Errorf("[kinesis %d] Failed to send (%d) records after retries %d", outputPlugin.PluginID, len(records), outputPlugin.concurrencyRetryLimit) case output.FLB_OK: