Skip to content

Commit

Permalink
Merge pull request #487 from amadhusu/am/451
Browse files Browse the repository at this point in the history
Extracted DB and Object Storage Error Logs to display on DSPA
  • Loading branch information
openshift-merge-bot[bot] authored Dec 1, 2023
2 parents 30e8efc + f9410d8 commit 325b3e8
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 73 deletions.
35 changes: 21 additions & 14 deletions controllers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
b64 "encoding/base64"
"fmt"

"time"

"errors"

_ "github.com/go-sql-driver/mysql"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"time"
)

const dbSecret = "mariadb/secret.yaml.tmpl"
Expand All @@ -37,58 +40,62 @@ var mariadbTemplates = []string{
}

// extract to var for mocking in testing
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) bool {
var ConnectAndQueryDatabase = func(host, port, username, password, dbname string, dbConnectionTimeout time.Duration) (bool, error) {
// Create a context with a timeout of 1 second
ctx, cancel := context.WithTimeout(context.Background(), dbConnectionTimeout)
defer cancel()

connectionString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", username, password, host, port, dbname)
db, err := sql.Open("mysql", connectionString)
if err != nil {
return false
return false, err
}
defer db.Close()

testStatement := "SELECT 1;"
_, err = db.QueryContext(ctx, testStatement)
return err == nil
return err == nil, nil
}

func (r *DSPAReconciler) isDatabaseAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, error) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)

if params.DatabaseHealthCheckDisabled(dsp) {
log.V(1).Info("Database health check disabled, assuming database is available and ready.")
return true
infoMessage := "Database health check disabled, assuming database is available and ready."
log.V(1).Info(infoMessage)
return true, nil
}

log.Info("Performing Database Health Check")
databaseSpecified := dsp.Spec.Database != nil
usingExternalDB := params.UsingExternalDB(dsp)
usingMariaDB := !databaseSpecified || dsp.Spec.Database.MariaDB != nil
if !usingMariaDB && !usingExternalDB {
log.Info("Could not connect to Database: Unsupported Type")
return false
errorMessage := "Could not connect to Database: Unsupported Type"
log.Error(nil, errorMessage)
return false, errors.New(errorMessage)
}

decodePass, _ := b64.StdEncoding.DecodeString(params.DBConnection.Password)
dbConnectionTimeout := config.GetDurationConfigWithDefault(config.DBConnectionTimeoutConfigName, config.DefaultDBConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Database Heath Check connection timeout: %s", dbConnectionTimeout))

dbHealthCheckPassed := ConnectAndQueryDatabase(params.DBConnection.Host,
dbHealthCheckPassed, err := ConnectAndQueryDatabase(params.DBConnection.Host,
params.DBConnection.Port,
params.DBConnection.Username,
string(decodePass),
params.DBConnection.DBName,
dbConnectionTimeout)
if dbHealthCheckPassed {
log.Info("Database Health Check Successful")
} else {

if err != nil {
log.Info("Unable to connect to Database")
} else {
log.Info("Database Health Check Successful")
}
return dbHealthCheckPassed

return dbHealthCheckPassed, err
}

func (r *DSPAReconciler) ReconcileDatabase(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
Expand Down
13 changes: 7 additions & 6 deletions controllers/dspipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/controller"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -224,8 +225,8 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

// Get Prereq Status (DB and ObjStore Ready)
dbAvailable := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable := r.isObjectStorageAccessible(ctx, dspa, params)
dbAvailable, dbAvailableError := r.isDatabaseAccessible(ctx, dspa, params)
objStoreAvailable, objStoreAvailableError := r.isObjectStorageAccessible(ctx, dspa, params)
dspaPrereqsReady := dbAvailable && objStoreAvailable

if dspaPrereqsReady {
Expand Down Expand Up @@ -269,7 +270,7 @@ func (r *DSPAReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable)
conditions, err := r.GenerateStatus(ctx, dspa, params, dbAvailable, objStoreAvailable, dbAvailableError, objStoreAvailableError)
if err != nil {
log.Info(err.Error())
return ctrl.Result{}, err
Expand Down Expand Up @@ -411,14 +412,14 @@ func (r *DSPAReconciler) handleReadyCondition(ctx context.Context, dspa *dspav1a
}

func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams, dbAvailableStatus, objStoreAvailableStatus bool) ([]metav1.Condition, error) {
params *DSPAParams, dbAvailableStatus bool, objStoreAvailableStatus bool, dbAvailableError error, objStoreAvailableError error) ([]metav1.Condition, error) {
// Create Database Availability Condition
databaseAvailable := r.buildCondition(config.DatabaseAvailable, dspa, config.DatabaseAvailable)
if dbAvailableStatus {
databaseAvailable.Status = metav1.ConditionTrue
databaseAvailable.Message = "Database connectivity successfully verified"
} else {
databaseAvailable.Message = "Could not connect to database"
databaseAvailable.Message = error.Error(dbAvailableError)
}

// Create Object Storage Availability Condition
Expand All @@ -427,7 +428,7 @@ func (r *DSPAReconciler) GenerateStatus(ctx context.Context, dspa *dspav1alpha1.
objStoreAvailable.Status = metav1.ConditionTrue
objStoreAvailable.Message = "Object Store connectivity successfully verified"
} else {
objStoreAvailable.Message = "Could not connect to Object Store"
objStoreAvailable.Message = error.Error(objStoreAvailableError)
}

// Create APIServer Readiness Condition
Expand Down
67 changes: 38 additions & 29 deletions controllers/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"fmt"
"net/http"

"time"

"github.com/go-logr/logr"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
dspav1alpha1 "github.com/opendatahub-io/data-science-pipelines-operator/api/v1alpha1"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/config"
"github.com/opendatahub-io/data-science-pipelines-operator/controllers/util"
"time"
)

const storageSecret = "minio/secret.yaml.tmpl"
Expand Down Expand Up @@ -94,7 +95,7 @@ func getHttpsTransportWithCACert(log logr.Logger, pemCerts []byte) (*http.Transp
return transport, nil
}

var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
cred := createCredentialProvidersChain(string(accesskey), string(secretkey))

opts := &minio.Options{
Expand All @@ -105,16 +106,18 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
if len(pemCerts) != 0 {
tr, err := getHttpsTransportWithCACert(log, pemCerts)
if err != nil {
log.Error(err, "Encountered error when processing custom ca bundle.")
return false
errorMessage := "Encountered error when processing custom ca bundle."
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}
opts.Transport = tr
}

minioClient, err := minio.New(endpoint, opts)
if err != nil {
log.Info(fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint))
return false
errorMessage := fmt.Sprintf("Could not connect to object storage endpoint: %s", endpoint)
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

ctx, cancel := context.WithTimeout(ctx, objStoreConnectionTimeout)
Expand All @@ -128,68 +131,74 @@ var ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoin
// In the case that the Error is NoSuchKey (or NoSuchBucket), we can verify that the endpoint worked and the object just doesn't exist
case minio.ErrorResponse:
if err.Code == "NoSuchKey" || err.Code == "NoSuchBucket" {
return true
return true, err
}
}

if util.IsX509UnknownAuthorityError(err) {
log.Error(err, "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. "+
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle "+
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already "+
"provided a CABundle, verify the validity of the provided CABundle.")
return false
errorMessage := "Encountered x509 UnknownAuthorityError when connecting to ObjectStore. " +
"If using an tls S3 connection with self-signed certs, you may specify a custom CABundle " +
"to mount on the DSP API Server via the DSPA cr under the spec.cABundle field. If you have already " +
"provided a CABundle, verify the validity of the provided CABundle."
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

// Every other error means the endpoint in inaccessible, or the credentials provided do not have, at a minimum GetObject, permissions
log.Info(fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error()))
return false
errorMessage := fmt.Sprintf("Could not connect to (%s), Error: %s", endpoint, err.Error())
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

// Getting here means the health check passed
return true
return true, nil
}

func (r *DSPAReconciler) isObjectStorageAccessible(ctx context.Context, dsp *dspav1alpha1.DataSciencePipelinesApplication,
params *DSPAParams) bool {
params *DSPAParams) (bool, error) {
log := r.Log.WithValues("namespace", dsp.Namespace).WithValues("dspa_name", dsp.Name)
if params.ObjectStorageHealthCheckDisabled(dsp) {
log.V(1).Info("Object Storage health check disabled, assuming object store is available and ready.")
return true
infoMessage := "Object Storage health check disabled, assuming object store is available and ready."
log.V(1).Info(infoMessage)
return true, nil
}

log.Info("Performing Object Storage Health Check")

endpoint, err := joinHostPort(params.ObjectStorageConnection.Host, params.ObjectStorageConnection.Port)
if err != nil {
log.Error(err, "Could not determine Object Storage Endpoint")
return false
errorMessage := "Could not determine Object Storage Endpoint"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

accesskey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.AccessKeyID)
if err != nil {
log.Error(err, "Could not decode Object Storage Access Key ID")
return false
errorMessage := "Could not decode Object Storage Access Key ID"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

secretkey, err := base64.StdEncoding.DecodeString(params.ObjectStorageConnection.SecretAccessKey)
if err != nil {
log.Error(err, "Could not decode Object Storage Secret Access Key")
return false
errorMessage := "Could not decode Object Storage Secret Access Key"
log.Error(err, errorMessage)
return false, errors.New(errorMessage)
}

objStoreConnectionTimeout := config.GetDurationConfigWithDefault(config.ObjStoreConnectionTimeoutConfigName, config.DefaultObjStoreConnectionTimeout)

log.V(1).Info(fmt.Sprintf("Object Store connection timeout: %s", objStoreConnectionTimeout))

verified := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
verified, err := ConnectAndQueryObjStore(ctx, log, endpoint, params.ObjectStorageConnection.Bucket, accesskey, secretkey,
*params.ObjectStorageConnection.Secure, params.APICustomPemCerts, objStoreConnectionTimeout)

if verified {
log.Info("Object Storage Health Check Successful")
} else {
if err != nil {
log.Info("Object Storage Health Check Failed")
} else {
log.Info("Object Storage Health Check Successful")
}
return verified
return verified, err
}

// ReconcileStorage will set up Storage Connection.
Expand Down
41 changes: 21 additions & 20 deletions controllers/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package controllers
import (
"context"
"encoding/base64"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -270,8 +271,8 @@ func TestDefaultDeployBehaviorStorage(t *testing.T) {

func TestIsDatabaseAccessibleTrue(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -302,14 +303,14 @@ func TestIsDatabaseAccessibleTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.True(t, verified, err)
}

func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return false, errors.New("Object Store is not Accessible")
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -340,14 +341,14 @@ func TestIsDatabaseNotAccessibleFalse(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
// Override the live connection function with a mock version that would always return false if called
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return false
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return false, errors.New("Object Store is not Accessible")
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -378,16 +379,16 @@ func TestDisabledHealthCheckReturnsTrue(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
// if health check is disabled this should always return True
// even thought the mock connection function would return false if called
assert.True(t, verified)
assert.True(t, verified, err)
}

func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -418,14 +419,14 @@ func TestIsDatabaseAccessibleBadAccessKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
// Override the live connection function with a mock version
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) bool {
return true
ConnectAndQueryObjStore = func(ctx context.Context, log logr.Logger, endpoint, bucket string, accesskey, secretkey []byte, secure bool, pemCerts []byte, objStoreConnectionTimeout time.Duration) (bool, error) {
return true, nil
}

testNamespace := "testnamespace"
Expand Down Expand Up @@ -456,8 +457,8 @@ func TestIsDatabaseAccessibleBadSecretKey(t *testing.T) {
},
}

verified := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified)
verified, err := reconciler.isObjectStorageAccessible(ctx, dspa, params)
assert.False(t, verified, err)
}

func TestJoinHostPort(t *testing.T) {
Expand Down
Loading

0 comments on commit 325b3e8

Please sign in to comment.