Skip to content

Commit

Permalink
v0.005 新增JobCount用于停止标志位更新
Browse files Browse the repository at this point in the history
  • Loading branch information
yihui committed Jul 7, 2017
1 parent 9b7e7f3 commit a30aaf0
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 85 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,19 @@
> 实现爬取队列
- 每个Job只执行当前网页的抓取,将网页中满足深度抓取的链接塞入队列
- 新增Fetcher类,用于控制抓取任务


### 5. [v0.005](https://github.com/liuyueyi/quick-crawler/releases/tag/v0.005)

> 实现Job任务中爬取 + 结果解析的分离; 完成任务结束的标识设定
- 新增 ResultFilter 实现爬取网页的分析, 并将满足条件的链接塞入爬取队列
- 新增 JobCount 来记录任务的爬取完成数, 以此完成整个任务的结束标识设定


## 相关博文

- [Java 动手写爬虫: 一、实现一个最简单爬虫](http://zbang.online:8080/articles/2017/07/05/1499239054423.html)
- [Java 动手写爬虫: 二、 深度爬取](http://zbang.online:8080/articles/2017/07/05/1499239349163.html)
- [Java 动手写爬虫: 三、爬取队列](http://zbang.online:8080/articles/2017/07/07/1499401540323.html)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.quick.hui.crawler.core.entity;


import com.quick.hui.crawler.core.fetcher.JobCount;
import lombok.*;

import java.util.HashSet;
Expand All @@ -11,9 +12,25 @@
* Created by yihui on 2017/6/27.
*/
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class CrawlMeta {

/**
* 当前任务对应的 {@link JobCount#id }
*/
@Getter
@Setter
private int jobId;


/**
* 当前任务对应的 {@link JobCount#parentId }
*/
@Getter
@Setter
private int parentJobId;


/**
* 当前爬取的深度
*/
Expand Down Expand Up @@ -69,4 +86,9 @@ public Set<Pattern> addNegativeRegex(String regex) {
this.negativeRegex.add(Pattern.compile(regex));
return this.negativeRegex;
}


/**
 * Default constructor: assigns this task a fresh job id from the global
 * {@link JobCount#genId()} sequence.
 * NOTE(review): the class also carries Lombok's @NoArgsConstructor, which
 * generates a clashing explicit no-arg constructor — confirm the annotation
 * was removed in this commit.
 */
public CrawlMeta() {
this.jobId = JobCount.genId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.quick.hui.crawler.core.entity.CrawlMeta;

import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -25,13 +26,19 @@ public class FetchQueue {
/**
* 待爬取的网页队列
*/
private Queue<CrawlMeta> toFetchQueue = new ArrayBlockingQueue<>(200);
private Queue<CrawlMeta> toFetchQueue = new ArrayBlockingQueue<>(2000);

//
// /**
// * 爬取的结果队列,用于分析内部链接,并产出下一个可爬取的链接塞入 {@link FetchQueue#toFetchQueue}
// */
// private Queue<CrawlResult> fetchResultQueue = new ArrayBlockingQueue<>(200);

/**
* JobCount 映射表, key为 {@link JobCount#id}, value 为对应的JobCount
*/
public Map<Integer, JobCount> jobCountMap = new ConcurrentHashMap<>();


/**
* 爬取是否完成的标识
*/
public volatile boolean isOver = false;


/**
Expand All @@ -54,20 +61,22 @@ public static FetchQueue newInstance(String tag) {
* 当没有爬取过时, 才丢入队列; 主要是避免重复爬取的问题
*
* @param crawlMeta
* @return true 表示丢入队列成功; false 表示已经爬取过了
*/
public void addSeed(CrawlMeta crawlMeta) {
public boolean addSeed(CrawlMeta crawlMeta) {
if (urls.contains(crawlMeta.getUrl())) {
return;
return false;
}

synchronized (this) {
if (urls.contains(crawlMeta.getUrl())) {
return;
return false;
}


urls.add(crawlMeta.getUrl());
toFetchQueue.add(crawlMeta);
return true;
}
}

Expand All @@ -76,13 +85,60 @@ public CrawlMeta pollSeed() {
return toFetchQueue.poll();
}

//
// public void addResult(CrawlResult crawlResult) {
// this.fetchResultQueue.add(crawlResult);
// }
//
//
// public CrawlResult pollResult() {
// return fetchResultQueue.poll();
// }

/**
 * Marks one crawl job as finished; when that completes the whole task,
 * flips the global {@code isOver} flag and logs a message.
 *
 * @param crawlMeta the job that just finished crawling
 * @param count     number of qualifying child links found on the page
 * @param maxDepth  maximum crawl depth of the task
 */
public void finishJob(CrawlMeta crawlMeta, int count, int maxDepth) {
    boolean allDone = finishOneJob(crawlMeta, count, maxDepth);
    if (!allDone) {
        return;
    }

    isOver = true;
    System.out.println("============ finish crawl! ======");
}


/**
 * Finishes one crawl job: registers its child-job count and, for leaf jobs,
 * credits the completion upwards through the parent chain.
 *
 * @param crawlMeta the job that just finished crawling
 * @param count     number of links on the page that qualify for further
 *                  crawling, i.e. the number of child jobs this job spawned
 * @param maxDepth  maximum crawl depth of the whole task
 * @return true when every job of the whole crawl task has completed
 */
private boolean finishOneJob(CrawlMeta crawlMeta, int count, int maxDepth) {
// Record this job's counters so parent/child bookkeeping can find them later.
JobCount jobCount = new JobCount(crawlMeta.getJobId(),
crawlMeta.getParentJobId(),
crawlMeta.getCurrentDepth(),
count, 0);
jobCountMap.put(crawlMeta.getJobId(), jobCount);


if (crawlMeta.getCurrentDepth() == 0) { // special-case the seed page
return count == 0; // seed spawned no children -> whole task is already done
}


if (count == 0 || crawlMeta.getCurrentDepth() == maxDepth) {
// Leaf job (no children, or depth limit reached): bump the parent's finish count.
// NOTE(review): jobCountMap.get(parentJobId) may return null if the parent has
// not registered itself yet (a child can finish first) — confirm ordering.
return finishOneJob(jobCountMap.get(crawlMeta.getParentJobId()));
}


return false;
}


/**
 * Propagates a finished child upwards: increments the finish count of the
 * given job and, if the job itself thereby completes, recurses to its parent.
 *
 * @param jobCount counter record of the job being credited with one finished child
 * @return true when the seed job completes, i.e. the whole crawl is done
 */
private boolean finishOneJob(JobCount jobCount) {
if (jobCount.finishJob()) {
if (jobCount.getCurrentDepth() == 0) {
return true; // reached the seed job -> everything is finished
}

// NOTE(review): the parent may be absent from jobCountMap if it has not
// registered itself yet, making this argument null — confirm ordering.
return finishOneJob(jobCountMap.get(jobCount.getParentId()));
}

return false;
}
}
12 changes: 2 additions & 10 deletions core/src/main/java/com/quick/hui/crawler/core/fetcher/Fetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,16 @@ private void initExecutor() {
}



public <T extends DefaultAbstractCrawlJob> void start(Class<T> clz) throws Exception {
CrawlMeta crawlMeta;
int i = 0;
while (true) {

while (!fetchQueue.isOver) {
crawlMeta = fetchQueue.pollSeed();
if (crawlMeta == null) {
Thread.sleep(200);
if (++i > 300) { // 连续一分钟内没有数据时,退出
break;
}

continue;
}

i = 0;

DefaultAbstractCrawlJob job = clz.newInstance();
job.setDepth(this.maxDepth);
Expand All @@ -84,7 +78,6 @@ public <T extends DefaultAbstractCrawlJob> void start(Class<T> clz) throws Excep
}



private static class CustomThreadFactory implements ThreadFactory {

private String name;
Expand All @@ -102,7 +95,6 @@ public Thread newThread(Runnable r) {
}



@Getter
@Setter
@ToString
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.quick.hui.crawler.core.fetcher;

import lombok.Getter;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by yihui on 2017/7/6.
*/
@Getter
public class JobCount {

/**
* 种子对应的id
*/
public static int SEED_ID = 1;

public static AtomicInteger idGen = new AtomicInteger(0);


public static int genId() {
return idGen.addAndGet(1);
}


/**
* 该Job对应的唯一ID
*/
private int id;


/**
* 该job对应父job的id
*/
private int parentId;


/**
* 当前的层数
*/
private int currentDepth;


/**
* 该job对应的网页中,子Job的数量
*/
private AtomicInteger jobCount = new AtomicInteger(0);


/**
* 该Job对应的网页中, 子Job完成的数量
*/
private AtomicInteger finishCount = new AtomicInteger(0);


public boolean fetchOver() {
return jobCount.get() == finishCount.get();
}


/**
* 爬取完成一个子任务
*/
public synchronized boolean finishJob() {
finishCount.addAndGet(1);
return fetchOver();
}


public JobCount(int id, int parentId, int currentDepth, int jobCount, int finishCount) {
this.id = id;
this.parentId = parentId;
this.currentDepth = currentDepth;
this.jobCount.set(jobCount);
this.finishCount.set(finishCount);
}
}
Loading

0 comments on commit a30aaf0

Please sign in to comment.