Storing large items with DDB and S3
When working on applications, you want to focus on your business logic rather than spending your time on the mechanics of how to persist your data.
If you are working in AWS, there is a good chance you are already using DynamoDB as your database, as it gives you great performance and ease of use.
But then you may start hitting some limits. Each DDB item can store up to 400KB of data; if you try to store more than that, the request will fail with an exception. Not cool.
The DynamoDBMapper class has a feature called “S3Link”, which allows you to store certain attributes of an item in S3 and persist the “link” to them as part of the actual DDB item, which saves you space. The downside is that you need to manage it yourself: you are responsible for uploading/downloading the actual S3 objects that you store behind the link. More about this approach can be found here: https://aws.amazon.com/blogs/developer/using-s3link-with-amazon-dynamodb/
I would like to propose another way.
Let’s take a simple example of a DDB table named “Items” which has an itemId as a hash key, and itemName as a non-key attribute.
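A minimal sketch of such a class, using the standard DynamoDBMapper annotations, could be:

@DynamoDBTable(tableName = "Items")
public class ItemToStore {

    private String itemId;
    private String itemName;

    @DynamoDBHashKey(attributeName = "itemId")
    public String getItemId() { return itemId; }
    public void setItemId(final String itemId) { this.itemId = itemId; }

    @DynamoDBAttribute(attributeName = "itemName")
    public String getItemName() { return itemName; }
    public void setItemName(final String itemName) { this.itemName = itemName; }
}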
A simple usage with a DynamoDBMapper will look like this:
public class StorageLayer {

    private final AmazonDynamoDB ddbClient;
    private final DynamoDBMapper ddbMapper;

    public StorageLayer(final AmazonDynamoDB ddbClient) {
        this.ddbClient = ddbClient;
        this.ddbMapper = new DynamoDBMapper(ddbClient);
    }

    public void put(final ItemToStore item) {
        ddbMapper.save(item);
    }

    public Optional<ItemToStore> get(final String itemId) {
        return Optional.ofNullable(ddbMapper.load(ItemToStore.class, itemId));
    }
}
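For completeness, here is a quick usage sketch; the client construction and the item values are placeholders rather than part of the original example:

// Hypothetical usage of the StorageLayer above
final AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient();
final StorageLayer storageLayer = new StorageLayer(ddbClient);

final ItemToStore item = new ItemToStore();
item.setItemId("item-1");
item.setItemName("my first item");
storageLayer.put(item);

final Optional<ItemToStore> loaded = storageLayer.get("item-1");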
Now let’s make things interesting by adding another attribute to the item, which might be large (> 400KB).
I will introduce a new annotation that can be placed on class fields to mark which ones are candidates for offloading, and to specify the key prefix for the S3 object (in case offloading occurs) and the size threshold. You probably don’t want to unconditionally offload every field marked with this annotation, as S3 latencies are much higher than DDB latencies; hence the threshold.
Then, we will introduce a new handler class, with a method intended to be called before we save an item to DDB, and another method intended to be called after we load an item from DDB. The handler will go over the fields, identify the annotation we introduced, and will handle the put/get to/from S3.
Note: There is an assumption that every field has a getter and a setter following a specific naming convention (for example, the field itemName will have getItemName() and setItemName() methods). There is a verification in place, and not much exception handling, as this is just an example.
The whole code will look like this:
The LargeItem annotation:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface LargeItem {

    long offloadIfLargerThanBytes() default 128 * 1024L; // 128K

    String keyPrefix() default "";
}
The LargeItemHandler class:
public class LargeItemsHandler<T> {

    private static final String S3_PATH_PREFIX = "s3://";

    private final AmazonS3 s3Client;
    private final String bucketName;

    public LargeItemsHandler(final AmazonS3 s3Client, final String bucketName) {
        this.s3Client = s3Client;
        this.bucketName = bucketName;
    }

    public void beforeSave(final T item) {
        final ReflectionHelper<T> helper = new ReflectionHelper<>(item);
        final List<Field> candidates = helper.getAndValidateAllFieldsWithAnnotation(LargeItem.class);

        candidates.forEach(field -> {
            final LargeItem annotation = field.getAnnotation(LargeItem.class);
            final String value = helper.getStringValue(field);
            // Null fields have nothing to offload
            if (value != null && value.length() > annotation.offloadIfLargerThanBytes()) {
                final String key = String.format("%s%s", annotation.keyPrefix(), UUID.randomUUID());
                s3Client.putObject(bucketName, key, value);
                final String path = String.format("%s%s/%s", S3_PATH_PREFIX, bucketName, key);
                helper.setStringValue(field, Base64.getEncoder().encodeToString(path.getBytes(StandardCharsets.UTF_8)));
            }
        });
    }

    public void afterLoad(final T item) {
        final ReflectionHelper<T> helper = new ReflectionHelper<>(item);
        final List<Field> candidates = helper.getAndValidateAllFieldsWithAnnotation(LargeItem.class);

        for (final Field field : candidates) {
            final String value = helper.getStringValue(field);
            final AmazonS3URI path = tryDecodeS3Path(value);
            if (path == null) {
                continue;
            }
            // For security reasons we will validate the bucket name matches
            if (!path.getBucket().equals(bucketName)) {
                continue;
            }
            final String data = s3Client.getObjectAsString(path.getBucket(), path.getKey());
            helper.setStringValue(field, data);
        }
    }

    private static AmazonS3URI tryDecodeS3Path(final String value) {
        try {
            final String decoded = new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8);
            return decoded.startsWith(S3_PATH_PREFIX) ? new AmazonS3URI(decoded) : null;
        } catch (final Exception ignored) {
            return null;
        }
    }
}
The ReflectionHelper class:
public class ReflectionHelper<T> {

    private final T item;

    public ReflectionHelper(final T item) {
        this.item = item;
    }

    // We cannot use reflection getFields() as it will return only the public fields, hence we should iterate over
    // the whole hierarchy and get all the fields annotated with the provided annotation type
    public List<Field> getAndValidateAllFieldsWithAnnotation(final Class<? extends Annotation> type) {
        final List<Field> fields = getAllFieldsWithAnnotation(type);

        // Validate that all the identified fields have a getter and a setter as we expect
        fields.forEach(field -> {
            final String value = getStringValue(field);
            setStringValue(field, value);
        });

        return fields;
    }

    private List<Field> getAllFieldsWithAnnotation(final Class<? extends Annotation> type) {
        final List<Field> fields = new ArrayList<>();
        Class<?> clazz = item.getClass();
        do {
            fields.addAll(
                Arrays.stream(clazz.getDeclaredFields())
                      .filter(field -> field.isAnnotationPresent(type))
                      .collect(Collectors.toList())
            );
            clazz = clazz.getSuperclass();
        } while (clazz != null);

        return Collections.unmodifiableList(fields);
    }

    // The assumption is that there is a getter
    public String getStringValue(final Field field) {
        try {
            final String methodName = buildMethodName("get", field.getName());
            final Object o = item.getClass().getMethod(methodName).invoke(item);
            return o == null ? null : (String) o;
        } catch (final Exception e) {
            final String message = String.format("Failed getting value of %s", field.getName());
            throw new RuntimeException(message, e);
        }
    }

    // The assumption is that there is a setter
    public void setStringValue(final Field field, final String value) {
        try {
            final String methodName = buildMethodName("set", field.getName());
            item.getClass().getMethod(methodName, String.class).invoke(item, value);
        } catch (final Exception e) {
            final String message = String.format("Failed setting value of %s", field.getName());
            throw new RuntimeException(message, e);
        }
    }

    private static String buildMethodName(final String prefix, final String fieldName) {
        return String.format("%s%c%s", prefix, fieldName.toUpperCase().charAt(0), fieldName.substring(1));
    }
}
The reason I am using Base64 to encode the URL within the DDB item is simply to decrease the chances that someone stores another string starting with “s3://” and we treat it as an offloaded item. This is not a perfect solution; in real-life scenarios I would introduce a special MAGIC sequence of bytes to use as a header, and probably wrap it as JSON with additional info.
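For illustration only, such an envelope and its detection could look roughly like this; the magic marker and the JSON fields are made up for the sake of the example:

// A hypothetical pointer envelope: a magic prefix followed by a small JSON payload, e.g.
//   @@LARGE_ITEM@@{"bucket":"my-bucket","key":"items/3f2c...","compressed":true}
private static final String MAGIC_HEADER = "@@LARGE_ITEM@@";

private static boolean isOffloadedPointer(final String value) {
    // An explicit magic-prefix check instead of the Base64 + "s3://" heuristic
    return value != null && value.startsWith(MAGIC_HEADER);
}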
Also, this code example doesn’t have any clean-up logic. A straightforward solution would be to add some additional logic to the place where you delete the DDB items, but to be more generic, and to also support deletion via TTL rules, I would suggest building a small ecosystem around the table’s DynamoDB stream and a Lambda function:
The lambda will listen to REMOVE events in the DDB stream, and will iterate over the attribute values to see if there is any base64 encoded S3 path. If there is, it will call S3 to delete the object. Depending on the traffic, this can be an expensive solution, as every lambda invocation costs money.
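A minimal sketch of such a Lambda, assuming the table’s stream is configured with the OLD_IMAGE view type and using the aws-lambda-java-events library (the class name is a placeholder), might be:

public class OffloadedItemsCleanupLambda implements RequestHandler<DynamodbEvent, Void> {

    private static final String S3_PATH_PREFIX = "s3://";
    private final AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

    @Override
    public Void handleRequest(final DynamodbEvent event, final Context context) {
        for (final DynamodbEvent.DynamodbStreamRecord record : event.getRecords()) {
            if (!"REMOVE".equals(record.getEventName())) {
                continue;
            }
            // The OLD_IMAGE view exposes the attribute values of the deleted item
            for (final AttributeValue attribute : record.getDynamodb().getOldImage().values()) {
                final AmazonS3URI path = tryDecodeS3Path(attribute.getS());
                if (path != null) {
                    s3Client.deleteObject(path.getBucket(), path.getKey());
                }
            }
        }
        return null;
    }

    // Same decoding logic as in LargeItemsHandler
    private static AmazonS3URI tryDecodeS3Path(final String value) {
        try {
            final String decoded = new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8);
            return decoded.startsWith(S3_PATH_PREFIX) ? new AmazonS3URI(decoded) : null;
        } catch (final Exception ignored) {
            return null;
        }
    }
}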
There is also a chance of leaking some objects due to partial failures. If you want to invest time in a more reliable solution, you can always build some smart sweeping logic, but in my personal opinion these cases are rare and the storage price is cheap, so I wouldn’t really invest time and resources in solving it.
The new ItemToStore class will look like this:
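A minimal sketch of it, with a hypothetical large field marked with the new annotation (the field name itemData and the key prefix are placeholders), could be:

@DynamoDBTable(tableName = "Items")
public class ItemToStore {

    private String itemId;
    private String itemName;

    @LargeItem(keyPrefix = "items/")
    private String itemData;

    @DynamoDBHashKey(attributeName = "itemId")
    public String getItemId() { return itemId; }
    public void setItemId(final String itemId) { this.itemId = itemId; }

    @DynamoDBAttribute(attributeName = "itemName")
    public String getItemName() { return itemName; }
    public void setItemName(final String itemName) { this.itemName = itemName; }

    @DynamoDBAttribute(attributeName = "itemData")
    public String getItemData() { return itemData; }
    public void setItemData(final String itemData) { this.itemData = itemData; }
}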
and the StorageLayer class which is using it will look like this:
public class StorageLayer {

    private final LargeItemsHandler<ItemToStore> largeItemsHandler;
    private final DynamoDBMapper ddbMapper;

    public StorageLayer(final AmazonDynamoDB ddbClient, final AmazonS3 s3Client, final String s3BucketName) {
        this.largeItemsHandler = new LargeItemsHandler<>(s3Client, s3BucketName);
        this.ddbMapper = new DynamoDBMapper(ddbClient);
    }

    public void put(final ItemToStore item) {
        largeItemsHandler.beforeSave(item);
        ddbMapper.save(item);
    }

    public Optional<ItemToStore> get(final String itemId) {
        final ItemToStore item = ddbMapper.load(ItemToStore.class, itemId);
        if (item == null) {
            return Optional.empty();
        }

        largeItemsHandler.afterLoad(item);
        return Optional.of(item);
    }
}
Note: Instead of implementing the StorageLayer class, we could technically wrap the DynamoDBMapper class and override the load/save methods. Some will prefer that approach. DynamoDBMapper has 5 different constructors and multiple implementations of the save and load methods, so I decided to just have a “side-class” instead of trying to override all of them.
Did someone mention compression???
As I mentioned earlier, S3 latencies are much higher than DDB’s, which is the main reason I’ve added the threshold to the annotation. Most of the strings we store can probably be compressed, so we can reduce the size even further and (1) save on DDB costs, and (2) reduce the number of cases where we actually offload data to S3.
For the experiment, let’s use a simple GZIP implementation that will look like this:
@UtilityClass
public class GZIPCompression {

    @SneakyThrows(IOException.class)
    public static String compress(final String str) {
        if (str == null) {
            return null;
        }

        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        final GZIPOutputStream gzip = new GZIPOutputStream(bytes);
        gzip.write(str.getBytes(StandardCharsets.UTF_8));
        gzip.flush();
        gzip.close();

        final byte[] compressed = bytes.toByteArray();
        final byte[] encoded = Base64.getEncoder().encode(compressed);
        return new String(encoded, StandardCharsets.UTF_8);
    }

    @SneakyThrows(IOException.class)
    public static String decompress(final String str) {
        if (str == null) {
            return null;
        }

        boolean compressed;
        byte[] decoded = null;
        try {
            decoded = Base64.getDecoder().decode(str);
            compressed = isCompressed(decoded);
        } catch (final Exception ignored) {
            compressed = false;
        }

        if (!compressed) {
            return str;
        }

        ByteArrayInputStream bis = new ByteArrayInputStream(decoded);
        GZIPInputStream gis = new GZIPInputStream(bis);
        byte[] decompressed = IOUtils.toByteArray(gis);
        return new String(decompressed, StandardCharsets.UTF_8);
    }

    public static boolean isCompressed(final byte[] compressed) {
        return (compressed[0] == (byte) (GZIPInputStream.GZIP_MAGIC))
            && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8));
    }
}
If I build totally random strings and test the compression ratio, it doesn’t look good at all.
public class GZipCompressionTest {

    @Test
    public void testRandomStringCompression_100K() {
        testRandomStringCompression(100);
    }

    @Test
    public void testRandomStringCompression_400K() {
        testRandomStringCompression(400);
    }

    private void testRandomStringCompression(final int KB) {
        final String data = RandomStringUtils.randomAlphanumeric(KB * 1024);
        compressDecompress(data);
    }

    private void compressDecompress(final String data) {
        final String compressed = GZIPCompression.compress(data);
        System.out.printf("Data size: %d : Compressed size: %d%n", data.length(), compressed.length());

        final String decompressed = GZIPCompression.decompress(compressed);
        assertEquals(data, decompressed);
    }
}
The results are:
Data size: 102400 : Compressed size: 102740
Data size: 409600 : Compressed size: 410760
So technically, instead of compressing the data, we’ve inflated it. But if we go and try to do it for a data set with repeating strings:
public class GZipCompressionTest {

    @Test
    public void testRepeatingString_100_100K() {
        testRepeatingStringCompression(100, 100);
    }

    @Test
    public void testRepeatingString_100_400K() {
        testRepeatingStringCompression(100, 400);
    }

    @Test
    public void testRepeatingString_1000_100K() {
        testRepeatingStringCompression(1000, 100);
    }

    @Test
    public void testRepeatingString_1000_400K() {
        testRepeatingStringCompression(1000, 400);
    }

    private void testRepeatingStringCompression(final int itemsInDictionary, final int KB) {
        final List<String> dictionary = new ArrayList<>();
        for (int i = 0; i < itemsInDictionary; ++i) {
            dictionary.add(RandomStringUtils.randomAlphanumeric(16));
        }

        final StringBuilder sb = new StringBuilder(KB * 1024);
        final Random rand = new Random();
        for (int i = 0; i < KB * 1024 / 16; ++i) {
            sb.append(dictionary.get(rand.nextInt(itemsInDictionary)));
        }

        final String data = sb.toString();
        System.out.printf("Data set is built with %d items in dictionary%n", itemsInDictionary);
        compressDecompress(data);
    }

    private void compressDecompress(final String data) {
        final String compressed = GZIPCompression.compress(data);
        System.out.printf("Data size: %d : Compressed size: %d%n", data.length(), compressed.length());

        final String decompressed = GZIPCompression.decompress(compressed);
        assertEquals(data, decompressed);
    }
}
And the results are MUCH better now:
Data set is built with 100 items in dictionary
Data size: 102400 : Compressed size: 17084
Data set is built with 100 items in dictionary
Data size: 409600 : Compressed size: 61328
Data set is built with 1000 items in dictionary
Data size: 102400 : Compressed size: 39316
Data set is built with 1000 items in dictionary
Data size: 409600 : Compressed size: 133500
With 100 items in the dictionary we were able to compress the data by 85% (!!!) and with 1000 items in the dictionary we were able to compress the data by 67%.
You can choose a different compression algorithm depending on your data, and enjoy the benefits it gives you by reducing the amount of DDB storage you’re using and the number of items that will be offloaded to S3. The price is more CPU usage, as you need to actually compress/decompress your data. Once you choose this path and your algorithm, running a profiler is advisable, to make sure you actually save time by doing so. If, for example, an S3 put/get takes around 30ms but it takes you 50ms to compress/decompress, you will probably want to avoid it.
We can integrate this logic into our annotation and LargeItemsHandler by adding another threshold to the annotation.
The new annotation will look like this:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface LargeItem {

    long compressIfLargerThanBytes() default 64 * 1024L; // 64K

    long offloadIfLargerThanBytes() default 128 * 1024L; // 128K

    String keyPrefix() default "";
}
And the new LargeItemsHandler will look like this:
public class LargeItemsHandler<T> {

    private static final String S3_PATH_PREFIX = "s3://";

    private final AmazonS3 s3Client;
    private final String bucketName;

    public LargeItemsHandler(final AmazonS3 s3Client, final String bucketName) {
        this.s3Client = s3Client;
        this.bucketName = bucketName;
    }

    public void beforeSave(final T item) {
        final ReflectionHelper<T> helper = new ReflectionHelper<>(item);
        final List<Field> candidates = helper.getAndValidateAllFieldsWithAnnotation(LargeItem.class);

        candidates.forEach(field -> {
            final LargeItem annotation = field.getAnnotation(LargeItem.class);
            String value = helper.getStringValue(field);
            // Null fields have nothing to compress or offload
            if (value == null) {
                return;
            }
            if (value.length() > annotation.compressIfLargerThanBytes()) {
                value = GZIPCompression.compress(value);
                helper.setStringValue(field, value);
            }
            if (value.length() > annotation.offloadIfLargerThanBytes()) {
                final String key = String.format("%s%s", annotation.keyPrefix(), UUID.randomUUID());
                s3Client.putObject(bucketName, key, value);
                final String path = String.format("%s%s/%s", S3_PATH_PREFIX, bucketName, key);
                helper.setStringValue(field, Base64.getEncoder().encodeToString(path.getBytes(StandardCharsets.UTF_8)));
            }
        });
    }

    public void afterLoad(final T item) {
        final ReflectionHelper<T> helper = new ReflectionHelper<>(item);
        final List<Field> candidates = helper.getAndValidateAllFieldsWithAnnotation(LargeItem.class);

        for (final Field field : candidates) {
            final String value = helper.getStringValue(field);
            final AmazonS3URI path = tryDecodeS3Path(value);
            if (path == null) {
                // The value was not offloaded, but it may still be compressed
                helper.setStringValue(field, GZIPCompression.decompress(value));
                continue;
            }
            // For security reasons we will validate the bucket name matches
            if (!path.getBucket().equals(bucketName)) {
                continue;
            }
            final String data = s3Client.getObjectAsString(path.getBucket(), path.getKey());
            helper.setStringValue(field, GZIPCompression.decompress(data));
        }
    }

    private static AmazonS3URI tryDecodeS3Path(final String value) {
        try {
            final String decoded = new String(Base64.getDecoder().decode(value), StandardCharsets.UTF_8);
            return decoded.startsWith(S3_PATH_PREFIX) ? new AmazonS3URI(decoded) : null;
        } catch (final Exception ignored) {
            return null;
        }
    }
}
A full example can be found in my GitHub repository, with additional unit tests for the LargeItemsHandler class that cover all the different permutations we have and ensure the correctness of the code.
– Alexander