Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions evcache-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
api group:"joda-time", name:"joda-time", version:"latest.release"
api group:"javax.annotation", name:"javax.annotation-api", version:"latest.release"
api group:"com.github.fzakaria", name:"ascii85", version:"latest.release"
api group:"com.github.luben", name:"zstd-jni", version:"1.5.7-11"

testImplementation group:"org.testng", name:"testng", version:"7.5"
testImplementation group:"com.beust", name:"jcommander", version:"1.72"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public class EVCacheImpl implements EVCache, EVCacheImplMBean {
this.maxHashLength = propertyRepository.get(appName + ".max.hash.length", Integer.class).orElse(-1);
this.encoderBase = propertyRepository.get(appName + ".hash.encoder", String.class).orElse("base64");
this.autoHashKeys = propertyRepository.get(_appName + ".auto.hash.keys", Boolean.class).orElseGet("evcache.auto.hash.keys").orElse(false);
this.evcacheValueTranscoder = new EVCacheTranscoder();
this.evcacheValueTranscoder = new EVCacheTranscoder(_appName, propertyRepository);
evcacheValueTranscoder.setCompressionThreshold(Integer.MAX_VALUE);

// default max key length is 200, instead of using what is defined in MemcachedClientIF.MAX_KEY_LENGTH (250). This is to accommodate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,36 @@

package com.netflix.evcache;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.netflix.archaius.api.Property;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.util.StringUtils;

import java.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Transcoder that serializes and compresses objects.
*/
public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder implements
Transcoder<Object> {
private static final Logger logger = LoggerFactory.getLogger(EVCacheSerializingTranscoder.class);

// General flags
static final int SERIALIZED = 1;
Expand All @@ -63,10 +68,16 @@ public class EVCacheSerializingTranscoder extends BaseSerializingTranscoder impl
static final int SPECIAL_DOUBLE = (7 << 8);
static final int SPECIAL_BYTEARRAY = (8 << 8);

static final String COMPRESSION = "COMPRESSION_METRIC";
public enum CompressionAlgorithm { GZIP, ZSTD }

public static final int DEFAULT_ZSTD_COMPRESSION_LEVEL = 3;

private static final int ZSTD_MAGIC = 0xFD2FB528;

private final TranscoderUtils tu = new TranscoderUtils(true);
private Timer timer;
private Property<String> compressionAlgorithmProperty;
private Property<Integer> zstdLevelProperty;
Comment thread
janewang1680 marked this conversation as resolved.
protected final String appName;

/**
* Get a serializing transcoder with the default max data size.
Expand All @@ -79,7 +90,23 @@ public EVCacheSerializingTranscoder() {
* Get a serializing transcoder that specifies the max data size.
*/
public EVCacheSerializingTranscoder(int max) {
this(null, max);
}

/**
* Get a serializing transcoder that specifies the owning app name and the max data size.
*/
public EVCacheSerializingTranscoder(String appName, int max) {
super(max);
this.appName = appName;
}

public void setCompressionAlgorithmProperty(Property<String> algorithmProperty) {
this.compressionAlgorithmProperty = algorithmProperty;
}

public void setCompressionLevelProperty(Property<Integer> levelProperty) {
this.zstdLevelProperty = levelProperty;
}

@Override
Expand Down Expand Up @@ -179,31 +206,106 @@ public CachedData encode(Object o) {
}
assert b != null;
if (b.length > compressionThreshold) {
int originalLength = b.length;
byte[] compressed = compress(b);
if (compressed.length < b.length) {
if (compressed.length < originalLength) {
getLogger().trace("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().debug("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
o.getClass().getName(), originalLength, compressed.length);
}

long compression_ratio = Math.round((double) compressed.length / b.length * 100);
updateTimerWithCompressionRatio(compression_ratio);
}
return new CachedData(flags, b, getMaxSize());
}

private void updateTimerWithCompressionRatio(long ratio_percentage) {
if(timer == null) {
final List<Tag> tagList = new ArrayList<Tag>(1);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, "gzip"));
timer = EVCacheMetricsFactory.getInstance().getPercentileTimer(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList, Duration.ofMillis(100));
};
@Override
protected byte[] compress(byte[] in) {
if (in == null) throw new NullPointerException("Can't compress null");

CompressionAlgorithm compressionAlgorithm = compressionAlgorithmProperty == null ? CompressionAlgorithm.GZIP
: CompressionAlgorithm.valueOf(compressionAlgorithmProperty.orElse(CompressionAlgorithm.GZIP.name()).get().toUpperCase());

byte[] compressed;
switch (compressionAlgorithm) {
case ZSTD:
int zstdLevel = zstdLevelProperty == null ? DEFAULT_ZSTD_COMPRESSION_LEVEL
: zstdLevelProperty.orElse(DEFAULT_ZSTD_COMPRESSION_LEVEL).get();
logger.debug("algorithm: {}, level: {}, appName: {}", compressionAlgorithm, zstdLevel, appName);
compressed = Zstd.compress(in, zstdLevel);
break;
case GZIP:
logger.debug("algorithm: {}, appName: {}", compressionAlgorithm, appName);
compressed = super.compress(in);
break;
default:
throw new IllegalArgumentException("Unsupported compression algorithm: " + compressionAlgorithm);
}

if (compressed != null) {
long ratioPerCent = Math.round((double) compressed.length / in.length * 100.0);
recordCompressionRatio(ratioPerCent, compressionAlgorithm);
}

return compressed;
}

@Override
protected byte[] decompress(byte[] in) {
if (in == null || in.length == 0) return in;
if (isZstdCompressed(in)) return decompressZstd(in);
return super.decompress(in);
}

private boolean isZstdCompressed(byte[] data) {
if (data == null || data.length < 4) return false;
int magic = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
return magic == ZSTD_MAGIC;
}

private byte[] decompressZstd(byte[] in) {
long originalSize = Zstd.getFrameContentSize(in);
if (originalSize > Integer.MAX_VALUE) {
throw new RuntimeException("Zstd decompressed size exceeds int range: " + originalSize);
}
if (originalSize > 0) {
// Fast path: frame carries a content-size header (compress() above always does).
return Zstd.decompress(in, (int) originalSize);
}
// Slow path: declared size is 0, unknown (-1), or invalid (-2) — stream-decode and let
// ZstdInputStream surface any frame errors.
ZstdInputStream zis = null;
try {
zis = new ZstdInputStream(new ByteArrayInputStream(in));
return readAll(zis);
} catch (IOException e) {
getLogger().error("Error reading Zstd input stream", e);
return null;
} finally {
try { if (zis != null) zis.close(); } catch (IOException ignored) {}
}
}

timer.record(ratio_percentage, TimeUnit.MILLISECONDS);
private static byte[] readAll(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[8192];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
}
return out.toByteArray();
}

private void recordCompressionRatio(long ratioPerCent, CompressionAlgorithm compressionAlgorithm) {
final List<Tag> tagList = new ArrayList<Tag>(2);
tagList.add(new BasicTag(EVCacheMetricsFactory.COMPRESSION_TYPE, compressionAlgorithm.name().toLowerCase()));
if (appName != null && !appName.isEmpty()) {
tagList.add(new BasicTag(EVCacheMetricsFactory.CACHE, appName));
}
EVCacheMetricsFactory.getInstance()
.getDistributionSummary(EVCacheMetricsFactory.COMPRESSION_RATIO, tagList)
.record(ratioPerCent);
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,69 @@
package com.netflix.evcache;

import com.netflix.archaius.api.Property;
import com.netflix.archaius.api.PropertyRepository;
import com.netflix.evcache.util.EVCacheConfig;

import net.spy.memcached.CachedData;

public class EVCacheTranscoder extends EVCacheSerializingTranscoder {

public EVCacheTranscoder() {
this(EVCacheConfig.getInstance().getPropertyRepository().get("default.evcache.max.data.size", Integer.class).orElse(20 * 1024 * 1024).get());
this((String) null);
}

public EVCacheTranscoder(String appName) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository());
}

public EVCacheTranscoder(int max) {
this(max, EVCacheConfig.getInstance().getPropertyRepository().get("default.evcache.compression.threshold", Integer.class).orElse(120).get());
this(null, max);
}

public EVCacheTranscoder(String appName, int max) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository(), max);
}

public EVCacheTranscoder(int max, int compressionThreshold) {
super(max);
setCompressionThreshold(compressionThreshold);
this(null, max, compressionThreshold);
}

@Override
public boolean asyncDecode(CachedData d) {
return super.asyncDecode(d);
public EVCacheTranscoder(String appName, int max, int compressionThreshold) {
this(appName, EVCacheConfig.getInstance().getPropertyRepository(), max, compressionThreshold);
}

@Override
public Object decode(CachedData d) {
return super.decode(d);
/**
* Repository-aware constructors. The compression algorithm/level are read dynamically from the
* supplied {@link PropertyRepository}, so callers must pass the repository that is wired to Fast
* Properties (e.g. {@code poolManager.getEVCacheConfig().getPropertyRepository()}) for FP overrides
* to take effect. The no-repository constructors above fall back to {@link EVCacheConfig#getInstance()}.
*/
public EVCacheTranscoder(String appName, PropertyRepository config) {
this(appName, config, config.get("default.evcache.max.data.size", Integer.class).orElse(20 * 1024 * 1024).get());
}

public EVCacheTranscoder(String appName, PropertyRepository config, int max) {
this(appName, config, max, config.get("default.evcache.compression.threshold", Integer.class).orElse(120).get());
}

public EVCacheTranscoder(String appName, PropertyRepository config, int max, int compressionThreshold) {
super(appName, max);
setCompressionThreshold(compressionThreshold);
Property<String> algoProperty = getProperty(config, "evcacheclient.compression.algo", String.class);
setCompressionAlgorithmProperty(algoProperty);
Property<Integer> zstdLevelProperty = getProperty(config, "evcacheclient.compression.zstd.level", Integer.class);
setCompressionLevelProperty(zstdLevelProperty);
}

/**
* Resolves a property preferring the appName-prefixed key (e.g. {@code EVCACHE_VH_ARCHIVE.default.evcache.compression.algo})
* and falling back to the global {@code default.evcache.*} key when no app-specific override exists.
*/
private <T> Property<T> getProperty(PropertyRepository config, String key, Class<T> type) {
if (appName == null || appName.isEmpty()) {
return config.get(key, type);
}
return config.get(appName + "." + key, type).orElseGet(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public BlockingQueue<Operation> createWriteOperationQueue() {
}

public Transcoder<Object> getDefaultTranscoder() {
return new EVCacheTranscoder();
return new EVCacheTranscoder(appName,
client.getPool().getEVCacheClientPoolManager().getEVCacheConfig().getPropertyRepository());
}

public FailureMode getFailureMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public BlockingQueue<Operation> createWriteOperationQueue() {
}

public Transcoder<Object> getDefaultTranscoder() {
return new EVCacheTranscoder();
return new EVCacheTranscoder(appName,
client.getPool().getEVCacheClientPoolManager().getEVCacheConfig().getPropertyRepository());
}

public FailureMode getFailureMode() {
Expand Down
Loading