From 8a14640aaae8b6a9f0797cdd3570324a58bb46fe Mon Sep 17 00:00:00 2001 From: ScarletRedMan Date: Thu, 16 Nov 2023 13:01:02 +0700 Subject: [PATCH] Added linking between nodes, buckets, users --- .../repository/UserRepository.java | 7 ++-- .../repository/impl/BucketRepositoryImpl.java | 12 +++++- .../repository/impl/NodeRepositoryImpl.java | 10 +++++ .../repository/impl/UserRepositoryImpl.java | 40 +++++++------------ 4 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java b/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java index 7d6763c..ae8116f 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/UserRepository.java @@ -5,16 +5,15 @@ import ru.dragonestia.loadbalancer.model.User; import java.util.Collection; import java.util.List; +import java.util.Map; public interface UserRepository { - void linkWithBucket(Bucket bucket, Collection users); + Map linkWithBucket(Bucket bucket, Collection users); - void unlinkWithBucket(Bucket bucket, Collection users); + int unlinkWithBucket(Bucket bucket, Collection users); List findAllLinkedUserBuckets(User user); - int tryUnlinkWithBucket(Bucket bucket, Collection users); - void onRemoveBucket(Bucket bucket); } diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java index e5a3ced..43e87c1 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/BucketRepositoryImpl.java @@ -49,6 +49,8 @@ public class BucketRepositoryImpl implements BucketRepository { node2bucketsMap.get(node.get()).remove(bucket.getIdentifier()); } + + userRepository.onRemoveBucket(bucket); } @Override @@ -85,10 +87,16 @@ public class BucketRepositoryImpl implements BucketRepository { } var requiredSlots = users.size(); - var container = node2bucketsMap.get(node).values().stream() + var container = node2bucketsMap.get(node).values().stream() // TODO: pick bucket with used node balancing method .filter(b -> b.isAvailable(requiredSlots)) .findFirst(); + if (container.isPresent()) { + var cont = container.get(); + var addedUsers = userRepository.linkWithBucket(cont.bucket(), users); + cont.used().getAndAdd((int) addedUsers.values().stream().filter(Boolean.TRUE::equals).count()); + } + return container.map(BucketContainer::bucket); } } @@ -110,7 +118,7 @@ public class BucketRepositoryImpl implements BucketRepository { throw new IllegalArgumentException("Bucket '" + nodeId + "' does not exist"); } - var delta = userRepository.tryUnlinkWithBucket(bucket, users); + var delta = userRepository.unlinkWithBucket(bucket, users); if (buckets.get(bucket.getIdentifier()).used().getAndAdd(-delta) < 0) { throw new RuntimeException("Bucket has less than 0 users"); } diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/NodeRepositoryImpl.java b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/NodeRepositoryImpl.java index dc18e61..e3d5db1 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/NodeRepositoryImpl.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/NodeRepositoryImpl.java @@ -1,6 +1,9 @@ package ru.dragonestia.loadbalancer.repository.impl; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Repository; import ru.dragonestia.loadbalancer.model.Node; +import ru.dragonestia.loadbalancer.repository.BucketRepository; import ru.dragonestia.loadbalancer.repository.NodeRepository; import java.util.List; @@ -8,8 +11,11 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +@Repository +@RequiredArgsConstructor public class NodeRepositoryImpl implements NodeRepository { + private final BucketRepository bucketRepository; private final Map nodeMap = new ConcurrentHashMap<>(); @Override @@ -21,6 +27,8 @@ public class NodeRepositoryImpl implements NodeRepository { nodeMap.put(node.identifier(), node); } + + bucketRepository.onCreateNode(node); } @Override @@ -28,6 +36,8 @@ public class NodeRepositoryImpl implements NodeRepository { synchronized (nodeMap) { nodeMap.remove(node.identifier()); } + + bucketRepository.onRemoveNode(node); } @Override diff --git a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java index 9d92547..99e88aa 100644 --- a/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java +++ b/src/main/java/ru/dragonestia/loadbalancer/repository/impl/UserRepositoryImpl.java @@ -15,41 +15,22 @@ public class UserRepositoryImpl implements UserRepository { private final Map> usersMap = new ConcurrentHashMap<>(); @Override - public void linkWithBucket(Bucket bucket, Collection users) { + public Map linkWithBucket(Bucket bucket, Collection users) { + var result = new HashMap(); + synchronized (usersMap) { for (var user: users) { var set = usersMap.getOrDefault(user, new HashSet<>()); - set.add(bucket); + result.put(user, set.add(bucket)); usersMap.put(user, set); } } + + return result; } @Override - public void unlinkWithBucket(Bucket bucket, Collection users) { - synchronized (usersMap) { - for (var user: users) { - var set = usersMap.getOrDefault(user, new HashSet<>()); - set.remove(bucket); - - if (set.isEmpty()) { - usersMap.remove(user); - } else { - usersMap.put(user, set); - } - } - } - } - - @Override - public List findAllLinkedUserBuckets(User user) { - synchronized (usersMap) { - return usersMap.getOrDefault(user, new HashSet<>()).stream().toList(); - } - } - - @Override - public int tryUnlinkWithBucket(Bucket bucket, Collection users) { + public int unlinkWithBucket(Bucket bucket, Collection users) { var counter = new AtomicInteger(); synchronized (usersMap) { usersMap.forEach((user, set) -> { @@ -66,6 +47,13 @@ public class UserRepositoryImpl implements UserRepository { return counter.get(); } + @Override + public List findAllLinkedUserBuckets(User user) { + synchronized (usersMap) { + return usersMap.getOrDefault(user, new HashSet<>()).stream().toList(); + } + } + @Override public void onRemoveBucket(Bucket bucket) { synchronized (usersMap) {