Skip to content

示例代码

本文档提供一些实际的爬虫源示例代码,帮助您快速上手。

示例1:简单的采集站爬虫

这个示例展示如何直接调用采集站 API 获取数据。

JavaScript 版本

javascript
const OmniBox = require("omnibox_sdk");
const runner = require("spider_runner");

// 采集站 API 地址(通过环境变量配置)
const SITE_API = process.env.SITE_API || "";

module.exports = {
  home,
  category,
  detail,
  search,
  play,
};

runner.run(module.exports);

async function requestAPI(params = {}) {
  if (!SITE_API) {
    throw new Error("请配置采集站 API 地址(SITE_API 环境变量)");
  }
  
  const url = new URL(SITE_API);
  Object.keys(params).forEach(key => {
    if (params[key] !== undefined && params[key] !== null && params[key] !== "") {
      url.searchParams.append(key, params[key]);
    }
  });
  
  const response = await OmniBox.request(url.toString(), {
    method: "GET",
    headers: {
      "User-Agent": "Mozilla/5.0",
    },
  });
  
  if (response.statusCode !== 200) {
    throw new Error(`HTTP ${response.statusCode}`);
  }
  
  return JSON.parse(response.body);
}

async function home(params) {
  try {
    const data = await requestAPI({ ac: "list", pg: 1 });
    
    return {
      class: data.class || [],
      list: data.list || [],
    };
  } catch (error) {
    await OmniBox.log("error", `获取首页失败: ${error.message}`);
    return { class: [], list: [] };
  }
}

async function category(params) {
  try {
    const categoryId = params.categoryId || "1";
    const page = params.page || 1;
    
    const data = await requestAPI({
      ac: "videolist",
      t: categoryId,
      pg: page,
    });
    
    return {
      page: data.page || page,
      pagecount: data.pagecount || 0,
      total: data.total || 0,
      list: data.list || [],
    };
  } catch (error) {
    await OmniBox.log("error", `获取分类失败: ${error.message}`);
    return { page: 1, pagecount: 0, total: 0, list: [] };
  }
}

async function detail(params) {
  try {
    const videoId = params.videoId;
    if (!videoId) {
      throw new Error("视频ID不能为空");
    }
    
    const data = await requestAPI({
      ac: "detail",
      ids: videoId,
    });
    
    return {
      list: data.list || [],
    };
  } catch (error) {
    await OmniBox.log("error", `获取详情失败: ${error.message}`);
    return { list: [] };
  }
}

async function search(params) {
  try {
    const keyword = params.keyword || "";
    const page = params.page || 1;
    
    if (!keyword) {
      return { page: 1, pagecount: 0, total: 0, list: [] };
    }
    
    const data = await requestAPI({
      ac: "list",
      wd: keyword,
      pg: page,
    });
    
    return {
      page: data.page || page,
      pagecount: data.pagecount || 0,
      total: data.total || 0,
      list: data.list || [],
    };
  } catch (error) {
    await OmniBox.log("error", `搜索失败: ${error.message}`);
    return { page: 1, pagecount: 0, total: 0, list: [] };
  }
}

async function play(params) {
  try {
    const playId = params.playId;
    if (!playId) {
      throw new Error("播放地址ID不能为空");
    }
    
    // 这里需要根据实际情况解析播放地址
    // 通常 playId 就是播放地址,或者需要从详情中获取
    return {
      url: playId,
      header: {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://example.com/",
      },
      parse: 0,
    };
  } catch (error) {
    await OmniBox.log("error", `获取播放地址失败: ${error.message}`);
    return { url: "", header: {} };
  }
}

Python 版本

python
import asyncio
import json
import os
import sys
import urllib.parse
from omnibox_sdk import OmniBox

# 采集站 API 地址(通过环境变量配置)
SITE_API = os.environ.get("SITE_API", "")

async def request_api(params=None):
    if not SITE_API:
        raise Exception("请配置采集站 API 地址(SITE_API 环境变量)")
    
    if params is None:
        params = {}
    
    url = f"{SITE_API}?" + urllib.parse.urlencode({
        k: v for k, v in params.items()
        if v is not None and v != ""
    })
    
    response = await OmniBox.request(url, {
        "method": "GET",
        "headers": {
            "User-Agent": "Mozilla/5.0",
        },
    })
    
    if response["statusCode"] != 200:
        raise Exception(f"HTTP {response['statusCode']}")
    
    return json.loads(response["body"])

async def home(params):
    try:
        data = await request_api({"ac": "list", "pg": 1})
        return {
            "class": data.get("class", []),
            "list": data.get("list", []),
        }
    except Exception as error:
        await OmniBox.log("error", f"获取首页失败: {str(error)}")
        return {"class": [], "list": []}

async def category(params):
    try:
        category_id = params.get("categoryId", "1")
        page = params.get("page", 1)
        
        data = await request_api({
            "ac": "videolist",
            "t": category_id,
            "pg": page,
        })
        
        return {
            "page": data.get("page", page),
            "pagecount": data.get("pagecount", 0),
            "total": data.get("total", 0),
            "list": data.get("list", []),
        }
    except Exception as error:
        await OmniBox.log("error", f"获取分类失败: {str(error)}")
        return {"page": 1, "pagecount": 0, "total": 0, "list": []}

async def detail(params):
    try:
        video_id = params.get("videoId")
        if not video_id:
            raise ValueError("视频ID不能为空")
        
        data = await request_api({
            "ac": "detail",
            "ids": video_id,
        })
        
        return {
            "list": data.get("list", []),
        }
    except Exception as error:
        await OmniBox.log("error", f"获取详情失败: {str(error)}")
        return {"list": []}

async def search(params):
    try:
        keyword = params.get("keyword", "")
        page = params.get("page", 1)
        
        if not keyword:
            return {"page": 1, "pagecount": 0, "total": 0, "list": []}
        
        data = await request_api({
            "ac": "list",
            "wd": keyword,
            "pg": page,
        })
        
        return {
            "page": data.get("page", page),
            "pagecount": data.get("pagecount", 0),
            "total": data.get("total", 0),
            "list": data.get("list", []),
        }
    except Exception as error:
        await OmniBox.log("error", f"搜索失败: {str(error)}")
        return {"page": 1, "pagecount": 0, "total": 0, "list": []}

async def play(params):
    try:
        play_id = params.get("playId")
        if not play_id:
            raise ValueError("播放地址ID不能为空")
        
        return {
            "url": play_id,
            "header": {
                "User-Agent": "Mozilla/5.0",
                "Referer": "https://example.com/",
            },
            "parse": 0,
        }
    except Exception as error:
        await OmniBox.log("error", f"获取播放地址失败: {str(error)}")
        return {"url": "", "header": {}}

def main():
    input_data = sys.stdin.read()
    request = json.loads(input_data)
    method = request.get("method")
    params = request.get("params", {})
    
    if method == "home":
        result = asyncio.run(home(params))
    elif method == "category":
        result = asyncio.run(category(params))
    elif method == "detail":
        result = asyncio.run(detail(params))
    elif method == "search":
        result = asyncio.run(search(params))
    elif method == "play":
        result = asyncio.run(play(params))
    else:
        result = {"error": f"未知方法: {method}"}
    
    output = {"success": True, "data": result, "error": None}
    print(json.dumps(output, ensure_ascii=False))

if __name__ == "__main__":
    main()

示例2:使用筛选条件

这个示例展示如何在分类接口中处理筛选条件。

javascript
async function category(params) {
  try {
    const categoryId = params.categoryId || "1";
    const page = params.page || 1;
    const filters = params.filters || {};
    
    // 构建请求参数
    const queryParams = {
      ac: "videolist",
      t: categoryId,
      pg: page,
    };
    
    // 添加筛选条件
    if (filters.type) queryParams.type = filters.type;
    if (filters.area) queryParams.area = filters.area;
    if (filters.year) queryParams.year = filters.year;
    if (filters.sort) queryParams.sort = filters.sort;
    
    const data = await requestAPI(queryParams);
    
    return {
      page: data.page || page,
      pagecount: data.pagecount || 0,
      total: data.total || 0,
      list: data.list || [],
    };
  } catch (error) {
    await OmniBox.log("error", `获取分类失败: ${error.message}`);
    return { page: 1, pagecount: 0, total: 0, list: [] };
  }
}

示例3:多线路播放地址

这个示例展示如何返回多线路播放地址。

javascript
async function play(params) {
  try {
    const playId = params.playId;
    const flag = params.flag || "play";
    
    // 获取播放地址信息
    const playInfo = await getPlayInfo(playId);
    
    // 格式1:数组格式
    return {
      url: [
        "4K",
        playInfo.url4k,
        "1080P",
        playInfo.url1080p,
        "720P",
        playInfo.url720p,
      ],
      flag: flag,
      header: {
        "User-Agent": "Mozilla/5.0",
        "Referer": playInfo.referer,
      },
      parse: 0,
    };
  } catch (error) {
    await OmniBox.log("error", `获取播放地址失败: ${error.message}`);
    return { url: "", header: {} };
  }
}

示例4:使用环境变量

这个示例展示如何使用环境变量配置 API 密钥。

javascript
async function home(params) {
  try {
    // 获取 API 密钥
    const apiKey = process.env.API_KEY;
    if (!apiKey) {
      throw new Error("请配置 API_KEY 环境变量");
    }
    
    // 使用 API 密钥发送请求
    const response = await OmniBox.request("https://api.example.com/home", {
      method: "GET",
      headers: {
        "Authorization": `Bearer ${apiKey}`,
        "User-Agent": "Mozilla/5.0",
      },
    });
    
    const data = JSON.parse(response.body);
    
    return {
      class: data.categories || [],
      list: data.videos || [],
    };
  } catch (error) {
    await OmniBox.log("error", `获取首页失败: ${error.message}`);
    return { class: [], list: [] };
  }
}

更多示例

系统提供了多个示例模板,您可以在管理后台查看:

  • JavaScript 模板backend/static/templates/js/js_template.js
  • Python 模板backend/static/templates/py/python_template.py
  • 采集站爬虫示例backend/static/templates/js/site_spider.js
  • 网盘爬虫示例backend/static/templates/js/drive_spider.js

下一步

基于 MIT 许可发布