Skip to content
Snippets Groups Projects
Commit 2460f03f authored by Sylvain Zimmer's avatar Sylvain Zimmer Committed by Sean Owen
Browse files

[SPARK-16826][SQL] Switch to java.net.URI for parse_url()

## What changes were proposed in this pull request?
The java.net.URL class has a globally synchronized Hashtable, which limits the throughput of any single executor doing lots of calls to parse_url(). Tests have shown that a 36-core machine can only get to 10% CPU use because the threads are locked most of the time.

This patch switches to java.net.URI which has less features than java.net.URL but focuses on URI parsing, which is enough for parse_url().

New tests were added to make sure a few common edge cases didn't change behaviour.
https://issues.apache.org/jira/browse/SPARK-16826

## How was this patch tested?
I've kept the old URL code commented for now, so that people can verify that the new unit tests do pass with java.net.URL.

Thanks to srowen for the help!

Author: Sylvain Zimmer <sylvain@sylvainzimmer.com>

Closes #14488 from sylvinus/master.
parent 39a2b2ea
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
import java.net.{MalformedURLException, URL}
import java.net.{URI, URISyntaxException}
import java.text.{BreakIterator, DecimalFormat, DecimalFormatSymbols}
import java.util.{HashMap, Locale, Map => JMap}
import java.util.regex.Pattern
......@@ -749,25 +749,44 @@ case class ParseUrl(children: Seq[Expression])
Pattern.compile(REGEXPREFIX + key.toString + REGEXSUBFIX)
}
private def getUrl(url: UTF8String): URL = {
private def getUrl(url: UTF8String): URI = {
try {
new URL(url.toString)
new URI(url.toString)
} catch {
case e: MalformedURLException => null
case e: URISyntaxException => null
}
}
private def getExtractPartFunc(partToExtract: UTF8String): URL => String = {
private def getExtractPartFunc(partToExtract: UTF8String): URI => String = {
// partToExtract match {
// case HOST => _.toURL().getHost
// case PATH => _.toURL().getPath
// case QUERY => _.toURL().getQuery
// case REF => _.toURL().getRef
// case PROTOCOL => _.toURL().getProtocol
// case FILE => _.toURL().getFile
// case AUTHORITY => _.toURL().getAuthority
// case USERINFO => _.toURL().getUserInfo
// case _ => (url: URI) => null
// }
partToExtract match {
case HOST => _.getHost
case PATH => _.getPath
case QUERY => _.getQuery
case REF => _.getRef
case PROTOCOL => _.getProtocol
case FILE => _.getFile
case AUTHORITY => _.getAuthority
case USERINFO => _.getUserInfo
case _ => (url: URL) => null
case PATH => _.getRawPath
case QUERY => _.getRawQuery
case REF => _.getRawFragment
case PROTOCOL => _.getScheme
case FILE =>
(url: URI) =>
if (url.getRawQuery ne null) {
url.getRawPath + "?" + url.getRawQuery
} else {
url.getRawPath
}
case AUTHORITY => _.getRawAuthority
case USERINFO => _.getRawUserInfo
case _ => (url: URI) => null
}
}
......@@ -780,7 +799,7 @@ case class ParseUrl(children: Seq[Expression])
}
}
private def extractFromUrl(url: URL, partToExtract: UTF8String): UTF8String = {
private def extractFromUrl(url: URI, partToExtract: UTF8String): UTF8String = {
if (cachedExtractPartFunc ne null) {
UTF8String.fromString(cachedExtractPartFunc.apply(url))
} else {
......
......@@ -229,18 +229,48 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
}
test("string parse_url function") {
val df = Seq[String](("http://userinfo@spark.apache.org/path?query=1#Ref"))
.toDF("url")
checkAnswer(
df.selectExpr(
def testUrl(url: String, expected: Row) {
checkAnswer(Seq[String]((url)).toDF("url").selectExpr(
"parse_url(url, 'HOST')", "parse_url(url, 'PATH')",
"parse_url(url, 'QUERY')", "parse_url(url, 'REF')",
"parse_url(url, 'PROTOCOL')", "parse_url(url, 'FILE')",
"parse_url(url, 'AUTHORITY')", "parse_url(url, 'USERINFO')",
"parse_url(url, 'QUERY', 'query')"),
"parse_url(url, 'QUERY', 'query')"), expected)
}
testUrl(
"http://userinfo@spark.apache.org/path?query=1#Ref",
Row("spark.apache.org", "/path", "query=1", "Ref",
"http", "/path?query=1", "userinfo@spark.apache.org", "userinfo", "1"))
testUrl(
"https://use%20r:pas%20s@example.com/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two",
Row("example.com", "/dir%20/pa%20th.HTML", "query=x%20y&q2=2", "Ref%20two",
"https", "/dir%20/pa%20th.HTML?query=x%20y&q2=2", "use%20r:pas%20s@example.com",
"use%20r:pas%20s", "x%20y"))
testUrl(
"http://user:pass@host",
Row("host", "", null, null, "http", "", "user:pass@host", "user:pass", null))
testUrl(
"http://user:pass@host/",
Row("host", "/", null, null, "http", "/", "user:pass@host", "user:pass", null))
testUrl(
"http://user:pass@host/?#",
Row("host", "/", "", "", "http", "/?", "user:pass@host", "user:pass", null))
testUrl(
"http://user:pass@host/file;param?query;p2",
Row("host", "/file;param", "query;p2", null, "http", "/file;param?query;p2",
"user:pass@host", "user:pass", null))
testUrl(
"inva lid://user:pass@host/file;param?query;p2",
Row(null, null, null, null, null, null, null, null, null))
}
test("string repeat function") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment