Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write latest finalized state in chunks #9026

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.ssz.sos;

import java.util.Arrays;
import java.util.List;

public class SszByteArrayChunksWriter implements SszWriter {
private final byte[][] chunks;
private final int maxChunkSize;
private final int maxChunks;

private int lastChunkIndex;
private int size = 0;

public SszByteArrayChunksWriter(final int maxSize, final int maxChunkSize) {
this.maxChunks = (maxSize + maxChunkSize - 1) / maxChunkSize;
this.chunks = new byte[maxChunks][];
this.maxChunkSize = maxChunkSize;
}

@Override
public void write(final byte[] bytes, final int offset, final int length) {
final int chunk = this.size / maxChunkSize;
final int chunkOffset = this.size % maxChunkSize;
final byte[] chunkData = getChunk(chunk);

if (chunkOffset + length > maxChunkSize) {
final int lengthToFillCurrentChunk = maxChunkSize - chunkOffset;
System.arraycopy(bytes, offset, chunkData, chunkOffset, lengthToFillCurrentChunk);
this.size += lengthToFillCurrentChunk;

write(bytes, offset + lengthToFillCurrentChunk, length - lengthToFillCurrentChunk);
} else {
System.arraycopy(bytes, offset, chunkData, chunkOffset, length);
this.size += length;
}
}

private byte[] getChunk(final int chunk) {
if (chunk >= maxChunks) {
throw new IndexOutOfBoundsException(
"Chunk index out of bounds: " + chunk + ", max chunks: " + maxChunks);
}
byte[] chunkData = chunks[chunk];
if (chunkData == null) {
chunkData = new byte[maxChunkSize];
chunks[chunk] = chunkData;
lastChunkIndex = chunk;
}
return chunkData;
}

public List<byte[]> getChunks() {
int lastChunkSize = size % maxChunkSize;
if (lastChunkSize != 0) {
byte[] lastChunk = chunks[lastChunkIndex];
byte[] lastChunkTrimmed = Arrays.copyOf(lastChunk, lastChunkSize);
chunks[lastChunkIndex] = lastChunkTrimmed;
}

return Arrays.stream(chunks).limit(lastChunkIndex + 1).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@
package tech.pegasys.teku.storage.server.kvstore;

import com.google.errorprone.annotations.MustBeClosed;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;

public interface KvStoreAccessor extends AutoCloseable {

<T> Optional<T> get(KvStoreVariable<T> variable);
<T> Optional<T> get(KvStoreUnchunkedVariable<T> variable);

Optional<Bytes> getRaw(KvStoreVariable<?> variable);
<T> Optional<T> get(KvStoreChunkedVariable<T> variable);

Optional<Bytes> getRaw(KvStoreUnchunkedVariable<?> variable);

Optional<List<Bytes>> getRaw(KvStoreChunkedVariable<?> variable);

<K, V> Optional<V> get(KvStoreColumn<K, V> column, K key);

Expand Down Expand Up @@ -112,14 +118,18 @@ <K extends Comparable<K>, V> Stream<ColumnEntry<K, V>> stream(

interface KvStoreTransaction extends AutoCloseable {

<T> void put(KvStoreVariable<T> variable, T value);
<T> void put(KvStoreUnchunkedVariable<T> variable, T value);

<T> void put(KvStoreChunkedVariable<T> variable, T value);

/**
* Write raw bytes to a specified variable.
*
* <p>WARNING: should only be used to migrate data between database instances
*/
<T> void putRaw(KvStoreVariable<T> variable, Bytes value);
<T> void putRaw(KvStoreUnchunkedVariable<T> variable, Bytes value);

<T> void putRaw(KvStoreChunkedVariable<T> variable, List<Bytes> valueChunks);

<K, V> void put(KvStoreColumn<K, V> column, K key, V value);

Expand All @@ -134,7 +144,7 @@ interface KvStoreTransaction extends AutoCloseable {

<K, V> void delete(KvStoreColumn<K, V> column, K key);

<T> void delete(KvStoreVariable<T> variable);
<T> void delete(KvStoreUnchunkedVariable<T> variable);

void commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor;
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedStateStorageLogic.FinalizedStateUpdater;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.SchemaCombined;

Expand Down Expand Up @@ -205,8 +207,7 @@ public void ingest(
try (final KvStoreTransaction transaction = db.startTransaction()) {
for (String key : newVariables.keySet()) {
logger.accept(String.format("Copy variable %s", key));
dao.getRawVariable(oldVariables.get(key))
.ifPresent(value -> transaction.putRaw(newVariables.get(key), value));
handleVariableMigration(dao, newVariables.get(key), transaction);
}
transaction.commit();
}
Expand All @@ -228,6 +229,28 @@ public void ingest(
}
}

private void handleVariableMigration(
final V4MigratableSourceDao dao,
final KvStoreVariable<?> variable,
final KvStoreTransaction transaction) {
if (variable.toChunkedVariable().isPresent()) {
final KvStoreChunkedVariable<?> chunkedVariable = variable.toChunkedVariable().orElseThrow();
dao.getRawVariable(chunkedVariable)
.ifPresent(chunks -> transaction.putRaw(chunkedVariable, chunks));
return;
}

if (variable.toUnchunkedVariable().isPresent()) {
final KvStoreUnchunkedVariable<?> unchunckedVariable =
variable.toUnchunkedVariable().orElseThrow();
dao.getRawVariable(unchunckedVariable)
.ifPresent(value -> transaction.putRaw(unchunckedVariable, value));
return;
}

throw new IllegalStateException("Variable must be chunked or unchunked");
}

@Override
public Map<String, KvStoreColumn<?, ?>> getColumnMap() {
return schema.getColumnMap();
Expand All @@ -239,7 +262,12 @@ public Map<String, KvStoreVariable<?>> getVariableMap() {
}

@Override
public <T> Optional<Bytes> getRawVariable(final KvStoreVariable<T> var) {
public <T> Optional<Bytes> getRawVariable(final KvStoreUnchunkedVariable<T> var) {
return db.getRaw(var);
}

@Override
public <T> Optional<List<Bytes>> getRawVariable(final KvStoreChunkedVariable<T> var) {
return db.getRaw(var);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import tech.pegasys.teku.storage.server.kvstore.ColumnEntry;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4FinalizedKvStoreDao.V4FinalizedUpdater;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.V4HotKvStoreDao.V4HotUpdater;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;

public class KvStoreCombinedDaoAdapter implements KvStoreCombinedDao, V4MigratableSourceDao {
Expand Down Expand Up @@ -367,7 +369,16 @@ public Map<String, KvStoreVariable<?>> getVariableMap() {
}

@Override
public <T> Optional<Bytes> getRawVariable(final KvStoreVariable<T> var) {
public <T> Optional<Bytes> getRawVariable(final KvStoreUnchunkedVariable<T> var) {
if (hotDao.getVariableMap().containsValue(var)) {
return hotDao.getRawVariable(var);
} else {
return finalizedDao.getRawVariable(var);
}
}

@Override
public <T> Optional<List<Bytes>> getRawVariable(final KvStoreChunkedVariable<T> var) {
if (hotDao.getVariableMap().containsValue(var)) {
return hotDao.getRawVariable(var);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor;
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.FinalizedUpdater;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.SchemaFinalizedSnapshotStateAdapter;

Expand Down Expand Up @@ -195,7 +197,11 @@ public Optional<UInt64> getEarliestBlobSidecarSlot() {
return db.get(schema.getVariableEarliestBlobSidecarSlot());
}

public <T> Optional<Bytes> getRawVariable(final KvStoreVariable<T> var) {
public <T> Optional<Bytes> getRawVariable(final KvStoreUnchunkedVariable<T> var) {
return db.getRaw(var);
}

public <T> Optional<List<Bytes>> getRawVariable(final KvStoreChunkedVariable<T> var) {
return db.getRaw(var);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor;
import tech.pegasys.teku.storage.server.kvstore.KvStoreAccessor.KvStoreTransaction;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao.HotUpdater;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.SchemaHotAdapter;

Expand Down Expand Up @@ -142,7 +144,11 @@ public V4HotUpdater hotUpdater() {
return new V4HotUpdater(db, schema);
}

public <T> Optional<Bytes> getRawVariable(final KvStoreVariable<T> var) {
public <T> Optional<Bytes> getRawVariable(final KvStoreUnchunkedVariable<T> var) {
return db.getRaw(var);
}

public <T> Optional<List<Bytes>> getRawVariable(final KvStoreChunkedVariable<T> var) {
return db.getRaw(var);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,25 @@
package tech.pegasys.teku.storage.server.kvstore.dataaccess;

import com.google.errorprone.annotations.MustBeClosed;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import tech.pegasys.teku.storage.server.kvstore.ColumnEntry;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreChunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreColumn;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreUnchunkedVariable;
import tech.pegasys.teku.storage.server.kvstore.schema.KvStoreVariable;

public interface V4MigratableSourceDao {
Map<String, KvStoreColumn<?, ?>> getColumnMap();

Map<String, KvStoreVariable<?>> getVariableMap();

<T> Optional<Bytes> getRawVariable(final KvStoreVariable<T> var);
<T> Optional<Bytes> getRawVariable(final KvStoreUnchunkedVariable<T> var);

<T> Optional<List<Bytes>> getRawVariable(final KvStoreChunkedVariable<T> var);

@MustBeClosed
<K, V> Stream<ColumnEntry<Bytes, Bytes>> streamRawColumn(final KvStoreColumn<K, V> kvStoreColumn);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server.kvstore.schema;

import static tech.pegasys.teku.infrastructure.unsigned.ByteUtil.toByteExact;

import java.util.Objects;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes;
import tech.pegasys.teku.storage.server.kvstore.serialization.KvStoreChunkingSerializer;

public class KvStoreChunkedVariable<TValue> implements KvStoreVariable<TValue> {
private final Bytes id;
private final KvStoreChunkingSerializer<TValue> serializer;

private KvStoreChunkedVariable(
final byte[] id, final KvStoreChunkingSerializer<TValue> serializer) {
this.id = Bytes.wrap(id);
this.serializer = serializer;
}

public static <T> KvStoreChunkedVariable<T> create(
final int id, final KvStoreChunkingSerializer<T> serializer) {
final byte byteId = toByteExact(id);
return new KvStoreChunkedVariable<T>(new byte[] {byteId}, serializer);
}

public Bytes getId() {
return id;
}

public KvStoreChunkingSerializer<TValue> getSerializer() {
return serializer;
}

@Override
public Optional<KvStoreChunkedVariable<TValue>> toChunkedVariable() {
return Optional.of(this);
}

@Override
public Optional<KvStoreUnchunkedVariable<TValue>> toUnchunkedVariable() {
return Optional.empty();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KvStoreChunkedVariable<?> that = (KvStoreChunkedVariable<?>) o;
return Objects.equals(id, that.id) && Objects.equals(serializer, that.serializer);
}

@Override
public int hashCode() {
return Objects.hash(id, serializer);
}
}
Loading
Loading