Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,38 @@
})
}

// Clear removes every cached metadata and processed-book record in a single
// write transaction on the existing handle. It never opens a second handle, so
// it is safe to call while the store is held open elsewhere (bbolt takes an
// exclusive file lock).
func (s *Store) Clear() error {
return s.db.Update(func(tx *bbolt.Tx) error {
for _, bucket := range [][]byte{metadataBucket, processedBucket} {
if err := tx.DeleteBucket(bucket); err != nil && err != bbolt.ErrBucketNotFound {

Check failure on line 149 in internal/cache/store.go

View workflow job for this annotation

GitHub Actions / Lint

SA1019: bbolt.ErrBucketNotFound is deprecated: Use the error variables defined in the bbolt/errors package. (staticcheck)
return err
}
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
})
}

// Stats reports how many entries each bucket holds.
func (s *Store) Stats() (metadataCount, processedCount int) {
_ = s.db.View(func(tx *bbolt.Tx) error {
if b := tx.Bucket(metadataBucket); b != nil {
metadataCount = b.Stats().KeyN
}
if b := tx.Bucket(processedBucket); b != nil {
processedCount = b.Stats().KeyN
}
return nil
})
return metadataCount, processedCount
}

func (s *Store) Close() error {
return s.db.Close()
}
40 changes: 40 additions & 0 deletions internal/cache/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,43 @@ func TestStore_Close(t *testing.T) {
t.Errorf("Close() error = %v", err)
}
}

func TestStore_ClearAndStats(t *testing.T) {
tempDir := t.TempDir()
dbPath := filepath.Join(tempDir, "clear.db")

store, err := NewStore(dbPath)
if err != nil {
t.Fatalf("NewStore() error = %v", err)
}
defer store.Close()

if err := store.SetMetadata("query-a", models.BookMetadata{Title: "A"}); err != nil {
t.Fatalf("SetMetadata() error = %v", err)
}
if err := store.SetMetadata("query-b", models.BookMetadata{Title: "B"}); err != nil {
t.Fatalf("SetMetadata() error = %v", err)
}

if meta, _ := store.Stats(); meta != 2 {
t.Fatalf("expected 2 metadata entries before clear, got %d", meta)
}

// Clear must operate on the already-open handle (no second open / no lock
// conflict) and empty the buckets.
if err := store.Clear(); err != nil {
t.Fatalf("Clear() error = %v", err)
}

if meta, processed := store.Stats(); meta != 0 || processed != 0 {
t.Fatalf("expected empty store after clear, got %d metadata, %d processed", meta, processed)
}

// The store must remain usable after clearing.
if err := store.SetMetadata("query-c", models.BookMetadata{Title: "C"}); err != nil {
t.Fatalf("SetMetadata() after clear error = %v", err)
}
if _, ok := store.GetMetadata("query-c"); !ok {
t.Fatalf("store should be usable after clear")
}
}
39 changes: 38 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ type Config struct {
PreferredLanguage string `yaml:"preferred_language"`
}

// AvailableSources is the canonical list of metadata sources, in the default
// query order ("bookinfo first" for better audiobook metadata). It is the
// single source of truth shared by the config defaults and the TUI editor.
var AvailableSources = []string{"bookinfo", "googlebooks", "openlibrary"}

// AvailableFormats is the canonical list of output presets exposed in the UI.
// Only presets that map to an organization pattern are listed; writer-only
// outputs (json/all) are intentionally excluded until they are wired up.
var AvailableFormats = []string{"audiobookshelf", "plex"}

func DefaultConfig() *Config {
home, _ := os.UserHomeDir()
defaultOutput := filepath.Join(home, "Audiobooks-organized")

return &Config{
Sources: []string{"bookinfo", "googlebooks", "openlibrary"},
Sources: append([]string(nil), AvailableSources...),
OutputFormat: "audiobookshelf",
DefaultOutput: defaultOutput,
CopyMode: false,
Expand All @@ -33,6 +43,29 @@ func DefaultConfig() *Config {
}
}

// CachePath returns the single canonical path to the on-disk metadata cache.
func CachePath() string {
home, err := os.UserHomeDir()
if err != nil {
return filepath.Join(".cache", "audiosort", "metadata.db")
}
return filepath.Join(home, ".cache", "audiosort", "metadata.db")
}

// Validate performs lightweight sanity checks before persisting the config.
func (c *Config) Validate() error {
if c.PreferredLanguage == "" {
return fmt.Errorf("preferred language must not be empty")
}
if c.DefaultOutput == "" {
return fmt.Errorf("default output directory must not be empty")
}
if c.ParallelWorkers < 1 {
return fmt.Errorf("parallel workers must be at least 1")
}
return nil
}

func configPath() (string, error) {
home, err := os.UserHomeDir()
if err != nil {
Expand Down Expand Up @@ -65,6 +98,10 @@ func Load() (*Config, error) {
}

func (c *Config) Save() error {
if err := c.Validate(); err != nil {
return err
}

path, err := configPath()
if err != nil {
return err
Expand Down
72 changes: 72 additions & 0 deletions internal/core/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package core

import (
"net/http"
"time"

"audiosort/internal/cache"
"audiosort/internal/config"
"audiosort/internal/metadata"
)

// metadataHTTPTimeout bounds every outbound metadata request.
const metadataHTTPTimeout = 15 * time.Second

// MetadataSources builds the metadata sources described by the configuration,
// sharing a single HTTP client. When the config lists no sources it falls back
// to config.AvailableSources (canonical "bookinfo first" order). This is the
// single place that maps source names to implementations, so the scan pipeline
// and the search view can no longer drift apart.
func MetadataSources(cfg *config.Config) []metadata.MetadataSource {
client := &http.Client{Timeout: metadataHTTPTimeout}

sources := buildSources(cfg.Sources, client)
if len(sources) == 0 {
sources = buildSources(config.AvailableSources, client)
}
return sources
}

func buildSources(names []string, client *http.Client) []metadata.MetadataSource {
var sources []metadata.MetadataSource
for _, name := range names {
switch name {
case "bookinfo":
sources = append(sources, metadata.NewBookInfo(client))
case "googlebooks":
sources = append(sources, metadata.NewGoogleBooks(client))
case "openlibrary":
sources = append(sources, metadata.NewOpenLibrary(client))
}
}
return sources
}

// BuildFetcher assembles a metadata.Fetcher from the configured sources, reusing
// the caller-provided cache store. The store may be nil (the cache is optional).
func BuildFetcher(cfg *config.Config, store *cache.Store) *metadata.Fetcher {
return metadata.NewFetcher(MetadataSources(cfg), store)
}

// BuildFetcherFromConfig is a convenience wrapper for callers that do not yet
// hold a cache handle: it opens the shared on-disk cache and returns it so the
// caller can reuse the single handle (bbolt takes an exclusive file lock).
// The returned store is nil if the cache could not be opened.
func BuildFetcherFromConfig(cfg *config.Config) (*metadata.Fetcher, *cache.Store) {
store, _ := cache.NewStore(config.CachePath())
return BuildFetcher(cfg, store), store
}

// BuildPipeline builds a processing pipeline for the given source/destination
// using the supplied fetcher.
func BuildPipeline(cfg *config.Config, sourcePath, destPath string, fetcher *metadata.Fetcher) *Pipeline {
return NewPipeline(PipelineOptions{
SourcePath: sourcePath,
DestPath: destPath,
Workers: cfg.ParallelWorkers,
CopyMode: cfg.CopyMode,
SkipExist: cfg.SkipExisting,
DryRun: false,
Fetcher: fetcher,
})
}
93 changes: 93 additions & 0 deletions internal/core/build_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package core

import (
"context"
"fmt"
"testing"
"time"

"audiosort/internal/config"
"audiosort/pkg/models"
)

func TestProcessBooksHonoursSelectionAndStreamsProgress(t *testing.T) {
p := NewPipeline(PipelineOptions{DryRun: true, Workers: 2})

books := []models.Audiobook{
{Path: "/tmp/a"},
{Path: "/tmp/b"},
{Path: "/tmp/c"},
}

progress := make(chan ProgressUpdate, len(books)+1)
summary := p.ProcessBooks(context.Background(), books, progress)

if summary.Total != 3 || summary.Processed != 3 {
t.Fatalf("expected 3 books processed, got total=%d processed=%d", summary.Total, summary.Processed)
}

// The channel must be closed by ProcessBooks; the final update is 3/3.
var last ProgressUpdate
count := 0
for u := range progress {
last = u
count++
}
if count != 3 {
t.Fatalf("expected 3 progress updates, got %d", count)
}
if last.Done != 3 || last.Total != 3 {
t.Fatalf("expected final progress 3/3, got %d/%d", last.Done, last.Total)
}
}

func TestProcessBooksRespectsCancellation(t *testing.T) {
p := NewPipeline(PipelineOptions{DryRun: true, Workers: 1})

books := make([]models.Audiobook, 100)
for i := range books {
books[i].Path = fmt.Sprintf("/tmp/%d", i)
}

ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel before processing starts

progress := make(chan ProgressUpdate, len(books)+1)
done := make(chan *models.Summary, 1)
go func() { done <- p.ProcessBooks(ctx, books, progress) }()

select {
case summary := <-done:
if summary.Total > len(books) {
t.Fatalf("processed more books than provided: %d", summary.Total)
}
// Draining the (closed) progress channel must not block.
for range progress {
}
case <-time.After(5 * time.Second):
t.Fatal("ProcessBooks did not return after cancellation (possible deadlock)")
}
}

func TestMetadataSourcesRespectConfigOrder(t *testing.T) {
cfg := &config.Config{Sources: []string{"googlebooks", "bookinfo"}}
sources := MetadataSources(cfg)
if len(sources) != 2 {
t.Fatalf("expected 2 sources, got %d", len(sources))
}
if sources[0].Name() != "googlebooks" || sources[1].Name() != "bookinfo" {
t.Fatalf("sources should follow config order, got %s, %s", sources[0].Name(), sources[1].Name())
}
}

func TestMetadataSourcesFallbackIncludesBookInfo(t *testing.T) {
cfg := &config.Config{Sources: nil}
sources := MetadataSources(cfg)
if len(sources) != len(config.AvailableSources) {
t.Fatalf("expected fallback to %d sources, got %d", len(config.AvailableSources), len(sources))
}
// "bookinfo first" canonical order.
if sources[0].Name() != "bookinfo" {
t.Fatalf("fallback should query bookinfo first, got %s", sources[0].Name())
}
}
58 changes: 58 additions & 0 deletions internal/core/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,64 @@ func NewPipeline(opts PipelineOptions) *Pipeline {
}
}

// ProgressUpdate reports how many books have finished processing out of the total.
type ProgressUpdate struct {
Done int
Total int
}

// ProcessBooks runs the metadata/organize stages over an already-scanned set of
// audiobooks, instead of re-scanning the source. It honours the caller's
// selection and streams a ProgressUpdate after each book completes (when
// progress is non-nil). The progress channel is closed before returning.
//
// Run is intentionally left untouched so the CLI path keeps its scan-everything
// behaviour.
func (p *Pipeline) ProcessBooks(ctx context.Context, books []models.Audiobook, progress chan<- ProgressUpdate) *models.Summary {
start := time.Now()
total := len(books)

// Feed the selected books into the existing worker pool.
bookChan := make(chan models.Audiobook)
go func() {
defer close(bookChan)
for _, book := range books {
select {
case bookChan <- book:
case <-ctx.Done():
return
}
}
}()

results := p.processAudiobooks(ctx, bookChan)

summary := &models.Summary{}
done := 0
for result := range results {
summary.Total++
if result.Error != nil {
summary.Errors++
summary.Failures = append(summary.Failures, result)
} else if result.Audiobook != nil && result.Audiobook.Status == models.StatusSkipped {
summary.Skipped++
} else {
summary.Processed++
}

done++
if progress != nil {
progress <- ProgressUpdate{Done: done, Total: total}
}
}

summary.Duration = time.Since(start)
if progress != nil {
close(progress)
}
return summary
}

// Run executes the full pipeline: scan -> fetch metadata -> organize -> write outputs
func (p *Pipeline) Run(ctx context.Context, sourcePath string) (*models.Summary, error) {
start := time.Now()
Expand Down
Loading
Loading