Skip to content

Commit

Permalink
feat: support splitting blobs when stored as OCI layer (#1140)
Browse files Browse the repository at this point in the history
<!-- markdownlint-disable MD041 -->
#### What this PR does / why we need it
There are OCI repositories with a layer size limitation. Because OCM
potentially maps any external artifact to a single blob stored as layer
on OCI registries, this could lead to problems. An obvious problematic
scenario is the transport of a multi-platform OCI image. Its blob format
is an archive containing all images and all layers of those images.

This PR introduces the possibility to specify blob limits for OCI
registries. The OCM-to-OCI mapping then
splits larger blobs into multiple layers.
The `localBlob` access method then uses a comma-separated list of
layer-blob digests to record the sequence of layers.
On access, the layer blobs are concatenated again into a single stream.
 
#### Which issue(s) this PR fixes
<!--
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->
Fixes: open-component-model/ocm-project#12

A follow-up issue
open-component-model/ocm-project#338 describes
the provisioning of appropriate defaults for common public registries.

---------

Co-authored-by: Jakob Möller <[email protected]>
  • Loading branch information
mandelsoft and jakobmoellerdev authored Dec 20, 2024
1 parent c2605cc commit 2cb3187
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 34 deletions.
11 changes: 11 additions & 0 deletions api/credentials/identity/hostpath/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string {
}
return strings.TrimPrefix(id[ID_PATHPREFIX], "/")
}

// HostPort returns the host name stored in the consumer identity,
// followed by ":" and the port if the identity carries a port attribute.
// For a nil identity the empty string is returned.
func HostPort(id cpi.ConsumerIdentity) string {
	if id == nil {
		return ""
	}
	port, hasPort := id[ID_PORT]
	if !hasPort {
		return id[ID_HOSTNAME]
	}
	return id[ID_HOSTNAME] + ":" + port
}
1 change: 1 addition & 0 deletions api/oci/extensions/repositories/ocireg/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo)
spec: spec,
info: info,
}
i.logger.Debug("created repository")
return cpi.NewRepository(i), nil
}

Expand Down
18 changes: 18 additions & 0 deletions api/ocm/cpi/repocpi/bridge_r.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ type RepositoryImpl interface {
io.Closer
}

// Chunked is an optional interface, which
// may be implemented to accept a blob limit for mapping
// local blobs to an external storage system.
// Support is detected by a type assertion on a RepositoryImpl
// (see SetBlobLimit).
type Chunked interface {
// SetBlobLimit sets the blob limit if possible.
// It returns true, if this was successful.
// NOTE(review): the unit of s is presumably bytes — confirm with implementations.
SetBlobLimit(s int64) bool
}

// SetBlobLimit passes the blob limit s to the repository implementation i,
// provided i implements the optional Chunked interface.
// It reports whether the limit was accepted; for implementations
// not supporting Chunked the result is always false.
func SetBlobLimit(i RepositoryImpl, s int64) bool {
	chunked, supported := i.(Chunked)
	if !supported {
		return false
	}
	return chunked.SetBlobLimit(s)
}

// _repositoryBridgeBase is an alias for the generic resource
// implementation base instantiated for cpi.Repository.
type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]

type repositoryBridge struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package genericocireg

import (
"bytes"
"io"
"os"
"strings"
"sync"

"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"

"ocm.software/ocm/api/oci"
Expand Down Expand Up @@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
_, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
refs := strings.Split(m.spec.LocalReference, ",")

var (
data blobaccess.DataAccess
err error
)
if len(refs) < 2 {
_, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
if err != nil {
return nil, err
}
} else {
data = &composedBlock{m, refs}
}
m.data = data
return m.data, err
Expand All @@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
// MimeType returns the media type recorded in the access specification
// of this method.
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}

////////////////////////////////////////////////////////////////////////////////

// composedBlock is a blobaccess.DataAccess for a local blob whose
// content is spread over a sequence of layer blobs. The blob content
// is the concatenation of the layer blobs in the order given by refs.
type composedBlock struct {
// m is the access method whose namespace is used to fetch the layer blobs.
m *localBlobAccessMethod
// refs holds the digests of the layer blobs in content order.
refs []string
}

// Compile-time check that *composedBlock implements blobaccess.DataAccess.
var _ blobaccess.DataAccess = (*composedBlock)(nil)

// Get returns the complete blob content by concatenating the content of
// all layer blobs in the order given by c.refs.
//
// The whole blob is buffered in memory; use Reader to stream large blobs.
// The first error encountered while fetching, reading or releasing a
// layer aborts the operation and is returned.
func (c *composedBlock) Get() ([]byte, error) {
	var buf bytes.Buffer
	for _, ref := range c.refs {
		// Trim the reference the same way composedReader.Read does, so that
		// both access paths accept the same reference lists.
		if err := c.appendLayer(&buf, strings.TrimSpace(ref)); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// appendLayer appends the content of the layer blob with digest ref to buf.
func (c *composedBlock) appendLayer(buf *bytes.Buffer, ref string) (err error) {
	var finalize finalizer.Finalizer
	// Release the layer's reader and data access on every path, including
	// the early error returns. An error from releasing is only reported
	// if nothing else failed before.
	defer func() {
		if ferr := finalize.Finalize(); err == nil {
			err = ferr
		}
	}()

	_, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
	if err != nil {
		return err
	}
	finalize.Close(data)

	r, err := data.Reader()
	if err != nil {
		return err
	}
	finalize.Close(r)

	_, err = io.Copy(buf, r)
	return err
}

// Reader returns a new reader streaming the concatenated content of all
// layer blobs. Layers are not fetched here; the returned reader starts
// with the full list of layer references. The error result is always nil.
func (c *composedBlock) Reader() (io.ReadCloser, error) {
	r := &composedReader{m: c.m, refs: c.refs}
	return r, nil
}

// Close does nothing and always returns nil.
// Readers obtained from Reader have to be closed separately.
func (c *composedBlock) Close() error {
return nil
}

// composedReader is an io.ReadCloser delivering the concatenated content
// of a sequence of layer blobs. Layers are opened one at a time, when
// they are needed by Read.
type composedReader struct {
// lock serializes Read and Close.
lock sync.Mutex
// m provides the namespace used to fetch the layer blobs.
m *localBlobAccessMethod
// refs holds the digests of the layers not yet completely read;
// refs[0] is the current or next layer.
refs []string
// reader is the open reader of the current layer, nil if no layer is open.
reader io.ReadCloser
// data is the data access the current reader was obtained from.
data blobaccess.DataAccess
}

// Read implements io.Reader on the concatenation of all layer blobs.
//
// A single call never crosses a layer boundary with data: if the current
// layer delivers bytes together with its end, those bytes are returned and
// the next layer is opened by the following call. io.EOF is reported only
// after the last layer has been consumed. A zero-byte read without error
// may be returned if the current layer reader delivers one.
func (c *composedReader) Read(p []byte) (n int, err error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for {
		if c.reader != nil {
			n, err = c.reader.Read(p)

			if err == io.EOF {
				// The current layer is exhausted. Its content has been
				// delivered completely, therefore releasing it is done
				// best-effort and close errors are intentionally ignored.
				_ = c.reader.Close()
				_ = c.data.Close()
				c.refs = c.refs[1:]
				c.reader = nil
				c.data = nil
				// The end of a layer is not the end of the stream.
				err = nil
			}
			// Return a partial read (even a zero read if the layer is not
			// yet finished) or an error.
			if c.reader != nil || err != nil || n > 0 {
				return n, err
			}
			// Otherwise the layer ended without delivering data, so the
			// given buffer can be used for the next layer.
		}

		// If no more layers are available, report EOF.
		if len(c.refs) == 0 {
			return 0, io.EOF
		}

		ref := strings.TrimSpace(c.refs[0])

		var data blobaccess.DataAccess
		_, data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
		if err != nil {
			return 0, err
		}
		var reader io.ReadCloser
		reader, err = data.Reader()
		if err != nil {
			// Do not keep a data access without reader: it would neither be
			// released by Close nor by a retried Read.
			_ = data.Close()
			return 0, err
		}
		// Publish the layer only if it is completely opened.
		c.data, c.reader = data, reader
	}
}

// Close releases the currently opened layer, if there is one, and marks
// the reader as closed. After Close, Read reports io.EOF.
//
// It returns os.ErrClosed if the reader has already been closed, otherwise
// the first error reported while releasing the current layer.
func (c *composedReader) Close() error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.reader == nil && c.refs == nil {
		return os.ErrClosed
	}
	// Mark the reader as closed even if no layer is currently open,
	// so that repeated calls are detected and Read cannot reopen layers.
	c.refs = nil

	var err error
	if c.reader != nil {
		err = c.reader.Close()
		c.reader = nil
	}
	if c.data != nil {
		if derr := c.data.Close(); err == nil {
			err = derr
		}
		c.data = nil
	}
	return err
}
53 changes: 53 additions & 0 deletions api/ocm/extensions/repositories/genericocireg/bloblimits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package genericocireg

import (
"sync"

configctx "ocm.software/ocm/api/config"
"ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config"
)

var (
// defaultBlobLimits holds the default blob limits, keyed by the
// name passed to AddDefaultBlobLimit.
defaultBlobLimits config.BlobLimits
// lock guards defaultBlobLimits.
lock sync.Mutex
)

// Size multipliers based on powers of 1000 (decimal, not 1024-based).
// NOTE(review): presumably meant as byte counts for blob limits — confirm.
const (
KB = int64(1000)
MB = 1000 * KB
GB = 1000 * MB
)

func init() {
defaultBlobLimits = config.BlobLimits{}

// Add limits for known OCI repositories, here,
// or provide init functions in specialized packages
// by calling AddDefaultBlobLimit.
AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429
}

// AddDefaultBlobLimit sets the default blob limit for a known
// repository identified by name (for example "ghcr.io").
// An already existing default for the same name is replaced.
// Those defaults will be overwritten by blob limits
// given by a configuration object and the repository
// specification.
// The table of defaults is modified while holding the package lock.
func AddDefaultBlobLimit(name string, limit int64) {
lock.Lock()
defer lock.Unlock()

defaultBlobLimits[name] = limit
}

// ConfigureBlobLimits applies blob limits to target: first the registered
// defaults and then, if ctx is given, whatever the configuration context
// of ctx applies to target. A nil target is ignored.
// The package lock is held for the whole operation.
func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) {
	if target == nil {
		return
	}

	lock.Lock()
	defer lock.Unlock()

	target.ConfigureBlobLimits(defaultBlobLimits)
	if ctx == nil {
		return
	}
	ctx.ConfigContext().ApplyTo(0, target)
}
Loading

0 comments on commit 2cb3187

Please sign in to comment.