Skip to content

Commit

Permalink
Merge pull request #419 from rusq/i174-resume
Browse files Browse the repository at this point in the history
Chunk optimisations and converter fixes
  • Loading branch information
rusq authored Jan 23, 2025
2 parents ddb3f32 + f657806 commit 2119ec0
Show file tree
Hide file tree
Showing 21 changed files with 226 additions and 89 deletions.
14 changes: 12 additions & 2 deletions cmd/slackdump/internal/convertcmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/convert"
"github.com/rusq/slackdump/v3/internal/source"
)

//go:embed assets/convert.md
Expand Down Expand Up @@ -105,6 +106,10 @@ type convertflags struct {
}

func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error {
st, err := source.Type(src)
if err != nil {
return err
}
cd, err := chunk.OpenDir(src)
if err != nil {
return err
Expand All @@ -121,11 +126,16 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
return errors.New("unknown storage type")
}

var (
includeFiles = cflg.withFiles && (st&source.FMattermost != 0)
includeAvatars = cflg.withAvatars && (st&source.FAvatars != 0)
)

cvt := convert.NewChunkToExport(
cd,
fsa,
convert.WithIncludeFiles(cflg.withFiles),
convert.WithIncludeAvatars(cflg.withAvatars),
convert.WithIncludeFiles(includeFiles),
convert.WithIncludeAvatars(includeAvatars),
convert.WithSrcFileLoc(sttFn),
convert.WithTrgFileLoc(sttFn),
convert.WithLogger(cfg.Log),
Expand Down
4 changes: 2 additions & 2 deletions cmd/slackdump/internal/diag/hydrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func download(ctx context.Context, archive, target string, dry bool) error {

//go:generate mockgen -destination=hydrate_mock_test.go -package=diag -source hydrate.go sourcer
type sourcer interface {
Channels() ([]slack.Channel, error)
Channels(ctx context.Context) ([]slack.Channel, error)
AllMessages(channelID string) ([]slack.Message, error)
AllThreadMessages(channelID, threadTimestamp string) ([]slack.Message, error)
}
Expand All @@ -192,7 +192,7 @@ func downloadFiles(ctx context.Context, d downloader.GetFiler, trg fsadapter.FS,

proc := fileproc.NewExport(fileproc.STmattermost, dl)

channels, err := src.Channels()
channels, err := src.Channels(ctx)
if err != nil {
return fmt.Errorf("error reading channels: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/slackdump/internal/diag/hydrate_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cmd/slackdump/internal/diag/hydrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func Test_downloadFiles(t *testing.T) {
{
"single message w 2 files",
func(m *Mocksourcer, fs *mock_fsadapter.MockFSCloser, d *mock_downloader.MockGetFiler) {
m.EXPECT().Channels().Return(TestChannels, nil)
m.EXPECT().Channels(gomock.Any()).Return(TestChannels, nil)
m.EXPECT().AllMessages("C01").Return([]slack.Message{TestMsgWFile1}, nil)

fs.EXPECT().Create(filepath.Join("__uploads", "1", "file1")).Return(&fakewritecloser{}, nil)
Expand All @@ -169,7 +169,7 @@ func Test_downloadFiles(t *testing.T) {
{
"all ok",
func(m *Mocksourcer, fs *mock_fsadapter.MockFSCloser, d *mock_downloader.MockGetFiler) {
m.EXPECT().Channels().Return(TestChannels, nil)
m.EXPECT().Channels(gomock.Any()).Return(TestChannels, nil)
m.EXPECT().AllMessages("C01").Return(TestMessages, nil)
m.EXPECT().AllThreadMessages("C01", "2").Return(TestThreadMessages, nil)

Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/diag/redownload.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func redownload(ctx context.Context, dir string) (int, error) {
}
defer cd.Close()

channels, err := cd.Channels()
channels, err := cd.Channels(ctx)
if err != nil {
return 0, fmt.Errorf("error reading channels: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func export(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list
_ = pb.Finish()
// at this point no goroutines are running, we are safe to assume that
// everything we need is in the chunk directory.
if err := conv.WriteIndex(); err != nil {
if err := conv.WriteIndex(ctx); err != nil {
return err
}
if err := tf.Close(); err != nil {
Expand Down
140 changes: 114 additions & 26 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"compress/gzip"
"context"
"encoding/gob"
"errors"
"fmt"
Expand All @@ -11,13 +12,13 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/rusq/slackdump/v3/internal/structures"

"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/osext"
"github.com/rusq/slackdump/v3/internal/structures"
)

// file extensions
Expand Down Expand Up @@ -49,8 +50,9 @@ type Directory struct {
dir string
cache dcache

wantCache bool
fm *filemgr
wantCache bool
fm *filemgr
numWorkers int
}

type dcache struct {
Expand All @@ -65,6 +67,12 @@ func WithCache(enabled bool) DirOption {
}
}

func WithNumWorkers(n int) DirOption {
return func(d *Directory) {
d.numWorkers = n
}
}

// OpenDir "opens" an existing directory for read and write operations.
// It expects the directory to exist and to be a directory, otherwise it will
// return an error.
Expand All @@ -75,8 +83,9 @@ func OpenDir(dir string, opt ...DirOption) (*Directory, error) {
return nil, fmt.Errorf("not a directory: %s", dir)
}
d := &Directory{
dir: dir,
wantCache: true,
dir: dir,
wantCache: true,
numWorkers: 16,
}
for _, o := range opt {
o(d)
Expand Down Expand Up @@ -115,34 +124,107 @@ func (d *Directory) Close() error {
return nil
}

// Channels collects all channels from the chunk directory. First, it
// attempts to find the channel.json.gz file, if it's not present, it will go
// through all conversation files and try to get "ChannelInfo" chunk from the
// each file.
func (d *Directory) Channels() ([]slack.Channel, error) {
type resultt[T any] struct {
v []T
err error
}

func collectAll[T any](ctx context.Context, d *Directory, numwrk int, fn func(*File) ([]T, error)) ([]T, error) {
var all []T
fileC := make(chan *File)
errC := make(chan error, 1)
go func() {
defer close(fileC)
defer close(errC)
errC <- d.Walk(func(name string, f *File, err error) error {
if err != nil {
return err
}
fileC <- f
return nil
})
}()

resultsC := make(chan resultt[T])
var wg sync.WaitGroup
wg.Add(numwrk)
for range numwrk {
go func() {
collectWorker(fileC, resultsC, fn)
wg.Done()
}()
}
go func() {
wg.Wait()
close(resultsC)
}()

LOOP:
for {
select {
case <-ctx.Done():
return nil, context.Cause(ctx)
case res, more := <-resultsC:
if !more {
break LOOP
}
if res.err != nil {
return nil, res.err
}
all = append(all, res.v...)
}
}
if err := <-errC; err != nil {
return nil, err
}
return all, nil
}

func collectWorker[T any](fileC <-chan *File, resultsC chan<- resultt[T], fn func(*File) ([]T, error)) {
for f := range fileC {
v, err := fn(f)
resultsC <- resultt[T]{v, err}
f.Close()
}
}

// Channels collects all channels from the chunk directory. First, it attempts
// to find the channel.json.gz file, if it's not present, it will go through
// all conversation files and try to get "ChannelInfo" chunk from each file.
func (d *Directory) Channels(ctx context.Context) ([]slack.Channel, error) {
if val := d.cache.channels.Load(); val != nil {
return val.([]slack.Channel), nil
}
// we are not using the channel.json.gz file because it doesn't contain
// members.
var ch []slack.Channel
if err := d.Walk(func(name string, f *File, err error) error {
ch, err := collectAll(ctx, d, d.numWorkers, func(f *File) ([]slack.Channel, error) {
c, err := f.AllChannels()
if err != nil {
return err
if errors.Is(err, ErrNotFound) {
return nil, nil
}
return nil, err
}
ci, err := f.AllChannelInfos()
ch = append(ch, ci...)

return nil
}); err != nil {
return c, nil
})
if err != nil {
return nil, err
}
sort.Slice(ch, func(i, j int) bool {
return ch[i].NameNormalized < ch[j].NameNormalized
})

d.cache.channels.Store(ch)
return ch, nil
}

type result struct {
ci []slack.Channel
err error
}

// Walk iterates over all chunk files in the directory and calls the function
// for each file. If the function returns an error, the iteration stops.
// It does not close files after the callback is called, so it's a caller's
// responsibility to close it.
func (d *Directory) Walk(fn func(name string, f *File, err error) error) error {
return filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error {
if err != nil {
Expand All @@ -156,13 +238,19 @@ func (d *Directory) Walk(fn func(name string, f *File, err error) error) error {
return fn(path, nil, err)
}
cf, err := cachedFromReader(f, d.wantCache)
if err == nil {
defer cf.Close()
}
return fn(path, cf, err)
})
}

// WalkSync is the same as Walk, but it closes the file after the callback is
// called.
func (d *Directory) WalkSync(fn func(name string, f *File, err error) error) error {
return d.Walk(func(name string, f *File, err error) error {
defer f.Close()
return fn(name, f, err)
})
}

// Name returns the full directory path.
func (d *Directory) Name() string {
return d.dir
Expand Down Expand Up @@ -356,7 +444,7 @@ func (d *Directory) File(id string, name string) (fs.File, error) {

func (d *Directory) AllMessages(channelID string) ([]slack.Message, error) {
var mm structures.Messages
err := d.Walk(func(name string, f *File, err error) error {
err := d.WalkSync(func(name string, f *File, err error) error {
if err != nil {
return err
}
Expand All @@ -380,7 +468,7 @@ func (d *Directory) AllMessages(channelID string) ([]slack.Message, error) {
func (d *Directory) AllThreadMessages(channelID, threadID string) ([]slack.Message, error) {
var mm structures.Messages
var parent *slack.Message
err := d.Walk(func(name string, f *File, err error) error {
err := d.WalkSync(func(name string, f *File, err error) error {
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions internal/chunk/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func (f *File) ensure() {
}
}

// Offsets returns all offsets for the given id.
func (f *File) Offsets(id GroupID) ([]int64, bool) {
// offsets returns all offsets for the given id.
func (f *File) offsets(id GroupID) ([]int64, bool) {
f.ensure()
ret, ok := f.idx[id]
return ret, ok && len(ret) > 0
Expand Down Expand Up @@ -276,8 +276,8 @@ func (f *File) AllChannelInfos() ([]slack.Channel, error) {
return nil, err
}
for i := range chans {
if chans[i].IsArchived {
slog.Default().Debug("skipping archived channel", "i", i, "id", chans[i].ID)
if chans[i].IsArchived || chans[i].NumMembers == 0 {
slog.Default().Debug("skipping empty or archived channel", "i", i, "id", chans[i].ID)
continue
}
members, err := f.ChannelUsers(chans[i].ID)
Expand Down Expand Up @@ -334,11 +334,11 @@ func allForOffsets[T any](p *File, offsets []int64, fn func(c *Chunk) []T) ([]T,

// ChannelInfo returns the information for the given channel.
func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) {
info, err := f.channelInfo(channelID, false)
info, err := f.channelInfo(channelID)
if err != nil {
return nil, err
}
if !info.IsArchived {
if !info.IsArchived && info.NumMembers > 0 {
users, err := f.ChannelUsers(channelID)
if err != nil {
return nil, fmt.Errorf("failed getting channel users for %q: %w", channelID, err)
Expand All @@ -354,7 +354,7 @@ func (f *File) ChannelUsers(channelID string) ([]string, error) {
})
}

func (f *File) channelInfo(channelID string, _ bool) (*slack.Channel, error) {
func (f *File) channelInfo(channelID string) (*slack.Channel, error) {
chunk, err := f.firstChunkForID(channelInfoID(channelID))
if err != nil {
return nil, err
Expand All @@ -367,7 +367,7 @@ func (f *File) channelInfo(channelID string, _ bool) (*slack.Channel, error) {

// firstChunkForID returns the first chunk in the file for the given id.
func (f *File) firstChunkForID(id GroupID) (*Chunk, error) {
ofs, ok := f.Offsets(id)
ofs, ok := f.offsets(id)
if !ok {
return nil, ErrNotFound
}
Expand Down
Loading

0 comments on commit 2119ec0

Please sign in to comment.