一个专注于大数据技术架构与应用分享的技术博客

读取包含冒号的s3路径

假设有csv的s3路径是这样的:

s3n://<bucetname>/<tablename>/2018-12-13/
s3n://<bucetname>/<tablename>/2018-12-14/
s3n://<bucetname>/<tablename>/2018-12-15 08:10:44/
s3n://<bucetname>/<tablename>/2018-12-16 08:10:44/

访问单独的路径没问题,不管是s3n://<bucetname>/<tablename>/2018-12-13/ 还是s3n://<bucetname>/<tablename>/2018-12-15 08:10:44/ 都没问题;但是要读取所有内容,并不能简单地从路径s3n://<bucetname>/<tablename>/*/读取。那么怎么处理这种带冒号的路径呢?

def listFiles(sc : SparkContext, bucketName : String, path: String): Seq[String] = {
  val files = FileSystem.get(new URI(bucketName), sc.hadoopConfiguration).listStatus(new Path(path))

  if (files!=null)
    files.flatMap(fileStatus => Seq(fileStatus.getPath.toString))
  else
    Seq.empty[String]
}

def readSeqCSVFileIntoDF(sqlc : SparkSession, s3Paths : Seq[String]) : (DataFrame) = {
  val CSVDF = sqlc.read
    .format("com.databricks.spark.csv")
    .option("delimiter",",")
    .option("nullValue","")
    .option("treatEmptyValuesAsNulls","true")
    .option("header", "true")
    .load(s3Paths: _*)
  CSVDF
}

val bucket = "s3n://<bucketname>/"
var tablepath = "s3n://<bucketname>/<tablename>/"

val filelist = listFiles(sc,bucket,tablepath)
val df = readSeqCSVFileIntoDF(sqlc, filelist)

 

 

赞(0)
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《读取包含冒号的s3路径》
文章链接:https://macsishu.com/%e8%af%bb%e5%8f%96%e5%8c%85%e5%90%ab%e5%86%92%e5%8f%b7%e7%9a%84s3%e8%b7%af%e5%be%84
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。