== Parsed Logical Plan == 'Join UsingJoin(Inner,Buffer(tag)) :- Project [language#14 AS tag#44, count(target_uri)#41L] : +- Aggregate [language#14], [language#14, count(target_uri#18) AS count(target_uri)#41L] : +- Filter NOT (language#14 = ) : +- Project [target_uri#18, language#14] : +- LogicalRDD [block_digest#4, concurrent_to#5, content_length#6L, content_type#7, date_s#8L, html_content_type#9, html_length#10L, html_source#11, info_id#12, ip#13, language#14, payload_digest#15, payload_type#16, record_id#17, target_uri#18, warc_type#19], false +- LogicalRDD [tag#0, language#1], false == Analyzed Logical Plan == tag: string, count(target_uri): bigint, language: string Project [tag#44, count(target_uri)#41L, language#1] +- Join Inner, (tag#44 = tag#0) :- Project [language#14 AS tag#44, count(target_uri)#41L] : +- Aggregate [language#14], [language#14, count(target_uri#18) AS count(target_uri)#41L] : +- Filter NOT (language#14 = ) : +- Project [target_uri#18, language#14] : +- LogicalRDD [block_digest#4, concurrent_to#5, content_length#6L, content_type#7, date_s#8L, html_content_type#9, html_length#10L, html_source#11, info_id#12, ip#13, language#14, payload_digest#15, payload_type#16, record_id#17, target_uri#18, warc_type#19], false +- LogicalRDD [tag#0, language#1], false == Optimized Logical Plan == Project [tag#44, count(target_uri)#41L, language#1] +- Join Inner, (tag#44 = tag#0) :- Aggregate [language#14], [language#14 AS tag#44, count(target_uri#18) AS count(target_uri)#41L] : +- Project [target_uri#18, language#14] : +- Filter (isnotnull(language#14) AND NOT (language#14 = )) : +- LogicalRDD [block_digest#4, concurrent_to#5, content_length#6L, content_type#7, date_s#8L, html_content_type#9, html_length#10L, html_source#11, info_id#12, ip#13, language#14, payload_digest#15, payload_type#16, record_id#17, target_uri#18, warc_type#19], false +- Filter (NOT (tag#0 = ) AND isnotnull(tag#0)) +- LogicalRDD [tag#0, language#1], false == Physical Plan == *(5) Project [tag#44, count(target_uri)#41L, language#1] +- *(5) SortMergeJoin [tag#44], [tag#0], Inner :- *(2) Sort [tag#44 ASC NULLS FIRST], false, 0 : +- *(2) HashAggregate(keys=[language#14], functions=[count(target_uri#18)], output=[tag#44, count(target_uri)#41L]) : +- Exchange hashpartitioning(language#14, 200), ENSURE_REQUIREMENTS, [id=#108] : +- *(1) HashAggregate(keys=[language#14], functions=[partial_count(target_uri#18)], output=[language#14, count#60L]) : +- *(1) Project [target_uri#18, language#14] : +- *(1) Filter (isnotnull(language#14) AND NOT (language#14 = )) : +- *(1) Scan ExistingRDD[block_digest#4,concurrent_to#5,content_length#6L,content_type#7,date_s#8L,html_content_type#9,html_length#10L,html_source#11,info_id#12,ip#13,language#14,payload_digest#15,payload_type#16,record_id#17,target_uri#18,warc_type#19] +- *(4) Sort [tag#0 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(tag#0, 200), ENSURE_REQUIREMENTS, [id=#115] +- *(3) Filter (NOT (tag#0 = ) AND isnotnull(tag#0)) +- *(3) Scan ExistingRDD[tag#0,language#1]