SkyWalking-09: OAL principles -- how data is saved through dynamically generated classes

Switch 2021-10-14 05:47:00

How OAL saves data through dynamically generated classes

Preliminary work

How OAL adds the dynamically generated SourceDispatcher to DispatcherManager

 // org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load
/**
 * Activates the OAL engine for the given definition. A definition that has already been
 * activated is skipped, so each definition is loaded at most once.
 *
 * @param define the OAL definition to activate
 * @throws ModuleStartException if loading or starting the engine fails with a reflective
 *                              or OAL compile error
 */
public void load(OALDefine define) throws ModuleStartException {
    // Each OAL define is only activated once.
    if (oalDefineSet.contains(define)) {
        return;
    }

    try {
        OALEngine oalEngine = loadOALEngine(define);

        // Listener that processes classes carrying the
        // org.apache.skywalking.oap.server.core.analysis.Stream annotation.
        StreamAnnotationListener streamListener = new StreamAnnotationListener(moduleManager);
        oalEngine.setStreamListener(streamListener);

        // See org.apache.skywalking.oap.server.core.source.SourceReceiverImpl#getDispatcherDetectorListener:
        // the listener obtained here is the
        // org.apache.skywalking.oap.server.core.analysis.DispatcherManager object.
        SourceReceiver sourceReceiver = moduleManager.find(CoreModule.NAME)
                                                     .provider()
                                                     .getService(SourceReceiver.class);
        oalEngine.setDispatcherListener(sourceReceiver.getDispatcherDetectorListener());

        // This call ends up in org.apache.skywalking.oal.rt.OALRuntime#start.
        oalEngine.start(OALEngineLoaderService.class.getClassLoader());

        // Notify all listeners about the generated classes.
        oalEngine.notifyAllListeners();

        oalDefineSet.add(define);
    } catch (ReflectiveOperationException | OALCompileException e) {
        throw new ModuleStartException(e.getMessage(), e);
    }
}

The org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load method does the following:

  1. Sets up the Stream annotation listener, which reads the basic information of a metrics class from its @Stream annotation and processes the class accordingly
// Example metrics class carrying the @Stream annotation. The annotation supplies the
// stream name, the scope id, the builder class and the stream processor class.
@Stream(
name = "instance_jvm_class_loaded_class_count",
scopeId = 11000,
builder = InstanceJvmClassLoadedClassCountMetricsBuilder.class,
processor = MetricsStreamProcessor.class
)
public class InstanceJvmClassLoadedClassCountMetrics extends LongAvgMetrics implements WithMetadata {
// Class body omitted for brevity
}
  2. Through the module manager, first obtains the SourceReceiver object, and then obtains the DispatcherManager object from it
// Excerpt of SourceReceiverImpl showing that the dispatcher detector listener it exposes
// is its own DispatcherManager.
public class SourceReceiverImpl implements SourceReceiver {
// Lombok's @Getter provides the getDispatcherManager() accessor used below.
@Getter
private final DispatcherManager dispatcherManager;
@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
// The DispatcherManager held by this receiver is returned as the listener.
return getDispatcherManager();
}
}
  3. Starts the OAL engine
  4. Notifies all listeners

org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners

 /**
  * Hands every generated metrics class to the stream annotation listener and every
  * generated dispatcher class to the dispatcher detector listener.
  *
  * @throws ModuleStartException if either listener fails; the original exception is kept
  *                              as the cause
  */
 @Override
 public void notifyAllListeners() throws ModuleStartException {
     // Add the dynamically generated Metrics classes to MetricsStreamProcessor.
     for (Class generatedMetrics : metricsClasses) {
         try {
             streamAnnotationListener.notify(generatedMetrics);
         } catch (StorageException storageFailure) {
             throw new ModuleStartException(storageFailure.getMessage(), storageFailure);
         }
     }

     // Add the dynamically generated SourceDispatcher classes to DispatcherManager.
     for (Class generatedDispatcher : dispatcherClasses) {
         try {
             dispatcherDetectorListener.addIfAsSourceDispatcher(generatedDispatcher);
         } catch (Exception registrationFailure) {
             throw new ModuleStartException(registrationFailure.getMessage(), registrationFailure);
         }
     }
 }

org.apache.skywalking.oap.server.core.analysis.DispatcherManager#addIfAsSourceDispatcher

 /**
  * Registers {@code aClass} as a source dispatcher when it is a concrete (neither interface
  * nor abstract) implementation of {@code SourceDispatcher}. The dispatcher is stored in
  * {@code dispatcherMap} under the scope id reported by an instance of its source type,
  * i.e. the single type argument of {@code SourceDispatcher}.
  *
  * @param aClass candidate class; classes that are not concrete dispatchers are ignored
  * @throws IllegalAccessException if the source type or the dispatcher cannot be accessed
  *                                for instantiation
  * @throws InstantiationException if the source type or the dispatcher cannot be instantiated
  */
 @Override
 public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
     if (aClass.isInterface()
         || Modifier.isAbstract(aClass.getModifiers())
         || !SourceDispatcher.class.isAssignableFrom(aClass)) {
         return;
     }

     Type[] genericInterfaces = aClass.getGenericInterfaces();
     for (Type genericInterface : genericInterfaces) {
         // getGenericInterfaces() reports non-generic interfaces as plain Class objects.
         // Casting those to ParameterizedType would throw ClassCastException, so skip them.
         if (!(genericInterface instanceof ParameterizedType)) {
             continue;
         }
         ParameterizedType anInterface = (ParameterizedType) genericInterface;
         if (!anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
             continue;
         }

         Type[] arguments = anInterface.getActualTypeArguments();
         if (arguments.length != 1) {
             throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
         }

         // An instance of the source type is needed to read its scope id.
         Type argument = arguments[0];
         Object source = ((Class) argument).newInstance();
         if (!Source.class.isAssignableFrom(source.getClass())) {
             throw new UnexpectedException(
                 "unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
         }
         Source dispatcherSource = (Source) source;
         SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();
         int scopeId = dispatcherSource.scope();

         // The scope id is the key of the dispatcher map.
         List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId);
         if (dispatchers == null) {
             dispatchers = new ArrayList<>();
             this.dispatcherMap.put(scopeId, dispatchers);
         }
         dispatchers.add(dispatcher);

         LOGGER.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass()
                                                                                     .getName(), scopeId);
     }
 }

How OAL adds the dynamically generated Metrics to MetricsStreamProcessor

The process is basically the same as the one described above for adding the dynamically generated SourceDispatcher to DispatcherManager; it also happens in the org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners method

 /**
  * Hands every generated metrics class to the stream annotation listener and every
  * generated dispatcher class to the dispatcher detector listener.
  *
  * @throws ModuleStartException if either listener fails; the original exception is kept
  *                              as the cause
  */
 @Override
 public void notifyAllListeners() throws ModuleStartException {
     // Add the dynamically generated Metrics classes to MetricsStreamProcessor.
     for (Class generatedMetrics : metricsClasses) {
         try {
             streamAnnotationListener.notify(generatedMetrics);
         } catch (StorageException storageFailure) {
             throw new ModuleStartException(storageFailure.getMessage(), storageFailure);
         }
     }

     // Add the dynamically generated SourceDispatcher classes to DispatcherManager.
     for (Class generatedDispatcher : dispatcherClasses) {
         try {
             dispatcherDetectorListener.addIfAsSourceDispatcher(generatedDispatcher);
         } catch (Exception registrationFailure) {
             throw new ModuleStartException(registrationFailure.getMessage(), registrationFailure);
         }
     }
 }

org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener#notify

 /**
  * Routes a class annotated with {@code @Stream} to the stream processor named by the
  * annotation's {@code processor} attribute.
  *
  * @param aClass the class to register; it must carry the {@code @Stream} annotation
  * @throws StorageException    declared by this method; presumably raised by the processors'
  *                             create calls
  * @throws UnexpectedException if the class has no {@code @Stream} annotation or names an
  *                             unknown processor
  */
 @Override
 public void notify(Class aClass) throws StorageException {
     if (!aClass.isAnnotationPresent(Stream.class)) {
         throw new UnexpectedException(
             "Stream annotation listener could only parse the class present stream annotation.");
     }

     Stream stream = (Stream) aClass.getAnnotation(Stream.class);
     Class<?> processorType = stream.processor();

     if (processorType.equals(RecordStreamProcessor.class)) {
         RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
     } else if (processorType.equals(MetricsStreamProcessor.class)) {
         // Every Metrics class declares processor = MetricsStreamProcessor.class in its
         // @Stream annotation, so metrics classes always take this branch.
         MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
     } else if (processorType.equals(TopNStreamProcessor.class)) {
         TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
     } else if (processorType.equals(NoneStreamProcessor.class)) {
         NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
     } else if (processorType.equals(ManagementStreamProcessor.class)) {
         ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
     } else {
         throw new UnexpectedException("Unknown stream processor.");
     }
 }

In org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create, after a series of processing steps, the Worker (processor) is finally put into a map, where it waits for later use

 /**
  * Creates the workers and the work flow for a metrics class described by a
  * {@code @Stream} annotation. The annotation is converted into a {@code StreamDefinition}
  * and the work is delegated to the overload that accepts that definition.
  *
  * @param moduleDefineHolder pointer of the module define.
  * @param stream             definition of the metrics class.
  * @param metricsClass       data type of the streaming calculation.
  * @throws StorageException declared by the delegated create call
  */
 public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException {
     StreamDefinition definition = StreamDefinition.from(stream);
     this.create(moduleDefineHolder, definition, metricsClass);
 }
/**
 * Creates the workers for one metrics class and records the entry worker for it.
 * Nothing is created when the stream name is included in DisableRegister.
 *
 * @param moduleDefineHolder used to look up the storage and core module services.
 * @param stream definition of the metrics stream (name, scope id, builder).
 * @param metricsClass data type of the streaming calculation.
 */
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream,
Class<? extends Metrics> metricsClass) throws StorageException {
// Skip streams that DisableRegister reports as included.
if (DisableRegister.INSTANCE.include(stream.getName())) {
return;
}
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
try {
// Instantiate the builder class taken from the stream definition and create the metrics storage DAO from it
metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(DownSamplingConfigService.class);
// These stay null unless down sampling is supported and enabled for the given precision.
MetricsPersistentWorker hourPersistentWorker = null;
MetricsPersistentWorker dayPersistentWorker = null;
MetricsTransWorker transWorker = null;
final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
/**
* All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.
*/
boolean supportDownSampling = true;
boolean supportUpdate = true;
if (metricsExtension != null) {
supportDownSampling = metricsExtension.supportDownSampling();
supportUpdate = metricsExtension.supportUpdate();
}
if (supportDownSampling) {
// Register the hour model and create its persistent worker when the config asks for it.
if (configService.shouldToHour()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
// Register the day model and create its persistent worker when the config asks for it.
if (configService.shouldToDay()) {
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
// The trans worker is given the hour and day workers; either of them may be null.
transWorker = new MetricsTransWorker(
moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
}
// The minute model and its persistent worker are always created; the worker is given the trans worker.
Model model = modelSetter.add(
metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
// Register the minute worker under "<stream name>_rec"; the remote worker below is created with the same name.
String remoteReceiverWorkerName = stream.getName() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(IWorkerInstanceSetter.class);
workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
moduleDefineHolder, remoteWorker, stream.getName());
// private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
// Put the metrics class and its MetricsAggregateWorker into the map,
// so the worker can be fetched from the map when metrics data needs to be processed
entryWorkers.put(metricsClass, aggregateWorker);
}

How SourceReceiver handles a Source

As mentioned in the section “Starting from a case: the OAL principle”, the OAP server sends the metrics information it receives from the agent to the SourceReceiver.
The corresponding code location is: org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatcher#sendToClassMetricProcess

 /**
  * Builds a {@code ServiceInstanceJVMClass} source from the reported class-loading data
  * and passes it to the source receiver.
  *
  * @param service           name of the service
  * @param serviceId         id of the service
  * @param serviceInstance   name of the service instance
  * @param serviceInstanceId id of the service instance
  * @param timeBucket        time bucket the data belongs to
  * @param clazz             reported class-loading counters
  */
 private void sendToClassMetricProcess(String service,
                                       String serviceId,
                                       String serviceInstance,
                                       String serviceInstanceId,
                                       long timeBucket,
                                       Class clazz) {
     // Assemble the Source object.
     ServiceInstanceJVMClass jvmClassSource = new ServiceInstanceJVMClass();
     jvmClassSource.setId(serviceInstanceId);
     jvmClassSource.setName(serviceInstance);
     jvmClassSource.setServiceId(serviceId);
     jvmClassSource.setServiceName(service);
     jvmClassSource.setLoadedClassCount(clazz.getLoadedClassCount());
     jvmClassSource.setUnloadedClassCount(clazz.getUnloadedClassCount());
     jvmClassSource.setTotalLoadedClassCount(clazz.getTotalLoadedClassCount());
     jvmClassSource.setTimeBucket(timeBucket);

     // Hand the Source object over to the SourceReceiver for processing.
     sourceReceiver.receive(jvmClassSource);
 }

The default implementation of SourceReceiver is org.apache.skywalking.oap.server.core.source.SourceReceiverImpl, which distributes the collected metrics through org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward

package org.apache.skywalking.oap.server.core.source;
import java.io.IOException;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
/**
 * SourceReceiver implementation that delegates all of its work to a DispatcherManager
 * it creates for itself.
 */
public class SourceReceiverImpl implements SourceReceiver {
// Lombok's @Getter provides the getDispatcherManager() accessor used below.
@Getter
private final DispatcherManager dispatcherManager;
public SourceReceiverImpl() {
this.dispatcherManager = new DispatcherManager();
}
@Override
public void receive(Source source) {
// Forward the source through the dispatcher manager
dispatcherManager.forward(source);
}
@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
// The DispatcherManager held by this receiver is returned as the listener.
return getDispatcherManager();
}
// Delegates to DispatcherManager#scan.
public void scan() throws IOException, InstantiationException, IllegalAccessException {
dispatcherManager.scan();
}
}
 // org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward
/**
 * Forwards the given source to every dispatcher registered for its scope.
 * A {@code null} source, or a source whose scope has no dispatchers, is ignored.
 *
 * @param source the source to dispatch; may be {@code null}
 */
public void forward(Source source) {
    if (source == null) {
        return;
    }

    // Find the dispatchers by the scope of the source.
    List<SourceDispatcher> scopedDispatchers = dispatcherMap.get(source.scope());

    /*
     * Dispatcher is only generated by oal script analysis result.
     * So it is possible that the given source doesn't have a dispatcher,
     * when the receiver is open and the oal script doesn't ask for analysis.
     */
    if (scopedDispatchers == null) {
        return;
    }

    source.prepare();
    // Dispatchers generated dynamically by OAL are invoked here as well.
    for (SourceDispatcher scopedDispatcher : scopedDispatchers) {
        scopedDispatcher.dispatch(source);
    }
}

How MetricsStreamProcessor handles the metrics data sent by SourceDispatcher

For the complete code, see the “Case study” subsection under “How OAL dynamically generates classes”

org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher.ServiceInstanceJVMClassDispatcher#doInstanceJvmClassLoadedClassCount sends the data to org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor

package org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMClass;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.core.source.oal.rt.metrics.InstanceJvmClassLoadedClassCountMetrics;
/**
 * Dispatcher for {@code ServiceInstanceJVMClass} sources: it turns each source into an
 * {@code InstanceJvmClassLoadedClassCountMetrics} and feeds it to the metrics stream processor.
 */
public class ServiceInstanceJVMClassDispatcher implements SourceDispatcher<ServiceInstanceJVMClass> {
    public ServiceInstanceJVMClassDispatcher() {
    }

    /**
     * Casts the source to {@code ServiceInstanceJVMClass} and produces its metrics.
     */
    public void dispatch(Source var1) {
        this.doInstanceJvmClassLoadedClassCount((ServiceInstanceJVMClass) var1);
    }

    /**
     * Builds the loaded-class-count metrics from the source and sends it on.
     */
    private void doInstanceJvmClassLoadedClassCount(ServiceInstanceJVMClass jvmClass) {
        InstanceJvmClassLoadedClassCountMetrics metrics = new InstanceJvmClassLoadedClassCountMetrics();
        metrics.setTimeBucket(jvmClass.getTimeBucket());
        metrics.setEntityId(jvmClass.getEntityId());
        metrics.setServiceId(jvmClass.getServiceId());
        metrics.combine(jvmClass.getLoadedClassCount(), 1L);

        // Send the data to the metrics stream processor.
        MetricsStreamProcessor.getInstance().in(metrics);
    }
}

In the org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#in method, the Worker object created in org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create is used to save the data

 /**
  * Passes the metrics to the entry worker registered for its class. Metrics whose class
  * has no entry worker are ignored.
  *
  * @param metrics the metrics instance to process
  */
 public void in(Metrics metrics) {
     MetricsAggregateWorker entryWorker = entryWorkers.get(metrics.getClass());
     if (entryWorker == null) {
         return;
     }
     entryWorker.in(metrics);
 }

PS: The more detailed internal data-processing flow (related keywords: DataCarrier, Worker, StorageModule) has not been studied yet and is outside the scope of this article.

Summary

SkyWalking metrics processing flow

file

Reference documents

Share and record what you've learned
Please include a link to the original article when reprinting. Thanks.
Similar articles

2021-10-14

2021-10-14

2021-10-14

2021-10-14

2021-10-14

2021-10-14

2021-10-14