我有一个GCS存储桶,其中包含1000个文件夹,在这些文件夹中有嵌套的文件夹
包含数百万个json文件的文件夹结构类似于{bucket_name}/{dir1}/}dir2}/{test.json}每个json文件只包含一个要处理的记录。目前我的管道是这样的。
PCollection<String> records = p.apply("ReadFromGCS", TextIO.read().from("gs://test_bucket/**/**/**.json") .withHintMatchesManyFiles()); PCollection<Document> documents = records.apply("process", ParDo.of(new DoFn<String, Document>() { @ProcessElement public void processElement(@Element String row, OutputReceiver<Document> out) { Document doc; try { Gson gson = new Gson(); ResearchPaper paper = gson.fromJson(row, Test.class); doc = Document.parse(gson.toJson(paper)); doc.append("timestamp", System.currentTimeMillis()); } catch (Exception e) { doc = new Document(); doc.append("failed", "true"); doc.append("timestamp", System.currentTimeMillis()); doc.append("reason", Arrays.toString(e.getStackTrace())); doc.append("original_json", row); } out.output(doc); } })); documents.apply("WriteToMongoDB", MongoDbIO.write() .withUri("") .withDatabase("testnew") .withCollection("test") .withBatchSize(1000) );
有没有其他有效的方法,通过在DataFlow的第一步中增加风险来加快处理速度?
我正在检查管线是否可以进一步优化。