Some services in the project need to monitor the startup information of other micro-services, and the master needs to send requests to pull some configurations after the startup. The eureka-client does not provide an event for the start of other services. The Eureka-server does provide an event for the start of services in its own Eureka-Server, and then sends the service start information to kafka message queues. The service listens to Kafka messages in a way that depends on message queues: source code; Or modify Eureka-client, because there is not much code to change, modify the source code to rely on their own maintenance is not convenient, here through Javassist directly modify the BYtecode in the JAR.

Use Javassist to modify the bytecode

  • View the source in eureka, a com.net flix. Discovery. Shared. The Application class, addInstance method in service launched or updated, this method in the service call when offline, so modify the two methods, Implement listening services on and offline.
  • Because of spring-Boot’s own class loading mechanism, running Javassist as a Spring-Boot JAR will fail to scan the package, and insertClassPath will be used to add the scan path.
  • Through setBody modification method body, respectively to add me. Flyleft. Eureka. Client. Event. EurekaEventHandler. GetInstance (). EurekaAddInstance ($1); And me. Flyleft. Eureka. Client. Event. EurekaEventHandler. GetInstance () eurekaRemoveInstance ($1);
  • After the original class is overwritten by toClass, it is reloaded by the class loader.
public void init(a) {
        try {
            ClassPool classPool = new ClassPool(true);
            // Add the scanning path of com.netflix. Discovery package
            ClassClassPath classPath = new ClassClassPath(Applications.class);
            classPool.insertClassPath(classPath);
            // Get the Application class to be modified
            CtClass ctClass = classPool.get(APPLICATION_PATH);
            // Get the addInstance method
            CtMethod addInstanceMethod = ctClass.getDeclaredMethod("addInstance");
            // Modify the addInstance method
            addInstanceMethod.setBody("{instancesMap.put($1.getId(), $1);"
                    + "synchronized (instances) {me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaAddInstance($1);" +
                    "instances.remove($1); instances.add($1); isDirty = true; }}");
            Get the removeInstance method
            CtMethod removeInstanceMethod = ctClass.getDeclaredMethod("removeInstance");
            // Modify the removeInstance method
            removeInstanceMethod.setBody("{me.flyleft.eureka.client.event.EurekaEventHandler.getInstance().eurekaRemoveInstance($1); this.removeInstance($1, true); }");
            // Override the original Application class
            ctClass.toClass();
            // Reload the Application class using the class loader
            classPool.getClassLoader().loadClass(APPLICATION_PATH);
            Class.forName(APPLICATION_PATH);
        } catch (Exception e) {
            throw newEurekaEventException(e); }}Copy the code
  • Put in the main function and execute it before Spring Boot starts or use spring Boot events to execute it before spring bean initialization. (Make sure it is executed before eureka’s first execution)
@SpringBootApplication
@EnableEurekaClient
public class EurekaClientApplication {

    public static void main(String[] args) {
        // Modify the bytecode code first
        EurekaEventHandler.getInstance().init();
        new SpringApplicationBuilder(EurekaClientApplication.class).web(true).run(args); }}Copy the code

Implement the Observer and subscriber patterns using observables and Observers in the JDK

  • Send events using setChanged and notifyObservers of java.util.Observable
public class EurekaEventObservable extends Observable {
    public void sendEvent(EurekaEventPayload payload) { setChanged(); notifyObservers(payload); }}Copy the code
  • Receive events using update using java.util.Observer
public abstract class AbstractEurekaEventObserver implements Observer.EurekaEventService {
      @Override
        public void update(Observable o, Object arg) {
            if (arg instanceof EurekaEventPayload) {
                EurekaEventPayload payload = (EurekaEventPayload) arg;
                if (InstanceInfo.InstanceStatus.UP.name().equals(payload.getStatus())) {
                    LOGGER.info("Receive UP event, payload: {}", payload);
                } else {
                    LOGGER.info("Receive DOWN event, payload: {}", payload); } putPayloadInCache(payload); consumerEventWithAutoRetry(payload); }}}Copy the code

Automatic retry using RxJava.

After the service is started, some operations are performed. If the operation fails, the system automatically retries the specified number of times. Each event is retried once

private void consumerEventWithAutoRetry(final EurekaEventPayload payload) {
    rx.Observable.just(payload)
            .map(t -> {
                // Here are some actions to be performed when the service is started
                consumerEvent(payload);
                return payload;
            }).retryWhen(x -> x.zipWith(rx.Observable.range(1, retryTime),
            (t, retryCount) -> {
               // Exception handling
                if (retryCount >= retryTime) {
                    if (t instanceof RemoteAccessException || t instanceof RestClientException) {
                        LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", payload, t);
                    } else {
                        LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", payload, t); }}return retryCount;
            }).flatMap(y -> rx.Observable.timer(retryInterval, TimeUnit.SECONDS)))
            .subscribeOn(Schedulers.io())
            .subscribe((EurekaEventPayload payload1) -> {
            });
}
Copy the code

The interface for manual retry failure was added

If automatic retry fails, you can manually retry. Add a manual retry interface

@RestController
@RequestMapping(value = "/v1/eureka/events")
public class EurekaEventEndpoint {

    private EurekaEventService eurekaEventService;

    public EurekaEventEndpoint(EurekaEventService eurekaEventService) {
        this.eurekaEventService = eurekaEventService;
    }

    @Permission(permissionLogin = true)
    @ApiOperation(value = "Get a list of unconsumed events")
    @GetMapping
    public List<EurekaEventPayload> list(@RequestParam(value = "service", required = false) String service) {
        return eurekaEventService.unfinishedEvents(service);
    }

    @Permission(permissionLogin = true)
    @ApiOperation(value = "Manually retry unconsumed events")
    @PostMapping("retry")
    public List<EurekaEventPayload> retry(@RequestParam(value = "id", required = false) String id,
                                          @RequestParam(value = "service", required = false) String service) {
        returneurekaEventService.retryEvents(id, service); }}Copy the code

The source code