Implemented bucket repository
This commit is contained in:
parent
f9547b9cbe
commit
ab446e7160
@ -1,11 +1,12 @@
|
||||
package ru.dragonestia.loadbalancer.model;
|
||||
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import ru.dragonestia.loadbalancer.model.type.SlotLimit;
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
public class Bucket {
|
||||
|
||||
private final String identifier;
|
||||
@ -14,6 +15,10 @@ public class Bucket {
|
||||
private final String payload;
|
||||
private boolean locked = false;
|
||||
|
||||
public static Bucket create(String identifier, Node node, SlotLimit limit, String payload) {
|
||||
return new Bucket(identifier, node.identifier(), limit, payload);
|
||||
}
|
||||
|
||||
public void setLocked(boolean value) {
|
||||
locked = value;
|
||||
}
|
||||
|
||||
@ -3,4 +3,20 @@ package ru.dragonestia.loadbalancer.model;
|
||||
import lombok.NonNull;
|
||||
import ru.dragonestia.loadbalancer.model.type.LoadBalancingMethod;
|
||||
|
||||
public record Node(@NonNull String identifier, @NonNull LoadBalancingMethod method) {}
|
||||
public record Node(@NonNull String identifier, @NonNull LoadBalancingMethod method) {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return identifier.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object object) {
|
||||
if (object == this) return true;
|
||||
if (object == null) return false;
|
||||
if (object instanceof Node other) {
|
||||
return identifier.equals(other.identifier);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,34 @@
|
||||
package ru.dragonestia.loadbalancer.repository;
|
||||
|
||||
import ru.dragonestia.loadbalancer.model.Bucket;
|
||||
import ru.dragonestia.loadbalancer.model.Node;
|
||||
import ru.dragonestia.loadbalancer.model.User;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface BucketRepository {
|
||||
|
||||
void createBucket(Bucket bucket);
|
||||
|
||||
void removeBucket(Bucket bucket);
|
||||
|
||||
Optional<Bucket> findBucket(Node node, String identifier);
|
||||
|
||||
List<Bucket> all(Node node);
|
||||
|
||||
default int countAvailableBuckets(Node node) {
|
||||
return countAvailableBuckets(node, 0);
|
||||
}
|
||||
|
||||
int countAvailableBuckets(Node node, int requiredSlots);
|
||||
|
||||
Optional<Bucket> pickFreeBucket(Node node, Collection<User> users);
|
||||
|
||||
void freeBucket(Bucket bucket, Collection<User> users);
|
||||
|
||||
void onCreateNode(Node node);
|
||||
|
||||
void onRemoveNode(Node node);
|
||||
}
|
||||
@ -13,4 +13,8 @@ public interface UserRepository {
|
||||
void unlinkWithBucket(Bucket bucket, Collection<User> users);
|
||||
|
||||
List<Bucket> findAllLinkedUserBuckets(User user);
|
||||
|
||||
int tryUnlinkWithBucket(Bucket bucket, Collection<User> users);
|
||||
|
||||
void onRemoveBucket(Bucket bucket);
|
||||
}
|
||||
|
||||
@ -0,0 +1,142 @@
|
||||
package ru.dragonestia.loadbalancer.repository.impl;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import ru.dragonestia.loadbalancer.model.Bucket;
|
||||
import ru.dragonestia.loadbalancer.model.Node;
|
||||
import ru.dragonestia.loadbalancer.model.User;
|
||||
import ru.dragonestia.loadbalancer.repository.BucketRepository;
|
||||
import ru.dragonestia.loadbalancer.repository.UserRepository;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
public class BucketRepositoryImpl implements BucketRepository {
|
||||
|
||||
private final UserRepository userRepository;
|
||||
private final Map<Node, Buckets> node2bucketsMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void createBucket(Bucket bucket) {
|
||||
var nodeId = bucket.getNodeIdentifier();
|
||||
var node = node2bucketsMap.keySet().stream()
|
||||
.filter(n -> bucket.getNodeIdentifier().equals(n.identifier()))
|
||||
.findFirst();
|
||||
|
||||
synchronized (node2bucketsMap) {
|
||||
if (node.isEmpty()) {
|
||||
throw new IllegalArgumentException("Node '" + nodeId + "' does not exist");
|
||||
}
|
||||
|
||||
node2bucketsMap.get(node.get()).put(bucket.getIdentifier(), new BucketContainer(bucket, new AtomicInteger(0)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeBucket(Bucket bucket) {
|
||||
var nodeId = bucket.getNodeIdentifier();
|
||||
var node = node2bucketsMap.keySet().stream()
|
||||
.filter(n -> bucket.getNodeIdentifier().equals(n.identifier()))
|
||||
.findFirst();
|
||||
|
||||
synchronized (node2bucketsMap) {
|
||||
if (node.isEmpty()) {
|
||||
throw new IllegalArgumentException("Node '" + nodeId + "' does not exist");
|
||||
}
|
||||
|
||||
node2bucketsMap.get(node.get()).remove(bucket.getIdentifier());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Bucket> findBucket(Node node, String identifier) {
|
||||
synchronized (node2bucketsMap) {
|
||||
if (!node2bucketsMap.containsKey(node)) {
|
||||
throw new IllegalArgumentException("Node '" + node.identifier() + "' does not exist");
|
||||
}
|
||||
|
||||
var result = node2bucketsMap.get(node).getOrDefault(identifier, null);
|
||||
return result == null? Optional.empty() : Optional.of(result.bucket());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Bucket> all(Node node) {
|
||||
synchronized (node2bucketsMap) {
|
||||
return node2bucketsMap.get(node).values().stream().map(BucketContainer::bucket).toList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countAvailableBuckets(Node node, int requiredSlots) {
|
||||
return (int) node2bucketsMap.get(node).values().stream()
|
||||
.filter(bucket -> bucket.isAvailable(requiredSlots))
|
||||
.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Bucket> pickFreeBucket(Node node, Collection<User> users) {
|
||||
synchronized (node2bucketsMap) {
|
||||
if (!node2bucketsMap.containsKey(node)) {
|
||||
throw new IllegalArgumentException("Node '" + node.identifier() + "' does not exist");
|
||||
}
|
||||
|
||||
var requiredSlots = users.size();
|
||||
var container = node2bucketsMap.get(node).values().stream()
|
||||
.filter(b -> b.isAvailable(requiredSlots))
|
||||
.findFirst();
|
||||
|
||||
return container.map(BucketContainer::bucket);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void freeBucket(Bucket bucket, Collection<User> users) {
|
||||
var nodeId = bucket.getNodeIdentifier();
|
||||
var node = node2bucketsMap.keySet().stream()
|
||||
.filter(n -> bucket.getNodeIdentifier().equals(n.identifier()))
|
||||
.findFirst();
|
||||
|
||||
synchronized (node2bucketsMap) {
|
||||
if (node.isEmpty()) {
|
||||
throw new IllegalArgumentException("Node '" + nodeId + "' does not exist");
|
||||
}
|
||||
|
||||
var buckets = node2bucketsMap.get(node.get());
|
||||
if (!buckets.containsKey(bucket.getIdentifier())) {
|
||||
throw new IllegalArgumentException("Bucket '" + nodeId + "' does not exist");
|
||||
}
|
||||
|
||||
var delta = userRepository.tryUnlinkWithBucket(bucket, users);
|
||||
if (buckets.get(bucket.getIdentifier()).used().getAndAdd(-delta) < 0) {
|
||||
throw new RuntimeException("Bucket has less than 0 users");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCreateNode(Node node) {
|
||||
synchronized (node2bucketsMap) {
|
||||
node2bucketsMap.put(node, new Buckets());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoveNode(Node node) {
|
||||
synchronized (node2bucketsMap) {
|
||||
node2bucketsMap.remove(node);
|
||||
}
|
||||
}
|
||||
|
||||
private record BucketContainer(Bucket bucket, AtomicInteger used) {
|
||||
|
||||
public boolean isAvailable(int requiredSlots) {
|
||||
return bucket.isAvailable(used.get(), requiredSlots);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Buckets extends LinkedHashMap<String, BucketContainer> {}
|
||||
}
|
||||
@ -2,12 +2,12 @@ package ru.dragonestia.loadbalancer.repository.impl;
|
||||
|
||||
import org.springframework.stereotype.Repository;
|
||||
import ru.dragonestia.loadbalancer.model.Bucket;
|
||||
import ru.dragonestia.loadbalancer.model.Node;
|
||||
import ru.dragonestia.loadbalancer.model.User;
|
||||
import ru.dragonestia.loadbalancer.repository.UserRepository;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Repository
|
||||
public class UserRepositoryImpl implements UserRepository {
|
||||
@ -31,10 +31,15 @@ public class UserRepositoryImpl implements UserRepository {
|
||||
for (var user: users) {
|
||||
var set = usersMap.getOrDefault(user, new HashSet<>());
|
||||
set.remove(bucket);
|
||||
|
||||
if (set.isEmpty()) {
|
||||
usersMap.remove(user);
|
||||
} else {
|
||||
usersMap.put(user, set);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Bucket> findAllLinkedUserBuckets(User user) {
|
||||
@ -42,4 +47,34 @@ public class UserRepositoryImpl implements UserRepository {
|
||||
return usersMap.getOrDefault(user, new HashSet<>()).stream().toList();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int tryUnlinkWithBucket(Bucket bucket, Collection<User> users) {
|
||||
var counter = new AtomicInteger();
|
||||
synchronized (usersMap) {
|
||||
usersMap.forEach((user, set) -> {
|
||||
if (!set.contains(bucket)) return;
|
||||
|
||||
set.remove(bucket);
|
||||
counter.incrementAndGet();
|
||||
|
||||
if (set.isEmpty()) {
|
||||
usersMap.remove(user);
|
||||
}
|
||||
});
|
||||
}
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemoveBucket(Bucket bucket) {
|
||||
synchronized (usersMap) {
|
||||
usersMap.forEach((user, set) -> {
|
||||
set.remove(bucket);
|
||||
if (set.isEmpty()) {
|
||||
usersMap.remove(user);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user