From f3594bddeb3c278cc00d855dc533481b8c98936e Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:00:36 +1000 Subject: [PATCH 1/4] async channel info scan --- cmd/slackdump/internal/diag/hydrate.go | 4 +- .../internal/diag/hydrate_mock_test.go | 9 +- cmd/slackdump/internal/diag/hydrate_test.go | 4 +- cmd/slackdump/internal/diag/redownload.go | 2 +- cmd/slackdump/internal/export/v3.go | 2 +- internal/chunk/directory.go | 140 ++++++++++++++---- internal/chunk/transform/export.go | 4 +- internal/convert/chunkexp.go | 4 +- internal/source/chunkdir.go | 7 +- internal/source/dump.go | 9 +- internal/source/export.go | 8 +- internal/source/export_test.go | 3 +- internal/source/source.go | 9 +- internal/viewer/handlers.go | 6 +- internal/viewer/viewer.go | 6 +- 15 files changed, 156 insertions(+), 61 deletions(-) diff --git a/cmd/slackdump/internal/diag/hydrate.go b/cmd/slackdump/internal/diag/hydrate.go index 191f583b..28d0c5be 100644 --- a/cmd/slackdump/internal/diag/hydrate.go +++ b/cmd/slackdump/internal/diag/hydrate.go @@ -178,7 +178,7 @@ func download(ctx context.Context, archive, target string, dry bool) error { //go:generate mockgen -destination=hydrate_mock_test.go -package=diag -source hydrate.go sourcer type sourcer interface { - Channels() ([]slack.Channel, error) + Channels(ctx context.Context) ([]slack.Channel, error) AllMessages(channelID string) ([]slack.Message, error) AllThreadMessages(channelID, threadTimestamp string) ([]slack.Message, error) } @@ -192,7 +192,7 @@ func downloadFiles(ctx context.Context, d downloader.GetFiler, trg fsadapter.FS, proc := fileproc.NewExport(fileproc.STmattermost, dl) - channels, err := src.Channels() + channels, err := src.Channels(ctx) if err != nil { return fmt.Errorf("error reading channels: %w", err) } diff --git a/cmd/slackdump/internal/diag/hydrate_mock_test.go b/cmd/slackdump/internal/diag/hydrate_mock_test.go index 3267d690..de5910e4 100644 --- a/cmd/slackdump/internal/diag/hydrate_mock_test.go +++ b/cmd/slackdump/internal/diag/hydrate_mock_test.go @@ -10,6 +10,7 @@ package diag import ( + context "context" reflect "reflect" slack "github.com/rusq/slack" @@ -71,16 +72,16 @@ func (mr *MocksourcerMockRecorder) AllThreadMessages(channelID, threadTimestamp } // Channels mocks base method. -func (m *Mocksourcer) Channels() ([]slack.Channel, error) { +func (m *Mocksourcer) Channels(ctx context.Context) ([]slack.Channel, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Channels") + ret := m.ctrl.Call(m, "Channels", ctx) ret0, _ := ret[0].([]slack.Channel) ret1, _ := ret[1].(error) return ret0, ret1 } // Channels indicates an expected call of Channels. -func (mr *MocksourcerMockRecorder) Channels() *gomock.Call { +func (mr *MocksourcerMockRecorder) Channels(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Channels", reflect.TypeOf((*Mocksourcer)(nil).Channels)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Channels", reflect.TypeOf((*Mocksourcer)(nil).Channels), ctx) } diff --git a/cmd/slackdump/internal/diag/hydrate_test.go b/cmd/slackdump/internal/diag/hydrate_test.go index 8118e208..2f1f53bb 100644 --- a/cmd/slackdump/internal/diag/hydrate_test.go +++ b/cmd/slackdump/internal/diag/hydrate_test.go @@ -155,7 +155,7 @@ func Test_downloadFiles(t *testing.T) { { "single message w 2 files", func(m *Mocksourcer, fs *mock_fsadapter.MockFSCloser, d *mock_downloader.MockGetFiler) { - m.EXPECT().Channels().Return(TestChannels, nil) + m.EXPECT().Channels(gomock.Any()).Return(TestChannels, nil) m.EXPECT().AllMessages("C01").Return([]slack.Message{TestMsgWFile1}, nil) fs.EXPECT().Create(filepath.Join("__uploads", "1", "file1")).Return(&fakewritecloser{}, nil) @@ -169,7 +169,7 @@ func Test_downloadFiles(t *testing.T) { { "all ok", func(m *Mocksourcer, fs *mock_fsadapter.MockFSCloser, d *mock_downloader.MockGetFiler) { - m.EXPECT().Channels().Return(TestChannels, nil) + m.EXPECT().Channels(gomock.Any()).Return(TestChannels, nil) m.EXPECT().AllMessages("C01").Return(TestMessages, nil) m.EXPECT().AllThreadMessages("C01", "2").Return(TestThreadMessages, nil) diff --git a/cmd/slackdump/internal/diag/redownload.go b/cmd/slackdump/internal/diag/redownload.go index 56ebd4df..a5a94a9e 100644 --- a/cmd/slackdump/internal/diag/redownload.go +++ b/cmd/slackdump/internal/diag/redownload.go @@ -82,7 +82,7 @@ func redownload(ctx context.Context, dir string) (int, error) { } defer cd.Close() - channels, err := cd.Channels() + channels, err := cd.Channels(ctx) if err != nil { return 0, fmt.Errorf("error reading channels: %w", err) } diff --git a/cmd/slackdump/internal/export/v3.go b/cmd/slackdump/internal/export/v3.go index bea68283..92d10fff 100644 --- a/cmd/slackdump/internal/export/v3.go +++ b/cmd/slackdump/internal/export/v3.go @@ -95,7 +95,7 @@ func export(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, list _ = pb.Finish() // at this point no goroutines are running, we are safe to assume that // everything we need is in the chunk directory. - if err := conv.WriteIndex(); err != nil { + if err := conv.WriteIndex(ctx); err != nil { return err } if err := tf.Close(); err != nil { diff --git a/internal/chunk/directory.go b/internal/chunk/directory.go index b6fce986..eca4af6c 100644 --- a/internal/chunk/directory.go +++ b/internal/chunk/directory.go @@ -2,6 +2,7 @@ package chunk import ( "compress/gzip" + "context" "encoding/gob" "errors" "fmt" @@ -11,13 +12,13 @@ import ( "path/filepath" "sort" "strings" + "sync" "sync/atomic" - "github.com/rusq/slackdump/v3/internal/structures" - "github.com/rusq/slack" "github.com/rusq/slackdump/v3/internal/osext" + "github.com/rusq/slackdump/v3/internal/structures" ) // file extensions @@ -49,8 +50,9 @@ type Directory struct { dir string cache dcache - wantCache bool - fm *filemgr + wantCache bool + fm *filemgr + numWorkers int } type dcache struct { @@ -65,6 +67,12 @@ func WithCache(enabled bool) DirOption { } } +func WithNumWorkers(n int) DirOption { + return func(d *Directory) { + d.numWorkers = n + } +} + // OpenDir "opens" an existing directory for read and write operations. // It expects the directory to exist and to be a directory, otherwise it will // return an error. @@ -75,8 +83,9 @@ func OpenDir(dir string, opt ...DirOption) (*Directory, error) { return nil, fmt.Errorf("not a directory: %s", dir) } d := &Directory{ - dir: dir, - wantCache: true, + dir: dir, + wantCache: true, + numWorkers: 16, } for _, o := range opt { o(d) @@ -115,34 +124,107 @@ func (d *Directory) Close() error { return nil } -// Channels collects all channels from the chunk directory. First, it -// attempts to find the channel.json.gz file, if it's not present, it will go -// through all conversation files and try to get "ChannelInfo" chunk from the -// each file. -func (d *Directory) Channels() ([]slack.Channel, error) { +type resultt[T any] struct { + v []T + err error +} + +func collectAll[T any](ctx context.Context, d *Directory, numwrk int, fn func(*File) ([]T, error)) ([]T, error) { + var all []T + fileC := make(chan *File) + errC := make(chan error, 1) + go func() { + defer close(fileC) + defer close(errC) + errC <- d.Walk(func(name string, f *File, err error) error { + if err != nil { + return err + } + fileC <- f + return nil + }) + }() + + resultsC := make(chan resultt[T]) + var wg sync.WaitGroup + wg.Add(numwrk) + for range numwrk { + go func() { + collectWorker(fileC, resultsC, fn) + wg.Done() + }() + } + go func() { + wg.Wait() + close(resultsC) + }() + +LOOP: + for { + select { + case <-ctx.Done(): + return nil, context.Cause(ctx) + case res, more := <-resultsC: + if !more { + break LOOP + } + if res.err != nil { + return nil, res.err + } + all = append(all, res.v...) + } + } + if err := <-errC; err != nil { + return nil, err + } + return all, nil +} + +func collectWorker[T any](fileC <-chan *File, resultsC chan<- resultt[T], fn func(*File) ([]T, error)) { + for f := range fileC { + v, err := fn(f) + resultsC <- resultt[T]{v, err} + f.Close() + } +} + +// Channels collects all channels from the chunk directory. First, it attempts +// to find the channel.json.gz file, if it's not present, it will go through +// all conversation files and try to get "ChannelInfo" chunk from each file. +func (d *Directory) Channels(ctx context.Context) ([]slack.Channel, error) { if val := d.cache.channels.Load(); val != nil { return val.([]slack.Channel), nil } - // we are not using the channel.json.gz file because it doesn't contain - // members. - var ch []slack.Channel - if err := d.Walk(func(name string, f *File, err error) error { + ch, err := collectAll(ctx, d, d.numWorkers, func(f *File) ([]slack.Channel, error) { + c, err := f.AllChannels() if err != nil { - return err + if errors.Is(err, ErrNotFound) { + return nil, nil + } + return nil, err } - ci, err := f.AllChannelInfos() - ch = append(ch, ci...) - - return nil - }); err != nil { + return c, nil + }) + if err != nil { return nil, err } + sort.Slice(ch, func(i, j int) bool { + return ch[i].NameNormalized < ch[j].NameNormalized + }) + d.cache.channels.Store(ch) return ch, nil } +type result struct { + ci []slack.Channel + err error +} + // Walk iterates over all chunk files in the directory and calls the function // for each file. If the function returns an error, the iteration stops. +// It does not close files after the callback is called, so it's a caller's +// responsibility to close it. func (d *Directory) Walk(fn func(name string, f *File, err error) error) error { return filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error { if err != nil { @@ -156,13 +238,19 @@ func (d *Directory) Walk(fn func(name string, f *File, err error) error) error { return fn(path, nil, err) } cf, err := cachedFromReader(f, d.wantCache) - if err == nil { - defer cf.Close() - } return fn(path, cf, err) }) } +// WalkSync is the same as Walk, but it closes the file after the callback is +// called. +func (d *Directory) WalkSync(fn func(name string, f *File, err error) error) error { + return d.Walk(func(name string, f *File, err error) error { + defer f.Close() + return fn(name, f, err) + }) +} + // Name returns the full directory path. func (d *Directory) Name() string { return d.dir @@ -356,7 +444,7 @@ func (d *Directory) File(id string, name string) (fs.File, error) { func (d *Directory) AllMessages(channelID string) ([]slack.Message, error) { var mm structures.Messages - err := d.Walk(func(name string, f *File, err error) error { + err := d.WalkSync(func(name string, f *File, err error) error { if err != nil { return err } @@ -380,7 +468,7 @@ func (d *Directory) AllMessages(channelID string) ([]slack.Message, error) { func (d *Directory) AllThreadMessages(channelID, threadID string) ([]slack.Message, error) { var mm structures.Messages var parent *slack.Message - err := d.Walk(func(name string, f *File, err error) error { + err := d.WalkSync(func(name string, f *File, err error) error { if err != nil { return err } diff --git a/internal/chunk/transform/export.go b/internal/chunk/transform/export.go index a49b7466..c76c2fe6 100644 --- a/internal/chunk/transform/export.go +++ b/internal/chunk/transform/export.go @@ -256,12 +256,12 @@ func ExportChanName(ch *slack.Channel) string { // WriteIndex generates and writes the export index files. It must be called // once all transformations are done, because it might require to read channel // files. -func (t *ExpConverter) WriteIndex() error { +func (t *ExpConverter) WriteIndex(ctx context.Context) error { wsp, err := t.cd.WorkspaceInfo() if err != nil { return fmt.Errorf("failed to get the workspace info: %w", err) } - chans, err := t.cd.Channels() // this might read the channel files if it doesn't find the channels list chunks. + chans, err := t.cd.Channels(ctx) if err != nil { return fmt.Errorf("error indexing channels: %w", err) } diff --git a/internal/convert/chunkexp.go b/internal/convert/chunkexp.go index 47593e2b..d30c2d7f 100644 --- a/internal/convert/chunkexp.go +++ b/internal/convert/chunkexp.go @@ -176,7 +176,7 @@ func (c *ChunkToExport) Convert(ctx context.Context) error { if err := c.Validate(); err != nil { return err } - channels, err := c.src.Channels() + channels, err := c.src.Channels(ctx) if err != nil { return err } @@ -248,7 +248,7 @@ func (c *ChunkToExport) Convert(ctx context.Context) error { go func() { defer msgwg.Done() c.lg.DebugContext(ctx, "writing index", "name", c.src.Name()) - if err := conv.WriteIndex(); err != nil { + if err := conv.WriteIndex(ctx); err != nil { errC <- err } }() diff --git a/internal/source/chunkdir.go b/internal/source/chunkdir.go index 4e582957..c7787ba1 100644 --- a/internal/source/chunkdir.go +++ b/internal/source/chunkdir.go @@ -1,6 +1,7 @@ package source import ( + "context" "os" "github.com/rusq/slack" @@ -48,7 +49,7 @@ func (c *ChunkDir) AllThreadMessages(channelID, threadID string) ([]slack.Messag return c.d.AllThreadMessages(channelID, threadID) } -func (c *ChunkDir) ChannelInfo(channelID string) (*slack.Channel, error) { +func (c *ChunkDir) ChannelInfo(_ context.Context, channelID string) (*slack.Channel, error) { f, err := c.d.Open(chunk.ToFileID(channelID, "", false)) if err != nil { return nil, err @@ -57,8 +58,8 @@ func (c *ChunkDir) ChannelInfo(channelID string) (*slack.Channel, error) { return f.ChannelInfo(channelID) } -func (c *ChunkDir) Channels() ([]slack.Channel, error) { - return c.d.Channels() +func (c *ChunkDir) Channels(ctx context.Context) ([]slack.Channel, error) { + return c.d.Channels(ctx) } func (c *ChunkDir) Name() string { diff --git a/internal/source/dump.go b/internal/source/dump.go index c7719205..8fc813ed 100644 --- a/internal/source/dump.go +++ b/internal/source/dump.go @@ -1,6 +1,7 @@ package source import ( + "context" "io/fs" "log/slog" "os" @@ -19,7 +20,7 @@ type Dump struct { Storage } -func NewDump(fsys fs.FS, name string) (*Dump, error) { +func NewDump(ctx context.Context, fsys fs.FS, name string) (*Dump, error) { var st Storage = fstNotFound{} if fst, err := NewDumpStorage(fsys); err == nil { st = fst @@ -30,7 +31,7 @@ func NewDump(fsys fs.FS, name string) (*Dump, error) { Storage: st, } // initialise channels for quick lookup - c, err := d.Channels() + c, err := d.Channels(ctx) if err != nil { return nil, err } @@ -46,7 +47,7 @@ func (d Dump) Type() string { return "dump" } -func (d Dump) Channels() ([]slack.Channel, error) { +func (d Dump) Channels(context.Context) ([]slack.Channel, error) { // if user was diligent enough to dump channels and save them in a file, // we can use that. if cc, err := unmarshal[[]slack.Channel](d.fs, "channels.json"); err == nil { @@ -190,7 +191,7 @@ func (d Dump) findThreadFile(channelID, threadID string) ([]types.Message, error return c.Messages, nil } -func (d Dump) ChannelInfo(channelID string) (*slack.Channel, error) { +func (d Dump) ChannelInfo(_ context.Context, channelID string) (*slack.Channel, error) { for _, c := range d.c { if c.ID == channelID { return &c, nil diff --git a/internal/source/export.go b/internal/source/export.go index f828d7ab..9f6674e8 100644 --- a/internal/source/export.go +++ b/internal/source/export.go @@ -1,6 +1,7 @@ package source import ( + "context" "fmt" "io/fs" "log/slog" @@ -9,6 +10,7 @@ import ( "github.com/rusq/slackdump/v3/internal/chunk" "github.com/rusq/slack" + "github.com/rusq/slackdump/v3/export" "github.com/rusq/slackdump/v3/internal/structures" ) @@ -63,7 +65,7 @@ func loadStorage(fsys fs.FS) (Storage, error) { return NewStandardStorage(fsys, idx), nil } -func (e *Export) Channels() ([]slack.Channel, error) { +func (e *Export) Channels(context.Context) ([]slack.Channel, error) { return e.channels, nil } @@ -149,8 +151,8 @@ func (e *Export) AllThreadMessages(channelID, threadID string) ([]slack.Message, return tm, nil } -func (e *Export) ChannelInfo(channelID string) (*slack.Channel, error) { - c, err := e.Channels() +func (e *Export) ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) { + c, err := e.Channels(ctx) if err != nil { return nil, err } diff --git a/internal/source/export_test.go b/internal/source/export_test.go index c2da9a5c..d6a06c95 100644 --- a/internal/source/export_test.go +++ b/internal/source/export_test.go @@ -2,6 +2,7 @@ package source import ( "archive/zip" + "context" "io/fs" "os" "path/filepath" @@ -57,7 +58,7 @@ func TestExport_Channels(t *testing.T) { chanNames: tt.fields.chanNames, name: tt.fields.name, } - got, err := e.Channels() + got, err := e.Channels(context.Background()) if (err != nil) != tt.wantErr { t.Errorf("Export.Channels() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/internal/source/source.go b/internal/source/source.go index 284613b4..0e966773 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -18,6 +18,7 @@ import ( "strings" "github.com/rusq/slack" + "github.com/rusq/slackdump/v3/internal/chunk" ) @@ -40,7 +41,7 @@ type Sourcer interface { // Type should return the type of the retriever, i.e. "chunk" or "export". Type() string // Channels should return all channels. - Channels() ([]slack.Channel, error) + Channels(ctx context.Context) ([]slack.Channel, error) // Users should return all users. Users() ([]slack.User, error) // AllMessages should return all messages for the given channel id. @@ -50,7 +51,7 @@ type Sourcer interface { AllThreadMessages(channelID, threadID string) ([]slack.Message, error) // ChannelInfo should return the channel information for the given channel // id. - ChannelInfo(channelID string) (*slack.Channel, error) + ChannelInfo(ctx context.Context, channelID string) (*slack.Channel, error) // FS should return the filesystem with file attachments. FS() fs.FS // File should return the path of the file within the filesystem returned @@ -96,10 +97,10 @@ func Load(ctx context.Context, src string) (Sourcer, error) { if err != nil { return nil, err } - return NewDump(f, src) + return NewDump(ctx, f, src) case FDump | FDirectory: lg.DebugContext(ctx, "loading dump directory") - return NewDump(os.DirFS(src), src) + return NewDump(ctx, os.DirFS(src), src) default: return nil, fmt.Errorf("unsupported source type: %s", src) } diff --git a/internal/viewer/handlers.go b/internal/viewer/handlers.go index 9916ec2b..a0c5bf9c 100644 --- a/internal/viewer/handlers.go +++ b/internal/viewer/handlers.go @@ -10,6 +10,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/rusq/slack" + "github.com/rusq/slackdump/v3/internal/fasttime" ) @@ -85,7 +86,7 @@ func (v *Viewer) channelHandler(w http.ResponseWriter, r *http.Request, id strin lg.DebugContext(ctx, "conversation", "id", id, "message_count", len(mm)) - ci, err := v.src.ChannelInfo(id) + ci, err := v.src.ChannelInfo(r.Context(), id) if err != nil { lg.ErrorContext(ctx, "src.ChannelInfo", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -104,7 +105,6 @@ func (v *Viewer) channelHandler(w http.ResponseWriter, r *http.Request, id strin lg.ErrorContext(ctx, "ExecuteTemplate", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } - } func isHXRequest(r *http.Request) bool { @@ -128,7 +128,7 @@ func (v *Viewer) threadHandler(w http.ResponseWriter, r *http.Request, id string lg.DebugContext(ctx, "Messages", "mm_count", len(mm)) - ci, err := v.src.ChannelInfo(id) + ci, err := v.src.ChannelInfo(r.Context(), id) if err != nil { lg.ErrorContext(ctx, "ChannelInfo", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/internal/viewer/viewer.go b/internal/viewer/viewer.go index e9d29e8d..d226af27 100644 --- a/internal/viewer/viewer.go +++ b/internal/viewer/viewer.go @@ -51,7 +51,7 @@ const ( // [Sourcer] to retrieve the data, see "source" package for available options. // It will initialise the logger from the context. func New(ctx context.Context, addr string, r source.Sourcer) (*Viewer, error) { - all, err := r.Channels() + all, err := r.Channels(ctx) if err != nil { return nil, err } @@ -115,7 +115,7 @@ func (v *Viewer) Close() error { } func indexusers(uu []slack.User) map[string]slack.User { - var um = make(map[string]slack.User, len(uu)) + um := make(map[string]slack.User, len(uu)) for _, u := range uu { um[u.ID] = u } @@ -123,7 +123,7 @@ func indexusers(uu []slack.User) map[string]slack.User { } func indexchannels(cc []slack.Channel) map[string]slack.Channel { - var cm = make(map[string]slack.Channel, len(cc)) + cm := make(map[string]slack.Channel, len(cc)) for _, c := range cc { cm[c.ID] = c } From ec92c405c674301f82cf6d391476aac44812f84b Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Wed, 22 Jan 2025 19:26:13 +1000 Subject: [PATCH 2/4] convertor fixes + avatar detection --- cmd/slackdump/internal/convertcmd/convert.go | 14 +++++++++-- internal/chunk/file.go | 16 ++++++------- internal/chunk/file_test.go | 6 ++--- internal/chunk/player.go | 4 ++-- internal/convert/chunkexp.go | 1 + internal/source/filestorage.go | 7 ++++++ internal/source/source.go | 25 ++++++++++++++++++-- 7 files changed, 56 insertions(+), 17 deletions(-) diff --git a/cmd/slackdump/internal/convertcmd/convert.go b/cmd/slackdump/internal/convertcmd/convert.go index 28b033f0..ad7f279c 100644 --- a/cmd/slackdump/internal/convertcmd/convert.go +++ b/cmd/slackdump/internal/convertcmd/convert.go @@ -13,6 +13,7 @@ import ( "github.com/rusq/slackdump/v3/internal/chunk" "github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc" "github.com/rusq/slackdump/v3/internal/convert" + "github.com/rusq/slackdump/v3/internal/source" ) //go:embed assets/convert.md @@ -105,6 +106,10 @@ type convertflags struct { } func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error { + st, err := source.Type(src) + if err != nil { + return err + } cd, err := chunk.OpenDir(src) if err != nil { return err @@ -121,11 +126,16 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error return errors.New("unknown storage type") } + var ( + includeFiles = cflg.withFiles && (st&source.FMattermost != 0) + includeAvatars = cflg.withAvatars && (st&source.FAvatars != 0) + ) + cvt := convert.NewChunkToExport( cd, fsa, - convert.WithIncludeFiles(cflg.withFiles), - convert.WithIncludeAvatars(cflg.withAvatars), + convert.WithIncludeFiles(includeFiles), + convert.WithIncludeAvatars(includeAvatars), convert.WithSrcFileLoc(sttFn), convert.WithTrgFileLoc(sttFn), convert.WithLogger(cfg.Log), diff --git a/internal/chunk/file.go b/internal/chunk/file.go index 279c0757..d2a70261 100644 --- a/internal/chunk/file.go +++ b/internal/chunk/file.go @@ -140,8 +140,8 @@ func (f *File) ensure() { } } -// Offsets returns all offsets for the given id. -func (f *File) Offsets(id GroupID) ([]int64, bool) { +// offsets returns all offsets for the given id. +func (f *File) offsets(id GroupID) ([]int64, bool) { f.ensure() ret, ok := f.idx[id] return ret, ok && len(ret) > 0 @@ -276,8 +276,8 @@ func (f *File) AllChannelInfos() ([]slack.Channel, error) { return nil, err } for i := range chans { - if chans[i].IsArchived { - slog.Default().Debug("skipping archived channel", "i", i, "id", chans[i].ID) + if chans[i].IsArchived || chans[i].NumMembers == 0 { + slog.Default().Debug("skipping empty or archived channel", "i", i, "id", chans[i].ID) continue } members, err := f.ChannelUsers(chans[i].ID) @@ -334,11 +334,11 @@ func allForOffsets[T any](p *File, offsets []int64, fn func(c *Chunk) []T) ([]T, // ChannelInfo returns the information for the given channel. func (f *File) ChannelInfo(channelID string) (*slack.Channel, error) { - info, err := f.channelInfo(channelID, false) + info, err := f.channelInfo(channelID) if err != nil { return nil, err } - if !info.IsArchived { + if !info.IsArchived && info.NumMembers > 0 { users, err := f.ChannelUsers(channelID) if err != nil { return nil, fmt.Errorf("failed getting channel users for %q: %w", channelID, err) @@ -354,7 +354,7 @@ func (f *File) ChannelUsers(channelID string) ([]string, error) { }) } -func (f *File) channelInfo(channelID string, _ bool) (*slack.Channel, error) { +func (f *File) channelInfo(channelID string) (*slack.Channel, error) { chunk, err := f.firstChunkForID(channelInfoID(channelID)) if err != nil { return nil, err @@ -367,7 +367,7 @@ func (f *File) channelInfo(channelID string, _ bool) (*slack.Channel, error) { // firstChunkForID returns the first chunk in the file for the given id. func (f *File) firstChunkForID(id GroupID) (*Chunk, error) { - ofs, ok := f.Offsets(id) + ofs, ok := f.offsets(id) if !ok { return nil, ErrNotFound } diff --git a/internal/chunk/file_test.go b/internal/chunk/file_test.go index d34e4c53..2d7ffb8e 100644 --- a/internal/chunk/file_test.go +++ b/internal/chunk/file_test.go @@ -672,12 +672,12 @@ func TestFile_Offsets(t *testing.T) { f := &File{ idx: tt.fields.idx, } - got, got1 := f.Offsets(tt.args.id) + got, got1 := f.offsets(tt.args.id) if !reflect.DeepEqual(got, tt.want) { - t.Errorf("File.Offsets() got = %v, want %v", got, tt.want) + t.Errorf("File.offsets() got = %v, want %v", got, tt.want) } if got1 != tt.want1 { - t.Errorf("File.Offsets() got1 = %v, want %v", got1, tt.want1) + t.Errorf("File.offsets() got1 = %v, want %v", got1, tt.want1) } }) } diff --git a/internal/chunk/player.go b/internal/chunk/player.go index b07e40e2..87ea80c2 100644 --- a/internal/chunk/player.go +++ b/internal/chunk/player.go @@ -59,7 +59,7 @@ func (p *Player) SetState(ptrs map[GroupID]int) { func (p *Player) next(id GroupID) (*Chunk, error) { p.ptrMu.Lock() defer p.ptrMu.Unlock() - offsets, ok := p.f.Offsets(id) + offsets, ok := p.f.offsets(id) if !ok { return nil, ErrNotFound } @@ -118,7 +118,7 @@ func (p *Player) HasMoreMessages(channelID string) bool { func (p *Player) hasMore(id GroupID) bool { p.ptrMu.RLock() defer p.ptrMu.RUnlock() - offsets, ok := p.f.Offsets(id) + offsets, ok := p.f.offsets(id) if !ok { return false // no such id } diff --git a/internal/convert/chunkexp.go b/internal/convert/chunkexp.go index d30c2d7f..3c8a4118 100644 --- a/internal/convert/chunkexp.go +++ b/internal/convert/chunkexp.go @@ -288,6 +288,7 @@ LOOP: break LOOP } if err != nil { + slog.Error("worker error", "err", err) failed = true } } diff --git a/internal/source/filestorage.go b/internal/source/filestorage.go index c43b35bb..82a137be 100644 --- a/internal/source/filestorage.go +++ b/internal/source/filestorage.go @@ -37,6 +37,9 @@ type STMattermost struct { // rootfs is the root filesystem of the export. func NewMattermostStorage(rootfs fs.FS) (*STMattermost, error) { // mattermost export format has files in the __uploads subdirectory. + if _, err := fs.Stat(rootfs, chunk.UploadsDir); err != nil { + return nil, err + } fsys, err := fs.Sub(rootfs, chunk.UploadsDir) if err != nil { return nil, err @@ -107,6 +110,10 @@ func buildFileIndex(fsys fs.FS, dir string) (map[string]string, error) { }); err != nil { return nil, err } + if len(idx) == 0 { + // no files found. + return nil, fs.ErrNotExist + } return idx, nil } diff --git a/internal/source/source.go b/internal/source/source.go index 0e966773..7c4b4350 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -31,6 +31,8 @@ const ( FChunk FExport FDump + FAvatars + FMattermost ) // Sourcer is an interface for retrieving data from different sources. @@ -69,11 +71,14 @@ var ( // Load loads the source from file src. func Load(ctx context.Context, src string) (Sourcer, error) { lg := slog.With("source", src) - fi, err := os.Stat(src) + st, err := Type(src) if err != nil { return nil, err } - switch srcType(src, fi) { + if st == FUnknown { + return nil, fmt.Errorf("unsupported source type: %s", src) + } + switch st { case FChunk | FDirectory: lg.DebugContext(ctx, "loading chunk directory") dir, err := chunk.OpenDir(src) @@ -106,6 +111,14 @@ func Load(ctx context.Context, src string) (Sourcer, error) { } } +func Type(src string) (Flags, error) { + fi, err := os.Stat(src) + if err != nil { + return FUnknown, err + } + return srcType(src, fi), nil +} + func srcType(src string, fi fs.FileInfo) Flags { var fsys fs.FS // this will be our media for accessing files var flags Flags @@ -120,6 +133,14 @@ func srcType(src string, fi fs.FileInfo) Flags { defer f.Close() fsys = f flags |= FZip + } else { + return FUnknown + } + if _, err := fs.Stat(fsys, "__avatars"); err == nil { + flags |= FAvatars + } + if _, err := fs.Stat(fsys, chunk.UploadsDir); err == nil { + flags |= FMattermost } if ff, err := fs.Glob(fsys, "[CD]*.json"); err == nil && len(ff) > 0 { return flags | FDump From 0b922af07fd928a788714499e0d3e35c3c97d64c Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Wed, 22 Jan 2025 20:02:17 +1000 Subject: [PATCH 3/4] update chunk visualiser --- utils/visualise.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/utils/visualise.py b/utils/visualise.py index da78057c..4e81bdbd 100755 --- a/utils/visualise.py +++ b/utils/visualise.py @@ -40,29 +40,31 @@ def main(args: list[str]): print("node [shape=box];") for line in file: chunk = json.loads(line) - if chunk["_t"] == CHUNK_MESSAGE: - for msg in chunk["_m"]: + chunk_type = chunk["t"] + if chunk_type == CHUNK_MESSAGE: + for msg in chunk["m"]: print(f"{msg['ts']} [fillcolor=\"{COLOR_MSG}\"; style=filled];") if files := msg.get("files"): if files: for file in files: print(f"_{file['id']}[fillcolor=\"{COLOR_MSG_FILE}\"; style=filled];") print(f"{msg['ts']} -> _{file['id']};") - elif chunk["_t"] == CHUNK_THREAD: - for msg in chunk["_m"]: + elif chunk_type == CHUNK_THREAD: + for msg in chunk["m"]: print(f"{msg['ts']}[fillcolor=\"{COLOR_THREAD}\"; style=filled];") - print(f"{chunk['_p']['ts']} -> {msg['ts']};") + print(f"{chunk['p']['ts']} -> {msg['ts']};") if files := msg.get("files"): if files: for file in files: print(f"_{file['id']}[fillcolor=\"{COLOR_MSG_FILE}\"; style=filled];") print(f"{msg['ts']} -> _{file['id']};") - elif chunk["_t"] == CHUNK_FILE: - for file in chunk["_f"]: + elif chunk_type == CHUNK_FILE: + for file in chunk["f"]: print(f"{file['id']}[fillcolor=\"{COLOR_FILE}\"; style=filled];") print(f"{chunk['_p']['ts']} -> {file['id']};") else: - raise("Unknown chunk type: " + str(chunk["_t"])) + # raise ValueError("Unknown chunk type: " + str(chunk_type)) + pass print("}") if __name__ == '__main__': main(sys.argv[1:]) From f657806c739cb903fa21520a616b1c165652c092 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Wed, 22 Jan 2025 20:39:57 +1000 Subject: [PATCH 4/4] fix file tests --- internal/chunk/file_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/chunk/file_test.go b/internal/chunk/file_test.go index 2d7ffb8e..5c971639 100644 --- a/internal/chunk/file_test.go +++ b/internal/chunk/file_test.go @@ -11,8 +11,9 @@ import ( "time" "github.com/rusq/slack" - "github.com/rusq/slackdump/v3/internal/chunk/state" "github.com/stretchr/testify/assert" + + "github.com/rusq/slackdump/v3/internal/chunk/state" ) const ( @@ -128,7 +129,7 @@ var archivedChannel = []Chunk{ } var testChunks = []Chunk{ - {Type: CChannelInfo, ChannelID: TestChannelID, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID}}}}, + {Type: CChannelInfo, ChannelID: TestChannelID, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID, NumMembers: 2}}}}, {Type: CChannelUsers, ChannelID: TestChannelID, ChannelUsers: []string{"user1", "user2"}}, {Type: CMessages, ChannelID: TestChannelID, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}}, @@ -186,7 +187,7 @@ var testChunks = []Chunk{ }, }, // chunks from another channel - {Type: CChannelInfo, ChannelID: TestChannelID2, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID2}}}}, + {Type: CChannelInfo, ChannelID: TestChannelID2, Channel: &slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: TestChannelID2, NumMembers: 2}}}}, {Type: CChannelUsers, ChannelID: TestChannelID2, ChannelUsers: []string{"user3", "user4"}}, {Type: CMessages, ChannelID: TestChannelID2, Messages: []slack.Message{ {Msg: slack.Msg{Timestamp: "1234567890.100000", Text: "message1"}},