diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java index a9d7cb3f..8a231243 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineFiles.java @@ -19,14 +19,25 @@ import java.io.File; import java.io.FileFilter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import org.dkpro.jwpl.wikimachine.debug.ILogger; import org.dkpro.jwpl.wikimachine.domain.Files; +import org.dkpro.jwpl.wikimachine.util.DumpFileDiscovery; /** * A {@link Files} implementation specific for the DataMachine tool. * It defines file name constants and provides methods for * input/output directory building rules and checks. + *

+ * Wikimedia publishes large XML dumps split across several files (see + * {@link DumpFileDiscovery}). For the {@code pages-articles} and {@code pages-meta-current} + * roles this class keeps the ordered list of parts and exposes both the legacy singular + * getter (first part of the ordered list, for backwards compatibility) and a list getter + * that returns every part. * * @see Files */ @@ -35,8 +46,8 @@ public class DataMachineFiles { private final static String INPUT_PAGELINKS = "pagelinks.sql"; private final static String INPUT_CATEGORYLINKS = "categorylinks.sql"; - private final static String INPUT_PAGESARTICLES = "pages-articles.xml"; - private final static String INPUT_PAGESMETACURRENT = "pages-meta-current.xml"; + private final static String INPUT_PAGESARTICLES = "pages-articles"; + private final static String INPUT_PAGESMETACURRENT = "pages-meta-current"; private final static String GENERATED_PAGE = "page.bin"; private final static String GENERATED_REVISION = "revision.bin"; @@ -48,13 +59,15 @@ public class DataMachineFiles private final static String ARCHIVE_EXTENSION = ".gz"; + private final static Set<String> SUPPORTED_EXTENSIONS = Set.of("bz2", "gz", "7z"); + private File dataDirectory = new File("."); private boolean compressGeneratedFiles = false; private File inputPagelinks = null; - private File inputPagesarticles = null; private File inputCategorylinks = null; - private File inputPagesMetaCurrent = null; + private List<File> inputPagesarticles = new ArrayList<>(); + private List<File> inputPagesMetaCurrent = new ArrayList<>(); /** * Instantiates a {@link Files} object with the specified {@code logger}. 
@@ -77,9 +90,9 @@ public DataMachineFiles(DataMachineFiles files) super(files); this.dataDirectory = files.dataDirectory; this.inputPagelinks = files.inputPagelinks; - this.inputPagesarticles = files.inputPagesarticles; + this.inputPagesarticles = new ArrayList<>(files.inputPagesarticles); this.inputCategorylinks = files.inputCategorylinks; - this.inputPagesMetaCurrent = files.inputPagesMetaCurrent; + this.inputPagesMetaCurrent = new ArrayList<>(files.inputPagesMetaCurrent); this.compressGeneratedFiles = files.compressGeneratedFiles; } @@ -108,30 +121,34 @@ private boolean checkDataMachineSourceFiles() { final FileFilter supportedFormatFilter = file -> { final String name = file.getName(); - // See UniversalDecompressor for all built-in decompression formats. For now: return name.endsWith(".7z") || name.endsWith(".gz") || name.endsWith(".bz2"); }; final File[] files = dataDirectory.listFiles(supportedFormatFilter); - if (files != null && files.length > 2) { + if (files != null && files.length >= 3) { + final List<File> articleParts = new ArrayList<>(); + final List<File> metaCurrentParts = new ArrayList<>(); for (File currentFile : files) { - String currentFileName = currentFile.getName(); - if (currentFileName.contains(INPUT_PAGESARTICLES)) { - inputPagesarticles = currentFile; + final String name = currentFile.getName(); + if (DumpFileDiscovery.matchesRole(name, INPUT_PAGESARTICLES, SUPPORTED_EXTENSIONS)) { + articleParts.add(currentFile); + } + else if (DumpFileDiscovery.matchesRole(name, INPUT_PAGESMETACURRENT, + SUPPORTED_EXTENSIONS)) { + metaCurrentParts.add(currentFile); } - else if (currentFileName.contains(INPUT_PAGELINKS)) { + else if (name.contains(INPUT_PAGELINKS)) { inputPagelinks = currentFile; } - else if (currentFileName.contains(INPUT_CATEGORYLINKS)) { + else if (name.contains(INPUT_CATEGORYLINKS)) { inputCategorylinks = currentFile; } - else if (currentFileName.contains(INPUT_PAGESMETACURRENT)) { - inputPagesMetaCurrent = currentFile; - } } + 
inputPagesarticles = DumpFileDiscovery.orderByPageRange(articleParts); + inputPagesMetaCurrent = DumpFileDiscovery.orderByPageRange(metaCurrentParts); } // either inputPagesarticles or inputPagesMetaCurrent have to be placed // in the input directory - return !((inputPagesarticles == null && inputPagesMetaCurrent == null) + return !((inputPagesarticles.isEmpty() && inputPagesMetaCurrent.isEmpty()) || inputPagelinks == null || inputCategorylinks == null); } @@ -179,14 +196,29 @@ public String getInputPageLinks() } /** - * @return Retrieves the absolute path of the {@code pages-articles.xml} file. + * @return Retrieves the absolute path of the first {@code pages-articles.xml} part, + * or {@code null} if none was discovered. For multi-part dumps, prefer + * {@link #getInputPagesArticlesFiles()}. */ public String getInputPagesArticles() { - if (inputPagesarticles == null) { + if (inputPagesarticles.isEmpty()) { checkDataMachineSourceFiles(); } - return inputPagesarticles != null ? inputPagesarticles.getAbsolutePath() : null; + return inputPagesarticles.isEmpty() ? null : inputPagesarticles.get(0).getAbsolutePath(); + } + + /** + * @return Absolute paths of all {@code pages-articles.xml} parts ordered by ascending page + * range. Empty if the dump is not available. A single-file dump yields a list of + * size 1. + */ + public List<String> getInputPagesArticlesFiles() + { + if (inputPagesarticles.isEmpty()) { + checkDataMachineSourceFiles(); + } + return toAbsolutePathList(inputPagesarticles); } /** @@ -201,14 +233,41 @@ public String getInputCategoryLinks() } /** - * @return Retrieves the absolute path of the {@code pages-meta-current.xml} file. + * @return Retrieves the absolute path of the first {@code pages-meta-current.xml} part, + * or {@code null} if none was discovered. For multi-part dumps, prefer + * {@link #getInputPagesMetaCurrentFiles()}. 
*/ public String getInputPagesMetaCurrent() { - if (inputPagesMetaCurrent == null) { + if (inputPagesMetaCurrent.isEmpty()) { + checkDataMachineSourceFiles(); + } + return inputPagesMetaCurrent.isEmpty() ? null + : inputPagesMetaCurrent.get(0).getAbsolutePath(); + } + + /** + * @return Absolute paths of all {@code pages-meta-current.xml} parts ordered by ascending + * page range. Empty if the dump is not available. + */ + public List<String> getInputPagesMetaCurrentFiles() + { + if (inputPagesMetaCurrent.isEmpty()) { checkDataMachineSourceFiles(); } - return inputPagesMetaCurrent != null ? inputPagesMetaCurrent.getAbsolutePath() : null; + return toAbsolutePathList(inputPagesMetaCurrent); + } + + private static List<String> toAbsolutePathList(List<File> files) + { + if (files.isEmpty()) { + return Collections.emptyList(); + } + final List<String> paths = new ArrayList<>(files.size()); + for (File f : files) { + paths.add(f.getAbsolutePath()); + } + return paths; } private String getGeneratedPath(String fileName) diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java index 4e95ad26..3157f0c4 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/domain/DataMachineGenerator.java @@ -18,6 +18,9 @@ package org.dkpro.jwpl.datamachine.domain; import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import org.dkpro.jwpl.datamachine.dump.xml.XML2Binary; import org.dkpro.jwpl.wikimachine.domain.AbstractSnapshotGenerator; @@ -85,7 +88,14 @@ private void processInputDump() throws IOException { logger.log("Parsing input dumps..."); - new XML2Binary(decompressor.getInputStream(getPagesArticlesFile()), files); + final List<String> parts = getPagesArticlesFiles(); + final List<InputStream> streams = new 
ArrayList<>(parts.size()); + for (String part : parts) { + streams.add(decompressor.getInputStream(part)); + } + // A single-file dump reduces to a one-element list; the multi-part XML2Binary + // constructor handles both cases uniformly via MultiPartXmlDumpReader. + new XML2Binary(streams, files); dumpVersionProcessor.setDumpVersions(new IDumpVersion[] { version }); @@ -111,30 +121,28 @@ private void processInputDump() throws IOException } /** - * Parses either {@code pages-articles.xml} or {@code pages-meta-current.xml}. - * If both files exist in the input directory {@code pages-meta-current.xml} will be favored. + * Selects the input articles dump in preferred order: {@code pages-meta-current} (when + * available — includes discussions) falls back to {@code pages-articles}. Returns every + * part of the selected role in ascending page-range order; a single-file dump yields a + * list of size 1. * - * @return the input articles dump + * @return the ordered list of input articles dump parts + * @throws IOException If neither dump role is present. 
*/ - private String getPagesArticlesFile() + private List<String> getPagesArticlesFiles() throws IOException { - String pagesArticlesFile = null; - String parseMessage = null; - - // Use of minimal dump only with articles - if (files.getInputPagesArticles() != null) { - pagesArticlesFile = files.getInputPagesArticles(); - parseMessage = "Discussions are unavailable"; + final List<String> metaCurrent = files.getInputPagesMetaCurrentFiles(); + if (!metaCurrent.isEmpty()) { + logger.log("Discussions are available"); + return metaCurrent; } - - // Use of dump with discussions - if (files.getInputPagesMetaCurrent() != null) { - pagesArticlesFile = files.getInputPagesMetaCurrent(); - parseMessage = "Discussions are available"; + final List<String> articles = files.getInputPagesArticlesFiles(); + if (!articles.isEmpty()) { + logger.log("Discussions are unavailable"); + return articles; } - - logger.log(parseMessage); - return pagesArticlesFile; + throw new IOException("No pages-articles or pages-meta-current dump found in the input " + + "directory."); } private PageParser createPageParser() throws IOException diff --git a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java index 62a731c2..cfb2cca5 100644 --- a/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java +++ b/dkpro-jwpl-datamachine/src/main/java/org/dkpro/jwpl/datamachine/dump/xml/XML2Binary.java @@ -19,10 +19,13 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import org.dkpro.jwpl.datamachine.domain.DataMachineFiles; +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.NamespaceFilter; import org.dkpro.jwpl.mwdumper.importer.XmlDumpReader; +import org.dkpro.jwpl.wikimachine.dump.xml.MultiPartXmlDumpReader; /** * Use org.mediawiki.importer engine to parse the XML-Dump (only useful fields) and store it 
to @@ -51,16 +54,36 @@ public class XML2Binary */ public XML2Binary(InputStream iStream, DataMachineFiles files) throws IOException { + final DumpWriter writer = new NamespaceFilter(new SimpleBinaryDumpWriter(files), + ENABLED_NAMESPACES); if (USE_MODIFIED_PARSER) { // modified parser, skips faulty tags - new SimpleXmlDumpReader(iStream, - new NamespaceFilter(new SimpleBinaryDumpWriter(files), ENABLED_NAMESPACES)).readDump(); + new SimpleXmlDumpReader(iStream, writer).readDump(); } else { // original MWDumper parser, very sensible to not closed tags - new XmlDumpReader(iStream, - new NamespaceFilter(new SimpleBinaryDumpWriter(files), ENABLED_NAMESPACES)).readDump(); + new XmlDumpReader(iStream, writer).readDump(); } } + /** + * Instantiates an {@link XML2Binary} for a multi-part Wikipedia XML dump. Every stream in + * {@code iStreams} must be a self-contained XML document with its own {@code <mediawiki>} + * root; events across parts are collapsed into a single logical document by the underlying + * {@link MultiPartXmlDumpReader}. + * + * @param iStreams Ordered list of XML part streams (ascending page-range). Must not be + * {@code null} or empty; must not contain {@code null} elements. + * @param files The {@link DataMachineFiles} configuration to apply. + * @throws IOException Thrown if IO errors occurred during processing. + */ + public XML2Binary(List<InputStream> iStreams, DataMachineFiles files) throws IOException + { + final DumpWriter writer = new NamespaceFilter(new SimpleBinaryDumpWriter(files), + ENABLED_NAMESPACES); + // The modified parser is always used for multi-part — the original XmlDumpReader is + // only kept as a fallback for its stricter single-document parsing. 
+ MultiPartXmlDumpReader.readDumps(iStreams, writer, SimpleXmlDumpReader::new); + } + } diff --git a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java index ec8ec719..f9804298 100644 --- a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java +++ b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/DataMachineFilesTest.java @@ -28,12 +28,14 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import org.dkpro.jwpl.datamachine.factory.DefaultDataMachineEnvironmentFactory; import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; @@ -224,5 +226,72 @@ void testGetGeneratedText(boolean useCompression) { void testGetGeneratedDiscussions() { assertEquals(TEST_OUTPUT_DIR + "discussions.bin", dmFiles.getGeneratedDiscussions()); } - + + @Test + void testGetInputPagesArticlesFilesSingleFile() { + assertEquals(List.of(TEST_OUTPUT_DIR + "pages-articles.xml.bz2"), + dmFiles.getInputPagesArticlesFiles()); + } + + @Test + void testMultiFilePagesArticlesAreGroupedAndOrdered(@TempDir Path dir) throws IOException { + Files.createFile(dir.resolve("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles1.xml-p1p297012.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles3.xml-p1262094p2762093.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + 
DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + List<String> parts = files.getInputPagesArticlesFiles(); + assertEquals(3, parts.size()); + assertTrue(parts.get(0).endsWith("pages-articles1.xml-p1p297012.bz2")); + assertTrue(parts.get(1).endsWith("pages-articles2.xml-p297013p1262093.bz2")); + assertTrue(parts.get(2).endsWith("pages-articles3.xml-p1262094p2762093.bz2")); + + // Legacy getter returns the first part for backwards compatibility. + assertTrue(files.getInputPagesArticles().endsWith("pages-articles1.xml-p1p297012.bz2")); + } + + @Test + void testPagesArticlesMultistreamDoesNotMatchPagesArticlesRole(@TempDir Path dir) + throws IOException { + Files.createFile(dir.resolve("dewiki-20260101-pages-articles-multistream.xml.bz2")); + Files.createFile(dir.resolve("dewiki-20260101-pages-articles.xml.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + assertEquals(1, files.getInputPagesArticlesFiles().size()); + assertTrue(files.getInputPagesArticlesFiles().get(0).endsWith("pages-articles.xml.bz2")); + } + + @Test + void testMultiFilePagesMetaCurrent(@TempDir Path dir) throws IOException { + Files.createFile(dir.resolve("enwiki-20250601-pages-meta-current2.xml-p100p200.bz2")); + Files.createFile(dir.resolve("enwiki-20250601-pages-meta-current1.xml-p1p99.bz2")); + Files.createFile(dir.resolve("pagelinks.sql.gz")); + Files.createFile(dir.resolve("categorylinks.sql.gz")); + + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + files.checkAll(); + + List<String> parts = files.getInputPagesMetaCurrentFiles(); + assertEquals(2, parts.size()); + 
assertTrue(parts.get(0).endsWith("pages-meta-current1.xml-p1p99.bz2")); + assertTrue(parts.get(1).endsWith("pages-meta-current2.xml-p100p200.bz2")); + } + + @Test + void testGetInputPagesArticlesFilesEmptyForMissingDump(@TempDir Path dir) { + DataMachineFiles files = new DataMachineFiles(factory.getLogger()); + files.setDataDirectory(dir.toAbsolutePath().toString()); + assertTrue(files.getInputPagesArticlesFiles().isEmpty()); + } } diff --git a/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java new file mode 100644 index 00000000..92b559db --- /dev/null +++ b/dkpro-jwpl-datamachine/src/test/java/org/dkpro/jwpl/datamachine/domain/XML2BinaryMultiPartTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.datamachine.domain; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.dkpro.jwpl.datamachine.dump.xml.XML2Binary; +import org.dkpro.jwpl.datamachine.factory.DefaultDataMachineEnvironmentFactory; +import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Exercises the multi-part {@link XML2Binary#XML2Binary(java.util.List, DataMachineFiles)} + * constructor end-to-end: two self-contained XML parts fed through the multi-part pipeline + * must produce the same {@code page.bin} / {@code revision.bin} / {@code text.bin} bytes as a + * single-document dump containing the same pages. 
+ */
+class XML2BinaryMultiPartTest
+{
+
+    private static final IEnvironmentFactory FACTORY =
+            DefaultDataMachineEnvironmentFactory.getInstance();
+
+    // NOTE(review): the XML tags below were stripped by a text-extraction step; they are
+    // reconstructed here from the MediaWiki export format and the surviving element text
+    // (sitename, base, generator, case, Talk namespace) — confirm against the original patch.
+    private static final String PART_HEADER =
+            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<mediawiki xml:lang=\"en\">\n"
+            + "  <siteinfo>\n"
+            + "    <sitename>Test Wiki</sitename>\n"
+            + "    <base>http://test.example/</base>\n"
+            + "    <generator>MediaWiki-test</generator>\n"
+            + "    <case>first-letter</case>\n"
+            + "    <namespaces>\n"
+            + "      <namespace key=\"0\" />\n"
+            + "      <namespace key=\"1\">Talk</namespace>\n"
+            + "    </namespaces>\n"
+            + "  </siteinfo>\n";
+    private static final String PART_FOOTER = "</mediawiki>\n";
+
+    private static String pageBlock(int pageId, int revisionId, String title)
+    {
+        return "  <page>\n"
+            + "    <title>" + title + "</title>\n"
+            + "    <id>" + pageId + "</id>\n"
+            + "    <revision>\n"
+            + "      <id>" + revisionId + "</id>\n"
+            + "      <timestamp>2020-01-01T00:00:00Z</timestamp>\n"
+            + "      <contributor>\n"
+            + "        <username>Alice</username>\n"
+            + "        <id>100</id>\n"
+            + "      </contributor>\n"
+            + "      <text>Body of " + title + "</text>\n"
+            + "    </revision>\n"
+            + "  </page>\n";
+    }
+
+    private static InputStream stream(String xml)
+    {
+        return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
+    }
+
+    private static DataMachineFiles filesFor(Path dir)
+    {
+        DataMachineFiles files = new DataMachineFiles(FACTORY.getLogger());
+        files.setDataDirectory(dir.toAbsolutePath().toString());
+        return files;
+    }
+
+    @Test
+    void multiPartProducesSameBinariesAsEquivalentSingleDocument(
+            @TempDir Path singleDir, @TempDir Path multiDir) throws IOException
+    {
+        // Two pages.
+        final String pageOne = pageBlock(1, 10, "Page One");
+        final String pageTwo = pageBlock(2, 20, "Page Two");
+
+        // Single-document reference: one <mediawiki> document containing both pages.
+        final String singleDoc = PART_HEADER + pageOne + pageTwo + PART_FOOTER;
+        DataMachineFiles singleFiles = filesFor(singleDir);
+        try (InputStream in = stream(singleDoc)) {
+            new XML2Binary(in, singleFiles);
+        }
+
+        // Multi-part input: same two pages as two self-contained XML documents.
+ final String partA = PART_HEADER + pageOne + PART_FOOTER; + final String partB = PART_HEADER + pageTwo + PART_FOOTER; + DataMachineFiles multiFiles = filesFor(multiDir); + new XML2Binary(List.of(stream(partA), stream(partB)), multiFiles); + + assertBinariesEqual(singleFiles, multiFiles); + } + + @Test + void multiPartWithSingleElementListIsEquivalentToSingleStream( + @TempDir Path singleDir, @TempDir Path multiDir) throws IOException + { + final String doc = PART_HEADER + pageBlock(7, 77, "Solo") + PART_FOOTER; + + DataMachineFiles singleFiles = filesFor(singleDir); + try (InputStream in = stream(doc)) { + new XML2Binary(in, singleFiles); + } + + DataMachineFiles multiFiles = filesFor(multiDir); + new XML2Binary(List.of(stream(doc)), multiFiles); + + assertBinariesEqual(singleFiles, multiFiles); + } + + private static void assertBinariesEqual(DataMachineFiles a, DataMachineFiles b) + throws IOException + { + final Path pageA = Path.of(a.getGeneratedPage()); + final Path pageB = Path.of(b.getGeneratedPage()); + assertTrue(Files.exists(pageA) && Files.size(pageA) > 0, + "page.bin missing or empty (single)"); + assertTrue(Files.exists(pageB) && Files.size(pageB) > 0, + "page.bin missing or empty (multi)"); + assertArrayEquals(Files.readAllBytes(pageA), Files.readAllBytes(pageB), + "page.bin mismatch between multi-part and equivalent single document"); + + final Path revA = Path.of(a.getGeneratedRevision()); + final Path revB = Path.of(b.getGeneratedRevision()); + assertArrayEquals(Files.readAllBytes(revA), Files.readAllBytes(revB), + "revision.bin mismatch"); + + final Path textA = Path.of(a.getGeneratedText()); + final Path textB = Path.of(b.getGeneratedText()); + assertArrayEquals(Files.readAllBytes(textA), Files.readAllBytes(textB), + "text.bin mismatch"); + } +} diff --git a/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java new 
file mode 100644 index 00000000..5d1dc05e --- /dev/null +++ b/dkpro-jwpl-mwdumper/src/main/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriter.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.mwdumper.importer; + +import java.io.IOException; +import java.util.Objects; + +/** + * A {@link DumpWriter} decorator that collapses the per-part lifecycle events of a + * multi-file Wikipedia dump into a single logical document. + *

+ * Wikimedia ships large dumps split across several files (e.g. {@code pages-articles1.xml-p1p10.bz2}, + * {@code pages-articles2.xml-p11p20.bz2}); each file is a self-contained XML document with its own + * {@code <mediawiki>} root and {@code <siteinfo>} preamble. When those parts are parsed + * sequentially against the same {@link DumpWriter}, the delegate would otherwise receive repeated + * {@link #writeStartWiki()}/{@link #writeEndWiki()}/{@link #writeSiteinfo(Siteinfo)} events. + * This wrapper: + * <ul> + * <li>forwards only the first {@code writeStartWiki()} and the first {@code writeSiteinfo(Siteinfo)} to the delegate,</li> + * <li>suppresses every per-part {@code writeEndWiki()} and {@code close()},</li> + * <li>passes page and revision events straight through.</li> + * </ul>

+ * Call {@link #finish()} exactly once after all parts have been parsed to emit the single + * {@code writeEndWiki()} and close the delegate. + *

+ * Thread-safety: not thread-safe. Mirroring the {@link DumpWriter} contract, instances + * are intended for single-threaded use — events from one parser at a time. In the multi-part + * pipeline that means all parts are parsed sequentially against the same wrapper. + */ +public final class MultiPartDumpWriter + implements DumpWriter +{ + + private final DumpWriter delegate; + private boolean wikiStarted; + private boolean siteinfoWritten; + private boolean finished; + + public MultiPartDumpWriter(DumpWriter delegate) + { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null"); + } + + @Override + public void writeStartWiki() throws IOException + { + if (!wikiStarted) { + delegate.writeStartWiki(); + wikiStarted = true; + } + } + + @Override + public void writeEndWiki() + { + // Deferred until finish() — each part emits but we only want one + // logical end-of-wiki event for the combined document. + } + + @Override + public void writeSiteinfo(Siteinfo info) throws IOException + { + if (!siteinfoWritten) { + delegate.writeSiteinfo(info); + siteinfoWritten = true; + } + } + + @Override + public void writeStartPage(Page page) throws IOException + { + delegate.writeStartPage(page); + } + + @Override + public void writeEndPage() throws IOException + { + delegate.writeEndPage(); + } + + @Override + public void writeRevision(Revision revision) throws IOException + { + delegate.writeRevision(revision); + } + + @Override + public void close() + { + // Deferred until finish() so per-part parses can reuse the same underlying writer. + } + + /** + * Emits the final {@code writeEndWiki()} to the delegate (only if at least one + * {@code writeStartWiki()} was observed) and closes it. Idempotent. + * + * @throws IOException Thrown on delegate I/O errors during end-of-wiki or close. 
+ */ + public void finish() throws IOException + { + if (finished) { + return; + } + finished = true; + if (wikiStarted) { + delegate.writeEndWiki(); + } + delegate.close(); + } +} diff --git a/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java b/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java new file mode 100644 index 00000000..68e84301 --- /dev/null +++ b/dkpro-jwpl-mwdumper/src/test/java/org/dkpro/jwpl/mwdumper/importer/MultiPartDumpWriterTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.mwdumper.importer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +class MultiPartDumpWriterTest +{ + + private static final class RecordingDumpWriter + implements DumpWriter + { + final List events = new ArrayList<>(); + + @Override + public void close() + { + events.add("close"); + } + + @Override + public void writeStartWiki() + { + events.add("startWiki"); + } + + @Override + public void writeEndWiki() + { + events.add("endWiki"); + } + + @Override + public void writeSiteinfo(Siteinfo info) + { + events.add("siteinfo"); + } + + @Override + public void writeStartPage(Page page) + { + events.add("startPage:" + page.Id); + } + + @Override + public void writeEndPage() + { + events.add("endPage"); + } + + @Override + public void writeRevision(Revision revision) + { + events.add("revision:" + revision.Id); + } + } + + @Test + void requiresNonNullDelegate() + { + assertThrows(NullPointerException.class, () -> new MultiPartDumpWriter(null)); + } + + @Test + void collapsesLifecycleAndPassesThroughPageEvents() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + // Part 1 + sut.writeStartWiki(); + sut.writeSiteinfo(new Siteinfo()); + Page page1 = new Page(); + page1.Id = 1; + sut.writeStartPage(page1); + Revision rev1 = new Revision(); + rev1.Id = 10; + sut.writeRevision(rev1); + sut.writeEndPage(); + sut.writeEndWiki(); // swallowed + sut.close(); // swallowed + + // Part 2 + sut.writeStartWiki(); // collapsed + sut.writeSiteinfo(new Siteinfo()); // collapsed + Page page2 = new Page(); + page2.Id = 2; + sut.writeStartPage(page2); + Revision rev2 = new Revision(); + rev2.Id = 20; + sut.writeRevision(rev2); + sut.writeEndPage(); + 
sut.writeEndWiki(); // swallowed + sut.close(); // swallowed + + sut.finish(); // emits endWiki + close exactly once + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:1", + "revision:10", + "endPage", + "startPage:2", + "revision:20", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void finishWithoutStartWikiSkipsEndWikiButStillCloses() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + sut.finish(); + + assertEquals(List.of("close"), delegate.events); + } + + @Test + void finishIsIdempotent() throws IOException + { + RecordingDumpWriter delegate = new RecordingDumpWriter(); + MultiPartDumpWriter sut = new MultiPartDumpWriter(delegate); + + sut.writeStartWiki(); + sut.finish(); + sut.finish(); + sut.finish(); + + assertEquals(List.of("startWiki", "endWiki", "close"), delegate.events); + } +} diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java index 20f64484..13262b5e 100644 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFiles.java @@ -19,6 +19,9 @@ import java.io.File; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.dkpro.jwpl.wikimachine.debug.ILogger; import org.dkpro.jwpl.wikimachine.domain.Files; @@ -32,7 +35,7 @@ public class TimeMachineFiles private static final String NO_METAHISTORY = "meta history file not found"; private static final String NO_PAGELINKS = "page links file not found"; - private String metaHistoryFile; + private final List<String> metaHistoryFiles = new ArrayList<>(); private String pageLinksFile; private String categoryLinksFile; private String 
timeStamp = ""; @@ -45,7 +48,7 @@ public TimeMachineFiles(ILogger logger) public TimeMachineFiles(TimeMachineFiles files) { super(files); - this.metaHistoryFile = files.metaHistoryFile; + this.metaHistoryFiles.addAll(files.metaHistoryFiles); this.pageLinksFile = files.pageLinksFile; this.categoryLinksFile = files.categoryLinksFile; } @@ -60,14 +63,59 @@ public void setTimestamp(Timestamp timestamp) timeStamp = TimestampUtil.toMediaWikiString(timestamp) + File.separator; } + /** + * @return The first (or only) meta-history dump file, or {@code null} if none was configured. + * For multi-part dumps, prefer {@link #getMetaHistoryFiles()}. + */ public String getMetaHistoryFile() { - return metaHistoryFile; + return metaHistoryFiles.isEmpty() ? null : metaHistoryFiles.get(0); } + /** + * Replaces the meta-history configuration with the single given path. + * + * @param metaHistoryFile Absolute or relative path to the meta-history dump. May be + * {@code null} to clear the configuration. + */ public void setMetaHistoryFile(String metaHistoryFile) { - this.metaHistoryFile = metaHistoryFile; + this.metaHistoryFiles.clear(); + if (metaHistoryFile != null) { + this.metaHistoryFiles.add(metaHistoryFile); + } + } + + /** + * @return An unmodifiable view of the ordered meta-history dump parts. A single-file dump + * yields a list of size 1; never {@code null}. + */ + public List getMetaHistoryFiles() + { + return Collections.unmodifiableList(metaHistoryFiles); + } + + /** + * Replaces the meta-history configuration with the given ordered list of parts. The order + * must reflect the ascending page-range order expected by the downstream multi-part XML + * pipeline. + * + * @param metaHistoryFiles Ordered list of part paths. Must not be {@code null} or empty and + * must not contain {@code null} elements. + * @throws IllegalArgumentException If the argument violates the above. 
+ */ + public void setMetaHistoryFiles(List metaHistoryFiles) + { + if (metaHistoryFiles == null || metaHistoryFiles.isEmpty()) { + throw new IllegalArgumentException("'metaHistoryFiles' must not be null or empty."); + } + for (int i = 0; i < metaHistoryFiles.size(); i++) { + if (metaHistoryFiles.get(i) == null) { + throw new IllegalArgumentException("'metaHistoryFiles[" + i + "]' is null."); + } + } + this.metaHistoryFiles.clear(); + this.metaHistoryFiles.addAll(metaHistoryFiles); } public String getPageLinksFile() @@ -111,9 +159,19 @@ protected String getOutputPath(String fileName) @Override public boolean checkAll() { - return checkOutputDirectory() - && checkInputFile(metaHistoryFile, NO_METAHISTORY) - && checkInputFile(pageLinksFile, NO_PAGELINKS) + if (!checkOutputDirectory()) { + return false; + } + if (metaHistoryFiles.isEmpty()) { + logger.log(NO_METAHISTORY); + return false; + } + for (String part : metaHistoryFiles) { + if (!checkInputFile(part, NO_METAHISTORY)) { + return false; + } + } + return checkInputFile(pageLinksFile, NO_PAGELINKS) && checkInputFile(categoryLinksFile, NO_CATEGORYLINKS); } } diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java index 35193987..ed2e6951 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/domain/TimeMachineGenerator.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.InputStream; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; import org.dkpro.jwpl.wikimachine.domain.AbstractSnapshotGenerator; import org.dkpro.jwpl.wikimachine.domain.Files; @@ -138,35 +140,24 @@ private void processInputDumps() throws IOException private RevisionParser createRevisionParser() throws IOException { - - String 
metahistory = initialFiles.getMetaHistoryFile(); - - DumpTableInputStream revisionTableInputStream = envFactory - .getDumpTableInputStream(); - revisionTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.REVISION); + DumpTableInputStream revisionTableInputStream = envFactory.getDumpTableInputStream(); + revisionTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.REVISION); RevisionParser revisionParser = envFactory.getRevisionParser(); revisionParser.setInputStream(revisionTableInputStream); return revisionParser; - } private PageParser createPageParser() throws IOException { - - String metahistory = initialFiles.getMetaHistoryFile(); - DumpTableInputStream pageTableInputStream = envFactory.getDumpTableInputStream(); - pageTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.PAGE); + pageTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.PAGE); PageParser pageParser = envFactory.getPageParser(); pageParser.setInputStream(pageTableInputStream); return pageParser; - } private CategorylinksParser createCategorylinksParser() throws IOException @@ -191,18 +182,28 @@ private PagelinksParser createPagelinksParser() throws IOException private TextParser createTextParser() throws IOException { - - String metahistory = initialFiles.getMetaHistoryFile(); - DumpTableInputStream textTableInputStream = envFactory.getDumpTableInputStream(); - textTableInputStream.initialize(decompressor.getInputStream(metahistory), - DumpTableEnum.TEXT); + textTableInputStream.initialize(openMetaHistoryStreams(), DumpTableEnum.TEXT); TextParser textParser = envFactory.getTextParser(); textParser.setInputStream(textTableInputStream); return textParser; - } + /** + * Opens a decompressed stream per configured meta-history part, preserving order. 
A + * single-file dump yields a list of size 1; the call site hands the list to + * {@link DumpTableInputStream#initialize(List, DumpTableEnum)} which dispatches to the + * single- or multi-part SAX pipeline transparently. + */ + private List openMetaHistoryStreams() throws IOException + { + final List parts = initialFiles.getMetaHistoryFiles(); + final List streams = new ArrayList<>(parts.size()); + for (String part : parts) { + streams.add(decompressor.getInputStream(part)); + } + return streams; + } } diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java index 29141a14..57e1a2c7 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStream.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.util.List; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableInputStream; @@ -56,7 +57,30 @@ public class XMLDumpTableInputStream @Override public void initialize(InputStream inputStream, DumpTableEnum table) throws IOException { + final PipedOutputStream decodedStream = openPipe(); + xmlInputThread = new XMLDumpTableInputStreamThread(inputStream, decodedStream, table); + xmlInputThread.start(); + } + /** + * Multi-part equivalent of {@link #initialize(InputStream, DumpTableEnum)}. Each element of + * {@code inputStreams} is a self-contained Wikipedia XML dump part; SAX events across parts + * are collapsed into a single logical document before being written to the SQL sink. + * + * @param inputStreams Ordered list of XML part streams (ascending page-range). 
Must not be + * {@code null} or empty and must not contain {@code null} elements. + * @param table The type of table to dump. + * @throws IOException Thrown if IO errors occurred while setting up the pipe. + */ + public void initialize(List inputStreams, DumpTableEnum table) throws IOException + { + final PipedOutputStream decodedStream = openPipe(); + xmlInputThread = new XMLDumpTableInputStreamThread(inputStreams, decodedStream, table); + xmlInputThread.start(); + } + + private PipedOutputStream openPipe() throws IOException + { /* * piped input stream, that allows to read from a decodedStream */ @@ -67,10 +91,7 @@ public void initialize(InputStream inputStream, DumpTableEnum table) throws IOEx */ PipedOutputStream decodedStream = new PipedOutputStream(unbufferedResult); result = new BufferedInputStream(unbufferedResult, BUFFERSIZE); - - xmlInputThread = new XMLDumpTableInputStreamThread(inputStream, decodedStream, table); - xmlInputThread.start(); - + return decodedStream; } @Override diff --git a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java index c45e0146..799ad2f5 100755 --- a/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java +++ b/dkpro-jwpl-timemachine/src/main/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamThread.java @@ -21,10 +21,13 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.invoke.MethodHandles; +import java.util.List; +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; import org.dkpro.jwpl.mwdumper.importer.NamespaceFilter; import org.dkpro.jwpl.wikimachine.dump.xml.AbstractXmlDumpReader; import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; +import org.dkpro.jwpl.wikimachine.dump.xml.MultiPartXmlDumpReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-39,50 +42,86 @@ class XMLDumpTableInputStreamThread .getLogger(MethodHandles.lookup().lookupClass()); /** - * Enable the main and category pages as well as discussions + * Enable the main and category pages as well as discussions. */ private static final String ENABLED_NAMESPACES = "NS_MAIN,NS_TALK,NS_CATEGORY"; - /** - * Generalization {@link org.dkpro.jwpl.mwdumper.importer.XmlDumpReader} that parses the XML - * dump - */ - private AbstractXmlDumpReader xmlReader; + /** Parses the bound input into SQL. May throw {@link IOException}. */ + @FunctionalInterface + private interface ParseTask + { + void parse() throws IOException; + } - /** - * completion flag for a conversion process - */ + private final ParseTask parseTask; + private final Runnable abortAction; + + /** completion flag for the conversion process */ private boolean isComplete; /** - * Initiate input and output streams + * Drive the conversion of a single-file dump. * - * @param iStream - * XML input stream - * @param oStream - * SQL output stream - * @throws IOException - * Thrown in case errors occurred. + * @param iStream XML input stream. + * @param oStream SQL output stream. + * @param table Kind of table output expected. */ public XMLDumpTableInputStreamThread(InputStream iStream, OutputStream oStream, DumpTableEnum table) - throws IOException { super("xml2sql"); + final AbstractXmlDumpReader reader = readerFactoryFor(table) + .create(iStream, createWriter(oStream, table)); + this.parseTask = reader::readDump; + this.abortAction = reader::abort; + } + /** + * Drive the conversion of a multi-part dump. Each element of {@code iStreams} is a + * self-contained XML document; SAX events across parts are collapsed into a single + * logical document by {@link MultiPartXmlDumpReader}. + * + * @param iStreams Ordered list of XML part input streams (ascending page-range). + * @param oStream SQL output stream. + * @param table Kind of table output expected. 
+ */ + public XMLDumpTableInputStreamThread(List iStreams, OutputStream oStream, + DumpTableEnum table) + { + super("xml2sql"); + final DumpWriter writer = createWriter(oStream, table); + final MultiPartXmlDumpReader.ReaderFactory factory = readerFactoryFor(table); + this.parseTask = () -> MultiPartXmlDumpReader.readDumps(iStreams, writer, factory); + // Abort is a best-effort signal to the single-file reader; the multi-part pipeline + // has no equivalent per-part hook, so it is a no-op here. + this.abortAction = () -> { /* no-op */ }; + } + + private static DumpWriter createWriter(OutputStream oStream, DumpTableEnum table) + { + switch (table) { + case PAGE: + return new NamespaceFilter(new PageWriter(oStream), ENABLED_NAMESPACES); + case REVISION: + return new NamespaceFilter(new RevisionWriter(oStream), ENABLED_NAMESPACES); + case TEXT: + return new NamespaceFilter(new TextWriter(oStream), ENABLED_NAMESPACES); + default: + throw new IllegalArgumentException("Unsupported table type: " + table); + } + } + + private static MultiPartXmlDumpReader.ReaderFactory readerFactoryFor(DumpTableEnum table) + { switch (table) { case PAGE: - xmlReader = new PageReader(iStream, - new NamespaceFilter(new PageWriter(oStream), ENABLED_NAMESPACES)); - break; + return PageReader::new; case REVISION: - xmlReader = new RevisionReader(iStream, - new NamespaceFilter(new RevisionWriter(oStream), ENABLED_NAMESPACES)); - break; + return RevisionReader::new; case TEXT: - xmlReader = new TextReader(iStream, - new NamespaceFilter(new TextWriter(oStream), ENABLED_NAMESPACES)); - break; + return TextReader::new; + default: + throw new IllegalArgumentException("Unsupported table type: " + table); } } @@ -91,7 +130,7 @@ public synchronized void run() { try { isComplete = false; - xmlReader.readDump(); + parseTask.parse(); isComplete = true; } catch (IOException e) { @@ -101,12 +140,15 @@ public synchronized void run() } /** - * Abort a conversion + * Abort a conversion. + *

+ * Only supported in single-file mode. In multi-part mode the abort flag is recorded but + * does not interrupt an in-flight SAX parse — callers must let the current part finish. */ public synchronized void abort() { if (!isComplete) { - xmlReader.abort(); + abortAction.run(); isComplete = true; } } diff --git a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java index 32f3e391..724bdd18 100644 --- a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java +++ b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/domain/TimeMachineFilesTest.java @@ -18,8 +18,10 @@ package org.dkpro.jwpl.timemachine.domain; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,12 +30,16 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.dkpro.jwpl.timemachine.factory.DefaultTimeMachineEnvironmentFactory; import org.dkpro.jwpl.wikimachine.factory.IEnvironmentFactory; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; @@ -173,5 +179,57 @@ void testGetOutputPageRedirects() { void testGetOutputMetadata() { assertEquals(OUTPUT_DIR + 
File.separator + "MetaData.txt", tmFiles.getOutputMetadata()); } - + + @Test + void testGetMetaHistoryFilesReturnsSingletonForLegacySetter() { + assertEquals(List.of(mockMetaHistory.getAbsolutePath()), tmFiles.getMetaHistoryFiles()); + } + + @Test + void testSetMetaHistoryFileNullClearsList() { + tmFiles.setMetaHistoryFile(null); + assertTrue(tmFiles.getMetaHistoryFiles().isEmpty()); + assertNull(tmFiles.getMetaHistoryFile()); + } + + @Test + void testSetMetaHistoryFilesPreservesOrder(@TempDir Path dir) throws IOException { + Path p1 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history1.xml-p1p812.bz2")); + Path p2 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history1.xml-p813p1418.bz2")); + Path p3 = Files.createFile( + dir.resolve("enwiki-20250601-pages-meta-history2.xml-p1419p2000.bz2")); + + List parts = Arrays.asList(p1.toString(), p2.toString(), p3.toString()); + tmFiles.setMetaHistoryFiles(parts); + + assertEquals(parts, tmFiles.getMetaHistoryFiles()); + assertEquals(p1.toString(), tmFiles.getMetaHistoryFile()); + assertTrue(tmFiles.checkAll()); + } + + @Test + void testSetMetaHistoryFilesRejectsInvalid() { + assertThrows(IllegalArgumentException.class, () -> tmFiles.setMetaHistoryFiles(null)); + assertThrows(IllegalArgumentException.class, + () -> tmFiles.setMetaHistoryFiles(Collections.emptyList())); + assertThrows(IllegalArgumentException.class, + () -> tmFiles.setMetaHistoryFiles(Arrays.asList("a", null))); + } + + @Test + void testGetMetaHistoryFilesIsUnmodifiable() { + assertThrows(UnsupportedOperationException.class, + () -> tmFiles.getMetaHistoryFiles().add("should-fail")); + } + + @Test + void testCheckAllFailsIfAnyMetaHistoryPartMissing(@TempDir Path dir) throws IOException { + Path p1 = Files.createFile(dir.resolve("history1.bz2")); + Path p2 = dir.resolve("history2.bz2"); // not created + + tmFiles.setMetaHistoryFiles(Arrays.asList(p1.toString(), p2.toString())); + assertFalse(tmFiles.checkAll()); + } } diff 
--git a/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java new file mode 100644 index 00000000..f917ab70 --- /dev/null +++ b/dkpro-jwpl-timemachine/src/test/java/org/dkpro/jwpl/timemachine/dump/xml/XMLDumpTableInputStreamMultiPartTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.timemachine.dump.xml; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.dkpro.jwpl.wikimachine.dump.xml.DumpTableEnum; +import org.junit.jupiter.api.Test; + +/** + * Feeds two self-contained XML parts through the multi-part overload of + * {@link XMLDumpTableInputStream#initialize(List, DumpTableEnum)} and asserts the piped SQL + * output bytes equal the output of a single-document dump with the same pages. 
+ */ +class XMLDumpTableInputStreamMultiPartTest +{ + + private static final String PART_HEADER = + "\n" + + "\n" + + " \n" + + " Test Wiki\n" + + " http://test.example/\n" + + " MediaWiki-test\n" + + " first-letter\n" + + " \n" + + " \n" + + " Talk\n" + + " \n" + + " \n"; + private static final String PART_FOOTER = "\n"; + + private static String pageBlock(int pageId, int revisionId, String title) + { + return " \n" + + " " + title + "\n" + + " " + pageId + "\n" + + " \n" + + " " + revisionId + "\n" + + " 2020-01-01T00:00:00Z\n" + + " \n" + + " Alice\n" + + " 100\n" + + " \n" + + " Body of " + title + "\n" + + " \n" + + " \n"; + } + + private static InputStream stream(String xml) + { + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + private static byte[] runSingle(String xml, DumpTableEnum table) throws IOException + { + XMLDumpTableInputStream sut = new XMLDumpTableInputStream(); + sut.initialize(stream(xml), table); + return drain(sut); + } + + private static byte[] runMulti(List parts, DumpTableEnum table) throws IOException + { + XMLDumpTableInputStream sut = new XMLDumpTableInputStream(); + sut.initialize(parts, table); + return drain(sut); + } + + private static byte[] drain(XMLDumpTableInputStream sut) throws IOException + { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final byte[] buf = new byte[4096]; + int n; + while ((n = sut.read(buf, 0, buf.length)) != -1) { + out.write(buf, 0, n); + } + sut.close(); + return out.toByteArray(); + } + + private static void assertMultiEqualsSingle(DumpTableEnum table) throws IOException + { + final String pageOne = pageBlock(1, 10, "Page One"); + final String pageTwo = pageBlock(2, 20, "Page Two"); + + final byte[] single = runSingle( + PART_HEADER + pageOne + pageTwo + PART_FOOTER, table); + final byte[] multi = runMulti( + List.of(stream(PART_HEADER + pageOne + PART_FOOTER), + stream(PART_HEADER + pageTwo + PART_FOOTER)), + table); + + assertTrue(single.length > 0, 
"empty single-doc output for " + table); + assertArrayEquals(single, multi, + "multi-part output diverges from single-document output for " + table); + } + + @Test + void multiPartPageTableMatchesSingleDocument() throws IOException + { + assertMultiEqualsSingle(DumpTableEnum.PAGE); + } + + @Test + void multiPartRevisionTableMatchesSingleDocumentOnIdFields() throws IOException + { + // RevisionWriter also persists the revision timestamp's millisecond component, which + // AbstractXmlDumpReader derives from a GregorianCalendar whose millis field is seeded + // with System.currentTimeMillis() at construction time — a pre-existing non-determinism + // that is orthogonal to multi-part handling. Compare the deterministic (pageId, revId) + // part of each 16-byte record instead. + final String pageOne = pageBlock(1, 10, "Page One"); + final String pageTwo = pageBlock(2, 20, "Page Two"); + + final byte[] single = runSingle( + PART_HEADER + pageOne + pageTwo + PART_FOOTER, DumpTableEnum.REVISION); + final byte[] multi = runMulti( + List.of(stream(PART_HEADER + pageOne + PART_FOOTER), + stream(PART_HEADER + pageTwo + PART_FOOTER)), + DumpTableEnum.REVISION); + + assertTrue(single.length > 0 && single.length == multi.length, + "revision binaries differ in length"); + // Each record: int pageId (4) + int revId (4) + long millis (8) = 16 bytes. 
+ for (int i = 0; i < single.length; i += 16) { + assertArrayEquals( + java.util.Arrays.copyOfRange(single, i, i + 8), + java.util.Arrays.copyOfRange(multi, i, i + 8), + "pageId/revId diverge at record offset " + i); + } + } + + @Test + void multiPartTextTableMatchesSingleDocument() throws IOException + { + assertMultiEqualsSingle(DumpTableEnum.TEXT); + } + + @Test + void singleElementListMatchesSingleStream() throws IOException + { + final String doc = PART_HEADER + pageBlock(7, 77, "Solo") + PART_FOOTER; + + final byte[] fromSingle = runSingle(doc, DumpTableEnum.PAGE); + final byte[] fromList = runMulti(List.of(stream(doc)), DumpTableEnum.PAGE); + + assertArrayEquals(fromSingle, fromList); + } +} diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java index 04da9507..e4f00d01 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressor.java @@ -27,6 +27,7 @@ import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.List; /** * A common base {@link IDecompressor} implementation that provides methods to open @@ -128,4 +129,28 @@ private ClassLoader getContextClassLoader() { return Thread.currentThread().getContextClassLoader(); } + + /** + * Closes every element of {@code streams}, attaching any thrown {@link IOException} as + * suppressed to {@code primary}. Intended for cleanup after a partial open loop where + * several streams have already been created but the call must now unwind. + * + * @param streams Streams to close. {@code null} elements are tolerated. 
+ * @param primary The in-flight exception that triggered cleanup; close failures are + * recorded as suppressed on it. + */ + protected static void closeQuietly(List streams, Throwable primary) + { + for (InputStream s : streams) { + if (s == null) { + continue; + } + try { + s.close(); + } + catch (IOException suppressed) { + primary.addSuppressed(suppressed); + } + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java index e1078c39..93618de2 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/BZip2Decompressor.java @@ -20,7 +20,11 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; @@ -55,4 +59,29 @@ public InputStream getInputStream(Path resource) throws IOException checkResource(resource); return new BZip2CompressorInputStream(new BufferedInputStream(openStream(resource))); } + + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); + } + resources.forEach(this::checkResource); + // Decompress each part independently and concatenate the decompressed streams. + // This mirrors the gzip impl and avoids depending on the compressed-side + // multi-stream detection heuristic (which is sensitive to the underlying + // stream's available() at part boundaries). 
+ final List streams = new ArrayList<>(resources.size()); + try { + for (Path p : resources) { + streams.add(new BZip2CompressorInputStream(new BufferedInputStream(openStream(p)))); + } + return new SequenceInputStream(Collections.enumeration(streams)); + } catch (IOException | RuntimeException e) { + closeQuietly(streams, e); + throw e; + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java index a4ec8b80..434a8b24 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressor.java @@ -20,7 +20,11 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.SequenceInputStream; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.zip.GZIPInputStream; /** @@ -54,4 +58,29 @@ public InputStream getInputStream(Path resource) throws IOException return new GZIPInputStream(new BufferedInputStream(openStream(resource))); } + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); + } + resources.forEach(this::checkResource); + // Wrap every part in its own GZIPInputStream and concatenate the decompressed streams. 
+ // We previously fed a concatenated compressed stream to a single GZIPInputStream relying + // on RFC 1952 multi-member support, but GZIPInputStream detects subsequent members via + // the underlying stream's available() count — which is zero at SequenceInputStream's + // boundary between parts, so decoding stopped after the first part on some platforms. + final List streams = new ArrayList<>(resources.size()); + try { + for (Path p : resources) { + streams.add(new GZIPInputStream(new BufferedInputStream(openStream(p)))); + } + return new SequenceInputStream(Collections.enumeration(streams)); + } catch (IOException | RuntimeException e) { + closeQuietly(streams, e); + throw e; + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java index d4da93ce..7f2b9f98 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/IDecompressor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; +import java.util.List; /** * Uses an archive file path and returns an {@link InputStream}. @@ -57,4 +58,30 @@ public interface IDecompressor * @throws IOException Thrown if (other) IO errors occurred. */ InputStream getInputStream(Path resource) throws IOException; + + + /** + * Attempts to open an {@link InputStream} to a compressed archive that is split + * across multiple files. These archives are combined over a sequence of files + * in a logical order — for example, by page-id ranges in ascending order. + * The returned stream yields the byte concatenation of the decompressed parts + * in the order provided. + *

+ * External archives are referenced via a relative or absolute path, including + * the actual file name of each resource. In case only plain file names are given + * and no directory or path elements are contained, an attempt is made to detect + * and load the resources from the classpath. + * + * @param resources References an archive via an ordered list of {@link Path paths} of all + * relevant files. Must not be {@code null}, not be {@code empty} and not + * refer to directories. All elements in {@code resources} must not + * be {@code null}. + * @return An open {@link InputStream} over the concatenated decompressed contents + * of the supplied parts. + * + * @throws IllegalArgumentException Thrown if {@code resources} is invalid. + * @throws IOException Thrown if (other) IO errors occurred or the archive format + * does not support multi-file sequences. + */ + InputStream getInputStreamSequence(List resources) throws IOException; } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java index b37b1ef6..5d366cd3 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressor.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.List; import org.apache.commons.compress.archivers.sevenz.SevenZArchiveEntry; import org.apache.commons.compress.archivers.sevenz.SevenZFile; @@ -86,6 +87,14 @@ public InputStream getInputStream(Path resource) throws IOException { return null; } + /** + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) { + throw new UnsupportedOperationException("Not supported yet."); + } + private static 
class SevenZipInputStreamWrapper extends FilterInputStream { private final SevenZFile archive; diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java index 981bd270..7644aa61 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressor.java @@ -27,7 +27,9 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; /** @@ -260,12 +262,7 @@ public InputStream getInputStream(String resource) throws IOException { @Override public InputStream getInputStream(Path resource) throws IOException { - if (resource == null || resource.toString().isBlank()) { - throw new IllegalArgumentException("Can't load a 'null' or 'empty' resource!"); - } - if (Files.isDirectory(resource)) { - throw new InvalidPathException(resource.toString(), "Can't load a 'directory' as resource!"); - } + checkPath(resource); final String file = resource.toAbsolutePath().toString(); final String extension = detectExtension(file); @@ -283,7 +280,36 @@ else if (isInternalSupported(extension)) { } /** - * Check if the {@link File} specified via {@code fileName} exists. + * {@inheritDoc} + */ + @Override + public InputStream getInputStreamSequence(List resources) throws IOException { + if (resources == null || resources.isEmpty()) { + throw new IllegalArgumentException("Can't process a 'null' or 'empty' resources list!"); + } + // checkPath rejects null, blank, and directory entries for every element. 
+ resources.forEach(this::checkPath); + final String extension = detectExtension(resources.get(0).toAbsolutePath().toString()); + // Every part must share the same extension: mixing would otherwise be silently + // misdecoded (first-entry's decoder applied to all bytes). + for (int i = 1; i < resources.size(); i++) { + final String partExtension = detectExtension(resources.get(i).toAbsolutePath().toString()); + if (!Objects.equals(extension, partExtension)) { + throw new IOException("Multi-file dumps must share a single archive format, " + + "got '" + extension + "' and '" + partExtension + "'."); + } + } + // 7z multi-file archives are not supported yet; only the internally supported + // streamable formats (bz2, gz) can be concatenated at the decompressor level. + if (isInternalSupported(extension) && !"7z".equals(extension)) { + return internalSupport.get(extension).getInputStreamSequence(resources); + } + throw new IOException("Multi-file dumps of '" + extension + "' archives " + + "are currently not supported."); + } + + /** + * Checks if the {@link File} specified via {@code fileName} exists. * * @param resource file path to check * @return {@code true} if the file exists and can be read, {@code false} otherwise. @@ -293,4 +319,20 @@ private boolean fileExists(Path resource) return Files.exists(resource); } + + /** + * Verifies the provided {@code resource} references a valid archive. + * + * @param resource The file's name or (relative) path to read the archive from. + * @throws IllegalArgumentException Thrown if parameters were invalid. + * @throws InvalidPathException Thrown if the parameter {@code resource} referred to a directory. 
+ */ + private void checkPath(Path resource) { + if (resource == null || resource.toString().isBlank()) { + throw new IllegalArgumentException("Can't load a 'null' or 'empty' resource!"); + } + if (Files.isDirectory(resource)) { + throw new InvalidPathException(resource.toString(), "Can't load a 'directory' as resource!"); + } + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java index a40e0746..6a3066c4 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/AbstractXmlDumpReader.java @@ -234,6 +234,20 @@ public AbstractXmlDumpReader(InputStream inputStream, DumpWriter writer) * @throws IOException Thrown if errors occurred during parsing. */ public void readDump() throws IOException + { + doParse(); + writer.close(); + } + + /** + * SAX-parses the bound input stream against this handler, but does not close the + * {@link DumpWriter}. Exposed for multi-part pipelines where several readers share a single + * writer and the caller is responsible for closing it after the last part has been consumed. + * + * @throws IOException Thrown if errors occurred during parsing. 
+ * @see org.dkpro.jwpl.mwdumper.importer.MultiPartDumpWriter + */ + protected void doParse() throws IOException { try { SAXParserFactory factory = SAXParserFactory.newInstance(); @@ -245,7 +259,6 @@ public void readDump() throws IOException catch (ParserConfigurationException | SAXException e) { throw new IOException(e); } - writer.close(); } /** diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java index 3b359e38..b29b3a79 100644 --- a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/DumpTableInputStream.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; /** * An abstraction of an {@link InputStream} for Wikipedia table dumps of three {@link DumpTableEnum types}. @@ -37,4 +38,28 @@ public abstract class DumpTableInputStream */ public abstract void initialize(InputStream inputStream, DumpTableEnum table) throws IOException; + + /** + * Multi-part counterpart of {@link #initialize(InputStream, DumpTableEnum)}. The default + * implementation transparently forwards a single-element list to the single-stream + * initializer and rejects larger lists with {@link UnsupportedOperationException}; + * subclasses that can read across a sequence of self-contained XML documents should + * override this method. + * + * @param inputStreams Ordered list of input streams. Must not be {@code null} or empty. + * @param table The {@link DumpTableEnum table type}. + * @throws IOException Thrown if IO errors occurred. 
+ */ + public void initialize(List inputStreams, DumpTableEnum table) throws IOException + { + if (inputStreams == null || inputStreams.isEmpty()) { + throw new IllegalArgumentException("'inputStreams' must not be null or empty."); + } + if (inputStreams.size() == 1) { + initialize(inputStreams.get(0), table); + return; + } + throw new UnsupportedOperationException( + "Multi-part initialisation is not supported by " + getClass().getSimpleName()); + } } diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java new file mode 100644 index 00000000..3d7a7d94 --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReader.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.dkpro.jwpl.wikimachine.dump.xml; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; + +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; +import org.dkpro.jwpl.mwdumper.importer.MultiPartDumpWriter; + +/** + * Parses a multi-file Wikipedia dump as a single logical document. + *

+ * Each Wikimedia dump part (e.g. {@code pages-articles1.xml-p1p10.bz2}) is a standalone
+ * XML document with its own {@code <mediawiki>} root and {@code <siteinfo>} preamble, so
+ * the decompressed byte concatenation is not well-formed XML and cannot be fed to a
+ * single {@link javax.xml.parsers.SAXParser#parse} call. This helper instead parses each
+ * part with a fresh {@link AbstractXmlDumpReader} while routing all events to one
+ * {@link MultiPartDumpWriter} — duplicate {@code writeStartWiki}/{@code writeSiteinfo}/
+ * {@code writeEndWiki} events across parts are collapsed, and the underlying delegate
+ * writer only sees one logical document.
+ */
+public final class MultiPartXmlDumpReader
+{
+
+    /**
+     * Factory that produces a fresh {@link AbstractXmlDumpReader} (typically a concrete
+     * subclass such as {@code SimpleXmlDumpReader} or a {@code timemachine} reader) for
+     * a given part's {@link InputStream} and the shared {@link DumpWriter}. Callers
+     * normally pass a constructor reference such as {@code SimpleXmlDumpReader::new}.
+     */
+    @FunctionalInterface
+    public interface ReaderFactory
+    {
+        /**
+         * @param in The part's input stream.
+         * @param writer The shared dump writer (already wrapped in a
+         *            {@link MultiPartDumpWriter} by {@link #readDumps}).
+         * @return A fresh reader bound to {@code in} and {@code writer}.
+         */
+        AbstractXmlDumpReader create(InputStream in, DumpWriter writer);
+    }
+
+    private MultiPartXmlDumpReader()
+    {
+        // static-only
+    }
+
+    /**
+     * Parses every part in {@code parts} against the same {@code writer}. Events from
+     * the individual parts are funnelled through a {@link MultiPartDumpWriter} so the
+     * delegate observes the combined stream as a single {@code <mediawiki>} document.
+     *

+ * This method takes ownership of the supplied streams: every element of {@code parts} + * is {@link InputStream#close() closed} before {@code readDumps} returns (on both the + * success and failure paths), and the delegate writer is closed via the wrapper's + * {@code finish()} exactly once at the end. Exceptions raised by stream close or by + * the wrapper's final flush are attached as suppressed to any primary error from the + * parse. + * + * @param parts Ordered list of decompressed XML {@link InputStream streams}. Must + * not be {@code null}, must not be empty, and must not contain + * {@code null} elements. + * @param writer The underlying {@link DumpWriter} to flush events into. + * @param factory Instantiates a fresh reader per part — typically a method reference + * such as {@code SimpleXmlDumpReader::new}. + * @throws IOException Thrown on I/O or SAX errors encountered while parsing + * any part, or while closing the supplied streams. + * @throws IllegalArgumentException If {@code parts} is null, empty, or contains a null + * element. + */ + public static void readDumps(List parts, DumpWriter writer, ReaderFactory factory) + throws IOException + { + if (parts == null || parts.isEmpty()) { + throw new IllegalArgumentException("'parts' must not be null or empty."); + } + Objects.requireNonNull(writer, "'writer' must not be null."); + Objects.requireNonNull(factory, "'factory' must not be null."); + for (int i = 0; i < parts.size(); i++) { + if (parts.get(i) == null) { + throw new IllegalArgumentException("'parts[" + i + "]' is null."); + } + } + + final MultiPartDumpWriter wrapper = new MultiPartDumpWriter(writer); + Throwable primary = null; + try { + for (InputStream part : parts) { + factory.create(part, wrapper).doParse(); + } + } + catch (IOException | RuntimeException e) { + primary = e; + } + // We took ownership of every stream — close them all, regardless of outcome. 
+ for (InputStream part : parts) { + primary = closeAndChain(part, primary); + } + // finish() is idempotent; emits a single writeEndWiki (if any startWiki was seen) + // and closes the delegate writer. + try { + wrapper.finish(); + } + catch (IOException e) { + primary = chain(primary, e); + } + + if (primary instanceof IOException) { + throw (IOException) primary; + } + if (primary instanceof RuntimeException) { + throw (RuntimeException) primary; + } + } + + private static Throwable closeAndChain(InputStream stream, Throwable primary) + { + try { + stream.close(); + return primary; + } + catch (IOException e) { + return chain(primary, e); + } + } + + private static Throwable chain(Throwable primary, Throwable next) + { + if (primary == null) { + return next; + } + primary.addSuppressed(next); + return primary; + } +} diff --git a/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java new file mode 100644 index 00000000..43e49891 --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/main/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscovery.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Helpers to recognise and order Wikimedia multi-part dump files. + *

+ * Wikimedia publishes large XML dumps split across several files using the naming scheme
+ * {@code <wiki>-<date>-<role><n>.xml-p<first>p<last>.<ext>} (for example
+ * {@code dewiki-20260101-pages-articles1.xml-p1p297012.bz2},
+ * {@code dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2}). Older / smaller dumps use the
+ * single-file scheme {@code <wiki>-<date>-<role>.xml.<ext>}. This utility:
+ *

+ * All methods tolerate absolute paths: only the file name is inspected. + */ +public final class DumpFileDiscovery +{ + + /** + * Suffix that marks a multi-part dump file; captures the start and end page id of the range. + * Example match on {@code foo-pages-articles1.xml-p297013p1262093.bz2} yields start=297013, + * end=1262093. + */ + private static final Pattern PAGE_RANGE = Pattern.compile("-p(\\d+)p(\\d+)(?=\\.)"); + + private DumpFileDiscovery() + { + // static-only + } + + /** + * @param fileName A file name or path whose last component is inspected. + * @return {@code true} if the name carries the multi-part suffix {@code -pp.}. + */ + public static boolean hasPageRange(String fileName) + { + if (fileName == null) { + return false; + } + return PAGE_RANGE.matcher(lastSegment(fileName)).find(); + } + + /** + * @param file Any {@link File}, absolute or relative. + * @return {@code true} if the file's name carries the multi-part suffix. + */ + public static boolean hasPageRange(File file) + { + return file != null && hasPageRange(file.getName()); + } + + /** + * Matches a filename against a known Wikimedia dump role under either naming scheme: + * + * The matcher is anchored on the role substring and the {@code .xml} marker, so similarly + * named dumps such as {@code pages-articles-multistream.xml.bz2} are correctly rejected when + * the requested role is {@code pages-articles}. + * + * @param fileName File name (or path whose last segment is the file name). + * @param role Role token as it appears in the dump name, e.g. {@code pages-articles}, + * {@code pages-meta-current}, {@code pages-meta-history}. + * @param extensions Supported archive extensions without dot, e.g. {@code ["bz2", "gz", "7z"]}. + * @return {@code true} if {@code fileName} matches the role under either scheme. 
+ */ + public static boolean matchesRole(String fileName, String role, Collection extensions) + { + if (fileName == null || role == null || extensions == null || extensions.isEmpty()) { + return false; + } + final String name = lastSegment(fileName); + final String extAlt = String.join("|", extensions); + // Either: ....xml. + // or: ...\d+.xml-pp. + final Pattern p = Pattern.compile( + ".*" + Pattern.quote(role) + "(\\d+\\.xml-p\\d+p\\d+|\\.xml)\\.(" + extAlt + ")$"); + return p.matcher(name).matches(); + } + + /** + * Returns a new list containing {@code files} ordered for multi-part consumption: files with a + * {@code -pp} suffix are sorted by ascending start page id; files without such a + * suffix preserve their relative input order and come first. Stable for equal starts. + * + * @param files Input files in any order. {@code null} elements are rejected. + * @return A new ordered {@link List}. + * @throws IllegalArgumentException If {@code files} is {@code null} or contains a null element. + */ + public static List orderByPageRange(Collection files) + { + if (files == null) { + throw new IllegalArgumentException("'files' must not be null."); + } + final List out = new ArrayList<>(files.size()); + for (File f : files) { + if (f == null) { + throw new IllegalArgumentException("'files' contains a null element."); + } + out.add(f); + } + out.sort(Comparator.comparingLong(DumpFileDiscovery::pageRangeStart)); + return out; + } + + /** + * @return The start page id encoded in the file name's {@code -pp} suffix, or + * {@link Long#MIN_VALUE} if absent. Files without a range therefore sort before + * ranged parts in {@link #orderByPageRange}. 
+ */ + private static long pageRangeStart(File file) + { + if (file == null) { + return Long.MIN_VALUE; + } + final Matcher m = PAGE_RANGE.matcher(file.getName()); + if (!m.find()) { + return Long.MIN_VALUE; + } + try { + return Long.parseLong(m.group(1)); + } + catch (NumberFormatException e) { + return Long.MIN_VALUE; + } + } + + private static String lastSegment(String path) + { + final int slash = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\')); + return slash < 0 ? path : path.substring(slash + 1); + } +} diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java index 9822c015..71a93704 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/AbstractDecompressorTest.java @@ -26,6 +26,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.InvalidPathException; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -57,7 +60,47 @@ void testGetInputStreamThrowsWithNull() { void testGetInputStreamThrowsWithDirectory(@TempDir Path input) { assertThrows(InvalidPathException.class, () -> getDecompressor().getInputStream(input)); } - + + /** + * Exception type thrown for contract violations of + * {@link IDecompressor#getInputStreamSequence(List)} — IAE/InvalidPathException + * for formats that support the multi-file contract. Subclasses that don't + * (e.g. 7z) can override to assert {@link UnsupportedOperationException}. 
+ */ + protected Class expectedSequenceValidationException() { + return IllegalArgumentException.class; + } + + protected Class expectedSequenceDirectoryException() { + return InvalidPathException.class; + } + + @Test + void testGetInputStreamSequenceThrowsOnNullList() { + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(null)); + } + + @Test + void testGetInputStreamSequenceThrowsOnEmptyList() { + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(Collections.emptyList())); + } + + @Test + void testGetInputStreamSequenceThrowsOnNullElement() { + final List withNull = Arrays.asList((Path) null); + assertThrows(expectedSequenceValidationException(), + () -> getDecompressor().getInputStreamSequence(withNull)); + } + + @Test + void testGetInputStreamSequenceThrowsOnDirectoryElement(@TempDir Path dir) { + final List withDir = List.of(dir); + assertThrows(expectedSequenceDirectoryException(), + () -> getDecompressor().getInputStreamSequence(withDir)); + } + protected void getAndCheck(String input) throws IOException { try (InputStream in = getDecompressor().getInputStream(input)) { assertNotNull(in); diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java index 9368575f..5ce18ea6 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/BZip2DecompressorTest.java @@ -17,9 +17,21 @@ */ package org.dkpro.jwpl.wikimachine.decompression; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import 
java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,4 +55,36 @@ protected IDecompressor getDecompressor() { void testGetInputStream(String input) throws IOException { getAndCheck(input); } + + @Test + void testGetInputStreamSequenceConcatenatesParts(@TempDir Path dir) throws IOException { + final String contentA = "part-a payload\n"; + final String contentB = "part-b payload\n"; + final Path partA = writeBz2(dir.resolve("dump.xml-p1p10.bz2"), contentA); + final Path partB = writeBz2(dir.resolve("dump.xml-p11p20.bz2"), contentB); + + try (InputStream in = decomp.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + final String decompressed = new String(in.readAllBytes(), StandardCharsets.UTF_8); + assertEquals(contentA + contentB, decompressed); + } + } + + @Test + void testGetInputStreamSequenceSinglePartEqualsSingleFile(@TempDir Path dir) throws IOException { + final String content = "lonely payload\n"; + final Path part = writeBz2(dir.resolve("dump.xml-p1p10.bz2"), content); + + try (InputStream in = decomp.getInputStreamSequence(List.of(part))) { + assertNotNull(in); + assertEquals(content, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + private static Path writeBz2(Path out, String content) throws IOException { + try (OutputStream os = new BZip2CompressorOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java 
b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java index 030be11f..041a45d8 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/GZipDecompressorTest.java @@ -17,9 +17,21 @@ */ package org.dkpro.jwpl.wikimachine.decompression; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.zip.GZIPOutputStream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -43,4 +55,36 @@ protected IDecompressor getDecompressor() { void testGetInputStream(String input) throws IOException { getAndCheck(input); } + + @Test + void testGetInputStreamSequenceConcatenatesParts(@TempDir Path dir) throws IOException { + final String contentA = "part-a payload\n"; + final String contentB = "part-b payload\n"; + final Path partA = writeGz(dir.resolve("dump.xml-p1p10.gz"), contentA); + final Path partB = writeGz(dir.resolve("dump.xml-p11p20.gz"), contentB); + + try (InputStream in = decomp.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + final String decompressed = new String(in.readAllBytes(), StandardCharsets.UTF_8); + assertEquals(contentA + contentB, decompressed); + } + } + + @Test + void testGetInputStreamSequenceSinglePartEqualsSingleFile(@TempDir Path dir) throws IOException { + final String content = "lonely payload\n"; + final Path part = writeGz(dir.resolve("dump.xml-p1p10.gz"), content); + + try 
(InputStream in = decomp.getInputStreamSequence(List.of(part))) { + assertNotNull(in); + assertEquals(content, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + private static Path writeGz(Path out, String content) throws IOException { + try (OutputStream os = new GZIPOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java index e4118ec5..64b3464c 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/SevenZipDecompressorTest.java @@ -23,6 +23,8 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -45,6 +47,20 @@ protected IDecompressor getDecompressor() { return decomp; } + /** + * 7z does not support multi-file sequences: every invocation fails fast with + * {@link UnsupportedOperationException}, regardless of input validity. 
+ */ + @Override + protected Class expectedSequenceValidationException() { + return UnsupportedOperationException.class; + } + + @Override + protected Class expectedSequenceDirectoryException() { + return UnsupportedOperationException.class; + } + @ParameterizedTest @ValueSource(strings = {"archive.txt.7z", "src/test/resources/archive.txt.7z"}) void testGetInputStream(String input) throws IOException { @@ -62,4 +78,10 @@ void testGetInputStreamWithRandomResourceName() throws IOException { final InputStream in = getDecompressor().getInputStream(UUID.randomUUID().toString()); assertNull(in); } + + @Test + void testGetInputStreamSequenceThrowsUnsupported() { + assertThrows(UnsupportedOperationException.class, + () -> decomp.getInputStreamSequence(List.of(Path.of("archive.txt.7z")))); + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java index 60563be7..a70466bd 100644 --- a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/decompression/UniversalDecompressorTest.java @@ -19,17 +19,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import 
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.io.TempDir; @@ -109,4 +115,64 @@ void testGetInputStreamWithExternalConfig(String input) throws IOException { assertEquals(EXPECTED_CONTENT, content); } } + + @Test + void testGetInputStreamSequenceDispatchesBz2() throws IOException { + final String a = "alpha\n"; + final String b = "beta\n"; + final Path partA = writeBz2(tmpDir.resolve("dump.xml-p1p10.bz2"), a); + final Path partB = writeBz2(tmpDir.resolve("dump.xml-p11p20.bz2"), b); + + try (InputStream in = udc.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + assertEquals(a + b, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + @Test + void testGetInputStreamSequenceDispatchesGz() throws IOException { + final String a = "alpha\n"; + final String b = "beta\n"; + final Path partA = writeGz(tmpDir.resolve("dump.xml-p1p10.gz"), a); + final Path partB = writeGz(tmpDir.resolve("dump.xml-p11p20.gz"), b); + + try (InputStream in = udc.getInputStreamSequence(List.of(partA, partB))) { + assertNotNull(in); + assertEquals(a + b, new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } + } + + @Test + void testGetInputStreamSequenceRejects7z() throws IOException { + // Create an empty placeholder so checkPath passes (no file-existence check). 
+ final Path part = Files.createFile(tmpDir.resolve("dump.xml-p1p10.7z")); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(part))); + } + + @Test + void testGetInputStreamSequenceRejectsUnsupportedExtension() throws IOException { + final Path part = Files.createFile(tmpDir.resolve("dump.xml-p1p10.rar")); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(part))); + } + + @Test + void testGetInputStreamSequenceRejectsMixedExtensions() throws IOException { + final Path bz2 = writeBz2(tmpDir.resolve("dump.xml-p1p10.bz2"), "x\n"); + final Path gz = writeGz(tmpDir.resolve("dump.xml-p11p20.gz"), "y\n"); + assertThrows(IOException.class, () -> udc.getInputStreamSequence(List.of(bz2, gz))); + } + + private static Path writeBz2(Path out, String content) throws IOException { + try (OutputStream os = new BZip2CompressorOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } + + private static Path writeGz(Path out, String content) throws IOException { + try (OutputStream os = new GZIPOutputStream(Files.newOutputStream(out))) { + os.write(content.getBytes(StandardCharsets.UTF_8)); + } + return out; + } } diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java new file mode 100644 index 00000000..52f85f82 --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/dump/xml/MultiPartXmlDumpReaderTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.dump.xml; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.dkpro.jwpl.mwdumper.importer.DumpWriter; +import org.dkpro.jwpl.mwdumper.importer.Page; +import org.dkpro.jwpl.mwdumper.importer.Revision; +import org.dkpro.jwpl.mwdumper.importer.Siteinfo; +import org.junit.jupiter.api.Test; + +class MultiPartXmlDumpReaderTest +{ + + private static final String PART_HEADER = + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + + "<mediawiki xml:lang=\"en\">\n" + + " <siteinfo>\n" + + " <sitename>Test Wiki</sitename>\n" + + " <base>http://test.example/</base>\n" + + " <generator>MediaWiki-test</generator>\n" + + " <case>first-letter</case>\n" + + " <namespaces>\n" + + " <namespace key=\"0\" />\n" + + " <namespace key=\"1\">Talk</namespace>\n" + + " </namespaces>\n" + + " </siteinfo>\n"; + + private static final String PART_FOOTER = "</mediawiki>\n"; + + private static String part(int pageId, int revisionId, String title) + { + return PART_HEADER + + " <page>\n" + + " <title>" + title + "</title>\n" + + " <id>" + pageId + "</id>\n" + + " <revision>\n" + + " <id>" + revisionId + "</id>\n" + + " <timestamp>2020-01-01T00:00:00Z</timestamp>\n" + + " <contributor>\n" + + " <username>Alice</username>\n" + + " <id>100</id>\n" + + " </contributor>\n" + + " <text>Body of " + title + "</text>\n" + + " </revision>\n" + + " </page>\n"
+ + PART_FOOTER; + } + + private static InputStream stream(String xml) + { + return new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + } + + @Test + void parsesTwoPartsAsSingleLogicalDocument() throws IOException + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + + MultiPartXmlDumpReader.readDumps( + List.of(stream(part(1, 10, "Page One")), stream(part(2, 20, "Page Two"))), + delegate, + WikiXMLDumpReader::new); + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:1:Page One", + "revision:10", + "endPage", + "startPage:2:Page Two", + "revision:20", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void singlePartBehavesLikeSingleDocument() throws IOException + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + + MultiPartXmlDumpReader.readDumps( + List.of(stream(part(7, 77, "Solo"))), + delegate, + WikiXMLDumpReader::new); + + assertEquals(List.of( + "startWiki", + "siteinfo", + "startPage:7:Solo", + "revision:77", + "endPage", + "endWiki", + "close" + ), delegate.events); + } + + @Test + void rejectsNullPartsList() + { + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(null, new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void rejectsEmptyPartsList() + { + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(Collections.emptyList(), + new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void rejectsNullElementInPartsList() + { + final List<InputStream> parts = new ArrayList<>(); + parts.add(stream(part(1, 10, "Page One"))); + parts.add(null); + assertThrows(IllegalArgumentException.class, () -> + MultiPartXmlDumpReader.readDumps(parts, new RecordingDumpWriter(), WikiXMLDumpReader::new)); + } + + @Test + void closesDelegateEvenIfParsingFails() + { + final RecordingDumpWriter delegate = new RecordingDumpWriter(); + final InputStream malformed = stream("oops"); + + 
assertThrows(IOException.class, () -> + MultiPartXmlDumpReader.readDumps(List.of(malformed), delegate, WikiXMLDumpReader::new)); + + // Even on failure, the delegate writer must be closed — nothing else leaked. + assertEquals(List.of("close"), delegate.events); + } + + @Test + void closesEveryPartStreamOnSuccess() throws IOException + { + final CountingInputStream a = new CountingInputStream(stream(part(1, 10, "A"))); + final CountingInputStream b = new CountingInputStream(stream(part(2, 20, "B"))); + + MultiPartXmlDumpReader.readDumps(List.of(a, b), + new RecordingDumpWriter(), WikiXMLDumpReader::new); + + // SAXParser.parse also closes the stream internally in some JDKs; close() is + // idempotent on InputStream so we only require that each part was closed at least + // once (ownership transferred, no leak). + assertTrue(a.closed.get() >= 1, "first part stream not closed"); + assertTrue(b.closed.get() >= 1, "second part stream not closed"); + } + + @Test + void closesEveryPartStreamOnFailure() + { + // Second part is malformed so parsing fails mid-list; both parts must still be closed. 
+ final CountingInputStream good = new CountingInputStream(stream(part(1, 10, "A"))); + final CountingInputStream bad = new CountingInputStream( + stream("oops")); + + assertThrows(IOException.class, () -> + MultiPartXmlDumpReader.readDumps(List.of(good, bad), + new RecordingDumpWriter(), WikiXMLDumpReader::new)); + + assertTrue(good.closed.get() >= 1, "good part stream not closed"); + assertTrue(bad.closed.get() >= 1, "bad part stream not closed"); + } + + private static final class CountingInputStream extends FilterInputStream + { + final AtomicInteger closed = new AtomicInteger(); + + CountingInputStream(InputStream delegate) + { + super(delegate); + } + + @Override + public void close() throws IOException + { + closed.incrementAndGet(); + super.close(); + } + } + + private static final class RecordingDumpWriter + implements DumpWriter + { + final List<String> events = new ArrayList<>(); + + @Override + public void close() + { + events.add("close"); + } + + @Override + public void writeStartWiki() + { + events.add("startWiki"); + } + + @Override + public void writeEndWiki() + { + events.add("endWiki"); + } + + @Override + public void writeSiteinfo(Siteinfo info) + { + events.add("siteinfo"); + } + + @Override + public void writeStartPage(Page page) + { + events.add("startPage:" + page.Id + ":" + page.Title); + } + + @Override + public void writeEndPage() + { + events.add("endPage"); + } + + @Override + public void writeRevision(Revision revision) + { + events.add("revision:" + revision.Id); + } + } +} diff --git a/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java new file mode 100644 index 00000000..fbadb6ba --- /dev/null +++ b/dkpro-jwpl-wikimachine/src/test/java/org/dkpro/jwpl/wikimachine/util/DumpFileDiscoveryTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Technische Universität Darmstadt under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The Technische Universität Darmstadt + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dkpro.jwpl.wikimachine.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class DumpFileDiscoveryTest +{ + + private static final Set<String> EXTENSIONS = Set.of("bz2", "gz", "7z"); + + // hasPageRange ------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles1.xml-p1p297012.bz2", + "enwiki-20250601-pages-meta-history2.xml-p297013p1262093.bz2", + "/tmp/enwiki-20250601-pages-articles1.xml-p1p812.bz2", + "foo-pages-articles27.xml-p0p999999.7z" + }) + void hasPageRangeTrue(String name) + { + assertTrue(DumpFileDiscovery.hasPageRange(name)); + } + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles.xml.bz2", + "pagelinks.sql.gz", + "random.txt", + "pages-articles1.xml-p1.bz2" + }) 
+ void hasPageRangeFalse(String name) + { + assertFalse(DumpFileDiscovery.hasPageRange(name)); + } + + @Test + void hasPageRangeHandlesNull() + { + assertFalse(DumpFileDiscovery.hasPageRange((String) null)); + assertFalse(DumpFileDiscovery.hasPageRange((File) null)); + } + + // matchesRole -------------------------------------------------------------- + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles.xml.bz2", + "enwiki-20250601-pages-articles.xml.gz", + "dewiki-20260101-pages-articles1.xml-p1p297012.bz2", + "dewiki-20260101-pages-articles3.xml-p2762094p3376257.bz2" + }) + void matchesRolePagesArticles(String name) + { + assertTrue(DumpFileDiscovery.matchesRole(name, "pages-articles", EXTENSIONS)); + } + + @ParameterizedTest + @ValueSource(strings = { + "dewiki-20260101-pages-articles-multistream.xml.bz2", + "dewiki-20260101-pages-articles-multistream-index.txt.bz2", + "dewiki-20260101-pages-meta-current.xml.bz2", + "pagelinks.sql.gz", + "dewiki-20260101-pages-articles.xml.rar" + }) + void doesNotMatchRolePagesArticles(String name) + { + assertFalse(DumpFileDiscovery.matchesRole(name, "pages-articles", EXTENSIONS)); + } + + @Test + void matchesRoleMetaCurrent() + { + assertTrue(DumpFileDiscovery.matchesRole( + "enwiki-20250601-pages-meta-current.xml.bz2", "pages-meta-current", EXTENSIONS)); + assertTrue(DumpFileDiscovery.matchesRole( + "enwiki-20250601-pages-meta-current1.xml-p1p100.bz2", + "pages-meta-current", EXTENSIONS)); + } + + @Test + void matchesRoleHandlesNullsAndEmptyExtensions() + { + assertFalse(DumpFileDiscovery.matchesRole(null, "pages-articles", EXTENSIONS)); + assertFalse(DumpFileDiscovery.matchesRole("x.bz2", null, EXTENSIONS)); + assertFalse(DumpFileDiscovery.matchesRole("x.bz2", "pages-articles", null)); + assertFalse(DumpFileDiscovery.matchesRole( + "x.bz2", "pages-articles", Collections.emptySet())); + } + + // orderByPageRange --------------------------------------------------------- + + @Test + void 
orderByPageRangeSortsByStart() + { + final File a = new File("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2"); + final File b = new File("dewiki-20260101-pages-articles1.xml-p1p297012.bz2"); + final File c = new File("dewiki-20260101-pages-articles3.xml-p2762094p3376257.bz2"); + final File d = new File("dewiki-20260101-pages-articles3.xml-p1262094p2762093.bz2"); + + final List<File> ordered = DumpFileDiscovery.orderByPageRange(Arrays.asList(a, b, c, d)); + + assertEquals(Arrays.asList(b, a, d, c), ordered); + } + + @Test + void orderByPageRangePutsUnrangedFirstStable() + { + final File single = new File("dewiki-20260101-pages-articles.xml.bz2"); + final File p2 = new File("dewiki-20260101-pages-articles2.xml-p297013p1262093.bz2"); + final File p1 = new File("dewiki-20260101-pages-articles1.xml-p1p297012.bz2"); + + final List<File> ordered = DumpFileDiscovery.orderByPageRange(Arrays.asList(p2, single, p1)); + + assertEquals(Arrays.asList(single, p1, p2), ordered); + } + + @Test + void orderByPageRangeReturnsEmptyForEmptyInput() + { + assertTrue(DumpFileDiscovery.orderByPageRange(Collections.emptyList()).isEmpty()); + } + + @Test + void orderByPageRangeRejectsNullInput() + { + assertThrows(IllegalArgumentException.class, + () -> DumpFileDiscovery.orderByPageRange(null)); + } + + @Test + void orderByPageRangeRejectsNullElement() + { + assertThrows(IllegalArgumentException.class, + () -> DumpFileDiscovery.orderByPageRange(Arrays.asList( + new File("pages-articles1.xml-p1p10.bz2"), null))); + } +}