[Enhancement] Improve Pulsar Broker memory usage metrics which are also used in the Pulsar Load Manager #21973

@lhotari

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently Pulsar uses heap usage as one of the resource metrics for the Pulsar Load Manager.
This is the code that is used:

public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {
    SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();
    // Override System memory usage and limit with JVM heap usage and limit
    double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
    double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    double memoryUsage = memoryUsageInBytes / MIBI;
    double memoryLimit = maxHeapMemoryInBytes / MIBI;
    systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));
    // Collect JVM direct memory
    systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
            (double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));
    return systemResourceUsage;
}

Overall heap usage is not a very useful metric: it includes garbage that has not been collected yet, so the value fluctuates with the GC cycle rather than with the amount of data the broker actually retains.

Solution

The OpenJDK JVM provides better ways to track actual memory usage. One possibility is to observe the memory usage after GC events (minor and major). The GC events can be used to calculate a better estimate of how much memory is used by the objects retained in the heap.

On OpenJDK there is a JMX API for registering a listener to receive GC events.

This is documented in com.sun.management.GarbageCollectionNotificationInfo. The GcInfo included in the event is documented in com.sun.management.GcInfo.

GC events are handled separately for different memory pools (or spaces, referred to as "eden", "survivor", "tenured", etc., depending on the GC implementation). It usually only makes sense to track the usage of the tenured memory pool ("G1 Old Gen" for G1GC) after a GC event. The value is accurate after a Full GC. It should also be possible to calculate an estimate after GC events where the tenured space is only partially collected, for example by taking the last Full GC event and calculating an average after a partial GC event. This all works well at least for CMS, Parallel and G1GC on OpenJDK. ZGC might behave differently, but it should be possible to find a good metric for ZGC as well, based on the GC events.

ChatGPT generated this kind of sample code for observing GC events on OpenJDK. It looks about right:

import java.lang.management.*;
import javax.management.*;
import javax.management.openmbean.CompositeData;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.sun.management.GcInfo;

public class GcNotificationListener {

    public static void main(String[] args) throws Exception {
        // Listen for GarbageCollectorMXBean notifications
        for (GarbageCollectorMXBean gcMxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            NotificationEmitter emitter = (NotificationEmitter) gcMxBean;
            NotificationListener listener = (notification, handback) -> {
                if (notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) {
                    // Extract the GarbageCollectionNotificationInfo
                    CompositeData cd = (CompositeData) notification.getUserData();
                    GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd);

                    // Get the GcInfo object
                    GcInfo gcInfo = info.getGcInfo();

                    // Here you can use the gcInfo object
                    System.out.println("GC Name: " + info.getGcName());
                    System.out.println("GC Action: " + info.getGcAction());
                    System.out.println("GC Cause: " + info.getGcCause());
                    System.out.println("GC Info: " + gcInfo.toString());
                }
            };

            emitter.addNotificationListener(listener, null, null);
        }

        // Keep the program running to listen for GC events
        System.out.println("Listening for GC events...");
        Thread.sleep(Long.MAX_VALUE);
    }
}
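
As a rough illustration of the tenured-pool idea described above, the listener could be reduced to recording the old generation usage reported by GcInfo.getMemoryUsageAfterGc(). This is only a sketch under assumptions: the class name RetainedHeapEstimator and its methods are hypothetical, and recognising the tenured pool by name ("... Old Gen", "Tenured Gen") is an assumption that would need to be checked per collector, in particular for ZGC.

```java
import com.sun.management.GarbageCollectionNotificationInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;

// Hypothetical helper, not part of Pulsar: remembers the tenured pool usage seen after the latest GC event.
public class RetainedHeapEstimator {

    // Last observed tenured usage in bytes; -1 until a GC event that reports a tenured pool arrives.
    private final AtomicLong lastOldGenUsedBytes = new AtomicLong(-1);

    // Assumption: tenured pools can be recognised by name, e.g. "G1 Old Gen", "PS Old Gen", "Tenured Gen".
    public static boolean isTenuredPool(String poolName) {
        return poolName.endsWith("Old Gen") || poolName.equals("Tenured Gen");
    }

    // Sums the "used" bytes of all tenured pools in a GcInfo.getMemoryUsageAfterGc() map.
    // Returns -1 when the map contains no pool that matches isTenuredPool.
    public static long tenuredUsedBytes(Map<String, MemoryUsage> usageAfterGc) {
        long total = 0;
        boolean found = false;
        for (Map.Entry<String, MemoryUsage> entry : usageAfterGc.entrySet()) {
            if (isTenuredPool(entry.getKey())) {
                total += entry.getValue().getUsed();
                found = true;
            }
        }
        return found ? total : -1;
    }

    // Registers a listener on every GarbageCollectorMXBean that supports notifications.
    public void register() {
        for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            if (!(gcBean instanceof NotificationEmitter)) {
                continue;
            }
            NotificationListener listener = (notification, handback) -> {
                if (!GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION
                        .equals(notification.getType())) {
                    return;
                }
                GarbageCollectionNotificationInfo info =
                        GarbageCollectionNotificationInfo.from((CompositeData) notification.getUserData());
                long used = tenuredUsedBytes(info.getGcInfo().getMemoryUsageAfterGc());
                if (used >= 0) {
                    lastOldGenUsedBytes.set(used);
                }
            };
            ((NotificationEmitter) gcBean).addNotificationListener(listener, null, null);
        }
    }

    public long getLastOldGenUsedBytes() {
        return lastOldGenUsedBytes.get();
    }
}
```

A metrics collector could then read getLastOldGenUsedBytes() instead of totalMemory() - freeMemory(). The sketch records the value after every GC event without distinguishing full from partial collections, so any averaging between Full GCs is left open here.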

Alternatives

There aren't really any good alternatives for estimating the memory usage of long-lived objects in the JVM heap. In OpenJDK, adding a listener for GC events is the best way to estimate this.

Anything else?

The JVM also has a separate notification interface for detecting low memory conditions. It's possible to set a threshold for memory usage and an event will be emitted when the threshold is crossed.
This is documented in MemoryPoolMXBean. This is useful for JVM health checks, but that would be a different problem to solve than the memory usage metrics.

ChatGPT generated this kind of example for MemoryPoolMXBean.setCollectionUsageThreshold. Sharing it here just to show a complete picture of the JVM interfaces for monitoring low memory conditions:

import java.lang.management.*;
import java.util.List;
import javax.management.*;
import javax.management.openmbean.CompositeData;

public class MemoryThresholdListener {

    public static void main(String[] args) throws Exception {
        // Obtain the MemoryMXBean
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

        // Register a listener for memory notifications
        NotificationEmitter emitter = (NotificationEmitter) memoryMXBean;
        NotificationListener listener = (notification, handback) -> {
            if (MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED.equals(notification.getType())) {
                CompositeData cd = (CompositeData) notification.getUserData();
                MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
                System.out.println("Memory collection threshold exceeded");
                System.out.println("Memory usage details: " + info.getUsage());
            }
        };
        emitter.addNotificationListener(listener, null, null);

        // Set a collection usage threshold on a MemoryPoolMXBean
        List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
        for (MemoryPoolMXBean pool : pools) {
            // Example: Set the threshold on the Old Gen pool, if the pool supports it
            if (pool.getName().contains("Old Gen") && pool.isCollectionUsageThresholdSupported()) {
                long max = pool.getUsage().getMax();
                // getMax() returns -1 when the pool has no defined maximum
                if (max > 0) {
                    // Set the threshold to 90% of the pool's maximum size
                    pool.setCollectionUsageThreshold((long) (max * 0.9));
                    System.out.println("Threshold set for " + pool.getName());
                }
            }
        }

        // Keep the program running to listen for memory threshold exceed notifications
        System.out.println("Listening for memory threshold exceed notifications...");
        Thread.sleep(Long.MAX_VALUE);
    }
}

Are you willing to submit a PR?

  • I'm willing to submit a PR!
