顯示具有 Spring 標籤的文章。 顯示所有文章
顯示具有 Spring 標籤的文章。 顯示所有文章

2024年6月15日 星期六

Spring Cache + AspectJ 背後的大致流程

因為最近在專案中整合 Spring Cache,不過原生的 Spring Cache 支援的功能有點基本,我們想要擴充它的功能,所以需要稍微了解一下 Spring Cache 背後到底怎麼運作的,我們才有辦法找出比較合適的擴充方式。

網路上可以找到一些中國網友的原始碼解析,不過大多數找到的都是用預設的 Spring AOP,但我們專案的狀況會使用 AspectJ,所以這邊會紀錄 AspectJ 的狀況是如何運作的。

另外,因為我們還在用 Spring Boot 2.7,所以這裡會先關注在 Spring v5.3.31 版的原始碼。相關的 Spring Cache 官方文件可以參考這裡

Spring Cache 的使用方式

那麼要從哪裡開始追蹤呢?先來看一下 Spring Cache 怎麼使用的。

@Configuration
@EnableCaching
public class AppConfig {
}

@Cacheable("books")
public Book findBook(ISBN isbn) {...}

Spring Cache 最基本的使用方式,就是像上面的例子那樣,用 @Cacheable 註解來告訴 Spring 說 findBook(...) 這個 method 要被 cache,並且需要提供一個 configuration,上面有 @EnableCaching 來要求要啟用 cache。這裡其實還需要一點其他的設定,不過因為不是這裡的重點,所以我都先略過 😆。總之,起點就是 @EnableCaching 了。

Cache 的載入流程

載入 Configuration

首先從 @EnableCaching 這個註解開始看,它的內容大概是這樣:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(CachingConfigurationSelector.class)
public @interface EnableCaching {
}

可以注意到,它宣告了 @Import,因此 Spring 會使用它指定的 CachingConfigurationSelector 來得知到載入什麼東西。

CachingConfigurationSelector (git) 是一個透過 parent 實作了 ImportSelector 的類別,因此它主要提供 selectImports 這個方法,用來告訴 Spring 說要載入的 class 的 FQDN 是什麼。比較重要的內容如下:

public class CachingConfigurationSelector extends AdviceModeImportSelector<EnableCaching> {
  ...
  private static final String CACHE_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.cache.aspectj.AspectJCachingConfiguration";
  ...

  @Override
  public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return getProxyImports();
      case ASPECTJ:
        return getAspectJImports();
      default:
        return null;
    }
  }

  ...
  private String[] getAspectJImports() {
    List<String> result = new ArrayList<>(2);
    result.add(CACHE_ASPECT_CONFIGURATION_CLASS_NAME);
    if (jsr107Present && jcacheImplPresent) {
      result.add(JCACHE_ASPECT_CONFIGURATION_CLASS_NAME);
    }
    return StringUtils.toStringArray(result);
    }

selectImports() 中,會依據註解設定的 adviceMode 決定 import 時要給哪個 class name,因為前面提到我們要用的是 AspectJ,並且我們並沒有要用 JSR-107,所以這裡就只看純 AspectJ 的狀況。所以可以看出,這裡如果 adviseMode 設定為 AspectJ 的話,它會回覆的 FQDN 是 org.springframework.cache.aspectj.AspectJCachingConfiguration

於是接著來看一下 AspectJ 的 Configuration (git) 寫了什麼~。

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class AspectJCachingConfiguration extends AbstractCachingConfiguration {

  @Bean(name = CacheManagementConfigUtils.CACHE_ASPECT_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public AnnotationCacheAspect cacheAspect() {
    AnnotationCacheAspect cacheAspect = AnnotationCacheAspect.aspectOf();
    cacheAspect.configure(this.errorHandler, this.keyGenerator, this.cacheResolver, this.cacheManager);
    return cacheAspect;
  }

}

這裡最主要就是提供一個 Bean,這個 Bean 是用來建立 AnnotationCacheAspect 的,並且在建立的同時會把註解上的設定、以及其他需要建立的例如 ErrorHandlerKeyGeneratorCacheResolverCacheManager 等東西都一起注入進去。

所以到這裡稍微總結一下,這整段代表的意思,是當 Spring 讀到 @EnableCaching 註解時,會基於這個 Selector 決定去載入指定的 Configuration,然後就會依據 Configuration 的內容去建立包含 CacheManager 等等的 Bean,並且初始化一個 Aspect,把那些 Aspect 需要用到的 Bean 注入到 Aspect 當中。

Aspect 被初始化以後,後面的主要工作應該就是 AspectJ 的範疇了。不過因為我目前還沒有很完整地看過 AspectJ 的文件,所以只有大略知道一些資訊,如果紀錄有誤歡迎提醒 😆。

AspectJ 在縫合的時候,會讓 Aspect 擁有 aspectOf() 的 method (doc),所以 Spring 才有辦法呼叫 aspectOf() 來取得這個 Aspect,並且對它做設定。

載入並註冊 Aspect

前段被建立的 AnnotationCacheAspect (git) 是一個 .aj 檔,所以內容基本上就是 AspectJ 的定義了:

public aspect AnnotationCacheAspect extends AbstractCacheAspect {

  public AnnotationCacheAspect() {
    super(new AnnotationCacheOperationSource(false));
  }

  /**
   * Matches the execution of any public method in a type with the @{@link Cacheable}
   * annotation, or any subtype of a type with the {@code @Cacheable} annotation.
   */
  private pointcut executionOfAnyPublicMethodInAtCacheableType() :
      execution(public * ((@Cacheable *)+).*(..)) && within(@Cacheable *);

  ...

  /**
   * Definition of pointcut from super aspect - matched join points will have Spring
   * cache management applied.
   */
  protected pointcut cacheMethodExecution(Object cachedObject) :
    (executionOfAnyPublicMethodInAtCacheableType()
        || executionOfAnyPublicMethodInAtCacheEvictType()
        || executionOfAnyPublicMethodInAtCachePutType()
        || executionOfAnyPublicMethodInAtCachingType()
        || executionOfCacheableMethod()
        || executionOfCacheEvictMethod()
        || executionOfCachePutMethod()
        || executionOfCachingMethod())
      && this(cachedObject);

這裡我把其他部份都略過了,只先看它定義關於 @Cacheable 的部份。它定義了一個 pointcut 叫做 executionOfAnyPublicMethodInAtCacheableType(),會被橫切到所有有標註 @Cacheable 的 public method 上。那麼 pointcut 要執行的邏輯是什麼呢?這就需要看它的 parent AbstractCacheAspect (git) 了。

public abstract aspect AbstractCacheAspect extends CacheAspectSupport implements DisposableBean {
  ...
  @SuppressAjWarnings("adviceDidNotMatch")
  Object around(final Object cachedObject) : cacheMethodExecution(cachedObject) {
    MethodSignature methodSignature = (MethodSignature) thisJoinPoint.getSignature();
    Method method = methodSignature.getMethod();

    CacheOperationInvoker aspectJInvoker = new CacheOperationInvoker() {
        public Object invoke() {
          try {
            return proceed(cachedObject);
          }
          catch (Throwable ex) {
            throw new ThrowableWrapper(ex);
          }
        }
    };

    try {
      return execute(aspectJInvoker, thisJoinPoint.getTarget(), method, thisJoinPoint.getArgs());
    }
    catch (CacheOperationInvoker.ThrowableWrapper th) {
      AnyThrow.throwUnchecked(th.getOriginal());
      return null; // never reached
    }
  }
}

可以看出,它定義了一個 around 並綁定在 cacheMethodExecution() 上,而 cacheMethodExecution() 這個 pointcut 則是定義一系列的 pointcuts。所以我的理解是,它應該是把 around 綁到所有它列出來的 pointcut,也就是像是 @Cacheable`CachePut 等等的 pointcut。

接著來看這個 around 裡面的內容。它首先先建立了一個 CacheOperationInvoker 的 instance,這個 invoker 的目的是用來呼叫被 cache 的 method 的,接著就去呼叫 execute(),把 JoinPoint 的資訊以及剛剛建立的 invoker 都傳進去。

execute()AbstractCacheAspect 從它的 parent CacheAspectSupport 繼承來的,所以接著就再去看 CacheAspectSupport 裡的內容。不過裡面內容蠻多的,所以只先節錄了 execute() 的部份:

@Nullable
protected Object execute(CacheOperationInvoker invoker, Object target, Method method, Object[] args) {
  // Check whether aspect is enabled (to cope with cases where the AJ is pulled in automatically)
  if (this.initialized) {
    Class<?> targetClass = getTargetClass(target);
    CacheOperationSource cacheOperationSource = getCacheOperationSource();
    if (cacheOperationSource != null) {
      Collection<CacheOperation> operations = cacheOperationSource.getCacheOperations(method, targetClass);
      if (!CollectionUtils.isEmpty(operations)) {
        return execute(invoker, method,
            new CacheOperationContexts(operations, method, args, target, targetClass));
      }
    }
  }

  return invoker.invoke();
}

這段最主要的行為,是在檢查自己初始化的狀態,如果初始化還沒完成,就直接呼叫 invoker,否則就把 cache 的服務 (即 CacheOperationSource) 準備好,然後再去呼叫第二個 execute()

第二個 execute() (git) 內容更長,所以這裡也是先篩掉一些內容,只先看最單純的部份:

private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
  // Special handling of synchronized invocation
  if (contexts.isSynchronized()) {
    ...
  }

  ...

  // Check if we have a cached value matching the conditions
  Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class));

  ...

  Object cacheValue;
  Object returnValue;

  if (cacheHit != null && !hasCachePut(contexts)) {
    // If there are no put requests, just use the cache hit
    cacheValue = cacheHit.get();
    returnValue = wrapCacheValue(method, cacheValue);
  }
  else {
    // Invoke the method if we don't have a cache hit
    returnValue = invokeOperation(invoker);
    cacheValue = unwrapReturnValue(returnValue);
  }

  ...

  // Process any collected put requests, either from @CachePut or a @Cacheable miss
  for (CachePutRequest cachePutRequest : cachePutRequests) {
    cachePutRequest.apply(cacheValue);
  }

  ...

  return returnValue;
}

節錄下來的部份就是 Spring Cache 實際上操作 cache 服務和被 cache 的 method 的行為了。首先它透過 findCachedItem() 取得 cache 的結果,如果 cache hit 了,就把它放進 returnValue 中;反之,如果 cache miss 了,則會去執行被 cache 的 method 取得最新的執行結果,然後放進 returnValue 裡。

中間有一部分收集 put requests 的程式碼被我過濾掉了沒貼進來,不過大致上就是最後它會依據收集到要做的 put requests 開始一個一個 apply 到 cache 中,完成 cache 的更新。

2023年1月28日 星期六

基本的 OpenTelemetry Metrics 設定:紀錄累計數目並透過 Spring AOP 攔截方法

接續著前面的 Spring AOP,接著要在 Spring AOP 之上接 OpenTelemetry。實際上接 OpenTelemetry 才是我的目的,AOP 只是希望接 metric/tracing 時可以不要碰商業邏輯的程式碼而已 XD。

什麼是 OpenTelemetry

簡要來說,OpenTelemetry(簡稱 OTEL)是結合了 CNCF 發展的 OpenTracing 和 Google 發展的 OpenCensus 兩個專案後的結果 [1],目的是為了提供 observibility telemetry。它涵蓋了三大主題:tracing、metrics、logs。tracing 能夠整合上下游的關係,提供完整的 profiling 資訊;metrics 能夠提供統計型的數據,讓我們可以快速了解系統的狀態;logs 則是文字型的資料。

不過就我目前的了解,OTEL 因為是在 2019 年才合併,到現在似乎還是沒有到非常完整,各個語言的支援有些可能還是有點缺漏。以 Java 來說,目前 tracing 和 metrics 的支援是比較好,logs 則還處於實驗中。細節可以參考 OTEL 官網中關於 Java SDK 的狀態頁 [2]。

OpenTelemetry Metrics 建置

因為我目前的目的是要建立 Metrics 的環境,把我的 Java application 的一些自訂資訊輸出到 Metrics 上,讓我們得以透過統計資訊了解系統的狀態,所以這篇主要只會紀錄關於 Metrics 的建置範例。

範例的目的

首先稍微簡介一下,這篇文章中的範例是設定成什麼背景、要解決什麼問題。承襲上一篇 Spring AOP 文章,我有一個 method 如下,這個 method 的角色是一個 event consumer,就是在收某種 queue 送過來的訊息。文章的目標是要透過 AOP 插入一個能夠統計收到的訊息的類型的 Metrics。有了這個 Metrics,就可以知道系統總共處理什麼量級的訊息,並且也可以用來做更細緻的統計,例如 Event 的設計是有分 eventTypeowner,代表的是某個人送出的 Create/Update/Delete 指令,而 Metrics 希望能夠讓我們有能力得知例如在指定時間區間內,某個人送了多少指令、或者是總共有多少的 Create 指令等等。

@Slf4j
@Component
public class FakeEventProcessor {
    public void receiveEvent(Event event) {
        log.info("Receive: {}", event);
    }
}

@Builder
@Getter
@Accessors(fluent = true)
@ToString
public class Event {
    private String eventType;
    private String owner;
}

完整的範例程式碼,可以參考 [3]。

Gradle 設定

在 Gradle 中,需要加入以下的 dependencies:

// BOMs
implementation(platform("io.opentelemetry:opentelemetry-bom:1.22.0"))
implementation(platform("io.opentelemetry:opentelemetry-bom-alpha:1.22.0-alpha"))

implementation("io.opentelemetry:opentelemetry-api")
implementation("io.opentelemetry:opentelemetry-sdk")
implementation("io.opentelemetry:opentelemetry-semconv")

// Exporter
implementation("io.opentelemetry:opentelemetry-exporter-logging")

這裡可以看到 BOM 會有兩個,其中 opentelemetry-bom-alpha 是用來設定 opentelemetry-semconv 的 BOM,而 opentelemetry-semconv 的用途,在我目前的範例程式裡,好像只有初始化 OTEL 時要給的 ResourceAttribute 會用到它…。另外因為這裡我先實驗的目標是最簡單的 Metrics,所以是採取 Logging 作為 Metrics 的 Exporter。換句話說,就是我寫入的 Metrics 會以 log 的形式被輸出。

初始化 OpenTelemetry

要使用 OTEL 的 Metrics 之前,需要先在系統裡初始化一個 OpenTelemetry 的 instance。我的範例中會是使用 @Configuration 來讓 Spring 幫忙注入。

@Configuration
public class OpenTelemetryConfiguration {

    @Bean(destroyMethod = "")
    public OpenTelemetry getTelemetry() {
        var resource = Resource.getDefault()
                .merge(Resource.create(
                        Attributes.of(ResourceAttributes.SERVICE_NAME, "otel-example")));

        var sdkMeterProvider = SdkMeterProvider.builder()
                .registerMetricReader(
                        PeriodicMetricReader.builder(LoggingMetricExporter.create())
                                .setInterval(Duration.ofSeconds(1))
                                .build())
                .setResource(resource)
                .build();

        var openTelemetry = OpenTelemetrySdk.builder()
                .setMeterProvider(sdkMeterProvider)
                .buildAndRegisterGlobal();

        return openTelemetry;
    }
}

這裡首先用 Resource 做基本的環境設定,具體來說就只是設定一個 resource name 而已。接著因為我要產出 Metrics,所以需要的是 SdkMeterProvider。MeterProvider 的設定是輸出到 Logging,而且外面再包裝一層定時輸出的 MetricReader,設定為每一秒輸出一次。最後建出 OTEL 的 instance,把剛剛建立的 SdkMeterProvider 設定為它的 MeterProvider 即可。

在翻閱文件時,有個小細節是文件上有提到,如果是在為 library 建立 telemetry 的話,就建議不要 register global。雖然目前我還不太了解 register global 是什麼意思就是…。

建立 Aspect 為指定的 Method 插入 Metrics

文章最開頭有提到,我想要在插入 Metrics 統計的同時,不去修改既有的商業邏輯,所以 Metrics 的統計應該要發生在別的 class 而不應該直接寫在 FakeEventProcessor 中。因此我會另外建立一個 Aspect class,這個 class 會讓 Spring 注入上面寫到的 OpenTelemetry instance,然後在每次 FakeEventProcessorreceiveEvent(..) 被呼叫時,都攔截執行並把 event 的內容紀錄在 Metrics 當中。

@Slf4j
@Aspect
@Component
public class TelemetryAspect {

    private OpenTelemetry telemetry;

    private Meter meter;
    private LongCounter eventCounter;

    @Autowired
    public TelemetryAspect(OpenTelemetry telemetry) {
        log.trace("Initiate aspect...");
        this.telemetry = telemetry;
        initiateMeter();
    }

    private void initiateMeter() {
        meter = telemetry.meterBuilder("event-consumer")
                .setInstrumentationVersion("1.0.0")
                .build();

        eventCounter = meter.counterBuilder("eventType")
                .setDescription("Metrics for the event consuming.")
                .setUnit("1")
                .build();
    }

    @Before("execution(* tw.jimwayneyeh.example.otel.FakeEventProcessor.receiveEvent(..))")
    public void before(JoinPoint joinPoint) {
        var event = (Event) joinPoint.getArgs()[0];
        eventCounter.add(1, Attributes.of(
                AttributeKey.stringKey("eventType"), event.eventType(),
                AttributeKey.stringKey("owner"), event.owner()));
    }

在上述的程式碼中,首先我在 Aspect 被初始化時,會去初始化一個 Meter,因為這個 Meter 是統計數字,所以就直接命名為 eventCounter。接著在 JointPoint 中,每次 receiveEvent(..) 被呼叫時,AOP 會攔截這個 method 呼叫,取得 method 呼叫中送進來的 event 物件,並且把為 eventCounter +1。其中 +1 時加的對象,是對 eventType & owner 做 +1。

最後執行的結果會長這樣:

INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=update, owner=owner-0)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=update, owner=owner-1)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=update, owner=owner-1)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=update, owner=owner-1)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=delete, owner=owner-1)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=create, owner=owner-0)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=update, owner=owner-0)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=delete, owner=owner-1)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=create, owner=owner-0)
INFO  t.j.e.o.FakeEventProcessor [main] Receive: Event(eventType=delete, owner=owner-0)
INFO  t.j.e.otel.Run [main] Sleep...
INFO  i.o.e.l.LoggingMetricExporter [PeriodicMetricReader-1] Received a collection of 1 metrics for export.
INFO  i.o.e.l.LoggingMetricExporter [PeriodicMetricReader-1] metric: ImmutableMetricData{resource=Resource{schemaUrl=null, attributes={service.name="otel-example", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.22.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=event-consumer, version=1.0.0, schemaUrl=null, attributes={}}, name=eventType, description=Metrics for the event consuming., unit=1, type=LONG_SUM, data=ImmutableSumData{points=[ImmutableLongPointData{startEpochNanos=1674906020221141200, epochNanos=1674906021227565100, attributes={eventType="delete", owner="owner-1"}, value=2, exemplars=[]}, ImmutableLongPointData{startEpochNanos=1674906020221141200, epochNanos=1674906021227565100, attributes={eventType="create", owner="owner-0"}, value=2, exemplars=[]}, ImmutableLongPointData{startEpochNanos=1674906020221141200, epochNanos=1674906021227565100, attributes={eventType="delete", owner="owner-0"}, value=1, exemplars=[]}, ImmutableLongPointData{startEpochNanos=1674906020221141200, epochNanos=1674906021227565100, attributes={eventType="update", owner="owner-0"}, value=2, exemplars=[]}, ImmutableLongPointData{startEpochNanos=1674906020221141200, epochNanos=1674906021227565100, attributes={eventType="update", owner="owner-1"}, value=3, exemplars=[]}], monotonic=true, aggregationTemporality=CUMULATIVE}}

最上面 10 行是在 FakeEventProcessor 裡面寫的 log,然後因為我的 logging 設定為每秒輸出,所以故意讓主程式睡了一下,以避免主程式跑完迴圈以後就自己關掉了 XD。

i.o.e.l.LoggingMetricExporter 這段就是 Metrics 輸出的內容,稍微格式化一下:

ImmutableMetricData {
    resource = Resource {
        schemaUrl = null, attributes = {
            service.name = "otel-example",
            telemetry.sdk.language = "java",
            telemetry.sdk.name = "opentelemetry",
            telemetry.sdk.version = "1.22.0"
        }
    }, instrumentationScopeInfo = InstrumentationScopeInfo {
        name = event - consumer, version = 1.0 .0, schemaUrl = null, attributes = {}
    }, name = eventType, description = Metrics
    for the event consuming., unit = 1, type = LONG_SUM, data = ImmutableSumData {
        points = [ImmutableLongPointData {
            startEpochNanos = 1674906020221141200, epochNanos = 1674906021227565100, attributes = {
                eventType = "delete",
                owner = "owner-1"
            }, value = 2, exemplars = []
        }, ImmutableLongPointData {
            startEpochNanos = 1674906020221141200, epochNanos = 1674906021227565100, attributes = {
                eventType = "create",
                owner = "owner-0"
            }, value = 2, exemplars = []
        }, ImmutableLongPointData {
            startEpochNanos = 1674906020221141200, epochNanos = 1674906021227565100, attributes = {
                eventType = "delete",
                owner = "owner-0"
            }, value = 1, exemplars = []
        }, ImmutableLongPointData {
            startEpochNanos = 1674906020221141200, epochNanos = 1674906021227565100, attributes = {
                eventType = "update",
                owner = "owner-0"
            }, value = 2, exemplars = []
        }, ImmutableLongPointData {
            startEpochNanos = 1674906020221141200, epochNanos = 1674906021227565100, attributes = {
                eventType = "update",
                owner = "owner-1"
            }, value = 3, exemplars = []
        }], monotonic = true, aggregationTemporality = CUMULATIVE
    }
}

可以看到,Metrics 紀錄到的結果是:

  • owner-1 delete: 1
  • owner-0 create: 2
  • owner-0 delete: 1
  • owner-0 update: 2
  • owner-1 update: 3

確實結果是符合 FakeEventProcessor 的結果~。