Skip to content

Commit

Permalink
Merge pull request #417 from rusq/i174-resume
Browse files Browse the repository at this point in the history
Consolidation of "source" related code
  • Loading branch information
rusq authored Jan 21, 2025
2 parents 77b5685 + d32cfb8 commit ddb3f32
Show file tree
Hide file tree
Showing 20 changed files with 459 additions and 438 deletions.
3 changes: 2 additions & 1 deletion cmd/slackdump/internal/diag/hydrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"path/filepath"
"strings"

"github.com/rusq/slackdump/v3/internal/source"

"github.com/rusq/fsadapter"
"github.com/rusq/slack"

Expand All @@ -21,7 +23,6 @@ import (
"github.com/rusq/slackdump/v3/downloader"
"github.com/rusq/slackdump/v3/internal/chunk/transform/fileproc"
"github.com/rusq/slackdump/v3/internal/structures"
"github.com/rusq/slackdump/v3/internal/viewer/source"
)

var cmdHydrate = &base.Command{
Expand Down
91 changes: 3 additions & 88 deletions cmd/slackdump/internal/view/view.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package view

import (
"archive/zip"
"context"
_ "embed"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path"
"strings"

"github.com/rusq/slackdump/v3/internal/source"

br "github.com/pkg/browser"

"github.com/rusq/slackdump/v3/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v3/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/viewer"
"github.com/rusq/slackdump/v3/internal/viewer/source"
)

//go:embed assets/view.md
Expand All @@ -45,7 +40,7 @@ func RunView(ctx context.Context, cmd *base.Command, args []string) error {
base.SetExitStatus(base.SInvalidParameters)
return fmt.Errorf("viewing slackdump files requires at least one argument")
}
src, err := loadSource(ctx, args[0])
src, err := source.Load(ctx, args[0])
if err != nil {
base.SetExitStatus(base.SUserError)
return err
Expand Down Expand Up @@ -84,83 +79,3 @@ func RunView(ctx context.Context, cmd *base.Command, args []string) error {

return nil
}

// sourceFlags is a bit set describing a source.  It combines a container bit
// (sfDirectory or sfZIP) with a format bit (sfChunk, sfExport or sfDump).
type sourceFlags int16

// sfUnknown is the zero value, returned when a source can not be classified.
const sfUnknown sourceFlags = 0

// Individual flag bits.  Bit 0 is not used: the values start at 2 and double
// with every subsequent constant.
const (
	sfDirectory sourceFlags = 2 << iota // 2
	sfZIP                               // 4
	sfChunk                             // 8
	sfExport                            // 16
	sfDump                              // 32
)

// loadSource opens the source located at src and returns a viewer.Sourcer
// for it.  The kind of the source is determined by srcType; the supported
// combinations are a chunk directory, and an export or a dump stored either
// in a directory or in a ZIP file.
//
// It returns an error if src can not be stat'ed or opened, if the source
// constructor fails, or if the source type is not one of the supported
// combinations.  For ZIP sources the archive is closed again when the
// constructor fails, so that the file handle is not leaked.
func loadSource(ctx context.Context, src string) (viewer.Sourcer, error) {
	lg := cfg.Log.With("source", src)
	fi, err := os.Stat(src)
	if err != nil {
		return nil, err
	}
	switch srcType(src, fi) {
	case sfChunk | sfDirectory:
		lg.DebugContext(ctx, "loading chunk directory")
		dir, err := chunk.OpenDir(src)
		if err != nil {
			return nil, err
		}
		return source.NewChunkDir(dir), nil
	case sfExport | sfZIP:
		lg.DebugContext(ctx, "loading export zip")
		f, err := zip.OpenReader(src)
		if err != nil {
			return nil, err
		}
		exp, err := source.NewExport(f, src)
		if err != nil {
			// nothing will ever use the archive, release it; the
			// constructor error is the one worth reporting.
			_ = f.Close()
			return nil, err
		}
		return exp, nil
	case sfExport | sfDirectory:
		lg.DebugContext(ctx, "loading export directory")
		return source.NewExport(os.DirFS(src), src)
	case sfDump | sfZIP:
		lg.DebugContext(ctx, "loading dump zip")
		f, err := zip.OpenReader(src)
		if err != nil {
			return nil, err
		}
		dmp, err := source.NewDump(f, src)
		if err != nil {
			// see the export zip case above.
			_ = f.Close()
			return nil, err
		}
		return dmp, nil
	case sfDump | sfDirectory:
		lg.DebugContext(ctx, "loading dump directory")
		return source.NewDump(os.DirFS(src), src)
	default:
		return nil, fmt.Errorf("unsupported source type: %s", src)
	}
}

// srcType inspects the file system object at src, described by fi, and
// returns the flags identifying its container and format.
//
// A directory gets sfDirectory, a regular file with a ".zip" extension
// (case-insensitive) gets sfZIP.  The format is then probed in this order:
//
//   - files matching "[CD]*.json" in the root: sfDump;
//   - "workspace.json.gz" in the root: sfChunk (directories only);
//   - "channels.json" in the root: sfExport.
//
// sfUnknown is returned when src is neither a directory nor a ZIP file, when
// the ZIP file can not be opened, or when none of the probes match.
func srcType(src string, fi fs.FileInfo) sourceFlags {
	var (
		fsys  fs.FS // this will be our media for accessing files
		flags sourceFlags
	)
	switch {
	case fi.IsDir():
		fsys = os.DirFS(src)
		flags |= sfDirectory
	case fi.Mode().IsRegular() && strings.ToLower(path.Ext(src)) == ".zip":
		f, err := zip.OpenReader(src)
		if err != nil {
			return sfUnknown
		}
		defer f.Close()
		fsys = f
		flags |= sfZIP
	default:
		// Neither a directory nor a ZIP file: there is no file system to
		// probe.  Passing a nil fs.FS to fs.Glob/fs.Stat below would panic.
		return sfUnknown
	}
	if ff, err := fs.Glob(fsys, "[CD]*.json"); err == nil && len(ff) > 0 {
		return flags | sfDump
	}
	if _, err := fs.Stat(fsys, "workspace.json.gz"); err == nil {
		if flags&sfZIP != 0 {
			return sfUnknown // compressed chunk directories are not supported
		}
		return flags | sfChunk
	}
	if _, err := fs.Stat(fsys, "channels.json"); err == nil {
		return flags | sfExport
	}
	return sfUnknown
}
167 changes: 122 additions & 45 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,22 @@ import (
"io/fs"
"os"
"path/filepath"
"sort"
"strings"
"sync/atomic"

"github.com/rusq/slackdump/v3/internal/structures"

"github.com/rusq/slack"

"github.com/rusq/slackdump/v3/internal/osext"
)

const chunkExt = ".json.gz"
// file extensions
const (
chunkExt = ".json.gz"
extIdx = ".idx"
)

// common filenames
const (
Expand All @@ -26,7 +34,7 @@ const (
FSearch FileID = "search"
)

const uploadsDir = "__uploads" // for serving files
const UploadsDir = "__uploads" // for serving files

// Directory is an abstraction over the directory with chunk files. It
// provides a way to write chunk files and read channels, users and messages
Expand Down Expand Up @@ -107,8 +115,6 @@ func (d *Directory) Close() error {
return nil
}

var errNoChannelInfo = errors.New("no channel info")

// Channels collects all channels from the chunk directory. First, it
// attempts to find the channel.json.gz file, if it's not present, it will go
// through all conversation files and try to get "ChannelInfo" chunk from the
Expand All @@ -120,23 +126,13 @@ func (d *Directory) Channels() ([]slack.Channel, error) {
// we are not using the channel.json.gz file because it doesn't contain
// members.
var ch []slack.Channel
if err := filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error {
if err := d.Walk(func(name string, f *File, err error) error {
if err != nil {
return err
}
if !strings.HasSuffix(path, chunkExt) {
return nil
} else if de.IsDir() {
return nil
}
chs, err := d.loadChanInfo(path)
if err != nil {
if errors.Is(err, errNoChannelInfo) {
return nil
}
return err
}
ch = append(ch, chs...)
ci, err := f.AllChannelInfos()
ch = append(ch, ci...)

return nil
}); err != nil {
return nil, err
Expand All @@ -145,35 +141,33 @@ func (d *Directory) Channels() ([]slack.Channel, error) {
return ch, nil
}

// Walk iterates over all chunk files in the directory and calls fn for each
// of them.  A chunk file is any non-directory entry whose path ends with
// chunkExt.  If fn returns an error, the iteration stops and that error is
// returned.
//
// fn receives the path of the file and the opened *File.  If the file could
// not be opened, fn is called with a nil *File and the error, and decides
// whether to continue.  The *File is closed as soon as fn returns, so fn
// must not retain it.
func (d *Directory) Walk(fn func(name string, f *File, err error) error) error {
	return filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if de.IsDir() || !strings.HasSuffix(path, chunkExt) {
			return nil
		}
		raw, err := d.openRAW(path)
		if err != nil {
			return fn(path, nil, err)
		}
		cf, err := cachedFromReader(raw, d.wantCache)
		if err != nil {
			// no *File took over the raw handle, so it has to be released
			// here, otherwise it leaks for every unreadable chunk file.
			_ = raw.Close()
			return fn(path, nil, err)
		}
		defer cf.Close()
		return fn(path, cf, nil)
	})
}

// Name reports the path of the directory that d was created for, exactly as
// it is stored in the dir field.
func (d *Directory) Name() string {
	return d.dir
}

// loadChanInfo opens the file located at fullpath, extracts the channels
// from it with readChanInfo, and closes the file before returning.  On any
// error the returned slice is nil.
func (d *Directory) loadChanInfo(fullpath string) ([]slack.Channel, error) {
	raw, openErr := d.openRAW(fullpath)
	if openErr != nil {
		return nil, openErr
	}
	defer raw.Close()

	channels, readErr := d.readChanInfo(raw)
	if readErr != nil {
		return nil, readErr
	}
	return channels, nil
}

// readChanInfo wraps wf into a *File (honouring the directory's wantCache
// setting) and returns the Channels from all of its ChannelInfo chunks.  It
// does not close wf.
func (d *Directory) readChanInfo(wf osext.ReadSeekCloseNamer) ([]slack.Channel, error) {
	chunkFile, err := cachedFromReader(wf, d.wantCache)
	if err == nil {
		return chunkFile.AllChannelInfos()
	}
	return nil, err
}

// loadChannelsJSON loads channels json file and returns a slice of
// slack.Channel. It expects it to be GZIP compressed.
func (d *Directory) loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
Expand Down Expand Up @@ -323,8 +317,6 @@ func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
return nil, errors.New("no workspace info found")
}

const extIdx = ".idx"

func cachedFromReader(wf osext.ReadSeekCloseNamer, wantCache bool) (*File, error) {
if !wantCache {
return FromReader(wf)
Expand Down Expand Up @@ -359,5 +351,90 @@ func cachedFromReader(wf osext.ReadSeekCloseNamer, wantCache bool) (*File, error

// File returns the file with the given id and name.
func (d *Directory) File(id string, name string) (fs.File, error) {
return os.Open(filepath.Join(d.dir, uploadsDir, id, name))
return os.Open(filepath.Join(d.dir, UploadsDir, id, name))
}

// AllMessages collects the messages of the channel channelID from every
// chunk file visited by Walk and returns them ordered by sort.Sort over
// structures.Messages.  Files that report ErrNotFound for the channel are
// skipped; any other error aborts the walk and is returned.
func (d *Directory) AllMessages(channelID string) ([]slack.Message, error) {
	var all structures.Messages
	collect := func(_ string, cf *File, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		msgs, err := cf.AllMessages(channelID)
		switch {
		case err == nil:
			all = append(all, msgs...)
			return nil
		case errors.Is(err, ErrNotFound):
			// this file has nothing for the channel, keep going.
			return nil
		default:
			return err
		}
	}
	if err := d.Walk(collect); err != nil {
		return nil, err
	}
	sort.Sort(all)
	return all, nil
}

// AllThreadMessages collects the thread threadID of the channel channelID
// from every chunk file visited by Walk.  The result is the thread parent
// followed by the remaining thread messages ordered by sort.Sort over
// structures.Messages.
//
// ErrNotFound reported by a single file is not fatal, the file is skipped.
// An error is returned if the walk fails, or if no file contains the thread
// parent.
func (d *Directory) AllThreadMessages(channelID, threadID string) ([]slack.Message, error) {
	var (
		replies structures.Messages
		parent  *slack.Message
	)
	visit := func(_ string, cf *File, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// keep looking for the parent until some file yields it.
		if parent == nil {
			found, err := cf.ThreadParent(channelID, threadID)
			switch {
			case err == nil:
				parent = found
			case !errors.Is(err, ErrNotFound):
				return err
			}
		}
		msgs, err := cf.AllThreadMessages(channelID, threadID)
		if err != nil && !errors.Is(err, ErrNotFound) {
			return err
		}
		replies = append(replies, msgs...)
		return nil
	}
	if err := d.Walk(visit); err != nil {
		return nil, err
	}
	if parent == nil {
		return nil, fmt.Errorf("parent not found for channel: %s, thread: %s", channelID, threadID)
	}
	sort.Sort(replies)

	thread := make([]slack.Message, 0, len(replies)+1)
	thread = append(thread, *parent)
	return append(thread, replies...), nil
}

// FastAllThreadMessages returns the thread threadID of the channel channelID
// reading only one file: the one identified by ToFileID(channelID, "",
// false).  The result is the thread parent followed by the rest of the
// thread messages in the order the file returns them.  Unlike
// AllThreadMessages, any error, including a missing parent, is returned as
// is.
func (d *Directory) FastAllThreadMessages(channelID, threadID string) ([]slack.Message, error) {
	cf, err := d.Open(ToFileID(channelID, "", false))
	if err != nil {
		return nil, err
	}
	defer cf.Close()

	head, err := cf.ThreadParent(channelID, threadID)
	if err != nil {
		return nil, err
	}
	replies, err := cf.AllThreadMessages(channelID, threadID)
	if err != nil {
		return nil, err
	}

	thread := make([]slack.Message, 0, len(replies)+1)
	thread = append(thread, *head)
	thread = append(thread, replies...)
	return thread, nil
}

// FastAllMessages returns the messages of the channel channelID reading only
// one file: the one identified by FileID(channelID).  The messages are
// returned as the file provides them, no additional sorting is done here.
func (d *Directory) FastAllMessages(channelID string) ([]slack.Message, error) {
	cf, openErr := d.Open(FileID(channelID))
	if openErr != nil {
		return nil, openErr
	}
	defer cf.Close()

	msgs, err := cf.AllMessages(channelID)
	return msgs, err
}
Loading

0 comments on commit ddb3f32

Please sign in to comment.