diff --git a/docs/docs.json b/docs/docs.json index ff5a1352eb..ed53a9fdb8 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -186,6 +186,7 @@ "integrations/vector-db-integrations/chromadb", "integrations/vector-db-integrations/couchbase", "integrations/vector-db-integrations/milvus", + "integrations/vector-db-integrations/moss", "integrations/vector-db-integrations/pgvector", "integrations/vector-db-integrations/pinecone", "integrations/vector-db-integrations/weaviate" diff --git a/docs/integrations/vector-db-integrations/moss.mdx b/docs/integrations/vector-db-integrations/moss.mdx new file mode 100644 index 0000000000..71fedb84ee --- /dev/null +++ b/docs/integrations/vector-db-integrations/moss.mdx @@ -0,0 +1,117 @@ +--- +title: Moss +sidebarTitle: Moss +--- + +In this section, we present how to connect Moss to MindsDB. + +[Moss](https://moss.dev) is a semantic search runtime for Conversational AI agents. It delivers hybrid search with sub-10ms latency. + +## Prerequisites + +Before proceeding, ensure the following prerequisites are met: + +1. Install MindsDB locally via [Docker](/setup/self-hosted/docker) or [Docker Desktop](/setup/self-hosted/docker-desktop). +2. To connect Moss to MindsDB, install the required dependencies following [this instruction](/setup/self-hosted/docker#install-dependencies). +3. Create a Moss account and obtain your project credentials from [portal.usemoss.dev](https://portal.usemoss.dev). + +## Connection + +This handler is implemented using the `moss` Python library. + +To connect Moss to MindsDB, use the following statement: + +```sql +CREATE DATABASE moss_db +WITH ENGINE = 'moss', +PARAMETERS = { + "project_id": "your-project-id", + "project_key": "moss_access_key_xxxxx", + "alpha": "0.8" +}; +``` + +The required parameters are: + +- `project_id`: Your Moss project ID, available from [portal.usemoss.dev](https://portal.usemoss.dev). 
+- `project_key`: Your Moss project key (secret), available from [portal.usemoss.dev](https://portal.usemoss.dev). + +The optional parameters are: + +- `alpha`: Hybrid search weight between `0.0` (keyword-only) and `1.0` (semantic-only). Defaults to `0.8`. + +## Usage + +### Inserting documents + +Insert documents to create a new index. The index is built asynchronously — MindsDB waits until it is ready before returning (typically 5–30 seconds). + +```sql +INSERT INTO moss_db.my_index (id, content, metadata) +VALUES + ('doc-1', 'MindsDB unifies AI and data with SQL', '{"category": "mindsdb"}'), + ('doc-2', 'Moss delivers sub-10ms hybrid semantic search', '{"category": "moss"}'), + ('doc-3', 'RAG pipelines combine retrieval and generation', '{"category": "rag"}'); +``` + + + The `id` column is optional. If omitted, the MindsDB handler derives the ID + from a SHA-256 hash of the document content, so documents with identical + content receive the same ID. + + +Inserting into an existing index upserts the documents: + +```sql +INSERT INTO moss_db.my_index (id, content, metadata) +VALUES ('doc-4', 'Vector databases store embeddings for similarity search', '{"category": "vectordb"}'); +``` + +### Semantic search + +Query your index using natural language. Results are ranked by relevance and include a `distance` column (lower = better match): + +```sql +SELECT id, content, distance +FROM moss_db.my_index +WHERE content = 'how do I build a RAG pipeline?' 
+LIMIT 5; +``` + +### Fetch all documents + +```sql +SELECT id, content, metadata +FROM moss_db.my_index; +``` + +### Fetch by ID + +```sql +SELECT id, content +FROM moss_db.my_index +WHERE id = 'doc-1'; +``` + +### Semantic search with metadata filter + +Combine a semantic search query with a metadata filter: + +```sql +SELECT id, content, distance +FROM moss_db.my_index +WHERE content = 'semantic search performance' + AND metadata.category = 'moss' +LIMIT 3; +``` + +### Delete a document + +```sql +DELETE FROM moss_db.my_index WHERE id = 'doc-1'; +``` + +### Drop an index + +```sql +DROP TABLE moss_db.my_index; +``` diff --git a/mindsdb/integrations/handlers/moss_handler/README.md b/mindsdb/integrations/handlers/moss_handler/README.md new file mode 100644 index 0000000000..1508840b99 --- /dev/null +++ b/mindsdb/integrations/handlers/moss_handler/README.md @@ -0,0 +1,118 @@ +--- +title: Moss +sidebarTitle: Moss +--- + +In this section, we present how to connect Moss to MindsDB. + +[Moss](https://moss.dev) is a semantic search runtime built for Conversational AI agents. It lets you index documents and run hybrid semantic/keyword queries in under 10ms — fast enough for real-time conversation, compared to 200–300ms with typical retrieval systems. + +## Prerequisites + +Before proceeding, ensure the following prerequisites are met: + +1. Install MindsDB locally via [Docker](/setup/self-hosted/docker) or [Docker Desktop](/setup/self-hosted/docker-desktop). +2. To connect Moss to MindsDB, install the required dependencies following [this instruction](/setup/self-hosted/docker#install-dependencies). +3. Create a Moss project and obtain your credentials from the [Moss Portal](https://portal.usemoss.dev). + +## Connection + +This handler is implemented using the `moss` Python library. 
+ +To connect your Moss project to MindsDB, use the following statement: + +```sql +CREATE DATABASE moss_datasource +WITH ENGINE = 'moss', +PARAMETERS = { + "project_id": "your-project-id", + "project_key": "moss_access_key_xxxxx", + "alpha": "0.8" +}; +``` + +The required parameters are: + +* `project_id`: Your Moss project ID, available in the [Moss Portal](https://portal.usemoss.dev). +* `project_key`: Your Moss project access key. + +The optional parameters are: + +* `alpha`: Controls the blend between semantic and keyword search. Range is `0.0` to `1.0`, where `0.0` is pure keyword (BM25), `1.0` is pure semantic, and `0.8` is the default. + +## Usage + +Once connected, you can insert documents into a Moss index. The index is created automatically on the first insert. + +```sql +INSERT INTO moss_datasource.my_index (id, content, metadata) +VALUES + ('doc-1', 'MindsDB unifies AI and data pipelines', '{"category": "product"}'), + ('doc-2', 'Connect MindsDB to PostgreSQL, MySQL, and more', '{"category": "integrations"}'), + ('doc-3', 'Create AI models using the CREATE MODEL syntax', '{"category": "docs"}'); +``` + + +The `INSERT` statement blocks until Moss finishes building the index, which typically takes 5–30 seconds depending on document count. + + +To run a semantic search query: + +```sql +SELECT id, content, distance +FROM moss_datasource.my_index +WHERE content = 'how do I create an AI model?' +LIMIT 5; +``` + +The `distance` column is `1 - score`, so lower values indicate a closer match. 
+ +To fetch all documents without a search query: + +```sql +SELECT id, content, metadata +FROM moss_datasource.my_index; +``` + +To fetch specific documents by ID: + +```sql +SELECT id, content +FROM moss_datasource.my_index +WHERE id = 'doc-1'; +``` + +To filter results by metadata alongside a semantic search: + +```sql +SELECT id, content, distance +FROM moss_datasource.my_index +WHERE content = 'connecting to databases' + AND metadata.category = 'integrations' +LIMIT 3; +``` + +To delete documents from an index: + +```sql +DELETE FROM moss_datasource.my_index +WHERE id = 'doc-1'; +``` + +To drop an index entirely: + +```sql +DROP TABLE moss_datasource.my_index; +``` + +## Using Moss in a RAG Pipeline + +You can combine Moss with a MindsDB model to build a retrieval-augmented generation (RAG) pipeline entirely in SQL: + +```sql +SELECT r.content, m.answer +FROM moss_datasource.my_index AS r +JOIN mindsdb.my_llm AS m +WHERE r.content = 'what is the refund policy?' +LIMIT 1; +``` diff --git a/mindsdb/integrations/handlers/moss_handler/__about__.py b/mindsdb/integrations/handlers/moss_handler/__about__.py new file mode 100644 index 0000000000..7381ff2cbd --- /dev/null +++ b/mindsdb/integrations/handlers/moss_handler/__about__.py @@ -0,0 +1,9 @@ +__title__ = "MindsDB Moss handler" +__package_name__ = "mindsdb_moss_handler" +__version__ = "0.0.1" +__description__ = "MindsDB handler for Moss semantic search" +__author__ = "Keshav Arora" +__github__ = "https://github.com/mindsdb/mindsdb" +__pypi__ = "https://pypi.org/project/mindsdb/" +__license__ = "MIT" +__copyright__ = "Copyright 2024 - mindsdb" diff --git a/mindsdb/integrations/handlers/moss_handler/__init__.py b/mindsdb/integrations/handlers/moss_handler/__init__.py new file mode 100644 index 0000000000..3d74cbea83 --- /dev/null +++ b/mindsdb/integrations/handlers/moss_handler/__init__.py @@ -0,0 +1,33 @@ +from mindsdb.integrations.libs.const import HANDLER_SUPPORT_LEVEL, HANDLER_TYPE + +from .__about__ import 
__description__ as description +from .__about__ import __version__ as version +from .connection_args import connection_args, connection_args_example + +try: + from .moss_handler import MossHandler as Handler + + import_error = None +except Exception as e: + Handler = None + import_error = e + +title = "Moss" +name = "moss" +type = HANDLER_TYPE.DATA +support_level = HANDLER_SUPPORT_LEVEL.COMMUNITY +icon_path = "icon.png" + +__all__ = [ + "Handler", + "version", + "name", + "type", + "title", + "description", + "support_level", + "connection_args", + "connection_args_example", + "import_error", + "icon_path", +] diff --git a/mindsdb/integrations/handlers/moss_handler/connection_args.py b/mindsdb/integrations/handlers/moss_handler/connection_args.py new file mode 100644 index 0000000000..dc0be17096 --- /dev/null +++ b/mindsdb/integrations/handlers/moss_handler/connection_args.py @@ -0,0 +1,31 @@ +from collections import OrderedDict + +from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE + +connection_args = OrderedDict( + project_id={ + "type": ARG_TYPE.STR, + "description": "Moss project ID from the Moss Portal (portal.usemoss.dev)", + "required": True, + }, + project_key={ + "type": ARG_TYPE.PWD, + "description": "Moss project key from the Moss Portal", + "required": True, + "secret": True, + }, + alpha={ + "type": ARG_TYPE.STR, + "description": ( + "Hybrid search weight between semantic and keyword search. 
import ast
import asyncio
import hashlib
import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import pandas as pd

try:
    from moss import (
        DocumentInfo,
        GetDocumentsOptions,
        MossClient,
        MutationOptions,
        QueryOptions,
    )
except ImportError:
    pass  # captured as import_error in __init__.py

from mindsdb.integrations.handlers.moss_handler.settings import MossHandlerConfig
from mindsdb.integrations.libs.response import RESPONSE_TYPE
from mindsdb.integrations.libs.response import HandlerResponse as Response
from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
from mindsdb.integrations.libs.vectordatabase_handler import (
    FilterCondition,
    FilterOperator,
    TableField,
    VectorStoreHandler,
)
from mindsdb.utilities import log

logger = log.getLogger(__name__)

_executor = ThreadPoolExecutor(max_workers=4)


def _run(coro):
    """Run the coroutine *coro* to completion and return its result.

    The coroutine is executed with ``asyncio.run`` on a worker thread of the
    module-level pool and the caller blocks on the future. ``asyncio.run``
    cannot be used from a thread that already has a running event loop;
    presumably that is why the work is pushed to a separate thread.
    """
    future = _executor.submit(asyncio.run, coro)
    return future.result()


class MossHandler(VectorStoreHandler):
    """MindsDB vector-store handler for Moss semantic search.

    Every table name passed to the handler is used as a Moss index name. The
    asynchronous ``MossClient`` calls are driven synchronously via ``_run``.
    """

    name = "moss"

    # Seconds to sleep between ``get_index`` polls while an index is building.
    _POLL_INTERVAL_SECONDS = 3

    def __init__(self, name: str, **kwargs):
        """Validate ``connection_data`` from *kwargs* and store the settings.

        No network call is made here; the client is created lazily by
        ``connect``.
        """
        super().__init__(name)
        self._client = None
        self.is_connected = False
        # Names of indexes for which load_index has already been called.
        self._loaded_indexes: set = set()

        config = self._validate_config(name, **kwargs)
        self._project_id = config.project_id
        self._project_key = config.project_key
        self._alpha = config.alpha

    def _validate_config(self, name: str, **kwargs) -> MossHandlerConfig:
        """Build a ``MossHandlerConfig`` from ``kwargs['connection_data']``."""
        # ``or {}`` also covers an explicit ``connection_data=None``.
        connection_data = dict(kwargs.get("connection_data") or {})
        connection_data["vector_store"] = name
        return MossHandlerConfig(**connection_data)

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for ``None`` and for float NaN (pandas' empty cell)."""
        return value is None or (isinstance(value, float) and pd.isna(value))

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    def connect(self):
        """Create the ``MossClient`` on first use and return it.

        Raises:
            Exception: if the client cannot be constructed; the original
                error is chained as ``__cause__``.
        """
        if self.is_connected:
            return self._client
        try:
            self._client = MossClient(self._project_id, self._project_key)
        except Exception as e:
            self.is_connected = False
            raise Exception(f"Error connecting to Moss: {e}") from e
        self.is_connected = True
        return self._client

    def disconnect(self):
        """Drop the client and forget which indexes were loaded."""
        self._client = None
        self.is_connected = False
        self._loaded_indexes.clear()

    def check_connection(self) -> StatusResponse:
        """Verify the credentials by listing indexes.

        Returns:
            A ``StatusResponse`` whose ``success`` flag reflects the outcome
            and whose ``error_message`` carries the failure text, if any.
        """
        response = StatusResponse(False)
        # Only tear the connection down again if this call opened it.
        need_to_close = not self.is_connected
        try:
            client = self.connect()
            _run(client.list_indexes())
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Moss: {e}")
            response.error_message = str(e)
        finally:
            if response.success and need_to_close:
                self.disconnect()
            if not response.success and self.is_connected:
                self.is_connected = False
        return response

    def _ensure_loaded(self, index_name: str):
        """Call ``load_index`` for *index_name* once per connection.

        Raises:
            Exception: with an actionable hint when the error text reported
                by the client contains "not found"; any other error is
                re-raised unchanged.
        """
        if index_name in self._loaded_indexes:
            return
        try:
            _run(self._client.load_index(index_name))
        except Exception as e:
            if "not found" in str(e).lower():
                raise Exception(
                    f"Index '{index_name}' not found in Moss. "
                    f"Insert documents first: INSERT INTO <datasource>.{index_name} "
                    "(content) VALUES (...)"
                ) from e
            raise
        self._loaded_indexes.add(index_name)

    def _wait_for_index_ready(self, index_name: str, timeout: int = 180):
        """Poll ``get_index`` until the index reports a ready status.

        The index is polled at least once, even when *timeout* is 0. Errors
        raised by ``get_index`` are treated as "index not visible yet" and
        polling continues, unless the error text contains
        "index build failed".

        Raises:
            Exception: if the index reports a failed status, or if it is not
                ready within *timeout* seconds.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            info = None
            try:
                info = _run(self._client.get_index(index_name))
            except Exception as e:
                if "index build failed" in str(e).lower():
                    raise
                # Index not visible yet: keep polling.

            if info is not None:
                status_text = str(info.status).lower()
                # Compare both statuses case-insensitively.
                if info.status == "Ready" or status_text == "ready":
                    return
                if status_text == "failed":
                    raise Exception(f"Moss index build failed for '{index_name}'")

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(self._POLL_INTERVAL_SECONDS, remaining))
        raise Exception(f"Timed out waiting for Moss index '{index_name}' to be ready")

    def _index_exists(self, index_name: str) -> bool:
        """Return True if ``list_indexes`` reports an index named *index_name*.

        A failing ``list_indexes`` call is logged and reported as False
        (best effort), so callers fall back to creating the index.
        """
        try:
            indexes = _run(self._client.list_indexes())
        except Exception as e:
            logger.warning(f"Could not list Moss indexes, assuming '{index_name}' is absent: {e}")
            return False
        return any(idx.name == index_name for idx in (indexes or []))

    def _translate_metadata_filters(self, conditions: List[FilterCondition]) -> Optional[dict]:
        """Translate ``metadata.<field>`` conditions into a Moss filter dict.

        Returns:
            ``None`` when there is nothing to filter on, a single
            ``{"field": ..., "condition": {op: value}}`` dict for one
            condition, or ``{"$and": [...]}`` for several. Conditions with an
            operator outside the supported map are skipped with a warning.
        """
        if not conditions:
            return None

        prefix = TableField.METADATA.value + "."
        meta_conditions = [c for c in conditions if c.column.startswith(prefix)]
        if not meta_conditions:
            return None

        op_map = {
            FilterOperator.EQUAL: "$eq",
            FilterOperator.NOT_EQUAL: "$ne",
            FilterOperator.LESS_THAN: "$lt",
            FilterOperator.LESS_THAN_OR_EQUAL: "$lte",
            FilterOperator.GREATER_THAN: "$gt",
            FilterOperator.GREATER_THAN_OR_EQUAL: "$gte",
            FilterOperator.IN: "$in",
        }

        translated = []
        for cond in meta_conditions:
            op = op_map.get(cond.op)
            if op is None:
                logger.warning(
                    f"Ignoring unsupported metadata filter operator {cond.op} on '{cond.column}'"
                )
                continue
            field = cond.column.split(".", 1)[-1]
            translated.append({"field": field, "condition": {op: cond.value}})

        if not translated:
            return None
        return {"$and": translated} if len(translated) > 1 else translated[0]

    def _parse_metadata(self, raw) -> dict:
        """Convert a metadata cell into a dict.

        Accepts a dict (returned as is), a JSON object string, or a Python
        dict literal. JSON is tried first so that ``true``/``false``/``null``
        are understood; ``ast.literal_eval`` remains as a fallback for
        single-quoted Python literals. Anything else yields ``{}``.
        """
        if self._is_missing(raw):
            return {}
        if isinstance(raw, dict):
            return raw

        text = str(raw)
        try:
            parsed = json.loads(text)
        except (TypeError, ValueError):
            try:
                parsed = ast.literal_eval(text)
            except (ValueError, SyntaxError, TypeError):
                return {}
        return parsed if isinstance(parsed, dict) else {}

    # ------------------------------------------------------------------
    # VectorStoreHandler abstract methods
    # ------------------------------------------------------------------

    def select(
        self,
        table_name: str,
        columns: List[str] = None,
        conditions: List[FilterCondition] = None,
        offset: int = None,
        limit: int = None,
    ) -> pd.DataFrame:
        """Query or fetch documents from the index *table_name*.

        A condition on the content column triggers a semantic search, with
        metadata conditions passed along as a Moss filter; the result then
        has a distance column computed as ``1 - score``. Without a content
        condition the documents are fetched directly, optionally restricted
        by ``id = ...`` / ``id IN (...)`` conditions.

        *offset* and *limit* are honoured on both paths. A semantic search
        without a limit returns at most 10 rows after the offset.
        """
        self.connect()
        self._ensure_loaded(table_name)

        content_filter = None
        id_filters = []
        moss_filter = self._translate_metadata_filters(conditions)

        for cond in conditions or []:
            if cond.column == TableField.CONTENT.value:
                content_filter = cond
            elif cond.column == TableField.ID.value:
                if cond.op == FilterOperator.EQUAL:
                    id_filters.append(str(cond.value))
                elif cond.op == FilterOperator.IN:
                    id_filters.extend(str(v) for v in cond.value)

        start = offset or 0

        if content_filter is not None:
            query_text = content_filter.value
            if isinstance(query_text, list):
                query_text = query_text[0]

            # Ask for enough hits to cover the rows skipped by the offset.
            opts = QueryOptions(
                top_k=start + (limit or 10),
                alpha=self._alpha,
                **({"filter": moss_filter} if moss_filter else {}),
            )
            search_result = _run(self._client.query(table_name, query_text, opts))
            results = (search_result.docs if search_result is not None else []) or []
            results = list(results)[start:]

            # Moss score is 0-1 (higher = better); map to distance (lower = better)
            df = pd.DataFrame({
                TableField.ID.value: [r.id for r in results],
                TableField.CONTENT.value: [r.text for r in results],
                TableField.METADATA.value: [r.metadata for r in results],
                TableField.EMBEDDINGS.value: [None] * len(results),
                TableField.DISTANCE.value: [1.0 - (r.score or 0.0) for r in results],
            })
        else:
            if moss_filter:
                logger.warning(
                    "Metadata filters are only applied to semantic search queries; "
                    "ignoring them for this document fetch"
                )
            get_opts = GetDocumentsOptions(doc_ids=id_filters) if id_filters else None
            docs = _run(self._client.get_docs(table_name, get_opts)) or []

            # Apply OFFSET even when no LIMIT was given.
            stop = start + limit if limit is not None else None
            docs = list(docs)[start:stop]

            df = pd.DataFrame({
                TableField.ID.value: [d.id for d in docs],
                TableField.CONTENT.value: [d.text for d in docs],
                TableField.METADATA.value: [d.metadata for d in docs],
                TableField.EMBEDDINGS.value: [None] * len(docs),
            })

        if columns:
            available = [c for c in columns if c in df.columns]
            if available:
                df = df[available]

        return df

    def insert(self, table_name: str, df: pd.DataFrame) -> Response:
        """Insert the rows of *df* into the index *table_name*.

        The index is created if ``_index_exists`` reports it missing,
        otherwise the documents are upserted. The call blocks until the index
        is ready and then loads it. Rows without an id get the SHA-256 hex
        digest of their content as id.
        """
        self.connect()

        if df.empty:
            # Nothing to send; avoid creating and waiting on an empty index.
            return Response(RESPONSE_TYPE.OK, affected_rows=0)

        documents = []
        for _, row in df.iterrows():
            raw_content = row.get(TableField.CONTENT.value, "")
            # A missing cell must not be indexed as the text "None"/"nan".
            content = "" if self._is_missing(raw_content) else str(raw_content)

            doc_id = row.get(TableField.ID.value)
            if self._is_missing(doc_id):
                doc_id_str = hashlib.sha256(content.encode()).hexdigest()
            else:
                doc_id_str = str(doc_id)

            documents.append(DocumentInfo(
                id=doc_id_str,
                text=content,
                metadata=self._parse_metadata(row.get(TableField.METADATA.value)),
            ))

        if self._index_exists(table_name):
            _run(self._client.add_docs(table_name, documents, MutationOptions(upsert=True)))
        else:
            _run(self._client.create_index(table_name, documents))

        self._wait_for_index_ready(table_name)
        _run(self._client.load_index(table_name))
        self._loaded_indexes.add(table_name)

        return Response(RESPONSE_TYPE.OK, affected_rows=len(df))

    def delete(self, table_name: str, conditions: List[FilterCondition] = None):
        """Delete documents by id from the index *table_name*.

        Only ``id = ...`` and ``id IN (...)`` conditions are accepted. Any
        other condition raises instead of being ignored, because ignoring it
        would delete more documents than the statement asked for.

        Raises:
            Exception: if no condition is given or a condition is not an
                id equality / IN filter.
        """
        self.connect()
        if not conditions:
            raise Exception("Delete requires at least one condition")

        id_filters = []
        for cond in conditions:
            is_id = cond.column == TableField.ID.value
            if is_id and cond.op == FilterOperator.EQUAL:
                id_filters.append(str(cond.value))
            elif is_id and cond.op == FilterOperator.IN:
                id_filters.extend(str(v) for v in cond.value)
            else:
                raise Exception(
                    "Moss delete only supports filtering by id with '=' or IN; "
                    f"unsupported condition on '{cond.column}'"
                )

        if not id_filters:
            raise Exception("Moss delete only supports filtering by id")

        _run(self._client.delete_docs(table_name, id_filters))
        self._wait_for_index_ready(table_name)
        # Force a fresh load_index before the next select on this index.
        self._loaded_indexes.discard(table_name)

    def create_table(self, table_name: str, if_not_exists=True):
        """Do nothing: the index is created lazily on first insert."""
        pass  # index is created lazily on first insert

    def drop_table(self, table_name: str, if_exists=True):
        """Delete the index *table_name*.

        With ``if_exists=True`` a failing delete is logged and suppressed;
        otherwise it is raised with the original error chained.
        """
        self.connect()
        try:
            _run(self._client.delete_index(table_name))
        except Exception as e:
            if if_exists:
                logger.warning(f"Ignoring error while dropping Moss index '{table_name}': {e}")
                return
            raise Exception(f"Failed to delete Moss index '{table_name}': {e}") from e
        self._loaded_indexes.discard(table_name)

    def get_tables(self) -> Response:
        """Return the names of all indexes as a one-column table response."""
        self.connect()
        indexes = _run(self._client.list_indexes()) or []
        df = pd.DataFrame({"table_name": [idx.name for idx in indexes]})
        return Response(resp_type=RESPONSE_TYPE.TABLE, data_frame=df)

    def get_columns(self, table_name: str) -> Response:
        """Delegate to the ``VectorStoreHandler`` column description."""
        return super().get_columns(table_name)
import difflib
from typing import Any

from pydantic import BaseModel, ConfigDict, field_validator, model_validator


class MossHandlerConfig(BaseModel):
    """Validated connection settings for the Moss handler.

    Unknown parameters are rejected, with a "did you mean" hint when a
    similar field name exists.
    """

    model_config = ConfigDict(extra="forbid")

    # Name of the MindsDB datasource; filled in by the handler.
    vector_store: str
    project_id: str
    project_key: str
    # Hybrid search weight, validated to lie in [0.0, 1.0].
    alpha: float = 0.8

    @model_validator(mode="before")
    @classmethod
    def check_param_typos(cls, values: Any) -> Any:
        """Reject unknown parameter names before field validation runs.

        Raises:
            ValueError: naming the unexpected key and, if one is close
                enough, the most similar expected field.
        """
        # A "before" validator receives the raw input, which need not be a
        # mapping; leave anything else to pydantic's own validation.
        if not isinstance(values, dict):
            return values

        expected = set(cls.model_fields.keys())
        # Sorted so the suggestion does not depend on set iteration order.
        candidates = sorted(expected)
        for key in values:
            if key not in expected:
                close = difflib.get_close_matches(str(key), candidates, cutoff=0.4)
                hint = f" Did you mean '{close[0]}'?" if close else ""
                raise ValueError(f"Unexpected parameter '{key}'.{hint}")
        return values

    @field_validator("alpha", mode="before")
    @classmethod
    def validate_alpha(cls, v: Any) -> float:
        """Coerce *v* to float and check that it lies in [0.0, 1.0].

        Raises:
            ValueError: if *v* is not numeric or is out of range (NaN fails
                the range check as well).
        """
        try:
            alpha = float(v)
        except (TypeError, ValueError) as exc:
            # pydantic only turns ValueError/AssertionError into a
            # ValidationError, so a TypeError from float(None) must be
            # converted here.
            raise ValueError("alpha must be a number between 0.0 and 1.0") from exc
        if not 0.0 <= alpha <= 1.0:
            raise ValueError("alpha must be between 0.0 and 1.0")
        return alpha
import unittest
from unittest.mock import MagicMock, patch

import pandas as pd

from mindsdb.integrations.handlers.moss_handler.moss_handler import MossHandler
from mindsdb.integrations.libs.response import RESPONSE_TYPE
from mindsdb.integrations.libs.vectordatabase_handler import FilterCondition, FilterOperator, TableField


HANDLER_KWARGS = {
    "connection_data": {
        "project_id": "test-project-id",
        "project_key": "test-project-key",
        "alpha": "0.8",
    }
}

# Dotted paths of the module-level names the tests replace with mocks.
_RUN_PATH = "mindsdb.integrations.handlers.moss_handler.moss_handler._run"
_CLIENT_PATH = "mindsdb.integrations.handlers.moss_handler.moss_handler.MossClient"


def _make_handler():
    """Return a handler built from the shared test connection data."""
    return MossHandler(name="test_moss", **HANDLER_KWARGS)


def _make_connected_handler():
    """Return a handler that already holds a mocked, connected client."""
    handler = _make_handler()
    handler._client = MagicMock()
    handler.is_connected = True
    return handler


def _mock_doc(id="doc-1", text="hello world", metadata=None, score=0.9):
    """Return a mock exposing the document attributes the handler reads."""
    doc = MagicMock()
    doc.id = id
    doc.text = text
    doc.metadata = metadata or {}
    doc.score = score
    return doc


def _mock_index(name="my_index", status="Ready", doc_count=1):
    """Return a mock index-info object with the given name and status."""
    idx = MagicMock()
    idx.name = name
    idx.status = status
    idx.doc_count = doc_count
    return idx


def _mock_search_result(docs):
    """Return a mock search result whose ``docs`` attribute is *docs*."""
    result = MagicMock()
    result.docs = docs
    return result


class TestMossHandlerConnection(unittest.TestCase):
    @patch(_CLIENT_PATH)
    def test_connect(self, MockClient):
        handler = _make_handler()
        handler.connect()
        MockClient.assert_called_once_with("test-project-id", "test-project-key")
        self.assertTrue(handler.is_connected)

    @patch(_CLIENT_PATH)
    def test_connect_idempotent(self, MockClient):
        handler = _make_handler()
        handler.connect()
        handler.connect()
        MockClient.assert_called_once()

    @patch(_RUN_PATH)
    @patch(_CLIENT_PATH)
    def test_check_connection_success(self, _mock_client, mock_run):
        mock_run.return_value = []
        handler = _make_handler()
        res = handler.check_connection()
        self.assertTrue(res.success)

    @patch(_RUN_PATH)
    @patch(_CLIENT_PATH)
    def test_check_connection_failure(self, _mock_client, mock_run):
        mock_run.side_effect = Exception("auth failed")
        handler = _make_handler()
        res = handler.check_connection()
        self.assertFalse(res.success)
        self.assertIn("auth failed", res.error_message)


class TestMossHandlerInsert(unittest.TestCase):
    @patch(_RUN_PATH)
    def test_insert_creates_index_when_new(self, mock_run):
        handler = _make_connected_handler()

        # list_indexes → [], create_index → None, get_index → Ready, load_index → None
        ready = _mock_index(status="Ready")
        mock_run.side_effect = [[], None, ready, None]

        df = pd.DataFrame({
            TableField.ID.value: ["doc-1"],
            TableField.CONTENT.value: ["MindsDB unifies AI and data"],
            TableField.METADATA.value: ['{"category": "docs"}'],
        })
        res = handler.insert("my_index", df)

        handler._client.create_index.assert_called_once()
        self.assertEqual(res.resp_type, RESPONSE_TYPE.OK)

    @patch(_RUN_PATH)
    def test_insert_upserts_when_index_exists(self, mock_run):
        handler = _make_connected_handler()

        existing = _mock_index(name="my_index", status="Ready")
        ready = _mock_index(status="Ready")
        # list_indexes → [existing], add_docs → None, get_index → Ready, load_index → None
        mock_run.side_effect = [[existing], None, ready, None]

        df = pd.DataFrame({
            TableField.ID.value: ["doc-1"],
            TableField.CONTENT.value: ["updated content"],
            TableField.METADATA.value: [None],
        })
        handler.insert("my_index", df)
        handler._client.add_docs.assert_called_once()

    @patch(_RUN_PATH)
    def test_insert_generates_id_when_missing(self, mock_run):
        handler = _make_connected_handler()

        ready = _mock_index(status="Ready")
        mock_run.side_effect = [[], None, ready, None]

        df = pd.DataFrame({
            TableField.CONTENT.value: ["no id provided"],
        })
        handler.insert("my_index", df)

        call_args = handler._client.create_index.call_args
        docs = call_args[0][1]
        self.assertEqual(len(docs), 1)
        self.assertIsNotNone(docs[0].id)
        self.assertNotEqual(docs[0].id, "")


class TestMossHandlerSelect(unittest.TestCase):
    @staticmethod
    def _loaded_handler():
        """Return a connected handler with ``my_index`` marked as loaded."""
        handler = _make_connected_handler()
        handler._loaded_indexes.add("my_index")
        return handler

    @patch(_RUN_PATH)
    def test_select_semantic_search(self, mock_run):
        handler = self._loaded_handler()

        results = [_mock_doc("doc-1", "MindsDB is great", score=0.95)]
        mock_run.return_value = _mock_search_result(results)

        conditions = [FilterCondition(
            column=TableField.CONTENT.value,
            op=FilterOperator.EQUAL,
            value="what is MindsDB?",
        )]
        df = handler.select("my_index", conditions=conditions, limit=5)

        handler._client.query.assert_called_once()
        self.assertIn(TableField.CONTENT.value, df.columns)
        self.assertIn(TableField.DISTANCE.value, df.columns)
        self.assertAlmostEqual(df[TableField.DISTANCE.value].iloc[0], 0.05)

    @patch(_RUN_PATH)
    def test_select_score_to_distance_mapping(self, mock_run):
        handler = self._loaded_handler()

        mock_run.return_value = _mock_search_result([_mock_doc(score=1.0)])

        conditions = [FilterCondition(
            column=TableField.CONTENT.value,
            op=FilterOperator.EQUAL,
            value="query",
        )]
        df = handler.select("my_index", conditions=conditions)
        self.assertAlmostEqual(df[TableField.DISTANCE.value].iloc[0], 0.0)

    @patch(_RUN_PATH)
    def test_select_get_all_docs(self, mock_run):
        handler = self._loaded_handler()

        docs = [_mock_doc("doc-1"), _mock_doc("doc-2")]
        mock_run.return_value = docs

        df = handler.select("my_index")

        handler._client.get_docs.assert_called_once()
        self.assertEqual(len(df), 2)

    @patch(_RUN_PATH)
    def test_select_by_id(self, mock_run):
        handler = self._loaded_handler()

        mock_run.return_value = [_mock_doc("doc-1")]

        conditions = [FilterCondition(
            column=TableField.ID.value,
            op=FilterOperator.EQUAL,
            value="doc-1",
        )]
        handler.select("my_index", conditions=conditions)

        call_args = handler._client.get_docs.call_args
        get_opts = call_args[0][1]
        self.assertIn("doc-1", get_opts.doc_ids)

    @patch(_RUN_PATH)
    def test_select_uses_alpha(self, mock_run):
        handler = self._loaded_handler()
        handler._alpha = 0.5

        mock_run.return_value = _mock_search_result([])

        conditions = [FilterCondition(
            column=TableField.CONTENT.value,
            op=FilterOperator.EQUAL,
            value="query",
        )]
        handler.select("my_index", conditions=conditions)

        call_args = handler._client.query.call_args
        query_opts = call_args[0][2]
        self.assertEqual(query_opts.alpha, 0.5)


class TestMossHandlerDelete(unittest.TestCase):
    @patch(_RUN_PATH)
    def test_delete_by_id(self, mock_run):
        handler = _make_connected_handler()

        # delete_docs → None, get_index → Ready
        ready = _mock_index(status="Ready")
        mock_run.side_effect = [None, ready]

        conditions = [FilterCondition(
            column=TableField.ID.value,
            op=FilterOperator.EQUAL,
            value="doc-1",
        )]
        handler.delete("my_index", conditions=conditions)

        handler._client.delete_docs.assert_called_once_with("my_index", ["doc-1"])

    def test_delete_without_conditions_raises(self):
        handler = _make_connected_handler()
        with self.assertRaises(Exception):
            handler.delete("my_index", conditions=[])

    def test_delete_without_id_filter_raises(self):
        handler = _make_connected_handler()
        conditions = [FilterCondition(
            column="metadata.category",
            op=FilterOperator.EQUAL,
            value="docs",
        )]
        with self.assertRaises(Exception):
            handler.delete("my_index", conditions=conditions)


class TestMossHandlerTableOps(unittest.TestCase):
    @patch(_RUN_PATH)
    def test_get_tables(self, mock_run):
        handler = _make_connected_handler()

        mock_run.return_value = [_mock_index("idx-a"), _mock_index("idx-b")]

        res = handler.get_tables()
        self.assertEqual(list(res.data_frame["table_name"]), ["idx-a", "idx-b"])

    @patch(_RUN_PATH)
    def test_drop_table(self, mock_run):
        handler = _make_connected_handler()
        handler._loaded_indexes.add("my_index")

        mock_run.return_value = True
        handler.drop_table("my_index")

        handler._client.delete_index.assert_called_once_with("my_index")
        self.assertNotIn("my_index", handler._loaded_indexes)

    @patch(_RUN_PATH)
    def test_drop_table_if_exists_swallows_error(self, mock_run):
        handler = _make_connected_handler()
        mock_run.side_effect = Exception("not found")

        handler.drop_table("my_index", if_exists=True)  # should not raise

    def test_create_table_is_noop(self):
        handler = _make_connected_handler()
        handler.create_table("my_index")  # should not raise or call anything
        # assert_not_called() would only verify that the client object itself
        # was never called; mock_calls also records calls to its methods.
        self.assertEqual(handler._client.mock_calls, [])


if __name__ == "__main__":
    unittest.main()