示例代码
本文档提供一些实际的爬虫源示例代码,帮助您快速上手。
示例1:简单的采集站爬虫
这个示例展示如何直接调用采集站 API 获取数据。
JavaScript 版本
javascript
const OmniBox = require("omnibox_sdk");
const runner = require("spider_runner");
// Collection-site API base URL, read from the SITE_API environment variable
// (falls back to an empty string when the variable is unset or empty).
const SITE_API = process.env.SITE_API || "";
// Export the five handlers defined further down as this module's interface.
module.exports = {
home,
category,
detail,
search,
play,
};
// Hand the exported handlers to the spider runner. The handlers are function
// declarations, so they are hoisted and already defined at this point.
runner.run(module.exports);
/**
 * Send a GET request to the collection-site API and return the parsed JSON body.
 *
 * Entries of `params` whose value is undefined, null or an empty string are
 * skipped; every other entry is appended to the query string of SITE_API.
 *
 * @param {Object} [params={}] - Query parameters to append to SITE_API.
 * @returns {Promise<*>} The response body parsed as JSON.
 * @throws {Error} If SITE_API is empty, if the status code is not 200,
 *   or if the body cannot be parsed as JSON.
 */
async function requestAPI(params = {}) {
  if (!SITE_API) {
    throw new Error("请配置采集站 API 地址(SITE_API 环境变量)");
  }

  const endpoint = new URL(SITE_API);
  for (const name of Object.keys(params)) {
    const isBlank =
      params[name] === undefined || params[name] === null || params[name] === "";
    if (!isBlank) {
      endpoint.searchParams.append(name, params[name]);
    }
  }

  const requestOptions = {
    method: "GET",
    headers: {
      "User-Agent": "Mozilla/5.0",
    },
  };
  const reply = await OmniBox.request(endpoint.toString(), requestOptions);

  // Anything other than a plain 200 is surfaced to the caller as an error.
  if (reply.statusCode !== 200) {
    throw new Error(`HTTP ${reply.statusCode}`);
  }
  return JSON.parse(reply.body);
}
/**
 * Home handler: fetch the first page of the list endpoint and return its
 * categories and videos.
 *
 * @param {Object} params - Handler parameters (not read by this handler).
 * @returns {Promise<{class: Array, list: Array}>} Categories and videos;
 *   both are empty arrays when the request fails.
 */
async function home(params) {
  try {
    const payload = await requestAPI({ ac: "list", pg: 1 });
    const categories = payload.class || [];
    const videos = payload.list || [];
    return { class: categories, list: videos };
  } catch (err) {
    // Failures are logged and converted into an empty result rather than rethrown.
    await OmniBox.log("error", `获取首页失败: ${err.message}`);
    return { class: [], list: [] };
  }
}
/**
 * Category handler: fetch one page of videos for a category.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} [params.categoryId] - Category id; "1" is used when falsy.
 * @param {number} [params.page] - Page number; 1 is used when falsy.
 * @returns {Promise<{page: *, pagecount: *, total: *, list: Array}>} Paging
 *   info and videos; an empty first page when the request fails.
 */
async function category(params) {
  try {
    const typeId = params.categoryId || "1";
    const pageNo = params.page || 1;
    const query = { ac: "videolist", t: typeId, pg: pageNo };
    const payload = await requestAPI(query);

    const result = {};
    result.page = payload.page || pageNo;
    result.pagecount = payload.pagecount || 0;
    result.total = payload.total || 0;
    result.list = payload.list || [];
    return result;
  } catch (err) {
    // Failures are logged and converted into an empty page rather than rethrown.
    await OmniBox.log("error", `获取分类失败: ${err.message}`);
    return { page: 1, pagecount: 0, total: 0, list: [] };
  }
}
/**
 * Detail handler: fetch the detail record(s) for one video id.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} params.videoId - Id of the video to look up (required).
 * @returns {Promise<{list: Array}>} The detail list; an empty list when the
 *   id is missing or the request fails.
 */
async function detail(params) {
  try {
    const id = params.videoId;
    if (id) {
      const payload = await requestAPI({ ac: "detail", ids: id });
      return { list: payload.list || [] };
    }
    // A missing id goes through the same log-and-fallback path as a failed request.
    throw new Error("视频ID不能为空");
  } catch (err) {
    await OmniBox.log("error", `获取详情失败: ${err.message}`);
    return { list: [] };
  }
}
/**
 * Search handler: query the list endpoint with a keyword.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} [params.keyword] - Search keyword; a falsy keyword yields an
 *   empty result without issuing a request.
 * @param {number} [params.page] - Page number; 1 is used when falsy.
 * @returns {Promise<{page: *, pagecount: *, total: *, list: Array}>} Paging
 *   info and matches; an empty first page when the request fails.
 */
async function search(params) {
  const noResults = () => ({ page: 1, pagecount: 0, total: 0, list: [] });
  try {
    const term = params.keyword || "";
    const pageNo = params.page || 1;
    if (term === "") {
      return noResults();
    }

    const payload = await requestAPI({ ac: "list", wd: term, pg: pageNo });
    const result = {};
    result.page = payload.page || pageNo;
    result.pagecount = payload.pagecount || 0;
    result.total = payload.total || 0;
    result.list = payload.list || [];
    return result;
  } catch (err) {
    // Failures are logged and converted into an empty page rather than rethrown.
    await OmniBox.log("error", `搜索失败: ${err.message}`);
    return noResults();
  }
}
/**
 * Play handler: return the playback descriptor for a play id.
 *
 * In this example the play id itself is returned as the playback URL,
 * together with fixed request headers and `parse: 0`.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} params.playId - Play id (required); used directly as the URL.
 * @returns {Promise<Object>} `{ url, header, parse }` on success, or
 *   `{ url: "", header: {} }` when the id is missing.
 */
async function play(params) {
try {
const playId = params.playId;
if (!playId) {
throw new Error("播放地址ID不能为空");
}
// The playback URL has to be resolved according to the actual site.
// Usually playId already is the playback URL; otherwise it must be looked up
// from the detail data.
return {
url: playId,
header: {
"User-Agent": "Mozilla/5.0",
"Referer": "https://example.com/",
},
parse: 0,
};
} catch (error) {
// NOTE(review): the fallback result omits the `parse` field that the success
// path returns — confirm that callers tolerate its absence.
await OmniBox.log("error", `获取播放地址失败: ${error.message}`);
return { url: "", header: {} };
}
}Python 版本
python
import asyncio
import json
import os
import sys
import urllib.parse
from omnibox_sdk import OmniBox
# Collection-site API base URL, read from the SITE_API environment variable
# (empty string when the variable is not set).
SITE_API = os.environ.get("SITE_API", "")
async def request_api(params=None):
    """Send a GET request to the collection-site API and return the parsed JSON body.

    Entries of ``params`` whose value is ``None`` or an empty string are
    skipped. The remaining entries are merged into whatever query string
    ``SITE_API`` already carries, so a base URL such as
    ``https://host/api?token=x`` no longer ends up with a second ``?``.

    Args:
        params: Mapping of query parameters to add, or ``None`` for none.

    Returns:
        The response body decoded with ``json.loads``.

    Raises:
        Exception: If ``SITE_API`` is empty or the status code is not 200.
    """
    if not SITE_API:
        raise Exception("请配置采集站 API 地址(SITE_API 环境变量)")
    if params is None:
        params = {}

    # Start from the query pairs already present in SITE_API, then append ours.
    base = urllib.parse.urlsplit(SITE_API)
    query_pairs = urllib.parse.parse_qsl(base.query, keep_blank_values=True)
    query_pairs.extend(
        (key, value)
        for key, value in params.items()
        if value is not None and value != ""
    )
    url = urllib.parse.urlunsplit(
        base._replace(query=urllib.parse.urlencode(query_pairs))
    )

    response = await OmniBox.request(url, {
        "method": "GET",
        "headers": {
            "User-Agent": "Mozilla/5.0",
        },
    })
    if response["statusCode"] != 200:
        raise Exception(f"HTTP {response['statusCode']}")
    return json.loads(response["body"])
async def home(params):
    """Home handler: fetch the first page of the list endpoint.

    Args:
        params: Handler parameters (not read by this handler).

    Returns:
        A dict with ``class`` (categories) and ``list`` (videos). Both are
        empty lists when the request fails or the API returns null for them.
    """
    try:
        data = await request_api({"ac": "list", "pg": 1})
        # ``or []`` also covers keys that are present but null, which
        # ``dict.get(key, [])`` would pass through as None.
        return {
            "class": data.get("class") or [],
            "list": data.get("list") or [],
        }
    except Exception as error:
        await OmniBox.log("error", f"获取首页失败: {str(error)}")
        return {"class": [], "list": []}
async def category(params):
    """Category handler: fetch one page of videos for a category.

    Args:
        params: Handler parameters. ``categoryId`` falls back to ``"1"`` and
            ``page`` to ``1`` when missing or falsy.

    Returns:
        A dict with ``page``, ``pagecount``, ``total`` and ``list``; an empty
        first page when the request fails.
    """
    try:
        # ``or`` falls back for missing AND null/empty values, matching the
        # ``||`` defaults of the JavaScript version of this handler.
        category_id = params.get("categoryId") or "1"
        page = params.get("page") or 1
        data = await request_api({
            "ac": "videolist",
            "t": category_id,
            "pg": page,
        })
        return {
            "page": data.get("page") or page,
            "pagecount": data.get("pagecount") or 0,
            "total": data.get("total") or 0,
            "list": data.get("list") or [],
        }
    except Exception as error:
        await OmniBox.log("error", f"获取分类失败: {str(error)}")
        return {"page": 1, "pagecount": 0, "total": 0, "list": []}
async def detail(params):
    """Detail handler: fetch the detail record(s) for one video id.

    Args:
        params: Handler parameters; ``videoId`` is required.

    Returns:
        A dict with ``list``; the list is empty when the id is missing, the
        request fails, or the API returns null for it.
    """
    try:
        video_id = params.get("videoId")
        if not video_id:
            # Handled by the except clause below: logged, then an empty result.
            raise ValueError("视频ID不能为空")
        data = await request_api({
            "ac": "detail",
            "ids": video_id,
        })
        # ``or []`` also covers a ``list`` key that is present but null.
        return {
            "list": data.get("list") or [],
        }
    except Exception as error:
        await OmniBox.log("error", f"获取详情失败: {str(error)}")
        return {"list": []}
async def search(params):
    """Search handler: query the list endpoint with a keyword.

    Args:
        params: Handler parameters. A missing or empty ``keyword`` yields an
            empty result without issuing a request; ``page`` falls back to
            ``1`` when missing or falsy.

    Returns:
        A dict with ``page``, ``pagecount``, ``total`` and ``list``; an empty
        first page when the request fails.
    """
    try:
        # ``or`` falls back for missing AND null/empty values, matching the
        # ``||`` defaults of the JavaScript version of this handler.
        keyword = params.get("keyword") or ""
        page = params.get("page") or 1
        if not keyword:
            return {"page": 1, "pagecount": 0, "total": 0, "list": []}
        data = await request_api({
            "ac": "list",
            "wd": keyword,
            "pg": page,
        })
        return {
            "page": data.get("page") or page,
            "pagecount": data.get("pagecount") or 0,
            "total": data.get("total") or 0,
            "list": data.get("list") or [],
        }
    except Exception as error:
        await OmniBox.log("error", f"搜索失败: {str(error)}")
        return {"page": 1, "pagecount": 0, "total": 0, "list": []}
async def play(params):
    """Play handler: return the playback descriptor for a play id.

    In this example the play id itself is returned as the playback URL,
    together with fixed request headers and ``parse: 0``.

    Args:
        params: Handler parameters; ``playId`` is required.

    Returns:
        ``{"url", "header", "parse"}`` on success, or
        ``{"url": "", "header": {}}`` when the id is missing.
    """
    try:
        play_id = params.get("playId")
        if not play_id:
            # Handled by the except clause below: logged, then an empty result.
            raise ValueError("播放地址ID不能为空")
        return {
            "url": play_id,
            "header": {
                "User-Agent": "Mozilla/5.0",
                "Referer": "https://example.com/",
            },
            "parse": 0,
        }
    except Exception as error:
        await OmniBox.log("error", f"获取播放地址失败: {str(error)}")
        return {"url": "", "header": {}}
def main():
    """Read one JSON request from stdin, dispatch it, and print a JSON reply.

    The request is an object with a ``method`` name and optional ``params``.
    A known method is run to completion and reported as
    ``{"success": True, "data": <result>, "error": None}``. An unknown method
    is reported as ``{"success": False, "data": None, "error": <message>}``
    instead of being wrapped in a successful reply.
    """
    request = json.loads(sys.stdin.read())
    method = request.get("method")
    # A missing or null "params" is treated as an empty mapping.
    params = request.get("params") or {}

    handlers = {
        "home": home,
        "category": category,
        "detail": detail,
        "search": search,
        "play": play,
    }
    # Only string method names can match; this also avoids hashing a
    # list or dict that a malformed request might carry.
    handler = handlers.get(method) if isinstance(method, str) else None

    if handler is None:
        output = {"success": False, "data": None, "error": f"未知方法: {method}"}
    else:
        result = asyncio.run(handler(params))
        output = {"success": True, "data": result, "error": None}
    print(json.dumps(output, ensure_ascii=False))
if __name__ == "__main__":
    main()
示例2:使用筛选条件
这个示例展示如何在分类接口中处理筛选条件。
javascript
/**
 * Category handler with filter support: fetch one page of videos for a
 * category, forwarding any type/area/year/sort filters as query parameters.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} [params.categoryId] - Category id; "1" is used when falsy.
 * @param {number} [params.page] - Page number; 1 is used when falsy.
 * @param {Object} [params.filters] - Optional filters; only the `type`,
 *   `area`, `year` and `sort` keys are read.
 * @returns {Promise<Object>} `{ page, pagecount, total, list }`; an empty
 *   first page when the request fails.
 */
async function category(params) {
try {
const categoryId = params.categoryId || "1";
const page = params.page || 1;
const filters = params.filters || {};
// Build the base request parameters.
const queryParams = {
ac: "videolist",
t: categoryId,
pg: page,
};
// Add each filter only when it has a truthy value.
if (filters.type) queryParams.type = filters.type;
if (filters.area) queryParams.area = filters.area;
if (filters.year) queryParams.year = filters.year;
if (filters.sort) queryParams.sort = filters.sort;
// This snippet calls requestAPI, which it does not define itself.
const data = await requestAPI(queryParams);
return {
page: data.page || page,
pagecount: data.pagecount || 0,
total: data.total || 0,
list: data.list || [],
};
} catch (error) {
// Failures are logged and converted into an empty page rather than rethrown.
await OmniBox.log("error", `获取分类失败: ${error.message}`);
return { page: 1, pagecount: 0, total: 0, list: [] };
}
}示例3:多线路播放地址
这个示例展示如何返回多线路播放地址。
javascript
/**
 * Play handler returning several quality options for one play id.
 *
 * The `url` field is a flat array that alternates a label with its URL:
 * "4K", url4k, "1080P", url1080p, "720P", url720p.
 *
 * @param {Object} params - Handler parameters.
 * @param {string} params.playId - Play id passed to getPlayInfo.
 * @param {string} [params.flag] - Flag echoed back in the result; "play" is
 *   used when falsy.
 * @returns {Promise<Object>} `{ url, flag, header, parse }` on success, or
 *   `{ url: "", header: {} }` when an error is thrown.
 */
async function play(params) {
try {
const playId = params.playId;
const flag = params.flag || "play";
// Fetch the playback information.
// NOTE(review): getPlayInfo is not defined in this snippet, and playId is not
// checked for emptiness before the call — confirm both against the real code.
const playInfo = await getPlayInfo(playId);
// Format 1: array format (label followed by its URL).
return {
url: [
"4K",
playInfo.url4k,
"1080P",
playInfo.url1080p,
"720P",
playInfo.url720p,
],
flag: flag,
header: {
"User-Agent": "Mozilla/5.0",
"Referer": playInfo.referer,
},
parse: 0,
};
} catch (error) {
await OmniBox.log("error", `获取播放地址失败: ${error.message}`);
return { url: "", header: {} };
}
}示例4:使用环境变量
这个示例展示如何使用环境变量配置 API 密钥。
javascript
/**
 * Home handler that authenticates with an API key taken from the API_KEY
 * environment variable and sent as a Bearer token.
 *
 * @param {Object} params - Handler parameters (not read by this handler).
 * @returns {Promise<{class: Array, list: Array}>} `class` is filled from the
 *   response's `categories` and `list` from its `videos`; both are empty
 *   arrays when the key is missing or the request fails.
 */
async function home(params) {
try {
// Read the API key.
const apiKey = process.env.API_KEY;
if (!apiKey) {
throw new Error("请配置 API_KEY 环境变量");
}
// Send the request with the API key.
const response = await OmniBox.request("https://api.example.com/home", {
method: "GET",
headers: {
"Authorization": `Bearer ${apiKey}`,
"User-Agent": "Mozilla/5.0",
},
});
// NOTE(review): response.statusCode is not checked before the body is parsed,
// so a non-200 reply is only caught if its body is not valid JSON.
const data = JSON.parse(response.body);
return {
class: data.categories || [],
list: data.videos || [],
};
} catch (error) {
await OmniBox.log("error", `获取首页失败: ${error.message}`);
return { class: [], list: [] };
}
}更多示例
系统提供了多个示例模板,您可以在管理后台查看:
- JavaScript 模板: `backend/static/templates/js/js_template.js`
- Python 模板: `backend/static/templates/py/python_template.py`
- 采集站爬虫示例: `backend/static/templates/js/site_spider.js`
- 网盘爬虫示例: `backend/static/templates/js/drive_spider.js`
下一步
- 快速开始 - 了解如何创建第一个爬虫源
- JavaScript SDK - 查看 JavaScript SDK 文档
- Python SDK - 查看 Python SDK 文档
- API 参考 - 查看完整的接口规范
