Preface

This article examines SkyWalking's ServiceAndEndpointRegisterClient.

ServiceAndEndpointRegisterClient

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java

@DefaultImplementor public class ServiceAndEndpointRegisterClient implements BootService, Runnable, GRPCChannelListener { private static final ILog logger = LogManager.getLogger(ServiceAndEndpointRegisterClient.class); private static String INSTANCE_UUID; private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES; private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; private volatile RegisterGrpc.RegisterBlockingStub registerBlockingStub; private volatile ServiceInstancePingGrpc.ServiceInstancePingBlockingStub serviceInstancePingStub; private volatile ScheduledFuture<? > applicationRegisterFuture; private volatile long coolDownStartTime = -1; @Override public void statusChanged(GRPCChannelStatus status) {if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            registerBlockingStub = RegisterGrpc.newBlockingStub(channel);
            serviceInstancePingStub = ServiceInstancePingGrpc.newBlockingStub(channel);
        } else {
            registerBlockingStub = null;
            serviceInstancePingStub = null;
        }
        this.status = status;
    }

    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);

        INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
            .replaceAll("-"."") : Config.Agent.INSTANCE_UUID;

        SERVICE_INSTANCE_PROPERTIES = new ArrayList<KeyStringValuePair>();

        for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
            SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                .setKey(key).setValue(Config.Agent.INSTANCE_PROPERTIES.get(key)).build());
        }
    }

    @Override
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

    @Override
    public void onComplete() throws Throwable {
    }

    @Override
    public void shutdown() throws Throwable {
        applicationRegisterFuture.cancel(true);
    }

    @Override
    public void run() {
        logger.debug("ServiceAndEndpointRegisterClient running, status:{}.", status);

        if (coolDownStartTime > 0) {
            final long coolDownDurationInMillis = TimeUnit.MINUTES.toMillis(Config.Agent.COOL_DOWN_THRESHOLD);
            if (System.currentTimeMillis() - coolDownStartTime < coolDownDurationInMillis) {
                logger.warn("The agent is cooling down, won't register itself");
                return;
            } else {
                logger.warn("The agent is re-registering itself to backend");
            }
        }
        coolDownStartTime = -1;

        boolean shouldTry = true;
        while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
            shouldTry = false;
            try {
                if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
                    if(registerBlockingStub ! = null) { ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister( Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());if(serviceRegisterMapping ! = null) {for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
                                if (Config.Agent.SERVICE_NAME.equals(registered.getKey())) {
                                    RemoteDownstreamConfig.Agent.SERVICE_ID = registered.getValue();
                                    shouldTry = true;
                                }
                            }
                        }
                    }
                } else {
                    if(registerBlockingStub ! = null) {if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {

                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                    .doServiceInstanceRegister(ServiceInstances.newBuilder()
                                .addInstances(
                                    ServiceInstance.newBuilder()
                                        .setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
                                        .setInstanceUUID(INSTANCE_UUID)
                                        .setTime(System.currentTimeMillis())
                                        .addAllProperties(OSUtil.buildOSInfo())
                                        .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                ).build());
                            for (KeyIntValuePair serviceInstance : instanceMapping.getServiceInstancesList()) {
                                if (INSTANCE_UUID.equals(serviceInstance.getKey())) {
                                    int serviceInstanceId = serviceInstance.getValue();
                                    if(serviceInstanceId ! = DictionaryUtil.nullValue()) { RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstanceId; RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = System.currentTimeMillis(); }}}}else {
                            final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                .doPing(ServiceInstancePingPkg.newBuilder()
                                .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
                                .setTime(System.currentTimeMillis())
                                .setServiceInstanceUUID(INSTANCE_UUID)
                                .build());

                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
                            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error(t, "ServiceAndEndpointRegisterClient execute fail.");
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }

    public void coolDown() { this.coolDownStartTime = System.currentTimeMillis(); }}Copy the code
  • ServiceAndEndpointRegisterClient implements the BootService, Runnable, and GRPCChannelListener interfaces. The prepare method sets INSTANCE_UUID and initializes SERVICE_INSTANCE_PROPERTIES. The boot method schedules the run method at a fixed rate of Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL seconds. The shutdown method cancels the scheduled task. The statusChanged method updates registerBlockingStub, serviceInstancePingStub, and status. The run method mainly performs doServiceRegister, doServiceInstanceRegister, doPing, NetworkAddressDictionary.INSTANCE.syncRemoteDictionary, and EndpointNameDictionary.INSTANCE.syncRemoteDictionary.

NetworkAddressDictionary

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java

public enum NetworkAddressDictionary {
    INSTANCE;
    private Map<String, Integer> serviceDictionary = new ConcurrentHashMap<String, Integer>();
    private Set<String> unRegisterServices = new ConcurrentSet<String>();

    public PossibleFound find(String networkAddress) {
        Integer applicationId = serviceDictionary.get(networkAddress);
        if(applicationId ! = null) {return new Found(applicationId);
        } else {
            if (serviceDictionary.size() + unRegisterServices.size() < SERVICE_CODE_BUFFER_SIZE) {
                unRegisterServices.add(networkAddress);
            }
            return new NotFound();
        }
    }

    public void syncRemoteDictionary(
        RegisterGrpc.RegisterBlockingStub networkAddressRegisterServiceBlockingStub) {
        if (unRegisterServices.size() > 0) {
            NetAddressMapping networkAddressMappings = networkAddressRegisterServiceBlockingStub.doNetworkAddressRegister(
                NetAddresses.newBuilder().addAllAddresses(unRegisterServices).build());
            if (networkAddressMappings.getAddressIdsCount() > 0) {
                for (KeyIntValuePair keyWithIntegerValue : networkAddressMappings.getAddressIdsList()) {
                    unRegisterServices.remove(keyWithIntegerValue.getKey());
                    serviceDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
                }
            }
        }
    }

    public void clear() { this.serviceDictionary.clear(); }}Copy the code
  • NetworkAddressDictionary's syncRemoteDictionary method calls networkAddressRegisterServiceBlockingStub.doNetworkAddressRegister.

EndpointNameDictionary

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/EndpointNameDictionary.java

public enum EndpointNameDictionary {
    INSTANCE;
    private static final ILog logger = LogManager.getLogger(EndpointNameDictionary.class);

    private Map<OperationNameKey, Integer> endpointDictionary = new ConcurrentHashMap<OperationNameKey, Integer>();
    private Set<OperationNameKey> unRegisterEndpoints = new ConcurrentSet<OperationNameKey>();

    public PossibleFound findOrPrepare4Register(int serviceId, String endpointName) {
        return find0(serviceId, endpointName, true);
    }

    public PossibleFound findOnly(int serviceId, String endpointName) {
        return find0(serviceId, endpointName, false);
    }

    private PossibleFound find0(int serviceId, String endpointName,
        boolean registerWhenNotFound) {
        if (endpointName == null || endpointName.length() == 0) {
            return new NotFound();
        }
        OperationNameKey key = new OperationNameKey(serviceId, endpointName);
        Integer operationId = endpointDictionary.get(key);
        if(operationId ! = null) {return new Found(operationId);
        } else {
            if (registerWhenNotFound &&
                endpointDictionary.size() + unRegisterEndpoints.size() < ENDPOINT_NAME_BUFFER_SIZE) {
                unRegisterEndpoints.add(key);
            }
            return new NotFound();
        }
    }

    public void syncRemoteDictionary(
        RegisterGrpc.RegisterBlockingStub serviceNameDiscoveryServiceBlockingStub) {
        if (unRegisterEndpoints.size() > 0) {
            Endpoints.Builder builder = Endpoints.newBuilder();
            for (OperationNameKey operationNameKey : unRegisterEndpoints) {
                Endpoint endpoint = Endpoint.newBuilder()
                    .setServiceId(operationNameKey.getServiceId())
                    .setEndpointName(operationNameKey.getEndpointName())
                    .setFrom(DetectPoint.server)
                    .build();
                builder.addEndpoints(endpoint);
            }
            EndpointMapping serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.doEndpointRegister(builder.build());
            if (serviceNameMappingCollection.getElementsCount() > 0) {
                for (EndpointMappingElement element : serviceNameMappingCollection.getElementsList()) {
                    OperationNameKey key = new OperationNameKey(
                        element.getServiceId(),
                        element.getEndpointName());
                    unRegisterEndpoints.remove(key);
                    endpointDictionary.put(key, element.getEndpointId());
                }
            }
        }
    }

    public void clear() { endpointDictionary.clear(); } / /... }Copy the code
  • EndpointNameDictionary's syncRemoteDictionary method calls serviceNameDiscoveryServiceBlockingStub.doEndpointRegister(builder.build()).

summary

ServiceAndEndpointRegisterClient mainly performs doServiceRegister, doServiceInstanceRegister, doPing, NetworkAddressDictionary.INSTANCE.syncRemoteDictionary, and EndpointNameDictionary.INSTANCE.syncRemoteDictionary.

doc

  • ServiceAndEndpointRegisterClient