Suppose the CSV data sits under S3 paths like these:
s3n://<bucketname>/<tablename>/2018-12-13/
s3n://<bucketname>/<tablename>/2018-12-14/
s3n://<bucketname>/<tablename>/2018-12-15 08:10:44/
s3n://<bucketname>/<tablename>/2018-12-16 08:10:44/
Reading any single path works fine, whether it is s3n://<bucketname>/<tablename>/2018-12-13/ or s3n://<bucketname>/<tablename>/2018-12-15 08:10:44/. But to read everything at once you cannot simply load the wildcard s3n://<bucketname>/<tablename>/*/, because the directory names containing a colon (and a space) break the glob expansion. So how do you deal with these colon-containing paths?

The workaround below avoids globbing entirely: enumerate the concrete child paths with the Hadoop FileSystem API and pass that explicit list to the CSV reader.
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

// List the child paths under `path` via the Hadoop FileSystem API,
// so the colon-containing directory names never go through glob expansion.
def listFiles(sc: SparkContext, bucketName: String, path: String): Seq[String] = {
  val fs = FileSystem.get(new URI(bucketName), sc.hadoopConfiguration)
  val files = fs.listStatus(new Path(path))
  if (files != null) files.map(_.getPath.toString).toSeq else Seq.empty[String]
}

// Load an explicit list of paths into a single DataFrame with the spark-csv reader.
def readSeqCSVFileIntoDF(sqlc: SparkSession, s3Paths: Seq[String]): DataFrame = {
  sqlc.read
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .option("header", "true")
    .load(s3Paths: _*)
}

// `sc` and `sqlc` are the SparkContext / SparkSession already available
// (e.g. in the spark-shell or an existing job).
val bucket = "s3n://<bucketname>/"
val tablepath = "s3n://<bucketname>/<tablename>/"
val filelist = listFiles(sc, bucket, tablepath)
val df = readSeqCSVFileIntoDF(sqlc, filelist)
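If you would rather hand Spark the individual CSV files instead of the dated directories, you can list one level deeper with the same FileSystem handle and filter on the file names. Below is a minimal sketch under that assumption (one level of dated directories with the CSV files directly inside); listCsvFiles is a hypothetical helper, not part of the original code:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Sketch: collect the individual *.csv files under each dated directory.
// Assumes the layout shown above; `sc` is the existing SparkContext.
def listCsvFiles(sc: SparkContext, bucket: String, tablePath: String): Seq[String] = {
  val fs = FileSystem.get(new URI(bucket), sc.hadoopConfiguration)
  fs.listStatus(new Path(tablePath))              // the dated directories
    .filter(_.isDirectory)
    .flatMap(dir => fs.listStatus(dir.getPath))   // entries inside each one
    .filter(_.getPath.getName.endsWith(".csv"))
    .map(_.getPath.toString)
    .toSeq
}

val csvFiles = listCsvFiles(sc, bucket, tablepath)
val dfFiles  = readSeqCSVFileIntoDF(sqlc, csvFiles)

Either way, the essential point is the same: the reader receives fully-qualified paths obtained from listStatus rather than a glob pattern, so nothing ever has to pattern-match the directory names that contain colons.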