Skip to content

HashedWheelTimer 内存占用过高bug (dubbo 2.7.8) #6820

@zx844174097

Description

@zx844174097

bug 所在文件 https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java

出现问题的方法:
private boolean isWindows() { return System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win"); }
问题产生原因:
1.waitForNextTick() 中无限调用 isWindows();

2.toLowerCase(Locale.US) 将急速产生 char[] 对象。导致内存占用过高。

解决方案:
` private static boolean isWindows = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");

private boolean isWindows() {
	return isWindows;
}`

临时解决方案:
覆盖编写HashedWheelTimer 类
`/*

  • Copyright 2012 The Netty Project
  • The Netty Project licenses this file to you under the Apache License,
  • version 2.0 (the "License"); you may not use this file except in compliance
  • with the License. You may obtain a copy of the License at:
  • http://www.apache.org/licenses/LICENSE-2.0
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  • WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  • License for the specific language governing permissions and limitations
  • under the License.
    */

package org.apache.dubbo.common.timer;

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ClassUtils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

/**
覆盖阿里该类,用来修复内存占用过高的bug
*/
public class HashedWheelTimer implements Timer {

/**
 * may be in spi?
 */
public static final String NAME = "hased";

private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);

private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER = AtomicIntegerFieldUpdater
		.newUpdater(HashedWheelTimer.class, "workerState");

private final Worker worker = new Worker();
private final Thread workerThread;

private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;

/**
 * 0 - init, 1 - started, 2 - shut down
 */
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState;

private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;

private volatile long startTime;

/**
 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}), default tick duration, and
 * default number of ticks per wheel.
 */
public HashedWheelTimer() {
	this(Executors.defaultThreadFactory());
}

/**
 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}) and default number of ticks per
 * wheel.
 *
 * @param tickDuration the duration between tick
 * @param unit         the time unit of the {@code tickDuration}
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
	this(Executors.defaultThreadFactory(), tickDuration, unit);
}

/**
 * Creates a new timer with the default thread factory
 * ({@link Executors#defaultThreadFactory()}).
 *
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @param ticksPerWheel the size of the wheel
 * @throws NullPointerException     if {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
	this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

/**
 * Creates a new timer with the default tick duration and default number of
 * ticks per wheel.
 *
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @throws NullPointerException if {@code threadFactory} is {@code null}
 */
public HashedWheelTimer(ThreadFactory threadFactory) {
	this(threadFactory, 100, TimeUnit.MILLISECONDS);
}

/**
 * Creates a new timer with the default number of ticks per wheel.
 *
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
	this(threadFactory, tickDuration, unit, 512);
}

/**
 * Creates a new timer.
 *
 * @param threadFactory a {@link ThreadFactory} that creates a background
 *                      {@link Thread} which is dedicated to {@link TimerTask}
 *                      execution.
 * @param tickDuration  the duration between tick
 * @param unit          the time unit of the {@code tickDuration}
 * @param ticksPerWheel the size of the wheel
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
	this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}

/**
 * Creates a new timer.
 *
 * @param threadFactory      a {@link ThreadFactory} that creates a background
 *                           {@link Thread} which is dedicated to
 *                           {@link TimerTask} execution.
 * @param tickDuration       the duration between tick
 * @param unit               the time unit of the {@code tickDuration}
 * @param ticksPerWheel      the size of the wheel
 * @param maxPendingTimeouts The maximum number of pending timeouts after which
 *                           call to {@code newTimeout} will result in
 *                           {@link java.util.concurrent.RejectedExecutionException}
 *                           being thrown. No maximum pending timeouts limit is
 *                           assumed if this value is 0 or negative.
 * @throws NullPointerException     if either of {@code threadFactory} and
 *                                  {@code unit} is {@code null}
 * @throws IllegalArgumentException if either of {@code tickDuration} and
 *                                  {@code ticksPerWheel} is &lt;= 0
 */
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
		long maxPendingTimeouts) {

	if (threadFactory == null) {
		throw new NullPointerException("threadFactory");
	}
	if (unit == null) {
		throw new NullPointerException("unit");
	}
	if (tickDuration <= 0) {
		throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
	}
	if (ticksPerWheel <= 0) {
		throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
	}

	// Normalize ticksPerWheel to power of two and initialize the wheel.
	wheel = createWheel(ticksPerWheel);
	mask = wheel.length - 1;

	// Convert tickDuration to nanos.
	this.tickDuration = unit.toNanos(tickDuration);

	// Prevent overflow.
	if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
		throw new IllegalArgumentException(
				String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration,
						Long.MAX_VALUE / wheel.length));
	}
	workerThread = threadFactory.newThread(worker);

	this.maxPendingTimeouts = maxPendingTimeouts;

	if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT
			&& WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
		reportTooManyInstances();
	}
}

@Override
protected void finalize() throws Throwable {
	try {
		super.finalize();
	} finally {
		// This object is going to be GCed and it is assumed the ship has sailed to do a
		// proper shutdown. If
		// we have not yet shutdown then we want to make sure we decrement the active
		// instance count.
		if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
			INSTANCE_COUNTER.decrementAndGet();
		}
	}
}

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
	if (ticksPerWheel <= 0) {
		throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
	}
	if (ticksPerWheel > 1073741824) {
		throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
	}

	ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
	HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
	for (int i = 0; i < wheel.length; i++) {
		wheel[i] = new HashedWheelBucket();
	}
	return wheel;
}

private static int normalizeTicksPerWheel(int ticksPerWheel) {
	int normalizedTicksPerWheel = ticksPerWheel - 1;
	normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
	normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
	normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
	normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
	normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
	return normalizedTicksPerWheel + 1;
}

/**
 * Starts the background thread explicitly. The background thread will start
 * automatically on demand even if you did not call this method.
 *
 * @throws IllegalStateException if this timer has been {@linkplain #stop()
 *                               stopped} already
 */
public void start() {
	switch (WORKER_STATE_UPDATER.get(this)) {
	case WORKER_STATE_INIT:
		if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
			workerThread.start();
		}
		break;
	case WORKER_STATE_STARTED:
		break;
	case WORKER_STATE_SHUTDOWN:
		throw new IllegalStateException("cannot be started once stopped");
	default:
		throw new Error("Invalid WorkerState");
	}

	// Wait until the startTime is initialized by the worker.
	while (startTime == 0) {
		try {
			startTimeInitialized.await();
		} catch (InterruptedException ignore) {
			// Ignore - it will be ready very soon.
		}
	}
}

@Override
public Set<Timeout> stop() {
	if (Thread.currentThread() == workerThread) {
		throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
				+ TimerTask.class.getSimpleName());
	}

	if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
		// workerState can be 0 or 2 at this moment - let it always be 2.
		if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
			INSTANCE_COUNTER.decrementAndGet();
		}

		return Collections.emptySet();
	}

	try {
		boolean interrupted = false;
		while (workerThread.isAlive()) {
			workerThread.interrupt();
			try {
				workerThread.join(100);
			} catch (InterruptedException ignored) {
				interrupted = true;
			}
		}

		if (interrupted) {
			Thread.currentThread().interrupt();
		}
	} finally {
		INSTANCE_COUNTER.decrementAndGet();
	}
	return worker.unprocessedTimeouts();
}

@Override
public boolean isStop() {
	return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
	if (task == null) {
		throw new NullPointerException("task");
	}
	if (unit == null) {
		throw new NullPointerException("unit");
	}

	long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

	if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
		pendingTimeouts.decrementAndGet();
		throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
				+ ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts
				+ ")");
	}

	start();

	// Add the timeout to the timeout queue which will be processed on the next
	// tick.
	// During processing all the queued HashedWheelTimeouts will be added to the
	// correct HashedWheelBucket.
	long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

	// Guard against overflow.
	if (delay > 0 && deadline < 0) {
		deadline = Long.MAX_VALUE;
	}
	HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
	timeouts.add(timeout);
	return timeout;
}

/**
 * Returns the number of pending timeouts of this {@link Timer}.
 */
public long pendingTimeouts() {
	return pendingTimeouts.get();
}

private static void reportTooManyInstances() {
	String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
	logger.error("You are creating too many " + resourceType + " instances. " + resourceType
			+ " is a shared resource that must be reused across the JVM,"
			+ "so that only a few instances are created.");
}

private final class Worker implements Runnable {
	private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

	private long tick;

	@Override
	public void run() {
		// Initialize the startTime.
		startTime = System.nanoTime();
		if (startTime == 0) {
			// We use 0 as an indicator for the uninitialized value here, so make sure it's
			// not 0 when initialized.
			startTime = 1;
		}

		// Notify the other threads waiting for the initialization at start().
		startTimeInitialized.countDown();

		do {
			final long deadline = waitForNextTick();
			if (deadline > 0) {
				int idx = (int) (tick & mask);
				processCancelledTasks();
				HashedWheelBucket bucket = wheel[idx];
				transferTimeoutsToBuckets();
				bucket.expireTimeouts(deadline);
				tick++;
			}
		} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

		// Fill the unprocessedTimeouts so we can return them from stop() method.
		for (HashedWheelBucket bucket : wheel) {
			bucket.clearTimeouts(unprocessedTimeouts);
		}
		for (;;) {
			HashedWheelTimeout timeout = timeouts.poll();
			if (timeout == null) {
				break;
			}
			if (!timeout.isCancelled()) {
				unprocessedTimeouts.add(timeout);
			}
		}
		processCancelledTasks();
	}

	private void transferTimeoutsToBuckets() {
		// transfer only max. 100000 timeouts per tick to prevent a thread to stale the
		// workerThread when it just
		// adds new timeouts in a loop.
		for (int i = 0; i < 100000; i++) {
			HashedWheelTimeout timeout = timeouts.poll();
			if (timeout == null) {
				// all processed
				break;
			}
			if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
				// Was cancelled in the meantime.
				continue;
			}

			long calculated = timeout.deadline / tickDuration;
			timeout.remainingRounds = (calculated - tick) / wheel.length;

			// Ensure we don't schedule for past.
			final long ticks = Math.max(calculated, tick);
			int stopIndex = (int) (ticks & mask);

			HashedWheelBucket bucket = wheel[stopIndex];
			bucket.addTimeout(timeout);
		}
	}

	private void processCancelledTasks() {
		for (;;) {
			HashedWheelTimeout timeout = cancelledTimeouts.poll();
			if (timeout == null) {
				// all processed
				break;
			}
			try {
				timeout.remove();
			} catch (Throwable t) {
				if (logger.isWarnEnabled()) {
					logger.warn("An exception was thrown while process a cancellation task", t);
				}
			}
		}
	}

	/**
	 * calculate goal nanoTime from startTime and current tick number, then wait
	 * until that goal has been reached.
	 *
	 * @return Long.MIN_VALUE if received a shutdown request, current time otherwise
	 *         (with Long.MIN_VALUE changed by +1)
	 */
	private long waitForNextTick() {
		long deadline = tickDuration * (tick + 1);

		for (;;) {
			final long currentTime = System.nanoTime() - startTime;
			long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

			if (sleepTimeMs <= 0) {
				if (currentTime == Long.MIN_VALUE) {
					return -Long.MAX_VALUE;
				} else {
					return currentTime;
				}
			}
			if (isWindows()) {
				sleepTimeMs = sleepTimeMs / 10 * 10;
			}

			try {
				Thread.sleep(sleepTimeMs);
			} catch (InterruptedException ignored) {
				if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
					return Long.MIN_VALUE;
				}
			}
		}
	}

	Set<Timeout> unprocessedTimeouts() {
		return Collections.unmodifiableSet(unprocessedTimeouts);
	}
}

private static final class HashedWheelTimeout implements Timeout {

	private static final int ST_INIT = 0;
	private static final int ST_CANCELLED = 1;
	private static final int ST_EXPIRED = 2;
	private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater
			.newUpdater(HashedWheelTimeout.class, "state");

	private final HashedWheelTimer timer;
	private final TimerTask task;
	private final long deadline;

	@SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
	private volatile int state = ST_INIT;

	/**
	 * RemainingRounds will be calculated and set by
	 * Worker.transferTimeoutsToBuckets() before the HashedWheelTimeout will be
	 * added to the correct HashedWheelBucket.
	 */
	long remainingRounds;

	/**
	 * This will be used to chain timeouts in HashedWheelTimerBucket via a
	 * double-linked-list. As only the workerThread will act on it there is no need
	 * for synchronization / volatile.
	 */
	HashedWheelTimeout next;
	HashedWheelTimeout prev;

	/**
	 * The bucket to which the timeout was added
	 */
	HashedWheelBucket bucket;

	HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
		this.timer = timer;
		this.task = task;
		this.deadline = deadline;
	}

	@Override
	public Timer timer() {
		return timer;
	}

	@Override
	public TimerTask task() {
		return task;
	}

	@Override
	public boolean cancel() {
		// only update the state it will be removed from HashedWheelBucket on next tick.
		if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
			return false;
		}
		// If a task should be canceled we put this to another queue which will be
		// processed on each tick.
		// So this means that we will have a GC latency of max. 1 tick duration which is
		// good enough. This way
		// we can make again use of our MpscLinkedQueue and so minimize the locking /
		// overhead as much as possible.
		timer.cancelledTimeouts.add(this);
		return true;
	}

	void remove() {
		HashedWheelBucket bucket = this.bucket;
		if (bucket != null) {
			bucket.remove(this);
		} else {
			timer.pendingTimeouts.decrementAndGet();
		}
	}

	public boolean compareAndSetState(int expected, int state) {
		return STATE_UPDATER.compareAndSet(this, expected, state);
	}

	public int state() {
		return state;
	}

	@Override
	public boolean isCancelled() {
		return state() == ST_CANCELLED;
	}

	@Override
	public boolean isExpired() {
		return state() == ST_EXPIRED;
	}

	public void expire() {
		if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
			return;
		}

		try {
			task.run(this);
		} catch (Throwable t) {
			if (logger.isWarnEnabled()) {
				logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
			}
		}
	}

	@Override
	public String toString() {
		final long currentTime = System.nanoTime();
		long remaining = deadline - currentTime + timer.startTime;
		String simpleClassName = ClassUtils.simpleClassName(this.getClass());

		StringBuilder buf = new StringBuilder(192).append(simpleClassName).append('(').append("deadline: ");
		if (remaining > 0) {
			buf.append(remaining).append(" ns later");
		} else if (remaining < 0) {
			buf.append(-remaining).append(" ns ago");
		} else {
			buf.append("now");
		}

		if (isCancelled()) {
			buf.append(", cancelled");
		}

		return buf.append(", task: ").append(task()).append(')').toString();
	}
}

/**
 * Bucket that stores HashedWheelTimeouts. These are stored in a linked-list
 * like datastructure to allow easy removal of HashedWheelTimeouts in the
 * middle. Also the HashedWheelTimeout act as nodes themself and so no extra
 * object creation is needed.
 */
private static final class HashedWheelBucket {

	/**
	 * Used for the linked-list datastructure
	 */
	private HashedWheelTimeout head;
	private HashedWheelTimeout tail;

	/**
	 * Add {@link HashedWheelTimeout} to this bucket.
	 */
	void addTimeout(HashedWheelTimeout timeout) {
		assert timeout.bucket == null;
		timeout.bucket = this;
		if (head == null) {
			head = tail = timeout;
		} else {
			tail.next = timeout;
			timeout.prev = tail;
			tail = timeout;
		}
	}

	/**
	 * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
	 */
	void expireTimeouts(long deadline) {
		HashedWheelTimeout timeout = head;

		// process all timeouts
		while (timeout != null) {
			HashedWheelTimeout next = timeout.next;
			if (timeout.remainingRounds <= 0) {
				next = remove(timeout);
				if (timeout.deadline <= deadline) {
					timeout.expire();
				} else {
					// The timeout was placed into a wrong slot. This should never happen.
					throw new IllegalStateException(
							String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
				}
			} else if (timeout.isCancelled()) {
				next = remove(timeout);
			} else {
				timeout.remainingRounds--;
			}
			timeout = next;
		}
	}

	public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
		HashedWheelTimeout next = timeout.next;
		// remove timeout that was either processed or cancelled by updating the
		// linked-list
		if (timeout.prev != null) {
			timeout.prev.next = next;
		}
		if (timeout.next != null) {
			timeout.next.prev = timeout.prev;
		}

		if (timeout == head) {
			// if timeout is also the tail we need to adjust the entry too
			if (timeout == tail) {
				tail = null;
				head = null;
			} else {
				head = next;
			}
		} else if (timeout == tail) {
			// if the timeout is the tail modify the tail to be the prev node.
			tail = timeout.prev;
		}
		// null out prev, next and bucket to allow for GC.
		timeout.prev = null;
		timeout.next = null;
		timeout.bucket = null;
		timeout.timer.pendingTimeouts.decrementAndGet();
		return next;
	}

	/**
	 * Clear this bucket and return all not expired / cancelled {@link Timeout}s.
	 */
	void clearTimeouts(Set<Timeout> set) {
		for (;;) {
			HashedWheelTimeout timeout = pollTimeout();
			if (timeout == null) {
				return;
			}
			if (timeout.isExpired() || timeout.isCancelled()) {
				continue;
			}
			set.add(timeout);
		}
	}

	private HashedWheelTimeout pollTimeout() {
		HashedWheelTimeout head = this.head;
		if (head == null) {
			return null;
		}
		HashedWheelTimeout next = head.next;
		if (next == null) {
			tail = this.head = null;
		} else {
			this.head = next;
			next.prev = null;
		}

		// null out prev and next to allow for GC.
		head.next = null;
		head.prev = null;
		head.bucket = null;
		return head;
	}
}

private static boolean isWindows = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");

private boolean isWindows() {
	return isWindows;
}

}
`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions