Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,6 @@ def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadat
options[CoreOptions.PATH.key()] = response.get_path()
response.put_audit_options_to(options)

identifier = Identifier.create(db, response.get_name())
if identifier.get_branch_name() is not None:
options[CoreOptions.BRANCH.key()] = identifier.get_branch_name()

return TableMetadata(
schema=schema.copy(options),
is_external=response.get_is_external(),
Expand Down
16 changes: 9 additions & 7 deletions paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.api.auth.bearer import BearTokenAuthProvider
from pypaimon.api.auth.dlf_provider import DLFAuthProvider
from pypaimon.common.identifier import Identifier, SYSTEM_TABLE_SPLITTER
from pypaimon.common.identifier import Identifier
from pypaimon.common.options import Options
from pypaimon.common.options.config import CatalogOptions, OssOptions
from pypaimon.common.uri_reader import UriReaderFactory
Expand Down Expand Up @@ -273,12 +273,14 @@ def refresh_token(self):
self.api_instance = RESTApi(self.properties, False)

table_identifier = self.identifier
if SYSTEM_TABLE_SPLITTER in self.identifier.get_object_name():
base_table = self.identifier.get_object_name().split(SYSTEM_TABLE_SPLITTER)[0]
table_identifier = Identifier(
database=self.identifier.get_database_name(),
object=base_table,
branch=self.identifier.get_branch_name())
if self.identifier.is_system_table():
# Strip the system-table suffix; preserve the branch so the token
# request resolves against the correct branch backing.
table_identifier = Identifier.create(
self.identifier.get_database_name(),
self.identifier.get_table_name(),
branch=self.identifier.get_branch_name(),
)

response = self.api_instance.load_table_token(table_identifier)
self.log.info(
Expand Down
168 changes: 143 additions & 25 deletions paimon-python/pypaimon/common/identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,93 @@
from pypaimon.common.json_util import json_field

SYSTEM_TABLE_SPLITTER = '$'
SYSTEM_BRANCH_PREFIX = 'branch-'
SYSTEM_BRANCH_PREFIX = 'branch_'
DEFAULT_MAIN_BRANCH = 'main'
UNKNOWN_DATABASE = 'unknown'


@dataclass
@dataclass(init=False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be a breaking change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that @JingsongLi — yes, you're right. Let me enumerate what changes shape so I can address each:

  1. Identifier(database, object, branch=...) constructor kwarg removed.
  2. identifier.branch attribute removed.
  3. Identifier.create(db, object) second positional arg renamed from object to table; passing a pre-encoded object containing $ now caches it as the table name rather than splitting it.
  4. SYSTEM_BRANCH_PREFIX changed from 'branch-' to 'branch_' (this one is a bug-fix — Java has always used branch_, so anything Python wrote with branch- was already not round-trippable through the Java REST server).
  5. The branch JSON field is no longer emitted (Java @JsonIgnoreProperties(ignoreUnknown = true) was already dropping it on the wire, so this never carried information end-to-end, but the Python wire shape does change).

(4) and (5) I'd defend as correctness fixes — the pre-fix behaviour was inconsistent with Java in ways that silently corrupted cross-language scenarios, and I don't see a backward-compat path that preserves the buggy behaviour while letting branched system tables work against a Java REST server.

(1) (2) (3) are the real API-shape break. I agree those need a soft-deprecation path. I'll push a follow-up commit on this PR that:

  • Re-accepts Identifier(..., branch=...) and routes it through the new encoding internally, emitting DeprecationWarning.
  • Re-exposes identifier.branch as a property that delegates to get_branch_name(), also with DeprecationWarning.
  • Re-accepts the old Identifier.create(db, object) two-arg form (detected by $ in the second arg) and routes through the new constructor, with DeprecationWarning.
  • Adds tests that lock in both the warning and the equivalent-result behaviour, so we don't accidentally drop the shim before users have a chance to migrate.
  • Adds a migration note to the PR description.

The plan would be to remove the shim in the next minor version. Does that sound reasonable?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why you need to introduce _BRANCH_NOT_SET, Java also has many constructors.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why you need to introduce _BRANCH_NOT_SET, Java also has many constructors.

You're right — the sentinel was overkill, and the deprecation path it was
gating is unnecessary if we just match Java's constructor shape directly.

Pushed b17c946, which:

  • Drops _BRANCH_NOT_SET and all DeprecationWarning.
  • __init__(database, object=None, branch=None, system_table=None)
    mirrors Java's three public constructors in one signature: when
    branch / system_table is non-None, encode into object; otherwise
    treat object as the final string (the @JsonCreator path).
  • Identifier.create keeps its existing kwargs and now just delegates
    to the constructor — Java has only the 2-arg create, but keeping
    the multi-arg form here costs nothing and avoids touching every
    caller introduced by the first commit.

Backward compatibility against the pre-PR public surface (the worry
from your first review) is preserved without warnings:

  • Identifier(db, obj, branch=...) / branch=None — accepted, encodes
    into object so the Java REST server now actually sees the branch.
  • identifier.branch — read+write property. The setter re-encodes
    object so id.branch = "x" keeps working for any caller that used
    to assign to the dataclass field.
  • Identifier.create(db, "tbl$snapshots") two-arg form — unchanged.

A new IdentifierBackwardCompatibilityTest locks these in by asserting
"the old call sites run without raising" (no warning assertions, no
shim semantics — these are first-class supported entry points now).

PTAL.

class Identifier:
"""Identifies a database object (table, view, etc.).

1:1 port of ``org.apache.paimon.catalog.Identifier``: the on-the-wire
shape is exactly two fields, ``database`` and ``object``. Any branch /
system-table portion is encoded into the ``object`` field using the
``$`` separator and the ``branch_`` prefix, so JSON written by Python
is round-trippable through the Java REST server (and vice versa).

Mirrors Java's three public constructors via a single signature:
* ``Identifier(database, object)`` — JSON-create form. ``object``
is the final, possibly-encoded string.
* ``Identifier(database, table, branch=...)`` — encodes ``branch``
into ``object``.
* ``Identifier(database, table, branch=..., system_table=...)`` —
encodes both.

``branch == "main"`` (case-insensitive) is treated as the default
branch and is not encoded into the object name, matching Java.
"""

database: str = json_field("database", default=None)
object: str = json_field("object", default=None)
branch: Optional[str] = json_field("branch", default=None)

def __init__(self, database: str, object: Optional[str] = None,
             branch: Optional[str] = None,
             system_table: Optional[str] = None):
    """Build an identifier from a database name and an object/table name.

    When neither ``branch`` nor ``system_table`` is given, ``object`` is
    stored verbatim and its components are decoded lazily by
    ``_split_object_name()``. Otherwise ``object`` is taken as the bare
    table name and the extra components are appended to it using
    ``SYSTEM_TABLE_SPLITTER`` (and ``SYSTEM_BRANCH_PREFIX`` for the branch).

    Args:
        database: Database name.
        object: Final object string, or the bare table name in the
            encoding form.
        branch: Branch to encode. A value equal to ``DEFAULT_MAIN_BRANCH``
            (compared case-insensitively) is not written into ``object``.
        system_table: System-table suffix to encode.
    """
    self.database = database

    if branch is None and system_table is None:
        # Plain two-field form: keep the string untouched and leave the
        # cached components unset so they are decoded on first access.
        self.object = object
        self._table: Optional[str] = None
        self._branch: Optional[str] = None
        self._system_table: Optional[str] = None
        return

    # Encoding form: grow the wire string segment by segment.
    encoded = object
    encodes_branch = branch is not None and branch.lower() != DEFAULT_MAIN_BRANCH
    if encodes_branch:
        encoded = encoded + SYSTEM_TABLE_SPLITTER + SYSTEM_BRANCH_PREFIX + branch
    if system_table is not None:
        encoded = encoded + SYSTEM_TABLE_SPLITTER + system_table

    self.object = encoded
    # The components are already known here, so cache them up front.
    self._table = object
    self._branch = branch
    self._system_table = system_table

@classmethod
def create(cls, database: str, object: str) -> "Identifier":
return cls(database, object)
def create(cls, database: str, table: str,
branch: Optional[str] = None,
system_table: Optional[str] = None) -> "Identifier":
"""Create an Identifier.

Two-arg form ``create(database, object)`` mirrors Java's
``Identifier.create``: the second argument is treated as the final
``object`` string (may already carry encoded branch / system_table
segments).

Multi-arg form ``create(database, table, branch=..., system_table=...)``
is a Python convenience that encodes the components into ``object``
for you, equivalent to ``Identifier(database, table, branch=...,
system_table=...)``.
"""
return cls(database, table, branch=branch, system_table=system_table)

@classmethod
def from_string(cls, full_name: str) -> "Identifier":
"""Parse a 'database.object' identifier, with optional backtick quoting."""
"""Parse a ``database.object`` identifier, with optional backtick quoting."""
if not full_name or not full_name.strip():
raise ValueError("fullName cannot be null or empty")

# Check if backticks are used - if so, parse with backtick support
if '`' in full_name:
return cls._parse_with_backticks(full_name)

# Otherwise, use Java-compatible split on first period only
parts = full_name.split(".", 1)

if len(parts) != 2:
raise ValueError(
f"Cannot get splits from '{full_name}' to get database and object"
"Cannot get splits from '{}' to get database and object".format(full_name)
)

return cls(parts[0], parts[1])
Expand All @@ -75,41 +131,103 @@ def _parse_with_backticks(cls, full_name: str) -> "Identifier":
parts.append(current)

if in_backticks:
raise ValueError(f"Unclosed backtick in identifier: {full_name}")
raise ValueError("Unclosed backtick in identifier: {}".format(full_name))

if len(parts) != 2:
raise ValueError(f"Invalid identifier format: {full_name}")
raise ValueError("Invalid identifier format: {}".format(full_name))

return cls(parts[0], parts[1])

def _split_object_name(self) -> None:
    """Decode ``self.object`` into the cached table / branch / system-table parts.

    Returns immediately when the table name is already cached.

    Raises:
        ValueError: If ``object`` splits into three segments whose middle
            segment does not start with ``SYSTEM_BRANCH_PREFIX``, or into
            more than three segments.
    """
    if self._table is not None:
        return

    segments = self.object.split(SYSTEM_TABLE_SPLITTER)
    count = len(segments)

    # Reject malformed names before touching any cached state.
    if count > 3:
        raise ValueError("Invalid object name: " + self.object)

    if count == 1:
        self._table, self._branch, self._system_table = self.object, None, None
        return

    middle = segments[1]
    middle_is_branch = middle.startswith(SYSTEM_BRANCH_PREFIX)
    if count == 3 and not middle_is_branch:
        raise ValueError(
            "System table can only contain one '$' separator, "
            "but this is: " + self.object
        )

    decoded_branch = middle[len(SYSTEM_BRANCH_PREFIX):] if middle_is_branch else None
    if count == 3:
        decoded_system_table = segments[2]
    else:
        # Two segments: the second is either a branch or a system table.
        decoded_system_table = None if middle_is_branch else middle

    self._table = segments[0]
    self._branch = decoded_branch
    self._system_table = decoded_system_table

def get_full_name(self) -> str:
if self.branch:
return "{}.{}.{}".format(self.database, self.object, self.branch)
# Match Java: tables created without an explicit database (e.g. some
# ad-hoc query paths) land in the special "unknown" database, in which
# case the database segment is dropped from the rendered name.
if UNKNOWN_DATABASE == self.database:
return self.object
return "{}.{}".format(self.database, self.object)

def get_database_name(self) -> str:
    """Return the ``database`` field of this identifier."""
    database_name = self.database
    return database_name

def get_table_name(self) -> str:
return self.object
self._split_object_name()
return self._table

def get_object_name(self) -> str:
    """Return the raw ``object`` field, without decoding it."""
    object_name = self.object
    return object_name

def get_branch_name(self) -> Optional[str]:
return self.branch
self._split_object_name()
return self._branch

def get_branch_name_or_default(self) -> str:
"""Get branch name or return default 'main' if branch is None."""
return self.branch if self.branch else "main"
"""Get branch name or return ``DEFAULT_MAIN_BRANCH`` if no branch is encoded."""
branch = self.get_branch_name()
return branch if branch is not None else DEFAULT_MAIN_BRANCH

def __hash__(self):
return hash((self.database, self.object, self.branch))
def get_system_table_name(self) -> Optional[str]:
    """Return the cached system-table part after decoding the object name."""
    # Make sure the cached components are populated before reading them.
    self._split_object_name()
    system_table_name = self._system_table
    return system_table_name

def is_system_table(self) -> bool:
if SYSTEM_TABLE_SPLITTER not in self.object:
return False
parts = self.object.split(SYSTEM_TABLE_SPLITTER)
if len(parts) == 2:
return not parts[1].startswith(SYSTEM_BRANCH_PREFIX)
return len(parts) == 3
return self.get_system_table_name() is not None

@property
def branch(self) -> Optional[str]:
    """Read access to the branch name; delegates to ``get_branch_name()``.

    Kept as a read/write alias for callers that used to access the
    ``Identifier.branch`` dataclass field directly.
    """
    return self.get_branch_name()

@branch.setter
def branch(self, value: Optional[str]) -> None:
    """Assign a new branch and re-encode ``object`` to match it.

    The current table name and system-table part are kept; only the
    branch changes.
    """
    current_table = self.get_table_name()
    current_system_table = self.get_system_table_name()

    # Reuse the constructor's encoding rules instead of duplicating them.
    self.object = Identifier(
        self.database,
        current_table,
        branch=value,
        system_table=current_system_table,
    ).object

    # Refresh the cached components so they agree with the new object string.
    self._table = current_table
    self._branch = value
    self._system_table = current_system_table

def __hash__(self):
    """Hash on the ``(database, object)`` pair."""
    key = (self.database, self.object)
    return hash(key)

def __eq__(self, other):
    """Compare two identifiers by ``database`` and ``object``.

    Returns ``NotImplemented`` for anything that is not an ``Identifier``
    so Python can try the reflected comparison.
    """
    if isinstance(other, Identifier):
        same_database = self.database == other.database
        return same_database and self.object == other.object
    return NotImplemented
19 changes: 6 additions & 13 deletions paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
from typing import List

from pypaimon.catalog.catalog import Catalog

logger = logging.getLogger(__name__)
from pypaimon.common.identifier import Identifier
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
SnapshotCommit)

logger = logging.getLogger(__name__)


class CatalogSnapshotCommit(SnapshotCommit):
"""A SnapshotCommit using Catalog to commit."""
Expand All @@ -37,20 +37,19 @@ def __init__(self, catalog: Catalog, identifier: Identifier, uuid: str):

Args:
catalog: The catalog instance to use for committing
identifier: The table identifier
identifier: The table identifier (already encodes branch in object name)
uuid: Optional table UUID for verification
"""
self.catalog = catalog
self.identifier = identifier
self.uuid = uuid

def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool:
"""
Commit the snapshot using the catalog.

Args:
snapshot: The snapshot to commit
branch: The branch name to commit to
statistics: List of partition statistics

Returns:
Expand All @@ -59,17 +58,11 @@ def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStat
Raises:
Exception: If commit fails
"""
new_identifier = Identifier(
database=self.identifier.get_database_name(),
object=self.identifier.get_table_name(),
branch=branch
)

# Call catalog's commit_snapshot method
if hasattr(self.catalog, 'commit_snapshot'):
success = self.catalog.commit_snapshot(new_identifier, self.uuid, snapshot, statistics)
success = self.catalog.commit_snapshot(self.identifier, self.uuid, snapshot, statistics)
if success:
logger.info("Catalog snapshot commit succeeded for %s, snapshot id %d", new_identifier, snapshot.id)
logger.info("Catalog snapshot commit succeeded for %s, snapshot id %d", self.identifier, snapshot.id)
return success
else:
# Fallback for catalogs that don't support snapshot commits
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ def __init__(self, snapshot_manager: SnapshotManager):
self.snapshot_manager = snapshot_manager
self.file_io: FileIO = snapshot_manager.file_io

def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool:
"""
Commit the snapshot using file renaming.
Args:
snapshot: The snapshot to commit
branch: The branch name to commit to
statistics: List of partition statistics (currently unused but kept for interface compatibility)
Returns:
Expand Down
3 changes: 1 addition & 2 deletions paimon-python/pypaimon/snapshot/snapshot_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ class SnapshotCommit(ABC):
"""Interface to commit snapshot atomically."""

@abstractmethod
def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool:
def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool:
"""
Commit the given snapshot.

Args:
snapshot: The snapshot to commit
branch: The branch name to commit to
statistics: List of partition statistics

Returns:
Expand Down
23 changes: 19 additions & 4 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def from_path(cls, table_path: str) -> 'FileStoreTable':
return cls(file_io, identifier, table_path, table_schema)

def current_branch(self) -> str:
"""Get the current branch name from options."""
return self.options.branch()
"""Get the current branch name from the identifier."""
return self.identifier.get_branch_name_or_default()

def comment(self) -> Optional[str]:
"""Get the table comment."""
Expand Down Expand Up @@ -406,8 +406,23 @@ def copy(self, options: dict) -> 'FileStoreTable':
if time_travel_schema is not None:
new_table_schema = time_travel_schema

return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema,
self.catalog_environment)
# Re-encode the branch into the identifier when the option changes, so
# current_branch() and any catalog-routed snapshot commit see the
# branched object name without an extra side channel.
new_identifier = self.identifier
catalog_env = self.catalog_environment
branch_key = CoreOptions.BRANCH.key()
if branch_key in options:
new_branch = options[branch_key]
new_identifier = Identifier.create(
self.identifier.get_database_name(),
self.identifier.get_table_name(),
branch=new_branch,
)
catalog_env = self.catalog_environment.copy(new_identifier)

return FileStoreTable(self.file_io, new_identifier, self.table_path, new_table_schema,
catalog_env)

def _try_time_travel(self, options: Options) -> Optional[TableSchema]:
"""
Expand Down
Loading
Loading