Implemented bucket registration
This commit is contained in:
parent
ea5ff314ed
commit
d6d733e8f5
@ -70,8 +70,14 @@ public class NodeDetailsPage extends VerticalLayout implements BeforeEnterObserv
|
|||||||
printNodeDetails(node);
|
printNodeDetails(node);
|
||||||
add(new Hr());
|
add(new Hr());
|
||||||
add(registerBucket = new RegisterBucket(node, (bucket) -> {
|
add(registerBucket = new RegisterBucket(node, (bucket) -> {
|
||||||
// TODO: register bucket and getting all buckets
|
try {
|
||||||
return new RegisterBucket.Response(false, "");
|
bucketRepository.register(bucket);
|
||||||
|
return new RegisterBucket.Response(false, null);
|
||||||
|
} catch (Error error) {
|
||||||
|
return new RegisterBucket.Response(true, error.getMessage());
|
||||||
|
} finally {
|
||||||
|
bucketList.update(bucketRepository.all(node));
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
add(new Hr());
|
add(new Hr());
|
||||||
add(bucketList = new BucketList(node.identifier(), buckets));
|
add(bucketList = new BucketList(node.identifier(), buckets));
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
package ru.dragonestia.loadbalancer.web.repository;
|
package ru.dragonestia.loadbalancer.web.repository;
|
||||||
|
|
||||||
|
import ru.dragonestia.loadbalancer.web.model.Bucket;
|
||||||
import ru.dragonestia.loadbalancer.web.model.Node;
|
import ru.dragonestia.loadbalancer.web.model.Node;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -8,5 +9,7 @@ public interface BucketRepository {
|
|||||||
|
|
||||||
List<BucketInfo> all(Node node);
|
List<BucketInfo> all(Node node);
|
||||||
|
|
||||||
|
void register(Bucket bucket);
|
||||||
|
|
||||||
record BucketInfo(String identifier, int slots) {}
|
record BucketInfo(String identifier, int slots) {}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,10 +3,12 @@ package ru.dragonestia.loadbalancer.web.repository.impl;
|
|||||||
import com.vaadin.flow.spring.annotation.SpringComponent;
|
import com.vaadin.flow.spring.annotation.SpringComponent;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.web.client.HttpClientErrorException;
|
||||||
import ru.dragonestia.loadbalancer.web.model.Bucket;
|
import ru.dragonestia.loadbalancer.web.model.Bucket;
|
||||||
import ru.dragonestia.loadbalancer.web.model.Node;
|
import ru.dragonestia.loadbalancer.web.model.Node;
|
||||||
import ru.dragonestia.loadbalancer.web.repository.BucketRepository;
|
import ru.dragonestia.loadbalancer.web.repository.BucketRepository;
|
||||||
import ru.dragonestia.loadbalancer.web.repository.impl.response.BucketListResponse;
|
import ru.dragonestia.loadbalancer.web.repository.impl.response.BucketListResponse;
|
||||||
|
import ru.dragonestia.loadbalancer.web.repository.impl.response.BucketRegisterResponse;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -34,4 +36,29 @@ public class BucketRepositoryImpl implements BucketRepository {
|
|||||||
|
|
||||||
return Objects.requireNonNull(entity.getBody()).buckets();
|
return Objects.requireNonNull(entity.getBody()).buckets();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(Bucket bucket) {
|
||||||
|
try {
|
||||||
|
var response = rest.post(URI.create("/nodes/" + bucket.getNodeIdentifier() + "/buckets"),
|
||||||
|
BucketRegisterResponse.class,
|
||||||
|
params -> {
|
||||||
|
params.put("identifier", bucket.getIdentifier());
|
||||||
|
params.put("slots", Integer.toString(bucket.getSlots().getSlots()));
|
||||||
|
params.put("payload", bucket.getPayload());
|
||||||
|
params.put("locked", Boolean.toString(bucket.isLocked()));
|
||||||
|
});
|
||||||
|
|
||||||
|
if (response.success()) return;
|
||||||
|
} catch (HttpClientErrorException ex) {
|
||||||
|
var response = ex.getResponseBodyAs(BucketRegisterResponse.class);
|
||||||
|
|
||||||
|
if (response != null) {
|
||||||
|
throw new Error(response.message());
|
||||||
|
}
|
||||||
|
|
||||||
|
log.throwing(ex);
|
||||||
|
throw new Error("Internal error. Check logs");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,6 +51,17 @@ public class RestUtil {
|
|||||||
params));
|
params));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T> ResponseEntity<T> postEntity(URI uri, Class<T> responseType, Consumer<Map<String, String>> paramsConsumer) {
|
||||||
|
var params = new HashMap<String, String>();
|
||||||
|
paramsConsumer.accept(params);
|
||||||
|
|
||||||
|
var template = restTemplate.get();
|
||||||
|
return template.postForEntity(buildPath(uri, params.keySet()),
|
||||||
|
null,
|
||||||
|
responseType,
|
||||||
|
params);
|
||||||
|
}
|
||||||
|
|
||||||
public void put(URI uri, Consumer<Map<String, String>> paramsConsumer) {
|
public void put(URI uri, Consumer<Map<String, String>> paramsConsumer) {
|
||||||
var params = new HashMap<String, String>();
|
var params = new HashMap<String, String>();
|
||||||
paramsConsumer.accept(params);
|
paramsConsumer.accept(params);
|
||||||
|
|||||||
@ -0,0 +1,3 @@
|
|||||||
|
package ru.dragonestia.loadbalancer.web.repository.impl.response;
|
||||||
|
|
||||||
|
public record BucketRegisterResponse(boolean success, String message) {}
|
||||||
@ -1,15 +1,18 @@
|
|||||||
package ru.dragonestia.loadbalancer.controller;
|
package ru.dragonestia.loadbalancer.controller;
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.*;
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
|
||||||
import ru.dragonestia.loadbalancer.controller.response.BucketListResponse;
|
import ru.dragonestia.loadbalancer.controller.response.BucketListResponse;
|
||||||
|
import ru.dragonestia.loadbalancer.controller.response.BucketRegisterResponse;
|
||||||
|
import ru.dragonestia.loadbalancer.controller.response.NodeRegisterResponse;
|
||||||
|
import ru.dragonestia.loadbalancer.model.Bucket;
|
||||||
|
import ru.dragonestia.loadbalancer.model.type.SlotLimit;
|
||||||
import ru.dragonestia.loadbalancer.service.BucketService;
|
import ru.dragonestia.loadbalancer.service.BucketService;
|
||||||
import ru.dragonestia.loadbalancer.service.NodeService;
|
import ru.dragonestia.loadbalancer.service.NodeService;
|
||||||
|
|
||||||
|
@Log4j2
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/nodes/{nodeIdentifier}/buckets")
|
@RequestMapping("/nodes/{nodeIdentifier}/buckets")
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@ -27,4 +30,30 @@ public class BucketController {
|
|||||||
.toList()
|
.toList()
|
||||||
))).orElseGet(() -> ResponseEntity.notFound().build());
|
))).orElseGet(() -> ResponseEntity.notFound().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping
|
||||||
|
ResponseEntity<BucketRegisterResponse> registerBucket(@PathVariable(name = "nodeIdentifier") String nodeId,
|
||||||
|
@RequestParam(name = "identifier") String bucketId,
|
||||||
|
@RequestParam(name = "slots") int slots,
|
||||||
|
@RequestParam(name = "payload") String payload,
|
||||||
|
@RequestParam(name = "locked", defaultValue = "false") boolean locked) {
|
||||||
|
|
||||||
|
var nodeOpt = nodeService.findNode(nodeId);
|
||||||
|
|
||||||
|
if (nodeOpt.isEmpty()) {
|
||||||
|
return ResponseEntity.status(404)
|
||||||
|
.body(new BucketRegisterResponse(false, "Node does not exist"));
|
||||||
|
}
|
||||||
|
|
||||||
|
var bucket = Bucket.create(bucketId, nodeOpt.get(), SlotLimit.of(slots), payload);
|
||||||
|
bucket.setLocked(locked);
|
||||||
|
try {
|
||||||
|
bucketService.createBucket(bucket);
|
||||||
|
return ResponseEntity.ok(new BucketRegisterResponse(true, ""));
|
||||||
|
} catch (Error error) {
|
||||||
|
return ResponseEntity.status(400).body(new BucketRegisterResponse(false, error.getMessage()));
|
||||||
|
} catch (Exception ex) {
|
||||||
|
return ResponseEntity.status(500).body(new BucketRegisterResponse(false, ex.getMessage()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,3 @@
|
|||||||
|
package ru.dragonestia.loadbalancer.controller.response;
|
||||||
|
|
||||||
|
public record BucketRegisterResponse(boolean success, String message) {}
|
||||||
@ -22,16 +22,21 @@ public class BucketRepositoryImpl implements BucketRepository {
|
|||||||
@Override
|
@Override
|
||||||
public void createBucket(Bucket bucket) {
|
public void createBucket(Bucket bucket) {
|
||||||
var nodeId = bucket.getNodeIdentifier();
|
var nodeId = bucket.getNodeIdentifier();
|
||||||
|
|
||||||
|
synchronized (node2bucketsMap) {
|
||||||
var node = node2bucketsMap.keySet().stream()
|
var node = node2bucketsMap.keySet().stream()
|
||||||
.filter(n -> bucket.getNodeIdentifier().equals(n.identifier()))
|
.filter(n -> bucket.getNodeIdentifier().equals(n.identifier()))
|
||||||
.findFirst();
|
.findFirst();
|
||||||
|
|
||||||
synchronized (node2bucketsMap) {
|
|
||||||
if (node.isEmpty()) {
|
if (node.isEmpty()) {
|
||||||
throw new IllegalArgumentException("Node '" + nodeId + "' does not exist");
|
throw new IllegalArgumentException("Node '" + nodeId + "' does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
node2bucketsMap.get(node.get()).put(bucket.getIdentifier(), new BucketContainer(bucket, new AtomicInteger(0)));
|
var buckets = node2bucketsMap.get(node.get());
|
||||||
|
if (buckets.containsKey(bucket.getIdentifier())) {
|
||||||
|
throw new IllegalArgumentException("Bucket already exists");
|
||||||
|
}
|
||||||
|
buckets.put(bucket.getIdentifier(), new BucketContainer(bucket, new AtomicInteger(0)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user