Skip to content

Commit

Permalink
rbd: add a MirrorImageGlobalStatusIter implementing rbd_mirror_image_…
Browse files Browse the repository at this point in the history
…global_status_list

This adds the function call in the style of an iterator, as the number
of mirrored images in the entire pool could be large. Tests are
included.

Signed-off-by: John Mulligan <[email protected]>
  • Loading branch information
phlogistonjohn authored and mergify[bot] committed Jun 8, 2021
1 parent 8179bd4 commit 24eeb95
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 1 deletion.
113 changes: 112 additions & 1 deletion rbd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,13 @@ func (image *Image) GetGlobalMirrorStatus() (GlobalMirrorImageStatus, error) {
}
defer C.rbd_mirror_image_global_status_cleanup(&s)

status := newGlobalMirrorImageStatus(&s)
return status, nil
}

func newGlobalMirrorImageStatus(
s *C.rbd_mirror_image_global_status_t) GlobalMirrorImageStatus {

status := GlobalMirrorImageStatus{
Name: C.GoString(s.name),
Info: convertMirrorImageInfo(&s.info),
Expand All @@ -406,7 +413,7 @@ func (image *Image) GetGlobalMirrorStatus() (GlobalMirrorImageStatus, error) {
Up: bool(ss.up),
}
}
return status, nil
return status
}

// CreateMirrorSnapshot creates a snapshot for image propagation to mirrors.
Expand Down Expand Up @@ -583,3 +590,107 @@ func ImportMirrorPeerBootstrapToken(
cToken)
return getError(ret)
}

// GlobalMirrorImageIDAndStatus values contain an ID string for a RBD image
// and that image's GlobalMirrorImageStatus.
type GlobalMirrorImageIDAndStatus struct {
ID string
Status GlobalMirrorImageStatus
}

func mirrorImageGlobalStatusList(
ioctx *rados.IOContext, start string,
results []GlobalMirrorImageIDAndStatus) (int, error) {
// this C function is treated like a "batch" iterator. Based on it's
// design it appears expected to call it multiple times to get
// the entire result.
cStart := C.CString(start)
defer C.free(unsafe.Pointer(cStart))

var (
max = C.size_t(len(results))
length = C.size_t(0)
ids = make([]*C.char, len(results))
images = make([]C.rbd_mirror_image_global_status_t, len(results))
)
ret := C.rbd_mirror_image_global_status_list(
cephIoctx(ioctx),
cStart,
max,
(**C.char)(unsafe.Pointer(&ids[0])),
(*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])),
&length)

for i := 0; i < int(length); i++ {
results[i].ID = C.GoString(ids[i])
results[i].Status = newGlobalMirrorImageStatus(&images[0])
}
C.rbd_mirror_image_global_status_list_cleanup(
(**C.char)(unsafe.Pointer(&ids[0])),
(*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])),
length)
return int(length), getError(ret)
}

// statusIterBufSize is intentionally not a constant. The unit tests alter
// this value in order to get more code coverage w/o needing to create
// very many images.
var statusIterBufSize = 64

// MirrorImageGlobalStatusIter provide methods for iterating over all
// the GlobalMirrorImageIdAndStatus values in a pool.
type MirrorImageGlobalStatusIter struct {
ioctx *rados.IOContext

buf []GlobalMirrorImageIDAndStatus
lastID string
}

// NewMirrorImageGlobalStatusIter creates a new iterator type ready for use.
func NewMirrorImageGlobalStatusIter(ioctx *rados.IOContext) *MirrorImageGlobalStatusIter {
return &MirrorImageGlobalStatusIter{
ioctx: ioctx,
}
}

// Next fetches one GlobalMirrorImageIDAndStatus value or a nil value if
// iteration is exhausted. The error return will be non-nil if an underlying
// error fetching more values occurred.
func (iter *MirrorImageGlobalStatusIter) Next() (*GlobalMirrorImageIDAndStatus, error) {
if len(iter.buf) == 0 {
if err := iter.fetch(); err != nil {
return nil, err
}
}
if len(iter.buf) == 0 {
return nil, nil
}
item := iter.buf[0]
iter.lastID = item.ID
iter.buf = iter.buf[1:]
return &item, nil
}

// Close terminates iteration regardless if iteration was completed and
// frees any associated resources.
func (iter *MirrorImageGlobalStatusIter) Close() error {
iter.buf = nil
iter.lastID = ""
return nil
}

func (iter *MirrorImageGlobalStatusIter) fetch() error {
iter.buf = nil
items := make([]GlobalMirrorImageIDAndStatus, statusIterBufSize)
n, err := mirrorImageGlobalStatusList(
iter.ioctx,
iter.lastID,
items)
if err != nil {
return err
}
if n > 0 {
iter.buf = items[:n]
}
return nil
}
82 changes: 82 additions & 0 deletions rbd/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,85 @@ func TestMirrorBootstrapToken(t *testing.T) {
assert.NoError(t, err)
})
}

func TestMirrorImageGlobalStatusIter(t *testing.T) {
defer func(x int) {
statusIterBufSize = x
}(statusIterBufSize)
// shrink the buffer size in order to trigger more of the
// retry logic in the iter type
statusIterBufSize = 4

conn := radosConnect(t)
poolName := GetUUID()
err := conn.MakePool(poolName)
require.NoError(t, err)
defer func() {
assert.NoError(t, conn.DeletePool(poolName))
conn.Shutdown()
}()

ioctx, err := conn.OpenIOContext(poolName)
assert.NoError(t, err)
defer func() {
ioctx.Destroy()
}()

// enable per-image mirroring for this pool
err = SetMirrorMode(ioctx, MirrorModeImage)
require.NoError(t, err)

imgName := GetUUID()
options := NewRbdImageOptions()
assert.NoError(t, options.SetUint64(ImageOptionOrder, uint64(testImageOrder)))

for i := 0; i < 7; i++ {
name := fmt.Sprintf("%s%d", imgName, i)
err = CreateImage(ioctx, name, testImageSize, options)
require.NoError(t, err)
img, err := OpenImage(ioctx, name, NoSnapshot)
assert.NoError(t, err)
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.NoError(t, err)
require.NoError(t, img.Close())
}

t.Run("ioctxNil", func(t *testing.T) {
iter := NewMirrorImageGlobalStatusIter(nil)
defer iter.Close()
assert.Panics(t, func() {
iter.Next()
})
})

t.Run("getStatus", func(t *testing.T) {
lst := []*GlobalMirrorImageIDAndStatus{}
iter := NewMirrorImageGlobalStatusIter(ioctx)
for {
istatus, err := iter.Next()
assert.NoError(t, err)
if istatus == nil {
break
}
lst = append(lst, istatus)
}
assert.Len(t, lst, 7)
gms := lst[0].Status
assert.NoError(t, err)
assert.NotEqual(t, "", gms.Name)
assert.NotEqual(t, "", gms.Info.GlobalID)
assert.Equal(t, gms.Info.State, MirrorImageEnabled)
assert.Equal(t, gms.Info.Primary, false)
if assert.Len(t, gms.SiteStatuses, 1) {
ss := gms.SiteStatuses[0]
assert.Equal(t, "", ss.MirrorUUID)
assert.Equal(t, MirrorImageStatusStateUnknown, ss.State, ss.State)
assert.Equal(t, "status not found", ss.Description)
assert.Equal(t, int64(0), ss.LastUpdate)
assert.False(t, ss.Up)
ls, err := gms.LocalStatus()
assert.NoError(t, err)
assert.Equal(t, ss, ls)
}
})
}

0 comments on commit 24eeb95

Please sign in to comment.