@@ -222,70 +222,49 @@ private static LogicalPlan pruneColumnsInFork(Fork fork, AttributeSet.Builder us
222222 forkOutputChanged = true ;
223223 }
224224 }
225-
226225 var prunedForkAttrs = forkOutputChanged ? builder .build ().stream ().toList () : fork .output ();
227226 // now that we have the pruned fork output attributes, we can proceed to apply pruning all children plan
228- var usedFork = AttributeSet .forkBuilder ();
229- usedFork .addAll (prunedForkAttrs );
230227 var forkOutputNames = prunedForkAttrs .stream ().map (NamedExpression ::name ).collect (Collectors .toSet ());
231- boolean childrenChanged = false ;
228+ boolean subPlanChanged = false ;
232229 List <LogicalPlan > newChildren = new ArrayList <>();
233- for (var child : fork .children ()) {
234- var clonedUsed = AttributeSet .forkBuilder ().addAll (usedFork );
235- var newChild = pruneSubPlan (child , clonedUsed , forkOutputNames );
236- newChildren .add (newChild );
237- if (false == newChild .equals (child )) {
238- childrenChanged = true ;
230+ for (var subPlan : fork .children ()) {
231+ var usedAttrs = AttributeSet .builder ();
232+ LogicalPlan newSubPlan ;
233+ // if it's a local relation, just update the output attributes
234+ // and return early
235+ if (subPlan instanceof LocalRelation localRelation ) {
236+ var outputAttrs = localRelation .output ().stream ().filter (x -> forkOutputNames .contains (x .name ())).toList ();
237+ newSubPlan = new LocalRelation (localRelation .source (), outputAttrs , localRelation .supplier ());
238+ } else {
239+ // otherwise, we first prune the projections of the top-level Project of each subplan
240+ Holder <Boolean > projectVisited = new Holder <>(false );
241+ newSubPlan = subPlan .transformDown (Project .class , p -> {
242+ if (projectVisited .get ()) {
243+ return p ;
244+ }
245+ projectVisited .set (true );
246+ // filter projections based on fork output attributes
247+ var prunedAttrs = p .projections ().stream ().filter (x -> forkOutputNames .contains (x .name ())).toList ();
248+ p = new Project (p .source (), p .child (), prunedAttrs );
249+ // add all output attributes to used set
250+ usedAttrs .addAll (p .output ());
251+ return p ;
252+ });
253+ newSubPlan = pruneColumns (newSubPlan , usedAttrs , false );
239254 }
255+ if (false == newSubPlan .equals (subPlan )) {
256+ subPlanChanged = true ;
257+ }
258+ newChildren .add (newSubPlan );
240259 }
241- if (childrenChanged ) {
260+ if (subPlanChanged ) {
242261 return new Fork (fork .source (), newChildren , prunedForkAttrs );
243262 } else if (forkOutputChanged ) {
244263 return new Fork (fork .source (), fork .children (), prunedForkAttrs );
245264 }
246265 return fork ;
247266 }
248267
249- private static LogicalPlan pruneSubPlan (LogicalPlan plan , AttributeSet .Builder usedAttrs , Set <String > forkOutput ) {
250- if (plan instanceof LocalRelation localRelation ) {
251- var outputAttrs = localRelation .output ().stream ().filter (usedAttrs ::contains ).collect (Collectors .toList ());
252- return new LocalRelation (localRelation .source (), outputAttrs , localRelation .supplier ());
253- }
254-
255- var projectHolder = new Holder <>(false );
256- return plan .transformDown (p -> {
257- if (p instanceof Limit || p instanceof Sample ) {
258- return p ;
259- }
260-
261- var recheck = new Holder <Boolean >();
262- do {
263- // we operate using the names of the fields, rather than comparing the attributes directly,
264- // as attributes may have been recreated during the transformations of fork branches.
265- recheck .set (false );
266- p = switch (p ) {
267- case Aggregate agg -> pruneColumnsInAggregate (agg , usedAttrs , false );
268- case InlineJoin inj -> pruneColumnsInInlineJoinRight (inj , usedAttrs , recheck );
269- case Eval eval -> pruneColumnsInEval (eval , usedAttrs , recheck );
270- case Project project -> {
271- // process only the direct Project after Fork, but skip any subsequent instances
272- if (projectHolder .get ()) {
273- yield p ;
274- } else {
275- projectHolder .set (true );
276- var prunedAttrs = project .projections ().stream ().filter (x -> forkOutput .contains (x .name ())).toList ();
277- yield new Project (project .source (), project .child (), prunedAttrs );
278- }
279- }
280- case EsRelation esr -> pruneColumnsInEsRelation (esr , usedAttrs );
281- default -> p ;
282- };
283- } while (recheck .get ());
284- usedAttrs .addAll (p .references ());
285- return p ;
286- });
287- }
288-
289268 private static LogicalPlan emptyLocalRelation (UnaryPlan plan ) {
290269 // create an empty local relation with no attributes
291270 return skipPlan (plan );
0 commit comments