diff --git a/README.md b/README.md index 86df309..d70583b 100644 --- a/README.md +++ b/README.md @@ -239,7 +239,7 @@ Caterpillar supports the following tasks, each of which can serve different role - **`kafka`** - [Read from or write to Kafka topics (acts as source or sink)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/kafka/README.md) - **`replace`** - [Perform regex-based text replacement and transformation](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/replace/README.md) - **`sample`** - [Sample data using various strategies (random, head, tail, nth, percent)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sample/README.md) -- **`sftp`** - [Transfer files to and from SFTP servers (upload, download, list, move, delete)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sftp/README.md) +- **`sftp`** - [Transfer files to and from SFTP servers (upload, download)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sftp/README.md) - **`split`** - [Split data by specified delimiters](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/split/README.md) - **`sqs`** - [Read from or write to AWS SQS queues (acts as source or sink)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sqs/README.md) - **`xpath`** - [Extract data from XML/HTML using XPath expressions](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/xpath/README.md) diff --git a/internal/pkg/pipeline/task/sftp/README.md b/internal/pkg/pipeline/task/sftp/README.md index 380f71e..4d3288a 100644 --- a/internal/pkg/pipeline/task/sftp/README.md +++ b/internal/pkg/pipeline/task/sftp/README.md @@ -9,7 +9,7 @@ The `sftp` task handles SFTP only. It does not talk to S3 directly. Instead, it - **Upload (S3 → SFTP)**: the `file` task reads from `s3://…` and the `sftp` task writes the files to the server. - **Download (SFTP → S3)**: the `sftp` task reads files from the server and the `file` task writes them to `s3://…`. -This reuses Caterpillar's existing S3 code. The file name passes from one task to the next through a record context value, so `file` and `sftp` work together without extra configuration. +This reuses Caterpillar's existing S3 code. On download, each file's sanitized base name is stored in a record context value; on upload you reference it in `path` (`{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`) to keep consistent names. ## Behavior @@ -17,11 +17,13 @@ Like the `file` task, the role is **inferred from the channels**: | The task has… | Role | What it does | |---------------|------|--------------| -| **no input** (it is the first task) | **source — download** | Reads file(s) at `remote_path` (a single file, a glob, or a directory) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. | -| **an input** | **sink — upload** | Writes each incoming record's data to the server. If `remote_path` is a directory (trailing `/` or an existing directory), the source file name is appended. | +| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. | +| **an input** | **sink — upload** | Writes each incoming record's data to `path`, used as-is per record. To name files from the source, template `path` — e.g. `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`. | It cannot be both: configuring the task with both an input and an output is an error. +For non-file sources that don't set a file name (Kafka, HTTP, …), template `path` yourself — with a macro like `{{ macro "uuid" }}`, a value extracted from the record via a `context:` jq map, or `{{ context "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" }}` for an archive-unpack source. + ## Authentication Set **exactly one** of `password` or `private_key`. Read credentials from SSM with the `{{ secret }}` template. Do not write them directly in the YAML. @@ -66,7 +68,7 @@ If you set neither, the task refuses to connect (it fails closed). You can obtai | `passphrase` | string | - | Passphrase for an encrypted `private_key` | | `host_key` | string | - | Authorized-key line used to verify the server | | `known_hosts_path` | string | - | Path to a `known_hosts` file | -| `remote_path` | string | - | Remote file or directory (required; supports templating). On upload, a trailing `/` or an existing directory is treated as a directory. | +| `path` | string | - | Remote file path (required; supports per-record templating). On download it may be a glob (`**`/`{a,b}` supported); a bare directory is not expanded. Used as-is — template a context value such as `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}` to name uploaded files from the source. | | `timeout` | duration | `30s` | SSH connection timeout (for example `15s`, `1m`) | | `max_retries` | int | `3` | Attempts per connect or transfer operation | | `retry_delay` | duration | `1s` | Delay between retries | @@ -90,7 +92,7 @@ tasks: private_key: | {{ indent 6 (secret "/data/sftp/clientX/private_key") }} host_key: '{{ secret "/data/sftp/clientX/host_key" }}' - remote_path: /incoming/ + path: '/incoming/{{ context "CATERPILLAR_FILE_NAME_WRITE" }}' ``` ### Download files from an SFTP server to S3 @@ -103,7 +105,7 @@ tasks: username: '{{ secret "/data/sftp/clientX/username" }}' password: '{{ secret "/data/sftp/clientX/password" }}' known_hosts_path: /etc/ssh/known_hosts - remote_path: /outgoing/*.csv + path: /outgoing/*.csv - name: write_to_s3 type: file # The file task writes to `path` exactly as given. The sftp download stores diff --git a/internal/pkg/pipeline/task/sftp/operations.go b/internal/pkg/pipeline/task/sftp/operations.go index 8c581b1..5eecfd9 100644 --- a/internal/pkg/pipeline/task/sftp/operations.go +++ b/internal/pkg/pipeline/task/sftp/operations.go @@ -4,9 +4,10 @@ import ( "bytes" "fmt" "io" - "path" + pathpkg "path" "strings" + "github.com/bmatcuk/doublestar" pkgsftp "github.com/pkg/sftp" "github.com/patterninc/caterpillar/internal/pkg/pipeline/record" @@ -14,34 +15,24 @@ import ( "github.com/patterninc/caterpillar/internal/pkg/textutil" ) -// upload (sink): write each incoming record's data to the server. When -// remote_path is a directory, the upstream filename (carried in the record -// context by the source task) is appended, so `file -> sftp` composes with no -// glue. +// upload (sink): write each incoming record's data to Path, used as-is per +// record. To name files from the source, template Path with a context value — +// e.g. {{ context "CATERPILLAR_FILE_NAME_WRITE" }} for a file source, or +// {{ context "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" }} for an archive source. func (s *sftp) upload(client *pkgsftp.Client, input <-chan *record.Record) error { - // Cache isDirLike per remote_path so a static (or repeated) destination is - // stat-ed once per run rather than once per record. Local to this call so - // concurrent workers don't share it. - dirCache := make(map[string]bool) - for { rc, ok := s.GetRecord(input) if !ok { break } - remotePath, err := s.RemotePath.Get(rc) + file, err := s.Path.Get(rc) if err != nil { return err } - remoteFile := remotePath - if filename, found := rc.GetContextValue(string(task.CtxKeyFileNameWrite)); found && filename != `` && s.isDirLikeCached(client, remotePath, dirCache) { - remoteFile = path.Join(remotePath, filename) - } - - if err := s.uploadOne(client, remoteFile, rc.Data); err != nil { + if err := s.uploadOne(client, file, rc.Data); err != nil { return err } } @@ -50,32 +41,30 @@ func (s *sftp) upload(client *pkgsftp.Client, input <-chan *record.Record) error } -func (s *sftp) uploadOne(client *pkgsftp.Client, remoteFile string, data []byte) error { +func (s *sftp) uploadOne(client *pkgsftp.Client, file string, data []byte) error { - return s.retry(fmt.Sprintf(`upload %s`, remoteFile), func() error { + return s.retry(fmt.Sprintf(`upload %s`, file), func() error { - if dir := path.Dir(remoteFile); dir != `` && dir != `.` { + if dir := pathpkg.Dir(file); dir != `` && dir != `.` { if err := client.MkdirAll(dir); err != nil { return fmt.Errorf(`creating remote dir %q: %w`, dir, err) } } - f, err := client.Create(remoteFile) + f, err := client.Create(file) if err != nil { - return fmt.Errorf(`creating remote file %q: %w`, remoteFile, err) + return fmt.Errorf(`creating remote file %q: %w`, file, err) } if _, err := io.Copy(f, bytes.NewReader(data)); err != nil { - f.Close() // best effort; the copy error is the underlying failure - return fmt.Errorf(`writing remote file %q: %w`, remoteFile, err) + f.Close() + return fmt.Errorf(`writing remote file %q: %w`, file, err) } - // Check the Close error explicitly: for SFTP writes the final flush/ - // commit happens here and may be the only place a late failure (e.g. - // server out of space) surfaces. Ignoring it could report success for - // an incomplete upload. + // Check Close: for SFTP writes the final flush happens here and may be the + // only place a late failure (e.g. server out of space) surfaces. if err := f.Close(); err != nil { - return fmt.Errorf(`closing remote file %q: %w`, remoteFile, err) + return fmt.Errorf(`closing remote file %q: %w`, file, err) } return nil @@ -84,18 +73,17 @@ func (s *sftp) uploadOne(client *pkgsftp.Client, remoteFile string, data []byte) } -// download (source): read file(s) from remote_path and emit one record per -// file. remote_path may be a single file, a glob, or a directory. The basename -// is stored in the record context so a downstream file task can name the object -// it writes (mirrors file.readFile). +// download (source): read file(s) at Path (a single file or a glob) and emit +// one record per file. The base name is stored in the record context so a +// downstream task can name what it writes (mirrors file.readFile). func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) error { - remotePath, err := s.RemotePath.Get(nil) + path, err := s.Path.Get(nil) if err != nil { return err } - paths, err := s.resolveDownloadPaths(client, remotePath) + paths, err := s.parse(client, path) if err != nil { return err } @@ -107,7 +95,7 @@ func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) er } rc := &record.Record{Context: ctx} - rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(path.Base(p))) + rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(pathpkg.Base(p))) s.SendData(rc.Context, data, output) } @@ -115,49 +103,52 @@ func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) er } -func (s *sftp) resolveDownloadPaths(client *pkgsftp.Client, remotePath string) ([]string, error) { - - if containsGlob(remotePath) { - matches, err := client.Glob(remotePath) - if err != nil { - return nil, fmt.Errorf(`globbing %q: %w`, remotePath, err) +// parse turns Path into the list of files to download. +// A glob is matched with doublestar by walking the static base directory and matching +// each file against the pattern; a plain path matches itself. Matching no files +// is an error — the named file is missing, or the glob matched nothing. +func (s *sftp) parse(client *pkgsftp.Client, path string) ([]string, error) { + + var matches []string + walker := client.Walk(globBase(path)) + for walker.Step() { + if err := walker.Err(); err != nil { + return nil, fmt.Errorf(`walking %q: %w`, path, err) } - return matches, nil - } - - // A directory expands to the (non-directory) files directly inside it. - if info, err := client.Stat(remotePath); err == nil && info.IsDir() { - entries, err := client.ReadDir(remotePath) + if walker.Stat().IsDir() { + continue + } + ok, err := doublestar.Match(path, walker.Path()) if err != nil { - return nil, fmt.Errorf(`reading remote dir %q: %w`, remotePath, err) + return nil, fmt.Errorf(`bad glob %q: %w`, path, err) } - paths := make([]string, 0, len(entries)) - for _, e := range entries { - if !e.IsDir() { - paths = append(paths, path.Join(remotePath, e.Name())) - } + if ok { + matches = append(matches, walker.Path()) } - return paths, nil } - return []string{remotePath}, nil + if len(matches) == 0 { + return nil, fmt.Errorf(`no files found at %q`, path) + } + + return matches, nil } -func (s *sftp) downloadOne(client *pkgsftp.Client, remoteFile string) ([]byte, error) { +func (s *sftp) downloadOne(client *pkgsftp.Client, file string) ([]byte, error) { var data []byte - err := s.retry(fmt.Sprintf(`download %s`, remoteFile), func() error { - f, err := client.Open(remoteFile) + err := s.retry(fmt.Sprintf(`download %s`, file), func() error { + f, err := client.Open(file) if err != nil { - return fmt.Errorf(`opening remote file %q: %w`, remoteFile, err) + return fmt.Errorf(`opening remote file %q: %w`, file, err) } defer f.Close() b, err := io.ReadAll(f) if err != nil { - return fmt.Errorf(`reading remote file %q: %w`, remoteFile, err) + return fmt.Errorf(`reading remote file %q: %w`, file, err) } data = b return nil @@ -167,32 +158,20 @@ func (s *sftp) downloadOne(client *pkgsftp.Client, remoteFile string) ([]byte, e } -// isDirLikeCached wraps isDirLike with a per-run cache keyed on remotePath, so -// the same destination is stat-ed at most once instead of once per record. -func (s *sftp) isDirLikeCached(client *pkgsftp.Client, remotePath string, cache map[string]bool) bool { - if v, ok := cache[remotePath]; ok { - return v +// globBase returns the longest leading directory of pattern with no glob +// metacharacter — the point from which to start walking. +func globBase(pattern string) string { + i := strings.IndexAny(pattern, `*?[{`) + if i < 0 { + return pattern } - v := s.isDirLike(client, remotePath) - cache[remotePath] = v - return v -} - -// isDirLike reports whether remotePath should be treated as a directory to -// place an uploaded file into: either it ends with "/" or it already exists as -// a directory on the server. -func (s *sftp) isDirLike(client *pkgsftp.Client, remotePath string) bool { - - if strings.HasSuffix(remotePath, `/`) { - return true + dir := pattern[:i] + switch j := strings.LastIndex(dir, `/`); { + case j < 0: + return `.` + case j == 0: + return `/` + default: + return dir[:j] } - if info, err := client.Stat(remotePath); err == nil && info.IsDir() { - return true - } - return false - -} - -func containsGlob(p string) bool { - return strings.ContainsAny(p, `*?[`) } diff --git a/internal/pkg/pipeline/task/sftp/sftp.go b/internal/pkg/pipeline/task/sftp/sftp.go index 32e0f0b..184d3b7 100644 --- a/internal/pkg/pipeline/task/sftp/sftp.go +++ b/internal/pkg/pipeline/task/sftp/sftp.go @@ -17,8 +17,7 @@ import ( "github.com/patterninc/caterpillar/internal/pkg/pipeline/task" ) -// The package is named sftp, which collides with the github.com/pkg/sftp -// client package, so we import the library under the pkgsftp alias. +// The package is named sftp; the github.com/pkg/sftp client is aliased pkgsftp. const ( defaultPort = 22 @@ -27,38 +26,32 @@ const ( defaultRetryDelay = duration.Duration(1 * time.Second) ) -// ctx is a package-level background context used when creating source records, -// mirroring the file task (internal/pkg/pipeline/task/file/file.go). var ctx = context.Background() type sftp struct { task.Base `yaml:",inline" json:",inline"` - // Connection. Host string `yaml:"host" json:"host" validate:"required"` Port int `yaml:"port,omitempty" json:"port,omitempty"` Username string `yaml:"username" json:"username" validate:"required"` - // Authentication: exactly one of Password or PrivateKey. These come from - // SSM via {{ secret }} in the pipeline YAML; never log them. + // Exactly one of Password or PrivateKey. From SSM via {{ secret }}; never log. Password string `yaml:"password,omitempty" json:"password,omitempty"` PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"` Passphrase string `yaml:"passphrase,omitempty" json:"passphrase,omitempty"` - // Host key verification (required — see buildHostKeyCallback). + // Host key verification (required — set one). HostKey string `yaml:"host_key,omitempty" json:"host_key,omitempty"` KnownHostsPath string `yaml:"known_hosts_path,omitempty" json:"known_hosts_path,omitempty"` - // RemotePath is the remote file/directory to download from (source mode) or - // upload to (sink mode). config.String supports per-record templating. - RemotePath config.String `yaml:"remote_path,omitempty" json:"remote_path,omitempty" validate:"required"` + // Path is the remote file/directory to download from (source) or upload to + // (sink). Used as-is; supports per-record templating. + Path config.String `yaml:"path,omitempty" json:"path,omitempty" validate:"required"` - // Reliability. Timeout duration.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` MaxRetries int `yaml:"max_retries,omitempty" json:"max_retries,omitempty"` RetryDelay duration.Duration `yaml:"retry_delay,omitempty" json:"retry_delay,omitempty"` - // Prepared in Init, used in Run. authMethod ssh.AuthMethod hostKeyCB ssh.HostKeyCallback hostKeyAlgos []string @@ -73,10 +66,9 @@ func New() (task.Task, error) { }, nil } -// Init validates the credentials and host-key settings and prepares the SSH -// auth method and host-key callback. It does not open a connection (that -// happens in Run): Init runs for every task at config-load time, and a session -// held open from then until Run could time out. +// Init validates credentials and host-key settings and prepares the auth method +// and host-key callback. It does not connect — Init runs at config-load for +// every task, and a session held until Run could time out; we dial in Run. func (s *sftp) Init() error { authMethod, err := s.buildAuthMethod() @@ -96,9 +88,8 @@ func (s *sftp) Init() error { } -// Run infers its role from the channels, exactly like the file task: with no -// input it is a source (download from the server); with an input it is a sink -// (upload to the server). It is never both. +// Run infers its role from the channels, like the file task: no input → source +// (download); an input → sink (upload). Never both. func (s *sftp) Run(input <-chan *record.Record, output chan<- *record.Record) error { if input != nil && output != nil { @@ -109,7 +100,6 @@ func (s *sftp) Run(input <-chan *record.Record, output chan<- *record.Record) er if err != nil { return err } - // LIFO: the SFTP subsystem is torn down before the SSH transport. defer sshClient.Close() defer sftpClient.Close() @@ -122,8 +112,7 @@ func (s *sftp) Run(input <-chan *record.Record, output chan<- *record.Record) er } // connect dials the SSH transport and opens an SFTP session, retrying on -// transient failures. The dial honours Timeout so a hung server does not block -// the pipeline indefinitely. +// transient failures. The dial honours Timeout. func (s *sftp) connect() (*ssh.Client, *pkgsftp.Client, error) { addr := net.JoinHostPort(s.Host, strconv.Itoa(s.Port)) @@ -132,9 +121,8 @@ func (s *sftp) connect() (*ssh.Client, *pkgsftp.Client, error) { User: s.Username, Auth: []ssh.AuthMethod{s.authMethod}, HostKeyCallback: s.hostKeyCB, - // Constrain host-key negotiation to the pinned key's algorithm, so the - // server presents the same key type we pinned rather than a different - // one (which would be a spurious mismatch). nil = client default. + // Pin negotiation to the host key's algorithm so the server presents the + // same key type we pinned (otherwise: spurious mismatch). nil = default. HostKeyAlgorithms: s.hostKeyAlgos, Timeout: time.Duration(s.Timeout), } @@ -165,9 +153,8 @@ func (s *sftp) connect() (*ssh.Client, *pkgsftp.Client, error) { } -// buildAuthMethod turns the configured credentials into an ssh.AuthMethod. -// Exactly one of PrivateKey or Password must be set. The credentials originate -// from SSM via {{ secret }}; never include them in an error or log line. +// buildAuthMethod builds an ssh.AuthMethod from the configured credentials; +// exactly one of PrivateKey or Password must be set. Never log the credentials. func (s *sftp) buildAuthMethod() (ssh.AuthMethod, error) { switch { @@ -200,19 +187,14 @@ func (s *sftp) buildAuthMethod() (ssh.AuthMethod, error) { } -// buildHostKeyCallback decides how we verify the server's identity, failing -// closed when neither host_key nor known_hosts_path is set. It also returns the -// host-key algorithms the client should negotiate: for a pinned host_key we -// restrict to that key's algorithm, otherwise the server may present a -// different host-key type than the one we pinned and cause a spurious -// mismatch. A nil slice means "use the client default". +// buildHostKeyCallback verifies the server's identity, failing closed when +// neither host_key nor known_hosts_path is set. For a pinned host_key it also +// returns that key's algorithm to constrain negotiation (nil = client default). func (s *sftp) buildHostKeyCallback() (ssh.HostKeyCallback, []string, error) { switch { case s.HostKey != ``: - // HostKey is a single authorized-key line, e.g. - // "ssh-ed25519 AAAAC3Nza..." (the key portion of a known_hosts entry). key, _, _, _, err := ssh.ParseAuthorizedKey([]byte(s.HostKey)) if err != nil { return nil, nil, fmt.Errorf(`parsing host_key: %w`, err) @@ -233,10 +215,8 @@ func (s *sftp) buildHostKeyCallback() (ssh.HostKeyCallback, []string, error) { } -// withRetry runs fn up to attempts times, sleeping delay between tries. It -// returns nil on the first success, or the last error if every attempt fails. -// On each retried failure it logs a warning (matching the codebase's fmt-based -// logging) so a flaky connection is visible even when it eventually succeeds. +// withRetry runs fn up to attempts times, sleeping delay between tries and +// logging a warning on each retried failure so flaky connections are visible. func withRetry(label string, attempts int, delay time.Duration, fn func() error) error { if attempts < 1 { @@ -248,7 +228,6 @@ func withRetry(label string, attempts int, delay time.Duration, fn func() error) if err = fn(); err == nil { return nil } - // Don't log or sleep after the final attempt. if i < attempts-1 { fmt.Printf("WARN: %s: attempt %d/%d failed: %v; retrying in %s\n", label, i+1, attempts, err, delay) time.Sleep(delay) @@ -259,9 +238,6 @@ func withRetry(label string, attempts int, delay time.Duration, fn func() error) } -// retry wraps withRetry with the task's configured attempt count and delay, and -// a label built from the task name and the action (for example "connect" or -// "upload /incoming/file.csv"). func (s *sftp) retry(action string, fn func() error) error { return withRetry(fmt.Sprintf(`sftp task %q: %s`, s.Name, action), s.MaxRetries, time.Duration(s.RetryDelay), fn) } diff --git a/test/pipelines/sftp/download.yaml b/test/pipelines/sftp/download.yaml index 53f7a68..d080c09 100644 --- a/test/pipelines/sftp/download.yaml +++ b/test/pipelines/sftp/download.yaml @@ -12,7 +12,7 @@ tasks: # host_key verifies the server's identity (prevents man-in-the-middle). # Get the value once with: ssh-keyscan -t ed25519 -p host_key: '{{ secret "/sftp/host_key" }}' - remote_path: /upload/*.txt + path: /upload/*.txt - name: write_local type: file path: /tmp/sftp-download/{{ context "CATERPILLAR_FILE_NAME_WRITE" }} diff --git a/test/pipelines/sftp/s3_to_sftp.yaml b/test/pipelines/sftp/s3_to_sftp.yaml index ff29c7e..37515b3 100644 --- a/test/pipelines/sftp/s3_to_sftp.yaml +++ b/test/pipelines/sftp/s3_to_sftp.yaml @@ -18,7 +18,8 @@ tasks: private_key: | {{ indent 6 (secret "/data/sftp/clientX/private_key") }} host_key: '{{ secret "/data/sftp/clientX/host_key" }}' - remote_path: /incoming/ + # path is used as-is per record; template the source filename onto it. + path: '/incoming/{{ context "CATERPILLAR_FILE_NAME_WRITE" }}' timeout: 30s max_retries: 3 retry_delay: 2s diff --git a/test/pipelines/sftp/upload.yaml b/test/pipelines/sftp/upload.yaml index 70c7ae0..9de677a 100644 --- a/test/pipelines/sftp/upload.yaml +++ b/test/pipelines/sftp/upload.yaml @@ -14,4 +14,6 @@ tasks: # host_key verifies the server's identity (prevents man-in-the-middle). # Get the value once with: ssh-keyscan -t ed25519 -p host_key: '{{ secret "/sftp/host_key" }}' - remote_path: /upload/ + # path is used as-is per record; template the source filename onto it + # (the file source stores it in CATERPILLAR_FILE_NAME_WRITE). + path: '/upload/{{ context "CATERPILLAR_FILE_NAME_WRITE" }}'