== Parsed Logical Plan == 'Join UsingJoin(Inner,List(tag)) :- Project [language#41 AS tag#85, count(targetURI)#82L] : +- Aggregate [language#41], [language#41, count(targetURI#36) AS count(targetURI)#82L] : +- Filter isnotnull(language#41) : +- Project [targetURI#36, language#41] : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).warcType, true, false) AS warcType#28, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).dateS AS dateS#29L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).recordID, true, false) AS recordID#30, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentLength AS contentLength#31, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentType, true, false) AS contentType#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).infoID, true, false) AS infoID#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).concurrentTo, true, false) AS concurrentTo#34, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).ip, true, false) AS ip#35, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).targetURI, true, false) AS targetURI#36, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadDigest, true, false) AS payloadDigest#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).blockDigest, true, false) AS blockDigest#38, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadType, true, false) AS payloadType#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlContentType, true, false) AS htmlContentType#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).language), true, false) AS language#41, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlLength AS htmlLength#42, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlSource, true, false) AS htmlSource#43] : +- ExternalRDD [obj#27] +- Project [_1#2 AS tag#7, _2#3 AS language#8] +- LocalRelation [_1#2, _2#3] == Analyzed Logical Plan == tag: string, count(targetURI): bigint, language: string Project [tag#85, count(targetURI)#82L, language#8] +- Join Inner, (tag#85 = tag#7) :- Project [language#41 AS tag#85, count(targetURI)#82L] : +- Aggregate [language#41], [language#41, count(targetURI#36) AS count(targetURI)#82L] : +- Filter isnotnull(language#41) : +- Project [targetURI#36, language#41] : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).warcType, true, false) AS warcType#28, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).dateS AS dateS#29L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).recordID, true, false) AS recordID#30, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentLength AS contentLength#31, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentType, true, false) AS contentType#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).infoID, true, false) AS infoID#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).concurrentTo, true, false) AS concurrentTo#34, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).ip, true, false) AS ip#35, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).targetURI, true, false) AS targetURI#36, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadDigest, true, false) AS payloadDigest#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).blockDigest, true, false) AS blockDigest#38, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadType, true, false) AS payloadType#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlContentType, true, false) AS htmlContentType#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).language), true, false) AS language#41, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlLength AS htmlLength#42, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlSource, true, false) AS htmlSource#43] : +- ExternalRDD [obj#27] +- Project [_1#2 AS tag#7, _2#3 AS language#8] +- LocalRelation [_1#2, _2#3] == Optimized Logical Plan == Project [tag#85, count(targetURI)#82L, language#8] +- Join Inner, (tag#85 = tag#7) :- Aggregate [language#41], [language#41 AS tag#85, count(targetURI#36) AS count(targetURI)#82L] : +- Project [targetURI#36, language#41] : +- Filter isnotnull(language#41) : +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).warcType, true, false) AS warcType#28, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).dateS AS dateS#29L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).recordID, true, false) AS recordID#30, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentLength AS contentLength#31, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentType, true, false) AS contentType#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).infoID, true, false) AS infoID#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).concurrentTo, true, false) AS concurrentTo#34, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).ip, true, false) AS ip#35, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).targetURI, true, false) AS targetURI#36, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadDigest, true, false) AS payloadDigest#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).blockDigest, true, false) AS blockDigest#38, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadType, true, false) AS payloadType#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlContentType, true, false) AS htmlContentType#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).language), true, false) AS language#41, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlLength AS htmlLength#42, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlSource, true, false) AS htmlSource#43] : +- ExternalRDD [obj#27] +- LocalRelation [tag#7, language#8] == Physical Plan == *(2) Project [tag#85, count(targetURI)#82L, language#8] +- *(2) BroadcastHashJoin [tag#85], [tag#7], Inner, BuildRight, false :- *(2) HashAggregate(keys=[language#41], functions=[count(targetURI#36)], output=[tag#85, count(targetURI)#82L]) : +- Exchange hashpartitioning(language#41, 200), ENSURE_REQUIREMENTS, [id=#104] : +- *(1) HashAggregate(keys=[language#41], functions=[partial_count(targetURI#36)], output=[language#41, count#101L]) : +- *(1) Project [targetURI#36, language#41] : +- *(1) Filter isnotnull(language#41) : +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).warcType, true, false) AS warcType#28, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).dateS AS dateS#29L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).recordID, true, false) AS recordID#30, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentLength AS contentLength#31, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).contentType, true, false) AS contentType#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).infoID, true, false) AS infoID#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).concurrentTo, true, false) AS concurrentTo#34, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).ip, true, false) AS ip#35, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).targetURI, true, false) AS targetURI#36, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadDigest, true, false) AS payloadDigest#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).blockDigest, true, false) AS blockDigest#38, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).payloadType, true, false) AS payloadType#39, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlContentType, true, false) AS htmlContentType#40, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).language), true, false) AS language#41, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlLength AS htmlLength#42, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, module1.scala.utilities.WarcRecord, true])).htmlSource, true, false) AS htmlSource#43] : +- Scan[obj#27] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#95] +- LocalTableScan [tag#7, language#8]