Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hugegraph.backend.id.SnowflakeIdGenerator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.AbstractSerializer;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.serializer.SerializerFactory;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendProviderFactory;
Expand Down Expand Up @@ -233,6 +234,8 @@ public StandardHugeGraph(HugeConfig config) {

LockUtil.init(this.spaceGraphName());

BytesBuffer.setMaxBufferCapacity(
Comment thread
LegendPei marked this conversation as resolved.
Outdated
config.get(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY));
MemoryManager.setMemoryMode(
MemoryManager.MemoryMode.fromValue(config.get(CoreOptions.MEMORY_MODE)));
MemoryManager.setMaxMemoryCapacityInBytes(config.get(CoreOptions.MAX_MEMORY_CAPACITY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ public final class BytesBuffer extends OutputStream {

public static final int DEFAULT_CAPACITY = 64;
public static final int MAX_BUFFER_CAPACITY = 128 * 1024 * 1024; // 128M
public static final int MAX_BUFFER_CAPACITY_UPPER_BOUND = (int) Bytes.GB;

public static final int BUF_EDGE_ID = 128;
public static final int BUF_PROPERTY = 64;

public static final byte[] BYTES_EMPTY = new byte[0];

private static volatile int maxBufferCapacity = MAX_BUFFER_CAPACITY;

private ByteBuffer buffer;
private final boolean resize;

Expand All @@ -91,10 +94,11 @@ public BytesBuffer() {
}

public BytesBuffer(int capacity) {
if (capacity > MAX_BUFFER_CAPACITY) {
int maxCapacity = maxBufferCapacity();
if (capacity > maxCapacity) {
E.checkArgument(false,
"Capacity %s exceeds max buffer capacity: %s",
capacity, MAX_BUFFER_CAPACITY);
capacity, maxCapacity);
}
this.buffer = ByteBuffer.allocate(capacity);
this.resize = true;
Expand Down Expand Up @@ -122,6 +126,20 @@ public static BytesBuffer wrap(byte[] array, int offset, int length) {
return new BytesBuffer(ByteBuffer.wrap(array, offset, length));
}

public static int maxBufferCapacity() {
return maxBufferCapacity;
}

public static void setMaxBufferCapacity(int capacity) {
E.checkArgument(capacity >= DEFAULT_CAPACITY &&
capacity <= MAX_BUFFER_CAPACITY_UPPER_BOUND,
"Max buffer capacity must be in range [%s, %s], " +
"but got %s",
DEFAULT_CAPACITY, MAX_BUFFER_CAPACITY_UPPER_BOUND,
capacity);
maxBufferCapacity = capacity;
}

public ByteBuffer asByteBuffer() {
return this.buffer;
}
Expand Down Expand Up @@ -174,13 +192,15 @@ private void require(int size) {
}

// Extra capacity as buffer
int newCapacity = size + this.buffer.limit() + DEFAULT_CAPACITY;
if (newCapacity > MAX_BUFFER_CAPACITY) {
long newCapacity = (long) size + this.buffer.limit() +
Comment thread
LegendPei marked this conversation as resolved.
Outdated
DEFAULT_CAPACITY;
int maxCapacity = maxBufferCapacity();
if (newCapacity > maxCapacity) {
E.checkArgument(false,
"Capacity %s exceeds max buffer capacity: %s",
newCapacity, MAX_BUFFER_CAPACITY);
newCapacity, maxCapacity);
}
ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);
ByteBuffer newBuffer = ByteBuffer.allocate((int) newCapacity);
this.buffer.flip();
newBuffer.put(this.buffer);
this.buffer = newBuffer;
Expand Down Expand Up @@ -318,7 +338,6 @@ public byte[] readBytes() {

public BytesBuffer writeBigBytes(byte[] bytes) {
if (bytes.length > BLOB_LEN_MAX) {
// TODO: note the max blob size should be 128MB (due to MAX_BUFFER_CAPACITY)
E.checkArgument(false,
"The max length of bytes is %s, but got %s",
BLOB_LEN_MAX, bytes.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.config;

import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.util.Bytes;
Expand Down Expand Up @@ -79,6 +80,14 @@ public class CoreOptions extends OptionHolder {
disallowEmpty(),
"text"
);
public static final ConfigOption<Integer> SERIALIZER_BUFFER_MAX_CAPACITY =
new ConfigOption<>(
"serializer.buffer_max_capacity",
"The max capacity of one serialization buffer in bytes.",
rangeInt(BytesBuffer.DEFAULT_CAPACITY,
BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND),
BytesBuffer.MAX_BUFFER_CAPACITY
);
public static final ConfigOption<Boolean> RAFT_MODE =
new ConfigOption<>(
"raft.mode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static BytesBuffer decompress(byte[] bytes, int blockSize, float bufferRa
LZ4FastDecompressor decompressor = factory.fastDecompressor();
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
int initBufferSize = Math.min(Math.round(bytes.length * ratio),
BytesBuffer.MAX_BUFFER_CAPACITY);
BytesBuffer.maxBufferCapacity());
BytesBuffer buf = new BytesBuffer(initBufferSize);
LZ4BlockInputStream lzInput = new LZ4BlockInputStream(bais, decompressor);
int count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ edge.cache_type=l2
# version before 1.7.0 of apache hugegraph for your application.
backend=hstore
serializer=binary
# The max capacity of one serialization buffer in bytes
#serializer.buffer_max_capacity=134217728

store=hugegraph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ edge.cache_type=l2
# if you want to use Cassandra/MySql/PG... as backend, please use version < 1.7.0
backend=rocksdb
serializer=binary
# The max capacity of one serialization buffer in bytes
#serializer.buffer_max_capacity=134217728

store=hugegraph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,35 @@
import java.util.TimeZone;
import java.util.UUID;

import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.id.IdGenerator.UuidId;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.type.define.Cardinality;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.unit.BaseUnitTest;
import org.apache.hugegraph.unit.FakeObjects;
import org.apache.hugegraph.util.Blob;
import org.apache.hugegraph.util.LZ4Util;
import org.junit.After;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public class BytesBufferTest extends BaseUnitTest {

@After
public void tearDown() {
BytesBuffer.setMaxBufferCapacity(BytesBuffer.MAX_BUFFER_CAPACITY);
}

@Test
public void testAllocate() {
Assert.assertEquals(0, BytesBuffer.allocate(0).array().length);
Expand All @@ -69,6 +80,87 @@ public void testAllocate() {
buf0.bytes());
}

@Test
public void testAllocateWithMaxBufferCapacity() {
Assert.assertEquals(BytesBuffer.MAX_BUFFER_CAPACITY,
BytesBuffer.maxBufferCapacity());

BytesBuffer.setMaxBufferCapacity(128);
Assert.assertEquals(128, BytesBuffer.maxBufferCapacity());
Assert.assertEquals(128, BytesBuffer.allocate(128).array().length);

Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.allocate(129);
}, e -> {
Assert.assertContains("Capacity 129 exceeds max buffer " +
"capacity: 128",
e.getMessage());
});
}

@Test
public void testResizeWithMaxBufferCapacity() {
BytesBuffer.setMaxBufferCapacity(128);
BytesBuffer buffer = BytesBuffer.allocate(64);
buffer.write(new byte[64]);

Assert.assertThrows(IllegalArgumentException.class, () -> {
buffer.write((byte) 1);
}, e -> {
Assert.assertContains("Capacity 129 exceeds max buffer " +
"capacity: 128",
e.getMessage());
});
}

@Test
public void testSetMaxBufferCapacityWithInvalidValue() {
Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.setMaxBufferCapacity(BytesBuffer.DEFAULT_CAPACITY - 1);
}, e -> {
Assert.assertContains("Max buffer capacity must be in range",
e.getMessage());
});

Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.setMaxBufferCapacity(
BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND + 1);
}, e -> {
Assert.assertContains("Max buffer capacity must be in range",
e.getMessage());
});
}

@Test
public void testLZ4DecompressWithMaxBufferCapacity() {
byte[] bytes = genBytes(256);
BytesBuffer compressed = LZ4Util.compress(bytes, 64);

BytesBuffer.setMaxBufferCapacity(128);

Assert.assertThrows(IllegalArgumentException.class, () -> {
LZ4Util.decompress(compressed.bytes(), 64);
}, e -> {
Assert.assertContains("exceeds max buffer capacity: 128",
e.getMessage());
});
}

@Test
public void testMaxBufferCapacityFromGraphConfig() throws Exception {
HugeConfig config = FakeObjects.newConfig();
config.setProperty(CoreOptions.STORE.name(), "buffer_capacity");
config.setProperty(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(),
256);

HugeGraph graph = HugeFactory.open(config);
try {
Assert.assertEquals(256, BytesBuffer.maxBufferCapacity());
} finally {
graph.close();
}
}

@Test
public void testWrap() {
BytesBuffer buf4 = BytesBuffer.wrap(new byte[]{1, 2, 3, 4});
Expand Down
Loading