Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ Caterpillar supports the following tasks, each of which can serve different role
- **`kafka`** - [Read from or write to Kafka topics (acts as source or sink)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/kafka/README.md)
- **`replace`** - [Perform regex-based text replacement and transformation](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/replace/README.md)
- **`sample`** - [Sample data using various strategies (random, head, tail, nth, percent)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sample/README.md)
- **`sftp`** - [Transfer files to and from SFTP servers (upload, download, list, move, delete)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sftp/README.md)
- **`sftp`** - [Transfer files to and from SFTP servers (upload, download)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sftp/README.md)
- **`split`** - [Split data by specified delimiters](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/split/README.md)
- **`sqs`** - [Read from or write to AWS SQS queues (acts as source or sink)](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/sqs/README.md)
- **`xpath`** - [Extract data from XML/HTML using XPath expressions](https://github.com/patterninc/caterpillar/blob/main/internal/pkg/pipeline/task/xpath/README.md)
Expand Down
14 changes: 8 additions & 6 deletions internal/pkg/pipeline/task/sftp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ The `sftp` task handles SFTP only. It does not talk to S3 directly. Instead, it
- **Upload (S3 → SFTP)**: the `file` task reads from `s3://…` and the `sftp` task writes the files to the server.
- **Download (SFTP → S3)**: the `sftp` task reads files from the server and the `file` task writes them to `s3://…`.

This reuses Caterpillar's existing S3 code. The file name passes from one task to the next through a record context value, so `file` and `sftp` work together without extra configuration.
This reuses Caterpillar's existing S3 code. On download, each file's sanitized base name is stored in a record context value; on upload you reference it in `path` (`{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`) to keep consistent names.

## Behavior

Like the `file` task, the role is **inferred from the channels**:

| The task has… | Role | What it does |
|---------------|------|--------------|
| **no input** (it is the first task) | **source — download** | Reads file(s) at `remote_path` (a single file, a glob, or a directory) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. |
| **an input** | **sink — upload** | Writes each incoming record's data to the server. If `remote_path` is a directory (trailing `/` or an existing directory), the source file name is appended. |
| **no input** (it is the first task) | **source — download** | Reads file(s) at `path` (a single file or a glob; doublestar `**` and `{a,b}` are supported, like the file task) and emits one record per file. The base name is stored in the record context (`CATERPILLAR_FILE_NAME_WRITE`) so a downstream task can name what it writes. |
| **an input** | **sink — upload** | Writes each incoming record's data to `path`, used as-is per record. To name files from the source, template `path` — e.g. `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}`. |

It cannot be both: configuring the task with both an input and an output is an error.

For non-file sources that don't set a file name (Kafka, HTTP, …), template `path` yourself — with a macro like `{{ macro "uuid" }}`, a value extracted from the record via a `context:` jq map, or `{{ context "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" }}` for an archive-unpack source.

## Authentication

Set **exactly one** of `password` or `private_key`. Read credentials from SSM with the `{{ secret }}` template. Do not write them directly in the YAML.
Expand Down Expand Up @@ -66,7 +68,7 @@ If you set neither, the task refuses to connect (it fails closed). You can obtai
| `passphrase` | string | - | Passphrase for an encrypted `private_key` |
| `host_key` | string | - | Authorized-key line used to verify the server |
| `known_hosts_path` | string | - | Path to a `known_hosts` file |
| `remote_path` | string | - | Remote file or directory (required; supports templating). On upload, a trailing `/` or an existing directory is treated as a directory. |
| `path` | string | - | Remote file path (required; supports per-record templating). On download it may be a glob (`**`/`{a,b}` supported); a bare directory is not expanded. Used as-is — template a context value such as `{{ context "CATERPILLAR_FILE_NAME_WRITE" }}` to name uploaded files from the source. |
| `timeout` | duration | `30s` | SSH connection timeout (for example `15s`, `1m`) |
| `max_retries` | int | `3` | Attempts per connect or transfer operation |
| `retry_delay` | duration | `1s` | Delay between retries |
Expand All @@ -90,7 +92,7 @@ tasks:
private_key: |
{{ indent 6 (secret "/data/sftp/clientX/private_key") }}
host_key: '{{ secret "/data/sftp/clientX/host_key" }}'
remote_path: /incoming/
path: '/incoming/{{ context "CATERPILLAR_FILE_NAME_WRITE" }}'
```

### Download files from an SFTP server to S3
Expand All @@ -103,7 +105,7 @@ tasks:
username: '{{ secret "/data/sftp/clientX/username" }}'
password: '{{ secret "/data/sftp/clientX/password" }}'
known_hosts_path: /etc/ssh/known_hosts
remote_path: /outgoing/*.csv
path: /outgoing/*.csv
- name: write_to_s3
type: file
# The file task writes to `path` exactly as given. The sftp download stores
Expand Down
153 changes: 66 additions & 87 deletions internal/pkg/pipeline/task/sftp/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,35 @@ import (
"bytes"
"fmt"
"io"
"path"
pathpkg "path"
"strings"

"github.com/bmatcuk/doublestar"
pkgsftp "github.com/pkg/sftp"

"github.com/patterninc/caterpillar/internal/pkg/pipeline/record"
"github.com/patterninc/caterpillar/internal/pkg/pipeline/task"
"github.com/patterninc/caterpillar/internal/pkg/textutil"
)

// upload (sink): write each incoming record's data to the server. When
// remote_path is a directory, the upstream filename (carried in the record
// context by the source task) is appended, so `file -> sftp` composes with no
// glue.
// upload (sink): write each incoming record's data to Path, used as-is per
// record. To name files from the source, template Path with a context value —
// e.g. {{ context "CATERPILLAR_FILE_NAME_WRITE" }} for a file source, or
// {{ context "CATERPILLAR_ARCHIVE_FILE_NAME_WRITE" }} for an archive source.
func (s *sftp) upload(client *pkgsftp.Client, input <-chan *record.Record) error {

// Cache isDirLike per remote_path so a static (or repeated) destination is
// stat-ed once per run rather than once per record. Local to this call so
// concurrent workers don't share it.
dirCache := make(map[string]bool)

for {
rc, ok := s.GetRecord(input)
if !ok {
break
}

remotePath, err := s.RemotePath.Get(rc)
file, err := s.Path.Get(rc)
if err != nil {
return err
}

remoteFile := remotePath
if filename, found := rc.GetContextValue(string(task.CtxKeyFileNameWrite)); found && filename != `` && s.isDirLikeCached(client, remotePath, dirCache) {
remoteFile = path.Join(remotePath, filename)
}

if err := s.uploadOne(client, remoteFile, rc.Data); err != nil {
if err := s.uploadOne(client, file, rc.Data); err != nil {
return err
}
}
Expand All @@ -50,32 +41,30 @@ func (s *sftp) upload(client *pkgsftp.Client, input <-chan *record.Record) error

}

func (s *sftp) uploadOne(client *pkgsftp.Client, remoteFile string, data []byte) error {
func (s *sftp) uploadOne(client *pkgsftp.Client, file string, data []byte) error {

return s.retry(fmt.Sprintf(`upload %s`, remoteFile), func() error {
return s.retry(fmt.Sprintf(`upload %s`, file), func() error {

if dir := path.Dir(remoteFile); dir != `` && dir != `.` {
if dir := pathpkg.Dir(file); dir != `` && dir != `.` {
if err := client.MkdirAll(dir); err != nil {
return fmt.Errorf(`creating remote dir %q: %w`, dir, err)
}
}

f, err := client.Create(remoteFile)
f, err := client.Create(file)
if err != nil {
return fmt.Errorf(`creating remote file %q: %w`, remoteFile, err)
return fmt.Errorf(`creating remote file %q: %w`, file, err)
}

if _, err := io.Copy(f, bytes.NewReader(data)); err != nil {
f.Close() // best effort; the copy error is the underlying failure
return fmt.Errorf(`writing remote file %q: %w`, remoteFile, err)
f.Close()
return fmt.Errorf(`writing remote file %q: %w`, file, err)
}

// Check the Close error explicitly: for SFTP writes the final flush/
// commit happens here and may be the only place a late failure (e.g.
// server out of space) surfaces. Ignoring it could report success for
// an incomplete upload.
// Check Close: for SFTP writes the final flush happens here and may be the
// only place a late failure (e.g. server out of space) surfaces.
if err := f.Close(); err != nil {
return fmt.Errorf(`closing remote file %q: %w`, remoteFile, err)
return fmt.Errorf(`closing remote file %q: %w`, file, err)
}

return nil
Expand All @@ -84,18 +73,17 @@ func (s *sftp) uploadOne(client *pkgsftp.Client, remoteFile string, data []byte)

}

// download (source): read file(s) from remote_path and emit one record per
// file. remote_path may be a single file, a glob, or a directory. The basename
// is stored in the record context so a downstream file task can name the object
// it writes (mirrors file.readFile).
// download (source): read file(s) at Path (a single file or a glob) and emit
// one record per file. The base name is stored in the record context so a
// downstream task can name what it writes (mirrors file.readFile).
func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) error {

remotePath, err := s.RemotePath.Get(nil)
path, err := s.Path.Get(nil)
if err != nil {
return err
}

paths, err := s.resolveDownloadPaths(client, remotePath)
paths, err := s.parse(client, path)
if err != nil {
return err
}
Expand All @@ -107,57 +95,60 @@ func (s *sftp) download(client *pkgsftp.Client, output chan<- *record.Record) er
}

rc := &record.Record{Context: ctx}
rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(path.Base(p)))
rc.SetContextValue(string(task.CtxKeyFileNameWrite), textutil.SlugifyFileName(pathpkg.Base(p)))
s.SendData(rc.Context, data, output)
}

return nil

}

func (s *sftp) resolveDownloadPaths(client *pkgsftp.Client, remotePath string) ([]string, error) {

if containsGlob(remotePath) {
matches, err := client.Glob(remotePath)
if err != nil {
return nil, fmt.Errorf(`globbing %q: %w`, remotePath, err)
// parse turns Path into the list of files to download.
// A glob is matched with doublestar by walking the static base directory and matching
// each file against the pattern; a plain path matches itself. Matching no files
// is an error — the named file is missing, or the glob matched nothing.
Comment thread
ShivangNagta marked this conversation as resolved.
func (s *sftp) parse(client *pkgsftp.Client, path string) ([]string, error) {

var matches []string
walker := client.Walk(globBase(path))
for walker.Step() {
if err := walker.Err(); err != nil {
return nil, fmt.Errorf(`walking %q: %w`, path, err)
}
return matches, nil
}

// A directory expands to the (non-directory) files directly inside it.
if info, err := client.Stat(remotePath); err == nil && info.IsDir() {
entries, err := client.ReadDir(remotePath)
if walker.Stat().IsDir() {
continue
}
ok, err := doublestar.Match(path, walker.Path())
if err != nil {
return nil, fmt.Errorf(`reading remote dir %q: %w`, remotePath, err)
return nil, fmt.Errorf(`bad glob %q: %w`, path, err)
}
paths := make([]string, 0, len(entries))
for _, e := range entries {
if !e.IsDir() {
paths = append(paths, path.Join(remotePath, e.Name()))
}
if ok {
matches = append(matches, walker.Path())
}
return paths, nil
}

return []string{remotePath}, nil
if len(matches) == 0 {
return nil, fmt.Errorf(`no files found at %q`, path)
}
Comment thread
ShivangNagta marked this conversation as resolved.

return matches, nil

}

func (s *sftp) downloadOne(client *pkgsftp.Client, remoteFile string) ([]byte, error) {
func (s *sftp) downloadOne(client *pkgsftp.Client, file string) ([]byte, error) {

var data []byte

err := s.retry(fmt.Sprintf(`download %s`, remoteFile), func() error {
f, err := client.Open(remoteFile)
err := s.retry(fmt.Sprintf(`download %s`, file), func() error {
f, err := client.Open(file)
if err != nil {
return fmt.Errorf(`opening remote file %q: %w`, remoteFile, err)
return fmt.Errorf(`opening remote file %q: %w`, file, err)
}
defer f.Close()

b, err := io.ReadAll(f)
if err != nil {
return fmt.Errorf(`reading remote file %q: %w`, remoteFile, err)
return fmt.Errorf(`reading remote file %q: %w`, file, err)
}
data = b
return nil
Expand All @@ -167,32 +158,20 @@ func (s *sftp) downloadOne(client *pkgsftp.Client, remoteFile string) ([]byte, e

}

// isDirLikeCached wraps isDirLike with a per-run cache keyed on remotePath, so
// the same destination is stat-ed at most once instead of once per record.
func (s *sftp) isDirLikeCached(client *pkgsftp.Client, remotePath string, cache map[string]bool) bool {
if v, ok := cache[remotePath]; ok {
return v
// globBase returns the longest leading directory of pattern with no glob
// metacharacter — the point from which to start walking.
func globBase(pattern string) string {
i := strings.IndexAny(pattern, `*?[{`)
if i < 0 {
return pattern
}
v := s.isDirLike(client, remotePath)
cache[remotePath] = v
return v
}

// isDirLike reports whether remotePath should be treated as a directory to
// place an uploaded file into: either it ends with "/" or it already exists as
// a directory on the server.
func (s *sftp) isDirLike(client *pkgsftp.Client, remotePath string) bool {

if strings.HasSuffix(remotePath, `/`) {
return true
dir := pattern[:i]
switch j := strings.LastIndex(dir, `/`); {
case j < 0:
return `.`
case j == 0:
return `/`
default:
return dir[:j]
}
if info, err := client.Stat(remotePath); err == nil && info.IsDir() {
return true
}
return false

}

func containsGlob(p string) bool {
return strings.ContainsAny(p, `*?[`)
}
Loading
Loading