Suppose the CSV files live under S3 paths like these:
s3n://<bucketname>/<tablename>/2018-12-13/
s3n://<bucketname>/<tablename>/2018-12-14/
s3n://<bucketname>/<tablename>/2018-12-15 08:10:44/
s3n://<bucketname>/<tablename>/2018-12-16 08:10:44/
Reading any single path works fine, whether it is s3n://<bucketname>/<tablename>/2018-12-13/ or s3n://<bucketname>/<tablename>/2018-12-15 08:10:44/. Reading everything at once does not: you cannot simply load the wildcard path s3n://<bucketname>/<tablename>/*/ because of the colons in the timestamped directory names. So how do you handle these colon-containing paths? One workaround is to list the subdirectories explicitly through the Hadoop FileSystem API and pass the resulting paths to the CSV reader one by one, as in the code below.
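For context, this is roughly what the failing glob read looks like (a sketch, assuming a SparkSession named sqlc and the same spark-csv source used further down; the exact exception depends on the Hadoop version):

// Minimal sketch of the failure mode: the wildcard forces Hadoop to parse every
// matched directory name, and the colon in "2018-12-15 08:10:44" is typically
// rejected, e.g. with "java.net.URISyntaxException: Relative path in absolute URI".
val broken = sqlc.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("s3n://<bucketname>/<tablename>/*/")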
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// List the immediate children of `path` on the bucket's filesystem and
// return their fully qualified S3 paths, colons and all.
def listFiles(sc: SparkContext, bucketName: String, path: String): Seq[String] = {
  val files = FileSystem.get(new URI(bucketName), sc.hadoopConfiguration).listStatus(new Path(path))
  if (files != null)
    files.flatMap(fileStatus => Seq(fileStatus.getPath.toString))
  else
    Seq.empty[String]
}
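For the example layout above, the returned sequence would (hypothetically) contain the fully qualified subdirectory paths, including the colon-named ones, which can then be fed to the reader without any glob expansion:

// Hypothetical call and result for the example bucket (ordering not guaranteed):
val dirs = listFiles(sc, "s3n://<bucketname>/", "s3n://<bucketname>/<tablename>/")
// dirs == Seq(
//   "s3n://<bucketname>/<tablename>/2018-12-13",
//   "s3n://<bucketname>/<tablename>/2018-12-14",
//   "s3n://<bucketname>/<tablename>/2018-12-15 08:10:44",
//   "s3n://<bucketname>/<tablename>/2018-12-16 08:10:44"
// )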
import org.apache.spark.sql.{DataFrame, SparkSession}

// Load every path in `s3Paths` with the spark-csv source and return the
// combined result as a single DataFrame.
def readSeqCSVFileIntoDF(sqlc: SparkSession, s3Paths: Seq[String]): DataFrame = {
  val CSVDF = sqlc.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .option("header", "true")
    .load(s3Paths: _*)
  CSVDF
}
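com.databricks.spark.csv is the external spark-csv package; on Spark 2.x the built-in CSV source accepts the same vararg list of paths, so an equivalent variant (a sketch, assuming Spark 2.0+ and that the built-in options cover your data) would be:

// Sketch: same read using the built-in Spark 2.x CSV source instead of spark-csv.
def readSeqCSVFileIntoDFBuiltin(sqlc: SparkSession, s3Paths: Seq[String]): DataFrame =
  sqlc.read
    .option("delimiter", ",")
    .option("nullValue", "")
    .option("header", "true")
    .csv(s3Paths: _*)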
val bucket = "s3n://<bucketname>/"
var tablepath = "s3n://<bucketname>/<tablename>/"
val filelist = listFiles(sc,bucket,tablepath)
val df = readSeqCSVFileIntoDF(sqlc, filelist)
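A quick sanity check (not part of the original snippet) is to count the rows and peek at the result to confirm that all four subdirectories, including the colon-named ones, were actually read:

// Verify the combined DataFrame picked up rows from every subdirectory.
println(df.count())
df.show(5)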


