@@ -83,6 +83,40 @@ void ThrowIfError(const arrow::Status& status) {
8383 }
8484}
8585
86+ class JNIEnvGuard {
87+ public:
88+ explicit JNIEnvGuard (JavaVM* vm) : vm_(vm), env_(nullptr ), should_detach_(false ) {
89+ JNIEnv* env;
90+ jint code = vm->GetEnv (reinterpret_cast <void **>(&env), JNI_VERSION);
91+ if (code == JNI_EDETACHED) {
92+ JavaVMAttachArgs args;
93+ args.version = JNI_VERSION;
94+ args.name = NULL ;
95+ args.group = NULL ;
96+ code = vm->AttachCurrentThread (reinterpret_cast <void **>(&env), &args);
97+ should_detach_ = (code == JNI_OK);
98+ }
99+ if (code != JNI_OK) {
100+ ThrowPendingException (" Failed to attach the current thread to a Java VM" );
101+ }
102+ env_ = env;
103+ }
104+
105+ JNIEnv* env () { return env_; }
106+
107+ ~JNIEnvGuard () {
108+ if (should_detach_) {
109+ vm_->DetachCurrentThread ();
110+ should_detach_ = false ;
111+ }
112+ }
113+
114+ private:
115+ JavaVM* vm_;
116+ JNIEnv* env_;
117+ bool should_detach_;
118+ };
119+
86120template <typename T>
87121T JniGetOrThrow (arrow::Result<T> result) {
88122 const arrow::Status& status = result.status ();
@@ -126,23 +160,27 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
126160 : vm_(vm), java_reservation_listener_(java_reservation_listener) {}
127161
128162 arrow::Status OnReservation (int64_t size) override {
129- JNIEnv* env;
130- if (vm_->GetEnv (reinterpret_cast <void **>(&env), JNI_VERSION) != JNI_OK) {
131- return arrow::Status::Invalid (" JNIEnv was not attached to current thread" );
163+ try {
164+ JNIEnvGuard guard (vm_);
165+ JNIEnv* env = guard.env ();
166+ env->CallObjectMethod (java_reservation_listener_, reserve_memory_method, size);
167+ RETURN_NOT_OK (arrow::dataset::jni::CheckException (env));
168+ return arrow::Status::OK ();
169+ } catch (const JniPendingException& e) {
170+ return arrow::Status::Invalid (e.what ());
132171 }
133- env->CallObjectMethod (java_reservation_listener_, reserve_memory_method, size);
134- RETURN_NOT_OK (arrow::dataset::jni::CheckException (env));
135- return arrow::Status::OK ();
136172 }
137173
138174 arrow::Status OnRelease (int64_t size) override {
139- JNIEnv* env;
140- if (vm_->GetEnv (reinterpret_cast <void **>(&env), JNI_VERSION) != JNI_OK) {
141- return arrow::Status::Invalid (" JNIEnv was not attached to current thread" );
175+ try {
176+ JNIEnvGuard guard (vm_);
177+ JNIEnv* env = guard.env ();
178+ env->CallObjectMethod (java_reservation_listener_, unreserve_memory_method, size);
179+ RETURN_NOT_OK (arrow::dataset::jni::CheckException (env));
180+ return arrow::Status::OK ();
181+ } catch (const JniPendingException& e) {
182+ return arrow::Status::Invalid (e.what ());
142183 }
143- env->CallObjectMethod (java_reservation_listener_, unreserve_memory_method, size);
144- RETURN_NOT_OK (arrow::dataset::jni::CheckException (env));
145- return arrow::Status::OK ();
146184 }
147185
148186 jobject GetJavaReservationListener () { return java_reservation_listener_; }
0 commit comments