Skip to content

Commit

Permalink
onboardingTicket: add new uid to the ticket
Browse files Browse the repository at this point in the history
onboarding ticket should contain the storageCluster UID the token
was generated for and for that it requires input for the
namespacedName of the storageCluster we want to generate the
ticket for

Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 6, 2024
1 parent 1ea3ed4 commit a70918c
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 93 deletions.
2 changes: 1 addition & 1 deletion controllers/storagecluster/storageclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *storageClient) ensureCreated(r *StorageClusterReconciler, storagecluste
storageClient.Name = storagecluster.Name
_, err := controllerutil.CreateOrUpdate(r.ctx, r.Client, storageClient, func() error {
if storageClient.Status.ConsumerID == "" {
token, err := util.GenerateClientOnboardingToken(tokenLifetimeInHours, onboardingPrivateKeyFilePath, nil)
token, err := util.GenerateClientOnboardingToken(tokenLifetimeInHours, onboardingPrivateKeyFilePath, nil, storagecluster.UID)
if err != nil {
return fmt.Errorf("unable to generate onboarding token: %v", err)
}
Expand Down
36 changes: 36 additions & 0 deletions controllers/util/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const (
Expand Down Expand Up @@ -166,3 +169,36 @@ func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster)
}
return fmt.Sprintf("%s-ceph-non-resilient-rbd", initData.Name)
}

func GetStorageClusterInNamespace(ctx context.Context, cl client.Client, namespace string) (*ocsv1.StorageCluster, error) {
storageClusterList := &ocsv1.StorageClusterList{}
err := cl.List(ctx, storageClusterList, client.InNamespace(namespace), client.Limit(1))
if err != nil {
return nil, fmt.Errorf("unable to list storageCluster(s) in namespace %s: %v", namespace, err)
}

if len(storageClusterList.Items) == 0 {
return nil, fmt.Errorf("no storageCluster found in namespace %s", namespace)
}
if storageClusterList.Items[0].Status.Phase == PhaseIgnored {
return nil, fmt.Errorf("storageCluster with Phase 'Ignored' found. Please delete the storageCluster to proceed")
}

return &storageClusterList.Items[0], nil
}

func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) {
klog.Info("Setting up k8s client")

config, err := config.GetConfig()
if err != nil {
return nil, err
}

k8sClient, err := client.New(config, client.Options{Scheme: scheme})
if err != nil {
return nil, err
}

return k8sClient, nil
}
10 changes: 7 additions & 3 deletions controllers/util/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (
"os"
"time"

"github.com/google/uuid"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
)

// GenerateClientOnboardingToken generates a ocs-client token valid for a duration of "tokenLifetimeInHours".
// The token content is predefined and signed by the private key which'll be read from supplied "privateKeyPath".
// The storageQuotaInGiB is optional, and it is used to limit the storage of PVC in the application cluster.
func GenerateClientOnboardingToken(tokenLifetimeInHours int, privateKeyPath string, storageQuotainGib *uint) (string, error) {
func GenerateClientOnboardingToken(tokenLifetimeInHours int, privateKeyPath string, storageQuotainGib *uint, storageClusterUID types.UID) (string, error) {
tokenExpirationDate := time.Now().
Add(time.Duration(tokenLifetimeInHours) * time.Hour).
Unix()
Expand All @@ -30,6 +32,7 @@ func GenerateClientOnboardingToken(tokenLifetimeInHours int, privateKeyPath stri
ExpirationDate: tokenExpirationDate,
SubjectRole: services.ClientRole,
StorageQuotaInGiB: storageQuotainGib,
StorageCluster: storageClusterUID,
}

token, err := encodeAndSignOnboardingToken(privateKeyPath, ticket)
Expand All @@ -41,7 +44,7 @@ func GenerateClientOnboardingToken(tokenLifetimeInHours int, privateKeyPath stri

// GeneratePeerOnboardingToken generates a ocs-peer token valid for a duration of "tokenLifetimeInHours".
// The token content is predefined and signed by the private key which'll be read from supplied "privateKeyPath".
func GeneratePeerOnboardingToken(tokenLifetimeInHours int, privateKeyPath string) (string, error) {
func GeneratePeerOnboardingToken(tokenLifetimeInHours int, privateKeyPath string, storageClusterUID types.UID) (string, error) {
tokenExpirationDate := time.Now().
Add(time.Duration(tokenLifetimeInHours) * time.Hour).
Unix()
Expand All @@ -50,6 +53,7 @@ func GeneratePeerOnboardingToken(tokenLifetimeInHours int, privateKeyPath string
ID: uuid.New().String(),
ExpirationDate: tokenExpirationDate,
SubjectRole: services.PeerRole,
StorageCluster: storageClusterUID,
}
token, err := encodeAndSignOnboardingToken(privateKeyPath, ticket)
if err != nil {
Expand Down
48 changes: 16 additions & 32 deletions onboarding-validation-keys-generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"

"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

Expand All @@ -24,9 +25,18 @@ const (
)

func main() {
cl, err := newClient()

scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
klog.Exitf("failed to add corev1 to scheme. %v", err)
}
if err := ocsv1.AddToScheme(scheme); err != nil {
klog.Exitf("failed to add ocsv1 to scheme. %v", err)
}

cl, err := util.NewK8sClient(scheme)
if err != nil {
klog.Exitf("failed to create client: %v", err)
klog.Exitf("failed to create kube client: %v", err)
}

ctx := context.Background()
Expand All @@ -44,15 +54,10 @@ func main() {
privatePem := convertRsaPrivateKeyAsPemStr(privateKey)
publicPem := convertRsaPublicKeyAsPemStr(publicKey)

storageClusterList := &v1.StorageClusterList{}
err = cl.List(ctx, storageClusterList, client.InNamespace(namespace))
storageCluster, err := util.GetStorageClusterInNamespace(ctx, cl, namespace)
if err != nil {
klog.Exitf("unable to list storageCluster(s) in %v namespace, %v", namespace, err)
klog.Exitf("failed to get storage cluster: %v", err)
}
if len(storageClusterList.Items) != 1 {
klog.Exitf("unexpected number of storageCluster(s) found in %v namespace, expected: 1 actual: %v", namespace, len(storageClusterList.Items))
}
storageCluster := &storageClusterList.Items[0]

// In situations where there is a risk of one secret being updated and potentially
// failing to update another, it is recommended not to rely solely on clientset update mechanisms.
Expand Down Expand Up @@ -105,27 +110,6 @@ func main() {

}

func newClient() (client.Client, error) {
klog.Info("Setting up k8s client")
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return nil, err
}
if err := corev1.AddToScheme(scheme); err != nil {
return nil, err
}
config, err := config.GetConfig()
if err != nil {
return nil, err
}
k8sClient, err := client.New(config, client.Options{Scheme: scheme})
if err != nil {
return nil, err
}

return k8sClient, nil
}

func convertRsaPrivateKeyAsPemStr(privateKey *rsa.PrivateKey) string {
privteKeyBytes := x509.MarshalPKCS1PrivateKey(privateKey)
privateKeyPem := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privteKeyBytes})
Expand Down
7 changes: 7 additions & 0 deletions rbac/ux_backend_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@ rules:
verbs:
- get
- list
- apiGroups:
- ocs.openshift.io
resources:
- storageclusters
verbs:
- get
- list
71 changes: 25 additions & 46 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"k8s.io/apimachinery/pkg/types"
klog "k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const (
Expand All @@ -74,7 +73,12 @@ type OCSProviderServer struct {
}

func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) {
client, err := newClient()
scheme, err := newScheme()
if err != nil {
return nil, fmt.Errorf("failed to create new scheme. %v", err)
}

client, err := util.NewK8sClient(scheme)
if err != nil {
return nil, fmt.Errorf("failed to create new client. %v", err)
}
Expand Down Expand Up @@ -121,6 +125,19 @@ func (s *OCSProviderServer) OnboardConsumer(ctx context.Context, req *pb.Onboard
klog.Errorf("failed to validate onboarding ticket for consumer %q. %v", req.ConsumerName, err)
return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err)
}
storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get storageCluster. %v", err)
}

if storageCluster.UID != onboardingTicket.StorageCluster {
return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket storageCluster not match existing storageCluster.")
}

if !storageCluster.Spec.AllowRemoteStorageConsumers {
return nil, status.Errorf(codes.PermissionDenied, "onboarding remote storageConsumer(s) is not allowed.")
}

storageQuotaInGiB := ptr.Deref(onboardingTicket.StorageQuotaInGiB, 0)

if onboardingTicket.SubjectRole != services.ClientRole {
Expand Down Expand Up @@ -195,7 +212,7 @@ func (s *OCSProviderServer) GetStorageConfig(ctx context.Context, req *pb.Storag
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}

storageCluster, err := s.getStorageCluster(ctx)
storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,7 +267,7 @@ func (s *OCSProviderServer) Start(port int, opts []grpc.ServerOption) {
}
}

func newClient() (client.Client, error) {
func newScheme() (*runtime.Scheme, error) {
scheme := runtime.NewScheme()
err := ocsv1alpha1.AddToScheme(scheme)
if err != nil {
Expand All @@ -277,19 +294,9 @@ func newClient() (client.Client, error) {
return nil, fmt.Errorf("failed to add routev1 to scheme. %v", err)
}

config, err := config.GetConfig()
if err != nil {
klog.Error(err, "failed to get rest.config")
return nil, err
}
client, err := client.New(config, client.Options{Scheme: scheme})
if err != nil {
klog.Error(err, "failed to create controller-runtime client")
return nil, err
}

return client, nil
return scheme, nil
}

func (s *OCSProviderServer) getExternalResources(ctx context.Context, consumerResource *ocsv1alpha1.StorageConsumer) ([]*pb.ExternalResource, error) {
var extR []*pb.ExternalResource

Expand Down Expand Up @@ -762,7 +769,7 @@ func (s *OCSProviderServer) GetStorageClaimConfig(ctx context.Context, req *pb.S
"csi.storage.k8s.io/controller-expand-secret-name": provisionerSecretName,
}

storageCluster, err := s.getStorageCluster(ctx)
storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -855,7 +862,7 @@ func (s *OCSProviderServer) ReportStatus(ctx context.Context, req *pb.ReportStat
return nil, status.Errorf(codes.Internal, "Failed to construct status response: %v", err)
}

storageCluster, err := s.getStorageCluster(ctx)
storageCluster, err := util.GetStorageClusterInNamespace(ctx, s.client, s.namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -896,34 +903,6 @@ func (s *OCSProviderServer) getOCSSubscriptionChannel(ctx context.Context) (stri
return subscription.Spec.Channel, nil
}

func (s *OCSProviderServer) getStorageCluster(ctx context.Context) (*ocsv1.StorageCluster, error) {
scList := &ocsv1.StorageClusterList{}
if err := s.client.List(ctx, scList, client.InNamespace(s.namespace)); err != nil {
return nil, status.Errorf(codes.Internal, "failed to list storage clusters: %v", err)
}

var foundSc *ocsv1.StorageCluster
for i := range scList.Items {
sc := &scList.Items[i]
if sc.Status.Phase == util.PhaseIgnored {
continue // Skip Ignored storage cluster
}
if sc.Spec.AllowRemoteStorageConsumers {
if foundSc != nil {
// This means we have already found one storage cluster, so this is a second one
return nil, status.Errorf(codes.FailedPrecondition, "multiple provider storage clusters found")
}
foundSc = sc
}
}

if foundSc == nil {
return nil, status.Errorf(codes.NotFound, "no provider storage cluster found")
}

return foundSc, nil
}

func isEncryptionInTransitEnabled(networkSpec *rookCephv1.NetworkSpec) bool {
return networkSpec != nil &&
networkSpec.Connections != nil &&
Expand Down
3 changes: 3 additions & 0 deletions services/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package services

import "k8s.io/apimachinery/pkg/types"

type OnboardingSubjectRole string

const (
Expand All @@ -12,4 +14,5 @@ type OnboardingTicket struct {
ExpirationDate int64 `json:"expirationDate,string"`
SubjectRole OnboardingSubjectRole `json:"subjectRole"`
StorageQuotaInGiB *uint `json:"storageQuotaInGiB,omitempty"`
StorageCluster types.UID `json:"storageCluster"`
}
15 changes: 11 additions & 4 deletions services/ux-backend/handlers/onboarding/clienttokens/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -23,16 +24,16 @@ var unitToGib = map[string]uint{
"Pi": 1024 * 1024,
}

func HandleMessage(w http.ResponseWriter, r *http.Request, tokenLifetimeInHours int) {
func HandleMessage(w http.ResponseWriter, r *http.Request, tokenLifetimeInHours int, cl client.Client, namespace string) {
switch r.Method {
case "POST":
handlePost(w, r, tokenLifetimeInHours)
handlePost(w, r, tokenLifetimeInHours, cl, namespace)
default:
handleUnsupportedMethod(w, r)
}
}

func handlePost(w http.ResponseWriter, r *http.Request, tokenLifetimeInHours int) {
func handlePost(w http.ResponseWriter, r *http.Request, tokenLifetimeInHours int, cl client.Client, namespace string) {
var storageQuotaInGiB *uint
// When ContentLength is 0 that means request body is empty and
// storage quota is unlimited
Expand All @@ -59,7 +60,13 @@ func handlePost(w http.ResponseWriter, r *http.Request, tokenLifetimeInHours int
storageQuotaInGiB = ptr.To(unitAsGiB * quota.Value)
}

if onboardingToken, err := util.GenerateClientOnboardingToken(tokenLifetimeInHours, onboardingPrivateKeyFilePath, storageQuotaInGiB); err != nil {
storageCluster, err := util.GetStorageClusterInNamespace(r.Context(), cl, namespace)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if onboardingToken, err := util.GenerateClientOnboardingToken(tokenLifetimeInHours, onboardingPrivateKeyFilePath, storageQuotaInGiB, storageCluster.UID); err != nil {
klog.Errorf("failed to get onboarding token: %v", err)
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set("Content-Type", handlers.ContentTypeTextPlain)
Expand Down
Loading

0 comments on commit a70918c

Please sign in to comment.