diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 810fed963baf..fe0327430308 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -483,10 +483,6 @@ def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadat options[CoreOptions.PATH.key()] = response.get_path() response.put_audit_options_to(options) - identifier = Identifier.create(db, response.get_name()) - if identifier.get_branch_name() is not None: - options[CoreOptions.BRANCH.key()] = identifier.get_branch_name() - return TableMetadata( schema=schema.copy(options), is_external=response.get_is_external(), diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py index 7d4553175b18..b1dfb2264f68 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py +++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py @@ -29,7 +29,7 @@ from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO from pypaimon.api.auth.bearer import BearTokenAuthProvider from pypaimon.api.auth.dlf_provider import DLFAuthProvider -from pypaimon.common.identifier import Identifier, SYSTEM_TABLE_SPLITTER +from pypaimon.common.identifier import Identifier from pypaimon.common.options import Options from pypaimon.common.options.config import CatalogOptions, OssOptions from pypaimon.common.uri_reader import UriReaderFactory @@ -273,12 +273,14 @@ def refresh_token(self): self.api_instance = RESTApi(self.properties, False) table_identifier = self.identifier - if SYSTEM_TABLE_SPLITTER in self.identifier.get_object_name(): - base_table = self.identifier.get_object_name().split(SYSTEM_TABLE_SPLITTER)[0] - table_identifier = Identifier( - database=self.identifier.get_database_name(), - object=base_table, - branch=self.identifier.get_branch_name()) + if self.identifier.is_system_table(): + # 
Strip the system-table suffix; preserve the branch so the token + # request resolves against the correct branch backing. + table_identifier = Identifier.create( + self.identifier.get_database_name(), + self.identifier.get_table_name(), + branch=self.identifier.get_branch_name(), + ) response = self.api_instance.load_table_token(table_identifier) self.log.info( diff --git a/paimon-python/pypaimon/common/identifier.py b/paimon-python/pypaimon/common/identifier.py index a8ce6fbea0e3..7973d4024c54 100755 --- a/paimon-python/pypaimon/common/identifier.py +++ b/paimon-python/pypaimon/common/identifier.py @@ -21,37 +21,93 @@ from pypaimon.common.json_util import json_field SYSTEM_TABLE_SPLITTER = '$' -SYSTEM_BRANCH_PREFIX = 'branch-' +SYSTEM_BRANCH_PREFIX = 'branch_' DEFAULT_MAIN_BRANCH = 'main' +UNKNOWN_DATABASE = 'unknown' -@dataclass +@dataclass(init=False) class Identifier: + """Identifies a database object (table, view, etc.). + + 1:1 port of ``org.apache.paimon.catalog.Identifier``: the on-the-wire + shape is exactly two fields, ``database`` and ``object``. Any branch / + system-table portion is encoded into the ``object`` field using the + ``$`` separator and the ``branch_`` prefix, so JSON written by Python + is round-trippable through the Java REST server (and vice versa). + + Mirrors Java's three public constructors via a single signature: + * ``Identifier(database, object)`` — JSON-create form. ``object`` + is the final, possibly-encoded string. + * ``Identifier(database, table, branch=...)`` — encodes ``branch`` + into ``object``. + * ``Identifier(database, table, branch=..., system_table=...)`` — + encodes both. + + ``branch == "main"`` (case-insensitive) is treated as the default + branch and is not encoded into the object name, matching Java. 
+ """ database: str = json_field("database", default=None) object: str = json_field("object", default=None) - branch: Optional[str] = json_field("branch", default=None) + + def __init__(self, database: str, object: Optional[str] = None, + branch: Optional[str] = None, + system_table: Optional[str] = None): + self.database = database + if branch is None and system_table is None: + # @JsonCreator form: ``object`` is already the final, encoded + # string. Components are decoded lazily by _split_object_name(). + self.object = object + self._table: Optional[str] = None + self._branch: Optional[str] = None + self._system_table: Optional[str] = None + else: + # Encoding form: ``object`` is the bare table name; encode + # branch / system_table into the on-wire ``object``. + builder = object + if branch is not None and branch.lower() != DEFAULT_MAIN_BRANCH: + builder = (builder + SYSTEM_TABLE_SPLITTER + + SYSTEM_BRANCH_PREFIX + branch) + if system_table is not None: + builder = builder + SYSTEM_TABLE_SPLITTER + system_table + self.object = builder + self._table = object + self._branch = branch + self._system_table = system_table @classmethod - def create(cls, database: str, object: str) -> "Identifier": - return cls(database, object) + def create(cls, database: str, table: str, + branch: Optional[str] = None, + system_table: Optional[str] = None) -> "Identifier": + """Create an Identifier. + + Two-arg form ``create(database, object)`` mirrors Java's + ``Identifier.create``: the second argument is treated as the final + ``object`` string (may already carry encoded branch / system_table + segments). + + Multi-arg form ``create(database, table, branch=..., system_table=...)`` + is a Python convenience that encodes the components into ``object`` + for you, equivalent to ``Identifier(database, table, branch=..., + system_table=...)``. 
+ """ + return cls(database, table, branch=branch, system_table=system_table) @classmethod def from_string(cls, full_name: str) -> "Identifier": - """Parse a 'database.object' identifier, with optional backtick quoting.""" + """Parse a ``database.object`` identifier, with optional backtick quoting.""" if not full_name or not full_name.strip(): raise ValueError("fullName cannot be null or empty") - # Check if backticks are used - if so, parse with backtick support if '`' in full_name: return cls._parse_with_backticks(full_name) - # Otherwise, use Java-compatible split on first period only parts = full_name.split(".", 1) if len(parts) != 2: raise ValueError( - f"Cannot get splits from '{full_name}' to get database and object" + "Cannot get splits from '{}' to get database and object".format(full_name) ) return cls(parts[0], parts[1]) @@ -75,41 +131,103 @@ def _parse_with_backticks(cls, full_name: str) -> "Identifier": parts.append(current) if in_backticks: - raise ValueError(f"Unclosed backtick in identifier: {full_name}") + raise ValueError("Unclosed backtick in identifier: {}".format(full_name)) if len(parts) != 2: - raise ValueError(f"Invalid identifier format: {full_name}") + raise ValueError("Invalid identifier format: {}".format(full_name)) return cls(parts[0], parts[1]) + def _split_object_name(self) -> None: + if self._table is not None: + return + + splits = self.object.split(SYSTEM_TABLE_SPLITTER) + if len(splits) == 1: + self._table = self.object + self._branch = None + self._system_table = None + elif len(splits) == 2: + self._table = splits[0] + if splits[1].startswith(SYSTEM_BRANCH_PREFIX): + self._branch = splits[1][len(SYSTEM_BRANCH_PREFIX):] + self._system_table = None + else: + self._branch = None + self._system_table = splits[1] + elif len(splits) == 3: + if not splits[1].startswith(SYSTEM_BRANCH_PREFIX): + raise ValueError( + "System table can only contain one '$' separator, " + "but this is: " + self.object + ) + self._table = splits[0] + 
self._branch = splits[1][len(SYSTEM_BRANCH_PREFIX):] + self._system_table = splits[2] + else: + raise ValueError("Invalid object name: " + self.object) + def get_full_name(self) -> str: - if self.branch: - return "{}.{}.{}".format(self.database, self.object, self.branch) + # Match Java: tables created without an explicit database (e.g. some + # ad-hoc query paths) land in the special "unknown" database, in which + # case the database segment is dropped from the rendered name. + if UNKNOWN_DATABASE == self.database: + return self.object return "{}.{}".format(self.database, self.object) def get_database_name(self) -> str: return self.database def get_table_name(self) -> str: - return self.object + self._split_object_name() + return self._table def get_object_name(self) -> str: return self.object def get_branch_name(self) -> Optional[str]: - return self.branch + self._split_object_name() + return self._branch def get_branch_name_or_default(self) -> str: - """Get branch name or return default 'main' if branch is None.""" - return self.branch if self.branch else "main" + """Get branch name or return ``DEFAULT_MAIN_BRANCH`` if no branch is encoded.""" + branch = self.get_branch_name() + return branch if branch is not None else DEFAULT_MAIN_BRANCH - def __hash__(self): - return hash((self.database, self.object, self.branch)) + def get_system_table_name(self) -> Optional[str]: + self._split_object_name() + return self._system_table def is_system_table(self) -> bool: - if SYSTEM_TABLE_SPLITTER not in self.object: - return False - parts = self.object.split(SYSTEM_TABLE_SPLITTER) - if len(parts) == 2: - return not parts[1].startswith(SYSTEM_BRANCH_PREFIX) - return len(parts) == 3 + return self.get_system_table_name() is not None + + @property + def branch(self) -> Optional[str]: + # Read/write alias for callers that previously accessed the + # ``Identifier.branch`` dataclass field directly. 
Java's + # ``branch`` is transient/private and not exposed; Python kept + # it public, so this property tides external code over. + return self.get_branch_name() + + @branch.setter + def branch(self, value: Optional[str]) -> None: + # Re-encode ``object`` so the wire shape stays consistent with + # the new value (equivalent to Identifier(db, table, branch=value, + # system_table=current_system_table)). + table = self.get_table_name() + system_table = self.get_system_table_name() + rebuilt = Identifier( + self.database, table, branch=value, system_table=system_table + ) + self.object = rebuilt.object + self._table = table + self._branch = value + self._system_table = system_table + + def __hash__(self): + return hash((self.database, self.object)) + + def __eq__(self, other): + if not isinstance(other, Identifier): + return NotImplemented + return self.database == other.database and self.object == other.object diff --git a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py index 77e5492798ba..68cb0e6cef6e 100755 --- a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py +++ b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py @@ -20,13 +20,13 @@ from typing import List from pypaimon.catalog.catalog import Catalog - -logger = logging.getLogger(__name__) from pypaimon.common.identifier import Identifier from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import (PartitionStatistics, SnapshotCommit) +logger = logging.getLogger(__name__) + class CatalogSnapshotCommit(SnapshotCommit): """A SnapshotCommit using Catalog to commit.""" @@ -37,20 +37,19 @@ def __init__(self, catalog: Catalog, identifier: Identifier, uuid: str): Args: catalog: The catalog instance to use for committing - identifier: The table identifier + identifier: The table identifier (already encodes branch in object name) uuid: Optional table UUID for verification """ self.catalog = catalog 
self.identifier = identifier self.uuid = uuid - def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool: + def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool: """ Commit the snapshot using the catalog. Args: snapshot: The snapshot to commit - branch: The branch name to commit to statistics: List of partition statistics Returns: @@ -59,17 +58,11 @@ def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStat Raises: Exception: If commit fails """ - new_identifier = Identifier( - database=self.identifier.get_database_name(), - object=self.identifier.get_table_name(), - branch=branch - ) - # Call catalog's commit_snapshot method if hasattr(self.catalog, 'commit_snapshot'): - success = self.catalog.commit_snapshot(new_identifier, self.uuid, snapshot, statistics) + success = self.catalog.commit_snapshot(self.identifier, self.uuid, snapshot, statistics) if success: - logger.info("Catalog snapshot commit succeeded for %s, snapshot id %d", new_identifier, snapshot.id) + logger.info("Catalog snapshot commit succeeded for %s, snapshot id %d", self.identifier, snapshot.id) return success else: # Fallback for catalogs that don't support snapshot commits diff --git a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py index acc1650baf7f..9500f24c1f69 100644 --- a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py +++ b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py @@ -48,13 +48,12 @@ def __init__(self, snapshot_manager: SnapshotManager): self.snapshot_manager = snapshot_manager self.file_io: FileIO = snapshot_manager.file_io - def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool: + def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool: """ Commit the snapshot using file renaming. 
Args: snapshot: The snapshot to commit - branch: The branch name to commit to statistics: List of partition statistics (currently unused but kept for interface compatibility) Returns: diff --git a/paimon-python/pypaimon/snapshot/snapshot_commit.py b/paimon-python/pypaimon/snapshot/snapshot_commit.py index 156036165a4e..3538e334a344 100644 --- a/paimon-python/pypaimon/snapshot/snapshot_commit.py +++ b/paimon-python/pypaimon/snapshot/snapshot_commit.py @@ -73,13 +73,12 @@ class SnapshotCommit(ABC): """Interface to commit snapshot atomically.""" @abstractmethod - def commit(self, snapshot: Snapshot, branch: str, statistics: List[PartitionStatistics]) -> bool: + def commit(self, snapshot: Snapshot, statistics: List[PartitionStatistics]) -> bool: """ Commit the given snapshot. Args: snapshot: The snapshot to commit - branch: The branch name to commit to statistics: List of partition statistics Returns: diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 4dadb234db2a..4b49a62869a2 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -83,8 +83,8 @@ def from_path(cls, table_path: str) -> 'FileStoreTable': return cls(file_io, identifier, table_path, table_schema) def current_branch(self) -> str: - """Get the current branch name from options.""" - return self.options.branch() + """Get the current branch name from the identifier.""" + return self.identifier.get_branch_name_or_default() def comment(self) -> Optional[str]: """Get the table comment.""" @@ -406,8 +406,23 @@ def copy(self, options: dict) -> 'FileStoreTable': if time_travel_schema is not None: new_table_schema = time_travel_schema - return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema, - self.catalog_environment) + # Re-encode the branch into the identifier when the option changes, so + # current_branch() and any catalog-routed snapshot commit see 
the + # branched object name without an extra side channel. + new_identifier = self.identifier + catalog_env = self.catalog_environment + branch_key = CoreOptions.BRANCH.key() + if branch_key in options: + new_branch = options[branch_key] + new_identifier = Identifier.create( + self.identifier.get_database_name(), + self.identifier.get_table_name(), + branch=new_branch, + ) + catalog_env = self.catalog_environment.copy(new_identifier) + + return FileStoreTable(self.file_io, new_identifier, self.table_path, new_table_schema, + catalog_env) def _try_time_travel(self, options: Options) -> Optional[TableSchema]: """ diff --git a/paimon-python/pypaimon/tests/branch/catalog_branch_manager_test.py b/paimon-python/pypaimon/tests/branch/catalog_branch_manager_test.py index d8af54a18610..0ec48f76929d 100644 --- a/paimon-python/pypaimon/tests/branch/catalog_branch_manager_test.py +++ b/paimon-python/pypaimon/tests/branch/catalog_branch_manager_test.py @@ -125,7 +125,7 @@ def test_fast_forward(self): def test_fast_forward_to_main(self): """Test fast-forward to main branch should raise error.""" from pypaimon.common.identifier import Identifier - identifier_with_branch = Identifier("test_db", "test_table", "feature") + identifier_with_branch = Identifier.create("test_db", "test_table", branch="feature") branch_manager = CatalogBranchManager(self.catalog_loader, identifier_with_branch) with self.assertRaises(ValueError) as cm: @@ -136,7 +136,7 @@ def test_fast_forward_to_main(self): def test_fast_forward_to_current_branch(self): """Test fast-forward to current branch should raise error.""" from pypaimon.common.identifier import Identifier - identifier_with_branch = Identifier("test_db", "test_table", "feature") + identifier_with_branch = Identifier.create("test_db", "test_table", branch="feature") branch_manager = CatalogBranchManager(self.catalog_loader, identifier_with_branch) with self.assertRaises(ValueError) as cm: diff --git 
a/paimon-python/pypaimon/tests/identifier_test.py b/paimon-python/pypaimon/tests/identifier_test.py index 806f16d599bc..319a246fa952 100644 --- a/paimon-python/pypaimon/tests/identifier_test.py +++ b/paimon-python/pypaimon/tests/identifier_test.py @@ -62,10 +62,34 @@ def test_get_full_name(self): identifier = Identifier.create("mydb", "mytable") self.assertEqual(identifier.get_full_name(), "mydb.mytable") - def test_get_full_name_with_branch(self): - """get_full_name() includes branch when set.""" - identifier = Identifier(database="mydb", object="mytable", branch="feature") - self.assertEqual(identifier.get_full_name(), "mydb.mytable.feature") + def test_constructor_with_branch_encodes_into_object(self): + """``Identifier(db, table, branch=...)`` mirrors Java's 3-arg constructor.""" + identifier = Identifier("mydb", "mytable", branch="feature") + self.assertEqual(identifier.object, "mytable$branch_feature") + self.assertEqual(identifier.get_full_name(), "mydb.mytable$branch_feature") + self.assertEqual(identifier.get_table_name(), "mytable") + self.assertEqual(identifier.get_branch_name(), "feature") + self.assertEqual(identifier.get_branch_name_or_default(), "feature") + self.assertFalse(identifier.is_system_table()) + + def test_main_branch_is_not_encoded_into_object(self): + """``main`` (case-insensitive) is the default branch and is not encoded into ``object``.""" + for branch in ("main", "MAIN", "Main"): + identifier = Identifier("mydb", "mytable", branch=branch) + self.assertEqual(identifier.object, "mytable") + # Wire-equal to a no-branch identifier. 
+ self.assertEqual(identifier, Identifier("mydb", "mytable")) + + def test_get_branch_name_or_default_when_unset(self): + """``get_branch_name_or_default`` falls back to 'main'.""" + identifier = Identifier.create("mydb", "mytable") + self.assertIsNone(identifier.get_branch_name()) + self.assertEqual(identifier.get_branch_name_or_default(), "main") + + def test_unknown_database_drops_database_segment(self): + """``UNKNOWN_DATABASE`` is dropped from full name (Java-compatible).""" + identifier = Identifier("unknown", "mytable") + self.assertEqual(identifier.get_full_name(), "mytable") def test_empty_string_raises_error(self): """Empty string should raise ValueError.""" @@ -98,15 +122,145 @@ def test_is_system_table_regular_table(self): def test_is_system_table_snapshots_suffix(self): """object name '$snapshots' is a system table.""" - self.assertTrue(Identifier.create("mydb", "orders$snapshots").is_system_table()) + self.assertTrue(Identifier("mydb", "orders$snapshots").is_system_table()) + self.assertTrue( + Identifier("mydb", "orders", system_table="snapshots").is_system_table()) def test_is_system_table_schemas_suffix(self): """object name '$schemas' is a system table.""" - self.assertTrue(Identifier.create("mydb", "orders$schemas").is_system_table()) + self.assertTrue(Identifier("mydb", "orders$schemas").is_system_table()) def test_is_system_table_files_suffix(self): """object name '$files' is a system table.""" - self.assertTrue(Identifier.create("mydb", "orders$files").is_system_table()) + self.assertTrue(Identifier("mydb", "orders$files").is_system_table()) + + def test_is_system_table_two_parts_with_branch_prefix(self): + """A '$branch_' object is a branched table, NOT a system table.""" + identifier = Identifier("mydb", "orders$branch_dev") + self.assertFalse(identifier.is_system_table()) + self.assertEqual(identifier.get_table_name(), "orders") + self.assertEqual(identifier.get_branch_name(), "dev") + self.assertIsNone(identifier.get_system_table_name()) + 
+ def test_is_system_table_three_parts_branch_and_system(self): + """A '
<table>$branch_<name>$snapshots' object is a system table on a branch.""" + identifier = Identifier("mydb", "orders$branch_dev$snapshots") + self.assertTrue(identifier.is_system_table()) + self.assertEqual(identifier.get_table_name(), "orders") + self.assertEqual(identifier.get_branch_name(), "dev") + self.assertEqual(identifier.get_system_table_name(), "snapshots") + + def test_three_parts_without_branch_prefix_raises(self): + """A '
<table>$<x>$<y>' object without branch prefix is invalid (single '$' allowed).""" + identifier = Identifier("mydb", "orders$schemas$snapshots") + with self.assertRaises(ValueError): + identifier.is_system_table() + + def test_constructor_with_branch_and_system_table(self): + """``Identifier(db, table, branch=..., system_table=...)`` mirrors Java's 4-arg constructor.""" + identifier = Identifier( + "mydb", "orders", branch="dev", system_table="snapshots" + ) + self.assertEqual(identifier.object, "orders$branch_dev$snapshots") + self.assertEqual(identifier.get_table_name(), "orders") + self.assertEqual(identifier.get_branch_name(), "dev") + self.assertEqual(identifier.get_system_table_name(), "snapshots") + + def test_constructor_with_system_table_only(self): + """Constructor with system_table but no branch.""" + identifier = Identifier("mydb", "orders", system_table="files") + self.assertEqual(identifier.object, "orders$files") + self.assertEqual(identifier.get_table_name(), "orders") + self.assertIsNone(identifier.get_branch_name()) + self.assertEqual(identifier.get_system_table_name(), "files") + + def test_create_is_two_arg_alias(self): + """``Identifier.create(db, object)`` is a 2-arg alias of the JSON constructor (matches Java).""" + identifier = Identifier.create("db", "orders$snapshots") + self.assertEqual(identifier.object, "orders$snapshots") + self.assertEqual(identifier.get_table_name(), "orders") + self.assertEqual(identifier.get_system_table_name(), "snapshots") + self.assertTrue(identifier.is_system_table()) + + def test_constructor_forms_are_wire_equivalent(self): + """The encoding constructor and the @JsonCreator form produce wire-equal identifiers.""" + encoded = Identifier( + "mydb", "orders", branch="dev", system_table="snapshots" + ) + from_wire = Identifier("mydb", "orders$branch_dev$snapshots") + self.assertEqual(encoded, from_wire) + self.assertEqual(hash(encoded), hash(from_wire)) + + def test_branch_property_reads_decoded_branch(self): + 
"""``identifier.branch`` is an alias that reads the same value as ``get_branch_name()``.""" + self.assertEqual(Identifier("db", "tbl", branch="dev").branch, "dev") + self.assertEqual(Identifier("db", "tbl$branch_dev").branch, "dev") + self.assertIsNone(Identifier("db", "tbl").branch) + + def test_equality_and_hash_ignore_cached_fields(self): + """Equality and hash depend only on (database, object), matching Java JSON shape.""" + a = Identifier("mydb", "orders$branch_dev$snapshots") + b = Identifier( + "mydb", "orders", branch="dev", system_table="snapshots" + ) + self.assertEqual(a, b) + self.assertEqual(hash(a), hash(b)) + + +class IdentifierBackwardCompatibilityTest(unittest.TestCase): + """Locks in that the public surface from before this PR keeps working. + + Pre-PR ``Identifier`` exposed three signatures that this PR's + encoding-into-object refactor risked breaking. They must continue to + run without raising; semantics may shift to the wire-correct form + (branches encoded into ``object``), but no caller should hit a + TypeError / AttributeError on an unchanged call site. + """ + + def test_constructor_branch_kwarg_still_accepted(self): + # ``Identifier(db, obj, branch=...)`` was the old dataclass init + # signature. It must still construct without error; the branch is + # now encoded into ``object`` so the wire is Java-compatible. + identifier = Identifier(database="db", object="tbl", branch="feature") + self.assertEqual(identifier.object, "tbl$branch_feature") + self.assertEqual(identifier.get_branch_name(), "feature") + + def test_constructor_branch_kwarg_none_is_accepted(self): + # Explicit ``branch=None`` (e.g. from JSON deserialization paths) + # must remain a no-op rather than triggering the encoding path. 
+ identifier = Identifier("db", "tbl", branch=None) + self.assertEqual(identifier.object, "tbl") + self.assertIsNone(identifier.get_branch_name()) + + def test_branch_attribute_read(self): + # Old code that read ``identifier.branch`` keeps reading the + # branch name (now decoded from ``object``). + self.assertEqual(Identifier("db", "tbl", branch="dev").branch, "dev") + self.assertEqual(Identifier("db", "tbl$branch_dev").branch, "dev") + self.assertIsNone(Identifier("db", "tbl").branch) + + def test_branch_attribute_write(self): + # Old code that assigned to ``identifier.branch`` keeps working. + # The setter re-encodes ``object`` so the wire is consistent. + identifier = Identifier("db", "tbl") + identifier.branch = "feature" + self.assertEqual(identifier.branch, "feature") + self.assertEqual(identifier.object, "tbl$branch_feature") + self.assertEqual(identifier.get_branch_name(), "feature") + # Clearing back to None drops the encoded branch segment. + identifier.branch = None + self.assertEqual(identifier.object, "tbl") + self.assertIsNone(identifier.branch) + + def test_create_two_arg_form_with_dollar_object(self): + # ``Identifier.create(db, "tbl$snapshots")`` was the documented + # way to construct system-table identifiers pre-PR. It must keep + # producing an equivalent system-table identifier. 
+ identifier = Identifier.create("db", "orders$snapshots") + self.assertEqual(identifier.object, "orders$snapshots") + self.assertTrue(identifier.is_system_table()) + self.assertEqual(identifier.get_table_name(), "orders") + self.assertEqual(identifier.get_system_table_name(), "snapshots") if __name__ == '__main__': diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py index 6e5ea4761871..4f400e1580ca 100644 --- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py @@ -320,8 +320,8 @@ def test_commit_succeeded_on_server_but_client_fails(self): real_commit = tc.file_store_commit.snapshot_commit.commit - def commit_then_raise(sn, br, st): - real_commit(sn, br, st) + def commit_then_raise(sn, st): + real_commit(sn, st) raise RuntimeError("simulated") with patch.object(tc.file_store_commit.snapshot_commit, 'commit', side_effect=commit_then_raise): diff --git a/paimon-python/pypaimon/tests/rest/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py index c3a41886692a..99e7142b22bf 100755 --- a/paimon-python/pypaimon/tests/rest/rest_server.py +++ b/paimon-python/pypaimon/tests/rest/rest_server.py @@ -484,25 +484,16 @@ def _route_request(self, method: str, resource_path: str, parameters: Dict[str, def _handle_table_resource(self, method: str, path_parts: List[str], identifier: Identifier, data: str, parameters: Dict[str, str]) -> Tuple[str, int]: - """Handle table-specific resource requests""" - # Extract table name and check for branch information - raw_table_name = path_parts[2] - - # Parse table name with potential branch (e.g., "table.main" -> "table", branch="main") - if '.' 
in raw_table_name and len(raw_table_name.split('.')) > 1: - # This might be a table with branch - table_parts = raw_table_name.split('.') - if len(table_parts) == 2: - table_name_part = table_parts[0] - branch_part = table_parts[1] - # Recreate identifier without branch for lookup - lookup_identifier = Identifier.create(identifier.get_database_name(), table_name_part) - else: - lookup_identifier = identifier - branch_part = None + """Handle table-specific resource requests.""" + # The branch (if any) is encoded into the object name as + # "$branch_" — see Identifier. Strip it for table lookup, and + # surface the branch separately for routes that care (e.g. commit). + branch_part = identifier.get_branch_name() + if branch_part is not None: + lookup_identifier = Identifier.create( + identifier.get_database_name(), identifier.get_table_name()) else: lookup_identifier = identifier - branch_part = None # Check table permissions using the base identifier if lookup_identifier.get_full_name() in self.no_permission_tables: diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py index b94ad58b2442..801e1bfff6c5 100644 --- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py @@ -479,7 +479,7 @@ def make_file_io(): def test_refresh_token_strips_system_table_suffix(self): """refresh_token() strips $snapshots suffix before requesting token.""" - system_identifier = Identifier.create("db", "my_table$snapshots") + system_identifier = Identifier.create("db", "my_table", system_table="snapshots") file_io = RESTTokenFileIO( system_identifier, self.warehouse_path, self.catalog_options) diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py b/paimon-python/pypaimon/tests/table/file_store_table_test.py index 22068d3a0137..30b286d637d4 100644 --- a/paimon-python/pypaimon/tests/table/file_store_table_test.py +++ 
b/paimon-python/pypaimon/tests/table/file_store_table_test.py @@ -108,24 +108,28 @@ def test_consumer_manager(self): self.assertEqual(min_snapshot, 5) def test_consumer_manager_with_branch(self): - """Test consumer_manager with branch option.""" - # Create table with branch option + """Test consumer_manager when branch is encoded in the identifier.""" branch_name = "feature_branch" + + # Create a regular table; the branch is supplied later via the + # branch-encoded identifier (Java-aligned routing). schema = Schema.from_pyarrow_schema( self.pa_schema, partition_keys=['dt'], - options={ - CoreOptions.BUCKET.key(): "2", - "branch": branch_name - } + options={CoreOptions.BUCKET.key(): "2"}, ) - self.catalog.create_table('default.test_branch_table', schema, False) - branch_table = self.catalog.get_table('default.test_branch_table') + self.catalog.create_table('default.test_branch_table', schema, True) - # Get consumer_manager and verify it has correct branch - branch_consumer_manager = branch_table.consumer_manager() + # Access the table with a branch-encoded identifier. + branch_table = self.catalog.get_table( + 'default.test_branch_table$branch_{}'.format(branch_name)) + + # current_branch() reads from the identifier. self.assertEqual(branch_table.current_branch(), branch_name) + # Get consumer_manager and exercise it on the branched view. 
+ branch_consumer_manager = branch_table.consumer_manager() + # Test consumer operations on branch from pypaimon.consumer.consumer import Consumer branch_consumer_manager.reset_consumer("branch_consumer", Consumer(next_snapshot=10)) @@ -254,19 +258,16 @@ def test_changelog_manager(self): self.assertEqual(changelog_manager.branch, DEFAULT_MAIN_BRANCH) def test_changelog_manager_with_branch(self): - """Test changelog_manager with branch option.""" - # Create table with branch option + """Test changelog_manager when branch is encoded into the identifier.""" branch_name = "feature" schema = Schema.from_pyarrow_schema( self.pa_schema, partition_keys=['dt'], - options={ - CoreOptions.BUCKET.key(): "2", - "branch": branch_name - } + options={CoreOptions.BUCKET.key(): "2"}, ) self.catalog.create_table('default.test_changelog_branch_table', schema, False) - branch_table = self.catalog.get_table('default.test_changelog_branch_table') + branch_table = self.catalog.get_table( + 'default.test_changelog_branch_table$branch_{}'.format(branch_name)) # Get changelog_manager and verify it has correct branch branch_changelog_manager = branch_table.changelog_manager() @@ -299,28 +300,27 @@ def test_changelog_manager_latest_and_earliest_none(self): self.assertIsNone(changelog_manager.earliest_long_lived_changelog_id()) def test_current_branch(self): - """Test that current_branch returns the branch from options.""" + """Test that current_branch reads from the identifier.""" from pypaimon.branch.branch_manager import DEFAULT_MAIN_BRANCH # Default table should have main branch self.assertEqual(self.table.current_branch(), DEFAULT_MAIN_BRANCH) - # Table with branch option should return that branch + # Access a table with a branch-encoded identifier — current_branch + # decodes from the object name (Java-aligned). 
branch_name = "feature_branch" schema = Schema.from_pyarrow_schema( self.pa_schema, partition_keys=['dt'], - options={ - CoreOptions.BUCKET.key(): "2", - "branch": branch_name - } + options={CoreOptions.BUCKET.key(): "2"}, ) self.catalog.create_table('default.test_current_branch', schema, False) - branch_table = self.catalog.get_table('default.test_current_branch') + branch_table = self.catalog.get_table( + 'default.test_current_branch$branch_{}'.format(branch_name)) self.assertEqual(branch_table.current_branch(), branch_name) def test_copy_with_branch(self): - """Test copy method with branch option.""" + """copy() with a branch option re-encodes branch into the identifier.""" branch_name = "test_branch" # Copy table with branch option @@ -335,8 +335,12 @@ def test_copy_with_branch(self): self.assertEqual(self.table.schema_manager.branch, DEFAULT_MAIN_BRANCH) self.assertEqual(copied_table.schema_manager.branch, branch_name) - # Verify other properties are preserved - self.assertEqual(copied_table.identifier, self.table.identifier) + # The copied table's identifier carries the encoded branch, so it is + # NOT equal to the source identifier (Java-aligned wire shape). 
+ self.assertEqual(copied_table.identifier.get_table_name(), + self.table.identifier.get_table_name()) + self.assertEqual(copied_table.identifier.get_branch_name(), branch_name) + self.assertIsNone(self.table.identifier.get_branch_name()) self.assertEqual(copied_table.table_path, self.table.table_path) def test_rename_branch_basic(self): diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 832a39ba6887..97d4ed92988a 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -408,7 +408,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Use SnapshotCommit for atomic commit try: with self.snapshot_commit: - success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics) + success = self.snapshot_commit.commit(snapshot_data, statistics) if not success: commit_time_s = (int(time.time() * 1000) - start_millis) / 1000 logger.warning(