Skip to content

Commit

Permalink
[release-v1.11] Backport kncloudevents package improvements (#492)
Browse files Browse the repository at this point in the history
* Remove deprecated httpclient msgsender (knative#7018)

## Proposed Changes
Removed the legacy http client go, message sender go and respective test
files, Since the functionality are handled in http client new and
message sender new go

Issue related : knative#6995
 
- 🗑️ Remove feature or internal logic

-
-
-

### Pre-review Checklist

- [x] **At least 80% unit test coverage**
- [ ] **E2E tests** for any new behavior
- [ ] **Docs PR** for any user-facing impact
- [ ] **Spec PR** for any new API feature
- [ ] **Conformance test** for any change to the spec

**Release Note**
 
```release-note

```


**Docs**
 
📖 knative#6995

---------

Co-authored-by: Christoph Stäbler <[email protected]>

* Refactor kncloudevents and add `SendEvent` function (knative#7092)

Adds a function (`SendEvent`) to the kncloudevents package to Send
events. This function:
* configures the client (e.g. for TLS)
* has option to send replies
* has option to send to a DLS
* has option to add additional transformers

This allows e.g. for use cases like the following:
```
kncloudevents.SendEvent(ctx, event, sub.Subscriber,
	kncloudevents.WithHeader(additionalHeaders),
	kncloudevents.WithReply(sub.Reply),
	kncloudevents.WithDeadLetterSink(sub.DeadLetter),
	kncloudevents.WithRetryConfig(sub.RetryConfig))
```

The `SendMessage` function is provided to make integration with existing
packages easier (e.g. in dependent projects).

Also migrated the usages of kncloudevents.NewCloudEventRequest() to the
new SendEvent() function (mt-broker-filter and mt-broker-ingress):
* 74c1552
* 9728713
* 958722d

* Remove deprecated kncloudevents.CloudEventsRequest (knative#7146)

* Remove kncloudevents.CloudEventsRequest

* Run hack/update-codegen.sh

* Add unit tests for generateBackoffFn()

* Fix event dispatcher library data race (knative#7280)

Fix dispatcher data race

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Jeevan <[email protected]>
Co-authored-by: Christoph Stäbler <[email protected]>
  • Loading branch information
3 people authored Jan 17, 2024
1 parent 62d36d2 commit 2d2c4d5
Show file tree
Hide file tree
Showing 57 changed files with 1,446 additions and 3,719 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cobra v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/mod v0.9.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ github.com/tsenart/vegeta/v12 v12.8.4/go.mod h1:ZiJtwLn/9M4fTPdMY7bdbIeyNeFVE8/A
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/wavesoftware/go-ensure v1.0.0 h1:6X3gQL5psBWwtu/H9a+69xQ+JGTUELaLhgOB/iB3AQk=
github.com/wavesoftware/go-ensure v1.0.0/go.mod h1:K2UAFSwMTvpiRGay/M3aEYYuurcR8S4A6HkQlJPV8k4=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand Down
167 changes: 39 additions & 128 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ limitations under the License.
package filter

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
Expand All @@ -38,7 +38,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
channelAttributes "knative.dev/eventing/pkg/channel/attributes"
"knative.dev/eventing/pkg/utils"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
Expand All @@ -51,7 +51,6 @@ import (
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

const (
Expand All @@ -65,24 +64,6 @@ const (
defaultMaxIdleConnectionsPerHost = 100
)

const (
// NoResponse signals the step that send event to trigger's subscriber hasn't started
NoResponse = -1
)

// ErrHandler handle the different errors of filter dispatch process
type ErrHandler struct {
ResponseCode int
ResponseBody []byte
err error
}

// HeaderProxyAllowList contains the headers that are proxied from the reply; other than the CloudEvents headers.
// Other headers are not proxied because of security concerns.
var HeaderProxyAllowList = map[string]struct{}{
strings.ToLower("Retry-After"): {},
}

// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Handler struct {
// reporter reports stats of status code and dispatch time
Expand All @@ -93,7 +74,7 @@ type Handler struct {
withContext func(ctx context.Context) context.Context
}

// NewHandler creates a new Handler and its associated MessageReceiver.
// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
Expand Down Expand Up @@ -164,10 +145,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

ctx := h.withContext(request.Context())

message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)

event, err := binding.ToEvent(ctx, message)
event, err := cehttp.NewEventFromHTTPRequest(request)
if err != nil {
h.logger.Warn("failed to extract event from request", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -242,123 +220,64 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
URL: t.Status.SubscriberURI,
CACerts: t.Status.SubscriberCACerts,
}
h.send(ctx, writer, request.Header, target, reportArgs, event, t, ttl)
h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, t, ttl)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
// send the event to trigger's subscriber
response, responseErr := h.sendEvent(ctx, headers, target, event, t, reportArgs)

if responseErr.err != nil {
h.logger.Error("failed to send event", zap.Error(responseErr.err))
// If error is not because of the response, it should respond with http.StatusInternalServerError
if responseErr.ResponseCode == NoResponse {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))

// If error is not because of the response, it should respond with http.StatusInternalServerError
if dispatchInfo.ResponseCode <= 0 {
writer.WriteHeader(http.StatusInternalServerError)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
return
}
// If error has a response propagate subscriber's headers back to channel
if response != nil {
proxyHeaders(response.Header, writer)
}
writer.WriteHeader(responseErr.ResponseCode)

h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration)

writeHeaders(utils.PassThroughHeaders(dispatchInfo.ResponseHeader), writer)
writer.WriteHeader(dispatchInfo.ResponseCode)

// Read Response body to responseErr
errExtensionInfo := broker.ErrExtensionInfo{
ErrDestination: target.URL,
ErrResponseBody: responseErr.ResponseBody,
ErrResponseBody: dispatchInfo.ResponseBody,
}
errExtensionBytes, msErr := json.Marshal(errExtensionInfo)
if msErr != nil {
h.logger.Error("failed to marshal errExtensionInfo", zap.Error(msErr))
return
}
_, _ = writer.Write(errExtensionBytes)
_ = h.reporter.ReportEventCount(reportArgs, responseErr.ResponseCode)
_, err = writer.Write(errExtensionBytes)
if err != nil {
h.logger.Error("failed to write error response", zap.Error(err))
}
_ = h.reporter.ReportEventCount(reportArgs, dispatchInfo.ResponseCode)

return
}

h.logger.Debug("Successfully dispatched message", zap.Any("target", target))

h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration)

// If there is an event in the response write it to the response
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.URL.String())
statusCode, err := h.writeResponse(ctx, writer, dispatchInfo, ttl, target.URL.String())
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, t *eventingv1.Trigger, reporterArgs *ReportArgs) (*http.Response, ErrHandler) {
responseErr := ErrHandler{
ResponseCode: NoResponse,
}

// Send the event to the subscriber
req, err := kncloudevents.NewCloudEventRequest(ctx, target)
if err != nil {
responseErr.err = fmt.Errorf("failed to create the request: %w", err)
return nil, responseErr
}

message := binding.ToMessage(event)
defer message.Finish(nil)

additionalHeaders := utils.PassThroughHeaders(headers)
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

// Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events
additionalHeaders.Set("prefer", "reply")

err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
if err != nil {
responseErr.err = fmt.Errorf("failed to write request: %w", err)
return nil, responseErr
}

start := time.Now()
resp, err := req.Send()
dispatchTime := time.Since(start)
if err != nil {
responseErr.ResponseCode = http.StatusInternalServerError
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error()))
responseErr.err = fmt.Errorf("failed to dispatch message: %w", err)
return resp, responseErr
}

sc := 0
if resp != nil {
sc = resp.StatusCode
responseErr.ResponseCode = sc
}

_ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime)

if resp.StatusCode < http.StatusOK ||
resp.StatusCode >= http.StatusMultipleChoices {
// Read response body into errHandler for failures
body := make([]byte, channelAttributes.KnativeErrorDataExtensionMaxLength)

readLen, readErr := resp.Body.Read(body)
if readErr != nil && readErr != io.EOF {
h.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(readErr))
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", readErr.Error()))
} else {
responseErr.ResponseBody = body[:readLen]
}
responseErr.err = fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", resp.StatusCode)

// Reject non-successful responses.
return resp, responseErr
}

return resp, responseErr
}

// The return values are the status
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
response := cehttp.NewMessageFromHttpResponse(resp)
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, dispatchInfo *kncloudevents.DispatchInfo, ttl int32, target string) (int, error) {
response := cehttp.NewMessage(dispatchInfo.ResponseHeader, io.NopCloser(bytes.NewReader(dispatchInfo.ResponseBody)))
defer response.Finish(nil)

if response.ReadEncoding() == binding.EncodingUnknown {
Expand All @@ -375,10 +294,10 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
writer.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent")
}
proxyHeaders(resp.Header, writer) // Proxy original Response Headers for downstream use
writeHeaders(dispatchInfo.ResponseHeader, writer) // Proxy original Response Headers for downstream use
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
writer.WriteHeader(dispatchInfo.ResponseCode)
return dispatchInfo.ResponseCode, nil
}

event, err := binding.ToEvent(ctx, response)
Expand All @@ -400,15 +319,15 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
defer eventResponse.Finish(nil)

// Proxy the original Response Headers for downstream use
proxyHeaders(resp.Header, writer)
writeHeaders(dispatchInfo.ResponseHeader, writer)

if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil {
if err := cehttp.WriteResponseWriter(ctx, eventResponse, dispatchInfo.ResponseCode, writer); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err)
}

h.logger.Debug("Replied with a CloudEvent response", zap.Any("target", target))

return resp.StatusCode, nil
return dispatchInfo.ResponseCode, nil
}

func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) {
Expand Down Expand Up @@ -523,19 +442,11 @@ func triggerFilterAttribute(filter *eventingv1.TriggerFilter, attributeName stri
return attributeValue
}

// proxyHeaders adds the specified HTTP Headers to the ResponseWriter.
func proxyHeaders(httpHeader http.Header, writer http.ResponseWriter) {
// writeHeaders adds the specified HTTP Headers to the ResponseWriter.
func writeHeaders(httpHeader http.Header, writer http.ResponseWriter) {
for headerKey, headerValues := range httpHeader {
// *Only* proxy some headers because of security reasons
if isInProxyHeaderAllowList(headerKey) {
for _, headerValue := range headerValues {
writer.Header().Add(headerKey, headerValue)
}
for _, headerValue := range headerValues {
writer.Header().Add(headerKey, headerValue)
}
}
}

func isInProxyHeaderAllowList(headerKey string) bool {
_, exists := HeaderProxyAllowList[strings.ToLower(headerKey)]
return exists
}
2 changes: 1 addition & 1 deletion pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func TestReceiver(t *testing.T) {
return
}
if err != nil || event == nil {
t.Fatalf("Expected response event, actually nil")
t.Fatalf("Expected response event, actually nil (err: %+v)", err)
}

// The TTL will be added again.
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/filter/server_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewServerManager(ctx context.Context, logger *zap.Logger, cmw configmap.Wat
logger.Info("failed to get TLS server config", zap.Error(err))
}

httpReceiver := kncloudevents.NewHTTPMessageReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPMessageReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))
httpReceiver := kncloudevents.NewHTTPEventReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPEventReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))

return eventingtls.NewServerManager(ctx, httpReceiver, httpsReceiver, handler, cmw)
}
Expand Down
Loading

0 comments on commit 2d2c4d5

Please sign in to comment.