Added linking between nodes, buckets, users

This commit is contained in:
Andrey Terentev 2023-11-16 13:01:02 +07:00
parent ab446e7160
commit 8a14640aaa
4 changed files with 37 additions and 32 deletions

View File

@ -5,16 +5,15 @@ import ru.dragonestia.loadbalancer.model.User;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
public interface UserRepository { public interface UserRepository {
void linkWithBucket(Bucket bucket, Collection<User> users); Map<User, Boolean> linkWithBucket(Bucket bucket, Collection<User> users);
void unlinkWithBucket(Bucket bucket, Collection<User> users); int unlinkWithBucket(Bucket bucket, Collection<User> users);
List<Bucket> findAllLinkedUserBuckets(User user); List<Bucket> findAllLinkedUserBuckets(User user);
int tryUnlinkWithBucket(Bucket bucket, Collection<User> users);
void onRemoveBucket(Bucket bucket); void onRemoveBucket(Bucket bucket);
} }

View File

@ -49,6 +49,8 @@ public class BucketRepositoryImpl implements BucketRepository {
node2bucketsMap.get(node.get()).remove(bucket.getIdentifier()); node2bucketsMap.get(node.get()).remove(bucket.getIdentifier());
} }
userRepository.onRemoveBucket(bucket);
} }
@Override @Override
@ -85,10 +87,16 @@ public class BucketRepositoryImpl implements BucketRepository {
} }
var requiredSlots = users.size(); var requiredSlots = users.size();
var container = node2bucketsMap.get(node).values().stream() var container = node2bucketsMap.get(node).values().stream() // TODO: pick bucket with used node balancing method
.filter(b -> b.isAvailable(requiredSlots)) .filter(b -> b.isAvailable(requiredSlots))
.findFirst(); .findFirst();
if (container.isPresent()) {
var cont = container.get();
var addedUsers = userRepository.linkWithBucket(cont.bucket(), users);
cont.used().getAndAdd((int) addedUsers.values().stream().filter(Boolean.TRUE::equals).count());
}
return container.map(BucketContainer::bucket); return container.map(BucketContainer::bucket);
} }
} }
@ -110,7 +118,7 @@ public class BucketRepositoryImpl implements BucketRepository {
throw new IllegalArgumentException("Bucket '" + nodeId + "' does not exist"); throw new IllegalArgumentException("Bucket '" + nodeId + "' does not exist");
} }
var delta = userRepository.tryUnlinkWithBucket(bucket, users); var delta = userRepository.unlinkWithBucket(bucket, users);
if (buckets.get(bucket.getIdentifier()).used().getAndAdd(-delta) < 0) { if (buckets.get(bucket.getIdentifier()).used().getAndAdd(-delta) < 0) {
throw new RuntimeException("Bucket has less than 0 users"); throw new RuntimeException("Bucket has less than 0 users");
} }

View File

@ -1,6 +1,9 @@
package ru.dragonestia.loadbalancer.repository.impl; package ru.dragonestia.loadbalancer.repository.impl;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import ru.dragonestia.loadbalancer.model.Node; import ru.dragonestia.loadbalancer.model.Node;
import ru.dragonestia.loadbalancer.repository.BucketRepository;
import ru.dragonestia.loadbalancer.repository.NodeRepository; import ru.dragonestia.loadbalancer.repository.NodeRepository;
import java.util.List; import java.util.List;
@ -8,8 +11,11 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Repository
@RequiredArgsConstructor
public class NodeRepositoryImpl implements NodeRepository { public class NodeRepositoryImpl implements NodeRepository {
private final BucketRepository bucketRepository;
private final Map<String, Node> nodeMap = new ConcurrentHashMap<>(); private final Map<String, Node> nodeMap = new ConcurrentHashMap<>();
@Override @Override
@ -21,6 +27,8 @@ public class NodeRepositoryImpl implements NodeRepository {
nodeMap.put(node.identifier(), node); nodeMap.put(node.identifier(), node);
} }
bucketRepository.onCreateNode(node);
} }
@Override @Override
@ -28,6 +36,8 @@ public class NodeRepositoryImpl implements NodeRepository {
synchronized (nodeMap) { synchronized (nodeMap) {
nodeMap.remove(node.identifier()); nodeMap.remove(node.identifier());
} }
bucketRepository.onRemoveNode(node);
} }
@Override @Override

View File

@ -15,41 +15,22 @@ public class UserRepositoryImpl implements UserRepository {
private final Map<User, Set<Bucket>> usersMap = new ConcurrentHashMap<>(); private final Map<User, Set<Bucket>> usersMap = new ConcurrentHashMap<>();
@Override @Override
public void linkWithBucket(Bucket bucket, Collection<User> users) { public Map<User, Boolean> linkWithBucket(Bucket bucket, Collection<User> users) {
var result = new HashMap<User, Boolean>();
synchronized (usersMap) { synchronized (usersMap) {
for (var user: users) { for (var user: users) {
var set = usersMap.getOrDefault(user, new HashSet<>()); var set = usersMap.getOrDefault(user, new HashSet<>());
set.add(bucket); result.put(user, set.add(bucket));
usersMap.put(user, set); usersMap.put(user, set);
} }
} }
return result;
} }
@Override @Override
public void unlinkWithBucket(Bucket bucket, Collection<User> users) { public int unlinkWithBucket(Bucket bucket, Collection<User> users) {
synchronized (usersMap) {
for (var user: users) {
var set = usersMap.getOrDefault(user, new HashSet<>());
set.remove(bucket);
if (set.isEmpty()) {
usersMap.remove(user);
} else {
usersMap.put(user, set);
}
}
}
}
@Override
public List<Bucket> findAllLinkedUserBuckets(User user) {
synchronized (usersMap) {
return usersMap.getOrDefault(user, new HashSet<>()).stream().toList();
}
}
@Override
public int tryUnlinkWithBucket(Bucket bucket, Collection<User> users) {
var counter = new AtomicInteger(); var counter = new AtomicInteger();
synchronized (usersMap) { synchronized (usersMap) {
usersMap.forEach((user, set) -> { usersMap.forEach((user, set) -> {
@ -66,6 +47,13 @@ public class UserRepositoryImpl implements UserRepository {
return counter.get(); return counter.get();
} }
@Override
public List<Bucket> findAllLinkedUserBuckets(User user) {
synchronized (usersMap) {
return usersMap.getOrDefault(user, new HashSet<>()).stream().toList();
}
}
@Override @Override
public void onRemoveBucket(Bucket bucket) { public void onRemoveBucket(Bucket bucket) {
synchronized (usersMap) { synchronized (usersMap) {