본문 바로가기
프레임워크/Spring Boot

[Spring Batch] ItemReader, ItemProcessor, ItemWriter 사용 방법

by 연어바케트 2022. 11. 7.
반응형

ItemReader란 

 

아이템 리더 사용

/**
 * Paging item reader that scrapes stock names from a paginated HTML listing.
 *
 * <p>Each call to {@link #doReadPage()} downloads one listing page with Jsoup, selects the
 * rows of {@code table.type_2}, and stores the link text of every matching row in
 * {@code results} for the paging base class to hand out.
 */
@Slf4j
public class StepCrawlingStockName extends AbstractPagingItemReader<String> {

    /** Base listing URL; {@code "&page=N"} is appended to it for every page request. */
    @Setter
    private String naverFinanceUrl;

    /**
     * Fetches the next listing page and replaces {@code results} with the stock names found on it.
     *
     * <p>A page without any matching row produces an empty list (never {@code null}).
     *
     * @throws RuntimeException if the page cannot be downloaded; the underlying
     *         {@link IOException} is attached as the cause
     */
    @Override
    protected void doReadPage() {
        // NOTE(review): the page size is re-applied on every page read; it presumably mirrors the
        // number of rows per listing page and would normally be configured once - confirm.
        this.setPageSize(50);

        // getPage() is used as a 0-based index here; the URL parameter is sent 1-based.
        int pageNumber = this.getPage() + 1;
        log.info("reader start. current page: {}", pageNumber);
        String url = naverFinanceUrl + "&page=" + pageNumber;
        log.info("naver finance url: {}", url);

        Document document;
        try {
            document = Jsoup.connect(url).get();
        } catch (IOException e) {
            // Keep the cause and the failing URL so a failed step can be diagnosed from the log.
            throw new RuntimeException("Failed to fetch stock listing page: " + url, e);
        }

        Elements stockNameUrlElements = document.select("table.type_2 tbody tr");
        // Only rows carrying an "onmouseover" attribute and <td> children are kept;
        // the stock name is taken from the text of the row's anchor element(s).
        results = stockNameUrlElements.stream()
                .filter(element -> element.hasAttr("onmouseover"))
                .filter(element -> element.children().is("td"))
                .map(element -> element.select("a").text().trim())
                .collect(Collectors.toList());
        log.info("Crawling results {} ", results);
    }
}

 

ItemProcessor

/**
 * Item processor that turns a stock name into a {@code FinanceBaseDto} by querying the
 * stock price open API.
 *
 * <p>Returning {@code null} from {@link #process(String)} tells Spring Batch to filter the item,
 * so it never reaches the writer.
 */
@RequiredArgsConstructor
@Slf4j
public class StepStockBaseInfoProcessor implements ItemProcessor<String, FinanceBaseDto> {
    private final LagohaltteCommon lagohaltteCommon;
    private final CallStockInfoOpenApi callStockInfoOpenApi;

    /** Collection name resolved from the {@code financeBases} property. */
    @Value("${financeBases}")
    private String financeBasesCollectionName;

    /**
     * Looks up price information for the given stock name and converts the first returned entry.
     *
     * @param item stock name read by the reader
     * @return the converted DTO, or {@code null} when the name is already present in the
     *         configured collection or the API returned no usable data
     * @throws Exception if the lookup or the conversion fails
     */
    @Override
    public FinanceBaseDto process(@NotNull String item) throws Exception {
        if (lagohaltteCommon.isExistsCollection(item, financeBasesCollectionName)) {
            return null;
        }

        ResponseEntity<StockPriceInfo> responseEntity = callStockInfoOpenApi.requestPriceInfo(item, "");
        // Unwrap once; an absent response or body is treated as "no data" instead of an NPE.
        StockPriceInfo priceInfo = responseEntity == null ? null : responseEntity.getBody();
        if (hasStockData(priceInfo)) {
            return lagohaltteCommon.convertStockToFinanceBaseDto(
                    priceInfo.getResponse().getBody().getItems().getItem().get(0));
        }

        log.info("Not Insert StockName : {}", item);
        return null;
    }

    /**
     * Reports whether the payload carries at least one entry.
     *
     * @param priceInfo response payload, may be {@code null}
     * @return {@code false} when the payload or its total count is missing, or the count is "0"
     */
    private boolean hasStockData(StockPriceInfo priceInfo) {
        if (priceInfo == null) {
            return false;
        }
        Object totalCount = priceInfo.getResponse().getBody().getTotalCount();
        // Constant-first equals keeps the comparison identical for non-null counts.
        return totalCount != null && !"0".equals(totalCount);
    }
}

 

ItemWriter

/**
 * Item writer that upserts basic stock information into the {@code StockBases} MongoDB
 * collection, keyed by stock name.
 *
 * <p>NOTE(review): this class extends {@code JdbcBatchItemWriter} but never uses JDBC; only
 * {@link #write(List)} is overridden and it talks to MongoDB. The inherited JDBC configuration
 * checks may still apply if the instance is initialized as a bean - consider implementing the
 * plain {@code ItemWriter} interface instead. The supertype is left unchanged here so existing
 * references keep compiling.
 */
@RequiredArgsConstructor
@Slf4j
public class StepStockNameWriter extends JdbcBatchItemWriter<FinanceBaseDto> {
    private final MongoTemplate mongoTemplate;

    /**
     * Upserts one document per item, matching on the trimmed stock name.
     *
     * @param items chunk of DTOs produced by the processor
     * @throws Exception if the MongoDB operation fails
     */
    @Override
    public void write(List<? extends FinanceBaseDto> items) throws Exception {
        for (FinanceBaseDto item : items) {
            // Match on the same trimmed value that is stored; querying with the raw name would
            // miss the existing document whenever the name has surrounding whitespace and
            // insert a duplicate on every run.
            String name = StringUtils.trim(item.getItmsNm());
            Query query = new Query(Criteria.where("name").is(name));

            Update update = new Update();
            update.set("name", name);
            update.set("isinCd", StringUtils.trim(item.getIsinCd()));
            update.set("mrktCtg", StringUtils.trim(item.getMrktCtg()));
            update.set("srtnCd", StringUtils.trim(item.getSrtnCd()));

            mongoTemplate.upsert(query, update, "StockBases");
            log.info("Insert DB : {}", name);
        }
    }
}
반응형

댓글