diff --git a/Cargo.lock b/Cargo.lock
index 1457f246..2beea7e9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -987,7 +987,7 @@ dependencies = [
 
 [[package]]
 name = "dolma"
-version = "1.0.14"
+version = "1.1.0"
 dependencies = [
  "adblock",
  "ahash",
diff --git a/docs/taggers.md b/docs/taggers.md
index f5e6ddbd..a6d39da3 100644
--- a/docs/taggers.md
+++ b/docs/taggers.md
@@ -26,7 +26,7 @@ The following parameters are supported either via CLI (e.g. `dolma tag --paramet
 |`taggers`|Yes| One or more taggers to run. |
 |`tagger_modules`|No| List of one or more Python modules to load taggers from. See section [*"Using Custom Taggers"*](#using-custom-taggers) for more details. |
 |`processes`|No| Number of processes to use for tagging. One process is used by default. |
-|`ignore_existing`|No| If true, ignore existing outputs and re-run the taggers. |
+|`skip_existing`|No| If true, ignore existing outputs and re-run the taggers. |
 |`dryrun`|No| If true, only print the configuration and exit without running the taggers. |
 |`debug`|No| If true, run in debug mode (i.e., disable parallelism). Useful when developing new taggers. |
 |`profile.enable`|No| If true, enable profiling. Useful when benchmarking taggers during development. |
diff --git a/pyproject.toml b/pyproject.toml
index a4957551..bd3829cf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -112,6 +112,8 @@ dev = [
     "isort>=5.10.1",
     "mypy>=0.971",
     "pytest>=5.2",
+    "types-PyYAML",
+    "types-dateparser"
 ]
 # extension to process code
 code = ["detect-secrets==1.4.0", "beautifulsoup4>=4", "pygments", "regex"]
@@ -227,7 +229,6 @@ aggressive = 3
 [tool.mypy]
 python_version = "3.9"
 ignore_missing_imports = true
-no_site_packages = true
 allow_redefinition = false
 warn_unused_configs = true
 warn_unused_ignores = true
@@ -238,5 +239,6 @@ show_error_codes = true
 pretty = true
 plugins = ["numpy.typing.mypy_plugin"]
 
+
 [tool.mypy-tests]
 strict_optional = false
diff --git a/python/dolma/cli/__init__.py b/python/dolma/cli/__init__.py
index 2ef221d7..3c1fc7f5 100644
--- a/python/dolma/cli/__init__.py
+++ b/python/dolma/cli/__init__.py
@@ -144,7 +144,7 @@ def namespace_to_nested_omegaconf(args: Namespace, structured: Type[T], config:
 
     untyped_config: DictConfig = om.merge(
         om.create(config or {}), om.create(nested_config_dict)
-    )  # pyright: ignore (pylance is confused because om.create might return a DictConfig or a ListConfig)
+    )  # type: ignore # (pylance is confused because om.create might return a DictConfig or a ListConfig)
     base_structured_config: DictConfig = om.structured(structured)
     merged_config = om.merge(base_structured_config, untyped_config)
 
@@ -159,7 +159,7 @@ def namespace_to_nested_omegaconf(args: Namespace, structured: Type[T], config:
     except OmegaConfBaseException as ex:
         raise DolmaConfigError(f"Invalid error while parsing key `{ex.full_key}`: {type(ex).__name__}") from ex
 
-    return merged_config  # pyright: ignore
+    return merged_config  # type: ignore # (pylance because same error as above)
 
 
 def print_config(config: Any, console: Optional[Console] = None) -> None:
diff --git a/python/dolma/cli/tagger.py b/python/dolma/cli/tagger.py
index 9982ec05..93af43ca 100644
--- a/python/dolma/cli/tagger.py
+++ b/python/dolma/cli/tagger.py
@@ -74,7 +74,7 @@ class TaggerConfig:
         default=1,
         help="Number of parallel processes to use.",
     )
-    ignore_existing: bool = field(
+    skip_existing: bool = field(
         default=False,
         help="Whether to ignore existing outputs and re-run the taggers.",
     )
@@ -132,7 +132,7 @@ def run(cls, parsed_config: TaggerConfig):
             metadata=work_dirs.output,
             taggers=taggers,
             taggers_modules=parsed_config.tagger_modules,
-            ignore_existing=parsed_config.ignore_existing,
+            skip_existing=parsed_config.skip_existing,
             num_processes=parsed_config.processes,
             experiment=parsed_config.experiment,
             debug=parsed_config.debug,
diff --git a/python/dolma/cli/warc.py b/python/dolma/cli/warc.py
index 9a8da2ea..ebed4ea6 100644
--- a/python/dolma/cli/warc.py
+++ b/python/dolma/cli/warc.py
@@ -39,7 +39,7 @@ class WarcExtractorConfig:
         default=1,
         help="Number of parallel processes to use.",
     )
-    ignore_existing: bool = field(
+    skip_existing: bool = field(
         default=False,
         help="Whether to ignore existing outputs and re-run the taggers.",
     )
@@ -107,7 +107,7 @@ def run(cls, parsed_config: WarcExtractorConfig):
             destination=(destination[0] if len(destination) == 1 else destination),
             metadata=work_dirs.output,
             num_processes=parsed_config.processes,
-            ignore_existing=parsed_config.ignore_existing,
+            skip_existing=parsed_config.skip_existing,
             debug=parsed_config.debug,
             source_name=source_name,
             pre_taggers=parsed_config.pre.taggers,
diff --git a/python/dolma/core/analyzer.py b/python/dolma/core/analyzer.py
index c8d542c0..09a0616b 100644
--- a/python/dolma/core/analyzer.py
+++ b/python/dolma/core/analyzer.py
@@ -324,7 +324,7 @@ def create_and_run_analyzer(
         metadata_prefix=metadata_path,
         debug=debug,
         seed=seed,
-        ignore_existing=True,
+        skip_existing=True,
         retries_on_error=0,
         num_processes=num_processes,
     )
diff --git a/python/dolma/core/data_types.py b/python/dolma/core/data_types.py
index d71bbab3..8947d724 100644
--- a/python/dolma/core/data_types.py
+++ b/python/dolma/core/data_types.py
@@ -75,7 +75,7 @@ def __init__(self, *args, metadata: Optional[Dict[str, Any]] = None, **kwargs) -
         self.metadata = metadata or {}
 
     @classmethod
-    def from_spec(cls, spec: InputSpecWithMetadata) -> "DocumentWithMetadata":
+    def from_spec(cls, spec: InputSpecWithMetadata) -> "DocumentWithMetadata":  # type: ignore[override]
         return DocumentWithMetadata(
             source=spec.source,
             version=spec.version,
@@ -125,7 +125,9 @@ def __init__(
         self.attributes = attributes or {}
 
     @classmethod
-    def from_spec(cls, spec: InputSpecWithMetadataAndAttributes) -> "DocumentWithMetadataAndAttributes":
+    def from_spec(  # type: ignore[override]
+        cls, spec: InputSpecWithMetadataAndAttributes
+    ) -> "DocumentWithMetadataAndAttributes":
         return DocumentWithMetadataAndAttributes(
             source=spec.source,
             version=spec.version,
diff --git a/python/dolma/core/parallel.py b/python/dolma/core/parallel.py
index 0bbfc75f..79af6ddf 100644
--- a/python/dolma/core/parallel.py
+++ b/python/dolma/core/parallel.py
@@ -69,7 +69,7 @@ def __init__(
         debug: bool = False,
         seed: int = 0,
         pbar_timeout: float = 1e-3,
-        ignore_existing: bool = False,
+        skip_existing: bool = False,
         include_paths: Optional[List[str]] = None,
         exclude_paths: Optional[List[str]] = None,
         files_regex_pattern: Optional[str] = None,
@@ -87,7 +87,7 @@ def __init__(
                 file names will also be the same.
             metadata_prefix (str): The prefix of the metadata files to save. This can be a local path or an
                 S3 path. Metadata output will be created for each file after it is processed. Filenames are
-                checked to verify if a file has been processed and can be skipped unless `ignore_existing` is
+                checked to verify if a file has been processed and can be skipped unless `skip_existing` is
                 set to true.
             num_processes (int, optional): The number of processes to use. Defaults to 1.
             debug (bool, optional): Whether to run in debug mode; if true, no multiprocessing will be used.
@@ -95,7 +95,7 @@ def __init__(
             seed (int, optional): The random seed to use when shuffling input files. Defaults to 0.
             pbar_timeout (float, optional): How often to update progress bars in seconds.
                 Defaults to 0.01 seconds.
-            ignore_existing (bool, optional): Whether to ignore files that have been already processed and
+            skip_existing (bool, optional): Whether to ignore files that have been already processed and
                 re-run the processor on all files from scratch. Defaults to False.
             include_paths (Optional[List[str]], optional): A list of paths to include. If provided, only
                 files that match one of the paths will be processed. Defaults to None.
@@ -118,7 +118,7 @@ def __init__(
         self.debug = debug
         self.seed = seed
         self.pbar_timeout = pbar_timeout
-        self.ignore_existing = ignore_existing
+        self.skip_existing = skip_existing
         self.include_paths = set(include_paths) if include_paths is not None else None
         self.exclude_paths = set(exclude_paths) if exclude_paths is not None else None
 
@@ -354,7 +354,7 @@ def __add__(self: BPP, other: BPP) -> BPP:
             debug=self.debug or other.debug,
             seed=self.seed,
             pbar_timeout=max(self.pbar_timeout, other.pbar_timeout),
-            ignore_existing=self.ignore_existing or other.ignore_existing,
+            skip_existing=self.skip_existing or other.skip_existing,
             include_paths=include_paths,
             exclude_paths=exclude_paths,
             files_regex_pattern=regex_pattern,
@@ -484,7 +484,7 @@ def _get_all_paths(self) -> AllPathsTuple:
         )
 
         for path in rel_paths:
-            if not self.ignore_existing and path in existing_metadata_names:
+            if not self.skip_existing and path in existing_metadata_names:
                 continue
 
             if not self._valid_path(path):
diff --git a/python/dolma/core/runtime.py b/python/dolma/core/runtime.py
index ac5e2a23..b563ea9d 100644
--- a/python/dolma/core/runtime.py
+++ b/python/dolma/core/runtime.py
@@ -27,8 +27,17 @@
     TaggerOutputDictType,
 )
 from .errors import DolmaFatalError, DolmaRetryableFailure, DolmaShardError
+from .loggers import get_logger
 from .parallel import BaseParallelProcessor, QueueType
-from .paths import delete_dir, join_path, make_relative, mkdir_p, split_glob, split_path
+from .paths import (
+    delete_dir,
+    exists,
+    join_path,
+    make_relative,
+    mkdir_p,
+    split_glob,
+    split_path,
+)
 from .registry import TaggerRegistry
 from .utils import import_modules, make_variable_name
 
@@ -178,10 +187,10 @@ def _make_output_streams(
                 mkdir_p(parent)
 
                 # open a new file and create a new encoder
-                io = stack.enter_context(smart_open.open(loc.path, **open_kwargs))
+                io_ = stack.enter_context(smart_open.open(loc.path, **open_kwargs))
                 encoder = msgspec.json.Encoder()
                 opened[loc.path] = TaggerOutputIO(
-                    exp=loc.exp, taggers=set(), path=loc.path, io=io, encoder=encoder
+                    exp=loc.exp, taggers=set(), path=loc.path, io=io_, encoder=encoder
                 )
 
             # keep track of which taggers are writing to this paths
@@ -223,7 +232,7 @@ def _write_sample_to_streams(
 
 class TaggerProcessor(BaseParallelProcessor):
     @classmethod
-    def increment_progressbar(  # type: ignore
+    def increment_progressbar(  # type: ignore # pylint: disable=arguments-differ
         cls,
         queue: QueueType,  # queue must be the first argument, and it should be a positional-only argument
         /,
@@ -245,6 +254,10 @@ def process_single(
         **kwargs,
     ):
         """Lets count run the taggers! We will use the destination path to save each tagger output."""
+
+        # get a logger
+        logger = get_logger(cls.__name__)
+
         # import tagger modules
         taggers_modules = kwargs.get("taggers_modules", None)
         if taggers_modules is not None:
@@ -264,7 +277,9 @@ def process_single(
 
         # this is the dictionary that will hold the output of each tagger
         taggers_paths = _determine_output_paths_for_taggers(
-            experiment_name=experiment_name, destination=destination_path, taggers=taggers
+            experiment_name=experiment_name,
+            destination=destination_path,
+            taggers=taggers,
         )
 
         # skip on failure
@@ -283,6 +298,27 @@ def process_single(
         # total number of documents processed
         total_docs_cnt = 0
 
+        if kwargs.get("skip_existing", False):
+            # we group taggers by their path (this is for cases when two taggers are going to same file)
+            # and then remove all taggers if any of the paths exists and skip_existing is True
+            _taggers_by_path: Dict[str, list[str]] = {}
+            for tagger_name, tagger_location in taggers_paths.items():
+                _taggers_by_path.setdefault(tagger_location.path, []).append(tagger_name)
+
+            # actually take care of removal here
+            for tagger_path, tagger_names in _taggers_by_path.items():
+                if exists(tagger_path):
+                    for tagger_name in tagger_names:
+                        logger.info("Skipping %s because %s already exists.", tagger_name, tagger_path)
+                        taggers.pop(tagger_name)
+                        taggers_paths.pop(tagger_name)
+
+            if not taggers:
+                # if all taggers have been removed, we return early
+                cls.increment_progressbar(queue, files=1)
+                logger.info("All taggers for %s have been skipped.", source_path)
+                return
+
         # creating dedicated decoder speeds up the process
         # if any of the taggers require metadata, we use a decoder that can handle it
         # otherwise, we use a decoder that does not parse metadata, which is faster
@@ -327,7 +363,7 @@ def process_single(
                     # double the update interval if the queue is full
                     update_interval *= 2
 
-        except Exception as exp:
+        except Exception as exp:  # pylint: disable=broad-except
            # handle any exception that might have occurred
            msg = f"Failed to process {source_path} due to {exp.__class__.__name__}: {' '.join(exp.args)}"
            if exp.__class__.__name__ == "IncompleteReadError":
@@ -383,7 +419,7 @@ def create_and_run_tagger(
     metadata: Union[None, str, List[str]] = None,
     debug: bool = False,
     seed: int = 0,
-    ignore_existing: bool = False,
+    skip_existing: bool = False,
     skip_on_failure: bool = False,
     retries_on_error: int = 0,
     num_processes: int = 1,
@@ -411,7 +447,7 @@ def create_and_run_tagger(
             which documents have been processed. If `None`, the metadata will be saved in a temporary directory.
         debug (bool, optional): Whether to run in debug mode. Defaults to False.
         seed (int, optional): The seed to use for the random number generator. Defaults to 0.
-        ignore_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
+        skip_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
             Defaults to False.
         skip_on_failure (bool, optional): Whether to skip a document if it fails to process. Defaults to False.
         retries_on_error (int, optional): Number of times to retry processing a document if it fails.
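
The `skip_existing` block added to `TaggerProcessor.process_single` above hinges on one step: inverting the tagger-to-output-path mapping, so that taggers writing to the same file are skipped (or kept) together. A standalone sketch of that grouping step, with illustrative names and paths that are not part of this diff:

    from typing import Dict, List

    def group_taggers_by_path(taggers_paths: Dict[str, str]) -> Dict[str, List[str]]:
        # invert tagger -> output path into output path -> [tagger names]
        by_path: Dict[str, List[str]] = {}
        for tagger_name, path in taggers_paths.items():
            by_path.setdefault(path, []).append(tagger_name)
        return by_path

    # two taggers sharing an output file end up in the same group, so they
    # are removed together when that file already exists:
    # group_taggers_by_path({"a": "out/x.gz", "b": "out/x.gz"}) == {"out/x.gz": ["a", "b"]}
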
@@ -466,7 +502,7 @@ def create_and_run_tagger(
         metadata_prefix=metadata,
         debug=debug or profile_enable,  # if profile is true, debug must be true
         seed=seed,
-        ignore_existing=ignore_existing,
+        skip_existing=skip_existing,
         retries_on_error=retries_on_error,
         num_processes=num_processes,
     )
diff --git a/python/dolma/core/taggers.py b/python/dolma/core/taggers.py
index 59a414f7..4999da45 100644
--- a/python/dolma/core/taggers.py
+++ b/python/dolma/core/taggers.py
@@ -62,7 +62,7 @@ class BaseTaggerWithMetadata(BaseTagger):
     def predict(self, doc: DocumentWithMetadata) -> DocResult:  # type: ignore
         raise NotImplementedError
 
-    def tag(self, row: InputSpecWithMetadata) -> TaggerOutputDictType:
+    def tag(self, row: InputSpecWithMetadata) -> TaggerOutputDictType:  # type: ignore
         """Internal function that is used by the tagger to get data"""
         doc = DocumentWithMetadata.from_spec(row)
         doc_result = self.predict(doc)
diff --git a/python/dolma/core/utils.py b/python/dolma/core/utils.py
index c8149e74..a63ac68f 100644
--- a/python/dolma/core/utils.py
+++ b/python/dolma/core/utils.py
@@ -184,5 +184,6 @@ def _handle_zstd(file_obj, mode):
         register_compressor(".zstd", _handle_zstd)
 
 else:
-    # add zstd compression
-    add_compression()
+    # add zstd compression; in case smart_open has zstd support already, this will error out
+    # with mypy, so we need the type: ignore[unreachable] comment
+    add_compression()  # type: ignore[unreachable]
diff --git a/python/dolma/taggers/language.py b/python/dolma/taggers/language.py
index 121fd5c6..1f3b1ea3 100644
--- a/python/dolma/taggers/language.py
+++ b/python/dolma/taggers/language.py
@@ -17,21 +17,21 @@
 from ..core.utils import split_paragraphs
 
 with necessary.necessary("cld3", soft=True) as CLD3_AVAILABLE:
-    if CLD3_AVAILABLE or TYPE_CHECKING:
+    if CLD3_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         import cld3  # pyright:ignore pylint:disable=import-error
 
 with necessary.necessary("pycld2", soft=True) as CLD2_AVAILABLE:
-    if CLD2_AVAILABLE or TYPE_CHECKING:
+    if CLD2_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         import pycld2 as cld2  # pyright:ignore pylint:disable=import-error
 
 with necessary.necessary("langdetect", soft=True) as LANGDETECT_AVAILABLE:
-    if LANGDETECT_AVAILABLE or TYPE_CHECKING:
+    if LANGDETECT_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         from langdetect import PROFILES_DIRECTORY, DetectorFactory, LangDetectException
 
 with necessary.necessary("lingua", soft=True) as LINGUA_AVAILABLE:
-    if LINGUA_AVAILABLE or TYPE_CHECKING:
+    if LINGUA_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         from lingua import Language, LanguageDetectorBuilder
 
diff --git a/python/dolma/tokenizer/tokenizer.py b/python/dolma/tokenizer/tokenizer.py
index 4a7d68c1..c6cbd8ea 100644
--- a/python/dolma/tokenizer/tokenizer.py
+++ b/python/dolma/tokenizer/tokenizer.py
@@ -11,7 +11,14 @@
 from os import PathLike
 from pathlib import Path
 from tempfile import TemporaryDirectory
-from typing import TYPE_CHECKING, Generator, List, Optional, Tuple, Union
+from typing import (  # type: ignore[unreachable,unused-ignore]
+    TYPE_CHECKING,
+    Generator,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
 
 import msgspec
 import numpy as np
@@ -25,8 +32,10 @@
 from .data_types import InputSpec, TokenizerOutput
 
 with necessary("transformers", soft=True) as TRANSFORMERS_AVAILABLE:
-    if TYPE_CHECKING or TRANSFORMERS_AVAILABLE:
-        from transformers import AutoTokenizer  # pylint: disable=import-error
+    if TYPE_CHECKING or TRANSFORMERS_AVAILABLE:  # type: ignore[unreachable,unused-ignore]
+        from transformers import (  # pyright: ignore # pylint: disable=import-error
+            AutoTokenizer,
+        )
 
 
 PathOrStr = Union[str, PathLike]
@@ -365,7 +374,6 @@ def tokenize_file(
         file, each containing a field named `text`.
     """
     tokenizer = make_tokenizer(tokenizer_name_or_path, **tokenizer_kwargs)
-    dtype = deepcopy(tokenizer.dtype)
     decoder = msgspec.json.Decoder(InputSpec)
     with smart_open.open(path, mode="rt") as input_stream:
         for i, line in enumerate(input_stream, start=1):
@@ -376,8 +384,8 @@ def tokenize_file(
                 tokens = tokenizer.encode(text, add_special_tokens=True)
                 if refresh_tokenizer_every:
                     # extra copy to prevent memory leaks
-                    tokens = np.array(tokens, dtype=dtype)
-                yield TokenizerOutput.from_tokens(id=row.id, src=path, loc=i, tokens=tokens)  # pyright: ignore
+                    tokens = deepcopy(tokens)
+                yield TokenizerOutput.from_tokens(id=row.id, src=path, loc=i, tokens=tokens)
 
                 if refresh_tokenizer_every > 0 and i % refresh_tokenizer_every == 0:
                     # to prevent memory leaks, we refresh the tokenizer every so often
diff --git a/python/dolma/warc/linearizers.py b/python/dolma/warc/linearizers.py
index a99c0775..3569e588 100644
--- a/python/dolma/warc/linearizers.py
+++ b/python/dolma/warc/linearizers.py
@@ -8,12 +8,12 @@
 from .utils import raise_warc_dependency_error
 
 with necessary("trafilatura", soft=True) as TRAFILATURA_AVAILABLE:
-    if TRAFILATURA_AVAILABLE or TYPE_CHECKING:
+    if TRAFILATURA_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         import trafilatura  # noqa: F401
         import trafilatura.meta  # noqa: F401
 
 with necessary("resiliparse", soft=True) as RESILIPARSE_AVAILABLE:
-    if RESILIPARSE_AVAILABLE or TYPE_CHECKING:
+    if RESILIPARSE_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         from resiliparse.extract.html2text import extract_plain_text  # noqa: F401
         from resiliparse.parse.encoding import detect_encoding  # noqa: F401
         from resiliparse.parse.html import HTMLTree  # noqa: F401
diff --git a/python/dolma/warc/processor.py b/python/dolma/warc/processor.py
index 474c6ca9..71e2adc7 100644
--- a/python/dolma/warc/processor.py
+++ b/python/dolma/warc/processor.py
@@ -23,11 +23,11 @@
 from .utils import UrlNormalizer, raise_warc_dependency_error
 
 with necessary("fastwarc", soft=True) as FASTWARC_AVAILABLE:
-    if FASTWARC_AVAILABLE or TYPE_CHECKING:
-        from fastwarc.warc import ArchiveIterator, WarcRecordType
+    if FASTWARC_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
+        from fastwarc.warc import ArchiveIterator, WarcHeaderMap, WarcRecordType
 
 with necessary("dateparser", soft=True) as DATEPARSER_AVAILABLE:
-    if DATEPARSER_AVAILABLE or TYPE_CHECKING:
+    if DATEPARSER_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         import dateparser
 
 
@@ -164,11 +164,13 @@ def process_single(
                 if not decoded_content:
                     continue
 
-                # metadata
-                ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";")
-                date = cls._parse_warc_timestamp(record.http_headers.get("Date"))
-                target_uri = record.headers.get("WARC-Target-URI")
-                payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower()
+                # collect metadata
+                # in newer versions of fastwarc, the http_headers could be None if not found
+                http_headers = record.http_headers or WarcHeaderMap()
+                ctype, *_ = (http_headers.get("Content-Type") or "").split(";")
+                date = cls._parse_warc_timestamp(http_headers.get("Date") or "")
+                target_uri = record.headers.get("WARC-Target-URI") or ""
+                payload_id = (record.headers.get("WARC-Payload-Digest") or "").split(":")[1].lower()
                 metadata = dict(
                     warc_url=target_uri,
                     url=url_normalizer(target_uri),
@@ -237,7 +239,7 @@ def create_and_run_warc_pipeline(
     metadata: Union[None, str, List[str]] = None,
     debug: bool = False,
     seed: int = 0,
-    ignore_existing: bool = False,
+    skip_existing: bool = False,
     skip_on_failure: bool = False,
     retries_on_error: int = 0,
     num_processes: int = 1,
@@ -289,7 +291,7 @@ def create_and_run_warc_pipeline(
         metadata_prefix=all_meta_paths,
         debug=debug,
         seed=seed,
-        ignore_existing=ignore_existing,
+        skip_existing=skip_existing,
         retries_on_error=retries_on_error,
         num_processes=num_processes,
     )
diff --git a/python/dolma/warc/utils.py b/python/dolma/warc/utils.py
index f21d2651..9e1e14fb 100644
--- a/python/dolma/warc/utils.py
+++ b/python/dolma/warc/utils.py
@@ -6,11 +6,11 @@
 from ..core.errors import DolmaFatalError
 
 with necessary("w3lib", soft=True) as W3LIB_AVAILABLE:
-    if W3LIB_AVAILABLE or TYPE_CHECKING:
+    if W3LIB_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         from w3lib.url import canonicalize_url  # noqa: F401
 
 with necessary("url_normalize", soft=True) as URL_NORMALIZE_AVAILABLE:
-    if URL_NORMALIZE_AVAILABLE or TYPE_CHECKING:
+    if URL_NORMALIZE_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         from url_normalize import url_normalize  # noqa: F401
 
diff --git a/scripts/sample_prefix.py b/scripts/sample_prefix.py
index 42f67ba5..59b05db2 100644
--- a/scripts/sample_prefix.py
+++ b/scripts/sample_prefix.py
@@ -5,7 +5,7 @@
 import necessary
 
 with necessary.necessary("click") as CLICK_AVAILABLE:
-    if CLICK_AVAILABLE or TYPE_CHECKING:
+    if CLICK_AVAILABLE or TYPE_CHECKING:  # type: ignore[unreachable]
         import click
 
diff --git a/tests/python/test_parallel.py b/tests/python/test_parallel.py
index 1287247a..1fd74244 100644
--- a/tests/python/test_parallel.py
+++ b/tests/python/test_parallel.py
@@ -41,7 +41,7 @@ def test_base_parallel_processor(self):
             source_prefix=str(LOCAL_DATA / "expected"),
             destination_prefix=f"{d}/destination",
             metadata_prefix=f"{d}/metadata",
-            ignore_existing=False,
+            skip_existing=False,
         )
         proc()
         src = [p for p in os.listdir(LOCAL_DATA / "expected") if not p.startswith(".")]
@@ -55,7 +55,7 @@ def test_base_parallel_processor(self):
             source_prefix=str(LOCAL_DATA / "expected" / "*-paragraphs.*"),
             destination_prefix=f"{d}/destination",
             metadata_prefix=f"{d}/metadata",
-            ignore_existing=False,
+            skip_existing=False,
         )
         proc()
         src = [p for p in os.listdir(LOCAL_DATA / "expected") if "paragraphs" in p]
diff --git a/tests/python/test_warc.py b/tests/python/test_warc.py
index 04f0e9a7..57fd9b45 100644
--- a/tests/python/test_warc.py
+++ b/tests/python/test_warc.py
@@ -27,7 +27,7 @@ def _run_pipeline(self, html: bool = False, pretag: bool = False) -> Dict[str, L
             documents=[f"{DATA_PATH}/*.warc.gz"],
             destination=[self.tempdir],
             num_processes=1,
-            ignore_existing=False,
+            skip_existing=False,
             debug=True,
             source_name="test",
             skip_no_pre_taggers=pretag,
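
With the rename applied end to end, callers pass `skip_existing` wherever `ignore_existing` was accepted before, and the tagger runtime now drops any tagger whose output file already exists instead of re-running it. A minimal usage sketch, assuming hypothetical local paths and the `random_number_v1` tagger name:

    from dolma.core.runtime import create_and_run_tagger

    create_and_run_tagger(
        documents=["data/documents/*.jsonl.gz"],  # hypothetical input glob
        destination="data/attributes",            # hypothetical output prefix
        taggers=["random_number_v1"],             # assumed registered tagger name
        skip_existing=True,                       # formerly `ignore_existing`
        num_processes=4,
    )

When `skip_existing=True`, `TaggerProcessor.process_single` checks each tagger's output path with `exists()` and returns early once every tagger for a file has been skipped.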