Camel - поток: загрузка файлов и ftp

У меня есть приложение Camel/Spring Boot, которое извлекает данные из конечной точки GraphQL, сохраняет данные в базе данных в памяти (2 таблицы), извлекает файл CSV, выполняя запрос SQL, а затем должен загрузить файл на FTP сервер. Поскольку будет извлечено ~ 350 тыс. записей, я использую SQLs outputType = StreamList, разделитель и поток: файл. Полный маршрут выглядит так:

from("direct:loadComplete").id("loadComplete")
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Load complete")
    .setHeader("createDate", simple("${date:now:yyyyMMdd'_'HHmmss}"))
    .setHeader(Exchange.FILE_NAME,simple("${header.createDate}_sku_{{account.countryCode}}.csv"))
    .to("sql:"+ QUERY + "?outputType=StreamList") //run the SQL query
    .split(body()).streaming() //stream & split the results
        .bean(LineProcessor.class, "processLine") //process each entry and transform to CSV
        .toD("stream:file?fileName={{setting.outputDir}}/${header.CamelFileName}&closeOnDone=true") // stream each record to file
    .end()
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Written file to disk:${header.CamelFileName}")
    .to("ftp://{{ftp.host}}:{{ftp.port}}/{{ftp.path}}?fileName={{setting.outputDir}}/${header.CamelFileName}&username={{ftp.username}}&password={{ftp.password}}&{{ftp.config}}") // upload the CSV file
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - FTP upload complete. File: ${header.CamelFileName}");

Данные извлекаются без каких-либо проблем, а файл CSV создается с записями. Однако загрузка на FTP-сервер завершается с ошибкой Вызвано: org.apache.camel.InvalidPayloadException: Нет доступного тела типа: java.io.InputStream, но имеет значение: org.apache.camel.component.sql.ResultSetIterator Думаю, это связано с потоковой передачей файлов. Есть ли способ выполнить загрузку по ftp после завершения потока: файл и получить последующий маршрут на «closeOnDone»?

С другой стороны, мне было бы интересно, как outputType=StreamList на самом деле работает в фоновом режиме. Я предполагаю, что он все еще загружает полный результат SQL в память? Существуют ли какие-либо другие способы минимизировать потребление памяти приложением и не передавать большие полезные нагрузки? Есть ли другие способы сделать это более элегантно и использовать сортировку CSV?

Спасибо за помощь!

Хольгер


person eistropfen    schedule 03.10.2018    source источник


Ответы (1)


При потоковой обработке вы загружаете созданные строки csv в локальный файл. После ".end()" тело будет содержать результат производителя sql. Вы должны прочитать данные из локального файла для производителя FTP.

from("direct:loadComplete").id("loadComplete")
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Load complete")
            .setHeader("createDate", simple("${date:now:yyyyMMdd'_'HHmmss}"))
            .setHeader(Exchange.FILE_NAME,simple("${header.createDate}_sku_{{account.countryCode}}.csv"))
            .to("sql:"+ QUERY + "?outputType=StreamList") //run the SQL query
            .split(body()).streaming() //stream & split the results
            .bean(LineProcessor.class, "processLine") //process each entry and transform to CSV
            .toD("stream:file?fileName={{setting.outputDir}}/${header.CamelFileName}&closeOnDone=true") // stream each record to file
            .end()
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Written file to disk:${header.CamelFileName}")
            .pollEnrich("file:{{setting.outputDir}}?fileName=${header.CamelFileName}&noop=true", 10000/*investigate about timing according your file size*/, new AggregationStrategy() {
                @Override
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    return newExchange;// save important data from old exchange
                }
            })
            .to("ftp://{{ftp.host}}:{{ftp.port}}/{{ftp.path}}?fileName={{setting.outputDir}}/${header.CamelFileName}&username={{ftp.username}}&password={{ftp.password}}&{{ftp.config}}") // upload the CSV file
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - FTP upload complete. File: ${header.CamelFileName}");
person c0ld    schedule 04.10.2018
comment
Обновление: потребитель PollEnrich будет заботиться о повторном использовании одного и того же файла. По умолчанию noop=true оставит локальный файл на том же месте, но включит идемпотентный репозиторий в памяти. Если следующий обмен от direct:loadComplete будет иметь тот же ${header.CamelFileName}, вы получите старое исключение. - person c0ld; 04.10.2018