-
Notifications
You must be signed in to change notification settings - Fork 9.3k
fix(azure): prepend bucket prefix to all Azure Blob storage paths #14561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,28 +48,28 @@ def __close__(self): | |
| self.conn = None | ||
|
|
||
| def health(self): | ||
| _bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1" | ||
| return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary)) | ||
| bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1" | ||
| return self.conn.upload_blob(name=f"{bucket}/{fnm}", data=BytesIO(binary), length=len(binary)) | ||
|
|
||
| def put(self, bucket, fnm, binary, tenant_id=None): | ||
| for _ in range(3): | ||
| try: | ||
| return self.conn.upload_blob(name=fnm, data=BytesIO(binary), length=len(binary)) | ||
| return self.conn.upload_blob(name=f"{bucket}/{fnm}", data=BytesIO(binary), length=len(binary)) | ||
| except Exception: | ||
| logging.exception(f"Fail put {bucket}/{fnm}") | ||
| self.__open__() | ||
| time.sleep(1) | ||
|
|
||
| def rm(self, bucket, fnm): | ||
| try: | ||
| self.conn.delete_blob(fnm) | ||
| self.conn.delete_blob(f"{bucket}/{fnm}") | ||
| except Exception: | ||
| logging.exception(f"Fail rm {bucket}/{fnm}") | ||
|
|
||
| def get(self, bucket, fnm): | ||
| for _ in range(1): | ||
| try: | ||
| r = self.conn.download_blob(fnm) | ||
| r = self.conn.download_blob(f"{bucket}/{fnm}") | ||
| return r.read() | ||
| except Exception: | ||
| logging.exception(f"fail get {bucket}/{fnm}") | ||
|
|
@@ -79,15 +79,15 @@ def get(self, bucket, fnm): | |
|
|
||
| def obj_exist(self, bucket, fnm): | ||
| try: | ||
| return self.conn.get_blob_client(fnm).exists() | ||
| return self.conn.get_blob_client(f"{bucket}/{fnm}").exists() | ||
| except Exception: | ||
| logging.exception(f"Fail put {bucket}/{fnm}") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix misleading log verb in
🤖 Prompt for AI Agents |
||
| return False | ||
|
|
||
| def get_presigned_url(self, bucket, fnm, expires): | ||
| for _ in range(10): | ||
| try: | ||
| return self.conn.get_presigned_url("GET", bucket, fnm, expires) | ||
| return self.conn.get_presigned_url("GET", f"{bucket}/{fnm}", expires) | ||
| except Exception: | ||
| logging.exception(f"fail get {bucket}/{fnm}") | ||
| self.__open__() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,15 +63,15 @@ def __close__(self): | |
| self.conn = None | ||
|
|
||
| def health(self): | ||
| _bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1" | ||
| f = self.conn.create_file(fnm) | ||
| bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1" | ||
| f = self.conn.create_file(f"{bucket}/{fnm}") | ||
| f.append_data(binary, offset=0, length=len(binary)) | ||
| return f.flush_data(len(binary)) | ||
|
|
||
| def put(self, bucket, fnm, binary, tenant_id=None): | ||
| for _ in range(3): | ||
| try: | ||
| f = self.conn.create_file(fnm) | ||
| f = self.conn.create_file(f"{bucket}/{fnm}") | ||
| f.append_data(binary, offset=0, length=len(binary)) | ||
| return f.flush_data(len(binary)) | ||
| except Exception: | ||
|
|
@@ -83,14 +83,14 @@ def put(self, bucket, fnm, binary, tenant_id=None): | |
|
|
||
| def rm(self, bucket, fnm): | ||
| try: | ||
| self.conn.delete_file(fnm) | ||
| self.conn.delete_file(f"{bucket}/{fnm}") | ||
| except Exception: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add legacy-path fallback for non-write operations to avoid post-deploy data invisibility. After this change, Also applies to: 93-94, 104-105, 113-113 🤖 Prompt for AI Agents |
||
| logging.exception(f"Fail rm {bucket}/{fnm}") | ||
|
|
||
| def get(self, bucket, fnm): | ||
| for _ in range(1): | ||
| try: | ||
| client = self.conn.get_file_client(fnm) | ||
| client = self.conn.get_file_client(f"{bucket}/{fnm}") | ||
| r = client.download_file() | ||
| return r.read() | ||
| except Exception: | ||
|
|
@@ -101,7 +101,7 @@ def get(self, bucket, fnm): | |
|
|
||
| def obj_exist(self, bucket, fnm): | ||
| try: | ||
| client = self.conn.get_file_client(fnm) | ||
| client = self.conn.get_file_client(f"{bucket}/{fnm}") | ||
| return client.exists() | ||
| except Exception: | ||
| logging.exception(f"Fail put {bucket}/{fnm}") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct the exception log message in The 🤖 Prompt for AI Agents |
||
|
|
@@ -110,7 +110,7 @@ def obj_exist(self, bucket, fnm): | |
| def get_presigned_url(self, bucket, fnm, expires): | ||
| for _ in range(10): | ||
| try: | ||
| return self.conn.get_presigned_url("GET", bucket, fnm, expires) | ||
| return self.conn.get_presigned_url("GET", f"{bucket}/{fnm}", expires) | ||
| except Exception: | ||
| logging.exception(f"fail get {bucket}/{fnm}") | ||
| self.__open__() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,185 @@ | ||
| # | ||
| # Copyright 2025 The InfiniFlow Authors. All Rights Reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| """ | ||
| Unit tests for Azure Blob storage path construction (issue #14159). | ||
|
|
||
| Both AzureSpn and AzureSas implementations must prepend the bucket | ||
| parameter to file paths so that files with the same name from different | ||
| datasets do not overwrite each other in flat blob storage. | ||
| """ | ||
| import importlib | ||
| import sys | ||
| import types | ||
| from unittest.mock import MagicMock | ||
|
|
||
| import pytest | ||
|
|
||
|
|
||
| def _install_stubs(): | ||
| """Replace heavyweight runtime modules so the connection modules can be | ||
| imported in isolation without the full ragflow runtime or the real | ||
|
Comment on lines
+31
to
+33
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isolate These fixtures replace global module entries but never restore them. That can make unrelated tests fail depending on run order. Please switch to Also applies to: 74-83, 86-97 🤖 Prompt for AI Agents |
||
| `azure` SDK being installed.""" | ||
|
|
||
| decorator_mod = types.ModuleType("common.decorator") | ||
| decorator_mod.singleton = lambda cls: cls | ||
|
|
||
| settings_mod = types.ModuleType("common.settings") | ||
| settings_mod.AZURE = { | ||
| "account_url": "https://example.dfs.core.windows.net", | ||
| "client_id": "x", | ||
| "secret": "x", | ||
| "tenant_id": "x", | ||
| "container_name": "c", | ||
| "cloud": "public", | ||
| "container_url": "https://example.blob.core.windows.net/c", | ||
| "sas_token": "sig=x", | ||
| } | ||
|
|
||
| common_pkg = types.ModuleType("common") | ||
| common_pkg.decorator = decorator_mod | ||
| common_pkg.settings = settings_mod | ||
|
|
||
| azure_pkg = types.ModuleType("azure") | ||
| azure_identity = types.ModuleType("azure.identity") | ||
| azure_identity.ClientSecretCredential = MagicMock() | ||
| azure_identity.AzureAuthorityHosts = types.SimpleNamespace( | ||
| AZURE_PUBLIC_CLOUD="public", | ||
| AZURE_CHINA="china", | ||
| AZURE_GOVERNMENT="gov", | ||
| AZURE_GERMANY="de", | ||
| ) | ||
| azure_storage = types.ModuleType("azure.storage") | ||
| azure_fdl = types.ModuleType("azure.storage.filedatalake") | ||
| azure_fdl.FileSystemClient = MagicMock() | ||
| azure_blob = types.ModuleType("azure.storage.blob") | ||
| azure_blob.ContainerClient = MagicMock() | ||
| azure_pkg.identity = azure_identity | ||
| azure_pkg.storage = azure_storage | ||
| azure_storage.filedatalake = azure_fdl | ||
| azure_storage.blob = azure_blob | ||
|
|
||
| sys.modules.update({ | ||
| "common": common_pkg, | ||
| "common.decorator": decorator_mod, | ||
| "common.settings": settings_mod, | ||
| "azure": azure_pkg, | ||
| "azure.identity": azure_identity, | ||
| "azure.storage": azure_storage, | ||
| "azure.storage.filedatalake": azure_fdl, | ||
| "azure.storage.blob": azure_blob, | ||
| }) | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def spn_module(): | ||
| _install_stubs() | ||
| sys.modules.pop("rag.utils.azure_spn_conn", None) | ||
| return importlib.import_module("rag.utils.azure_spn_conn") | ||
|
|
||
|
|
||
| @pytest.fixture(scope="module") | ||
| def sas_module(): | ||
| _install_stubs() | ||
| sys.modules.pop("rag.utils.azure_sas_conn", None) | ||
| return importlib.import_module("rag.utils.azure_sas_conn") | ||
|
|
||
|
|
||
| def _make_instance(module, cls_name): | ||
| """Build an instance with a mocked underlying connection, bypassing | ||
| __init__ so we don't need real Azure credentials or connectivity.""" | ||
| cls = getattr(module, cls_name) | ||
| inst = cls.__new__(cls) | ||
| inst.conn = MagicMock() | ||
| return inst | ||
|
|
||
|
|
||
| class TestAzureSpnBucketPrefix: | ||
| """RAGFlowAzureSpnBlob must include the bucket as a path prefix in all | ||
| operations so that identical filenames from different datasets are | ||
| isolated.""" | ||
|
|
||
| def test_put_uses_bucket_prefix(self, spn_module): | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.put("kb_a", "doc.pdf", b"data") | ||
| spn.conn.create_file.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_get_uses_bucket_prefix(self, spn_module): | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.get("kb_a", "doc.pdf") | ||
| spn.conn.get_file_client.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_rm_uses_bucket_prefix(self, spn_module): | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.rm("kb_a", "doc.pdf") | ||
| spn.conn.delete_file.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_obj_exist_uses_bucket_prefix(self, spn_module): | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.obj_exist("kb_a", "doc.pdf") | ||
| spn.conn.get_file_client.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_get_presigned_url_uses_bucket_prefix(self, spn_module): | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.get_presigned_url("kb_a", "doc.pdf", 3600) | ||
| spn.conn.get_presigned_url.assert_called_once_with("GET", "kb_a/doc.pdf", 3600) | ||
|
|
||
| def test_same_filename_in_different_buckets_does_not_collide(self, spn_module): | ||
| """Regression test for issue #14159: two datasets uploading a file | ||
| with the same name must produce two distinct storage paths.""" | ||
| spn = _make_instance(spn_module, "RAGFlowAzureSpnBlob") | ||
| spn.put("kb_a", "report.pdf", b"data_a") | ||
| spn.put("kb_b", "report.pdf", b"data_b") | ||
| called_paths = [c.args[0] for c in spn.conn.create_file.call_args_list] | ||
| assert called_paths == ["kb_a/report.pdf", "kb_b/report.pdf"] | ||
| assert called_paths[0] != called_paths[1] | ||
|
|
||
|
|
||
| class TestAzureSasBucketPrefix: | ||
| """Same contract for RAGFlowAzureSasBlob.""" | ||
|
|
||
| def test_put_uses_bucket_prefix(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.put("kb_a", "doc.pdf", b"data") | ||
| kwargs = sas.conn.upload_blob.call_args.kwargs | ||
| assert kwargs["name"] == "kb_a/doc.pdf" | ||
|
|
||
| def test_get_uses_bucket_prefix(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.get("kb_a", "doc.pdf") | ||
| sas.conn.download_blob.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_rm_uses_bucket_prefix(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.rm("kb_a", "doc.pdf") | ||
| sas.conn.delete_blob.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_obj_exist_uses_bucket_prefix(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.obj_exist("kb_a", "doc.pdf") | ||
| sas.conn.get_blob_client.assert_called_once_with("kb_a/doc.pdf") | ||
|
|
||
| def test_get_presigned_url_uses_bucket_prefix(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.get_presigned_url("kb_a", "doc.pdf", 3600) | ||
| sas.conn.get_presigned_url.assert_called_once_with("GET", "kb_a/doc.pdf", 3600) | ||
|
|
||
| def test_same_filename_in_different_buckets_does_not_collide(self, sas_module): | ||
| sas = _make_instance(sas_module, "RAGFlowAzureSasBlob") | ||
| sas.put("kb_a", "report.pdf", b"data_a") | ||
| sas.put("kb_b", "report.pdf", b"data_b") | ||
| names = [c.kwargs["name"] for c in sas.conn.upload_blob.call_args_list] | ||
| assert names == ["kb_a/report.pdf", "kb_b/report.pdf"] | ||
| assert names[0] != names[1] | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mirror a legacy-path fallback here as well for upgrade safety.
Like the SPN connector, non-write methods now only address
"{bucket}/{fnm}". Without fallback to legacyfnm, pre-existing blobs written before this PR can become inaccessible.Also applies to: 72-72, 82-82, 90-90
🤖 Prompt for AI Agents