From ab446e7160fd7f71a3ddd8bd887aa201bbdf1a81 Mon Sep 17 00:00:00 2001 From: ScarletRedMan Date: Thu, 16 Nov 2023 12:45:54 +0700 Subject: [PATCH] Implemented bucket repository --- .../loadbalancer/model/Bucket.java | 7 +- .../dragonestia/loadbalancer/model/Node.java | 18 ++- .../repository/BucketRepository.java | 34 +++++ .../repository/UserRepository.java | 4 + .../repository/impl/BucketRepositoryImpl.java | 142 ++++++++++++++++++ .../repository/impl/UserRepositoryImpl.java | 39 ++++- 6 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 src/main/java/ru/dragonestia/loadbalancer/repository/BucketRepository.java create mode 100644 src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java diff --git a/src/main/java/ru/dragonestia/loadbalancer/model/Bucket.java b/src/main/java/ru/dragonestia/loadbalancer/model/Bucket.java index 8299228..97b9b0b 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/model/Bucket.java +++ b/src/main/java/ru/dragonestia/loadbalancer/model/Bucket.java @@ -1,11 +1,12 @@ package ru.dragonestia.loadbalancer.model; +import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; import ru.dragonestia.loadbalancer.model.type.SlotLimit; @Getter -@RequiredArgsConstructor +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) public class Bucket { private final String identifier; @@ -14,6 +15,10 @@ public class Bucket { private final String payload; private boolean locked = false; + public static Bucket create(String identifier, Node node, SlotLimit limit, String payload) { + return new Bucket(identifier, node.identifier(), limit, payload); + } + public void setLocked(boolean value) { locked = value; } diff --git a/src/main/java/ru/dragonestia/loadbalancer/model/Node.java b/src/main/java/ru/dragonestia/loadbalancer/model/Node.java index 6ecc450..a7085a0 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/model/Node.java +++ b/src/main/java/ru/dragonestia/loadbalancer/model/Node.java @@ -3,4 +3,20 @@ package ru.dragonestia.loadbalancer.model; import lombok.NonNull; import ru.dragonestia.loadbalancer.model.type.LoadBalancingMethod; -public record Node(@NonNull String identifier, @NonNull LoadBalancingMethod method) {} +public record Node(@NonNull String identifier, @NonNull LoadBalancingMethod method) { + + @Override + public int hashCode() { + return identifier.hashCode(); + } + + @Override + public boolean equals(Object object) { + if (object == this) return true; + if (object == null) return false; + if (object instanceof Node other) { + return identifier.equals(other.identifier); + } + return false; + } +} diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/BucketRepository.java b/src/main/java/ru/dragonestia/loadbalancer/repository/BucketRepository.java new file mode 100644 index 0000000..d96dcde --- /dev/null +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/BucketRepository.java @@ -0,0 +1,34 @@ +package ru.dragonestia.loadbalancer.repository; + +import ru.dragonestia.loadbalancer.model.Bucket; +import ru.dragonestia.loadbalancer.model.Node; +import ru.dragonestia.loadbalancer.model.User; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +public interface BucketRepository { + + void createBucket(Bucket bucket); + + void removeBucket(Bucket bucket); + + Optional findBucket(Node node, String identifier); + + List all(Node node); + + default int countAvailableBuckets(Node node) { + return countAvailableBuckets(node, 0); + } + + int countAvailableBuckets(Node node, int requiredSlots); + + Optional pickFreeBucket(Node node, Collection users); + + void freeBucket(Bucket bucket, Collection users); + + void onCreateNode(Node node); + + void onRemoveNode(Node node); +} diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java b/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java index 1708da4..7d6763c 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java @@ -13,4 +13,8 @@ public interface UserRepository { void unlinkWithBucket(Bucket bucket, Collection users); List findAllLinkedUserBuckets(User user); + + int tryUnlinkWithBucket(Bucket bucket, Collection users); + + void onRemoveBucket(Bucket bucket); } diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java new file mode 100644 index 0000000..e5a3ced --- /dev/null +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java @@ -0,0 +1,142 @@ +package ru.dragonestia.loadbalancer.repository.impl; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; +import ru.dragonestia.loadbalancer.model.Bucket; +import ru.dragonestia.loadbalancer.model.Node; +import ru.dragonestia.loadbalancer.model.User; +import ru.dragonestia.loadbalancer.repository.BucketRepository; +import ru.dragonestia.loadbalancer.repository.UserRepository; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +@Repository +@RequiredArgsConstructor +public class BucketRepositoryImpl implements BucketRepository { + + private final UserRepository userRepository; + private final Map node2bucketsMap = new ConcurrentHashMap<>(); + + @Override + public void createBucket(Bucket bucket) { + var nodeId = bucket.getNodeIdentifier(); + var node = node2bucketsMap.keySet().stream() + .filter(n -> bucket.getNodeIdentifier().equals(n.identifier())) + .findFirst(); + + synchronized (node2bucketsMap) { + if (node.isEmpty()) { + throw new IllegalArgumentException("Node '" + nodeId + "' does not exist"); + } + + node2bucketsMap.get(node.get()).put(bucket.getIdentifier(), new BucketContainer(bucket, new AtomicInteger(0))); + } + } + + @Override + public void removeBucket(Bucket bucket) { + var nodeId = bucket.getNodeIdentifier(); + var node = node2bucketsMap.keySet().stream() + .filter(n -> bucket.getNodeIdentifier().equals(n.identifier())) + .findFirst(); + + synchronized (node2bucketsMap) { + if (node.isEmpty()) { + throw new IllegalArgumentException("Node '" + nodeId + "' does not exist"); + } + + node2bucketsMap.get(node.get()).remove(bucket.getIdentifier()); + } + } + + @Override + public Optional findBucket(Node node, String identifier) { + synchronized (node2bucketsMap) { + if (!node2bucketsMap.containsKey(node)) { + throw new IllegalArgumentException("Node '" + node.identifier() + "' does not exist"); + } + + var result = node2bucketsMap.get(node).getOrDefault(identifier, null); + return result == null? Optional.empty() : Optional.of(result.bucket()); + } + } + + @Override + public List all(Node node) { + synchronized (node2bucketsMap) { + return node2bucketsMap.get(node).values().stream().map(BucketContainer::bucket).toList(); + } + } + + @Override + public int countAvailableBuckets(Node node, int requiredSlots) { + return (int) node2bucketsMap.get(node).values().stream() + .filter(bucket -> bucket.isAvailable(requiredSlots)) + .count(); + } + + @Override + public Optional pickFreeBucket(Node node, Collection users) { + synchronized (node2bucketsMap) { + if (!node2bucketsMap.containsKey(node)) { + throw new IllegalArgumentException("Node '" + node.identifier() + "' does not exist"); + } + + var requiredSlots = users.size(); + var container = node2bucketsMap.get(node).values().stream() + .filter(b -> b.isAvailable(requiredSlots)) + .findFirst(); + + return container.map(BucketContainer::bucket); + } + } + + @Override + public void freeBucket(Bucket bucket, Collection users) { + var nodeId = bucket.getNodeIdentifier(); + var node = node2bucketsMap.keySet().stream() + .filter(n -> bucket.getNodeIdentifier().equals(n.identifier())) + .findFirst(); + + synchronized (node2bucketsMap) { + if (node.isEmpty()) { + throw new IllegalArgumentException("Node '" + nodeId + "' does not exist"); + } + + var buckets = node2bucketsMap.get(node.get()); + if (!buckets.containsKey(bucket.getIdentifier())) { + throw new IllegalArgumentException("Bucket '" + nodeId + "' does not exist"); + } + + var delta = userRepository.tryUnlinkWithBucket(bucket, users); + if (buckets.get(bucket.getIdentifier()).used().getAndAdd(-delta) < 0) { + throw new RuntimeException("Bucket has less than 0 users"); + } + } + } + + @Override + public void onCreateNode(Node node) { + synchronized (node2bucketsMap) { + node2bucketsMap.put(node, new Buckets()); + } + } + + @Override + public void onRemoveNode(Node node) { + synchronized (node2bucketsMap) { + node2bucketsMap.remove(node); + } + } + + private record BucketContainer(Bucket bucket, AtomicInteger used) { + + public boolean isAvailable(int requiredSlots) { + return bucket.isAvailable(used.get(), requiredSlots); + } + } + + private static class Buckets extends LinkedHashMap {} +} diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java index 27e7f4e..9d92547 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java @@ -2,12 +2,12 @@ package ru.dragonestia.loadbalancer.repository.impl; import org.springframework.stereotype.Repository; import ru.dragonestia.loadbalancer.model.Bucket; -import ru.dragonestia.loadbalancer.model.Node; import ru.dragonestia.loadbalancer.model.User; import ru.dragonestia.loadbalancer.repository.UserRepository; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @Repository public class UserRepositoryImpl implements UserRepository { @@ -31,7 +31,12 @@ public class UserRepositoryImpl implements UserRepository { for (var user: users) { var set = usersMap.getOrDefault(user, new HashSet<>()); set.remove(bucket); - usersMap.put(user, set); + + if (set.isEmpty()) { + usersMap.remove(user); + } else { + usersMap.put(user, set); + } } } } @@ -42,4 +47,34 @@ public class UserRepositoryImpl implements UserRepository { return usersMap.getOrDefault(user, new HashSet<>()).stream().toList(); } } + + @Override + public int tryUnlinkWithBucket(Bucket bucket, Collection users) { + var counter = new AtomicInteger(); + synchronized (usersMap) { + usersMap.forEach((user, set) -> { + if (!set.contains(bucket)) return; + + set.remove(bucket); + counter.incrementAndGet(); + + if (set.isEmpty()) { + usersMap.remove(user); + } + }); + } + return counter.get(); + } + + @Override + public void onRemoveBucket(Bucket bucket) { + synchronized (usersMap) { + usersMap.forEach((user, set) -> { + set.remove(bucket); + if (set.isEmpty()) { + usersMap.remove(user); + } + }); + } + } }