Skip to content

Commit

Permalink
make compression format for one-blocks configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell committed Sep 9, 2024
1 parent a173236 commit 5947383
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
10 changes: 10 additions & 0 deletions cmd/apps/reader_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"regexp"
"slices"
"time"

"github.com/kballard/go-shellquote"
Expand Down Expand Up @@ -68,6 +69,7 @@ func RegisterReaderNodeApp[B firecore.Block](chain *firecore.Chain[B], rootLog *
for writes. You should set this flag if you have multiple reader running, each one should get a unique identifier, the
hostname value is a good value to use.
`))
cmd.Flags().String("reader-node-one-block-compression", "zstd", "Compression to use on one-block files. Valid values are 'none', 'gzip', 'zstd'")
return nil
},
InitFunc: func(runtime *launcher.Runtime) error {
Expand Down Expand Up @@ -179,6 +181,13 @@ func RegisterReaderNodeApp[B firecore.Block](chain *firecore.Chain[B], rootLog *
gprcListenAddr := viper.GetString("reader-node-grpc-listen-addr")
oneBlockFileSuffix := viper.GetString("reader-node-one-block-suffix")
blocksChanCapacity := viper.GetInt("reader-node-blocks-chan-capacity")
oneBlockCompression := viper.GetString("reader-node-one-block-compression")
if !slices.Contains([]string{"none", "gzip", "zstd"}, oneBlockCompression) {
return nil, fmt.Errorf("invalid value for 'reader-node-one-block-compression': %q, expected one of 'none', 'zstd', 'gzip'", oneBlockCompression)
}
if oneBlockCompression == "none" {
oneBlockCompression = ""
}

readerPlugin, err := reader.NewMindReaderPlugin(
oneBlocksStoreURL,
Expand All @@ -194,6 +203,7 @@ func RegisterReaderNodeApp[B firecore.Block](chain *firecore.Chain[B], rootLog *
chainOperator.Shutdown(nil)
},
oneBlockFileSuffix,
oneBlockCompression,
blockStreamServer,
appLogger,
appTracer,
Expand Down
2 changes: 2 additions & 0 deletions node-manager/app/node_reader_stdin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
GRPCAddr string
OneBlocksStoreURL string
OneBlockSuffix string
OneBlockCompression string
MindReadBlocksChanCapacity int
StartBlockNum uint64
StopBlockNum uint64
Expand Down Expand Up @@ -97,6 +98,7 @@ func (a *App) Run() error {
a.modules.MetricsAndReadinessManager.UpdateHeadBlock,
func(_ error) {},
a.Config.OneBlockSuffix,
a.Config.OneBlockCompression,
blockStreamServer,
a.zlogger,
a.tracer,
Expand Down
11 changes: 10 additions & 1 deletion node-manager/mindreader/mindreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func NewMindReaderPlugin(
headBlockUpdater nodeManager.HeadBlockUpdater,
shutdownFunc func(error),
oneBlockSuffix string,
oneBlockCompression string,
blockStreamServer *blockstream.Server,
zlogger *zap.Logger,
tracer logging.Tracer,
Expand All @@ -99,6 +100,7 @@ func NewMindReaderPlugin(
zlogger.Info("creating mindreader plugin",
zap.String("one_blocks_store_url", oneBlocksStoreURL),
zap.String("one_block_suffix", oneBlockSuffix),
zap.String("one_block_compression", oneBlockCompression),
zap.String("working_directory", workingDirectory),
zap.Uint64("start_block_num", startBlockNum),
zap.Uint64("stop_block_num", stopBlockNum),
Expand All @@ -120,7 +122,14 @@ func NewMindReaderPlugin(
return nil, fmt.Errorf("new local one block store: %w", err)
}

remoteOneBlocksStore, err := dstore.NewStore(oneBlocksStoreURL, "dbin.zst", "zstd", false)
extension := "dbin"
if oneBlockCompression == "zstd" {
extension = "dbin.zst"
} else if oneBlockCompression == "gzip" {
extension = "dbin.gz"
}

remoteOneBlocksStore, err := dstore.NewStore(oneBlocksStoreURL, extension, oneBlockCompression, false)
if err != nil {
return nil, fmt.Errorf("new remote one block store: %w", err)
}
Expand Down

0 comments on commit 5947383

Please sign in to comment.