Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions cassis/cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
TypeCheckError,
TypeSystem,
TypeSystemMode,
load_typesystem,
)

_validator_optional_string = validators.optional(validators.instance_of(str))
Expand Down Expand Up @@ -832,6 +833,105 @@ def _copy(self) -> "Cas":
result._xmi_id_generator = self._xmi_id_generator
return result

def deep_copy(self, copy_typesystem: bool = False) -> "Cas":
    """
    Returns a deep copy of the current Cas.

    Sofas and feature structures are re-created in a new Cas. Feature
    structures keep their xmiID, and references between them are re-linked
    to the copied objects in a second pass once every copy exists.

    :param copy_typesystem: whether to copy the original typesystem or not.
        If True, the typesystem is serialized with ``to_xml`` and re-loaded
        with ``load_typesystem`` so the copy owns a separate instance; if
        False, the copy uses the very same typesystem object as this Cas.
    :return: the newly created Cas
    """
    # Local import so this method does not depend on the module's import block.
    import warnings

    ts = self.typesystem
    if copy_typesystem:
        ts = load_typesystem(self.typesystem.to_xml())

    cas_copy = Cas(
        ts,
        document_language=self.document_language,
        lenient=self._lenient,
        sofa_mime=self.sofa_mime,
    )

    # Start both ID generators at the value the originals would hand out next.
    cas_copy._xmi_id_generator = IdGenerator(initial_id=self._xmi_id_generator._next_id)
    cas_copy._sofa_num_generator = IdGenerator(initial_id=self._sofa_num_generator._next_id)

    # Discard whatever the constructor set up; both maps are rebuilt from self.sofas.
    cas_copy._views = {}
    cas_copy._sofas = {}

    for sofa in self.sofas:
        sofa_copy = Sofa(
            sofaID=sofa.sofaID,
            sofaNum=sofa.sofaNum,
            type=ts.get_type(sofa.type.name),
            xmiID=sofa.xmiID,
        )
        sofa_copy.mimeType = sofa.mimeType
        # NOTE(review): sofaArray is assigned by reference, not duplicated - confirm this is intended.
        sofa_copy.sofaArray = sofa.sofaArray
        sofa_copy.sofaString = sofa.sofaString
        sofa_copy.sofaURI = sofa.sofaURI

        cas_copy._sofas[sofa_copy.sofaID] = sofa_copy
        cas_copy._views[sofa_copy.sofaID] = View(sofa=sofa_copy)

    # Pending links, keyed by feature name. They can only be resolved after
    # all feature structures have been copied.
    references = {}  # feature name -> [(owner xmiID, target xmiID)]
    referenced_lists = {}  # feature name -> [(owner xmiID, [item xmiID, ...])]
    referenced_arrays = {}  # feature name -> [(owner xmiID, [element xmiID, ...])]

    all_copied_fs = {}  # xmiID -> copied feature structure

    for fs in self._find_all_fs():
        # Switch to the view that belongs to the sofa of this feature structure
        # so that the add() below targets that view.
        if hasattr(fs, "sofa"):
            cas_copy._current_view = cas_copy._views[fs.sofa.sofaID]

        t = ts.get_type(fs.type.name)
        fs_copy = t()

        for feature in t.all_features:
            name = feature.name
            range_type = feature.rangeType

            if ts.is_primitive(range_type) or ts.is_primitive_collection(range_type):
                fs_copy[name] = fs.get(name)
                continue
            if name in ("FSArray", "sofa"):
                continue

            value = fs[name]
            if value is None:
                # Unset reference/list/array feature: nothing to link. Without
                # this guard the list and array branches would fail on None.
                continue

            if getattr(value, "xmiID", None) is not None:
                references.setdefault(name, []).append((fs.xmiID, value.xmiID))
            elif ts.is_list(range_type):
                item_ids = [item.xmiID for item in value if getattr(item, "xmiID", None) is not None]
                if item_ids:
                    # setdefault is required: the key does not exist before the
                    # first append, which previously raised a KeyError here.
                    # NOTE(review): these entries are collected but not re-linked
                    # anywhere in this method - list features reaching this branch
                    # stay unset on the copy. Confirm whether that is intended.
                    referenced_lists.setdefault(name, []).append((fs.xmiID, item_ids))
            elif ts.is_array(range_type):
                element_ids = [
                    item.xmiID for item in value.elements if getattr(item, "xmiID", None) is not None
                ]
                referenced_arrays.setdefault(name, []).append((fs.xmiID, element_ids))

        fs_copy.xmiID = fs.xmiID
        if hasattr(fs_copy, "sofa"):
            cas_copy.add(fs_copy, keep_id=True)
        all_copied_fs[fs_copy.xmiID] = fs_copy

    # Second pass: point single-valued reference features at the copied targets.
    for feature, pairs in references.items():
        for current_ID, reference_ID in pairs:
            try:
                all_copied_fs[current_ID][feature] = all_copied_fs[reference_ID]
            except KeyError as e:
                # Best effort: a dangling reference is reported, not fatal.
                warnings.warn(
                    f"Reference feature {feature!r} of feature structure {current_ID} not found: {e}"
                )

    # Second pass: rebuild FSArray features from the copied elements.
    for feature, pairs in referenced_arrays.items():
        for current_ID, referenced_list in pairs:
            array_copy = cas_copy.typesystem.get_type("FSArray")()
            array_copy.elements = [all_copied_fs[reference_ID] for reference_ID in referenced_list]
            all_copied_fs[current_ID][feature] = array_copy

    return cas_copy


def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]:
d = a.__slots__
Expand Down
25 changes: 25 additions & 0 deletions tests/test_cas.py
Comment thread
reckart marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,28 @@ def test_covered_text_on_annotation_without_sofa():

with pytest.raises(AnnotationHasNoSofa):
ann.get_covered_text()


def test_deep_copy_without_typesystem(small_xmi, small_typesystem_xml):
    """deep_copy(copy_typesystem=False): distinct Cas, identical JSON, typesystems compare equal."""
    typesystem = load_typesystem(small_typesystem_xml)
    original = load_cas_from_xmi(small_xmi, typesystem=typesystem)

    duplicate = original.deep_copy(copy_typesystem=False)

    # The copy must not compare equal to the Cas it was made from ...
    assert original != duplicate
    # ... yet serialize to exactly the same JSON (length first, then content).
    assert len(duplicate.to_json(pretty_print=True)) == len(original.to_json(pretty_print=True))
    assert duplicate.to_json(pretty_print=True) == original.to_json(pretty_print=True)

    # Without copy_typesystem the two typesystems are expected to compare equal.
    assert original.typesystem == duplicate.typesystem


def test_deep_copy_with_typesystem(small_xmi, small_typesystem_xml):
    """deep_copy(copy_typesystem=True): distinct Cas and typesystem, identical JSON and typesystem XML."""
    typesystem = load_typesystem(small_typesystem_xml)
    original = load_cas_from_xmi(small_xmi, typesystem=typesystem)

    duplicate = original.deep_copy(copy_typesystem=True)

    # The copy must not compare equal to the Cas it was made from ...
    assert original != duplicate
    # ... yet serialize to exactly the same JSON (length first, then content).
    assert len(duplicate.to_json(pretty_print=True)) == len(original.to_json(pretty_print=True))
    assert duplicate.to_json(pretty_print=True) == original.to_json(pretty_print=True)

    # The typesystem objects must not compare equal, but their XML must match.
    assert original.typesystem != duplicate.typesystem
    assert len(original.typesystem.to_xml()) == len(duplicate.typesystem.to_xml())
    assert original.typesystem.to_xml() == duplicate.typesystem.to_xml()